matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    str::FromStr,
27    sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33    future::try_join,
34    stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39    StateStoreDataKey, StateStoreDataValue,
40    crypto::{
41        CrossSigningBootstrapRequests, OlmMachine,
42        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
43        types::{
44            SignedKey,
45            requests::{
46                OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
47            },
48        },
49    },
50};
51use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
52use ruma::{
53    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
54    api::client::{
55        error::ErrorBody,
56        keys::{
57            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
58            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
59        },
60        message::send_message_event,
61        to_device::send_event_to_device::v3::{
62            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
63        },
64        uiaa::{AuthData, UiaaInfo},
65    },
66    assign,
67    events::{
68        direct::DirectUserIdentifier,
69        room::{MediaSource, ThumbnailInfo},
70    },
71};
72#[cfg(feature = "experimental-send-custom-to-device")]
73use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
74use serde::{Deserialize, de::Error as _};
75use tasks::BundleReceiverTask;
76use tokio::sync::{Mutex, RwLockReadGuard};
77use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
78use tracing::{debug, error, instrument, trace, warn};
79use url::Url;
80use vodozemac::Curve25519PublicKey;
81
82use self::{
83    backups::{Backups, types::BackupClientState},
84    futures::UploadEncryptedFile,
85    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
86    recovery::{Recovery, RecoveryState},
87    secret_storage::SecretStorage,
88    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
89    verification::{SasVerification, Verification, VerificationRequest},
90};
91use crate::{
92    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93    attachment::Thumbnail,
94    client::{ClientInner, WeakClient},
95    cross_process_lock::CrossProcessLockGuard,
96    error::HttpResult,
97};
98
99pub mod backups;
100pub mod futures;
101pub mod identities;
102pub mod recovery;
103pub mod secret_storage;
104pub(crate) mod tasks;
105pub mod verification;
106
107pub use matrix_sdk_base::crypto::{
108    CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
109    MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110    SessionCreationError, SignatureError, VERSION,
111    olm::{
112        SessionCreationError as MegolmSessionCreationError,
113        SessionExportError as OlmSessionExportError,
114    },
115    vodozemac,
116};
117
118#[cfg(feature = "experimental-send-custom-to-device")]
119use crate::config::RequestConfig;
120pub use crate::error::RoomKeyImportError;
121
/// All the data related to the encryption state.
///
/// Groups the encryption background tasks, the configured settings and the
/// backup/recovery state in a single place.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption (key backup, initialization
    /// tasks, etc.).
    pub tasks: StdMutex<ClientTasks>,

    /// End-to-end encryption settings.
    pub encryption_settings: EncryptionSettings,

    /// All state related to key backup.
    pub backup_state: BackupClientState,

    /// All state related to secret storage recovery.
    pub recovery_state: SharedObservable<RecoveryState>,
}
137
138impl EncryptionData {
139    pub fn new(encryption_settings: EncryptionSettings) -> Self {
140        Self {
141            encryption_settings,
142
143            tasks: StdMutex::new(Default::default()),
144            backup_state: Default::default(),
145            recovery_state: Default::default(),
146        }
147    }
148
149    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
150        let weak_client = WeakClient::from_inner(client);
151
152        let mut tasks = self.tasks.lock();
153        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
154
155        if self.encryption_settings.backup_download_strategy
156            == BackupDownloadStrategy::AfterDecryptionFailure
157        {
158            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
159        }
160    }
161
162    /// Initialize the background task which listens for changes in the
163    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
164    ///
165    /// This should happen after the usual tasks have been set up and after the
166    /// E2EE initialization tasks have been set up.
167    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
168        let mut guard = self.tasks.lock();
169
170        let future = Recovery::update_state_after_backup_state_change(client);
171        let join_handle = spawn(future);
172
173        guard.update_recovery_state_after_backup = Some(join_handle);
174    }
175}
176
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires to login with a username and password, or that MSC3967 is
    /// enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup, by default room
    /// keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
196
/// The strategy used to download room keys from the backup.
///
/// This is configured using the
/// [`EncryptionSettings::backup_download_strategy`] field.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
223
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
238
/// Wraps together a [`CrossProcessLockGuard`] and a generation number.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The lock guard, never read, only kept alive for as long as this struct
    /// lives.
    _guard: CrossProcessLockGuard,
    /// The generation number, exposed through
    /// [`CrossProcessLockStoreGuardWithGeneration::generation()`].
    generation: u64,
}
245
246impl CrossProcessLockStoreGuardWithGeneration {
247    /// Return the Crypto Store generation associated with this store lock.
248    pub fn generation(&self) -> u64 {
249        self.generation
250    }
251}
252
/// A stateful struct remembering the cross-signing keys we need to upload.
///
/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
/// additional authentication, this struct will contain information on the type
/// of authentication the user needs to complete before the upload might be
/// continued.
///
/// More info can be found in the [spec].
///
/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// The client used to send the requests.
    client: Client,
    /// The request uploading the new cross-signing keys, sent (with the auth
    /// data filled in) by [`CrossSigningResetHandle::auth()`].
    upload_request: UploadSigningKeysRequest,
    /// The signature upload request, sent once the key upload succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication that is required for the key upload.
    auth_type: CrossSigningResetAuthType,
    /// Set by [`CrossSigningResetHandle::cancel()`], checked by
    /// [`CrossSigningResetHandle::auth()`] after every failed upload attempt.
    is_cancelled: Mutex<bool>,
}
271
272impl CrossSigningResetHandle {
273    /// Set up a new `CrossSigningResetHandle`.
274    pub fn new(
275        client: Client,
276        upload_request: UploadSigningKeysRequest,
277        signatures_request: UploadSignaturesRequest,
278        auth_type: CrossSigningResetAuthType,
279    ) -> Self {
280        Self {
281            client,
282            upload_request,
283            signatures_request,
284            auth_type,
285            is_cancelled: Mutex::new(false),
286        }
287    }
288
289    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
290    /// is using.
291    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
292        &self.auth_type
293    }
294
295    /// Continue the cross-signing reset by either waiting for the
296    /// authentication to be done on the side of the OAuth 2.0 server or by
297    /// providing additional [`AuthData`] the homeserver requires.
298    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
299        let mut upload_request = self.upload_request.clone();
300        upload_request.auth = auth;
301
302        while let Err(e) = self.client.send(upload_request.clone()).await {
303            if *self.is_cancelled.lock().await {
304                return Ok(());
305            }
306
307            match e.as_uiaa_response() {
308                Some(uiaa_info) => {
309                    if uiaa_info.auth_error.is_some() {
310                        return Err(e.into());
311                    }
312                }
313                None => return Err(e.into()),
314            }
315        }
316
317        self.client.send(self.signatures_request.clone()).await?;
318
319        Ok(())
320    }
321
322    /// Cancel the ongoing identity reset process
323    pub async fn cancel(&self) {
324        *self.is_cancelled.lock().await = true;
325    }
326}
327
/// Information about the additional authentication that is required before the
/// cross-signing keys can be uploaded.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The homeserver requires user-interactive authentication.
    Uiaa(UiaaInfo),
    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
    /// approve the upload of cross-signing keys.
    OAuth(OAuthCrossSigningResetInfo),
}
338
339impl CrossSigningResetAuthType {
340    fn new(error: &HttpError) -> Result<Option<Self>> {
341        if let Some(auth_info) = error.as_uiaa_response() {
342            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
343                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
344            } else {
345                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
346            }
347        } else {
348            Ok(None)
349        }
350    }
351}
352
/// OAuth 2.0 specific information about the required authentication for the
/// upload of cross-signing keys.
///
/// This is the payload of the [`CrossSigningResetAuthType::OAuth`] variant.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL where the user can approve the reset of the cross-signing keys.
    pub approval_url: Url,
}
360
361impl OAuthCrossSigningResetInfo {
362    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
363        let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
364            auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
365        )?;
366
367        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
368    }
369}
370
/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
/// response
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The `org.matrix.cross_signing_reset` entry, it contains the URL where
    /// the user can approve the reset of the cross-signing keys.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
