1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::{Path, PathBuf},
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34 future::try_join,
35 stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40 StateStoreDataKey, StateStoreDataValue,
41 cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42 crypto::{
43 CrossSigningBootstrapRequests, OlmMachine,
44 store::{
45 LockableCryptoStore, SecretImportError,
46 types::{RoomKeyBundleInfo, RoomKeyInfo},
47 },
48 types::{
49 SecretsBundle, SignedKey,
50 requests::{
51 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52 },
53 },
54 },
55 sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60 api::{
61 client::{
62 keys::{
63 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65 },
66 message::send_message_event,
67 to_device::send_event_to_device::v3::{
68 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69 },
70 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71 },
72 error::{ErrorBody, StandardErrorBody},
73 },
74 assign,
75 events::room::{
76 MediaSource, ThumbnailInfo,
77 member::{MembershipChange, OriginalSyncRoomMemberEvent},
78 },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91 backups::{Backups, types::BackupClientState},
92 futures::UploadEncryptedFile,
93 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94 recovery::{Recovery, RecoveryState},
95 secret_storage::SecretStorage,
96 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97 verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
101 attachment::Thumbnail,
102 client::{ClientInner, WeakClient},
103 cross_process_lock::CrossProcessLockGuard,
104 error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115pub use matrix_sdk_base::crypto::{
116 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
117 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
118 SignatureError, VERSION,
119 olm::{
120 SessionCreationError as MegolmSessionCreationError,
121 SessionExportError as OlmSessionExportError,
122 },
123 vodozemac,
124};
125use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
126
127#[cfg(feature = "experimental-send-custom-to-device")]
128use crate::config::RequestConfig;
129pub use crate::error::RoomKeyImportError;
130
/// Errors that can be returned by [`export_secrets_bundle_from_store`].
///
/// Only compiled when the `sqlite` feature is enabled. Every variant is
/// transparent, i.e. it displays as the wrapped error.
#[cfg(feature = "sqlite")]
#[derive(Debug, thiserror::Error)]
pub enum BundleExportError {
    /// The SQLite crypto store could not be opened.
    #[error(transparent)]
    OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
    /// The crypto store returned an error.
    #[error(transparent)]
    StoreError(#[from] CryptoStoreError),
    /// Exporting the secrets bundle from the store failed.
    #[error(transparent)]
    SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
}
147
/// Errors that can be returned by [`Encryption::import_secrets_bundle`].
///
/// Every variant is transparent, i.e. it displays as the wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum BundleImportError {
    /// Importing the secrets bundle into the crypto store failed.
    #[error(transparent)]
    SecretImport(#[from] SecretImportError),
    /// A generic SDK [`Error`] occurred after the secrets were imported, e.g.
    /// while uploading device keys or resuming backups.
    #[error(transparent)]
    DeviceKeys(#[from] Error),
}
159
/// Opens the SQLite crypto store at `database_path` and exports a
/// [`SecretsBundle`] from it.
///
/// # Arguments
///
/// * `database_path` - Location of the SQLite crypto store.
/// * `passphrase` - Optional passphrase forwarded to the store when opening
///   it.
///
/// # Returns
///
/// `Ok(None)` when the store does not contain an account; otherwise the user
/// ID of the stored account together with the exported bundle.
#[cfg(feature = "sqlite")]
pub async fn export_secrets_bundle_from_store(
    database_path: impl AsRef<Path>,
    passphrase: Option<&str>,
) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
    use matrix_sdk_base::crypto::store::CryptoStore;

    let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;

    // Without a stored account there is nothing we could export.
    let Some(account) =
        store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?
    else {
        return Ok(None);
    };

    // The export goes through an `OlmMachine` built on top of the opened store.
    let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
        .await
        .map_err(BundleExportError::StoreError)?;
    let bundle = machine.store().export_secrets_bundle().await?;

    Ok(Some((account.user_id.to_owned(), bundle)))
}
191
/// End-to-end encryption related data held by the client.
pub(crate) struct EncryptionData {
    /// Handles to the E2EE-related tasks, guarded by a mutex.
    pub tasks: StdMutex<ClientTasks>,

    /// The settings the client was configured with.
    pub encryption_settings: EncryptionSettings,

    /// State of the backups support.
    pub backup_state: BackupClientState,

