Skip to main content

matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28    time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34    future::try_join,
35    stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40    StateStoreDataKey, StateStoreDataValue,
41    cross_process_lock::CrossProcessLockError,
42    crypto::{
43        CrossSigningBootstrapRequests, OlmMachine,
44        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
45        types::{
46            SignedKey,
47            requests::{
48                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
49            },
50        },
51    },
52    sleep::sleep,
53};
54use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
55use ruma::{
56    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
57    api::client::{
58        error::{ErrorBody, StandardErrorBody},
59        keys::{
60            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
61            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
62        },
63        message::send_message_event,
64        to_device::send_event_to_device::v3::{
65            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
66        },
67        uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
68    },
69    assign,
70    events::room::{
71        MediaSource, ThumbnailInfo,
72        member::{MembershipChange, OriginalSyncRoomMemberEvent},
73    },
74};
75#[cfg(feature = "experimental-send-custom-to-device")]
76use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
77use serde::{Deserialize, de::Error as _};
78use tasks::BundleReceiverTask;
79use tokio::sync::{Mutex, RwLockReadGuard};
80use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
81use tracing::{debug, error, instrument, warn};
82use url::Url;
83use vodozemac::Curve25519PublicKey;
84
85use self::{
86    backups::{Backups, types::BackupClientState},
87    futures::UploadEncryptedFile,
88    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
89    recovery::{Recovery, RecoveryState},
90    secret_storage::SecretStorage,
91    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
92    verification::{SasVerification, Verification, VerificationRequest},
93};
94use crate::{
95    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
96    attachment::Thumbnail,
97    client::{ClientInner, WeakClient},
98    cross_process_lock::CrossProcessLockGuard,
99    error::HttpResult,
100};
101
102pub mod backups;
103pub mod futures;
104pub mod identities;
105pub mod recovery;
106pub mod secret_storage;
107pub(crate) mod tasks;
108pub mod verification;
109
110pub use matrix_sdk_base::crypto::{
111    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
112    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
113    SessionCreationError, SignatureError, VERSION,
114    olm::{
115        SessionCreationError as MegolmSessionCreationError,
116        SessionExportError as OlmSessionExportError,
117    },
118    vodozemac,
119};
120use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
121
122#[cfg(feature = "experimental-send-custom-to-device")]
123use crate::config::RequestConfig;
124pub use crate::error::RoomKeyImportError;
125
126/// All the data related to the encryption state.
127pub(crate) struct EncryptionData {
128    /// Background tasks related to encryption (key backup, initialization
129    /// tasks, etc.).
130    pub tasks: StdMutex<ClientTasks>,
131
132    /// End-to-end encryption settings.
133    pub encryption_settings: EncryptionSettings,
134
135    /// All state related to key backup.
136    pub backup_state: BackupClientState,
137
138    /// All state related to secret storage recovery.
139    pub recovery_state: SharedObservable<RecoveryState>,
140}
141
142impl EncryptionData {
143    pub fn new(encryption_settings: EncryptionSettings) -> Self {
144        Self {
145            encryption_settings,
146
147            tasks: StdMutex::new(Default::default()),
148            backup_state: Default::default(),
149            recovery_state: Default::default(),
150        }
151    }
152
153    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
154        let weak_client = WeakClient::from_inner(client);
155
156        let mut tasks = self.tasks.lock();
157        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
158
159        if self.encryption_settings.backup_download_strategy
160            == BackupDownloadStrategy::AfterDecryptionFailure
161        {
162            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
163        }
164    }
165
166    /// Initialize the background task which listens for changes in the
167    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
168    ///
169    /// This should happen after the usual tasks have been set up and after the
170    /// E2EE initialization tasks have been set up.
171    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
172        let mut guard = self.tasks.lock();
173
174        let future = Recovery::update_state_after_backup_state_change(client);
175        let join_handle = spawn(future);
176
177        guard.update_recovery_state_after_backup = Some(join_handle);
178    }
179}
180
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires logging in with a username and password, or that MSC3967
    /// is enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup. By default
    /// room keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
