Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::{Path, PathBuf},
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34    future::try_join,
35    stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40    StateStoreDataKey, StateStoreDataValue,
41    cross_process_lock::{
42        AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError, CrossProcessLockState,
43    },
44    crypto::{
45        CrossSigningBootstrapRequests, OlmMachine,
46        store::{
47            LockableCryptoStore, SecretImportError,
48            types::{RoomKeyBundleInfo, RoomKeyInfo},
49        },
50        types::{
51            SecretsBundle, SignedKey,
52            requests::{
53                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
54            },
55        },
56    },
57    sleep::sleep,
58};
59use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
60use ruma::{
61    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
62    api::{
63        client::{
64            keys::{
65                get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
66                upload_signing_keys::v3::Request as UploadSigningKeysRequest,
67            },
68            message::send_message_event,
69            to_device::send_event_to_device::v3::{
70                Request as RumaToDeviceRequest, Response as ToDeviceResponse,
71            },
72            uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
73        },
74        error::{ErrorBody, StandardErrorBody},
75    },
76    assign,
77    events::room::{
78        MediaSource, ThumbnailInfo,
79        member::{MembershipChange, OriginalSyncRoomMemberEvent},
80    },
81};
82#[cfg(feature = "experimental-send-custom-to-device")]
83use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
84use serde::{Deserialize, de::Error as _};
85use tasks::BundleReceiverTask;
86use tokio::sync::{Mutex, RwLockReadGuard};
87use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
88use tracing::{debug, error, instrument, warn};
89use url::Url;
90use vodozemac::Curve25519PublicKey;
91
92use self::{
93    backups::{Backups, types::BackupClientState},
94    futures::UploadEncryptedFile,
95    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
96    recovery::{Recovery, RecoveryState},
97    secret_storage::SecretStorage,
98    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
99    verification::{SasVerification, Verification, VerificationRequest},
100};
101use crate::{
102    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
103    attachment::Thumbnail,
104    client::{ClientInner, WeakClient},
105    cross_process_lock::CrossProcessLockGuard,
106    error::HttpResult,
107};
108
109pub mod backups;
110pub mod futures;
111pub mod identities;
112pub mod recovery;
113pub mod secret_storage;
114pub(crate) mod tasks;
115pub mod verification;
116
117pub use matrix_sdk_base::crypto::{
118    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
119    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
120    SignatureError, VERSION,
121    olm::{
122        SessionCreationError as MegolmSessionCreationError,
123        SessionExportError as OlmSessionExportError,
124    },
125    vodozemac,
126};
127use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
128
129#[cfg(feature = "experimental-send-custom-to-device")]
130use crate::config::RequestConfig;
131pub use crate::error::RoomKeyImportError;
132
/// Error type describing failures that can happen while exporting a
/// [`SecretsBundle`] from a SQLite store.
///
/// This type is only compiled when the `sqlite` feature is enabled.
#[cfg(feature = "sqlite")]
#[derive(Debug, thiserror::Error)]
pub enum BundleExportError {
    /// The SQLite store couldn't be opened.
    #[error(transparent)]
    OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
    /// Data from the SQLite store couldn't be exported.
    #[error(transparent)]
    StoreError(#[from] CryptoStoreError),
    /// The store doesn't contain a secrets bundle or it couldn't be read from
    /// the store.
    #[error(transparent)]
    SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
}
149
/// Error type describing failures that can happen while importing a
/// [`SecretsBundle`].
#[derive(Debug, thiserror::Error)]
pub enum BundleImportError {
    /// The bundle couldn't be imported.
    #[error(transparent)]
    SecretImport(#[from] SecretImportError),
    /// The cross-signed device keys couldn't be uploaded.
    #[error(transparent)]
    DeviceKeys(#[from] Error),
}
161
/// Try to read a [`SecretsBundle`] out of an existing SQLite crypto store.
///
/// The bundle can afterwards be imported into another [`Client`] instance,
/// which is useful for migrations or to let an existing `matrix-sdk`-based
/// client create new client instances that will be fully verified.
///
/// # Arguments
///
/// * `database_path` - The location of the SQLite crypto store.
/// * `passphrase` - The passphrase handed to the store when opening it, if
///   any.
///
/// # Returns
///
/// `Ok(None)` if the store doesn't contain an account, otherwise the user ID
/// of the stored account together with the exported bundle.
#[cfg(feature = "sqlite")]
pub async fn export_secrets_bundle_from_store(
    database_path: impl AsRef<Path>,
    passphrase: Option<&str>,
) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
    use matrix_sdk_base::crypto::store::CryptoStore;

    let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;

    // Without a stored account there is nothing that could be exported.
    let Some(account) =
        store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?
    else {
        return Ok(None);
    };

    // The bundle export lives on the store of an `OlmMachine`, so spin one up
    // on top of the store we just opened.
    let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
        .await
        .map_err(BundleExportError::StoreError)?;
    let bundle = machine.store().export_secrets_bundle().await?;

    Ok(Some((account.user_id.to_owned(), bundle)))
}
193
/// All the data related to the encryption state.
///
/// Bundles the background tasks, the user-provided settings and the
/// observable backup/recovery state in a single place.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption (key backup, initialization
    /// tasks, etc.).
    pub tasks: StdMutex<ClientTasks>,

    /// End-to-end encryption settings.
    pub encryption_settings: EncryptionSettings,

    /// All state related to key backup.
    pub backup_state: BackupClientState,

    /// All state related to secret storage recovery.
    pub recovery_state: SharedObservable<RecoveryState>,
}
209
210impl EncryptionData {
211    pub fn new(encryption_settings: EncryptionSettings) -> Self {
212        Self {
213            encryption_settings,
214
215            tasks: StdMutex::new(Default::default()),
216            backup_state: Default::default(),
217            recovery_state: Default::default(),
218        }
219    }
220
221    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
222        let weak_client = WeakClient::from_inner(client);
223
224        let mut tasks = self.tasks.lock();
225        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
226
227        if self.encryption_settings.backup_download_strategy
228            == BackupDownloadStrategy::AfterDecryptionFailure
229        {
230            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
231        }
232    }
233
234    /// Initialize the background task which listens for changes in the
235    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
236    ///
237    /// This should happen after the usual tasks have been set up and after the
238    /// E2EE initialization tasks have been set up.
239    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
240        let mut guard = self.tasks.lock();
241
242        let future = Recovery::update_state_after_backup_state_change(client);
243        let join_handle = spawn(future);
244
245        guard.update_recovery_state_after_backup = Some(join_handle);
246    }
247}
248
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires to login with a username and password, or that MSC3967 is
    /// enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup, by default room
    /// keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