    /// Observable holding the current [`RecoveryState`].
    pub recovery_state: SharedObservable<RecoveryState>,
}
207
208impl EncryptionData {
209 pub fn new(encryption_settings: EncryptionSettings) -> Self {
210 Self {
211 encryption_settings,
212
213 tasks: StdMutex::new(Default::default()),
214 backup_state: Default::default(),
215 recovery_state: Default::default(),
216 }
217 }
218
219 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
220 let weak_client = WeakClient::from_inner(client);
221
222 let mut tasks = self.tasks.lock();
223 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
224
225 if self.encryption_settings.backup_download_strategy
226 == BackupDownloadStrategy::AfterDecryptionFailure
227 {
228 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
229 }
230 }
231
232 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
238 let mut guard = self.tasks.lock();
239
240 let future = Recovery::update_state_after_backup_state_change(client);
241 let join_handle = spawn(future);
242
243 guard.update_recovery_state_after_backup = Some(join_handle);
244 }
245}
246
/// Settings for end-to-end encryption features.
///
/// The derived [`Default`] disables both `auto_enable_*` flags and uses the
/// default [`BackupDownloadStrategy`].
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// Whether cross-signing should be enabled automatically.
    // NOTE(review): the code that consumes this flag is not visible in this
    // chunk — confirm the exact trigger.
    pub auto_enable_cross_signing: bool,

    /// Strategy used for downloading room keys from the backup. Selects
    /// which backup tasks `EncryptionData::initialize_tasks` creates.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// Whether backups should be enabled automatically.
    // NOTE(review): the code that consumes this flag is not visible in this
    // chunk — confirm the exact trigger.
    pub auto_enable_backups: bool,
}
266
/// Strategy for downloading room keys from the backup.
///
/// The default is [`BackupDownloadStrategy::Manual`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// One-shot download strategy.
    // NOTE(review): presumably downloads all room keys at once; the
    // implementation is not visible in this chunk — confirm.
    OneShot,

    /// Download room keys after a decryption failure. With this strategy a
    /// `BackupDownloadTask` is created when the client tasks are
    /// initialized.
    AfterDecryptionFailure,

    /// Manual strategy; no download task is created for it by
    /// `EncryptionData::initialize_tasks`.
    #[default]
    Manual,
}
293
/// The verification state, as exposed by
/// [`Encryption::verification_state`].
// NOTE(review): presumably describes the verification state of our own
// device; the code that updates it is not visible in this chunk — confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is not known.
    Unknown,
    /// Verified.
    Verified,
    /// Not verified.
    Unverified,
}
308
/// Handle returned by [`Encryption::reset_cross_signing`] when the upload of
/// the new cross-signing keys needs additional authentication.
///
/// Use [`CrossSigningResetHandle::auth`] to continue the reset and
/// [`CrossSigningResetHandle::cancel`] to stop it.
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// Client used to send the requests.
    client: Client,
    /// The signing keys upload request that will be (re)sent by `auth()`.
    upload_request: UploadSigningKeysRequest,
    /// The signatures upload request, sent once the keys upload succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication the server asked for.
    auth_type: CrossSigningResetAuthType,
    /// Set to `true` by `cancel()`; checked by `auth()` after a failed upload.
    is_cancelled: Mutex<bool>,
}
327
328impl CrossSigningResetHandle {
329 pub fn new(
331 client: Client,
332 upload_request: UploadSigningKeysRequest,
333 signatures_request: UploadSignaturesRequest,
334 auth_type: CrossSigningResetAuthType,
335 ) -> Self {
336 Self {
337 client,
338 upload_request,
339 signatures_request,
340 auth_type,
341 is_cancelled: Mutex::new(false),
342 }
343 }
344
345 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
348 &self.auth_type
349 }
350
351 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
355 const RETRY_EVERY: Duration = Duration::from_millis(500);
357
358 const TIMEOUT: Duration = Duration::from_mins(2);
360
361 tokio::time::timeout(TIMEOUT, async {
362 let mut upload_request = self.upload_request.clone();
363 upload_request.auth = auth;
364
365 debug!(
366 "Repeatedly PUTting to keys/device_signing/upload until it works \
367 or we hit a permanent failure."
368 );
369 while let Err(e) = self.client.send(upload_request.clone()).await {
370 if *self.is_cancelled.lock().await {
371 return Ok(());
372 }
373
374 match e.as_uiaa_response() {
375 Some(uiaa_info) => {
376 if uiaa_info.auth_error.is_some() {
377 return Err(e.into());
378 }
379 }
380 None => return Err(e.into()),
381 }
382
383 debug!(
384 "PUT to keys/device_signing/upload failed with 401. Retrying after \
385 a short delay."
386 );
387 sleep(RETRY_EVERY).await;
388 }
389
390 self.client.send(self.signatures_request.clone()).await?;
391
392 Ok(())
393 })
394 .await
395 .unwrap_or_else(|_| {
396 warn!("Timed out waiting for keys/device_signing/upload to succeed.");
397 Err(Error::Timeout)
398 })
399 }
400
401 pub async fn cancel(&self) {
403 *self.is_cancelled.lock().await = true;
404 }
405}
406
/// The kind of authentication the server requested for a cross-signing
/// reset.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// User-interactive authentication, described by the contained
    /// [`UiaaInfo`].
    Uiaa(UiaaInfo),
    /// OAuth based approval, see [`OAuthCrossSigningResetInfo`].
    OAuth(OAuthCrossSigningResetInfo),
}
417
418impl CrossSigningResetAuthType {
419 fn new(error: &HttpError) -> Result<Option<Self>> {
420 if let Some(auth_info) = error.as_uiaa_response() {
421 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
422 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
423 } else {
424 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
425 }
426 } else {
427 Ok(None)
428 }
429 }
430}
431
/// Information needed to approve a cross-signing reset via OAuth.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL taken from the OAuth parameters of the UIAA response.
    // NOTE(review): presumably the user has to open this URL to approve the
    // reset — confirm against the callers.
    pub approval_url: Url,
}
439
440impl OAuthCrossSigningResetInfo {
441 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
442 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
443 return Ok(None);
444 };
445
446 Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
447 }
448}
449
/// The two keys extracted from the error message a server sends when a
/// one-time key upload is rejected because the key already exists.
///
/// Parsed from the message using the [`FromStr`] implementation.
#[derive(Clone, Debug)]
pub struct DuplicateOneTimeKeyErrorMessage {
    /// The key found in the `Old key:` part of the message.
    pub old_key: Curve25519PublicKey,
    /// The key found in the `new key:` part of the message.
    pub new_key: Curve25519PublicKey,
}
459
460impl FromStr for DuplicateOneTimeKeyErrorMessage {
461 type Err = serde_json::Error;
462
463 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
464 let mut split = s.split_terminator(';');
468
469 let old_key = split
470 .next()
471 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
472 let new_key = split
473 .next()
474 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
475
476 let old_key_index = old_key
479 .find("Old key:")
480 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
481
482 let old_key = old_key[old_key_index..]
483 .trim()
484 .strip_prefix("Old key:")
485 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
486
487 let new_key = new_key
490 .trim()
491 .strip_prefix("new key:")
492 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
493
494 let new_key = new_key.replace("'", "\"");
497
498 let old_key: SignedKey = serde_json::from_str(old_key)?;
500 let new_key: SignedKey = serde_json::from_str(&new_key)?;
501
502 let old_key = old_key.key();
504 let new_key = new_key.key();
505
506 Ok(Self { old_key, new_key })
507 }
508}
509
510impl Client {
    /// Returns a read guard over the optional `OlmMachine` of the base
    /// client.
    pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.base_client().olm_machine().await
    }
