Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::{Path, PathBuf},
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34    future::try_join,
35    stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40    StateStoreDataKey, StateStoreDataValue,
41    cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42    crypto::{
43        CrossSigningBootstrapRequests, OlmMachine,
44        store::{
45            LockableCryptoStore, SecretImportError,
46            types::{RoomKeyBundleInfo, RoomKeyInfo},
47        },
48        types::{
49            SecretsBundle, SignedKey,
50            requests::{
51                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52            },
53        },
54    },
55    sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60    api::{
61        client::{
62            keys::{
63                get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64                upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65            },
66            message::send_message_event,
67            to_device::send_event_to_device::v3::{
68                Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69            },
70            uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71        },
72        error::{ErrorBody, StandardErrorBody},
73    },
74    assign,
75    events::room::{
76        MediaSource, ThumbnailInfo,
77        member::{MembershipChange, OriginalSyncRoomMemberEvent},
78    },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91    backups::{Backups, types::BackupClientState},
92    futures::UploadEncryptedFile,
93    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94    recovery::{Recovery, RecoveryState},
95    secret_storage::SecretStorage,
96    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97    verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100    Client, Error, HttpError, Result, Room, TransmissionProgress,
101    attachment::Thumbnail,
102    client::{ClientInner, WeakClient},
103    cross_process_lock::CrossProcessLockGuard,
104    error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115use matrix_sdk_base::crypto::OlmMachineBuilder;
116pub use matrix_sdk_base::crypto::{
117    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
118    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
119    SignatureError, VERSION,
120    olm::{
121        SessionCreationError as MegolmSessionCreationError,
122        SessionExportError as OlmSessionExportError,
123    },
124    vodozemac,
125};
126use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
127
128#[cfg(feature = "experimental-send-custom-to-device")]
129use crate::config::RequestConfig;
130pub use crate::error::RoomKeyImportError;
131
/// Errors that can occur while exporting a [`SecretsBundle`] out of a SQLite
/// crypto store.
#[cfg(feature = "sqlite")]
#[derive(Debug, thiserror::Error)]
pub enum BundleExportError {
    /// Opening the SQLite store failed.
    #[error(transparent)]
    OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
    /// Reading data out of the SQLite store failed.
    #[error(transparent)]
    StoreError(#[from] CryptoStoreError),
    /// The store holds no secrets bundle, or the bundle couldn't be read from
    /// the store.
    #[error(transparent)]
    SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
}
148
149/// Error type describing failures that can happen while importing a
150/// [`SecretsBundle`].
151#[derive(Debug, thiserror::Error)]
152pub enum BundleImportError {
153    /// The bundle couldn't be imported.
154    #[error(transparent)]
155    SecretImport(#[from] SecretImportError),
156    /// The cross-signed device keys couldn't been uploaded.
157    #[error(transparent)]
158    DeviceKeys(#[from] Error),
159}
160
/// Try to export a [`SecretsBundle`] out of a SQLite crypto store.
///
/// The bundle of an existing `matrix-sdk`-based client can be retrieved with
/// this function so that it can be imported into another [`Client`]
/// instance.
///
/// This is useful for migrations, or to allow an existing client instance to
/// create new ones that will be fully verified.
///
/// Returns `Ok(None)` if the store doesn't contain an account; otherwise the
/// user ID of the stored account is returned alongside the bundle.
#[cfg(feature = "sqlite")]
pub async fn export_secrets_bundle_from_store(
    database_path: impl AsRef<Path>,
    passphrase: Option<&str>,
) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
    use matrix_sdk_base::crypto::store::CryptoStore;

    let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;

    // Without a stored account there is nothing that could be exported.
    let Some(account) =
        store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?
    else {
        return Ok(None);
    };

    // The export goes through an `OlmMachine` built on top of the opened store.
    let machine = OlmMachineBuilder::new(&account.user_id, &account.device_id)
        .with_crypto_store(store)
        .build()
        .await
        .map_err(BundleExportError::StoreError)?;

    let bundle = machine.store().export_secrets_bundle().await?;
    let user_id = account.user_id.to_owned();

    Ok(Some((user_id, bundle)))
}
194
195/// All the data related to the encryption state.
196pub(crate) struct EncryptionData {
197    /// Background tasks related to encryption (key backup, initialization
198    /// tasks, etc.).
199    pub tasks: StdMutex<ClientTasks>,
200
201    /// End-to-end encryption settings.
202    pub encryption_settings: EncryptionSettings,
203
204    /// All state related to key backup.
205    pub backup_state: BackupClientState,
206
207    /// All state related to secret storage recovery.
208    pub recovery_state: SharedObservable<RecoveryState>,
209}
210
211impl EncryptionData {
212    pub fn new(encryption_settings: EncryptionSettings) -> Self {
213        Self {
214            encryption_settings,
215
216            tasks: StdMutex::new(Default::default()),
217            backup_state: Default::default(),
218            recovery_state: Default::default(),
219        }
220    }
221
222    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
223        let weak_client = WeakClient::from_inner(client);
224
225        let mut tasks = self.tasks.lock();
226        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
227
228        if self.encryption_settings.backup_download_strategy
229            == BackupDownloadStrategy::AfterDecryptionFailure
230        {
231            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
232        }
233    }
234
235    /// Initialize the background task which listens for changes in the
236    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
237    ///
238    /// This should happen after the usual tasks have been set up and after the
239    /// E2EE initialization tasks have been set up.
240    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
241        let mut guard = self.tasks.lock();
242
243        let future = Recovery::update_state_after_backup_state_change(client);
244        let join_handle = spawn(future);
245
246        guard.update_recovery_state_after_backup = Some(join_handle);
247    }
248}
249
250/// Settings for end-to-end encryption features.
251#[derive(Clone, Copy, Debug, Default)]
252pub struct EncryptionSettings {
253    /// Automatically bootstrap cross-signing for a user once they're logged, in
254    /// case it's not already done yet.
255    ///
256    /// This requires to login with a username and password, or that MSC3967 is
257    /// enabled on the server, as of 2023-10-20.
258    pub auto_enable_cross_signing: bool,
259
260    /// Select a strategy to download room keys from the backup, by default room
261    /// keys won't be downloaded from the backup automatically.
262    ///
263    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
264    pub backup_download_strategy: BackupDownloadStrategy,
265
266    /// Automatically create a backup version if no backup exists.
267    pub auto_enable_backups: bool,
268}
269
/// The strategy deciding when room keys are downloaded from the backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Download all room keys from the backup automatically once the backup
    /// recovery key has been received. There are two ways the backup recovery
    /// key can be received:
    ///
    /// 1. As a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. By importing it from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Try to download a single room key whenever an event fails to be
    /// decrypted.
    AfterDecryptionFailure,

    /// Never download room keys automatically. Room keys can be downloaded
    /// manually using the [`Backups::download_room_key()`] method.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