268
/// The strategy used to download room keys from the backup.
///
/// The strategy is picked using the
/// [`EncryptionSettings::backup_download_strategy`] field.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
295
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
310
/// A stateful struct remembering the cross-signing keys we need to upload.
///
/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
/// additional authentication, this struct will contain information on the type
/// of authentication the user needs to complete before the upload might be
/// continued.
///
/// More info can be found in the [spec].
///
/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// The client used to send the requests stored in this handle.
    client: Client,
    /// The request uploading the cross-signing keys, it's (re)sent by
    /// [`CrossSigningResetHandle::auth()`].
    upload_request: UploadSigningKeysRequest,
    /// The request uploading the signatures, it's sent once the upload of the
    /// cross-signing keys succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication that needs to be completed.
    auth_type: CrossSigningResetAuthType,
    /// Flag set by [`CrossSigningResetHandle::cancel()`].
    is_cancelled: Mutex<bool>,
}
329
330impl CrossSigningResetHandle {
331    /// Set up a new `CrossSigningResetHandle`.
332    pub fn new(
333        client: Client,
334        upload_request: UploadSigningKeysRequest,
335        signatures_request: UploadSignaturesRequest,
336        auth_type: CrossSigningResetAuthType,
337    ) -> Self {
338        Self {
339            client,
340            upload_request,
341            signatures_request,
342            auth_type,
343            is_cancelled: Mutex::new(false),
344        }
345    }
346
347    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
348    /// is using.
349    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
350        &self.auth_type
351    }
352
353    /// Continue the cross-signing reset by either waiting for the
354    /// authentication to be done on the side of the OAuth 2.0 server or by
355    /// providing additional [`AuthData`] the homeserver requires.
356    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
357        // Poll to see whether the reset has been authorized twice per second.
358        const RETRY_EVERY: Duration = Duration::from_millis(500);
359
360        // Give up after two minutes of polling.
361        const TIMEOUT: Duration = Duration::from_mins(2);
362
363        tokio::time::timeout(TIMEOUT, async {
364            let mut upload_request = self.upload_request.clone();
365            upload_request.auth = auth;
366
367            debug!(
368                "Repeatedly PUTting to keys/device_signing/upload until it works \
369                or we hit a permanent failure."
370            );
371            while let Err(e) = self.client.send(upload_request.clone()).await {
372                if *self.is_cancelled.lock().await {
373                    return Ok(());
374                }
375
376                match e.as_uiaa_response() {
377                    Some(uiaa_info) => {
378                        if uiaa_info.auth_error.is_some() {
379                            return Err(e.into());
380                        }
381                    }
382                    None => return Err(e.into()),
383                }
384
385                debug!(
386                    "PUT to keys/device_signing/upload failed with 401. Retrying after \
387                    a short delay."
388                );
389                sleep(RETRY_EVERY).await;
390            }
391
392            self.client.send(self.signatures_request.clone()).await?;
393
394            Ok(())
395        })
396        .await
397        .unwrap_or_else(|_| {
398            warn!("Timed out waiting for keys/device_signing/upload to succeed.");
399            Err(Error::Timeout)
400        })
401    }
402
403    /// Cancel the ongoing identity reset process
404    pub async fn cancel(&self) {
405        *self.is_cancelled.lock().await = true;
406    }
407}
408
/// Information about the additional authentication that is required before the
/// cross-signing keys can be uploaded.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The homeserver requires user-interactive authentication.
    Uiaa(UiaaInfo),
    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
    /// approve the upload of cross-signing keys.
    OAuth(OAuthCrossSigningResetInfo),
}
419
420impl CrossSigningResetAuthType {
421    fn new(error: &HttpError) -> Result<Option<Self>> {
422        if let Some(auth_info) = error.as_uiaa_response() {
423            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
424                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
425            } else {
426                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
427            }
428        } else {
429            Ok(None)
430        }
431    }
432}
433
/// OAuth 2.0 specific information about the required authentication for the
/// upload of cross-signing keys.
///
/// This is the payload of the [`CrossSigningResetAuthType::OAuth`] variant.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL where the user can approve the reset of the cross-signing keys.
    pub approval_url: Url,
}
441
442impl OAuthCrossSigningResetInfo {
443    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
444        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
445            return Ok(None);
446        };
447
448        Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
449    }
450}
451
/// A struct that helps to parse the custom error message Synapse posts if a
/// duplicate one-time key is uploaded.
///
/// Use the [`FromStr`] implementation to parse the error message.
#[derive(Clone, Debug)]
pub struct DuplicateOneTimeKeyErrorMessage {
    /// The previously uploaded one-time key.
    pub old_key: Curve25519PublicKey,
    /// The one-time key we're attempting to upload right now.
    pub new_key: Curve25519PublicKey,
}
461
462impl FromStr for DuplicateOneTimeKeyErrorMessage {
463    type Err = serde_json::Error;
464
465    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
466        // First we split the string into two parts, the part containing the old key and
467        // the part containing the new key. The parts are conveniently separated
468        // by a `;` character.
469        let mut split = s.split_terminator(';');
470
471        let old_key = split
472            .next()
473            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
474        let new_key = split
475            .next()
476            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
477
478        // Now we remove the lengthy prefix from the part containing the old key, we
479        // should be left with just the JSON of the signed key.
480        let old_key_index = old_key
481            .find("Old key:")
482            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
483
484        let old_key = old_key[old_key_index..]
485            .trim()
486            .strip_prefix("Old key:")
487            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
488
489        // The part containing the new key is much simpler, we just remove a static
490        // prefix.
491        let new_key = new_key
492            .trim()
493            .strip_prefix("new key:")
494            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
495
496        // The JSON containing the new key is for some reason quoted using single
497        // quotes, so let's replace them with normal double quotes.
498        let new_key = new_key.replace("'", "\"");
499
500        // Let's deserialize now.
501        let old_key: SignedKey = serde_json::from_str(old_key)?;
502        let new_key: SignedKey = serde_json::from_str(&new_key)?;
503
504        // Pick out the Curve keys, we don't care about the rest that much.
505        let old_key = old_key.key();
506        let new_key = new_key.key();
507
508        Ok(Self { old_key, new_key })
509    }
510}
511
512impl Client {
    /// Get a read guard for the `OlmMachine` of the underlying base client.
    ///
    /// The guard wraps an `Option`, callers need to handle the case where no
    /// `OlmMachine` is available.
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
516
517    pub(crate) async fn mark_request_as_sent(
518        &self,
519        request_id: &TransactionId,
520        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
521    ) -> Result<(), matrix_sdk_base::Error> {
522        Ok(self
523            .olm_machine()
524            .await
525            .as_ref()
526            .expect(
527                "We should have an olm machine once we try to mark E2EE related requests as sent",
528            )
529            .mark_request_as_sent(request_id, response)
530            .await?)
531    }
532
533    /// Query the server for users device keys.
534    ///
535    /// # Panics
536    ///
537    /// Panics if no key query needs to be done.
538    #[instrument(skip(self, device_keys))]
539    pub(crate) async fn keys_query(
540        &self,
541        request_id: &TransactionId,
542        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
543    ) -> Result<get_keys::v3::Response> {
544        let request = assign!(get_keys::v3::Request::new(), { device_keys });
545
546        let response = self.send(request).await?;
547        self.mark_request_as_sent(request_id, &response).await?;
548        self.encryption().update_state_after_keys_query(&response).await;
549
550        Ok(response)
551    }
552
    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
    /// encrypting and uploading a provided reader.
    ///
    /// # Arguments
    ///
    /// * `reader` - The reader that should be encrypted and uploaded.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use url::Url;
    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
    /// use serde::{Deserialize, Serialize};
    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
    ///
    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
    /// struct CustomEventContent {
    ///     encrypted_file: EncryptedFile,
    /// }
    ///
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// # let client = Client::new(homeserver).await?;
    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
    ///
    /// room.send(CustomEventContent { encrypted_file }).await?;
    /// # anyhow::Ok(()) };
    /// ```
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
592
593    /// Encrypt and upload the file and thumbnails, and return the source
594    /// information.
595    pub(crate) async fn upload_encrypted_media_and_thumbnail(
596        &self,
597        data: &[u8],
598        thumbnail: Option<Thumbnail>,
599        send_progress: SharedObservable<TransmissionProgress>,
600    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
601        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
602
603        let upload_attachment = async {
604            let mut cursor = Cursor::new(data);
605            self.upload_encrypted_file(&mut cursor)
606                .with_send_progress_observable(send_progress)
607                .await
608        };
609
610        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
611
612        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
613    }
614
615    /// Uploads an encrypted thumbnail to the media repository, and returns
616    /// its source and extra information.
617    async fn upload_encrypted_thumbnail(
618        &self,
619        thumbnail: Option<Thumbnail>,
620        send_progress: SharedObservable<TransmissionProgress>,
621    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
622        let Some(thumbnail) = thumbnail else {
623            return Ok(None);
624        };
625
626        let (data, _, thumbnail_info) = thumbnail.into_parts();
627        let mut cursor = Cursor::new(data);
628
629        let file = self
630            .upload_encrypted_file(&mut cursor)
631            .with_send_progress_observable(send_progress)
632            .await?;
633
634        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
635    }
636
637    /// Claim one-time keys creating new Olm sessions.
638    ///
639    /// # Arguments
640    ///
641    /// * `users` - The list of user/device pairs that we should claim keys for.
642    pub(crate) async fn claim_one_time_keys(
643        &self,
644        users: impl Iterator<Item = &UserId>,
645    ) -> Result<()> {
646        let _lock = self.locks().key_claim_lock.lock().await;
647
648        if let Some((request_id, request)) = self
649            .olm_machine()
650            .await
651            .as_ref()
652            .ok_or(Error::NoOlmMachine)?
653            .get_missing_sessions(users)
654            .await?
655        {
656            let response = self.send(request).await?;
657            self.mark_request_as_sent(&request_id, &response).await?;
658        }
659
660        Ok(())
661    }
662
663    /// Upload the E2E encryption keys.
664    ///
665    /// This uploads the long lived device keys as well as the required amount
666    /// of one-time keys.
667    ///
668    /// # Panics
669    ///
670    /// Panics if the client isn't logged in, or if no encryption keys need to
671    /// be uploaded.
672    #[instrument(skip(self, request))]
673    pub(crate) async fn keys_upload(
674        &self,
675        request_id: &TransactionId,
676        request: &upload_keys::v3::Request,
677    ) -> Result<upload_keys::v3::Response> {
678        debug!(
679            device_keys = request.device_keys.is_some(),
680            one_time_key_count = request.one_time_keys.len(),
681            "Uploading public encryption keys",
682        );
683
684        let response = self.send(request.clone()).await?;
685        self.mark_request_as_sent(request_id, &response).await?;
686
687        Ok(response)
688    }
689
690    pub(crate) async fn room_send_helper(
691        &self,
692        request: &RoomMessageRequest,
693    ) -> Result<send_message_event::v3::Response> {
694        let content = request.content.clone();
695        let txn_id = request.txn_id.clone();
696        let room_id = &request.room_id;
697
698        self.get_room(room_id)
699            .expect("Can't send a message to a room that isn't known to the store")
700            .send(*content)
701            .with_transaction_id(txn_id)
702            .await
703            .map(|result| result.response)
704    }
705
706    pub(crate) async fn send_to_device(
707        &self,
708        request: &ToDeviceRequest,
709    ) -> HttpResult<ToDeviceResponse> {
710        let request = RumaToDeviceRequest::new_raw(
711            request.event_type.clone(),
712            request.txn_id.clone(),
713            request.messages.clone(),
714        );
715
716        self.send(request).await
717    }
718
719    pub(crate) async fn send_verification_request(
720        &self,
721        request: OutgoingVerificationRequest,
722    ) -> Result<()> {
723        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
724
725        match request {
726            ToDevice(t) => {
727                self.send_to_device(&t).await?;
728            }
729            InRoom(r) => {
730                self.room_send_helper(&r).await?;
731            }
732        }
733
734        Ok(())
735    }
736
737    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
738        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
739
740        match r.request() {
741            AnyOutgoingRequest::KeysQuery(request) => {
742                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
743            }
744            AnyOutgoingRequest::KeysUpload(request) => {
745                let response = self.keys_upload(r.request_id(), request).await;
746
747                if let Err(e) = &response {
748                    match e.as_ruma_api_error() {
749                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
750                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
751                            {
752                                // This is one of the nastiest errors we can have. The server
753                                // telling us that we already have a one-time key uploaded means
754                                // that we forgot about some of our one-time keys. This will lead to
755                                // UTDs.
756                                {
757                                    let already_reported = self
758                                        .state_store()
759                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
760                                        .await?
761                                        .is_some();
762
763                                    if message.starts_with("One time key") && !already_reported {
764                                        let error_message =
765                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
766
767                                        if let Ok(message) = &error_message {
768                                            error!(
769                                                sentry = true,
770                                                old_key = %message.old_key,
771                                                new_key = %message.new_key,
772                                                "Duplicate one-time keys have been uploaded"
773                                            );
774                                        } else {
775                                            error!(
776                                                sentry = true,
777                                                "Duplicate one-time keys have been uploaded"
778                                            );
779                                        }
780
781                                        self.state_store()
782                                            .set_kv_data(
783                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
784                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
785                                            )
786                                            .await?;
787
788                                        if let Err(e) = self
789                                            .inner
790                                            .duplicate_key_upload_error_sender
791                                            .send(error_message.ok())
792                                        {
793                                            error!(
794                                                "Failed to dispatch duplicate key upload error notification: {}",
795                                                e
796                                            );
797                                        }
798                                    }
799                                }
800                            }
801                        }
802                        _ => {}
803                    }
804
805                    response?;
806                }
807            }
808            AnyOutgoingRequest::ToDeviceRequest(request) => {
809                let response = self.send_to_device(request).await?;
810                self.mark_request_as_sent(r.request_id(), &response).await?;
811            }
812            AnyOutgoingRequest::SignatureUpload(request) => {
813                let response = self.send(request.clone()).await?;
814                self.mark_request_as_sent(r.request_id(), &response).await?;
815            }
816            AnyOutgoingRequest::RoomMessage(request) => {
817                let response = self.room_send_helper(request).await?;
818                self.mark_request_as_sent(r.request_id(), &response).await?;
819            }
820            AnyOutgoingRequest::KeysClaim(request) => {
821                let response = self.send(request.clone()).await?;
822                self.mark_request_as_sent(r.request_id(), &response).await?;
823            }
824        }
825
826        Ok(())
827    }
828
829    #[instrument(skip_all)]
830    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
831        const MAX_CONCURRENT_REQUESTS: usize = 20;
832
833        // This is needed because sometimes we need to automatically
834        // claim some one-time keys to unwedge an existing Olm session.
835        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
836            warn!("Error while claiming one-time keys {:?}", e);
837        }
838
839        let outgoing_requests = stream::iter(
840            self.olm_machine()
841                .await
842                .as_ref()
843                .ok_or(Error::NoOlmMachine)?
844                .outgoing_requests()
845                .await?,
846        )
847        .map(|r| self.send_outgoing_request(r));
848
849        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
850
851        requests
852            .for_each(|r| async move {
853                match r {
854                    Ok(_) => (),
855                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
856                }
857            })
858            .await;
859
860        Ok(())
861    }
862}
863
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Get a read guard for the client's (optional) olm machine, for testing
    /// purposes only.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }

    /// Aborts the client's bundle receiver task, for testing purposes only.
    ///
    /// This does nothing if no such task is currently stored.
    pub fn abort_bundle_receiver_task(&self) {
        let tasks = self.inner.e2ee.tasks.lock();
        let Some(handle) = tasks.receive_historic_room_key_bundles.as_ref() else {
            return;
        };

        handle.abort();
    }
}
879
/// A high-level API to manage the client's encryption.
///
/// This type is a thin wrapper around a [`Client`]; all of its methods
/// operate on that client.
///
/// To get this, use [`Client::encryption()`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
888
889impl Encryption {
    /// Create a new [`Encryption`] handle that operates on the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
893
    /// Returns the current encryption settings for this client.
    ///
    /// The settings are read from the E2EE state stored in the client.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
898
899    /// Get the public ed25519 key of our own device. This is usually what is
900    /// called the fingerprint of the device.
901    pub async fn ed25519_key(&self) -> Option<String> {
902        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
903    }
904
905    /// Get the public Curve25519 key of our own device.
906    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
907        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
908    }
909
910    /// Get the current device creation timestamp.
911    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
912        match self.get_own_device().await {
913            Ok(Some(device)) => device.first_time_seen_ts(),
914            // Should not happen, there should always be an own device
915            _ => MilliSecondsSinceUnixEpoch::now(),
916        }
917    }
918
    /// This method will import all the private cross-signing keys and, if
    /// available, the private part of a backup key and its accompanying
    /// version into the store.
    ///
    /// Importing all the secrets will mark the device as verified and enable
    /// backups if a backup key was available in the bundle.
    ///
    /// After the import, the device keys are uploaded and the method waits
    /// for the E2EE initialization tasks to finish before it returns.
    ///
    /// **Warning**: Only import this from a trusted source, i.e. if an existing
    /// device is sharing this with a new device.
    ///
    /// **Warning**: Only call this method right after logging in and before the
    /// initial sync has been started.
    ///
    /// # Arguments
    ///
    /// * `bundle` - The bundle of secrets that should be imported.
    pub async fn import_secrets_bundle(
        &self,
        bundle: &SecretsBundle,
    ) -> Result<(), BundleImportError> {
        self.import_secrets_bundle_impl(bundle).await?;

        // Upload the device keys, this will ensure that other devices see us as a fully
        // verified device as soon as this method returns.
        self.ensure_device_keys_upload().await?;
        self.wait_for_e2ee_initialization_tasks().await;

        // If our initialization tasks completed before we imported the secrets bundle,
        // backups might not have been enabled.
        //
        // In this case attempt to enable them again.
        if !self.backups().are_enabled().await {
            self.backups().maybe_resume_backups().await?;
        }

        Ok(())
    }
