1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33 future::try_join,
34 stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39 StateStoreDataKey, StateStoreDataValue,
40 cross_process_lock::CrossProcessLockError,
41 crypto::{
42 CrossSigningBootstrapRequests, OlmMachine,
43 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44 types::{
45 SignedKey,
46 requests::{
47 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48 },
49 },
50 },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55 api::client::{
56 error::{ErrorBody, StandardErrorBody},
57 keys::{
58 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60 },
61 message::send_message_event,
62 to_device::send_event_to_device::v3::{
63 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64 },
65 uiaa::{AuthData, UiaaInfo},
66 },
67 assign,
68 events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81 backups::{Backups, types::BackupClientState},
82 futures::UploadEncryptedFile,
83 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84 recovery::{Recovery, RecoveryState},
85 secret_storage::SecretStorage,
86 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87 verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90 Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91 attachment::Thumbnail,
92 client::{ClientInner, WeakClient},
93 cross_process_lock::CrossProcessLockGuard,
94 error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108 SessionCreationError, SignatureError, VERSION,
109 olm::{
110 SessionCreationError as MegolmSessionCreationError,
111 SessionExportError as OlmSessionExportError,
112 },
113 vodozemac,
114};
115
116#[cfg(feature = "experimental-send-custom-to-device")]
117use crate::config::RequestConfig;
118pub use crate::error::RoomKeyImportError;
119
/// Client-wide state backing the end-to-end encryption support.
pub(crate) struct EncryptionData {
    /// Handles of the E2EE background tasks, kept behind a mutex so they can
    /// be installed through a shared reference.
    pub tasks: StdMutex<ClientTasks>,

    /// The settings this data was created with.
    pub encryption_settings: EncryptionSettings,

    /// State of the room key backup support.
    pub backup_state: BackupClientState,