379
/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters`
/// dictionary.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL where the user can approve the reset of the cross-signing keys.
    url: Url,
}
387
/// A struct that helps to parse the custom error message Synapse posts if a
/// duplicate one-time key is uploaded.
///
/// Use the [`FromStr`] implementation to parse the error message.
#[derive(Debug)]
struct DuplicateOneTimeKeyErrorMessage {
    /// The previously uploaded one-time key.
    old_key: Curve25519PublicKey,
    /// The one-time key we're attempting to upload right now.
    new_key: Curve25519PublicKey,
}
397
398impl FromStr for DuplicateOneTimeKeyErrorMessage {
399    type Err = serde_json::Error;
400
401    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
402        // First we split the string into two parts, the part containing the old key and
403        // the part containing the new key. The parts are conveniently separated
404        // by a `;` character.
405        let mut split = s.split_terminator(';');
406
407        let old_key = split
408            .next()
409            .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
410        let new_key = split
411            .next()
412            .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
413
414        // Now we remove the lengthy prefix from the part containing the old key, we
415        // should be left with just the JSON of the signed key.
416        let old_key_index = old_key
417            .find("Old key:")
418            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
419
420        let old_key = old_key[old_key_index..]
421            .trim()
422            .strip_prefix("Old key:")
423            .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
424
425        // The part containing the new key is much simpler, we just remove a static
426        // prefix.
427        let new_key = new_key
428            .trim()
429            .strip_prefix("new key:")
430            .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
431
432        // The JSON containing the new key is for some reason quoted using single
433        // quotes, so let's replace them with normal double quotes.
434        let new_key = new_key.replace("'", "\"");
435
436        // Let's deserialize now.
437        let old_key: SignedKey = serde_json::from_str(old_key)?;
438        let new_key: SignedKey = serde_json::from_str(&new_key)?;
439
440        // Pick out the Curve keys, we don't care about the rest that much.
441        let old_key = old_key.key();
442        let new_key = new_key.key();
443
444        Ok(Self { old_key, new_key })
445    }
446}
447
448impl Client {
    /// Get a read guard for the `OlmMachine` of the underlying base client.
    ///
    /// The guarded value is an `Option`, callers need to handle the case where
    /// no `OlmMachine` is available.
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
452
453    pub(crate) async fn mark_request_as_sent(
454        &self,
455        request_id: &TransactionId,
456        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
457    ) -> Result<(), matrix_sdk_base::Error> {
458        Ok(self
459            .olm_machine()
460            .await
461            .as_ref()
462            .expect(
463                "We should have an olm machine once we try to mark E2EE related requests as sent",
464            )
465            .mark_request_as_sent(request_id, response)
466            .await?)
467    }
468
469    /// Query the server for users device keys.
470    ///
471    /// # Panics
472    ///
473    /// Panics if no key query needs to be done.
474    #[instrument(skip(self, device_keys))]
475    pub(crate) async fn keys_query(
476        &self,
477        request_id: &TransactionId,
478        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
479    ) -> Result<get_keys::v3::Response> {
480        let request = assign!(get_keys::v3::Request::new(), { device_keys });
481
482        let response = self.send(request).await?;
483        self.mark_request_as_sent(request_id, &response).await?;
484        self.encryption().update_state_after_keys_query(&response).await;
485
486        Ok(response)
487    }
488
    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
    /// encrypting and uploading a provided reader.
    ///
    /// The returned [`UploadEncryptedFile`] needs to be awaited, as shown in
    /// the example below.
    ///
    /// # Arguments
    ///
    /// * `reader` - The reader that should be encrypted and uploaded.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use url::Url;
    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
    /// use serde::{Deserialize, Serialize};
    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
    ///
    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
    /// struct CustomEventContent {
    ///     encrypted_file: EncryptedFile,
    /// }
    ///
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// # let client = Client::new(homeserver).await?;
    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
    ///
    /// room.send(CustomEventContent { encrypted_file }).await?;
    /// # anyhow::Ok(()) };
    /// ```
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
528
529    /// Encrypt and upload the file and thumbnails, and return the source
530    /// information.
531    pub(crate) async fn upload_encrypted_media_and_thumbnail(
532        &self,
533        data: &[u8],
534        thumbnail: Option<Thumbnail>,
535        send_progress: SharedObservable<TransmissionProgress>,
536    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
537        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
538
539        let upload_attachment = async {
540            let mut cursor = Cursor::new(data);
541            self.upload_encrypted_file(&mut cursor)
542                .with_send_progress_observable(send_progress)
543                .await
544        };
545
546        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
547
548        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
549    }
550
551    /// Uploads an encrypted thumbnail to the media repository, and returns
552    /// its source and extra information.
553    async fn upload_encrypted_thumbnail(
554        &self,
555        thumbnail: Option<Thumbnail>,
556        send_progress: SharedObservable<TransmissionProgress>,
557    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
558        let Some(thumbnail) = thumbnail else {
559            return Ok(None);
560        };
561
562        let (data, _, thumbnail_info) = thumbnail.into_parts();
563        let mut cursor = Cursor::new(data);
564
565        let file = self
566            .upload_encrypted_file(&mut cursor)
567            .with_send_progress_observable(send_progress)
568            .await?;
569
570        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
571    }
572
573    /// Claim one-time keys creating new Olm sessions.
574    ///
575    /// # Arguments
576    ///
577    /// * `users` - The list of user/device pairs that we should claim keys for.
578    pub(crate) async fn claim_one_time_keys(
579        &self,
580        users: impl Iterator<Item = &UserId>,
581    ) -> Result<()> {
582        let _lock = self.locks().key_claim_lock.lock().await;
583
584        if let Some((request_id, request)) = self
585            .olm_machine()
586            .await
587            .as_ref()
588            .ok_or(Error::NoOlmMachine)?
589            .get_missing_sessions(users)
590            .await?
591        {
592            let response = self.send(request).await?;
593            self.mark_request_as_sent(&request_id, &response).await?;
594        }
595
596        Ok(())
597    }
598
599    /// Upload the E2E encryption keys.
600    ///
601    /// This uploads the long lived device keys as well as the required amount
602    /// of one-time keys.
603    ///
604    /// # Panics
605    ///
606    /// Panics if the client isn't logged in, or if no encryption keys need to
607    /// be uploaded.
608    #[instrument(skip(self, request))]
609    pub(crate) async fn keys_upload(
610        &self,
611        request_id: &TransactionId,
612        request: &upload_keys::v3::Request,
613    ) -> Result<upload_keys::v3::Response> {
614        debug!(
615            device_keys = request.device_keys.is_some(),
616            one_time_key_count = request.one_time_keys.len(),
617            "Uploading public encryption keys",
618        );
619
620        let response = self.send(request.clone()).await?;
621        self.mark_request_as_sent(request_id, &response).await?;
622
623        Ok(response)
624    }
625
626    pub(crate) async fn room_send_helper(
627        &self,
628        request: &RoomMessageRequest,
629    ) -> Result<send_message_event::v3::Response> {
630        let content = request.content.clone();
631        let txn_id = request.txn_id.clone();
632        let room_id = &request.room_id;
633
634        self.get_room(room_id)
635            .expect("Can't send a message to a room that isn't known to the store")
636            .send(*content)
637            .with_transaction_id(txn_id)
638            .await
639    }
640
641    pub(crate) async fn send_to_device(
642        &self,
643        request: &ToDeviceRequest,
644    ) -> HttpResult<ToDeviceResponse> {
645        let request = RumaToDeviceRequest::new_raw(
646            request.event_type.clone(),
647            request.txn_id.clone(),
648            request.messages.clone(),
649        );
650
651        self.send(request).await
652    }
653
654    pub(crate) async fn send_verification_request(
655        &self,
656        request: OutgoingVerificationRequest,
657    ) -> Result<()> {
658        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
659
660        match request {
661            ToDevice(t) => {
662                self.send_to_device(&t).await?;
663            }
664            InRoom(r) => {
665                self.room_send_helper(&r).await?;
666            }
667        }
668
669        Ok(())
670    }
671
672    /// Get the existing DM room with the given user, if any.
673    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
674        let rooms = self.joined_rooms();
675
676        // Find the room we share with the `user_id` and only with `user_id`
677        let room = rooms.into_iter().find(|r| {
678            let targets = r.direct_targets();
679            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
680        });
681
682        trace!(?room, "Found room");
683        room
684    }
685
686    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
687        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
688
689        match r.request() {
690            AnyOutgoingRequest::KeysQuery(request) => {
691                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
692            }
693            AnyOutgoingRequest::KeysUpload(request) => {
694                let response = self.keys_upload(r.request_id(), request).await;
695
696                if let Err(e) = &response {
697                    match e.as_ruma_api_error() {
698                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
699                            if let ErrorBody::Standard { message, .. } = &e.body {
700                                // This is one of the nastiest errors we can have. The server
701                                // telling us that we already have a one-time key uploaded means
702                                // that we forgot about some of our one-time keys. This will lead to
703                                // UTDs.
704                                {
705                                    let already_reported = self
706                                        .state_store()
707                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
708                                        .await?
709                                        .is_some();
710
711                                    if message.starts_with("One time key") && !already_reported {
712                                        if let Ok(message) =
713                                            DuplicateOneTimeKeyErrorMessage::from_str(message)
714                                        {
715                                            error!(
716                                                sentry = true,
717                                                old_key = %message.old_key,
718                                                new_key = %message.new_key,
719                                                "Duplicate one-time keys have been uploaded"
720                                            );
721                                        } else {
722                                            error!(
723                                                sentry = true,
724                                                "Duplicate one-time keys have been uploaded"
725                                            );
726                                        }
727
728                                        self.state_store()
729                                            .set_kv_data(
730                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
731                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
732                                            )
733                                            .await?;
734                                    }
735                                }
736                            }
737                        }
738                        _ => {}
739                    }
740
741                    response?;
742                }
743            }
744            AnyOutgoingRequest::ToDeviceRequest(request) => {
745                let response = self.send_to_device(request).await?;
746                self.mark_request_as_sent(r.request_id(), &response).await?;
747            }
748            AnyOutgoingRequest::SignatureUpload(request) => {
749                let response = self.send(request.clone()).await?;
750                self.mark_request_as_sent(r.request_id(), &response).await?;
751            }
752            AnyOutgoingRequest::RoomMessage(request) => {
753                let response = self.room_send_helper(request).await?;
754                self.mark_request_as_sent(r.request_id(), &response).await?;
755            }
756            AnyOutgoingRequest::KeysClaim(request) => {
757                let response = self.send(request.clone()).await?;
758                self.mark_request_as_sent(r.request_id(), &response).await?;
759            }
760        }
761
762        Ok(())
763    }
764
765    #[instrument(skip_all)]
766    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
767        const MAX_CONCURRENT_REQUESTS: usize = 20;
768
769        // This is needed because sometimes we need to automatically
770        // claim some one-time keys to unwedge an existing Olm session.
771        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
772            warn!("Error while claiming one-time keys {:?}", e);
773        }
774
775        let outgoing_requests = stream::iter(
776            self.olm_machine()
777                .await
778                .as_ref()
779                .ok_or(Error::NoOlmMachine)?
780                .outgoing_requests()
781                .await?,
782        )
783        .map(|r| self.send_outgoing_request(r));
784
785        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
786
787        requests
788            .for_each(|r| async move {
789                match r {
790                    Ok(_) => (),
791                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
792                }
793            })
794            .await;
795
796        Ok(())
797    }
798}
799
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Get the olm machine, for testing purposes only.
    ///
    /// This is a public wrapper around the crate-private
    /// `Client::olm_machine()` method, it's only compiled for tests or if the
    /// `testing` feature is enabled.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
