1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33 future::try_join,
34 stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39 StateStoreDataKey, StateStoreDataValue,
40 cross_process_lock::CrossProcessLockError,
41 crypto::{
42 CrossSigningBootstrapRequests, OlmMachine,
43 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
44 types::{
45 SignedKey,
46 requests::{
47 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
48 },
49 },
50 },
51};
52use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
53use ruma::{
54 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
55 api::client::{
56 error::{ErrorBody, StandardErrorBody},
57 keys::{
58 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
59 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
60 },
61 message::send_message_event,
62 to_device::send_event_to_device::v3::{
63 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
64 },
65 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
66 },
67 assign,
68 events::room::{MediaSource, ThumbnailInfo},
69};
70#[cfg(feature = "experimental-send-custom-to-device")]
71use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
72use serde::{Deserialize, de::Error as _};
73use tasks::BundleReceiverTask;
74use tokio::sync::{Mutex, RwLockReadGuard};
75use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
76use tracing::{debug, error, instrument, warn};
77use url::Url;
78use vodozemac::Curve25519PublicKey;
79
80use self::{
81 backups::{Backups, types::BackupClientState},
82 futures::UploadEncryptedFile,
83 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
84 recovery::{Recovery, RecoveryState},
85 secret_storage::SecretStorage,
86 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
87 verification::{SasVerification, Verification, VerificationRequest},
88};
89use crate::{
90 Client, Error, HttpError, Result, RumaApiError, TransmissionProgress,
91 attachment::Thumbnail,
92 client::{ClientInner, WeakClient},
93 cross_process_lock::CrossProcessLockGuard,
94 error::HttpResult,
95};
96
97pub mod backups;
98pub mod futures;
99pub mod identities;
100pub mod recovery;
101pub mod secret_storage;
102pub(crate) mod tasks;
103pub mod verification;
104
105pub use matrix_sdk_base::crypto::{
106 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
107 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
108 SessionCreationError, SignatureError, VERSION,
109 olm::{
110 SessionCreationError as MegolmSessionCreationError,
111 SessionExportError as OlmSessionExportError,
112 },
113 vodozemac,
114};
115use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
116
117#[cfg(feature = "experimental-send-custom-to-device")]
118use crate::config::RequestConfig;
119pub use crate::error::RoomKeyImportError;
120
121pub(crate) struct EncryptionData {
123 pub tasks: StdMutex<ClientTasks>,
126
127 pub encryption_settings: EncryptionSettings,
129
130 pub backup_state: BackupClientState,
132
133 pub recovery_state: SharedObservable<RecoveryState>,
135}
136
137impl EncryptionData {
138 pub fn new(encryption_settings: EncryptionSettings) -> Self {
139 Self {
140 encryption_settings,
141
142 tasks: StdMutex::new(Default::default()),
143 backup_state: Default::default(),
144 recovery_state: Default::default(),
145 }
146 }
147
148 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
149 let weak_client = WeakClient::from_inner(client);
150
151 let mut tasks = self.tasks.lock();
152 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
153
154 if self.encryption_settings.backup_download_strategy
155 == BackupDownloadStrategy::AfterDecryptionFailure
156 {
157 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
158 }
159 }
160
161 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
167 let mut guard = self.tasks.lock();
168
169 let future = Recovery::update_state_after_backup_state_change(client);
170 let join_handle = spawn(future);
171
172 guard.update_recovery_state_after_backup = Some(join_handle);
173 }
174}
175
176#[derive(Clone, Copy, Debug, Default)]
178pub struct EncryptionSettings {
179 pub auto_enable_cross_signing: bool,
185
186 pub backup_download_strategy: BackupDownloadStrategy,
191
192 pub auto_enable_backups: bool,
194}
195
/// Strategy describing when room keys should be downloaded from the backup.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// One-shot download strategy.
    // NOTE(review): presumably downloads all keys in a single pass; the
    // implementation is not visible here, confirm before relying on it.
    OneShot,

    /// Download keys after a decryption failure. Selecting this strategy is
    /// what makes the client create its backup download task.
    AfterDecryptionFailure,

    /// The default: no download strategy is selected by the settings.
    #[default]
    Manual,
}
222
/// The verification state of the client's own device, as exposed through
/// the `Encryption::verification_state` subscriber.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationState {
    /// The verification state is not known (yet).
    Unknown,
    /// The device is considered verified.
    Verified,
    /// The device is considered unverified.
    Unverified,
}
237
238#[derive(Debug)]
240pub struct CrossProcessLockStoreGuardWithGeneration {
241 _guard: CrossProcessLockGuard,
242 generation: u64,
243}
244
245impl CrossProcessLockStoreGuardWithGeneration {
246 pub fn generation(&self) -> u64 {
248 self.generation
249 }
250}
251
252#[derive(Debug)]
263pub struct CrossSigningResetHandle {
264 client: Client,
265 upload_request: UploadSigningKeysRequest,
266 signatures_request: UploadSignaturesRequest,
267 auth_type: CrossSigningResetAuthType,
268 is_cancelled: Mutex<bool>,
269}
270
271impl CrossSigningResetHandle {
272 pub fn new(
274 client: Client,
275 upload_request: UploadSigningKeysRequest,
276 signatures_request: UploadSignaturesRequest,
277 auth_type: CrossSigningResetAuthType,
278 ) -> Self {
279 Self {
280 client,
281 upload_request,
282 signatures_request,
283 auth_type,
284 is_cancelled: Mutex::new(false),
285 }
286 }
287
288 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
291 &self.auth_type
292 }
293
294 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
298 let mut upload_request = self.upload_request.clone();
299 upload_request.auth = auth;
300
301 while let Err(e) = self.client.send(upload_request.clone()).await {
302 if *self.is_cancelled.lock().await {
303 return Ok(());
304 }
305
306 match e.as_uiaa_response() {
307 Some(uiaa_info) => {
308 if uiaa_info.auth_error.is_some() {
309 return Err(e.into());
310 }
311 }
312 None => return Err(e.into()),
313 }
314 }
315
316 self.client.send(self.signatures_request.clone()).await?;
317
318 Ok(())
319 }
320
321 pub async fn cancel(&self) {
323 *self.is_cancelled.lock().await = true;
324 }
325}
326
327#[derive(Debug, Clone)]
330pub enum CrossSigningResetAuthType {
331 Uiaa(UiaaInfo),
333 OAuth(OAuthCrossSigningResetInfo),
336}
337
338impl CrossSigningResetAuthType {
339 fn new(error: &HttpError) -> Result<Option<Self>> {
340 if let Some(auth_info) = error.as_uiaa_response() {
341 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
342 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
343 } else {
344 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
345 }
346 } else {
347 Ok(None)
348 }
349 }
350}
351
352#[derive(Debug, Clone, Deserialize)]
355pub struct OAuthCrossSigningResetInfo {
356 pub approval_url: Url,
358}
359
360impl OAuthCrossSigningResetInfo {
361 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
362 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
363 return Ok(None);
364 };
365
366 Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
367 }
368}
369
370#[derive(Clone, Debug)]
373pub struct DuplicateOneTimeKeyErrorMessage {
374 pub old_key: Curve25519PublicKey,
376 pub new_key: Curve25519PublicKey,
378}
379
380impl FromStr for DuplicateOneTimeKeyErrorMessage {
381 type Err = serde_json::Error;
382
383 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
384 let mut split = s.split_terminator(';');
388
389 let old_key = split
390 .next()
391 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
392 let new_key = split
393 .next()
394 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
395
396 let old_key_index = old_key
399 .find("Old key:")
400 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
401
402 let old_key = old_key[old_key_index..]
403 .trim()
404 .strip_prefix("Old key:")
405 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
406
407 let new_key = new_key
410 .trim()
411 .strip_prefix("new key:")
412 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
413
414 let new_key = new_key.replace("'", "\"");
417
418 let old_key: SignedKey = serde_json::from_str(old_key)?;
420 let new_key: SignedKey = serde_json::from_str(&new_key)?;
421
422 let old_key = old_key.key();
424 let new_key = new_key.key();
425
426 Ok(Self { old_key, new_key })
427 }
428}
429
430impl Client {
431 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
432 self.base_client().olm_machine().await
433 }
434
435 pub(crate) async fn mark_request_as_sent(
436 &self,
437 request_id: &TransactionId,
438 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
439 ) -> Result<(), matrix_sdk_base::Error> {
440 Ok(self
441 .olm_machine()
442 .await
443 .as_ref()
444 .expect(
445 "We should have an olm machine once we try to mark E2EE related requests as sent",
446 )
447 .mark_request_as_sent(request_id, response)
448 .await?)
449 }
450
451 #[instrument(skip(self, device_keys))]
457 pub(crate) async fn keys_query(
458 &self,
459 request_id: &TransactionId,
460 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
461 ) -> Result<get_keys::v3::Response> {
462 let request = assign!(get_keys::v3::Request::new(), { device_keys });
463
464 let response = self.send(request).await?;
465 self.mark_request_as_sent(request_id, &response).await?;
466 self.encryption().update_state_after_keys_query(&response).await;
467
468 Ok(response)
469 }
470
471 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
505 &'a self,
506 reader: &'a mut R,
507 ) -> UploadEncryptedFile<'a, R> {
508 UploadEncryptedFile::new(self, reader)
509 }
510
511 pub(crate) async fn upload_encrypted_media_and_thumbnail(
514 &self,
515 data: &[u8],
516 thumbnail: Option<Thumbnail>,
517 send_progress: SharedObservable<TransmissionProgress>,
518 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
519 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
520
521 let upload_attachment = async {
522 let mut cursor = Cursor::new(data);
523 self.upload_encrypted_file(&mut cursor)
524 .with_send_progress_observable(send_progress)
525 .await
526 };
527
528 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
529
530 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
531 }
532
533 async fn upload_encrypted_thumbnail(
536 &self,
537 thumbnail: Option<Thumbnail>,
538 send_progress: SharedObservable<TransmissionProgress>,
539 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
540 let Some(thumbnail) = thumbnail else {
541 return Ok(None);
542 };
543
544 let (data, _, thumbnail_info) = thumbnail.into_parts();
545 let mut cursor = Cursor::new(data);
546
547 let file = self
548 .upload_encrypted_file(&mut cursor)
549 .with_send_progress_observable(send_progress)
550 .await?;
551
552 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
553 }
554
555 pub(crate) async fn claim_one_time_keys(
561 &self,
562 users: impl Iterator<Item = &UserId>,
563 ) -> Result<()> {
564 let _lock = self.locks().key_claim_lock.lock().await;
565
566 if let Some((request_id, request)) = self
567 .olm_machine()
568 .await
569 .as_ref()
570 .ok_or(Error::NoOlmMachine)?
571 .get_missing_sessions(users)
572 .await?
573 {
574 let response = self.send(request).await?;
575 self.mark_request_as_sent(&request_id, &response).await?;
576 }
577
578 Ok(())
579 }
580
581 #[instrument(skip(self, request))]
591 pub(crate) async fn keys_upload(
592 &self,
593 request_id: &TransactionId,
594 request: &upload_keys::v3::Request,
595 ) -> Result<upload_keys::v3::Response> {
596 debug!(
597 device_keys = request.device_keys.is_some(),
598 one_time_key_count = request.one_time_keys.len(),
599 "Uploading public encryption keys",
600 );
601
602 let response = self.send(request.clone()).await?;
603 self.mark_request_as_sent(request_id, &response).await?;
604
605 Ok(response)
606 }
607
608 pub(crate) async fn room_send_helper(
609 &self,
610 request: &RoomMessageRequest,
611 ) -> Result<send_message_event::v3::Response> {
612 let content = request.content.clone();
613 let txn_id = request.txn_id.clone();
614 let room_id = &request.room_id;
615
616 self.get_room(room_id)
617 .expect("Can't send a message to a room that isn't known to the store")
618 .send(*content)
619 .with_transaction_id(txn_id)
620 .await
621 .map(|result| result.response)
622 }
623
624 pub(crate) async fn send_to_device(
625 &self,
626 request: &ToDeviceRequest,
627 ) -> HttpResult<ToDeviceResponse> {
628 let request = RumaToDeviceRequest::new_raw(
629 request.event_type.clone(),
630 request.txn_id.clone(),
631 request.messages.clone(),
632 );
633
634 self.send(request).await
635 }
636
637 pub(crate) async fn send_verification_request(
638 &self,
639 request: OutgoingVerificationRequest,
640 ) -> Result<()> {
641 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
642
643 match request {
644 ToDevice(t) => {
645 self.send_to_device(&t).await?;
646 }
647 InRoom(r) => {
648 self.room_send_helper(&r).await?;
649 }
650 }
651
652 Ok(())
653 }
654
655 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
656 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
657
658 match r.request() {
659 AnyOutgoingRequest::KeysQuery(request) => {
660 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
661 }
662 AnyOutgoingRequest::KeysUpload(request) => {
663 let response = self.keys_upload(r.request_id(), request).await;
664
665 if let Err(e) = &response {
666 match e.as_ruma_api_error() {
667 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
668 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
669 {
670 {
675 let already_reported = self
676 .state_store()
677 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
678 .await?
679 .is_some();
680
681 if message.starts_with("One time key") && !already_reported {
682 let error_message =
683 DuplicateOneTimeKeyErrorMessage::from_str(message);
684
685 if let Ok(message) = &error_message {
686 error!(
687 sentry = true,
688 old_key = %message.old_key,
689 new_key = %message.new_key,
690 "Duplicate one-time keys have been uploaded"
691 );
692 } else {
693 error!(
694 sentry = true,
695 "Duplicate one-time keys have been uploaded"
696 );
697 }
698
699 self.state_store()
700 .set_kv_data(
701 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
702 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
703 )
704 .await?;
705
706 if let Err(e) = self
707 .inner
708 .duplicate_key_upload_error_sender
709 .send(error_message.ok())
710 {
711 error!(
712 "Failed to dispatch duplicate key upload error notification: {}",
713 e
714 );
715 }
716 }
717 }
718 }
719 }
720 _ => {}
721 }
722
723 response?;
724 }
725 }
726 AnyOutgoingRequest::ToDeviceRequest(request) => {
727 let response = self.send_to_device(request).await?;
728 self.mark_request_as_sent(r.request_id(), &response).await?;
729 }
730 AnyOutgoingRequest::SignatureUpload(request) => {
731 let response = self.send(request.clone()).await?;
732 self.mark_request_as_sent(r.request_id(), &response).await?;
733 }
734 AnyOutgoingRequest::RoomMessage(request) => {
735 let response = self.room_send_helper(request).await?;
736 self.mark_request_as_sent(r.request_id(), &response).await?;
737 }
738 AnyOutgoingRequest::KeysClaim(request) => {
739 let response = self.send(request.clone()).await?;
740 self.mark_request_as_sent(r.request_id(), &response).await?;
741 }
742 }
743
744 Ok(())
745 }
746
747 #[instrument(skip_all)]
748 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
749 const MAX_CONCURRENT_REQUESTS: usize = 20;
750
751 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
754 warn!("Error while claiming one-time keys {:?}", e);
755 }
756
757 let outgoing_requests = stream::iter(
758 self.olm_machine()
759 .await
760 .as_ref()
761 .ok_or(Error::NoOlmMachine)?
762 .outgoing_requests()
763 .await?,
764 )
765 .map(|r| self.send_outgoing_request(r));
766
767 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
768
769 requests
770 .for_each(|r| async move {
771 match r {
772 Ok(_) => (),
773 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
774 }
775 })
776 .await;
777
778 Ok(())
779 }
780}
781
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test-only public accessor for the (optional) `OlmMachine` of the
    /// client; forwards to the crate-private `olm_machine` method.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        let guard = self.olm_machine().await;
        guard
    }
}
789
790#[derive(Debug, Clone)]
794pub struct Encryption {
795 client: Client,
797}
798
799impl Encryption {
800 pub(crate) fn new(client: Client) -> Self {
801 Self { client }
802 }
803
804 pub(crate) fn settings(&self) -> EncryptionSettings {
806 self.client.inner.e2ee.encryption_settings
807 }
808
809 pub async fn ed25519_key(&self) -> Option<String> {
812 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
813 }
814
815 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
817 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
818 }
819
820 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
822 match self.get_own_device().await {
823 Ok(Some(device)) => device.first_time_seen_ts(),
824 _ => MilliSecondsSinceUnixEpoch::now(),
826 }
827 }
828
829 pub(crate) async fn import_secrets_bundle(
830 &self,
831 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
832 ) -> Result<(), SecretImportError> {
833 let olm_machine = self.client.olm_machine().await;
834 let olm_machine =
835 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
836
837 olm_machine.store().import_secrets_bundle(bundle).await
838 }
839
840 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
845 let olm = self.client.olm_machine().await;
846 let machine = olm.as_ref()?;
847 Some(machine.cross_signing_status().await)
848 }
849
850 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
856 let olm_machine = self.client.olm_machine().await;
857 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
858 let user_id = olm_machine.user_id();
859
860 self.ensure_initial_key_query().await?;
861
862 let devices = self.get_user_devices(user_id).await?;
863
864 let ret = devices.devices().any(|device| {
865 device.is_cross_signed_by_owner()
866 && device.curve25519_key().is_some()
867 && !device.is_dehydrated()
868 });
869
870 Ok(ret)
871 }
872
873 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
878 if let Some(machine) = self.client.olm_machine().await.as_ref() {
879 machine.tracked_users().await
880 } else {
881 Ok(HashSet::new())
882 }
883 }
884
885 pub fn verification_state(&self) -> Subscriber<VerificationState> {
908 self.client.inner.verification_state.subscribe_reset()
909 }
910
911 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
913 let olm = self.client.olm_machine().await;
914 let olm = olm.as_ref()?;
915 #[allow(clippy::bind_instead_of_map)]
916 olm.get_verification(user_id, flow_id).and_then(|v| match v {
917 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
918 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
919 }
920 #[cfg(feature = "qrcode")]
921 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
922 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
923 }
924 _ => None,
925 })
926 }
927
928 pub async fn get_verification_request(
931 &self,
932 user_id: &UserId,
933 flow_id: impl AsRef<str>,
934 ) -> Option<VerificationRequest> {
935 let olm = self.client.olm_machine().await;
936 let olm = olm.as_ref()?;
937
938 olm.get_verification_request(user_id, flow_id)
939 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
940 }
941
942 pub async fn get_device(
976 &self,
977 user_id: &UserId,
978 device_id: &DeviceId,
979 ) -> Result<Option<Device>, CryptoStoreError> {
980 let olm = self.client.olm_machine().await;
981 let Some(machine) = olm.as_ref() else { return Ok(None) };
982 let device = machine.get_device(user_id, device_id, None).await?;
983 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
984 }
985
986 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
993 let olm = self.client.olm_machine().await;
994 let Some(machine) = olm.as_ref() else { return Ok(None) };
995 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
996 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
997 }
998
999 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1025 let devices = self
1026 .client
1027 .olm_machine()
1028 .await
1029 .as_ref()
1030 .ok_or(Error::NoOlmMachine)?
1031 .get_user_devices(user_id, None)
1032 .await?;
1033
1034 Ok(UserDevices { inner: devices, client: self.client.clone() })
1035 }
1036
1037 pub async fn get_user_identity(
1073 &self,
1074 user_id: &UserId,
1075 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1076 let olm = self.client.olm_machine().await;
1077 let Some(olm) = olm.as_ref() else { return Ok(None) };
1078 let identity = olm.get_identity(user_id, None).await?;
1079
1080 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1081 }
1082
1083 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1121 let olm = self.client.olm_machine().await;
1122 let Some(olm) = olm.as_ref() else { return Ok(None) };
1123
1124 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1125 self.client.keys_query(&request_id, request.device_keys).await?;
1126
1127 let identity = olm.get_identity(user_id, None).await?;
1128 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1129 }
1130
1131 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1162 let olm = self.client.olm_machine().await;
1163 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1164 let client = self.client.to_owned();
1165
1166 Ok(olm
1167 .store()
1168 .devices_stream()
1169 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1170 }
1171
1172 pub async fn user_identities_stream(
1200 &self,
1201 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1202 let olm = self.client.olm_machine().await;
1203 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1204 let client = self.client.to_owned();
1205
1206 Ok(olm
1207 .store()
1208 .user_identities_stream()
1209 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1210 }
1211
1212 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1251 let olm = self.client.olm_machine().await;
1252 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1253
1254 let CrossSigningBootstrapRequests {
1255 upload_signing_keys_req,
1256 upload_keys_req,
1257 upload_signatures_req,
1258 } = olm.bootstrap_cross_signing(false).await?;
1259
1260 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1261 auth: auth_data,
1262 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1263 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1264 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1265 });
1266
1267 if let Some(req) = upload_keys_req {
1268 self.client.send_outgoing_request(req).await?;
1269 }
1270 self.client.send(upload_signing_keys_req).await?;
1271 self.client.send(upload_signatures_req).await?;
1272
1273 Ok(())
1274 }
1275
1276 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1313 let olm = self.client.olm_machine().await;
1314 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1315
1316 let CrossSigningBootstrapRequests {
1317 upload_keys_req,
1318 upload_signing_keys_req,
1319 upload_signatures_req,
1320 } = olm.bootstrap_cross_signing(true).await?;
1321
1322 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1323 auth: None,
1324 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1325 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1326 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1327 });
1328
1329 if let Some(req) = upload_keys_req {
1330 self.client.send_outgoing_request(req).await?;
1331 }
1332
1333 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1334 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1335 let client = self.client.clone();
1336
1337 Ok(Some(CrossSigningResetHandle::new(
1338 client,
1339 upload_signing_keys_req,
1340 upload_signatures_req,
1341 auth_type,
1342 )))
1343 } else {
1344 Err(error.into())
1345 }
1346 } else {
1347 self.client.send(upload_signatures_req).await?;
1348
1349 Ok(None)
1350 }
1351 }
1352
1353 async fn ensure_initial_key_query(&self) -> Result<()> {
1356 let olm_machine = self.client.olm_machine().await;
1357 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1358
1359 let user_id = olm_machine.user_id();
1360
1361 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1362 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1363 self.client.keys_query(&request_id, request.device_keys).await?;
1364 }
1365
1366 Ok(())
1367 }
1368
1369 pub async fn bootstrap_cross_signing_if_needed(
1416 &self,
1417 auth_data: Option<AuthData>,
1418 ) -> Result<()> {
1419 let olm_machine = self.client.olm_machine().await;
1420 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1421 let user_id = olm_machine.user_id();
1422
1423 self.ensure_initial_key_query().await?;
1424
1425 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1426 self.bootstrap_cross_signing(auth_data).await?;
1427 }
1428
1429 Ok(())
1430 }
1431
1432 #[cfg(not(target_family = "wasm"))]
1484 pub async fn export_room_keys(
1485 &self,
1486 path: PathBuf,
1487 passphrase: &str,
1488 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1489 ) -> Result<()> {
1490 let olm = self.client.olm_machine().await;
1491 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1492
1493 let keys = olm.store().export_room_keys(predicate).await?;
1494 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1495
1496 let encrypt = move || -> Result<()> {
1497 let export: String =
1498 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1499 let mut file = std::fs::File::create(path)?;
1500 file.write_all(&export.into_bytes())?;
1501 Ok(())
1502 };
1503
1504 let task = tokio::task::spawn_blocking(encrypt);
1505 task.await.expect("Task join error")
1506 }
1507
1508 #[cfg(not(target_family = "wasm"))]
1546 pub async fn import_room_keys(
1547 &self,
1548 path: PathBuf,
1549 passphrase: &str,
1550 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1551 let olm = self.client.olm_machine().await;
1552 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1553 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1554
1555 let decrypt = move || {
1556 let file = std::fs::File::open(path)?;
1557 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1558 };
1559
1560 let task = tokio::task::spawn_blocking(decrypt);
1561 let import = task.await.expect("Task join error")?;
1562
1563 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1564
1565 self.backups().maybe_trigger_backup();
1566
1567 Ok(ret)
1568 }
1569
1570 pub async fn room_keys_received_stream(
1601 &self,
1602 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1603 {
1604 let olm = self.client.olm_machine().await;
1605 let olm = olm.as_ref()?;
1606
1607 Some(olm.store().room_keys_received_stream())
1608 }
1609
1610 pub async fn historic_room_key_stream(
1641 &self,
1642 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1643 let olm = self.client.olm_machine().await;
1644 let olm = olm.as_ref()?;
1645
1646 Some(olm.store().historic_room_key_stream())
1647 }
1648
1649 pub fn secret_storage(&self) -> SecretStorage {
1651 SecretStorage { client: self.client.to_owned() }
1652 }
1653
1654 pub fn backups(&self) -> Backups {
1656 Backups { client: self.client.to_owned() }
1657 }
1658
1659 pub fn recovery(&self) -> Recovery {
1661 Recovery { client: self.client.to_owned() }
1662 }
1663
1664 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1676 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1678 let prev_holder = prev_lock.lock_holder();
1679 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1680 return Ok(());
1681 }
1682 warn!(
1683 "Recreating cross-process store lock with a different holder value: \
1684 prev was {prev_holder:?}, new is {lock_value}"
1685 );
1686 }
1687
1688 let olm_machine = self.client.base_client().olm_machine().await;
1689 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1690
1691 let lock = olm_machine.store().create_store_lock(
1692 "cross_process_lock".to_owned(),
1693 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1694 );
1695
1696 {
1701 let lock_result = lock.try_lock_once().await?;
1702
1703 if lock_result.is_ok() {
1704 olm_machine
1705 .initialize_crypto_store_generation(
1706 &self.client.locks().crypto_store_generation,
1707 )
1708 .await?;
1709 }
1710 }
1711
1712 self.client
1713 .locks()
1714 .cross_process_crypto_store_lock
1715 .set(lock)
1716 .map_err(|_| Error::BadCryptoStoreState)?;
1717
1718 Ok(())
1719 }
1720
1721 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1726 let olm_machine_guard = self.client.olm_machine().await;
1727 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1728 let (new_gen, generation_number) = olm_machine
1729 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1730 .await?;
1731 if new_gen {
1733 drop(olm_machine_guard);
1735 self.client.base_client().regenerate_olm(None).await?;
1737 }
1738 Ok(generation_number)
1739 } else {
1740 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1745 Ok(0)
1746 }
1747 }
1748
1749 pub async fn spin_lock_store(
1755 &self,
1756 max_backoff: Option<u32>,
1757 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1758 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1759 let guard = lock
1760 .spin_lock(max_backoff)
1761 .await
1762 .map_err(|err| {
1763 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1764 Box::new(err),
1765 )))
1766 })?
1767 .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1768
1769 let generation = self.on_lock_newly_acquired().await?;
1770
1771 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1772 _guard: guard.into_guard(),
1773 generation,
1774 }))
1775 } else {
1776 Ok(None)
1777 }
1778 }
1779
1780 pub async fn try_lock_store_once(
1785 &self,
1786 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1787 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1788 let lock_result = lock.try_lock_once().await?;
1789
1790 let Some(guard) = lock_result.ok() else {
1791 return Ok(None);
1792 };
1793
1794 let generation = self.on_lock_newly_acquired().await?;
1795
1796 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1797 _guard: guard.into_guard(),
1798 generation,
1799 }))
1800 } else {
1801 Ok(None)
1802 }
1803 }
1804
#[cfg(any(test, feature = "testing"))]
/// Test helper: returns the value of `OlmMachine::uploaded_key_count`.
///
/// # Errors
///
/// Returns [`Error::AuthenticationRequired`] when no `OlmMachine` is
/// available, and propagates errors from the underlying count query.
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().ok_or(Error::AuthenticationRequired)?;

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1812
1813 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1837 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1841 Some(BundleReceiverTask::new(&self.client).await)
1842 } else {
1843 None
1844 };
1845
1846 let mut tasks = self.client.inner.e2ee.tasks.lock();
1847
1848 let this = self.clone();
1849
1850 tasks.setup_e2ee = Some(spawn(async move {
1851 this.update_verification_state().await;
1854
1855 if this.settings().auto_enable_cross_signing
1856 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1857 {
1858 error!("Couldn't bootstrap cross signing {e:?}");
1859 }
1860
1861 if let Err(e) = this.backups().setup_and_resume().await {
1862 error!("Couldn't setup and resume backups {e:?}");
1863 }
1864 if let Err(e) = this.recovery().setup().await {
1865 error!("Couldn't setup and resume recovery {e:?}");
1866 }
1867 }));
1868
1869 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1870 }
1871
1872 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1875 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1876
1877 if let Some(task) = task
1878 && let Err(err) = task.await
1879 {
1880 warn!("Error when initializing backups: {err}");
1881 }
1882 }
1883
1884 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1894 let olm = self.client.olm_machine().await;
1895 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1896
1897 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1898 self.client.keys_upload(&request_id, &request).await?;
1899
1900 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1901 self.client.keys_query(&request_id, request.device_keys).await?;
1902 }
1903
1904 Ok(())
1905 }
1906
1907 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1908 self.recovery().update_state_after_keys_query(response).await;
1909
1910 if let Some(user_id) = self.client.user_id() {
1912 let contains_own_device = response.device_keys.contains_key(user_id);
1913
1914 if contains_own_device {
1915 self.update_verification_state().await;
1916 }
1917 }
1918 }
1919
1920 async fn update_verification_state(&self) {
1921 match self.get_own_device().await {
1922 Ok(device) => {
1923 if let Some(device) = device {
1924 let is_verified = device.is_cross_signed_by_owner();
1925
1926 if is_verified {
1927 self.client.inner.verification_state.set(VerificationState::Verified);
1928 } else {
1929 self.client.inner.verification_state.set(VerificationState::Unverified);
1930 }
1931 } else {
1932 warn!("Couldn't find out own device in the store.");
1933 self.client.inner.verification_state.set(VerificationState::Unknown);
1934 }
1935 }
1936 Err(error) => {
1937 warn!("Failed retrieving own device: {error}");
1938 self.client.inner.verification_state.set(VerificationState::Unknown);
1939 }
1940 }
1941 }
1942
#[cfg(feature = "experimental-send-custom-to-device")]
/// Encrypts a raw to-device event for the given devices and sends it.
///
/// One-time keys are first claimed for the users owning
/// `recipient_devices`. The content is then encrypted per device and each
/// resulting to-device request is sent with a short-retry request config.
///
/// # Arguments
///
/// * `recipient_devices` - The devices that should receive the event.
/// * `event_type` - The type of the event to encrypt.
/// * `content` - The raw, unencrypted event content.
/// * `share_strategy` - Forwarded to `encrypt_content_for_devices`.
///
/// # Returns
///
/// The `(user, device)` pairs the event could not be delivered to: every
/// device for which the content was withheld, plus every device targeted
/// by a request whose sending failed. Send failures are therefore
/// reported through this list rather than as an `Err`.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no `OlmMachine` is available, and
/// propagates errors from claiming one-time keys and from encryption.
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    self.client.claim_one_time_keys(users).await?;

    let olm = self.client.olm_machine().await;
    // Report a missing machine as an error instead of panicking, in line
    // with the other methods that need the `OlmMachine`.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices for which the content was withheld count as failures.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(d, _)| (d.user_id().to_owned(), d.device_id().to_owned()))
        .collect();

    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        if send_result.is_err() {
            // Every explicitly addressed device of a failed request is
            // reported back to the caller.
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|device_id| match device_id {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // No single device to report; presumably never
                    // produced by per-device encryption.
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2020}
2021
2022#[cfg(all(test, not(target_family = "wasm")))]
2023mod tests {
2024 use std::{
2025 ops::Not,
2026 str::FromStr,
2027 sync::{
2028 Arc,
2029 atomic::{AtomicBool, Ordering},
2030 },
2031 time::Duration,
2032 };
2033
2034 use matrix_sdk_test::{
2035 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2036 event_factory::EventFactory,
2037 };
2038 use ruma::{
2039 event_id,
2040 events::{reaction::ReactionEventContent, relation::Annotation},
2041 user_id,
2042 };
2043 use serde_json::json;
2044 use wiremock::{
2045 Mock, MockServer, Request, ResponseTemplate,
2046 matchers::{header, method, path_regex},
2047 };
2048
2049 use crate::{
2050 Client, assert_next_matches_with_timeout,
2051 config::RequestConfig,
2052 encryption::{
2053 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2054 },
2055 test_utils::{
2056 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2057 },
2058 };
2059
2060 #[async_test]
2061 async fn test_reaction_sending() {
2062 let server = MockServer::start().await;
2063 let client = logged_in_client(Some(server.uri())).await;
2064
2065 let event_id = event_id!("$2:example.org");
2066
2067 Mock::given(method("GET"))
2068 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2069 .and(header("authorization", "Bearer 1234"))
2070 .respond_with(
2071 ResponseTemplate::new(200)
2072 .set_body_json(EventFactory::new().room_encryption().into_content()),
2073 )
2074 .mount(&server)
2075 .await;
2076
2077 Mock::given(method("PUT"))
2078 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2079 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2080 "event_id": event_id,
2081 })))
2082 .mount(&server)
2083 .await;
2084
2085 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2086 let response = SyncResponseBuilder::default()
2087 .add_joined_room(
2088 JoinedRoomBuilder::default()
2089 .add_state_event(
2090 f.member(user_id!("@example:localhost")).display_name("example"),
2091 )
2092 .add_state_event(f.default_power_levels())
2093 .add_state_event(f.room_encryption()),
2094 )
2095 .build_sync_response();
2096
2097 client.base_client().receive_sync_response(response).await.unwrap();
2098
2099 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2100 assert!(
2101 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2102 );
2103
2104 let event_id = event_id!("$1:example.org");
2105 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2106 room.send(reaction).await.expect("Sending the reaction should not fail");
2107
2108 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2109 }
2110
/// Checks that client 1's `OlmMachine` is replaced once another client
/// (client 2, sharing the same sqlite store) has held the cross-process
/// store lock in between, and that backup state survives the replacement.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Both clients below are built on this one sqlite store path.
    // NOTE(review): the file name is fixed inside the shared temp dir, so
    // state may persist between runs — confirm this is intended.
    let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(sqlite_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    // The cross-process lock has not been enabled yet, so no guard is
    // handed out.
    let guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(guard.is_none());

    client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // With the lock enabled, client 1 can take it.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    // Keep a handle on the current machine to compare against later.
    let initial_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Enable backups on the initial machine so we can check below that
    // the backup state carries over to a regenerated machine.
    let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
        .expect("Can't create new recovery key");
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());
    initial_olm_machine
        .backup_machine()
        .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
        .await
        .expect("Should save");

    initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(client1.encryption().backups().are_enabled().await);

    // Client 2 cannot take the lock while client 1 holds it.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_none());

    // Release client 1's guard; the short sleep presumably gives the
    // release time to take effect.
    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Client 1 re-takes the lock without anyone else having held it...
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    // ...so its machine is still the initial one.
    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&olm_machine));

    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Now client 2 gets to hold the lock.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_some());

    drop(acquired2);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Client 1 takes the lock again, this time after client 2 held it.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

    // The machine has been replaced by a different instance.
    assert!(!initial_olm_machine.same_as(&olm_machine));

    // The new machine still knows the backup decryption key, and backups
    // are still reported as enabled.
    let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(backup_key_new.decryption_key.is_some());
    assert_eq!(
        backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(client1.encryption().backups().are_enabled().await);
}
2217
/// Checks that the `OlmMachine` is only replaced after another client has
/// held the cross-process store lock, and not on every lock acquisition.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;

    // Both clients below are built on this one sqlite store path.
    // NOTE(review): the file name is fixed inside the shared temp dir, so
    // state may persist between runs — confirm this is intended.
    let sqlite_path =
        std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Enabling the lock alone does not replace the machine.
    let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(initial_olm_machine.same_as(&after_enabling_lock));

    {
        // A second client on the same store takes and releases the lock.
        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        client2
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client2.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // The short sleep presumably gives the release time to take effect.
        drop(guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    {
        // First acquisition by the first client; the guard is dropped at
        // the end of this scope.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // The other client held the lock in between, so the machine has been
    // replaced.
    let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

    {
        // Second acquisition, with nobody else having held the lock.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // No replacement this time: same machine as after the first acquisition.
    let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
}
2295
/// Checks that the verification state moves from `Unknown` to
/// `Unverified` once a session is set, while the mocked secret-storage
/// default-key endpoint has not been hit at that point.
#[async_test]
async fn test_update_verification_state_is_updated_before_any_requests_happen() {
    // NOTE(review): the client is created with `None` before the mock
    // server is started, so it is not obviously pointed at this server. If
    // it is not, the mock below can never be hit and the
    // `keys_requested` assertion is trivially true — confirm.
    let client = no_retry_test_client(None).await;
    let server = MockServer::start().await;

    // Subscribe to verification state changes.
    let mut verification_state = client.encryption().verification_state();

    // Before a session is set, the state is `Unknown`.
    assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);

    // Flag flipped by the mock below when it receives a request.
    let keys_requested = Arc::new(AtomicBool::new(false));
    let inner_bool = keys_requested.clone();

    Mock::given(method("GET"))
        .and(path_regex(
            r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
        ))
        .respond_with(move |_req: &Request| {
            inner_bool.fetch_or(true, Ordering::SeqCst);
            ResponseTemplate::new(200).set_body_json(json!({}))
        })
        .mount(&server)
        .await;

    // Setting the session is what triggers the state update.
    set_client_session(&client).await;

    // The mocked endpoint has not been requested at this point...
    assert!(keys_requested.load(Ordering::SeqCst).not());
    // ...and the next observed verification state is `Unverified`.
    assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
}
2331
/// A UIAA payload advertising the `org.matrix.cross_signing_reset` stage
/// must deserialize and yield an [`OAuthCrossSigningResetInfo`].
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let uiaa_payload = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    // The target type is inferred from `from_auth_info`'s parameter.
    let auth_info = serde_json::from_value(uiaa_payload)
        .expect("We should be able to deserialize the UiaaInfo");

    OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2356
/// The "one time key already exists" error message must parse into its
/// old and new keys, and an incomplete message must be rejected.
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    // The old key is rendered as JSON and the new key with single quotes.
    let error_text = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(error_text)
        .expect("We should be able to parse the error message");

    let old_key = parsed.old_key.to_base64();
    let new_key = parsed.new_key.to_base64();
    assert_eq!(old_key, "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
    assert_eq!(new_key, "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");

    // A message without any key material must not parse.
    DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
        .expect_err("We shouldn't be able to parse an incomplete error message");
}
2377
2378 fn devices_to_verify_against_keys_query_response(
2382 devices: Vec<serde_json::Value>,
2383 ) -> serde_json::Value {
2384 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2385 .into_iter()
2386 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2387 .collect();
2388 json!({
2389 "device_keys": {
2390 "@example:localhost": device_keys,
2391 },
2392 "master_keys": {
2393 "@example:localhost": {
2394 "keys": {
2395 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2396 },
2397 "usage": ["master"],
2398 "user_id": "@example:localhost",
2399 },
2400 },
2401 "self_signing_keys": {
2402 "@example:localhost": {
2403 "keys": {
2404 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2405 },
2406 "usage": ["self_signing"],
2407 "user_id": "@example:localhost",
2408 "signatures": {
2409 "@example:localhost": {
2410 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2411 },
2412 },
2413 },
2414 },
2415 "user_signing_keys": {
2416 "@example:localhost": {
2417 "keys": {
2418 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2419 },
2420 "usage": ["user_signing"],
2421 "user_id": "@example:localhost",
2422 "signatures": {
2423 "@example:localhost": {
2424 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2425 },
2426 },
2427 },
2428 }
2429 })
2430 }
2431
2432 #[async_test]
2435 async fn test_devices_to_verify_against_no_devices() {
2438 let server = MockServer::start().await;
2439 let client = logged_in_client(Some(server.uri())).await;
2440
2441 Mock::given(method("POST"))
2442 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2443 .respond_with(
2444 ResponseTemplate::new(200)
2445 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2446 )
2447 .mount(&server)
2448 .await;
2449
2450 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2451 }
2452
2453 #[async_test]
2454 async fn test_devices_to_verify_against_cross_signed() {
2457 let server = MockServer::start().await;
2458 let client = logged_in_client(Some(server.uri())).await;
2459
2460 Mock::given(method("POST"))
2461 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2462 .respond_with(ResponseTemplate::new(200).set_body_json(
2463 devices_to_verify_against_keys_query_response(vec![
2464 json!({
2465 "algorithms": [
2466 "m.olm.v1.curve25519-aes-sha2",
2467 "m.megolm.v1.aes-sha2",
2468 ],
2469 "user_id": "@example:localhost",
2470 "device_id": "SIGNEDDEVICE",
2471 "keys": {
2472 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2473 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2474 },
2475 "signatures": {
2476 "@example:localhost": {
2477 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2478 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2479 },
2480 },
2481 })
2482 ])
2483 ))
2484 .mount(&server)
2485 .await;
2486
2487 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2488 }
2489
2490 #[async_test]
2491 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2494 let server = MockServer::start().await;
2495 let client = logged_in_client(Some(server.uri())).await;
2496 let user_id = client.user_id().unwrap();
2497 let olm_machine = client.olm_machine().await;
2498 let olm_machine = olm_machine.as_ref().unwrap();
2499
2500 Mock::given(method("POST"))
2501 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2502 .respond_with(ResponseTemplate::new(200).set_body_json(
2503 devices_to_verify_against_keys_query_response(vec![
2504 json!({
2505 "algorithms": [
2506 "m.olm.v1.curve25519-aes-sha2",
2507 "m.megolm.v1.aes-sha2",
2508 ],
2509 "user_id": "@example:localhost",
2510 "device_id": "DEHYDRATEDDEVICE",
2511 "keys": {
2512 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2513 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2514 },
2515 "dehydrated": true,
2516 "signatures": {
2517 "@example:localhost": {
2518 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2519 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2520 },
2521 },
2522 }),
2523 json!({
2524 "algorithms": [
2525 "m.olm.v1.curve25519-aes-sha2",
2526 "m.megolm.v1.aes-sha2",
2527 ],
2528 "user_id": "@example:localhost",
2529 "device_id": "UNSIGNEDDEVICE",
2530 "keys": {
2531 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2532 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2533 },
2534 "signatures": {
2535 "@example:localhost": {
2536 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2537 },
2538 },
2539 }),
2540 ])
2541 ))
2542 .mount(&server)
2543 .await;
2544
2545 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2546 client.keys_query(&request_id, request.device_keys).await.unwrap();
2547
2548 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2549 }
2550}