296
/// The verification state of our own device.
///
/// This enum tells whether our own user identity trusts the device, in other
/// words whether the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state isn't known yet.
    Unknown,
    /// The device is considered to be verified: its user identity has signed
    /// it.
    Verified,
    /// The device isn't verified.
    Unverified,
}
311
312/// A stateful struct remembering the cross-signing keys we need to upload.
313///
314/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
315/// additional authentication, this struct will contain information on the type
316/// of authentication the user needs to complete before the upload might be
317/// continued.
318///
319/// More info can be found in the [spec].
320///
321/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
322#[derive(Debug)]
323pub struct CrossSigningResetHandle {
324    client: Client,
325    upload_request: UploadSigningKeysRequest,
326    signatures_request: UploadSignaturesRequest,
327    auth_type: CrossSigningResetAuthType,
328    is_cancelled: Mutex<bool>,
329}
330
331impl CrossSigningResetHandle {
332    /// Set up a new `CrossSigningResetHandle`.
333    pub fn new(
334        client: Client,
335        upload_request: UploadSigningKeysRequest,
336        signatures_request: UploadSignaturesRequest,
337        auth_type: CrossSigningResetAuthType,
338    ) -> Self {
339        Self {
340            client,
341            upload_request,
342            signatures_request,
343            auth_type,
344            is_cancelled: Mutex::new(false),
345        }
346    }
347
348    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
349    /// is using.
350    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
351        &self.auth_type
352    }
353
354    /// Continue the cross-signing reset by either waiting for the
355    /// authentication to be done on the side of the OAuth 2.0 server or by
356    /// providing additional [`AuthData`] the homeserver requires.
357    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
358        // Poll to see whether the reset has been authorized twice per second.
359        const RETRY_EVERY: Duration = Duration::from_millis(500);
360
361        // Give up after two minutes of polling.
362        const TIMEOUT: Duration = Duration::from_mins(2);
363
364        tokio::time::timeout(TIMEOUT, async {
365            let mut upload_request = self.upload_request.clone();
366            upload_request.auth = auth;
367
368            debug!(
369                "Repeatedly PUTting to keys/device_signing/upload until it works \
370                or we hit a permanent failure."
371            );
372            while let Err(e) = self.client.send(upload_request.clone()).await {
373                if *self.is_cancelled.lock().await {
374                    return Ok(());
375                }
376
377                match e.as_uiaa_response() {
378                    Some(uiaa_info) => {
379                        // Return the error except if we are at the `m.oauth` stage where we want to
380                        // keep polling.
381                        if !matches!(self.auth_type, CrossSigningResetAuthType::OAuth(_))
382                            && uiaa_info.auth_error.is_some()
383                        {
384                            return Err(e.into());
385                        }
386                    }
387                    None => return Err(e.into()),
388                }
389
390                debug!(
391                    "PUT to keys/device_signing/upload failed with 401. Retrying after \
392                    a short delay."
393                );
394                sleep(RETRY_EVERY).await;
395            }
396
397            self.client.send(self.signatures_request.clone()).await?;
398
399            Ok(())
400        })
401        .await
402        .unwrap_or_else(|_| {
403            warn!("Timed out waiting for keys/device_signing/upload to succeed.");
404            Err(Error::Timeout)
405        })
406    }
407
408    /// Cancel the ongoing identity reset process
409    pub async fn cancel(&self) {
410        *self.is_cancelled.lock().await = true;
411    }
412}
413
414/// information about the additional authentication that is required before the
415/// cross-signing keys can be uploaded.
416#[derive(Debug, Clone)]
417pub enum CrossSigningResetAuthType {
418    /// The homeserver requires user-interactive authentication.
419    Uiaa(UiaaInfo),
420    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
421    /// approve the upload of cross-signing keys.
422    OAuth(OAuthCrossSigningResetInfo),
423}
424
425impl CrossSigningResetAuthType {
426    fn new(error: &HttpError) -> Result<Option<Self>> {
427        if let Some(auth_info) = error.as_uiaa_response() {
428            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
429                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
430            } else {
431                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
432            }
433        } else {
434            Ok(None)
435        }
436    }
437}
438
439/// OAuth 2.0 specific information about the required authentication for the
440/// upload of cross-signing keys.
441#[derive(Debug, Clone, Deserialize)]
442pub struct OAuthCrossSigningResetInfo {
443    /// The URL where the user can approve the reset of the cross-signing keys.
444    pub approval_url: Url,
445
446    /// Session key to use to complete the authentication.
447    pub session: Option<String>,
448}
449
450impl OAuthCrossSigningResetInfo {
451    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
452        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
453            return Ok(None);
454        };
455
456        Ok(Some(OAuthCrossSigningResetInfo {
457            approval_url: parameters.url.as_str().try_into()?,
458            session: auth_info.session.clone(),
459        }))
460    }
461}
462
463/// A struct that helps to parse the custom error message Synapse posts if a
464/// duplicate one-time key is uploaded.
465#[derive(Clone, Debug)]
466pub struct DuplicateOneTimeKeyErrorMessage {
467    /// The previously uploaded one-time key.
468    pub old_key: Curve25519PublicKey,
469    /// The one-time key we're attempting to upload right now.
470    pub new_key: Curve25519PublicKey,
471}
472
473impl FromStr for DuplicateOneTimeKeyErrorMessage {
474    type Err = serde_json::Error;
475
476    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
477        // First we split the string into two parts, the part containing the old key and
478        // the part containing the new key. The parts are conveniently separated
479        // by a `;` character.
480        let mut split = s.split_terminator(';');
481
482        let old_key = split
483            .next()
484            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
485        let new_key = split
486            .next()
487            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
488
489        // Now we remove the lengthy prefix from the part containing the old key, we
490        // should be left with just the JSON of the signed key.
491        let old_key_index = old_key
492            .find("Old key:")
493            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
494
495        let old_key = old_key[old_key_index..]
496            .trim()
497            .strip_prefix("Old key:")
498            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
499
500        // The part containing the new key is much simpler, we just remove a static
501        // prefix.
502        let new_key = new_key
503            .trim()
504            .strip_prefix("new key:")
505            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
506
507        // The JSON containing the new key is for some reason quoted using single
508        // quotes, so let's replace them with normal double quotes.
509        let new_key = new_key.replace("'", "\"");
510
511        // Let's deserialize now.
512        let old_key: SignedKey = serde_json::from_str(old_key)?;
513        let new_key: SignedKey = serde_json::from_str(&new_key)?;
514
515        // Pick out the Curve keys, we don't care about the rest that much.
516        let old_key = old_key.key();
517        let new_key = new_key.key();
518
519        Ok(Self { old_key, new_key })
520    }
521}
522
523impl Client {
524    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
525        self.base_client().olm_machine().await
526    }
527
528    pub(crate) async fn mark_request_as_sent(
529        &self,
530        request_id: &TransactionId,
531        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
532    ) -> Result<(), matrix_sdk_base::Error> {
533        Ok(self
534            .olm_machine()
535            .await
536            .as_ref()
537            .expect(
538                "We should have an olm machine once we try to mark E2EE related requests as sent",
539            )
540            .mark_request_as_sent(request_id, response)
541            .await?)
542    }
543
544    /// Query the server for users device keys.
545    ///
546    /// # Panics
547    ///
548    /// Panics if no key query needs to be done.
549    #[instrument(skip(self, device_keys))]
550    pub(crate) async fn keys_query(
551        &self,
552        request_id: &TransactionId,
553        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
554    ) -> Result<get_keys::v3::Response> {
555        let request = assign!(get_keys::v3::Request::new(), { device_keys });
556
557        let response = self.send(request).await?;
558        self.mark_request_as_sent(request_id, &response).await?;
559        self.encryption().update_state_after_keys_query(&response).await;
560
561        Ok(response)
562    }
563
564    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
565    /// encrypting and uploading a provided reader.
566    ///
567    /// # Arguments
568    ///
569    /// * `content_type` - The content type of the file.
570    /// * `reader` - The reader that should be encrypted and uploaded.
571    ///
572    /// # Examples
573    ///
574    /// ```no_run
575    /// # use matrix_sdk::Client;
576    /// # use url::Url;
577    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
578    /// use serde::{Deserialize, Serialize};
579    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
580    ///
581    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
582    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
583    /// struct CustomEventContent {
584    ///     encrypted_file: EncryptedFile,
585    /// }
586    ///
587    /// # async {
588    /// # let homeserver = Url::parse("http://example.com")?;
589    /// # let client = Client::new(homeserver).await?;
590    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
591    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
592    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
593    ///
594    /// room.send(CustomEventContent { encrypted_file }).await?;
595    /// # anyhow::Ok(()) };
596    /// ```
597    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
598        &'a self,
599        reader: &'a mut R,
600    ) -> UploadEncryptedFile<'a, R> {
601        UploadEncryptedFile::new(self, reader)
602    }
603
604    /// Encrypt and upload the file and thumbnails, and return the source
605    /// information.
606    pub(crate) async fn upload_encrypted_media_and_thumbnail(
607        &self,
608        data: &[u8],
609        thumbnail: Option<Thumbnail>,
610        send_progress: SharedObservable<TransmissionProgress>,
611    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
612        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
613
614        let upload_attachment = async {
615            let mut cursor = Cursor::new(data);
616            self.upload_encrypted_file(&mut cursor)
617                .with_send_progress_observable(send_progress)
618                .await
619        };
620
621        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
622
623        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
624    }
625
626    /// Uploads an encrypted thumbnail to the media repository, and returns
627    /// its source and extra information.
628    async fn upload_encrypted_thumbnail(
629        &self,
630        thumbnail: Option<Thumbnail>,
631        send_progress: SharedObservable<TransmissionProgress>,
632    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
633        let Some(thumbnail) = thumbnail else {
634            return Ok(None);
635        };
636
637        let (data, _, thumbnail_info) = thumbnail.into_parts();
638        let mut cursor = Cursor::new(data);
639
640        let file = self
641            .upload_encrypted_file(&mut cursor)
642            .with_send_progress_observable(send_progress)
643            .await?;
644
645        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
646    }
647
648    /// Claim one-time keys creating new Olm sessions.
649    ///
650    /// # Arguments
651    ///
652    /// * `users` - The list of user/device pairs that we should claim keys for.
653    pub(crate) async fn claim_one_time_keys(
654        &self,
655        users: impl Iterator<Item = &UserId>,
656    ) -> Result<()> {
657        let _lock = self.locks().key_claim_lock.lock().await;
658
659        if let Some((request_id, request)) = self
660            .olm_machine()
661            .await
662            .as_ref()
663            .ok_or(Error::NoOlmMachine)?
664            .get_missing_sessions(users)
665            .await?
666        {
667            let response = self.send(request).await?;
668            self.mark_request_as_sent(&request_id, &response).await?;
669        }
670
671        Ok(())
672    }
673
674    /// Upload the E2E encryption keys.
675    ///
676    /// This uploads the long lived device keys as well as the required amount
677    /// of one-time keys.
678    ///
679    /// # Panics
680    ///
681    /// Panics if the client isn't logged in, or if no encryption keys need to
682    /// be uploaded.
683    #[instrument(skip(self, request))]
684    pub(crate) async fn keys_upload(
685        &self,
686        request_id: &TransactionId,
687        request: &upload_keys::v3::Request,
688    ) -> Result<upload_keys::v3::Response> {
689        debug!(
690            device_keys = request.device_keys.is_some(),
691            one_time_key_count = request.one_time_keys.len(),
692            "Uploading public encryption keys",
693        );
694
695        let response = self.send(request.clone()).await?;
696        self.mark_request_as_sent(request_id, &response).await?;
697
698        Ok(response)
699    }
700
701    pub(crate) async fn room_send_helper(
702        &self,
703        request: &RoomMessageRequest,
704    ) -> Result<send_message_event::v3::Response> {
705        let content = request.content.clone();
706        let txn_id = request.txn_id.clone();
707        let room_id = &request.room_id;
708
709        self.get_room(room_id)
710            .expect("Can't send a message to a room that isn't known to the store")
711            .send(*content)
712            .with_transaction_id(txn_id)
713            .await
714            .map(|result| result.response)
715    }
716
717    pub(crate) async fn send_to_device(
718        &self,
719        request: &ToDeviceRequest,
720    ) -> HttpResult<ToDeviceResponse> {
721        let request = RumaToDeviceRequest::new_raw(
722            request.event_type.clone(),
723            request.txn_id.clone(),
724            request.messages.clone(),
725        );
726
727        self.send(request).await
728    }
729
730    pub(crate) async fn send_verification_request(
731        &self,
732        request: OutgoingVerificationRequest,
733    ) -> Result<()> {
734        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
735
736        match request {
737            ToDevice(t) => {
738                self.send_to_device(&t).await?;
739            }
740            InRoom(r) => {
741                self.room_send_helper(&r).await?;
742            }
743        }
744
745        Ok(())
746    }
747
748    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
749        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
750
751        match r.request() {
752            AnyOutgoingRequest::KeysQuery(request) => {
753                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
754            }
755            AnyOutgoingRequest::KeysUpload(request) => {
756                let response = self.keys_upload(r.request_id(), request).await;
757
758                if let Err(e) = &response {
759                    match e.as_client_api_error() {
760                        Some(e) if e.status_code == 400 => {
761                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
762                            {
763                                // This is one of the nastiest errors we can have. The server
764                                // telling us that we already have a one-time key uploaded means
765                                // that we forgot about some of our one-time keys. This will lead to
766                                // UTDs.
767                                {
768                                    let already_reported = self
769                                        .state_store()
770                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
771                                        .await?
772                                        .is_some();
773
774                                    if message.starts_with("One time key") && !already_reported {
775                                        let error_message =
776                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
777
778                                        if let Ok(message) = &error_message {
779                                            error!(
780                                                sentry = true,
781                                                old_key = %message.old_key,
782                                                new_key = %message.new_key,
783                                                "Duplicate one-time keys have been uploaded"
784                                            );
785                                        } else {
786                                            error!(
787                                                sentry = true,
788                                                "Duplicate one-time keys have been uploaded"
789                                            );
790                                        }
791
792                                        self.state_store()
793                                            .set_kv_data(
794                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
795                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
796                                            )
797                                            .await?;
798
799                                        if let Err(e) = self
800                                            .inner
801                                            .duplicate_key_upload_error_sender
802                                            .send(error_message.ok())
803                                        {
804                                            error!(
805                                                "Failed to dispatch duplicate key upload error notification: {}",
806                                                e
807                                            );
808                                        }
809                                    }
810                                }
811                            }
812                        }
813                        _ => {}
814                    }
815
816                    response?;
817                }
818            }
819            AnyOutgoingRequest::ToDeviceRequest(request) => {
820                let response = self.send_to_device(request).await?;
821                self.mark_request_as_sent(r.request_id(), &response).await?;
822            }
823            AnyOutgoingRequest::SignatureUpload(request) => {
824                let response = self.send(request.clone()).await?;
825                self.mark_request_as_sent(r.request_id(), &response).await?;
826            }
827            AnyOutgoingRequest::RoomMessage(request) => {
828                let response = self.room_send_helper(request).await?;
829                self.mark_request_as_sent(r.request_id(), &response).await?;
830            }
831            AnyOutgoingRequest::KeysClaim(request) => {
832                let response = self.send(request.clone()).await?;
833                self.mark_request_as_sent(r.request_id(), &response).await?;
834            }
835        }
836
837        Ok(())
838    }
839
840    #[instrument(skip_all)]
841    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
842        const MAX_CONCURRENT_REQUESTS: usize = 20;
843
844        // This is needed because sometimes we need to automatically
845        // claim some one-time keys to unwedge an existing Olm session.
846        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
847            warn!("Error while claiming one-time keys {:?}", e);
848        }
849
850        let outgoing_requests = stream::iter(
851            self.olm_machine()
852                .await
853                .as_ref()
854                .ok_or(Error::NoOlmMachine)?
855                .outgoing_requests()
856                .await?,
857        )
858        .map(|r| self.send_outgoing_request(r));
859
860        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
861
862        requests
863            .for_each(|r| async move {
864                match r {
865                    Ok(_) => (),
866                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
867                }
868            })
869            .await;
870
871        Ok(())
872    }
873}
874
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Expose the read guard around the optional [`OlmMachine`], for testing
    /// purposes only.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }

    /// Abort the task that receives historic room key bundles, for testing
    /// purposes only.
    ///
    /// Does nothing if no such task is registered.
    pub fn abort_bundle_receiver_task(&self) {
        let guard = self.inner.e2ee.tasks.lock();
        let Some(handle) = guard.receive_historic_room_key_bundles.as_ref() else {
            return;
        };

        handle.abort();
    }
}
890
891/// A high-level API to manage the client's encryption.
892///
893/// To get this, use [`Client::encryption()`].
894#[derive(Debug, Clone)]
895pub struct Encryption {
896    /// The underlying client.
897    client: Client,
898}
899
900impl Encryption {
901    pub(crate) fn new(client: Client) -> Self {
902        Self { client }
903    }
904
905    /// Returns the current encryption settings for this client.
906    pub(crate) fn settings(&self) -> EncryptionSettings {
907        self.client.inner.e2ee.encryption_settings
908    }
909
910    /// Get the public ed25519 key of our own device. This is usually what is
911    /// called the fingerprint of the device.
912    pub async fn ed25519_key(&self) -> Option<String> {
913        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
914    }
915
916    /// Get the public Curve25519 key of our own device.
917    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
918        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
919    }
920
921    /// Get the current device creation timestamp.
922    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
923        match self.get_own_device().await {
924            Ok(Some(device)) => device.first_time_seen_ts(),
925            // Should not happen, there should always be an own device
926            _ => MilliSecondsSinceUnixEpoch::now(),
927        }
928    }
929
930    /// This method will import all the private cross-signing keys and, if
931    /// available, the private part of a backup key and its accompanying
932    /// version into the store.
933    ///
934    /// Importing all the secrets will mark the device as verified and enable
935    /// backups if a backup key was available in the bundle.
936    ///
937    /// **Warning**: Only import this from a trusted source, i.e. if an existing
938    /// device is sharing this with a new device.
939    ///
940    /// **Warning*: Only call this method right after logging in and before the
941    /// initial sync has been started.
942    pub async fn import_secrets_bundle(
943        &self,
944        bundle: &SecretsBundle,
945    ) -> Result<(), BundleImportError> {
946        self.import_secrets_bundle_impl(bundle).await?;
947
948        // Upload the device keys, this will ensure that other devices see us as a fully
949        // verified device as soon as this method returns.
950        self.ensure_device_keys_upload().await?;
951        self.wait_for_e2ee_initialization_tasks().await;
952
953        // If our initialization tasks completed before we imported the secrets bundle,
954        // backups might not have been enabled.
955        //
956        // In this case attempt to enable them again.
957        if !self.backups().are_enabled().await {
958            self.backups().maybe_resume_backups().await?;
959        }
960
961        Ok(())
962    }
963
964    pub(crate) async fn import_secrets_bundle_impl(
965        &self,
966        bundle: &SecretsBundle,
967    ) -> Result<(), SecretImportError> {
968        let olm_machine = self.client.olm_machine().await;
969        let olm_machine =
970            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
971
972        olm_machine.store().import_secrets_bundle(bundle).await
973    }
974
975    /// Get the status of the private cross signing keys.
976    ///
977    /// This can be used to check which private cross signing keys we have
978    /// stored locally.
979    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
980        let olm = self.client.olm_machine().await;
981        let machine = olm.as_ref()?;
982        Some(machine.cross_signing_status().await)
983    }
984
985    /// Does the user have other devices that the current device can verify
986    /// against?
987    ///
988    /// The device must be signed by the user's cross-signing key, must have an
989    /// identity, and must not be a dehydrated device.
990    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
991        let olm_machine = self.client.olm_machine().await;
992        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
993        let user_id = olm_machine.user_id();
994
995        self.ensure_initial_key_query().await?;
996
997        let devices = self.get_user_devices(user_id).await?;
998
999        let ret = devices.devices().any(|device| {
1000            device.is_cross_signed_by_owner()
1001                && device.curve25519_key().is_some()
1002                && !device.is_dehydrated()
1003        });
1004
1005        Ok(ret)
1006    }
1007
1008    /// Get all the tracked users we know about
1009    ///
1010    /// Tracked users are users for which we keep the device list of E2EE
1011    /// capable devices up to date.
1012    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1013        if let Some(machine) = self.client.olm_machine().await.as_ref() {
1014            machine.tracked_users().await
1015        } else {
1016            Ok(HashSet::new())
1017        }
1018    }
1019
1020    /// Get a [`Subscriber`] for the [`VerificationState`].
1021    ///
1022    /// # Examples
1023    ///
1024    /// ```no_run
1025    /// use matrix_sdk::{Client, encryption};
1026    /// use url::Url;
1027    ///
1028    /// # async {
1029    /// let homeserver = Url::parse("http://example.com")?;
1030    /// let client = Client::new(homeserver).await?;
1031    /// let mut subscriber = client.encryption().verification_state();
1032    ///
1033    /// let current_value = subscriber.get();
1034    ///
1035    /// println!("The current verification state is: {current_value:?}");
1036    ///
1037    /// if let Some(verification_state) = subscriber.next().await {
1038    ///     println!("Received verification state update {:?}", verification_state)
1039    /// }
1040    /// # anyhow::Ok(()) };
1041    /// ```
1042    pub fn verification_state(&self) -> Subscriber<VerificationState> {
1043        self.client.inner.verification_state.subscribe_reset()
1044    }
1045
1046    /// Get a verification object with the given flow id.
1047    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1048        let olm = self.client.olm_machine().await;
1049        let olm = olm.as_ref()?;
1050        #[allow(clippy::bind_instead_of_map)]
1051        olm.get_verification(user_id, flow_id).and_then(|v| match v {
1052            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1053                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1054            }
1055            #[cfg(feature = "qrcode")]
1056            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1057                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1058            }
1059            _ => None,
1060        })
1061    }
1062
1063    /// Get a `VerificationRequest` object for the given user with the given
1064    /// flow id.
1065    pub async fn get_verification_request(
1066        &self,
1067        user_id: &UserId,
1068        flow_id: impl AsRef<str>,
1069    ) -> Option<VerificationRequest> {
1070        let olm = self.client.olm_machine().await;
1071        let olm = olm.as_ref()?;
1072
1073        olm.get_verification_request(user_id, flow_id)
1074            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1075    }
1076
1077    /// Get a specific device of a user.
1078    ///
1079    /// # Arguments
1080    ///
1081    /// * `user_id` - The unique id of the user that the device belongs to.
1082    ///
1083    /// * `device_id` - The unique id of the device.
1084    ///
1085    /// Returns a `Device` if one is found and the crypto store didn't throw an
1086    /// error.
1087    ///
1088    /// This will always return None if the client hasn't been logged in.
1089    ///
1090    /// # Examples
1091    ///
1092    /// ```no_run
1093    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
1094    /// # use url::Url;
1095    /// # async {
1096    /// # let alice = user_id!("@alice:example.org");
1097    /// # let homeserver = Url::parse("http://example.com")?;
1098    /// # let client = Client::new(homeserver).await?;
1099    /// if let Some(device) =
1100    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
1101    /// {
1102    ///     println!("{:?}", device.is_verified());
1103    ///
1104    ///     if !device.is_verified() {
1105    ///         let verification = device.request_verification().await?;
1106    ///     }
1107    /// }
1108    /// # anyhow::Ok(()) };
1109    /// ```
1110    pub async fn get_device(
1111        &self,
1112        user_id: &UserId,
1113        device_id: &DeviceId,
1114    ) -> Result<Option<Device>, CryptoStoreError> {
1115        let olm = self.client.olm_machine().await;
1116        let Some(machine) = olm.as_ref() else { return Ok(None) };
1117        let device = machine.get_device(user_id, device_id, None).await?;
1118        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1119    }
1120
1121    /// A convenience method to retrieve your own device from the store.
1122    ///
1123    /// This is the same as calling [`Encryption::get_device()`] with your own
1124    /// user and device ID.
1125    ///
1126    /// This will always return a device, unless you are not logged in.
1127    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1128        let olm = self.client.olm_machine().await;
1129        let Some(machine) = olm.as_ref() else { return Ok(None) };
1130        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1131        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1132    }
1133
1134    /// Get a map holding all the devices of an user.
1135    ///
1136    /// This will always return an empty map if the client hasn't been logged
1137    /// in.
1138    ///
1139    /// # Arguments
1140    ///
1141    /// * `user_id` - The unique id of the user that the devices belong to.
1142    ///
1143    /// # Examples
1144    ///
1145    /// ```no_run
1146    /// # use matrix_sdk::{Client, ruma::user_id};
1147    /// # use url::Url;
1148    /// # async {
1149    /// # let alice = user_id!("@alice:example.org");
1150    /// # let homeserver = Url::parse("http://example.com")?;
1151    /// # let client = Client::new(homeserver).await?;
1152    /// let devices = client.encryption().get_user_devices(alice).await?;
1153    ///
1154    /// for device in devices.devices() {
1155    ///     println!("{device:?}");
1156    /// }
1157    /// # anyhow::Ok(()) };
1158    /// ```
1159    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1160        let devices = self
1161            .client
1162            .olm_machine()
1163            .await
1164            .as_ref()
1165            .ok_or(Error::NoOlmMachine)?
1166            .get_user_devices(user_id, None)
1167            .await?;
1168
1169        Ok(UserDevices { inner: devices, client: self.client.clone() })
1170    }
1171
1172    /// Get the E2EE identity of a user from the crypto store.
1173    ///
1174    /// Usually, we only have the E2EE identity of a user locally if the user
1175    /// is tracked, meaning that we are both members of the same encrypted room.
1176    ///
1177    /// To get the E2EE identity of a user even if it is not available locally
1178    /// use [`Encryption::request_user_identity()`].
1179    ///
1180    /// # Arguments
1181    ///
1182    /// * `user_id` - The unique id of the user that the identity belongs to.
1183    ///
1184    /// Returns a `UserIdentity` if one is found and the crypto store
1185    /// didn't throw an error.
1186    ///
1187    /// This will always return None if the client hasn't been logged in.
1188    ///
1189    /// # Examples
1190    ///
1191    /// ```no_run
1192    /// # use matrix_sdk::{Client, ruma::user_id};
1193    /// # use url::Url;
1194    /// # async {
1195    /// # let alice = user_id!("@alice:example.org");
1196    /// # let homeserver = Url::parse("http://example.com")?;
1197    /// # let client = Client::new(homeserver).await?;
1198    /// let user = client.encryption().get_user_identity(alice).await?;
1199    ///
1200    /// if let Some(user) = user {
1201    ///     println!("{:?}", user.is_verified());
1202    ///
1203    ///     let verification = user.request_verification().await?;
1204    /// }
1205    /// # anyhow::Ok(()) };
1206    /// ```
1207    pub async fn get_user_identity(
1208        &self,
1209        user_id: &UserId,
1210    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1211        let olm = self.client.olm_machine().await;
1212        let Some(olm) = olm.as_ref() else { return Ok(None) };
1213        let identity = olm.get_identity(user_id, None).await?;
1214
1215        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1216    }
1217
1218    /// Get the E2EE identity of a user from the homeserver.
1219    ///
1220    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1221    /// E2EE identity is not found, it should mean that the user did not set
1222    /// up cross-signing.
1223    ///
1224    /// If you want the E2EE identity of a user without making a request to the
1225    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1226    ///
1227    /// # Arguments
1228    ///
1229    /// * `user_id` - The ID of the user that the identity belongs to.
1230    ///
1231    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1232    /// was an issue with the crypto store or with the request to the
1233    /// homeserver.
1234    ///
1235    /// This will always return `None` if the client hasn't been logged in.
1236    ///
1237    /// # Examples
1238    ///
1239    /// ```no_run
1240    /// # use matrix_sdk::{Client, ruma::user_id};
1241    /// # use url::Url;
1242    /// # async {
1243    /// # let alice = user_id!("@alice:example.org");
1244    /// # let homeserver = Url::parse("http://example.com")?;
1245    /// # let client = Client::new(homeserver).await?;
1246    /// let user = client.encryption().request_user_identity(alice).await?;
1247    ///
1248    /// if let Some(user) = user {
1249    ///     println!("User is verified: {:?}", user.is_verified());
1250    ///
1251    ///     let verification = user.request_verification().await?;
1252    /// }
1253    /// # anyhow::Ok(()) };
1254    /// ```
1255    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1256        let olm = self.client.olm_machine().await;
1257        let Some(olm) = olm.as_ref() else { return Ok(None) };
1258
1259        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1260        self.client.keys_query(&request_id, request.device_keys).await?;
1261
1262        let identity = olm.get_identity(user_id, None).await?;
1263        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1264    }
1265
1266    /// Returns a stream of device updates, allowing users to listen for
1267    /// notifications about new or changed devices.
1268    ///
1269    /// The stream produced by this method emits updates whenever a new device
1270    /// is discovered or when an existing device's information is changed. Users
1271    /// can subscribe to this stream and receive updates in real-time.
1272    ///
1273    /// # Examples
1274    ///
1275    /// ```no_run
1276    /// # use matrix_sdk::Client;
1277    /// # use ruma::{device_id, user_id};
1278    /// # use futures_util::{pin_mut, StreamExt};
1279    /// # let client: Client = unimplemented!();
1280    /// # async {
1281    /// let devices_stream = client.encryption().devices_stream().await?;
1282    /// let user_id = client
1283    ///     .user_id()
1284    ///     .expect("We should know our user id after we have logged in");
1285    /// pin_mut!(devices_stream);
1286    ///
1287    /// for device_updates in devices_stream.next().await {
1288    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1289    ///         for device in user_devices.values() {
1290    ///             println!("A new device has been added {}", device.device_id());
1291    ///         }
1292    ///     }
1293    /// }
1294    /// # anyhow::Ok(()) };
1295    /// ```
1296    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1297        let olm = self.client.olm_machine().await;
1298        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1299        let client = self.client.to_owned();
1300
1301        Ok(olm
1302            .store()
1303            .devices_stream()
1304            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1305    }
1306
1307    /// Returns a stream of user identity updates, allowing users to listen for
1308    /// notifications about new or changed user identities.
1309    ///
1310    /// The stream produced by this method emits updates whenever a new user
1311    /// identity is discovered or when an existing identities information is
1312    /// changed. Users can subscribe to this stream and receive updates in
1313    /// real-time.
1314    ///
1315    /// # Examples
1316    ///
1317    /// ```no_run
1318    /// # use matrix_sdk::Client;
1319    /// # use ruma::{device_id, user_id};
1320    /// # use futures_util::{pin_mut, StreamExt};
1321    /// # let client: Client = unimplemented!();
1322    /// # async {
1323    /// let identities_stream =
1324    ///     client.encryption().user_identities_stream().await?;
1325    /// pin_mut!(identities_stream);
1326    ///
1327    /// for identity_updates in identities_stream.next().await {
1328    ///     for (_, identity) in identity_updates.new {
1329    ///         println!("A new identity has been added {}", identity.user_id());
1330    ///     }
1331    /// }
1332    /// # anyhow::Ok(()) };
1333    /// ```
1334    pub async fn user_identities_stream(
1335        &self,
1336    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1337        let olm = self.client.olm_machine().await;
1338        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1339        let client = self.client.to_owned();
1340
1341        Ok(olm
1342            .store()
1343            .user_identities_stream()
1344            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1345    }
1346
1347    /// Create and upload a new cross signing identity.
1348    ///
1349    /// # Arguments
1350    ///
1351    /// * `auth_data` - This request requires user interactive auth, the first
1352    ///   request needs to set this to `None` and will always fail with an
1353    ///   `UiaaResponse`. The response will contain information for the
1354    ///   interactive auth and the same request needs to be made but this time
1355    ///   with some `auth_data` provided.
1356    ///
1357    /// # Examples
1358    ///
1359    /// ```no_run
1360    /// # use std::collections::BTreeMap;
1361    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1362    /// # use url::Url;
1363    /// # use serde_json::json;
1364    /// # async {
1365    /// # let homeserver = Url::parse("http://example.com")?;
1366    /// # let client = Client::new(homeserver).await?;
1367    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1368    ///     if let Some(response) = e.as_uiaa_response() {
1369    ///         let mut password = uiaa::Password::new(
1370    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1371    ///             "wordpass".to_owned(),
1372    ///         );
1373    ///         password.session = response.session.clone();
1374    ///
1375    ///         client
1376    ///             .encryption()
1377    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1378    ///             .await
1379    ///             .expect("Couldn't bootstrap cross signing")
1380    ///     } else {
1381    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1382    ///     }
1383    /// }
1384    /// # anyhow::Ok(()) };
1385    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1386        let olm = self.client.olm_machine().await;
1387        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1388
1389        let CrossSigningBootstrapRequests {
1390            upload_signing_keys_req,
1391            upload_keys_req,
1392            upload_signatures_req,
1393        } = olm.bootstrap_cross_signing(false).await?;
1394
1395        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1396            auth: auth_data,
1397            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1398            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1399            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1400        });
1401
1402        if let Some(req) = upload_keys_req {
1403            self.client.send_outgoing_request(req).await?;
1404        }
1405        self.client.send(upload_signing_keys_req).await?;
1406        self.client.send(upload_signatures_req).await?;
1407
1408        Ok(())
1409    }
1410
1411    /// Reset the cross-signing keys.
1412    ///
1413    /// # Example
1414    ///
1415    /// ```no_run
1416    /// use matrix_sdk::{ruma::api::client::uiaa, encryption::CrossSigningResetAuthType};
1417    ///
1418    /// # async {
1419    /// # let homeserver = url::Url::parse("http://example.com")?;
1420    /// # let client = matrix_sdk::Client::new(homeserver).await?;
1421    /// # let user_id = unimplemented!();
1422    /// let encryption = client.encryption();
1423    ///
1424    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1425    ///     match handle.auth_type() {
1426    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1427    ///             use matrix_sdk::ruma::api::client::uiaa;
1428    ///
1429    ///             let password = "1234".to_owned();
1430    ///             let mut password = uiaa::Password::new(user_id, password);
1431    ///             password.session = uiaa.session;
1432    ///
1433    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1434    ///         }
1435    ///         CrossSigningResetAuthType::OAuth(o) => {
1436    ///             println!(
1437    ///                 "To reset your end-to-end encryption cross-signing identity, \
1438    ///                 you first need to approve it at {}",
1439    ///                 o.approval_url
1440    ///             );
1441    ///
1442    ///             let mut oauth = uiaa::OAuth::new();
1443    ///             oauth.session = o.session;
1444    ///
1445    ///             handle.auth(Some(uiaa::AuthData::OAuth(oauth))).await?;
1446    ///         }
1447    ///     }
1448    /// }
1449    /// # anyhow::Ok(()) };
1450    /// ```
1451    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1452        let olm = self.client.olm_machine().await;
1453        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1454
1455        let CrossSigningBootstrapRequests {
1456            upload_keys_req,
1457            upload_signing_keys_req,
1458            upload_signatures_req,
1459        } = olm.bootstrap_cross_signing(true).await?;
1460
1461        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1462            auth: None,
1463            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1464            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1465            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1466        });
1467
1468        if let Some(req) = upload_keys_req {
1469            self.client.send_outgoing_request(req).await?;
1470        }
1471
1472        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1473            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1474                let client = self.client.clone();
1475
1476                Ok(Some(CrossSigningResetHandle::new(
1477                    client,
1478                    upload_signing_keys_req,
1479                    upload_signatures_req,
1480                    auth_type,
1481                )))
1482            } else {
1483                Err(error.into())
1484            }
1485        } else {
1486            self.client.send(upload_signatures_req).await?;
1487
1488            Ok(None)
1489        }
1490    }
1491
1492    /// Query the user's own device keys, if, and only if, we didn't have their
1493    /// identity in the first place.
1494    async fn ensure_initial_key_query(&self) -> Result<()> {
1495        let olm_machine = self.client.olm_machine().await;
1496        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1497
1498        let user_id = olm_machine.user_id();
1499
1500        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1501            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1502            self.client.keys_query(&request_id, request.device_keys).await?;
1503        }
1504
1505        Ok(())
1506    }
1507
1508    /// Create and upload a new cross signing identity, if that has not been
1509    /// done yet.
1510    ///
1511    /// This will only create a new cross-signing identity if the user had never
1512    /// done it before. If the user did it before, then this is a no-op.
1513    ///
1514    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1515    /// behavior of this function.
1516    ///
1517    /// # Arguments
1518    ///
1519    /// * `auth_data` - This request requires user interactive auth, the first
1520    ///   request needs to set this to `None` and will always fail with an
1521    ///   `UiaaResponse`. The response will contain information for the
1522    ///   interactive auth and the same request needs to be made but this time
1523    ///   with some `auth_data` provided.
1524    ///
1525    /// # Examples
1526    /// ```no_run
1527    /// # use std::collections::BTreeMap;
1528    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1529    /// # use url::Url;
1530    /// # use serde_json::json;
1531    /// # async {
1532    /// # let homeserver = Url::parse("http://example.com")?;
1533    /// # let client = Client::new(homeserver).await?;
1534    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1535    ///     if let Some(response) = e.as_uiaa_response() {
1536    ///         let mut password = uiaa::Password::new(
1537    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1538    ///             "wordpass".to_owned(),
1539    ///         );
1540    ///         password.session = response.session.clone();
1541    ///
1542    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1543    ///         // avoid checks.
1544    ///         client
1545    ///             .encryption()
1546    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1547    ///             .await
1548    ///             .expect("Couldn't bootstrap cross signing")
1549    ///     } else {
1550    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1551    ///     }
1552    /// }
1553    /// # anyhow::Ok(()) };
1554    pub async fn bootstrap_cross_signing_if_needed(
1555        &self,
1556        auth_data: Option<AuthData>,
1557    ) -> Result<()> {
1558        let olm_machine = self.client.olm_machine().await;
1559        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1560        let user_id = olm_machine.user_id();
1561
1562        self.ensure_initial_key_query().await?;
1563
1564        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1565            self.bootstrap_cross_signing(auth_data).await?;
1566        }
1567
1568        Ok(())
1569    }
1570
1571    /// Export E2EE keys that match the given predicate encrypting them with the
1572    /// given passphrase.
1573    ///
1574    /// # Arguments
1575    ///
1576    /// * `path` - The file path where the exported key file will be saved.
1577    ///
1578    /// * `passphrase` - The passphrase that will be used to encrypt the
1579    ///   exported room keys.
1580    ///
1581    /// * `predicate` - A closure that will be called for every known
1582    ///   `InboundGroupSession`, which represents a room key. If the closure
1583    ///   returns `true` the `InboundGroupSession` will be included in the
1584    ///   export, if the closure returns `false` it will not be included.
1585    ///
1586    /// # Panics
1587    ///
1588    /// This method will panic if it isn't run on a Tokio runtime.
1589    ///
1590    /// This method will panic if it can't get enough randomness from the OS to
1591    /// encrypt the exported keys securely.
1592    ///
1593    /// # Examples
1594    ///
1595    /// ```no_run
1596    /// # use std::{path::PathBuf, time::Duration};
1597    /// # use matrix_sdk::{
1598    /// #     Client, config::SyncSettings,
1599    /// #     ruma::room_id,
1600    /// # };
1601    /// # use url::Url;
1602    /// # async {
1603    /// # let homeserver = Url::parse("http://localhost:8080")?;
1604    /// # let mut client = Client::new(homeserver).await?;
1605    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1606    /// // Export all room keys.
1607    /// client
1608    ///     .encryption()
1609    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1610    ///     .await?;
1611    ///
1612    /// // Export only the room keys for a certain room.
1613    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1614    /// let room_id = room_id!("!test:localhost");
1615    ///
1616    /// client
1617    ///     .encryption()
1618    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1619    ///     .await?;
1620    /// # anyhow::Ok(()) };
1621    /// ```
1622    #[cfg(not(target_family = "wasm"))]
1623    pub async fn export_room_keys(
1624        &self,
1625        path: PathBuf,
1626        passphrase: &str,
1627        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1628    ) -> Result<()> {
1629        let olm = self.client.olm_machine().await;
1630        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1631
1632        let keys = olm.store().export_room_keys(predicate).await?;
1633        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1634
1635        let encrypt = move || -> Result<()> {
1636            let export: String =
1637                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1638            let mut file = std::fs::File::create(path)?;
1639            file.write_all(&export.into_bytes())?;
1640            Ok(())
1641        };
1642
1643        let task = tokio::task::spawn_blocking(encrypt);
1644        task.await.expect("Task join error")
1645    }
1646
1647    /// Import E2EE keys from the given file path.
1648    ///
1649    /// # Arguments
1650    ///
1651    /// * `path` - The file path where the exported key file can be found.
1652    ///
1653    /// * `passphrase` - The passphrase that should be used to decrypt the
1654    ///   exported room keys.
1655    ///
1656    /// Returns a tuple of numbers that represent the number of sessions that
1657    /// were imported and the total number of sessions that were found in the
1658    /// key export.
1659    ///
1660    /// # Panics
1661    ///
1662    /// This method will panic if it isn't run on a Tokio runtime.
1663    ///
1664    /// ```no_run
1665    /// # use std::{path::PathBuf, time::Duration};
1666    /// # use matrix_sdk::{
1667    /// #     Client, config::SyncSettings,
1668    /// #     ruma::room_id,
1669    /// # };
1670    /// # use url::Url;
1671    /// # async {
1672    /// # let homeserver = Url::parse("http://localhost:8080")?;
1673    /// # let mut client = Client::new(homeserver).await?;
1674    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1675    /// let result =
1676    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1677    ///
1678    /// println!(
1679    ///     "Imported {} room keys out of {}",
1680    ///     result.imported_count, result.total_count
1681    /// );
1682    /// # anyhow::Ok(()) };
1683    /// ```
1684    #[cfg(not(target_family = "wasm"))]
1685    pub async fn import_room_keys(
1686        &self,
1687        path: PathBuf,
1688        passphrase: &str,
1689    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1690        let olm = self.client.olm_machine().await;
1691        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1692        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1693
1694        let decrypt = move || {
1695            let file = std::fs::File::open(path)?;
1696            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1697        };
1698
1699        let task = tokio::task::spawn_blocking(decrypt);
1700        let import = task.await.expect("Task join error")?;
1701
1702        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1703
1704        self.backups().maybe_trigger_backup();
1705
1706        Ok(ret)
1707    }
1708
1709    /// Receive notifications of room keys being received as a [`Stream`].
1710    ///
1711    /// Each time a room key is updated in any way, an update will be sent to
1712    /// the stream. Updates that happen at the same time are batched into a
1713    /// [`Vec`].
1714    ///
1715    /// If the reader of the stream lags too far behind, an error is broadcast
1716    /// containing the number of skipped items.
1717    ///
1718    /// # Examples
1719    ///
1720    /// ```no_run
1721    /// # use matrix_sdk::Client;
1722    /// # use url::Url;
1723    /// # async {
1724    /// # let homeserver = Url::parse("http://example.com")?;
1725    /// # let client = Client::new(homeserver).await?;
1726    /// use futures_util::StreamExt;
1727    ///
1728    /// let Some(mut room_keys_stream) =
1729    ///     client.encryption().room_keys_received_stream().await
1730    /// else {
1731    ///     return Ok(());
1732    /// };
1733    ///
1734    /// while let Some(update) = room_keys_stream.next().await {
1735    ///     println!("Received room keys {update:?}");
1736    /// }
1737    /// # anyhow::Ok(()) };
1738    /// ```
1739    pub async fn room_keys_received_stream(
1740        &self,
1741    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1742    {
1743        let olm = self.client.olm_machine().await;
1744        let olm = olm.as_ref()?;
1745
1746        Some(olm.store().room_keys_received_stream())
1747    }
1748
1749    /// Receive notifications of historic room key bundles as a [`Stream`].
1750    ///
1751    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1752    ///
1753    /// Each time a historic room key bundle was received, an update will be
1754    /// sent to the stream. This stream is useful for informative purposes
1755    /// exclusively, historic room key bundles are handled by the SDK
1756    /// automatically.
1757    ///
1758    /// # Examples
1759    ///
1760    /// ```no_run
1761    /// # use matrix_sdk::Client;
1762    /// # use url::Url;
1763    /// # async {
1764    /// # let homeserver = Url::parse("http://example.com")?;
1765    /// # let client = Client::new(homeserver).await?;
1766    /// use futures_util::StreamExt;
1767    ///
1768    /// let Some(mut bundle_stream) =
1769    ///     client.encryption().historic_room_key_stream().await
1770    /// else {
1771    ///     return Ok(());
1772    /// };
1773    ///
1774    /// while let Some(bundle_info) = bundle_stream.next().await {
1775    ///     println!("Received a historic room key bundle {bundle_info:?}");
1776    /// }
1777    /// # anyhow::Ok(()) };
1778    /// ```
1779    pub async fn historic_room_key_stream(
1780        &self,
1781    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1782        let olm = self.client.olm_machine().await;
1783        let olm = olm.as_ref()?;
1784
1785        Some(olm.store().historic_room_key_stream())
1786    }
1787
1788    /// Get the secret storage manager of the client.
1789    pub fn secret_storage(&self) -> SecretStorage {
1790        SecretStorage { client: self.client.to_owned() }
1791    }
1792
1793    /// Get the backups manager of the client.
1794    pub fn backups(&self) -> Backups {
1795        Backups { client: self.client.to_owned() }
1796    }
1797
1798    /// Get the recovery manager of the client.
1799    pub fn recovery(&self) -> Recovery {
1800        Recovery { client: self.client.to_owned() }
1801    }
1802
1803    /// Enables the crypto-store cross-process lock.
1804    ///
1805    /// This may be required if there are multiple processes that may do writes
1806    /// to the same crypto store. In that case, it's necessary to create a
1807    /// lock, so that only one process writes to it, otherwise this may
1808    /// cause confusing issues because of stale data contained in in-memory
1809    /// caches.
1810    ///
1811    /// The provided `lock_value` must be a unique identifier for this process.
1812    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1813    /// multi-process is enabled.
1814    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1815        // If the lock has already been created, don't recreate it from scratch.
1816        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1817            let prev_holder = prev_lock.lock_holder();
1818            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1819                return Ok(());
1820            }
1821            warn!(
1822                "Recreating cross-process store lock with a different holder value: \
1823                 prev was {prev_holder:?}, new is {lock_value}"
1824            );
1825        }
1826
1827        let olm_machine = self.client.base_client().olm_machine().await;
1828        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1829
1830        let lock = olm_machine.store().create_store_lock(
1831            "cross_process_lock".to_owned(),
1832            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1833        );
1834
1835        // Gently try to initialize the crypto store generation counter.
1836        //
1837        // If we don't get the lock immediately, then it is already acquired by another
1838        // process, and we'll get to reload next time we acquire the lock.
1839        {
1840            let lock_result = lock.try_lock_once().await?;
1841
1842            if lock_result.is_ok() {
1843                olm_machine
1844                    .initialize_crypto_store_generation(
1845                        &self.client.locks().crypto_store_generation,
1846                    )
1847                    .await?;
1848            }
1849        }
1850
1851        self.client
1852            .locks()
1853            .cross_process_crypto_store_lock
1854            .set(lock)
1855            .map_err(|_| Error::BadCryptoStoreState)?;
1856
1857        Ok(())
1858    }
1859
1860    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1861    /// time.
1862    ///
1863    /// Returns the current generation number.
1864    #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1865    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1866        let olm_machine_guard = self.client.olm_machine().await;
1867        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1868            let (new_gen, generation_number) = olm_machine
1869                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1870                .await?;
1871
1872            Span::current()
1873                .record("olm_machine_new_generation", new_gen)
1874                .record("olm_machine_generation", generation_number);
1875            debug!("OlmMachine generation maintained in CryptoStore");
1876
1877            // If the crypto store generation has changed,
1878            if new_gen {
1879                // (get rid of the reference to the current crypto store first)
1880                drop(olm_machine_guard);
1881                // Recreate the OlmMachine.
1882                self.client.base_client().regenerate_olm(None).await?;
1883            }
1884            Ok(generation_number)
1885        } else {
1886            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1887            // been initialised by the time we get here. Ideally we'd panic, or return an
1888            // error, but for now I'm just adding some logging to check if it
1889            // happens, and returning the magic number 0.
1890            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1891            Ok(0)
1892        }
1893    }
1894
1895    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1896    /// spin-waits until the lock is available.
1897    ///
1898    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1899    /// first time.
1900    ///
1901    /// Returns a guard to the lock, if it was obtained.
1902    pub async fn spin_lock_store(
1903        &self,
1904        max_backoff: Option<u32>,
1905    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1906        self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1907    }
1908
1909    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1910    /// attempts to lock it once.
1911    ///
1912    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1913    /// first time.
1914    ///
1915    /// Returns a guard to the lock, if it was obtained.
1916    pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1917        match self.lock_store(CrossProcessLock::try_lock_once).await {
1918            Err(Error::CrossProcessLockError(e))
1919                if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1920            {
1921                Ok(None)
1922            }
1923            other => other,
1924        }
1925    }
1926
1927    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1928    /// locks the store with the given function, `acquire`.
1929    ///
1930    /// Reloads the `OlmMachine` after obtaining the lock, if the lock is dirty.
1931    ///
1932    /// Returns a guard to the lock if it was obtained.
1933    pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1934        &self,
1935        acquire: F,
1936    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1937        let wrap_err = |e: CryptoStoreError| {
1938            Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1939        };
1940        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1941            let guard = acquire(lock).await.map_err(wrap_err)??;
1942            let _ = self.on_lock_newly_acquired().await?;
1943            Ok(Some(guard.into_guard()))
1944        } else {
1945            Ok(None)
1946        }
1947    }
1948
1949    /// Testing purposes only.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().ok_or(Error::AuthenticationRequired)?;

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1956
1957    /// Bootstrap encryption and enables event listeners for the E2EE support.
1958    ///
1959    /// Based on the `EncryptionSettings`, this call might:
1960    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1961    /// - Create a key backup if needed (POST `/room_keys/version`)
1962    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1963    ///
1964    /// As part of this process, and if needed, the current device keys would be
1965    /// uploaded to the server, new account data would be added, and cross
1966    /// signing keys and signatures might be uploaded.
1967    ///
1968    /// Should be called once we
1969    /// created a [`OlmMachine`], i.e. after logging in.
1970    ///
1971    /// # Arguments
1972    ///
1973    /// * `auth_data` - Some requests may require re-authentication. To prevent
1974    ///   the user from having to re-enter their password (or use other
1975    ///   methods), we can provide the authentication data here. This is
1976    ///   necessary for uploading cross-signing keys. However, please note that
1977    ///   there is a proposal (MSC3967) to remove this requirement, which would
1978    ///   allow for the initial upload of cross-signing keys without
1979    ///   authentication, rendering this parameter obsolete.
1980    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1981        // It's fine to be async here as we're only getting the lock protecting the
1982        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1983        // in we won't delay the login or restoration of the Client.
1984        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1985            Some(BundleReceiverTask::new(&self.client).await)
1986        } else {
1987            None
1988        };
1989
1990        let mut tasks = self.client.inner.e2ee.tasks.lock();
1991
1992        let this = self.clone();
1993
1994        tasks.setup_e2ee = Some(spawn(async move {
1995            // Update the current state first, so we don't have to wait for the result of
1996            // network requests
1997            this.update_verification_state().await;
1998
1999            if this.settings().auto_enable_cross_signing
2000                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
2001            {
2002                error!("Couldn't bootstrap cross signing {e:?}");
2003            }
2004
2005            if let Err(e) = this.backups().setup_and_resume().await {
2006                error!("Couldn't setup and resume backups {e:?}");
2007            }
2008            if let Err(e) = this.recovery().setup().await {
2009                error!("Couldn't setup and resume recovery {e:?}");
2010            }
2011        }));
2012
2013        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
2014
2015        self.setup_room_membership_session_discard_handler();
2016    }
2017
2018    /// Waits for end-to-end encryption initialization tasks to finish, if any
2019    /// was running in the background.
2020    pub async fn wait_for_e2ee_initialization_tasks(&self) {
2021        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2022
2023        if let Some(task) = task
2024            && let Err(err) = task.await
2025        {
2026            warn!("Error when initializing backups: {err}");
2027        }
2028    }
2029
2030    /// Upload the device keys and initial set of one-time keys to the server.
2031    ///
2032    /// This should only be called when the user logs in for the first time,
2033    /// the method will ensure that other devices see our own device as an
2034    /// end-to-end encryption enabled one.
2035    ///
2036    /// **Warning**: Do not use this method if we're already calling
2037    /// [`Client::send_outgoing_request()`]. This method is intended for
2038    /// explicitly uploading the device keys before starting a sync.
2039    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2040        let olm = self.client.olm_machine().await;
2041        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2042
2043        if let Some((request_id, request)) = olm.upload_device_keys().await? {
2044            self.client.keys_upload(&request_id, &request).await?;
2045
2046            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2047            self.client.keys_query(&request_id, request.device_keys).await?;
2048        }
2049
2050        Ok(())
2051    }
2052
2053    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2054        self.recovery().update_state_after_keys_query(response).await;
2055
2056        // Only update the verification_state if our own devices changed
2057        if let Some(user_id) = self.client.user_id() {
2058            let contains_own_device = response.device_keys.contains_key(user_id);
2059
2060            if contains_own_device {
2061                self.update_verification_state().await;
2062            }
2063        }
2064    }
2065
2066    async fn update_verification_state(&self) {
2067        match self.get_own_device().await {
2068            Ok(device) => {
2069                if let Some(device) = device {
2070                    let is_verified = device.is_cross_signed_by_owner();
2071
2072                    if is_verified {
2073                        self.client.inner.verification_state.set(VerificationState::Verified);
2074                    } else {
2075                        self.client.inner.verification_state.set(VerificationState::Unverified);
2076                    }
2077                } else {
2078                    warn!("Couldn't find out own device in the store.");
2079                    self.client.inner.verification_state.set(VerificationState::Unknown);
2080                }
2081            }
2082            Err(error) => {
2083                warn!("Failed retrieving own device: {error}");
2084                self.client.inner.verification_state.set(VerificationState::Unknown);
2085            }
2086        }
2087    }
2088
2089    /// Sets up a handler to rotate room keys when a user leaves a room.
2090    ///
2091    /// Previously, it was sufficient to check if we need to rotate the room key
2092    /// prior to sending a message. However, the history sharing feature
2093    /// ([MSC4268]) breaks this logic:
2094    ///
2095    /// 1. Alice sends a message M1 in room X;
2096    /// 2. Bob invites Charlie, who joins and immediately leaves the room;
2097    /// 3. Alice sends another message M2 in room X.
2098    ///
2099    /// Under the old logic, Alice would not rotate her key after Charlie
2100    /// leaves, resulting in M2 being encrypted with the same session as M1.
2101    /// This would allow Charlie to decrypt M2 if he ever gains access to
2102    /// the event.
2103    ///
2104    /// This handler listens for changes to the room membership, and discards
2105    /// the current room key if the event is a `leave` event.
2106    ///
2107    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2108    fn setup_room_membership_session_discard_handler(&self) {
2109        let client = WeakClient::from_client(&self.client);
2110        self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2111            let Some(client) = client.get() else {
2112                // The main client has been dropped.
2113                return;
2114            };
2115            let Some(user_id) = client.user_id() else {
2116                // We aren't logged in, so this shouldn't ever happen.
2117                return;
2118            };
2119            let olm = client.olm_machine().await;
2120            let Some(olm) = olm.as_ref() else {
2121                warn!("Cannot discard session - Olm machine is not available");
2122                return;
2123            };
2124
2125            if matches!(
2126                ev.membership_change(),
2127                MembershipChange::Joined |
2128                MembershipChange::Invited |
2129                MembershipChange::KnockAccepted |
2130                MembershipChange::InvitationAccepted |
2131                MembershipChange::ProfileChanged { .. }
2132            ) || ev.sender == user_id {
2133                // We can ignore events that did not remove us, and those that we sent.
2134                return;
2135            }
2136
2137            debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2138
2139            // Attempt to discard the current room key. This won't do anything if we don't have one,
2140            // but that's fine since we will create a new room key whenever we try to send a message.
2141            if let Err(e) = olm.discard_room_key(room.room_id()).await {
2142                warn!(
2143                    room_id = ?room.room_id(),
2144                    "Error discarding room key after member leave: {e:?}"
2145                );
2146            }
2147        });
2148    }
2149
/// Encrypts then send the given content via the `/sendToDevice` end-point
/// using Olm encryption.
///
/// If there are a lot of recipient devices multiple `/sendToDevice`
/// requests might be sent out.
///
/// # Returns
///
/// A list of failures: the devices that couldn't get the message, either
/// because the content was withheld from them or because the request
/// carrying their message failed to be sent.
///
/// # Errors
///
/// Returns an error if claiming one-time keys fails, if no `OlmMachine` is
/// available (`Error::NoOlmMachine`), or if encrypting the content fails.
///
/// # Panics
///
/// Panics if `content` cannot be deserialized as a JSON value.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    // Will claim one-time-key for users that needs it
    // TODO: For later optimisation: This will establish missing olm sessions with
    // all this users devices, but we just want for some devices.
    self.client.claim_one_time_keys(users).await?;

    // Report a missing `OlmMachine` as an error, like the other methods of
    // this type, instead of panicking.
    let olm = self.client.olm_machine().await;
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the content was withheld from count as failures.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(device, _)| (device.user_id().to_owned(), device.device_id().to_owned()))
        .collect();

    // TODO: parallelize that? it's already grouping 250 devices per chunk.
    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        // If the sending failed, every device targeted by this request is
        // reported as a failure.
        if send_result.is_err() {
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|device_id| match device_id {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // Cannot happen in this case
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2227}
2228
2229#[cfg(all(test, not(target_family = "wasm")))]
2230mod tests {
2231    use std::{
2232        ops::Not,
2233        str::FromStr,
2234        sync::{
2235            Arc,
2236            atomic::{AtomicBool, Ordering},
2237        },
2238        time::Duration,
2239    };
2240
2241    use matrix_sdk_test::{
2242        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2243        event_factory::EventFactory,
2244    };
2245    use ruma::{
2246        event_id,
2247        events::{reaction::ReactionEventContent, relation::Annotation},
2248        user_id,
2249    };
2250    use serde_json::json;
2251    use wiremock::{
2252        Mock, MockServer, Request, ResponseTemplate,
2253        matchers::{header, method, path_regex},
2254    };
2255
2256    use crate::{
2257        Client, assert_next_matches_with_timeout,
2258        config::RequestConfig,
2259        encryption::{
2260            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2261        },
2262        test_utils::{
2263            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2264        },
2265    };
2266
2267    #[async_test]
2268    async fn test_reaction_sending() {
2269        let server = MockServer::start().await;
2270        let client = logged_in_client(Some(server.uri())).await;
2271
2272        let event_id = event_id!("$2:example.org");
2273
2274        Mock::given(method("GET"))
2275            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2276            .and(header("authorization", "Bearer 1234"))
2277            .respond_with(
2278                ResponseTemplate::new(200)
2279                    .set_body_json(EventFactory::new().room_encryption().into_content()),
2280            )
2281            .mount(&server)
2282            .await;
2283
2284        Mock::given(method("PUT"))
2285            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2286            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2287                "event_id": event_id,
2288            })))
2289            .mount(&server)
2290            .await;
2291
2292        let f = EventFactory::new().sender(user_id!("@example:localhost"));
2293        let response = SyncResponseBuilder::default()
2294            .add_joined_room(
2295                JoinedRoomBuilder::default()
2296                    .add_state_event(
2297                        f.member(user_id!("@example:localhost")).display_name("example"),
2298                    )
2299                    .add_state_event(f.default_power_levels())
2300                    .add_state_event(f.room_encryption()),
2301            )
2302            .build_sync_response();
2303
2304        client.base_client().receive_sync_response(response).await.unwrap();
2305
2306        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2307        assert!(
2308            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2309        );
2310
2311        let event_id = event_id!("$1:example.org");
2312        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2313        room.send(reaction).await.expect("Sending the reaction should not fail");
2314
2315        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2316    }
2317
2318    #[cfg(feature = "sqlite")]
2319    #[async_test]
2320    async fn test_generation_counter_invalidates_olm_machine() {
2321        // Create two clients using the same sqlite database.
2322
2323        use matrix_sdk_base::store::RoomLoadSettings;
2324        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2325        let session = mock_matrix_session();
2326
2327        let client1 = Client::builder()
2328            .homeserver_url("http://localhost:1234")
2329            .request_config(RequestConfig::new().disable_retry())
2330            .sqlite_store(&sqlite_path, None)
2331            .build()
2332            .await
2333            .unwrap();
2334        client1
2335            .matrix_auth()
2336            .restore_session(session.clone(), RoomLoadSettings::default())
2337            .await
2338            .unwrap();
2339
2340        let client2 = Client::builder()
2341            .homeserver_url("http://localhost:1234")
2342            .request_config(RequestConfig::new().disable_retry())
2343            .sqlite_store(sqlite_path, None)
2344            .build()
2345            .await
2346            .unwrap();
2347        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2348
2349        // When the lock isn't enabled, any attempt at locking won't return a guard.
2350        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2351        assert!(guard.is_none());
2352
2353        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2354        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2355
2356        // One client can take the lock.
2357        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2358        assert!(acquired1.is_some());
2359
2360        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2361        let initial_olm_machine =
2362            client1.olm_machine().await.clone().expect("must have an olm machine");
2363
2364        // Also enable backup to check that new machine has the same backup keys.
2365        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2366        let backup_key = decryption_key.megolm_v1_public_key();
2367        backup_key.set_version("1".to_owned());
2368        initial_olm_machine
2369            .backup_machine()
2370            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2371            .await
2372            .expect("Should save");
2373
2374        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2375
2376        assert!(client1.encryption().backups().are_enabled().await);
2377
2378        // The other client can't take the lock too.
2379        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2380        assert!(acquired2.is_none());
2381
2382        // Now have the first client release the lock,
2383        drop(acquired1);
2384        tokio::time::sleep(Duration::from_millis(100)).await;
2385
2386        // And re-take it.
2387        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2388        assert!(acquired1.is_some());
2389
2390        // In that case, the Olm Machine shouldn't change.
2391        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2392        assert!(initial_olm_machine.same_as(&olm_machine));
2393
2394        // Ok, release again.
2395        drop(acquired1);
2396        tokio::time::sleep(Duration::from_millis(100)).await;
2397
2398        // Client2 can acquire the lock.
2399        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2400        assert!(acquired2.is_some());
2401
2402        // And then release it.
2403        drop(acquired2);
2404        tokio::time::sleep(Duration::from_millis(100)).await;
2405
2406        // Client1 can acquire it again,
2407        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2408        assert!(acquired1.is_some());
2409
2410        // But now its olm machine has been invalidated and thus regenerated!
2411        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2412
2413        assert!(!initial_olm_machine.same_as(&olm_machine));
2414
2415        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2416        assert!(backup_key_new.decryption_key.is_some());
2417        assert_eq!(
2418            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2419            backup_key.to_base64()
2420        );
2421        assert!(client1.encryption().backups().are_enabled().await);
2422    }
2423
2424    #[cfg(feature = "sqlite")]
2425    #[async_test]
2426    async fn test_generation_counter_no_spurious_invalidation() {
2427        // Create two clients using the same sqlite database.
2428
2429        use matrix_sdk_base::store::RoomLoadSettings;
2430        let sqlite_path =
2431            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2432        let session = mock_matrix_session();
2433
2434        let client = Client::builder()
2435            .homeserver_url("http://localhost:1234")
2436            .request_config(RequestConfig::new().disable_retry())
2437            .sqlite_store(&sqlite_path, None)
2438            .build()
2439            .await
2440            .unwrap();
2441        client
2442            .matrix_auth()
2443            .restore_session(session.clone(), RoomLoadSettings::default())
2444            .await
2445            .unwrap();
2446
2447        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2448
2449        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2450
2451        // Enabling the lock doesn't update the olm machine.
2452        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2453        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2454
2455        {
2456            // Simulate that another client hold the lock before.
2457            let client2 = Client::builder()
2458                .homeserver_url("http://localhost:1234")
2459                .request_config(RequestConfig::new().disable_retry())
2460                .sqlite_store(sqlite_path, None)
2461                .build()
2462                .await
2463                .unwrap();
2464            client2
2465                .matrix_auth()
2466                .restore_session(session, RoomLoadSettings::default())
2467                .await
2468                .unwrap();
2469
2470            client2
2471                .encryption()
2472                .enable_cross_process_store_lock("client2".to_owned())
2473                .await
2474                .unwrap();
2475
2476            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2477            assert!(guard.is_some());
2478
2479            drop(guard);
2480            tokio::time::sleep(Duration::from_millis(100)).await;
2481        }
2482
2483        {
2484            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2485            assert!(acquired.is_some());
2486        }
2487
2488        // Taking the lock the first time will update the olm machine.
2489        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2490        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2491
2492        {
2493            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2494            assert!(acquired.is_some());
2495        }
2496
2497        // Re-taking the lock doesn't update the olm machine.
2498        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2499        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2500    }
2501
2502    #[async_test]
2503    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2504        // Given a client and a server
2505        let client = no_retry_test_client(None).await;
2506        let server = MockServer::start().await;
2507
2508        // When we subscribe to its verification state
2509        let mut verification_state = client.encryption().verification_state();
2510
2511        // We can get its initial value, and it's Unknown
2512        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2513
2514        // We set up a mocked request to check this endpoint is not called before
2515        // reading the new state
2516        let keys_requested = Arc::new(AtomicBool::new(false));
2517        let inner_bool = keys_requested.clone();
2518
2519        Mock::given(method("GET"))
2520            .and(path_regex(
2521                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2522            ))
2523            .respond_with(move |_req: &Request| {
2524                inner_bool.fetch_or(true, Ordering::SeqCst);
2525                ResponseTemplate::new(200).set_body_json(json!({}))
2526            })
2527            .mount(&server)
2528            .await;
2529
2530        // When the session is initialised and the encryption tasks spawn
2531        set_client_session(&client).await;
2532
2533        // Then we can get an updated value without waiting for any network requests
2534        assert!(keys_requested.load(Ordering::SeqCst).not());
2535        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2536    }
2537
2538    #[test]
2539    fn test_oauth_reset_info_from_uiaa_info() {
2540        let auth_info = json!({
2541            "session": "dummy",
2542            "flows": [
2543                {
2544                    "stages": [
2545                        "org.matrix.cross_signing_reset"
2546                    ]
2547                }
2548            ],
2549            "params": {
2550                "org.matrix.cross_signing_reset": {
2551                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2552                }
2553            },
2554            "msg": "To reset..."
2555        });
2556
2557        let auth_info = serde_json::from_value(auth_info)
2558            .expect("We should be able to deserialize the UiaaInfo");
2559        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2560            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2561    }
2562
2563    #[test]
2564    fn test_duplicate_one_time_key_error_parsing() {
2565        let message = concat!(
2566            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2567            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2568            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2569            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2570            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2571            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2572            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2573        );
2574        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2575            .expect("We should be able to parse the error message");
2576
2577        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2578        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2579
2580        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2581            .expect_err("We shouldn't be able to parse an incomplete error message");
2582    }
2583
2584    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2585    // response to a /keys/query request using the given device keys and a
2586    // pre-defined set of cross-signing keys.
2587    fn devices_to_verify_against_keys_query_response(
2588        devices: Vec<serde_json::Value>,
2589    ) -> serde_json::Value {
2590        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2591            .into_iter()
2592            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2593            .collect();
2594        json!({
2595            "device_keys": {
2596                "@example:localhost": device_keys,
2597            },
2598            "master_keys": {
2599                "@example:localhost": {
2600                    "keys": {
2601                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2602                    },
2603                    "usage": ["master"],
2604                    "user_id": "@example:localhost",
2605                },
2606            },
2607            "self_signing_keys": {
2608                "@example:localhost": {
2609                    "keys": {
2610                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2611                    },
2612                    "usage": ["self_signing"],
2613                    "user_id": "@example:localhost",
2614                    "signatures": {
2615                        "@example:localhost": {
2616                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2617                        },
2618                    },
2619                },
2620            },
2621            "user_signing_keys": {
2622                "@example:localhost": {
2623                    "keys": {
2624                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2625                    },
2626                    "usage": ["user_signing"],
2627                    "user_id": "@example:localhost",
2628                    "signatures": {
2629                        "@example:localhost": {
2630                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2631                        },
2632                    },
2633                },
2634            }
2635        })
2636    }
2637
2638    // The following three tests test that we can detect whether the user has
2639    // other devices that they can verify against under different conditions.
2640    #[async_test]
2641    /// Test that we detect that can't verify against another device if we have
2642    /// no devices.
2643    async fn test_devices_to_verify_against_no_devices() {
2644        let server = MockServer::start().await;
2645        let client = logged_in_client(Some(server.uri())).await;
2646
2647        Mock::given(method("POST"))
2648            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2649            .respond_with(
2650                ResponseTemplate::new(200)
2651                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2652            )
2653            .mount(&server)
2654            .await;
2655
2656        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2657    }
2658
2659    #[async_test]
2660    /// Test that we detect that we can verify against another cross-signed
2661    /// regular device.
2662    async fn test_devices_to_verify_against_cross_signed() {
2663        let server = MockServer::start().await;
2664        let client = logged_in_client(Some(server.uri())).await;
2665
2666        Mock::given(method("POST"))
2667            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2668            .respond_with(ResponseTemplate::new(200).set_body_json(
2669                devices_to_verify_against_keys_query_response(vec![
2670                    json!({
2671                        "algorithms": [
2672                            "m.olm.v1.curve25519-aes-sha2",
2673                            "m.megolm.v1.aes-sha2",
2674                        ],
2675                        "user_id": "@example:localhost",
2676                        "device_id": "SIGNEDDEVICE",
2677                        "keys": {
2678                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2679                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2680                        },
2681                        "signatures": {
2682                            "@example:localhost": {
2683                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2684                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2685                            },
2686                        },
2687                    })
2688                ])
2689            ))
2690            .mount(&server)
2691            .await;
2692
2693        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2694    }
2695
2696    #[async_test]
2697    /// Test that we detect that we can't verify against a dehydrated or
2698    /// unsigned device.
2699    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2700        let server = MockServer::start().await;
2701        let client = logged_in_client(Some(server.uri())).await;
2702        let user_id = client.user_id().unwrap();
2703        let olm_machine = client.olm_machine().await;
2704        let olm_machine = olm_machine.as_ref().unwrap();
2705
2706        Mock::given(method("POST"))
2707            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2708            .respond_with(ResponseTemplate::new(200).set_body_json(
2709                devices_to_verify_against_keys_query_response(vec![
2710                    json!({
2711                        "algorithms": [
2712                            "m.olm.v1.curve25519-aes-sha2",
2713                            "m.megolm.v1.aes-sha2",
2714                        ],
2715                        "user_id": "@example:localhost",
2716                        "device_id": "DEHYDRATEDDEVICE",
2717                        "keys": {
2718                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2719                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2720                        },
2721                        "dehydrated": true,
2722                        "signatures": {
2723                            "@example:localhost": {
2724                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2725                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2726                            },
2727                        },
2728                    }),
2729                    json!({
2730                        "algorithms": [
2731                            "m.olm.v1.curve25519-aes-sha2",
2732                            "m.megolm.v1.aes-sha2",
2733                        ],
2734                        "user_id": "@example:localhost",
2735                        "device_id": "UNSIGNEDDEVICE",
2736                        "keys": {
2737                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2738                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2739                        },
2740                        "signatures": {
2741                            "@example:localhost": {
2742                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2743                            },
2744                        },
2745                    }),
2746                ])
2747            ))
2748            .mount(&server)
2749            .await;
2750
2751        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2752        client.keys_query(&request_id, request.device_keys).await.unwrap();
2753
2754        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2755    }
2756}