807
/// A high-level API to manage the client's encryption.
///
/// To get this, use [`Client::encryption()`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
816
817impl Encryption {
    /// Create a new `Encryption` API object wrapping the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
821
    /// Returns the current encryption settings for this client.
    ///
    /// The settings are returned as a copy.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
826
827    /// Get the public ed25519 key of our own device. This is usually what is
828    /// called the fingerprint of the device.
829    pub async fn ed25519_key(&self) -> Option<String> {
830        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
831    }
832
833    /// Get the public Curve25519 key of our own device.
834    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
835        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
836    }
837
838    /// Get the current device creation timestamp.
839    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
840        match self.get_own_device().await {
841            Ok(Some(device)) => device.first_time_seen_ts(),
842            // Should not happen, there should always be an own device
843            _ => MilliSecondsSinceUnixEpoch::now(),
844        }
845    }
846
847    pub(crate) async fn import_secrets_bundle(
848        &self,
849        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
850    ) -> Result<(), SecretImportError> {
851        let olm_machine = self.client.olm_machine().await;
852        let olm_machine =
853            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
854
855        olm_machine.store().import_secrets_bundle(bundle).await
856    }
857
858    /// Get the status of the private cross signing keys.
859    ///
860    /// This can be used to check which private cross signing keys we have
861    /// stored locally.
862    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
863        let olm = self.client.olm_machine().await;
864        let machine = olm.as_ref()?;
865        Some(machine.cross_signing_status().await)
866    }
867
868    /// Get all the tracked users we know about
869    ///
870    /// Tracked users are users for which we keep the device list of E2EE
871    /// capable devices up to date.
872    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
873        if let Some(machine) = self.client.olm_machine().await.as_ref() {
874            machine.tracked_users().await
875        } else {
876            Ok(HashSet::new())
877        }
878    }
879
880    /// Get a [`Subscriber`] for the [`VerificationState`].
881    ///
882    /// # Examples
883    ///
884    /// ```no_run
885    /// use matrix_sdk::{Client, encryption};
886    /// use url::Url;
887    ///
888    /// # async {
889    /// let homeserver = Url::parse("http://example.com")?;
890    /// let client = Client::new(homeserver).await?;
891    /// let mut subscriber = client.encryption().verification_state();
892    ///
893    /// let current_value = subscriber.get();
894    ///
895    /// println!("The current verification state is: {current_value:?}");
896    ///
897    /// if let Some(verification_state) = subscriber.next().await {
898    ///     println!("Received verification state update {:?}", verification_state)
899    /// }
900    /// # anyhow::Ok(()) };
901    /// ```
902    pub fn verification_state(&self) -> Subscriber<VerificationState> {
903        self.client.inner.verification_state.subscribe_reset()
904    }
905
906    /// Get a verification object with the given flow id.
907    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
908        let olm = self.client.olm_machine().await;
909        let olm = olm.as_ref()?;
910        #[allow(clippy::bind_instead_of_map)]
911        olm.get_verification(user_id, flow_id).and_then(|v| match v {
912            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
913                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
914            }
915            #[cfg(feature = "qrcode")]
916            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
917                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
918            }
919            _ => None,
920        })
921    }
922
923    /// Get a `VerificationRequest` object for the given user with the given
924    /// flow id.
925    pub async fn get_verification_request(
926        &self,
927        user_id: &UserId,
928        flow_id: impl AsRef<str>,
929    ) -> Option<VerificationRequest> {
930        let olm = self.client.olm_machine().await;
931        let olm = olm.as_ref()?;
932
933        olm.get_verification_request(user_id, flow_id)
934            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
935    }
936
937    /// Get a specific device of a user.
938    ///
939    /// # Arguments
940    ///
941    /// * `user_id` - The unique id of the user that the device belongs to.
942    ///
943    /// * `device_id` - The unique id of the device.
944    ///
945    /// Returns a `Device` if one is found and the crypto store didn't throw an
946    /// error.
947    ///
948    /// This will always return None if the client hasn't been logged in.
949    ///
950    /// # Examples
951    ///
952    /// ```no_run
953    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
954    /// # use url::Url;
955    /// # async {
956    /// # let alice = user_id!("@alice:example.org");
957    /// # let homeserver = Url::parse("http://example.com")?;
958    /// # let client = Client::new(homeserver).await?;
959    /// if let Some(device) =
960    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
961    /// {
962    ///     println!("{:?}", device.is_verified());
963    ///
964    ///     if !device.is_verified() {
965    ///         let verification = device.request_verification().await?;
966    ///     }
967    /// }
968    /// # anyhow::Ok(()) };
969    /// ```
970    pub async fn get_device(
971        &self,
972        user_id: &UserId,
973        device_id: &DeviceId,
974    ) -> Result<Option<Device>, CryptoStoreError> {
975        let olm = self.client.olm_machine().await;
976        let Some(machine) = olm.as_ref() else { return Ok(None) };
977        let device = machine.get_device(user_id, device_id, None).await?;
978        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
979    }
980
981    /// A convenience method to retrieve your own device from the store.
982    ///
983    /// This is the same as calling [`Encryption::get_device()`] with your own
984    /// user and device ID.
985    ///
986    /// This will always return a device, unless you are not logged in.
987    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
988        let olm = self.client.olm_machine().await;
989        let Some(machine) = olm.as_ref() else { return Ok(None) };
990        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
991        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
992    }
993
    /// Get a map holding all the devices of a user.
