1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::{Path, PathBuf},
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34 future::try_join,
35 stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40 StateStoreDataKey, StateStoreDataValue,
41 cross_process_lock::{AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError},
42 crypto::{
43 CrossSigningBootstrapRequests, OlmMachine,
44 store::{
45 LockableCryptoStore, SecretImportError,
46 types::{RoomKeyBundleInfo, RoomKeyInfo},
47 },
48 types::{
49 SecretsBundle, SignedKey,
50 requests::{
51 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
52 },
53 },
54 },
55 sleep::sleep,
56};
57use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
58use ruma::{
59 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
60 api::{
61 client::{
62 keys::{
63 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
64 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
65 },
66 message::send_message_event,
67 to_device::send_event_to_device::v3::{
68 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
69 },
70 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
71 },
72 error::{ErrorBody, StandardErrorBody},
73 },
74 assign,
75 events::room::{
76 MediaSource, ThumbnailInfo,
77 member::{MembershipChange, OriginalSyncRoomMemberEvent},
78 },
79};
80#[cfg(feature = "experimental-send-custom-to-device")]
81use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
82use serde::{Deserialize, de::Error as _};
83use tasks::BundleReceiverTask;
84use tokio::sync::{Mutex, RwLockReadGuard};
85use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
86use tracing::{Span, debug, error, instrument, warn};
87use url::Url;
88use vodozemac::Curve25519PublicKey;
89
90use self::{
91 backups::{Backups, types::BackupClientState},
92 futures::UploadEncryptedFile,
93 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
94 recovery::{Recovery, RecoveryState},
95 secret_storage::SecretStorage,
96 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
97 verification::{SasVerification, Verification, VerificationRequest},
98};
99use crate::{
100 Client, Error, HttpError, Result, Room, TransmissionProgress,
101 attachment::Thumbnail,
102 client::{ClientInner, WeakClient},
103 cross_process_lock::CrossProcessLockGuard,
104 error::HttpResult,
105};
106
107pub mod backups;
108pub mod futures;
109pub mod identities;
110pub mod recovery;
111pub mod secret_storage;
112pub(crate) mod tasks;
113pub mod verification;
114
115use matrix_sdk_base::crypto::OlmMachineBuilder;
116pub use matrix_sdk_base::crypto::{
117 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
118 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
119 SignatureError, VERSION,
120 olm::{
121 SessionCreationError as MegolmSessionCreationError,
122 SessionExportError as OlmSessionExportError,
123 },
124 vodozemac,
125};
126use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
127
128#[cfg(feature = "experimental-send-custom-to-device")]
129use crate::config::RequestConfig;
130pub use crate::error::RoomKeyImportError;
131
132#[cfg(feature = "sqlite")]
135#[derive(Debug, thiserror::Error)]
136pub enum BundleExportError {
137 #[error(transparent)]
139 OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
140 #[error(transparent)]
142 StoreError(#[from] CryptoStoreError),
143 #[error(transparent)]
146 SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
147}
148
149#[derive(Debug, thiserror::Error)]
152pub enum BundleImportError {
153 #[error(transparent)]
155 SecretImport(#[from] SecretImportError),
156 #[error(transparent)]
158 DeviceKeys(#[from] Error),
159}
160
161#[cfg(feature = "sqlite")]
170pub async fn export_secrets_bundle_from_store(
171 database_path: impl AsRef<Path>,
172 passphrase: Option<&str>,
173) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
174 use matrix_sdk_base::crypto::store::CryptoStore;
175
176 let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;
177 let account =
178 store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?;
179
180 if let Some(account) = account {
181 let machine = OlmMachineBuilder::new(&account.user_id, &account.device_id)
182 .with_crypto_store(store)
183 .build()
184 .await
185 .map_err(BundleExportError::StoreError)?;
186
187 let bundle = machine.store().export_secrets_bundle().await?;
188
189 Ok(Some((account.user_id.to_owned(), bundle)))
190 } else {
191 Ok(None)
192 }
193}
194
195pub(crate) struct EncryptionData {
197 pub tasks: StdMutex<ClientTasks>,
200
201 pub encryption_settings: EncryptionSettings,
203
204 pub backup_state: BackupClientState,
206
207 pub recovery_state: SharedObservable<RecoveryState>,
209}
210
211impl EncryptionData {
212 pub fn new(encryption_settings: EncryptionSettings) -> Self {
213 Self {
214 encryption_settings,
215
216 tasks: StdMutex::new(Default::default()),
217 backup_state: Default::default(),
218 recovery_state: Default::default(),
219 }
220 }
221
222 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
223 let weak_client = WeakClient::from_inner(client);
224
225 let mut tasks = self.tasks.lock();
226 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
227
228 if self.encryption_settings.backup_download_strategy
229 == BackupDownloadStrategy::AfterDecryptionFailure
230 {
231 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
232 }
233 }
234
235 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
241 let mut guard = self.tasks.lock();
242
243 let future = Recovery::update_state_after_backup_state_change(client);
244 let join_handle = spawn(future);
245
246 guard.update_recovery_state_after_backup = Some(join_handle);
247 }
248}
249
250#[derive(Clone, Copy, Debug, Default)]
252pub struct EncryptionSettings {
253 pub auto_enable_cross_signing: bool,
259
260 pub backup_download_strategy: BackupDownloadStrategy,
265
266 pub auto_enable_backups: bool,
268}
269
270#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
272#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
273pub enum BackupDownloadStrategy {
274 OneShot,
285
286 AfterDecryptionFailure,
288
289 #[default]
294 Manual,
295}
296
297#[derive(Clone, Copy, Debug, Eq, PartialEq)]
302pub enum VerificationState {
303 Unknown,
305 Verified,
308 Unverified,
310}
311
312#[derive(Debug)]
323pub struct CrossSigningResetHandle {
324 client: Client,
325 upload_request: UploadSigningKeysRequest,
326 signatures_request: UploadSignaturesRequest,
327 auth_type: CrossSigningResetAuthType,
328 is_cancelled: Mutex<bool>,
329}
330
331impl CrossSigningResetHandle {
332 pub fn new(
334 client: Client,
335 upload_request: UploadSigningKeysRequest,
336 signatures_request: UploadSignaturesRequest,
337 auth_type: CrossSigningResetAuthType,
338 ) -> Self {
339 Self {
340 client,
341 upload_request,
342 signatures_request,
343 auth_type,
344 is_cancelled: Mutex::new(false),
345 }
346 }
347
348 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
351 &self.auth_type
352 }
353
354 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
358 const RETRY_EVERY: Duration = Duration::from_millis(500);
360
361 const TIMEOUT: Duration = Duration::from_mins(2);
363
364 tokio::time::timeout(TIMEOUT, async {
365 let mut upload_request = self.upload_request.clone();
366 upload_request.auth = auth;
367
368 debug!(
369 "Repeatedly PUTting to keys/device_signing/upload until it works \
370 or we hit a permanent failure."
371 );
372 while let Err(e) = self.client.send(upload_request.clone()).await {
373 if *self.is_cancelled.lock().await {
374 return Ok(());
375 }
376
377 match e.as_uiaa_response() {
378 Some(uiaa_info) => {
379 if !matches!(self.auth_type, CrossSigningResetAuthType::OAuth(_))
382 && uiaa_info.auth_error.is_some()
383 {
384 return Err(e.into());
385 }
386 }
387 None => return Err(e.into()),
388 }
389
390 debug!(
391 "PUT to keys/device_signing/upload failed with 401. Retrying after \
392 a short delay."
393 );
394 sleep(RETRY_EVERY).await;
395 }
396
397 self.client.send(self.signatures_request.clone()).await?;
398
399 Ok(())
400 })
401 .await
402 .unwrap_or_else(|_| {
403 warn!("Timed out waiting for keys/device_signing/upload to succeed.");
404 Err(Error::Timeout)
405 })
406 }
407
408 pub async fn cancel(&self) {
410 *self.is_cancelled.lock().await = true;
411 }
412}
413
414#[derive(Debug, Clone)]
417pub enum CrossSigningResetAuthType {
418 Uiaa(UiaaInfo),
420 OAuth(OAuthCrossSigningResetInfo),
423}
424
425impl CrossSigningResetAuthType {
426 fn new(error: &HttpError) -> Result<Option<Self>> {
427 if let Some(auth_info) = error.as_uiaa_response() {
428 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
429 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
430 } else {
431 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
432 }
433 } else {
434 Ok(None)
435 }
436 }
437}
438
439#[derive(Debug, Clone, Deserialize)]
442pub struct OAuthCrossSigningResetInfo {
443 pub approval_url: Url,
445
446 pub session: Option<String>,
448}
449
450impl OAuthCrossSigningResetInfo {
451 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
452 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
453 return Ok(None);
454 };
455
456 Ok(Some(OAuthCrossSigningResetInfo {
457 approval_url: parameters.url.as_str().try_into()?,
458 session: auth_info.session.clone(),
459 }))
460 }
461}
462
463#[derive(Clone, Debug)]
466pub struct DuplicateOneTimeKeyErrorMessage {
467 pub old_key: Curve25519PublicKey,
469 pub new_key: Curve25519PublicKey,
471}
472
473impl FromStr for DuplicateOneTimeKeyErrorMessage {
474 type Err = serde_json::Error;
475
476 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
477 let mut split = s.split_terminator(';');
481
482 let old_key = split
483 .next()
484 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
485 let new_key = split
486 .next()
487 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
488
489 let old_key_index = old_key
492 .find("Old key:")
493 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
494
495 let old_key = old_key[old_key_index..]
496 .trim()
497 .strip_prefix("Old key:")
498 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
499
500 let new_key = new_key
503 .trim()
504 .strip_prefix("new key:")
505 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
506
507 let new_key = new_key.replace("'", "\"");
510
511 let old_key: SignedKey = serde_json::from_str(old_key)?;
513 let new_key: SignedKey = serde_json::from_str(&new_key)?;
514
515 let old_key = old_key.key();
517 let new_key = new_key.key();
518
519 Ok(Self { old_key, new_key })
520 }
521}
522
523impl Client {
524 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
525 self.base_client().olm_machine().await
526 }
527
528 pub(crate) async fn mark_request_as_sent(
529 &self,
530 request_id: &TransactionId,
531 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
532 ) -> Result<(), matrix_sdk_base::Error> {
533 Ok(self
534 .olm_machine()
535 .await
536 .as_ref()
537 .expect(
538 "We should have an olm machine once we try to mark E2EE related requests as sent",
539 )
540 .mark_request_as_sent(request_id, response)
541 .await?)
542 }
543
544 #[instrument(skip(self, device_keys))]
550 pub(crate) async fn keys_query(
551 &self,
552 request_id: &TransactionId,
553 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
554 ) -> Result<get_keys::v3::Response> {
555 let request = assign!(get_keys::v3::Request::new(), { device_keys });
556
557 let response = self.send(request).await?;
558 self.mark_request_as_sent(request_id, &response).await?;
559 self.encryption().update_state_after_keys_query(&response).await;
560
561 Ok(response)
562 }
563
564 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
598 &'a self,
599 reader: &'a mut R,
600 ) -> UploadEncryptedFile<'a, R> {
601 UploadEncryptedFile::new(self, reader)
602 }
603
604 pub(crate) async fn upload_encrypted_media_and_thumbnail(
607 &self,
608 data: &[u8],
609 thumbnail: Option<Thumbnail>,
610 send_progress: SharedObservable<TransmissionProgress>,
611 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
612 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
613
614 let upload_attachment = async {
615 let mut cursor = Cursor::new(data);
616 self.upload_encrypted_file(&mut cursor)
617 .with_send_progress_observable(send_progress)
618 .await
619 };
620
621 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
622
623 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
624 }
625
626 async fn upload_encrypted_thumbnail(
629 &self,
630 thumbnail: Option<Thumbnail>,
631 send_progress: SharedObservable<TransmissionProgress>,
632 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
633 let Some(thumbnail) = thumbnail else {
634 return Ok(None);
635 };
636
637 let (data, _, thumbnail_info) = thumbnail.into_parts();
638 let mut cursor = Cursor::new(data);
639
640 let file = self
641 .upload_encrypted_file(&mut cursor)
642 .with_send_progress_observable(send_progress)
643 .await?;
644
645 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
646 }
647
648 pub(crate) async fn claim_one_time_keys(
654 &self,
655 users: impl Iterator<Item = &UserId>,
656 ) -> Result<()> {
657 let _lock = self.locks().key_claim_lock.lock().await;
658
659 if let Some((request_id, request)) = self
660 .olm_machine()
661 .await
662 .as_ref()
663 .ok_or(Error::NoOlmMachine)?
664 .get_missing_sessions(users)
665 .await?
666 {
667 let response = self.send(request).await?;
668 self.mark_request_as_sent(&request_id, &response).await?;
669 }
670
671 Ok(())
672 }
673
674 #[instrument(skip(self, request))]
684 pub(crate) async fn keys_upload(
685 &self,
686 request_id: &TransactionId,
687 request: &upload_keys::v3::Request,
688 ) -> Result<upload_keys::v3::Response> {
689 debug!(
690 device_keys = request.device_keys.is_some(),
691 one_time_key_count = request.one_time_keys.len(),
692 "Uploading public encryption keys",
693 );
694
695 let response = self.send(request.clone()).await?;
696 self.mark_request_as_sent(request_id, &response).await?;
697
698 Ok(response)
699 }
700
701 pub(crate) async fn room_send_helper(
702 &self,
703 request: &RoomMessageRequest,
704 ) -> Result<send_message_event::v3::Response> {
705 let content = request.content.clone();
706 let txn_id = request.txn_id.clone();
707 let room_id = &request.room_id;
708
709 self.get_room(room_id)
710 .expect("Can't send a message to a room that isn't known to the store")
711 .send(*content)
712 .with_transaction_id(txn_id)
713 .await
714 .map(|result| result.response)
715 }
716
717 pub(crate) async fn send_to_device(
718 &self,
719 request: &ToDeviceRequest,
720 ) -> HttpResult<ToDeviceResponse> {
721 let request = RumaToDeviceRequest::new_raw(
722 request.event_type.clone(),
723 request.txn_id.clone(),
724 request.messages.clone(),
725 );
726
727 self.send(request).await
728 }
729
730 pub(crate) async fn send_verification_request(
731 &self,
732 request: OutgoingVerificationRequest,
733 ) -> Result<()> {
734 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
735
736 match request {
737 ToDevice(t) => {
738 self.send_to_device(&t).await?;
739 }
740 InRoom(r) => {
741 self.room_send_helper(&r).await?;
742 }
743 }
744
745 Ok(())
746 }
747
748 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
749 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
750
751 match r.request() {
752 AnyOutgoingRequest::KeysQuery(request) => {
753 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
754 }
755 AnyOutgoingRequest::KeysUpload(request) => {
756 let response = self.keys_upload(r.request_id(), request).await;
757
758 if let Err(e) = &response {
759 match e.as_client_api_error() {
760 Some(e) if e.status_code == 400 => {
761 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
762 {
763 {
768 let already_reported = self
769 .state_store()
770 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
771 .await?
772 .is_some();
773
774 if message.starts_with("One time key") && !already_reported {
775 let error_message =
776 DuplicateOneTimeKeyErrorMessage::from_str(message);
777
778 if let Ok(message) = &error_message {
779 error!(
780 sentry = true,
781 old_key = %message.old_key,
782 new_key = %message.new_key,
783 "Duplicate one-time keys have been uploaded"
784 );
785 } else {
786 error!(
787 sentry = true,
788 "Duplicate one-time keys have been uploaded"
789 );
790 }
791
792 self.state_store()
793 .set_kv_data(
794 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
795 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
796 )
797 .await?;
798
799 if let Err(e) = self
800 .inner
801 .duplicate_key_upload_error_sender
802 .send(error_message.ok())
803 {
804 error!(
805 "Failed to dispatch duplicate key upload error notification: {}",
806 e
807 );
808 }
809 }
810 }
811 }
812 }
813 _ => {}
814 }
815
816 response?;
817 }
818 }
819 AnyOutgoingRequest::ToDeviceRequest(request) => {
820 let response = self.send_to_device(request).await?;
821 self.mark_request_as_sent(r.request_id(), &response).await?;
822 }
823 AnyOutgoingRequest::SignatureUpload(request) => {
824 let response = self.send(request.clone()).await?;
825 self.mark_request_as_sent(r.request_id(), &response).await?;
826 }
827 AnyOutgoingRequest::RoomMessage(request) => {
828 let response = self.room_send_helper(request).await?;
829 self.mark_request_as_sent(r.request_id(), &response).await?;
830 }
831 AnyOutgoingRequest::KeysClaim(request) => {
832 let response = self.send(request.clone()).await?;
833 self.mark_request_as_sent(r.request_id(), &response).await?;
834 }
835 }
836
837 Ok(())
838 }
839
840 #[instrument(skip_all)]
841 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
842 const MAX_CONCURRENT_REQUESTS: usize = 20;
843
844 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
847 warn!("Error while claiming one-time keys {:?}", e);
848 }
849
850 let outgoing_requests = stream::iter(
851 self.olm_machine()
852 .await
853 .as_ref()
854 .ok_or(Error::NoOlmMachine)?
855 .outgoing_requests()
856 .await?,
857 )
858 .map(|r| self.send_outgoing_request(r));
859
860 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
861
862 requests
863 .for_each(|r| async move {
864 match r {
865 Ok(_) => (),
866 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
867 }
868 })
869 .await;
870
871 Ok(())
872 }
873}
874
875#[cfg(any(feature = "testing", test))]
876impl Client {
877 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
879 self.olm_machine().await
880 }
881
882 pub fn abort_bundle_receiver_task(&self) {
884 let tasks = self.inner.e2ee.tasks.lock();
885 if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() {
886 task.abort()
887 }
888 }
889}
890
891#[derive(Debug, Clone)]
895pub struct Encryption {
896 client: Client,
898}
899
900impl Encryption {
901 pub(crate) fn new(client: Client) -> Self {
902 Self { client }
903 }
904
905 pub(crate) fn settings(&self) -> EncryptionSettings {
907 self.client.inner.e2ee.encryption_settings
908 }
909
910 pub async fn ed25519_key(&self) -> Option<String> {
913 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
914 }
915
916 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
918 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
919 }
920
921 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
923 match self.get_own_device().await {
924 Ok(Some(device)) => device.first_time_seen_ts(),
925 _ => MilliSecondsSinceUnixEpoch::now(),
927 }
928 }
929
930 pub async fn import_secrets_bundle(
943 &self,
944 bundle: &SecretsBundle,
945 ) -> Result<(), BundleImportError> {
946 self.import_secrets_bundle_impl(bundle).await?;
947
948 self.ensure_device_keys_upload().await?;
951 self.wait_for_e2ee_initialization_tasks().await;
952
953 if !self.backups().are_enabled().await {
958 self.backups().maybe_resume_backups().await?;
959 }
960
961 Ok(())
962 }
963
964 pub(crate) async fn import_secrets_bundle_impl(
965 &self,
966 bundle: &SecretsBundle,
967 ) -> Result<(), SecretImportError> {
968 let olm_machine = self.client.olm_machine().await;
969 let olm_machine =
970 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
971
972 olm_machine.store().import_secrets_bundle(bundle).await
973 }
974
975 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
980 let olm = self.client.olm_machine().await;
981 let machine = olm.as_ref()?;
982 Some(machine.cross_signing_status().await)
983 }
984
985 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
991 let olm_machine = self.client.olm_machine().await;
992 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
993 let user_id = olm_machine.user_id();
994
995 self.ensure_initial_key_query().await?;
996
997 let devices = self.get_user_devices(user_id).await?;
998
999 let ret = devices.devices().any(|device| {
1000 device.is_cross_signed_by_owner()
1001 && device.curve25519_key().is_some()
1002 && !device.is_dehydrated()
1003 });
1004
1005 Ok(ret)
1006 }
1007
1008 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1013 if let Some(machine) = self.client.olm_machine().await.as_ref() {
1014 machine.tracked_users().await
1015 } else {
1016 Ok(HashSet::new())
1017 }
1018 }
1019
1020 pub fn verification_state(&self) -> Subscriber<VerificationState> {
1043 self.client.inner.verification_state.subscribe_reset()
1044 }
1045
1046 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1048 let olm = self.client.olm_machine().await;
1049 let olm = olm.as_ref()?;
1050 #[allow(clippy::bind_instead_of_map)]
1051 olm.get_verification(user_id, flow_id).and_then(|v| match v {
1052 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1053 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1054 }
1055 #[cfg(feature = "qrcode")]
1056 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1057 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1058 }
1059 _ => None,
1060 })
1061 }
1062
1063 pub async fn get_verification_request(
1066 &self,
1067 user_id: &UserId,
1068 flow_id: impl AsRef<str>,
1069 ) -> Option<VerificationRequest> {
1070 let olm = self.client.olm_machine().await;
1071 let olm = olm.as_ref()?;
1072
1073 olm.get_verification_request(user_id, flow_id)
1074 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1075 }
1076
1077 pub async fn get_device(
1111 &self,
1112 user_id: &UserId,
1113 device_id: &DeviceId,
1114 ) -> Result<Option<Device>, CryptoStoreError> {
1115 let olm = self.client.olm_machine().await;
1116 let Some(machine) = olm.as_ref() else { return Ok(None) };
1117 let device = machine.get_device(user_id, device_id, None).await?;
1118 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1119 }
1120
1121 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1128 let olm = self.client.olm_machine().await;
1129 let Some(machine) = olm.as_ref() else { return Ok(None) };
1130 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1131 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1132 }
1133
1134 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1160 let devices = self
1161 .client
1162 .olm_machine()
1163 .await
1164 .as_ref()
1165 .ok_or(Error::NoOlmMachine)?
1166 .get_user_devices(user_id, None)
1167 .await?;
1168
1169 Ok(UserDevices { inner: devices, client: self.client.clone() })
1170 }
1171
1172 pub async fn get_user_identity(
1208 &self,
1209 user_id: &UserId,
1210 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1211 let olm = self.client.olm_machine().await;
1212 let Some(olm) = olm.as_ref() else { return Ok(None) };
1213 let identity = olm.get_identity(user_id, None).await?;
1214
1215 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1216 }
1217
1218 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1256 let olm = self.client.olm_machine().await;
1257 let Some(olm) = olm.as_ref() else { return Ok(None) };
1258
1259 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1260 self.client.keys_query(&request_id, request.device_keys).await?;
1261
1262 let identity = olm.get_identity(user_id, None).await?;
1263 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1264 }
1265
1266 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1297 let olm = self.client.olm_machine().await;
1298 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1299 let client = self.client.to_owned();
1300
1301 Ok(olm
1302 .store()
1303 .devices_stream()
1304 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1305 }
1306
1307 pub async fn user_identities_stream(
1335 &self,
1336 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1337 let olm = self.client.olm_machine().await;
1338 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1339 let client = self.client.to_owned();
1340
1341 Ok(olm
1342 .store()
1343 .user_identities_stream()
1344 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1345 }
1346
1347 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1386 let olm = self.client.olm_machine().await;
1387 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1388
1389 let CrossSigningBootstrapRequests {
1390 upload_signing_keys_req,
1391 upload_keys_req,
1392 upload_signatures_req,
1393 } = olm.bootstrap_cross_signing(false).await?;
1394
1395 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1396 auth: auth_data,
1397 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1398 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1399 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1400 });
1401
1402 if let Some(req) = upload_keys_req {
1403 self.client.send_outgoing_request(req).await?;
1404 }
1405 self.client.send(upload_signing_keys_req).await?;
1406 self.client.send(upload_signatures_req).await?;
1407
1408 Ok(())
1409 }
1410
1411 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1452 let olm = self.client.olm_machine().await;
1453 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1454
1455 let CrossSigningBootstrapRequests {
1456 upload_keys_req,
1457 upload_signing_keys_req,
1458 upload_signatures_req,
1459 } = olm.bootstrap_cross_signing(true).await?;
1460
1461 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1462 auth: None,
1463 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1464 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1465 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1466 });
1467
1468 if let Some(req) = upload_keys_req {
1469 self.client.send_outgoing_request(req).await?;
1470 }
1471
1472 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1473 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1474 let client = self.client.clone();
1475
1476 Ok(Some(CrossSigningResetHandle::new(
1477 client,
1478 upload_signing_keys_req,
1479 upload_signatures_req,
1480 auth_type,
1481 )))
1482 } else {
1483 Err(error.into())
1484 }
1485 } else {
1486 self.client.send(upload_signatures_req).await?;
1487
1488 Ok(None)
1489 }
1490 }
1491
1492 async fn ensure_initial_key_query(&self) -> Result<()> {
1495 let olm_machine = self.client.olm_machine().await;
1496 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1497
1498 let user_id = olm_machine.user_id();
1499
1500 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1501 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1502 self.client.keys_query(&request_id, request.device_keys).await?;
1503 }
1504
1505 Ok(())
1506 }
1507
1508 pub async fn bootstrap_cross_signing_if_needed(
1555 &self,
1556 auth_data: Option<AuthData>,
1557 ) -> Result<()> {
1558 let olm_machine = self.client.olm_machine().await;
1559 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1560 let user_id = olm_machine.user_id();
1561
1562 self.ensure_initial_key_query().await?;
1563
1564 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1565 self.bootstrap_cross_signing(auth_data).await?;
1566 }
1567
1568 Ok(())
1569 }
1570
1571 #[cfg(not(target_family = "wasm"))]
1623 pub async fn export_room_keys(
1624 &self,
1625 path: PathBuf,
1626 passphrase: &str,
1627 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1628 ) -> Result<()> {
1629 let olm = self.client.olm_machine().await;
1630 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1631
1632 let keys = olm.store().export_room_keys(predicate).await?;
1633 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1634
1635 let encrypt = move || -> Result<()> {
1636 let export: String =
1637 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1638 let mut file = std::fs::File::create(path)?;
1639 file.write_all(&export.into_bytes())?;
1640 Ok(())
1641 };
1642
1643 let task = tokio::task::spawn_blocking(encrypt);
1644 task.await.expect("Task join error")
1645 }
1646
1647 #[cfg(not(target_family = "wasm"))]
1685 pub async fn import_room_keys(
1686 &self,
1687 path: PathBuf,
1688 passphrase: &str,
1689 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1690 let olm = self.client.olm_machine().await;
1691 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1692 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1693
1694 let decrypt = move || {
1695 let file = std::fs::File::open(path)?;
1696 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1697 };
1698
1699 let task = tokio::task::spawn_blocking(decrypt);
1700 let import = task.await.expect("Task join error")?;
1701
1702 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1703
1704 self.backups().maybe_trigger_backup();
1705
1706 Ok(ret)
1707 }
1708
1709 pub async fn room_keys_received_stream(
1740 &self,
1741 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1742 {
1743 let olm = self.client.olm_machine().await;
1744 let olm = olm.as_ref()?;
1745
1746 Some(olm.store().room_keys_received_stream())
1747 }
1748
1749 pub async fn historic_room_key_stream(
1780 &self,
1781 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1782 let olm = self.client.olm_machine().await;
1783 let olm = olm.as_ref()?;
1784
1785 Some(olm.store().historic_room_key_stream())
1786 }
1787
1788 pub fn secret_storage(&self) -> SecretStorage {
1790 SecretStorage { client: self.client.to_owned() }
1791 }
1792
1793 pub fn backups(&self) -> Backups {
1795 Backups { client: self.client.to_owned() }
1796 }
1797
1798 pub fn recovery(&self) -> Recovery {
1800 Recovery { client: self.client.to_owned() }
1801 }
1802
1803 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1815 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1817 let prev_holder = prev_lock.lock_holder();
1818 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1819 return Ok(());
1820 }
1821 warn!(
1822 "Recreating cross-process store lock with a different holder value: \
1823 prev was {prev_holder:?}, new is {lock_value}"
1824 );
1825 }
1826
1827 let olm_machine = self.client.base_client().olm_machine().await;
1828 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1829
1830 let lock = olm_machine.store().create_store_lock(
1831 "cross_process_lock".to_owned(),
1832 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1833 );
1834
1835 {
1840 let lock_result = lock.try_lock_once().await?;
1841
1842 if lock_result.is_ok() {
1843 olm_machine
1844 .initialize_crypto_store_generation(
1845 &self.client.locks().crypto_store_generation,
1846 )
1847 .await?;
1848 }
1849 }
1850
1851 self.client
1852 .locks()
1853 .cross_process_crypto_store_lock
1854 .set(lock)
1855 .map_err(|_| Error::BadCryptoStoreState)?;
1856
1857 Ok(())
1858 }
1859
1860 #[instrument(skip(self), fields(olm_machine_new_generation, olm_machine_generation))]
1865 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1866 let olm_machine_guard = self.client.olm_machine().await;
1867 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1868 let (new_gen, generation_number) = olm_machine
1869 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1870 .await?;
1871
1872 Span::current()
1873 .record("olm_machine_new_generation", new_gen)
1874 .record("olm_machine_generation", generation_number);
1875 debug!("OlmMachine generation maintained in CryptoStore");
1876
1877 if new_gen {
1879 drop(olm_machine_guard);
1881 self.client.base_client().regenerate_olm(None).await?;
1883 }
1884 Ok(generation_number)
1885 } else {
1886 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1891 Ok(0)
1892 }
1893 }
1894
1895 pub async fn spin_lock_store(
1903 &self,
1904 max_backoff: Option<u32>,
1905 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1906 self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1907 }
1908
1909 pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1917 match self.lock_store(CrossProcessLock::try_lock_once).await {
1918 Err(Error::CrossProcessLockError(e))
1919 if matches!(*e, CrossProcessLockError::Unobtained(_)) =>
1920 {
1921 Ok(None)
1922 }
1923 other => other,
1924 }
1925 }
1926
1927 pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1934 &self,
1935 acquire: F,
1936 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1937 let wrap_err = |e: CryptoStoreError| {
1938 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1939 };
1940 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1941 let guard = acquire(lock).await.map_err(wrap_err)??;
1942 let _ = self.on_lock_newly_acquired().await?;
1943 Ok(Some(guard.into_guard()))
1944 } else {
1945 Ok(None)
1946 }
1947 }
1948
1949 #[cfg(any(test, feature = "testing"))]
1951 pub async fn uploaded_key_count(&self) -> Result<u64> {
1952 let olm_machine = self.client.olm_machine().await;
1953 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1954 Ok(olm_machine.uploaded_key_count().await?)
1955 }
1956
1957 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1981 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1985 Some(BundleReceiverTask::new(&self.client).await)
1986 } else {
1987 None
1988 };
1989
1990 let mut tasks = self.client.inner.e2ee.tasks.lock();
1991
1992 let this = self.clone();
1993
1994 tasks.setup_e2ee = Some(spawn(async move {
1995 this.update_verification_state().await;
1998
1999 if this.settings().auto_enable_cross_signing
2000 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
2001 {
2002 error!("Couldn't bootstrap cross signing {e:?}");
2003 }
2004
2005 if let Err(e) = this.backups().setup_and_resume().await {
2006 error!("Couldn't setup and resume backups {e:?}");
2007 }
2008 if let Err(e) = this.recovery().setup().await {
2009 error!("Couldn't setup and resume recovery {e:?}");
2010 }
2011 }));
2012
2013 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
2014
2015 self.setup_room_membership_session_discard_handler();
2016 }
2017
2018 pub async fn wait_for_e2ee_initialization_tasks(&self) {
2021 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
2022
2023 if let Some(task) = task
2024 && let Err(err) = task.await
2025 {
2026 warn!("Error when initializing backups: {err}");
2027 }
2028 }
2029
2030 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
2040 let olm = self.client.olm_machine().await;
2041 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
2042
2043 if let Some((request_id, request)) = olm.upload_device_keys().await? {
2044 self.client.keys_upload(&request_id, &request).await?;
2045
2046 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
2047 self.client.keys_query(&request_id, request.device_keys).await?;
2048 }
2049
2050 Ok(())
2051 }
2052
2053 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
2054 self.recovery().update_state_after_keys_query(response).await;
2055
2056 if let Some(user_id) = self.client.user_id() {
2058 let contains_own_device = response.device_keys.contains_key(user_id);
2059
2060 if contains_own_device {
2061 self.update_verification_state().await;
2062 }
2063 }
2064 }
2065
2066 async fn update_verification_state(&self) {
2067 match self.get_own_device().await {
2068 Ok(device) => {
2069 if let Some(device) = device {
2070 let is_verified = device.is_cross_signed_by_owner();
2071
2072 if is_verified {
2073 self.client.inner.verification_state.set(VerificationState::Verified);
2074 } else {
2075 self.client.inner.verification_state.set(VerificationState::Unverified);
2076 }
2077 } else {
2078 warn!("Couldn't find out own device in the store.");
2079 self.client.inner.verification_state.set(VerificationState::Unknown);
2080 }
2081 }
2082 Err(error) => {
2083 warn!("Failed retrieving own device: {error}");
2084 self.client.inner.verification_state.set(VerificationState::Unknown);
2085 }
2086 }
2087 }
2088
2089 fn setup_room_membership_session_discard_handler(&self) {
2109 let client = WeakClient::from_client(&self.client);
2110 self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2111 let Some(client) = client.get() else {
2112 return;
2114 };
2115 let Some(user_id) = client.user_id() else {
2116 return;
2118 };
2119 let olm = client.olm_machine().await;
2120 let Some(olm) = olm.as_ref() else {
2121 warn!("Cannot discard session - Olm machine is not available");
2122 return;
2123 };
2124
2125 if matches!(
2126 ev.membership_change(),
2127 MembershipChange::Joined |
2128 MembershipChange::Invited |
2129 MembershipChange::KnockAccepted |
2130 MembershipChange::InvitationAccepted |
2131 MembershipChange::ProfileChanged { .. }
2132 ) || ev.sender == user_id {
2133 return;
2135 }
2136
2137 debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2138
2139 if let Err(e) = olm.discard_room_key(room.room_id()).await {
2142 warn!(
2143 room_id = ?room.room_id(),
2144 "Error discarding room key after member leave: {e:?}"
2145 );
2146 }
2147 });
2148 }
2149
2150 #[cfg(feature = "experimental-send-custom-to-device")]
2159 pub async fn encrypt_and_send_raw_to_device(
2160 &self,
2161 recipient_devices: Vec<&Device>,
2162 event_type: &str,
2163 content: Raw<AnyToDeviceEventContent>,
2164 share_strategy: CollectStrategy,
2165 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2166 let users = recipient_devices.iter().map(|device| device.user_id());
2167
2168 self.client.claim_one_time_keys(users).await?;
2172
2173 let olm = self.client.olm_machine().await;
2174 let olm = olm.as_ref().expect("Olm machine wasn't started");
2175
2176 let (requests, withhelds) = olm
2177 .encrypt_content_for_devices(
2178 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2179 event_type,
2180 &content
2181 .deserialize_as::<serde_json::Value>()
2182 .expect("Deserialize as Value will always work"),
2183 share_strategy,
2184 )
2185 .await?;
2186
2187 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2188
2189 withhelds.iter().for_each(|(d, _)| {
2191 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2192 });
2193
2194 for request in requests {
2196 let ruma_request = RumaToDeviceRequest::new_raw(
2197 request.event_type.clone(),
2198 request.txn_id.clone(),
2199 request.messages.clone(),
2200 );
2201
2202 let send_result = self
2203 .client
2204 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2205 .await;
2206
2207 if send_result.is_err() {
2209 for (user_id, device_map) in request.messages {
2211 for device_id in device_map.keys() {
2212 match device_id {
2213 DeviceIdOrAllDevices::DeviceId(device_id) => {
2214 failures.push((user_id.clone(), device_id.to_owned()));
2215 }
2216 DeviceIdOrAllDevices::AllDevices => {
2217 }
2219 }
2220 }
2221 }
2222 }
2223 }
2224
2225 Ok(failures)
2226 }
2227}
2228
2229#[cfg(all(test, not(target_family = "wasm")))]
2230mod tests {
2231 use std::{
2232 ops::Not,
2233 str::FromStr,
2234 sync::{
2235 Arc,
2236 atomic::{AtomicBool, Ordering},
2237 },
2238 time::Duration,
2239 };
2240
2241 use matrix_sdk_test::{
2242 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2243 event_factory::EventFactory,
2244 };
2245 use ruma::{
2246 event_id,
2247 events::{reaction::ReactionEventContent, relation::Annotation},
2248 user_id,
2249 };
2250 use serde_json::json;
2251 use wiremock::{
2252 Mock, MockServer, Request, ResponseTemplate,
2253 matchers::{header, method, path_regex},
2254 };
2255
2256 use crate::{
2257 Client, assert_next_matches_with_timeout,
2258 config::RequestConfig,
2259 encryption::{
2260 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2261 },
2262 test_utils::{
2263 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2264 },
2265 };
2266
2267 #[async_test]
2268 async fn test_reaction_sending() {
2269 let server = MockServer::start().await;
2270 let client = logged_in_client(Some(server.uri())).await;
2271
2272 let event_id = event_id!("$2:example.org");
2273
2274 Mock::given(method("GET"))
2275 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2276 .and(header("authorization", "Bearer 1234"))
2277 .respond_with(
2278 ResponseTemplate::new(200)
2279 .set_body_json(EventFactory::new().room_encryption().into_content()),
2280 )
2281 .mount(&server)
2282 .await;
2283
2284 Mock::given(method("PUT"))
2285 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2286 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2287 "event_id": event_id,
2288 })))
2289 .mount(&server)
2290 .await;
2291
2292 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2293 let response = SyncResponseBuilder::default()
2294 .add_joined_room(
2295 JoinedRoomBuilder::default()
2296 .add_state_event(
2297 f.member(user_id!("@example:localhost")).display_name("example"),
2298 )
2299 .add_state_event(f.default_power_levels())
2300 .add_state_event(f.room_encryption()),
2301 )
2302 .build_sync_response();
2303
2304 client.base_client().receive_sync_response(response).await.unwrap();
2305
2306 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2307 assert!(
2308 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2309 );
2310
2311 let event_id = event_id!("$1:example.org");
2312 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2313 room.send(reaction).await.expect("Sending the reaction should not fail");
2314
2315 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2316 }
2317
2318 #[cfg(feature = "sqlite")]
2319 #[async_test]
2320 async fn test_generation_counter_invalidates_olm_machine() {
2321 use matrix_sdk_base::store::RoomLoadSettings;
2324 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2325 let session = mock_matrix_session();
2326
2327 let client1 = Client::builder()
2328 .homeserver_url("http://localhost:1234")
2329 .request_config(RequestConfig::new().disable_retry())
2330 .sqlite_store(&sqlite_path, None)
2331 .build()
2332 .await
2333 .unwrap();
2334 client1
2335 .matrix_auth()
2336 .restore_session(session.clone(), RoomLoadSettings::default())
2337 .await
2338 .unwrap();
2339
2340 let client2 = Client::builder()
2341 .homeserver_url("http://localhost:1234")
2342 .request_config(RequestConfig::new().disable_retry())
2343 .sqlite_store(sqlite_path, None)
2344 .build()
2345 .await
2346 .unwrap();
2347 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2348
2349 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2351 assert!(guard.is_none());
2352
2353 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2354 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2355
2356 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2358 assert!(acquired1.is_some());
2359
2360 let initial_olm_machine =
2362 client1.olm_machine().await.clone().expect("must have an olm machine");
2363
2364 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2366 let backup_key = decryption_key.megolm_v1_public_key();
2367 backup_key.set_version("1".to_owned());
2368 initial_olm_machine
2369 .backup_machine()
2370 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2371 .await
2372 .expect("Should save");
2373
2374 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2375
2376 assert!(client1.encryption().backups().are_enabled().await);
2377
2378 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2380 assert!(acquired2.is_none());
2381
2382 drop(acquired1);
2384 tokio::time::sleep(Duration::from_millis(100)).await;
2385
2386 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2388 assert!(acquired1.is_some());
2389
2390 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2392 assert!(initial_olm_machine.same_as(&olm_machine));
2393
2394 drop(acquired1);
2396 tokio::time::sleep(Duration::from_millis(100)).await;
2397
2398 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2400 assert!(acquired2.is_some());
2401
2402 drop(acquired2);
2404 tokio::time::sleep(Duration::from_millis(100)).await;
2405
2406 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2408 assert!(acquired1.is_some());
2409
2410 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2412
2413 assert!(!initial_olm_machine.same_as(&olm_machine));
2414
2415 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2416 assert!(backup_key_new.decryption_key.is_some());
2417 assert_eq!(
2418 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2419 backup_key.to_base64()
2420 );
2421 assert!(client1.encryption().backups().are_enabled().await);
2422 }
2423
2424 #[cfg(feature = "sqlite")]
2425 #[async_test]
2426 async fn test_generation_counter_no_spurious_invalidation() {
2427 use matrix_sdk_base::store::RoomLoadSettings;
2430 let sqlite_path =
2431 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2432 let session = mock_matrix_session();
2433
2434 let client = Client::builder()
2435 .homeserver_url("http://localhost:1234")
2436 .request_config(RequestConfig::new().disable_retry())
2437 .sqlite_store(&sqlite_path, None)
2438 .build()
2439 .await
2440 .unwrap();
2441 client
2442 .matrix_auth()
2443 .restore_session(session.clone(), RoomLoadSettings::default())
2444 .await
2445 .unwrap();
2446
2447 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2448
2449 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2450
2451 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2453 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2454
2455 {
2456 let client2 = Client::builder()
2458 .homeserver_url("http://localhost:1234")
2459 .request_config(RequestConfig::new().disable_retry())
2460 .sqlite_store(sqlite_path, None)
2461 .build()
2462 .await
2463 .unwrap();
2464 client2
2465 .matrix_auth()
2466 .restore_session(session, RoomLoadSettings::default())
2467 .await
2468 .unwrap();
2469
2470 client2
2471 .encryption()
2472 .enable_cross_process_store_lock("client2".to_owned())
2473 .await
2474 .unwrap();
2475
2476 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2477 assert!(guard.is_some());
2478
2479 drop(guard);
2480 tokio::time::sleep(Duration::from_millis(100)).await;
2481 }
2482
2483 {
2484 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2485 assert!(acquired.is_some());
2486 }
2487
2488 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2490 assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2491
2492 {
2493 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2494 assert!(acquired.is_some());
2495 }
2496
2497 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2499 assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2500 }
2501
2502 #[async_test]
2503 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2504 let client = no_retry_test_client(None).await;
2506 let server = MockServer::start().await;
2507
2508 let mut verification_state = client.encryption().verification_state();
2510
2511 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2513
2514 let keys_requested = Arc::new(AtomicBool::new(false));
2517 let inner_bool = keys_requested.clone();
2518
2519 Mock::given(method("GET"))
2520 .and(path_regex(
2521 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2522 ))
2523 .respond_with(move |_req: &Request| {
2524 inner_bool.fetch_or(true, Ordering::SeqCst);
2525 ResponseTemplate::new(200).set_body_json(json!({}))
2526 })
2527 .mount(&server)
2528 .await;
2529
2530 set_client_session(&client).await;
2532
2533 assert!(keys_requested.load(Ordering::SeqCst).not());
2535 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2536 }
2537
2538 #[test]
2539 fn test_oauth_reset_info_from_uiaa_info() {
2540 let auth_info = json!({
2541 "session": "dummy",
2542 "flows": [
2543 {
2544 "stages": [
2545 "org.matrix.cross_signing_reset"
2546 ]
2547 }
2548 ],
2549 "params": {
2550 "org.matrix.cross_signing_reset": {
2551 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2552 }
2553 },
2554 "msg": "To reset..."
2555 });
2556
2557 let auth_info = serde_json::from_value(auth_info)
2558 .expect("We should be able to deserialize the UiaaInfo");
2559 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2560 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2561 }
2562
2563 #[test]
2564 fn test_duplicate_one_time_key_error_parsing() {
2565 let message = concat!(
2566 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2567 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2568 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2569 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2570 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2571 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2572 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2573 );
2574 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2575 .expect("We should be able to parse the error message");
2576
2577 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2578 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2579
2580 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2581 .expect_err("We shouldn't be able to parse an incomplete error message");
2582 }
2583
2584 fn devices_to_verify_against_keys_query_response(
2588 devices: Vec<serde_json::Value>,
2589 ) -> serde_json::Value {
2590 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2591 .into_iter()
2592 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2593 .collect();
2594 json!({
2595 "device_keys": {
2596 "@example:localhost": device_keys,
2597 },
2598 "master_keys": {
2599 "@example:localhost": {
2600 "keys": {
2601 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2602 },
2603 "usage": ["master"],
2604 "user_id": "@example:localhost",
2605 },
2606 },
2607 "self_signing_keys": {
2608 "@example:localhost": {
2609 "keys": {
2610 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2611 },
2612 "usage": ["self_signing"],
2613 "user_id": "@example:localhost",
2614 "signatures": {
2615 "@example:localhost": {
2616 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2617 },
2618 },
2619 },
2620 },
2621 "user_signing_keys": {
2622 "@example:localhost": {
2623 "keys": {
2624 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2625 },
2626 "usage": ["user_signing"],
2627 "user_id": "@example:localhost",
2628 "signatures": {
2629 "@example:localhost": {
2630 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2631 },
2632 },
2633 },
2634 }
2635 })
2636 }
2637
2638 #[async_test]
2641 async fn test_devices_to_verify_against_no_devices() {
2644 let server = MockServer::start().await;
2645 let client = logged_in_client(Some(server.uri())).await;
2646
2647 Mock::given(method("POST"))
2648 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2649 .respond_with(
2650 ResponseTemplate::new(200)
2651 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2652 )
2653 .mount(&server)
2654 .await;
2655
2656 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2657 }
2658
2659 #[async_test]
2660 async fn test_devices_to_verify_against_cross_signed() {
2663 let server = MockServer::start().await;
2664 let client = logged_in_client(Some(server.uri())).await;
2665
2666 Mock::given(method("POST"))
2667 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2668 .respond_with(ResponseTemplate::new(200).set_body_json(
2669 devices_to_verify_against_keys_query_response(vec![
2670 json!({
2671 "algorithms": [
2672 "m.olm.v1.curve25519-aes-sha2",
2673 "m.megolm.v1.aes-sha2",
2674 ],
2675 "user_id": "@example:localhost",
2676 "device_id": "SIGNEDDEVICE",
2677 "keys": {
2678 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2679 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2680 },
2681 "signatures": {
2682 "@example:localhost": {
2683 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2684 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2685 },
2686 },
2687 })
2688 ])
2689 ))
2690 .mount(&server)
2691 .await;
2692
2693 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2694 }
2695
2696 #[async_test]
2697 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2700 let server = MockServer::start().await;
2701 let client = logged_in_client(Some(server.uri())).await;
2702 let user_id = client.user_id().unwrap();
2703 let olm_machine = client.olm_machine().await;
2704 let olm_machine = olm_machine.as_ref().unwrap();
2705
2706 Mock::given(method("POST"))
2707 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2708 .respond_with(ResponseTemplate::new(200).set_body_json(
2709 devices_to_verify_against_keys_query_response(vec![
2710 json!({
2711 "algorithms": [
2712 "m.olm.v1.curve25519-aes-sha2",
2713 "m.megolm.v1.aes-sha2",
2714 ],
2715 "user_id": "@example:localhost",
2716 "device_id": "DEHYDRATEDDEVICE",
2717 "keys": {
2718 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2719 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2720 },
2721 "dehydrated": true,
2722 "signatures": {
2723 "@example:localhost": {
2724 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2725 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2726 },
2727 },
2728 }),
2729 json!({
2730 "algorithms": [
2731 "m.olm.v1.curve25519-aes-sha2",
2732 "m.megolm.v1.aes-sha2",
2733 ],
2734 "user_id": "@example:localhost",
2735 "device_id": "UNSIGNEDDEVICE",
2736 "keys": {
2737 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2738 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2739 },
2740 "signatures": {
2741 "@example:localhost": {
2742 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2743 },
2744 },
2745 }),
2746 ])
2747 ))
2748 .mount(&server)
2749 .await;
2750
2751 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2752 client.keys_query(&request_id, request.device_keys).await.unwrap();
2753
2754 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2755 }
2756}