1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28};
29
30use eyeball::{SharedObservable, Subscriber};
31use futures_core::Stream;
32use futures_util::{
33 future::try_join,
34 stream::{self, StreamExt},
35};
36#[cfg(feature = "experimental-send-custom-to-device")]
37use matrix_sdk_base::crypto::CollectStrategy;
38use matrix_sdk_base::{
39 StateStoreDataKey, StateStoreDataValue,
40 crypto::{
41 CrossSigningBootstrapRequests, OlmMachine,
42 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
43 types::{
44 SignedKey,
45 requests::{
46 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
47 },
48 },
49 },
50};
51use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
52use ruma::{
53 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
54 api::client::{
55 error::ErrorBody,
56 keys::{
57 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
58 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
59 },
60 message::send_message_event,
61 to_device::send_event_to_device::v3::{
62 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
63 },
64 uiaa::{AuthData, UiaaInfo},
65 },
66 assign,
67 events::{
68 direct::DirectUserIdentifier,
69 room::{MediaSource, ThumbnailInfo},
70 },
71};
72#[cfg(feature = "experimental-send-custom-to-device")]
73use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
74use serde::{Deserialize, de::Error as _};
75use tasks::BundleReceiverTask;
76use tokio::sync::{Mutex, RwLockReadGuard};
77use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
78use tracing::{debug, error, instrument, trace, warn};
79use url::Url;
80use vodozemac::Curve25519PublicKey;
81
82use self::{
83 backups::{Backups, types::BackupClientState},
84 futures::UploadEncryptedFile,
85 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
86 recovery::{Recovery, RecoveryState},
87 secret_storage::SecretStorage,
88 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
89 verification::{SasVerification, Verification, VerificationRequest},
90};
91use crate::{
92 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93 attachment::Thumbnail,
94 client::{ClientInner, WeakClient},
95 cross_process_lock::CrossProcessLockGuard,
96 error::HttpResult,
97};
98
99pub mod backups;
100pub mod futures;
101pub mod identities;
102pub mod recovery;
103pub mod secret_storage;
104pub(crate) mod tasks;
105pub mod verification;
106
107pub use matrix_sdk_base::crypto::{
108 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
109 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110 SessionCreationError, SignatureError, VERSION,
111 olm::{
112 SessionCreationError as MegolmSessionCreationError,
113 SessionExportError as OlmSessionExportError,
114 },
115 vodozemac,
116};
117
118#[cfg(feature = "experimental-send-custom-to-device")]
119use crate::config::RequestConfig;
120pub use crate::error::RoomKeyImportError;
121
122pub(crate) struct EncryptionData {
124 pub tasks: StdMutex<ClientTasks>,
127
128 pub encryption_settings: EncryptionSettings,
130
131 pub backup_state: BackupClientState,
133
134 pub recovery_state: SharedObservable<RecoveryState>,
136}
137
138impl EncryptionData {
139 pub fn new(encryption_settings: EncryptionSettings) -> Self {
140 Self {
141 encryption_settings,
142
143 tasks: StdMutex::new(Default::default()),
144 backup_state: Default::default(),
145 recovery_state: Default::default(),
146 }
147 }
148
149 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
150 let weak_client = WeakClient::from_inner(client);
151
152 let mut tasks = self.tasks.lock();
153 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
154
155 if self.encryption_settings.backup_download_strategy
156 == BackupDownloadStrategy::AfterDecryptionFailure
157 {
158 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
159 }
160 }
161
162 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
168 let mut guard = self.tasks.lock();
169
170 let future = Recovery::update_state_after_backup_state_change(client);
171 let join_handle = spawn(future);
172
173 guard.update_recovery_state_after_backup = Some(join_handle);
174 }
175}
176
177#[derive(Clone, Copy, Debug, Default)]
179pub struct EncryptionSettings {
180 pub auto_enable_cross_signing: bool,
186
187 pub backup_download_strategy: BackupDownloadStrategy,
192
193 pub auto_enable_backups: bool,
195}
196
197#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
199#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
200pub enum BackupDownloadStrategy {
201 OneShot,
212
213 AfterDecryptionFailure,
215
216 #[default]
221 Manual,
222}
223
224#[derive(Clone, Copy, Debug, Eq, PartialEq)]
229pub enum VerificationState {
230 Unknown,
232 Verified,
235 Unverified,
237}
238
239#[derive(Debug)]
241pub struct CrossProcessLockStoreGuardWithGeneration {
242 _guard: CrossProcessLockGuard,
243 generation: u64,
244}
245
246impl CrossProcessLockStoreGuardWithGeneration {
247 pub fn generation(&self) -> u64 {
249 self.generation
250 }
251}
252
253#[derive(Debug)]
264pub struct CrossSigningResetHandle {
265 client: Client,
266 upload_request: UploadSigningKeysRequest,
267 signatures_request: UploadSignaturesRequest,
268 auth_type: CrossSigningResetAuthType,
269 is_cancelled: Mutex<bool>,
270}
271
272impl CrossSigningResetHandle {
273 pub fn new(
275 client: Client,
276 upload_request: UploadSigningKeysRequest,
277 signatures_request: UploadSignaturesRequest,
278 auth_type: CrossSigningResetAuthType,
279 ) -> Self {
280 Self {
281 client,
282 upload_request,
283 signatures_request,
284 auth_type,
285 is_cancelled: Mutex::new(false),
286 }
287 }
288
289 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
292 &self.auth_type
293 }
294
295 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
299 let mut upload_request = self.upload_request.clone();
300 upload_request.auth = auth;
301
302 while let Err(e) = self.client.send(upload_request.clone()).await {
303 if *self.is_cancelled.lock().await {
304 return Ok(());
305 }
306
307 match e.as_uiaa_response() {
308 Some(uiaa_info) => {
309 if uiaa_info.auth_error.is_some() {
310 return Err(e.into());
311 }
312 }
313 None => return Err(e.into()),
314 }
315 }
316
317 self.client.send(self.signatures_request.clone()).await?;
318
319 Ok(())
320 }
321
322 pub async fn cancel(&self) {
324 *self.is_cancelled.lock().await = true;
325 }
326}
327
328#[derive(Debug, Clone)]
331pub enum CrossSigningResetAuthType {
332 Uiaa(UiaaInfo),
334 OAuth(OAuthCrossSigningResetInfo),
337}
338
339impl CrossSigningResetAuthType {
340 fn new(error: &HttpError) -> Result<Option<Self>> {
341 if let Some(auth_info) = error.as_uiaa_response() {
342 if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
343 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
344 } else {
345 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
346 }
347 } else {
348 Ok(None)
349 }
350 }
351}
352
353#[derive(Debug, Clone, Deserialize)]
356pub struct OAuthCrossSigningResetInfo {
357 pub approval_url: Url,
359}
360
361impl OAuthCrossSigningResetInfo {
362 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
363 let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
364 auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
365 )?;
366
367 Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
368 }
369}
370
371#[derive(Debug, Deserialize)]
374struct OAuthCrossSigningResetUiaaParameters {
375 #[serde(rename = "org.matrix.cross_signing_reset")]
377 reset: OAuthCrossSigningResetUiaaResetParameter,
378}
379
380#[derive(Debug, Deserialize)]
383struct OAuthCrossSigningResetUiaaResetParameter {
384 url: Url,
386}
387
388#[derive(Debug)]
391struct DuplicateOneTimeKeyErrorMessage {
392 old_key: Curve25519PublicKey,
394 new_key: Curve25519PublicKey,
396}
397
398impl FromStr for DuplicateOneTimeKeyErrorMessage {
399 type Err = serde_json::Error;
400
401 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
402 let mut split = s.split_terminator(';');
406
407 let old_key = split
408 .next()
409 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
410 let new_key = split
411 .next()
412 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
413
414 let old_key_index = old_key
417 .find("Old key:")
418 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
419
420 let old_key = old_key[old_key_index..]
421 .trim()
422 .strip_prefix("Old key:")
423 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
424
425 let new_key = new_key
428 .trim()
429 .strip_prefix("new key:")
430 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
431
432 let new_key = new_key.replace("'", "\"");
435
436 let old_key: SignedKey = serde_json::from_str(old_key)?;
438 let new_key: SignedKey = serde_json::from_str(&new_key)?;
439
440 let old_key = old_key.key();
442 let new_key = new_key.key();
443
444 Ok(Self { old_key, new_key })
445 }
446}
447
448impl Client {
449 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
450 self.base_client().olm_machine().await
451 }
452
453 pub(crate) async fn mark_request_as_sent(
454 &self,
455 request_id: &TransactionId,
456 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
457 ) -> Result<(), matrix_sdk_base::Error> {
458 Ok(self
459 .olm_machine()
460 .await
461 .as_ref()
462 .expect(
463 "We should have an olm machine once we try to mark E2EE related requests as sent",
464 )
465 .mark_request_as_sent(request_id, response)
466 .await?)
467 }
468
469 #[instrument(skip(self, device_keys))]
475 pub(crate) async fn keys_query(
476 &self,
477 request_id: &TransactionId,
478 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
479 ) -> Result<get_keys::v3::Response> {
480 let request = assign!(get_keys::v3::Request::new(), { device_keys });
481
482 let response = self.send(request).await?;
483 self.mark_request_as_sent(request_id, &response).await?;
484 self.encryption().update_state_after_keys_query(&response).await;
485
486 Ok(response)
487 }
488
489 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
523 &'a self,
524 reader: &'a mut R,
525 ) -> UploadEncryptedFile<'a, R> {
526 UploadEncryptedFile::new(self, reader)
527 }
528
529 pub(crate) async fn upload_encrypted_media_and_thumbnail(
532 &self,
533 data: &[u8],
534 thumbnail: Option<Thumbnail>,
535 send_progress: SharedObservable<TransmissionProgress>,
536 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
537 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
538
539 let upload_attachment = async {
540 let mut cursor = Cursor::new(data);
541 self.upload_encrypted_file(&mut cursor)
542 .with_send_progress_observable(send_progress)
543 .await
544 };
545
546 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
547
548 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
549 }
550
551 async fn upload_encrypted_thumbnail(
554 &self,
555 thumbnail: Option<Thumbnail>,
556 send_progress: SharedObservable<TransmissionProgress>,
557 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
558 let Some(thumbnail) = thumbnail else {
559 return Ok(None);
560 };
561
562 let (data, _, thumbnail_info) = thumbnail.into_parts();
563 let mut cursor = Cursor::new(data);
564
565 let file = self
566 .upload_encrypted_file(&mut cursor)
567 .with_send_progress_observable(send_progress)
568 .await?;
569
570 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
571 }
572
573 pub(crate) async fn claim_one_time_keys(
579 &self,
580 users: impl Iterator<Item = &UserId>,
581 ) -> Result<()> {
582 let _lock = self.locks().key_claim_lock.lock().await;
583
584 if let Some((request_id, request)) = self
585 .olm_machine()
586 .await
587 .as_ref()
588 .ok_or(Error::NoOlmMachine)?
589 .get_missing_sessions(users)
590 .await?
591 {
592 let response = self.send(request).await?;
593 self.mark_request_as_sent(&request_id, &response).await?;
594 }
595
596 Ok(())
597 }
598
599 #[instrument(skip(self, request))]
609 pub(crate) async fn keys_upload(
610 &self,
611 request_id: &TransactionId,
612 request: &upload_keys::v3::Request,
613 ) -> Result<upload_keys::v3::Response> {
614 debug!(
615 device_keys = request.device_keys.is_some(),
616 one_time_key_count = request.one_time_keys.len(),
617 "Uploading public encryption keys",
618 );
619
620 let response = self.send(request.clone()).await?;
621 self.mark_request_as_sent(request_id, &response).await?;
622
623 Ok(response)
624 }
625
626 pub(crate) async fn room_send_helper(
627 &self,
628 request: &RoomMessageRequest,
629 ) -> Result<send_message_event::v3::Response> {
630 let content = request.content.clone();
631 let txn_id = request.txn_id.clone();
632 let room_id = &request.room_id;
633
634 self.get_room(room_id)
635 .expect("Can't send a message to a room that isn't known to the store")
636 .send(*content)
637 .with_transaction_id(txn_id)
638 .await
639 }
640
641 pub(crate) async fn send_to_device(
642 &self,
643 request: &ToDeviceRequest,
644 ) -> HttpResult<ToDeviceResponse> {
645 let request = RumaToDeviceRequest::new_raw(
646 request.event_type.clone(),
647 request.txn_id.clone(),
648 request.messages.clone(),
649 );
650
651 self.send(request).await
652 }
653
654 pub(crate) async fn send_verification_request(
655 &self,
656 request: OutgoingVerificationRequest,
657 ) -> Result<()> {
658 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
659
660 match request {
661 ToDevice(t) => {
662 self.send_to_device(&t).await?;
663 }
664 InRoom(r) => {
665 self.room_send_helper(&r).await?;
666 }
667 }
668
669 Ok(())
670 }
671
672 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
674 let rooms = self.joined_rooms();
675
676 let room = rooms.into_iter().find(|r| {
678 let targets = r.direct_targets();
679 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
680 });
681
682 trace!(?room, "Found room");
683 room
684 }
685
686 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
687 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
688
689 match r.request() {
690 AnyOutgoingRequest::KeysQuery(request) => {
691 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
692 }
693 AnyOutgoingRequest::KeysUpload(request) => {
694 let response = self.keys_upload(r.request_id(), request).await;
695
696 if let Err(e) = &response {
697 match e.as_ruma_api_error() {
698 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
699 if let ErrorBody::Standard { message, .. } = &e.body {
700 {
705 let already_reported = self
706 .state_store()
707 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
708 .await?
709 .is_some();
710
711 if message.starts_with("One time key") && !already_reported {
712 if let Ok(message) =
713 DuplicateOneTimeKeyErrorMessage::from_str(message)
714 {
715 error!(
716 sentry = true,
717 old_key = %message.old_key,
718 new_key = %message.new_key,
719 "Duplicate one-time keys have been uploaded"
720 );
721 } else {
722 error!(
723 sentry = true,
724 "Duplicate one-time keys have been uploaded"
725 );
726 }
727
728 self.state_store()
729 .set_kv_data(
730 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
731 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
732 )
733 .await?;
734 }
735 }
736 }
737 }
738 _ => {}
739 }
740
741 response?;
742 }
743 }
744 AnyOutgoingRequest::ToDeviceRequest(request) => {
745 let response = self.send_to_device(request).await?;
746 self.mark_request_as_sent(r.request_id(), &response).await?;
747 }
748 AnyOutgoingRequest::SignatureUpload(request) => {
749 let response = self.send(request.clone()).await?;
750 self.mark_request_as_sent(r.request_id(), &response).await?;
751 }
752 AnyOutgoingRequest::RoomMessage(request) => {
753 let response = self.room_send_helper(request).await?;
754 self.mark_request_as_sent(r.request_id(), &response).await?;
755 }
756 AnyOutgoingRequest::KeysClaim(request) => {
757 let response = self.send(request.clone()).await?;
758 self.mark_request_as_sent(r.request_id(), &response).await?;
759 }
760 }
761
762 Ok(())
763 }
764
765 #[instrument(skip_all)]
766 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
767 const MAX_CONCURRENT_REQUESTS: usize = 20;
768
769 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
772 warn!("Error while claiming one-time keys {:?}", e);
773 }
774
775 let outgoing_requests = stream::iter(
776 self.olm_machine()
777 .await
778 .as_ref()
779 .ok_or(Error::NoOlmMachine)?
780 .outgoing_requests()
781 .await?,
782 )
783 .map(|r| self.send_outgoing_request(r));
784
785 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
786
787 requests
788 .for_each(|r| async move {
789 match r {
790 Ok(_) => (),
791 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
792 }
793 })
794 .await;
795
796 Ok(())
797 }
798}
799
800#[cfg(any(feature = "testing", test))]
801impl Client {
802 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
804 self.olm_machine().await
805 }
806}
807
808#[derive(Debug, Clone)]
812pub struct Encryption {
813 client: Client,
815}
816
817impl Encryption {
818 pub(crate) fn new(client: Client) -> Self {
819 Self { client }
820 }
821
822 pub(crate) fn settings(&self) -> EncryptionSettings {
824 self.client.inner.e2ee.encryption_settings
825 }
826
827 pub async fn ed25519_key(&self) -> Option<String> {
830 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
831 }
832
833 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
835 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
836 }
837
838 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
840 match self.get_own_device().await {
841 Ok(Some(device)) => device.first_time_seen_ts(),
842 _ => MilliSecondsSinceUnixEpoch::now(),
844 }
845 }
846
847 pub(crate) async fn import_secrets_bundle(
848 &self,
849 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
850 ) -> Result<(), SecretImportError> {
851 let olm_machine = self.client.olm_machine().await;
852 let olm_machine =
853 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
854
855 olm_machine.store().import_secrets_bundle(bundle).await
856 }
857
858 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
863 let olm = self.client.olm_machine().await;
864 let machine = olm.as_ref()?;
865 Some(machine.cross_signing_status().await)
866 }
867
868 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
874 let olm_machine = self.client.olm_machine().await;
875 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
876 let user_id = olm_machine.user_id();
877
878 self.ensure_initial_key_query().await?;
879
880 let devices = self.get_user_devices(user_id).await?;
881
882 let ret = devices.devices().any(|device| {
883 device.is_cross_signed_by_owner()
884 && device.curve25519_key().is_some()
885 && !device.is_dehydrated()
886 });
887
888 Ok(ret)
889 }
890
891 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
896 if let Some(machine) = self.client.olm_machine().await.as_ref() {
897 machine.tracked_users().await
898 } else {
899 Ok(HashSet::new())
900 }
901 }
902
903 pub fn verification_state(&self) -> Subscriber<VerificationState> {
926 self.client.inner.verification_state.subscribe_reset()
927 }
928
929 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
931 let olm = self.client.olm_machine().await;
932 let olm = olm.as_ref()?;
933 #[allow(clippy::bind_instead_of_map)]
934 olm.get_verification(user_id, flow_id).and_then(|v| match v {
935 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
936 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
937 }
938 #[cfg(feature = "qrcode")]
939 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
940 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
941 }
942 _ => None,
943 })
944 }
945
946 pub async fn get_verification_request(
949 &self,
950 user_id: &UserId,
951 flow_id: impl AsRef<str>,
952 ) -> Option<VerificationRequest> {
953 let olm = self.client.olm_machine().await;
954 let olm = olm.as_ref()?;
955
956 olm.get_verification_request(user_id, flow_id)
957 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
958 }
959
960 pub async fn get_device(
994 &self,
995 user_id: &UserId,
996 device_id: &DeviceId,
997 ) -> Result<Option<Device>, CryptoStoreError> {
998 let olm = self.client.olm_machine().await;
999 let Some(machine) = olm.as_ref() else { return Ok(None) };
1000 let device = machine.get_device(user_id, device_id, None).await?;
1001 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1002 }
1003
1004 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1011 let olm = self.client.olm_machine().await;
1012 let Some(machine) = olm.as_ref() else { return Ok(None) };
1013 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1014 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1015 }
1016
1017 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1043 let devices = self
1044 .client
1045 .olm_machine()
1046 .await
1047 .as_ref()
1048 .ok_or(Error::NoOlmMachine)?
1049 .get_user_devices(user_id, None)
1050 .await?;
1051
1052 Ok(UserDevices { inner: devices, client: self.client.clone() })
1053 }
1054
1055 pub async fn get_user_identity(
1091 &self,
1092 user_id: &UserId,
1093 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1094 let olm = self.client.olm_machine().await;
1095 let Some(olm) = olm.as_ref() else { return Ok(None) };
1096 let identity = olm.get_identity(user_id, None).await?;
1097
1098 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1099 }
1100
1101 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1139 let olm = self.client.olm_machine().await;
1140 let Some(olm) = olm.as_ref() else { return Ok(None) };
1141
1142 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1143 self.client.keys_query(&request_id, request.device_keys).await?;
1144
1145 let identity = olm.get_identity(user_id, None).await?;
1146 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1147 }
1148
1149 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1180 let olm = self.client.olm_machine().await;
1181 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1182 let client = self.client.to_owned();
1183
1184 Ok(olm
1185 .store()
1186 .devices_stream()
1187 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1188 }
1189
1190 pub async fn user_identities_stream(
1218 &self,
1219 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1220 let olm = self.client.olm_machine().await;
1221 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1222 let client = self.client.to_owned();
1223
1224 Ok(olm
1225 .store()
1226 .user_identities_stream()
1227 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1228 }
1229
1230 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1269 let olm = self.client.olm_machine().await;
1270 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1271
1272 let CrossSigningBootstrapRequests {
1273 upload_signing_keys_req,
1274 upload_keys_req,
1275 upload_signatures_req,
1276 } = olm.bootstrap_cross_signing(false).await?;
1277
1278 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1279 auth: auth_data,
1280 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1281 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1282 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1283 });
1284
1285 if let Some(req) = upload_keys_req {
1286 self.client.send_outgoing_request(req).await?;
1287 }
1288 self.client.send(upload_signing_keys_req).await?;
1289 self.client.send(upload_signatures_req).await?;
1290
1291 Ok(())
1292 }
1293
1294 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1331 let olm = self.client.olm_machine().await;
1332 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1333
1334 let CrossSigningBootstrapRequests {
1335 upload_keys_req,
1336 upload_signing_keys_req,
1337 upload_signatures_req,
1338 } = olm.bootstrap_cross_signing(true).await?;
1339
1340 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1341 auth: None,
1342 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1343 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1344 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1345 });
1346
1347 if let Some(req) = upload_keys_req {
1348 self.client.send_outgoing_request(req).await?;
1349 }
1350
1351 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1352 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1353 let client = self.client.clone();
1354
1355 Ok(Some(CrossSigningResetHandle::new(
1356 client,
1357 upload_signing_keys_req,
1358 upload_signatures_req,
1359 auth_type,
1360 )))
1361 } else {
1362 Err(error.into())
1363 }
1364 } else {
1365 self.client.send(upload_signatures_req).await?;
1366
1367 Ok(None)
1368 }
1369 }
1370
1371 async fn ensure_initial_key_query(&self) -> Result<()> {
1374 let olm_machine = self.client.olm_machine().await;
1375 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1376
1377 let user_id = olm_machine.user_id();
1378
1379 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1380 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1381 self.client.keys_query(&request_id, request.device_keys).await?;
1382 }
1383
1384 Ok(())
1385 }
1386
1387 pub async fn bootstrap_cross_signing_if_needed(
1434 &self,
1435 auth_data: Option<AuthData>,
1436 ) -> Result<()> {
1437 let olm_machine = self.client.olm_machine().await;
1438 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1439 let user_id = olm_machine.user_id();
1440
1441 self.ensure_initial_key_query().await?;
1442
1443 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1444 self.bootstrap_cross_signing(auth_data).await?;
1445 }
1446
1447 Ok(())
1448 }
1449
1450 #[cfg(not(target_family = "wasm"))]
1502 pub async fn export_room_keys(
1503 &self,
1504 path: PathBuf,
1505 passphrase: &str,
1506 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1507 ) -> Result<()> {
1508 let olm = self.client.olm_machine().await;
1509 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1510
1511 let keys = olm.store().export_room_keys(predicate).await?;
1512 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1513
1514 let encrypt = move || -> Result<()> {
1515 let export: String =
1516 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1517 let mut file = std::fs::File::create(path)?;
1518 file.write_all(&export.into_bytes())?;
1519 Ok(())
1520 };
1521
1522 let task = tokio::task::spawn_blocking(encrypt);
1523 task.await.expect("Task join error")
1524 }
1525
1526 #[cfg(not(target_family = "wasm"))]
1564 pub async fn import_room_keys(
1565 &self,
1566 path: PathBuf,
1567 passphrase: &str,
1568 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1569 let olm = self.client.olm_machine().await;
1570 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1571 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1572
1573 let decrypt = move || {
1574 let file = std::fs::File::open(path)?;
1575 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1576 };
1577
1578 let task = tokio::task::spawn_blocking(decrypt);
1579 let import = task.await.expect("Task join error")?;
1580
1581 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1582
1583 self.backups().maybe_trigger_backup();
1584
1585 Ok(ret)
1586 }
1587
1588 pub async fn room_keys_received_stream(
1619 &self,
1620 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1621 {
1622 let olm = self.client.olm_machine().await;
1623 let olm = olm.as_ref()?;
1624
1625 Some(olm.store().room_keys_received_stream())
1626 }
1627
1628 pub async fn historic_room_key_stream(
1659 &self,
1660 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1661 let olm = self.client.olm_machine().await;
1662 let olm = olm.as_ref()?;
1663
1664 Some(olm.store().historic_room_key_stream())
1665 }
1666
1667 pub fn secret_storage(&self) -> SecretStorage {
1669 SecretStorage { client: self.client.to_owned() }
1670 }
1671
1672 pub fn backups(&self) -> Backups {
1674 Backups { client: self.client.to_owned() }
1675 }
1676
1677 pub fn recovery(&self) -> Recovery {
1679 Recovery { client: self.client.to_owned() }
1680 }
1681
1682 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1694 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1696 let prev_holder = prev_lock.lock_holder();
1697 if prev_holder == lock_value {
1698 return Ok(());
1699 }
1700 warn!(
1701 "Recreating cross-process store lock with a different holder value: \
1702 prev was {prev_holder}, new is {lock_value}"
1703 );
1704 }
1705
1706 let olm_machine = self.client.base_client().olm_machine().await;
1707 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1708
1709 let lock =
1710 olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1711
1712 {
1717 let guard = lock.try_lock_once().await?;
1718 if guard.is_some() {
1719 olm_machine
1720 .initialize_crypto_store_generation(
1721 &self.client.locks().crypto_store_generation,
1722 )
1723 .await?;
1724 }
1725 }
1726
1727 self.client
1728 .locks()
1729 .cross_process_crypto_store_lock
1730 .set(lock)
1731 .map_err(|_| Error::BadCryptoStoreState)?;
1732
1733 Ok(())
1734 }
1735
1736 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1741 let olm_machine_guard = self.client.olm_machine().await;
1742 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1743 let (new_gen, generation_number) = olm_machine
1744 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1745 .await?;
1746 if new_gen {
1748 drop(olm_machine_guard);
1750 self.client.base_client().regenerate_olm(None).await?;
1752 }
1753 Ok(generation_number)
1754 } else {
1755 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1760 Ok(0)
1761 }
1762 }
1763
1764 pub async fn spin_lock_store(
1770 &self,
1771 max_backoff: Option<u32>,
1772 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1773 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1774 let guard = lock.spin_lock(max_backoff).await?;
1775
1776 let generation = self.on_lock_newly_acquired().await?;
1777
1778 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1779 } else {
1780 Ok(None)
1781 }
1782 }
1783
1784 pub async fn try_lock_store_once(
1789 &self,
1790 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1791 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1792 let maybe_guard = lock.try_lock_once().await?;
1793
1794 let Some(guard) = maybe_guard else {
1795 return Ok(None);
1796 };
1797
1798 let generation = self.on_lock_newly_acquired().await?;
1799
1800 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1801 } else {
1802 Ok(None)
1803 }
1804 }
1805
1806 #[cfg(any(test, feature = "testing"))]
1808 pub async fn uploaded_key_count(&self) -> Result<u64> {
1809 let olm_machine = self.client.olm_machine().await;
1810 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1811 Ok(olm_machine.uploaded_key_count().await?)
1812 }
1813
1814 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1838 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1842 Some(BundleReceiverTask::new(&self.client).await)
1843 } else {
1844 None
1845 };
1846
1847 let mut tasks = self.client.inner.e2ee.tasks.lock();
1848
1849 let this = self.clone();
1850
1851 tasks.setup_e2ee = Some(spawn(async move {
1852 this.update_verification_state().await;
1855
1856 if this.settings().auto_enable_cross_signing
1857 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1858 {
1859 error!("Couldn't bootstrap cross signing {e:?}");
1860 }
1861
1862 if let Err(e) = this.backups().setup_and_resume().await {
1863 error!("Couldn't setup and resume backups {e:?}");
1864 }
1865 if let Err(e) = this.recovery().setup().await {
1866 error!("Couldn't setup and resume recovery {e:?}");
1867 }
1868 }));
1869
1870 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1871 }
1872
1873 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1876 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1877
1878 if let Some(task) = task
1879 && let Err(err) = task.await
1880 {
1881 warn!("Error when initializing backups: {err}");
1882 }
1883 }
1884
1885 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1895 let olm = self.client.olm_machine().await;
1896 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1897
1898 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1899 self.client.keys_upload(&request_id, &request).await?;
1900
1901 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1902 self.client.keys_query(&request_id, request.device_keys).await?;
1903 }
1904
1905 Ok(())
1906 }
1907
1908 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1909 self.recovery().update_state_after_keys_query(response).await;
1910
1911 if let Some(user_id) = self.client.user_id() {
1913 let contains_own_device = response.device_keys.contains_key(user_id);
1914
1915 if contains_own_device {
1916 self.update_verification_state().await;
1917 }
1918 }
1919 }
1920
1921 async fn update_verification_state(&self) {
1922 match self.get_own_device().await {
1923 Ok(device) => {
1924 if let Some(device) = device {
1925 let is_verified = device.is_cross_signed_by_owner();
1926
1927 if is_verified {
1928 self.client.inner.verification_state.set(VerificationState::Verified);
1929 } else {
1930 self.client.inner.verification_state.set(VerificationState::Unverified);
1931 }
1932 } else {
1933 warn!("Couldn't find out own device in the store.");
1934 self.client.inner.verification_state.set(VerificationState::Unknown);
1935 }
1936 }
1937 Err(error) => {
1938 warn!("Failed retrieving own device: {error}");
1939 self.client.inner.verification_state.set(VerificationState::Unknown);
1940 }
1941 }
1942 }
1943
1944 #[cfg(feature = "experimental-send-custom-to-device")]
1953 pub async fn encrypt_and_send_raw_to_device(
1954 &self,
1955 recipient_devices: Vec<&Device>,
1956 event_type: &str,
1957 content: Raw<AnyToDeviceEventContent>,
1958 share_strategy: CollectStrategy,
1959 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1960 let users = recipient_devices.iter().map(|device| device.user_id());
1961
1962 self.client.claim_one_time_keys(users).await?;
1966
1967 let olm = self.client.olm_machine().await;
1968 let olm = olm.as_ref().expect("Olm machine wasn't started");
1969
1970 let (requests, withhelds) = olm
1971 .encrypt_content_for_devices(
1972 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1973 event_type,
1974 &content
1975 .deserialize_as::<serde_json::Value>()
1976 .expect("Deserialize as Value will always work"),
1977 share_strategy,
1978 )
1979 .await?;
1980
1981 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1982
1983 withhelds.iter().for_each(|(d, _)| {
1985 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1986 });
1987
1988 for request in requests {
1990 let ruma_request = RumaToDeviceRequest::new_raw(
1991 request.event_type.clone(),
1992 request.txn_id.clone(),
1993 request.messages.clone(),
1994 );
1995
1996 let send_result = self
1997 .client
1998 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1999 .await;
2000
2001 if send_result.is_err() {
2003 for (user_id, device_map) in request.messages {
2005 for device_id in device_map.keys() {
2006 match device_id {
2007 DeviceIdOrAllDevices::DeviceId(device_id) => {
2008 failures.push((user_id.clone(), device_id.to_owned()));
2009 }
2010 DeviceIdOrAllDevices::AllDevices => {
2011 }
2013 }
2014 }
2015 }
2016 }
2017 }
2018
2019 Ok(failures)
2020 }
2021}
2022
2023#[cfg(all(test, not(target_family = "wasm")))]
2024mod tests {
2025 use std::{
2026 ops::Not,
2027 str::FromStr,
2028 sync::{
2029 Arc,
2030 atomic::{AtomicBool, Ordering},
2031 },
2032 time::Duration,
2033 };
2034
2035 use matrix_sdk_test::{
2036 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
2037 event_factory::EventFactory, test_json,
2038 };
2039 use ruma::{
2040 event_id,
2041 events::{reaction::ReactionEventContent, relation::Annotation},
2042 user_id,
2043 };
2044 use serde_json::json;
2045 use wiremock::{
2046 Mock, MockServer, Request, ResponseTemplate,
2047 matchers::{header, method, path_regex},
2048 };
2049
2050 use crate::{
2051 Client, assert_next_matches_with_timeout,
2052 config::RequestConfig,
2053 encryption::{
2054 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2055 },
2056 test_utils::{
2057 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2058 },
2059 };
2060
2061 #[async_test]
2062 async fn test_reaction_sending() {
2063 let server = MockServer::start().await;
2064 let client = logged_in_client(Some(server.uri())).await;
2065
2066 let event_id = event_id!("$2:example.org");
2067
2068 Mock::given(method("GET"))
2069 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2070 .and(header("authorization", "Bearer 1234"))
2071 .respond_with(
2072 ResponseTemplate::new(200)
2073 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
2074 )
2075 .mount(&server)
2076 .await;
2077
2078 Mock::given(method("PUT"))
2079 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2080 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2081 "event_id": event_id,
2082 })))
2083 .mount(&server)
2084 .await;
2085
2086 let response = SyncResponseBuilder::default()
2087 .add_joined_room(
2088 JoinedRoomBuilder::default()
2089 .add_state_event(StateTestEvent::Member)
2090 .add_state_event(StateTestEvent::PowerLevels)
2091 .add_state_event(StateTestEvent::Encryption),
2092 )
2093 .build_sync_response();
2094
2095 client.base_client().receive_sync_response(response).await.unwrap();
2096
2097 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2098 assert!(
2099 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2100 );
2101
2102 let event_id = event_id!("$1:example.org");
2103 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2104 room.send(reaction).await.expect("Sending the reaction should not fail");
2105
2106 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2107 }
2108
2109 #[async_test]
2110 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2111 let server = MockServer::start().await;
2112 let client = logged_in_client(Some(server.uri())).await;
2113 let user_id = user_id!("@invited:localhost");
2117
2118 let f = EventFactory::new();
2120 let response = SyncResponseBuilder::default()
2121 .add_joined_room(
2122 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2123 )
2124 .add_global_account_data(
2125 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2126 )
2127 .build_sync_response();
2128 client.base_client().receive_sync_response(response).await.unwrap();
2129
2130 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2132 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2133 }
2134
2135 #[async_test]
2136 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2137 let server = MockServer::start().await;
2138 let client = logged_in_client(Some(server.uri())).await;
2139 let user_id = user_id!("@invited:localhost");
2141
2142 let f = EventFactory::new();
2144 let response = SyncResponseBuilder::default()
2145 .add_joined_room(
2146 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2147 )
2148 .add_global_account_data(
2149 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2150 )
2151 .build_sync_response();
2152 client.base_client().receive_sync_response(response).await.unwrap();
2153
2154 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2156 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2157 }
2158
2159 #[async_test]
2160 async fn test_get_dm_room_still_finds_left_room() {
2161 let server = MockServer::start().await;
2165 let client = logged_in_client(Some(server.uri())).await;
2166 let user_id = user_id!("@invited:localhost");
2170
2171 let f = EventFactory::new();
2173 let response = SyncResponseBuilder::default()
2174 .add_joined_room(
2175 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2176 )
2177 .add_global_account_data(
2178 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2179 )
2180 .build_sync_response();
2181 client.base_client().receive_sync_response(response).await.unwrap();
2182
2183 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2185 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2186 }
2187
2188 #[cfg(feature = "sqlite")]
2189 #[async_test]
2190 async fn test_generation_counter_invalidates_olm_machine() {
2191 use matrix_sdk_base::store::RoomLoadSettings;
2194 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
2195 let session = mock_matrix_session();
2196
2197 let client1 = Client::builder()
2198 .homeserver_url("http://localhost:1234")
2199 .request_config(RequestConfig::new().disable_retry())
2200 .sqlite_store(&sqlite_path, None)
2201 .build()
2202 .await
2203 .unwrap();
2204 client1
2205 .matrix_auth()
2206 .restore_session(session.clone(), RoomLoadSettings::default())
2207 .await
2208 .unwrap();
2209
2210 let client2 = Client::builder()
2211 .homeserver_url("http://localhost:1234")
2212 .request_config(RequestConfig::new().disable_retry())
2213 .sqlite_store(sqlite_path, None)
2214 .build()
2215 .await
2216 .unwrap();
2217 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2218
2219 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2221 assert!(guard.is_none());
2222
2223 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2224 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2225
2226 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2228 assert!(acquired1.is_some());
2229
2230 let initial_olm_machine =
2232 client1.olm_machine().await.clone().expect("must have an olm machine");
2233
2234 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2236 .expect("Can't create new recovery key");
2237 let backup_key = decryption_key.megolm_v1_public_key();
2238 backup_key.set_version("1".to_owned());
2239 initial_olm_machine
2240 .backup_machine()
2241 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2242 .await
2243 .expect("Should save");
2244
2245 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2246
2247 assert!(client1.encryption().backups().are_enabled().await);
2248
2249 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2251 assert!(acquired2.is_none());
2252
2253 drop(acquired1);
2255 tokio::time::sleep(Duration::from_millis(100)).await;
2256
2257 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2259 assert!(acquired1.is_some());
2260
2261 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2263 assert!(initial_olm_machine.same_as(&olm_machine));
2264
2265 drop(acquired1);
2267 tokio::time::sleep(Duration::from_millis(100)).await;
2268
2269 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2271 assert!(acquired2.is_some());
2272
2273 drop(acquired2);
2275 tokio::time::sleep(Duration::from_millis(100)).await;
2276
2277 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2279 assert!(acquired1.is_some());
2280
2281 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2283
2284 assert!(!initial_olm_machine.same_as(&olm_machine));
2285
2286 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2287 assert!(backup_key_new.decryption_key.is_some());
2288 assert_eq!(
2289 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2290 backup_key.to_base64()
2291 );
2292 assert!(client1.encryption().backups().are_enabled().await);
2293 }
2294
2295 #[cfg(feature = "sqlite")]
2296 #[async_test]
2297 async fn test_generation_counter_no_spurious_invalidation() {
2298 use matrix_sdk_base::store::RoomLoadSettings;
2301 let sqlite_path =
2302 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2303 let session = mock_matrix_session();
2304
2305 let client = Client::builder()
2306 .homeserver_url("http://localhost:1234")
2307 .request_config(RequestConfig::new().disable_retry())
2308 .sqlite_store(&sqlite_path, None)
2309 .build()
2310 .await
2311 .unwrap();
2312 client
2313 .matrix_auth()
2314 .restore_session(session.clone(), RoomLoadSettings::default())
2315 .await
2316 .unwrap();
2317
2318 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2319
2320 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2321
2322 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2324 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2325
2326 {
2327 let client2 = Client::builder()
2329 .homeserver_url("http://localhost:1234")
2330 .request_config(RequestConfig::new().disable_retry())
2331 .sqlite_store(sqlite_path, None)
2332 .build()
2333 .await
2334 .unwrap();
2335 client2
2336 .matrix_auth()
2337 .restore_session(session, RoomLoadSettings::default())
2338 .await
2339 .unwrap();
2340
2341 client2
2342 .encryption()
2343 .enable_cross_process_store_lock("client2".to_owned())
2344 .await
2345 .unwrap();
2346
2347 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2348 assert!(guard.is_some());
2349
2350 drop(guard);
2351 tokio::time::sleep(Duration::from_millis(100)).await;
2352 }
2353
2354 {
2355 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2356 assert!(acquired.is_some());
2357 }
2358
2359 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2361 assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2362
2363 {
2364 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2365 assert!(acquired.is_some());
2366 }
2367
2368 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2370 assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2371 }
2372
2373 #[async_test]
2374 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2375 let client = no_retry_test_client(None).await;
2377 let server = MockServer::start().await;
2378
2379 let mut verification_state = client.encryption().verification_state();
2381
2382 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2384
2385 let keys_requested = Arc::new(AtomicBool::new(false));
2388 let inner_bool = keys_requested.clone();
2389
2390 Mock::given(method("GET"))
2391 .and(path_regex(
2392 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2393 ))
2394 .respond_with(move |_req: &Request| {
2395 inner_bool.fetch_or(true, Ordering::SeqCst);
2396 ResponseTemplate::new(200).set_body_json(json!({}))
2397 })
2398 .mount(&server)
2399 .await;
2400
2401 set_client_session(&client).await;
2403
2404 assert!(keys_requested.load(Ordering::SeqCst).not());
2406 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2407 }
2408
2409 #[test]
2410 fn test_oauth_reset_info_from_uiaa_info() {
2411 let auth_info = json!({
2412 "session": "dummy",
2413 "flows": [
2414 {
2415 "stages": [
2416 "org.matrix.cross_signing_reset"
2417 ]
2418 }
2419 ],
2420 "params": {
2421 "org.matrix.cross_signing_reset": {
2422 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2423 }
2424 },
2425 "msg": "To reset..."
2426 });
2427
2428 let auth_info = serde_json::from_value(auth_info)
2429 .expect("We should be able to deserialize the UiaaInfo");
2430 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2431 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2432 }
2433
2434 #[test]
2435 fn test_duplicate_one_time_key_error_parsing() {
2436 let message = concat!(
2437 r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
2438 r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
2439 r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
2440 r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
2441 r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
2442 r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
2443 r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
2444 );
2445 let message = DuplicateOneTimeKeyErrorMessage::from_str(message)
2446 .expect("We should be able to parse the error message");
2447
2448 assert_eq!(message.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
2449 assert_eq!(message.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");
2450
2451 DuplicateOneTimeKeyErrorMessage::from_str("One time key already exists.")
2452 .expect_err("We shouldn't be able to parse an incomplete error message");
2453 }
2454
2455 fn devices_to_verify_against_keys_query_response(
2459 devices: Vec<serde_json::Value>,
2460 ) -> serde_json::Value {
2461 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2462 .into_iter()
2463 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2464 .collect();
2465 json!({
2466 "device_keys": {
2467 "@example:localhost": device_keys,
2468 },
2469 "master_keys": {
2470 "@example:localhost": {
2471 "keys": {
2472 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2473 },
2474 "usage": ["master"],
2475 "user_id": "@example:localhost",
2476 },
2477 },
2478 "self_signing_keys": {
2479 "@example:localhost": {
2480 "keys": {
2481 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2482 },
2483 "usage": ["self_signing"],
2484 "user_id": "@example:localhost",
2485 "signatures": {
2486 "@example:localhost": {
2487 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2488 },
2489 },
2490 },
2491 },
2492 "user_signing_keys": {
2493 "@example:localhost": {
2494 "keys": {
2495 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2496 },
2497 "usage": ["user_signing"],
2498 "user_id": "@example:localhost",
2499 "signatures": {
2500 "@example:localhost": {
2501 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2502 },
2503 },
2504 },
2505 }
2506 })
2507 }
2508
2509 #[async_test]
2512 async fn test_devices_to_verify_against_no_devices() {
2515 let server = MockServer::start().await;
2516 let client = logged_in_client(Some(server.uri())).await;
2517
2518 Mock::given(method("POST"))
2519 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2520 .respond_with(
2521 ResponseTemplate::new(200)
2522 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2523 )
2524 .mount(&server)
2525 .await;
2526
2527 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2528 }
2529
2530 #[async_test]
2531 async fn test_devices_to_verify_against_cross_signed() {
2534 let server = MockServer::start().await;
2535 let client = logged_in_client(Some(server.uri())).await;
2536
2537 Mock::given(method("POST"))
2538 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2539 .respond_with(ResponseTemplate::new(200).set_body_json(
2540 devices_to_verify_against_keys_query_response(vec![
2541 json!({
2542 "algorithms": [
2543 "m.olm.v1.curve25519-aes-sha2",
2544 "m.megolm.v1.aes-sha2",
2545 ],
2546 "user_id": "@example:localhost",
2547 "device_id": "SIGNEDDEVICE",
2548 "keys": {
2549 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2550 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2551 },
2552 "signatures": {
2553 "@example:localhost": {
2554 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2555 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2556 },
2557 },
2558 })
2559 ])
2560 ))
2561 .mount(&server)
2562 .await;
2563
2564 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2565 }
2566
2567 #[async_test]
2568 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2571 let server = MockServer::start().await;
2572 let client = logged_in_client(Some(server.uri())).await;
2573 let user_id = client.user_id().unwrap();
2574 let olm_machine = client.olm_machine().await;
2575 let olm_machine = olm_machine.as_ref().unwrap();
2576
2577 Mock::given(method("POST"))
2578 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2579 .respond_with(ResponseTemplate::new(200).set_body_json(
2580 devices_to_verify_against_keys_query_response(vec![
2581 json!({
2582 "algorithms": [
2583 "m.olm.v1.curve25519-aes-sha2",
2584 "m.megolm.v1.aes-sha2",
2585 ],
2586 "user_id": "@example:localhost",
2587 "device_id": "DEHYDRATEDDEVICE",
2588 "keys": {
2589 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2590 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2591 },
2592 "dehydrated": true,
2593 "signatures": {
2594 "@example:localhost": {
2595 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2596 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2597 },
2598 },
2599 }),
2600 json!({
2601 "algorithms": [
2602 "m.olm.v1.curve25519-aes-sha2",
2603 "m.megolm.v1.aes-sha2",
2604 ],
2605 "user_id": "@example:localhost",
2606 "device_id": "UNSIGNEDDEVICE",
2607 "keys": {
2608 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2609 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2610 },
2611 "signatures": {
2612 "@example:localhost": {
2613 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2614 },
2615 },
2616 }),
2617 ])
2618 ))
2619 .mount(&server)
2620 .await;
2621
2622 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2623 client.keys_query(&request_id, request.device_keys).await.unwrap();
2624
2625 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2626 }
2627}