    /// Observable holding the current [`RecoveryState`].
    pub recovery_state: SharedObservable<RecoveryState>,
}
135
136impl EncryptionData {
137 pub fn new(encryption_settings: EncryptionSettings) -> Self {
138 Self {
139 encryption_settings,
140
141 tasks: StdMutex::new(Default::default()),
142 backup_state: Default::default(),
143 recovery_state: Default::default(),
144 }
145 }
146
147 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
148 let weak_client = WeakClient::from_inner(client);
149
150 let mut tasks = self.tasks.lock();
151 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
152
153 if self.encryption_settings.backup_download_strategy
154 == BackupDownloadStrategy::AfterDecryptionFailure
155 {
156 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
157 }
158 }
159
160 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
166 let mut guard = self.tasks.lock();
167
168 let future = Recovery::update_state_after_backup_state_change(client);
169 let join_handle = spawn(future);
170
171 guard.update_recovery_state_after_backup = Some(join_handle);
172 }
173}
174
/// Settings controlling how the client sets up end-to-end encryption.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// When `true`, the E2EE initialization task calls
    /// `bootstrap_cross_signing_if_needed`.
    pub auto_enable_cross_signing: bool,

    /// Selects when room keys are downloaded from the backup; see
    /// [`BackupDownloadStrategy`].
    pub backup_download_strategy: BackupDownloadStrategy,

    /// NOTE(review): not read anywhere in this part of the file; presumably
    /// controls automatic creation of a key backup. Confirm against the
    /// backups module.
    pub auto_enable_backups: bool,
}
194
/// Strategy describing when room keys are downloaded from the server-side
/// backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// NOTE(review): presumably downloads all keys in one go; the handling of
    /// this variant is outside this part of the file.
    OneShot,

    /// A backup download task is created when the client tasks are
    /// initialized (see `EncryptionData::initialize_tasks`).
    AfterDecryptionFailure,

    /// The default. No download task is created for this variant.
    #[default]
    Manual,
}
221
/// Verification state exposed through `Encryption::verification_state`.
///
/// NOTE(review): the precise meaning of each variant is defined by the code
/// that updates the state, which is outside this part of the file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is not known.
    Unknown,
    /// Verified.
    Verified,
    /// Not verified.
    Unverified,
}
236
/// A cross-process lock guard paired with the crypto store generation that
/// was observed when the lock was acquired.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The underlying guard; only kept so that it stays alive as long as this
    /// value does.
    _guard: CrossProcessLockGuard,
    /// Generation number returned by `Encryption::on_lock_newly_acquired`.
    generation: u64,
}
243
impl CrossProcessLockStoreGuardWithGeneration {
    /// Returns the generation number stored alongside the guard.
    pub fn generation(&self) -> u64 {
        self.generation
    }
}
250
/// Handle used to finish a cross-signing reset that requires additional
/// authentication.
///
/// It is returned by `Encryption::reset_cross_signing` when the first upload
/// of the new signing keys was answered with a UIAA response.
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// Client used to send the requests.
    client: Client,
    /// The signing keys upload request that still needs to be authorized.
    upload_request: UploadSigningKeysRequest,
    /// The signatures upload request, sent once the keys upload succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication the server asked for.
    auth_type: CrossSigningResetAuthType,
    /// Set by `cancel`, checked by `auth` after every failed attempt.
    is_cancelled: Mutex<bool>,
}
269
270impl CrossSigningResetHandle {
271 pub fn new(
273 client: Client,
274 upload_request: UploadSigningKeysRequest,
275 signatures_request: UploadSignaturesRequest,
276 auth_type: CrossSigningResetAuthType,
277 ) -> Self {
278 Self {
279 client,
280 upload_request,
281 signatures_request,
282 auth_type,
283 is_cancelled: Mutex::new(false),
284 }
285 }
286
287 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
290 &self.auth_type
291 }
292
293 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
297 let mut upload_request = self.upload_request.clone();
298 upload_request.auth = auth;
299
300 while let Err(e) = self.client.send(upload_request.clone()).await {
301 if *self.is_cancelled.lock().await {
302 return Ok(());
303 }
304
305 match e.as_uiaa_response() {
306 Some(uiaa_info) => {
307 if uiaa_info.auth_error.is_some() {
308 return Err(e.into());
309 }
310 }
311 None => return Err(e.into()),
312 }
313 }
314
315 self.client.send(self.signatures_request.clone()).await?;
316
317 Ok(())
318 }
319
320 pub async fn cancel(&self) {
322 *self.is_cancelled.lock().await = true;
323 }
324}
325
/// The kind of authentication required to reset cross-signing.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// Plain UIAA; carries the UIAA info found in the server's response.
    Uiaa(UiaaInfo),
    /// The UIAA parameters contained an OAuth cross-signing reset entry; see
    /// [`OAuthCrossSigningResetInfo`].
    OAuth(OAuthCrossSigningResetInfo),
}
336
337impl CrossSigningResetAuthType {
338 fn new(error: &HttpError) -> Result<Option<Self>> {
339 if let Some(auth_info) = error.as_uiaa_response() {
340 if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
341 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
342 } else {
343 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
344 }
345 } else {
346 Ok(None)
347 }
348 }
349}
350
/// Information needed to authorize a cross-signing reset through OAuth.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// URL taken from the `org.matrix.cross_signing_reset` UIAA parameter.
    pub approval_url: Url,
}
358
359impl OAuthCrossSigningResetInfo {
360 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
361 let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
362 auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
363 )?;
364
365 Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
366 }
367}
368
/// Shape of the UIAA parameters that describe an OAuth cross-signing reset.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The entry stored under the `org.matrix.cross_signing_reset` key.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
377
/// The value of the `org.matrix.cross_signing_reset` UIAA parameter.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL exposed as `OAuthCrossSigningResetInfo::approval_url`.
    url: Url,
}
385
/// The two keys parsed out of a "duplicate one-time key" error message.
#[derive(Debug)]
struct DuplicateOneTimeKeyErrorMessage {
    /// Curve25519 key found in the `Old key:` part of the message.
    old_key: Curve25519PublicKey,
    /// Curve25519 key found in the `new key:` part of the message.
    new_key: Curve25519PublicKey,
}
395
396impl FromStr for DuplicateOneTimeKeyErrorMessage {
397 type Err = serde_json::Error;
398
399 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
400 let mut split = s.split_terminator(';');
404
405 let old_key = split
406 .next()
407 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
408 let new_key = split
409 .next()
410 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
411
412 let old_key_index = old_key
415 .find("Old key:")
416 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
417
418 let old_key = old_key[old_key_index..]
419 .trim()
420 .strip_prefix("Old key:")
421 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
422
423 let new_key = new_key
426 .trim()
427 .strip_prefix("new key:")
428 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
429
430 let new_key = new_key.replace("'", "\"");
433
434 let old_key: SignedKey = serde_json::from_str(old_key)?;
436 let new_key: SignedKey = serde_json::from_str(&new_key)?;
437
438 let old_key = old_key.key();
440 let new_key = new_key.key();
441
442 Ok(Self { old_key, new_key })
443 }
444}
445
446impl Client {
/// Returns a read guard over the base client's optional `OlmMachine`.
pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    self.base_client().olm_machine().await
}
450
451 pub(crate) async fn mark_request_as_sent(
452 &self,
453 request_id: &TransactionId,
454 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
455 ) -> Result<(), matrix_sdk_base::Error> {
456 Ok(self
457 .olm_machine()
458 .await
459 .as_ref()
460 .expect(
461 "We should have an olm machine once we try to mark E2EE related requests as sent",
462 )
463 .mark_request_as_sent(request_id, response)
464 .await?)
465 }
466
467 #[instrument(skip(self, device_keys))]
473 pub(crate) async fn keys_query(
474 &self,
475 request_id: &TransactionId,
476 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
477 ) -> Result<get_keys::v3::Response> {
478 let request = assign!(get_keys::v3::Request::new(), { device_keys });
479
480 let response = self.send(request).await?;
481 self.mark_request_as_sent(request_id, &response).await?;
482 self.encryption().update_state_after_keys_query(&response).await;
483
484 Ok(response)
485 }
486
/// Creates an [`UploadEncryptedFile`] request for the data behind `reader`.
///
/// Nothing is done by this method itself; the returned value has to be
/// awaited, as the other methods of this file do.
pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
    &'a self,
    reader: &'a mut R,
) -> UploadEncryptedFile<'a, R> {
    UploadEncryptedFile::new(self, reader)
}
526
527 pub(crate) async fn upload_encrypted_media_and_thumbnail(
530 &self,
531 data: &[u8],
532 thumbnail: Option<Thumbnail>,
533 send_progress: SharedObservable<TransmissionProgress>,
534 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
535 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
536
537 let upload_attachment = async {
538 let mut cursor = Cursor::new(data);
539 self.upload_encrypted_file(&mut cursor)
540 .with_send_progress_observable(send_progress)
541 .await
542 };
543
544 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
545
546 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
547 }
548
549 async fn upload_encrypted_thumbnail(
552 &self,
553 thumbnail: Option<Thumbnail>,
554 send_progress: SharedObservable<TransmissionProgress>,
555 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
556 let Some(thumbnail) = thumbnail else {
557 return Ok(None);
558 };
559
560 let (data, _, thumbnail_info) = thumbnail.into_parts();
561 let mut cursor = Cursor::new(data);
562
563 let file = self
564 .upload_encrypted_file(&mut cursor)
565 .with_send_progress_observable(send_progress)
566 .await?;
567
568 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
569 }
570
571 pub(crate) async fn claim_one_time_keys(
577 &self,
578 users: impl Iterator<Item = &UserId>,
579 ) -> Result<()> {
580 let _lock = self.locks().key_claim_lock.lock().await;
581
582 if let Some((request_id, request)) = self
583 .olm_machine()
584 .await
585 .as_ref()
586 .ok_or(Error::NoOlmMachine)?
587 .get_missing_sessions(users)
588 .await?
589 {
590 let response = self.send(request).await?;
591 self.mark_request_as_sent(&request_id, &response).await?;
592 }
593
594 Ok(())
595 }
596
597 #[instrument(skip(self, request))]
607 pub(crate) async fn keys_upload(
608 &self,
609 request_id: &TransactionId,
610 request: &upload_keys::v3::Request,
611 ) -> Result<upload_keys::v3::Response> {
612 debug!(
613 device_keys = request.device_keys.is_some(),
614 one_time_key_count = request.one_time_keys.len(),
615 "Uploading public encryption keys",
616 );
617
618 let response = self.send(request.clone()).await?;
619 self.mark_request_as_sent(request_id, &response).await?;
620
621 Ok(response)
622 }
623
624 pub(crate) async fn room_send_helper(
625 &self,
626 request: &RoomMessageRequest,
627 ) -> Result<send_message_event::v3::Response> {
628 let content = request.content.clone();
629 let txn_id = request.txn_id.clone();
630 let room_id = &request.room_id;
631
632 self.get_room(room_id)
633 .expect("Can't send a message to a room that isn't known to the store")
634 .send(*content)
635 .with_transaction_id(txn_id)
636 .await
637 }
638
639 pub(crate) async fn send_to_device(
640 &self,
641 request: &ToDeviceRequest,
642 ) -> HttpResult<ToDeviceResponse> {
643 let request = RumaToDeviceRequest::new_raw(
644 request.event_type.clone(),
645 request.txn_id.clone(),
646 request.messages.clone(),
647 );
648
649 self.send(request).await
650 }
651
652 pub(crate) async fn send_verification_request(
653 &self,
654 request: OutgoingVerificationRequest,
655 ) -> Result<()> {
656 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
657
658 match request {
659 ToDevice(t) => {
660 self.send_to_device(&t).await?;
661 }
662 InRoom(r) => {
663 self.room_send_helper(&r).await?;
664 }
665 }
666
667 Ok(())
668 }
669
/// Sends a single outgoing request produced by the `OlmMachine` and reports
/// the response back to it.
///
/// Key queries and key uploads go through `keys_query` / `keys_upload`,
/// which report the response themselves. For all other variants the
/// response is passed to `mark_request_as_sent` here.
///
/// A failed key upload with status code 400 and a message starting with
/// `One time key` is logged once: the `OneTimeKeyAlreadyUploaded` entry of
/// the state store records that the error has been reported.
async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
    use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;

    match r.request() {
        AnyOutgoingRequest::KeysQuery(request) => {
            self.keys_query(r.request_id(), request.device_keys.clone()).await?;
        }
        AnyOutgoingRequest::KeysUpload(request) => {
            let response = self.keys_upload(r.request_id(), request).await;

            // Inspect the error before propagating it, to detect and report
            // duplicate one-time key uploads.
            if let Err(e) = &response {
                match e.as_ruma_api_error() {
                    Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
                        if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
                        {
                            {
                                // NOTE(review): the store is queried for every
                                // 400 response, before the message prefix is
                                // checked; a store error returned here
                                // replaces the upload error.
                                let already_reported = self
                                    .state_store()
                                    .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
                                    .await?
                                    .is_some();

                                if message.starts_with("One time key") && !already_reported {
                                    // Log the keys if the message can be
                                    // parsed, a plain error otherwise.
                                    if let Ok(message) =
                                        DuplicateOneTimeKeyErrorMessage::from_str(message)
                                    {
                                        error!(
                                            sentry = true,
                                            old_key = %message.old_key,
                                            new_key = %message.new_key,
                                            "Duplicate one-time keys have been uploaded"
                                        );
                                    } else {
                                        error!(
                                            sentry = true,
                                            "Duplicate one-time keys have been uploaded"
                                        );
                                    }

                                    // Remember that the error was reported.
                                    self.state_store()
                                        .set_kv_data(
                                            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
                                            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
                                        )
                                        .await?;
                                }
                            }
                        }
                    }
                    _ => {}
                }

                // Propagate the upload error itself.
                response?;
            }
        }
        AnyOutgoingRequest::ToDeviceRequest(request) => {
            let response = self.send_to_device(request).await?;
            self.mark_request_as_sent(r.request_id(), &response).await?;
        }
        AnyOutgoingRequest::SignatureUpload(request) => {
            let response = self.send(request.clone()).await?;
            self.mark_request_as_sent(r.request_id(), &response).await?;
        }
        AnyOutgoingRequest::RoomMessage(request) => {
            let response = self.room_send_helper(request).await?;
            self.mark_request_as_sent(r.request_id(), &response).await?;
        }
        AnyOutgoingRequest::KeysClaim(request) => {
            let response = self.send(request.clone()).await?;
            self.mark_request_as_sent(r.request_id(), &response).await?;
        }
    }

    Ok(())
}
749
750 #[instrument(skip_all)]
751 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
752 const MAX_CONCURRENT_REQUESTS: usize = 20;
753
754 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
757 warn!("Error while claiming one-time keys {:?}", e);
758 }
759
760 let outgoing_requests = stream::iter(
761 self.olm_machine()
762 .await
763 .as_ref()
764 .ok_or(Error::NoOlmMachine)?
765 .outgoing_requests()
766 .await?,
767 )
768 .map(|r| self.send_outgoing_request(r));
769
770 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
771
772 requests
773 .for_each(|r| async move {
774 match r {
775 Ok(_) => (),
776 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
777 }
778 })
779 .await;
780
781 Ok(())
782 }
783}
784
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Public wrapper around `olm_machine`, only compiled for tests and with
    /// the `testing` feature.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
