matrix_sdk/encryption/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2// Copyright 2021 Damir Jelić
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22    collections::{BTreeMap, HashSet},
23    io::{Cursor, Read, Write},
24    iter,
25    path::PathBuf,
26    sync::Arc,
27};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_core::Stream;
31use futures_util::{
32    future::try_join,
33    stream::{self, StreamExt},
34};
35use matrix_sdk_base::crypto::{
36    store::types::RoomKeyInfo,
37    types::requests::{
38        OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
39    },
40    CrossSigningBootstrapRequests, OlmMachine,
41};
42use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
43use ruma::{
44    api::client::{
45        keys::{
46            get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
47            upload_signing_keys::v3::Request as UploadSigningKeysRequest,
48        },
49        message::send_message_event,
50        to_device::send_event_to_device::v3::{
51            Request as RumaToDeviceRequest, Response as ToDeviceResponse,
52        },
53        uiaa::{AuthData, UiaaInfo},
54    },
55    assign,
56    events::{
57        direct::DirectUserIdentifier,
58        room::{MediaSource, ThumbnailInfo},
59    },
60    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
61};
62#[cfg(feature = "experimental-send-custom-to-device")]
63use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
64use serde::Deserialize;
65use tokio::sync::{Mutex, RwLockReadGuard};
66use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
67use tracing::{debug, error, instrument, trace, warn};
68use url::Url;
69use vodozemac::Curve25519PublicKey;
70
71use self::{
72    backups::{types::BackupClientState, Backups},
73    futures::UploadEncryptedFile,
74    identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
75    recovery::{Recovery, RecoveryState},
76    secret_storage::SecretStorage,
77    tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
78    verification::{SasVerification, Verification, VerificationRequest},
79};
80use crate::{
81    attachment::Thumbnail,
82    client::{ClientInner, WeakClient},
83    error::HttpResult,
84    store_locks::CrossProcessStoreLockGuard,
85    Client, Error, HttpError, Result, Room, TransmissionProgress,
86};
87
88pub mod backups;
89pub mod futures;
90pub mod identities;
91pub mod recovery;
92pub mod secret_storage;
93pub(crate) mod tasks;
94pub mod verification;
95
96pub use matrix_sdk_base::crypto::{
97    olm::{
98        SessionCreationError as MegolmSessionCreationError,
99        SessionExportError as OlmSessionExportError,
100    },
101    vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
102    LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
103    SessionCreationError, SignatureError, VERSION,
104};
105
106#[cfg(feature = "experimental-send-custom-to-device")]
107use crate::config::RequestConfig;
108pub use crate::error::RoomKeyImportError;
109
/// All the data related to the encryption state.
///
/// Groups the background tasks, the encryption settings, and the backup and
/// recovery state of a client in a single place.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption (key backup, initialization
    /// tasks, etc.).
    pub tasks: StdMutex<ClientTasks>,

    /// End-to-end encryption settings.
    pub encryption_settings: EncryptionSettings,

    /// All state related to key backup.
    pub backup_state: BackupClientState,

    /// All state related to secret storage recovery.
    pub recovery_state: SharedObservable<RecoveryState>,
}
125
126impl EncryptionData {
127    pub fn new(encryption_settings: EncryptionSettings) -> Self {
128        Self {
129            encryption_settings,
130
131            tasks: StdMutex::new(Default::default()),
132            backup_state: Default::default(),
133            recovery_state: Default::default(),
134        }
135    }
136
137    pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
138        let weak_client = WeakClient::from_inner(client);
139
140        let mut tasks = self.tasks.lock();
141        tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
142
143        if self.encryption_settings.backup_download_strategy
144            == BackupDownloadStrategy::AfterDecryptionFailure
145        {
146            tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
147        }
148    }
149
150    /// Initialize the background task which listens for changes in the
151    /// [`backups::BackupState`] and updataes the [`recovery::RecoveryState`].
152    ///
153    /// This should happen after the usual tasks have been set up and after the
154    /// E2EE initialization tasks have been set up.
155    pub fn initialize_recovery_state_update_task(&self, client: &Client) {
156        let mut guard = self.tasks.lock();
157
158        let future = Recovery::update_state_after_backup_state_change(client);
159        let join_handle = spawn(future);
160
161        guard.update_recovery_state_after_backup = Some(join_handle);
162    }
163}
164
/// Settings for end-to-end encryption features.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Automatically bootstrap cross-signing for a user once they're logged
    /// in, in case it's not already done yet.
    ///
    /// This requires to login with a username and password, or that MSC3967 is
    /// enabled on the server, as of 2023-10-20.
    pub auto_enable_cross_signing: bool,

    /// Select a strategy to download room keys from the backup, by default room
    /// keys won't be downloaded from the backup automatically.
    ///
    /// Take a look at the [`BackupDownloadStrategy`] enum for more options.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Automatically create a backup version if no backup exists.
    pub auto_enable_backups: bool,
}
184
/// The strategy used to download room keys from the backup.
///
/// This is selected through
/// [`EncryptionSettings::backup_download_strategy`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// Automatically download all room keys from the backup when the backup
    /// recovery key has been received. The backup recovery key can be received
    /// in two ways:
    ///
    /// 1. Received as a `m.secret.send` to-device event, after a successful
    ///    interactive verification.
    /// 2. Imported from secret storage (4S) using the
    ///    [`SecretStore::import_secrets()`] method.
    ///
    /// [`SecretStore::import_secrets()`]: crate::encryption::secret_storage::SecretStore::import_secrets
    OneShot,

    /// Attempt to download a single room key if an event fails to be decrypted.
    AfterDecryptionFailure,

    /// Don't download any room keys automatically. The user can manually
    /// download room keys using the [`Backups::download_room_key()`] methods.
    ///
    /// This is the default option.
    #[default]
    Manual,
}
211
/// The verification state of our own device.
///
/// This enum tells us if our own user identity trusts this device, in other
/// words it tells us if the user identity has signed the device.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is unknown for now.
    Unknown,
    /// The device is considered to be verified, it has been signed by its user
    /// identity.
    Verified,
    /// The device is unverified.
    Unverified,
}
226
/// Wraps together a `CrossProcessStoreLockGuard` and a generation number.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The wrapped guard; it is never read, only kept alive alongside the
    /// generation number.
    _guard: CrossProcessStoreLockGuard,
    /// The generation number exposed through
    /// [`CrossProcessLockStoreGuardWithGeneration::generation()`].
    generation: u64,
}
233
234impl CrossProcessLockStoreGuardWithGeneration {
235    /// Return the Crypto Store generation associated with this store lock.
236    pub fn generation(&self) -> u64 {
237        self.generation
238    }
239}
240
/// A stateful struct remembering the cross-signing keys we need to upload.
///
/// Since the `/_matrix/client/v3/keys/device_signing/upload` might require
/// additional authentication, this struct will contain information on the type
/// of authentication the user needs to complete before the upload might be
/// continued.
///
/// More info can be found in the [spec].
///
/// [spec]: https://spec.matrix.org/v1.11/client-server-api/#post_matrixclientv3keysdevice_signingupload
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// The client used to send the requests.
    client: Client,
    /// The signing keys upload request; it is re-sent until it succeeds or
    /// fails with an unrecoverable error.
    upload_request: UploadSigningKeysRequest,
    /// The signatures upload request, sent once the signing keys upload
    /// succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The type of authentication this reset process is using.
    auth_type: CrossSigningResetAuthType,
    /// Set to `true` by [`CrossSigningResetHandle::cancel()`] and checked by
    /// [`CrossSigningResetHandle::auth()`] after a failed upload attempt.
    is_cancelled: Mutex<bool>,
}
259
260impl CrossSigningResetHandle {
261    /// Set up a new `CrossSigningResetHandle`.
262    pub fn new(
263        client: Client,
264        upload_request: UploadSigningKeysRequest,
265        signatures_request: UploadSignaturesRequest,
266        auth_type: CrossSigningResetAuthType,
267    ) -> Self {
268        Self {
269            client,
270            upload_request,
271            signatures_request,
272            auth_type,
273            is_cancelled: Mutex::new(false),
274        }
275    }
276
277    /// Get the [`CrossSigningResetAuthType`] this cross-signing reset process
278    /// is using.
279    pub fn auth_type(&self) -> &CrossSigningResetAuthType {
280        &self.auth_type
281    }
282
283    /// Continue the cross-signing reset by either waiting for the
284    /// authentication to be done on the side of the OAuth 2.0 server or by
285    /// providing additional [`AuthData`] the homeserver requires.
286    pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
287        let mut upload_request = self.upload_request.clone();
288        upload_request.auth = auth;
289
290        while let Err(e) = self.client.send(upload_request.clone()).await {
291            if *self.is_cancelled.lock().await {
292                return Ok(());
293            }
294
295            match e.as_uiaa_response() {
296                Some(uiaa_info) => {
297                    if uiaa_info.auth_error.is_some() {
298                        return Err(e.into());
299                    }
300                }
301                None => return Err(e.into()),
302            }
303        }
304
305        self.client.send(self.signatures_request.clone()).await?;
306
307        Ok(())
308    }
309
310    /// Cancel the ongoing identity reset process
311    pub async fn cancel(&self) {
312        *self.is_cancelled.lock().await = true;
313    }
314}
315
/// Information about the additional authentication that is required before the
/// cross-signing keys can be uploaded.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The homeserver requires user-interactive authentication.
    Uiaa(UiaaInfo),
    /// OAuth 2.0 is used for authentication and the user needs to open a URL to
    /// approve the upload of cross-signing keys.
    OAuth(OAuthCrossSigningResetInfo),
}
326
327impl CrossSigningResetAuthType {
328    fn new(error: &HttpError) -> Result<Option<Self>> {
329        if let Some(auth_info) = error.as_uiaa_response() {
330            if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
331                Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
332            } else {
333                Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
334            }
335        } else {
336            Ok(None)
337        }
338    }
339}
340
/// OAuth 2.0 specific information about the required authentication for the
/// upload of cross-signing keys.
///
/// This is parsed out of the `org.matrix.cross_signing_reset` parameters of a
/// UIAA response.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL where the user can approve the reset of the cross-signing keys.
    pub approval_url: Url,
}
348
349impl OAuthCrossSigningResetInfo {
350    fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
351        let parameters =
352            serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(auth_info.params.get())?;
353
354        Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
355    }
356}
357
/// The parsed `parameters` part of a [`ruma::api::client::uiaa::UiaaInfo`]
/// response
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The `org.matrix.cross_signing_reset` entry of the parameters, which
    /// holds the URL where the user can approve the reset of the
    /// cross-signing keys.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