995    ///
996    /// This will always return an empty map if the client hasn't been logged
997    /// in.
998    ///
999    /// # Arguments
1000    ///
1001    /// * `user_id` - The unique id of the user that the devices belong to.
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```no_run
1006    /// # use matrix_sdk::{Client, ruma::user_id};
1007    /// # use url::Url;
1008    /// # async {
1009    /// # let alice = user_id!("@alice:example.org");
1010    /// # let homeserver = Url::parse("http://example.com")?;
1011    /// # let client = Client::new(homeserver).await?;
1012    /// let devices = client.encryption().get_user_devices(alice).await?;
1013    ///
1014    /// for device in devices.devices() {
1015    ///     println!("{device:?}");
1016    /// }
1017    /// # anyhow::Ok(()) };
1018    /// ```
1019    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1020        let devices = self
1021            .client
1022            .olm_machine()
1023            .await
1024            .as_ref()
1025            .ok_or(Error::NoOlmMachine)?
1026            .get_user_devices(user_id, None)
1027            .await?;
1028
1029        Ok(UserDevices { inner: devices, client: self.client.clone() })
1030    }
1031
1032    /// Get the E2EE identity of a user from the crypto store.
1033    ///
1034    /// Usually, we only have the E2EE identity of a user locally if the user
1035    /// is tracked, meaning that we are both members of the same encrypted room.
1036    ///
1037    /// To get the E2EE identity of a user even if it is not available locally
1038    /// use [`Encryption::request_user_identity()`].
1039    ///
1040    /// # Arguments
1041    ///
1042    /// * `user_id` - The unique id of the user that the identity belongs to.
1043    ///
1044    /// Returns a `UserIdentity` if one is found and the crypto store
1045    /// didn't throw an error.
1046    ///
1047    /// This will always return None if the client hasn't been logged in.
1048    ///
1049    /// # Examples
1050    ///
1051    /// ```no_run
1052    /// # use matrix_sdk::{Client, ruma::user_id};
1053    /// # use url::Url;
1054    /// # async {
1055    /// # let alice = user_id!("@alice:example.org");
1056    /// # let homeserver = Url::parse("http://example.com")?;
1057    /// # let client = Client::new(homeserver).await?;
1058    /// let user = client.encryption().get_user_identity(alice).await?;
1059    ///
1060    /// if let Some(user) = user {
1061    ///     println!("{:?}", user.is_verified());
1062    ///
1063    ///     let verification = user.request_verification().await?;
1064    /// }
1065    /// # anyhow::Ok(()) };
1066    /// ```
1067    pub async fn get_user_identity(
1068        &self,
1069        user_id: &UserId,
1070    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1071        let olm = self.client.olm_machine().await;
1072        let Some(olm) = olm.as_ref() else { return Ok(None) };
1073        let identity = olm.get_identity(user_id, None).await?;
1074
1075        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1076    }
1077
1078    /// Get the E2EE identity of a user from the homeserver.
1079    ///
1080    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1081    /// E2EE identity is not found, it should mean that the user did not set
1082    /// up cross-signing.
1083    ///
1084    /// If you want the E2EE identity of a user without making a request to the
1085    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1086    ///
1087    /// # Arguments
1088    ///
1089    /// * `user_id` - The ID of the user that the identity belongs to.
1090    ///
1091    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1092    /// was an issue with the crypto store or with the request to the
1093    /// homeserver.
1094    ///
1095    /// This will always return `None` if the client hasn't been logged in.
1096    ///
1097    /// # Examples
1098    ///
1099    /// ```no_run
1100    /// # use matrix_sdk::{Client, ruma::user_id};
1101    /// # use url::Url;
1102    /// # async {
1103    /// # let alice = user_id!("@alice:example.org");
1104    /// # let homeserver = Url::parse("http://example.com")?;
1105    /// # let client = Client::new(homeserver).await?;
1106    /// let user = client.encryption().request_user_identity(alice).await?;
1107    ///
1108    /// if let Some(user) = user {
1109    ///     println!("User is verified: {:?}", user.is_verified());
1110    ///
1111    ///     let verification = user.request_verification().await?;
1112    /// }
1113    /// # anyhow::Ok(()) };
1114    /// ```
1115    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1116        let olm = self.client.olm_machine().await;
1117        let Some(olm) = olm.as_ref() else { return Ok(None) };
1118
1119        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1120        self.client.keys_query(&request_id, request.device_keys).await?;
1121
1122        let identity = olm.get_identity(user_id, None).await?;
1123        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1124    }
1125
1126    /// Returns a stream of device updates, allowing users to listen for
1127    /// notifications about new or changed devices.
1128    ///
1129    /// The stream produced by this method emits updates whenever a new device
1130    /// is discovered or when an existing device's information is changed. Users
1131    /// can subscribe to this stream and receive updates in real-time.
1132    ///
1133    /// # Examples
1134    ///
1135    /// ```no_run
1136    /// # use matrix_sdk::Client;
1137    /// # use ruma::{device_id, user_id};
1138    /// # use futures_util::{pin_mut, StreamExt};
1139    /// # let client: Client = unimplemented!();
1140    /// # async {
1141    /// let devices_stream = client.encryption().devices_stream().await?;
1142    /// let user_id = client
1143    ///     .user_id()
1144    ///     .expect("We should know our user id after we have logged in");
1145    /// pin_mut!(devices_stream);
1146    ///
    /// while let Some(device_updates) = devices_stream.next().await {
1148    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1149    ///         for device in user_devices.values() {
1150    ///             println!("A new device has been added {}", device.device_id());
1151    ///         }
1152    ///     }
1153    /// }
1154    /// # anyhow::Ok(()) };
1155    /// ```
1156    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1157        let olm = self.client.olm_machine().await;
1158        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1159        let client = self.client.to_owned();
1160
1161        Ok(olm
1162            .store()
1163            .devices_stream()
1164            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1165    }
1166
1167    /// Returns a stream of user identity updates, allowing users to listen for
1168    /// notifications about new or changed user identities.
1169    ///
1170    /// The stream produced by this method emits updates whenever a new user
    /// identity is discovered or when an existing identity's information is
1172    /// changed. Users can subscribe to this stream and receive updates in
1173    /// real-time.
1174    ///
1175    /// # Examples
1176    ///
1177    /// ```no_run
1178    /// # use matrix_sdk::Client;
1179    /// # use ruma::{device_id, user_id};
1180    /// # use futures_util::{pin_mut, StreamExt};
1181    /// # let client: Client = unimplemented!();
1182    /// # async {
1183    /// let identities_stream =
1184    ///     client.encryption().user_identities_stream().await?;
1185    /// pin_mut!(identities_stream);
1186    ///
    /// while let Some(identity_updates) = identities_stream.next().await {