792
/// Entry point to the end-to-end encryption functionality of a [`Client`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The client all operations are performed with.
    client: Client,
}
801
802impl Encryption {
/// Wraps the given client.
pub(crate) fn new(client: Client) -> Self {
    Self { client }
}
806
/// Returns a copy of the encryption settings stored in the client.
pub(crate) fn settings(&self) -> EncryptionSettings {
    self.client.inner.e2ee.encryption_settings
}
811
812 pub async fn ed25519_key(&self) -> Option<String> {
815 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
816 }
817
818 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
820 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
821 }
822
823 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
825 match self.get_own_device().await {
826 Ok(Some(device)) => device.first_time_seen_ts(),
827 _ => MilliSecondsSinceUnixEpoch::now(),
829 }
830 }
831
832 pub(crate) async fn import_secrets_bundle(
833 &self,
834 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
835 ) -> Result<(), SecretImportError> {
836 let olm_machine = self.client.olm_machine().await;
837 let olm_machine =
838 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
839
840 olm_machine.store().import_secrets_bundle(bundle).await
841 }
842
843 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
848 let olm = self.client.olm_machine().await;
849 let machine = olm.as_ref()?;
850 Some(machine.cross_signing_status().await)
851 }
852
853 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
859 let olm_machine = self.client.olm_machine().await;
860 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
861 let user_id = olm_machine.user_id();
862
863 self.ensure_initial_key_query().await?;
864
865 let devices = self.get_user_devices(user_id).await?;
866
867 let ret = devices.devices().any(|device| {
868 device.is_cross_signed_by_owner()
869 && device.curve25519_key().is_some()
870 && !device.is_dehydrated()
871 });
872
873 Ok(ret)
874 }
875
876 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
881 if let Some(machine) = self.client.olm_machine().await.as_ref() {
882 machine.tracked_users().await
883 } else {
884 Ok(HashSet::new())
885 }
886 }
887
/// Returns a subscriber to the client's [`VerificationState`], obtained
/// with `subscribe_reset`.
pub fn verification_state(&self) -> Subscriber<VerificationState> {
    self.client.inner.verification_state.subscribe_reset()
}
913
914 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
916 let olm = self.client.olm_machine().await;
917 let olm = olm.as_ref()?;
918 #[allow(clippy::bind_instead_of_map)]
919 olm.get_verification(user_id, flow_id).and_then(|v| match v {
920 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
921 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
922 }
923 #[cfg(feature = "qrcode")]
924 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
925 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
926 }
927 _ => None,
928 })
929 }
930
931 pub async fn get_verification_request(
934 &self,
935 user_id: &UserId,
936 flow_id: impl AsRef<str>,
937 ) -> Option<VerificationRequest> {
938 let olm = self.client.olm_machine().await;
939 let olm = olm.as_ref()?;
940
941 olm.get_verification_request(user_id, flow_id)
942 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
943 }
944
945 pub async fn get_device(
979 &self,
980 user_id: &UserId,
981 device_id: &DeviceId,
982 ) -> Result<Option<Device>, CryptoStoreError> {
983 let olm = self.client.olm_machine().await;
984 let Some(machine) = olm.as_ref() else { return Ok(None) };
985 let device = machine.get_device(user_id, device_id, None).await?;
986 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
987 }
988
989 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
996 let olm = self.client.olm_machine().await;
997 let Some(machine) = olm.as_ref() else { return Ok(None) };
998 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
999 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1000 }
1001
1002 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1028 let devices = self
1029 .client
1030 .olm_machine()
1031 .await
1032 .as_ref()
1033 .ok_or(Error::NoOlmMachine)?
1034 .get_user_devices(user_id, None)
1035 .await?;
1036
1037 Ok(UserDevices { inner: devices, client: self.client.clone() })
1038 }
1039
1040 pub async fn get_user_identity(
1076 &self,
1077 user_id: &UserId,
1078 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1079 let olm = self.client.olm_machine().await;
1080 let Some(olm) = olm.as_ref() else { return Ok(None) };
1081 let identity = olm.get_identity(user_id, None).await?;
1082
1083 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1084 }
1085
1086 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1124 let olm = self.client.olm_machine().await;
1125 let Some(olm) = olm.as_ref() else { return Ok(None) };
1126
1127 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1128 self.client.keys_query(&request_id, request.device_keys).await?;
1129
1130 let identity = olm.get_identity(user_id, None).await?;
1131 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1132 }
1133
1134 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1165 let olm = self.client.olm_machine().await;
1166 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1167 let client = self.client.to_owned();
1168
1169 Ok(olm
1170 .store()
1171 .devices_stream()
1172 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1173 }
1174
1175 pub async fn user_identities_stream(
1203 &self,
1204 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1205 let olm = self.client.olm_machine().await;
1206 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1207 let client = self.client.to_owned();
1208
1209 Ok(olm
1210 .store()
1211 .user_identities_stream()
1212 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1213 }
1214
1215 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1254 let olm = self.client.olm_machine().await;
1255 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1256
1257 let CrossSigningBootstrapRequests {
1258 upload_signing_keys_req,
1259 upload_keys_req,
1260 upload_signatures_req,
1261 } = olm.bootstrap_cross_signing(false).await?;
1262
1263 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1264 auth: auth_data,
1265 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1266 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1267 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1268 });
1269
1270 if let Some(req) = upload_keys_req {
1271 self.client.send_outgoing_request(req).await?;
1272 }
1273 self.client.send(upload_signing_keys_req).await?;
1274 self.client.send(upload_signatures_req).await?;
1275
1276 Ok(())
1277 }
1278
1279 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1316 let olm = self.client.olm_machine().await;
1317 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1318
1319 let CrossSigningBootstrapRequests {
1320 upload_keys_req,
1321 upload_signing_keys_req,
1322 upload_signatures_req,
1323 } = olm.bootstrap_cross_signing(true).await?;
1324
1325 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1326 auth: None,
1327 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1328 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1329 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1330 });
1331
1332 if let Some(req) = upload_keys_req {
1333 self.client.send_outgoing_request(req).await?;
1334 }
1335
1336 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1337 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1338 let client = self.client.clone();
1339
1340 Ok(Some(CrossSigningResetHandle::new(
1341 client,
1342 upload_signing_keys_req,
1343 upload_signatures_req,
1344 auth_type,
1345 )))
1346 } else {
1347 Err(error.into())
1348 }
1349 } else {
1350 self.client.send(upload_signatures_req).await?;
1351
1352 Ok(None)
1353 }
1354 }
1355
1356 async fn ensure_initial_key_query(&self) -> Result<()> {
1359 let olm_machine = self.client.olm_machine().await;
1360 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1361
1362 let user_id = olm_machine.user_id();
1363
1364 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1365 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1366 self.client.keys_query(&request_id, request.device_keys).await?;
1367 }
1368
1369 Ok(())
1370 }
1371
1372 pub async fn bootstrap_cross_signing_if_needed(
1419 &self,
1420 auth_data: Option<AuthData>,
1421 ) -> Result<()> {
1422 let olm_machine = self.client.olm_machine().await;
1423 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1424 let user_id = olm_machine.user_id();
1425
1426 self.ensure_initial_key_query().await?;
1427
1428 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1429 self.bootstrap_cross_signing(auth_data).await?;
1430 }
1431
1432 Ok(())
1433 }
1434
1435 #[cfg(not(target_family = "wasm"))]
1487 pub async fn export_room_keys(
1488 &self,
1489 path: PathBuf,
1490 passphrase: &str,
1491 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1492 ) -> Result<()> {
1493 let olm = self.client.olm_machine().await;
1494 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1495
1496 let keys = olm.store().export_room_keys(predicate).await?;
1497 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1498
1499 let encrypt = move || -> Result<()> {
1500 let export: String =
1501 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1502 let mut file = std::fs::File::create(path)?;
1503 file.write_all(&export.into_bytes())?;
1504 Ok(())
1505 };
1506
1507 let task = tokio::task::spawn_blocking(encrypt);
1508 task.await.expect("Task join error")
1509 }
1510
1511 #[cfg(not(target_family = "wasm"))]
1549 pub async fn import_room_keys(
1550 &self,
1551 path: PathBuf,
1552 passphrase: &str,
1553 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1554 let olm = self.client.olm_machine().await;
1555 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1556 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1557
1558 let decrypt = move || {
1559 let file = std::fs::File::open(path)?;
1560 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1561 };
1562
1563 let task = tokio::task::spawn_blocking(decrypt);
1564 let import = task.await.expect("Task join error")?;
1565
1566 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1567
1568 self.backups().maybe_trigger_backup();
1569
1570 Ok(ret)
1571 }
1572
1573 pub async fn room_keys_received_stream(
1604 &self,
1605 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1606 {
1607 let olm = self.client.olm_machine().await;
1608 let olm = olm.as_ref()?;
1609
1610 Some(olm.store().room_keys_received_stream())
1611 }
1612
1613 pub async fn historic_room_key_stream(
1644 &self,
1645 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1646 let olm = self.client.olm_machine().await;
1647 let olm = olm.as_ref()?;
1648
1649 Some(olm.store().historic_room_key_stream())
1650 }
1651
/// Returns a [`SecretStorage`] handle holding a clone of the client.
pub fn secret_storage(&self) -> SecretStorage {
    SecretStorage { client: self.client.to_owned() }
}
1656
/// Returns a [`Backups`] handle holding a clone of the client.
pub fn backups(&self) -> Backups {
    Backups { client: self.client.to_owned() }
}
1661
/// Returns a [`Recovery`] handle holding a clone of the client.
pub fn recovery(&self) -> Recovery {
    Recovery { client: self.client.to_owned() }
}
1666
/// Creates the cross-process crypto store lock with `lock_value` as the
/// holder and stores it in the client.
///
/// Nothing is done if a lock with the same holder value already exists.
///
/// # Errors
///
/// * `Error::NoOlmMachine` if there is no `OlmMachine`.
/// * `Error::BadCryptoStoreState` if the lock cell refuses the new lock.
/// * Errors of the first locking attempt and of the generation
///   initialisation are propagated.
pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
    // Don't recreate a lock that already has the requested holder.
    if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
        let prev_holder = prev_lock.lock_holder();
        if prev_holder == lock_value {
            return Ok(());
        }
        warn!(
            "Recreating cross-process store lock with a different holder value: \
             prev was {prev_holder}, new is {lock_value}"
        );
    }

    let olm_machine = self.client.base_client().olm_machine().await;
    let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;

    let lock =
        olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);

    // Try once to take the lock; the crypto store generation is only
    // initialised when the lock was obtained.
    {
        let lock_result = lock.try_lock_once().await?;

        if lock_result.is_ok() {
            olm_machine
                .initialize_crypto_store_generation(
                    &self.client.locks().crypto_store_generation,
                )
                .await?;
        }
    }

    // NOTE(review): if a lock with another holder was found above, this
    // `set` presumably fails with `BadCryptoStoreState` — confirm the cell's
    // semantics.
    self.client
        .locks()
        .cross_process_crypto_store_lock
        .set(lock)
        .map_err(|_| Error::BadCryptoStoreState)?;

    Ok(())
}
1721
/// Called after the cross-process lock has been acquired; returns the
/// current crypto store generation number.
///
/// If the `OlmMachine` reports a new generation, the `OlmMachine` is
/// regenerated through the base client. Returns `0`, after logging a
/// warning, when there is no `OlmMachine` yet.
async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
    let olm_machine_guard = self.client.olm_machine().await;
    if let Some(olm_machine) = olm_machine_guard.as_ref() {
        let (new_gen, generation_number) = olm_machine
            .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
            .await?;
        if new_gen {
            // The read guard is released before the machine is regenerated.
            // NOTE(review): presumably because regeneration needs the write
            // half of the same lock — confirm in the base client.
            drop(olm_machine_guard);
            self.client.base_client().regenerate_olm(None).await?;
        }
        Ok(generation_number)
    } else {
        warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
        Ok(0)
    }
}
1749
1750 pub async fn spin_lock_store(
1756 &self,
1757 max_backoff: Option<u32>,
1758 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1759 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1760 let guard = lock
1761 .spin_lock(max_backoff)
1762 .await
1763 .map_err(|err| {
1764 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1765 Box::new(err),
1766 )))
1767 })?
1768 .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1769
1770 let generation = self.on_lock_newly_acquired().await?;
1771
1772 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1773 _guard: guard.into_guard(),
1774 generation,
1775 }))
1776 } else {
1777 Ok(None)
1778 }
1779 }
1780
1781 pub async fn try_lock_store_once(
1786 &self,
1787 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1788 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1789 let lock_result = lock.try_lock_once().await?;
1790
1791 let Some(guard) = lock_result.ok() else {
1792 return Ok(None);
1793 };
1794
1795 let generation = self.on_lock_newly_acquired().await?;
1796
1797 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1798 _guard: guard.into_guard(),
1799 generation,
1800 }))
1801 } else {
1802 Ok(None)
1803 }
1804 }
1805
1806 #[cfg(any(test, feature = "testing"))]
1808 pub async fn uploaded_key_count(&self) -> Result<u64> {
1809 let olm_machine = self.client.olm_machine().await;
1810 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1811 Ok(olm_machine.uploaded_key_count().await?)
1812 }
1813
1814 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1838 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1842 Some(BundleReceiverTask::new(&self.client).await)
1843 } else {
1844 None
1845 };
1846
1847 let mut tasks = self.client.inner.e2ee.tasks.lock();
1848
1849 let this = self.clone();
1850
1851 tasks.setup_e2ee = Some(spawn(async move {
1852 this.update_verification_state().await;
1855
1856 if this.settings().auto_enable_cross_signing
1857 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1858 {
1859 error!("Couldn't bootstrap cross signing {e:?}");
1860 }
1861
1862 if let Err(e) = this.backups().setup_and_resume().await {
1863 error!("Couldn't setup and resume backups {e:?}");
1864 }
1865 if let Err(e) = this.recovery().setup().await {
1866 error!("Couldn't setup and resume recovery {e:?}");
1867 }
1868 }));
1869
1870 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1871 }
1872
1873 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1876 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1877
1878 if let Some(task) = task
1879 && let Err(err) = task.await
1880 {
1881 warn!("Error when initializing backups: {err}");
1882 }
1883 }
1884
1885 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1895 let olm = self.client.olm_machine().await;
1896 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1897
1898 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1899 self.client.keys_upload(&request_id, &request).await?;
1900
1901 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1902 self.client.keys_query(&request_id, request.device_keys).await?;
1903 }
1904
1905 Ok(())
1906 }
1907
1908 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1909 self.recovery().update_state_after_keys_query(response).await;
1910
1911 if let Some(user_id) = self.client.user_id() {
1913 let contains_own_device = response.device_keys.contains_key(user_id);
1914
1915 if contains_own_device {
1916 self.update_verification_state().await;
1917 }
1918 }
1919 }
1920
1921 async fn update_verification_state(&self) {
1922 match self.get_own_device().await {
1923 Ok(device) => {
1924 if let Some(device) = device {
1925 let is_verified = device.is_cross_signed_by_owner();
1926
1927 if is_verified {
1928 self.client.inner.verification_state.set(VerificationState::Verified);
1929 } else {
1930 self.client.inner.verification_state.set(VerificationState::Unverified);
1931 }
1932 } else {
1933 warn!("Couldn't find out own device in the store.");
1934 self.client.inner.verification_state.set(VerificationState::Unknown);
1935 }
1936 }
1937 Err(error) => {
1938 warn!("Failed retrieving own device: {error}");
1939 self.client.inner.verification_state.set(VerificationState::Unknown);
1940 }
1941 }
1942 }
1943
1944 #[cfg(feature = "experimental-send-custom-to-device")]
1953 pub async fn encrypt_and_send_raw_to_device(
1954 &self,
1955 recipient_devices: Vec<&Device>,
1956 event_type: &str,
1957 content: Raw<AnyToDeviceEventContent>,
1958 share_strategy: CollectStrategy,
1959 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1960 let users = recipient_devices.iter().map(|device| device.user_id());
1961
1962 self.client.claim_one_time_keys(users).await?;
1966
1967 let olm = self.client.olm_machine().await;
1968 let olm = olm.as_ref().expect("Olm machine wasn't started");
1969
1970 let (requests, withhelds) = olm
1971 .encrypt_content_for_devices(
1972 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1973 event_type,
1974 &content
1975 .deserialize_as::<serde_json::Value>()
1976 .expect("Deserialize as Value will always work"),
1977 share_strategy,
1978 )
1979 .await?;
1980
1981 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1982
1983 withhelds.iter().for_each(|(d, _)| {
1985 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1986 });
1987
1988 for request in requests {
1990 let ruma_request = RumaToDeviceRequest::new_raw(
1991 request.event_type.clone(),
1992 request.txn_id.clone(),
1993 request.messages.clone(),
1994 );
1995
1996 let send_result = self
1997 .client
1998 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1999 .await;
2000
2001 if send_result.is_err() {
2003 for (user_id, device_map) in request.messages {
2005 for device_id in device_map.keys() {
2006 match device_id {
2007 DeviceIdOrAllDevices::DeviceId(device_id) => {
2008 failures.push((user_id.clone(), device_id.to_owned()));
2009 }
2010 DeviceIdOrAllDevices::AllDevices => {
2011 }
2013 }
2014 }
2015 }
2016 }
2017 }
2018
2019 Ok(failures)
2020 }
2021}
2022
2023#[cfg(all(test, not(target_family = "wasm")))]
2024mod tests {
2025 use std::{
2026 ops::Not,
2027 str::FromStr,
2028 sync::{
2029 Arc,
2030 atomic::{AtomicBool, Ordering},
2031 },
2032 time::Duration,
2033 };
2034
2035 use matrix_sdk_test::{
2036 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2037 test_json,
2038 };
2039 use ruma::{
2040 event_id,
2041 events::{reaction::ReactionEventContent, relation::Annotation},
2042 };
2043 use serde_json::json;
2044 use wiremock::{
2045 Mock, MockServer, Request, ResponseTemplate,
2046 matchers::{header, method, path_regex},
2047 };
2048
2049 use crate::{
2050 Client, assert_next_matches_with_timeout,
2051 config::RequestConfig,
2052 encryption::{
2053 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2054 },
2055 test_utils::{
2056 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2057 },
2058 };
2059
2060 #[async_test]
2061 async fn test_reaction_sending() {
2062 let server = MockServer::start().await;
2063 let client = logged_in_client(Some(server.uri())).await;
2064
2065 let event_id = event_id!("$2:example.org");
2066
2067 Mock::given(method("GET"))
2068 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2069 .and(header("authorization", "Bearer 1234"))
2070 .respond_with(
2071 ResponseTemplate::new(200)
2072 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2073 )
2074 .mount(&server)
2075 .await;
2076
2077 Mock::given(method("PUT"))
2078 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2079 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2080 "event_id": event_id,
2081 })))
2082 .mount(&server)
2083 .await;
2084
2085 let response = SyncResponseBuilder::default()
2086 .add_joined_room(
2087 JoinedRoomBuilder::default()
2088 .add_state_event(StateTestEvent::Member)
2089 .add_state_event(StateTestEvent::PowerLevels)
2090 .add_state_event(StateTestEvent::Encryption),
2091 )
2092 .build_sync_response();
2093
2094 client.base_client().receive_sync_response(response).await.unwrap();
2095
2096 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2097 assert!(
2098 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2099 );
2100
2101 let event_id = event_id!("$1:example.org");
2102 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2103 room.send(reaction).await.expect("Sending the reaction should not fail");
2104
2105 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2106 }
2107
    /// Checks that a client's `OlmMachine` is replaced once another client
    /// sharing the same crypto store has held the cross-process lock in
    /// between, and that it is *not* replaced when nobody else took the lock.
    /// Also checks that backup keys remain available after the replacement.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_invalidates_olm_machine() {
        use matrix_sdk_base::store::RoomLoadSettings;

        // Both clients are built on the same sqlite path and restore the same
        // session.
        let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
        let session = mock_matrix_session();

        let client1 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client1
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

        // The cross-process lock has not been enabled yet, so there is
        // nothing to acquire.
        let guard = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(guard.is_none());

        client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
        client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

        // With the lock enabled, client1 can take it.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // Keep a handle on client1's current machine to compare against later.
        let initial_olm_machine =
            client1.olm_machine().await.clone().expect("must have an olm machine");

        // Save a backup decryption key and enable backups on that machine, so
        // we can later check that this state survives a regeneration.
        let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
            .expect("Can't create new recovery key");
        let backup_key = decryption_key.megolm_v1_public_key();
        backup_key.set_version("1".to_owned());
        initial_olm_machine
            .backup_machine()
            .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
            .await
            .expect("Should save");

        initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

        assert!(client1.encryption().backups().are_enabled().await);

        // client2 cannot take the lock while client1 still holds it.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_none());

        // Release client1's guard.
        // NOTE(review): the sleep presumably gives the release time to take
        // effect; confirm against the lock implementation.
        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // client1 re-acquires without anybody else having held the lock...
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        // ...so its machine must be the very same one.
        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
        assert!(initial_olm_machine.same_as(&olm_machine));

        drop(acquired1);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Now client2 takes the lock, then releases it.
        let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired2.is_some());

        drop(acquired2);
        tokio::time::sleep(Duration::from_millis(100)).await;

        // client1 takes the lock again, this time after another holder.
        let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired1.is_some());

        let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

        // The machine must have been replaced.
        assert!(!initial_olm_machine.same_as(&olm_machine));

        // The new machine still sees the backup decryption key saved earlier,
        // and backups are still enabled.
        let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
        assert!(backup_key_new.decryption_key.is_some());
        assert_eq!(
            backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
            backup_key.to_base64()
        );
        assert!(client1.encryption().backups().are_enabled().await);
    }