366
/// The `org.matrix.cross_signing_reset` part of the Uiaa response `parameters`
/// dictionary.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL where the user can approve the reset of the cross-signing keys.
    url: Url,
}
374
375impl Client {
    /// Get a read guard for the `OlmMachine` of the base client.
    ///
    /// The guarded value is `None` if there is no `OlmMachine`.
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
379
380    pub(crate) async fn mark_request_as_sent(
381        &self,
382        request_id: &TransactionId,
383        response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
384    ) -> Result<(), matrix_sdk_base::Error> {
385        Ok(self
386            .olm_machine()
387            .await
388            .as_ref()
389            .expect(
390                "We should have an olm machine once we try to mark E2EE related requests as sent",
391            )
392            .mark_request_as_sent(request_id, response)
393            .await?)
394    }
395
396    /// Query the server for users device keys.
397    ///
398    /// # Panics
399    ///
400    /// Panics if no key query needs to be done.
401    #[instrument(skip(self, device_keys))]
402    pub(crate) async fn keys_query(
403        &self,
404        request_id: &TransactionId,
405        device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
406    ) -> Result<get_keys::v3::Response> {
407        let request = assign!(get_keys::v3::Request::new(), { device_keys });
408
409        let response = self.send(request).await?;
410        self.mark_request_as_sent(request_id, &response).await?;
411        self.encryption().update_state_after_keys_query(&response).await;
412
413        Ok(response)
414    }
415
    /// Construct a [`EncryptedFile`][ruma::events::room::EncryptedFile] by
    /// encrypting and uploading a provided reader.
    ///
    /// # Arguments
    ///
    /// * `reader` - The reader that should be encrypted and uploaded.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use url::Url;
    /// # use matrix_sdk::ruma::{room_id, OwnedRoomId};
    /// use serde::{Deserialize, Serialize};
    /// use matrix_sdk::ruma::events::{macros::EventContent, room::EncryptedFile};
    ///
    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
    /// #[ruma_event(type = "com.example.custom", kind = MessageLike)]
    /// struct CustomEventContent {
    ///     encrypted_file: EncryptedFile,
    /// }
    ///
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// # let client = Client::new(homeserver).await?;
    /// # let room = client.get_room(&room_id!("!test:example.com")).unwrap();
    /// let mut reader = std::io::Cursor::new(b"Hello, world!");
    /// let encrypted_file = client.upload_encrypted_file(&mut reader).await?;
    ///
    /// room.send(CustomEventContent { encrypted_file }).await?;
    /// # anyhow::Ok(()) };
    /// ```
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
455
456    /// Encrypt and upload the file and thumbnails, and return the source
457    /// information.
458    pub(crate) async fn upload_encrypted_media_and_thumbnail(
459        &self,
460        data: &[u8],
461        thumbnail: Option<Thumbnail>,
462        send_progress: SharedObservable<TransmissionProgress>,
463    ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
464        let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
465
466        let upload_attachment = async {
467            let mut cursor = Cursor::new(data);
468            self.upload_encrypted_file(&mut cursor)
469                .with_send_progress_observable(send_progress)
470                .await
471        };
472
473        let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
474
475        Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
476    }
477
478    /// Uploads an encrypted thumbnail to the media repository, and returns
479    /// its source and extra information.
480    async fn upload_encrypted_thumbnail(
481        &self,
482        thumbnail: Option<Thumbnail>,
483        send_progress: SharedObservable<TransmissionProgress>,
484    ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
485        let Some(thumbnail) = thumbnail else {
486            return Ok(None);
487        };
488
489        let (data, _, thumbnail_info) = thumbnail.into_parts();
490        let mut cursor = Cursor::new(data);
491
492        let file = self
493            .upload_encrypted_file(&mut cursor)
494            .with_send_progress_observable(send_progress)
495            .await?;
496
497        Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
498    }
499
500    /// Claim one-time keys creating new Olm sessions.
501    ///
502    /// # Arguments
503    ///
504    /// * `users` - The list of user/device pairs that we should claim keys for.
505    pub(crate) async fn claim_one_time_keys(
506        &self,
507        users: impl Iterator<Item = &UserId>,
508    ) -> Result<()> {
509        let _lock = self.locks().key_claim_lock.lock().await;
510
511        if let Some((request_id, request)) = self
512            .olm_machine()
513            .await
514            .as_ref()
515            .ok_or(Error::NoOlmMachine)?
516            .get_missing_sessions(users)
517            .await?
518        {
519            let response = self.send(request).await?;
520            self.mark_request_as_sent(&request_id, &response).await?;
521        }
522
523        Ok(())
524    }
525
526    /// Upload the E2E encryption keys.
527    ///
528    /// This uploads the long lived device keys as well as the required amount
529    /// of one-time keys.
530    ///
531    /// # Panics
532    ///
533    /// Panics if the client isn't logged in, or if no encryption keys need to
534    /// be uploaded.
535    #[instrument(skip(self, request))]
536    pub(crate) async fn keys_upload(
537        &self,
538        request_id: &TransactionId,
539        request: &upload_keys::v3::Request,
540    ) -> Result<upload_keys::v3::Response> {
541        debug!(
542            device_keys = request.device_keys.is_some(),
543            one_time_key_count = request.one_time_keys.len(),
544            "Uploading public encryption keys",
545        );
546
547        let response = self.send(request.clone()).await?;
548        self.mark_request_as_sent(request_id, &response).await?;
549
550        Ok(response)
551    }
552
553    pub(crate) async fn room_send_helper(
554        &self,
555        request: &RoomMessageRequest,
556    ) -> Result<send_message_event::v3::Response> {
557        let content = request.content.clone();
558        let txn_id = request.txn_id.clone();
559        let room_id = &request.room_id;
560
561        self.get_room(room_id)
562            .expect("Can't send a message to a room that isn't known to the store")
563            .send(*content)
564            .with_transaction_id(txn_id)
565            .await
566    }
567
568    pub(crate) async fn send_to_device(
569        &self,
570        request: &ToDeviceRequest,
571    ) -> HttpResult<ToDeviceResponse> {
572        let request = RumaToDeviceRequest::new_raw(
573            request.event_type.clone(),
574            request.txn_id.clone(),
575            request.messages.clone(),
576        );
577
578        self.send(request).await
579    }
580
581    pub(crate) async fn send_verification_request(
582        &self,
583        request: OutgoingVerificationRequest,
584    ) -> Result<()> {
585        use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
586
587        match request {
588            ToDevice(t) => {
589                self.send_to_device(&t).await?;
590            }
591            InRoom(r) => {
592                self.room_send_helper(&r).await?;
593            }
594        }
595
596        Ok(())
597    }
598
599    /// Get the existing DM room with the given user, if any.
600    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
601        let rooms = self.joined_rooms();
602
603        // Find the room we share with the `user_id` and only with `user_id`
604        let room = rooms.into_iter().find(|r| {
605            let targets = r.direct_targets();
606            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
607        });
608
609        trace!(?room, "Found room");
610        room
611    }
612
613    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
614        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
615
616        match r.request() {
617            AnyOutgoingRequest::KeysQuery(request) => {
618                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
619            }
620            AnyOutgoingRequest::KeysUpload(request) => {
621                self.keys_upload(r.request_id(), request).await?;
622            }
623            AnyOutgoingRequest::ToDeviceRequest(request) => {
624                let response = self.send_to_device(request).await?;
625                self.mark_request_as_sent(r.request_id(), &response).await?;
626            }
627            AnyOutgoingRequest::SignatureUpload(request) => {
628                let response = self.send(request.clone()).await?;
629                self.mark_request_as_sent(r.request_id(), &response).await?;
630            }
631            AnyOutgoingRequest::RoomMessage(request) => {
632                let response = self.room_send_helper(request).await?;
633                self.mark_request_as_sent(r.request_id(), &response).await?;
634            }
635            AnyOutgoingRequest::KeysClaim(request) => {
636                let response = self.send(request.clone()).await?;
637                self.mark_request_as_sent(r.request_id(), &response).await?;
638            }
639        }
640
641        Ok(())
642    }
643
644    #[instrument(skip_all)]
645    pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
646        const MAX_CONCURRENT_REQUESTS: usize = 20;
647
648        // This is needed because sometimes we need to automatically
649        // claim some one-time keys to unwedge an existing Olm session.
650        if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
651            warn!("Error while claiming one-time keys {:?}", e);
652        }
653
654        let outgoing_requests = stream::iter(
655            self.olm_machine()
656                .await
657                .as_ref()
658                .ok_or(Error::NoOlmMachine)?
659                .outgoing_requests()
660                .await?,
661        )
662        .map(|r| self.send_outgoing_request(r));
663
664        let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
665
666        requests
667            .for_each(|r| async move {
668                match r {
669                    Ok(_) => (),
670                    Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
671                }
672            })
673            .await;
674
675        Ok(())
676    }
677}
678
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Get the olm machine, for testing purposes only.
    ///
    /// This is only compiled for tests or when the `testing` feature is
    /// enabled.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