952
953    pub(crate) async fn import_secrets_bundle_impl(
954        &self,
955        bundle: &SecretsBundle,
956    ) -> Result<(), SecretImportError> {
957        let olm_machine = self.client.olm_machine().await;
958        let olm_machine =
959            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
960
961        olm_machine.store().import_secrets_bundle(bundle).await
962    }
963
964    /// Get the status of the private cross signing keys.
965    ///
966    /// This can be used to check which private cross signing keys we have
967    /// stored locally.
968    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
969        let olm = self.client.olm_machine().await;
970        let machine = olm.as_ref()?;
971        Some(machine.cross_signing_status().await)
972    }
973
974    /// Does the user have other devices that the current device can verify
975    /// against?
976    ///
977    /// The device must be signed by the user's cross-signing key, must have an
978    /// identity, and must not be a dehydrated device.
979    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
980        let olm_machine = self.client.olm_machine().await;
981        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
982        let user_id = olm_machine.user_id();
983
984        self.ensure_initial_key_query().await?;
985
986        let devices = self.get_user_devices(user_id).await?;
987
988        let ret = devices.devices().any(|device| {
989            device.is_cross_signed_by_owner()
990                && device.curve25519_key().is_some()
991                && !device.is_dehydrated()
992        });
993
994        Ok(ret)
995    }
996
997    /// Get all the tracked users we know about
998    ///
999    /// Tracked users are users for which we keep the device list of E2EE
1000    /// capable devices up to date.
1001    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1002        if let Some(machine) = self.client.olm_machine().await.as_ref() {
1003            machine.tracked_users().await
1004        } else {
1005            Ok(HashSet::new())
1006        }
1007    }
1008
    /// Get a [`Subscriber`] for the [`VerificationState`].
    ///
    /// The subscriber can be used both to read the current state and to wait
    /// for updates of it, as shown in the example below.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use matrix_sdk::{Client, encryption};
    /// use url::Url;
    ///
    /// # async {
    /// let homeserver = Url::parse("http://example.com")?;
    /// let client = Client::new(homeserver).await?;
    /// let mut subscriber = client.encryption().verification_state();
    ///
    /// let current_value = subscriber.get();
    ///
    /// println!("The current verification state is: {current_value:?}");
    ///
    /// if let Some(verification_state) = subscriber.next().await {
    ///     println!("Received verification state update {:?}", verification_state)
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    pub fn verification_state(&self) -> Subscriber<VerificationState> {
        self.client.inner.verification_state.subscribe_reset()
    }