1188    ///     for (_, identity) in identity_updates.new {
1189    ///         println!("A new identity has been added {}", identity.user_id());
1190    ///     }
1191    /// }
1192    /// # anyhow::Ok(()) };
1193    /// ```
1194    pub async fn user_identities_stream(
1195        &self,
1196    ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1197        let olm = self.client.olm_machine().await;
1198        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1199        let client = self.client.to_owned();
1200
1201        Ok(olm
1202            .store()
1203            .user_identities_stream()
1204            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1205    }
1206
1207    /// Create and upload a new cross signing identity.
1208    ///
1209    /// # Arguments
1210    ///
1211    /// * `auth_data` - This request requires user interactive auth, the first
1212    ///   request needs to set this to `None` and will always fail with an
1213    ///   `UiaaResponse`. The response will contain information for the
1214    ///   interactive auth and the same request needs to be made but this time
1215    ///   with some `auth_data` provided.
1216    ///
1217    /// # Examples
1218    ///
1219    /// ```no_run
1220    /// # use std::collections::BTreeMap;
1221    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1222    /// # use url::Url;
1223    /// # use serde_json::json;
1224    /// # async {
1225    /// # let homeserver = Url::parse("http://example.com")?;
1226    /// # let client = Client::new(homeserver).await?;
1227    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1228    ///     if let Some(response) = e.as_uiaa_response() {
1229    ///         let mut password = uiaa::Password::new(
1230    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1231    ///             "wordpass".to_owned(),
1232    ///         );
1233    ///         password.session = response.session.clone();
1234    ///
1235    ///         client
1236    ///             .encryption()
1237    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1238    ///             .await
1239    ///             .expect("Couldn't bootstrap cross signing")
1240    ///     } else {
1241    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1242    ///     }
1243    /// }
    /// # anyhow::Ok(()) };
    /// ```
1245    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1246        let olm = self.client.olm_machine().await;
1247        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1248
1249        let CrossSigningBootstrapRequests {
1250            upload_signing_keys_req,
1251            upload_keys_req,
1252            upload_signatures_req,
1253        } = olm.bootstrap_cross_signing(false).await?;
1254
1255        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1256            auth: auth_data,
1257            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1258            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1259            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1260        });
1261
1262        if let Some(req) = upload_keys_req {
1263            self.client.send_outgoing_request(req).await?;
1264        }
1265        self.client.send(upload_signing_keys_req).await?;
1266        self.client.send(upload_signatures_req).await?;
1267
1268        Ok(())
1269    }
1270
1271    /// Reset the cross-signing keys.
1272    ///
1273    /// # Example
1274    ///
1275    /// ```no_run
1276    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1277    /// # use url::Url;
1278    /// # async {
1279    /// # let homeserver = Url::parse("http://example.com")?;
1280    /// # let client = Client::new(homeserver).await?;
1281    /// # let user_id = unimplemented!();
1282    /// let encryption = client.encryption();
1283    ///
1284    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1285    ///     match handle.auth_type() {
1286    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1287    ///             use matrix_sdk::ruma::api::client::uiaa;
1288    ///
1289    ///             let password = "1234".to_owned();
1290    ///             let mut password = uiaa::Password::new(user_id, password);
1291    ///             password.session = uiaa.session;
1292    ///
1293    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1294    ///         }
1295    ///         CrossSigningResetAuthType::OAuth(o) => {
1296    ///             println!(
1297    ///                 "To reset your end-to-end encryption cross-signing identity, \
1298    ///                 you first need to approve it at {}",
1299    ///                 o.approval_url
1300    ///             );
1301    ///             handle.auth(None).await?;
1302    ///         }
1303    ///     }
1304    /// }
1305    /// # anyhow::Ok(()) };
1306    /// ```
1307    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1308        let olm = self.client.olm_machine().await;
1309        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1310
1311        let CrossSigningBootstrapRequests {
1312            upload_keys_req,
1313            upload_signing_keys_req,
1314            upload_signatures_req,
1315        } = olm.bootstrap_cross_signing(true).await?;
1316
1317        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1318            auth: None,
1319            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1320            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1321            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1322        });
1323
1324        if let Some(req) = upload_keys_req {
1325            self.client.send_outgoing_request(req).await?;
1326        }
1327
1328        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1329            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1330                let client = self.client.clone();
1331
1332                Ok(Some(CrossSigningResetHandle::new(
1333                    client,
1334                    upload_signing_keys_req,
1335                    upload_signatures_req,
1336                    auth_type,
1337                )))
1338            } else {
1339                Err(error.into())
1340            }
1341        } else {
1342            self.client.send(upload_signatures_req).await?;
1343
1344            Ok(None)
1345        }
1346    }
1347
1348    /// Query the user's own device keys, if, and only if, we didn't have their
1349    /// identity in the first place.
1350    async fn ensure_initial_key_query(&self) -> Result<()> {
1351        let olm_machine = self.client.olm_machine().await;
1352        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1353
1354        let user_id = olm_machine.user_id();
1355
1356        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1357            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1358            self.client.keys_query(&request_id, request.device_keys).await?;
1359        }
1360
1361        Ok(())
1362    }
1363
1364    /// Create and upload a new cross signing identity, if that has not been
1365    /// done yet.
1366    ///
1367    /// This will only create a new cross-signing identity if the user had never
1368    /// done it before. If the user did it before, then this is a no-op.
1369    ///
1370    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1371    /// behavior of this function.
1372    ///
1373    /// # Arguments
1374    ///
1375    /// * `auth_data` - This request requires user interactive auth, the first
1376    ///   request needs to set this to `None` and will always fail with an
1377    ///   `UiaaResponse`. The response will contain information for the
1378    ///   interactive auth and the same request needs to be made but this time
1379    ///   with some `auth_data` provided.
1380    ///
1381    /// # Examples
1382    /// ```no_run
1383    /// # use std::collections::BTreeMap;
1384    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1385    /// # use url::Url;
1386    /// # use serde_json::json;
1387    /// # async {
1388    /// # let homeserver = Url::parse("http://example.com")?;
1389    /// # let client = Client::new(homeserver).await?;
1390    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1391    ///     if let Some(response) = e.as_uiaa_response() {
1392    ///         let mut password = uiaa::Password::new(
1393    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1394    ///             "wordpass".to_owned(),
1395    ///         );
1396    ///         password.session = response.session.clone();
1397    ///
1398    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1399    ///         // avoid checks.
1400    ///         client
1401    ///             .encryption()
1402    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1403    ///             .await
1404    ///             .expect("Couldn't bootstrap cross signing")
1405    ///     } else {
1406    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1407    ///     }
1408    /// }
    /// # anyhow::Ok(()) };
    /// ```
1410    pub async fn bootstrap_cross_signing_if_needed(
1411        &self,
1412        auth_data: Option<AuthData>,
1413    ) -> Result<()> {
1414        let olm_machine = self.client.olm_machine().await;
1415        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1416        let user_id = olm_machine.user_id();
1417
1418        self.ensure_initial_key_query().await?;
1419
1420        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1421            self.bootstrap_cross_signing(auth_data).await?;
1422        }
1423
1424        Ok(())
1425    }
1426
1427    /// Export E2EE keys that match the given predicate encrypting them with the
1428    /// given passphrase.
1429    ///
1430    /// # Arguments
1431    ///
1432    /// * `path` - The file path where the exported key file will be saved.
1433    ///
1434    /// * `passphrase` - The passphrase that will be used to encrypt the
1435    ///   exported room keys.
1436    ///
1437    /// * `predicate` - A closure that will be called for every known
1438    ///   `InboundGroupSession`, which represents a room key. If the closure
    ///   returns `true` the `InboundGroupSession` will be included in the
1440    ///   export, if the closure returns `false` it will not be included.
1441    ///
1442    /// # Panics
1443    ///
1444    /// This method will panic if it isn't run on a Tokio runtime.
1445    ///
1446    /// This method will panic if it can't get enough randomness from the OS to
1447    /// encrypt the exported keys securely.
1448    ///
1449    /// # Examples
1450    ///
1451    /// ```no_run
1452    /// # use std::{path::PathBuf, time::Duration};
1453    /// # use matrix_sdk::{
1454    /// #     Client, config::SyncSettings,
1455    /// #     ruma::room_id,
1456    /// # };
1457    /// # use url::Url;
1458    /// # async {
1459    /// # let homeserver = Url::parse("http://localhost:8080")?;
1460    /// # let mut client = Client::new(homeserver).await?;
1461    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1462    /// // Export all room keys.
1463    /// client
1464    ///     .encryption()
1465    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1466    ///     .await?;
1467    ///
1468    /// // Export only the room keys for a certain room.
1469    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1470    /// let room_id = room_id!("!test:localhost");
1471    ///
1472    /// client
1473    ///     .encryption()
1474    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1475    ///     .await?;
1476    /// # anyhow::Ok(()) };
1477    /// ```
1478    #[cfg(not(target_family = "wasm"))]
1479    pub async fn export_room_keys(
1480        &self,
1481        path: PathBuf,
1482        passphrase: &str,
1483        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1484    ) -> Result<()> {
1485        let olm = self.client.olm_machine().await;
1486        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1487
1488        let keys = olm.store().export_room_keys(predicate).await?;
1489        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1490
1491        let encrypt = move || -> Result<()> {
1492            let export: String =
1493                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1494            let mut file = std::fs::File::create(path)?;
1495            file.write_all(&export.into_bytes())?;
1496            Ok(())
1497        };
1498
1499        let task = tokio::task::spawn_blocking(encrypt);
1500        task.await.expect("Task join error")
1501    }
1502
1503    /// Import E2EE keys from the given file path.
1504    ///
1505    /// # Arguments
1506    ///
    /// * `path` - The file path where the exported key file can be found.