686
/// A high-level API to manage the client's encryption.
///
/// To get this, use [`Client::encryption()`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
695
696impl Encryption {
    /// Create the encryption API for the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
700
    /// Returns the current encryption settings for this client.
    ///
    /// The settings are returned by value.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
705
706    /// Get the public ed25519 key of our own device. This is usually what is
707    /// called the fingerprint of the device.
708    pub async fn ed25519_key(&self) -> Option<String> {
709        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
710    }
711
712    /// Get the public Curve25519 key of our own device.
713    pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
714        self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
715    }
716
717    /// Get the current device creation timestamp.
718    pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
719        match self.get_own_device().await {
720            Ok(Some(device)) => device.first_time_seen_ts(),
721            // Should not happen, there should always be an own device
722            _ => MilliSecondsSinceUnixEpoch::now(),
723        }
724    }
725
726    pub(crate) async fn import_secrets_bundle(
727        &self,
728        bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
729    ) -> Result<(), SecretImportError> {
730        let olm_machine = self.client.olm_machine().await;
731        let olm_machine =
732            olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
733
734        olm_machine.store().import_secrets_bundle(bundle).await
735    }
736
737    /// Get the status of the private cross signing keys.
738    ///
739    /// This can be used to check which private cross signing keys we have
740    /// stored locally.
741    pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
742        let olm = self.client.olm_machine().await;
743        let machine = olm.as_ref()?;
744        Some(machine.cross_signing_status().await)
745    }
746
747    /// Get all the tracked users we know about
748    ///
749    /// Tracked users are users for which we keep the device list of E2EE
750    /// capable devices up to date.
751    pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
752        if let Some(machine) = self.client.olm_machine().await.as_ref() {
753            machine.tracked_users().await
754        } else {
755            Ok(HashSet::new())
756        }
757    }
758
    /// Get a [`Subscriber`] for the [`VerificationState`].
    ///
    /// The subscriber gives access to the current value and to the updates
    /// that follow, as shown in the example.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use matrix_sdk::{encryption, Client};
    /// use url::Url;
    ///
    /// # async {
    /// let homeserver = Url::parse("http://example.com")?;
    /// let client = Client::new(homeserver).await?;
    /// let mut subscriber = client.encryption().verification_state();
    ///
    /// let current_value = subscriber.get();
    ///
    /// println!("The current verification state is: {current_value:?}");
    ///
    /// if let Some(verification_state) = subscriber.next().await {
    ///     println!("Received verification state update {:?}", verification_state)
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    pub fn verification_state(&self) -> Subscriber<VerificationState> {
        self.client.inner.verification_state.subscribe_reset()
    }