200
/// The strategy used to download room keys from the server-side key backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] method.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
227
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
242
243/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
244#[derive(Debug)]
245pub struct CrossProcessLockStoreGuardWithGeneration {
246    _guard: CrossProcessLockGuard,
247    generation: u64,
248}
249
250impl CrossProcessLockStoreGuardWithGeneration {
251    /// Return the Crypto Store generation associated with this store lock.
252    pub fn generation(&self) -> u64 {
253        self.generation
254    }
255}
256
257/// A stateful struct remembering the cross-signing keys we need to upload.
258///
259/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
260/// additional authentication, this struct will contain information on the type
261/// of authentication the user needs to complete before the upload might be
262/// continued.
263///
264/// More info can be found in the [spec].
265///
266/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
267#[derive(Debug)]
268pub struct CrossSigningResetHandle {
269    client: Client,
270    upload_request: UploadSigningKeysRequest,
271    signatures_request: UploadSignaturesRequest,
272    auth_type: CrossSigningResetAuthType,
273    is_cancelled: Mutex<bool>,
274}
275
276impl CrossSigningResetHandle {
277    /// Set up a new `CrossSigningResetHandle`.
278    pub fn new(
279        client: Client,
280        upload_request: UploadSigningKeysRequest,
281        signatures_request: UploadSignaturesRequest,
282        auth_type: CrossSigningResetAuthType,
283    ) -> Self {
284        Self {
285            client,
286            upload_request,
287            signatures_request,
288            auth_type,
289            is_cancelled: Mutex::new(false),
290        }
291    }
292
293    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
294    /// is using.
295    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
296        &self.auth_type
297    }
298
299    /// Continue the cross-signing reset by either waiting for the
300    /// authentication to be done on the side of the OAuth 2.0 server or by
301    /// providing additional [`AuthData`] the homeserver requires.
302    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
303        // Poll to see whether the reset has been authorized twice per second.
304        const RETRY_EVERY: Duration = Duration::from_millis(500);
305
306        // Give up after two minutes of polling.
307        const TIMEOUT: Duration = Duration::from_mins(2);
308
309        tokio::time::timeout(TIMEOUT, async {
310            let mut upload_request = self.upload_request.clone();
311            upload_request.auth = auth;
312
313            debug!(
314                "Repeatedly PUTting to keys/device_signing/upload until it works \
315                or we hit a permanent failure."
316            );
317            while let Err(e) = self.client.send(upload_request.clone()).await {
318                if *self.is_cancelled.lock().await {
319                    return Ok(());
320                }
321
322                match e.as_uiaa_response() {
323                    Some(uiaa_info) => {
324                        if uiaa_info.auth_error.is_some() {
325                            return Err(e.into());
326                        }
327                    }
328                    None => return Err(e.into()),
329                }
330
331                debug!(
332                    "PUT to keys/device_signing/upload failed with 401. Retrying after \
333                    a short delay."
334                );
335                sleep(RETRY_EVERY).await;
336            }
337
338            self.client.send(self.signatures_request.clone()).await?;
339
340            Ok(())
341        })
342        .await
343        .unwrap_or_else(|_| {
344            warn!("Timed out waiting for keys/device_signing/upload to succeed.");
345            Err(Error::Timeout)
346        })
347    }
348
349    /// Cancel the ongoing identity reset process
350    pub async fn cancel(&self) {
351        *self.is_cancelled.lock().await = true;
352    }
353}
354
355/// information about the additional authentication that is required before the
356/// cross-signing keys can be uploaded.
357#[derive(Debug, Clone)]
358pub enum CrossSigningResetAuthType {
359    /// The homeserver requires user-interactive authentication.
360    Uiaa(UiaaInfo),
361    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
362    /// approve the upload of cross-signing keys.
363    OAuth(OAuthCrossSigningResetInfo),
364}
365
366impl CrossSigningResetAuthType {
367    fn new(error: &HttpError) -> Result<Option<Self>> {
368        if let Some(auth_info) = error.as_uiaa_response() {
369            if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
370                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
371            } else {
372                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
373            }
374        } else {
375            Ok(None)
376        }
377    }
378}
379
380/// OAuth 2.0 specific information about the required authentication for the
381/// upload of cross-signing keys.
382#[derive(Debug, Clone, Deserialize)]
383pub struct OAuthCrossSigningResetInfo {
384    /// The URL where the user can approve the reset of the cross-signing keys.
385    pub approval_url: Url,
386}
387
388impl OAuthCrossSigningResetInfo {
389    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
390        let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
391            return Ok(None);
392        };
393
394        Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
395    }
396}
397
398/// A struct that helps to parse the custom error message Synapse posts if a
399/// duplicate one-time key is uploaded.
400#[derive(Clone, Debug)]
401pub struct DuplicateOneTimeKeyErrorMessage {
402    /// The previously uploaded one-time key.
403    pub old_key: Curve25519PublicKey,
404    /// The one-time key we're attempting to upload right now.
405    pub new_key: Curve25519PublicKey,
406}
407
408impl FromStr for DuplicateOneTimeKeyErrorMessage {
409    type Err = serde_json::Error;
410
411    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
412        // First we split the string into two parts, the part containing the old key and
413        // the part containing the new key. The parts are conveniently separated
414        // by a `;` character.
415        let mut split = s.split_terminator(';');
416
417        let old_key = split
418            .next()
419            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
420        let new_key = split
421            .next()
422            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
423
424        // Now we remove the lengthy prefix from the part containing the old key, we
425        // should be left with just the JSON of the signed key.
426        let old_key_index = old_key
427            .find("Old key:")
428            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
429
430        let old_key = old_key[old_key_index..]
431            .trim()
432            .strip_prefix("Old key:")
433            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
434
435        // The part containing the new key is much simpler, we just remove a static
436        // prefix.
437        let new_key = new_key
438            .trim()
439            .strip_prefix("new key:")
440            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
441
442        // The JSON containing the new key is for some reason quoted using single
443        // quotes, so let's replace them with normal double quotes.
444        let new_key = new_key.replace("'", "\"");
445
446        // Let's deserialize now.
447        let old_key: SignedKey = serde_json::from_str(old_key)?;
448        let new_key: SignedKey = serde_json::from_str(&new_key)?;
449
450        // Pick out the Curve keys, we don't care about the rest that much.
451        let old_key = old_key.key();
452        let new_key = new_key.key();
453
454        Ok(Self { old_key, new_key })
455    }
456}
457
458impl Client {
459    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
460        self.base_client().olm_machine().await
461    }
462
463    pub(crate) async fn mark_request_as_sent(
464        &self,
465        request_id: &TransactionId,
466        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
467    ) -> Result<(), matrix_sdk_base::Error> {
468        Ok(self
469            .olm_machine()
470            .await
471            .as_ref()
472            .expect(
473                "We should have an olm machine once we try to mark E2EE related requests as sent",
474            )
475            .mark_request_as_sent(request_id, response)
476            .await?)
477    }
478
479    /// Query the server for users device keys.
480    ///
481    /// # Panics
482    ///
483    /// Panics if no key query needs to be done.
484    #[instrument(skip(self, device_keys))]
485    pub(crate) async fn keys_query(
486        &self,
487        request_id: &TransactionId,
488        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
489    ) -> Result<get_keys::v3::Response> {
490        let request = assign!(get_keys::v3::Request::new(), { device_keys });
491
492        let response = self.send(request).await?;
493        self.mark_request_as_sent(request_id, &response).await?;
494        self.encryption().update_state_after_keys_query(&response).await;
495
496        Ok(response)
497    }
498
499    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
500    /// encrypting and uploading a provided reader.
501    ///
502    /// # Arguments
503    ///
504    /// * `content_type` - The content type of the file.
505    /// * `reader` - The reader that should be encrypted and uploaded.
506    ///
507    /// # Examples
508    ///
509    /// ```no_run
510    /// # use matrix_sdk::Client;
511    /// # use url::Url;
512    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
513    /// use serde::{Deserialize, Serialize};
514    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
515    ///
516    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
517    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
518    /// struct CustomEventContent {
519    ///     encrypted_file: EncryptedFile,
520    /// }
521    ///
522    /// # async {
523    /// # let homeserver = Url::parse("http://example.com")?;
524    /// # let client = Client::new(homeserver).await?;
525    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
526    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
527    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
528    ///
529    /// room.send(CustomEventContent { encrypted_file }).await?;
530    /// # anyhow::Ok(()) };
531    /// ```
532    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
533        &'a self,
534        reader: &'a mut R,
535    ) -> UploadEncryptedFile<'a, R> {
536        UploadEncryptedFile::new(self, reader)
537    }
538
539    /// Encrypt and upload the file and thumbnails, and return the source
540    /// information.
541    pub(crate) async fn upload_encrypted_media_and_thumbnail(
542        &self,
543        data: &[u8],
544        thumbnail: Option<Thumbnail>,
545        send_progress: SharedObservable<TransmissionProgress>,
546    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
547        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
548
549        let upload_attachment = async {
550            let mut cursor = Cursor::new(data);
551            self.upload_encrypted_file(&mut cursor)
552                .with_send_progress_observable(send_progress)
553                .await
554        };
555
556        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
557
558        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
559    }
560
561    /// Uploads an encrypted thumbnail to the media repository, and returns
562    /// its source and extra information.
563    async fn upload_encrypted_thumbnail(
564        &self,
565        thumbnail: Option<Thumbnail>,
566        send_progress: SharedObservable<TransmissionProgress>,
567    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
568        let Some(thumbnail) = thumbnail else {
569            return Ok(None);
570        };
571
572        let (data, _, thumbnail_info) = thumbnail.into_parts();
573        let mut cursor = Cursor::new(data);
574
575        let file = self
576            .upload_encrypted_file(&mut cursor)
577            .with_send_progress_observable(send_progress)
578            .await?;
579
580        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
581    }
582
583    /// Claim one-time keys creating new Olm sessions.
584    ///
585    /// # Arguments
586    ///
587    /// * `users` - The list of user/device pairs that we should claim keys for.
588    pub(crate) async fn claim_one_time_keys(
589        &self,
590        users: impl Iterator<Item = &UserId>,
591    ) -> Result<()> {
592        let _lock = self.locks().key_claim_lock.lock().await;
593
594        if let Some((request_id, request)) = self
595            .olm_machine()
596            .await
597            .as_ref()
598            .ok_or(Error::NoOlmMachine)?
599            .get_missing_sessions(users)
600            .await?
601        {
602            let response = self.send(request).await?;
603            self.mark_request_as_sent(&request_id, &response).await?;
604        }
605
606        Ok(())
607    }
608
609    /// Upload the E2E encryption keys.
610    ///
611    /// This uploads the long lived device keys as well as the required amount
612    /// of one-time keys.
613    ///
614    /// # Panics
615    ///
616    /// Panics if the client isn't logged in, or if no encryption keys need to
617    /// be uploaded.
618    #[instrument(skip(self, request))]
619    pub(crate) async fn keys_upload(
620        &self,
621        request_id: &TransactionId,
622        request: &upload_keys::v3::Request,
623    ) -> Result<upload_keys::v3::Response> {
624        debug!(
625            device_keys = request.device_keys.is_some(),
626            one_time_key_count = request.one_time_keys.len(),
627            "Uploading public encryption keys",
628        );
629
630        let response = self.send(request.clone()).await?;
631        self.mark_request_as_sent(request_id, &response).await?;
632
633        Ok(response)
634    }
635
636    pub(crate) async fn room_send_helper(
637        &self,
638        request: &RoomMessageRequest,
639    ) -> Result<send_message_event::v3::Response> {
640        let content = request.content.clone();
641        let txn_id = request.txn_id.clone();
642        let room_id = &request.room_id;
643
644        self.get_room(room_id)
645            .expect("Can't send a message to a room that isn't known to the store")
646            .send(*content)
647            .with_transaction_id(txn_id)
648            .await
649            .map(|result| result.response)
650    }
651
652    pub(crate) async fn send_to_device(
653        &self,
654        request: &ToDeviceRequest,
655    ) -> HttpResult<ToDeviceResponse> {
656        let request = RumaToDeviceRequest::new_raw(
657            request.event_type.clone(),
658            request.txn_id.clone(),
659            request.messages.clone(),
660        );
661
662        self.send(request).await
663    }
664
665    pub(crate) async fn send_verification_request(
666        &self,
667        request: OutgoingVerificationRequest,
668    ) -> Result<()> {
669        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
670
671        match request {
672            ToDevice(t) => {
673                self.send_to_device(&t).await?;
674            }
675            InRoom(r) => {
676                self.room_send_helper(&r).await?;
677            }
678        }
679
680        Ok(())
681    }
682
683    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
684        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
685
686        match r.request() {
687            AnyOutgoingRequest::KeysQuery(request) => {
688                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
689            }
690            AnyOutgoingRequest::KeysUpload(request) => {
691                let response = self.keys_upload(r.request_id(), request).await;
692
693                if let Err(e) = &response {
694                    match e.as_ruma_api_error() {
695                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
696                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
697                            {
698                                // This is one of the nastiest errors we can have. The server
699                                // telling us that we already have a one-time key uploaded means
700                                // that we forgot about some of our one-time keys. This will lead to
701                                // UTDs.
702                                {
703                                    let already_reported = self
704                                        .state_store()
705                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
706                                        .await?
707                                        .is_some();
708
709                                    if message.starts_with("One time key") && !already_reported {
710                                        let error_message =
711                                            DuplicateOneTimeKeyErrorMessage::from_str(message);
712
713                                        if let Ok(message) = &error_message {
714                                            error!(
715                                                sentry = true,
716                                                old_key = %message.old_key,
717                                                new_key = %message.new_key,
718                                                "Duplicate one-time keys have been uploaded"
719                                            );
720                                        } else {
721                                            error!(
722                                                sentry = true,
723                                                "Duplicate one-time keys have been uploaded"
724                                            );
725                                        }
726
727                                        self.state_store()
728                                            .set_kv_data(
729                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
730                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
731                                            )
732                                            .await?;
733
734                                        if let Err(e) = self
735                                            .inner
736                                            .duplicate_key_upload_error_sender
737                                            .send(error_message.ok())
738                                        {
739                                            error!(
740                                                "Failed to dispatch duplicate key upload error notification: {}",
741                                                e
742                                            );
743                                        }
744                                    }
745                                }
746                            }
747                        }
748                        _ => {}
749                    }
750
751                    response?;
752                }
753            }
754            AnyOutgoingRequest::ToDeviceRequest(request) => {
755                let response = self.send_to_device(request).await?;
756                self.mark_request_as_sent(r.request_id(), &response).await?;
757            }
758            AnyOutgoingRequest::SignatureUpload(request) => {
759                let response = self.send(request.clone()).await?;
760                self.mark_request_as_sent(r.request_id(), &response).await?;
761            }
762            AnyOutgoingRequest::RoomMessage(request) => {
763                let response = self.room_send_helper(request).await?;
764                self.mark_request_as_sent(r.request_id(), &response).await?;
765            }
766            AnyOutgoingRequest::KeysClaim(request) => {
767                let response = self.send(request.clone()).await?;
768                self.mark_request_as_sent(r.request_id(), &response).await?;
769            }
770        }
771
772        Ok(())
773    }
774
775    #[instrument(skip_all)]
776    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
777        const MAX_CONCURRENT_REQUESTS: usize = 20;
778
779        // This is needed because sometimes we need to automatically
780        // claim some one-time keys to unwedge an existing Olm session.
781        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
782            warn!("Error while claiming one-time keys {:?}", e);
783        }
784
785        let outgoing_requests = stream::iter(
786            self.olm_machine()
787                .await
788                .as_ref()
789                .ok_or(Error::NoOlmMachine)?
790                .outgoing_requests()
791                .await?,
792        )
793        .map(|r| self.send_outgoing_request(r));
794
795        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
796
797        requests
798            .for_each(|r| async move {
799                match r {
800                    Ok(_) => (),
801                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
802                }
803            })
804            .await;
805
806        Ok(())
807    }
808}
809
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Expose the olm machine, for testing purposes only.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        let olm_machine = self.olm_machine().await;
        olm_machine
    }

    /// Aborts the client's bundle receiver task if there is one, for testing
    /// purposes only.
    pub fn abort_bundle_receiver_task(&self) {
        let tasks = self.inner.e2ee.tasks.lock();

        if let Some(bundle_receiver) = &tasks.receive_historic_room_key_bundles {
            bundle_receiver.abort();
        }
    }
}
825
826/// A high-level API to manage the client's encryption.
827///
828/// To get this, use [`Client::encryption()`].
829#[derive(Debug, Clone)]
830pub struct Encryption {
831    /// The underlying client.
832    client: Client,
833}
834
835impl Encryption {
836    pub(crate) fn new(client: Client) -> Self {
837        Self { client }
838    }
839
840    /// Returns the current encryption settings for this client.
841    pub(crate) fn settings(&self) -> EncryptionSettings {
842        self.client.inner.e2ee.encryption_settings
843    }
844
845    /// Get the public ed25519 key of our own device. This is usually what is
846    /// called the fingerprint of the device.
847    pub async fn ed25519_key(&self) -> Option<String> {
848        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
849    }
850
851    /// Get the public Curve25519 key of our own device.
852    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
853        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
854    }
855
856    /// Get the current device creation timestamp.
857    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
858        match self.get_own_device().await {
859            Ok(Some(device)) => device.first_time_seen_ts(),
860            // Should not happen, there should always be an own device
861            _ => MilliSecondsSinceUnixEpoch::now(),
862        }
863    }
864
865    pub(crate) async fn import_secrets_bundle(
866        &self,
867        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
868    ) -> Result<(), SecretImportError> {
869        let olm_machine = self.client.olm_machine().await;
870        let olm_machine =
871            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
872
873        olm_machine.store().import_secrets_bundle(bundle).await
874    }
875
876    /// Get the status of the private cross signing keys.
877    ///
878    /// This can be used to check which private cross signing keys we have
879    /// stored locally.
880    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
881        let olm = self.client.olm_machine().await;
882        let machine = olm.as_ref()?;
883        Some(machine.cross_signing_status().await)
884    }
885
886    /// Does the user have other devices that the current device can verify
887    /// against?
888    ///
889    /// The device must be signed by the user's cross-signing key, must have an
890    /// identity, and must not be a dehydrated device.
891    pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
892        let olm_machine = self.client.olm_machine().await;
893        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
894        let user_id = olm_machine.user_id();
895
896        self.ensure_initial_key_query().await?;
897
898        let devices = self.get_user_devices(user_id).await?;
899
900        let ret = devices.devices().any(|device| {
901            device.is_cross_signed_by_owner()
902                && device.curve25519_key().is_some()
903                && !device.is_dehydrated()
904        });
905
906        Ok(ret)
907    }
908
909    /// Get all the tracked users we know about
910    ///
911    /// Tracked users are users for which we keep the device list of E2EE
912    /// capable devices up to date.
913    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
914        if let Some(machine) = self.client.olm_machine().await.as_ref() {
915            machine.tracked_users().await
916        } else {
917            Ok(HashSet::new())
918        }
919    }
920
921    /// Get a [`Subscriber`] for the [`VerificationState`].
922    ///
923    /// # Examples
924    ///
925    /// ```no_run
926    /// use matrix_sdk::{Client, encryption};
927    /// use url::Url;
928    ///
929    /// # async {
930    /// let homeserver = Url::parse("http://example.com")?;
931    /// let client = Client::new(homeserver).await?;
932    /// let mut subscriber = client.encryption().verification_state();
933    ///
934    /// let current_value = subscriber.get();
935    ///
936    /// println!("The current verification state is: {current_value:?}");
937    ///
938    /// if let Some(verification_state) = subscriber.next().await {
939    ///     println!("Received verification state update {:?}", verification_state)
940    /// }
941    /// # anyhow::Ok(()) };
942    /// ```
943    pub fn verification_state(&self) -> Subscriber<VerificationState> {
944        self.client.inner.verification_state.subscribe_reset()
945    }
946
947    /// Get a verification object with the given flow id.
948    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
949        let olm = self.client.olm_machine().await;
950        let olm = olm.as_ref()?;
951        #[allow(clippy::bind_instead_of_map)]
952        olm.get_verification(user_id, flow_id).and_then(|v| match v {
953            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
954                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
955            }
956            #[cfg(feature = "qrcode")]
957            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
958                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
959            }
960            _ => None,
961        })
962    }
963
964    /// Get a `VerificationRequest` object for the given user with the given
965    /// flow id.
966    pub async fn get_verification_request(
967        &self,
968        user_id: &UserId,
969        flow_id: impl AsRef<str>,
970    ) -> Option<VerificationRequest> {
971        let olm = self.client.olm_machine().await;
972        let olm = olm.as_ref()?;
973
974        olm.get_verification_request(user_id, flow_id)
975            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
976    }
977
978    /// Get a specific device of a user.
979    ///
980    /// # Arguments
981    ///
982    /// * `user_id` - The unique id of the user that the device belongs to.
983    ///
984    /// * `device_id` - The unique id of the device.
985    ///
986    /// Returns a `Device` if one is found and the crypto store didn't throw an
987    /// error.
988    ///
989    /// This will always return None if the client hasn't been logged in.
990    ///
991    /// # Examples
992    ///
993    /// ```no_run
994    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
995    /// # use url::Url;
996    /// # async {
997    /// # let alice = user_id!("@alice:example.org");
998    /// # let homeserver = Url::parse("http://example.com")?;
999    /// # let client = Client::new(homeserver).await?;
1000    /// if let Some(device) =
1001    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
1002    /// {
1003    ///     println!("{:?}", device.is_verified());
1004    ///
1005    ///     if !device.is_verified() {
1006    ///         let verification = device.request_verification().await?;
1007    ///     }
1008    /// }
1009    /// # anyhow::Ok(()) };
1010    /// ```
1011    pub async fn get_device(
1012        &self,
1013        user_id: &UserId,
1014        device_id: &DeviceId,
1015    ) -> Result<Option<Device>, CryptoStoreError> {
1016        let olm = self.client.olm_machine().await;
1017        let Some(machine) = olm.as_ref() else { return Ok(None) };
1018        let device = machine.get_device(user_id, device_id, None).await?;
1019        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1020    }
1021
1022    /// A convenience method to retrieve your own device from the store.
1023    ///
1024    /// This is the same as calling [`Encryption::get_device()`] with your own
1025    /// user and device ID.
1026    ///
1027    /// This will always return a device, unless you are not logged in.
1028    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1029        let olm = self.client.olm_machine().await;
1030        let Some(machine) = olm.as_ref() else { return Ok(None) };
1031        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1032        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1033    }
1034
1035    /// Get a map holding all the devices of an user.
1036    ///
1037    /// This will always return an empty map if the client hasn't been logged
1038    /// in.
1039    ///
1040    /// # Arguments
1041    ///
1042    /// * `user_id` - The unique id of the user that the devices belong to.
1043    ///
1044    /// # Examples
1045    ///
1046    /// ```no_run
1047    /// # use matrix_sdk::{Client, ruma::user_id};
1048    /// # use url::Url;
1049    /// # async {
1050    /// # let alice = user_id!("@alice:example.org");
1051    /// # let homeserver = Url::parse("http://example.com")?;
1052    /// # let client = Client::new(homeserver).await?;
1053    /// let devices = client.encryption().get_user_devices(alice).await?;
1054    ///
1055    /// for device in devices.devices() {
1056    ///     println!("{device:?}");
1057    /// }
1058    /// # anyhow::Ok(()) };
1059    /// ```
1060    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1061        let devices = self
1062            .client
1063            .olm_machine()
1064            .await
1065            .as_ref()
1066            .ok_or(Error::NoOlmMachine)?
1067            .get_user_devices(user_id, None)
1068            .await?;
1069
1070        Ok(UserDevices { inner: devices, client: self.client.clone() })
1071    }
1072
1073    /// Get the E2EE identity of a user from the crypto store.
1074    ///
1075    /// Usually, we only have the E2EE identity of a user locally if the user
1076    /// is tracked, meaning that we are both members of the same encrypted room.
1077    ///
1078    /// To get the E2EE identity of a user even if it is not available locally
1079    /// use [`Encryption::request_user_identity()`].
1080    ///
1081    /// # Arguments
1082    ///
1083    /// * `user_id` - The unique id of the user that the identity belongs to.
1084    ///
1085    /// Returns a `UserIdentity` if one is found and the crypto store
1086    /// didn't throw an error.
1087    ///
1088    /// This will always return None if the client hasn't been logged in.
1089    ///
1090    /// # Examples
1091    ///
1092    /// ```no_run
1093    /// # use matrix_sdk::{Client, ruma::user_id};
1094    /// # use url::Url;
1095    /// # async {
1096    /// # let alice = user_id!("@alice:example.org");
1097    /// # let homeserver = Url::parse("http://example.com")?;
1098    /// # let client = Client::new(homeserver).await?;
1099    /// let user = client.encryption().get_user_identity(alice).await?;
1100    ///
1101    /// if let Some(user) = user {
1102    ///     println!("{:?}", user.is_verified());
1103    ///
1104    ///     let verification = user.request_verification().await?;
1105    /// }
1106    /// # anyhow::Ok(()) };
1107    /// ```
1108    pub async fn get_user_identity(
1109        &self,
1110        user_id: &UserId,
1111    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1112        let olm = self.client.olm_machine().await;
1113        let Some(olm) = olm.as_ref() else { return Ok(None) };
1114        let identity = olm.get_identity(user_id, None).await?;
1115
1116        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1117    }
1118
1119    /// Get the E2EE identity of a user from the homeserver.
1120    ///
1121    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1122    /// E2EE identity is not found, it should mean that the user did not set
1123    /// up cross-signing.
1124    ///
1125    /// If you want the E2EE identity of a user without making a request to the
1126    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1127    ///
1128    /// # Arguments
1129    ///
1130    /// * `user_id` - The ID of the user that the identity belongs to.
1131    ///
1132    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1133    /// was an issue with the crypto store or with the request to the
1134    /// homeserver.
1135    ///
1136    /// This will always return `None` if the client hasn't been logged in.
1137    ///
1138    /// # Examples
1139    ///
1140    /// ```no_run
1141    /// # use matrix_sdk::{Client, ruma::user_id};
1142    /// # use url::Url;
1143    /// # async {
1144    /// # let alice = user_id!("@alice:example.org");
1145    /// # let homeserver = Url::parse("http://example.com")?;
1146    /// # let client = Client::new(homeserver).await?;
1147    /// let user = client.encryption().request_user_identity(alice).await?;
1148    ///
1149    /// if let Some(user) = user {
1150    ///     println!("User is verified: {:?}", user.is_verified());
1151    ///
1152    ///     let verification = user.request_verification().await?;
1153    /// }
1154    /// # anyhow::Ok(()) };
1155    /// ```
1156    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1157        let olm = self.client.olm_machine().await;
1158        let Some(olm) = olm.as_ref() else { return Ok(None) };
1159
1160        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1161        self.client.keys_query(&request_id, request.device_keys).await?;
1162
1163        let identity = olm.get_identity(user_id, None).await?;
1164        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1165    }
1166
1167    /// Returns a stream of device updates, allowing users to listen for
1168    /// notifications about new or changed devices.
1169    ///
1170    /// The stream produced by this method emits updates whenever a new device
1171    /// is discovered or when an existing device's information is changed. Users
1172    /// can subscribe to this stream and receive updates in real-time.
1173    ///
1174    /// # Examples
1175    ///
1176    /// ```no_run
1177    /// # use matrix_sdk::Client;
1178    /// # use ruma::{device_id, user_id};
1179    /// # use futures_util::{pin_mut, StreamExt};
1180    /// # let client: Client = unimplemented!();
1181    /// # async {
1182    /// let devices_stream = client.encryption().devices_stream().await?;
1183    /// let user_id = client
1184    ///     .user_id()
1185    ///     .expect("We should know our user id after we have logged in");
1186    /// pin_mut!(devices_stream);
1187    ///
1188    /// for device_updates in devices_stream.next().await {
1189    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1190    ///         for device in user_devices.values() {
1191    ///             println!("A new device has been added {}", device.device_id());
1192    ///         }
1193    ///     }
1194    /// }
1195    /// # anyhow::Ok(()) };
1196    /// ```
1197    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1198        let olm = self.client.olm_machine().await;
1199        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1200        let client = self.client.to_owned();
1201
1202        Ok(olm
1203            .store()
1204            .devices_stream()
1205            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1206    }
1207
1208    /// Returns a stream of user identity updates, allowing users to listen for
1209    /// notifications about new or changed user identities.
1210    ///
1211    /// The stream produced by this method emits updates whenever a new user
1212    /// identity is discovered or when an existing identities information is
1213    /// changed. Users can subscribe to this stream and receive updates in
1214    /// real-time.
1215    ///
1216    /// # Examples
1217    ///
1218    /// ```no_run
1219    /// # use matrix_sdk::Client;
1220    /// # use ruma::{device_id, user_id};
1221    /// # use futures_util::{pin_mut, StreamExt};
1222    /// # let client: Client = unimplemented!();
1223    /// # async {
1224    /// let identities_stream =
1225    ///     client.encryption().user_identities_stream().await?;
1226    /// pin_mut!(identities_stream);
1227    ///
1228    /// for identity_updates in identities_stream.next().await {
1229    ///     for (_, identity) in identity_updates.new {
1230    ///         println!("A new identity has been added {}", identity.user_id());
1231    ///     }
1232    /// }
1233    /// # anyhow::Ok(()) };
1234    /// ```
1235    pub async fn user_identities_stream(
1236        &self,
1237    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1238        let olm = self.client.olm_machine().await;
1239        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1240        let client = self.client.to_owned();
1241
1242        Ok(olm
1243            .store()
1244            .user_identities_stream()
1245            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1246    }
1247
1248    /// Create and upload a new cross signing identity.
1249    ///
1250    /// # Arguments
1251    ///
1252    /// * `auth_data` - This request requires user interactive auth, the first
1253    ///   request needs to set this to `None` and will always fail with an
1254    ///   `UiaaResponse`. The response will contain information for the
1255    ///   interactive auth and the same request needs to be made but this time
1256    ///   with some `auth_data` provided.
1257    ///
1258    /// # Examples
1259    ///
1260    /// ```no_run
1261    /// # use std::collections::BTreeMap;
1262    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1263    /// # use url::Url;
1264    /// # use serde_json::json;
1265    /// # async {
1266    /// # let homeserver = Url::parse("http://example.com")?;
1267    /// # let client = Client::new(homeserver).await?;
1268    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1269    ///     if let Some(response) = e.as_uiaa_response() {
1270    ///         let mut password = uiaa::Password::new(
1271    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1272    ///             "wordpass".to_owned(),
1273    ///         );
1274    ///         password.session = response.session.clone();
1275    ///
1276    ///         client
1277    ///             .encryption()
1278    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1279    ///             .await
1280    ///             .expect("Couldn't bootstrap cross signing")
1281    ///     } else {
1282    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1283    ///     }
1284    /// }
1285    /// # anyhow::Ok(()) };
1286    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1287        let olm = self.client.olm_machine().await;
1288        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1289
1290        let CrossSigningBootstrapRequests {
1291            upload_signing_keys_req,
1292            upload_keys_req,
1293            upload_signatures_req,
1294        } = olm.bootstrap_cross_signing(false).await?;
1295
1296        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1297            auth: auth_data,
1298            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1299            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1300            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1301        });
1302
1303        if let Some(req) = upload_keys_req {
1304            self.client.send_outgoing_request(req).await?;
1305        }
1306        self.client.send(upload_signing_keys_req).await?;
1307        self.client.send(upload_signatures_req).await?;
1308
1309        Ok(())
1310    }
1311
1312    /// Reset the cross-signing keys.
1313    ///
1314    /// # Example
1315    ///
1316    /// ```no_run
1317    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1318    /// # use url::Url;
1319    /// # async {
1320    /// # let homeserver = Url::parse("http://example.com")?;
1321    /// # let client = Client::new(homeserver).await?;
1322    /// # let user_id = unimplemented!();
1323    /// let encryption = client.encryption();
1324    ///
1325    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1326    ///     match handle.auth_type() {
1327    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1328    ///             use matrix_sdk::ruma::api::client::uiaa;
1329    ///
1330    ///             let password = "1234".to_owned();
1331    ///             let mut password = uiaa::Password::new(user_id, password);
1332    ///             password.session = uiaa.session;
1333    ///
1334    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1335    ///         }
1336    ///         CrossSigningResetAuthType::OAuth(o) => {
1337    ///             println!(
1338    ///                 "To reset your end-to-end encryption cross-signing identity, \
1339    ///                 you first need to approve it at {}",
1340    ///                 o.approval_url
1341    ///             );
1342    ///             handle.auth(None).await?;
1343    ///         }
1344    ///     }
1345    /// }
1346    /// # anyhow::Ok(()) };
1347    /// ```
1348    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1349        let olm = self.client.olm_machine().await;
1350        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1351
1352        let CrossSigningBootstrapRequests {
1353            upload_keys_req,
1354            upload_signing_keys_req,
1355            upload_signatures_req,
1356        } = olm.bootstrap_cross_signing(true).await?;
1357
1358        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1359            auth: None,
1360            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1361            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1362            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1363        });
1364
1365        if let Some(req) = upload_keys_req {
1366            self.client.send_outgoing_request(req).await?;
1367        }
1368
1369        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1370            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1371                let client = self.client.clone();
1372
1373                Ok(Some(CrossSigningResetHandle::new(
1374                    client,
1375                    upload_signing_keys_req,
1376                    upload_signatures_req,
1377                    auth_type,
1378                )))
1379            } else {
1380                Err(error.into())
1381            }
1382        } else {
1383            self.client.send(upload_signatures_req).await?;
1384
1385            Ok(None)
1386        }
1387    }
1388
1389    /// Query the user's own device keys, if, and only if, we didn't have their
1390    /// identity in the first place.
1391    async fn ensure_initial_key_query(&self) -> Result<()> {
1392        let olm_machine = self.client.olm_machine().await;
1393        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1394
1395        let user_id = olm_machine.user_id();
1396
1397        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1398            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1399            self.client.keys_query(&request_id, request.device_keys).await?;
1400        }
1401
1402        Ok(())
1403    }
1404
1405    /// Create and upload a new cross signing identity, if that has not been
1406    /// done yet.
1407    ///
1408    /// This will only create a new cross-signing identity if the user had never
1409    /// done it before. If the user did it before, then this is a no-op.
1410    ///
1411    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1412    /// behavior of this function.
1413    ///
1414    /// # Arguments
1415    ///
1416    /// * `auth_data` - This request requires user interactive auth, the first
1417    ///   request needs to set this to `None` and will always fail with an
1418    ///   `UiaaResponse`. The response will contain information for the
1419    ///   interactive auth and the same request needs to be made but this time
1420    ///   with some `auth_data` provided.
1421    ///
1422    /// # Examples
1423    /// ```no_run
1424    /// # use std::collections::BTreeMap;
1425    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1426    /// # use url::Url;
1427    /// # use serde_json::json;
1428    /// # async {
1429    /// # let homeserver = Url::parse("http://example.com")?;
1430    /// # let client = Client::new(homeserver).await?;
1431    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1432    ///     if let Some(response) = e.as_uiaa_response() {
1433    ///         let mut password = uiaa::Password::new(
1434    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1435    ///             "wordpass".to_owned(),
1436    ///         );
1437    ///         password.session = response.session.clone();
1438    ///
1439    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1440    ///         // avoid checks.
1441    ///         client
1442    ///             .encryption()
1443    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1444    ///             .await
1445    ///             .expect("Couldn't bootstrap cross signing")
1446    ///     } else {
1447    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1448    ///     }
1449    /// }
1450    /// # anyhow::Ok(()) };
1451    pub async fn bootstrap_cross_signing_if_needed(
1452        &self,
1453        auth_data: Option<AuthData>,
1454    ) -> Result<()> {
1455        let olm_machine = self.client.olm_machine().await;
1456        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1457        let user_id = olm_machine.user_id();
1458
1459        self.ensure_initial_key_query().await?;
1460
1461        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1462            self.bootstrap_cross_signing(auth_data).await?;
1463        }
1464
1465        Ok(())
1466    }
1467
1468    /// Export E2EE keys that match the given predicate encrypting them with the
1469    /// given passphrase.
1470    ///
1471    /// # Arguments
1472    ///
1473    /// * `path` - The file path where the exported key file will be saved.
1474    ///
1475    /// * `passphrase` - The passphrase that will be used to encrypt the
1476    ///   exported room keys.
1477    ///
1478    /// * `predicate` - A closure that will be called for every known
1479    ///   `InboundGroupSession`, which represents a room key. If the closure
1480    ///   returns `true` the `InboundGroupSessoin` will be included in the
1481    ///   export, if the closure returns `false` it will not be included.
1482    ///
1483    /// # Panics
1484    ///
1485    /// This method will panic if it isn't run on a Tokio runtime.
1486    ///
1487    /// This method will panic if it can't get enough randomness from the OS to
1488    /// encrypt the exported keys securely.
1489    ///
1490    /// # Examples
1491    ///
1492    /// ```no_run
1493    /// # use std::{path::PathBuf, time::Duration};
1494    /// # use matrix_sdk::{
1495    /// #     Client, config::SyncSettings,
1496    /// #     ruma::room_id,
1497    /// # };
1498    /// # use url::Url;
1499    /// # async {
1500    /// # let homeserver = Url::parse("http://localhost:8080")?;
1501    /// # let mut client = Client::new(homeserver).await?;
1502    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1503    /// // Export all room keys.
1504    /// client
1505    ///     .encryption()
1506    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1507    ///     .await?;
1508    ///
1509    /// // Export only the room keys for a certain room.
1510    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1511    /// let room_id = room_id!("!test:localhost");
1512    ///
1513    /// client
1514    ///     .encryption()
1515    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1516    ///     .await?;
1517    /// # anyhow::Ok(()) };
1518    /// ```
1519    #[cfg(not(target_family = "wasm"))]
1520    pub async fn export_room_keys(
1521        &self,
1522        path: PathBuf,
1523        passphrase: &str,
1524        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1525    ) -> Result<()> {
1526        let olm = self.client.olm_machine().await;
1527        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1528
1529        let keys = olm.store().export_room_keys(predicate).await?;
1530        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1531
1532        let encrypt = move || -> Result<()> {
1533            let export: String =
1534                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1535            let mut file = std::fs::File::create(path)?;
1536            file.write_all(&export.into_bytes())?;
1537            Ok(())
1538        };
1539
1540        let task = tokio::task::spawn_blocking(encrypt);
1541        task.await.expect("Task join error")
1542    }
1543
1544    /// Import E2EE keys from the given file path.
1545    ///
1546    /// # Arguments
1547    ///
1548    /// * `path` - The file path where the exported key file will can be found.
1549    ///
1550    /// * `passphrase` - The passphrase that should be used to decrypt the
1551    ///   exported room keys.
1552    ///
1553    /// Returns a tuple of numbers that represent the number of sessions that
1554    /// were imported and the total number of sessions that were found in the
1555    /// key export.
1556    ///
1557    /// # Panics
1558    ///
1559    /// This method will panic if it isn't run on a Tokio runtime.
1560    ///
1561    /// ```no_run
1562    /// # use std::{path::PathBuf, time::Duration};
1563    /// # use matrix_sdk::{
1564    /// #     Client, config::SyncSettings,
1565    /// #     ruma::room_id,
1566    /// # };
1567    /// # use url::Url;
1568    /// # async {
1569    /// # let homeserver = Url::parse("http://localhost:8080")?;
1570    /// # let mut client = Client::new(homeserver).await?;
1571    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1572    /// let result =
1573    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1574    ///
1575    /// println!(
1576    ///     "Imported {} room keys out of {}",
1577    ///     result.imported_count, result.total_count
1578    /// );
1579    /// # anyhow::Ok(()) };
1580    /// ```
1581    #[cfg(not(target_family = "wasm"))]
1582    pub async fn import_room_keys(
1583        &self,
1584        path: PathBuf,
1585        passphrase: &str,
1586    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1587        let olm = self.client.olm_machine().await;
1588        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1589        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1590
1591        let decrypt = move || {
1592            let file = std::fs::File::open(path)?;
1593            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1594        };
1595
1596        let task = tokio::task::spawn_blocking(decrypt);
1597        let import = task.await.expect("Task join error")?;
1598
1599        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1600
1601        self.backups().maybe_trigger_backup();
1602
1603        Ok(ret)
1604    }
1605
1606    /// Receive notifications of room keys being received as a [`Stream`].
1607    ///
1608    /// Each time a room key is updated in any way, an update will be sent to
1609    /// the stream. Updates that happen at the same time are batched into a
1610    /// [`Vec`].
1611    ///
1612    /// If the reader of the stream lags too far behind, an error is broadcast
1613    /// containing the number of skipped items.
1614    ///
1615    /// # Examples
1616    ///
1617    /// ```no_run
1618    /// # use matrix_sdk::Client;
1619    /// # use url::Url;
1620    /// # async {
1621    /// # let homeserver = Url::parse("http://example.com")?;
1622    /// # let client = Client::new(homeserver).await?;
1623    /// use futures_util::StreamExt;
1624    ///
1625    /// let Some(mut room_keys_stream) =
1626    ///     client.encryption().room_keys_received_stream().await
1627    /// else {
1628    ///     return Ok(());
1629    /// };
1630    ///
1631    /// while let Some(update) = room_keys_stream.next().await {
1632    ///     println!("Received room keys {update:?}");
1633    /// }
1634    /// # anyhow::Ok(()) };
1635    /// ```
1636    pub async fn room_keys_received_stream(
1637        &self,
1638    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1639    {
1640        let olm = self.client.olm_machine().await;
1641        let olm = olm.as_ref()?;
1642
1643        Some(olm.store().room_keys_received_stream())
1644    }
1645
1646    /// Receive notifications of historic room key bundles as a [`Stream`].
1647    ///
1648    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1649    ///
1650    /// Each time a historic room key bundle was received, an update will be
1651    /// sent to the stream. This stream is useful for informative purposes
1652    /// exclusively, historic room key bundles are handled by the SDK
1653    /// automatically.
1654    ///
1655    /// # Examples
1656    ///
1657    /// ```no_run
1658    /// # use matrix_sdk::Client;
1659    /// # use url::Url;
1660    /// # async {
1661    /// # let homeserver = Url::parse("http://example.com")?;
1662    /// # let client = Client::new(homeserver).await?;
1663    /// use futures_util::StreamExt;
1664    ///
1665    /// let Some(mut bundle_stream) =
1666    ///     client.encryption().historic_room_key_stream().await
1667    /// else {
1668    ///     return Ok(());
1669    /// };
1670    ///
1671    /// while let Some(bundle_info) = bundle_stream.next().await {
1672    ///     println!("Received a historic room key bundle {bundle_info:?}");
1673    /// }
1674    /// # anyhow::Ok(()) };
1675    /// ```
1676    pub async fn historic_room_key_stream(
1677        &self,
1678    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1679        let olm = self.client.olm_machine().await;
1680        let olm = olm.as_ref()?;
1681
1682        Some(olm.store().historic_room_key_stream())
1683    }
1684
1685    /// Get the secret storage manager of the client.
1686    pub fn secret_storage(&self) -> SecretStorage {
1687        SecretStorage { client: self.client.to_owned() }
1688    }
1689
1690    /// Get the backups manager of the client.
1691    pub fn backups(&self) -> Backups {
1692        Backups { client: self.client.to_owned() }
1693    }
1694
1695    /// Get the recovery manager of the client.
1696    pub fn recovery(&self) -> Recovery {
1697        Recovery { client: self.client.to_owned() }
1698    }
1699
1700    /// Enables the crypto-store cross-process lock.
1701    ///
1702    /// This may be required if there are multiple processes that may do writes
1703    /// to the same crypto store. In that case, it's necessary to create a
1704    /// lock, so that only one process writes to it, otherwise this may
1705    /// cause confusing issues because of stale data contained in in-memory
1706    /// caches.
1707    ///
1708    /// The provided `lock_value` must be a unique identifier for this process.
1709    /// Use [`Client::cross_process_lock_config`] to get the global value, if
1710    /// multi-process is enabled.
1711    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1712        // If the lock has already been created, don't recreate it from scratch.
1713        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1714            let prev_holder = prev_lock.lock_holder();
1715            if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1716                return Ok(());
1717            }
1718            warn!(
1719                "Recreating cross-process store lock with a different holder value: \
1720                 prev was {prev_holder:?}, new is {lock_value}"
1721            );
1722        }
1723
1724        let olm_machine = self.client.base_client().olm_machine().await;
1725        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1726
1727        let lock = olm_machine.store().create_store_lock(
1728            "cross_process_lock".to_owned(),
1729            CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1730        );
1731
1732        // Gently try to initialize the crypto store generation counter.
1733        //
1734        // If we don't get the lock immediately, then it is already acquired by another
1735        // process, and we'll get to reload next time we acquire the lock.
1736        {
1737            let lock_result = lock.try_lock_once().await?;
1738
1739            if lock_result.is_ok() {
1740                olm_machine
1741                    .initialize_crypto_store_generation(
1742                        &self.client.locks().crypto_store_generation,
1743                    )
1744                    .await?;
1745            }
1746        }
1747
1748        self.client
1749            .locks()
1750            .cross_process_crypto_store_lock
1751            .set(lock)
1752            .map_err(|_| Error::BadCryptoStoreState)?;
1753
1754        Ok(())
1755    }
1756
1757    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1758    /// time.
1759    ///
1760    /// Returns the current generation number.
1761    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1762        let olm_machine_guard = self.client.olm_machine().await;
1763        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1764            let (new_gen, generation_number) = olm_machine
1765                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1766                .await?;
1767            // If the crypto store generation has changed,
1768            if new_gen {
1769                // (get rid of the reference to the current crypto store first)
1770                drop(olm_machine_guard);
1771                // Recreate the OlmMachine.
1772                self.client.base_client().regenerate_olm(None).await?;
1773            }
1774            Ok(generation_number)
1775        } else {
1776            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1777            // been initialised by the time we get here. Ideally we'd panic, or return an
1778            // error, but for now I'm just adding some logging to check if it
1779            // happens, and returning the magic number 0.
1780            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1781            Ok(0)
1782        }
1783    }
1784
1785    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1786    /// spin-waits until the lock is available.
1787    ///
1788    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1789    /// first time.
1790    pub async fn spin_lock_store(
1791        &self,
1792        max_backoff: Option<u32>,
1793    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1794        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1795            let guard = lock
1796                .spin_lock(max_backoff)
1797                .await
1798                .map_err(|err| {
1799                    Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1800                        Arc::new(err),
1801                    )))
1802                })?
1803                .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1804
1805            let generation = self.on_lock_newly_acquired().await?;
1806
1807            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1808                _guard: guard.into_guard(),
1809                generation,
1810            }))
1811        } else {
1812            Ok(None)
1813        }
1814    }
1815
1816    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1817    /// attempts to lock it once.
1818    ///
1819    /// Returns a guard to the lock, if it was obtained.
1820    pub async fn try_lock_store_once(
1821        &self,
1822    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1823        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1824            let lock_result = lock.try_lock_once().await?;
1825
1826            let Some(guard) = lock_result.ok() else {
1827                return Ok(None);
1828            };
1829
1830            let generation = self.on_lock_newly_acquired().await?;
1831
1832            Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1833                _guard: guard.into_guard(),
1834                generation,
1835            }))
1836        } else {
1837            Ok(None)
1838        }
1839    }
1840
1841    /// Testing purposes only.
1842    #[cfg(any(test, feature = "testing"))]
1843    pub async fn uploaded_key_count(&self) -> Result<u64> {
1844        let olm_machine = self.client.olm_machine().await;
1845        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1846        Ok(olm_machine.uploaded_key_count().await?)
1847    }
1848
1849    /// Bootstrap encryption and enables event listeners for the E2EE support.
1850    ///
1851    /// Based on the `EncryptionSettings`, this call might:
1852    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1853    /// - Create a key backup if needed (POST `/room_keys/version`)
1854    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1855    ///
1856    /// As part of this process, and if needed, the current device keys would be
1857    /// uploaded to the server, new account data would be added, and cross
1858    /// signing keys and signatures might be uploaded.
1859    ///
1860    /// Should be called once we
1861    /// created a [`OlmMachine`], i.e. after logging in.
1862    ///
1863    /// # Arguments
1864    ///
1865    /// * `auth_data` - Some requests may require re-authentication. To prevent
1866    ///   the user from having to re-enter their password (or use other
1867    ///   methods), we can provide the authentication data here. This is
1868    ///   necessary for uploading cross-signing keys. However, please note that
1869    ///   there is a proposal (MSC3967) to remove this requirement, which would
1870    ///   allow for the initial upload of cross-signing keys without
1871    ///   authentication, rendering this parameter obsolete.
1872    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1873        // It's fine to be async here as we're only getting the lock protecting the
1874        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1875        // in we won't delay the login or restoration of the Client.
1876        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1877            Some(BundleReceiverTask::new(&self.client).await)
1878        } else {
1879            None
1880        };
1881
1882        let mut tasks = self.client.inner.e2ee.tasks.lock();
1883
1884        let this = self.clone();
1885
1886        tasks.setup_e2ee = Some(spawn(async move {
1887            // Update the current state first, so we don't have to wait for the result of
1888            // network requests
1889            this.update_verification_state().await;
1890
1891            if this.settings().auto_enable_cross_signing
1892                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1893            {
1894                error!("Couldn't bootstrap cross signing {e:?}");
1895            }
1896
1897            if let Err(e) = this.backups().setup_and_resume().await {
1898                error!("Couldn't setup and resume backups {e:?}");
1899            }
1900            if let Err(e) = this.recovery().setup().await {
1901                error!("Couldn't setup and resume recovery {e:?}");
1902            }
1903        }));
1904
1905        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1906
1907        self.setup_room_membership_session_discard_handler();
1908    }
1909
1910    /// Waits for end-to-end encryption initialization tasks to finish, if any
1911    /// was running in the background.
1912    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1913        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1914
1915        if let Some(task) = task
1916            && let Err(err) = task.await
1917        {
1918            warn!("Error when initializing backups: {err}");
1919        }
1920    }
1921
1922    /// Upload the device keys and initial set of one-time keys to the server.
1923    ///
1924    /// This should only be called when the user logs in for the first time,
1925    /// the method will ensure that other devices see our own device as an
1926    /// end-to-end encryption enabled one.
1927    ///
1928    /// **Warning**: Do not use this method if we're already calling
1929    /// [`Client::send_outgoing_request()`]. This method is intended for
1930    /// explicitly uploading the device keys before starting a sync.
1931    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1932        let olm = self.client.olm_machine().await;
1933        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1934
1935        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1936            self.client.keys_upload(&request_id, &request).await?;
1937
1938            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1939            self.client.keys_query(&request_id, request.device_keys).await?;
1940        }
1941
1942        Ok(())
1943    }
1944
1945    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1946        self.recovery().update_state_after_keys_query(response).await;
1947
1948        // Only update the verification_state if our own devices changed
1949        if let Some(user_id) = self.client.user_id() {
1950            let contains_own_device = response.device_keys.contains_key(user_id);
1951
1952            if contains_own_device {
1953                self.update_verification_state().await;
1954            }
1955        }
1956    }
1957
1958    async fn update_verification_state(&self) {
1959        match self.get_own_device().await {
1960            Ok(device) => {
1961                if let Some(device) = device {
1962                    let is_verified = device.is_cross_signed_by_owner();
1963
1964                    if is_verified {
1965                        self.client.inner.verification_state.set(VerificationState::Verified);
1966                    } else {
1967                        self.client.inner.verification_state.set(VerificationState::Unverified);
1968                    }
1969                } else {
1970                    warn!("Couldn't find out own device in the store.");
1971                    self.client.inner.verification_state.set(VerificationState::Unknown);
1972                }
1973            }
1974            Err(error) => {
1975                warn!("Failed retrieving own device: {error}");
1976                self.client.inner.verification_state.set(VerificationState::Unknown);
1977            }
1978        }
1979    }
1980
1981    /// Sets up a handler to rotate room keys when a user leaves a room.
1982    ///
1983    /// Previously, it was sufficient to check if we need to rotate the room key
1984    /// prior to sending a message. However, the history sharing feature
1985    /// ([MSC4268]) breaks this logic:
1986    ///
1987    /// 1. Alice sends a message M1 in room X;
1988    /// 2. Bob invites Charlie, who joins and immediately leaves the room;
1989    /// 3. Alice sends another message M2 in room X.
1990    ///
1991    /// Under the old logic, Alice would not rotate her key after Charlie
1992    /// leaves, resulting in M2 being encrypted with the same session as M1.
1993    /// This would allow Charlie to decrypt M2 if he ever gains access to
1994    /// the event.
1995    ///
1996    /// This handler listens for changes to the room membership, and discards
1997    /// the current room key if the event is a `leave` event.
1998    ///
1999    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2000    fn setup_room_membership_session_discard_handler(&self) {
2001        let client = WeakClient::from_client(&self.client);
2002        self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2003            let Some(client) = client.get() else {
2004                // The main client has been dropped.
2005                return;
2006            };
2007            let Some(user_id) = client.user_id() else {
2008                // We aren't logged in, so this shouldn't ever happen.
2009                return;
2010            };
2011            let olm = client.olm_machine().await;
2012            let Some(olm) = olm.as_ref() else {
2013                warn!("Cannot discard session - Olm machine is not available");
2014                return;
2015            };
2016
2017            if !matches!(ev.membership_change(), MembershipChange::Left) || ev.sender == user_id {
2018                // We can ignore non-leave events and those that we sent.
2019                return;
2020            }
2021
2022            debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2023
2024            // Attempt to discard the current room key. This won't do anything if we don't have one,
2025            // but that's fine since we will create a new room key whenever we try to send a message.
2026            if let Err(e) = olm.discard_room_key(room.room_id()).await {
2027                warn!(
2028                    room_id = ?room.room_id(),
2029                    "Error discarding room key after member leave: {e:?}"
2030                );
2031            }
2032        });
2033    }
2034
2035    /// Encrypts then send the given content via the `/sendToDevice` end-point
2036    /// using Olm encryption.
2037    ///
2038    /// If there are a lot of recipient devices multiple `/sendToDevice`
2039    /// requests might be sent out.
2040    ///
2041    /// # Returns
2042    /// A list of failures. The list of devices that couldn't get the messages.
2043    #[cfg(feature = "experimental-send-custom-to-device")]
2044    pub async fn encrypt_and_send_raw_to_device(
2045        &self,
2046        recipient_devices: Vec<&Device>,
2047        event_type: &str,
2048        content: Raw<AnyToDeviceEventContent>,
2049        share_strategy: CollectStrategy,
2050    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2051        let users = recipient_devices.iter().map(|device| device.user_id());
2052
2053        // Will claim one-time-key for users that needs it
2054        // TODO: For later optimisation: This will establish missing olm sessions with
2055        // all this users devices, but we just want for some devices.
2056        self.client.claim_one_time_keys(users).await?;
2057
2058        let olm = self.client.olm_machine().await;
2059        let olm = olm.as_ref().expect("Olm machine wasn't started");
2060
2061        let (requests, withhelds) = olm
2062            .encrypt_content_for_devices(
2063                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2064                event_type,
2065                &content
2066                    .deserialize_as::<serde_json::Value>()
2067                    .expect("Deserialize as Value will always work"),
2068                share_strategy,
2069            )
2070            .await?;
2071
2072        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2073
2074        // Push the withhelds in the failures
2075        withhelds.iter().for_each(|(d, _)| {
2076            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2077        });
2078
2079        // TODO: parallelize that? it's already grouping 250 devices per chunk.
2080        for request in requests {
2081            let ruma_request = RumaToDeviceRequest::new_raw(
2082                request.event_type.clone(),
2083                request.txn_id.clone(),
2084                request.messages.clone(),
2085            );
2086
2087            let send_result = self
2088                .client
2089                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2090                .await;
2091
2092            // If the sending failed we need to collect the failures to report them
2093            if send_result.is_err() {
2094                // Mark the sending as failed
2095                for (user_id, device_map) in request.messages {
2096                    for device_id in device_map.keys() {
2097                        match device_id {
2098                            DeviceIdOrAllDevices::DeviceId(device_id) => {
2099                                failures.push((user_id.clone(), device_id.to_owned()));
2100                            }
2101                            DeviceIdOrAllDevices::AllDevices => {
2102                                // Cannot happen in this case
2103                            }
2104                        }
2105                    }
2106                }
2107            }
2108        }
2109
2110        Ok(failures)
2111    }
2112}
2113
2114#[cfg(all(test, not(target_family = "wasm")))]
2115mod tests {
2116    use std::{
2117        ops::Not,
2118        str::FromStr,
2119        sync::{
2120            Arc,
2121            atomic::{AtomicBool, Ordering},
2122        },
2123        time::Duration,
2124    };
2125
2126    use matrix_sdk_test::{
2127        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2128        event_factory::EventFactory,
2129    };
2130    use ruma::{
2131        event_id,
2132        events::{reaction::ReactionEventContent, relation::Annotation},
2133        user_id,
2134    };
2135    use serde_json::json;
2136    use wiremock::{
2137        Mock, MockServer, Request, ResponseTemplate,
2138        matchers::{header, method, path_regex},
2139    };
2140
2141    use crate::{
2142        Client, assert_next_matches_with_timeout,
2143        config::RequestConfig,
2144        encryption::{
2145            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2146        },
2147        test_utils::{
2148            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2149        },
2150    };
2151
    /// Sends a reaction, once as typed content and once as raw JSON, into a
    /// room that the client knows to be encrypted, and expects both sends to
    /// succeed.
    ///
    /// NOTE(review): the only mocked send endpoint matches the `m.reaction`
    /// event type, so this presumably checks that reactions are sent without
    /// being wrapped into an encrypted event — confirm.
    #[async_test]
    async fn test_reaction_sending() {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // Event ID returned by the mocked send endpoint.
        let event_id = event_id!("$2:example.org");

        // Serve the room encryption state event.
        Mock::given(method("GET"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
            .and(header("authorization", "Bearer 1234"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(EventFactory::new().room_encryption().into_content()),
            )
            .mount(&server)
            .await;

        // Accept `m.reaction` events sent to any room.
        Mock::given(method("PUT"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "event_id": event_id,
            })))
            .mount(&server)
            .await;

        // Feed the client a sync response containing a joined room with encryption
        // enabled.
        let f = EventFactory::new().sender(user_id!("@example:localhost"));
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_state_event(
                        f.member(user_id!("@example:localhost")).display_name("example"),
                    )
                    .add_state_event(f.default_power_levels())
                    .add_state_event(f.room_encryption()),
            )
            .build_sync_response();

        client.base_client().receive_sync_response(response).await.unwrap();

        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
        assert!(
            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
        );

        // Event ID of the event that is being reacted to.
        let event_id = event_id!("$1:example.org");
        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
        room.send(reaction).await.expect("Sending the reaction should not fail");

        // The raw variant of the same send must succeed as well.
        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
    }
2202
    /// Two clients share one sqlite store and take turns holding the
    /// cross-process store lock.
    ///
    /// Re-acquiring the lock without another holder in between must keep the
    /// same `OlmMachine`; once the other client has held the lock, the first
    /// client's `OlmMachine` must be regenerated, while the backup decryption
    /// key and the enabled state of backups must survive the regeneration.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_invalidates_olm_machine() {
        // Create two clients using the same sqlite database.

        use matrix_sdk_base::store::RoomLoadSettings;
        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
        let session = mock_matrix_session();

        let client1 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client1
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

        // When the lock isn't enabled, any attempt at locking won't return a guard.
        let guard = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(guard.is_none());

        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        // One client can take the lock.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
        let initial_olm_machine =
            client1.olm_machine().await.clone().expect("must have an olm machine");

        // Also enable backup to check that new machine has the same backup keys.
        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
            .expect("Can't create new recovery key");
        let backup_key = decryption_key.megolm_v1_public_key();
        backup_key.set_version("1".to_owned());
        initial_olm_machine
            .backup_machine()
            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
            .await
            .expect("Should save");

        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

        assert!(client1.encryption().backups().are_enabled().await);

        // The other client can't take the lock too.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_none());

        // Now have the first client release the lock,
        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // And re-take it.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // In that case, the Olm Machine shouldn't change.
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
        assert!(initial_olm_machine.same_as(&olm_machine));

        // Ok, release again.
        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client2 can acquire the lock.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_some());

        // And then release it.
        drop(acquired2);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Client1 can acquire it again,
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // But now its olm machine has been invalidated and thus regenerated!
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

        assert!(!initial_olm_machine.same_as(&olm_machine));

        // The regenerated machine must still know the backup decryption key that was
        // saved through the initial machine.
        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
        assert!(backup_key_new.decryption_key.is_some());
        assert_eq!(
            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
            backup_key.to_base64()
        );
        assert!(client1.encryption().backups().are_enabled().await);
    }