514
515 pub(crate) async fn mark_request_as_sent(
516 &self,
517 request_id: &TransactionId,
518 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
519 ) -> Result<(), matrix_sdk_base::Error> {
520 Ok(self
521 .olm_machine()
522 .await
523 .as_ref()
524 .expect(
525 "We should have an olm machine once we try to mark E2EE related requests as sent",
526 )
527 .mark_request_as_sent(request_id, response)
528 .await?)
529 }
530
531 #[instrument(skip(self, device_keys))]
537 pub(crate) async fn keys_query(
538 &self,
539 request_id: &TransactionId,
540 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
541 ) -> Result<get_keys::v3::Response> {
542 let request = assign!(get_keys::v3::Request::new(), { device_keys });
543
544 let response = self.send(request).await?;
545 self.mark_request_as_sent(request_id, &response).await?;
546 self.encryption().update_state_after_keys_query(&response).await;
547
548 Ok(response)
549 }
550
    /// Creates an [`UploadEncryptedFile`] request for the data provided by
    /// `reader`.
    ///
    /// Nothing is done until the returned value is awaited.
    ///
    /// # Arguments
    ///
    /// * `reader` - The source of the data that should be uploaded.
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
590
591 pub(crate) async fn upload_encrypted_media_and_thumbnail(
594 &self,
595 data: &[u8],
596 thumbnail: Option<Thumbnail>,
597 send_progress: SharedObservable<TransmissionProgress>,
598 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
599 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
600
601 let upload_attachment = async {
602 let mut cursor = Cursor::new(data);
603 self.upload_encrypted_file(&mut cursor)
604 .with_send_progress_observable(send_progress)
605 .await
606 };
607
608 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
609
610 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
611 }
612
613 async fn upload_encrypted_thumbnail(
616 &self,
617 thumbnail: Option<Thumbnail>,
618 send_progress: SharedObservable<TransmissionProgress>,
619 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
620 let Some(thumbnail) = thumbnail else {
621 return Ok(None);
622 };
623
624 let (data, _, thumbnail_info) = thumbnail.into_parts();
625 let mut cursor = Cursor::new(data);
626
627 let file = self
628 .upload_encrypted_file(&mut cursor)
629 .with_send_progress_observable(send_progress)
630 .await?;
631
632 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
633 }
634
    /// Claims one-time keys for the given users, if the `OlmMachine` reports
    /// missing sessions for them.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NoOlmMachine`] if there's no `OlmMachine`, or any
    /// error produced while sending the request or marking it as sent.
    pub(crate) async fn claim_one_time_keys(
        &self,
        users: impl Iterator<Item = &UserId>,
    ) -> Result<()> {
        // Held until the end of the method, so callers holding the key claim
        // lock don't run this section at the same time.
        let _lock = self.locks().key_claim_lock.lock().await;

        // `get_missing_sessions()` yields a request only if there is
        // something to claim.
        if let Some((request_id, request)) = self
            .olm_machine()
            .await
            .as_ref()
            .ok_or(Error::NoOlmMachine)?
            .get_missing_sessions(users)
            .await?
        {
            let response = self.send(request).await?;
            self.mark_request_as_sent(&request_id, &response).await?;
        }

        Ok(())
    }
660
661 #[instrument(skip(self, request))]
671 pub(crate) async fn keys_upload(
672 &self,
673 request_id: &TransactionId,
674 request: &upload_keys::v3::Request,
675 ) -> Result<upload_keys::v3::Response> {
676 debug!(
677 device_keys = request.device_keys.is_some(),
678 one_time_key_count = request.one_time_keys.len(),
679 "Uploading public encryption keys",
680 );
681
682 let response = self.send(request.clone()).await?;
683 self.mark_request_as_sent(request_id, &response).await?;
684
685 Ok(response)
686 }
687
688 pub(crate) async fn room_send_helper(
689 &self,
690 request: &RoomMessageRequest,
691 ) -> Result<send_message_event::v3::Response> {
692 let content = request.content.clone();
693 let txn_id = request.txn_id.clone();
694 let room_id = &request.room_id;
695
696 self.get_room(room_id)
697 .expect("Can't send a message to a room that isn't known to the store")
698 .send(*content)
699 .with_transaction_id(txn_id)
700 .await
701 .map(|result| result.response)
702 }
703
704 pub(crate) async fn send_to_device(
705 &self,
706 request: &ToDeviceRequest,
707 ) -> HttpResult<ToDeviceResponse> {
708 let request = RumaToDeviceRequest::new_raw(
709 request.event_type.clone(),
710 request.txn_id.clone(),
711 request.messages.clone(),
712 );
713
714 self.send(request).await
715 }
716
717 pub(crate) async fn send_verification_request(
718 &self,
719 request: OutgoingVerificationRequest,
720 ) -> Result<()> {
721 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
722
723 match request {
724 ToDevice(t) => {
725 self.send_to_device(&t).await?;
726 }
727 InRoom(r) => {
728 self.room_send_helper(&r).await?;
729 }
730 }
731
732 Ok(())
733 }
734
    /// Sends a single outgoing request produced by the `OlmMachine`.
    ///
    /// Every successfully sent request ends up being marked as sent: either
    /// directly in here, or inside of `keys_query()` / `keys_upload()` for
    /// the respective request kinds.
    ///
    /// A keys upload that fails with a 400 status and a standard error body
    /// starting with `One time key` is additionally reported: it's logged,
    /// remembered in the state store so it's reported only once, and
    /// dispatched via the `duplicate_key_upload_error_sender`. The upload
    /// error is still returned to the caller afterwards.
    async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
        use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;

        match r.request() {
            AnyOutgoingRequest::KeysQuery(request) => {
                self.keys_query(r.request_id(), request.device_keys.clone()).await?;
            }
            AnyOutgoingRequest::KeysUpload(request) => {
                // Don't propagate the error right away, we want to inspect it
                // first.
                let response = self.keys_upload(r.request_id(), request).await;

                if let Err(e) = &response {
                    match e.as_ruma_api_error() {
                        Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
                            if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
                            {
                                {
                                    // Check the store flag so the duplicate
                                    // key error is reported only once.
                                    let already_reported = self
                                        .state_store()
                                        .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
                                        .await?
                                        .is_some();

                                    if message.starts_with("One time key") && !already_reported {
                                        // Parsing is best-effort, the error
                                        // gets reported either way.
                                        let error_message =
                                            DuplicateOneTimeKeyErrorMessage::from_str(message);

                                        if let Ok(message) = &error_message {
                                            error!(
                                                sentry = true,
                                                old_key = %message.old_key,
                                                new_key = %message.new_key,
                                                "Duplicate one-time keys have been uploaded"
                                            );
                                        } else {
                                            error!(
                                                sentry = true,
                                                "Duplicate one-time keys have been uploaded"
                                            );
                                        }

                                        // Remember that we reported the error.
                                        self.state_store()
                                            .set_kv_data(
                                                StateStoreDataKey::OneTimeKeyAlreadyUploaded,
                                                StateStoreDataValue::OneTimeKeyAlreadyUploaded,
                                            )
                                            .await?;

                                        // Notify listeners; `None` is sent if
                                        // the message couldn't be parsed.
                                        if let Err(e) = self
                                            .inner
                                            .duplicate_key_upload_error_sender
                                            .send(error_message.ok())
                                        {
                                            error!(
                                                "Failed to dispatch duplicate key upload error notification: {}",
                                                e
                                            );
                                        }
                                    }
                                }
                            }
                        }
                        _ => {}
                    }

                    // Now hand the original upload error to the caller.
                    response?;
                }
            }
            AnyOutgoingRequest::ToDeviceRequest(request) => {
                let response = self.send_to_device(request).await?;
                self.mark_request_as_sent(r.request_id(), &response).await?;
            }
            AnyOutgoingRequest::SignatureUpload(request) => {
                let response = self.send(request.clone()).await?;
                self.mark_request_as_sent(r.request_id(), &response).await?;
            }
            AnyOutgoingRequest::RoomMessage(request) => {
                let response = self.room_send_helper(request).await?;
                self.mark_request_as_sent(r.request_id(), &response).await?;
            }
            AnyOutgoingRequest::KeysClaim(request) => {
                let response = self.send(request.clone()).await?;
                self.mark_request_as_sent(r.request_id(), &response).await?;
            }
        }

        Ok(())
    }