784
785    /// Get a verification object with the given flow id.
786    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
787        let olm = self.client.olm_machine().await;
788        let olm = olm.as_ref()?;
789        #[allow(clippy::bind_instead_of_map)]
790        olm.get_verification(user_id, flow_id).and_then(|v| match v {
791            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
792                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
793            }
794            #[cfg(feature = "qrcode")]
795            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
796                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
797            }
798            _ => None,
799        })
800    }
801
802    /// Get a `VerificationRequest` object for the given user with the given
803    /// flow id.
804    pub async fn get_verification_request(
805        &self,
806        user_id: &UserId,
807        flow_id: impl AsRef<str>,
808    ) -> Option<VerificationRequest> {
809        let olm = self.client.olm_machine().await;
810        let olm = olm.as_ref()?;
811
812        olm.get_verification_request(user_id, flow_id)
813            .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
814    }
815
816    /// Get a specific device of a user.
817    ///
818    /// # Arguments
819    ///
820    /// * `user_id` - The unique id of the user that the device belongs to.
821    ///
822    /// * `device_id` - The unique id of the device.
823    ///
824    /// Returns a `Device` if one is found and the crypto store didn't throw an
825    /// error.
826    ///
827    /// This will always return None if the client hasn't been logged in.
828    ///
829    /// # Examples
830    ///
831    /// ```no_run
832    /// # use matrix_sdk::{Client, ruma::{device_id, user_id}};
833    /// # use url::Url;
834    /// # async {
835    /// # let alice = user_id!("@alice:example.org");
836    /// # let homeserver = Url::parse("http://example.com")?;
837    /// # let client = Client::new(homeserver).await?;
838    /// if let Some(device) =
839    ///     client.encryption().get_device(alice, device_id!("DEVICEID")).await?
840    /// {
841    ///     println!("{:?}", device.is_verified());
842    ///
843    ///     if !device.is_verified() {
844    ///         let verification = device.request_verification().await?;
845    ///     }
846    /// }
847    /// # anyhow::Ok(()) };
848    /// ```
849    pub async fn get_device(
850        &self,
851        user_id: &UserId,
852        device_id: &DeviceId,
853    ) -> Result<Option<Device>, CryptoStoreError> {
854        let olm = self.client.olm_machine().await;
855        let Some(machine) = olm.as_ref() else { return Ok(None) };
856        let device = machine.get_device(user_id, device_id, None).await?;
857        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
858    }
859
860    /// A convenience method to retrieve your own device from the store.
861    ///
862    /// This is the same as calling [`Encryption::get_device()`] with your own
863    /// user and device ID.
864    ///
865    /// This will always return a device, unless you are not logged in.
866    pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
867        let olm = self.client.olm_machine().await;
868        let Some(machine) = olm.as_ref() else { return Ok(None) };
869        let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
870        Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
871    }
872
873    /// Get a map holding all the devices of an user.
874    ///
875    /// This will always return an empty map if the client hasn't been logged
876    /// in.
877    ///
878    /// # Arguments
879    ///
880    /// * `user_id` - The unique id of the user that the devices belong to.
881    ///
882    /// # Examples
883    ///
884    /// ```no_run
885    /// # use matrix_sdk::{Client, ruma::user_id};
886    /// # use url::Url;
887    /// # async {
888    /// # let alice = user_id!("@alice:example.org");
889    /// # let homeserver = Url::parse("http://example.com")?;
890    /// # let client = Client::new(homeserver).await?;
891    /// let devices = client.encryption().get_user_devices(alice).await?;
892    ///
893    /// for device in devices.devices() {
894    ///     println!("{device:?}");
895    /// }
896    /// # anyhow::Ok(()) };
897    /// ```
898    pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
899        let devices = self
900            .client
901            .olm_machine()
902            .await
903            .as_ref()
904            .ok_or(Error::NoOlmMachine)?
905            .get_user_devices(user_id, None)
906            .await?;
907
908        Ok(UserDevices { inner: devices, client: self.client.clone() })
909    }
910
911    /// Get the E2EE identity of a user from the crypto store.
912    ///
913    /// Usually, we only have the E2EE identity of a user locally if the user
914    /// is tracked, meaning that we are both members of the same encrypted room.
915    ///
916    /// To get the E2EE identity of a user even if it is not available locally
917    /// use [`Encryption::request_user_identity()`].
918    ///
919    /// # Arguments
920    ///
921    /// * `user_id` - The unique id of the user that the identity belongs to.
922    ///
923    /// Returns a `UserIdentity` if one is found and the crypto store
924    /// didn't throw an error.
925    ///
926    /// This will always return None if the client hasn't been logged in.
927    ///
928    /// # Examples
929    ///
930    /// ```no_run
931    /// # use matrix_sdk::{Client, ruma::user_id};
932    /// # use url::Url;
933    /// # async {
934    /// # let alice = user_id!("@alice:example.org");
935    /// # let homeserver = Url::parse("http://example.com")?;
936    /// # let client = Client::new(homeserver).await?;
937    /// let user = client.encryption().get_user_identity(alice).await?;
938    ///
939    /// if let Some(user) = user {
940    ///     println!("{:?}", user.is_verified());
941    ///
942    ///     let verification = user.request_verification().await?;
943    /// }
944    /// # anyhow::Ok(()) };
945    /// ```
946    pub async fn get_user_identity(
947        &self,
948        user_id: &UserId,
949    ) -> Result<Option<UserIdentity>, CryptoStoreError> {
950        let olm = self.client.olm_machine().await;
951        let Some(olm) = olm.as_ref() else { return Ok(None) };
952        let identity = olm.get_identity(user_id, None).await?;
953
954        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
955    }
956
957    /// Get the E2EE identity of a user from the homeserver.
958    ///
959    /// The E2EE identity returned is always guaranteed to be up-to-date. If the
960    /// E2EE identity is not found, it should mean that the user did not set
961    /// up cross-signing.
962    ///
963    /// If you want the E2EE identity of a user without making a request to the
964    /// homeserver, use [`Encryption::get_user_identity()`] instead.
965    ///
966    /// # Arguments
967    ///
968    /// * `user_id` - The ID of the user that the identity belongs to.
969    ///
970    /// Returns a [`UserIdentity`] if one is found. Returns an error if there
971    /// was an issue with the crypto store or with the request to the
972    /// homeserver.
973    ///
974    /// This will always return `None` if the client hasn't been logged in.
975    ///
976    /// # Examples
977    ///
978    /// ```no_run
979    /// # use matrix_sdk::{Client, ruma::user_id};
980    /// # use url::Url;
981    /// # async {
982    /// # let alice = user_id!("@alice:example.org");
983    /// # let homeserver = Url::parse("http://example.com")?;
984    /// # let client = Client::new(homeserver).await?;
985    /// let user = client.encryption().request_user_identity(alice).await?;
986    ///
987    /// if let Some(user) = user {
988    ///     println!("User is verified: {:?}", user.is_verified());
989    ///
990    ///     let verification = user.request_verification().await?;
991    /// }
992    /// # anyhow::Ok(()) };
993    /// ```
994    pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
995        let olm = self.client.olm_machine().await;
996        let Some(olm) = olm.as_ref() else { return Ok(None) };
997
998        let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
999        self.client.keys_query(&request_id, request.device_keys).await?;
1000
1001        let identity = olm.get_identity(user_id, None).await?;
1002        Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1003    }
1004
1005    /// Returns a stream of device updates, allowing users to listen for
1006    /// notifications about new or changed devices.
1007    ///
1008    /// The stream produced by this method emits updates whenever a new device
1009    /// is discovered or when an existing device's information is changed. Users
1010    /// can subscribe to this stream and receive updates in real-time.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```no_run
1015    /// # use matrix_sdk::Client;
1016    /// # use ruma::{device_id, user_id};
1017    /// # use futures_util::{pin_mut, StreamExt};
1018    /// # let client: Client = unimplemented!();
1019    /// # async {
1020    /// let devices_stream = client.encryption().devices_stream().await?;
1021    /// let user_id = client
1022    ///     .user_id()
1023    ///     .expect("We should know our user id after we have logged in");
1024    /// pin_mut!(devices_stream);
1025    ///
1026    /// for device_updates in devices_stream.next().await {
1027    ///     if let Some(user_devices) = device_updates.new.get(user_id) {
1028    ///         for device in user_devices.values() {
1029    ///             println!("A new device has been added {}", device.device_id());
1030    ///         }
1031    ///     }
1032    /// }
1033    /// # anyhow::Ok(()) };
1034    /// ```
1035    pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1036        let olm = self.client.olm_machine().await;
1037        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1038        let client = self.client.to_owned();
1039
1040        Ok(olm
1041            .store()
1042            .devices_stream()
1043            .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1044    }
1045
1046    /// Returns a stream of user identity updates, allowing users to listen for
1047    /// notifications about new or changed user identities.
1048    ///
1049    /// The stream produced by this method emits updates whenever a new user
1050    /// identity is discovered or when an existing identities information is
1051    /// changed. Users can subscribe to this stream and receive updates in
1052    /// real-time.
1053    ///
1054    /// # Examples
1055    ///
1056    /// ```no_run
1057    /// # use matrix_sdk::Client;
1058    /// # use ruma::{device_id, user_id};
1059    /// # use futures_util::{pin_mut, StreamExt};
1060    /// # let client: Client = unimplemented!();
1061    /// # async {
1062    /// let identities_stream =
1063    ///     client.encryption().user_identities_stream().await?;
1064    /// pin_mut!(identities_stream);
1065    ///
1066    /// for identity_updates in identities_stream.next().await {
1067    ///     for (_, identity) in identity_updates.new {
1068    ///         println!("A new identity has been added {}", identity.user_id());
1069    ///     }
1070    /// }
1071    /// # anyhow::Ok(()) };
1072    /// ```
1073    pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1074        let olm = self.client.olm_machine().await;
1075        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1076        let client = self.client.to_owned();
1077
1078        Ok(olm
1079            .store()
1080            .user_identities_stream()
1081            .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1082    }
1083
1084    /// Create and upload a new cross signing identity.
1085    ///
1086    /// # Arguments
1087    ///
1088    /// * `auth_data` - This request requires user interactive auth, the first
1089    ///   request needs to set this to `None` and will always fail with an
1090    ///   `UiaaResponse`. The response will contain information for the
1091    ///   interactive auth and the same request needs to be made but this time
1092    ///   with some `auth_data` provided.
1093    ///
1094    /// # Examples
1095    ///
1096    /// ```no_run
1097    /// # use std::collections::BTreeMap;
1098    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1099    /// # use url::Url;
1100    /// # use serde_json::json;
1101    /// # async {
1102    /// # let homeserver = Url::parse("http://example.com")?;
1103    /// # let client = Client::new(homeserver).await?;
1104    /// if let Err(e) = client.encryption().bootstrap_cross_signing(None).await {
1105    ///     if let Some(response) = e.as_uiaa_response() {
1106    ///         let mut password = uiaa::Password::new(
1107    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1108    ///             "wordpass".to_owned(),
1109    ///         );
1110    ///         password.session = response.session.clone();
1111    ///
1112    ///         client
1113    ///             .encryption()
1114    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1115    ///             .await
1116    ///             .expect("Couldn't bootstrap cross signing")
1117    ///     } else {
1118    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1119    ///     }
1120    /// }
1121    /// # anyhow::Ok(()) };
1122    pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1123        let olm = self.client.olm_machine().await;
1124        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1125
1126        let CrossSigningBootstrapRequests {
1127            upload_signing_keys_req,
1128            upload_keys_req,
1129            upload_signatures_req,
1130        } = olm.bootstrap_cross_signing(false).await?;
1131
1132        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1133            auth: auth_data,
1134            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1135            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1136            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1137        });
1138
1139        if let Some(req) = upload_keys_req {
1140            self.client.send_outgoing_request(req).await?;
1141        }
1142        self.client.send(upload_signing_keys_req).await?;
1143        self.client.send(upload_signatures_req).await?;
1144
1145        Ok(())
1146    }
1147
1148    /// Reset the cross-signing keys.
1149    ///
1150    /// # Example
1151    ///
1152    /// ```no_run
1153    /// # use matrix_sdk::{ruma::api::client::uiaa, Client, encryption::CrossSigningResetAuthType};
1154    /// # use url::Url;
1155    /// # async {
1156    /// # let homeserver = Url::parse("http://example.com")?;
1157    /// # let client = Client::new(homeserver).await?;
1158    /// # let user_id = unimplemented!();
1159    /// let encryption = client.encryption();
1160    ///
1161    /// if let Some(handle) = encryption.reset_cross_signing().await? {
1162    ///     match handle.auth_type() {
1163    ///         CrossSigningResetAuthType::Uiaa(uiaa) => {
1164    ///             use matrix_sdk::ruma::api::client::uiaa;
1165    ///
1166    ///             let password = "1234".to_owned();
1167    ///             let mut password = uiaa::Password::new(user_id, password);
1168    ///             password.session = uiaa.session;
1169    ///
1170    ///             handle.auth(Some(uiaa::AuthData::Password(password))).await?;
1171    ///         }
1172    ///         CrossSigningResetAuthType::OAuth(o) => {
1173    ///             println!(
1174    ///                 "To reset your end-to-end encryption cross-signing identity, \
1175    ///                 you first need to approve it at {}",
1176    ///                 o.approval_url
1177    ///             );
1178    ///             handle.auth(None).await?;
1179    ///         }
1180    ///     }
1181    /// }
1182    /// # anyhow::Ok(()) };
1183    /// ```
1184    pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1185        let olm = self.client.olm_machine().await;
1186        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1187
1188        let CrossSigningBootstrapRequests {
1189            upload_keys_req,
1190            upload_signing_keys_req,
1191            upload_signatures_req,
1192        } = olm.bootstrap_cross_signing(true).await?;
1193
1194        let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1195            auth: None,
1196            master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1197            self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1198            user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1199        });
1200
1201        if let Some(req) = upload_keys_req {
1202            self.client.send_outgoing_request(req).await?;
1203        }
1204
1205        if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1206            if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1207                let client = self.client.clone();
1208
1209                Ok(Some(CrossSigningResetHandle::new(
1210                    client,
1211                    upload_signing_keys_req,
1212                    upload_signatures_req,
1213                    auth_type,
1214                )))
1215            } else {
1216                Err(error.into())
1217            }
1218        } else {
1219            self.client.send(upload_signatures_req).await?;
1220
1221            Ok(None)
1222        }
1223    }
1224
1225    /// Query the user's own device keys, if, and only if, we didn't have their
1226    /// identity in the first place.
1227    async fn ensure_initial_key_query(&self) -> Result<()> {
1228        let olm_machine = self.client.olm_machine().await;
1229        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1230
1231        let user_id = olm_machine.user_id();
1232
1233        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1234            let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1235            self.client.keys_query(&request_id, request.device_keys).await?;
1236        }
1237
1238        Ok(())
1239    }
1240
1241    /// Create and upload a new cross signing identity, if that has not been
1242    /// done yet.
1243    ///
1244    /// This will only create a new cross-signing identity if the user had never
1245    /// done it before. If the user did it before, then this is a no-op.
1246    ///
1247    /// See also the documentation of [`Self::bootstrap_cross_signing`] for the
1248    /// behavior of this function.
1249    ///
1250    /// # Arguments
1251    ///
1252    /// * `auth_data` - This request requires user interactive auth, the first
1253    ///   request needs to set this to `None` and will always fail with an
1254    ///   `UiaaResponse`. The response will contain information for the
1255    ///   interactive auth and the same request needs to be made but this time
1256    ///   with some `auth_data` provided.
1257    ///
1258    /// # Examples
1259    /// ```no_run
1260    /// # use std::collections::BTreeMap;
1261    /// # use matrix_sdk::{ruma::api::client::uiaa, Client};
1262    /// # use url::Url;
1263    /// # use serde_json::json;
1264    /// # async {
1265    /// # let homeserver = Url::parse("http://example.com")?;
1266    /// # let client = Client::new(homeserver).await?;
1267    /// if let Err(e) = client.encryption().bootstrap_cross_signing_if_needed(None).await {
1268    ///     if let Some(response) = e.as_uiaa_response() {
1269    ///         let mut password = uiaa::Password::new(
1270    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
1271    ///             "wordpass".to_owned(),
1272    ///         );
1273    ///         password.session = response.session.clone();
1274    ///
1275    ///         // Note, on the failed attempt we can use `bootstrap_cross_signing` immediately, to
1276    ///         // avoid checks.
1277    ///         client
1278    ///             .encryption()
1279    ///             .bootstrap_cross_signing(Some(uiaa::AuthData::Password(password)))
1280    ///             .await
1281    ///             .expect("Couldn't bootstrap cross signing")
1282    ///     } else {
1283    ///         panic!("Error during cross signing bootstrap {:#?}", e);
1284    ///     }
1285    /// }
1286    /// # anyhow::Ok(()) };
1287    pub async fn bootstrap_cross_signing_if_needed(
1288        &self,
1289        auth_data: Option<AuthData>,
1290    ) -> Result<()> {
1291        let olm_machine = self.client.olm_machine().await;
1292        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1293        let user_id = olm_machine.user_id();
1294
1295        self.ensure_initial_key_query().await?;
1296
1297        if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1298            self.bootstrap_cross_signing(auth_data).await?;
1299        }
1300
1301        Ok(())
1302    }
1303
1304    /// Export E2EE keys that match the given predicate encrypting them with the
1305    /// given passphrase.
1306    ///
1307    /// # Arguments
1308    ///
1309    /// * `path` - The file path where the exported key file will be saved.
1310    ///
1311    /// * `passphrase` - The passphrase that will be used to encrypt the
1312    ///   exported room keys.
1313    ///
1314    /// * `predicate` - A closure that will be called for every known
1315    ///   `InboundGroupSession`, which represents a room key. If the closure
1316    ///   returns `true` the `InboundGroupSessoin` will be included in the
1317    ///   export, if the closure returns `false` it will not be included.
1318    ///
1319    /// # Panics
1320    ///
1321    /// This method will panic if it isn't run on a Tokio runtime.
1322    ///
1323    /// This method will panic if it can't get enough randomness from the OS to
1324    /// encrypt the exported keys securely.
1325    ///
1326    /// # Examples
1327    ///
1328    /// ```no_run
1329    /// # use std::{path::PathBuf, time::Duration};
1330    /// # use matrix_sdk::{
1331    /// #     Client, config::SyncSettings,
1332    /// #     ruma::room_id,
1333    /// # };
1334    /// # use url::Url;
1335    /// # async {
1336    /// # let homeserver = Url::parse("http://localhost:8080")?;
1337    /// # let mut client = Client::new(homeserver).await?;
1338    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1339    /// // Export all room keys.
1340    /// client
1341    ///     .encryption()
1342    ///     .export_room_keys(path, "secret-passphrase", |_| true)
1343    ///     .await?;
1344    ///
1345    /// // Export only the room keys for a certain room.
1346    /// let path = PathBuf::from("/home/example/e2e-room-keys.txt");
1347    /// let room_id = room_id!("!test:localhost");
1348    ///
1349    /// client
1350    ///     .encryption()
1351    ///     .export_room_keys(path, "secret-passphrase", |s| s.room_id() == room_id)
1352    ///     .await?;
1353    /// # anyhow::Ok(()) };
1354    /// ```
1355    #[cfg(not(target_family = "wasm"))]
1356    pub async fn export_room_keys(
1357        &self,
1358        path: PathBuf,
1359        passphrase: &str,
1360        predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1361    ) -> Result<()> {
1362        let olm = self.client.olm_machine().await;
1363        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1364
1365        let keys = olm.store().export_room_keys(predicate).await?;
1366        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1367
1368        let encrypt = move || -> Result<()> {
1369            let export: String =
1370                matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1371            let mut file = std::fs::File::create(path)?;
1372            file.write_all(&export.into_bytes())?;
1373            Ok(())
1374        };
1375
1376        let task = tokio::task::spawn_blocking(encrypt);
1377        task.await.expect("Task join error")
1378    }
1379
1380    /// Import E2EE keys from the given file path.
1381    ///
1382    /// # Arguments
1383    ///
1384    /// * `path` - The file path where the exported key file will can be found.
1385    ///
1386    /// * `passphrase` - The passphrase that should be used to decrypt the
1387    ///   exported room keys.
1388    ///
1389    /// Returns a tuple of numbers that represent the number of sessions that
1390    /// were imported and the total number of sessions that were found in the
1391    /// key export.
1392    ///
1393    /// # Panics
1394    ///
1395    /// This method will panic if it isn't run on a Tokio runtime.
1396    ///
1397    /// ```no_run
1398    /// # use std::{path::PathBuf, time::Duration};
1399    /// # use matrix_sdk::{
1400    /// #     Client, config::SyncSettings,
1401    /// #     ruma::room_id,
1402    /// # };
1403    /// # use url::Url;
1404    /// # async {
1405    /// # let homeserver = Url::parse("http://localhost:8080")?;
1406    /// # let mut client = Client::new(homeserver).await?;
1407    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
1408    /// let result =
1409    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
1410    ///
1411    /// println!(
1412    ///     "Imported {} room keys out of {}",
1413    ///     result.imported_count, result.total_count
1414    /// );
1415    /// # anyhow::Ok(()) };
1416    /// ```
1417    #[cfg(not(target_family = "wasm"))]
1418    pub async fn import_room_keys(
1419        &self,
1420        path: PathBuf,
1421        passphrase: &str,
1422    ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1423        let olm = self.client.olm_machine().await;
1424        let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1425        let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1426
1427        let decrypt = move || {
1428            let file = std::fs::File::open(path)?;
1429            matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1430        };
1431
1432        let task = tokio::task::spawn_blocking(decrypt);
1433        let import = task.await.expect("Task join error")?;
1434
1435        let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1436
1437        self.backups().maybe_trigger_backup();
1438
1439        Ok(ret)
1440    }
1441
1442    /// Receive notifications of room keys being received as a [`Stream`].
1443    ///
1444    /// Each time a room key is updated in any way, an update will be sent to
1445    /// the stream. Updates that happen at the same time are batched into a
1446    /// [`Vec`].
1447    ///
1448    /// If the reader of the stream lags too far behind, an error is broadcast
1449    /// containing the number of skipped items.
1450    ///
1451    /// # Examples
1452    ///
1453    /// ```no_run
1454    /// # use matrix_sdk::Client;
1455    /// # use url::Url;
1456    /// # async {
1457    /// # let homeserver = Url::parse("http://example.com")?;
1458    /// # let client = Client::new(homeserver).await?;
1459    /// use futures_util::StreamExt;
1460    ///
1461    /// let Some(mut room_keys_stream) =
1462    ///     client.encryption().room_keys_received_stream().await
1463    /// else {
1464    ///     return Ok(());
1465    /// };
1466    ///
1467    /// while let Some(update) = room_keys_stream.next().await {
1468    ///     println!("Received room keys {update:?}");
1469    /// }
1470    /// # anyhow::Ok(()) };
1471    /// ```
1472    pub async fn room_keys_received_stream(
1473        &self,
1474    ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1475        let olm = self.client.olm_machine().await;
1476        let olm = olm.as_ref()?;
1477
1478        Some(olm.store().room_keys_received_stream())
1479    }
1480
1481    /// Get the secret storage manager of the client.
1482    pub fn secret_storage(&self) -> SecretStorage {
1483        SecretStorage { client: self.client.to_owned() }
1484    }
1485
1486    /// Get the backups manager of the client.
1487    pub fn backups(&self) -> Backups {
1488        Backups { client: self.client.to_owned() }
1489    }
1490
1491    /// Get the recovery manager of the client.
1492    pub fn recovery(&self) -> Recovery {
1493        Recovery { client: self.client.to_owned() }
1494    }
1495
1496    /// Enables the crypto-store cross-process lock.
1497    ///
1498    /// This may be required if there are multiple processes that may do writes
1499    /// to the same crypto store. In that case, it's necessary to create a
1500    /// lock, so that only one process writes to it, otherwise this may
1501    /// cause confusing issues because of stale data contained in in-memory
1502    /// caches.
1503    ///
1504    /// The provided `lock_value` must be a unique identifier for this process.
1505    /// Check [`Client::cross_process_store_locks_holder_name`] to
1506    /// get the global value.
1507    pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1508        // If the lock has already been created, don't recreate it from scratch.
1509        if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1510            let prev_holder = prev_lock.lock_holder();
1511            if prev_holder == lock_value {
1512                return Ok(());
1513            }
1514            warn!("Recreating cross-process store lock with a different holder value: prev was {prev_holder}, new is {lock_value}");
1515        }
1516
1517        let olm_machine = self.client.base_client().olm_machine().await;
1518        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1519
1520        let lock =
1521            olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1522
1523        // Gently try to initialize the crypto store generation counter.
1524        //
1525        // If we don't get the lock immediately, then it is already acquired by another
1526        // process, and we'll get to reload next time we acquire the lock.
1527        {
1528            let guard = lock.try_lock_once().await?;
1529            if guard.is_some() {
1530                olm_machine
1531                    .initialize_crypto_store_generation(
1532                        &self.client.locks().crypto_store_generation,
1533                    )
1534                    .await?;
1535            }
1536        }
1537
1538        self.client
1539            .locks()
1540            .cross_process_crypto_store_lock
1541            .set(lock)
1542            .map_err(|_| Error::BadCryptoStoreState)?;
1543
1544        Ok(())
1545    }
1546
1547    /// Maybe reload the `OlmMachine` after acquiring the lock for the first
1548    /// time.
1549    ///
1550    /// Returns the current generation number.
1551    async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1552        let olm_machine_guard = self.client.olm_machine().await;
1553        if let Some(olm_machine) = olm_machine_guard.as_ref() {
1554            let (new_gen, generation_number) = olm_machine
1555                .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1556                .await?;
1557            // If the crypto store generation has changed,
1558            if new_gen {
1559                // (get rid of the reference to the current crypto store first)
1560                drop(olm_machine_guard);
1561                // Recreate the OlmMachine.
1562                self.client.base_client().regenerate_olm(None).await?;
1563            }
1564            Ok(generation_number)
1565        } else {
1566            // XXX: not sure this is reachable. Seems like the OlmMachine should always have
1567            // been initialised by the time we get here. Ideally we'd panic, or return an
1568            // error, but for now I'm just adding some logging to check if it
1569            // happens, and returning the magic number 0.
1570            warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1571            Ok(0)
1572        }
1573    }
1574
1575    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1576    /// spin-waits until the lock is available.
1577    ///
1578    /// May reload the `OlmMachine`, after obtaining the lock but not on the
1579    /// first time.
1580    pub async fn spin_lock_store(
1581        &self,
1582        max_backoff: Option<u32>,
1583    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1584        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1585            let guard = lock.spin_lock(max_backoff).await?;
1586
1587            let generation = self.on_lock_newly_acquired().await?;
1588
1589            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1590        } else {
1591            Ok(None)
1592        }
1593    }
1594
1595    /// If a lock was created with [`Self::enable_cross_process_store_lock`],
1596    /// attempts to lock it once.
1597    ///
1598    /// Returns a guard to the lock, if it was obtained.
1599    pub async fn try_lock_store_once(
1600        &self,
1601    ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1602        if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1603            let maybe_guard = lock.try_lock_once().await?;
1604
1605            let Some(guard) = maybe_guard else {
1606                return Ok(None);
1607            };
1608
1609            let generation = self.on_lock_newly_acquired().await?;
1610
1611            Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1612        } else {
1613            Ok(None)
1614        }
1615    }
1616
1617    /// Testing purposes only.
1618    #[cfg(any(test, feature = "testing"))]
1619    pub async fn uploaded_key_count(&self) -> Result<u64> {
1620        let olm_machine = self.client.olm_machine().await;
1621        let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1622        Ok(olm_machine.uploaded_key_count().await?)
1623    }
1624
1625    /// Bootstrap encryption and enables event listeners for the E2EE support.
1626    ///
1627    /// Based on the `EncryptionSettings`, this call might:
1628    /// - Bootstrap cross-signing if needed (POST `/device_signing/upload`)
1629    /// - Create a key backup if needed (POST `/room_keys/version`)
1630    /// - Create a secret storage if needed (PUT `/account_data/{type}`)
1631    ///
1632    /// As part of this process, and if needed, the current device keys would be
1633    /// uploaded to the server, new account data would be added, and cross
1634    /// signing keys and signatures might be uploaded.
1635    ///
1636    /// Should be called once we
1637    /// created a [`OlmMachine`], i.e. after logging in.
1638    ///
1639    /// # Arguments
1640    ///
1641    /// * `auth_data` - Some requests may require re-authentication. To prevent
1642    ///   the user from having to re-enter their password (or use other
1643    ///   methods), we can provide the authentication data here. This is
1644    ///   necessary for uploading cross-signing keys. However, please note that
1645    ///   there is a proposal (MSC3967) to remove this requirement, which would
1646    ///   allow for the initial upload of cross-signing keys without
1647    ///   authentication, rendering this parameter obsolete.
1648    pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1649        let mut tasks = self.client.inner.e2ee.tasks.lock();
1650
1651        let this = self.clone();
1652        tasks.setup_e2ee = Some(spawn(async move {
1653            // Update the current state first, so we don't have to wait for the result of
1654            // network requests
1655            this.update_verification_state().await;
1656
1657            if this.settings().auto_enable_cross_signing {
1658                if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1659                    error!("Couldn't bootstrap cross signing {e:?}");
1660                }
1661            }
1662
1663            if let Err(e) = this.backups().setup_and_resume().await {
1664                error!("Couldn't setup and resume backups {e:?}");
1665            }
1666            if let Err(e) = this.recovery().setup().await {
1667                error!("Couldn't setup and resume recovery {e:?}");
1668            }
1669        }));
1670    }
1671
1672    /// Waits for end-to-end encryption initialization tasks to finish, if any
1673    /// was running in the background.
1674    pub async fn wait_for_e2ee_initialization_tasks(&self) {
1675        let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1676
1677        if let Some(task) = task {
1678            if let Err(err) = task.await {
1679                warn!("Error when initializing backups: {err}");
1680            }
1681        }
1682    }
1683
1684    /// Upload the device keys and initial set of one-time keys to the server.
1685    ///
1686    /// This should only be called when the user logs in for the first time,
1687    /// the method will ensure that other devices see our own device as an
1688    /// end-to-end encryption enabled one.
1689    ///
1690    /// **Warning**: Do not use this method if we're already calling
1691    /// [`Client::send_outgoing_request()`]. This method is intended for
1692    /// explicitly uploading the device keys before starting a sync.
1693    pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1694        let olm = self.client.olm_machine().await;
1695        let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1696
1697        if let Some((request_id, request)) = olm.upload_device_keys().await? {
1698            self.client.keys_upload(&request_id, &request).await?;
1699
1700            let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1701            self.client.keys_query(&request_id, request.device_keys).await?;
1702        }
1703
1704        Ok(())
1705    }
1706
1707    pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1708        self.recovery().update_state_after_keys_query(response).await;
1709
1710        // Only update the verification_state if our own devices changed
1711        if let Some(user_id) = self.client.user_id() {
1712            let contains_own_device = response.device_keys.contains_key(user_id);
1713
1714            if contains_own_device {
1715                self.update_verification_state().await;
1716            }
1717        }
1718    }
1719
1720    async fn update_verification_state(&self) {
1721        match self.get_own_device().await {
1722            Ok(device) => {
1723                if let Some(device) = device {
1724                    let is_verified = device.is_cross_signed_by_owner();
1725
1726                    if is_verified {
1727                        self.client.inner.verification_state.set(VerificationState::Verified);
1728                    } else {
1729                        self.client.inner.verification_state.set(VerificationState::Unverified);
1730                    }
1731                } else {
1732                    warn!("Couldn't find out own device in the store.");
1733                    self.client.inner.verification_state.set(VerificationState::Unknown);
1734                }
1735            }
1736            Err(error) => {
1737                warn!("Failed retrieving own device: {error}");
1738                self.client.inner.verification_state.set(VerificationState::Unknown);
1739            }
1740        }
1741    }
1742
1743    /// Encrypts then send the given content via the `/sendToDevice` end-point
1744    /// using Olm encryption.
1745    ///
1746    /// If there are a lot of recipient devices multiple `/sendToDevice`
1747    /// requests might be sent out.
1748    ///
1749    /// # Returns
1750    /// A list of failures. The list of devices that couldn't get the messages.
1751    #[cfg(feature = "experimental-send-custom-to-device")]
1752    pub async fn encrypt_and_send_raw_to_device(
1753        &self,
1754        recipient_devices: Vec<&Device>,
1755        event_type: &str,
1756        content: Raw<AnyToDeviceEventContent>,
1757    ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1758        let users = recipient_devices.iter().map(|device| device.user_id());
1759
1760        // Will claim one-time-key for users that needs it
1761        // TODO: For later optimisation: This will establish missing olm sessions with
1762        // all this users devices, but we just want for some devices.
1763        self.client.claim_one_time_keys(users).await?;
1764
1765        let olm = self.client.olm_machine().await;
1766        let olm = olm.as_ref().expect("Olm machine wasn't started");
1767
1768        let (requests, withhelds) = olm
1769            .encrypt_content_for_devices(
1770                recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1771                event_type,
1772                &content
1773                    .deserialize_as::<serde_json::Value>()
1774                    .expect("Deserialize as Value will always work"),
1775            )
1776            .await?;
1777
1778        let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1779
1780        // Push the withhelds in the failures
1781        withhelds.iter().for_each(|(d, _)| {
1782            failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1783        });
1784
1785        // TODO: parallelize that? it's already grouping 250 devices per chunk.
1786        for request in requests {
1787            let ruma_request = RumaToDeviceRequest::new_raw(
1788                request.event_type.clone(),
1789                request.txn_id.clone(),
1790                request.messages.clone(),
1791            );
1792
1793            let send_result = self
1794                .client
1795                .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1796                .await;
1797
1798            // If the sending failed we need to collect the failures to report them
1799            if send_result.is_err() {
1800                // Mark the sending as failed
1801                for (user_id, device_map) in request.messages {
1802                    for device_id in device_map.keys() {
1803                        match device_id {
1804                            DeviceIdOrAllDevices::DeviceId(device_id) => {
1805                                failures.push((user_id.clone(), device_id.to_owned()));
1806                            }
1807                            DeviceIdOrAllDevices::AllDevices => {
1808                                // Cannot happen in this case
1809                            }
1810                        }
1811                    }
1812                }
1813            }
1814        }
1815
1816        Ok(failures)
1817    }
1818}
1819
1820#[cfg(all(test, not(target_family = "wasm")))]
1821mod tests {
1822    use std::{
1823        ops::Not,
1824        sync::{
1825            atomic::{AtomicBool, Ordering},
1826            Arc,
1827        },
1828        time::Duration,
1829    };
1830
1831    use matrix_sdk_test::{
1832        async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
1833        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1834    };
1835    use ruma::{
1836        event_id,
1837        events::{reaction::ReactionEventContent, relation::Annotation},
1838        user_id,
1839    };
1840    use serde_json::json;
1841    use wiremock::{
1842        matchers::{header, method, path_regex},
1843        Mock, MockServer, Request, ResponseTemplate,
1844    };
1845
1846    use crate::{
1847        assert_next_matches_with_timeout,
1848        config::RequestConfig,
1849        encryption::{OAuthCrossSigningResetInfo, VerificationState},
1850        test_utils::{
1851            client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1852        },
1853        Client,
1854    };
1855
1856    #[async_test]
1857    async fn test_reaction_sending() {
1858        let server = MockServer::start().await;
1859        let client = logged_in_client(Some(server.uri())).await;
1860
1861        let event_id = event_id!("$2:example.org");
1862
1863        Mock::given(method("GET"))
1864            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1865            .and(header("authorization", "Bearer 1234"))
1866            .respond_with(
1867                ResponseTemplate::new(200)
1868                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1869            )
1870            .mount(&server)
1871            .await;
1872
1873        Mock::given(method("PUT"))
1874            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1875            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1876                "event_id": event_id,
1877            })))
1878            .mount(&server)
1879            .await;
1880
1881        let response = SyncResponseBuilder::default()
1882            .add_joined_room(
1883                JoinedRoomBuilder::default()
1884                    .add_state_event(StateTestEvent::Member)
1885                    .add_state_event(StateTestEvent::PowerLevels)
1886                    .add_state_event(StateTestEvent::Encryption),
1887            )
1888            .build_sync_response();
1889
1890        client.base_client().receive_sync_response(response).await.unwrap();
1891
1892        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1893        assert!(room
1894            .latest_encryption_state()
1895            .await
1896            .expect("Getting encryption state")
1897            .is_encrypted());
1898
1899        let event_id = event_id!("$1:example.org");
1900        let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
1901        room.send(reaction).await.expect("Sending the reaction should not fail");
1902
1903        room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
1904    }
1905
1906    #[async_test]
1907    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
1908        let server = MockServer::start().await;
1909        let client = logged_in_client(Some(server.uri())).await;
1910        // This is the user ID that is inside MemberAdditional.
1911        // Note the confusing username, so we can share
1912        // GlobalAccountDataTestEvent::Direct with the invited test.
1913        let user_id = user_id!("@invited:localhost");
1914
1915        // When we receive a sync response saying "invited" is invited to a DM
1916        let response = SyncResponseBuilder::default()
1917            .add_joined_room(
1918                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
1919            )
1920            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1921            .build_sync_response();
1922        client.base_client().receive_sync_response(response).await.unwrap();
1923
1924        // Then get_dm_room finds this room
1925        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1926        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1927    }
1928
1929    #[async_test]
1930    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
1931        let server = MockServer::start().await;
1932        let client = logged_in_client(Some(server.uri())).await;
1933        // This is the user ID that is inside MemberInvite
1934        let user_id = user_id!("@invited:localhost");
1935
1936        // When we receive a sync response saying "invited" is invited to a DM
1937        let response = SyncResponseBuilder::default()
1938            .add_joined_room(
1939                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
1940            )
1941            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1942            .build_sync_response();
1943        client.base_client().receive_sync_response(response).await.unwrap();
1944
1945        // Then get_dm_room finds this room
1946        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1947        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1948    }
1949
1950    #[async_test]
1951    async fn test_get_dm_room_still_finds_left_room() {
1952        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
1953        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
1954
1955        let server = MockServer::start().await;
1956        let client = logged_in_client(Some(server.uri())).await;
1957        // This is the user ID that is inside MemberAdditional.
1958        // Note the confusing username, so we can share
1959        // GlobalAccountDataTestEvent::Direct with the invited test.
1960        let user_id = user_id!("@invited:localhost");
1961
1962        // When we receive a sync response saying "invited" is invited to a DM
1963        let response = SyncResponseBuilder::default()
1964            .add_joined_room(
1965                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
1966            )
1967            .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1968            .build_sync_response();
1969        client.base_client().receive_sync_response(response).await.unwrap();
1970
1971        // Then get_dm_room finds this room
1972        let found_room = client.get_dm_room(user_id).expect("DM not found!");
1973        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1974    }
1975
1976    #[cfg(feature = "sqlite")]
1977    #[async_test]
1978    async fn test_generation_counter_invalidates_olm_machine() {
1979        // Create two clients using the same sqlite database.
1980
1981        use matrix_sdk_base::store::RoomLoadSettings;
1982        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
1983        let session = mock_matrix_session();
1984
1985        let client1 = Client::builder()
1986            .homeserver_url("http://localhost:1234")
1987            .request_config(RequestConfig::new().disable_retry())
1988            .sqlite_store(&sqlite_path, None)
1989            .build()
1990            .await
1991            .unwrap();
1992        client1
1993            .matrix_auth()
1994            .restore_session(session.clone(), RoomLoadSettings::default())
1995            .await
1996            .unwrap();
1997
1998        let client2 = Client::builder()
1999            .homeserver_url("http://localhost:1234")
2000            .request_config(RequestConfig::new().disable_retry())
2001            .sqlite_store(sqlite_path, None)
2002            .build()
2003            .await
2004            .unwrap();
2005        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2006
2007        // When the lock isn't enabled, any attempt at locking won't return a guard.
2008        let guard = client1.encryption().try_lock_store_once().await.unwrap();
2009        assert!(guard.is_none());
2010
2011        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2012        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2013
2014        // One client can take the lock.
2015        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2016        assert!(acquired1.is_some());
2017
2018        // Keep the olm machine, so we can see if it's changed later, by comparing Arcs.
2019        let initial_olm_machine =
2020            client1.olm_machine().await.clone().expect("must have an olm machine");
2021
2022        // Also enable backup to check that new machine has the same backup keys.
2023        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2024            .expect("Can't create new recovery key");
2025        let backup_key = decryption_key.megolm_v1_public_key();
2026        backup_key.set_version("1".to_owned());
2027        initial_olm_machine
2028            .backup_machine()
2029            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2030            .await
2031            .expect("Should save");
2032
2033        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2034
2035        assert!(client1.encryption().backups().are_enabled().await);
2036
2037        // The other client can't take the lock too.
2038        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2039        assert!(acquired2.is_none());
2040
2041        // Now have the first client release the lock,
2042        drop(acquired1);
2043        tokio::time::sleep(Duration::from_millis(100)).await;
2044
2045        // And re-take it.
2046        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2047        assert!(acquired1.is_some());
2048
2049        // In that case, the Olm Machine shouldn't change.
2050        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2051        assert!(initial_olm_machine.same_as(&olm_machine));
2052
2053        // Ok, release again.
2054        drop(acquired1);
2055        tokio::time::sleep(Duration::from_millis(100)).await;
2056
2057        // Client2 can acquire the lock.
2058        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2059        assert!(acquired2.is_some());
2060
2061        // And then release it.
2062        drop(acquired2);
2063        tokio::time::sleep(Duration::from_millis(100)).await;
2064
2065        // Client1 can acquire it again,
2066        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2067        assert!(acquired1.is_some());
2068
2069        // But now its olm machine has been invalidated and thus regenerated!
2070        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2071
2072        assert!(!initial_olm_machine.same_as(&olm_machine));
2073
2074        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2075        assert!(backup_key_new.decryption_key.is_some());
2076        assert_eq!(
2077            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2078            backup_key.to_base64()
2079        );
2080        assert!(client1.encryption().backups().are_enabled().await);
2081    }
2082
2083    #[cfg(feature = "sqlite")]
2084    #[async_test]
2085    async fn test_generation_counter_no_spurious_invalidation() {
2086        // Create two clients using the same sqlite database.
2087
2088        use matrix_sdk_base::store::RoomLoadSettings;
2089        let sqlite_path =
2090            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2091        let session = mock_matrix_session();
2092
2093        let client = Client::builder()
2094            .homeserver_url("http://localhost:1234")
2095            .request_config(RequestConfig::new().disable_retry())
2096            .sqlite_store(&sqlite_path, None)
2097            .build()
2098            .await
2099            .unwrap();
2100        client
2101            .matrix_auth()
2102            .restore_session(session.clone(), RoomLoadSettings::default())
2103            .await
2104            .unwrap();
2105
2106        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2107
2108        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2109
2110        // Enabling the lock doesn't update the olm machine.
2111        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2112        assert!(initial_olm_machine.same_as(&after_enabling_lock));
2113
2114        {
2115            // Simulate that another client hold the lock before.
2116            let client2 = Client::builder()
2117                .homeserver_url("http://localhost:1234")
2118                .request_config(RequestConfig::new().disable_retry())
2119                .sqlite_store(sqlite_path, None)
2120                .build()
2121                .await
2122                .unwrap();
2123            client2
2124                .matrix_auth()
2125                .restore_session(session, RoomLoadSettings::default())
2126                .await
2127                .unwrap();
2128
2129            client2
2130                .encryption()
2131                .enable_cross_process_store_lock("client2".to_owned())
2132                .await
2133                .unwrap();
2134
2135            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2136            assert!(guard.is_some());
2137
2138            drop(guard);
2139            tokio::time::sleep(Duration::from_millis(100)).await;
2140        }
2141
2142        {
2143            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2144            assert!(acquired.is_some());
2145        }
2146
2147        // Taking the lock the first time will update the olm machine.
2148        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2149        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2150
2151        {
2152            let acquired = client.encryption().try_lock_store_once().await.unwrap();
2153            assert!(acquired.is_some());
2154        }
2155
2156        // Re-taking the lock doesn't update the olm machine.
2157        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2158        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2159    }
2160
2161    #[async_test]
2162    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2163        // Given a client and a server
2164        let client = no_retry_test_client(None).await;
2165        let server = MockServer::start().await;
2166
2167        // When we subscribe to its verification state
2168        let mut verification_state = client.encryption().verification_state();
2169
2170        // We can get its initial value, and it's Unknown
2171        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2172
2173        // We set up a mocked request to check this endpoint is not called before
2174        // reading the new state
2175        let keys_requested = Arc::new(AtomicBool::new(false));
2176        let inner_bool = keys_requested.clone();
2177
2178        Mock::given(method("GET"))
2179            .and(path_regex(
2180                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2181            ))
2182            .respond_with(move |_req: &Request| {
2183                inner_bool.fetch_or(true, Ordering::SeqCst);
2184                ResponseTemplate::new(200).set_body_json(json!({}))
2185            })
2186            .mount(&server)
2187            .await;
2188
2189        // When the session is initialised and the encryption tasks spawn
2190        set_client_session(&client).await;
2191
2192        // Then we can get an updated value without waiting for any network requests
2193        assert!(keys_requested.load(Ordering::SeqCst).not());
2194        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2195    }
2196
2197    #[test]
2198    fn test_oauth_reset_info_from_uiaa_info() {
2199        let auth_info = json!({
2200            "session": "dummy",
2201            "flows": [
2202                {
2203                    "stages": [
2204                        "org.matrix.cross_signing_reset"
2205                    ]
2206                }
2207            ],
2208            "params": {
2209                "org.matrix.cross_signing_reset": {
2210                    "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2211                }
2212            },
2213            "msg": "To reset..."
2214        });
2215
2216        let auth_info = serde_json::from_value(auth_info)
2217            .expect("We should be able to deserialize the UiaaInfo");
2218        OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2219            .expect("We should be able to fetch the cross-signing reset info from the auth info");
2220    }
2221}