1034
1035    /// Get a verification object with the given flow id.
1036    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1037        let olm = self.client.olm_machine().await;
1038        let olm = olm.as_ref()?;
1039        #[allow(clippy::bind_instead_of_map)]
1040        olm.get_verification(user_id, flow_id).and_then(|v| match v {
1041            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1042                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1043            }
1044            #[cfg(feature = "qrcode")]
1045            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1046                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1047            }
1048            _ => None,
1049        })
1050    }
1051
1052    /// Get a `VerificationRequest` object for the given user with the given
1053    /// flow id.
1054    pub async fn get_verification_request(
1055        &self,
1056        user_id: &UserId,
1057        flow_id: impl AsRef<str>,
1058    ) -> Option<VerificationRequest> {
1059        let olm = self.client.olm_machine().await;
1060        let olm = olm.as_ref()?;
1061
1062        olm.get_verification_request(user_id, flow_id)
1063            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1064    }
1065
1066    /// Get a specific device of a user.
1067    ///
1068    /// # Arguments
1069    ///
1070    /// * `user_id` - The unique id of the user that the device belongs to.
1071    ///
1072    /// * `device_id` - The unique id of the device.
1073    ///
1074    /// Returns a `Device` if one is found and the crypto store didn't throw an
1075    /// error.
1076    ///
1077    /// This will always return None if the client hasn't been logged in.
1078    ///
1079    /// # Examples
1080    ///
1081    /// ```no_run
1082    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
1083    /// # use url::Url;
1084    /// # async {
1085    /// # let alice = user_id!("@alice:example.org");
1086    /// # let homeserver = Url::parse("http://example.com")?;
1087    /// # let client = Client::new(homeserver).await?;
1088    /// if let Some(device) =
1089    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
1090    /// {
1091    ///     println!("{:?}", device.is_verified());
1092    ///
1093    ///     if !device.is_verified() {
1094    ///         let verification = device.request_verification().await?;
1095    ///     }
1096    /// }
1097    /// # anyhow::Ok(()) };
1098    /// ```
1099    pub async fn get_device(
1100        &self,
1101        user_id: &UserId,
1102        device_id: &DeviceId,
1103    ) -> Result<Option<Device>, CryptoStoreError> {
1104        let olm = self.client.olm_machine().await;
1105        let Some(machine) = olm.as_ref() else { return Ok(None) };
1106        let device = machine.get_device(user_id, device_id, None).await?;
1107        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1108    }
1109
1110    /// A convenience method to retrieve your own device from the store.
1111    ///
1112    /// This is the same as calling [`Encryption::get_device()`] with your own
1113    /// user and device ID.
1114    ///
1115    /// This will always return a device, unless you are not logged in.
1116    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1117        let olm = self.client.olm_machine().await;
1118        let Some(machine) = olm.as_ref() else { return Ok(None) };
1119        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1120        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1121    }
1122
1123    /// Get a map holding all the devices of an user.
1124    ///
1125    /// This will always return an empty map if the client hasn't been logged
1126    /// in.
1127    ///
1128    /// # Arguments
1129    ///
1130    /// * `user_id` - The unique id of the user that the devices belong to.
1131    ///
1132    /// # Examples
1133    ///
1134    /// ```no_run
1135    /// # use matrix_sdk::{Client, ruma::user_id};
1136    /// # use url::Url;
1137    /// # async {
1138    /// # let alice = user_id!("@alice:example.org");
1139    /// # let homeserver = Url::parse("http://example.com")?;
1140    /// # let client = Client::new(homeserver).await?;
1141    /// let devices = client.encryption().get_user_devices(alice).await?;
1142    ///
1143    /// for device in devices.devices() {
1144    ///     println!("{device:?}");
1145    /// }
1146    /// # anyhow::Ok(()) };
1147    /// ```
1148    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1149        let devices = self
1150            .client
1151            .olm_machine()
1152            .await
1153            .as_ref()
1154            .ok_or(Error::NoOlmMachine)?
1155            .get_user_devices(user_id, None)
1156            .await?;
1157
1158        Ok(UserDevices { inner: devices, client: self.client.clone() })
1159    }
1160
1161    /// Get the E2EE identity of a user from the crypto store.
1162    ///
1163    /// Usually, we only have the E2EE identity of a user locally if the user
1164    /// is tracked, meaning that we are both members of the same encrypted room.
1165    ///
1166    /// To get the E2EE identity of a user even if it is not available locally
1167    /// use [`Encryption::request_user_identity()`].
1168    ///
1169    /// # Arguments
1170    ///
1171    /// * `user_id` - The unique id of the user that the identity belongs to.
1172    ///
1173    /// Returns a `UserIdentity` if one is found and the crypto store
1174    /// didn't throw an error.
1175    ///
1176    /// This will always return None if the client hasn't been logged in.
1177    ///
1178    /// # Examples
1179    ///
1180    /// ```no_run
1181    /// # use matrix_sdk::{Client, ruma::user_id};
1182    /// # use url::Url;
1183    /// # async {
1184    /// # let alice = user_id!("@alice:example.org");
1185    /// # let homeserver = Url::parse("http://example.com")?;
1186    /// # let client = Client::new(homeserver).await?;
1187    /// let user = client.encryption().get_user_identity(alice).await?;
1188    ///
1189    /// if let Some(user) = user {
1190    ///     println!("{:?}", user.is_verified());
1191    ///
1192    ///     let verification = user.request_verification().await?;
1193    /// }
1194    /// # anyhow::Ok(()) };
1195    /// ```
1196    pub async fn get_user_identity(
1197        &self,
1198        user_id: &UserId,
1199    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1200        let olm = self.client.olm_machine().await;
1201        let Some(olm) = olm.as_ref() else { return Ok(None) };
1202        let identity = olm.get_identity(user_id, None).await?;
1203
1204        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1205    }
1206
1207    /// Get the E2EE identity of a user from the homeserver.
1208    ///
1209    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1210    /// E2EE identity is not found, it should mean that the user did not set
1211    /// up cross-signing.
1212    ///
1213    /// If you want the E2EE identity of a user without making a request to the
1214    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1215    ///
1216    /// # Arguments
1217    ///
1218    /// * `user_id` - The ID of the user that the identity belongs to.
1219    ///
1220    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1221    /// was an issue with the crypto store or with the request to the
1222    /// homeserver.
1223    ///
1224    /// This will always return `None` if the client hasn't been logged in.
1225    ///
1226    /// # Examples
1227    ///
1228    /// ```no_run
1229    /// # use matrix_sdk::{Client, ruma::user_id};
1230    /// # use url::Url;
1231    /// # async {
1232    /// # let alice = user_id!("@alice:example.org");
1233    /// # let homeserver = Url::parse("http://example.com")?;
1234    /// # let client = Client::new(homeserver).await?;
1235    /// let user = client.encryption().request_user_identity(alice).await?;
1236    ///
1237    /// if let Some(user) = user {
1238    ///     println!("User is verified: {:?}", user.is_verified());
1239    ///
1240    ///     let verification = user.request_verification().await?;
1241    /// }
1242    /// # anyhow::Ok(()) };
1243    /// ```
1244    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1245        let olm = self.client.olm_machine().await;
1246        let Some(olm) = olm.as_ref() else { return Ok(None) };
1247
1248        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1249        self.client.keys_query(&request_id, request.device_keys).await?;
1250
1251        let identity = olm.get_identity(user_id, None).await?;
1252        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1253    }
1254
1255    /// Returns a stream of device updates, allowing users to listen for
1256    /// notifications about new or changed devices.
1257    ///
1258    /// The stream produced by this method emits updates whenever a new device
1259    /// is discovered or when an existing device's information is changed. Users
1260    /// can subscribe to this stream and receive updates in real-time.
1261    ///
1262    /// # Examples
1263    ///
1264    /// ```no_run
1265    /// # use matrix_sdk::Client;
1266    /// # use ruma::{device_id, user_id};
1267    /// # use futures_util::{pin_mut, StreamExt};
1268    /// # let client: Client = unimplemented!();
1269    /// # async {
1270    /// let devices_stream = client.encryption().devices_stream().await?;
1271    /// let user_id = client
1272    ///     .user_id()
1273    ///     .expect("We should know our user id after we have logged in");
1274    /// pin_mut!(devices_stream);
1275    ///
1276    /// for device_updates in devices_stream.next().await {
1277    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1278    ///         for device in user_devices.values() {
1279    ///             println!("A new device has been added {}", device.device_id());
1280    ///         }
1281    ///     }
1282    /// }
1283    /// # anyhow::Ok(()) };
1284    /// ```
1285    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1286        let olm = self.client.olm_machine().await;
1287        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1288        let client = self.client.to_owned();
1289
1290        Ok(olm
1291            .store()
1292            .devices_stream()
1293            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1294    }
1295
1296    /// Returns a stream of user identity updates, allowing users to listen for
1297    /// notifications about new or changed user identities.
1298    ///
1299    /// The stream produced by this method emits updates whenever a new user
1300    /// identity is discovered or when an existing identities information is
1301    /// changed. Users can subscribe to this stream and receive updates in
1302    /// real-time.
1303    ///
1304    /// # Examples
1305    ///
1306    /// ```no_run
1307    /// # use matrix_sdk::Client;
1308    /// # use ruma::{device_id, user_id};
1309    /// # use futures_util::{pin_mut, StreamExt};
1310    /// # let client: Client = unimplemented!();
1311    /// # async {
1312    /// let identities_stream =
1313    ///     client.encryption().user_identities_stream().await?;
1314    /// pin_mut!(identities_stream);
1315    ///
1316    /// for identity_updates in identities_stream.next().await {
1317    ///     for (_, identity) in identity_updates.new {
1318    ///         println!("A new identity has been added {}", identity.user_id());
1319    ///     }
1320    /// }
1321    /// # anyhow::Ok(()) };
1322    /// ```
1323    pub async fn user_identities_stream(
1324        &self,
1325    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1326        let olm = self.client.olm_machine().await;
1327        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1328        let client = self.client.to_owned();
1329
1330        Ok(olm
1331            .store()
1332            .user_identities_stream()
1333            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1334    }
1335
1336    /// Create and upload a new cross signing identity.
1337    ///
1338    /// # Arguments
1339    ///
1340    /// * `auth_data` - This request requires user interactive auth, the first
1341    ///   request needs to set this to `None` and will always fail with an
1342    ///   `UiaaResponse`. The response will contain information for the
1343    ///   interactive auth and the same request needs to be made but this time
1344    ///   with some `auth_data` provided.
1345    ///
1346    /// # Examples
1347    ///
1348    /// ```no_run
1349    /// # use std::collections::BTreeMap;
1350    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1351    /// # use url::Url;
1352    /// # use serde_json::json;
1353    /// # async {
1354    /// # let homeserver = Url::parse("http://example.com")?;
1355    /// # let client = Client::new(homeserver).await?;
1356    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1357    ///     if let Some(response) = e.as_uiaa_response() {
1358    ///         let mut password = uiaa::Password::new(
1359    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1360    ///             "wordpass".to_owned(),
1361    ///         );
1362    ///         password.session = response.session.clone();
1363    ///
1364    ///         client
1365    ///             .encryption()
1366    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1367    ///             .await
1368    ///             .expect("Couldn't bootstrap cross signing")
1369    ///     } else {
1370    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1371    ///     }
1372    /// }
1373    /// # anyhow::Ok(()) };
1374    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1375        let olm = self.client.olm_machine().await;
1376        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1377
1378        let CrossSigningBootstrapRequests {
1379            upload_signing_keys_req,
1380            upload_keys_req,
1381            upload_signatures_req,
1382        } = olm.bootstrap_cross_signing(false).await?;
1383
1384        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1385            auth: auth_data,
1386            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1387            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1388            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1389        });
1390
1391        if let Some(req) = upload_keys_req {
1392            self.client.send_outgoing_request(req).await?;
1393        }
1394        self.client.send(upload_signing_keys_req).await?;
1395        self.client.send(upload_signatures_req).await?;
1396
1397        Ok(())
1398    }
1399
1400    /// Reset the cross-signing keys.
1401    ///
1402    /// # Example
1403    ///
1404    /// ```no_run
1405    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1406    /// # use url::Url;
1407    /// # async {
1408    /// # let homeserver = Url::parse("http://example.com")?;
1409    /// # let client = Client::new(homeserver).await?;
1410    /// # let user_id = unimplemented!();
1411    /// let encryption = client.encryption();
1412    ///
1413    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1414    ///     match handle.auth_type() {
1415    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1416    ///             use matrix_sdk::ruma::api::client::uiaa;
1417    ///
1418    ///             let password = "1234".to_owned();
1419    ///             let mut password = uiaa::Password::new(user_id, password);
1420    ///             password.session = uiaa.session;
1421    ///
1422    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1423    ///         }
1424    ///         CrossSigningResetAuthType::OAuth(o) => {
1425    ///             println!(
1426    ///                 "To reset your end-to-end encryption cross-signing identity, \
1427    ///                 you first need to approve it at {}",
1428    ///                 o.approval_url
1429    ///             );
1430    ///             handle.auth(None).await?;
1431    ///         }
1432    ///     }
1433    /// }
1434    /// # anyhow::Ok(()) };
1435    /// ```
1436    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1437        let olm = self.client.olm_machine().await;
1438        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1439
1440        let CrossSigningBootstrapRequests {
1441            upload_keys_req,
1442            upload_signing_keys_req,
1443            upload_signatures_req,
1444        } = olm.bootstrap_cross_signing(true).await?;
1445
1446        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1447            auth: None,
1448            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1449            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1450            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1451        });
1452
1453        if let Some(req) = upload_keys_req {
1454            self.client.send_outgoing_request(req).await?;
1455        }
1456
1457        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1458            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1459                let client = self.client.clone();
1460
1461                Ok(Some(CrossSigningResetHandle::new(
1462                    client,
1463                    upload_signing_keys_req,
1464                    upload_signatures_req,
1465                    auth_type,
1466                )))
1467            } else {
1468                Err(error.into())
1469            }
1470        } else {
1471            self.client.send(upload_signatures_req).await?;
1472
1473            Ok(None)
1474        }
1475    }
1476
1477    /// Query the user's own device keys, if, and only if, we didn't have their
1478    /// identity in the first place.
1479    async fn ensure_initial_key_query(&self) -> Result<()> {
1480        let olm_machine = self.client.olm_machine().await;
1481        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1482
1483        let user_id = olm_machine.user_id();
1484
1485        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1486            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1487            self.client.keys_query(&request_id, request.device_keys).await?;
1488        }
1489
1490        Ok(())
1491    }
1492
1493    /// Create and upload a new cross signing identity, if that has not been
1494    /// done yet.
1495    ///
1496    /// This will only create a new cross-signing identity if the user had never
1497    /// done it before. If the user did it before, then this is a no-op.
1498    ///
1499    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1500    /// behavior of this function.
1501    ///
1502    /// # Arguments
1503    ///
1504    /// * `auth_data` - This request requires user interactive auth, the first
1505    ///   request needs to set this to `None` and will always fail with an
1506    ///   `UiaaResponse`. The response will contain information for the
1507    ///   interactive auth and the same request needs to be made but this time
1508    ///   with some `auth_data` provided.
1509    ///
1510    /// # Examples
1511    /// ```no_run
1512    /// # use std::collections::BTreeMap;
1513    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1514    /// # use url::Url;
1515    /// # use serde_json::json;
1516    /// # async {
1517    /// # let homeserver = Url::parse("http://example.com")?;
1518    /// # let client = Client::new(homeserver).await?;
1519    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1520    ///     if let Some(response) = e.as_uiaa_response() {
1521    ///         let mut password = uiaa::Password::new(
1522    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1523    ///             "wordpass".to_owned(),
1524    ///         );
1525    ///         password.session = response.session.clone();
1526    ///
1527    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1528    ///         // avoid checks.
1529    ///         client
1530    ///             .encryption()
1531    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1532    ///             .await
1533    ///             .expect("Couldn't bootstrap cross signing")
1534    ///     } else {
1535    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1536    ///     }
1537    /// }
1538    /// # anyhow::Ok(()) };
1539    pub async fn bootstrap_cross_signing_if_needed(
1540        &self,
1541        auth_data: Option<AuthData>,
1542    ) -> Result<()> {
1543        let olm_machine = self.client.olm_machine().await;
1544        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1545        let user_id = olm_machine.user_id();
1546
1547        self.ensure_initial_key_query().await?;
1548
1549        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1550            self.bootstrap_cross_signing(auth_data).await?;
1551        }
1552
1553        Ok(())
1554    }
1555
1556    /// Export E2EE keys that match the given predicate encrypting them with the
1557    /// given passphrase.
1558    ///
1559    /// # Arguments
1560    ///
1561    /// * `path` - The file path where the exported key file will be saved.
1562    ///
1563    /// * `passphrase` - The passphrase that will be used to encrypt the
1564    ///   exported room keys.
1565    ///
1566    /// * `predicate` - A closure that will be called for every known
1567    ///   `InboundGroupSession`, which represents a room key. If the closure
    ///   returns `true` the `InboundGroupSession` will be included in the