826
827 #[instrument(skip_all)]
828 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
829 const MAX_CONCURRENT_REQUESTS: usize = 20;
830
831 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
834 warn!("Error while claiming one-time keys {:?}", e);
835 }
836
837 let outgoing_requests = stream::iter(
838 self.olm_machine()
839 .await
840 .as_ref()
841 .ok_or(Error::NoOlmMachine)?
842 .outgoing_requests()
843 .await?,
844 )
845 .map(|r| self.send_outgoing_request(r));
846
847 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
848
849 requests
850 .for_each(|r| async move {
851 match r {
852 Ok(_) => (),
853 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
854 }
855 })
856 .await;
857
858 Ok(())
859 }
860}
861
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Public wrapper around the crate-private `olm_machine()` accessor,
    /// only available in tests or with the `testing` feature.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        let guard = self.olm_machine().await;
        guard
    }

    /// Aborts the task receiving historic room key bundles, if it exists.
    pub fn abort_bundle_receiver_task(&self) {
        let tasks = self.inner.e2ee.tasks.lock();

        let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() else {
            return;
        };

        task.abort()
    }
}
877
/// A high-level API to manage the client's encryption.
///
/// All methods operate on the wrapped [`Client`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The underlying client.
    client: Client,
}
886
887impl Encryption {
    /// Creates the encryption API wrapper for the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
891
892 pub(crate) fn settings(&self) -> EncryptionSettings {
894 self.client.inner.e2ee.encryption_settings
895 }
896
897 pub async fn ed25519_key(&self) -> Option<String> {
900 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
901 }
902
903 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
905 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
906 }
907
908 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
910 match self.get_own_device().await {
911 Ok(Some(device)) => device.first_time_seen_ts(),
912 _ => MilliSecondsSinceUnixEpoch::now(),
914 }
915 }
916
917 pub async fn import_secrets_bundle(
930 &self,
931 bundle: &SecretsBundle,
932 ) -> Result<(), BundleImportError> {
933 self.import_secrets_bundle_impl(bundle).await?;
934
935 self.ensure_device_keys_upload().await?;
938 self.wait_for_e2ee_initialization_tasks().await;
939
940 if !self.backups().are_enabled().await {
945 self.backups().maybe_resume_backups().await?;
946 }
947
948 Ok(())
949 }
950
951 pub(crate) async fn import_secrets_bundle_impl(
952 &self,
953 bundle: &SecretsBundle,
954 ) -> Result<(), SecretImportError> {
955 let olm_machine = self.client.olm_machine().await;
956 let olm_machine =
957 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
958
959 olm_machine.store().import_secrets_bundle(bundle).await
960 }
961
962 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
967 let olm = self.client.olm_machine().await;
968 let machine = olm.as_ref()?;
969 Some(machine.cross_signing_status().await)
970 }
971
972 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
978 let olm_machine = self.client.olm_machine().await;
979 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
980 let user_id = olm_machine.user_id();
981
982 self.ensure_initial_key_query().await?;
983
984 let devices = self.get_user_devices(user_id).await?;
985
986 let ret = devices.devices().any(|device| {
987 device.is_cross_signed_by_owner()
988 && device.curve25519_key().is_some()
989 && !device.is_dehydrated()
990 });
991
992 Ok(ret)
993 }
994
995 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1000 if let Some(machine) = self.client.olm_machine().await.as_ref() {
1001 machine.tracked_users().await
1002 } else {
1003 Ok(HashSet::new())
1004 }
1005 }
1006
    /// Returns a [`Subscriber`] for the client's [`VerificationState`],
    /// created with `subscribe_reset()`.
    // NOTE(review): `subscribe_reset()` presumably makes the subscriber yield
    // the current value first — confirm in the `eyeball` documentation.
    pub fn verification_state(&self) -> Subscriber<VerificationState> {
        self.client.inner.verification_state.subscribe_reset()
    }
1032
    /// Looks up the verification with the given flow ID that's happening
    /// with the given user.
    ///
    /// Returns `None` if there's no `OlmMachine`, no such verification, or
    /// the verification is of a kind that isn't supported with the enabled
    /// features (QR code verifications require the `qrcode` feature).
    pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
        let olm = self.client.olm_machine().await;
        let olm = olm.as_ref()?;
        // `and_then` is kept on purpose: depending on the enabled features
        // the closure may return `None` for some variants.
        #[allow(clippy::bind_instead_of_map)]
        olm.get_verification(user_id, flow_id).and_then(|v| match v {
            matrix_sdk_base::crypto::Verification::SasV1(sas) => {
                Some(SasVerification { inner: sas, client: self.client.clone() }.into())
            }
            #[cfg(feature = "qrcode")]
            matrix_sdk_base::crypto::Verification::QrV1(qr) => {
                Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
            }
            _ => None,
        })
    }