1508    ///
1509    /// * `passphrase` - The passphrase that should be used to decrypt the
1510    ///   exported room keys.
1511    ///
1512    /// Returns a tuple of numbers that represent the number of sessions that
1513    /// were imported and the total number of sessions that were found in the
1514    /// key export.
1515    ///
1516    /// # Panics
1517    ///
1518    /// This method will panic if it isn't run on a Tokio runtime.
1519    ///
1520    /// ```no_run
1521    /// # use std::{path::PathBuf, time::Duration};
1522    /// # use matrix_sdk::{
1523    /// #     Client, config::SyncSettings,
1524    /// #     ruma::room_id,
1525    /// # };
1526    /// # use url::Url;
1527    /// # async {
1528    /// # let homeserver = Url::parse("http://localhost:8080")?;
1529    /// # let mut client = Client::new(homeserver).await?;
1530    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1531    /// let result =
1532    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1533    ///
1534    /// println!(
1535    ///     "Imported {} room keys out of {}",
1536    ///     result.imported_count, result.total_count
1537    /// );
1538    /// # anyhow::Ok(()) };
1539    /// ```
1540    #[cfg(not(target_family = "wasm"))]
1541    pub async fn import_room_keys(
1542        &self,
1543        path: PathBuf,
1544        passphrase: &str,
1545    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1546        let olm = self.client.olm_machine().await;
1547        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1548        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1549
1550        let decrypt = move || {
1551            let file = std::fs::File::open(path)?;
1552            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1553        };
1554
1555        let task = tokio::task::spawn_blocking(decrypt);
1556        let import = task.await.expect("Task join error")?;
1557
1558        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1559
1560        self.backups().maybe_trigger_backup();
1561
1562        Ok(ret)
1563    }
1564
1565    /// Receive notifications of room keys being received as a [`Stream`].
1566    ///
1567    /// Each time a room key is updated in any way, an update will be sent to
1568    /// the stream. Updates that happen at the same time are batched into a
1569    /// [`Vec`].
1570    ///
1571    /// If the reader of the stream lags too far behind, an error is broadcast
1572    /// containing the number of skipped items.
1573    ///
1574    /// # Examples
1575    ///
1576    /// ```no_run
1577    /// # use matrix_sdk::Client;
1578    /// # use url::Url;
1579    /// # async {
1580    /// # let homeserver = Url::parse("http://example.com")?;
1581    /// # let client = Client::new(homeserver).await?;
1582    /// use futures_util::StreamExt;
1583    ///
1584    /// let Some(mut room_keys_stream) =
1585    ///     client.encryption().room_keys_received_stream().await
1586    /// else {
1587    ///     return Ok(());
1588    /// };
1589    ///
1590    /// while let Some(update) = room_keys_stream.next().await {
1591    ///     println!("Received room keys {update:?}");
1592    /// }
1593    /// # anyhow::Ok(()) };
1594    /// ```
1595    pub async fn room_keys_received_stream(
1596        &self,
1597    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1598    {
1599        let olm = self.client.olm_machine().await;
1600        let olm = olm.as_ref()?;
1601
1602        Some(olm.store().room_keys_received_stream())
1603    }
1604
1605    /// Receive notifications of historic room key bundles as a [`Stream`].
1606    ///
1607    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1608    ///
1609    /// Each time a historic room key bundle was received, an update will be
1610    /// sent to the stream. This stream is useful for informative purposes
1611    /// exclusively, historic room key bundles are handled by the SDK
1612    /// automatically.
1613    ///
1614    /// # Examples
1615    ///
1616    /// ```no_run
1617    /// # use matrix_sdk::Client;
1618    /// # use url::Url;
1619    /// # async {
1620    /// # let homeserver = Url::parse("http://example.com")?;
1621    /// # let client = Client::new(homeserver).await?;
1622    /// use futures_util::StreamExt;
1623    ///
1624    /// let Some(mut bundle_stream) =
1625    ///     client.encryption().historic_room_key_stream().await
1626    /// else {
1627    ///     return Ok(());
1628    /// };
1629    ///
1630    /// while let Some(bundle_info) = bundle_stream.next().await {
1631    ///     println!("Received a historic room key bundle {bundle_info:?}");
1632    /// }
1633    /// # anyhow::Ok(()) };
1634    /// ```
1635    pub async fn historic_room_key_stream(
1636        &self,
1637    ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1638        let olm = self.client.olm_machine().await;
1639        let olm = olm.as_ref()?;
1640
1641        Some(olm.store().historic_room_key_stream())
1642    }
1643
1644    /// Get the secret storage manager of the client.
1645    pub fn secret_storage(&self) -> SecretStorage {
1646        SecretStorage { client: self.client.to_owned() }
1647    }
1648
1649    /// Get the backups manager of the client.
1650    pub fn backups(&self) -> Backups {
1651        Backups { client: self.client.to_owned() }
1652    }
1653
1654    /// Get the recovery manager of the client.
1655    pub fn recovery(&self) -> Recovery {
1656        Recovery { client: self.client.to_owned() }
1657    }
1658
1659    /// Enables the crypto-store cross-process lock.
1660    ///
1661    /// This may be required if there are multiple processes that may do writes
1662    /// to the same crypto store. In that case, it's necessary to create a
1663    /// lock, so that only one process writes to it, otherwise this may
1664    /// cause confusing issues because of stale data contained in in-memory
1665    /// caches.
1666    ///
1667    /// The provided `lock_value` must be a unique identifier for this process.
1668    /// Check [`Client::cross_process_store_locks_holder_name`] to
1669    /// get the global value.
1670    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1671        // If the lock has already been created, don't recreate it from scratch.
1672        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1673            let prev_holder = prev_lock.lock_holder();
1674            if prev_holder == lock_value {
1675                return Ok(());
1676            }
1677            warn!(
1678                "Recreating cross-process store lock with a different holder value: \
1679                 prev was {prev_holder}, new is {lock_value}"
1680            );
1681        }
1682
1683        let olm_machine = self.client.base_client().olm_machine().await;
1684        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1685
1686        let lock =
1687            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1688
1689        // Gently try to initialize the crypto store generation counter.
1690        //
1691        // If we don't get the lock immediately, then it is already acquired by another
1692        // process, and we'll get to reload next time we acquire the lock.
1693        {
1694            let guard = lock.try_lock_once().await?;
1695            if guard.is_some() {
1696                olm_machine
1697                    .initialize_crypto_store_generation(
1698                        &self.client.locks().crypto_store_generation,
1699                    )
1700                    .await?;
1701            }
1702        }
1703
1704        self.client
1705            .locks()
1706            .cross_process_crypto_store_lock
1707            .set(lock)
1708            .map_err(|_| Error::BadCryptoStoreState)?;
1709
1710        Ok(())
1711    }
1712
1713    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1714    /// time.
1715    ///
1716    /// Returns the current generation number.
1717    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1718        let olm_machine_guard = self.client.olm_machine().await;
1719        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1720            let (new_gen, generation_number) = olm_machine
1721                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1722                .await?;
1723            // If the crypto store generation has changed,
1724            if new_gen {
1725                // (get rid of the reference to the current crypto store first)
1726                drop(olm_machine_guard);
1727                // Recreate the OlmMachine.
1728                self.client.base_client().regenerate_olm(None).await?;
1729            }
1730            Ok(generation_number)
1731        } else {
1732            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1733            // been initialised by the time we get here. Ideally we'd panic, or return an
1734            // error, but for now I'm just adding some logging to check if it
1735            // happens, and returning the magic number 0.
1736            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1737            Ok(0)
1738        }
1739    }
1740
1741    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1742    /// spin-waits until the lock is available.
1743    ///
1744    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1745    /// first time.
1746    pub async fn spin_lock_store(
1747        &self,
1748        max_backoff: Option<u32>,
1749    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1750        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1751            let guard = lock.spin_lock(max_backoff).await?;
1752
1753            let generation = self.on_lock_newly_acquired().await?;
1754
1755            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1756        } else {
1757            Ok(None)
1758        }
1759    }
1760
1761    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1762    /// attempts to lock it once.
1763    ///
1764    /// Returns a guard to the lock, if it was obtained.
1765    pub async fn try_lock_store_once(
1766        &self,
1767    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1768        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1769            let maybe_guard = lock.try_lock_once().await?;
1770
1771            let Some(guard) = maybe_guard else {
1772                return Ok(None);
1773            };
1774
1775            let generation = self.on_lock_newly_acquired().await?;
1776
1777            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1778        } else {
1779            Ok(None)
1780        }
1781    }
1782
1783    /// Testing purposes only.
1784    #[cfg(any(test, feature = "testing"))]
1785    pub async fn uploaded_key_count(&self) -> Result<u64> {
1786        let olm_machine = self.client.olm_machine().await;
1787        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1788        Ok(olm_machine.uploaded_key_count().await?)
1789    }
1790
1791    /// Bootstrap encryption and enables event listeners for the E2EE support.
1792    ///
1793    /// Based on the `EncryptionSettings`, this call might:
1794    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1795    /// - Create a key backup if needed (POST `/room_keys/version`)
1796    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1797    ///
1798    /// As part of this process, and if needed, the current device keys would be
1799    /// uploaded to the server, new account data would be added, and cross
1800    /// signing keys and signatures might be uploaded.
1801    ///
1802    /// Should be called once we
1803    /// created a [`OlmMachine`], i.e. after logging in.
1804    ///
1805    /// # Arguments
1806    ///
1807    /// * `auth_data` - Some requests may require re-authentication. To prevent
1808    ///   the user from having to re-enter their password (or use other
1809    ///   methods), we can provide the authentication data here. This is
1810    ///   necessary for uploading cross-signing keys. However, please note that
1811    ///   there is a proposal (MSC3967) to remove this requirement, which would
1812    ///   allow for the initial upload of cross-signing keys without
1813    ///   authentication, rendering this parameter obsolete.
1814    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1815        // It's fine to be async here as we're only getting the lock protecting the
1816        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1817        // in we won't delay the login or restoration of the Client.
1818        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1819            Some(BundleReceiverTask::new(&self.client).await)
1820        } else {
1821            None
1822        };
1823
1824        let mut tasks = self.client.inner.e2ee.tasks.lock();
1825
1826        let this = self.clone();
1827
1828        tasks.setup_e2ee = Some(spawn(async move {
1829            // Update the current state first, so we don't have to wait for the result of
1830            // network requests
1831            this.update_verification_state().await;
1832
1833            if this.settings().auto_enable_cross_signing
1834                && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1835            {
1836                error!("Couldn't bootstrap cross signing {e:?}");
1837            }
1838
1839            if let Err(e) = this.backups().setup_and_resume().await {
1840                error!("Couldn't setup and resume backups {e:?}");
1841            }
1842            if let Err(e) = this.recovery().setup().await {
1843                error!("Couldn't setup and resume recovery {e:?}");
1844            }
1845        }));
1846
1847        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1848    }
1849
1850    /// Waits for end-to-end encryption initialization tasks to finish, if any
1851    /// was running in the background.
1852    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1853        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1854
1855        if let Some(task) = task
1856            && let Err(err) = task.await
1857        {
1858            warn!("Error when initializing backups: {err}");
1859        }
1860    }
1861
1862    /// Upload the device keys and initial set of one-time keys to the server.
1863    ///
1864    /// This should only be called when the user logs in for the first time,
1865    /// the method will ensure that other devices see our own device as an
1866    /// end-to-end encryption enabled one.
1867    ///
1868    /// **Warning**: Do not use this method if we're already calling
1869    /// [`Client::send_outgoing_request()`]. This method is intended for
1870    /// explicitly uploading the device keys before starting a sync.
1871    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1872        let olm = self.client.olm_machine().await;
1873        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1874
1875        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1876            self.client.keys_upload(&request_id, &request).await?;
1877
1878            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1879            self.client.keys_query(&request_id, request.device_keys).await?;
1880        }
1881
1882        Ok(())
1883    }
1884
1885    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1886        self.recovery().update_state_after_keys_query(response).await;
1887
1888        // Only update the verification_state if our own devices changed
1889        if let Some(user_id) = self.client.user_id() {
1890            let contains_own_device = response.device_keys.contains_key(user_id);
1891
1892            if contains_own_device {
1893                self.update_verification_state().await;
1894            }
1895        }
1896    }
1897
1898    async fn update_verification_state(&self) {
1899        match self.get_own_device().await {
1900            Ok(device) => {
1901                if let Some(device) = device {
1902                    let is_verified = device.is_cross_signed_by_owner();
1903
1904                    if is_verified {
1905                        self.client.inner.verification_state.set(VerificationState::Verified);
1906                    } else {
1907                        self.client.inner.verification_state.set(VerificationState::Unverified);
1908                    }
1909                } else {
1910                    warn!("Couldn't find out own device in the store.");
1911                    self.client.inner.verification_state.set(VerificationState::Unknown);
1912                }
1913            }
1914            Err(error) => {
1915                warn!("Failed retrieving own device: {error}");
1916                self.client.inner.verification_state.set(VerificationState::Unknown);
1917            }
1918        }
1919    }
1920
1921    /// Encrypts then send the given content via the `/sendToDevice` end-point
1922    /// using Olm encryption.
1923    ///
1924    /// If there are a lot of recipient devices multiple `/sendToDevice`
1925    /// requests might be sent out.
1926    ///
1927    /// # Returns
1928    /// A list of failures. The list of devices that couldn't get the messages.
1929    #[cfg(feature = "experimental-send-custom-to-device")]
1930    pub async fn encrypt_and_send_raw_to_device(
1931        &self,
1932        recipient_devices: Vec<&Device>,
1933        event_type: &str,
1934        content: Raw<AnyToDeviceEventContent>,
1935        share_strategy: CollectStrategy,
1936    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1937        let users = recipient_devices.iter().map(|device| device.user_id());
1938
1939        // Will claim one-time-key for users that needs it
1940        // TODO: For later optimisation: This will establish missing olm sessions with
1941        // all this users devices, but we just want for some devices.
1942        self.client.claim_one_time_keys(users).await?;
1943
1944        let olm = self.client.olm_machine().await;
1945        let olm = olm.as_ref().expect("Olm machine wasn't started");
1946
1947        let (requests, withhelds) = olm
1948            .encrypt_content_for_devices(
1949                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1950                event_type,
1951                &content
1952                    .deserialize_as::<serde_json::Value>()
1953                    .expect("Deserialize as Value will always work"),
1954                share_strategy,
1955            )
1956            .await?;
1957
1958        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1959
1960        // Push the withhelds in the failures
1961        withhelds.iter().for_each(|(d, _)| {
1962            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1963        });
1964
1965        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1966        for request in requests {
1967            let ruma_request = RumaToDeviceRequest::new_raw(
1968                request.event_type.clone(),
1969                request.txn_id.clone(),
1970                request.messages.clone(),
1971            );
1972
1973            let send_result = self
1974                .client
1975                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1976                .await;
1977
1978            // If the sending failed we need to collect the failures to report them
1979            if send_result.is_err() {
1980                // Mark the sending as failed
1981                for (user_id, device_map) in request.messages {
1982                    for device_id in device_map.keys() {
1983                        match device_id {
1984                            DeviceIdOrAllDevices::DeviceId(device_id) => {
1985                                failures.push((user_id.clone(), device_id.to_owned()));
1986                            }
1987                            DeviceIdOrAllDevices::AllDevices => {
1988                                // Cannot happen in this case
1989                            }
1990                        }
1991                    }
1992                }
1993            }
1994        }
1995
1996        Ok(failures)
1997    }
1998}
1999
2000#[cfg(all(test, not(target_family = "wasm")))]
2001mod tests {
2002    use std::{
2003        ops::Not,
2004        str::FromStr,
2005        sync::{
2006            Arc,
2007            atomic::{AtomicBool, Ordering},
2008        },
2009        time::Duration,
2010    };
2011
2012    use matrix_sdk_test::{
2013        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2014        event_factory::EventFactory, test_json,
2015    };
2016    use ruma::{
2017        event_id,
2018        events::{reaction::ReactionEventContent, relation::Annotation},
2019        user_id,
2020    };
2021    use serde_json::json;
2022    use wiremock::{
2023        Mock, MockServer, Request, ResponseTemplate,
2024        matchers::{header, method, path_regex},
2025    };
2026
2027    use crate::{
2028        Client, assert_next_matches_with_timeout,
2029        config::RequestConfig,
2030        encryption::{
2031            DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2032        },
2033        test_utils::{
2034            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2035        },
2036    };
2037
2038    #[async_test]
2039    async fn test_reaction_sending() {
2040        let server = MockServer::start().await;
2041        let client = logged_in_client(Some(server.uri())).await;
2042
2043        let event_id = event_id!("$2:example.org");
2044
2045        Mock::given(method("GET"))
2046            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2047            .and(header("authorization", "Bearer 1234"))
2048            .respond_with(
2049                ResponseTemplate::new(200)
2050                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2051            )
2052            .mount(&server)
2053            .await;
2054
2055        Mock::given(method("PUT"))
2056            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2057            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2058                "event_id": event_id,
2059            })))
2060            .mount(&server)
2061            .await;
2062
2063        let response = SyncResponseBuilder::default()
2064            .add_joined_room(
2065                JoinedRoomBuilder::default()
2066                    .add_state_event(StateTestEvent::Member)
2067                    .add_state_event(StateTestEvent::PowerLevels)
2068                    .add_state_event(StateTestEvent::Encryption),
2069            )
2070            .build_sync_response();
2071
2072        client.base_client().receive_sync_response(response).await.unwrap();
2073
2074        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2075        assert!(
2076            room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2077        );
2078
2079        let event_id = event_id!("$1:example.org");
2080        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2081        room.send(reaction).await.expect("Sending the reaction should not fail");
2082
2083        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2084    }
2085
2086    #[async_test]
2087    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2088        let server = MockServer::start().await;
2089        let client = logged_in_client(Some(server.uri())).await;
2090        // This is the user ID that is inside MemberAdditional.
2091        // Note the confusing username, so we can share
2092        // GlobalAccountDataTestEvent::Direct with the invited test.
2093        let user_id = user_id!("@invited:localhost");
2094
2095        // When we receive a sync response saying "invited" is invited to a DM
2096        let f = EventFactory::new();
2097        let response = SyncResponseBuilder::default()
2098            .add_joined_room(
2099                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2100            )
2101            .add_global_account_data(
2102                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2103            )
2104            .build_sync_response();
2105        client.base_client().receive_sync_response(response).await.unwrap();
2106
2107        // Then get_dm_room finds this room
2108        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2109        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2110    }
2111
2112    #[async_test]
2113    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2114        let server = MockServer::start().await;
2115        let client = logged_in_client(Some(server.uri())).await;
2116        // This is the user ID that is inside MemberInvite
2117        let user_id = user_id!("@invited:localhost");
2118
2119        // When we receive a sync response saying "invited" is invited to a DM
2120        let f = EventFactory::new();
2121        let response = SyncResponseBuilder::default()
2122            .add_joined_room(
2123                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2124            )
2125            .add_global_account_data(
2126                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2127            )
2128            .build_sync_response();
2129        client.base_client().receive_sync_response(response).await.unwrap();
2130
2131        // Then get_dm_room finds this room
2132        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2133        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2134    }
2135
2136    #[async_test]
2137    async fn test_get_dm_room_still_finds_left_room() {
2138        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
2139        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
2140
2141        let server = MockServer::start().await;
2142        let client = logged_in_client(Some(server.uri())).await;
2143        // This is the user ID that is inside MemberAdditional.
2144        // Note the confusing username, so we can share
2145        // GlobalAccountDataTestEvent::Direct with the invited test.
2146        let user_id = user_id!("@invited:localhost");
2147
2148        // When we receive a sync response saying "invited" is invited to a DM
2149        let f = EventFactory::new();
2150        let response = SyncResponseBuilder::default()
2151            .add_joined_room(
2152                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2153            )
2154            .add_global_account_data(
2155                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2156            )
2157            .build_sync_response();
2158        client.base_client().receive_sync_response(response).await.unwrap();
2159
2160        // Then get_dm_room finds this room
2161        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2162        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2163    }
2164
2165    #[cfg(feature = "sqlite")]
2166    #[async_test]
2167    async fn test_generation_counter_invalidates_olm_machine() {
2168        // Create two clients using the same sqlite database.
2169
2170        use matrix_sdk_base::store::RoomLoadSettings;
2171        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2172        let session = mock_matrix_session();
2173
2174        let client1 = Client::builder()
2175            .homeserver_url("http://localhost:1234")
2176            .request_config(RequestConfig::new().disable_retry())
2177            .sqlite_store(&sqlite_path, None)
2178            .build()
2179            .await
2180            .unwrap();
2181        client1
2182            .matrix_auth()
2183            .restore_session(session.clone(), RoomLoadSettings::default())
2184            .await
2185            .unwrap();
2186
2187        let client2 = Client::builder()
2188            .homeserver_url("http://localhost:1234")
2189            .request_config(RequestConfig::new().disable_retry())
2190            .sqlite_store(sqlite_path, None)
2191            .build()
2192            .await
2193            .unwrap();
2194        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2195
2196        // When the lock isn't enabled, any attempt at locking won't return a guard.
2197        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2198        assert!(guard.is_none());
2199
2200        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2201        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2202
2203        // One client can take the lock.
2204        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2205        assert!(acquired1.is_some());
2206
2207        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2208        let initial_olm_machine =
2209            client1.olm_machine().await.clone().expect("must have an olm machine");
2210
2211        // Also enable backup to check that new machine has the same backup keys.
2212        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2213            .expect("Can't create new recovery key");
2214        let backup_key = decryption_key.megolm_v1_public_key();
2215        backup_key.set_version("1".to_owned());
2216        initial_olm_machine
2217            .backup_machine()
2218            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2219            .await
2220            .expect("Should save");
2221
2222        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2223
2224        assert!(client1.encryption().backups().are_enabled().await);
2225
2226        // The other client can't take the lock too.
2227        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2228        assert!(acquired2.is_none());
2229
2230        // Now have the first client release the lock,
2231        drop(acquired1);
2232        tokio::time::sleep(Duration::from_millis(100)).await;
2233
2234        // And re-take it.
2235        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2236        assert!(acquired1.is_some());
2237
2238        // In that case, the Olm Machine shouldn't change.
2239        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2240        assert!(initial_olm_machine.same_as(&olm_machine));
2241
2242        // Ok, release again.
2243        drop(acquired1);
2244        tokio::time::sleep(Duration::from_millis(100)).await;
2245
2246        // Client2 can acquire the lock.
2247        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2248        assert!(acquired2.is_some());
2249
2250        // And then release it.
2251        drop(acquired2);
2252        tokio::time::sleep(Duration::from_millis(100)).await;
2253
2254        // Client1 can acquire it again,
2255        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2256        assert!(acquired1.is_some());
2257
2258        // But now its olm machine has been invalidated and thus regenerated!
2259        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2260
2261        assert!(!initial_olm_machine.same_as(&olm_machine));
2262
2263        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2264        assert!(backup_key_new.decryption_key.is_some());
2265        assert_eq!(
2266            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2267            backup_key.to_base64()
2268        );
2269        assert!(client1.encryption().backups().are_enabled().await);
2270    }
2271
2272    #[cfg(feature = "sqlite")]
2273    #[async_test]
2274    async fn test_generation_counter_no_spurious_invalidation() {
2275        // Create two clients using the same sqlite database.
2276
2277        use matrix_sdk_base::store::RoomLoadSettings;
2278        let sqlite_path =
2279            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2280        let session = mock_matrix_session();
2281
2282        let client = Client::builder()
2283            .homeserver_url("http://localhost:1234")
2284            .request_config(RequestConfig::new().disable_retry())
2285            .sqlite_store(&sqlite_path, None)
2286            .build()
2287            .await
2288            .unwrap();
2289        client
2290            .matrix_auth()
2291            .restore_session(session.clone(), RoomLoadSettings::default())
2292            .await
2293            .unwrap();
2294
2295        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2296
2297        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2298
2299        // Enabling the lock doesn't update the olm machine.
2300        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2301        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2302
2303        {
2304            // Simulate that another client hold the lock before.
2305            let client2 = Client::builder()
2306                .homeserver_url("http://localhost:1234")
2307                .request_config(RequestConfig::new().disable_retry())
2308                .sqlite_store(sqlite_path, None)
2309                .build()
2310                .await
2311                .unwrap();
2312            client2
2313                .matrix_auth()
2314                .restore_session(session, RoomLoadSettings::default())
2315                .await
2316                .unwrap();
2317
2318            client2
2319                .encryption()
2320                .enable_cross_process_store_lock("client2".to_owned())
2321                .await
2322                .unwrap();
2323
2324            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2325            assert!(guard.is_some());
2326
2327            drop(guard);
2328            tokio::time::sleep(Duration::from_millis(100)).await;
2329        }
2330
2331        {
2332            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2333            assert!(acquired.is_some());
2334        }
2335
2336        // Taking the lock the first time will update the olm machine.
2337        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2338        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2339
2340        {
2341            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2342            assert!(acquired.is_some());
2343        }
2344
2345        // Re-taking the lock doesn't update the olm machine.
2346        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2347        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2348    }
2349
2350    #[async_test]
2351    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2352        // Given a client and a server
2353        let client = no_retry_test_client(None).await;
2354        let server = MockServer::start().await;
2355
2356        // When we subscribe to its verification state
2357        let mut verification_state = client.encryption().verification_state();
2358
2359        // We can get its initial value, and it's Unknown
2360        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2361
2362        // We set up a mocked request to check this endpoint is not called before
2363        // reading the new state
2364        let keys_requested = Arc::new(AtomicBool::new(false));
2365        let inner_bool = keys_requested.clone();
2366
2367        Mock::given(method("GET"))
2368            .and(path_regex(
2369                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2370            ))
2371            .respond_with(move |_req: &Request| {
2372                inner_bool.fetch_or(true, Ordering::SeqCst);
2373                ResponseTemplate::new(200).set_body_json(json!({}))
2374            })
2375            .mount(&server)
2376            .await;
2377
2378        // When the session is initialised and the encryption tasks spawn
2379        set_client_session(&client).await;
2380
2381        // Then we can get an updated value without waiting for any network requests
2382        assert!(keys_requested.load(Ordering::SeqCst).not());
2383        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2384    }
2385
2386    #[test]
2387    fn test_oauth_reset_info_from_uiaa_info() {
2388        let auth_info = json!({
2389            "session": "dummy",
2390            "flows": [
2391                {
2392                    "stages": [
2393                        "org.matrix.cross_signing_reset"
2394                    ]
2395                }
2396            ],
2397            "params": {
2398                "org.matrix.cross_signing_reset": {
2399                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2400                }
2401            },
2402            "msg": "To reset..."
2403        });
2404
2405        let auth_info = serde_json::from_value(auth_info)
2406            .expect("We should be able to deserialize the UiaaInfo");
2407        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2408            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2409    }
2410
2411    #[test]
2412    fn test_duplicate_one_time_key_error_parsing() {
2413        let message = concat!(
2414            r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2415            r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2416            r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2417            r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2418            r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2419            r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2420            r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2421        );
2422        let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2423            .expect("We should be able to parse the error message");
2424
2425        assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2426        assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2427
2428        DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2429            .expect_err("We shouldn't be able to parse an incomplete error message");
2430    }
2431}