1569    ///   export, if the closure returns `false` it will not be included.
1570    ///
1571    /// # Panics
1572    ///
1573    /// This method will panic if it isn't run on a Tokio runtime.
1574    ///
1575    /// This method will panic if it can't get enough randomness from the OS to
1576    /// encrypt the exported keys securely.
1577    ///
1578    /// # Examples
1579    ///
1580    /// ```no_run
1581    /// # use std::{path::PathBuf, time::Duration};
1582    /// # use matrix_sdk::{
1583    /// #     Client, config::SyncSettings,
1584    /// #     ruma::room_id,
1585    /// # };
1586    /// # use url::Url;
1587    /// # async {
1588    /// # let homeserver = Url::parse("http://localhost:8080")?;
1589    /// # let mut client = Client::new(homeserver).await?;
1590    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1591    /// // Export all room keys.
1592    /// client
1593    ///     .encryption()
1594    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1595    ///     .await?;
1596    ///
1597    /// // Export only the room keys for a certain room.
1598    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1599    /// let room_id = room_id!("!test:localhost");
1600    ///
1601    /// client
1602    ///     .encryption()
1603    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1604    ///     .await?;
1605    /// # anyhow::Ok(()) };
1606    /// ```
1607    #[cfg(not(target_family = "wasm"))]
1608    pub async fn export_room_keys(
1609        &self,
1610        path: PathBuf,
1611        passphrase: &str,
1612        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1613    ) -> Result<()> {
1614        let olm = self.client.olm_machine().await;
1615        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1616
1617        let keys = olm.store().export_room_keys(predicate).await?;
1618        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1619
1620        let encrypt = move || -> Result<()> {
1621            let export: String =
1622                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1623            let mut file = std::fs::File::create(path)?;
1624            file.write_all(&export.into_bytes())?;
1625            Ok(())
1626        };
1627
1628        let task = tokio::task::spawn_blocking(encrypt);
1629        task.await.expect("Task join error")
1630    }
1631
1632    /// Import E2EE keys from the given file path.
1633    ///
1634    /// # Arguments
1635    ///
    /// * `path` - The file path where the exported key file can be found.
1637    ///
1638    /// * `passphrase` - The passphrase that should be used to decrypt the
1639    ///   exported room keys.
1640    ///
    /// Returns a [`RoomKeyImportResult`] holding the number of sessions that
    /// were imported and the total number of sessions that were found in the
    /// key export.
