matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    sync::Arc,
27};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_core::Stream;
31use futures_util::{
32    future::try_join,
33    stream::{self, StreamExt},
34};
35#[cfg(feature = "experimental-send-custom-to-device")]
36use matrix_sdk_base::crypto::CollectStrategy;
37use matrix_sdk_base::{
38    crypto::{
39        store::types::{RoomKeyBundleInfo, RoomKeyInfo},
40        types::requests::{
41            OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
42        },
43        CrossSigningBootstrapRequests, OlmMachine,
44    },
45    StateStoreDataKey, StateStoreDataValue,
46};
47use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
48use ruma::{
49    api::client::{
50        error::ErrorBody,
51        keys::{
52            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
53            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
54        },
55        message::send_message_event,
56        to_device::send_event_to_device::v3::{
57            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
58        },
59        uiaa::{AuthData, UiaaInfo},
60    },
61    assign,
62    events::{
63        direct::DirectUserIdentifier,
64        room::{MediaSource, ThumbnailInfo},
65    },
66    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
67};
68#[cfg(feature = "experimental-send-custom-to-device")]
69use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
70use serde::Deserialize;
71use tasks::BundleReceiverTask;
72use tokio::sync::{Mutex, RwLockReadGuard};
73use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
74use tracing::{debug, error, instrument, trace, warn};
75use url::Url;
76use vodozemac::Curve25519PublicKey;
77
78use self::{
79    backups::{types::BackupClientState, Backups},
80    futures::UploadEncryptedFile,
81    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
82    recovery::{Recovery, RecoveryState},
83    secret_storage::SecretStorage,
84    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
85    verification::{SasVerification, Verification, VerificationRequest},
86};
87use crate::{
88    attachment::Thumbnail,
89    client::{ClientInner, WeakClient},
90    error::HttpResult,
91    store_locks::CrossProcessStoreLockGuard,
92    Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93};
94
95pub mod backups;
96pub mod futures;
97pub mod identities;
98pub mod recovery;
99pub mod secret_storage;
100pub(crate) mod tasks;
101pub mod verification;
102
103pub use matrix_sdk_base::crypto::{
104    olm::{
105        SessionCreationError as MegolmSessionCreationError,
106        SessionExportError as OlmSessionExportError,
107    },
108    vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
109    LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110    SessionCreationError, SignatureError, VERSION,
111};
112
113#[cfg(feature = "experimental-send-custom-to-device")]
114use crate::config::RequestConfig;
115pub use crate::error::RoomKeyImportError;
116
117/// All the data related to the encryption state.
118pub(crate) struct EncryptionData {
119    /// Background tasks related to encryption (key backup, initialization
120    /// tasks, etc.).
121    pub tasks: StdMutex<ClientTasks>,
122
123    /// End-to-end encryption settings.
124    pub encryption_settings: EncryptionSettings,
125
126    /// All state related to key backup.
127    pub backup_state: BackupClientState,
128
129    /// All state related to secret storage recovery.
130    pub recovery_state: SharedObservable<RecoveryState>,
131}
132
133impl EncryptionData {
134    pub fn new(encryption_settings: EncryptionSettings) -> Self {
135        Self {
136            encryption_settings,
137
138            tasks: StdMutex::new(Default::default()),
139            backup_state: Default::default(),
140            recovery_state: Default::default(),
141        }
142    }
143
144    pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
145        let weak_client = WeakClient::from_inner(client);
146
147        let mut tasks = self.tasks.lock();
148        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
149
150        if self.encryption_settings.backup_download_strategy
151            == BackupDownloadStrategy::AfterDecryptionFailure
152        {
153            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
154        }
155    }
156
157    /// Initialize the background task which listens for changes in the
158    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
159    ///
160    /// This should happen after the usual tasks have been set up and after the
161    /// E2EE initialization tasks have been set up.
162    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
163        let mut guard = self.tasks.lock();
164
165        let future = Recovery::update_state_after_backup_state_change(client);
166        let join_handle = spawn(future);
167
168        guard.update_recovery_state_after_backup = Some(join_handle);
169    }
170}
171
172/// Settings for end-to-end encryption features.
173#[derive(Clone, Copy, Debug, Default)]
174pub struct EncryptionSettings {
175    /// Automatically bootstrap cross-signing for a user once they're logged, in
176    /// case it's not already done yet.
177    ///
178    /// This requires to login with a username and password, or that MSC3967 is
179    /// enabled on the server, as of 2023-10-20.
180    pub auto_enable_cross_signing: bool,
181
182    /// Select a strategy to download room keys from the backup, by default room
183    /// keys won't be downloaded from the backup automatically.
184    ///
185    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
186    pub backup_download_strategy: BackupDownloadStrategy,
187
188    /// Automatically create a backup version if no backup exists.
189    pub auto_enable_backups: bool,
190}
191
192/// Settings for end-to-end encryption features.
193#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
194#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
195pub enum BackupDownloadStrategy {
196    /// Automatically download all room keys from the backup when the backup
197    /// recovery key has been received. The backup recovery key can be received
198    /// in two ways:
199    ///
200    /// 1. Received as a `m.secret.send` to-device event, after a successful
201    ///    interactive verification.
202    /// 2. Imported from secret storage (4S) using the
203    ///    [`SecretStore::import_secrets()`] method.
204    ///
205    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
206    OneShot,
207
208    /// Attempt to download a single room key if an event fails to be decrypted.
209    AfterDecryptionFailure,
210
211    /// Don't download any room keys automatically. The user can manually
212    /// download room keys using the [`Backups::download_room_key()`] methods.
213    ///
214    /// This is the default option.
215    #[default]
216    Manual,
217}
218
219/// The verification state of our own device
220///
221/// This enum tells us if our own user identity trusts these devices, in other
222/// words it tells us if the user identity has signed the device.
223#[derive(Clone, Copy, Debug, Eq, PartialEq)]
224pub enum VerificationState {
225    /// The verification state is unknown for now.
226    Unknown,
227    /// The device is considered to be verified, it has been signed by its user
228    /// identity.
229    Verified,
230    /// The device is unverified.
231    Unverified,
232}
233
234/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
235#[derive(Debug)]
236pub struct CrossProcessLockStoreGuardWithGeneration {
237    _guard: CrossProcessStoreLockGuard,
238    generation: u64,
239}
240
241impl CrossProcessLockStoreGuardWithGeneration {
242    /// Return the Crypto Store generation associated with this store lock.
243    pub fn generation(&self) -> u64 {
244        self.generation
245    }
246}
247
248/// A stateful struct remembering the cross-signing keys we need to upload.
249///
250/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
251/// additional authentication, this struct will contain information on the type
252/// of authentication the user needs to complete before the upload might be
253/// continued.
254///
255/// More info can be found in the [spec].
256///
257/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
258#[derive(Debug)]
259pub struct CrossSigningResetHandle {
260    client: Client,
261    upload_request: UploadSigningKeysRequest,
262    signatures_request: UploadSignaturesRequest,
263    auth_type: CrossSigningResetAuthType,
264    is_cancelled: Mutex<bool>,
265}
266
267impl CrossSigningResetHandle {
268    /// Set up a new `CrossSigningResetHandle`.
269    pub fn new(
270        client: Client,
271        upload_request: UploadSigningKeysRequest,
272        signatures_request: UploadSignaturesRequest,
273        auth_type: CrossSigningResetAuthType,
274    ) -> Self {
275        Self {
276            client,
277            upload_request,
278            signatures_request,
279            auth_type,
280            is_cancelled: Mutex::new(false),
281        }
282    }
283
284    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
285    /// is using.
286    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
287        &self.auth_type
288    }
289
290    /// Continue the cross-signing reset by either waiting for the
291    /// authentication to be done on the side of the OAuth 2.0 server or by
292    /// providing additional [`AuthData`] the homeserver requires.
293    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
294        let mut upload_request = self.upload_request.clone();
295        upload_request.auth = auth;
296
297        while let Err(e) = self.client.send(upload_request.clone()).await {
298            if *self.is_cancelled.lock().await {
299                return Ok(());
300            }
301
302            match e.as_uiaa_response() {
303                Some(uiaa_info) => {
304                    if uiaa_info.auth_error.is_some() {
305                        return Err(e.into());
306                    }
307                }
308                None => return Err(e.into()),
309            }
310        }
311
312        self.client.send(self.signatures_request.clone()).await?;
313
314        Ok(())
315    }
316
317    /// Cancel the ongoing identity reset process
318    pub async fn cancel(&self) {
319        *self.is_cancelled.lock().await = true;
320    }
321}
322
323/// information about the additional authentication that is required before the
324/// cross-signing keys can be uploaded.
325#[derive(Debug, Clone)]
326pub enum CrossSigningResetAuthType {
327    /// The homeserver requires user-interactive authentication.
328    Uiaa(UiaaInfo),
329    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
330    /// approve the upload of cross-signing keys.
331    OAuth(OAuthCrossSigningResetInfo),
332}
333
334impl CrossSigningResetAuthType {
335    fn new(error: &HttpError) -> Result<Option<Self>> {
336        if let Some(auth_info) = error.as_uiaa_response() {
337            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
338                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
339            } else {
340                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
341            }
342        } else {
343            Ok(None)
344        }
345    }
346}
347
348/// OAuth 2.0 specific information about the required authentication for the
349/// upload of cross-signing keys.
350#[derive(Debug, Clone, Deserialize)]
351pub struct OAuthCrossSigningResetInfo {
352    /// The URL where the user can approve the reset of the cross-signing keys.
353    pub approval_url: Url,
354}
355
356impl OAuthCrossSigningResetInfo {
357    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
358        let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
359            auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
360        )?;
361
362        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
363    }
364}
365
366/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
367/// response
368#[derive(Debug, Deserialize)]
369struct OAuthCrossSigningResetUiaaParameters {
370    /// The URL where the user can approve the reset of the cross-signing keys.
371    #[serde(rename = "org.matrix.cross_signing_reset")]
372    reset: OAuthCrossSigningResetUiaaResetParameter,
373}
374
375/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters``
376/// dictionary.
377#[derive(Debug, Deserialize)]
378struct OAuthCrossSigningResetUiaaResetParameter {
379    /// The URL where the user can approve the reset of the cross-signing keys.
380    url: Url,
381}
382
383impl Client {
384    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
385        self.base_client().olm_machine().await
386    }
387
388    pub(crate) async fn mark_request_as_sent(
389        &self,
390        request_id: &TransactionId,
391        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
392    ) -> Result<(), matrix_sdk_base::Error> {
393        Ok(self
394            .olm_machine()
395            .await
396            .as_ref()
397            .expect(
398                "We should have an olm machine once we try to mark E2EE related requests as sent",
399            )
400            .mark_request_as_sent(request_id, response)
401            .await?)
402    }
403
404    /// Query the server for users device keys.
405    ///
406    /// # Panics
407    ///
408    /// Panics if no key query needs to be done.
409    #[instrument(skip(self, device_keys))]
410    pub(crate) async fn keys_query(
411        &self,
412        request_id: &TransactionId,
413        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
414    ) -> Result<get_keys::v3::Response> {
415        let request = assign!(get_keys::v3::Request::new(), { device_keys });
416
417        let response = self.send(request).await?;
418        self.mark_request_as_sent(request_id, &response).await?;
419        self.encryption().update_state_after_keys_query(&response).await;
420
421        Ok(response)
422    }
423
424    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
425    /// encrypting and uploading a provided reader.
426    ///
427    /// # Arguments
428    ///
429    /// * `content_type` - The content type of the file.
430    /// * `reader` - The reader that should be encrypted and uploaded.
431    ///
432    /// # Examples
433    ///
434    /// ```no_run
435    /// # use matrix_sdk::Client;
436    /// # use url::Url;
437    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
438    /// use serde::{Deserialize, Serialize};
439    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
440    ///
441    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
442    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
443    /// struct CustomEventContent {
444    ///     encrypted_file: EncryptedFile,
445    /// }
446    ///
447    /// # async {
448    /// # let homeserver = Url::parse("http://example.com")?;
449    /// # let client = Client::new(homeserver).await?;
450    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
451    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
452    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
453    ///
454    /// room.send(CustomEventContent { encrypted_file }).await?;
455    /// # anyhow::Ok(()) };
456    /// ```
457    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
458        &'a self,
459        reader: &'a mut R,
460    ) -> UploadEncryptedFile<'a, R> {
461        UploadEncryptedFile::new(self, reader)
462    }
463
464    /// Encrypt and upload the file and thumbnails, and return the source
465    /// information.
466    pub(crate) async fn upload_encrypted_media_and_thumbnail(
467        &self,
468        data: &[u8],
469        thumbnail: Option<Thumbnail>,
470        send_progress: SharedObservable<TransmissionProgress>,
471    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
472        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
473
474        let upload_attachment = async {
475            let mut cursor = Cursor::new(data);
476            self.upload_encrypted_file(&mut cursor)
477                .with_send_progress_observable(send_progress)
478                .await
479        };
480
481        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
482
483        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
484    }
485
486    /// Uploads an encrypted thumbnail to the media repository, and returns
487    /// its source and extra information.
488    async fn upload_encrypted_thumbnail(
489        &self,
490        thumbnail: Option<Thumbnail>,
491        send_progress: SharedObservable<TransmissionProgress>,
492    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
493        let Some(thumbnail) = thumbnail else {
494            return Ok(None);
495        };
496
497        let (data, _, thumbnail_info) = thumbnail.into_parts();
498        let mut cursor = Cursor::new(data);
499
500        let file = self
501            .upload_encrypted_file(&mut cursor)
502            .with_send_progress_observable(send_progress)
503            .await?;
504
505        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
506    }
507
508    /// Claim one-time keys creating new Olm sessions.
509    ///
510    /// # Arguments
511    ///
512    /// * `users` - The list of user/device pairs that we should claim keys for.
513    pub(crate) async fn claim_one_time_keys(
514        &self,
515        users: impl Iterator<Item = &UserId>,
516    ) -> Result<()> {
517        let _lock = self.locks().key_claim_lock.lock().await;
518
519        if let Some((request_id, request)) = self
520            .olm_machine()
521            .await
522            .as_ref()
523            .ok_or(Error::NoOlmMachine)?
524            .get_missing_sessions(users)
525            .await?
526        {
527            let response = self.send(request).await?;
528            self.mark_request_as_sent(&request_id, &response).await?;
529        }
530
531        Ok(())
532    }
533
534    /// Upload the E2E encryption keys.
535    ///
536    /// This uploads the long lived device keys as well as the required amount
537    /// of one-time keys.
538    ///
539    /// # Panics
540    ///
541    /// Panics if the client isn't logged in, or if no encryption keys need to
542    /// be uploaded.
543    #[instrument(skip(self, request))]
544    pub(crate) async fn keys_upload(
545        &self,
546        request_id: &TransactionId,
547        request: &upload_keys::v3::Request,
548    ) -> Result<upload_keys::v3::Response> {
549        debug!(
550            device_keys = request.device_keys.is_some(),
551            one_time_key_count = request.one_time_keys.len(),
552            "Uploading public encryption keys",
553        );
554
555        let response = self.send(request.clone()).await?;
556        self.mark_request_as_sent(request_id, &response).await?;
557
558        Ok(response)
559    }
560
561    pub(crate) async fn room_send_helper(
562        &self,
563        request: &RoomMessageRequest,
564    ) -> Result<send_message_event::v3::Response> {
565        let content = request.content.clone();
566        let txn_id = request.txn_id.clone();
567        let room_id = &request.room_id;
568
569        self.get_room(room_id)
570            .expect("Can't send a message to a room that isn't known to the store")
571            .send(*content)
572            .with_transaction_id(txn_id)
573            .await
574    }
575
576    pub(crate) async fn send_to_device(
577        &self,
578        request: &ToDeviceRequest,
579    ) -> HttpResult<ToDeviceResponse> {
580        let request = RumaToDeviceRequest::new_raw(
581            request.event_type.clone(),
582            request.txn_id.clone(),
583            request.messages.clone(),
584        );
585
586        self.send(request).await
587    }
588
589    pub(crate) async fn send_verification_request(
590        &self,
591        request: OutgoingVerificationRequest,
592    ) -> Result<()> {
593        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
594
595        match request {
596            ToDevice(t) => {
597                self.send_to_device(&t).await?;
598            }
599            InRoom(r) => {
600                self.room_send_helper(&r).await?;
601            }
602        }
603
604        Ok(())
605    }
606
607    /// Get the existing DM room with the given user, if any.
608    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
609        let rooms = self.joined_rooms();
610
611        // Find the room we share with the `user_id` and only with `user_id`
612        let room = rooms.into_iter().find(|r| {
613            let targets = r.direct_targets();
614            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
615        });
616
617        trace!(?room, "Found room");
618        room
619    }
620
621    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
622        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
623
624        match r.request() {
625            AnyOutgoingRequest::KeysQuery(request) => {
626                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
627            }
628            AnyOutgoingRequest::KeysUpload(request) => {
629                let response = self.keys_upload(r.request_id(), request).await;
630
631                if let Err(e) = &response {
632                    match e.as_ruma_api_error() {
633                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
634                            if let ErrorBody::Standard { message, .. } = &e.body {
635                                // This is one of the nastiest errors we can have. The server
636                                // telling us that we already have a one-time key uploaded means
637                                // that we forgot about some of our one-time keys. This will lead to
638                                // UTDs.
639                                {
640                                    let already_reported = self
641                                        .state_store()
642                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
643                                        .await?
644                                        .is_some();
645
646                                    if message.starts_with("One time key") && !already_reported {
647                                        tracing::error!(
648                                            sentry = true,
649                                            error_message = message,
650                                            "Duplicate one-time keys have been uploaded"
651                                        );
652
653                                        self.state_store()
654                                            .set_kv_data(
655                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
656                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
657                                            )
658                                            .await?;
659                                    }
660                                }
661                            }
662                        }
663                        _ => {}
664                    }
665
666                    response?;
667                }
668            }
669            AnyOutgoingRequest::ToDeviceRequest(request) => {
670                let response = self.send_to_device(request).await?;
671                self.mark_request_as_sent(r.request_id(), &response).await?;
672            }
673            AnyOutgoingRequest::SignatureUpload(request) => {
674                let response = self.send(request.clone()).await?;
675                self.mark_request_as_sent(r.request_id(), &response).await?;
676            }
677            AnyOutgoingRequest::RoomMessage(request) => {
678                let response = self.room_send_helper(request).await?;
679                self.mark_request_as_sent(r.request_id(), &response).await?;
680            }
681            AnyOutgoingRequest::KeysClaim(request) => {
682                let response = self.send(request.clone()).await?;
683                self.mark_request_as_sent(r.request_id(), &response).await?;
684            }
685        }
686
687        Ok(())
688    }
689
690    #[instrument(skip_all)]
691    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
692        const MAX_CONCURRENT_REQUESTS: usize = 20;
693
694        // This is needed because sometimes we need to automatically
695        // claim some one-time keys to unwedge an existing Olm session.
696        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
697            warn!("Error while claiming one-time keys {:?}", e);
698        }
699
700        let outgoing_requests = stream::iter(
701            self.olm_machine()
702                .await
703                .as_ref()
704                .ok_or(Error::NoOlmMachine)?
705                .outgoing_requests()
706                .await?,
707        )
708        .map(|r| self.send_outgoing_request(r));
709
710        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
711
712        requests
713            .for_each(|r| async move {
714                match r {
715                    Ok(_) => (),
716                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
717                }
718            })
719            .await;
720
721        Ok(())
722    }
723}
724
725#[cfg(any(feature = "testing", test))]
726impl Client {
727    /// Get the olm machine, for testing purposes only.
728    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
729        self.olm_machine().await
730    }
731}
732
733/// A high-level API to manage the client's encryption.
734///
735/// To get this, use [`Client::encryption()`].
736#[derive(Debug, Clone)]
737pub struct Encryption {
738    /// The underlying client.
739    client: Client,
740}
741
742impl Encryption {
743    pub(crate) fn new(client: Client) -> Self {
744        Self { client }
745    }
746
747    /// Returns the current encryption settings for this client.
748    pub(crate) fn settings(&self) -> EncryptionSettings {
749        self.client.inner.e2ee.encryption_settings
750    }
751
752    /// Get the public ed25519 key of our own device. This is usually what is
753    /// called the fingerprint of the device.
754    pub async fn ed25519_key(&self) -> Option<String> {
755        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
756    }
757
758    /// Get the public Curve25519 key of our own device.
759    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
760        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
761    }
762
763    /// Get the current device creation timestamp.
764    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
765        match self.get_own_device().await {
766            Ok(Some(device)) => device.first_time_seen_ts(),
767            // Should not happen, there should always be an own device
768            _ => MilliSecondsSinceUnixEpoch::now(),
769        }
770    }
771
772    pub(crate) async fn import_secrets_bundle(
773        &self,
774        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
775    ) -> Result<(), SecretImportError> {
776        let olm_machine = self.client.olm_machine().await;
777        let olm_machine =
778            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
779
780        olm_machine.store().import_secrets_bundle(bundle).await
781    }
782
783    /// Get the status of the private cross signing keys.
784    ///
785    /// This can be used to check which private cross signing keys we have
786    /// stored locally.
787    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
788        let olm = self.client.olm_machine().await;
789        let machine = olm.as_ref()?;
790        Some(machine.cross_signing_status().await)
791    }
792
793    /// Get all the tracked users we know about
794    ///
795    /// Tracked users are users for which we keep the device list of E2EE
796    /// capable devices up to date.
797    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
798        if let Some(machine) = self.client.olm_machine().await.as_ref() {
799            machine.tracked_users().await
800        } else {
801            Ok(HashSet::new())
802        }
803    }
804
805    /// Get a [`Subscriber`] for the [`VerificationState`].
806    ///
807    /// # Examples
808    ///
809    /// ```no_run
810    /// use matrix_sdk::{encryption, Client};
811    /// use url::Url;
812    ///
813    /// # async {
814    /// let homeserver = Url::parse("http://example.com")?;
815    /// let client = Client::new(homeserver).await?;
816    /// let mut subscriber = client.encryption().verification_state();
817    ///
818    /// let current_value = subscriber.get();
819    ///
820    /// println!("The current verification state is: {current_value:?}");
821    ///
822    /// if let Some(verification_state) = subscriber.next().await {
823    ///     println!("Received verification state update {:?}", verification_state)
824    /// }
825    /// # anyhow::Ok(()) };
826    /// ```
827    pub fn verification_state(&self) -> Subscriber<VerificationState> {
828        self.client.inner.verification_state.subscribe_reset()
829    }
830
831    /// Get a verification object with the given flow id.
832    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
833        let olm = self.client.olm_machine().await;
834        let olm = olm.as_ref()?;
835        #[allow(clippy::bind_instead_of_map)]
836        olm.get_verification(user_id, flow_id).and_then(|v| match v {
837            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
838                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
839            }
840            #[cfg(feature = "qrcode")]
841            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
842                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
843            }
844            _ => None,
845        })
846    }
847
848    /// Get a `VerificationRequest` object for the given user with the given
849    /// flow id.
850    pub async fn get_verification_request(
851        &self,
852        user_id: &UserId,
853        flow_id: impl AsRef<str>,
854    ) -> Option<VerificationRequest> {
855        let olm = self.client.olm_machine().await;
856        let olm = olm.as_ref()?;
857
858        olm.get_verification_request(user_id, flow_id)
859            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
860    }
861
862    /// Get a specific device of a user.
863    ///
864    /// # Arguments
865    ///
866    /// * `user_id` - The unique id of the user that the device belongs to.
867    ///
868    /// * `device_id` - The unique id of the device.
869    ///
870    /// Returns a `Device` if one is found and the crypto store didn't throw an
871    /// error.
872    ///
873    /// This will always return None if the client hasn't been logged in.
874    ///
875    /// # Examples
876    ///
877    /// ```no_run
878    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
879    /// # use url::Url;
880    /// # async {
881    /// # let alice = user_id!("@alice:example.org");
882    /// # let homeserver = Url::parse("http://example.com")?;
883    /// # let client = Client::new(homeserver).await?;
884    /// if let Some(device) =
885    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
886    /// {
887    ///     println!("{:?}", device.is_verified());
888    ///
889    ///     if !device.is_verified() {
890    ///         let verification = device.request_verification().await?;
891    ///     }
892    /// }
893    /// # anyhow::Ok(()) };
894    /// ```
895    pub async fn get_device(
896        &self,
897        user_id: &UserId,
898        device_id: &DeviceId,
899    ) -> Result<Option<Device>, CryptoStoreError> {
900        let olm = self.client.olm_machine().await;
901        let Some(machine) = olm.as_ref() else { return Ok(None) };
902        let device = machine.get_device(user_id, device_id, None).await?;
903        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
904    }
905
906    /// A convenience method to retrieve your own device from the store.
907    ///
908    /// This is the same as calling [`Encryption::get_device()`] with your own
909    /// user and device ID.
910    ///
911    /// This will always return a device, unless you are not logged in.
912    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
913        let olm = self.client.olm_machine().await;
914        let Some(machine) = olm.as_ref() else { return Ok(None) };
915        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
916        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
917    }
918
919    /// Get a map holding all the devices of an user.
920    ///
921    /// This will always return an empty map if the client hasn't been logged
922    /// in.
923    ///
924    /// # Arguments
925    ///
926    /// * `user_id` - The unique id of the user that the devices belong to.
927    ///
928    /// # Examples
929    ///
930    /// ```no_run
931    /// # use matrix_sdk::{Client, ruma::user_id};
932    /// # use url::Url;
933    /// # async {
934    /// # let alice = user_id!("@alice:example.org");
935    /// # let homeserver = Url::parse("http://example.com")?;
936    /// # let client = Client::new(homeserver).await?;
937    /// let devices = client.encryption().get_user_devices(alice).await?;
938    ///
939    /// for device in devices.devices() {
940    ///     println!("{device:?}");
941    /// }
942    /// # anyhow::Ok(()) };
943    /// ```
944    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
945        let devices = self
946            .client
947            .olm_machine()
948            .await
949            .as_ref()
950            .ok_or(Error::NoOlmMachine)?
951            .get_user_devices(user_id, None)
952            .await?;
953
954        Ok(UserDevices { inner: devices, client: self.client.clone() })
955    }
956
957    /// Get the E2EE identity of a user from the crypto store.
958    ///
959    /// Usually, we only have the E2EE identity of a user locally if the user
960    /// is tracked, meaning that we are both members of the same encrypted room.
961    ///
962    /// To get the E2EE identity of a user even if it is not available locally
963    /// use [`Encryption::request_user_identity()`].
964    ///
965    /// # Arguments
966    ///
967    /// * `user_id` - The unique id of the user that the identity belongs to.
968    ///
969    /// Returns a `UserIdentity` if one is found and the crypto store
970    /// didn't throw an error.
971    ///
972    /// This will always return None if the client hasn't been logged in.
973    ///
974    /// # Examples
975    ///
976    /// ```no_run
977    /// # use matrix_sdk::{Client, ruma::user_id};
978    /// # use url::Url;
979    /// # async {
980    /// # let alice = user_id!("@alice:example.org");
981    /// # let homeserver = Url::parse("http://example.com")?;
982    /// # let client = Client::new(homeserver).await?;
983    /// let user = client.encryption().get_user_identity(alice).await?;
984    ///
985    /// if let Some(user) = user {
986    ///     println!("{:?}", user.is_verified());
987    ///
988    ///     let verification = user.request_verification().await?;
989    /// }
990    /// # anyhow::Ok(()) };
991    /// ```
992    pub async fn get_user_identity(
993        &self,
994        user_id: &UserId,
995    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
996        let olm = self.client.olm_machine().await;
997        let Some(olm) = olm.as_ref() else { return Ok(None) };
998        let identity = olm.get_identity(user_id, None).await?;
999
1000        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1001    }
1002
1003    /// Get the E2EE identity of a user from the homeserver.
1004    ///
1005    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
1006    /// E2EE identity is not found, it should mean that the user did not set
1007    /// up cross-signing.
1008    ///
1009    /// If you want the E2EE identity of a user without making a request to the
1010    /// homeserver, use [`Encryption::get_user_identity()`] instead.
1011    ///
1012    /// # Arguments
1013    ///
1014    /// * `user_id` - The ID of the user that the identity belongs to.
1015    ///
1016    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
1017    /// was an issue with the crypto store or with the request to the
1018    /// homeserver.
1019    ///
1020    /// This will always return `None` if the client hasn't been logged in.
1021    ///
1022    /// # Examples
1023    ///
1024    /// ```no_run
1025    /// # use matrix_sdk::{Client, ruma::user_id};
1026    /// # use url::Url;
1027    /// # async {
1028    /// # let alice = user_id!("@alice:example.org");
1029    /// # let homeserver = Url::parse("http://example.com")?;
1030    /// # let client = Client::new(homeserver).await?;
1031    /// let user = client.encryption().request_user_identity(alice).await?;
1032    ///
1033    /// if let Some(user) = user {
1034    ///     println!("User is verified: {:?}", user.is_verified());
1035    ///
1036    ///     let verification = user.request_verification().await?;
1037    /// }
1038    /// # anyhow::Ok(()) };
1039    /// ```
1040    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1041        let olm = self.client.olm_machine().await;
1042        let Some(olm) = olm.as_ref() else { return Ok(None) };
1043
1044        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1045        self.client.keys_query(&request_id, request.device_keys).await?;
1046
1047        let identity = olm.get_identity(user_id, None).await?;
1048        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1049    }
1050
1051    /// Returns a stream of device updates, allowing users to listen for
1052    /// notifications about new or changed devices.
1053    ///
1054    /// The stream produced by this method emits updates whenever a new device
1055    /// is discovered or when an existing device's information is changed. Users
1056    /// can subscribe to this stream and receive updates in real-time.
1057    ///
1058    /// # Examples
1059    ///
1060    /// ```no_run
1061    /// # use matrix_sdk::Client;
1062    /// # use ruma::{device_id, user_id};
1063    /// # use futures_util::{pin_mut, StreamExt};
1064    /// # let client: Client = unimplemented!();
1065    /// # async {
1066    /// let devices_stream = client.encryption().devices_stream().await?;
1067    /// let user_id = client
1068    ///     .user_id()
1069    ///     .expect("We should know our user id after we have logged in");
1070    /// pin_mut!(devices_stream);
1071    ///
1072    /// for device_updates in devices_stream.next().await {
1073    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1074    ///         for device in user_devices.values() {
1075    ///             println!("A new device has been added {}", device.device_id());
1076    ///         }
1077    ///     }
1078    /// }
1079    /// # anyhow::Ok(()) };
1080    /// ```
1081    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1082        let olm = self.client.olm_machine().await;
1083        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1084        let client = self.client.to_owned();
1085
1086        Ok(olm
1087            .store()
1088            .devices_stream()
1089            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1090    }
1091
1092    /// Returns a stream of user identity updates, allowing users to listen for
1093    /// notifications about new or changed user identities.
1094    ///
1095    /// The stream produced by this method emits updates whenever a new user
1096    /// identity is discovered or when an existing identities information is
1097    /// changed. Users can subscribe to this stream and receive updates in
1098    /// real-time.
1099    ///
1100    /// # Examples
1101    ///
1102    /// ```no_run
1103    /// # use matrix_sdk::Client;
1104    /// # use ruma::{device_id, user_id};
1105    /// # use futures_util::{pin_mut, StreamExt};
1106    /// # let client: Client = unimplemented!();
1107    /// # async {
1108    /// let identities_stream =
1109    ///     client.encryption().user_identities_stream().await?;
1110    /// pin_mut!(identities_stream);
1111    ///
1112    /// for identity_updates in identities_stream.next().await {
1113    ///     for (_, identity) in identity_updates.new {
1114    ///         println!("A new identity has been added {}", identity.user_id());
1115    ///     }
1116    /// }
1117    /// # anyhow::Ok(()) };
1118    /// ```
1119    pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1120        let olm = self.client.olm_machine().await;
1121        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1122        let client = self.client.to_owned();
1123
1124        Ok(olm
1125            .store()
1126            .user_identities_stream()
1127            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1128    }
1129
1130    /// Create and upload a new cross signing identity.
1131    ///
1132    /// # Arguments
1133    ///
1134    /// * `auth_data` - This request requires user interactive auth, the first
1135    ///   request needs to set this to `None` and will always fail with an
1136    ///   `UiaaResponse`. The response will contain information for the
1137    ///   interactive auth and the same request needs to be made but this time
1138    ///   with some `auth_data` provided.
1139    ///
1140    /// # Examples
1141    ///
1142    /// ```no_run
1143    /// # use std::collections::BTreeMap;
1144    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1145    /// # use url::Url;
1146    /// # use serde_json::json;
1147    /// # async {
1148    /// # let homeserver = Url::parse("http://example.com")?;
1149    /// # let client = Client::new(homeserver).await?;
1150    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1151    ///     if let Some(response) = e.as_uiaa_response() {
1152    ///         let mut password = uiaa::Password::new(
1153    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1154    ///             "wordpass".to_owned(),
1155    ///         );
1156    ///         password.session = response.session.clone();
1157    ///
1158    ///         client
1159    ///             .encryption()
1160    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1161    ///             .await
1162    ///             .expect("Couldn't bootstrap cross signing")
1163    ///     } else {
1164    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1165    ///     }
1166    /// }
1167    /// # anyhow::Ok(()) };
1168    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1169        let olm = self.client.olm_machine().await;
1170        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1171
1172        let CrossSigningBootstrapRequests {
1173            upload_signing_keys_req,
1174            upload_keys_req,
1175            upload_signatures_req,
1176        } = olm.bootstrap_cross_signing(false).await?;
1177
1178        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1179            auth: auth_data,
1180            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1181            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1182            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1183        });
1184
1185        if let Some(req) = upload_keys_req {
1186            self.client.send_outgoing_request(req).await?;
1187        }
1188        self.client.send(upload_signing_keys_req).await?;
1189        self.client.send(upload_signatures_req).await?;
1190
1191        Ok(())
1192    }
1193
1194    /// Reset the cross-signing keys.
1195    ///
1196    /// # Example
1197    ///
1198    /// ```no_run
1199    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1200    /// # use url::Url;
1201    /// # async {
1202    /// # let homeserver = Url::parse("http://example.com")?;
1203    /// # let client = Client::new(homeserver).await?;
1204    /// # let user_id = unimplemented!();
1205    /// let encryption = client.encryption();
1206    ///
1207    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1208    ///     match handle.auth_type() {
1209    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1210    ///             use matrix_sdk::ruma::api::client::uiaa;
1211    ///
1212    ///             let password = "1234".to_owned();
1213    ///             let mut password = uiaa::Password::new(user_id, password);
1214    ///             password.session = uiaa.session;
1215    ///
1216    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1217    ///         }
1218    ///         CrossSigningResetAuthType::OAuth(o) => {
1219    ///             println!(
1220    ///                 "To reset your end-to-end encryption cross-signing identity, \
1221    ///                 you first need to approve it at {}",
1222    ///                 o.approval_url
1223    ///             );
1224    ///             handle.auth(None).await?;
1225    ///         }
1226    ///     }
1227    /// }
1228    /// # anyhow::Ok(()) };
1229    /// ```
1230    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1231        let olm = self.client.olm_machine().await;
1232        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1233
1234        let CrossSigningBootstrapRequests {
1235            upload_keys_req,
1236            upload_signing_keys_req,
1237            upload_signatures_req,
1238        } = olm.bootstrap_cross_signing(true).await?;
1239
1240        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1241            auth: None,
1242            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1243            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1244            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1245        });
1246
1247        if let Some(req) = upload_keys_req {
1248            self.client.send_outgoing_request(req).await?;
1249        }
1250
1251        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1252            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1253                let client = self.client.clone();
1254
1255                Ok(Some(CrossSigningResetHandle::new(
1256                    client,
1257                    upload_signing_keys_req,
1258                    upload_signatures_req,
1259                    auth_type,
1260                )))
1261            } else {
1262                Err(error.into())
1263            }
1264        } else {
1265            self.client.send(upload_signatures_req).await?;
1266
1267            Ok(None)
1268        }
1269    }
1270
1271    /// Query the user's own device keys, if, and only if, we didn't have their
1272    /// identity in the first place.
1273    async fn ensure_initial_key_query(&self) -> Result<()> {
1274        let olm_machine = self.client.olm_machine().await;
1275        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1276
1277        let user_id = olm_machine.user_id();
1278
1279        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1280            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1281            self.client.keys_query(&request_id, request.device_keys).await?;
1282        }
1283
1284        Ok(())
1285    }
1286
1287    /// Create and upload a new cross signing identity, if that has not been
1288    /// done yet.
1289    ///
1290    /// This will only create a new cross-signing identity if the user had never
1291    /// done it before. If the user did it before, then this is a no-op.
1292    ///
1293    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1294    /// behavior of this function.
1295    ///
1296    /// # Arguments
1297    ///
1298    /// * `auth_data` - This request requires user interactive auth, the first
1299    ///   request needs to set this to `None` and will always fail with an
1300    ///   `UiaaResponse`. The response will contain information for the
1301    ///   interactive auth and the same request needs to be made but this time
1302    ///   with some `auth_data` provided.
1303    ///
1304    /// # Examples
1305    /// ```no_run
1306    /// # use std::collections::BTreeMap;
1307    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1308    /// # use url::Url;
1309    /// # use serde_json::json;
1310    /// # async {
1311    /// # let homeserver = Url::parse("http://example.com")?;
1312    /// # let client = Client::new(homeserver).await?;
1313    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1314    ///     if let Some(response) = e.as_uiaa_response() {
1315    ///         let mut password = uiaa::Password::new(
1316    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1317    ///             "wordpass".to_owned(),
1318    ///         );
1319    ///         password.session = response.session.clone();
1320    ///
1321    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1322    ///         // avoid checks.
1323    ///         client
1324    ///             .encryption()
1325    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1326    ///             .await
1327    ///             .expect("Couldn't bootstrap cross signing")
1328    ///     } else {
1329    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1330    ///     }
1331    /// }
1332    /// # anyhow::Ok(()) };
1333    pub async fn bootstrap_cross_signing_if_needed(
1334        &self,
1335        auth_data: Option<AuthData>,
1336    ) -> Result<()> {
1337        let olm_machine = self.client.olm_machine().await;
1338        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1339        let user_id = olm_machine.user_id();
1340
1341        self.ensure_initial_key_query().await?;
1342
1343        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1344            self.bootstrap_cross_signing(auth_data).await?;
1345        }
1346
1347        Ok(())
1348    }
1349
1350    /// Export E2EE keys that match the given predicate encrypting them with the
1351    /// given passphrase.
1352    ///
1353    /// # Arguments
1354    ///
1355    /// * `path` - The file path where the exported key file will be saved.
1356    ///
1357    /// * `passphrase` - The passphrase that will be used to encrypt the
1358    ///   exported room keys.
1359    ///
1360    /// * `predicate` - A closure that will be called for every known
1361    ///   `InboundGroupSession`, which represents a room key. If the closure
1362    ///   returns `true` the `InboundGroupSessoin` will be included in the
1363    ///   export, if the closure returns `false` it will not be included.
1364    ///
1365    /// # Panics
1366    ///
1367    /// This method will panic if it isn't run on a Tokio runtime.
1368    ///
1369    /// This method will panic if it can't get enough randomness from the OS to
1370    /// encrypt the exported keys securely.
1371    ///
1372    /// # Examples
1373    ///
1374    /// ```no_run
1375    /// # use std::{path::PathBuf, time::Duration};
1376    /// # use matrix_sdk::{
1377    /// #     Client, config::SyncSettings,
1378    /// #     ruma::room_id,
1379    /// # };
1380    /// # use url::Url;
1381    /// # async {
1382    /// # let homeserver = Url::parse("http://localhost:8080")?;
1383    /// # let mut client = Client::new(homeserver).await?;
1384    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1385    /// // Export all room keys.
1386    /// client
1387    ///     .encryption()
1388    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1389    ///     .await?;
1390    ///
1391    /// // Export only the room keys for a certain room.
1392    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1393    /// let room_id = room_id!("!test:localhost");
1394    ///
1395    /// client
1396    ///     .encryption()
1397    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1398    ///     .await?;
1399    /// # anyhow::Ok(()) };
1400    /// ```
1401    #[cfg(not(target_family = "wasm"))]
1402    pub async fn export_room_keys(
1403        &self,
1404        path: PathBuf,
1405        passphrase: &str,
1406        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1407    ) -> Result<()> {
1408        let olm = self.client.olm_machine().await;
1409        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1410
1411        let keys = olm.store().export_room_keys(predicate).await?;
1412        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1413
1414        let encrypt = move || -> Result<()> {
1415            let export: String =
1416                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1417            let mut file = std::fs::File::create(path)?;
1418            file.write_all(&export.into_bytes())?;
1419            Ok(())
1420        };
1421
1422        let task = tokio::task::spawn_blocking(encrypt);
1423        task.await.expect("Task join error")
1424    }
1425
1426    /// Import E2EE keys from the given file path.
1427    ///
1428    /// # Arguments
1429    ///
1430    /// * `path` - The file path where the exported key file will can be found.
1431    ///
1432    /// * `passphrase` - The passphrase that should be used to decrypt the
1433    ///   exported room keys.
1434    ///
1435    /// Returns a tuple of numbers that represent the number of sessions that
1436    /// were imported and the total number of sessions that were found in the
1437    /// key export.
1438    ///
1439    /// # Panics
1440    ///
1441    /// This method will panic if it isn't run on a Tokio runtime.
1442    ///
1443    /// ```no_run
1444    /// # use std::{path::PathBuf, time::Duration};
1445    /// # use matrix_sdk::{
1446    /// #     Client, config::SyncSettings,
1447    /// #     ruma::room_id,
1448    /// # };
1449    /// # use url::Url;
1450    /// # async {
1451    /// # let homeserver = Url::parse("http://localhost:8080")?;
1452    /// # let mut client = Client::new(homeserver).await?;
1453    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1454    /// let result =
1455    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1456    ///
1457    /// println!(
1458    ///     "Imported {} room keys out of {}",
1459    ///     result.imported_count, result.total_count
1460    /// );
1461    /// # anyhow::Ok(()) };
1462    /// ```
1463    #[cfg(not(target_family = "wasm"))]
1464    pub async fn import_room_keys(
1465        &self,
1466        path: PathBuf,
1467        passphrase: &str,
1468    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1469        let olm = self.client.olm_machine().await;
1470        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1471        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1472
1473        let decrypt = move || {
1474            let file = std::fs::File::open(path)?;
1475            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1476        };
1477
1478        let task = tokio::task::spawn_blocking(decrypt);
1479        let import = task.await.expect("Task join error")?;
1480
1481        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1482
1483        self.backups().maybe_trigger_backup();
1484
1485        Ok(ret)
1486    }
1487
1488    /// Receive notifications of room keys being received as a [`Stream`].
1489    ///
1490    /// Each time a room key is updated in any way, an update will be sent to
1491    /// the stream. Updates that happen at the same time are batched into a
1492    /// [`Vec`].
1493    ///
1494    /// If the reader of the stream lags too far behind, an error is broadcast
1495    /// containing the number of skipped items.
1496    ///
1497    /// # Examples
1498    ///
1499    /// ```no_run
1500    /// # use matrix_sdk::Client;
1501    /// # use url::Url;
1502    /// # async {
1503    /// # let homeserver = Url::parse("http://example.com")?;
1504    /// # let client = Client::new(homeserver).await?;
1505    /// use futures_util::StreamExt;
1506    ///
1507    /// let Some(mut room_keys_stream) =
1508    ///     client.encryption().room_keys_received_stream().await
1509    /// else {
1510    ///     return Ok(());
1511    /// };
1512    ///
1513    /// while let Some(update) = room_keys_stream.next().await {
1514    ///     println!("Received room keys {update:?}");
1515    /// }
1516    /// # anyhow::Ok(()) };
1517    /// ```
1518    pub async fn room_keys_received_stream(
1519        &self,
1520    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1521        let olm = self.client.olm_machine().await;
1522        let olm = olm.as_ref()?;
1523
1524        Some(olm.store().room_keys_received_stream())
1525    }
1526
1527    /// Receive notifications of historic room key bundles as a [`Stream`].
1528    ///
1529    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1530    ///
1531    /// Each time a historic room key bundle was received, an update will be
1532    /// sent to the stream. This stream is useful for informative purposes
1533    /// exclusively, historic room key bundles are handled by the SDK
1534    /// automatically.
1535    ///
1536    /// # Examples
1537    ///
1538    /// ```no_run
1539    /// # use matrix_sdk::Client;
1540    /// # use url::Url;
1541    /// # async {
1542    /// # let homeserver = Url::parse("http://example.com")?;
1543    /// # let client = Client::new(homeserver).await?;
1544    /// use futures_util::StreamExt;
1545    ///
1546    /// let Some(mut bundle_stream) =
1547    ///     client.encryption().historic_room_key_stream().await
1548    /// else {
1549    ///     return Ok(());
1550    /// };
1551    ///
1552    /// while let Some(bundle_info) = bundle_stream.next().await {
1553    ///     println!("Received a historic room key bundle {bundle_info:?}");
1554    /// }
1555    /// # anyhow::Ok(()) };
1556    /// ```
1557    pub async fn historic_room_key_stream(&self) -> Option<impl Stream<Item = RoomKeyBundleInfo>> {
1558        let olm = self.client.olm_machine().await;
1559        let olm = olm.as_ref()?;
1560
1561        Some(olm.store().historic_room_key_stream())
1562    }
1563
1564    /// Get the secret storage manager of the client.
1565    pub fn secret_storage(&self) -> SecretStorage {
1566        SecretStorage { client: self.client.to_owned() }
1567    }
1568
1569    /// Get the backups manager of the client.
1570    pub fn backups(&self) -> Backups {
1571        Backups { client: self.client.to_owned() }
1572    }
1573
1574    /// Get the recovery manager of the client.
1575    pub fn recovery(&self) -> Recovery {
1576        Recovery { client: self.client.to_owned() }
1577    }
1578
1579    /// Enables the crypto-store cross-process lock.
1580    ///
1581    /// This may be required if there are multiple processes that may do writes
1582    /// to the same crypto store. In that case, it's necessary to create a
1583    /// lock, so that only one process writes to it, otherwise this may
1584    /// cause confusing issues because of stale data contained in in-memory
1585    /// caches.
1586    ///
1587    /// The provided `lock_value` must be a unique identifier for this process.
1588    /// Check [`Client::cross_process_store_locks_holder_name`] to
1589    /// get the global value.
1590    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1591        // If the lock has already been created, don't recreate it from scratch.
1592        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1593            let prev_holder = prev_lock.lock_holder();
1594            if prev_holder == lock_value {
1595                return Ok(());
1596            }
1597            warn!(
1598                "Recreating cross-process store lock with a different holder value: \
1599                 prev was {prev_holder}, new is {lock_value}"
1600            );
1601        }
1602
1603        let olm_machine = self.client.base_client().olm_machine().await;
1604        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1605
1606        let lock =
1607            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1608
1609        // Gently try to initialize the crypto store generation counter.
1610        //
1611        // If we don't get the lock immediately, then it is already acquired by another
1612        // process, and we'll get to reload next time we acquire the lock.
1613        {
1614            let guard = lock.try_lock_once().await?;
1615            if guard.is_some() {
1616                olm_machine
1617                    .initialize_crypto_store_generation(
1618                        &self.client.locks().crypto_store_generation,
1619                    )
1620                    .await?;
1621            }
1622        }
1623
1624        self.client
1625            .locks()
1626            .cross_process_crypto_store_lock
1627            .set(lock)
1628            .map_err(|_| Error::BadCryptoStoreState)?;
1629
1630        Ok(())
1631    }
1632
1633    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1634    /// time.
1635    ///
1636    /// Returns the current generation number.
1637    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1638        let olm_machine_guard = self.client.olm_machine().await;
1639        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1640            let (new_gen, generation_number) = olm_machine
1641                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1642                .await?;
1643            // If the crypto store generation has changed,
1644            if new_gen {
1645                // (get rid of the reference to the current crypto store first)
1646                drop(olm_machine_guard);
1647                // Recreate the OlmMachine.
1648                self.client.base_client().regenerate_olm(None).await?;
1649            }
1650            Ok(generation_number)
1651        } else {
1652            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1653            // been initialised by the time we get here. Ideally we'd panic, or return an
1654            // error, but for now I'm just adding some logging to check if it
1655            // happens, and returning the magic number 0.
1656            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1657            Ok(0)
1658        }
1659    }
1660
1661    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1662    /// spin-waits until the lock is available.
1663    ///
1664    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1665    /// first time.
1666    pub async fn spin_lock_store(
1667        &self,
1668        max_backoff: Option<u32>,
1669    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1670        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1671            let guard = lock.spin_lock(max_backoff).await?;
1672
1673            let generation = self.on_lock_newly_acquired().await?;
1674
1675            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1676        } else {
1677            Ok(None)
1678        }
1679    }
1680
1681    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1682    /// attempts to lock it once.
1683    ///
1684    /// Returns a guard to the lock, if it was obtained.
1685    pub async fn try_lock_store_once(
1686        &self,
1687    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1688        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1689            let maybe_guard = lock.try_lock_once().await?;
1690
1691            let Some(guard) = maybe_guard else {
1692                return Ok(None);
1693            };
1694
1695            let generation = self.on_lock_newly_acquired().await?;
1696
1697            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1698        } else {
1699            Ok(None)
1700        }
1701    }
1702
1703    /// Testing purposes only.
1704    #[cfg(any(test, feature = "testing"))]
1705    pub async fn uploaded_key_count(&self) -> Result<u64> {
1706        let olm_machine = self.client.olm_machine().await;
1707        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1708        Ok(olm_machine.uploaded_key_count().await?)
1709    }
1710
1711    /// Bootstrap encryption and enables event listeners for the E2EE support.
1712    ///
1713    /// Based on the `EncryptionSettings`, this call might:
1714    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1715    /// - Create a key backup if needed (POST `/room_keys/version`)
1716    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1717    ///
1718    /// As part of this process, and if needed, the current device keys would be
1719    /// uploaded to the server, new account data would be added, and cross
1720    /// signing keys and signatures might be uploaded.
1721    ///
1722    /// Should be called once we
1723    /// created a [`OlmMachine`], i.e. after logging in.
1724    ///
1725    /// # Arguments
1726    ///
1727    /// * `auth_data` - Some requests may require re-authentication. To prevent
1728    ///   the user from having to re-enter their password (or use other
1729    ///   methods), we can provide the authentication data here. This is
1730    ///   necessary for uploading cross-signing keys. However, please note that
1731    ///   there is a proposal (MSC3967) to remove this requirement, which would
1732    ///   allow for the initial upload of cross-signing keys without
1733    ///   authentication, rendering this parameter obsolete.
1734    pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1735        // It's fine to be async here as we're only getting the lock protecting the
1736        // `OlmMachine`. Since the lock shouldn't be that contested right after logging
1737        // in we won't delay the login or restoration of the Client.
1738        let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1739            Some(BundleReceiverTask::new(&self.client).await)
1740        } else {
1741            None
1742        };
1743
1744        let mut tasks = self.client.inner.e2ee.tasks.lock();
1745
1746        let this = self.clone();
1747
1748        tasks.setup_e2ee = Some(spawn(async move {
1749            // Update the current state first, so we don't have to wait for the result of
1750            // network requests
1751            this.update_verification_state().await;
1752
1753            if this.settings().auto_enable_cross_signing {
1754                if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1755                    error!("Couldn't bootstrap cross signing {e:?}");
1756                }
1757            }
1758
1759            if let Err(e) = this.backups().setup_and_resume().await {
1760                error!("Couldn't setup and resume backups {e:?}");
1761            }
1762            if let Err(e) = this.recovery().setup().await {
1763                error!("Couldn't setup and resume recovery {e:?}");
1764            }
1765        }));
1766
1767        tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1768    }
1769
1770    /// Waits for end-to-end encryption initialization tasks to finish, if any
1771    /// was running in the background.
1772    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1773        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1774
1775        if let Some(task) = task {
1776            if let Err(err) = task.await {
1777                warn!("Error when initializing backups: {err}");
1778            }
1779        }
1780    }
1781
1782    /// Upload the device keys and initial set of one-time keys to the server.
1783    ///
1784    /// This should only be called when the user logs in for the first time,
1785    /// the method will ensure that other devices see our own device as an
1786    /// end-to-end encryption enabled one.
1787    ///
1788    /// **Warning**: Do not use this method if we're already calling
1789    /// [`Client::send_outgoing_request()`]. This method is intended for
1790    /// explicitly uploading the device keys before starting a sync.
1791    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1792        let olm = self.client.olm_machine().await;
1793        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1794
1795        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1796            self.client.keys_upload(&request_id, &request).await?;
1797
1798            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1799            self.client.keys_query(&request_id, request.device_keys).await?;
1800        }
1801
1802        Ok(())
1803    }
1804
1805    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1806        self.recovery().update_state_after_keys_query(response).await;
1807
1808        // Only update the verification_state if our own devices changed
1809        if let Some(user_id) = self.client.user_id() {
1810            let contains_own_device = response.device_keys.contains_key(user_id);
1811
1812            if contains_own_device {
1813                self.update_verification_state().await;
1814            }
1815        }
1816    }
1817
1818    async fn update_verification_state(&self) {
1819        match self.get_own_device().await {
1820            Ok(device) => {
1821                if let Some(device) = device {
1822                    let is_verified = device.is_cross_signed_by_owner();
1823
1824                    if is_verified {
1825                        self.client.inner.verification_state.set(VerificationState::Verified);
1826                    } else {
1827                        self.client.inner.verification_state.set(VerificationState::Unverified);
1828                    }
1829                } else {
1830                    warn!("Couldn't find out own device in the store.");
1831                    self.client.inner.verification_state.set(VerificationState::Unknown);
1832                }
1833            }
1834            Err(error) => {
1835                warn!("Failed retrieving own device: {error}");
1836                self.client.inner.verification_state.set(VerificationState::Unknown);
1837            }
1838        }
1839    }
1840
1841    /// Encrypts then send the given content via the `/sendToDevice` end-point
1842    /// using Olm encryption.
1843    ///
1844    /// If there are a lot of recipient devices multiple `/sendToDevice`
1845    /// requests might be sent out.
1846    ///
1847    /// # Returns
1848    /// A list of failures. The list of devices that couldn't get the messages.
1849    #[cfg(feature = "experimental-send-custom-to-device")]
1850    pub async fn encrypt_and_send_raw_to_device(
1851        &self,
1852        recipient_devices: Vec<&Device>,
1853        event_type: &str,
1854        content: Raw<AnyToDeviceEventContent>,
1855        share_strategy: CollectStrategy,
1856    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1857        let users = recipient_devices.iter().map(|device| device.user_id());
1858
1859        // Will claim one-time-key for users that needs it
1860        // TODO: For later optimisation: This will establish missing olm sessions with
1861        // all this users devices, but we just want for some devices.
1862        self.client.claim_one_time_keys(users).await?;
1863
1864        let olm = self.client.olm_machine().await;
1865        let olm = olm.as_ref().expect("Olm machine wasn't started");
1866
1867        let (requests, withhelds) = olm
1868            .encrypt_content_for_devices(
1869                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1870                event_type,
1871                &content
1872                    .deserialize_as::<serde_json::Value>()
1873                    .expect("Deserialize as Value will always work"),
1874                share_strategy,
1875            )
1876            .await?;
1877
1878        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1879
1880        // Push the withhelds in the failures
1881        withhelds.iter().for_each(|(d, _)| {
1882            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1883        });
1884
1885        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1886        for request in requests {
1887            let ruma_request = RumaToDeviceRequest::new_raw(
1888                request.event_type.clone(),
1889                request.txn_id.clone(),
1890                request.messages.clone(),
1891            );
1892
1893            let send_result = self
1894                .client
1895                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1896                .await;
1897
1898            // If the sending failed we need to collect the failures to report them
1899            if send_result.is_err() {
1900                // Mark the sending as failed
1901                for (user_id, device_map) in request.messages {
1902                    for device_id in device_map.keys() {
1903                        match device_id {
1904                            DeviceIdOrAllDevices::DeviceId(device_id) => {
1905                                failures.push((user_id.clone(), device_id.to_owned()));
1906                            }
1907                            DeviceIdOrAllDevices::AllDevices => {
1908                                // Cannot happen in this case
1909                            }
1910                        }
1911                    }
1912                }
1913            }
1914        }
1915
1916        Ok(failures)
1917    }
1918}
1919
1920#[cfg(all(test, not(target_family = "wasm")))]
1921mod tests {
1922    use std::{
1923        ops::Not,
1924        sync::{
1925            atomic::{AtomicBool, Ordering},
1926            Arc,
1927        },
1928        time::Duration,
1929    };
1930
1931    use matrix_sdk_test::{
1932        async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
1933        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1934    };
1935    use ruma::{
1936        event_id,
1937        events::{reaction::ReactionEventContent, relation::Annotation},
1938        user_id,
1939    };
1940    use serde_json::json;
1941    use wiremock::{
1942        matchers::{header, method, path_regex},
1943        Mock, MockServer, Request, ResponseTemplate,
1944    };
1945
1946    use crate::{
1947        assert_next_matches_with_timeout,
1948        config::RequestConfig,
1949        encryption::{OAuthCrossSigningResetInfo, VerificationState},
1950        test_utils::{
1951            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1952        },
1953        Client,
1954    };
1955
1956    #[async_test]
1957    async fn test_reaction_sending() {
1958        let server = MockServer::start().await;
1959        let client = logged_in_client(Some(server.uri())).await;
1960
1961        let event_id = event_id!("$2:example.org");
1962
1963        Mock::given(method("GET"))
1964            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1965            .and(header("authorization", "Bearer 1234"))
1966            .respond_with(
1967                ResponseTemplate::new(200)
1968                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1969            )
1970            .mount(&server)
1971            .await;
1972
1973        Mock::given(method("PUT"))
1974            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1975            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1976                "event_id": event_id,
1977            })))
1978            .mount(&server)
1979            .await;
1980
1981        let response = SyncResponseBuilder::default()
1982            .add_joined_room(
1983                JoinedRoomBuilder::default()
1984                    .add_state_event(StateTestEvent::Member)
1985                    .add_state_event(StateTestEvent::PowerLevels)
1986                    .add_state_event(StateTestEvent::Encryption),
1987            )
1988            .build_sync_response();
1989
1990        client.base_client().receive_sync_response(response).await.unwrap();
1991
1992        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1993        assert!(room
1994            .latest_encryption_state()
1995            .await
1996            .expect("Getting encryption state")
1997            .is_encrypted());
1998
1999        let event_id = event_id!("$1:example.org");
2000        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2001        room.send(reaction).await.expect("Sending the reaction should not fail");
2002
2003        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2004    }
2005
2006    #[async_test]
2007    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2008        let server = MockServer::start().await;
2009        let client = logged_in_client(Some(server.uri())).await;
2010        // This is the user ID that is inside MemberAdditional.
2011        // Note the confusing username, so we can share
2012        // GlobalAccountDataTestEvent::Direct with the invited test.
2013        let user_id = user_id!("@invited:localhost");
2014
2015        // When we receive a sync response saying "invited" is invited to a DM
2016        let f = EventFactory::new();
2017        let response = SyncResponseBuilder::default()
2018            .add_joined_room(
2019                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2020            )
2021            .add_global_account_data(
2022                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2023            )
2024            .build_sync_response();
2025        client.base_client().receive_sync_response(response).await.unwrap();
2026
2027        // Then get_dm_room finds this room
2028        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2029        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2030    }
2031
2032    #[async_test]
2033    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2034        let server = MockServer::start().await;
2035        let client = logged_in_client(Some(server.uri())).await;
2036        // This is the user ID that is inside MemberInvite
2037        let user_id = user_id!("@invited:localhost");
2038
2039        // When we receive a sync response saying "invited" is invited to a DM
2040        let f = EventFactory::new();
2041        let response = SyncResponseBuilder::default()
2042            .add_joined_room(
2043                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2044            )
2045            .add_global_account_data(
2046                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2047            )
2048            .build_sync_response();
2049        client.base_client().receive_sync_response(response).await.unwrap();
2050
2051        // Then get_dm_room finds this room
2052        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2053        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2054    }
2055
2056    #[async_test]
2057    async fn test_get_dm_room_still_finds_left_room() {
2058        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
2059        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
2060
2061        let server = MockServer::start().await;
2062        let client = logged_in_client(Some(server.uri())).await;
2063        // This is the user ID that is inside MemberAdditional.
2064        // Note the confusing username, so we can share
2065        // GlobalAccountDataTestEvent::Direct with the invited test.
2066        let user_id = user_id!("@invited:localhost");
2067
2068        // When we receive a sync response saying "invited" is invited to a DM
2069        let f = EventFactory::new();
2070        let response = SyncResponseBuilder::default()
2071            .add_joined_room(
2072                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2073            )
2074            .add_global_account_data(
2075                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2076            )
2077            .build_sync_response();
2078        client.base_client().receive_sync_response(response).await.unwrap();
2079
2080        // Then get_dm_room finds this room
2081        let found_room = client.get_dm_room(user_id).expect("DM not found!");
2082        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2083    }
2084
2085    #[cfg(feature = "sqlite")]
2086    #[async_test]
2087    async fn test_generation_counter_invalidates_olm_machine() {
2088        // Create two clients using the same sqlite database.
2089
2090        use matrix_sdk_base::store::RoomLoadSettings;
2091        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2092        let session = mock_matrix_session();
2093
2094        let client1 = Client::builder()
2095            .homeserver_url("http://localhost:1234")
2096            .request_config(RequestConfig::new().disable_retry())
2097            .sqlite_store(&sqlite_path, None)
2098            .build()
2099            .await
2100            .unwrap();
2101        client1
2102            .matrix_auth()
2103            .restore_session(session.clone(), RoomLoadSettings::default())
2104            .await
2105            .unwrap();
2106
2107        let client2 = Client::builder()
2108            .homeserver_url("http://localhost:1234")
2109            .request_config(RequestConfig::new().disable_retry())
2110            .sqlite_store(sqlite_path, None)
2111            .build()
2112            .await
2113            .unwrap();
2114        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2115
2116        // When the lock isn't enabled, any attempt at locking won't return a guard.
2117        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2118        assert!(guard.is_none());
2119
2120        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2121        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2122
2123        // One client can take the lock.
2124        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2125        assert!(acquired1.is_some());
2126
2127        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2128        let initial_olm_machine =
2129            client1.olm_machine().await.clone().expect("must have an olm machine");
2130
2131        // Also enable backup to check that new machine has the same backup keys.
2132        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2133            .expect("Can't create new recovery key");
2134        let backup_key = decryption_key.megolm_v1_public_key();
2135        backup_key.set_version("1".to_owned());
2136        initial_olm_machine
2137            .backup_machine()
2138            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2139            .await
2140            .expect("Should save");
2141
2142        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2143
2144        assert!(client1.encryption().backups().are_enabled().await);
2145
2146        // The other client can't take the lock too.
2147        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2148        assert!(acquired2.is_none());
2149
2150        // Now have the first client release the lock,
2151        drop(acquired1);
2152        tokio::time::sleep(Duration::from_millis(100)).await;
2153
2154        // And re-take it.
2155        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2156        assert!(acquired1.is_some());
2157
2158        // In that case, the Olm Machine shouldn't change.
2159        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2160        assert!(initial_olm_machine.same_as(&olm_machine));
2161
2162        // Ok, release again.
2163        drop(acquired1);
2164        tokio::time::sleep(Duration::from_millis(100)).await;
2165
2166        // Client2 can acquire the lock.
2167        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2168        assert!(acquired2.is_some());
2169
2170        // And then release it.
2171        drop(acquired2);
2172        tokio::time::sleep(Duration::from_millis(100)).await;
2173
2174        // Client1 can acquire it again,
2175        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2176        assert!(acquired1.is_some());
2177
2178        // But now its olm machine has been invalidated and thus regenerated!
2179        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2180
2181        assert!(!initial_olm_machine.same_as(&olm_machine));
2182
2183        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2184        assert!(backup_key_new.decryption_key.is_some());
2185        assert_eq!(
2186            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2187            backup_key.to_base64()
2188        );
2189        assert!(client1.encryption().backups().are_enabled().await);
2190    }
2191
2192    #[cfg(feature = "sqlite")]
2193    #[async_test]
2194    async fn test_generation_counter_no_spurious_invalidation() {
2195        // Create two clients using the same sqlite database.
2196
2197        use matrix_sdk_base::store::RoomLoadSettings;
2198        let sqlite_path =
2199            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2200        let session = mock_matrix_session();
2201
2202        let client = Client::builder()
2203            .homeserver_url("http://localhost:1234")
2204            .request_config(RequestConfig::new().disable_retry())
2205            .sqlite_store(&sqlite_path, None)
2206            .build()
2207            .await
2208            .unwrap();
2209        client
2210            .matrix_auth()
2211            .restore_session(session.clone(), RoomLoadSettings::default())
2212            .await
2213            .unwrap();
2214
2215        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2216
2217        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2218
2219        // Enabling the lock doesn't update the olm machine.
2220        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2221        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2222
2223        {
2224            // Simulate that another client hold the lock before.
2225            let client2 = Client::builder()
2226                .homeserver_url("http://localhost:1234")
2227                .request_config(RequestConfig::new().disable_retry())
2228                .sqlite_store(sqlite_path, None)
2229                .build()
2230                .await
2231                .unwrap();
2232            client2
2233                .matrix_auth()
2234                .restore_session(session, RoomLoadSettings::default())
2235                .await
2236                .unwrap();
2237
2238            client2
2239                .encryption()
2240                .enable_cross_process_store_lock("client2".to_owned())
2241                .await
2242                .unwrap();
2243
2244            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2245            assert!(guard.is_some());
2246
2247            drop(guard);
2248            tokio::time::sleep(Duration::from_millis(100)).await;
2249        }
2250
2251        {
2252            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2253            assert!(acquired.is_some());
2254        }
2255
2256        // Taking the lock the first time will update the olm machine.
2257        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2258        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2259
2260        {
2261            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2262            assert!(acquired.is_some());
2263        }
2264
2265        // Re-taking the lock doesn't update the olm machine.
2266        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2267        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2268    }
2269
2270    #[async_test]
2271    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2272        // Given a client and a server
2273        let client = no_retry_test_client(None).await;
2274        let server = MockServer::start().await;
2275
2276        // When we subscribe to its verification state
2277        let mut verification_state = client.encryption().verification_state();
2278
2279        // We can get its initial value, and it's Unknown
2280        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2281
2282        // We set up a mocked request to check this endpoint is not called before
2283        // reading the new state
2284        let keys_requested = Arc::new(AtomicBool::new(false));
2285        let inner_bool = keys_requested.clone();
2286
2287        Mock::given(method("GET"))
2288            .and(path_regex(
2289                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2290            ))
2291            .respond_with(move |_req: &Request| {
2292                inner_bool.fetch_or(true, Ordering::SeqCst);
2293                ResponseTemplate::new(200).set_body_json(json!({}))
2294            })
2295            .mount(&server)
2296            .await;
2297
2298        // When the session is initialised and the encryption tasks spawn
2299        set_client_session(&client).await;
2300
2301        // Then we can get an updated value without waiting for any network requests
2302        assert!(keys_requested.load(Ordering::SeqCst).not());
2303        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2304    }
2305
2306    #[test]
2307    fn test_oauth_reset_info_from_uiaa_info() {
2308        let auth_info = json!({
2309            "session": "dummy",
2310            "flows": [
2311                {
2312                    "stages": [
2313                        "org.matrix.cross_signing_reset"
2314                    ]
2315                }
2316            ],
2317            "params": {
2318                "org.matrix.cross_signing_reset": {
2319                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2320                }
2321            },
2322            "msg": "To reset..."
2323        });
2324
2325        let auth_info = serde_json::from_value(auth_info)
2326            .expect("We should be able to deserialize the UiaaInfo");
2327        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2328            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2329    }
2330}