2214
    /// Checks that the `OlmMachine` is only replaced when another lock holder
    /// intervened: enabling the lock keeps the machine, taking the lock after
    /// a second client held it replaces the machine, and taking it again with
    /// nobody in between keeps the machine.
    #[cfg(feature = "sqlite")]
    #[async_test]
    async fn test_generation_counter_no_spurious_invalidation() {
        use matrix_sdk_base::store::RoomLoadSettings;

        // Both clients in this test are built on the same sqlite path.
        let sqlite_path =
            std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
        let session = mock_matrix_session();

        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Merely enabling the lock must not replace the machine.
        let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(initial_olm_machine.same_as(&after_enabling_lock));

        {
            // A second client on the same store enables the lock under a
            // different holder name, takes it and releases it again.
            let client2 = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(sqlite_path, None)
                .build()
                .await
                .unwrap();
            client2
                .matrix_auth()
                .restore_session(session, RoomLoadSettings::default())
                .await
                .unwrap();

            client2
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            let guard = client2.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());

            // NOTE(review): the sleep presumably gives the release time to
            // take effect; confirm against the lock implementation.
            drop(guard);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        {
            // The first client takes the lock; the guard is dropped at the
            // end of this scope.
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // Another holder had the lock in between, so the machine was replaced.
        let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

        {
            // Take the lock again, with nobody else in between this time.
            let acquired = client.encryption().try_lock_store_once().await.unwrap();
            assert!(acquired.is_some());
        }

        // No spurious replacement: the machine is unchanged.
        let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
        assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
    }
2292
    /// Checks that the verification state moves from `Unknown` to
    /// `Unverified` once the session is set, without the mocked
    /// `m.secret_storage.default_key` account-data endpoint having been hit.
    #[async_test]
    async fn test_update_verification_state_is_updated_before_any_requests_happen() {
        // NOTE(review): the client is built with `None` rather than
        // `Some(server.uri())` as in the other tests here; confirm that
        // requests from this client can actually reach the mock below.
        let client = no_retry_test_client(None).await;
        let server = MockServer::start().await;

        // Subscribe before the session exists.
        let mut verification_state = client.encryption().verification_state();

        // The first observed value is `Unknown`.
        assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);

        // Flag flipped by the mock responder whenever the endpoint is called.
        let keys_requested = Arc::new(AtomicBool::new(false));
        let inner_bool = keys_requested.clone();

        Mock::given(method("GET"))
            .and(path_regex(
                r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
            ))
            .respond_with(move |_req: &Request| {
                inner_bool.fetch_or(true, Ordering::SeqCst);
                ResponseTemplate::new(200).set_body_json(json!({}))
            })
            .mount(&server)
            .await;

        // Setting the session is what is expected to trigger the state update.
        set_client_session(&client).await;

        // The endpoint must not have been called yet, while the new state is
        // already observable.
        assert!(keys_requested.load(Ordering::SeqCst).not());
        assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
    }
