1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33 future::try_join,
34 stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39 StateStoreDataKey, StateStoreDataValue,
40 cross_process_lock::CrossProcessLockError,
41 crypto::{
42 CrossSigningBootstrapRequests, OlmMachine,
43 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44 types::{
45 SignedKey,
46 requests::{
47 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48 },
49 },
50 },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55 api::client::{
56 error::{ErrorBody, StandardErrorBody},
57 keys::{
58 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60 },
61 message::send_message_event,
62 to_device::send_event_to_device::v3::{
63 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64 },
65 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
66 },
67 assign,
68 events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81 backups::{Backups, types::BackupClientState},
82 futures::UploadEncryptedFile,
83 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84 recovery::{Recovery, RecoveryState},
85 secret_storage::SecretStorage,
86 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87 verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90 Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91 attachment::Thumbnail,
92 client::{ClientInner, WeakClient},
93 cross_process_lock::CrossProcessLockGuard,
94 error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108 SessionCreationError, SignatureError, VERSION,
109 olm::{
110 SessionCreationError as MegolmSessionCreationError,
111 SessionExportError as OlmSessionExportError,
112 },
113 vodozemac,
114};
115
116#[cfg(feature = "experimental-send-custom-to-device")]
117use crate::config::RequestConfig;
118pub use crate::error::RoomKeyImportError;
119
/// End-to-end-encryption state kept by the client: background task handles,
/// the configured settings and the backup/recovery state.
pub(crate) struct EncryptionData {
    /// Handles of the E2EE background tasks, behind a mutex that is locked
    /// without awaiting.
    pub tasks: StdMutex<ClientTasks>,

    /// The settings this value was constructed with.
    pub encryption_settings: EncryptionSettings,

    /// State of the key-backup support (default-initialised in `new`).
    pub backup_state: BackupClientState,