1049
1050 pub async fn get_verification_request(
1053 &self,
1054 user_id: &UserId,
1055 flow_id: impl AsRef<str>,
1056 ) -> Option<VerificationRequest> {
1057 let olm = self.client.olm_machine().await;
1058 let olm = olm.as_ref()?;
1059
1060 olm.get_verification_request(user_id, flow_id)
1061 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1062 }
1063
1064 pub async fn get_device(
1098 &self,
1099 user_id: &UserId,
1100 device_id: &DeviceId,
1101 ) -> Result<Option<Device>, CryptoStoreError> {
1102 let olm = self.client.olm_machine().await;
1103 let Some(machine) = olm.as_ref() else { return Ok(None) };
1104 let device = machine.get_device(user_id, device_id, None).await?;
1105 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1106 }
1107
1108 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1115 let olm = self.client.olm_machine().await;
1116 let Some(machine) = olm.as_ref() else { return Ok(None) };
1117 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1118 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1119 }
1120
1121 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1147 let devices = self
1148 .client
1149 .olm_machine()
1150 .await
1151 .as_ref()
1152 .ok_or(Error::NoOlmMachine)?
1153 .get_user_devices(user_id, None)
1154 .await?;
1155
1156 Ok(UserDevices { inner: devices, client: self.client.clone() })
1157 }
1158
1159 pub async fn get_user_identity(
1195 &self,
1196 user_id: &UserId,
1197 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1198 let olm = self.client.olm_machine().await;
1199 let Some(olm) = olm.as_ref() else { return Ok(None) };
1200 let identity = olm.get_identity(user_id, None).await?;
1201
1202 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1203 }
1204
1205 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1243 let olm = self.client.olm_machine().await;
1244 let Some(olm) = olm.as_ref() else { return Ok(None) };
1245
1246 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1247 self.client.keys_query(&request_id, request.device_keys).await?;
1248
1249 let identity = olm.get_identity(user_id, None).await?;
1250 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1251 }
1252
1253 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1284 let olm = self.client.olm_machine().await;
1285 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1286 let client = self.client.to_owned();
1287
1288 Ok(olm
1289 .store()
1290 .devices_stream()
1291 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1292 }
1293
1294 pub async fn user_identities_stream(
1322 &self,
1323 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1324 let olm = self.client.olm_machine().await;
1325 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1326 let client = self.client.to_owned();
1327
1328 Ok(olm
1329 .store()
1330 .user_identities_stream()
1331 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1332 }
1333
1334 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1373 let olm = self.client.olm_machine().await;
1374 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1375
1376 let CrossSigningBootstrapRequests {
1377 upload_signing_keys_req,
1378 upload_keys_req,
1379 upload_signatures_req,
1380 } = olm.bootstrap_cross_signing(false).await?;
1381
1382 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1383 auth: auth_data,
1384 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1385 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1386 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1387 });
1388
1389 if let Some(req) = upload_keys_req {
1390 self.client.send_outgoing_request(req).await?;
1391 }
1392 self.client.send(upload_signing_keys_req).await?;
1393 self.client.send(upload_signatures_req).await?;
1394
1395 Ok(())
1396 }
1397
1398 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1435 let olm = self.client.olm_machine().await;
1436 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1437
1438 let CrossSigningBootstrapRequests {
1439 upload_keys_req,
1440 upload_signing_keys_req,
1441 upload_signatures_req,
1442 } = olm.bootstrap_cross_signing(true).await?;
1443
1444 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1445 auth: None,
1446 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1447 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1448 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1449 });
1450
1451 if let Some(req) = upload_keys_req {
1452 self.client.send_outgoing_request(req).await?;
1453 }
1454
1455 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1456 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1457 let client = self.client.clone();
1458
1459 Ok(Some(CrossSigningResetHandle::new(
1460 client,
1461 upload_signing_keys_req,
1462 upload_signatures_req,
1463 auth_type,
1464 )))
1465 } else {
1466 Err(error.into())
1467 }
1468 } else {
1469 self.client.send(upload_signatures_req).await?;
1470
1471 Ok(None)
1472 }
1473 }
1474
1475 async fn ensure_initial_key_query(&self) -> Result<()> {
1478 let olm_machine = self.client.olm_machine().await;
1479 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1480
1481 let user_id = olm_machine.user_id();
1482
1483 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1484 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1485 self.client.keys_query(&request_id, request.device_keys).await?;
1486 }
1487
1488 Ok(())
1489 }
1490
1491 pub async fn bootstrap_cross_signing_if_needed(
1538 &self,
1539 auth_data: Option<AuthData>,
1540 ) -> Result<()> {
1541 let olm_machine = self.client.olm_machine().await;
1542 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1543 let user_id = olm_machine.user_id();
1544
1545 self.ensure_initial_key_query().await?;
1546
1547 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1548 self.bootstrap_cross_signing(auth_data).await?;
1549 }
1550
1551 Ok(())
1552 }
1553
1554 #[cfg(not(target_family = "wasm"))]
1606 pub async fn export_room_keys(
1607 &self,
1608 path: PathBuf,
1609 passphrase: &str,
1610 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1611 ) -> Result<()> {
1612 let olm = self.client.olm_machine().await;
1613 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1614
1615 let keys = olm.store().export_room_keys(predicate).await?;
1616 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1617
1618 let encrypt = move || -> Result<()> {
1619 let export: String =
1620 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1621 let mut file = std::fs::File::create(path)?;
1622 file.write_all(&export.into_bytes())?;
1623 Ok(())
1624 };
1625
1626 let task = tokio::task::spawn_blocking(encrypt);
1627 task.await.expect("Task join error")
1628 }
1629
1630 #[cfg(not(target_family = "wasm"))]
1668 pub async fn import_room_keys(
1669 &self,
1670 path: PathBuf,
1671 passphrase: &str,
1672 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1673 let olm = self.client.olm_machine().await;
1674 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1675 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1676
1677 let decrypt = move || {
1678 let file = std::fs::File::open(path)?;
1679 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1680 };
1681
1682 let task = tokio::task::spawn_blocking(decrypt);
1683 let import = task.await.expect("Task join error")?;
1684
1685 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1686
1687 self.backups().maybe_trigger_backup();
1688
1689 Ok(ret)
1690 }
1691
1692 pub async fn room_keys_received_stream(
1723 &self,
1724 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1725 {
1726 let olm = self.client.olm_machine().await;
1727 let olm = olm.as_ref()?;
1728
1729 Some(olm.store().room_keys_received_stream())
1730 }
1731
1732 pub async fn historic_room_key_stream(
1763 &self,
1764 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1765 let olm = self.client.olm_machine().await;
1766 let olm = olm.as_ref()?;
1767
1768 Some(olm.store().historic_room_key_stream())
1769 }
1770
1771 pub fn secret_storage(&self) -> SecretStorage {
1773 SecretStorage { client: self.client.to_owned() }
1774 }
1775
1776 pub fn backups(&self) -> Backups {
1778 Backups { client: self.client.to_owned() }
1779 }
1780
1781 pub fn recovery(&self) -> Recovery {
1783 Recovery { client: self.client.to_owned() }
1784 }
1785
1786 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1798 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1800 let prev_holder = prev_lock.lock_holder();
1801 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1802 return Ok(());
1803 }
1804 warn!(
1805 "Recreating cross-process store lock with a different holder value: \
1806 prev was {prev_holder:?}, new is {lock_value}"
1807 );
1808 }
1809
1810 let olm_machine = self.client.base_client().olm_machine().await;
1811 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1812
1813 let lock = olm_machine.store().create_store_lock(
1814 "cross_process_lock".to_owned(),
1815 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1816 );
1817
1818 {
1823 let lock_result = lock.try_lock_once().await?;
1824
1825 if lock_result.is_ok() {
1826 olm_machine
1827 .initialize_crypto_store_generation(
1828 &self.client.locks().crypto_store_generation,
1829 )
1830 .await?;
1831 }
1832 }
1833
1834 self.client
1835 .locks()
1836 .cross_process_crypto_store_lock
1837 .set(lock)
1838 .map_err(|_| Error::BadCryptoStoreState)?;
1839
1840 Ok(())
1841 }
1842
1843 #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1848 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1849 let olm_machine_guard = self.client.olm_machine().await;
1850 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1851 let (new_gen, generation_number) = olm_machine
1852 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1853 .await?;
1854
1855 Span::current()
1856 .record("olm_machine_new_generation", new_gen)
1857 .record("olm_machine_generation", generation_number);
1858 debug!("OlmMachine generation maintained in CryptoStore");
1859
1860 if new_gen {
1862 drop(olm_machine_guard);
1864 self.client.base_client().regenerate_olm(None).await?;
1866 }
1867 Ok(generation_number)
1868 } else {
1869 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1874 Ok(0)
1875 }
1876 }
1877
1878 pub async fn spin_lock_store(
1886 &self,
1887 max_backoff: Option<u32>,
1888 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1889 self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1890 }
1891
1892 pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1900 match self.lock_store(CrossProcessLock::try_lock_once).await {
1901 Err(Error::CrossProcessLockError(e))
1902 if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1903 {
1904 Ok(None)
1905 }
1906 other => other,
1907 }
1908 }
1909
1910 pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1917 &self,
1918 acquire: F,
1919 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1920 let wrap_err = |e: CryptoStoreError| {
1921 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1922 };
1923 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1924 let guard = acquire(lock).await.map_err(wrap_err)??;
1925 let _ = self.on_lock_newly_acquired().await?;
1926 Ok(Some(guard.into_guard()))
1927 } else {
1928 Ok(None)
1929 }
1930 }
1931
/// Testing helper: return the value of the Olm machine's
/// `uploaded_key_count`.
///
/// # Errors
///
/// Returns [`Error::AuthenticationRequired`] when there is no Olm machine;
/// errors from the Olm machine are propagated.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let Some(machine) = machine_guard.as_ref() else {
        return Err(Error::AuthenticationRequired);
    };

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1939
1940 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1964 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1968 Some(BundleReceiverTask::new(&self.client).await)
1969 } else {
1970 None
1971 };
1972
1973 let mut tasks = self.client.inner.e2ee.tasks.lock();
1974
1975 let this = self.clone();
1976
1977 tasks.setup_e2ee = Some(spawn(async move {
1978 this.update_verification_state().await;
1981
1982 if this.settings().auto_enable_cross_signing
1983 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1984 {
1985 error!("Couldn't bootstrap cross signing {e:?}");
1986 }
1987
1988 if let Err(e) = this.backups().setup_and_resume().await {
1989 error!("Couldn't setup and resume backups {e:?}");
1990 }
1991 if let Err(e) = this.recovery().setup().await {
1992 error!("Couldn't setup and resume recovery {e:?}");
1993 }
1994 }));
1995
1996 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1997
1998 self.setup_room_membership_session_discard_handler();
1999 }
2000
2001 pub async fn wait_for_e2ee_initialization_tasks(&self) {
2004 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2005
2006 if let Some(task) = task
2007 && let Err(err) = task.await
2008 {
2009 warn!("Error when initializing backups: {err}");
2010 }
2011 }
2012
2013 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2023 let olm = self.client.olm_machine().await;
2024 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2025
2026 if let Some((request_id, request)) = olm.upload_device_keys().await? {
2027 self.client.keys_upload(&request_id, &request).await?;
2028
2029 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2030 self.client.keys_query(&request_id, request.device_keys).await?;
2031 }
2032
2033 Ok(())
2034 }
2035
2036 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2037 self.recovery().update_state_after_keys_query(response).await;
2038
2039 if let Some(user_id) = self.client.user_id() {
2041 let contains_own_device = response.device_keys.contains_key(user_id);
2042
2043 if contains_own_device {
2044 self.update_verification_state().await;
2045 }
2046 }
2047 }
2048
2049 async fn update_verification_state(&self) {
2050 match self.get_own_device().await {
2051 Ok(device) => {
2052 if let Some(device) = device {
2053 let is_verified = device.is_cross_signed_by_owner();
2054
2055 if is_verified {
2056 self.client.inner.verification_state.set(VerificationState::Verified);
2057 } else {
2058 self.client.inner.verification_state.set(VerificationState::Unverified);
2059 }
2060 } else {
2061 warn!("Couldn't find out own device in the store.");
2062 self.client.inner.verification_state.set(VerificationState::Unknown);
2063 }
2064 }
2065 Err(error) => {
2066 warn!("Failed retrieving own device: {error}");
2067 self.client.inner.verification_state.set(VerificationState::Unknown);
2068 }
2069 }
2070 }
2071
2072 fn setup_room_membership_session_discard_handler(&self) {
2092 let client = WeakClient::from_client(&self.client);
2093 self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2094 let Some(client) = client.get() else {
2095 return;
2097 };
2098 let Some(user_id) = client.user_id() else {
2099 return;
2101 };
2102 let olm = client.olm_machine().await;
2103 let Some(olm) = olm.as_ref() else {
2104 warn!("Cannot discard session - Olm machine is not available");
2105 return;
2106 };
2107
2108 if matches!(
2109 ev.membership_change(),
2110 MembershipChange::Joined |
2111 MembershipChange::Invited |
2112 MembershipChange::KnockAccepted |
2113 MembershipChange::InvitationAccepted |
2114 MembershipChange::ProfileChanged { .. }
2115 ) || ev.sender == user_id {
2116 return;
2118 }
2119
2120 debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2121
2122 if let Err(e) = olm.discard_room_key(room.room_id()).await {
2125 warn!(
2126 room_id = ?room.room_id(),
2127 "Error discarding room key after member leave: {e:?}"
2128 );
2129 }
2130 });
2131 }
2132
/// Encrypt a raw to-device event for the given devices and send it.
///
/// One-time keys are claimed for the users owning `recipient_devices` first.
/// The content is then encrypted for those devices and every resulting
/// to-device request is sent with a short-retry request configuration.
///
/// # Arguments
///
/// * `recipient_devices` - Devices that should receive the event.
/// * `event_type` - Type of the event being encrypted.
/// * `content` - Raw content of the event.
/// * `share_strategy` - Strategy forwarded to the Olm machine when
///   encrypting.
///
/// # Returns
///
/// The `(user, device)` pairs the event could not be delivered to: devices
/// reported as withheld by the Olm machine, plus the explicitly addressed
/// devices of every request whose sending failed. Send failures are
/// therefore reported through this list rather than as an `Err`.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] if the Olm machine is not available.
/// Errors from claiming one-time keys or from encrypting are propagated.
///
/// # Panics
///
/// Panics if `content` cannot be deserialized as a `serde_json::Value`.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    self.client.claim_one_time_keys(users).await?;

    let olm = self.client.olm_machine().await;
    // Report a missing Olm machine as an error, like the other methods of this
    // type, instead of panicking.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the content was withheld from count as failures from the start.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(device, _)| (device.user_id().to_owned(), device.device_id().to_owned()))
        .collect();

    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        if send_result.is_err() {
            // The send error itself is not returned; every explicitly addressed
            // device of the failed request is recorded as a failure instead.
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|target| match target {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // A wildcard target has no single device ID that could be
                    // reported.
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2210}
2211
2212#[cfg(all(test, not(target_family = "wasm")))]
2213mod tests {
2214 use std::{
2215 ops::Not,
2216 str::FromStr,
2217 sync::{
2218 Arc,
2219 atomic::{AtomicBool, Ordering},
2220 },
2221 time::Duration,
2222 };
2223
2224 use matrix_sdk_test::{
2225 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2226 event_factory::EventFactory,
2227 };
2228 use ruma::{
2229 event_id,
2230 events::{reaction::ReactionEventContent, relation::Annotation},
2231 user_id,
2232 };
2233 use serde_json::json;
2234 use wiremock::{
2235 Mock, MockServer, Request, ResponseTemplate,
2236 matchers::{header, method, path_regex},
2237 };
2238
2239 use crate::{
2240 Client, assert_next_matches_with_timeout,
2241 config::RequestConfig,
2242 encryption::{
2243 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2244 },
2245 test_utils::{
2246 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2247 },
2248 };
2249
2250 #[async_test]
2251 async fn test_reaction_sending() {
2252 let server = MockServer::start().await;
2253 let client = logged_in_client(Some(server.uri())).await;
2254
2255 let event_id = event_id!("$2:example.org");
2256
2257 Mock::given(method("GET"))
2258 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2259 .and(header("authorization", "Bearer 1234"))
2260 .respond_with(
2261 ResponseTemplate::new(200)
2262 .set_body_json(EventFactory::new().room_encryption().into_content()),
2263 )
2264 .mount(&server)
2265 .await;
2266
2267 Mock::given(method("PUT"))
2268 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2269 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2270 "event_id": event_id,
2271 })))
2272 .mount(&server)
2273 .await;
2274
2275 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2276 let response = SyncResponseBuilder::default()
2277 .add_joined_room(
2278 JoinedRoomBuilder::default()
2279 .add_state_event(
2280 f.member(user_id!("@example:localhost")).display_name("example"),
2281 )
2282 .add_state_event(f.default_power_levels())
2283 .add_state_event(f.room_encryption()),
2284 )
2285 .build_sync_response();
2286
2287 client.base_client().receive_sync_response(response).await.unwrap();
2288
2289 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2290 assert!(
2291 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2292 );
2293
2294 let event_id = event_id!("$1:example.org");
2295 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2296 room.send(reaction).await.expect("Sending the reaction should not fail");
2297
2298 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2299 }
2300
/// Two clients share one sqlite store. After the second client has held the
/// cross-process lock, the first client must end up with a different Olm
/// machine on its next acquisition, and the backup state saved earlier must
/// still be available from that new machine.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::{crypto::store::types::BackupDecryptionKey, store::RoomLoadSettings};

    let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    // Both clients are backed by the same database file and the same session.
    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(sqlite_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    let encryption1 = client1.encryption();
    let encryption2 = client2.encryption();

    // Without the cross-process lock enabled, no guard is handed out.
    let no_guard = encryption1.try_lock_store_once().await.unwrap();
    assert!(no_guard.is_none());

    encryption1.enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    encryption2.enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // With the lock enabled, client1 obtains it.
    let guard1 = encryption1.try_lock_store_once().await.unwrap();
    assert!(guard1.is_some());

    let initial_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Store a backup decryption key and enable backups on the initial machine.
    let decryption_key = BackupDecryptionKey::new();
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());
    initial_olm_machine
        .backup_machine()
        .save_decryption_key(Some(decryption_key.clone()), Some("1".to_owned()))
        .await
        .expect("Should save");

    initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(encryption1.backups().are_enabled().await);

    // While client1 holds the lock, client2 cannot obtain it.
    let guard2 = encryption2.try_lock_store_once().await.unwrap();
    assert!(guard2.is_none());

    drop(guard1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // client1 takes the lock again; nobody else held it in between, so its Olm
    // machine is still the initial one.
    let guard1 = encryption1.try_lock_store_once().await.unwrap();
    assert!(guard1.is_some());

    let current_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&current_olm_machine));

    drop(guard1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Now client2 takes the lock and releases it again.
    let guard2 = encryption2.try_lock_store_once().await.unwrap();
    assert!(guard2.is_some());

    drop(guard2);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // When client1 takes the lock afterwards, its Olm machine must have changed.
    let guard1 = encryption1.try_lock_store_once().await.unwrap();
    assert!(guard1.is_some());

    let current_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    assert!(!initial_olm_machine.same_as(&current_olm_machine));

    // The backup key saved earlier is available from the new machine, and backups
    // are still enabled.
    let reloaded_keys = current_olm_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(reloaded_keys.decryption_key.is_some());
    assert_eq!(
        reloaded_keys.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(encryption1.backups().are_enabled().await);
}
2406
/// The Olm machine must stay the same when the lock is merely enabled, must
/// change once after another client has held the lock, and must not change
/// again on a later acquisition by the same client.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    let sqlite_path =
        std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let encryption = client.encryption();

    let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

    encryption.enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Enabling the lock alone must not replace the Olm machine.
    let machine_after_enabling = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(initial_olm_machine.same_as(&machine_after_enabling));

    {
        // A second client on the same database takes the lock and releases it.
        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        let encryption2 = client2.encryption();
        encryption2.enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        let guard2 = encryption2.spin_lock_store(None).await.unwrap();
        assert!(guard2.is_some());

        drop(guard2);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    {
        // First acquisition by the original client; the guard is released at the end
        // of this scope.
        let guard = encryption.try_lock_store_once().await.unwrap();
        assert!(guard.is_some());
    }

    // The other client held the lock in between, so the Olm machine has changed.
    let machine_after_first_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!initial_olm_machine.same_as(&machine_after_first_lock));

    {
        // Second acquisition by the original client, with no other holder in between.
        let guard = encryption.try_lock_store_once().await.unwrap();
        assert!(guard.is_some());
    }

    // This time the Olm machine must be left alone.
    let machine_after_second_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_after_first_lock.same_as(&machine_after_second_lock));
}
2484
2485 #[async_test]
2486 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2487 let client = no_retry_test_client(None).await;
2489 let server = MockServer::start().await;
2490
2491 let mut verification_state = client.encryption().verification_state();
2493
2494 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2496
2497 let keys_requested = Arc::new(AtomicBool::new(false));
2500 let inner_bool = keys_requested.clone();
2501
2502 Mock::given(method("GET"))
2503 .and(path_regex(
2504 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2505 ))
2506 .respond_with(move |_req: &Request| {
2507 inner_bool.fetch_or(true, Ordering::SeqCst);
2508 ResponseTemplate::new(200).set_body_json(json!({}))
2509 })
2510 .mount(&server)
2511 .await;
2512
2513 set_client_session(&client).await;
2515
2516 assert!(keys_requested.load(Ordering::SeqCst).not());
2518 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2519 }
2520
/// A UIAA response advertising the `org.matrix.cross_signing_reset` stage
/// must be convertible into an [`OAuthCrossSigningResetInfo`].
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let raw_info = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    let parsed_info = serde_json::from_value(raw_info)
        .expect("We should be able to deserialize the UiaaInfo");

    OAuthCrossSigningResetInfo::from_auth_info(&parsed_info)
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2545
/// A complete "one time key already exists" message must yield both the old
/// and the new key, while a message without the keys must be rejected.
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    let full_message = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(full_message)
        .expect("We should be able to parse the error message");

    let old_key = parsed.old_key.to_base64();
    let new_key = parsed.new_key.to_base64();
    assert_eq!(old_key, "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
    assert_eq!(new_key, "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");

    // A message that lacks the key material must not parse.
    DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
        .expect_err("We shouldn't be able to parse an incomplete error message");
}
2566
2567 fn devices_to_verify_against_keys_query_response(
2571 devices: Vec<serde_json::Value>,
2572 ) -> serde_json::Value {
2573 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2574 .into_iter()
2575 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2576 .collect();
2577 json!({
2578 "device_keys": {
2579 "@example:localhost": device_keys,
2580 },
2581 "master_keys": {
2582 "@example:localhost": {
2583 "keys": {
2584 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2585 },
2586 "usage": ["master"],
2587 "user_id": "@example:localhost",
2588 },
2589 },
2590 "self_signing_keys": {
2591 "@example:localhost": {
2592 "keys": {
2593 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2594 },
2595 "usage": ["self_signing"],
2596 "user_id": "@example:localhost",
2597 "signatures": {
2598 "@example:localhost": {
2599 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2600 },
2601 },
2602 },
2603 },
2604 "user_signing_keys": {
2605 "@example:localhost": {
2606 "keys": {
2607 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2608 },
2609 "usage": ["user_signing"],
2610 "user_id": "@example:localhost",
2611 "signatures": {
2612 "@example:localhost": {
2613 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2614 },
2615 },
2616 },
2617 }
2618 })
2619 }
2620
2621 #[async_test]
2624 async fn test_devices_to_verify_against_no_devices() {
2627 let server = MockServer::start().await;
2628 let client = logged_in_client(Some(server.uri())).await;
2629
2630 Mock::given(method("POST"))
2631 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2632 .respond_with(
2633 ResponseTemplate::new(200)
2634 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2635 )
2636 .mount(&server)
2637 .await;
2638
2639 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2640 }
2641
2642 #[async_test]
2643 async fn test_devices_to_verify_against_cross_signed() {
2646 let server = MockServer::start().await;
2647 let client = logged_in_client(Some(server.uri())).await;
2648
2649 Mock::given(method("POST"))
2650 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2651 .respond_with(ResponseTemplate::new(200).set_body_json(
2652 devices_to_verify_against_keys_query_response(vec![
2653 json!({
2654 "algorithms": [
2655 "m.olm.v1.curve25519-aes-sha2",
2656 "m.megolm.v1.aes-sha2",
2657 ],
2658 "user_id": "@example:localhost",
2659 "device_id": "SIGNEDDEVICE",
2660 "keys": {
2661 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2662 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2663 },
2664 "signatures": {
2665 "@example:localhost": {
2666 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2667 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2668 },
2669 },
2670 })
2671 ])
2672 ))
2673 .mount(&server)
2674 .await;
2675
2676 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2677 }
2678
2679 #[async_test]
2680 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2683 let server = MockServer::start().await;
2684 let client = logged_in_client(Some(server.uri())).await;
2685 let user_id = client.user_id().unwrap();
2686 let olm_machine = client.olm_machine().await;
2687 let olm_machine = olm_machine.as_ref().unwrap();
2688
2689 Mock::given(method("POST"))
2690 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2691 .respond_with(ResponseTemplate::new(200).set_body_json(
2692 devices_to_verify_against_keys_query_response(vec![
2693 json!({
2694 "algorithms": [
2695 "m.olm.v1.curve25519-aes-sha2",
2696 "m.megolm.v1.aes-sha2",
2697 ],
2698 "user_id": "@example:localhost",
2699 "device_id": "DEHYDRATEDDEVICE",
2700 "keys": {
2701 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2702 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2703 },
2704 "dehydrated": true,
2705 "signatures": {
2706 "@example:localhost": {
2707 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2708 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2709 },
2710 },
2711 }),
2712 json!({
2713 "algorithms": [
2714 "m.olm.v1.curve25519-aes-sha2",
2715 "m.megolm.v1.aes-sha2",
2716 ],
2717 "user_id": "@example:localhost",
2718 "device_id": "UNSIGNEDDEVICE",
2719 "keys": {
2720 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2721 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2722 },
2723 "signatures": {
2724 "@example:localhost": {
2725 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2726 },
2727 },
2728 }),
2729 ])
2730 ))
2731 .mount(&server)
2732 .await;
2733
2734 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2735 client.keys_query(&request_id, request.device_keys).await.unwrap();
2736
2737 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2738 }
2739}