2309
/// Uses two clients backed by the same sqlite store to check, via `same_as`,
/// when taking the cross-process store lock replaces the olm machine and when
/// it leaves it untouched.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Both clients created in this test point at the same sqlite database.
    let db_path = std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    // Each snapshot below is taken in a single expression so that the value
    // returned by `olm_machine()` is released before the lock is touched.
    let machine_at_start = client.olm_machine().await.clone().unwrap();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Merely enabling the lock must leave the olm machine untouched.
    let machine_after_enabling = client.olm_machine().await.clone().unwrap();
    assert!(machine_at_start.same_as(&machine_after_enabling));

    {
        // Simulate that another client held the lock before.
        let other_client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&db_path, None)
            .build()
            .await
            .unwrap();
        other_client
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        other_client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let other_guard = other_client.encryption().spin_lock_store(None).await.unwrap();
        assert!(other_guard.is_some());

        // Release the lock, then wait a little before going on.
        drop(other_guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    // First acquisition by the original client; the guard is released right
    // after the check.
    let first_guard = client.encryption().try_lock_store_once().await.unwrap();
    assert!(first_guard.is_some());
    drop(first_guard);

    // Taking the lock the first time will update the olm machine.
    let machine_after_first_lock = client.olm_machine().await.clone().unwrap();
    assert!(!machine_at_start.same_as(&machine_after_first_lock));

    // Second acquisition, with nobody else having taken the lock in between.
    let second_guard = client.encryption().try_lock_store_once().await.unwrap();
    assert!(second_guard.is_some());
    drop(second_guard);

    // Re-taking the lock doesn't update the olm machine.
    let machine_after_second_lock = client.olm_machine().await.clone().unwrap();
    assert!(machine_after_first_lock.same_as(&machine_after_second_lock));
}
2387
2388    #[async_test]
2389    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2390        // Given a client and a server
2391        let client = no_retry_test_client(None).await;
2392        let server = MockServer::start().await;
2393
2394        // When we subscribe to its verification state
2395        let mut verification_state = client.encryption().verification_state();
2396
2397        // We can get its initial value, and it's Unknown
2398        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2399
2400        // We set up a mocked request to check this endpoint is not called before
2401        // reading the new state
2402        let keys_requested = Arc::new(AtomicBool::new(false));
2403        let inner_bool = keys_requested.clone();
2404
2405        Mock::given(method("GET"))
2406            .and(path_regex(
2407                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2408            ))
2409            .respond_with(move |_req: &Request| {
2410                inner_bool.fetch_or(true, Ordering::SeqCst);
2411                ResponseTemplate::new(200).set_body_json(json!({}))
2412            })
2413            .mount(&server)
2414            .await;
2415
2416        // When the session is initialised and the encryption tasks spawn
2417        set_client_session(&client).await;
2418
2419        // Then we can get an updated value without waiting for any network requests
2420        assert!(keys_requested.load(Ordering::SeqCst).not());
2421        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2422    }
2423
/// Checks that UIAA info advertising the `org.matrix.cross_signing_reset`
/// stage, with a matching `params` entry, can be converted by
/// `OAuthCrossSigningResetInfo::from_auth_info`.
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let uiaa_json = json!({
        "session": "dummy",
        "flows": [
            { "stages": ["org.matrix.cross_signing_reset"] }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    // The target type is inferred from the `from_auth_info` call below.
    let uiaa_info = serde_json::from_value(uiaa_json)
        .expect("We should be able to deserialize the UiaaInfo");

    OAuthCrossSigningResetInfo::from_auth_info(&uiaa_info)
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2448
/// Checks that `DuplicateOneTimeKeyErrorMessage` can be parsed out of a
/// "One time key ... already exists" error message, yielding both the old and
/// the new key, and that a message lacking the keys is rejected.
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    const EXPECTED_OLD_KEY: &str = "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs";
    const EXPECTED_NEW_KEY: &str = "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno";

    // Note the two different quoting styles: the old key is rendered with
    // double quotes, the new key with single quotes.
    let raw_message = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(raw_message)
        .expect("We should be able to parse the error message");

    let old_key = parsed.old_key.to_base64();
    let new_key = parsed.new_key.to_base64();
    assert_eq!(old_key, EXPECTED_OLD_KEY);
    assert_eq!(new_key, EXPECTED_NEW_KEY);

    // A message without the key payloads must not parse.
    let truncated_message = "One time key already exists.";
    DuplicateOneTimeKeyErrorMessage::from_str(truncated_message)
        .expect_err("We shouldn't be able to parse an incomplete error message");
}
2469
2470    // Helper function for the test_devices_to_verify_against_* tests.  Make a
2471    // response to a /keys/query request using the given device keys and a
2472    // pre-defined set of cross-signing keys.
2473    fn devices_to_verify_against_keys_query_response(
2474        devices: Vec<serde_json::Value>,
2475    ) -> serde_json::Value {
2476        let device_keys: serde_json::Map<String, serde_json::Value> = devices
2477            .into_iter()
2478            .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2479            .collect();
2480        json!({
2481            "device_keys": {
2482                "@example:localhost": device_keys,
2483            },
2484            "master_keys": {
2485                "@example:localhost": {
2486                    "keys": {
2487                        "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2488                    },
2489                    "usage": ["master"],
2490                    "user_id": "@example:localhost",
2491                },
2492            },
2493            "self_signing_keys": {
2494                "@example:localhost": {
2495                    "keys": {
2496                        "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2497                    },
2498                    "usage": ["self_signing"],
2499                    "user_id": "@example:localhost",
2500                    "signatures": {
2501                        "@example:localhost": {
2502                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2503                        },
2504                    },
2505                },
2506            },
2507            "user_signing_keys": {
2508                "@example:localhost": {
2509                    "keys": {
2510                        "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2511                    },
2512                    "usage": ["user_signing"],
2513                    "user_id": "@example:localhost",
2514                    "signatures": {
2515                        "@example:localhost": {
2516                            "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2517                        },
2518                    },
2519                },
2520            }
2521        })
2522    }
2523
2524    // The following three tests test that we can detect whether the user has
2525    // other devices that they can verify against under different conditions.
2526    #[async_test]
2527    /// Test that we detect that can't verify against another device if we have
2528    /// no devices.
2529    async fn test_devices_to_verify_against_no_devices() {
2530        let server = MockServer::start().await;
2531        let client = logged_in_client(Some(server.uri())).await;
2532
2533        Mock::given(method("POST"))
2534            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2535            .respond_with(
2536                ResponseTemplate::new(200)
2537                    .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2538            )
2539            .mount(&server)
2540            .await;
2541
2542        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2543    }
2544
2545    #[async_test]
2546    /// Test that we detect that we can verify against another cross-signed
2547    /// regular device.
2548    async fn test_devices_to_verify_against_cross_signed() {
2549        let server = MockServer::start().await;
2550        let client = logged_in_client(Some(server.uri())).await;
2551
2552        Mock::given(method("POST"))
2553            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2554            .respond_with(ResponseTemplate::new(200).set_body_json(
2555                devices_to_verify_against_keys_query_response(vec![
2556                    json!({
2557                        "algorithms": [
2558                            "m.olm.v1.curve25519-aes-sha2",
2559                            "m.megolm.v1.aes-sha2",
2560                        ],
2561                        "user_id": "@example:localhost",
2562                        "device_id": "SIGNEDDEVICE",
2563                        "keys": {
2564                            "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2565                            "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2566                        },
2567                        "signatures": {
2568                            "@example:localhost": {
2569                                "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2570                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2571                            },
2572                        },
2573                    })
2574                ])
2575            ))
2576            .mount(&server)
2577            .await;
2578
2579        assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2580    }
2581
2582    #[async_test]
2583    /// Test that we detect that we can't verify against a dehydrated or
2584    /// unsigned device.
2585    async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2586        let server = MockServer::start().await;
2587        let client = logged_in_client(Some(server.uri())).await;
2588        let user_id = client.user_id().unwrap();
2589        let olm_machine = client.olm_machine().await;
2590        let olm_machine = olm_machine.as_ref().unwrap();
2591
2592        Mock::given(method("POST"))
2593            .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2594            .respond_with(ResponseTemplate::new(200).set_body_json(
2595                devices_to_verify_against_keys_query_response(vec![
2596                    json!({
2597                        "algorithms": [
2598                            "m.olm.v1.curve25519-aes-sha2",
2599                            "m.megolm.v1.aes-sha2",
2600                        ],
2601                        "user_id": "@example:localhost",
2602                        "device_id": "DEHYDRATEDDEVICE",
2603                        "keys": {
2604                            "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2605                            "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2606                        },
2607                        "dehydrated": true,
2608                        "signatures": {
2609                            "@example:localhost": {
2610                                "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2611                                "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2612                            },
2613                        },
2614                    }),
2615                    json!({
2616                        "algorithms": [
2617                            "m.olm.v1.curve25519-aes-sha2",
2618                            "m.megolm.v1.aes-sha2",
2619                        ],
2620                        "user_id": "@example:localhost",
2621                        "device_id": "UNSIGNEDDEVICE",
2622                        "keys": {
2623                            "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2624                            "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2625                        },
2626                        "signatures": {
2627                            "@example:localhost": {
2628                                "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2629                            },
2630                        },
2631                    }),
2632                ])
2633            ))
2634            .mount(&server)
2635            .await;
2636
2637        let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2638        client.keys_query(&request_id, request.device_keys).await.unwrap();
2639
2640        assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2641    }
2642}