matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_arch = "wasm32", allow(unused_imports))]
18
19use std::{
20    collections::{BTreeMap, HashSet},
21    io::{Cursor, Read, Write},
22    iter,
23    path::PathBuf,
24    sync::Arc,
25};
26
27use eyeball::{SharedObservable, Subscriber};
28use futures_core::Stream;
29use futures_util::{
30    future::try_join,
31    stream::{self, StreamExt},
32};
33use matrix_sdk_base::crypto::{
34    store::RoomKeyInfo,
35    types::requests::{
36        OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
37    },
38    CrossSigningBootstrapRequests, OlmMachine,
39};
40use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
41use ruma::{
42    api::client::{
43        keys::{
44            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
45            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
46        },
47        message::send_message_event,
48        to_device::send_event_to_device::v3::{
49            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
50        },
51        uiaa::{AuthData, UiaaInfo},
52    },
53    assign,
54    events::{
55        direct::DirectUserIdentifier,
56        room::{MediaSource, ThumbnailInfo},
57    },
58    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
59};
60use serde::Deserialize;
61use tokio::sync::{Mutex, RwLockReadGuard};
62use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
63use tracing::{debug, error, instrument, trace, warn};
64use url::Url;
65use vodozemac::Curve25519PublicKey;
66
67use self::{
68    backups::{types::BackupClientState, Backups},
69    futures::UploadEncryptedFile,
70    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
71    recovery::{Recovery, RecoveryState},
72    secret_storage::SecretStorage,
73    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
74    verification::{SasVerification, Verification, VerificationRequest},
75};
76use crate::{
77    attachment::Thumbnail,
78    client::{ClientInner, WeakClient},
79    error::HttpResult,
80    store_locks::CrossProcessStoreLockGuard,
81    Client, Error, HttpError, Result, Room, TransmissionProgress,
82};
83
84pub mod backups;
85pub mod futures;
86pub mod identities;
87pub mod recovery;
88pub mod secret_storage;
89pub(crate) mod tasks;
90pub mod verification;
91
92pub use matrix_sdk_base::crypto::{
93    olm::{
94        SessionCreationError as MegolmSessionCreationError,
95        SessionExportError as OlmSessionExportError,
96    },
97    vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
98    LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
99    SessionCreationError, SignatureError, VERSION,
100};
101
102pub use crate::error::RoomKeyImportError;
103
104/// All the data related to the encryption state.
105pub(crate) struct EncryptionData {
106    /// Background tasks related to encryption (key backup, initialization
107    /// tasks, etc.).
108    pub tasks: StdMutex<ClientTasks>,
109
110    /// End-to-end encryption settings.
111    pub encryption_settings: EncryptionSettings,
112
113    /// All state related to key backup.
114    pub backup_state: BackupClientState,
115
116    /// All state related to secret storage recovery.
117    pub recovery_state: SharedObservable<RecoveryState>,
118}
119
120impl EncryptionData {
121    pub fn new(encryption_settings: EncryptionSettings) -> Self {
122        Self {
123            encryption_settings,
124
125            tasks: StdMutex::new(Default::default()),
126            backup_state: Default::default(),
127            recovery_state: Default::default(),
128        }
129    }
130
131    pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
132        let weak_client = WeakClient::from_inner(client);
133
134        let mut tasks = self.tasks.lock();
135        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
136
137        if self.encryption_settings.backup_download_strategy
138            == BackupDownloadStrategy::AfterDecryptionFailure
139        {
140            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
141        }
142    }
143
144    /// Initialize the background task which listens for changes in the
145    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
146    ///
147    /// This should happen after the usual tasks have been set up and after the
148    /// E2EE initialization tasks have been set up.
149    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
150        let mut guard = self.tasks.lock();
151
152        let future = Recovery::update_state_after_backup_state_change(client);
153        let join_handle = spawn(future);
154
155        guard.update_recovery_state_after_backup = Some(join_handle);
156    }
157}
158
159/// Settings for end-to-end encryption features.
160#[derive(Clone, Copy, Debug, Default)]
161pub struct EncryptionSettings {
162    /// Automatically bootstrap cross-signing for a user once they're logged, in
163    /// case it's not already done yet.
164    ///
165    /// This requires to login with a username and password, or that MSC3967 is
166    /// enabled on the server, as of 2023-10-20.
167    pub auto_enable_cross_signing: bool,
168
169    /// Select a strategy to download room keys from the backup, by default room
170    /// keys won't be downloaded from the backup automatically.
171    ///
172    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
173    pub backup_download_strategy: BackupDownloadStrategy,
174
175    /// Automatically create a backup version if no backup exists.
176    pub auto_enable_backups: bool,
177}
178
179/// Settings for end-to-end encryption features.
180#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
181#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
182pub enum BackupDownloadStrategy {
183    /// Automatically download all room keys from the backup when the backup
184    /// recovery key has been received. The backup recovery key can be received
185    /// in two ways:
186    ///
187    /// 1. Received as a `m.secret.send` to-device event, after a successful
188    ///    interactive verification.
189    /// 2. Imported from secret storage (4S) using the
190    ///    [`SecretStore::import_secrets()`] method.
191    ///
192    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
193    OneShot,
194
195    /// Attempt to download a single room key if an event fails to be decrypted.
196    AfterDecryptionFailure,
197
198    /// Don't download any room keys automatically. The user can manually
199    /// download room keys using the [`Backups::download_room_key()`] methods.
200    ///
201    /// This is the default option.
202    #[default]
203    Manual,
204}
205
206/// The verification state of our own device
207///
208/// This enum tells us if our own user identity trusts these devices, in other
209/// words it tells us if the user identity has signed the device.
210#[derive(Clone, Copy, Debug, Eq, PartialEq)]
211pub enum VerificationState {
212    /// The verification state is unknown for now.
213    Unknown,
214    /// The device is considered to be verified, it has been signed by its user
215    /// identity.
216    Verified,
217    /// The device is unverified.
218    Unverified,
219}
220
221/// Wraps together a `CrossProcessLockStoreGuard` and a generation number.
222#[derive(Debug)]
223pub struct CrossProcessLockStoreGuardWithGeneration {
224    _guard: CrossProcessStoreLockGuard,
225    generation: u64,
226}
227
228impl CrossProcessLockStoreGuardWithGeneration {
229    /// Return the Crypto Store generation associated with this store lock.
230    pub fn generation(&self) -> u64 {
231        self.generation
232    }
233}
234
235/// A stateful struct remembering the cross-signing keys we need to upload.
236///
237/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
238/// additional authentication, this struct will contain information on the type
239/// of authentication the user needs to complete before the upload might be
240/// continued.
241///
242/// More info can be found in the [spec].
243///
244/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
245#[derive(Debug)]
246pub struct CrossSigningResetHandle {
247    client: Client,
248    upload_request: UploadSigningKeysRequest,
249    signatures_request: UploadSignaturesRequest,
250    auth_type: CrossSigningResetAuthType,
251    is_cancelled: Mutex<bool>,
252}
253
254impl CrossSigningResetHandle {
255    /// Set up a new `CrossSigningResetHandle`.
256    pub fn new(
257        client: Client,
258        upload_request: UploadSigningKeysRequest,
259        signatures_request: UploadSignaturesRequest,
260        auth_type: CrossSigningResetAuthType,
261    ) -> Self {
262        Self {
263            client,
264            upload_request,
265            signatures_request,
266            auth_type,
267            is_cancelled: Mutex::new(false),
268        }
269    }
270
271    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
272    /// is using.
273    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
274        &self.auth_type
275    }
276
277    /// Continue the cross-signing reset by either waiting for the
278    /// authentication to be done on the side of the OAuth 2.0 server or by
279    /// providing additional [`AuthData`] the homeserver requires.
280    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
281        let mut upload_request = self.upload_request.clone();
282        upload_request.auth = auth;
283
284        while let Err(e) = self.client.send(upload_request.clone()).await {
285            if *self.is_cancelled.lock().await {
286                return Ok(());
287            }
288
289            match e.as_uiaa_response() {
290                Some(uiaa_info) => {
291                    if uiaa_info.auth_error.is_some() {
292                        return Err(e.into());
293                    }
294                }
295                None => return Err(e.into()),
296            }
297        }
298
299        self.client.send(self.signatures_request.clone()).await?;
300
301        Ok(())
302    }
303
304    /// Cancel the ongoing identity reset process
305    pub async fn cancel(&self) {
306        *self.is_cancelled.lock().await = true;
307    }
308}
309
310/// information about the additional authentication that is required before the
311/// cross-signing keys can be uploaded.
312#[derive(Debug, Clone)]
313pub enum CrossSigningResetAuthType {
314    /// The homeserver requires user-interactive authentication.
315    Uiaa(UiaaInfo),
316    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
317    /// approve the upload of cross-signing keys.
318    OAuth(OAuthCrossSigningResetInfo),
319}
320
321impl CrossSigningResetAuthType {
322    fn new(error: &HttpError) -> Result<Option<Self>> {
323        if let Some(auth_info) = error.as_uiaa_response() {
324            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
325                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
326            } else {
327                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
328            }
329        } else {
330            Ok(None)
331        }
332    }
333}
334
335/// OAuth 2.0 specific information about the required authentication for the
336/// upload of cross-signing keys.
337#[derive(Debug, Clone, Deserialize)]
338pub struct OAuthCrossSigningResetInfo {
339    /// The URL where the user can approve the reset of the cross-signing keys.
340    pub approval_url: Url,
341}
342
343impl OAuthCrossSigningResetInfo {
344    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
345        let parameters =
346            serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(auth_info.params.get())?;
347
348        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
349    }
350}
351
352/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
353/// response
354#[derive(Debug, Deserialize)]
355struct OAuthCrossSigningResetUiaaParameters {
356    /// The URL where the user can approve the reset of the cross-signing keys.
357    #[serde(rename = "org.matrix.cross_signing_reset")]
358    reset: OAuthCrossSigningResetUiaaResetParameter,
359}
360
361/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters``
362/// dictionary.
363#[derive(Debug, Deserialize)]
364struct OAuthCrossSigningResetUiaaResetParameter {
365    /// The URL where the user can approve the reset of the cross-signing keys.
366    url: Url,
367}
368
369impl Client {
370    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
371        self.base_client().olm_machine().await
372    }
373
374    pub(crate) async fn mark_request_as_sent(
375        &self,
376        request_id: &TransactionId,
377        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
378    ) -> Result<(), matrix_sdk_base::Error> {
379        Ok(self
380            .olm_machine()
381            .await
382            .as_ref()
383            .expect(
384                "We should have an olm machine once we try to mark E2EE related requests as sent",
385            )
386            .mark_request_as_sent(request_id, response)
387            .await?)
388    }
389
390    /// Query the server for users device keys.
391    ///
392    /// # Panics
393    ///
394    /// Panics if no key query needs to be done.
395    #[instrument(skip(self, device_keys))]
396    pub(crate) async fn keys_query(
397        &self,
398        request_id: &TransactionId,
399        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
400    ) -> Result<get_keys::v3::Response> {
401        let request = assign!(get_keys::v3::Request::new(), { device_keys });
402
403        let response = self.send(request).await?;
404        self.mark_request_as_sent(request_id, &response).await?;
405        self.encryption().update_state_after_keys_query(&response).await;
406
407        Ok(response)
408    }
409
410    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
411    /// encrypting and uploading a provided reader.
412    ///
413    /// # Arguments
414    ///
415    /// * `content_type` - The content type of the file.
416    /// * `reader` - The reader that should be encrypted and uploaded.
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// # use matrix_sdk::Client;
422    /// # use url::Url;
423    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
424    /// use serde::{Deserialize, Serialize};
425    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
426    ///
427    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
428    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
429    /// struct CustomEventContent {
430    ///     encrypted_file: EncryptedFile,
431    /// }
432    ///
433    /// # async {
434    /// # let homeserver = Url::parse("http://example.com")?;
435    /// # let client = Client::new(homeserver).await?;
436    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
437    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
438    /// let encrypted_file = client.upload_encrypted_file(&mime::TEXT_PLAIN, &mut reader).await?;
439    ///
440    /// room.send(CustomEventContent { encrypted_file }).await?;
441    /// # anyhow::Ok(()) };
442    /// ```
443    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
444        &'a self,
445        content_type: &'a mime::Mime,
446        reader: &'a mut R,
447    ) -> UploadEncryptedFile<'a, R> {
448        UploadEncryptedFile::new(self, content_type, reader)
449    }
450
451    /// Encrypt and upload the file and thumbnails, and return the source
452    /// information.
453    pub(crate) async fn upload_encrypted_media_and_thumbnail(
454        &self,
455        content_type: &mime::Mime,
456        data: &[u8],
457        thumbnail: Option<Thumbnail>,
458        send_progress: SharedObservable<TransmissionProgress>,
459    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
460        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
461
462        let upload_attachment = async {
463            let mut cursor = Cursor::new(data);
464            self.upload_encrypted_file(content_type, &mut cursor)
465                .with_send_progress_observable(send_progress)
466                .await
467        };
468
469        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
470
471        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
472    }
473
474    /// Uploads an encrypted thumbnail to the media repository, and returns
475    /// its source and extra information.
476    async fn upload_encrypted_thumbnail(
477        &self,
478        thumbnail: Option<Thumbnail>,
479        send_progress: SharedObservable<TransmissionProgress>,
480    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
481        let Some(thumbnail) = thumbnail else {
482            return Ok(None);
483        };
484
485        let (data, content_type, thumbnail_info) = thumbnail.into_parts();
486        let mut cursor = Cursor::new(data);
487
488        let file = self
489            .upload_encrypted_file(&content_type, &mut cursor)
490            .with_send_progress_observable(send_progress)
491            .await?;
492
493        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
494    }
495
496    /// Claim one-time keys creating new Olm sessions.
497    ///
498    /// # Arguments
499    ///
500    /// * `users` - The list of user/device pairs that we should claim keys for.
501    pub(crate) async fn claim_one_time_keys(
502        &self,
503        users: impl Iterator<Item = &UserId>,
504    ) -> Result<()> {
505        let _lock = self.locks().key_claim_lock.lock().await;
506
507        if let Some((request_id, request)) = self
508            .olm_machine()
509            .await
510            .as_ref()
511            .ok_or(Error::NoOlmMachine)?
512            .get_missing_sessions(users)
513            .await?
514        {
515            let response = self.send(request).await?;
516            self.mark_request_as_sent(&request_id, &response).await?;
517        }
518
519        Ok(())
520    }
521
522    /// Upload the E2E encryption keys.
523    ///
524    /// This uploads the long lived device keys as well as the required amount
525    /// of one-time keys.
526    ///
527    /// # Panics
528    ///
529    /// Panics if the client isn't logged in, or if no encryption keys need to
530    /// be uploaded.
531    #[instrument(skip(self, request))]
532    pub(crate) async fn keys_upload(
533        &self,
534        request_id: &TransactionId,
535        request: &upload_keys::v3::Request,
536    ) -> Result<upload_keys::v3::Response> {
537        debug!(
538            device_keys = request.device_keys.is_some(),
539            one_time_key_count = request.one_time_keys.len(),
540            "Uploading public encryption keys",
541        );
542
543        let response = self.send(request.clone()).await?;
544        self.mark_request_as_sent(request_id, &response).await?;
545
546        Ok(response)
547    }
548
549    pub(crate) async fn room_send_helper(
550        &self,
551        request: &RoomMessageRequest,
552    ) -> Result<send_message_event::v3::Response> {
553        let content = request.content.clone();
554        let txn_id = request.txn_id.clone();
555        let room_id = &request.room_id;
556
557        self.get_room(room_id)
558            .expect("Can't send a message to a room that isn't known to the store")
559            .send(content)
560            .with_transaction_id(txn_id)
561            .await
562    }
563
564    pub(crate) async fn send_to_device(
565        &self,
566        request: &ToDeviceRequest,
567    ) -> HttpResult<ToDeviceResponse> {
568        let request = RumaToDeviceRequest::new_raw(
569            request.event_type.clone(),
570            request.txn_id.clone(),
571            request.messages.clone(),
572        );
573
574        self.send(request).await
575    }
576
577    pub(crate) async fn send_verification_request(
578        &self,
579        request: OutgoingVerificationRequest,
580    ) -> Result<()> {
581        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
582
583        match request {
584            ToDevice(t) => {
585                self.send_to_device(&t).await?;
586            }
587            InRoom(r) => {
588                self.room_send_helper(&r).await?;
589            }
590        }
591
592        Ok(())
593    }
594
595    /// Get the existing DM room with the given user, if any.
596    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
597        let rooms = self.joined_rooms();
598
599        // Find the room we share with the `user_id` and only with `user_id`
600        let room = rooms.into_iter().find(|r| {
601            let targets = r.direct_targets();
602            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
603        });
604
605        trace!(?room, "Found room");
606        room
607    }
608
609    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
610        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
611
612        match r.request() {
613            AnyOutgoingRequest::KeysQuery(request) => {
614                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
615            }
616            AnyOutgoingRequest::KeysUpload(request) => {
617                self.keys_upload(r.request_id(), request).await?;
618            }
619            AnyOutgoingRequest::ToDeviceRequest(request) => {
620                let response = self.send_to_device(request).await?;
621                self.mark_request_as_sent(r.request_id(), &response).await?;
622            }
623            AnyOutgoingRequest::SignatureUpload(request) => {
624                let response = self.send(request.clone()).await?;
625                self.mark_request_as_sent(r.request_id(), &response).await?;
626            }
627            AnyOutgoingRequest::RoomMessage(request) => {
628                let response = self.room_send_helper(request).await?;
629                self.mark_request_as_sent(r.request_id(), &response).await?;
630            }
631            AnyOutgoingRequest::KeysClaim(request) => {
632                let response = self.send(request.clone()).await?;
633                self.mark_request_as_sent(r.request_id(), &response).await?;
634            }
635        }
636
637        Ok(())
638    }
639
640    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
641        const MAX_CONCURRENT_REQUESTS: usize = 20;
642
643        // This is needed because sometimes we need to automatically
644        // claim some one-time keys to unwedge an existing Olm session.
645        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
646            warn!("Error while claiming one-time keys {:?}", e);
647        }
648
649        let outgoing_requests = stream::iter(
650            self.olm_machine()
651                .await
652                .as_ref()
653                .ok_or(Error::NoOlmMachine)?
654                .outgoing_requests()
655                .await?,
656        )
657        .map(|r| self.send_outgoing_request(r));
658
659        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
660
661        requests
662            .for_each(|r| async move {
663                match r {
664                    Ok(_) => (),
665                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
666                }
667            })
668            .await;
669
670        Ok(())
671    }
672}
673
674#[cfg(any(feature = "testing", test))]
675impl Client {
676    /// Get the olm machine, for testing purposes only.
677    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
678        self.olm_machine().await
679    }
680}
681
682/// A high-level API to manage the client's encryption.
683///
684/// To get this, use [`Client::encryption()`].
685#[derive(Debug, Clone)]
686pub struct Encryption {
687    /// The underlying client.
688    client: Client,
689}
690
691impl Encryption {
692    pub(crate) fn new(client: Client) -> Self {
693        Self { client }
694    }
695
696    /// Returns the current encryption settings for this client.
697    pub(crate) fn settings(&self) -> EncryptionSettings {
698        self.client.inner.e2ee.encryption_settings
699    }
700
701    /// Get the public ed25519 key of our own device. This is usually what is
702    /// called the fingerprint of the device.
703    pub async fn ed25519_key(&self) -> Option<String> {
704        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
705    }
706
707    /// Get the public Curve25519 key of our own device.
708    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
709        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
710    }
711
712    /// Get the current device creation timestamp.
713    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
714        match self.get_own_device().await {
715            Ok(Some(device)) => device.first_time_seen_ts(),
716            // Should not happen, there should always be an own device
717            _ => MilliSecondsSinceUnixEpoch::now(),
718        }
719    }
720
721    #[cfg(not(target_arch = "wasm32"))]
722    pub(crate) async fn import_secrets_bundle(
723        &self,
724        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
725    ) -> Result<(), SecretImportError> {
726        let olm_machine = self.client.olm_machine().await;
727        let olm_machine =
728            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
729
730        olm_machine.store().import_secrets_bundle(bundle).await
731    }
732
733    /// Get the status of the private cross signing keys.
734    ///
735    /// This can be used to check which private cross signing keys we have
736    /// stored locally.
737    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
738        let olm = self.client.olm_machine().await;
739        let machine = olm.as_ref()?;
740        Some(machine.cross_signing_status().await)
741    }
742
743    /// Get all the tracked users we know about
744    ///
745    /// Tracked users are users for which we keep the device list of E2EE
746    /// capable devices up to date.
747    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
748        if let Some(machine) = self.client.olm_machine().await.as_ref() {
749            machine.tracked_users().await
750        } else {
751            Ok(HashSet::new())
752        }
753    }
754
755    /// Get a [`Subscriber`] for the [`VerificationState`].
756    ///
757    /// # Examples
758    ///
759    /// ```no_run
760    /// use matrix_sdk::{encryption, Client};
761    /// use url::Url;
762    ///
763    /// # async {
764    /// let homeserver = Url::parse("http://example.com")?;
765    /// let client = Client::new(homeserver).await?;
766    /// let mut subscriber = client.encryption().verification_state();
767    ///
768    /// let current_value = subscriber.get();
769    ///
770    /// println!("The current verification state is: {current_value:?}");
771    ///
772    /// if let Some(verification_state) = subscriber.next().await {
773    ///     println!("Received verification state update {:?}", verification_state)
774    /// }
775    /// # anyhow::Ok(()) };
776    /// ```
777    pub fn verification_state(&self) -> Subscriber<VerificationState> {
778        self.client.inner.verification_state.subscribe_reset()
779    }
780
781    /// Get a verification object with the given flow id.
782    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
783        let olm = self.client.olm_machine().await;
784        let olm = olm.as_ref()?;
785        #[allow(clippy::bind_instead_of_map)]
786        olm.get_verification(user_id, flow_id).and_then(|v| match v {
787            matrix_sdk_base::crypto::Verification::SasV1(s) => {
788                Some(SasVerification { inner: s, client: self.client.clone() }.into())
789            }
790            #[cfg(feature = "qrcode")]
791            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
792                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
793            }
794            _ => None,
795        })
796    }
797
798    /// Get a `VerificationRequest` object for the given user with the given
799    /// flow id.
800    pub async fn get_verification_request(
801        &self,
802        user_id: &UserId,
803        flow_id: impl AsRef<str>,
804    ) -> Option<VerificationRequest> {
805        let olm = self.client.olm_machine().await;
806        let olm = olm.as_ref()?;
807
808        olm.get_verification_request(user_id, flow_id)
809            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
810    }
811
812    /// Get a specific device of a user.
813    ///
814    /// # Arguments
815    ///
816    /// * `user_id` - The unique id of the user that the device belongs to.
817    ///
818    /// * `device_id` - The unique id of the device.
819    ///
820    /// Returns a `Device` if one is found and the crypto store didn't throw an
821    /// error.
822    ///
823    /// This will always return None if the client hasn't been logged in.
824    ///
825    /// # Examples
826    ///
827    /// ```no_run
828    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
829    /// # use url::Url;
830    /// # async {
831    /// # let alice = user_id!("@alice:example.org");
832    /// # let homeserver = Url::parse("http://example.com")?;
833    /// # let client = Client::new(homeserver).await?;
834    /// if let Some(device) =
835    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
836    /// {
837    ///     println!("{:?}", device.is_verified());
838    ///
839    ///     if !device.is_verified() {
840    ///         let verification = device.request_verification().await?;
841    ///     }
842    /// }
843    /// # anyhow::Ok(()) };
844    /// ```
845    pub async fn get_device(
846        &self,
847        user_id: &UserId,
848        device_id: &DeviceId,
849    ) -> Result<Option<Device>, CryptoStoreError> {
850        let olm = self.client.olm_machine().await;
851        let Some(machine) = olm.as_ref() else { return Ok(None) };
852        let device = machine.get_device(user_id, device_id, None).await?;
853        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
854    }
855
856    /// A convenience method to retrieve your own device from the store.
857    ///
858    /// This is the same as calling [`Encryption::get_device()`] with your own
859    /// user and device ID.
860    ///
861    /// This will always return a device, unless you are not logged in.
862    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
863        let olm = self.client.olm_machine().await;
864        let Some(machine) = olm.as_ref() else { return Ok(None) };
865        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
866        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
867    }
868
869    /// Get a map holding all the devices of an user.
870    ///
871    /// This will always return an empty map if the client hasn't been logged
872    /// in.
873    ///
874    /// # Arguments
875    ///
876    /// * `user_id` - The unique id of the user that the devices belong to.
877    ///
878    /// # Examples
879    ///
880    /// ```no_run
881    /// # use matrix_sdk::{Client, ruma::user_id};
882    /// # use url::Url;
883    /// # async {
884    /// # let alice = user_id!("@alice:example.org");
885    /// # let homeserver = Url::parse("http://example.com")?;
886    /// # let client = Client::new(homeserver).await?;
887    /// let devices = client.encryption().get_user_devices(alice).await?;
888    ///
889    /// for device in devices.devices() {
890    ///     println!("{device:?}");
891    /// }
892    /// # anyhow::Ok(()) };
893    /// ```
894    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
895        let devices = self
896            .client
897            .olm_machine()
898            .await
899            .as_ref()
900            .ok_or(Error::NoOlmMachine)?
901            .get_user_devices(user_id, None)
902            .await?;
903
904        Ok(UserDevices { inner: devices, client: self.client.clone() })
905    }
906
907    /// Get the E2EE identity of a user from the crypto store.
908    ///
909    /// Usually, we only have the E2EE identity of a user locally if the user
910    /// is tracked, meaning that we are both members of the same encrypted room.
911    ///
912    /// To get the E2EE identity of a user even if it is not available locally
913    /// use [`Encryption::request_user_identity()`].
914    ///
915    /// # Arguments
916    ///
917    /// * `user_id` - The unique id of the user that the identity belongs to.
918    ///
919    /// Returns a `UserIdentity` if one is found and the crypto store
920    /// didn't throw an error.
921    ///
922    /// This will always return None if the client hasn't been logged in.
923    ///
924    /// # Examples
925    ///
926    /// ```no_run
927    /// # use matrix_sdk::{Client, ruma::user_id};
928    /// # use url::Url;
929    /// # async {
930    /// # let alice = user_id!("@alice:example.org");
931    /// # let homeserver = Url::parse("http://example.com")?;
932    /// # let client = Client::new(homeserver).await?;
933    /// let user = client.encryption().get_user_identity(alice).await?;
934    ///
935    /// if let Some(user) = user {
936    ///     println!("{:?}", user.is_verified());
937    ///
938    ///     let verification = user.request_verification().await?;
939    /// }
940    /// # anyhow::Ok(()) };
941    /// ```
942    pub async fn get_user_identity(
943        &self,
944        user_id: &UserId,
945    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
946        let olm = self.client.olm_machine().await;
947        let Some(olm) = olm.as_ref() else { return Ok(None) };
948        let identity = olm.get_identity(user_id, None).await?;
949
950        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
951    }
952
953    /// Get the E2EE identity of a user from the homeserver.
954    ///
955    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
956    /// E2EE identity is not found, it should mean that the user did not set
957    /// up cross-signing.
958    ///
959    /// If you want the E2EE identity of a user without making a request to the
960    /// homeserver, use [`Encryption::get_user_identity()`] instead.
961    ///
962    /// # Arguments
963    ///
964    /// * `user_id` - The ID of the user that the identity belongs to.
965    ///
966    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
967    /// was an issue with the crypto store or with the request to the
968    /// homeserver.
969    ///
970    /// This will always return `None` if the client hasn't been logged in.
971    ///
972    /// # Examples
973    ///
974    /// ```no_run
975    /// # use matrix_sdk::{Client, ruma::user_id};
976    /// # use url::Url;
977    /// # async {
978    /// # let alice = user_id!("@alice:example.org");
979    /// # let homeserver = Url::parse("http://example.com")?;
980    /// # let client = Client::new(homeserver).await?;
981    /// let user = client.encryption().request_user_identity(alice).await?;
982    ///
983    /// if let Some(user) = user {
984    ///     println!("User is verified: {:?}", user.is_verified());
985    ///
986    ///     let verification = user.request_verification().await?;
987    /// }
988    /// # anyhow::Ok(()) };
989    /// ```
990    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
991        let olm = self.client.olm_machine().await;
992        let Some(olm) = olm.as_ref() else { return Ok(None) };
993
994        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
995        self.client.keys_query(&request_id, request.device_keys).await?;
996
997        let identity = olm.get_identity(user_id, None).await?;
998        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
999    }
1000
1001    /// Returns a stream of device updates, allowing users to listen for
1002    /// notifications about new or changed devices.
1003    ///
1004    /// The stream produced by this method emits updates whenever a new device
1005    /// is discovered or when an existing device's information is changed. Users
1006    /// can subscribe to this stream and receive updates in real-time.
1007    ///
1008    /// # Examples
1009    ///
1010    /// ```no_run
1011    /// # use matrix_sdk::Client;
1012    /// # use ruma::{device_id, user_id};
1013    /// # use futures_util::{pin_mut, StreamExt};
1014    /// # let client: Client = unimplemented!();
1015    /// # async {
1016    /// let devices_stream = client.encryption().devices_stream().await?;
1017    /// let user_id = client
1018    ///     .user_id()
1019    ///     .expect("We should know our user id after we have logged in");
1020    /// pin_mut!(devices_stream);
1021    ///
1022    /// for device_updates in devices_stream.next().await {
1023    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1024    ///         for device in user_devices.values() {
1025    ///             println!("A new device has been added {}", device.device_id());
1026    ///         }
1027    ///     }
1028    /// }
1029    /// # anyhow::Ok(()) };
1030    /// ```
1031    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1032        let olm = self.client.olm_machine().await;
1033        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1034        let client = self.client.to_owned();
1035
1036        Ok(olm
1037            .store()
1038            .devices_stream()
1039            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1040    }
1041
1042    /// Returns a stream of user identity updates, allowing users to listen for
1043    /// notifications about new or changed user identities.
1044    ///
1045    /// The stream produced by this method emits updates whenever a new user
1046    /// identity is discovered or when an existing identities information is
1047    /// changed. Users can subscribe to this stream and receive updates in
1048    /// real-time.
1049    ///
1050    /// # Examples
1051    ///
1052    /// ```no_run
1053    /// # use matrix_sdk::Client;
1054    /// # use ruma::{device_id, user_id};
1055    /// # use futures_util::{pin_mut, StreamExt};
1056    /// # let client: Client = unimplemented!();
1057    /// # async {
1058    /// let identities_stream =
1059    ///     client.encryption().user_identities_stream().await?;
1060    /// pin_mut!(identities_stream);
1061    ///
1062    /// for identity_updates in identities_stream.next().await {
1063    ///     for (_, identity) in identity_updates.new {
1064    ///         println!("A new identity has been added {}", identity.user_id());
1065    ///     }
1066    /// }
1067    /// # anyhow::Ok(()) };
1068    /// ```
1069    pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1070        let olm = self.client.olm_machine().await;
1071        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1072        let client = self.client.to_owned();
1073
1074        Ok(olm
1075            .store()
1076            .user_identities_stream()
1077            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1078    }
1079
1080    /// Create and upload a new cross signing identity.
1081    ///
1082    /// # Arguments
1083    ///
1084    /// * `auth_data` - This request requires user interactive auth, the first
1085    ///   request needs to set this to `None` and will always fail with an
1086    ///   `UiaaResponse`. The response will contain information for the
1087    ///   interactive auth and the same request needs to be made but this time
1088    ///   with some `auth_data` provided.
1089    ///
1090    /// # Examples
1091    ///
1092    /// ```no_run
1093    /// # use std::collections::BTreeMap;
1094    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1095    /// # use url::Url;
1096    /// # use serde_json::json;
1097    /// # async {
1098    /// # let homeserver = Url::parse("http://example.com")?;
1099    /// # let client = Client::new(homeserver).await?;
1100    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1101    ///     if let Some(response) = e.as_uiaa_response() {
1102    ///         let mut password = uiaa::Password::new(
1103    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1104    ///             "wordpass".to_owned(),
1105    ///         );
1106    ///         password.session = response.session.clone();
1107    ///
1108    ///         client
1109    ///             .encryption()
1110    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1111    ///             .await
1112    ///             .expect("Couldn't bootstrap cross signing")
1113    ///     } else {
1114    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1115    ///     }
1116    /// }
1117    /// # anyhow::Ok(()) };
1118    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1119        let olm = self.client.olm_machine().await;
1120        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1121
1122        let CrossSigningBootstrapRequests {
1123            upload_signing_keys_req,
1124            upload_keys_req,
1125            upload_signatures_req,
1126        } = olm.bootstrap_cross_signing(false).await?;
1127
1128        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1129            auth: auth_data,
1130            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1131            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1132            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1133        });
1134
1135        if let Some(req) = upload_keys_req {
1136            self.client.send_outgoing_request(req).await?;
1137        }
1138        self.client.send(upload_signing_keys_req).await?;
1139        self.client.send(upload_signatures_req).await?;
1140
1141        Ok(())
1142    }
1143
1144    /// Reset the cross-signing keys.
1145    ///
1146    /// # Example
1147    ///
1148    /// ```no_run
1149    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1150    /// # use url::Url;
1151    /// # async {
1152    /// # let homeserver = Url::parse("http://example.com")?;
1153    /// # let client = Client::new(homeserver).await?;
1154    /// # let user_id = unimplemented!();
1155    /// let encryption = client.encryption();
1156    ///
1157    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1158    ///     match handle.auth_type() {
1159    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1160    ///             use matrix_sdk::ruma::api::client::uiaa;
1161    ///
1162    ///             let password = "1234".to_owned();
1163    ///             let mut password = uiaa::Password::new(user_id, password);
1164    ///             password.session = uiaa.session;
1165    ///
1166    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1167    ///         }
1168    ///         CrossSigningResetAuthType::OAuth(o) => {
1169    ///             println!(
1170    ///                 "To reset your end-to-end encryption cross-signing identity, \
1171    ///                 you first need to approve it at {}",
1172    ///                 o.approval_url
1173    ///             );
1174    ///             handle.auth(None).await?;
1175    ///         }
1176    ///     }
1177    /// }
1178    /// # anyhow::Ok(()) };
1179    /// ```
1180    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1181        let olm = self.client.olm_machine().await;
1182        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1183
1184        let CrossSigningBootstrapRequests {
1185            upload_keys_req,
1186            upload_signing_keys_req,
1187            upload_signatures_req,
1188        } = olm.bootstrap_cross_signing(true).await?;
1189
1190        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1191            auth: None,
1192            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1193            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1194            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1195        });
1196
1197        if let Some(req) = upload_keys_req {
1198            self.client.send_outgoing_request(req).await?;
1199        }
1200
1201        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1202            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1203                let client = self.client.clone();
1204
1205                Ok(Some(CrossSigningResetHandle::new(
1206                    client,
1207                    upload_signing_keys_req,
1208                    upload_signatures_req,
1209                    auth_type,
1210                )))
1211            } else {
1212                Err(error.into())
1213            }
1214        } else {
1215            self.client.send(upload_signatures_req).await?;
1216
1217            Ok(None)
1218        }
1219    }
1220
1221    /// Query the user's own device keys, if, and only if, we didn't have their
1222    /// identity in the first place.
1223    async fn ensure_initial_key_query(&self) -> Result<()> {
1224        let olm_machine = self.client.olm_machine().await;
1225        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1226
1227        let user_id = olm_machine.user_id();
1228
1229        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1230            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1231            self.client.keys_query(&request_id, request.device_keys).await?;
1232        }
1233
1234        Ok(())
1235    }
1236
1237    /// Create and upload a new cross signing identity, if that has not been
1238    /// done yet.
1239    ///
1240    /// This will only create a new cross-signing identity if the user had never
1241    /// done it before. If the user did it before, then this is a no-op.
1242    ///
1243    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1244    /// behavior of this function.
1245    ///
1246    /// # Arguments
1247    ///
1248    /// * `auth_data` - This request requires user interactive auth, the first
1249    ///   request needs to set this to `None` and will always fail with an
1250    ///   `UiaaResponse`. The response will contain information for the
1251    ///   interactive auth and the same request needs to be made but this time
1252    ///   with some `auth_data` provided.
1253    ///
1254    /// # Examples
1255    /// ```no_run
1256    /// # use std::collections::BTreeMap;
1257    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1258    /// # use url::Url;
1259    /// # use serde_json::json;
1260    /// # async {
1261    /// # let homeserver = Url::parse("http://example.com")?;
1262    /// # let client = Client::new(homeserver).await?;
1263    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1264    ///     if let Some(response) = e.as_uiaa_response() {
1265    ///         let mut password = uiaa::Password::new(
1266    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1267    ///             "wordpass".to_owned(),
1268    ///         );
1269    ///         password.session = response.session.clone();
1270    ///
1271    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1272    ///         // avoid checks.
1273    ///         client
1274    ///             .encryption()
1275    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1276    ///             .await
1277    ///             .expect("Couldn't bootstrap cross signing")
1278    ///     } else {
1279    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1280    ///     }
1281    /// }
1282    /// # anyhow::Ok(()) };
1283    pub async fn bootstrap_cross_signing_if_needed(
1284        &self,
1285        auth_data: Option<AuthData>,
1286    ) -> Result<()> {
1287        let olm_machine = self.client.olm_machine().await;
1288        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1289        let user_id = olm_machine.user_id();
1290
1291        self.ensure_initial_key_query().await?;
1292
1293        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1294            self.bootstrap_cross_signing(auth_data).await?;
1295        }
1296
1297        Ok(())
1298    }
1299
1300    /// Export E2EE keys that match the given predicate encrypting them with the
1301    /// given passphrase.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// * `path` - The file path where the exported key file will be saved.
1306    ///
1307    /// * `passphrase` - The passphrase that will be used to encrypt the
1308    ///   exported room keys.
1309    ///
1310    /// * `predicate` - A closure that will be called for every known
1311    ///   `InboundGroupSession`, which represents a room key. If the closure
1312    ///   returns `true` the `InboundGroupSessoin` will be included in the
1313    ///   export, if the closure returns `false` it will not be included.
1314    ///
1315    /// # Panics
1316    ///
1317    /// This method will panic if it isn't run on a Tokio runtime.
1318    ///
1319    /// This method will panic if it can't get enough randomness from the OS to
1320    /// encrypt the exported keys securely.
1321    ///
1322    /// # Examples
1323    ///
1324    /// ```no_run
1325    /// # use std::{path::PathBuf, time::Duration};
1326    /// # use matrix_sdk::{
1327    /// #     Client, config::SyncSettings,
1328    /// #     ruma::room_id,
1329    /// # };
1330    /// # use url::Url;
1331    /// # async {
1332    /// # let homeserver = Url::parse("http://localhost:8080")?;
1333    /// # let mut client = Client::new(homeserver).await?;
1334    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1335    /// // Export all room keys.
1336    /// client
1337    ///     .encryption()
1338    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1339    ///     .await?;
1340    ///
1341    /// // Export only the room keys for a certain room.
1342    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1343    /// let room_id = room_id!("!test:localhost");
1344    ///
1345    /// client
1346    ///     .encryption()
1347    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1348    ///     .await?;
1349    /// # anyhow::Ok(()) };
1350    /// ```
1351    #[cfg(not(target_arch = "wasm32"))]
1352    pub async fn export_room_keys(
1353        &self,
1354        path: PathBuf,
1355        passphrase: &str,
1356        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1357    ) -> Result<()> {
1358        let olm = self.client.olm_machine().await;
1359        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1360
1361        let keys = olm.store().export_room_keys(predicate).await?;
1362        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1363
1364        let encrypt = move || -> Result<()> {
1365            let export: String =
1366                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1367            let mut file = std::fs::File::create(path)?;
1368            file.write_all(&export.into_bytes())?;
1369            Ok(())
1370        };
1371
1372        let task = tokio::task::spawn_blocking(encrypt);
1373        task.await.expect("Task join error")
1374    }
1375
1376    /// Import E2EE keys from the given file path.
1377    ///
1378    /// # Arguments
1379    ///
1380    /// * `path` - The file path where the exported key file will can be found.
1381    ///
1382    /// * `passphrase` - The passphrase that should be used to decrypt the
1383    ///   exported room keys.
1384    ///
1385    /// Returns a tuple of numbers that represent the number of sessions that
1386    /// were imported and the total number of sessions that were found in the
1387    /// key export.
1388    ///
1389    /// # Panics
1390    ///
1391    /// This method will panic if it isn't run on a Tokio runtime.
1392    ///
1393    /// ```no_run
1394    /// # use std::{path::PathBuf, time::Duration};
1395    /// # use matrix_sdk::{
1396    /// #     Client, config::SyncSettings,
1397    /// #     ruma::room_id,
1398    /// # };
1399    /// # use url::Url;
1400    /// # async {
1401    /// # let homeserver = Url::parse("http://localhost:8080")?;
1402    /// # let mut client = Client::new(homeserver).await?;
1403    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1404    /// let result =
1405    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1406    ///
1407    /// println!(
1408    ///     "Imported {} room keys out of {}",
1409    ///     result.imported_count, result.total_count
1410    /// );
1411    /// # anyhow::Ok(()) };
1412    /// ```
1413    #[cfg(not(target_arch = "wasm32"))]
1414    pub async fn import_room_keys(
1415        &self,
1416        path: PathBuf,
1417        passphrase: &str,
1418    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1419        let olm = self.client.olm_machine().await;
1420        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1421        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1422
1423        let decrypt = move || {
1424            let file = std::fs::File::open(path)?;
1425            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1426        };
1427
1428        let task = tokio::task::spawn_blocking(decrypt);
1429        let import = task.await.expect("Task join error")?;
1430
1431        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1432
1433        self.backups().maybe_trigger_backup();
1434
1435        Ok(ret)
1436    }
1437
1438    /// Receive notifications of room keys being received as a [`Stream`].
1439    ///
1440    /// Each time a room key is updated in any way, an update will be sent to
1441    /// the stream. Updates that happen at the same time are batched into a
1442    /// [`Vec`].
1443    ///
1444    /// If the reader of the stream lags too far behind, an error is broadcast
1445    /// containing the number of skipped items.
1446    ///
1447    /// # Examples
1448    ///
1449    /// ```no_run
1450    /// # use matrix_sdk::Client;
1451    /// # use url::Url;
1452    /// # async {
1453    /// # let homeserver = Url::parse("http://example.com")?;
1454    /// # let client = Client::new(homeserver).await?;
1455    /// use futures_util::StreamExt;
1456    ///
1457    /// let Some(mut room_keys_stream) =
1458    ///     client.encryption().room_keys_received_stream().await
1459    /// else {
1460    ///     return Ok(());
1461    /// };
1462    ///
1463    /// while let Some(update) = room_keys_stream.next().await {
1464    ///     println!("Received room keys {update:?}");
1465    /// }
1466    /// # anyhow::Ok(()) };
1467    /// ```
1468    pub async fn room_keys_received_stream(
1469        &self,
1470    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1471        let olm = self.client.olm_machine().await;
1472        let olm = olm.as_ref()?;
1473
1474        Some(olm.store().room_keys_received_stream())
1475    }
1476
1477    /// Get the secret storage manager of the client.
1478    pub fn secret_storage(&self) -> SecretStorage {
1479        SecretStorage { client: self.client.to_owned() }
1480    }
1481
1482    /// Get the backups manager of the client.
1483    pub fn backups(&self) -> Backups {
1484        Backups { client: self.client.to_owned() }
1485    }
1486
1487    /// Get the recovery manager of the client.
1488    pub fn recovery(&self) -> Recovery {
1489        Recovery { client: self.client.to_owned() }
1490    }
1491
1492    /// Enables the crypto-store cross-process lock.
1493    ///
1494    /// This may be required if there are multiple processes that may do writes
1495    /// to the same crypto store. In that case, it's necessary to create a
1496    /// lock, so that only one process writes to it, otherwise this may
1497    /// cause confusing issues because of stale data contained in in-memory
1498    /// caches.
1499    ///
1500    /// The provided `lock_value` must be a unique identifier for this process.
1501    /// Check [`Client::cross_process_store_locks_holder_name`] to
1502    /// get the global value.
1503    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1504        // If the lock has already been created, don't recreate it from scratch.
1505        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1506            let prev_holder = prev_lock.lock_holder();
1507            if prev_holder == lock_value {
1508                return Ok(());
1509            }
1510            warn!("Recreating cross-process store lock with a different holder value: prev was {prev_holder}, new is {lock_value}");
1511        }
1512
1513        let olm_machine = self.client.base_client().olm_machine().await;
1514        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1515
1516        let lock =
1517            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1518
1519        // Gently try to initialize the crypto store generation counter.
1520        //
1521        // If we don't get the lock immediately, then it is already acquired by another
1522        // process, and we'll get to reload next time we acquire the lock.
1523        {
1524            let guard = lock.try_lock_once().await?;
1525            if guard.is_some() {
1526                olm_machine
1527                    .initialize_crypto_store_generation(
1528                        &self.client.locks().crypto_store_generation,
1529                    )
1530                    .await?;
1531            }
1532        }
1533
1534        self.client
1535            .locks()
1536            .cross_process_crypto_store_lock
1537            .set(lock)
1538            .map_err(|_| Error::BadCryptoStoreState)?;
1539
1540        Ok(())
1541    }
1542
1543    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1544    /// time.
1545    ///
1546    /// Returns the current generation number.
1547    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1548        let olm_machine_guard = self.client.olm_machine().await;
1549        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1550            let (new_gen, generation_number) = olm_machine
1551                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1552                .await?;
1553            // If the crypto store generation has changed,
1554            if new_gen {
1555                // (get rid of the reference to the current crypto store first)
1556                drop(olm_machine_guard);
1557                // Recreate the OlmMachine.
1558                self.client.base_client().regenerate_olm(None).await?;
1559            }
1560            Ok(generation_number)
1561        } else {
1562            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1563            // been initialised by the time we get here. Ideally we'd panic, or return an
1564            // error, but for now I'm just adding some logging to check if it
1565            // happens, and returning the magic number 0.
1566            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1567            Ok(0)
1568        }
1569    }
1570
1571    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1572    /// spin-waits until the lock is available.
1573    ///
1574    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1575    /// first time.
1576    pub async fn spin_lock_store(
1577        &self,
1578        max_backoff: Option<u32>,
1579    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1580        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1581            let guard = lock.spin_lock(max_backoff).await?;
1582
1583            let generation = self.on_lock_newly_acquired().await?;
1584
1585            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1586        } else {
1587            Ok(None)
1588        }
1589    }
1590
1591    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1592    /// attempts to lock it once.
1593    ///
1594    /// Returns a guard to the lock, if it was obtained.
1595    pub async fn try_lock_store_once(
1596        &self,
1597    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1598        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1599            let maybe_guard = lock.try_lock_once().await?;
1600
1601            let Some(guard) = maybe_guard else {
1602                return Ok(None);
1603            };
1604
1605            let generation = self.on_lock_newly_acquired().await?;
1606
1607            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1608        } else {
1609            Ok(None)
1610        }
1611    }
1612
1613    /// Testing purposes only.
1614    #[cfg(any(test, feature = "testing"))]
1615    pub async fn uploaded_key_count(&self) -> Result<u64> {
1616        let olm_machine = self.client.olm_machine().await;
1617        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1618        Ok(olm_machine.uploaded_key_count().await?)
1619    }
1620
1621    /// Bootstrap encryption and enables event listeners for the E2EE support.
1622    ///
1623    /// Based on the `EncryptionSettings`, this call might:
1624    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1625    /// - Create a key backup if needed (POST `/room_keys/version`)
1626    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1627    ///
1628    /// As part of this process, and if needed, the current device keys would be
1629    /// uploaded to the server, new account data would be added, and cross
1630    /// signing keys and signatures might be uploaded.
1631    ///
1632    /// Should be called once we
1633    /// created a [`OlmMachine`], i.e. after logging in.
1634    ///
1635    /// # Arguments
1636    ///
1637    /// * `auth_data` - Some requests may require re-authentication. To prevent
1638    ///   the user from having to re-enter their password (or use other
1639    ///   methods), we can provide the authentication data here. This is
1640    ///   necessary for uploading cross-signing keys. However, please note that
1641    ///   there is a proposal (MSC3967) to remove this requirement, which would
1642    ///   allow for the initial upload of cross-signing keys without
1643    ///   authentication, rendering this parameter obsolete.
1644    pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1645        let mut tasks = self.client.inner.e2ee.tasks.lock();
1646
1647        let this = self.clone();
1648        tasks.setup_e2ee = Some(spawn(async move {
1649            // Update the current state first, so we don't have to wait for the result of
1650            // network requests
1651            this.update_verification_state().await;
1652
1653            if this.settings().auto_enable_cross_signing {
1654                if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1655                    error!("Couldn't bootstrap cross signing {e:?}");
1656                }
1657            }
1658
1659            if let Err(e) = this.backups().setup_and_resume().await {
1660                error!("Couldn't setup and resume backups {e:?}");
1661            }
1662            if let Err(e) = this.recovery().setup().await {
1663                error!("Couldn't setup and resume recovery {e:?}");
1664            }
1665        }));
1666    }
1667
1668    /// Waits for end-to-end encryption initialization tasks to finish, if any
1669    /// was running in the background.
1670    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1671        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1672
1673        if let Some(task) = task {
1674            if let Err(err) = task.await {
1675                warn!("Error when initializing backups: {err}");
1676            }
1677        }
1678    }
1679
1680    /// Upload the device keys and initial set of one-time keys to the server.
1681    ///
1682    /// This should only be called when the user logs in for the first time,
1683    /// the method will ensure that other devices see our own device as an
1684    /// end-to-end encryption enabled one.
1685    ///
1686    /// **Warning**: Do not use this method if we're already calling
1687    /// [`Client::send_outgoing_request()`]. This method is intended for
1688    /// explicitly uploading the device keys before starting a sync.
1689    #[cfg(not(target_arch = "wasm32"))]
1690    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1691        let olm = self.client.olm_machine().await;
1692        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1693
1694        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1695            self.client.keys_upload(&request_id, &request).await?;
1696
1697            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1698            self.client.keys_query(&request_id, request.device_keys).await?;
1699        }
1700
1701        Ok(())
1702    }
1703
1704    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1705        self.recovery().update_state_after_keys_query(response).await;
1706
1707        // Only update the verification_state if our own devices changed
1708        if let Some(user_id) = self.client.user_id() {
1709            let contains_own_device = response.device_keys.contains_key(user_id);
1710
1711            if contains_own_device {
1712                self.update_verification_state().await;
1713            }
1714        }
1715    }
1716
1717    async fn update_verification_state(&self) {
1718        match self.get_own_device().await {
1719            Ok(device) => {
1720                if let Some(device) = device {
1721                    let is_verified = device.is_cross_signed_by_owner();
1722
1723                    if is_verified {
1724                        self.client.inner.verification_state.set(VerificationState::Verified);
1725                    } else {
1726                        self.client.inner.verification_state.set(VerificationState::Unverified);
1727                    }
1728                } else {
1729                    warn!("Couldn't find out own device in the store.");
1730                    self.client.inner.verification_state.set(VerificationState::Unknown);
1731                }
1732            }
1733            Err(error) => {
1734                warn!("Failed retrieving own device: {error}");
1735                self.client.inner.verification_state.set(VerificationState::Unknown);
1736            }
1737        }
1738    }
1739}
1740
1741#[cfg(all(test, not(target_arch = "wasm32")))]
1742mod tests {
1743    use std::{
1744        ops::Not,
1745        sync::{
1746            atomic::{AtomicBool, Ordering},
1747            Arc,
1748        },
1749        time::Duration,
1750    };
1751
1752    use matrix_sdk_test::{
1753        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
1754        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1755    };
1756    use ruma::{
1757        event_id,
1758        events::{reaction::ReactionEventContent, relation::Annotation},
1759        user_id,
1760    };
1761    use serde_json::json;
1762    use wiremock::{
1763        matchers::{header, method, path_regex},
1764        Mock, MockServer, Request, ResponseTemplate,
1765    };
1766
1767    use crate::{
1768        assert_next_matches_with_timeout,
1769        config::RequestConfig,
1770        encryption::{OAuthCrossSigningResetInfo, VerificationState},
1771        test_utils::{
1772            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1773        },
1774        Client,
1775    };
1776
1777    #[async_test]
1778    async fn test_reaction_sending() {
1779        let server = MockServer::start().await;
1780        let client = logged_in_client(Some(server.uri())).await;
1781
1782        let event_id = event_id!("$2:example.org");
1783
1784        Mock::given(method("GET"))
1785            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1786            .and(header("authorization", "Bearer 1234"))
1787            .respond_with(
1788                ResponseTemplate::new(200)
1789                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1790            )
1791            .mount(&server)
1792            .await;
1793
1794        Mock::given(method("PUT"))
1795            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1796            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1797                "event_id": event_id,
1798            })))
1799            .mount(&server)
1800            .await;
1801
1802        let response = SyncResponseBuilder::default()
1803            .add_joined_room(
1804                JoinedRoomBuilder::default()
1805                    .add_state_event(StateTestEvent::Member)
1806                    .add_state_event(StateTestEvent::PowerLevels)
1807                    .add_state_event(StateTestEvent::Encryption),
1808            )
1809            .build_sync_response();
1810
1811        client.base_client().receive_sync_response(response).await.unwrap();
1812
1813        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1814        assert!(room
1815            .latest_encryption_state()
1816            .await
1817            .expect("Getting encryption state")
1818            .is_encrypted());
1819
1820        let event_id = event_id!("$1:example.org");
1821        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
1822        room.send(reaction).await.expect("Sending the reaction should not fail");
1823
1824        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
1825    }
1826
1827    #[async_test]
1828    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
1829        let server = MockServer::start().await;
1830        let client = logged_in_client(Some(server.uri())).await;
1831        // This is the user ID that is inside MemberAdditional.
1832        // Note the confusing username, so we can share
1833        // GlobalAccountDataTestEvent::Direct with the invited test.
1834        let user_id = user_id!("@invited:localhost");
1835
1836        // When we receive a sync response saying "invited" is invited to a DM
1837        let response = SyncResponseBuilder::default()
1838            .add_joined_room(
1839                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
1840            )
1841            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1842            .build_sync_response();
1843        client.base_client().receive_sync_response(response).await.unwrap();
1844
1845        // Then get_dm_room finds this room
1846        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1847        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1848    }
1849
1850    #[async_test]
1851    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
1852        let server = MockServer::start().await;
1853        let client = logged_in_client(Some(server.uri())).await;
1854        // This is the user ID that is inside MemberInvite
1855        let user_id = user_id!("@invited:localhost");
1856
1857        // When we receive a sync response saying "invited" is invited to a DM
1858        let response = SyncResponseBuilder::default()
1859            .add_joined_room(
1860                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
1861            )
1862            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1863            .build_sync_response();
1864        client.base_client().receive_sync_response(response).await.unwrap();
1865
1866        // Then get_dm_room finds this room
1867        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1868        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1869    }
1870
1871    #[async_test]
1872    async fn test_get_dm_room_still_finds_left_room() {
1873        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
1874        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
1875
1876        let server = MockServer::start().await;
1877        let client = logged_in_client(Some(server.uri())).await;
1878        // This is the user ID that is inside MemberAdditional.
1879        // Note the confusing username, so we can share
1880        // GlobalAccountDataTestEvent::Direct with the invited test.
1881        let user_id = user_id!("@invited:localhost");
1882
1883        // When we receive a sync response saying "invited" is invited to a DM
1884        let response = SyncResponseBuilder::default()
1885            .add_joined_room(
1886                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
1887            )
1888            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1889            .build_sync_response();
1890        client.base_client().receive_sync_response(response).await.unwrap();
1891
1892        // Then get_dm_room finds this room
1893        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1894        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1895    }
1896
1897    #[cfg(feature = "sqlite")]
1898    #[async_test]
1899    async fn test_generation_counter_invalidates_olm_machine() {
1900        // Create two clients using the same sqlite database.
1901
1902        use matrix_sdk_base::store::RoomLoadSettings;
1903        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
1904        let session = mock_matrix_session();
1905
1906        let client1 = Client::builder()
1907            .homeserver_url("http://localhost:1234")
1908            .request_config(RequestConfig::new().disable_retry())
1909            .sqlite_store(&sqlite_path, None)
1910            .build()
1911            .await
1912            .unwrap();
1913        client1
1914            .matrix_auth()
1915            .restore_session(session.clone(), RoomLoadSettings::default())
1916            .await
1917            .unwrap();
1918
1919        let client2 = Client::builder()
1920            .homeserver_url("http://localhost:1234")
1921            .request_config(RequestConfig::new().disable_retry())
1922            .sqlite_store(sqlite_path, None)
1923            .build()
1924            .await
1925            .unwrap();
1926        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
1927
1928        // When the lock isn't enabled, any attempt at locking won't return a guard.
1929        let guard = client1.encryption().try_lock_store_once().await.unwrap();
1930        assert!(guard.is_none());
1931
1932        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
1933        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
1934
1935        // One client can take the lock.
1936        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1937        assert!(acquired1.is_some());
1938
1939        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
1940        let initial_olm_machine =
1941            client1.olm_machine().await.clone().expect("must have an olm machine");
1942
1943        // Also enable backup to check that new machine has the same backup keys.
1944        let decryption_key = matrix_sdk_base::crypto::store::BackupDecryptionKey::new()
1945            .expect("Can't create new recovery key");
1946        let backup_key = decryption_key.megolm_v1_public_key();
1947        backup_key.set_version("1".to_owned());
1948        initial_olm_machine
1949            .backup_machine()
1950            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
1951            .await
1952            .expect("Should save");
1953
1954        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
1955
1956        assert!(client1.encryption().backups().are_enabled().await);
1957
1958        // The other client can't take the lock too.
1959        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
1960        assert!(acquired2.is_none());
1961
1962        // Now have the first client release the lock,
1963        drop(acquired1);
1964        tokio::time::sleep(Duration::from_millis(100)).await;
1965
1966        // And re-take it.
1967        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1968        assert!(acquired1.is_some());
1969
1970        // In that case, the Olm Machine shouldn't change.
1971        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
1972        assert!(initial_olm_machine.same_as(&olm_machine));
1973
1974        // Ok, release again.
1975        drop(acquired1);
1976        tokio::time::sleep(Duration::from_millis(100)).await;
1977
1978        // Client2 can acquire the lock.
1979        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
1980        assert!(acquired2.is_some());
1981
1982        // And then release it.
1983        drop(acquired2);
1984        tokio::time::sleep(Duration::from_millis(100)).await;
1985
1986        // Client1 can acquire it again,
1987        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
1988        assert!(acquired1.is_some());
1989
1990        // But now its olm machine has been invalidated and thus regenerated!
1991        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
1992
1993        assert!(!initial_olm_machine.same_as(&olm_machine));
1994
1995        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
1996        assert!(backup_key_new.decryption_key.is_some());
1997        assert_eq!(
1998            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
1999            backup_key.to_base64()
2000        );
2001        assert!(client1.encryption().backups().are_enabled().await);
2002    }
2003
2004    #[cfg(feature = "sqlite")]
2005    #[async_test]
2006    async fn test_generation_counter_no_spurious_invalidation() {
2007        // Create two clients using the same sqlite database.
2008
2009        use matrix_sdk_base::store::RoomLoadSettings;
2010        let sqlite_path =
2011            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2012        let session = mock_matrix_session();
2013
2014        let client = Client::builder()
2015            .homeserver_url("http://localhost:1234")
2016            .request_config(RequestConfig::new().disable_retry())
2017            .sqlite_store(&sqlite_path, None)
2018            .build()
2019            .await
2020            .unwrap();
2021        client
2022            .matrix_auth()
2023            .restore_session(session.clone(), RoomLoadSettings::default())
2024            .await
2025            .unwrap();
2026
2027        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2028
2029        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2030
2031        // Enabling the lock doesn't update the olm machine.
2032        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2033        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2034
2035        {
2036            // Simulate that another client hold the lock before.
2037            let client2 = Client::builder()
2038                .homeserver_url("http://localhost:1234")
2039                .request_config(RequestConfig::new().disable_retry())
2040                .sqlite_store(sqlite_path, None)
2041                .build()
2042                .await
2043                .unwrap();
2044            client2
2045                .matrix_auth()
2046                .restore_session(session, RoomLoadSettings::default())
2047                .await
2048                .unwrap();
2049
2050            client2
2051                .encryption()
2052                .enable_cross_process_store_lock("client2".to_owned())
2053                .await
2054                .unwrap();
2055
2056            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2057            assert!(guard.is_some());
2058
2059            drop(guard);
2060            tokio::time::sleep(Duration::from_millis(100)).await;
2061        }
2062
2063        {
2064            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2065            assert!(acquired.is_some());
2066        }
2067
2068        // Taking the lock the first time will update the olm machine.
2069        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2070        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2071
2072        {
2073            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2074            assert!(acquired.is_some());
2075        }
2076
2077        // Re-taking the lock doesn't update the olm machine.
2078        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2079        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2080    }
2081
2082    #[async_test]
2083    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2084        // Given a client and a server
2085        let client = no_retry_test_client(None).await;
2086        let server = MockServer::start().await;
2087
2088        // When we subscribe to its verification state
2089        let mut verification_state = client.encryption().verification_state();
2090
2091        // We can get its initial value, and it's Unknown
2092        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2093
2094        // We set up a mocked request to check this endpoint is not called before
2095        // reading the new state
2096        let keys_requested = Arc::new(AtomicBool::new(false));
2097        let inner_bool = keys_requested.clone();
2098
2099        Mock::given(method("GET"))
2100            .and(path_regex(
2101                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2102            ))
2103            .respond_with(move |_req: &Request| {
2104                inner_bool.fetch_or(true, Ordering::SeqCst);
2105                ResponseTemplate::new(200).set_body_json(json!({}))
2106            })
2107            .mount(&server)
2108            .await;
2109
2110        // When the session is initialised and the encryption tasks spawn
2111        set_client_session(&client).await;
2112
2113        // Then we can get an updated value without waiting for any network requests
2114        assert!(keys_requested.load(Ordering::SeqCst).not());
2115        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2116    }
2117
2118    #[test]
2119    fn test_oauth_reset_info_from_uiaa_info() {
2120        let auth_info = json!({
2121            "session": "dummy",
2122            "flows": [
2123                {
2124                    "stages": [
2125                        "org.matrix.cross_signing_reset"
2126                    ]
2127                }
2128            ],
2129            "params": {
2130                "org.matrix.cross_signing_reset": {
2131                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2132                }
2133            },
2134            "msg": "To reset..."
2135        });
2136
2137        let auth_info = serde_json::from_value(auth_info)
2138            .expect("We should be able to deserialize the UiaaInfo");
2139        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2140            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2141    }
2142}