1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33 future::try_join,
34 stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39 StateStoreDataKey, StateStoreDataValue,
40 crypto::{
41 CrossSigningBootstrapRequests, OlmMachine,
42 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
43 types::{
44 SignedKey,
45 requests::{
46 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
47 },
48 },
49 },
50};
51use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
52use ruma::{
53 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
54 api::client::{
55 error::ErrorBody,
56 keys::{
57 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
58 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
59 },
60 message::send_message_event,
61 to_device::send_event_to_device::v3::{
62 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
63 },
64 uiaa::{AuthData, UiaaInfo},
65 },
66 assign,
67 events::{
68 direct::DirectUserIdentifier,
69 room::{MediaSource, ThumbnailInfo},
70 },
71};
72#[cfg(feature = "experimental-send-custom-to-device")]
73use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
74use serde::{Deserialize, de::Error as _};
75use tasks::BundleReceiverTask;
76use tokio::sync::{Mutex, RwLockReadGuard};
77use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
78use tracing::{debug, error, instrument, trace, warn};
79use url::Url;
80use vodozemac::Curve25519PublicKey;
81
82use self::{
83 backups::{Backups, types::BackupClientState},
84 futures::UploadEncryptedFile,
85 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
86 recovery::{Recovery, RecoveryState},
87 secret_storage::SecretStorage,
88 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
89 verification::{SasVerification, Verification, VerificationRequest},
90};
91use crate::{
92 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93 attachment::Thumbnail,
94 client::{ClientInner, WeakClient},
95 cross_process_lock::CrossProcessLockGuard,
96 error::HttpResult,
97};
98
99pub mod backups;
100pub mod futures;
101pub mod identities;
102pub mod recovery;
103pub mod secret_storage;
104pub(crate) mod tasks;
105pub mod verification;
106
107pub use matrix_sdk_base::crypto::{
108 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
109 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110 SessionCreationError, SignatureError, VERSION,
111 olm::{
112 SessionCreationError as MegolmSessionCreationError,
113 SessionExportError as OlmSessionExportError,
114 },
115 vodozemac,
116};
117
118#[cfg(feature = "experimental-send-custom-to-device")]
119use crate::config::RequestConfig;
120pub use crate::error::RoomKeyImportError;
121
122pub(crate) struct EncryptionData {
124 pub tasks: StdMutex<ClientTasks>,
127
128 pub encryption_settings: EncryptionSettings,
130
131 pub backup_state: BackupClientState,
133
134 pub recovery_state: SharedObservable<RecoveryState>,
136}
137
138impl EncryptionData {
139 pub fn new(encryption_settings: EncryptionSettings) -> Self {
140 Self {
141 encryption_settings,
142
143 tasks: StdMutex::new(Default::default()),
144 backup_state: Default::default(),
145 recovery_state: Default::default(),
146 }
147 }
148
149 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
150 let weak_client = WeakClient::from_inner(client);
151
152 let mut tasks = self.tasks.lock();
153 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
154
155 if self.encryption_settings.backup_download_strategy
156 == BackupDownloadStrategy::AfterDecryptionFailure
157 {
158 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
159 }
160 }
161
162 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
168 let mut guard = self.tasks.lock();
169
170 let future = Recovery::update_state_after_backup_state_change(client);
171 let join_handle = spawn(future);
172
173 guard.update_recovery_state_after_backup = Some(join_handle);
174 }
175}
176
177#[derive(Clone, Copy, Debug, Default)]
179pub struct EncryptionSettings {
180 pub auto_enable_cross_signing: bool,
186
187 pub backup_download_strategy: BackupDownloadStrategy,
192
193 pub auto_enable_backups: bool,
195}
196
197#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
199#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
200pub enum BackupDownloadStrategy {
201 OneShot,
212
213 AfterDecryptionFailure,
215
216 #[default]
221 Manual,
222}
223
224#[derive(Clone, Copy, Debug, Eq, PartialEq)]
229pub enum VerificationState {
230 Unknown,
232 Verified,
235 Unverified,
237}
238
239#[derive(Debug)]
241pub struct CrossProcessLockStoreGuardWithGeneration {
242 _guard: CrossProcessLockGuard,
243 generation: u64,
244}
245
246impl CrossProcessLockStoreGuardWithGeneration {
247 pub fn generation(&self) -> u64 {
249 self.generation
250 }
251}
252
253#[derive(Debug)]
264pub struct CrossSigningResetHandle {
265 client: Client,
266 upload_request: UploadSigningKeysRequest,
267 signatures_request: UploadSignaturesRequest,
268 auth_type: CrossSigningResetAuthType,
269 is_cancelled: Mutex<bool>,
270}
271
272impl CrossSigningResetHandle {
273 pub fn new(
275 client: Client,
276 upload_request: UploadSigningKeysRequest,
277 signatures_request: UploadSignaturesRequest,
278 auth_type: CrossSigningResetAuthType,
279 ) -> Self {
280 Self {
281 client,
282 upload_request,
283 signatures_request,
284 auth_type,
285 is_cancelled: Mutex::new(false),
286 }
287 }
288
289 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
292 &self.auth_type
293 }
294
295 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
299 let mut upload_request = self.upload_request.clone();
300 upload_request.auth = auth;
301
302 while let Err(e) = self.client.send(upload_request.clone()).await {
303 if *self.is_cancelled.lock().await {
304 return Ok(());
305 }
306
307 match e.as_uiaa_response() {
308 Some(uiaa_info) => {
309 if uiaa_info.auth_error.is_some() {
310 return Err(e.into());
311 }
312 }
313 None => return Err(e.into()),
314 }
315 }
316
317 self.client.send(self.signatures_request.clone()).await?;
318
319 Ok(())
320 }
321
322 pub async fn cancel(&self) {
324 *self.is_cancelled.lock().await = true;
325 }
326}
327
328#[derive(Debug, Clone)]
331pub enum CrossSigningResetAuthType {
332 Uiaa(UiaaInfo),
334 OAuth(OAuthCrossSigningResetInfo),
337}
338
339impl CrossSigningResetAuthType {
340 fn new(error: &HttpError) -> Result<Option<Self>> {
341 if let Some(auth_info) = error.as_uiaa_response() {
342 if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
343 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
344 } else {
345 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
346 }
347 } else {
348 Ok(None)
349 }
350 }
351}
352
353#[derive(Debug, Clone, Deserialize)]
356pub struct OAuthCrossSigningResetInfo {
357 pub approval_url: Url,
359}
360
361impl OAuthCrossSigningResetInfo {
362 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
363 let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
364 auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
365 )?;
366
367 Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
368 }
369}
370
371#[derive(Debug, Deserialize)]
374struct OAuthCrossSigningResetUiaaParameters {
375 #[serde(rename = "org.matrix.cross_signing_reset")]
377 reset: OAuthCrossSigningResetUiaaResetParameter,
378}
379
380#[derive(Debug, Deserialize)]
383struct OAuthCrossSigningResetUiaaResetParameter {
384 url: Url,
386}
387
388#[derive(Debug)]
391struct DuplicateOneTimeKeyErrorMessage {
392 old_key: Curve25519PublicKey,
394 new_key: Curve25519PublicKey,
396}
397
398impl FromStr for DuplicateOneTimeKeyErrorMessage {
399 type Err = serde_json::Error;
400
401 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
402 let mut split = s.split_terminator(';');
406
407 let old_key = split
408 .next()
409 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
410 let new_key = split
411 .next()
412 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
413
414 let old_key_index = old_key
417 .find("Old key:")
418 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
419
420 let old_key = old_key[old_key_index..]
421 .trim()
422 .strip_prefix("Old key:")
423 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
424
425 let new_key = new_key
428 .trim()
429 .strip_prefix("new key:")
430 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
431
432 let new_key = new_key.replace("'", "\"");
435
436 let old_key: SignedKey = serde_json::from_str(old_key)?;
438 let new_key: SignedKey = serde_json::from_str(&new_key)?;
439
440 let old_key = old_key.key();
442 let new_key = new_key.key();
443
444 Ok(Self { old_key, new_key })
445 }
446}
447
448impl Client {
449 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
450 self.base_client().olm_machine().await
451 }
452
453 pub(crate) async fn mark_request_as_sent(
454 &self,
455 request_id: &TransactionId,
456 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
457 ) -> Result<(), matrix_sdk_base::Error> {
458 Ok(self
459 .olm_machine()
460 .await
461 .as_ref()
462 .expect(
463 "We should have an olm machine once we try to mark E2EE related requests as sent",
464 )
465 .mark_request_as_sent(request_id, response)
466 .await?)
467 }
468
469 #[instrument(skip(self, device_keys))]
475 pub(crate) async fn keys_query(
476 &self,
477 request_id: &TransactionId,
478 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
479 ) -> Result<get_keys::v3::Response> {
480 let request = assign!(get_keys::v3::Request::new(), { device_keys });
481
482 let response = self.send(request).await?;
483 self.mark_request_as_sent(request_id, &response).await?;
484 self.encryption().update_state_after_keys_query(&response).await;
485
486 Ok(response)
487 }
488
489 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
523 &'a self,
524 reader: &'a mut R,
525 ) -> UploadEncryptedFile<'a, R> {
526 UploadEncryptedFile::new(self, reader)
527 }
528
529 pub(crate) async fn upload_encrypted_media_and_thumbnail(
532 &self,
533 data: &[u8],
534 thumbnail: Option<Thumbnail>,
535 send_progress: SharedObservable<TransmissionProgress>,
536 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
537 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
538
539 let upload_attachment = async {
540 let mut cursor = Cursor::new(data);
541 self.upload_encrypted_file(&mut cursor)
542 .with_send_progress_observable(send_progress)
543 .await
544 };
545
546 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
547
548 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
549 }
550
551 async fn upload_encrypted_thumbnail(
554 &self,
555 thumbnail: Option<Thumbnail>,
556 send_progress: SharedObservable<TransmissionProgress>,
557 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
558 let Some(thumbnail) = thumbnail else {
559 return Ok(None);
560 };
561
562 let (data, _, thumbnail_info) = thumbnail.into_parts();
563 let mut cursor = Cursor::new(data);
564
565 let file = self
566 .upload_encrypted_file(&mut cursor)
567 .with_send_progress_observable(send_progress)
568 .await?;
569
570 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
571 }
572
573 pub(crate) async fn claim_one_time_keys(
579 &self,
580 users: impl Iterator<Item = &UserId>,
581 ) -> Result<()> {
582 let _lock = self.locks().key_claim_lock.lock().await;
583
584 if let Some((request_id, request)) = self
585 .olm_machine()
586 .await
587 .as_ref()
588 .ok_or(Error::NoOlmMachine)?
589 .get_missing_sessions(users)
590 .await?
591 {
592 let response = self.send(request).await?;
593 self.mark_request_as_sent(&request_id, &response).await?;
594 }
595
596 Ok(())
597 }
598
599 #[instrument(skip(self, request))]
609 pub(crate) async fn keys_upload(
610 &self,
611 request_id: &TransactionId,
612 request: &upload_keys::v3::Request,
613 ) -> Result<upload_keys::v3::Response> {
614 debug!(
615 device_keys = request.device_keys.is_some(),
616 one_time_key_count = request.one_time_keys.len(),
617 "Uploading public encryption keys",
618 );
619
620 let response = self.send(request.clone()).await?;
621 self.mark_request_as_sent(request_id, &response).await?;
622
623 Ok(response)
624 }
625
626 pub(crate) async fn room_send_helper(
627 &self,
628 request: &RoomMessageRequest,
629 ) -> Result<send_message_event::v3::Response> {
630 let content = request.content.clone();
631 let txn_id = request.txn_id.clone();
632 let room_id = &request.room_id;
633
634 self.get_room(room_id)
635 .expect("Can't send a message to a room that isn't known to the store")
636 .send(*content)
637 .with_transaction_id(txn_id)
638 .await
639 }
640
641 pub(crate) async fn send_to_device(
642 &self,
643 request: &ToDeviceRequest,
644 ) -> HttpResult<ToDeviceResponse> {
645 let request = RumaToDeviceRequest::new_raw(
646 request.event_type.clone(),
647 request.txn_id.clone(),
648 request.messages.clone(),
649 );
650
651 self.send(request).await
652 }
653
654 pub(crate) async fn send_verification_request(
655 &self,
656 request: OutgoingVerificationRequest,
657 ) -> Result<()> {
658 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
659
660 match request {
661 ToDevice(t) => {
662 self.send_to_device(&t).await?;
663 }
664 InRoom(r) => {
665 self.room_send_helper(&r).await?;
666 }
667 }
668
669 Ok(())
670 }
671
672 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
674 let rooms = self.joined_rooms();
675
676 let room = rooms.into_iter().find(|r| {
678 let targets = r.direct_targets();
679 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
680 });
681
682 trace!(?room, "Found room");
683 room
684 }
685
686 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
687 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
688
689 match r.request() {
690 AnyOutgoingRequest::KeysQuery(request) => {
691 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
692 }
693 AnyOutgoingRequest::KeysUpload(request) => {
694 let response = self.keys_upload(r.request_id(), request).await;
695
696 if let Err(e) = &response {
697 match e.as_ruma_api_error() {
698 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
699 if let ErrorBody::Standard { message, .. } = &e.body {
700 {
705 let already_reported = self
706 .state_store()
707 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
708 .await?
709 .is_some();
710
711 if message.starts_with("One time key") && !already_reported {
712 if let Ok(message) =
713 DuplicateOneTimeKeyErrorMessage::from_str(message)
714 {
715 error!(
716 sentry = true,
717 old_key = %message.old_key,
718 new_key = %message.new_key,
719 "Duplicate one-time keys have been uploaded"
720 );
721 } else {
722 error!(
723 sentry = true,
724 "Duplicate one-time keys have been uploaded"
725 );
726 }
727
728 self.state_store()
729 .set_kv_data(
730 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
731 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
732 )
733 .await?;
734 }
735 }
736 }
737 }
738 _ => {}
739 }
740
741 response?;
742 }
743 }
744 AnyOutgoingRequest::ToDeviceRequest(request) => {
745 let response = self.send_to_device(request).await?;
746 self.mark_request_as_sent(r.request_id(), &response).await?;
747 }
748 AnyOutgoingRequest::SignatureUpload(request) => {
749 let response = self.send(request.clone()).await?;
750 self.mark_request_as_sent(r.request_id(), &response).await?;
751 }
752 AnyOutgoingRequest::RoomMessage(request) => {
753 let response = self.room_send_helper(request).await?;
754 self.mark_request_as_sent(r.request_id(), &response).await?;
755 }
756 AnyOutgoingRequest::KeysClaim(request) => {
757 let response = self.send(request.clone()).await?;
758 self.mark_request_as_sent(r.request_id(), &response).await?;
759 }
760 }
761
762 Ok(())
763 }
764
765 #[instrument(skip_all)]
766 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
767 const MAX_CONCURRENT_REQUESTS: usize = 20;
768
769 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
772 warn!("Error while claiming one-time keys {:?}", e);
773 }
774
775 let outgoing_requests = stream::iter(
776 self.olm_machine()
777 .await
778 .as_ref()
779 .ok_or(Error::NoOlmMachine)?
780 .outgoing_requests()
781 .await?,
782 )
783 .map(|r| self.send_outgoing_request(r));
784
785 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
786
787 requests
788 .for_each(|r| async move {
789 match r {
790 Ok(_) => (),
791 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
792 }
793 })
794 .await;
795
796 Ok(())
797 }
798}
799
800#[cfg(any(feature = "testing", test))]
801impl Client {
802 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
804 self.olm_machine().await
805 }
806}
807
808#[derive(Debug, Clone)]
812pub struct Encryption {
813 client: Client,
815}
816
817impl Encryption {
818 pub(crate) fn new(client: Client) -> Self {
819 Self { client }
820 }
821
822 pub(crate) fn settings(&self) -> EncryptionSettings {
824 self.client.inner.e2ee.encryption_settings
825 }
826
827 pub async fn ed25519_key(&self) -> Option<String> {
830 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
831 }
832
833 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
835 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
836 }
837
838 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
840 match self.get_own_device().await {
841 Ok(Some(device)) => device.first_time_seen_ts(),
842 _ => MilliSecondsSinceUnixEpoch::now(),
844 }
845 }
846
847 pub(crate) async fn import_secrets_bundle(
848 &self,
849 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
850 ) -> Result<(), SecretImportError> {
851 let olm_machine = self.client.olm_machine().await;
852 let olm_machine =
853 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
854
855 olm_machine.store().import_secrets_bundle(bundle).await
856 }
857
858 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
863 let olm = self.client.olm_machine().await;
864 let machine = olm.as_ref()?;
865 Some(machine.cross_signing_status().await)
866 }
867
868 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
873 if let Some(machine) = self.client.olm_machine().await.as_ref() {
874 machine.tracked_users().await
875 } else {
876 Ok(HashSet::new())
877 }
878 }
879
880 pub fn verification_state(&self) -> Subscriber<VerificationState> {
903 self.client.inner.verification_state.subscribe_reset()
904 }
905
906 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
908 let olm = self.client.olm_machine().await;
909 let olm = olm.as_ref()?;
910 #[allow(clippy::bind_instead_of_map)]
911 olm.get_verification(user_id, flow_id).and_then(|v| match v {
912 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
913 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
914 }
915 #[cfg(feature = "qrcode")]
916 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
917 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
918 }
919 _ => None,
920 })
921 }
922
923 pub async fn get_verification_request(
926 &self,
927 user_id: &UserId,
928 flow_id: impl AsRef<str>,
929 ) -> Option<VerificationRequest> {
930 let olm = self.client.olm_machine().await;
931 let olm = olm.as_ref()?;
932
933 olm.get_verification_request(user_id, flow_id)
934 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
935 }
936
937 pub async fn get_device(
971 &self,
972 user_id: &UserId,
973 device_id: &DeviceId,
974 ) -> Result<Option<Device>, CryptoStoreError> {
975 let olm = self.client.olm_machine().await;
976 let Some(machine) = olm.as_ref() else { return Ok(None) };
977 let device = machine.get_device(user_id, device_id, None).await?;
978 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
979 }
980
981 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
988 let olm = self.client.olm_machine().await;
989 let Some(machine) = olm.as_ref() else { return Ok(None) };
990 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
991 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
992 }
993
994 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1020 let devices = self
1021 .client
1022 .olm_machine()
1023 .await
1024 .as_ref()
1025 .ok_or(Error::NoOlmMachine)?
1026 .get_user_devices(user_id, None)
1027 .await?;
1028
1029 Ok(UserDevices { inner: devices, client: self.client.clone() })
1030 }
1031
1032 pub async fn get_user_identity(
1068 &self,
1069 user_id: &UserId,
1070 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1071 let olm = self.client.olm_machine().await;
1072 let Some(olm) = olm.as_ref() else { return Ok(None) };
1073 let identity = olm.get_identity(user_id, None).await?;
1074
1075 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1076 }
1077
1078 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1116 let olm = self.client.olm_machine().await;
1117 let Some(olm) = olm.as_ref() else { return Ok(None) };
1118
1119 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1120 self.client.keys_query(&request_id, request.device_keys).await?;
1121
1122 let identity = olm.get_identity(user_id, None).await?;
1123 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1124 }
1125
1126 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1157 let olm = self.client.olm_machine().await;
1158 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1159 let client = self.client.to_owned();
1160
1161 Ok(olm
1162 .store()
1163 .devices_stream()
1164 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1165 }
1166
1167 pub async fn user_identities_stream(
1195 &self,
1196 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1197 let olm = self.client.olm_machine().await;
1198 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1199 let client = self.client.to_owned();
1200
1201 Ok(olm
1202 .store()
1203 .user_identities_stream()
1204 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1205 }
1206
1207 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1246 let olm = self.client.olm_machine().await;
1247 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1248
1249 let CrossSigningBootstrapRequests {
1250 upload_signing_keys_req,
1251 upload_keys_req,
1252 upload_signatures_req,
1253 } = olm.bootstrap_cross_signing(false).await?;
1254
1255 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1256 auth: auth_data,
1257 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1258 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1259 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1260 });
1261
1262 if let Some(req) = upload_keys_req {
1263 self.client.send_outgoing_request(req).await?;
1264 }
1265 self.client.send(upload_signing_keys_req).await?;
1266 self.client.send(upload_signatures_req).await?;
1267
1268 Ok(())
1269 }
1270
1271 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1308 let olm = self.client.olm_machine().await;
1309 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1310
1311 let CrossSigningBootstrapRequests {
1312 upload_keys_req,
1313 upload_signing_keys_req,
1314 upload_signatures_req,
1315 } = olm.bootstrap_cross_signing(true).await?;
1316
1317 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1318 auth: None,
1319 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1320 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1321 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1322 });
1323
1324 if let Some(req) = upload_keys_req {
1325 self.client.send_outgoing_request(req).await?;
1326 }
1327
1328 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1329 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1330 let client = self.client.clone();
1331
1332 Ok(Some(CrossSigningResetHandle::new(
1333 client,
1334 upload_signing_keys_req,
1335 upload_signatures_req,
1336 auth_type,
1337 )))
1338 } else {
1339 Err(error.into())
1340 }
1341 } else {
1342 self.client.send(upload_signatures_req).await?;
1343
1344 Ok(None)
1345 }
1346 }
1347
1348 async fn ensure_initial_key_query(&self) -> Result<()> {
1351 let olm_machine = self.client.olm_machine().await;
1352 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1353
1354 let user_id = olm_machine.user_id();
1355
1356 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1357 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1358 self.client.keys_query(&request_id, request.device_keys).await?;
1359 }
1360
1361 Ok(())
1362 }
1363
1364 pub async fn bootstrap_cross_signing_if_needed(
1411 &self,
1412 auth_data: Option<AuthData>,
1413 ) -> Result<()> {
1414 let olm_machine = self.client.olm_machine().await;
1415 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1416 let user_id = olm_machine.user_id();
1417
1418 self.ensure_initial_key_query().await?;
1419
1420 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1421 self.bootstrap_cross_signing(auth_data).await?;
1422 }
1423
1424 Ok(())
1425 }
1426
1427 #[cfg(not(target_family = "wasm"))]
1479 pub async fn export_room_keys(
1480 &self,
1481 path: PathBuf,
1482 passphrase: &str,
1483 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1484 ) -> Result<()> {
1485 let olm = self.client.olm_machine().await;
1486 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1487
1488 let keys = olm.store().export_room_keys(predicate).await?;
1489 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1490
1491 let encrypt = move || -> Result<()> {
1492 let export: String =
1493 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1494 let mut file = std::fs::File::create(path)?;
1495 file.write_all(&export.into_bytes())?;
1496 Ok(())
1497 };
1498
1499 let task = tokio::task::spawn_blocking(encrypt);
1500 task.await.expect("Task join error")
1501 }
1502
1503 #[cfg(not(target_family = "wasm"))]
1541 pub async fn import_room_keys(
1542 &self,
1543 path: PathBuf,
1544 passphrase: &str,
1545 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1546 let olm = self.client.olm_machine().await;
1547 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1548 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1549
1550 let decrypt = move || {
1551 let file = std::fs::File::open(path)?;
1552 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1553 };
1554
1555 let task = tokio::task::spawn_blocking(decrypt);
1556 let import = task.await.expect("Task join error")?;
1557
1558 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1559
1560 self.backups().maybe_trigger_backup();
1561
1562 Ok(ret)
1563 }
1564
1565 pub async fn room_keys_received_stream(
1596 &self,
1597 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1598 {
1599 let olm = self.client.olm_machine().await;
1600 let olm = olm.as_ref()?;
1601
1602 Some(olm.store().room_keys_received_stream())
1603 }
1604
1605 pub async fn historic_room_key_stream(
1636 &self,
1637 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1638 let olm = self.client.olm_machine().await;
1639 let olm = olm.as_ref()?;
1640
1641 Some(olm.store().historic_room_key_stream())
1642 }
1643
1644 pub fn secret_storage(&self) -> SecretStorage {
1646 SecretStorage { client: self.client.to_owned() }
1647 }
1648
1649 pub fn backups(&self) -> Backups {
1651 Backups { client: self.client.to_owned() }
1652 }
1653
1654 pub fn recovery(&self) -> Recovery {
1656 Recovery { client: self.client.to_owned() }
1657 }
1658
1659 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1671 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1673 let prev_holder = prev_lock.lock_holder();
1674 if prev_holder == lock_value {
1675 return Ok(());
1676 }
1677 warn!(
1678 "Recreating cross-process store lock with a different holder value: \
1679 prev was {prev_holder}, new is {lock_value}"
1680 );
1681 }
1682
1683 let olm_machine = self.client.base_client().olm_machine().await;
1684 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1685
1686 let lock =
1687 olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1688
1689 {
1694 let guard = lock.try_lock_once().await?;
1695 if guard.is_some() {
1696 olm_machine
1697 .initialize_crypto_store_generation(
1698 &self.client.locks().crypto_store_generation,
1699 )
1700 .await?;
1701 }
1702 }
1703
1704 self.client
1705 .locks()
1706 .cross_process_crypto_store_lock
1707 .set(lock)
1708 .map_err(|_| Error::BadCryptoStoreState)?;
1709
1710 Ok(())
1711 }
1712
1713 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1718 let olm_machine_guard = self.client.olm_machine().await;
1719 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1720 let (new_gen, generation_number) = olm_machine
1721 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1722 .await?;
1723 if new_gen {
1725 drop(olm_machine_guard);
1727 self.client.base_client().regenerate_olm(None).await?;
1729 }
1730 Ok(generation_number)
1731 } else {
1732 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1737 Ok(0)
1738 }
1739 }
1740
1741 pub async fn spin_lock_store(
1747 &self,
1748 max_backoff: Option<u32>,
1749 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1750 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1751 let guard = lock.spin_lock(max_backoff).await?;
1752
1753 let generation = self.on_lock_newly_acquired().await?;
1754
1755 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1756 } else {
1757 Ok(None)
1758 }
1759 }
1760
1761 pub async fn try_lock_store_once(
1766 &self,
1767 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1768 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1769 let maybe_guard = lock.try_lock_once().await?;
1770
1771 let Some(guard) = maybe_guard else {
1772 return Ok(None);
1773 };
1774
1775 let generation = self.on_lock_newly_acquired().await?;
1776
1777 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1778 } else {
1779 Ok(None)
1780 }
1781 }
1782
1783 #[cfg(any(test, feature = "testing"))]
1785 pub async fn uploaded_key_count(&self) -> Result<u64> {
1786 let olm_machine = self.client.olm_machine().await;
1787 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1788 Ok(olm_machine.uploaded_key_count().await?)
1789 }
1790
1791 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1815 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1819 Some(BundleReceiverTask::new(&self.client).await)
1820 } else {
1821 None
1822 };
1823
1824 let mut tasks = self.client.inner.e2ee.tasks.lock();
1825
1826 let this = self.clone();
1827
1828 tasks.setup_e2ee = Some(spawn(async move {
1829 this.update_verification_state().await;
1832
1833 if this.settings().auto_enable_cross_signing
1834 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1835 {
1836 error!("Couldn't bootstrap cross signing {e:?}");
1837 }
1838
1839 if let Err(e) = this.backups().setup_and_resume().await {
1840 error!("Couldn't setup and resume backups {e:?}");
1841 }
1842 if let Err(e) = this.recovery().setup().await {
1843 error!("Couldn't setup and resume recovery {e:?}");
1844 }
1845 }));
1846
1847 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1848 }
1849
1850 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1853 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1854
1855 if let Some(task) = task
1856 && let Err(err) = task.await
1857 {
1858 warn!("Error when initializing backups: {err}");
1859 }
1860 }
1861
1862 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1872 let olm = self.client.olm_machine().await;
1873 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1874
1875 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1876 self.client.keys_upload(&request_id, &request).await?;
1877
1878 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1879 self.client.keys_query(&request_id, request.device_keys).await?;
1880 }
1881
1882 Ok(())
1883 }
1884
1885 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1886 self.recovery().update_state_after_keys_query(response).await;
1887
1888 if let Some(user_id) = self.client.user_id() {
1890 let contains_own_device = response.device_keys.contains_key(user_id);
1891
1892 if contains_own_device {
1893 self.update_verification_state().await;
1894 }
1895 }
1896 }
1897
1898 async fn update_verification_state(&self) {
1899 match self.get_own_device().await {
1900 Ok(device) => {
1901 if let Some(device) = device {
1902 let is_verified = device.is_cross_signed_by_owner();
1903
1904 if is_verified {
1905 self.client.inner.verification_state.set(VerificationState::Verified);
1906 } else {
1907 self.client.inner.verification_state.set(VerificationState::Unverified);
1908 }
1909 } else {
1910 warn!("Couldn't find out own device in the store.");
1911 self.client.inner.verification_state.set(VerificationState::Unknown);
1912 }
1913 }
1914 Err(error) => {
1915 warn!("Failed retrieving own device: {error}");
1916 self.client.inner.verification_state.set(VerificationState::Unknown);
1917 }
1918 }
1919 }
1920
1921 #[cfg(feature = "experimental-send-custom-to-device")]
1930 pub async fn encrypt_and_send_raw_to_device(
1931 &self,
1932 recipient_devices: Vec<&Device>,
1933 event_type: &str,
1934 content: Raw<AnyToDeviceEventContent>,
1935 share_strategy: CollectStrategy,
1936 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1937 let users = recipient_devices.iter().map(|device| device.user_id());
1938
1939 self.client.claim_one_time_keys(users).await?;
1943
1944 let olm = self.client.olm_machine().await;
1945 let olm = olm.as_ref().expect("Olm machine wasn't started");
1946
1947 let (requests, withhelds) = olm
1948 .encrypt_content_for_devices(
1949 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1950 event_type,
1951 &content
1952 .deserialize_as::<serde_json::Value>()
1953 .expect("Deserialize as Value will always work"),
1954 share_strategy,
1955 )
1956 .await?;
1957
1958 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1959
1960 withhelds.iter().for_each(|(d, _)| {
1962 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1963 });
1964
1965 for request in requests {
1967 let ruma_request = RumaToDeviceRequest::new_raw(
1968 request.event_type.clone(),
1969 request.txn_id.clone(),
1970 request.messages.clone(),
1971 );
1972
1973 let send_result = self
1974 .client
1975 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1976 .await;
1977
1978 if send_result.is_err() {
1980 for (user_id, device_map) in request.messages {
1982 for device_id in device_map.keys() {
1983 match device_id {
1984 DeviceIdOrAllDevices::DeviceId(device_id) => {
1985 failures.push((user_id.clone(), device_id.to_owned()));
1986 }
1987 DeviceIdOrAllDevices::AllDevices => {
1988 }
1990 }
1991 }
1992 }
1993 }
1994 }
1995
1996 Ok(failures)
1997 }
1998}
1999
2000#[cfg(all(test, not(target_family = "wasm")))]
2001mod tests {
2002 use std::{
2003 ops::Not,
2004 str::FromStr,
2005 sync::{
2006 Arc,
2007 atomic::{AtomicBool, Ordering},
2008 },
2009 time::Duration,
2010 };
2011
2012 use matrix_sdk_test::{
2013 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2014 event_factory::EventFactory, test_json,
2015 };
2016 use ruma::{
2017 event_id,
2018 events::{reaction::ReactionEventContent, relation::Annotation},
2019 user_id,
2020 };
2021 use serde_json::json;
2022 use wiremock::{
2023 Mock, MockServer, Request, ResponseTemplate,
2024 matchers::{header, method, path_regex},
2025 };
2026
2027 use crate::{
2028 Client, assert_next_matches_with_timeout,
2029 config::RequestConfig,
2030 encryption::{
2031 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2032 },
2033 test_utils::{
2034 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2035 },
2036 };
2037
2038 #[async_test]
2039 async fn test_reaction_sending() {
2040 let server = MockServer::start().await;
2041 let client = logged_in_client(Some(server.uri())).await;
2042
2043 let event_id = event_id!("$2:example.org");
2044
2045 Mock::given(method("GET"))
2046 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2047 .and(header("authorization", "Bearer 1234"))
2048 .respond_with(
2049 ResponseTemplate::new(200)
2050 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2051 )
2052 .mount(&server)
2053 .await;
2054
2055 Mock::given(method("PUT"))
2056 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2057 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2058 "event_id": event_id,
2059 })))
2060 .mount(&server)
2061 .await;
2062
2063 let response = SyncResponseBuilder::default()
2064 .add_joined_room(
2065 JoinedRoomBuilder::default()
2066 .add_state_event(StateTestEvent::Member)
2067 .add_state_event(StateTestEvent::PowerLevels)
2068 .add_state_event(StateTestEvent::Encryption),
2069 )
2070 .build_sync_response();
2071
2072 client.base_client().receive_sync_response(response).await.unwrap();
2073
2074 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2075 assert!(
2076 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2077 );
2078
2079 let event_id = event_id!("$1:example.org");
2080 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2081 room.send(reaction).await.expect("Sending the reaction should not fail");
2082
2083 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2084 }
2085
2086 #[async_test]
2087 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2088 let server = MockServer::start().await;
2089 let client = logged_in_client(Some(server.uri())).await;
2090 let user_id = user_id!("@invited:localhost");
2094
2095 let f = EventFactory::new();
2097 let response = SyncResponseBuilder::default()
2098 .add_joined_room(
2099 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2100 )
2101 .add_global_account_data(
2102 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2103 )
2104 .build_sync_response();
2105 client.base_client().receive_sync_response(response).await.unwrap();
2106
2107 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2109 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2110 }
2111
2112 #[async_test]
2113 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2114 let server = MockServer::start().await;
2115 let client = logged_in_client(Some(server.uri())).await;
2116 let user_id = user_id!("@invited:localhost");
2118
2119 let f = EventFactory::new();
2121 let response = SyncResponseBuilder::default()
2122 .add_joined_room(
2123 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2124 )
2125 .add_global_account_data(
2126 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2127 )
2128 .build_sync_response();
2129 client.base_client().receive_sync_response(response).await.unwrap();
2130
2131 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2133 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2134 }
2135
2136 #[async_test]
2137 async fn test_get_dm_room_still_finds_left_room() {
2138 let server = MockServer::start().await;
2142 let client = logged_in_client(Some(server.uri())).await;
2143 let user_id = user_id!("@invited:localhost");
2147
2148 let f = EventFactory::new();
2150 let response = SyncResponseBuilder::default()
2151 .add_joined_room(
2152 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2153 )
2154 .add_global_account_data(
2155 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2156 )
2157 .build_sync_response();
2158 client.base_client().receive_sync_response(response).await.unwrap();
2159
2160 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2162 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2163 }
2164
2165 #[cfg(feature = "sqlite")]
2166 #[async_test]
2167 async fn test_generation_counter_invalidates_olm_machine() {
2168 use matrix_sdk_base::store::RoomLoadSettings;
2171 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2172 let session = mock_matrix_session();
2173
2174 let client1 = Client::builder()
2175 .homeserver_url("http://localhost:1234")
2176 .request_config(RequestConfig::new().disable_retry())
2177 .sqlite_store(&sqlite_path, None)
2178 .build()
2179 .await
2180 .unwrap();
2181 client1
2182 .matrix_auth()
2183 .restore_session(session.clone(), RoomLoadSettings::default())
2184 .await
2185 .unwrap();
2186
2187 let client2 = Client::builder()
2188 .homeserver_url("http://localhost:1234")
2189 .request_config(RequestConfig::new().disable_retry())
2190 .sqlite_store(sqlite_path, None)
2191 .build()
2192 .await
2193 .unwrap();
2194 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2195
2196 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2198 assert!(guard.is_none());
2199
2200 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2201 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2202
2203 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2205 assert!(acquired1.is_some());
2206
2207 let initial_olm_machine =
2209 client1.olm_machine().await.clone().expect("must have an olm machine");
2210
2211 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2213 .expect("Can't create new recovery key");
2214 let backup_key = decryption_key.megolm_v1_public_key();
2215 backup_key.set_version("1".to_owned());
2216 initial_olm_machine
2217 .backup_machine()
2218 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2219 .await
2220 .expect("Should save");
2221
2222 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2223
2224 assert!(client1.encryption().backups().are_enabled().await);
2225
2226 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2228 assert!(acquired2.is_none());
2229
2230 drop(acquired1);
2232 tokio::time::sleep(Duration::from_millis(100)).await;
2233
2234 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2236 assert!(acquired1.is_some());
2237
2238 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2240 assert!(initial_olm_machine.same_as(&olm_machine));
2241
2242 drop(acquired1);
2244 tokio::time::sleep(Duration::from_millis(100)).await;
2245
2246 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2248 assert!(acquired2.is_some());
2249
2250 drop(acquired2);
2252 tokio::time::sleep(Duration::from_millis(100)).await;
2253
2254 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2256 assert!(acquired1.is_some());
2257
2258 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2260
2261 assert!(!initial_olm_machine.same_as(&olm_machine));
2262
2263 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2264 assert!(backup_key_new.decryption_key.is_some());
2265 assert_eq!(
2266 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2267 backup_key.to_base64()
2268 );
2269 assert!(client1.encryption().backups().are_enabled().await);
2270 }
2271
2272 #[cfg(feature = "sqlite")]
2273 #[async_test]
2274 async fn test_generation_counter_no_spurious_invalidation() {
2275 use matrix_sdk_base::store::RoomLoadSettings;
2278 let sqlite_path =
2279 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2280 let session = mock_matrix_session();
2281
2282 let client = Client::builder()
2283 .homeserver_url("http://localhost:1234")
2284 .request_config(RequestConfig::new().disable_retry())
2285 .sqlite_store(&sqlite_path, None)
2286 .build()
2287 .await
2288 .unwrap();
2289 client
2290 .matrix_auth()
2291 .restore_session(session.clone(), RoomLoadSettings::default())
2292 .await
2293 .unwrap();
2294
2295 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2296
2297 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2298
2299 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2301 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2302
2303 {
2304 let client2 = Client::builder()
2306 .homeserver_url("http://localhost:1234")
2307 .request_config(RequestConfig::new().disable_retry())
2308 .sqlite_store(sqlite_path, None)
2309 .build()
2310 .await
2311 .unwrap();
2312 client2
2313 .matrix_auth()
2314 .restore_session(session, RoomLoadSettings::default())
2315 .await
2316 .unwrap();
2317
2318 client2
2319 .encryption()
2320 .enable_cross_process_store_lock("client2".to_owned())
2321 .await
2322 .unwrap();
2323
2324 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2325 assert!(guard.is_some());
2326
2327 drop(guard);
2328 tokio::time::sleep(Duration::from_millis(100)).await;
2329 }
2330
2331 {
2332 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2333 assert!(acquired.is_some());
2334 }
2335
2336 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2338 assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2339
2340 {
2341 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2342 assert!(acquired.is_some());
2343 }
2344
2345 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2347 assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2348 }
2349
2350 #[async_test]
2351 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2352 let client = no_retry_test_client(None).await;
2354 let server = MockServer::start().await;
2355
2356 let mut verification_state = client.encryption().verification_state();
2358
2359 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2361
2362 let keys_requested = Arc::new(AtomicBool::new(false));
2365 let inner_bool = keys_requested.clone();
2366
2367 Mock::given(method("GET"))
2368 .and(path_regex(
2369 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2370 ))
2371 .respond_with(move |_req: &Request| {
2372 inner_bool.fetch_or(true, Ordering::SeqCst);
2373 ResponseTemplate::new(200).set_body_json(json!({}))
2374 })
2375 .mount(&server)
2376 .await;
2377
2378 set_client_session(&client).await;
2380
2381 assert!(keys_requested.load(Ordering::SeqCst).not());
2383 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2384 }
2385
2386 #[test]
2387 fn test_oauth_reset_info_from_uiaa_info() {
2388 let auth_info = json!({
2389 "session": "dummy",
2390 "flows": [
2391 {
2392 "stages": [
2393 "org.matrix.cross_signing_reset"
2394 ]
2395 }
2396 ],
2397 "params": {
2398 "org.matrix.cross_signing_reset": {
2399 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2400 }
2401 },
2402 "msg": "To reset..."
2403 });
2404
2405 let auth_info = serde_json::from_value(auth_info)
2406 .expect("We should be able to deserialize the UiaaInfo");
2407 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2408 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2409 }
2410
2411 #[test]
2412 fn test_duplicate_one_time_key_error_parsing() {
2413 let message = concat!(
2414 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2415 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2416 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2417 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2418 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2419 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2420 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2421 );
2422 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2423 .expect("We should be able to parse the error message");
2424
2425 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2426 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2427
2428 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2429 .expect_err("We shouldn't be able to parse an incomplete error message");
2430 }
2431}