2328
2329 #[test]
2330 fn test_oauth_reset_info_from_uiaa_info() {
2331 let auth_info = json!({
2332 "session": "dummy",
2333 "flows": [
2334 {
2335 "stages": [
2336 "org.matrix.cross_signing_reset"
2337 ]
2338 }
2339 ],
2340 "params": {
2341 "org.matrix.cross_signing_reset": {
2342 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2343 }
2344 },
2345 "msg": "To reset..."
2346 });
2347
2348 let auth_info = serde_json::from_value(auth_info)
2349 .expect("We should be able to deserialize the UiaaInfo");
2350 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2351 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2352 }
2353
2354 #[test]
2355 fn test_duplicate_one_time_key_error_parsing() {
2356 let message = concat!(
2357 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2358 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2359 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2360 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2361 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2362 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2363 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2364 );
2365 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2366 .expect("We should be able to parse the error message");
2367
2368 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2369 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2370
2371 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2372 .expect_err("We shouldn't be able to parse an incomplete error message");
2373 }
2374
2375 fn devices_to_verify_against_keys_query_response(
2379 devices: Vec<serde_json::Value>,
2380 ) -> serde_json::Value {
2381 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2382 .into_iter()
2383 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2384 .collect();
2385 json!({
2386 "device_keys": {
2387 "@example:localhost": device_keys,
2388 },
2389 "master_keys": {
2390 "@example:localhost": {
2391 "keys": {
2392 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2393 },
2394 "usage": ["master"],
2395 "user_id": "@example:localhost",
2396 },
2397 },
2398 "self_signing_keys": {
2399 "@example:localhost": {
2400 "keys": {
2401 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2402 },
2403 "usage": ["self_signing"],
2404 "user_id": "@example:localhost",
2405 "signatures": {
2406 "@example:localhost": {
2407 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2408 },
2409 },
2410 },
2411 },
2412 "user_signing_keys": {
2413 "@example:localhost": {
2414 "keys": {
2415 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2416 },
2417 "usage": ["user_signing"],
2418 "user_id": "@example:localhost",
2419 "signatures": {
2420 "@example:localhost": {
2421 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2422 },
2423 },
2424 },
2425 }
2426 })
2427 }
2428
2429 #[async_test]
2432 async fn test_devices_to_verify_against_no_devices() {
2435 let server = MockServer::start().await;
2436 let client = logged_in_client(Some(server.uri())).await;
2437
2438 Mock::given(method("POST"))
2439 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2440 .respond_with(
2441 ResponseTemplate::new(200)
2442 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2443 )
2444 .mount(&server)
2445 .await;
2446
2447 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2448 }
2449
2450 #[async_test]
2451 async fn test_devices_to_verify_against_cross_signed() {
2454 let server = MockServer::start().await;
2455 let client = logged_in_client(Some(server.uri())).await;
2456
2457 Mock::given(method("POST"))
2458 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2459 .respond_with(ResponseTemplate::new(200).set_body_json(
2460 devices_to_verify_against_keys_query_response(vec![
2461 json!({
2462 "algorithms": [
2463 "m.olm.v1.curve25519-aes-sha2",
2464 "m.megolm.v1.aes-sha2",
2465 ],
2466 "user_id": "@example:localhost",
2467 "device_id": "SIGNEDDEVICE",
2468 "keys": {
2469 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2470 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2471 },
2472 "signatures": {
2473 "@example:localhost": {
2474 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2475 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2476 },
2477 },
2478 })
2479 ])
2480 ))
2481 .mount(&server)
2482 .await;
2483
2484 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2485 }
2486
2487 #[async_test]
2488 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2491 let server = MockServer::start().await;
2492 let client = logged_in_client(Some(server.uri())).await;
2493 let user_id = client.user_id().unwrap();
2494 let olm_machine = client.olm_machine().await;
2495 let olm_machine = olm_machine.as_ref().unwrap();
2496
2497 Mock::given(method("POST"))
2498 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2499 .respond_with(ResponseTemplate::new(200).set_body_json(
2500 devices_to_verify_against_keys_query_response(vec![
2501 json!({
2502 "algorithms": [
2503 "m.olm.v1.curve25519-aes-sha2",
2504 "m.megolm.v1.aes-sha2",
2505 ],
2506 "user_id": "@example:localhost",
2507 "device_id": "DEHYDRATEDDEVICE",
2508 "keys": {
2509 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2510 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2511 },
2512 "dehydrated": true,
2513 "signatures": {
2514 "@example:localhost": {
2515 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2516 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2517 },
2518 },
2519 }),
2520 json!({
2521 "algorithms": [
2522 "m.olm.v1.curve25519-aes-sha2",
2523 "m.megolm.v1.aes-sha2",
2524 ],
2525 "user_id": "@example:localhost",
2526 "device_id": "UNSIGNEDDEVICE",
2527 "keys": {
2528 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2529 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2530 },
2531 "signatures": {
2532 "@example:localhost": {
2533 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2534 },
2535 },
2536 }),
2537 ])
2538 ))
2539 .mount(&server)
2540 .await;
2541
2542 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2543 client.keys_query(&request_id, request.device_keys).await.unwrap();
2544
2545 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2546 }
2547}