1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::{Path, PathBuf},
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34 future::try_join,
35 stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40 StateStoreDataKey, StateStoreDataValue,
41 cross_process_lock::{
42 AcquireCrossProcessLockFn, CrossProcessLock, CrossProcessLockError, CrossProcessLockState,
43 },
44 crypto::{
45 CrossSigningBootstrapRequests, OlmMachine,
46 store::{
47 LockableCryptoStore, SecretImportError,
48 types::{RoomKeyBundleInfo, RoomKeyInfo},
49 },
50 types::{
51 SecretsBundle, SignedKey,
52 requests::{
53 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
54 },
55 },
56 },
57 sleep::sleep,
58};
59use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
60use ruma::{
61 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
62 api::{
63 client::{
64 keys::{
65 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
66 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
67 },
68 message::send_message_event,
69 to_device::send_event_to_device::v3::{
70 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
71 },
72 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
73 },
74 error::{ErrorBody, StandardErrorBody},
75 },
76 assign,
77 events::room::{
78 MediaSource, ThumbnailInfo,
79 member::{MembershipChange, OriginalSyncRoomMemberEvent},
80 },
81};
82#[cfg(feature = "experimental-send-custom-to-device")]
83use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
84use serde::{Deserialize, de::Error as _};
85use tasks::BundleReceiverTask;
86use tokio::sync::{Mutex, RwLockReadGuard};
87use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
88use tracing::{debug, error, instrument, warn};
89use url::Url;
90use vodozemac::Curve25519PublicKey;
91
92use self::{
93 backups::{Backups, types::BackupClientState},
94 futures::UploadEncryptedFile,
95 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
96 recovery::{Recovery, RecoveryState},
97 secret_storage::SecretStorage,
98 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
99 verification::{SasVerification, Verification, VerificationRequest},
100};
101use crate::{
102 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
103 attachment::Thumbnail,
104 client::{ClientInner, WeakClient},
105 cross_process_lock::CrossProcessLockGuard,
106 error::HttpResult,
107};
108
109pub mod backups;
110pub mod futures;
111pub mod identities;
112pub mod recovery;
113pub mod secret_storage;
114pub(crate) mod tasks;
115pub mod verification;
116
117pub use matrix_sdk_base::crypto::{
118 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
119 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SessionCreationError,
120 SignatureError, VERSION,
121 olm::{
122 SessionCreationError as MegolmSessionCreationError,
123 SessionExportError as OlmSessionExportError,
124 },
125 vodozemac,
126};
127use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
128
129#[cfg(feature = "experimental-send-custom-to-device")]
130use crate::config::RequestConfig;
131pub use crate::error::RoomKeyImportError;
132
133#[cfg(feature = "sqlite")]
136#[derive(Debug, thiserror::Error)]
137pub enum BundleExportError {
138 #[error(transparent)]
140 OpenStoreError(#[from] matrix_sdk_sqlite::OpenStoreError),
141 #[error(transparent)]
143 StoreError(#[from] CryptoStoreError),
144 #[error(transparent)]
147 SecretExport(#[from] matrix_sdk_base::crypto::store::SecretsBundleExportError),
148}
149
150#[derive(Debug, thiserror::Error)]
153pub enum BundleImportError {
154 #[error(transparent)]
156 SecretImport(#[from] SecretImportError),
157 #[error(transparent)]
159 DeviceKeys(#[from] Error),
160}
161
162#[cfg(feature = "sqlite")]
171pub async fn export_secrets_bundle_from_store(
172 database_path: impl AsRef<Path>,
173 passphrase: Option<&str>,
174) -> std::result::Result<Option<(OwnedUserId, SecretsBundle)>, BundleExportError> {
175 use matrix_sdk_base::crypto::store::CryptoStore;
176
177 let store = matrix_sdk_sqlite::SqliteCryptoStore::open(database_path, passphrase).await?;
178 let account =
179 store.load_account().await.map_err(|e| BundleExportError::StoreError(e.into()))?;
180
181 if let Some(account) = account {
182 let machine = OlmMachine::with_store(&account.user_id, &account.device_id, store, None)
183 .await
184 .map_err(BundleExportError::StoreError)?;
185
186 let bundle = machine.store().export_secrets_bundle().await?;
187
188 Ok(Some((account.user_id.to_owned(), bundle)))
189 } else {
190 Ok(None)
191 }
192}
193
194pub(crate) struct EncryptionData {
196 pub tasks: StdMutex<ClientTasks>,
199
200 pub encryption_settings: EncryptionSettings,
202
203 pub backup_state: BackupClientState,
205
206 pub recovery_state: SharedObservable<RecoveryState>,
208}
209
210impl EncryptionData {
211 pub fn new(encryption_settings: EncryptionSettings) -> Self {
212 Self {
213 encryption_settings,
214
215 tasks: StdMutex::new(Default::default()),
216 backup_state: Default::default(),
217 recovery_state: Default::default(),
218 }
219 }
220
221 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
222 let weak_client = WeakClient::from_inner(client);
223
224 let mut tasks = self.tasks.lock();
225 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
226
227 if self.encryption_settings.backup_download_strategy
228 == BackupDownloadStrategy::AfterDecryptionFailure
229 {
230 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
231 }
232 }
233
234 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
240 let mut guard = self.tasks.lock();
241
242 let future = Recovery::update_state_after_backup_state_change(client);
243 let join_handle = spawn(future);
244
245 guard.update_recovery_state_after_backup = Some(join_handle);
246 }
247}
248
249#[derive(Clone, Copy, Debug, Default)]
251pub struct EncryptionSettings {
252 pub auto_enable_cross_signing: bool,
258
259 pub backup_download_strategy: BackupDownloadStrategy,
264
265 pub auto_enable_backups: bool,
267}
268
269#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
271#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
272pub enum BackupDownloadStrategy {
273 OneShot,
284
285 AfterDecryptionFailure,
287
288 #[default]
293 Manual,
294}
295
296#[derive(Clone, Copy, Debug, Eq, PartialEq)]
301pub enum VerificationState {
302 Unknown,
304 Verified,
307 Unverified,
309}
310
311#[derive(Debug)]
322pub struct CrossSigningResetHandle {
323 client: Client,
324 upload_request: UploadSigningKeysRequest,
325 signatures_request: UploadSignaturesRequest,
326 auth_type: CrossSigningResetAuthType,
327 is_cancelled: Mutex<bool>,
328}
329
330impl CrossSigningResetHandle {
331 pub fn new(
333 client: Client,
334 upload_request: UploadSigningKeysRequest,
335 signatures_request: UploadSignaturesRequest,
336 auth_type: CrossSigningResetAuthType,
337 ) -> Self {
338 Self {
339 client,
340 upload_request,
341 signatures_request,
342 auth_type,
343 is_cancelled: Mutex::new(false),
344 }
345 }
346
347 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
350 &self.auth_type
351 }
352
353 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
357 const RETRY_EVERY: Duration = Duration::from_millis(500);
359
360 const TIMEOUT: Duration = Duration::from_mins(2);
362
363 tokio::time::timeout(TIMEOUT, async {
364 let mut upload_request = self.upload_request.clone();
365 upload_request.auth = auth;
366
367 debug!(
368 "Repeatedly PUTting to keys/device_signing/upload until it works \
369 or we hit a permanent failure."
370 );
371 while let Err(e) = self.client.send(upload_request.clone()).await {
372 if *self.is_cancelled.lock().await {
373 return Ok(());
374 }
375
376 match e.as_uiaa_response() {
377 Some(uiaa_info) => {
378 if uiaa_info.auth_error.is_some() {
379 return Err(e.into());
380 }
381 }
382 None => return Err(e.into()),
383 }
384
385 debug!(
386 "PUT to keys/device_signing/upload failed with 401. Retrying after \
387 a short delay."
388 );
389 sleep(RETRY_EVERY).await;
390 }
391
392 self.client.send(self.signatures_request.clone()).await?;
393
394 Ok(())
395 })
396 .await
397 .unwrap_or_else(|_| {
398 warn!("Timed out waiting for keys/device_signing/upload to succeed.");
399 Err(Error::Timeout)
400 })
401 }
402
403 pub async fn cancel(&self) {
405 *self.is_cancelled.lock().await = true;
406 }
407}
408
409#[derive(Debug, Clone)]
412pub enum CrossSigningResetAuthType {
413 Uiaa(UiaaInfo),
415 OAuth(OAuthCrossSigningResetInfo),
418}
419
420impl CrossSigningResetAuthType {
421 fn new(error: &HttpError) -> Result<Option<Self>> {
422 if let Some(auth_info) = error.as_uiaa_response() {
423 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
424 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
425 } else {
426 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
427 }
428 } else {
429 Ok(None)
430 }
431 }
432}
433
434#[derive(Debug, Clone, Deserialize)]
437pub struct OAuthCrossSigningResetInfo {
438 pub approval_url: Url,
440}
441
442impl OAuthCrossSigningResetInfo {
443 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
444 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
445 return Ok(None);
446 };
447
448 Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
449 }
450}
451
452#[derive(Clone, Debug)]
455pub struct DuplicateOneTimeKeyErrorMessage {
456 pub old_key: Curve25519PublicKey,
458 pub new_key: Curve25519PublicKey,
460}
461
462impl FromStr for DuplicateOneTimeKeyErrorMessage {
463 type Err = serde_json::Error;
464
465 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
466 let mut split = s.split_terminator(';');
470
471 let old_key = split
472 .next()
473 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
474 let new_key = split
475 .next()
476 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
477
478 let old_key_index = old_key
481 .find("Old key:")
482 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
483
484 let old_key = old_key[old_key_index..]
485 .trim()
486 .strip_prefix("Old key:")
487 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
488
489 let new_key = new_key
492 .trim()
493 .strip_prefix("new key:")
494 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
495
496 let new_key = new_key.replace("'", "\"");
499
500 let old_key: SignedKey = serde_json::from_str(old_key)?;
502 let new_key: SignedKey = serde_json::from_str(&new_key)?;
503
504 let old_key = old_key.key();
506 let new_key = new_key.key();
507
508 Ok(Self { old_key, new_key })
509 }
510}
511
512impl Client {
513 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
514 self.base_client().olm_machine().await
515 }
516
517 pub(crate) async fn mark_request_as_sent(
518 &self,
519 request_id: &TransactionId,
520 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
521 ) -> Result<(), matrix_sdk_base::Error> {
522 Ok(self
523 .olm_machine()
524 .await
525 .as_ref()
526 .expect(
527 "We should have an olm machine once we try to mark E2EE related requests as sent",
528 )
529 .mark_request_as_sent(request_id, response)
530 .await?)
531 }
532
533 #[instrument(skip(self, device_keys))]
539 pub(crate) async fn keys_query(
540 &self,
541 request_id: &TransactionId,
542 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
543 ) -> Result<get_keys::v3::Response> {
544 let request = assign!(get_keys::v3::Request::new(), { device_keys });
545
546 let response = self.send(request).await?;
547 self.mark_request_as_sent(request_id, &response).await?;
548 self.encryption().update_state_after_keys_query(&response).await;
549
550 Ok(response)
551 }
552
553 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
587 &'a self,
588 reader: &'a mut R,
589 ) -> UploadEncryptedFile<'a, R> {
590 UploadEncryptedFile::new(self, reader)
591 }
592
593 pub(crate) async fn upload_encrypted_media_and_thumbnail(
596 &self,
597 data: &[u8],
598 thumbnail: Option<Thumbnail>,
599 send_progress: SharedObservable<TransmissionProgress>,
600 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
601 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
602
603 let upload_attachment = async {
604 let mut cursor = Cursor::new(data);
605 self.upload_encrypted_file(&mut cursor)
606 .with_send_progress_observable(send_progress)
607 .await
608 };
609
610 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
611
612 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
613 }
614
615 async fn upload_encrypted_thumbnail(
618 &self,
619 thumbnail: Option<Thumbnail>,
620 send_progress: SharedObservable<TransmissionProgress>,
621 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
622 let Some(thumbnail) = thumbnail else {
623 return Ok(None);
624 };
625
626 let (data, _, thumbnail_info) = thumbnail.into_parts();
627 let mut cursor = Cursor::new(data);
628
629 let file = self
630 .upload_encrypted_file(&mut cursor)
631 .with_send_progress_observable(send_progress)
632 .await?;
633
634 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
635 }
636
637 pub(crate) async fn claim_one_time_keys(
643 &self,
644 users: impl Iterator<Item = &UserId>,
645 ) -> Result<()> {
646 let _lock = self.locks().key_claim_lock.lock().await;
647
648 if let Some((request_id, request)) = self
649 .olm_machine()
650 .await
651 .as_ref()
652 .ok_or(Error::NoOlmMachine)?
653 .get_missing_sessions(users)
654 .await?
655 {
656 let response = self.send(request).await?;
657 self.mark_request_as_sent(&request_id, &response).await?;
658 }
659
660 Ok(())
661 }
662
663 #[instrument(skip(self, request))]
673 pub(crate) async fn keys_upload(
674 &self,
675 request_id: &TransactionId,
676 request: &upload_keys::v3::Request,
677 ) -> Result<upload_keys::v3::Response> {
678 debug!(
679 device_keys = request.device_keys.is_some(),
680 one_time_key_count = request.one_time_keys.len(),
681 "Uploading public encryption keys",
682 );
683
684 let response = self.send(request.clone()).await?;
685 self.mark_request_as_sent(request_id, &response).await?;
686
687 Ok(response)
688 }
689
690 pub(crate) async fn room_send_helper(
691 &self,
692 request: &RoomMessageRequest,
693 ) -> Result<send_message_event::v3::Response> {
694 let content = request.content.clone();
695 let txn_id = request.txn_id.clone();
696 let room_id = &request.room_id;
697
698 self.get_room(room_id)
699 .expect("Can't send a message to a room that isn't known to the store")
700 .send(*content)
701 .with_transaction_id(txn_id)
702 .await
703 .map(|result| result.response)
704 }
705
706 pub(crate) async fn send_to_device(
707 &self,
708 request: &ToDeviceRequest,
709 ) -> HttpResult<ToDeviceResponse> {
710 let request = RumaToDeviceRequest::new_raw(
711 request.event_type.clone(),
712 request.txn_id.clone(),
713 request.messages.clone(),
714 );
715
716 self.send(request).await
717 }
718
719 pub(crate) async fn send_verification_request(
720 &self,
721 request: OutgoingVerificationRequest,
722 ) -> Result<()> {
723 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
724
725 match request {
726 ToDevice(t) => {
727 self.send_to_device(&t).await?;
728 }
729 InRoom(r) => {
730 self.room_send_helper(&r).await?;
731 }
732 }
733
734 Ok(())
735 }
736
737 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
738 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
739
740 match r.request() {
741 AnyOutgoingRequest::KeysQuery(request) => {
742 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
743 }
744 AnyOutgoingRequest::KeysUpload(request) => {
745 let response = self.keys_upload(r.request_id(), request).await;
746
747 if let Err(e) = &response {
748 match e.as_ruma_api_error() {
749 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
750 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
751 {
752 {
757 let already_reported = self
758 .state_store()
759 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
760 .await?
761 .is_some();
762
763 if message.starts_with("One time key") && !already_reported {
764 let error_message =
765 DuplicateOneTimeKeyErrorMessage::from_str(message);
766
767 if let Ok(message) = &error_message {
768 error!(
769 sentry = true,
770 old_key = %message.old_key,
771 new_key = %message.new_key,
772 "Duplicate one-time keys have been uploaded"
773 );
774 } else {
775 error!(
776 sentry = true,
777 "Duplicate one-time keys have been uploaded"
778 );
779 }
780
781 self.state_store()
782 .set_kv_data(
783 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
784 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
785 )
786 .await?;
787
788 if let Err(e) = self
789 .inner
790 .duplicate_key_upload_error_sender
791 .send(error_message.ok())
792 {
793 error!(
794 "Failed to dispatch duplicate key upload error notification: {}",
795 e
796 );
797 }
798 }
799 }
800 }
801 }
802 _ => {}
803 }
804
805 response?;
806 }
807 }
808 AnyOutgoingRequest::ToDeviceRequest(request) => {
809 let response = self.send_to_device(request).await?;
810 self.mark_request_as_sent(r.request_id(), &response).await?;
811 }
812 AnyOutgoingRequest::SignatureUpload(request) => {
813 let response = self.send(request.clone()).await?;
814 self.mark_request_as_sent(r.request_id(), &response).await?;
815 }
816 AnyOutgoingRequest::RoomMessage(request) => {
817 let response = self.room_send_helper(request).await?;
818 self.mark_request_as_sent(r.request_id(), &response).await?;
819 }
820 AnyOutgoingRequest::KeysClaim(request) => {
821 let response = self.send(request.clone()).await?;
822 self.mark_request_as_sent(r.request_id(), &response).await?;
823 }
824 }
825
826 Ok(())
827 }
828
829 #[instrument(skip_all)]
830 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
831 const MAX_CONCURRENT_REQUESTS: usize = 20;
832
833 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
836 warn!("Error while claiming one-time keys {:?}", e);
837 }
838
839 let outgoing_requests = stream::iter(
840 self.olm_machine()
841 .await
842 .as_ref()
843 .ok_or(Error::NoOlmMachine)?
844 .outgoing_requests()
845 .await?,
846 )
847 .map(|r| self.send_outgoing_request(r));
848
849 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
850
851 requests
852 .for_each(|r| async move {
853 match r {
854 Ok(_) => (),
855 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
856 }
857 })
858 .await;
859
860 Ok(())
861 }
862}
863
864#[cfg(any(feature = "testing", test))]
865impl Client {
866 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
868 self.olm_machine().await
869 }
870
871 pub fn abort_bundle_receiver_task(&self) {
873 let tasks = self.inner.e2ee.tasks.lock();
874 if let Some(task) = tasks.receive_historic_room_key_bundles.as_ref() {
875 task.abort()
876 }
877 }
878}
879
880#[derive(Debug, Clone)]
884pub struct Encryption {
885 client: Client,
887}
888
889impl Encryption {
890 pub(crate) fn new(client: Client) -> Self {
891 Self { client }
892 }
893
894 pub(crate) fn settings(&self) -> EncryptionSettings {
896 self.client.inner.e2ee.encryption_settings
897 }
898
899 pub async fn ed25519_key(&self) -> Option<String> {
902 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
903 }
904
905 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
907 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
908 }
909
910 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
912 match self.get_own_device().await {
913 Ok(Some(device)) => device.first_time_seen_ts(),
914 _ => MilliSecondsSinceUnixEpoch::now(),
916 }
917 }
918
919 pub async fn import_secrets_bundle(
932 &self,
933 bundle: &SecretsBundle,
934 ) -> Result<(), BundleImportError> {
935 self.import_secrets_bundle_impl(bundle).await?;
936
937 self.ensure_device_keys_upload().await?;
940 self.wait_for_e2ee_initialization_tasks().await;
941
942 if !self.backups().are_enabled().await {
947 self.backups().maybe_resume_backups().await?;
948 }
949
950 Ok(())
951 }
952
953 pub(crate) async fn import_secrets_bundle_impl(
954 &self,
955 bundle: &SecretsBundle,
956 ) -> Result<(), SecretImportError> {
957 let olm_machine = self.client.olm_machine().await;
958 let olm_machine =
959 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
960
961 olm_machine.store().import_secrets_bundle(bundle).await
962 }
963
964 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
969 let olm = self.client.olm_machine().await;
970 let machine = olm.as_ref()?;
971 Some(machine.cross_signing_status().await)
972 }
973
974 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
980 let olm_machine = self.client.olm_machine().await;
981 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
982 let user_id = olm_machine.user_id();
983
984 self.ensure_initial_key_query().await?;
985
986 let devices = self.get_user_devices(user_id).await?;
987
988 let ret = devices.devices().any(|device| {
989 device.is_cross_signed_by_owner()
990 && device.curve25519_key().is_some()
991 && !device.is_dehydrated()
992 });
993
994 Ok(ret)
995 }
996
997 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
1002 if let Some(machine) = self.client.olm_machine().await.as_ref() {
1003 machine.tracked_users().await
1004 } else {
1005 Ok(HashSet::new())
1006 }
1007 }
1008
1009 pub fn verification_state(&self) -> Subscriber<VerificationState> {
1032 self.client.inner.verification_state.subscribe_reset()
1033 }
1034
1035 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1037 let olm = self.client.olm_machine().await;
1038 let olm = olm.as_ref()?;
1039 #[allow(clippy::bind_instead_of_map)]
1040 olm.get_verification(user_id, flow_id).and_then(|v| match v {
1041 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
1042 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
1043 }
1044 #[cfg(feature = "qrcode")]
1045 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
1046 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
1047 }
1048 _ => None,
1049 })
1050 }
1051
1052 pub async fn get_verification_request(
1055 &self,
1056 user_id: &UserId,
1057 flow_id: impl AsRef<str>,
1058 ) -> Option<VerificationRequest> {
1059 let olm = self.client.olm_machine().await;
1060 let olm = olm.as_ref()?;
1061
1062 olm.get_verification_request(user_id, flow_id)
1063 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
1064 }
1065
1066 pub async fn get_device(
1100 &self,
1101 user_id: &UserId,
1102 device_id: &DeviceId,
1103 ) -> Result<Option<Device>, CryptoStoreError> {
1104 let olm = self.client.olm_machine().await;
1105 let Some(machine) = olm.as_ref() else { return Ok(None) };
1106 let device = machine.get_device(user_id, device_id, None).await?;
1107 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1108 }
1109
1110 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1117 let olm = self.client.olm_machine().await;
1118 let Some(machine) = olm.as_ref() else { return Ok(None) };
1119 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1120 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1121 }
1122
1123 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1149 let devices = self
1150 .client
1151 .olm_machine()
1152 .await
1153 .as_ref()
1154 .ok_or(Error::NoOlmMachine)?
1155 .get_user_devices(user_id, None)
1156 .await?;
1157
1158 Ok(UserDevices { inner: devices, client: self.client.clone() })
1159 }
1160
1161 pub async fn get_user_identity(
1197 &self,
1198 user_id: &UserId,
1199 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1200 let olm = self.client.olm_machine().await;
1201 let Some(olm) = olm.as_ref() else { return Ok(None) };
1202 let identity = olm.get_identity(user_id, None).await?;
1203
1204 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1205 }
1206
1207 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1245 let olm = self.client.olm_machine().await;
1246 let Some(olm) = olm.as_ref() else { return Ok(None) };
1247
1248 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1249 self.client.keys_query(&request_id, request.device_keys).await?;
1250
1251 let identity = olm.get_identity(user_id, None).await?;
1252 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1253 }
1254
1255 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1286 let olm = self.client.olm_machine().await;
1287 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1288 let client = self.client.to_owned();
1289
1290 Ok(olm
1291 .store()
1292 .devices_stream()
1293 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1294 }
1295
1296 pub async fn user_identities_stream(
1324 &self,
1325 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1326 let olm = self.client.olm_machine().await;
1327 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1328 let client = self.client.to_owned();
1329
1330 Ok(olm
1331 .store()
1332 .user_identities_stream()
1333 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1334 }
1335
1336 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1375 let olm = self.client.olm_machine().await;
1376 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1377
1378 let CrossSigningBootstrapRequests {
1379 upload_signing_keys_req,
1380 upload_keys_req,
1381 upload_signatures_req,
1382 } = olm.bootstrap_cross_signing(false).await?;
1383
1384 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1385 auth: auth_data,
1386 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1387 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1388 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1389 });
1390
1391 if let Some(req) = upload_keys_req {
1392 self.client.send_outgoing_request(req).await?;
1393 }
1394 self.client.send(upload_signing_keys_req).await?;
1395 self.client.send(upload_signatures_req).await?;
1396
1397 Ok(())
1398 }
1399
1400 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1437 let olm = self.client.olm_machine().await;
1438 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1439
1440 let CrossSigningBootstrapRequests {
1441 upload_keys_req,
1442 upload_signing_keys_req,
1443 upload_signatures_req,
1444 } = olm.bootstrap_cross_signing(true).await?;
1445
1446 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1447 auth: None,
1448 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1449 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1450 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1451 });
1452
1453 if let Some(req) = upload_keys_req {
1454 self.client.send_outgoing_request(req).await?;
1455 }
1456
1457 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1458 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1459 let client = self.client.clone();
1460
1461 Ok(Some(CrossSigningResetHandle::new(
1462 client,
1463 upload_signing_keys_req,
1464 upload_signatures_req,
1465 auth_type,
1466 )))
1467 } else {
1468 Err(error.into())
1469 }
1470 } else {
1471 self.client.send(upload_signatures_req).await?;
1472
1473 Ok(None)
1474 }
1475 }
1476
1477 async fn ensure_initial_key_query(&self) -> Result<()> {
1480 let olm_machine = self.client.olm_machine().await;
1481 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1482
1483 let user_id = olm_machine.user_id();
1484
1485 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1486 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1487 self.client.keys_query(&request_id, request.device_keys).await?;
1488 }
1489
1490 Ok(())
1491 }
1492
1493 pub async fn bootstrap_cross_signing_if_needed(
1540 &self,
1541 auth_data: Option<AuthData>,
1542 ) -> Result<()> {
1543 let olm_machine = self.client.olm_machine().await;
1544 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1545 let user_id = olm_machine.user_id();
1546
1547 self.ensure_initial_key_query().await?;
1548
1549 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1550 self.bootstrap_cross_signing(auth_data).await?;
1551 }
1552
1553 Ok(())
1554 }
1555
1556 #[cfg(not(target_family = "wasm"))]
1608 pub async fn export_room_keys(
1609 &self,
1610 path: PathBuf,
1611 passphrase: &str,
1612 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1613 ) -> Result<()> {
1614 let olm = self.client.olm_machine().await;
1615 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1616
1617 let keys = olm.store().export_room_keys(predicate).await?;
1618 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1619
1620 let encrypt = move || -> Result<()> {
1621 let export: String =
1622 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1623 let mut file = std::fs::File::create(path)?;
1624 file.write_all(&export.into_bytes())?;
1625 Ok(())
1626 };
1627
1628 let task = tokio::task::spawn_blocking(encrypt);
1629 task.await.expect("Task join error")
1630 }
1631
1632 #[cfg(not(target_family = "wasm"))]
1670 pub async fn import_room_keys(
1671 &self,
1672 path: PathBuf,
1673 passphrase: &str,
1674 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1675 let olm = self.client.olm_machine().await;
1676 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1677 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1678
1679 let decrypt = move || {
1680 let file = std::fs::File::open(path)?;
1681 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1682 };
1683
1684 let task = tokio::task::spawn_blocking(decrypt);
1685 let import = task.await.expect("Task join error")?;
1686
1687 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1688
1689 self.backups().maybe_trigger_backup();
1690
1691 Ok(ret)
1692 }
1693
1694 pub async fn room_keys_received_stream(
1725 &self,
1726 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1727 {
1728 let olm = self.client.olm_machine().await;
1729 let olm = olm.as_ref()?;
1730
1731 Some(olm.store().room_keys_received_stream())
1732 }
1733
1734 pub async fn historic_room_key_stream(
1765 &self,
1766 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1767 let olm = self.client.olm_machine().await;
1768 let olm = olm.as_ref()?;
1769
1770 Some(olm.store().historic_room_key_stream())
1771 }
1772
1773 pub fn secret_storage(&self) -> SecretStorage {
1775 SecretStorage { client: self.client.to_owned() }
1776 }
1777
1778 pub fn backups(&self) -> Backups {
1780 Backups { client: self.client.to_owned() }
1781 }
1782
1783 pub fn recovery(&self) -> Recovery {
1785 Recovery { client: self.client.to_owned() }
1786 }
1787
1788 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1800 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1802 let prev_holder = prev_lock.lock_holder();
1803 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1804 return Ok(());
1805 }
1806 warn!(
1807 "Recreating cross-process store lock with a different holder value: \
1808 prev was {prev_holder:?}, new is {lock_value}"
1809 );
1810 }
1811
1812 let olm_machine = self.client.base_client().olm_machine().await;
1813 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1814
1815 let lock = olm_machine.store().create_store_lock(
1816 "cross_process_lock".to_owned(),
1817 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1818 );
1819
1820 self.client
1821 .locks()
1822 .cross_process_crypto_store_lock
1823 .set(lock)
1824 .map_err(|_| Error::BadCryptoStoreState)?;
1825
1826 Ok(())
1827 }
1828
1829 pub async fn spin_lock_store(
1837 &self,
1838 max_backoff: Option<u32>,
1839 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1840 self.lock_store(async move |lock| lock.spin_lock(max_backoff).await).await
1841 }
1842
1843 pub async fn try_lock_store_once(&self) -> Result<Option<CrossProcessLockGuard>, Error> {
1851 self.lock_store(CrossProcessLock::try_lock_once).await
1852 }
1853
1854 pub async fn lock_store<F: AcquireCrossProcessLockFn<LockableCryptoStore>>(
1861 &self,
1862 acquire: F,
1863 ) -> Result<Option<CrossProcessLockGuard>, Error> {
1864 let wrap_err = |e: CryptoStoreError| {
1865 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(Arc::new(e))))
1866 };
1867 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1868 Ok(Some(match acquire(lock).await.map_err(wrap_err)?? {
1869 CrossProcessLockState::Clean(guard) => guard,
1870 CrossProcessLockState::Dirty(guard) => {
1871 self.client.base_client().regenerate_olm(None).await?;
1872 guard.clear_dirty();
1873 guard
1874 }
1875 }))
1876 } else {
1877 Ok(None)
1878 }
1879 }
1880
1881 #[cfg(any(test, feature = "testing"))]
1883 pub async fn uploaded_key_count(&self) -> Result<u64> {
1884 let olm_machine = self.client.olm_machine().await;
1885 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1886 Ok(olm_machine.uploaded_key_count().await?)
1887 }
1888
1889 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1913 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1917 Some(BundleReceiverTask::new(&self.client).await)
1918 } else {
1919 None
1920 };
1921
1922 let mut tasks = self.client.inner.e2ee.tasks.lock();
1923
1924 let this = self.clone();
1925
1926 tasks.setup_e2ee = Some(spawn(async move {
1927 this.update_verification_state().await;
1930
1931 if this.settings().auto_enable_cross_signing
1932 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1933 {
1934 error!("Couldn't bootstrap cross signing {e:?}");
1935 }
1936
1937 if let Err(e) = this.backups().setup_and_resume().await {
1938 error!("Couldn't setup and resume backups {e:?}");
1939 }
1940 if let Err(e) = this.recovery().setup().await {
1941 error!("Couldn't setup and resume recovery {e:?}");
1942 }
1943 }));
1944
1945 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1946
1947 self.setup_room_membership_session_discard_handler();
1948 }
1949
1950 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1953 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1954
1955 if let Some(task) = task
1956 && let Err(err) = task.await
1957 {
1958 warn!("Error when initializing backups: {err}");
1959 }
1960 }
1961
1962 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1972 let olm = self.client.olm_machine().await;
1973 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1974
1975 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1976 self.client.keys_upload(&request_id, &request).await?;
1977
1978 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1979 self.client.keys_query(&request_id, request.device_keys).await?;
1980 }
1981
1982 Ok(())
1983 }
1984
1985 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1986 self.recovery().update_state_after_keys_query(response).await;
1987
1988 if let Some(user_id) = self.client.user_id() {
1990 let contains_own_device = response.device_keys.contains_key(user_id);
1991
1992 if contains_own_device {
1993 self.update_verification_state().await;
1994 }
1995 }
1996 }
1997
1998 async fn update_verification_state(&self) {
1999 match self.get_own_device().await {
2000 Ok(device) => {
2001 if let Some(device) = device {
2002 let is_verified = device.is_cross_signed_by_owner();
2003
2004 if is_verified {
2005 self.client.inner.verification_state.set(VerificationState::Verified);
2006 } else {
2007 self.client.inner.verification_state.set(VerificationState::Unverified);
2008 }
2009 } else {
2010 warn!("Couldn't find out own device in the store.");
2011 self.client.inner.verification_state.set(VerificationState::Unknown);
2012 }
2013 }
2014 Err(error) => {
2015 warn!("Failed retrieving own device: {error}");
2016 self.client.inner.verification_state.set(VerificationState::Unknown);
2017 }
2018 }
2019 }
2020
2021 fn setup_room_membership_session_discard_handler(&self) {
2041 let client = WeakClient::from_client(&self.client);
2042 self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2043 let Some(client) = client.get() else {
2044 return;
2046 };
2047 let Some(user_id) = client.user_id() else {
2048 return;
2050 };
2051 let olm = client.olm_machine().await;
2052 let Some(olm) = olm.as_ref() else {
2053 warn!("Cannot discard session - Olm machine is not available");
2054 return;
2055 };
2056
2057 if !matches!(ev.membership_change(), MembershipChange::Left) || ev.sender == user_id {
2058 return;
2060 }
2061
2062 debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2063
2064 if let Err(e) = olm.discard_room_key(room.room_id()).await {
2067 warn!(
2068 room_id = ?room.room_id(),
2069 "Error discarding room key after member leave: {e:?}"
2070 );
2071 }
2072 });
2073 }
2074
2075 #[cfg(feature = "experimental-send-custom-to-device")]
2084 pub async fn encrypt_and_send_raw_to_device(
2085 &self,
2086 recipient_devices: Vec<&Device>,
2087 event_type: &str,
2088 content: Raw<AnyToDeviceEventContent>,
2089 share_strategy: CollectStrategy,
2090 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
2091 let users = recipient_devices.iter().map(|device| device.user_id());
2092
2093 self.client.claim_one_time_keys(users).await?;
2097
2098 let olm = self.client.olm_machine().await;
2099 let olm = olm.as_ref().expect("Olm machine wasn't started");
2100
2101 let (requests, withhelds) = olm
2102 .encrypt_content_for_devices(
2103 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
2104 event_type,
2105 &content
2106 .deserialize_as::<serde_json::Value>()
2107 .expect("Deserialize as Value will always work"),
2108 share_strategy,
2109 )
2110 .await?;
2111
2112 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
2113
2114 withhelds.iter().for_each(|(d, _)| {
2116 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
2117 });
2118
2119 for request in requests {
2121 let ruma_request = RumaToDeviceRequest::new_raw(
2122 request.event_type.clone(),
2123 request.txn_id.clone(),
2124 request.messages.clone(),
2125 );
2126
2127 let send_result = self
2128 .client
2129 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
2130 .await;
2131
2132 if send_result.is_err() {
2134 for (user_id, device_map) in request.messages {
2136 for device_id in device_map.keys() {
2137 match device_id {
2138 DeviceIdOrAllDevices::DeviceId(device_id) => {
2139 failures.push((user_id.clone(), device_id.to_owned()));
2140 }
2141 DeviceIdOrAllDevices::AllDevices => {
2142 }
2144 }
2145 }
2146 }
2147 }
2148 }
2149
2150 Ok(failures)
2151 }
2152}
2153
2154#[cfg(all(test, not(target_family = "wasm")))]
2155mod tests {
2156 use std::{
2157 ops::Not,
2158 str::FromStr,
2159 sync::{
2160 Arc,
2161 atomic::{AtomicBool, Ordering},
2162 },
2163 time::Duration,
2164 };
2165
2166 use matrix_sdk_test::{
2167 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2168 event_factory::EventFactory,
2169 };
2170 use ruma::{
2171 event_id,
2172 events::{reaction::ReactionEventContent, relation::Annotation},
2173 user_id,
2174 };
2175 use serde_json::json;
2176 use wiremock::{
2177 Mock, MockServer, Request, ResponseTemplate,
2178 matchers::{header, method, path_regex},
2179 };
2180
2181 use crate::{
2182 Client, Error, assert_next_matches_with_timeout,
2183 config::RequestConfig,
2184 encryption::{
2185 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2186 },
2187 test_utils::{
2188 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2189 },
2190 };
2191
2192 #[async_test]
2193 async fn test_reaction_sending() {
2194 let server = MockServer::start().await;
2195 let client = logged_in_client(Some(server.uri())).await;
2196
2197 let event_id = event_id!("$2:example.org");
2198
2199 Mock::given(method("GET"))
2200 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2201 .and(header("authorization", "Bearer 1234"))
2202 .respond_with(
2203 ResponseTemplate::new(200)
2204 .set_body_json(EventFactory::new().room_encryption().into_content()),
2205 )
2206 .mount(&server)
2207 .await;
2208
2209 Mock::given(method("PUT"))
2210 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2211 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2212 "event_id": event_id,
2213 })))
2214 .mount(&server)
2215 .await;
2216
2217 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2218 let response = SyncResponseBuilder::default()
2219 .add_joined_room(
2220 JoinedRoomBuilder::default()
2221 .add_state_event(
2222 f.member(user_id!("@example:localhost")).display_name("example"),
2223 )
2224 .add_state_event(f.default_power_levels())
2225 .add_state_event(f.room_encryption()),
2226 )
2227 .build_sync_response();
2228
2229 client.base_client().receive_sync_response(response).await.unwrap();
2230
2231 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2232 assert!(
2233 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2234 );
2235
2236 let event_id = event_id!("$1:example.org");
2237 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2238 room.send(reaction).await.expect("Sending the reaction should not fail");
2239
2240 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2241 }
2242
2243 #[cfg(feature = "sqlite")]
2244 #[async_test]
2245 async fn test_generation_counter_invalidates_olm_machine() {
2246 use matrix_sdk_base::store::RoomLoadSettings;
2249 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2250 let session = mock_matrix_session();
2251
2252 let client1 = Client::builder()
2253 .homeserver_url("http://localhost:1234")
2254 .request_config(RequestConfig::new().disable_retry())
2255 .sqlite_store(&sqlite_path, None)
2256 .build()
2257 .await
2258 .unwrap();
2259 client1
2260 .matrix_auth()
2261 .restore_session(session.clone(), RoomLoadSettings::default())
2262 .await
2263 .unwrap();
2264
2265 let client2 = Client::builder()
2266 .homeserver_url("http://localhost:1234")
2267 .request_config(RequestConfig::new().disable_retry())
2268 .sqlite_store(sqlite_path, None)
2269 .build()
2270 .await
2271 .unwrap();
2272 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2273
2274 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2276 assert!(guard.is_none());
2277
2278 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2279 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2280
2281 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2283 assert!(acquired1.is_some());
2284
2285 let initial_olm_machine =
2287 client1.olm_machine().await.clone().expect("must have an olm machine");
2288
2289 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new();
2291 let backup_key = decryption_key.megolm_v1_public_key();
2292 backup_key.set_version("1".to_owned());
2293 initial_olm_machine
2294 .backup_machine()
2295 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2296 .await
2297 .expect("Should save");
2298
2299 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2300
2301 assert!(client1.encryption().backups().are_enabled().await);
2302
2303 let error = client2.encryption().try_lock_store_once().await.unwrap_err();
2305 assert!(matches!(error, Error::CrossProcessLockError(_)));
2306
2307 drop(acquired1);
2309 tokio::time::sleep(Duration::from_millis(100)).await;
2310
2311 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2313 assert!(acquired1.is_some());
2314
2315 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2317 assert!(initial_olm_machine.same_as(&olm_machine));
2318
2319 drop(acquired1);
2321 tokio::time::sleep(Duration::from_millis(100)).await;
2322
2323 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2325 assert!(acquired2.is_some());
2326
2327 drop(acquired2);
2329 tokio::time::sleep(Duration::from_millis(100)).await;
2330
2331 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2333 assert!(acquired1.is_some());
2334
2335 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2337
2338 assert!(!initial_olm_machine.same_as(&olm_machine));
2339
2340 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2341 assert!(backup_key_new.decryption_key.is_some());
2342 assert_eq!(
2343 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2344 backup_key.to_base64()
2345 );
2346 assert!(client1.encryption().backups().are_enabled().await);
2347 }
2348
2349 #[cfg(feature = "sqlite")]
2350 #[async_test]
2351 async fn test_generation_counter_no_spurious_invalidation() {
2352 use matrix_sdk_base::store::RoomLoadSettings;
2355 let sqlite_path =
2356 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2357 let session = mock_matrix_session();
2358
2359 let client = Client::builder()
2360 .homeserver_url("http://localhost:1234")
2361 .request_config(RequestConfig::new().disable_retry())
2362 .sqlite_store(&sqlite_path, None)
2363 .build()
2364 .await
2365 .unwrap();
2366 client
2367 .matrix_auth()
2368 .restore_session(session.clone(), RoomLoadSettings::default())
2369 .await
2370 .unwrap();
2371
2372 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2373
2374 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2375
2376 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2378 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2379
2380 {
2381 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2382 assert!(acquired.is_some());
2383 }
2384
2385 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2387 assert!(initial_olm_machine.same_as(&after_taking_lock_first_time));
2388
2389 {
2390 let client2 = Client::builder()
2392 .homeserver_url("http://localhost:1234")
2393 .request_config(RequestConfig::new().disable_retry())
2394 .sqlite_store(sqlite_path, None)
2395 .build()
2396 .await
2397 .unwrap();
2398 client2
2399 .matrix_auth()
2400 .restore_session(session, RoomLoadSettings::default())
2401 .await
2402 .unwrap();
2403
2404 client2
2405 .encryption()
2406 .enable_cross_process_store_lock("client2".to_owned())
2407 .await
2408 .unwrap();
2409
2410 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2411 assert!(guard.is_some());
2412
2413 drop(guard);
2414 tokio::time::sleep(Duration::from_millis(100)).await;
2415 }
2416
2417 {
2418 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2419 assert!(acquired.is_some());
2420 }
2421
2422 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2424 assert!(!after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2425
2426 {
2427 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2428 assert!(acquired.is_some());
2429 }
2430
2431 let after_taking_lock_third_time = client.olm_machine().await.as_ref().unwrap().clone();
2433 assert!(after_taking_lock_second_time.same_as(&after_taking_lock_third_time));
2434 }
2435
2436 #[async_test]
2437 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2438 let client = no_retry_test_client(None).await;
2440 let server = MockServer::start().await;
2441
2442 let mut verification_state = client.encryption().verification_state();
2444
2445 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2447
2448 let keys_requested = Arc::new(AtomicBool::new(false));
2451 let inner_bool = keys_requested.clone();
2452
2453 Mock::given(method("GET"))
2454 .and(path_regex(
2455 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2456 ))
2457 .respond_with(move |_req: &Request| {
2458 inner_bool.fetch_or(true, Ordering::SeqCst);
2459 ResponseTemplate::new(200).set_body_json(json!({}))
2460 })
2461 .mount(&server)
2462 .await;
2463
2464 set_client_session(&client).await;
2466
2467 assert!(keys_requested.load(Ordering::SeqCst).not());
2469 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2470 }
2471
2472 #[test]
2473 fn test_oauth_reset_info_from_uiaa_info() {
2474 let auth_info = json!({
2475 "session": "dummy",
2476 "flows": [
2477 {
2478 "stages": [
2479 "org.matrix.cross_signing_reset"
2480 ]
2481 }
2482 ],
2483 "params": {
2484 "org.matrix.cross_signing_reset": {
2485 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2486 }
2487 },
2488 "msg": "To reset..."
2489 });
2490
2491 let auth_info = serde_json::from_value(auth_info)
2492 .expect("We should be able to deserialize the UiaaInfo");
2493 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2494 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2495 }
2496
2497 #[test]
2498 fn test_duplicate_one_time_key_error_parsing() {
2499 let message = concat!(
2500 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2501 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2502 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2503 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2504 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2505 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2506 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2507 );
2508 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2509 .expect("We should be able to parse the error message");
2510
2511 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2512 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2513
2514 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2515 .expect_err("We shouldn't be able to parse an incomplete error message");
2516 }
2517
2518 fn devices_to_verify_against_keys_query_response(
2522 devices: Vec<serde_json::Value>,
2523 ) -> serde_json::Value {
2524 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2525 .into_iter()
2526 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2527 .collect();
2528 json!({
2529 "device_keys": {
2530 "@example:localhost": device_keys,
2531 },
2532 "master_keys": {
2533 "@example:localhost": {
2534 "keys": {
2535 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2536 },
2537 "usage": ["master"],
2538 "user_id": "@example:localhost",
2539 },
2540 },
2541 "self_signing_keys": {
2542 "@example:localhost": {
2543 "keys": {
2544 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2545 },
2546 "usage": ["self_signing"],
2547 "user_id": "@example:localhost",
2548 "signatures": {
2549 "@example:localhost": {
2550 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2551 },
2552 },
2553 },
2554 },
2555 "user_signing_keys": {
2556 "@example:localhost": {
2557 "keys": {
2558 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2559 },
2560 "usage": ["user_signing"],
2561 "user_id": "@example:localhost",
2562 "signatures": {
2563 "@example:localhost": {
2564 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2565 },
2566 },
2567 },
2568 }
2569 })
2570 }
2571
2572 #[async_test]
2575 async fn test_devices_to_verify_against_no_devices() {
2578 let server = MockServer::start().await;
2579 let client = logged_in_client(Some(server.uri())).await;
2580
2581 Mock::given(method("POST"))
2582 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2583 .respond_with(
2584 ResponseTemplate::new(200)
2585 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2586 )
2587 .mount(&server)
2588 .await;
2589
2590 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2591 }
2592
2593 #[async_test]
2594 async fn test_devices_to_verify_against_cross_signed() {
2597 let server = MockServer::start().await;
2598 let client = logged_in_client(Some(server.uri())).await;
2599
2600 Mock::given(method("POST"))
2601 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2602 .respond_with(ResponseTemplate::new(200).set_body_json(
2603 devices_to_verify_against_keys_query_response(vec![
2604 json!({
2605 "algorithms": [
2606 "m.olm.v1.curve25519-aes-sha2",
2607 "m.megolm.v1.aes-sha2",
2608 ],
2609 "user_id": "@example:localhost",
2610 "device_id": "SIGNEDDEVICE",
2611 "keys": {
2612 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2613 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2614 },
2615 "signatures": {
2616 "@example:localhost": {
2617 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2618 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2619 },
2620 },
2621 })
2622 ])
2623 ))
2624 .mount(&server)
2625 .await;
2626
2627 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2628 }
2629
2630 #[async_test]
2631 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2634 let server = MockServer::start().await;
2635 let client = logged_in_client(Some(server.uri())).await;
2636 let user_id = client.user_id().unwrap();
2637 let olm_machine = client.olm_machine().await;
2638 let olm_machine = olm_machine.as_ref().unwrap();
2639
2640 Mock::given(method("POST"))
2641 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2642 .respond_with(ResponseTemplate::new(200).set_body_json(
2643 devices_to_verify_against_keys_query_response(vec![
2644 json!({
2645 "algorithms": [
2646 "m.olm.v1.curve25519-aes-sha2",
2647 "m.megolm.v1.aes-sha2",
2648 ],
2649 "user_id": "@example:localhost",
2650 "device_id": "DEHYDRATEDDEVICE",
2651 "keys": {
2652 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2653 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2654 },
2655 "dehydrated": true,
2656 "signatures": {
2657 "@example:localhost": {
2658 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2659 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2660 },
2661 },
2662 }),
2663 json!({
2664 "algorithms": [
2665 "m.olm.v1.curve25519-aes-sha2",
2666 "m.megolm.v1.aes-sha2",
2667 ],
2668 "user_id": "@example:localhost",
2669 "device_id": "UNSIGNEDDEVICE",
2670 "keys": {
2671 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2672 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2673 },
2674 "signatures": {
2675 "@example:localhost": {
2676 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2677 },
2678 },
2679 }),
2680 ])
2681 ))
2682 .mount(&server)
2683 .await;
2684
2685 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2686 client.keys_query(&request_id, request.device_keys).await.unwrap();
2687
2688 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2689 }
2690}