1644    ///
1645    /// # Panics
1646    ///
1647    /// This method will panic if it isn't run on a Tokio runtime.
1648    ///
1649    /// ```no_run
1650    /// # use std::{path::PathBuf, time::Duration};
1651    /// # use matrix_sdk::{
1652    /// #     Client, config::SyncSettings,
1653    /// #     ruma::room_id,
1654    /// # };
1655    /// # use url::Url;
1656    /// # async {
1657    /// # let homeserver = Url::parse("http://localhost:8080")?;
1658    /// # let mut client = Client::new(homeserver).await?;
1659    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1660    /// let result =
1661    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1662    ///
1663    /// println!(
1664    ///     "Imported {} room keys out of {}",
1665    ///     result.imported_count, result.total_count
1666    /// );
1667    /// # anyhow::Ok(()) };
1668    /// ```
1669    #[cfg(not(target_family = "wasm"))]
1670    pub async fn import_room_keys(
1671        &self,
1672        path: PathBuf,
1673        passphrase: &str,
1674    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1675        let olm = self.client.olm_machine().await;
1676        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1677        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1678
1679        let decrypt = move || {
1680            let file = std::fs::File::open(path)?;
1681            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1682        };
1683
1684        let task = tokio::task::spawn_blocking(decrypt);
1685        let import = task.await.expect("Task join error")?;
1686
1687        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1688
1689        self.backups().maybe_trigger_backup();
1690
1691        Ok(ret)
1692    }
1693
1694    /// Receive notifications of room keys being received as a [`Stream`].
1695    ///
1696    /// Each time a room key is updated in any way, an update will be sent to
1697    /// the stream. Updates that happen at the same time are batched into a
1698    /// [`Vec`].
1699    ///
1700    /// If the reader of the stream lags too far behind, an error is broadcast
1701    /// containing the number of skipped items.
1702    ///
1703    /// # Examples
1704    ///
1705    /// ```no_run
1706    /// # use matrix_sdk::Client;
1707    /// # use url::Url;
1708    /// # async {
1709    /// # let homeserver = Url::parse("http://example.com")?;
1710    /// # let client = Client::new(homeserver).await?;
1711    /// use futures_util::StreamExt;
1712    ///
1713    /// let Some(mut room_keys_stream) =
1714    ///     client.encryption().room_keys_received_stream().await
1715    /// else {
1716    ///     return Ok(());
1717    /// };
1718    ///
1719    /// while let Some(update) = room_keys_stream.next().await {
1720    ///     println!("Received room keys {update:?}");
1721    /// }
1722    /// # anyhow::Ok(()) };
1723    /// ```
1724    pub async fn room_keys_received_stream(
1725        &self,
1726    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1727    {
1728        let olm = self.client.olm_machine().await;
1729        let olm = olm.as_ref()?;
1730
1731        Some(olm.store().room_keys_received_stream())
1732    }
1733
1734    /// Receive notifications of historic room key bundles as a [`Stream`].
1735    ///
1736    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1737    ///
1738    /// Each time a historic room key bundle was received, an update will be
1739    /// sent to the stream. This stream is useful for informative purposes
1740    /// exclusively, historic room key bundles are handled by the SDK
1741    /// automatically.
1742    ///
1743    /// # Examples
1744    ///
1745    /// ```no_run
1746    /// # use matrix_sdk::Client;
1747    /// # use url::Url;
1748    /// # async {
1749    /// # let homeserver = Url::parse("http://example.com")?;
1750    /// # let client = Client::new(homeserver).await?;
1751    /// use futures_util::StreamExt;
1752    ///
1753    /// let Some(mut bundle_stream) =
1754    ///     client.encryption().historic_room_key_stream().await
1755    /// else {
1756    ///     return Ok(());
1757    /// };
1758    ///
1759    /// while let Some(bundle_info) = bundle_stream.next().await {
1760    ///     println!("Received a historic room key bundle {bundle_info:?}");
1761    /// }
1762    /// # anyhow::Ok(()) };
1763    /// ```
1764    pub async fn historic_room_key_stream(
1765        &self,
1766    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1767        let olm = self.client.olm_machine().await;
1768        let olm = olm.as_ref()?;
1769
1770        Some(olm.store().historic_room_key_stream())
1771    }
1772
1773    /// Get the secret storage manager of the client.
1774    pub fn secret_storage(&self) -> SecretStorage {
1775        SecretStorage { client: self.client.to_owned() }
1776    }
1777
1778    /// Get the backups manager of the client.
1779    pub fn backups(&self) -> Backups {
1780        Backups { client: self.client.to_owned() }
1781    }
1782
1783    /// Get the recovery manager of the client.
1784    pub fn recovery(&self) -> Recovery {
1785        Recovery { client: self.client.to_owned() }
1786    }
1787
1788    /// Enables the crypto-store cross-process lock.
1789    ///
1790    /// This may be required if there are multiple processes that may do writes
1791    /// to the same crypto store. In that case, it's necessary to create a
1792    /// lock, so that only one process writes to it, otherwise this may
1793    /// cause confusing issues because of stale data contained in in-memory
1794    /// caches.
1795    ///
1796    /// The provided `lock_value` must be a unique identifier for this process.
1797    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1798    /// multi-process is enabled.
1799    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1800        // If the lock has already been created, don't recreate it from scratch.
1801        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1802            let prev_holder = prev_lock.lock_holder();
1803            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1804                return Ok(());
1805            }
1806            warn!(
1807                "Recreating cross-process store lock with a different holder value: \
1808                 prev was {prev_holder:?}, new is {lock_value}"
1809            );
1810        }
1811
1812        let olm_machine = self.client.base_client().olm_machine().await;
1813        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1814
1815        let lock = olm_machine.store().create_store_lock(
1816            "cross_process_lock".to_owned(),
1817            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1818        );
1819
1820        self.client
1821            .locks()
1822            .cross_process_crypto_store_lock
1823            .set(lock)
1824            .map_err(|_| Error::BadCryptoStoreState)?;
1825
1826        Ok(())
1827    }
1828
1829    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1830    /// spin-waits until the lock is available.
1831    ///
1832    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1833    /// first time.
1834    ///
1835    /// Returns a guard to the lock, if it was obtained.
1836    pub async fn spin_lock_store(
1837        &self,
1838        max_backoff: Option<u32>,
1839    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1840        self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1841    }
1842
1843    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1844    /// attempts to lock it once.
1845    ///
1846    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1847    /// first time.
1848    ///
1849    /// Returns a guard to the lock, if it was obtained.
1850    pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1851        self.lock_store(CrossProcessLock::try_lock_once).await
1852    }
1853
1854    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1855    /// locks the store with the given function, `acquire`.
1856    ///
1857    /// Reloads the `OlmMachine` after obtaining the lock, if the lock is dirty.
1858    ///
1859    /// Returns a guard to the lock if it was obtained.
1860    pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1861        &self,
1862        acquire: F,
1863    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1864        let wrap_err = |e: CryptoStoreError| {
1865            Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1866        };
1867        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1868            Ok(Some(match acquire(lock).await.map_err(wrap_err)?? {
1869                CrossProcessLockState::Clean(guard) => guard,
1870                CrossProcessLockState::Dirty(guard) => {
1871                    self.client.base_client().regenerate_olm(None).await?;
1872                    guard.clear_dirty();
1873                    guard
1874                }
1875            }))
1876        } else {
1877            Ok(None)
1878        }
1879    }
1880
1881    /// Testing purposes only.
1882    #[cfg(any(test, feature = "testing"))]
1883    pub async fn uploaded_key_count(&self) -> Result<u64> {
1884        let olm_machine = self.client.olm_machine().await;
1885        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1886        Ok(olm_machine.uploaded_key_count().await?)
1887    }
1888
1889    /// Bootstrap encryption and enables event listeners for the E2EE support.
1890    ///
1891    /// Based on the `EncryptionSettings`, this call might:
1892    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1893    /// - Create a key backup if needed (POST `/room_keys/version`)
1894    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1895    ///
1896    /// As part of this process, and if needed, the current device keys would be
1897    /// uploaded to the server, new account data would be added, and cross
1898    /// signing keys and signatures might be uploaded.
1899    ///
    /// Should be called once we have created an [`OlmMachine`], i.e. after
    /// logging in.
1902    ///
1903    /// # Arguments
1904    ///
1905    /// * `auth_data` - Some requests may require re-authentication. To prevent
1906    ///   the user from having to re-enter their password (or use other
1907    ///   methods), we can provide the authentication data here. This is
1908    ///   necessary for uploading cross-signing keys. However, please note that
1909    ///   there is a proposal (MSC3967) to remove this requirement, which would
1910    ///   allow for the initial upload of cross-signing keys without
1911    ///   authentication, rendering this parameter obsolete.
1912    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1913        // It's fine to be async here as we're only getting the lock protecting the
1914        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1915        // in we won't delay the login or restoration of the Client.
1916        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1917            Some(BundleReceiverTask::new(&self.client).await)
1918        } else {
1919            None
1920        };
1921
1922        let mut tasks = self.client.inner.e2ee.tasks.lock();
1923
1924        let this = self.clone();
1925
1926        tasks.setup_e2ee = Some(spawn(async move {
1927            // Update the current state first, so we don't have to wait for the result of
1928            // network requests
1929            this.update_verification_state().await;
1930
1931            if this.settings().auto_enable_cross_signing
1932                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1933            {
1934                error!("Couldn't bootstrap cross signing {e:?}");
1935            }
1936
1937            if let Err(e) = this.backups().setup_and_resume().await {
1938                error!("Couldn't setup and resume backups {e:?}");
1939            }
1940            if let Err(e) = this.recovery().setup().await {
1941                error!("Couldn't setup and resume recovery {e:?}");
1942            }
1943        }));
1944
1945        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1946
1947        self.setup_room_membership_session_discard_handler();
1948    }
1949
1950    /// Waits for end-to-end encryption initialization tasks to finish, if any
1951    /// was running in the background.
1952    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1953        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1954
1955        if let Some(task) = task
1956            && let Err(err) = task.await
1957        {
1958            warn!("Error when initializing backups: {err}");
1959        }
1960    }
1961
1962    /// Upload the device keys and initial set of one-time keys to the server.
1963    ///
1964    /// This should only be called when the user logs in for the first time,
1965    /// the method will ensure that other devices see our own device as an
1966    /// end-to-end encryption enabled one.
1967    ///
1968    /// **Warning**: Do not use this method if we're already calling
1969    /// [`Client::send_outgoing_request()`]. This method is intended for
1970    /// explicitly uploading the device keys before starting a sync.
1971    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1972        let olm = self.client.olm_machine().await;
1973        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1974
1975        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1976            self.client.keys_upload(&request_id, &request).await?;
1977
1978            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1979            self.client.keys_query(&request_id, request.device_keys).await?;
1980        }
1981
1982        Ok(())
1983    }
1984
1985    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1986        self.recovery().update_state_after_keys_query(response).await;
1987
1988        // Only update the verification_state if our own devices changed
1989        if let Some(user_id) = self.client.user_id() {
1990            let contains_own_device = response.device_keys.contains_key(user_id);
1991
1992            if contains_own_device {
1993                self.update_verification_state().await;
1994            }
1995        }
1996    }
1997
1998    async fn update_verification_state(&self) {
1999        match self.get_own_device().await {
2000            Ok(device) => {
2001                if let Some(device) = device {
2002                    let is_verified = device.is_cross_signed_by_owner();
2003
2004                    if is_verified {
2005                        self.client.inner.verification_state.set(VerificationState::Verified);
2006                    } else {
2007                        self.client.inner.verification_state.set(VerificationState::Unverified);
2008                    }
2009                } else {
2010                    warn!("Couldn't find out own device in the store.");
2011                    self.client.inner.verification_state.set(VerificationState::Unknown);
2012                }
2013            }
2014            Err(error) => {
2015                warn!("Failed retrieving own device: {error}");
2016                self.client.inner.verification_state.set(VerificationState::Unknown);
2017            }
2018        }
2019    }
2020
2021    /// Sets up a handler to rotate room keys when a user leaves a room.
2022    ///
2023    /// Previously, it was sufficient to check if we need to rotate the room key
2024    /// prior to sending a message. However, the history sharing feature
2025    /// ([MSC4268]) breaks this logic:
2026    ///
2027    /// 1. Alice sends a message M1 in room X;
2028    /// 2. Bob invites Charlie, who joins and immediately leaves the room;
2029    /// 3. Alice sends another message M2 in room X.
2030    ///
2031    /// Under the old logic, Alice would not rotate her key after Charlie
2032    /// leaves, resulting in M2 being encrypted with the same session as M1.
2033    /// This would allow Charlie to decrypt M2 if he ever gains access to
2034    /// the event.
2035    ///
2036    /// This handler listens for changes to the room membership, and discards
2037    /// the current room key if the event is a `leave` event.
2038    ///
2039    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2040    fn setup_room_membership_session_discard_handler(&self) {
2041        let client = WeakClient::from_client(&self.client);
2042        self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2043            let Some(client) = client.get() else {
2044                // The main client has been dropped.
2045                return;
2046            };
2047            let Some(user_id) = client.user_id() else {
2048                // We aren't logged in, so this shouldn't ever happen.
2049                return;
2050            };
2051            let olm = client.olm_machine().await;
2052            let Some(olm) = olm.as_ref() else {
2053                warn!("Cannot discard session - Olm machine is not available");
2054                return;
2055            };
2056
2057            if !matches!(ev.membership_change(), MembershipChange::Left) || ev.sender == user_id {
2058                // We can ignore non-leave events and those that we sent.
2059                return;
2060            }
2061
2062            debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2063
2064            // Attempt to discard the current room key. This won't do anything if we don't have one,
2065            // but that's fine since we will create a new room key whenever we try to send a message.
2066            if let Err(e) = olm.discard_room_key(room.room_id()).await {
2067                warn!(
2068                    room_id = ?room.room_id(),
2069                    "Error discarding room key after member leave: {e:?}"
2070                );
2071            }
2072        });
2073    }
2074
    /// Encrypts then sends the given content via the `/sendToDevice` end-point
