Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::{Path, PathBuf},
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34    future::try_join,
35    stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40    StateStoreDataKey, StateStoreDataValue,
41    cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42    crypto::{
43        CrossSigningBootstrapRequests, OlmMachine,
44        store::{
45            LockableCryptoStore, SecretImportError,
46            types::{RoomKeyBundleInfo, RoomKeyInfo},
47        },
48        types::{
49            SecretsBundle, SignedKey,
50            requests::{
51                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52            },
53        },
54    },
55    sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60    api::{
61        client::{
62            keys::{
63                get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64                upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65            },
66            message::send_message_event,
67            to_device::send_event_to_device::v3::{
68                Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69            },
70            uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71        },
72        error::{ErrorBody, StandardErrorBody},
73    },
74    assign,
75    events::room::{
76        MediaSource, ThumbnailInfo,
77        member::{MembershipChange, OriginalSyncRoomMemberEvent},
78    },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91    backups::{Backups, types::BackupClientState},
92    futures::UploadEncryptedFile,
93    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94    recovery::{Recovery, RecoveryState},
95    secret_storage::SecretStorage,
96    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97    verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
101    attachment::Thumbnail,
102    client::{ClientInner, WeakClient},
103    cross_process_lock::CrossProcessLockGuard,
104    error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115pub use matrix_sdk_base::crypto::{
116    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
117    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
118    SignatureError, VERSION,
119    olm::{
120        SessionCreationError as MegolmSessionCreationError,
121        SessionExportError as OlmSessionExportError,
122    },
123    vodozemac,
124};
125use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
126
127#[cfg(feature = "experimental-send-custom-to-device")]
128use crate::config::RequestConfig;
129pub use crate::error::RoomKeyImportError;
130
/// Errors that can occur while exporting a [`SecretsBundle`] from a SQLite
/// store.
#[cfg(feature = "sqlite")]
#[derive(Debug, thiserror::Error)]
pub enum BundleExportError {
    /// Opening the SQLite store failed.
    #[error(transparent)]
    OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
    /// Exporting data from the SQLite store failed.
    #[error(transparent)]
    StoreError(#[from] CryptoStoreError),
    /// The store holds no secrets bundle, or the bundle couldn't be read from
    /// the store.
    #[error(transparent)]
    SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
}
147
148/// Error type describing failures that can happen while importing a
149/// [`SecretsBundle`].
150#[derive(Debug, thiserror::Error)]
151pub enum BundleImportError {
152    /// The bundle couldn't be imported.
153    #[error(transparent)]
154    SecretImport(#[from] SecretImportError),
155    /// The cross-signed device keys couldn't been uploaded.
156    #[error(transparent)]
157    DeviceKeys(#[from] Error),
158}
159
/// Try to export a [`SecretsBundle`] from a SQLite crypto store.
///
/// The bundle obtained from an existing `matrix-sdk`-based client can be
/// imported into another [`Client`] instance.
///
/// This can be useful for migration purposes, or to allow existing client
/// instances to create new ones that will be fully verified.
///
/// # Arguments
///
/// * `database_path` - The path of the SQLite crypto store to open.
/// * `passphrase` - The passphrase handed to the store when opening it, if
///   any.
///
/// # Returns
///
/// `Ok(None)` if the store contains no account, otherwise the user ID of the
/// stored account together with the exported bundle.
#[cfg(feature = "sqlite")]
pub async fn export_secrets_bundle_from_store(
    database_path: impl AsRef<Path>,
    passphrase: Option<&str>,
) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
    use matrix_sdk_base::crypto::store::CryptoStore;

    let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;

    // Without a stored account there is nothing we could export.
    let Some(account) =
        store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?
    else {
        return Ok(None);
    };

    let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
        .await
        .map_err(BundleExportError::StoreError)?;
    let bundle = machine.store().export_secrets_bundle().await?;

    Ok(Some((account.user_id.to_owned(), bundle)))
}
191
192/// All the data related to the encryption state.
193pub(crate) struct EncryptionData {
194    /// Background tasks related to encryption (key backup, initialization
195    /// tasks, etc.).
196    pub tasks: StdMutex<ClientTasks>,
197
198    /// End-to-end encryption settings.
199    pub encryption_settings: EncryptionSettings,
200
201    /// All state related to key backup.
202    pub backup_state: BackupClientState,
203
204    /// All state related to secret storage recovery.
205    pub recovery_state: SharedObservable<RecoveryState>,
206}
207
208impl EncryptionData {
209    pub fn new(encryption_settings: EncryptionSettings) -> Self {
210        Self {
211            encryption_settings,
212
213            tasks: StdMutex::new(Default::default()),
214            backup_state: Default::default(),
215            recovery_state: Default::default(),
216        }
217    }
218
219    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
220        let weak_client = WeakClient::from_inner(client);
221
222        let mut tasks = self.tasks.lock();
223        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
224
225        if self.encryption_settings.backup_download_strategy
226            == BackupDownloadStrategy::AfterDecryptionFailure
227        {
228            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
229        }
230    }
231
232    /// Initialize the background task which listens for changes in the
233    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
234    ///
235    /// This should happen after the usual tasks have been set up and after the
236    /// E2EE initialization tasks have been set up.
237    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
238        let mut guard = self.tasks.lock();
239
240        let future = Recovery::update_state_after_backup_state_change(client);
241        let join_handle = spawn(future);
242
243        guard.update_recovery_state_after_backup = Some(join_handle);
244    }
245}
246
247/// Settings for end-to-end encryption features.
248#[derive(Clone, Copy, Debug, Default)]
249pub struct EncryptionSettings {
250    /// Automatically bootstrap cross-signing for a user once they're logged, in
251    /// case it's not already done yet.
252    ///
253    /// This requires to login with a username and password, or that MSC3967 is
254    /// enabled on the server, as of 2023-10-20.
255    pub auto_enable_cross_signing: bool,
256
257    /// Select a strategy to download room keys from the backup, by default room
258    /// keys won't be downloaded from the backup automatically.
259    ///
260    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
261    pub backup_download_strategy: BackupDownloadStrategy,
262
263    /// Automatically create a backup version if no backup exists.
264    pub auto_enable_backups: bool,
265}
266
/// The strategy that decides when room keys are downloaded from the key
/// backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] method.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
293
/// The verification state of our own device.
///
/// This enum tells us whether our own user identity trusts the device, in
/// other words whether the user identity has signed the device.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationState {
    /// The verification state is not known yet.
    Unknown,
    /// The device is considered to be verified: it has been signed by its
    /// user identity.
    Verified,
    /// The device is not verified.
    Unverified,
}
308
309/// A stateful struct remembering the cross-signing keys we need to upload.
310///
311/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
312/// additional authentication, this struct will contain information on the type
313/// of authentication the user needs to complete before the upload might be
314/// continued.
315///
316/// More info can be found in the [spec].
317///
318/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
319#[derive(Debug)]
320pub struct CrossSigningResetHandle {
321    client: Client,
322    upload_request: UploadSigningKeysRequest,
323    signatures_request: UploadSignaturesRequest,
324    auth_type: CrossSigningResetAuthType,
325    is_cancelled: Mutex<bool>,
326}
327
328impl CrossSigningResetHandle {
329    /// Set up a new `CrossSigningResetHandle`.
330    pub fn new(
331        client: Client,
332        upload_request: UploadSigningKeysRequest,
333        signatures_request: UploadSignaturesRequest,
334        auth_type: CrossSigningResetAuthType,
335    ) -> Self {
336        Self {
337            client,
338            upload_request,
339            signatures_request,
340            auth_type,
341            is_cancelled: Mutex::new(false),
342        }
343    }
344
345    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
346    /// is using.
347    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
348        &self.auth_type
349    }
350
351    /// Continue the cross-signing reset by either waiting for the
352    /// authentication to be done on the side of the OAuth 2.0 server or by
353    /// providing additional [`AuthData`] the homeserver requires.
354    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
355        // Poll to see whether the reset has been authorized twice per second.
356        const RETRY_EVERY: Duration = Duration::from_millis(500);
357
358        // Give up after two minutes of polling.
359        const TIMEOUT: Duration = Duration::from_mins(2);
360
361        tokio::time::timeout(TIMEOUT, async {
362            let mut upload_request = self.upload_request.clone();
363            upload_request.auth = auth;
364
365            debug!(
366                "Repeatedly PUTting to keys/device_signing/upload until it works \
367                or we hit a permanent failure."
368            );
369            while let Err(e) = self.client.send(upload_request.clone()).await {
370                if *self.is_cancelled.lock().await {
371                    return Ok(());
372                }
373
374                match e.as_uiaa_response() {
375                    Some(uiaa_info) => {
376                        if uiaa_info.auth_error.is_some() {
377                            return Err(e.into());
378                        }
379                    }
380                    None => return Err(e.into()),
381                }
382
383                debug!(
384                    "PUT to keys/device_signing/upload failed with 401. Retrying after \
385                    a short delay."
386                );
387                sleep(RETRY_EVERY).await;
388            }
389
390            self.client.send(self.signatures_request.clone()).await?;
391
392            Ok(())
393        })
394        .await
395        .unwrap_or_else(|_| {
396            warn!("Timed out waiting for keys/device_signing/upload to succeed.");
397            Err(Error::Timeout)
398        })
399    }
400
401    /// Cancel the ongoing identity reset process
402    pub async fn cancel(&self) {
403        *self.is_cancelled.lock().await = true;
404    }
405}
406
407/// information about the additional authentication that is required before the
408/// cross-signing keys can be uploaded.
409#[derive(Debug, Clone)]
410pub enum CrossSigningResetAuthType {
411    /// The homeserver requires user-interactive authentication.
412    Uiaa(UiaaInfo),
413    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
414    /// approve the upload of cross-signing keys.
415    OAuth(OAuthCrossSigningResetInfo),
416}
417
418impl CrossSigningResetAuthType {
419    fn new(error: &HttpError) -> Result<Option<Self>> {
420        if let Some(auth_info) = error.as_uiaa_response() {
421            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
422                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
423            } else {
424                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
425            }
426        } else {
427            Ok(None)
428        }
429    }
430}
431
432/// OAuth 2.0 specific information about the required authentication for the
433/// upload of cross-signing keys.
434#[derive(Debug, Clone, Deserialize)]
435pub struct OAuthCrossSigningResetInfo {
436    /// The URL where the user can approve the reset of the cross-signing keys.
437    pub approval_url: Url,
438}
439
440impl OAuthCrossSigningResetInfo {
441    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
442        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
443            return Ok(None);
444        };
445
446        Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
447    }
448}
449
450/// A struct that helps to parse the custom error message Synapse posts if a
451/// duplicate one-time key is uploaded.
452#[derive(Clone, Debug)]
453pub struct DuplicateOneTimeKeyErrorMessage {
454    /// The previously uploaded one-time key.
455    pub old_key: Curve25519PublicKey,
456    /// The one-time key we're attempting to upload right now.
457    pub new_key: Curve25519PublicKey,
458}
459
460impl FromStr for DuplicateOneTimeKeyErrorMessage {
461    type Err = serde_json::Error;
462
463    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
464        // First we split the string into two parts, the part containing the old key and
465        // the part containing the new key. The parts are conveniently separated
466        // by a `;` character.
467        let mut split = s.split_terminator(';');
468
469        let old_key = split
470            .next()
471            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
472        let new_key = split
473            .next()
474            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
475
476        // Now we remove the lengthy prefix from the part containing the old key, we
477        // should be left with just the JSON of the signed key.
478        let old_key_index = old_key
479            .find("Old key:")
480            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
481
482        let old_key = old_key[old_key_index..]
483            .trim()
484            .strip_prefix("Old key:")
485            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
486
487        // The part containing the new key is much simpler, we just remove a static
488        // prefix.
489        let new_key = new_key
490            .trim()
491            .strip_prefix("new key:")
492            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
493
494        // The JSON containing the new key is for some reason quoted using single
495        // quotes, so let's replace them with normal double quotes.
496        let new_key = new_key.replace("'", "\"");
497
498        // Let's deserialize now.
499        let old_key: SignedKey = serde_json::from_str(old_key)?;
500        let new_key: SignedKey = serde_json::from_str(&new_key)?;
501
502        // Pick out the Curve keys, we don't care about the rest that much.
503        let old_key = old_key.key();
504        let new_key = new_key.key();
505
506        Ok(Self { old_key, new_key })
507    }
508}
509
510impl Client {
511    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
512        self.base_client().olm_machine().await
513    }
514
515    pub(crate) async fn mark_request_as_sent(
516        &self,
517        request_id: &TransactionId,
518        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
519    ) -> Result<(), matrix_sdk_base::Error> {
520        Ok(self
521            .olm_machine()
522            .await
523            .as_ref()
524            .expect(
525                "We should have an olm machine once we try to mark E2EE related requests as sent",
526            )
527            .mark_request_as_sent(request_id, response)
528            .await?)
529    }
530
531    /// Query the server for users device keys.
532    ///
533    /// # Panics
534    ///
535    /// Panics if no key query needs to be done.
536    #[instrument(skip(self, device_keys))]
537    pub(crate) async fn keys_query(
538        &self,
539        request_id: &TransactionId,
540        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
541    ) -> Result<get_keys::v3::Response> {
542        let request = assign!(get_keys::v3::Request::new(), { device_keys });
543
544        let response = self.send(request).await?;
545        self.mark_request_as_sent(request_id, &response).await?;
546        self.encryption().update_state_after_keys_query(&response).await;
547
548        Ok(response)
549    }
550
551    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
552    /// encrypting and uploading a provided reader.
553    ///
554    /// # Arguments
555    ///
556    /// * `content_type` - The content type of the file.
557    /// * `reader` - The reader that should be encrypted and uploaded.
558    ///
559    /// # Examples
560    ///
561    /// ```no_run
562    /// # use matrix_sdk::Client;
563    /// # use url::Url;
564    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
565    /// use serde::{Deserialize, Serialize};
566    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
567    ///
568    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
569    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
570    /// struct CustomEventContent {
571    ///     encrypted_file: EncryptedFile,
572    /// }
573    ///
574    /// # async {
575    /// # let homeserver = Url::parse("http://example.com")?;
576    /// # let client = Client::new(homeserver).await?;
577    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
578    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
579    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
580    ///
581    /// room.send(CustomEventContent { encrypted_file }).await?;
582    /// # anyhow::Ok(()) };
583    /// ```
584    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
585        &'a self,
586        reader: &'a mut R,
587    ) -> UploadEncryptedFile<'a, R> {
588        UploadEncryptedFile::new(self, reader)
589    }
590
591    /// Encrypt and upload the file and thumbnails, and return the source
592    /// information.
593    pub(crate) async fn upload_encrypted_media_and_thumbnail(
594        &self,
595        data: &[u8],
596        thumbnail: Option<Thumbnail>,
597        send_progress: SharedObservable<TransmissionProgress>,
598    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
599        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
600
601        let upload_attachment = async {
602            let mut cursor = Cursor::new(data);
603            self.upload_encrypted_file(&mut cursor)
604                .with_send_progress_observable(send_progress)
605                .await
606        };
607
608        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
609
610        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
611    }
612
613    /// Uploads an encrypted thumbnail to the media repository, and returns
614    /// its source and extra information.
615    async fn upload_encrypted_thumbnail(
616        &self,
617        thumbnail: Option<Thumbnail>,
618        send_progress: SharedObservable<TransmissionProgress>,
619    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
620        let Some(thumbnail) = thumbnail else {
621            return Ok(None);
622        };
623
624        let (data, _, thumbnail_info) = thumbnail.into_parts();
625        let mut cursor = Cursor::new(data);
626
627        let file = self
628            .upload_encrypted_file(&mut cursor)
629            .with_send_progress_observable(send_progress)
630            .await?;
631
632        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
633    }
634
635    /// Claim one-time keys creating new Olm sessions.
636    ///
637    /// # Arguments
638    ///
639    /// * `users` - The list of user/device pairs that we should claim keys for.
640    pub(crate) async fn claim_one_time_keys(
641        &self,
642        users: impl Iterator<Item = &UserId>,
643    ) -> Result<()> {
644        let _lock = self.locks().key_claim_lock.lock().await;
645
646        if let Some((request_id, request)) = self
647            .olm_machine()
648            .await
649            .as_ref()
650            .ok_or(Error::NoOlmMachine)?
651            .get_missing_sessions(users)
652            .await?
653        {
654            let response = self.send(request).await?;
655            self.mark_request_as_sent(&request_id, &response).await?;
656        }
657
658        Ok(())
659    }
660
661    /// Upload the E2E encryption keys.
662    ///
663    /// This uploads the long lived device keys as well as the required amount
664    /// of one-time keys.
665    ///
666    /// # Panics
667    ///
668    /// Panics if the client isn't logged in, or if no encryption keys need to
669    /// be uploaded.
670    #[instrument(skip(self, request))]
671    pub(crate) async fn keys_upload(
672        &self,
673        request_id: &TransactionId,
674        request: &upload_keys::v3::Request,
675    ) -> Result<upload_keys::v3::Response> {
676        debug!(
677            device_keys = request.device_keys.is_some(),
678            one_time_key_count = request.one_time_keys.len(),
679            "Uploading public encryption keys",
680        );
681
682        let response = self.send(request.clone()).await?;
683        self.mark_request_as_sent(request_id, &response).await?;
684
685        Ok(response)
686    }
687
688    pub(crate) async fn room_send_helper(
689        &self,
690        request: &RoomMessageRequest,
691    ) -> Result<send_message_event::v3::Response> {
692        let content = request.content.clone();
693        let txn_id = request.txn_id.clone();
694        let room_id = &request.room_id;
695
696        self.get_room(room_id)
697            .expect("Can't send a message to a room that isn't known to the store")
698            .send(*content)
699            .with_transaction_id(txn_id)
700            .await
701            .map(|result| result.response)
702    }
703
704    pub(crate) async fn send_to_device(
705        &self,
706        request: &ToDeviceRequest,
707    ) -> HttpResult<ToDeviceResponse> {
708        let request = RumaToDeviceRequest::new_raw(
709            request.event_type.clone(),
710            request.txn_id.clone(),
711            request.messages.clone(),
712        );
713
714        self.send(request).await
715    }
716
717    pub(crate) async fn send_verification_request(
718        &self,
719        request: OutgoingVerificationRequest,
720    ) -> Result<()> {
721        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
722
723        match request {
724            ToDevice(t) => {
725                self.send_to_device(&t).await?;
726            }
727            InRoom(r) => {
728                self.room_send_helper(&r).await?;
729            }
730        }
731
732        Ok(())
733    }
734
735    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
736        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
737
738        match r.request() {
739            AnyOutgoingRequest::KeysQuery(request) => {
740                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
741            }
742            AnyOutgoingRequest::KeysUpload(request) => {
743                let response = self.keys_upload(r.request_id(), request).await;
744
745                if let Err(e) = &response {
746                    match e.as_ruma_api_error() {
747                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
748                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
749                            {
750                                // This is one of the nastiest errors we can have. The server
751                                // telling us that we already have a one-time key uploaded means
752                                // that we forgot about some of our one-time keys. This will lead to
753                                // UTDs.
754                                {
755                                    let already_reported = self
756                                        .state_store()
757                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
758                                        .await?
759                                        .is_some();
760
761                                    if message.starts_with("One time key") && !already_reported {
762                                        let error_message =
763                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
764
765                                        if let Ok(message) = &error_message {
766                                            error!(
767                                                sentry = true,
768                                                old_key = %message.old_key,
769                                                new_key = %message.new_key,
770                                                "Duplicate one-time keys have been uploaded"
771                                            );
772                                        } else {
773                                            error!(
774                                                sentry = true,
775                                                "Duplicate one-time keys have been uploaded"
776                                            );
777                                        }
778
779                                        self.state_store()
780                                            .set_kv_data(
781                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
782                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
783                                            )
784                                            .await?;
785
786                                        if let Err(e) = self
787                                            .inner
788                                            .duplicate_key_upload_error_sender
789                                            .send(error_message.ok())
790                                        {
791                                            error!(
792                                                "Failed to dispatch duplicate key upload error notification: {}",
793                                                e
794                                            );
795                                        }
796                                    }
797                                }
798                            }
799                        }
800                        _ => {}
801                    }
802
803                    response?;
804                }
805            }
806            AnyOutgoingRequest::ToDeviceRequest(request) => {
807                let response = self.send_to_device(request).await?;
808                self.mark_request_as_sent(r.request_id(), &response).await?;
809            }
810            AnyOutgoingRequest::SignatureUpload(request) => {
811                let response = self.send(request.clone()).await?;
812                self.mark_request_as_sent(r.request_id(), &response).await?;
813            }
814            AnyOutgoingRequest::RoomMessage(request) => {
815                let response = self.room_send_helper(request).await?;
816                self.mark_request_as_sent(r.request_id(), &response).await?;
817            }
818            AnyOutgoingRequest::KeysClaim(request) => {
819                let response = self.send(request.clone()).await?;
820                self.mark_request_as_sent(r.request_id(), &response).await?;
821            }
822        }
823
824        Ok(())
825    }
826
827    #[instrument(skip_all)]
828    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
829        const MAX_CONCURRENT_REQUESTS: usize = 20;
830
831        // This is needed because sometimes we need to automatically
832        // claim some one-time keys to unwedge an existing Olm session.
833        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
834            warn!("Error while claiming one-time keys {:?}", e);
835        }
836
837        let outgoing_requests = stream::iter(
838            self.olm_machine()
839                .await
840                .as_ref()
841                .ok_or(Error::NoOlmMachine)?
842                .outgoing_requests()
843                .await?,
844        )
845        .map(|r| self.send_outgoing_request(r));
846
847        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
848
849        requests
850            .for_each(|r| async move {
851                match r {
852                    Ok(_) => (),
853                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
854                }
855            })
856            .await;
857
858        Ok(())
859    }
860}
861
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Hand out the read guard around the optional [`OlmMachine`].
    ///
    /// For testing purposes only.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }

    /// Abort the task that receives historic room key bundles, if such a task
    /// is currently registered.
    ///
    /// For testing purposes only.
    pub fn abort_bundle_receiver_task(&self) {
        let e2ee_tasks = self.inner.e2ee.tasks.lock();

        if let Some(handle) = &e2ee_tasks.receive_historic_room_key_bundles {
            handle.abort();
        }
    }
}
877
878/// A high-level API to manage the client's encryption.
879///
880/// To get this, use [`Client::encryption()`].
881#[derive(Debug, Clone)]
882pub struct Encryption {
883    /// The underlying client.
884    client: Client,
885}
886
887impl Encryption {
888    pub(crate) fn new(client: Client) -> Self {
889        Self { client }
890    }
891
892    /// Returns the current encryption settings for this client.
893    pub(crate) fn settings(&self) -> EncryptionSettings {
894        self.client.inner.e2ee.encryption_settings
895    }
896
897    /// Get the public ed25519 key of our own device. This is usually what is
898    /// called the fingerprint of the device.
899    pub async fn ed25519_key(&self) -> Option<String> {
900        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
901    }
902
903    /// Get the public Curve25519 key of our own device.
904    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
905        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
906    }
907
908    /// Get the current device creation timestamp.
909    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
910        match self.get_own_device().await {
911            Ok(Some(device)) => device.first_time_seen_ts(),
912            // Should not happen, there should always be an own device
913            _ => MilliSecondsSinceUnixEpoch::now(),
914        }
915    }
916
917    /// This method will import all the private cross-signing keys and, if
918    /// available, the private part of a backup key and its accompanying
919    /// version into the store.
920    ///
921    /// Importing all the secrets will mark the device as verified and enable
922    /// backups if a backup key was available in the bundle.
923    ///
924    /// **Warning**: Only import this from a trusted source, i.e. if an existing
925    /// device is sharing this with a new device.
926    ///
927    /// **Warning*: Only call this method right after logging in and before the
928    /// initial sync has been started.
929    pub async fn import_secrets_bundle(
930        &self,
931        bundle: &SecretsBundle,
932    ) -> Result<(), BundleImportError> {
933        self.import_secrets_bundle_impl(bundle).await?;
934
935        // Upload the device keys, this will ensure that other devices see us as a fully
936        // verified device as soon as this method returns.
937        self.ensure_device_keys_upload().await?;
938        self.wait_for_e2ee_initialization_tasks().await;
939
940        // If our initialization tasks completed before we imported the secrets bundle,
941        // backups might not have been enabled.
942        //
943        // In this case attempt to enable them again.
944        if !self.backups().are_enabled().await {
945            self.backups().maybe_resume_backups().await?;
946        }
947
948        Ok(())
949    }
950
951    pub(crate) async fn import_secrets_bundle_impl(
952        &self,
953        bundle: &SecretsBundle,
954    ) -> Result<(), SecretImportError> {
955        let olm_machine = self.client.olm_machine().await;
956        let olm_machine =
957            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
958
959        olm_machine.store().import_secrets_bundle(bundle).await
960    }
961
962    /// Get the status of the private cross signing keys.
963    ///
964    /// This can be used to check which private cross signing keys we have
965    /// stored locally.
966    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
967        let olm = self.client.olm_machine().await;
968        let machine = olm.as_ref()?;
969        Some(machine.cross_signing_status().await)
970    }
971
972    /// Does the user have other devices that the current device can verify
973    /// against?
974    ///
975    /// The device must be signed by the user's cross-signing key, must have an
976    /// identity, and must not be a dehydrated device.
977    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
978        let olm_machine = self.client.olm_machine().await;
979        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
980        let user_id = olm_machine.user_id();
981
982        self.ensure_initial_key_query().await?;
983
984        let devices = self.get_user_devices(user_id).await?;
985
986        let ret = devices.devices().any(|device| {
987            device.is_cross_signed_by_owner()
988                && device.curve25519_key().is_some()
989                && !device.is_dehydrated()
990        });
991
992        Ok(ret)
993    }
994
995    /// Get all the tracked users we know about
996    ///
997    /// Tracked users are users for which we keep the device list of E2EE
998    /// capable devices up to date.
999    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1000        if let Some(machine) = self.client.olm_machine().await.as_ref() {
1001            machine.tracked_users().await
1002        } else {
1003            Ok(HashSet::new())
1004        }
1005    }
1006
1007    /// Get a [`Subscriber`] for the [`VerificationState`].
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```no_run
1012    /// use matrix_sdk::{Client, encryption};
1013    /// use url::Url;
1014    ///
1015    /// # async {
1016    /// let homeserver = Url::parse("http://example.com")?;
1017    /// let client = Client::new(homeserver).await?;
1018    /// let mut subscriber = client.encryption().verification_state();
1019    ///
1020    /// let current_value = subscriber.get();
1021    ///
1022    /// println!("The current verification state is: {current_value:?}");
1023    ///
1024    /// if let Some(verification_state) = subscriber.next().await {
1025    ///     println!("Received verification state update {:?}", verification_state)
1026    /// }
1027    /// # anyhow::Ok(()) };
1028    /// ```
1029    pub fn verification_state(&self) -> Subscriber<VerificationState> {
1030        self.client.inner.verification_state.subscribe_reset()
1031    }
1032
1033    /// Get a verification object with the given flow id.
1034    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1035        let olm = self.client.olm_machine().await;
1036        let olm = olm.as_ref()?;
1037        #[allow(clippy::bind_instead_of_map)]
1038        olm.get_verification(user_id, flow_id).and_then(|v| match v {
1039            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1040                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1041            }
1042            #[cfg(feature = "qrcode")]
1043            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1044                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1045            }
1046            _ => None,
1047        })
1048    }
1049
1050    /// Get a `VerificationRequest` object for the given user with the given
1051    /// flow id.
1052    pub async fn get_verification_request(
1053        &self,
1054        user_id: &UserId,
1055        flow_id: impl AsRef<str>,
1056    ) -> Option<VerificationRequest> {
1057        let olm = self.client.olm_machine().await;
1058        let olm = olm.as_ref()?;
1059
1060        olm.get_verification_request(user_id, flow_id)
1061            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1062    }
1063
1064    /// Get a specific device of a user.
1065    ///
1066    /// # Arguments
1067    ///
1068    /// * `user_id` - The unique id of the user that the device belongs to.
1069    ///
1070    /// * `device_id` - The unique id of the device.
1071    ///
1072    /// Returns a `Device` if one is found and the crypto store didn't throw an
1073    /// error.
1074    ///
1075    /// This will always return None if the client hasn't been logged in.
1076    ///
1077    /// # Examples
1078    ///
1079    /// ```no_run
1080    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
1081    /// # use url::Url;
1082    /// # async {
1083    /// # let alice = user_id!("@alice:example.org");
1084    /// # let homeserver = Url::parse("http://example.com")?;
1085    /// # let client = Client::new(homeserver).await?;
1086    /// if let Some(device) =
1087    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
1088    /// {
1089    ///     println!("{:?}", device.is_verified());
1090    ///
1091    ///     if !device.is_verified() {
1092    ///         let verification = device.request_verification().await?;
1093    ///     }
1094    /// }
1095    /// # anyhow::Ok(()) };
1096    /// ```
1097    pub async fn get_device(
1098        &self,
1099        user_id: &UserId,
1100        device_id: &DeviceId,
1101    ) -> Result<Option<Device>, CryptoStoreError> {
1102        let olm = self.client.olm_machine().await;
1103        let Some(machine) = olm.as_ref() else { return Ok(None) };
1104        let device = machine.get_device(user_id, device_id, None).await?;
1105        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1106    }
1107
1108    /// A convenience method to retrieve your own device from the store.
1109    ///
1110    /// This is the same as calling [`Encryption::get_device()`] with your own
1111    /// user and device ID.
1112    ///
1113    /// This will always return a device, unless you are not logged in.
1114    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1115        let olm = self.client.olm_machine().await;
1116        let Some(machine) = olm.as_ref() else { return Ok(None) };
1117        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1118        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1119    }
1120
1121    /// Get a map holding all the devices of an user.
1122    ///
1123    /// This will always return an empty map if the client hasn't been logged
1124    /// in.
1125    ///
1126    /// # Arguments
1127    ///
1128    /// * `user_id` - The unique id of the user that the devices belong to.
1129    ///
1130    /// # Examples
1131    ///
1132    /// ```no_run
1133    /// # use matrix_sdk::{Client, ruma::user_id};
1134    /// # use url::Url;
1135    /// # async {
1136    /// # let alice = user_id!("@alice:example.org");
1137    /// # let homeserver = Url::parse("http://example.com")?;
1138    /// # let client = Client::new(homeserver).await?;
1139    /// let devices = client.encryption().get_user_devices(alice).await?;
1140    ///
1141    /// for device in devices.devices() {
1142    ///     println!("{device:?}");
1143    /// }
1144    /// # anyhow::Ok(()) };
1145    /// ```
1146    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1147        let devices = self
1148            .client
1149            .olm_machine()
1150            .await
1151            .as_ref()
1152            .ok_or(Error::NoOlmMachine)?
1153            .get_user_devices(user_id, None)
1154            .await?;
1155
1156        Ok(UserDevices { inner: devices, client: self.client.clone() })
1157    }
1158
1159    /// Get the E2EE identity of a user from the crypto store.
1160    ///
1161    /// Usually, we only have the E2EE identity of a user locally if the user
1162    /// is tracked, meaning that we are both members of the same encrypted room.
1163    ///
1164    /// To get the E2EE identity of a user even if it is not available locally
1165    /// use [`Encryption::request_user_identity()`].
1166    ///
1167    /// # Arguments
1168    ///
1169    /// * `user_id` - The unique id of the user that the identity belongs to.
1170    ///
1171    /// Returns a `UserIdentity` if one is found and the crypto store
1172    /// didn't throw an error.
1173    ///
1174    /// This will always return None if the client hasn't been logged in.
1175    ///
1176    /// # Examples
1177    ///
1178    /// ```no_run
1179    /// # use matrix_sdk::{Client, ruma::user_id};
1180    /// # use url::Url;
1181    /// # async {
1182    /// # let alice = user_id!("@alice:example.org");
1183    /// # let homeserver = Url::parse("http://example.com")?;
1184    /// # let client = Client::new(homeserver).await?;
1185    /// let user = client.encryption().get_user_identity(alice).await?;
1186    ///
1187    /// if let Some(user) = user {
1188    ///     println!("{:?}", user.is_verified());
1189    ///
1190    ///     let verification = user.request_verification().await?;
1191    /// }
1192    /// # anyhow::Ok(()) };
1193    /// ```
1194    pub async fn get_user_identity(
1195        &self,
1196        user_id: &UserId,
1197    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1198        let olm = self.client.olm_machine().await;
1199        let Some(olm) = olm.as_ref() else { return Ok(None) };
1200        let identity = olm.get_identity(user_id, None).await?;
1201
1202        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1203    }
1204
1205    /// Get the E2EE identity of a user from the homeserver.
1206    ///
1207    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1208    /// E2EE identity is not found, it should mean that the user did not set
1209    /// up cross-signing.
1210    ///
1211    /// If you want the E2EE identity of a user without making a request to the
1212    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1213    ///
1214    /// # Arguments
1215    ///
1216    /// * `user_id` - The ID of the user that the identity belongs to.
1217    ///
1218    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1219    /// was an issue with the crypto store or with the request to the
1220    /// homeserver.
1221    ///
1222    /// This will always return `None` if the client hasn't been logged in.
1223    ///
1224    /// # Examples
1225    ///
1226    /// ```no_run
1227    /// # use matrix_sdk::{Client, ruma::user_id};
1228    /// # use url::Url;
1229    /// # async {
1230    /// # let alice = user_id!("@alice:example.org");
1231    /// # let homeserver = Url::parse("http://example.com")?;
1232    /// # let client = Client::new(homeserver).await?;
1233    /// let user = client.encryption().request_user_identity(alice).await?;
1234    ///
1235    /// if let Some(user) = user {
1236    ///     println!("User is verified: {:?}", user.is_verified());
1237    ///
1238    ///     let verification = user.request_verification().await?;
1239    /// }
1240    /// # anyhow::Ok(()) };
1241    /// ```
1242    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1243        let olm = self.client.olm_machine().await;
1244        let Some(olm) = olm.as_ref() else { return Ok(None) };
1245
1246        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1247        self.client.keys_query(&request_id, request.device_keys).await?;
1248
1249        let identity = olm.get_identity(user_id, None).await?;
1250        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1251    }
1252
1253    /// Returns a stream of device updates, allowing users to listen for
1254    /// notifications about new or changed devices.
1255    ///
1256    /// The stream produced by this method emits updates whenever a new device
1257    /// is discovered or when an existing device's information is changed. Users
1258    /// can subscribe to this stream and receive updates in real-time.
1259    ///
1260    /// # Examples
1261    ///
1262    /// ```no_run
1263    /// # use matrix_sdk::Client;
1264    /// # use ruma::{device_id, user_id};
1265    /// # use futures_util::{pin_mut, StreamExt};
1266    /// # let client: Client = unimplemented!();
1267    /// # async {
1268    /// let devices_stream = client.encryption().devices_stream().await?;
1269    /// let user_id = client
1270    ///     .user_id()
1271    ///     .expect("We should know our user id after we have logged in");
1272    /// pin_mut!(devices_stream);
1273    ///
1274    /// for device_updates in devices_stream.next().await {
1275    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1276    ///         for device in user_devices.values() {
1277    ///             println!("A new device has been added {}", device.device_id());
1278    ///         }
1279    ///     }
1280    /// }
1281    /// # anyhow::Ok(()) };
1282    /// ```
1283    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1284        let olm = self.client.olm_machine().await;
1285        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1286        let client = self.client.to_owned();
1287
1288        Ok(olm
1289            .store()
1290            .devices_stream()
1291            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1292    }
1293
1294    /// Returns a stream of user identity updates, allowing users to listen for
1295    /// notifications about new or changed user identities.
1296    ///
1297    /// The stream produced by this method emits updates whenever a new user
1298    /// identity is discovered or when an existing identities information is
1299    /// changed. Users can subscribe to this stream and receive updates in
1300    /// real-time.
1301    ///
1302    /// # Examples
1303    ///
1304    /// ```no_run
1305    /// # use matrix_sdk::Client;
1306    /// # use ruma::{device_id, user_id};
1307    /// # use futures_util::{pin_mut, StreamExt};
1308    /// # let client: Client = unimplemented!();
1309    /// # async {
1310    /// let identities_stream =
1311    ///     client.encryption().user_identities_stream().await?;
1312    /// pin_mut!(identities_stream);
1313    ///
1314    /// for identity_updates in identities_stream.next().await {
1315    ///     for (_, identity) in identity_updates.new {
1316    ///         println!("A new identity has been added {}", identity.user_id());
1317    ///     }
1318    /// }
1319    /// # anyhow::Ok(()) };
1320    /// ```
1321    pub async fn user_identities_stream(
1322        &self,
1323    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1324        let olm = self.client.olm_machine().await;
1325        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1326        let client = self.client.to_owned();
1327
1328        Ok(olm
1329            .store()
1330            .user_identities_stream()
1331            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1332    }
1333
1334    /// Create and upload a new cross signing identity.
1335    ///
1336    /// # Arguments
1337    ///
1338    /// * `auth_data` - This request requires user interactive auth, the first
1339    ///   request needs to set this to `None` and will always fail with an
1340    ///   `UiaaResponse`. The response will contain information for the
1341    ///   interactive auth and the same request needs to be made but this time
1342    ///   with some `auth_data` provided.
1343    ///
1344    /// # Examples
1345    ///
1346    /// ```no_run
1347    /// # use std::collections::BTreeMap;
1348    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1349    /// # use url::Url;
1350    /// # use serde_json::json;
1351    /// # async {
1352    /// # let homeserver = Url::parse("http://example.com")?;
1353    /// # let client = Client::new(homeserver).await?;
1354    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1355    ///     if let Some(response) = e.as_uiaa_response() {
1356    ///         let mut password = uiaa::Password::new(
1357    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1358    ///             "wordpass".to_owned(),
1359    ///         );
1360    ///         password.session = response.session.clone();
1361    ///
1362    ///         client
1363    ///             .encryption()
1364    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1365    ///             .await
1366    ///             .expect("Couldn't bootstrap cross signing")
1367    ///     } else {
1368    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1369    ///     }
1370    /// }
1371    /// # anyhow::Ok(()) };
1372    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1373        let olm = self.client.olm_machine().await;
1374        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1375
1376        let CrossSigningBootstrapRequests {
1377            upload_signing_keys_req,
1378            upload_keys_req,
1379            upload_signatures_req,
1380        } = olm.bootstrap_cross_signing(false).await?;
1381
1382        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1383            auth: auth_data,
1384            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1385            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1386            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1387        });
1388
1389        if let Some(req) = upload_keys_req {
1390            self.client.send_outgoing_request(req).await?;
1391        }
1392        self.client.send(upload_signing_keys_req).await?;
1393        self.client.send(upload_signatures_req).await?;
1394
1395        Ok(())
1396    }
1397
1398    /// Reset the cross-signing keys.
1399    ///
1400    /// # Example
1401    ///
1402    /// ```no_run
1403    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1404    /// # use url::Url;
1405    /// # async {
1406    /// # let homeserver = Url::parse("http://example.com")?;
1407    /// # let client = Client::new(homeserver).await?;
1408    /// # let user_id = unimplemented!();
1409    /// let encryption = client.encryption();
1410    ///
1411    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1412    ///     match handle.auth_type() {
1413    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1414    ///             use matrix_sdk::ruma::api::client::uiaa;
1415    ///
1416    ///             let password = "1234".to_owned();
1417    ///             let mut password = uiaa::Password::new(user_id, password);
1418    ///             password.session = uiaa.session;
1419    ///
1420    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1421    ///         }
1422    ///         CrossSigningResetAuthType::OAuth(o) => {
1423    ///             println!(
1424    ///                 "To reset your end-to-end encryption cross-signing identity, \
1425    ///                 you first need to approve it at {}",
1426    ///                 o.approval_url
1427    ///             );
1428    ///             handle.auth(None).await?;
1429    ///         }
1430    ///     }
1431    /// }
1432    /// # anyhow::Ok(()) };
1433    /// ```
1434    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1435        let olm = self.client.olm_machine().await;
1436        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1437
1438        let CrossSigningBootstrapRequests {
1439            upload_keys_req,
1440            upload_signing_keys_req,
1441            upload_signatures_req,
1442        } = olm.bootstrap_cross_signing(true).await?;
1443
1444        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1445            auth: None,
1446            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1447            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1448            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1449        });
1450
1451        if let Some(req) = upload_keys_req {
1452            self.client.send_outgoing_request(req).await?;
1453        }
1454
1455        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1456            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1457                let client = self.client.clone();
1458
1459                Ok(Some(CrossSigningResetHandle::new(
1460                    client,
1461                    upload_signing_keys_req,
1462                    upload_signatures_req,
1463                    auth_type,
1464                )))
1465            } else {
1466                Err(error.into())
1467            }
1468        } else {
1469            self.client.send(upload_signatures_req).await?;
1470
1471            Ok(None)
1472        }
1473    }
1474
1475    /// Query the user's own device keys, if, and only if, we didn't have their
1476    /// identity in the first place.
1477    async fn ensure_initial_key_query(&self) -> Result<()> {
1478        let olm_machine = self.client.olm_machine().await;
1479        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1480
1481        let user_id = olm_machine.user_id();
1482
1483        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1484            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1485            self.client.keys_query(&request_id, request.device_keys).await?;
1486        }
1487
1488        Ok(())
1489    }
1490
1491    /// Create and upload a new cross signing identity, if that has not been
1492    /// done yet.
1493    ///
1494    /// This will only create a new cross-signing identity if the user had never
1495    /// done it before. If the user did it before, then this is a no-op.
1496    ///
1497    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1498    /// behavior of this function.
1499    ///
1500    /// # Arguments
1501    ///
1502    /// * `auth_data` - This request requires user interactive auth, the first
1503    ///   request needs to set this to `None` and will always fail with an
1504    ///   `UiaaResponse`. The response will contain information for the
1505    ///   interactive auth and the same request needs to be made but this time
1506    ///   with some `auth_data` provided.
1507    ///
1508    /// # Examples
1509    /// ```no_run
1510    /// # use std::collections::BTreeMap;
1511    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1512    /// # use url::Url;
1513    /// # use serde_json::json;
1514    /// # async {
1515    /// # let homeserver = Url::parse("http://example.com")?;
1516    /// # let client = Client::new(homeserver).await?;
1517    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1518    ///     if let Some(response) = e.as_uiaa_response() {
1519    ///         let mut password = uiaa::Password::new(
1520    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
1521    ///             "wordpass".to_owned(),
1522    ///         );
1523    ///         password.session = response.session.clone();
1524    ///
1525    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1526    ///         // avoid checks.
1527    ///         client
1528    ///             .encryption()
1529    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1530    ///             .await
1531    ///             .expect("Couldn't bootstrap cross signing")
1532    ///     } else {
1533    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1534    ///     }
1535    /// }
1536    /// # anyhow::Ok(()) };
1537    pub async fn bootstrap_cross_signing_if_needed(
1538        &self,
1539        auth_data: Option<AuthData>,
1540    ) -> Result<()> {
1541        let olm_machine = self.client.olm_machine().await;
1542        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1543        let user_id = olm_machine.user_id();
1544
1545        self.ensure_initial_key_query().await?;
1546
1547        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1548            self.bootstrap_cross_signing(auth_data).await?;
1549        }
1550
1551        Ok(())
1552    }
1553
1554    /// Export E2EE keys that match the given predicate encrypting them with the
1555    /// given passphrase.
1556    ///
1557    /// # Arguments
1558    ///
1559    /// * `path` - The file path where the exported key file will be saved.
1560    ///
1561    /// * `passphrase` - The passphrase that will be used to encrypt the
1562    ///   exported room keys.
1563    ///
1564    /// * `predicate` - A closure that will be called for every known
1565    ///   `InboundGroupSession`, which represents a room key. If the closure
1566    ///   returns `true` the `InboundGroupSession` will be included in the
1567    ///   export, if the closure returns `false` it will not be included.
1568    ///
1569    /// # Panics
1570    ///
1571    /// This method will panic if it isn't run on a Tokio runtime.
1572    ///
1573    /// This method will panic if it can't get enough randomness from the OS to
1574    /// encrypt the exported keys securely.
1575    ///
1576    /// # Examples
1577    ///
1578    /// ```no_run
1579    /// # use std::{path::PathBuf, time::Duration};
1580    /// # use matrix_sdk::{
1581    /// #     Client, config::SyncSettings,
1582    /// #     ruma::room_id,
1583    /// # };
1584    /// # use url::Url;
1585    /// # async {
1586    /// # let homeserver = Url::parse("http://localhost:8080")?;
1587    /// # let mut client = Client::new(homeserver).await?;
1588    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1589    /// // Export all room keys.
1590    /// client
1591    ///     .encryption()
1592    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1593    ///     .await?;
1594    ///
1595    /// // Export only the room keys for a certain room.
1596    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1597    /// let room_id = room_id!("!test:localhost");
1598    ///
1599    /// client
1600    ///     .encryption()
1601    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1602    ///     .await?;
1603    /// # anyhow::Ok(()) };
1604    /// ```
1605    #[cfg(not(target_family = "wasm"))]
1606    pub async fn export_room_keys(
1607        &self,
1608        path: PathBuf,
1609        passphrase: &str,
1610        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1611    ) -> Result<()> {
1612        let olm = self.client.olm_machine().await;
1613        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1614
1615        let keys = olm.store().export_room_keys(predicate).await?;
1616        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1617
1618        let encrypt = move || -> Result<()> {
1619            let export: String =
1620                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1621            let mut file = std::fs::File::create(path)?;
1622            file.write_all(&export.into_bytes())?;
1623            Ok(())
1624        };
1625
1626        let task = tokio::task::spawn_blocking(encrypt);
1627        task.await.expect("Task join error")
1628    }
1629
1630    /// Import E2EE keys from the given file path.
1631    ///
1632    /// # Arguments
1633    ///
1634    /// * `path` - The file path where the exported key file will can be found.
1635    ///
1636    /// * `passphrase` - The passphrase that should be used to decrypt the
1637    ///   exported room keys.
1638    ///
1639    /// Returns a tuple of numbers that represent the number of sessions that
1640    /// were imported and the total number of sessions that were found in the
1641    /// key export.
1642    ///
1643    /// # Panics
1644    ///
1645    /// This method will panic if it isn't run on a Tokio runtime.
1646    ///
1647    /// ```no_run
1648    /// # use std::{path::PathBuf, time::Duration};
1649    /// # use matrix_sdk::{
1650    /// #     Client, config::SyncSettings,
1651    /// #     ruma::room_id,
1652    /// # };
1653    /// # use url::Url;
1654    /// # async {
1655    /// # let homeserver = Url::parse("http://localhost:8080")?;
1656    /// # let mut client = Client::new(homeserver).await?;
1657    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1658    /// let result =
1659    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1660    ///
1661    /// println!(
1662    ///     "Imported {} room keys out of {}",
1663    ///     result.imported_count, result.total_count
1664    /// );
1665    /// # anyhow::Ok(()) };
1666    /// ```
1667    #[cfg(not(target_family = "wasm"))]
1668    pub async fn import_room_keys(
1669        &self,
1670        path: PathBuf,
1671        passphrase: &str,
1672    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1673        let olm = self.client.olm_machine().await;
1674        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1675        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1676
1677        let decrypt = move || {
1678            let file = std::fs::File::open(path)?;
1679            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1680        };
1681
1682        let task = tokio::task::spawn_blocking(decrypt);
1683        let import = task.await.expect("Task join error")?;
1684
1685        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1686
1687        self.backups().maybe_trigger_backup();
1688
1689        Ok(ret)
1690    }
1691
1692    /// Receive notifications of room keys being received as a [`Stream`].
1693    ///
1694    /// Each time a room key is updated in any way, an update will be sent to
1695    /// the stream. Updates that happen at the same time are batched into a
1696    /// [`Vec`].
1697    ///
1698    /// If the reader of the stream lags too far behind, an error is broadcast
1699    /// containing the number of skipped items.
1700    ///
1701    /// # Examples
1702    ///
1703    /// ```no_run
1704    /// # use matrix_sdk::Client;
1705    /// # use url::Url;
1706    /// # async {
1707    /// # let homeserver = Url::parse("http://example.com")?;
1708    /// # let client = Client::new(homeserver).await?;
1709    /// use futures_util::StreamExt;
1710    ///
1711    /// let Some(mut room_keys_stream) =
1712    ///     client.encryption().room_keys_received_stream().await
1713    /// else {
1714    ///     return Ok(());
1715    /// };
1716    ///
1717    /// while let Some(update) = room_keys_stream.next().await {
1718    ///     println!("Received room keys {update:?}");
1719    /// }
1720    /// # anyhow::Ok(()) };
1721    /// ```
1722    pub async fn room_keys_received_stream(
1723        &self,
1724    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1725    {
1726        let olm = self.client.olm_machine().await;
1727        let olm = olm.as_ref()?;
1728
1729        Some(olm.store().room_keys_received_stream())
1730    }
1731
1732    /// Receive notifications of historic room key bundles as a [`Stream`].
1733    ///
1734    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1735    ///
1736    /// Each time a historic room key bundle was received, an update will be
1737    /// sent to the stream. This stream is useful for informative purposes
1738    /// exclusively, historic room key bundles are handled by the SDK
1739    /// automatically.
1740    ///
1741    /// # Examples
1742    ///
1743    /// ```no_run
1744    /// # use matrix_sdk::Client;
1745    /// # use url::Url;
1746    /// # async {
1747    /// # let homeserver = Url::parse("http://example.com")?;
1748    /// # let client = Client::new(homeserver).await?;
1749    /// use futures_util::StreamExt;
1750    ///
1751    /// let Some(mut bundle_stream) =
1752    ///     client.encryption().historic_room_key_stream().await
1753    /// else {
1754    ///     return Ok(());
1755    /// };
1756    ///
1757    /// while let Some(bundle_info) = bundle_stream.next().await {
1758    ///     println!("Received a historic room key bundle {bundle_info:?}");
1759    /// }
1760    /// # anyhow::Ok(()) };
1761    /// ```
1762    pub async fn historic_room_key_stream(
1763        &self,
1764    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1765        let olm = self.client.olm_machine().await;
1766        let olm = olm.as_ref()?;
1767
1768        Some(olm.store().historic_room_key_stream())
1769    }
1770
1771    /// Get the secret storage manager of the client.
1772    pub fn secret_storage(&self) -> SecretStorage {
1773        SecretStorage { client: self.client.to_owned() }
1774    }
1775
1776    /// Get the backups manager of the client.
1777    pub fn backups(&self) -> Backups {
1778        Backups { client: self.client.to_owned() }
1779    }
1780
1781    /// Get the recovery manager of the client.
1782    pub fn recovery(&self) -> Recovery {
1783        Recovery { client: self.client.to_owned() }
1784    }
1785
1786    /// Enables the crypto-store cross-process lock.
1787    ///
1788    /// This may be required if there are multiple processes that may do writes
1789    /// to the same crypto store. In that case, it's necessary to create a
1790    /// lock, so that only one process writes to it, otherwise this may
1791    /// cause confusing issues because of stale data contained in in-memory
1792    /// caches.
1793    ///
1794    /// The provided `lock_value` must be a unique identifier for this process.
1795    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1796    /// multi-process is enabled.
1797    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1798        // If the lock has already been created, don't recreate it from scratch.
1799        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1800            let prev_holder = prev_lock.lock_holder();
1801            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1802                return Ok(());
1803            }
1804            warn!(
1805                "Recreating cross-process store lock with a different holder value: \
1806                 prev was {prev_holder:?}, new is {lock_value}"
1807            );
1808        }
1809
1810        let olm_machine = self.client.base_client().olm_machine().await;
1811        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1812
1813        let lock = olm_machine.store().create_store_lock(
1814            "cross_process_lock".to_owned(),
1815            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1816        );
1817
1818        // Gently try to initialize the crypto store generation counter.
1819        //
1820        // If we don't get the lock immediately, then it is already acquired by another
1821        // process, and we'll get to reload next time we acquire the lock.
1822        {
1823            let lock_result = lock.try_lock_once().await?;
1824
1825            if lock_result.is_ok() {
1826                olm_machine
1827                    .initialize_crypto_store_generation(
1828                        &self.client.locks().crypto_store_generation,
1829                    )
1830                    .await?;
1831            }
1832        }
1833
1834        self.client
1835            .locks()
1836            .cross_process_crypto_store_lock
1837            .set(lock)
1838            .map_err(|_| Error::BadCryptoStoreState)?;
1839
1840        Ok(())
1841    }
1842
1843    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1844    /// time.
1845    ///
1846    /// Returns the current generation number.
1847    #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1848    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1849        let olm_machine_guard = self.client.olm_machine().await;
1850        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1851            let (new_gen, generation_number) = olm_machine
1852                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1853                .await?;
1854
1855            Span::current()
1856                .record("olm_machine_new_generation", new_gen)
1857                .record("olm_machine_generation", generation_number);
1858            debug!("OlmMachine generation maintained in CryptoStore");
1859
1860            // If the crypto store generation has changed,
1861            if new_gen {
1862                // (get rid of the reference to the current crypto store first)
1863                drop(olm_machine_guard);
1864                // Recreate the OlmMachine.
1865                self.client.base_client().regenerate_olm(None).await?;
1866            }
1867            Ok(generation_number)
1868        } else {
1869            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1870            // been initialised by the time we get here. Ideally we'd panic, or return an
1871            // error, but for now I'm just adding some logging to check if it
1872            // happens, and returning the magic number 0.
1873            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1874            Ok(0)
1875        }
1876    }
1877
1878    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1879    /// spin-waits until the lock is available.
1880    ///
1881    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1882    /// first time.
1883    ///
1884    /// Returns a guard to the lock, if it was obtained.
1885    pub async fn spin_lock_store(
1886        &self,
1887        max_backoff: Option<u32>,
1888    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1889        self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1890    }
1891
1892    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1893    /// attempts to lock it once.
1894    ///
1895    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1896    /// first time.
1897    ///
1898    /// Returns a guard to the lock, if it was obtained.
1899    pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1900        match self.lock_store(CrossProcessLock::try_lock_once).await {
1901            Err(Error::CrossProcessLockError(e))
1902                if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1903            {
1904                Ok(None)
1905            }
1906            other => other,
1907        }
1908    }
1909
1910    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1911    /// locks the store with the given function, `acquire`.
1912    ///
1913    /// Reloads the `OlmMachine` after obtaining the lock, if the lock is dirty.
1914    ///
1915    /// Returns a guard to the lock if it was obtained.
1916    pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1917        &self,
1918        acquire: F,
1919    ) -> Result<Option<CrossProcessLockGuard>, Error> {
1920        let wrap_err = |e: CryptoStoreError| {
1921            Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1922        };
1923        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1924            let guard = acquire(lock).await.map_err(wrap_err)??;
1925            let _ = self.on_lock_newly_acquired().await?;
1926            Ok(Some(guard.into_guard()))
1927        } else {
1928            Ok(None)
1929        }
1930    }
1931
/// Testing purposes only.
///
/// Returns the value reported by the `OlmMachine`'s `uploaded_key_count`,
/// or [`Error::AuthenticationRequired`] if no `OlmMachine` is available.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().ok_or(Error::AuthenticationRequired)?;

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1939
1940    /// Bootstrap encryption and enables event listeners for the E2EE support.
1941    ///
1942    /// Based on the `EncryptionSettings`, this call might:
1943    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1944    /// - Create a key backup if needed (POST `/room_keys/version`)
1945    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1946    ///
1947    /// As part of this process, and if needed, the current device keys would be
1948    /// uploaded to the server, new account data would be added, and cross
1949    /// signing keys and signatures might be uploaded.
1950    ///
1951    /// Should be called once we
1952    /// created a [`OlmMachine`], i.e. after logging in.
1953    ///
1954    /// # Arguments
1955    ///
1956    /// * `auth_data` - Some requests may require re-authentication. To prevent
1957    ///   the user from having to re-enter their password (or use other
1958    ///   methods), we can provide the authentication data here. This is
1959    ///   necessary for uploading cross-signing keys. However, please note that
1960    ///   there is a proposal (MSC3967) to remove this requirement, which would
1961    ///   allow for the initial upload of cross-signing keys without
1962    ///   authentication, rendering this parameter obsolete.
1963    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1964        // It's fine to be async here as we're only getting the lock protecting the
1965        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1966        // in we won't delay the login or restoration of the Client.
1967        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1968            Some(BundleReceiverTask::new(&self.client).await)
1969        } else {
1970            None
1971        };
1972
1973        let mut tasks = self.client.inner.e2ee.tasks.lock();
1974
1975        let this = self.clone();
1976
1977        tasks.setup_e2ee = Some(spawn(async move {
1978            // Update the current state first, so we don't have to wait for the result of
1979            // network requests
1980            this.update_verification_state().await;
1981
1982            if this.settings().auto_enable_cross_signing
1983                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1984            {
1985                error!("Couldn't bootstrap cross signing {e:?}");
1986            }
1987
1988            if let Err(e) = this.backups().setup_and_resume().await {
1989                error!("Couldn't setup and resume backups {e:?}");
1990            }
1991            if let Err(e) = this.recovery().setup().await {
1992                error!("Couldn't setup and resume recovery {e:?}");
1993            }
1994        }));
1995
1996        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1997
1998        self.setup_room_membership_session_discard_handler();
1999    }
2000
2001    /// Waits for end-to-end encryption initialization tasks to finish, if any
2002    /// was running in the background.
2003    pub async fn wait_for_e2ee_initialization_tasks(&self) {
2004        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2005
2006        if let Some(task) = task
2007            && let Err(err) = task.await
2008        {
2009            warn!("Error when initializing backups: {err}");
2010        }
2011    }
2012
2013    /// Upload the device keys and initial set of one-time keys to the server.
2014    ///
2015    /// This should only be called when the user logs in for the first time,
2016    /// the method will ensure that other devices see our own device as an
2017    /// end-to-end encryption enabled one.
2018    ///
2019    /// **Warning**: Do not use this method if we're already calling
2020    /// [`Client::send_outgoing_request()`]. This method is intended for
2021    /// explicitly uploading the device keys before starting a sync.
2022    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2023        let olm = self.client.olm_machine().await;
2024        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2025
2026        if let Some((request_id, request)) = olm.upload_device_keys().await? {
2027            self.client.keys_upload(&request_id, &request).await?;
2028
2029            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2030            self.client.keys_query(&request_id, request.device_keys).await?;
2031        }
2032
2033        Ok(())
2034    }
2035
2036    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2037        self.recovery().update_state_after_keys_query(response).await;
2038
2039        // Only update the verification_state if our own devices changed
2040        if let Some(user_id) = self.client.user_id() {
2041            let contains_own_device = response.device_keys.contains_key(user_id);
2042
2043            if contains_own_device {
2044                self.update_verification_state().await;
2045            }
2046        }
2047    }
2048
2049    async fn update_verification_state(&self) {
2050        match self.get_own_device().await {
2051            Ok(device) => {
2052                if let Some(device) = device {
2053                    let is_verified = device.is_cross_signed_by_owner();
2054
2055                    if is_verified {
2056                        self.client.inner.verification_state.set(VerificationState::Verified);
2057                    } else {
2058                        self.client.inner.verification_state.set(VerificationState::Unverified);
2059                    }
2060                } else {
2061                    warn!("Couldn't find out own device in the store.");
2062                    self.client.inner.verification_state.set(VerificationState::Unknown);
2063                }
2064            }
2065            Err(error) => {
2066                warn!("Failed retrieving own device: {error}");
2067                self.client.inner.verification_state.set(VerificationState::Unknown);
2068            }
2069        }
2070    }
2071
2072    /// Sets up a handler to rotate room keys when a user leaves a room.
2073    ///
2074    /// Previously, it was sufficient to check if we need to rotate the room key
2075    /// prior to sending a message. However, the history sharing feature
2076    /// ([MSC4268]) breaks this logic:
2077    ///
2078    /// 1. Alice sends a message M1 in room X;
2079    /// 2. Bob invites Charlie, who joins and immediately leaves the room;
2080    /// 3. Alice sends another message M2 in room X.
2081    ///
2082    /// Under the old logic, Alice would not rotate her key after Charlie
2083    /// leaves, resulting in M2 being encrypted with the same session as M1.
2084    /// This would allow Charlie to decrypt M2 if he ever gains access to
2085    /// the event.
2086    ///
2087    /// This handler listens for changes to the room membership, and discards
2088    /// the current room key if the event is a `leave` event.
2089    ///
2090    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2091    fn setup_room_membership_session_discard_handler(&self) {
2092        let client = WeakClient::from_client(&self.client);
2093        self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2094            let Some(client) = client.get() else {
2095                // The main client has been dropped.
2096                return;
2097            };
2098            let Some(user_id) = client.user_id() else {
2099                // We aren't logged in, so this shouldn't ever happen.
2100                return;
2101            };
2102            let olm = client.olm_machine().await;
2103            let Some(olm) = olm.as_ref() else {
2104                warn!("Cannot discard session - Olm machine is not available");
2105                return;
2106            };
2107
2108            if matches!(
2109                ev.membership_change(),
2110                MembershipChange::Joined |
2111                MembershipChange::Invited |
2112                MembershipChange::KnockAccepted |
2113                MembershipChange::InvitationAccepted |
2114                MembershipChange::ProfileChanged { .. }
2115            ) || ev.sender == user_id {
2116                // We can ignore events that did not remove us, and those that we sent.
2117                return;
2118            }
2119
2120            debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2121
2122            // Attempt to discard the current room key. This won't do anything if we don't have one,
2123            // but that's fine since we will create a new room key whenever we try to send a message.
2124            if let Err(e) = olm.discard_room_key(room.room_id()).await {
2125                warn!(
2126                    room_id = ?room.room_id(),
2127                    "Error discarding room key after member leave: {e:?}"
2128                );
2129            }
2130        });
2131    }
2132
/// Encrypts then send the given content via the `/sendToDevice` end-point
/// using Olm encryption.
///
/// If there are a lot of recipient devices multiple `/sendToDevice`
/// requests might be sent out.
///
/// # Arguments
///
/// * `recipient_devices` - The devices the content should be encrypted for
///   and sent to.
///
/// * `event_type` - The type of the to-device event.
///
/// * `content` - The raw, unencrypted event content.
///
/// * `share_strategy` - The strategy forwarded to the `OlmMachine` when
///   encrypting the content for the devices.
///
/// # Returns
///
/// A list of failures: the devices that couldn't get the message, either
/// because the content was withheld from them or because the request
/// carrying their message failed to send.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] if no `OlmMachine` is available, and
/// forwards errors from claiming one-time keys or encrypting the content.
/// Failures of individual `/sendToDevice` requests are not errors; they are
/// reported through the returned list.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    // Will claim one-time-key for users that needs it
    // TODO: For later optimisation: This will establish missing olm sessions with
    // all this users devices, but we just want for some devices.
    self.client.claim_one_time_keys(users).await?;

    // Report a missing `OlmMachine` as an error, like the rest of this API,
    // instead of panicking.
    let olm = self.client.olm_machine().await;
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the content was withheld from count as failures.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(d, _)| (d.user_id().to_owned(), d.device_id().to_owned()))
        .collect();

    // TODO: parallelize that? it's already grouping 250 devices per chunk.
    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        // If the sending failed, every device addressed by this request is
        // reported as a failure.
        if send_result.is_err() {
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|target| match target {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // Cannot happen in this case
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2210}
2211
2212#[cfg(all(test, not(target_family = "wasm")))]
2213mod tests {
2214    use std::{
2215        ops::Not,
2216        str::FromStr,
2217        sync::{
2218            Arc,
2219            atomic::{AtomicBool, Ordering},
2220        },
2221        time::Duration,
2222    };
2223
2224    use matrix_sdk_test::{
2225        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2226        event_factory::EventFactory,
2227    };
2228    use ruma::{
2229        event_id,
2230        events::{reaction::ReactionEventContent, relation::Annotation},
2231        user_id,
2232    };
2233    use serde_json::json;
2234    use wiremock::{
2235        Mock, MockServer, Request, ResponseTemplate,
2236        matchers::{header, method, path_regex},
2237    };
2238
2239    use crate::{
2240        Client, assert_next_matches_with_timeout,
2241        config::RequestConfig,
2242        encryption::{
2243            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2244        },
2245        test_utils::{
2246            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2247        },
2248    };
2249
2250    #[async_test]
2251    async fn test_reaction_sending() {
2252        let server = MockServer::start().await;
2253        let client = logged_in_client(Some(server.uri())).await;
2254
2255        let event_id = event_id!("$2:example.org");
2256
2257        Mock::given(method("GET"))
2258            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2259            .and(header("authorization", "Bearer 1234"))
2260            .respond_with(
2261                ResponseTemplate::new(200)
2262                    .set_body_json(EventFactory::new().room_encryption().into_content()),
2263            )
2264            .mount(&server)
2265            .await;
2266
2267        Mock::given(method("PUT"))
2268            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2269            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2270                "event_id": event_id,
2271            })))
2272            .mount(&server)
2273            .await;
2274
2275        let f = EventFactory::new().sender(user_id!("@example:localhost"));
2276        let response = SyncResponseBuilder::default()
2277            .add_joined_room(
2278                JoinedRoomBuilder::default()
2279                    .add_state_event(
2280                        f.member(user_id!("@example:localhost")).display_name("example"),
2281                    )
2282                    .add_state_event(f.default_power_levels())
2283                    .add_state_event(f.room_encryption()),
2284            )
2285            .build_sync_response();
2286
2287        client.base_client().receive_sync_response(response).await.unwrap();
2288
2289        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2290        assert!(
2291            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2292        );
2293
2294        let event_id = event_id!("$1:example.org");
2295        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2296        room.send(reaction).await.expect("Sending the reaction should not fail");
2297
2298        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2299    }
2300
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::{crypto::store::types::BackupDecryptionKey, store::RoomLoadSettings};

    // Pause inserted after every release of the store lock, before the next
    // acquisition attempt.
    let settle = Duration::from_millis(100);

    // Both clients are backed by one and the same sqlite database file.
    let db_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    let crypto1 = client1.encryption();
    let crypto2 = client2.encryption();

    // As long as the cross-process lock is disabled, a lock attempt yields no
    // guard at all.
    assert!(crypto1.try_lock_store_once().await.unwrap().is_none());

    crypto1.enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    crypto2.enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // The first client is able to grab the lock.
    let first_hold = crypto1.try_lock_store_once().await.unwrap();
    assert!(first_hold.is_some());

    // Remember the current olm machine; later checks compare against it to
    // find out whether it has been swapped.
    let initial_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Turn backups on as well, so that the backup keys of a regenerated
    // machine can be compared with these ones.
    let decryption_key = BackupDecryptionKey::new();
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());

    let backup_machine = initial_olm_machine.backup_machine();
    backup_machine
        .save_decryption_key(Some(decryption_key.clone()), Some("1".to_owned()))
        .await
        .expect("Should save");
    backup_machine.enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(crypto1.backups().are_enabled().await);

    // While client1 holds the lock, client2 must come back empty-handed.
    let contended = crypto2.try_lock_store_once().await.unwrap();
    assert!(contended.is_none());

    // Client1 lets go of the lock...
    drop(first_hold);
    tokio::time::sleep(settle).await;

    // ...and immediately takes it back.
    let second_hold = crypto1.try_lock_store_once().await.unwrap();
    assert!(second_hold.is_some());

    // Nobody else held the lock in between, so the olm machine is untouched.
    let unchanged_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&unchanged_machine));

    // Release once more.
    drop(second_hold);
    tokio::time::sleep(settle).await;

    // This time client2 gets the lock...
    let other_hold = crypto2.try_lock_store_once().await.unwrap();
    assert!(other_hold.is_some());

    // ...and gives it back.
    drop(other_hold);
    tokio::time::sleep(settle).await;

    // Client1 can lock again,
    let third_hold = crypto1.try_lock_store_once().await.unwrap();
    assert!(third_hold.is_some());

    // but since another holder was in between, its olm machine has been
    // invalidated and rebuilt.
    let regenerated_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(!initial_olm_machine.same_as(&regenerated_machine));

    // The rebuilt machine still knows the same backup key, and backups remain
    // enabled.
    let stored_keys = regenerated_machine.backup_machine().get_backup_keys().await.unwrap();
    let stored_decryption_key = stored_keys.decryption_key;
    assert!(stored_decryption_key.is_some());
    assert_eq!(
        stored_decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(crypto1.backups().are_enabled().await);
}
2406
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // A second client opened further down shares this sqlite database file.
    let db_path = std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let machine_at_start = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Merely enabling the lock must leave the olm machine alone.
    let machine_after_enabling = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_at_start.same_as(&machine_after_enabling));

    {
        // Pretend that some other client has held the lock beforehand.
        let other_client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&db_path, None)
            .build()
            .await
            .unwrap();
        other_client
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        let other_crypto = other_client.encryption();
        other_crypto.enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        let other_guard = other_crypto.spin_lock_store(None).await.unwrap();
        assert!(other_guard.is_some());

        drop(other_guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // First acquisition by our client; the guard is released right after the
    // check.
    let first_guard = client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_guard.is_some());
    drop(first_guard);

    // That first acquisition replaces the olm machine.
    let machine_after_first_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!machine_at_start.same_as(&machine_after_first_lock));

    // Second acquisition, again released immediately.
    let second_guard = client.encryption().try_lock_store_once().await.unwrap();
    assert!(second_guard.is_some());
    drop(second_guard);

    // Locking again, with no other holder in between, keeps the same machine.
    let machine_after_second_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_after_first_lock.same_as(&machine_after_second_lock));
}
2484
2485    #[async_test]
2486    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2487        // Given a client and a server
2488        let client = no_retry_test_client(None).await;
2489        let server = MockServer::start().await;
2490
2491        // When we subscribe to its verification state
2492        let mut verification_state = client.encryption().verification_state();
2493
2494        // We can get its initial value, and it's Unknown
2495        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2496
2497        // We set up a mocked request to check this endpoint is not called before
2498        // reading the new state
2499        let keys_requested = Arc::new(AtomicBool::new(false));
2500        let inner_bool = keys_requested.clone();
2501
2502        Mock::given(method("GET"))
2503            .and(path_regex(
2504                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2505            ))
2506            .respond_with(move |_req: &Request| {
2507                inner_bool.fetch_or(true, Ordering::SeqCst);
2508                ResponseTemplate::new(200).set_body_json(json!({}))
2509            })
2510            .mount(&server)
2511            .await;
2512
2513        // When the session is initialised and the encryption tasks spawn
2514        set_client_session(&client).await;
2515
2516        // Then we can get an updated value without waiting for any network requests
2517        assert!(keys_requested.load(Ordering::SeqCst).not());
2518        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2519    }
2520
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    // UIAA response body whose only flow is the cross-signing reset stage,
    // with the matching reset URL in `params`.
    let uiaa_json = json!({
        "session": "dummy",
        "flows": [
            { "stages": ["org.matrix.cross_signing_reset"] }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    let uiaa_info = serde_json::from_value(uiaa_json)
        .expect("We should be able to deserialize the UiaaInfo");

    // Only success matters here; the extracted value is not inspected.
    let reset_info = OAuthCrossSigningResetInfo::from_auth_info(&uiaa_info);
    reset_info
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2545
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    // A full error message: the old key is quoted as JSON, the new key in a
    // single-quoted dict notation.
    let raw_error = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(raw_error)
        .expect("We should be able to parse the error message");

    // Both keys must come out exactly as they appear in the message.
    let old_key = parsed.old_key.to_base64();
    let new_key = parsed.new_key.to_base64();
    assert_eq!(old_key, "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
    assert_eq!(new_key, "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");

    // A message lacking the key details has to be rejected.
    let truncated = DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.");
    truncated.expect_err("We shouldn't be able to parse an incomplete error message");
}
2566
2567    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2568    // response to a /keys/query request using the given device keys and a
2569    // pre-defined set of cross-signing keys.
2570    fn devices_to_verify_against_keys_query_response(
2571        devices: Vec<serde_json::Value>,
2572    ) -> serde_json::Value {
2573        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2574            .into_iter()
2575            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2576            .collect();
2577        json!({
2578            "device_keys": {
2579                "@example:localhost": device_keys,
2580            },
2581            "master_keys": {
2582                "@example:localhost": {
2583                    "keys": {
2584                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2585                    },
2586                    "usage": ["master"],
2587                    "user_id": "@example:localhost",
2588                },
2589            },
2590            "self_signing_keys": {
2591                "@example:localhost": {
2592                    "keys": {
2593                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2594                    },
2595                    "usage": ["self_signing"],
2596                    "user_id": "@example:localhost",
2597                    "signatures": {
2598                        "@example:localhost": {
2599                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2600                        },
2601                    },
2602                },
2603            },
2604            "user_signing_keys": {
2605                "@example:localhost": {
2606                    "keys": {
2607                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2608                    },
2609                    "usage": ["user_signing"],
2610                    "user_id": "@example:localhost",
2611                    "signatures": {
2612                        "@example:localhost": {
2613                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2614                        },
2615                    },
2616                },
2617            }
2618        })
2619    }
2620
2621    // The following three tests test that we can detect whether the user has
2622    // other devices that they can verify against under different conditions.
2623    #[async_test]
2624    /// Test that we detect that can't verify against another device if we have
2625    /// no devices.
2626    async fn test_devices_to_verify_against_no_devices() {
2627        let server = MockServer::start().await;
2628        let client = logged_in_client(Some(server.uri())).await;
2629
2630        Mock::given(method("POST"))
2631            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2632            .respond_with(
2633                ResponseTemplate::new(200)
2634                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2635            )
2636            .mount(&server)
2637            .await;
2638
2639        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2640    }
2641
2642    #[async_test]
2643    /// Test that we detect that we can verify against another cross-signed
2644    /// regular device.
2645    async fn test_devices_to_verify_against_cross_signed() {
2646        let server = MockServer::start().await;
2647        let client = logged_in_client(Some(server.uri())).await;
2648
2649        Mock::given(method("POST"))
2650            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2651            .respond_with(ResponseTemplate::new(200).set_body_json(
2652                devices_to_verify_against_keys_query_response(vec![
2653                    json!({
2654                        "algorithms": [
2655                            "m.olm.v1.curve25519-aes-sha2",
2656                            "m.megolm.v1.aes-sha2",
2657                        ],
2658                        "user_id": "@example:localhost",
2659                        "device_id": "SIGNEDDEVICE",
2660                        "keys": {
2661                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2662                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2663                        },
2664                        "signatures": {
2665                            "@example:localhost": {
2666                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2667                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2668                            },
2669                        },
2670                    })
2671                ])
2672            ))
2673            .mount(&server)
2674            .await;
2675
2676        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2677    }
2678
2679    #[async_test]
2680    /// Test that we detect that we can't verify against a dehydrated or
2681    /// unsigned device.
2682    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2683        let server = MockServer::start().await;
2684        let client = logged_in_client(Some(server.uri())).await;
2685        let user_id = client.user_id().unwrap();
2686        let olm_machine = client.olm_machine().await;
2687        let olm_machine = olm_machine.as_ref().unwrap();
2688
2689        Mock::given(method("POST"))
2690            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2691            .respond_with(ResponseTemplate::new(200).set_body_json(
2692                devices_to_verify_against_keys_query_response(vec![
2693                    json!({
2694                        "algorithms": [
2695                            "m.olm.v1.curve25519-aes-sha2",
2696                            "m.megolm.v1.aes-sha2",
2697                        ],
2698                        "user_id": "@example:localhost",
2699                        "device_id": "DEHYDRATEDDEVICE",
2700                        "keys": {
2701                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2702                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2703                        },
2704                        "dehydrated": true,
2705                        "signatures": {
2706                            "@example:localhost": {
2707                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2708                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2709                            },
2710                        },
2711                    }),
2712                    json!({
2713                        "algorithms": [
2714                            "m.olm.v1.curve25519-aes-sha2",
2715                            "m.megolm.v1.aes-sha2",
2716                        ],
2717                        "user_id": "@example:localhost",
2718                        "device_id": "UNSIGNEDDEVICE",
2719                        "keys": {
2720                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2721                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2722                        },
2723                        "signatures": {
2724                            "@example:localhost": {
2725                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2726                            },
2727                        },
2728                    }),
2729                ])
2730            ))
2731            .mount(&server)
2732            .await;
2733
2734        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2735        client.keys_query(&request_id, request.device_keys).await.unwrap();
2736
2737        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2738    }
2739}