    /// Observable recovery state (default-initialised in `new`).
    pub recovery_state: SharedObservable<RecoveryState>,
}
135
136impl EncryptionData {
137 pub fn new(encryption_settings: EncryptionSettings) -> Self {
138 Self {
139 encryption_settings,
140
141 tasks: StdMutex::new(Default::default()),
142 backup_state: Default::default(),
143 recovery_state: Default::default(),
144 }
145 }
146
147 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
148 let weak_client = WeakClient::from_inner(client);
149
150 let mut tasks = self.tasks.lock();
151 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
152
153 if self.encryption_settings.backup_download_strategy
154 == BackupDownloadStrategy::AfterDecryptionFailure
155 {
156 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
157 }
158 }
159
160 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
166 let mut guard = self.tasks.lock();
167
168 let future = Recovery::update_state_after_backup_state_change(client);
169 let join_handle = spawn(future);
170
171 guard.update_recovery_state_after_backup = Some(join_handle);
172 }
173}
174
175#[derive(Clone, Copy, Debug, Default)]
177pub struct EncryptionSettings {
178 pub auto_enable_cross_signing: bool,
184
185 pub backup_download_strategy: BackupDownloadStrategy,
190
191 pub auto_enable_backups: bool,
193}
194
/// Strategy for downloading room keys from the server-side key backup.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// NOTE(review): presumably downloads the whole backup once; the handling
    /// of this variant is not visible in this part of the file.
    OneShot,

    /// A background download task is created when the encryption tasks are
    /// initialised (see `EncryptionData::initialize_tasks`).
    AfterDecryptionFailure,

    /// The default: no download task is created by
    /// `EncryptionData::initialize_tasks`.
    #[default]
    Manual,
}
221
/// Verification state exposed to observers through
/// `Encryption::verification_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationState {
    /// NOTE(review): presumably the state before anything is known about the
    /// device; confirm against the code that updates the state.
    Unknown,
    /// The verified state.
    Verified,
    /// The unverified state.
    Unverified,
}
236
/// A cross-process store lock guard bundled with the crypto-store generation
/// number obtained when the lock was acquired.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The lock guard; never read, only kept alive for as long as this value
    /// lives.
    _guard: CrossProcessLockGuard,
    /// The generation number returned when the lock was acquired.
    generation: u64,
}
243
244impl CrossProcessLockStoreGuardWithGeneration {
245 pub fn generation(&self) -> u64 {
247 self.generation
248 }
249}
250
/// Handle used to finish a cross-signing reset once the server asked for
/// additional authentication.
///
/// It stores the two requests that still have to be sent, together with the
/// kind of authentication the server requested.
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// Client used to send the stored requests.
    client: Client,
    /// The signing-keys upload that is (re)sent by `auth`.
    upload_request: UploadSigningKeysRequest,
    /// The signature upload sent after the signing keys were accepted.
    signatures_request: UploadSignaturesRequest,
    /// How the user is expected to authenticate the reset.
    auth_type: CrossSigningResetAuthType,
    /// Set by `cancel`; checked by `auth` after each failed upload attempt.
    is_cancelled: Mutex<bool>,
}
269
270impl CrossSigningResetHandle {
271 pub fn new(
273 client: Client,
274 upload_request: UploadSigningKeysRequest,
275 signatures_request: UploadSignaturesRequest,
276 auth_type: CrossSigningResetAuthType,
277 ) -> Self {
278 Self {
279 client,
280 upload_request,
281 signatures_request,
282 auth_type,
283 is_cancelled: Mutex::new(false),
284 }
285 }
286
287 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
290 &self.auth_type
291 }
292
293 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
297 let mut upload_request = self.upload_request.clone();
298 upload_request.auth = auth;
299
300 while let Err(e) = self.client.send(upload_request.clone()).await {
301 if *self.is_cancelled.lock().await {
302 return Ok(());
303 }
304
305 match e.as_uiaa_response() {
306 Some(uiaa_info) => {
307 if uiaa_info.auth_error.is_some() {
308 return Err(e.into());
309 }
310 }
311 None => return Err(e.into()),
312 }
313 }
314
315 self.client.send(self.signatures_request.clone()).await?;
316
317 Ok(())
318 }
319
320 pub async fn cancel(&self) {
322 *self.is_cancelled.lock().await = true;
323 }
324}
325
326#[derive(Debug, Clone)]
329pub enum CrossSigningResetAuthType {
330 Uiaa(UiaaInfo),
332 OAuth(OAuthCrossSigningResetInfo),
335}
336
337impl CrossSigningResetAuthType {
338 fn new(error: &HttpError) -> Result<Option<Self>> {
339 if let Some(auth_info) = error.as_uiaa_response() {
340 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
341 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
342 } else {
343 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
344 }
345 } else {
346 Ok(None)
347 }
348 }
349}
350
351#[derive(Debug, Clone, Deserialize)]
354pub struct OAuthCrossSigningResetInfo {
355 pub approval_url: Url,
357}
358
359impl OAuthCrossSigningResetInfo {
360 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
361 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
362 return Ok(None);
363 };
364
365 Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
366 }
367}
368
/// The two keys extracted from the server's error message about a duplicate
/// one-time key upload.
#[derive(Debug)]
struct DuplicateOneTimeKeyErrorMessage {
    /// The Curve25519 key parsed from the "Old key:" part of the message.
    old_key: Curve25519PublicKey,
    /// The Curve25519 key parsed from the "new key:" part of the message.
    new_key: Curve25519PublicKey,
}
378
379impl FromStr for DuplicateOneTimeKeyErrorMessage {
380 type Err = serde_json::Error;
381
382 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
383 let mut split = s.split_terminator(';');
387
388 let old_key = split
389 .next()
390 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
391 let new_key = split
392 .next()
393 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
394
395 let old_key_index = old_key
398 .find("Old key:")
399 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
400
401 let old_key = old_key[old_key_index..]
402 .trim()
403 .strip_prefix("Old key:")
404 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
405
406 let new_key = new_key
409 .trim()
410 .strip_prefix("new key:")
411 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
412
413 let new_key = new_key.replace("'", "\"");
416
417 let old_key: SignedKey = serde_json::from_str(old_key)?;
419 let new_key: SignedKey = serde_json::from_str(&new_key)?;
420
421 let old_key = old_key.key();
423 let new_key = new_key.key();
424
425 Ok(Self { old_key, new_key })
426 }
427}
428
429impl Client {
430 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
431 self.base_client().olm_machine().await
432 }
433
434 pub(crate) async fn mark_request_as_sent(
435 &self,
436 request_id: &TransactionId,
437 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
438 ) -> Result<(), matrix_sdk_base::Error> {
439 Ok(self
440 .olm_machine()
441 .await
442 .as_ref()
443 .expect(
444 "We should have an olm machine once we try to mark E2EE related requests as sent",
445 )
446 .mark_request_as_sent(request_id, response)
447 .await?)
448 }
449
450 #[instrument(skip(self, device_keys))]
456 pub(crate) async fn keys_query(
457 &self,
458 request_id: &TransactionId,
459 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
460 ) -> Result<get_keys::v3::Response> {
461 let request = assign!(get_keys::v3::Request::new(), { device_keys });
462
463 let response = self.send(request).await?;
464 self.mark_request_as_sent(request_id, &response).await?;
465 self.encryption().update_state_after_keys_query(&response).await;
466
467 Ok(response)
468 }
469
/// Create an [`UploadEncryptedFile`] for the data that can be read from
/// `reader`.
///
/// The callers in this file await the returned value, optionally after
/// attaching a progress observable to it.
pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
    &'a self,
    reader: &'a mut R,
) -> UploadEncryptedFile<'a, R> {
    UploadEncryptedFile::new(self, reader)
}
509
510 pub(crate) async fn upload_encrypted_media_and_thumbnail(
513 &self,
514 data: &[u8],
515 thumbnail: Option<Thumbnail>,
516 send_progress: SharedObservable<TransmissionProgress>,
517 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
518 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
519
520 let upload_attachment = async {
521 let mut cursor = Cursor::new(data);
522 self.upload_encrypted_file(&mut cursor)
523 .with_send_progress_observable(send_progress)
524 .await
525 };
526
527 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
528
529 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
530 }
531
532 async fn upload_encrypted_thumbnail(
535 &self,
536 thumbnail: Option<Thumbnail>,
537 send_progress: SharedObservable<TransmissionProgress>,
538 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
539 let Some(thumbnail) = thumbnail else {
540 return Ok(None);
541 };
542
543 let (data, _, thumbnail_info) = thumbnail.into_parts();
544 let mut cursor = Cursor::new(data);
545
546 let file = self
547 .upload_encrypted_file(&mut cursor)
548 .with_send_progress_observable(send_progress)
549 .await?;
550
551 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
552 }
553
554 pub(crate) async fn claim_one_time_keys(
560 &self,
561 users: impl Iterator<Item = &UserId>,
562 ) -> Result<()> {
563 let _lock = self.locks().key_claim_lock.lock().await;
564
565 if let Some((request_id, request)) = self
566 .olm_machine()
567 .await
568 .as_ref()
569 .ok_or(Error::NoOlmMachine)?
570 .get_missing_sessions(users)
571 .await?
572 {
573 let response = self.send(request).await?;
574 self.mark_request_as_sent(&request_id, &response).await?;
575 }
576
577 Ok(())
578 }
579
580 #[instrument(skip(self, request))]
590 pub(crate) async fn keys_upload(
591 &self,
592 request_id: &TransactionId,
593 request: &upload_keys::v3::Request,
594 ) -> Result<upload_keys::v3::Response> {
595 debug!(
596 device_keys = request.device_keys.is_some(),
597 one_time_key_count = request.one_time_keys.len(),
598 "Uploading public encryption keys",
599 );
600
601 let response = self.send(request.clone()).await?;
602 self.mark_request_as_sent(request_id, &response).await?;
603
604 Ok(response)
605 }
606
607 pub(crate) async fn room_send_helper(
608 &self,
609 request: &RoomMessageRequest,
610 ) -> Result<send_message_event::v3::Response> {
611 let content = request.content.clone();
612 let txn_id = request.txn_id.clone();
613 let room_id = &request.room_id;
614
615 self.get_room(room_id)
616 .expect("Can't send a message to a room that isn't known to the store")
617 .send(*content)
618 .with_transaction_id(txn_id)
619 .await
620 .map(|result| result.response)
621 }
622
623 pub(crate) async fn send_to_device(
624 &self,
625 request: &ToDeviceRequest,
626 ) -> HttpResult<ToDeviceResponse> {
627 let request = RumaToDeviceRequest::new_raw(
628 request.event_type.clone(),
629 request.txn_id.clone(),
630 request.messages.clone(),
631 );
632
633 self.send(request).await
634 }
635
636 pub(crate) async fn send_verification_request(
637 &self,
638 request: OutgoingVerificationRequest,
639 ) -> Result<()> {
640 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
641
642 match request {
643 ToDevice(t) => {
644 self.send_to_device(&t).await?;
645 }
646 InRoom(r) => {
647 self.room_send_helper(&r).await?;
648 }
649 }
650
651 Ok(())
652 }
653
654 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
655 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
656
657 match r.request() {
658 AnyOutgoingRequest::KeysQuery(request) => {
659 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
660 }
661 AnyOutgoingRequest::KeysUpload(request) => {
662 let response = self.keys_upload(r.request_id(), request).await;
663
664 if let Err(e) = &response {
665 match e.as_ruma_api_error() {
666 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
667 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
668 {
669 {
674 let already_reported = self
675 .state_store()
676 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
677 .await?
678 .is_some();
679
680 if message.starts_with("One time key") && !already_reported {
681 if let Ok(message) =
682 DuplicateOneTimeKeyErrorMessage::from_str(message)
683 {
684 error!(
685 sentry = true,
686 old_key = %message.old_key,
687 new_key = %message.new_key,
688 "Duplicate one-time keys have been uploaded"
689 );
690 } else {
691 error!(
692 sentry = true,
693 "Duplicate one-time keys have been uploaded"
694 );
695 }
696
697 self.state_store()
698 .set_kv_data(
699 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
700 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
701 )
702 .await?;
703 }
704 }
705 }
706 }
707 _ => {}
708 }
709
710 response?;
711 }
712 }
713 AnyOutgoingRequest::ToDeviceRequest(request) => {
714 let response = self.send_to_device(request).await?;
715 self.mark_request_as_sent(r.request_id(), &response).await?;
716 }
717 AnyOutgoingRequest::SignatureUpload(request) => {
718 let response = self.send(request.clone()).await?;
719 self.mark_request_as_sent(r.request_id(), &response).await?;
720 }
721 AnyOutgoingRequest::RoomMessage(request) => {
722 let response = self.room_send_helper(request).await?;
723 self.mark_request_as_sent(r.request_id(), &response).await?;
724 }
725 AnyOutgoingRequest::KeysClaim(request) => {
726 let response = self.send(request.clone()).await?;
727 self.mark_request_as_sent(r.request_id(), &response).await?;
728 }
729 }
730
731 Ok(())
732 }
733
734 #[instrument(skip_all)]
735 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
736 const MAX_CONCURRENT_REQUESTS: usize = 20;
737
738 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
741 warn!("Error while claiming one-time keys {:?}", e);
742 }
743
744 let outgoing_requests = stream::iter(
745 self.olm_machine()
746 .await
747 .as_ref()
748 .ok_or(Error::NoOlmMachine)?
749 .outgoing_requests()
750 .await?,
751 )
752 .map(|r| self.send_outgoing_request(r));
753
754 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
755
756 requests
757 .for_each(|r| async move {
758 match r {
759 Ok(_) => (),
760 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
761 }
762 })
763 .await;
764
765 Ok(())
766 }
767}
768
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Public wrapper around the crate-private `olm_machine` accessor, only
    /// compiled for tests or with the `testing` feature.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine().await
    }
}
776
777#[derive(Debug, Clone)]
781pub struct Encryption {
782 client: Client,
784}
785
786impl Encryption {
787 pub(crate) fn new(client: Client) -> Self {
788 Self { client }
789 }
790
791 pub(crate) fn settings(&self) -> EncryptionSettings {
793 self.client.inner.e2ee.encryption_settings
794 }
795
796 pub async fn ed25519_key(&self) -> Option<String> {
799 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
800 }
801
802 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
804 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
805 }
806
807 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
809 match self.get_own_device().await {
810 Ok(Some(device)) => device.first_time_seen_ts(),
811 _ => MilliSecondsSinceUnixEpoch::now(),
813 }
814 }
815
816 pub(crate) async fn import_secrets_bundle(
817 &self,
818 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
819 ) -> Result<(), SecretImportError> {
820 let olm_machine = self.client.olm_machine().await;
821 let olm_machine =
822 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
823
824 olm_machine.store().import_secrets_bundle(bundle).await
825 }
826
827 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
832 let olm = self.client.olm_machine().await;
833 let machine = olm.as_ref()?;
834 Some(machine.cross_signing_status().await)
835 }
836
837 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
843 let olm_machine = self.client.olm_machine().await;
844 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
845 let user_id = olm_machine.user_id();
846
847 self.ensure_initial_key_query().await?;
848
849 let devices = self.get_user_devices(user_id).await?;
850
851 let ret = devices.devices().any(|device| {
852 device.is_cross_signed_by_owner()
853 && device.curve25519_key().is_some()
854 && !device.is_dehydrated()
855 });
856
857 Ok(ret)
858 }
859
860 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
865 if let Some(machine) = self.client.olm_machine().await.as_ref() {
866 machine.tracked_users().await
867 } else {
868 Ok(HashSet::new())
869 }
870 }
871
872 pub fn verification_state(&self) -> Subscriber<VerificationState> {
895 self.client.inner.verification_state.subscribe_reset()
896 }
897
898 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
900 let olm = self.client.olm_machine().await;
901 let olm = olm.as_ref()?;
902 #[allow(clippy::bind_instead_of_map)]
903 olm.get_verification(user_id, flow_id).and_then(|v| match v {
904 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
905 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
906 }
907 #[cfg(feature = "qrcode")]
908 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
909 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
910 }
911 _ => None,
912 })
913 }
914
915 pub async fn get_verification_request(
918 &self,
919 user_id: &UserId,
920 flow_id: impl AsRef<str>,
921 ) -> Option<VerificationRequest> {
922 let olm = self.client.olm_machine().await;
923 let olm = olm.as_ref()?;
924
925 olm.get_verification_request(user_id, flow_id)
926 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
927 }
928
929 pub async fn get_device(
963 &self,
964 user_id: &UserId,
965 device_id: &DeviceId,
966 ) -> Result<Option<Device>, CryptoStoreError> {
967 let olm = self.client.olm_machine().await;
968 let Some(machine) = olm.as_ref() else { return Ok(None) };
969 let device = machine.get_device(user_id, device_id, None).await?;
970 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
971 }
972
973 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
980 let olm = self.client.olm_machine().await;
981 let Some(machine) = olm.as_ref() else { return Ok(None) };
982 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
983 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
984 }
985
986 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1012 let devices = self
1013 .client
1014 .olm_machine()
1015 .await
1016 .as_ref()
1017 .ok_or(Error::NoOlmMachine)?
1018 .get_user_devices(user_id, None)
1019 .await?;
1020
1021 Ok(UserDevices { inner: devices, client: self.client.clone() })
1022 }
1023
1024 pub async fn get_user_identity(
1060 &self,
1061 user_id: &UserId,
1062 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1063 let olm = self.client.olm_machine().await;
1064 let Some(olm) = olm.as_ref() else { return Ok(None) };
1065 let identity = olm.get_identity(user_id, None).await?;
1066
1067 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1068 }
1069
1070 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1108 let olm = self.client.olm_machine().await;
1109 let Some(olm) = olm.as_ref() else { return Ok(None) };
1110
1111 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1112 self.client.keys_query(&request_id, request.device_keys).await?;
1113
1114 let identity = olm.get_identity(user_id, None).await?;
1115 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1116 }
1117
1118 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1149 let olm = self.client.olm_machine().await;
1150 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1151 let client = self.client.to_owned();
1152
1153 Ok(olm
1154 .store()
1155 .devices_stream()
1156 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1157 }
1158
1159 pub async fn user_identities_stream(
1187 &self,
1188 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1189 let olm = self.client.olm_machine().await;
1190 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1191 let client = self.client.to_owned();
1192
1193 Ok(olm
1194 .store()
1195 .user_identities_stream()
1196 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1197 }
1198
1199 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1238 let olm = self.client.olm_machine().await;
1239 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1240
1241 let CrossSigningBootstrapRequests {
1242 upload_signing_keys_req,
1243 upload_keys_req,
1244 upload_signatures_req,
1245 } = olm.bootstrap_cross_signing(false).await?;
1246
1247 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1248 auth: auth_data,
1249 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1250 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1251 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1252 });
1253
1254 if let Some(req) = upload_keys_req {
1255 self.client.send_outgoing_request(req).await?;
1256 }
1257 self.client.send(upload_signing_keys_req).await?;
1258 self.client.send(upload_signatures_req).await?;
1259
1260 Ok(())
1261 }
1262
1263 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1300 let olm = self.client.olm_machine().await;
1301 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1302
1303 let CrossSigningBootstrapRequests {
1304 upload_keys_req,
1305 upload_signing_keys_req,
1306 upload_signatures_req,
1307 } = olm.bootstrap_cross_signing(true).await?;
1308
1309 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1310 auth: None,
1311 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1312 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1313 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1314 });
1315
1316 if let Some(req) = upload_keys_req {
1317 self.client.send_outgoing_request(req).await?;
1318 }
1319
1320 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1321 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1322 let client = self.client.clone();
1323
1324 Ok(Some(CrossSigningResetHandle::new(
1325 client,
1326 upload_signing_keys_req,
1327 upload_signatures_req,
1328 auth_type,
1329 )))
1330 } else {
1331 Err(error.into())
1332 }
1333 } else {
1334 self.client.send(upload_signatures_req).await?;
1335
1336 Ok(None)
1337 }
1338 }
1339
1340 async fn ensure_initial_key_query(&self) -> Result<()> {
1343 let olm_machine = self.client.olm_machine().await;
1344 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1345
1346 let user_id = olm_machine.user_id();
1347
1348 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1349 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1350 self.client.keys_query(&request_id, request.device_keys).await?;
1351 }
1352
1353 Ok(())
1354 }
1355
1356 pub async fn bootstrap_cross_signing_if_needed(
1403 &self,
1404 auth_data: Option<AuthData>,
1405 ) -> Result<()> {
1406 let olm_machine = self.client.olm_machine().await;
1407 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1408 let user_id = olm_machine.user_id();
1409
1410 self.ensure_initial_key_query().await?;
1411
1412 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1413 self.bootstrap_cross_signing(auth_data).await?;
1414 }
1415
1416 Ok(())
1417 }
1418
1419 #[cfg(not(target_family = "wasm"))]
1471 pub async fn export_room_keys(
1472 &self,
1473 path: PathBuf,
1474 passphrase: &str,
1475 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1476 ) -> Result<()> {
1477 let olm = self.client.olm_machine().await;
1478 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1479
1480 let keys = olm.store().export_room_keys(predicate).await?;
1481 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1482
1483 let encrypt = move || -> Result<()> {
1484 let export: String =
1485 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1486 let mut file = std::fs::File::create(path)?;
1487 file.write_all(&export.into_bytes())?;
1488 Ok(())
1489 };
1490
1491 let task = tokio::task::spawn_blocking(encrypt);
1492 task.await.expect("Task join error")
1493 }
1494
1495 #[cfg(not(target_family = "wasm"))]
1533 pub async fn import_room_keys(
1534 &self,
1535 path: PathBuf,
1536 passphrase: &str,
1537 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1538 let olm = self.client.olm_machine().await;
1539 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1540 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1541
1542 let decrypt = move || {
1543 let file = std::fs::File::open(path)?;
1544 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1545 };
1546
1547 let task = tokio::task::spawn_blocking(decrypt);
1548 let import = task.await.expect("Task join error")?;
1549
1550 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1551
1552 self.backups().maybe_trigger_backup();
1553
1554 Ok(ret)
1555 }
1556
1557 pub async fn room_keys_received_stream(
1588 &self,
1589 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1590 {
1591 let olm = self.client.olm_machine().await;
1592 let olm = olm.as_ref()?;
1593
1594 Some(olm.store().room_keys_received_stream())
1595 }
1596
1597 pub async fn historic_room_key_stream(
1628 &self,
1629 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1630 let olm = self.client.olm_machine().await;
1631 let olm = olm.as_ref()?;
1632
1633 Some(olm.store().historic_room_key_stream())
1634 }
1635
1636 pub fn secret_storage(&self) -> SecretStorage {
1638 SecretStorage { client: self.client.to_owned() }
1639 }
1640
1641 pub fn backups(&self) -> Backups {
1643 Backups { client: self.client.to_owned() }
1644 }
1645
1646 pub fn recovery(&self) -> Recovery {
1648 Recovery { client: self.client.to_owned() }
1649 }
1650
1651 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1663 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1665 let prev_holder = prev_lock.lock_holder();
1666 if prev_holder == lock_value {
1667 return Ok(());
1668 }
1669 warn!(
1670 "Recreating cross-process store lock with a different holder value: \
1671 prev was {prev_holder}, new is {lock_value}"
1672 );
1673 }
1674
1675 let olm_machine = self.client.base_client().olm_machine().await;
1676 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1677
1678 let lock =
1679 olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1680
1681 {
1686 let lock_result = lock.try_lock_once().await?;
1687
1688 if lock_result.is_ok() {
1689 olm_machine
1690 .initialize_crypto_store_generation(
1691 &self.client.locks().crypto_store_generation,
1692 )
1693 .await?;
1694 }
1695 }
1696
1697 self.client
1698 .locks()
1699 .cross_process_crypto_store_lock
1700 .set(lock)
1701 .map_err(|_| Error::BadCryptoStoreState)?;
1702
1703 Ok(())
1704 }
1705
1706 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1711 let olm_machine_guard = self.client.olm_machine().await;
1712 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1713 let (new_gen, generation_number) = olm_machine
1714 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1715 .await?;
1716 if new_gen {
1718 drop(olm_machine_guard);
1720 self.client.base_client().regenerate_olm(None).await?;
1722 }
1723 Ok(generation_number)
1724 } else {
1725 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1730 Ok(0)
1731 }
1732 }
1733
1734 pub async fn spin_lock_store(
1740 &self,
1741 max_backoff: Option<u32>,
1742 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1743 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1744 let guard = lock
1745 .spin_lock(max_backoff)
1746 .await
1747 .map_err(|err| {
1748 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1749 Box::new(err),
1750 )))
1751 })?
1752 .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1753
1754 let generation = self.on_lock_newly_acquired().await?;
1755
1756 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1757 _guard: guard.into_guard(),
1758 generation,
1759 }))
1760 } else {
1761 Ok(None)
1762 }
1763 }
1764
1765 pub async fn try_lock_store_once(
1770 &self,
1771 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1772 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1773 let lock_result = lock.try_lock_once().await?;
1774
1775 let Some(guard) = lock_result.ok() else {
1776 return Ok(None);
1777 };
1778
1779 let generation = self.on_lock_newly_acquired().await?;
1780
1781 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1782 _guard: guard.into_guard(),
1783 generation,
1784 }))
1785 } else {
1786 Ok(None)
1787 }
1788 }
1789
/// Testing helper: returns the uploaded key count as reported by the
/// `OlmMachine`.
///
/// # Errors
///
/// Returns `Error::AuthenticationRequired` if there is no `OlmMachine`,
/// and propagates any error from the `OlmMachine` itself.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().ok_or(Error::AuthenticationRequired)?;

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1797
1798 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1822 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1826 Some(BundleReceiverTask::new(&self.client).await)
1827 } else {
1828 None
1829 };
1830
1831 let mut tasks = self.client.inner.e2ee.tasks.lock();
1832
1833 let this = self.clone();
1834
1835 tasks.setup_e2ee = Some(spawn(async move {
1836 this.update_verification_state().await;
1839
1840 if this.settings().auto_enable_cross_signing
1841 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1842 {
1843 error!("Couldn't bootstrap cross signing {e:?}");
1844 }
1845
1846 if let Err(e) = this.backups().setup_and_resume().await {
1847 error!("Couldn't setup and resume backups {e:?}");
1848 }
1849 if let Err(e) = this.recovery().setup().await {
1850 error!("Couldn't setup and resume recovery {e:?}");
1851 }
1852 }));
1853
1854 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1855 }
1856
1857 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1860 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1861
1862 if let Some(task) = task
1863 && let Err(err) = task.await
1864 {
1865 warn!("Error when initializing backups: {err}");
1866 }
1867 }
1868
1869 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1879 let olm = self.client.olm_machine().await;
1880 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1881
1882 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1883 self.client.keys_upload(&request_id, &request).await?;
1884
1885 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1886 self.client.keys_query(&request_id, request.device_keys).await?;
1887 }
1888
1889 Ok(())
1890 }
1891
1892 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1893 self.recovery().update_state_after_keys_query(response).await;
1894
1895 if let Some(user_id) = self.client.user_id() {
1897 let contains_own_device = response.device_keys.contains_key(user_id);
1898
1899 if contains_own_device {
1900 self.update_verification_state().await;
1901 }
1902 }
1903 }
1904
1905 async fn update_verification_state(&self) {
1906 match self.get_own_device().await {
1907 Ok(device) => {
1908 if let Some(device) = device {
1909 let is_verified = device.is_cross_signed_by_owner();
1910
1911 if is_verified {
1912 self.client.inner.verification_state.set(VerificationState::Verified);
1913 } else {
1914 self.client.inner.verification_state.set(VerificationState::Unverified);
1915 }
1916 } else {
1917 warn!("Couldn't find out own device in the store.");
1918 self.client.inner.verification_state.set(VerificationState::Unknown);
1919 }
1920 }
1921 Err(error) => {
1922 warn!("Failed retrieving own device: {error}");
1923 self.client.inner.verification_state.set(VerificationState::Unknown);
1924 }
1925 }
1926 }
1927
/// Encrypts `content` for each of the given devices and sends the resulting
/// to-device requests.
///
/// One-time keys are claimed for the users owning the recipient devices
/// before encrypting. Each request produced by the `OlmMachine` is then
/// sent on its own, so several requests may go out for one call.
///
/// # Arguments
///
/// * `recipient_devices` - The devices that should receive the event.
/// * `event_type` - The type of the event to encrypt.
/// * `content` - The raw event content to encrypt.
/// * `share_strategy` - Forwarded to the `OlmMachine` when encrypting.
///
/// # Returns
///
/// The `(user, device)` pairs the content could not be delivered to: the
/// devices the `OlmMachine` reported as withheld, plus every device
/// addressed by a request that failed to send.
///
/// # Errors
///
/// Returns an error if claiming one-time keys fails, if there is no
/// `OlmMachine` (`Error::NoOlmMachine`), or if encrypting the content
/// fails. A request that fails to send is *not* an error: its devices are
/// reported in the returned list instead.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    self.client.claim_one_time_keys(users).await?;

    let olm = self.client.olm_machine().await;
    // Report a missing machine as an error, as the other methods here do,
    // rather than panicking inside a fallible public API.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the content was withheld from are failures from the start.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(d, _)| (d.user_id().to_owned(), d.device_id().to_owned()))
        .collect();

    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        if let Err(error) = send_result {
            // Keep the reason in the logs; the caller only gets the list of
            // devices that were not reached.
            warn!("Failed to send an encrypted to-device request: {error:?}");

            // Every device addressed by the failed request is a failure.
            for (user_id, device_map) in request.messages {
                for device_id in device_map.keys() {
                    match device_id {
                        DeviceIdOrAllDevices::DeviceId(device_id) => {
                            failures.push((user_id.clone(), device_id.to_owned()));
                        }
                        // No concrete device to report for a wildcard entry.
                        DeviceIdOrAllDevices::AllDevices => {}
                    }
                }
            }
        }
    }

    Ok(failures)
}
2005}
2006
2007#[cfg(all(test, not(target_family = "wasm")))]
2008mod tests {
2009 use std::{
2010 ops::Not,
2011 str::FromStr,
2012 sync::{
2013 Arc,
2014 atomic::{AtomicBool, Ordering},
2015 },
2016 time::Duration,
2017 };
2018
2019 use matrix_sdk_test::{
2020 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2021 test_json,
2022 };
2023 use ruma::{
2024 event_id,
2025 events::{reaction::ReactionEventContent, relation::Annotation},
2026 };
2027 use serde_json::json;
2028 use wiremock::{
2029 Mock, MockServer, Request, ResponseTemplate,
2030 matchers::{header, method, path_regex},
2031 };
2032
2033 use crate::{
2034 Client, assert_next_matches_with_timeout,
2035 config::RequestConfig,
2036 encryption::{
2037 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2038 },
2039 test_utils::{
2040 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2041 },
2042 };
2043
2044 #[async_test]
2045 async fn test_reaction_sending() {
2046 let server = MockServer::start().await;
2047 let client = logged_in_client(Some(server.uri())).await;
2048
2049 let event_id = event_id!("$2:example.org");
2050
2051 Mock::given(method("GET"))
2052 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2053 .and(header("authorization", "Bearer 1234"))
2054 .respond_with(
2055 ResponseTemplate::new(200)
2056 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2057 )
2058 .mount(&server)
2059 .await;
2060
2061 Mock::given(method("PUT"))
2062 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2063 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2064 "event_id": event_id,
2065 })))
2066 .mount(&server)
2067 .await;
2068
2069 let response = SyncResponseBuilder::default()
2070 .add_joined_room(
2071 JoinedRoomBuilder::default()
2072 .add_state_event(StateTestEvent::Member)
2073 .add_state_event(StateTestEvent::PowerLevels)
2074 .add_state_event(StateTestEvent::Encryption),
2075 )
2076 .build_sync_response();
2077
2078 client.base_client().receive_sync_response(response).await.unwrap();
2079
2080 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2081 assert!(
2082 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2083 );
2084
2085 let event_id = event_id!("$1:example.org");
2086 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2087 room.send(reaction).await.expect("Sending the reaction should not fail");
2088
2089 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2090 }
2091
/// Two clients share one sqlite database. Once the second client has held
/// the cross-process store lock, the first client must end up with a
/// different `OlmMachine` the next time it takes the lock, and that new
/// machine must still see the backup key saved through the old one.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Both clients are backed by the very same database file.
    let db_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(db_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    // As long as the cross-process lock is not enabled, no guard is handed
    // out.
    let before_enabling = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(before_enabling.is_none());

    client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // The first client takes the lock.
    let client1_guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(client1_guard.is_some());

    // Remember the machine the first client starts out with.
    let original_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Save a backup decryption key and enable backups through that machine.
    let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
        .expect("Can't create new recovery key");
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());
    original_machine
        .backup_machine()
        .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
        .await
        .expect("Should save");

    original_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(client1.encryption().backups().are_enabled().await);

    // While the first client holds the lock, the second one cannot get it.
    let client2_attempt = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(client2_attempt.is_none());

    drop(client1_guard);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // The first client takes the lock again; nobody else held it in between,
    // so its machine is unchanged.
    let client1_guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(client1_guard.is_some());

    let current_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(original_machine.same_as(&current_machine));

    drop(client1_guard);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Now the second client gets a turn with the lock.
    let client2_guard = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(client2_guard.is_some());

    drop(client2_guard);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Back to the first client: this time its machine must have been
    // replaced.
    let client1_guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(client1_guard.is_some());

    let regenerated_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    assert!(!original_machine.same_as(&regenerated_machine));

    // The backup state must be visible through the replacement machine.
    let stored_backup_keys =
        regenerated_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(stored_backup_keys.decryption_key.is_some());
    let stored_public_key =
        stored_backup_keys.decryption_key.unwrap().megolm_v1_public_key().to_base64();
    assert_eq!(stored_public_key, backup_key.to_base64());
    assert!(client1.encryption().backups().are_enabled().await);
}
2198
/// Enabling the cross-process lock must not replace the `OlmMachine`, and
/// after one replacement caused by another client holding the lock, taking
/// the lock again must not replace the machine a second time.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    let db_path =
        std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&db_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let machine_at_start = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Merely enabling the lock keeps the same machine.
    let machine_after_enabling = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_at_start.same_as(&machine_after_enabling));

    {
        // A second client on the same database takes and releases the lock.
        let other_client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(db_path, None)
            .build()
            .await
            .unwrap();
        other_client
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        other_client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let other_guard = other_client.encryption().spin_lock_store(None).await.unwrap();
        assert!(other_guard.is_some());

        drop(other_guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    {
        let first_take = client.encryption().try_lock_store_once().await.unwrap();
        assert!(first_take.is_some());
    }

    // The other client held the lock in between, so the machine was
    // replaced.
    let machine_after_first_take = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!machine_at_start.same_as(&machine_after_first_take));

    {
        let second_take = client.encryption().try_lock_store_once().await.unwrap();
        assert!(second_take.is_some());
    }

    // Nobody else touched the lock this time, so the machine is kept.
    let machine_after_second_take = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(machine_after_first_take.same_as(&machine_after_second_take));
}
2276
2277 #[async_test]
2278 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2279 let client = no_retry_test_client(None).await;
2281 let server = MockServer::start().await;
2282
2283 let mut verification_state = client.encryption().verification_state();
2285
2286 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2288
2289 let keys_requested = Arc::new(AtomicBool::new(false));
2292 let inner_bool = keys_requested.clone();
2293
2294 Mock::given(method("GET"))
2295 .and(path_regex(
2296 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2297 ))
2298 .respond_with(move |_req: &Request| {
2299 inner_bool.fetch_or(true, Ordering::SeqCst);
2300 ResponseTemplate::new(200).set_body_json(json!({}))
2301 })
2302 .mount(&server)
2303 .await;
2304
2305 set_client_session(&client).await;
2307
2308 assert!(keys_requested.load(Ordering::SeqCst).not());
2310 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2311 }
2312
/// A UIAA response advertising the `org.matrix.cross_signing_reset` stage
/// must deserialize and yield an `OAuthCrossSigningResetInfo`.
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let raw_auth_info = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    let parsed_auth_info = serde_json::from_value(raw_auth_info)
        .expect("We should be able to deserialize the UiaaInfo");

    OAuthCrossSigningResetInfo::from_auth_info(&parsed_auth_info)
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2337
/// A complete duplicate one-time key error message must parse into its old
/// and new keys, while a truncated message must be rejected.
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    // The old key is rendered as JSON, the new key with single quotes.
    let raw_message = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(raw_message)
        .expect("We should be able to parse the error message");

    let old_key = parsed.old_key.to_base64();
    let new_key = parsed.new_key.to_base64();
    assert_eq!(old_key, "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
    assert_eq!(new_key, "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");

    // Without the key details there is nothing to parse.
    DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
        .expect_err("We shouldn't be able to parse an incomplete error message");
}
2358
2359 fn devices_to_verify_against_keys_query_response(
2363 devices: Vec<serde_json::Value>,
2364 ) -> serde_json::Value {
2365 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2366 .into_iter()
2367 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2368 .collect();
2369 json!({
2370 "device_keys": {
2371 "@example:localhost": device_keys,
2372 },
2373 "master_keys": {
2374 "@example:localhost": {
2375 "keys": {
2376 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2377 },
2378 "usage": ["master"],
2379 "user_id": "@example:localhost",
2380 },
2381 },
2382 "self_signing_keys": {
2383 "@example:localhost": {
2384 "keys": {
2385 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2386 },
2387 "usage": ["self_signing"],
2388 "user_id": "@example:localhost",
2389 "signatures": {
2390 "@example:localhost": {
2391 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2392 },
2393 },
2394 },
2395 },
2396 "user_signing_keys": {
2397 "@example:localhost": {
2398 "keys": {
2399 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2400 },
2401 "usage": ["user_signing"],
2402 "user_id": "@example:localhost",
2403 "signatures": {
2404 "@example:localhost": {
2405 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2406 },
2407 },
2408 },
2409 }
2410 })
2411 }
2412
2413 #[async_test]
2416 async fn test_devices_to_verify_against_no_devices() {
2419 let server = MockServer::start().await;
2420 let client = logged_in_client(Some(server.uri())).await;
2421
2422 Mock::given(method("POST"))
2423 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2424 .respond_with(
2425 ResponseTemplate::new(200)
2426 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2427 )
2428 .mount(&server)
2429 .await;
2430
2431 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2432 }
2433
2434 #[async_test]
2435 async fn test_devices_to_verify_against_cross_signed() {
2438 let server = MockServer::start().await;
2439 let client = logged_in_client(Some(server.uri())).await;
2440
2441 Mock::given(method("POST"))
2442 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2443 .respond_with(ResponseTemplate::new(200).set_body_json(
2444 devices_to_verify_against_keys_query_response(vec![
2445 json!({
2446 "algorithms": [
2447 "m.olm.v1.curve25519-aes-sha2",
2448 "m.megolm.v1.aes-sha2",
2449 ],
2450 "user_id": "@example:localhost",
2451 "device_id": "SIGNEDDEVICE",
2452 "keys": {
2453 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2454 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2455 },
2456 "signatures": {
2457 "@example:localhost": {
2458 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2459 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2460 },
2461 },
2462 })
2463 ])
2464 ))
2465 .mount(&server)
2466 .await;
2467
2468 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2469 }
2470
2471 #[async_test]
2472 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2475 let server = MockServer::start().await;
2476 let client = logged_in_client(Some(server.uri())).await;
2477 let user_id = client.user_id().unwrap();
2478 let olm_machine = client.olm_machine().await;
2479 let olm_machine = olm_machine.as_ref().unwrap();
2480
2481 Mock::given(method("POST"))
2482 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2483 .respond_with(ResponseTemplate::new(200).set_body_json(
2484 devices_to_verify_against_keys_query_response(vec![
2485 json!({
2486 "algorithms": [
2487 "m.olm.v1.curve25519-aes-sha2",
2488 "m.megolm.v1.aes-sha2",
2489 ],
2490 "user_id": "@example:localhost",
2491 "device_id": "DEHYDRATEDDEVICE",
2492 "keys": {
2493 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2494 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2495 },
2496 "dehydrated": true,
2497 "signatures": {
2498 "@example:localhost": {
2499 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2500 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2501 },
2502 },
2503 }),
2504 json!({
2505 "algorithms": [
2506 "m.olm.v1.curve25519-aes-sha2",
2507 "m.megolm.v1.aes-sha2",
2508 ],
2509 "user_id": "@example:localhost",
2510 "device_id": "UNSIGNEDDEVICE",
2511 "keys": {
2512 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2513 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2514 },
2515 "signatures": {
2516 "@example:localhost": {
2517 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2518 },
2519 },
2520 }),
2521 ])
2522 ))
2523 .mount(&server)
2524 .await;
2525
2526 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2527 client.keys_query(&request_id, request.device_keys).await.unwrap();
2528
2529 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2530 }
2531}