2076    /// using Olm encryption.
2077    ///
2078    /// If there are a lot of recipient devices multiple `/sendToDevice`
2079    /// requests might be sent out.
2080    ///
2081    /// # Returns
2082    /// A list of failures. The list of devices that couldn't get the messages.
2083    #[cfg(feature = "experimental-send-custom-to-device")]
2084    pub async fn encrypt_and_send_raw_to_device(
2085        &self,
2086        recipient_devices: Vec<&Device>,
2087        event_type: &str,
2088        content: Raw<AnyToDeviceEventContent>,
2089        share_strategy: CollectStrategy,
2090    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2091        let users = recipient_devices.iter().map(|device| device.user_id());
2092
2093        // Will claim one-time-key for users that needs it
2094        // TODO: For later optimisation: This will establish missing olm sessions with
2095        // all this users devices, but we just want for some devices.
2096        self.client.claim_one_time_keys(users).await?;
2097
2098        let olm = self.client.olm_machine().await;
2099        let olm = olm.as_ref().expect("Olm machine wasn't started");
2100
2101        let (requests, withhelds) = olm
2102            .encrypt_content_for_devices(
2103                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2104                event_type,
2105                &content
2106                    .deserialize_as::<serde_json::Value>()
2107                    .expect("Deserialize as Value will always work"),
2108                share_strategy,
2109            )
2110            .await?;
2111
2112        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2113
2114        // Push the withhelds in the failures
2115        withhelds.iter().for_each(|(d, _)| {
2116            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2117        });
2118
2119        // TODO: parallelize that? it's already grouping 250 devices per chunk.
2120        for request in requests {
2121            let ruma_request = RumaToDeviceRequest::new_raw(
2122                request.event_type.clone(),
2123                request.txn_id.clone(),
2124                request.messages.clone(),
2125            );
2126
2127            let send_result = self
2128                .client
2129                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2130                .await;
2131
2132            // If the sending failed we need to collect the failures to report them
2133            if send_result.is_err() {
2134                // Mark the sending as failed
2135                for (user_id, device_map) in request.messages {
2136                    for device_id in device_map.keys() {
2137                        match device_id {
2138                            DeviceIdOrAllDevices::DeviceId(device_id) => {
2139                                failures.push((user_id.clone(), device_id.to_owned()));
2140                            }
2141                            DeviceIdOrAllDevices::AllDevices => {
2142                                // Cannot happen in this case
2143                            }
2144                        }
2145                    }
2146                }
2147            }
2148        }
2149
2150        Ok(failures)
2151    }
2152}
2153
2154#[cfg(all(test, not(target_family = "wasm")))]
2155mod tests {
2156    use std::{
2157        ops::Not,
2158        str::FromStr,
2159        sync::{
2160            Arc,
2161            atomic::{AtomicBool, Ordering},
2162        },
2163        time::Duration,
2164    };
2165
2166    use matrix_sdk_test::{
2167        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2168        event_factory::EventFactory,
2169    };
2170    use ruma::{
2171        event_id,
2172        events::{reaction::ReactionEventContent, relation::Annotation},
2173        user_id,
2174    };
2175    use serde_json::json;
2176    use wiremock::{
2177        Mock, MockServer, Request, ResponseTemplate,
2178        matchers::{header, method, path_regex},
2179    };
2180
2181    use crate::{
2182        Client, Error, assert_next_matches_with_timeout,
2183        config::RequestConfig,
2184        encryption::{
2185            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2186        },
2187        test_utils::{
2188            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2189        },
2190    };
2191
2192    #[async_test]
2193    async fn test_reaction_sending() {
2194        let server = MockServer::start().await;
2195        let client = logged_in_client(Some(server.uri())).await;
2196
2197        let event_id = event_id!("$2:example.org");
2198
2199        Mock::given(method("GET"))
2200            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2201            .and(header("authorization", "Bearer 1234"))
2202            .respond_with(
2203                ResponseTemplate::new(200)
2204                    .set_body_json(EventFactory::new().room_encryption().into_content()),
2205            )
2206            .mount(&server)
2207            .await;
2208
2209        Mock::given(method("PUT"))
2210            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2211            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2212                "event_id": event_id,
2213            })))
2214            .mount(&server)
2215            .await;
2216
2217        let f = EventFactory::new().sender(user_id!("@example:localhost"));
2218        let response = SyncResponseBuilder::default()
2219            .add_joined_room(
2220                JoinedRoomBuilder::default()
2221                    .add_state_event(
2222                        f.member(user_id!("@example:localhost")).display_name("example"),
2223                    )
2224                    .add_state_event(f.default_power_levels())
2225                    .add_state_event(f.room_encryption()),
2226            )
2227            .build_sync_response();
2228
2229        client.base_client().receive_sync_response(response).await.unwrap();
2230
2231        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2232        assert!(
2233            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2234        );
2235
2236        let event_id = event_id!("$1:example.org");
2237        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2238        room.send(reaction).await.expect("Sending the reaction should not fail");
2239
2240        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2241    }
2242
2243    #[cfg(feature = "sqlite")]
2244    #[async_test]
2245    async fn test_generation_counter_invalidates_olm_machine() {
2246        // Create two clients using the same sqlite database.
2247
2248        use matrix_sdk_base::store::RoomLoadSettings;
2249        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2250        let session = mock_matrix_session();
2251
2252        let client1 = Client::builder()
2253            .homeserver_url("http://localhost:1234")
2254            .request_config(RequestConfig::new().disable_retry())
2255            .sqlite_store(&sqlite_path, None)
2256            .build()
2257            .await
2258            .unwrap();
2259        client1
2260            .matrix_auth()
2261            .restore_session(session.clone(), RoomLoadSettings::default())
2262            .await
2263            .unwrap();
2264
2265        let client2 = Client::builder()
2266            .homeserver_url("http://localhost:1234")
2267            .request_config(RequestConfig::new().disable_retry())
2268            .sqlite_store(sqlite_path, None)
2269            .build()
2270            .await
2271            .unwrap();
2272        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2273
2274        // When the lock isn't enabled, any attempt at locking won't return a guard.
2275        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2276        assert!(guard.is_none());
2277
2278        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2279        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2280
2281        // One client can take the lock.
2282        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2283        assert!(acquired1.is_some());
2284
2285        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2286        let initial_olm_machine =
2287            client1.olm_machine().await.clone().expect("must have an olm machine");
2288
2289        // Also enable backup to check that new machine has the same backup keys.
2290        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2291        let backup_key = decryption_key.megolm_v1_public_key();
2292        backup_key.set_version("1".to_owned());
2293        initial_olm_machine
2294            .backup_machine()
2295            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2296            .await
2297            .expect("Should save");
2298
2299        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2300
2301        assert!(client1.encryption().backups().are_enabled().await);
2302
2303        // The other client can't take the lock too.
2304        let error = client2.encryption().try_lock_store_once().await.unwrap_err();
2305        assert!(matches!(error, Error::CrossProcessLockError(_)));
2306
2307        // Now have the first client release the lock,
2308        drop(acquired1);
2309        tokio::time::sleep(Duration::from_millis(100)).await;
2310
2311        // And re-take it.
2312        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2313        assert!(acquired1.is_some());
2314
2315        // In that case, the Olm Machine shouldn't change.
2316        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2317        assert!(initial_olm_machine.same_as(&olm_machine));
2318
2319        // Ok, release again.
2320        drop(acquired1);
2321        tokio::time::sleep(Duration::from_millis(100)).await;
2322
2323        // Client2 can acquire the lock.
2324        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2325        assert!(acquired2.is_some());
2326
2327        // And then release it.
2328        drop(acquired2);
2329        tokio::time::sleep(Duration::from_millis(100)).await;
2330
2331        // Client1 can acquire it again,
2332        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2333        assert!(acquired1.is_some());
2334
2335        // But now its olm machine has been invalidated and thus regenerated!
2336        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2337
2338        assert!(!initial_olm_machine.same_as(&olm_machine));
2339
2340        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2341        assert!(backup_key_new.decryption_key.is_some());
2342        assert_eq!(
2343            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2344            backup_key.to_base64()
2345        );
2346        assert!(client1.encryption().backups().are_enabled().await);
2347    }
2348
2349    #[cfg(feature = "sqlite")]
2350    #[async_test]
2351    async fn test_generation_counter_no_spurious_invalidation() {
2352        // Create two clients using the same sqlite database.
2353
2354        use matrix_sdk_base::store::RoomLoadSettings;
2355        let sqlite_path =
2356            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2357        let session = mock_matrix_session();
2358
2359        let client = Client::builder()
2360            .homeserver_url("http://localhost:1234")
2361            .request_config(RequestConfig::new().disable_retry())
2362            .sqlite_store(&sqlite_path, None)
2363            .build()
2364            .await
2365            .unwrap();
2366        client
2367            .matrix_auth()
2368            .restore_session(session.clone(), RoomLoadSettings::default())
2369            .await
2370            .unwrap();
2371
2372        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2373
2374        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2375
2376        // Enabling the lock doesn't update the olm machine.
2377        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2378        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2379
2380        {
2381            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2382            assert!(acquired.is_some());
2383        }
2384
2385        // Taking the lock the first time will not update the olm machine.
2386        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2387        assert!(initial_olm_machine.same_as(&after_taking_lock_first_time));
2388
2389        {
2390            // Simulate that another client hold the lock before.
2391            let client2 = Client::builder()
2392                .homeserver_url("http://localhost:1234")
2393                .request_config(RequestConfig::new().disable_retry())
2394                .sqlite_store(sqlite_path, None)
2395                .build()
2396                .await
2397                .unwrap();
2398            client2
2399                .matrix_auth()
2400                .restore_session(session, RoomLoadSettings::default())
2401                .await
2402                .unwrap();
2403
2404            client2
2405                .encryption()
2406                .enable_cross_process_store_lock("client2".to_owned())
2407                .await
2408                .unwrap();
2409
2410            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2411            assert!(guard.is_some());
2412
2413            drop(guard);
2414            tokio::time::sleep(Duration::from_millis(100)).await;
2415        }
2416
2417        {
2418            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2419            assert!(acquired.is_some());
2420        }
2421
2422        // Taking the lock the second time updates the olm machine.
2423        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2424        assert!(!after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2425
2426        {
2427            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2428            assert!(acquired.is_some());
2429        }
2430
2431        // Re-taking the lock doesn't update the olm machine.
2432        let after_taking_lock_third_time = client.olm_machine().await.as_ref().unwrap().clone();
2433        assert!(after_taking_lock_second_time.same_as(&after_taking_lock_third_time));
2434    }
2435
2436    #[async_test]
2437    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2438        // Given a client and a server
2439        let client = no_retry_test_client(None).await;
2440        let server = MockServer::start().await;
2441
2442        // When we subscribe to its verification state
2443        let mut verification_state = client.encryption().verification_state();
2444
2445        // We can get its initial value, and it's Unknown
2446        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2447
2448        // We set up a mocked request to check this endpoint is not called before
2449        // reading the new state
2450        let keys_requested = Arc::new(AtomicBool::new(false));
2451        let inner_bool = keys_requested.clone();
2452
2453        Mock::given(method("GET"))
2454            .and(path_regex(
2455                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2456            ))
2457            .respond_with(move |_req: &Request| {
2458                inner_bool.fetch_or(true, Ordering::SeqCst);
2459                ResponseTemplate::new(200).set_body_json(json!({}))
2460            })
2461            .mount(&server)
2462            .await;
2463
2464        // When the session is initialised and the encryption tasks spawn
2465        set_client_session(&client).await;
2466
2467        // Then we can get an updated value without waiting for any network requests
2468        assert!(keys_requested.load(Ordering::SeqCst).not());
2469        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2470    }
2471
2472    #[test]
2473    fn test_oauth_reset_info_from_uiaa_info() {
2474        let auth_info = json!({
2475            "session": "dummy",
2476            "flows": [
2477                {
2478                    "stages": [
2479                        "org.matrix.cross_signing_reset"
2480                    ]
2481                }
2482            ],
2483            "params": {
2484                "org.matrix.cross_signing_reset": {
2485                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2486                }
2487            },
2488            "msg": "To reset..."
2489        });
2490
2491        let auth_info = serde_json::from_value(auth_info)
2492            .expect("We should be able to deserialize the UiaaInfo");
2493        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2494            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2495    }
2496
2497    #[test]
2498    fn test_duplicate_one_time_key_error_parsing() {
2499        let message = concat!(
2500            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2501            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2502            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2503            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2504            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2505            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2506            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2507        );
2508        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2509            .expect("We should be able to parse the error message");
2510
2511        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2512        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2513
2514        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2515            .expect_err("We shouldn't be able to parse an incomplete error message");
2516    }
2517
2518    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2519    // response to a /keys/query request using the given device keys and a
2520    // pre-defined set of cross-signing keys.
2521    fn devices_to_verify_against_keys_query_response(
2522        devices: Vec<serde_json::Value>,
2523    ) -> serde_json::Value {
2524        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2525            .into_iter()
2526            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2527            .collect();
2528        json!({
2529            "device_keys": {
2530                "@example:localhost": device_keys,
2531            },
2532            "master_keys": {
2533                "@example:localhost": {
2534                    "keys": {
2535                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2536                    },
2537                    "usage": ["master"],
2538                    "user_id": "@example:localhost",
2539                },
2540            },
2541            "self_signing_keys": {
2542                "@example:localhost": {
2543                    "keys": {
2544                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2545                    },
2546                    "usage": ["self_signing"],
2547                    "user_id": "@example:localhost",
2548                    "signatures": {
2549                        "@example:localhost": {
2550                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2551                        },
2552                    },
2553                },
2554            },
2555            "user_signing_keys": {
2556                "@example:localhost": {
2557                    "keys": {
2558                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2559                    },
2560                    "usage": ["user_signing"],
2561                    "user_id": "@example:localhost",
2562                    "signatures": {
2563                        "@example:localhost": {
2564                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2565                        },
2566                    },
2567                },
2568            }
2569        })
2570    }
2571
2572    // The following three tests test that we can detect whether the user has
2573    // other devices that they can verify against under different conditions.
2574    #[async_test]
2575    /// Test that we detect that can't verify against another device if we have
2576    /// no devices.
2577    async fn test_devices_to_verify_against_no_devices() {
2578        let server = MockServer::start().await;
2579        let client = logged_in_client(Some(server.uri())).await;
2580
2581        Mock::given(method("POST"))
2582            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2583            .respond_with(
2584                ResponseTemplate::new(200)
2585                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2586            )
2587            .mount(&server)
2588            .await;
2589
2590        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2591    }
2592
2593    #[async_test]
2594    /// Test that we detect that we can verify against another cross-signed
2595    /// regular device.
2596    async fn test_devices_to_verify_against_cross_signed() {
2597        let server = MockServer::start().await;
2598        let client = logged_in_client(Some(server.uri())).await;
2599
2600        Mock::given(method("POST"))
2601            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2602            .respond_with(ResponseTemplate::new(200).set_body_json(
2603                devices_to_verify_against_keys_query_response(vec![
2604                    json!({
2605                        "algorithms": [
2606                            "m.olm.v1.curve25519-aes-sha2",
2607                            "m.megolm.v1.aes-sha2",
2608                        ],
2609                        "user_id": "@example:localhost",
2610                        "device_id": "SIGNEDDEVICE",
2611                        "keys": {
2612                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2613                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2614                        },
2615                        "signatures": {
2616                            "@example:localhost": {
2617                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2618                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2619                            },
2620                        },
2621                    })
2622                ])
2623            ))
2624            .mount(&server)
2625            .await;
2626
2627        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2628    }
2629
2630    #[async_test]
2631    /// Test that we detect that we can't verify against a dehydrated or
2632    /// unsigned device.
2633    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2634        let server = MockServer::start().await;
2635        let client = logged_in_client(Some(server.uri())).await;
2636        let user_id = client.user_id().unwrap();
2637        let olm_machine = client.olm_machine().await;
2638        let olm_machine = olm_machine.as_ref().unwrap();
2639
2640        Mock::given(method("POST"))
2641            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2642            .respond_with(ResponseTemplate::new(200).set_body_json(
2643                devices_to_verify_against_keys_query_response(vec![
2644                    json!({
2645                        "algorithms": [
2646                            "m.olm.v1.curve25519-aes-sha2",
2647                            "m.megolm.v1.aes-sha2",
2648                        ],
2649                        "user_id": "@example:localhost",
2650                        "device_id": "DEHYDRATEDDEVICE",
2651                        "keys": {
2652                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2653                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2654                        },
2655                        "dehydrated": true,
2656                        "signatures": {
2657                            "@example:localhost": {
2658                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2659                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2660                            },
2661                        },
2662                    }),
2663                    json!({
2664                        "algorithms": [
2665                            "m.olm.v1.curve25519-aes-sha2",
2666                            "m.megolm.v1.aes-sha2",
2667                        ],
2668                        "user_id": "@example:localhost",
2669                        "device_id": "UNSIGNEDDEVICE",
2670                        "keys": {
2671                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2672                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2673                        },
2674                        "signatures": {
2675                            "@example:localhost": {
2676                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2677                            },
2678                        },
2679                    }),
2680                ])
2681            ))
2682            .mount(&server)
2683            .await;
2684
2685        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2686        client.keys_query(&request_id, request.device_keys).await.unwrap();
2687
2688        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2689    }
2690}