1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 str::FromStr,
27 sync::Arc,
28 time::Duration,
29};
30
31use eyeball::{SharedObservable, Subscriber};
32use futures_core::Stream;
33use futures_util::{
34 future::try_join,
35 stream::{self, StreamExt},
36};
37#[cfg(feature = "experimental-send-custom-to-device")]
38use matrix_sdk_base::crypto::CollectStrategy;
39use matrix_sdk_base::{
40 StateStoreDataKey, StateStoreDataValue,
41 cross_process_lock::CrossProcessLockError,
42 crypto::{
43 CrossSigningBootstrapRequests, OlmMachine,
44 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
45 types::{
46 SignedKey,
47 requests::{
48 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
49 },
50 },
51 },
52 sleep::sleep,
53};
54use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
55use ruma::{
56 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
57 api::client::{
58 error::{ErrorBody, StandardErrorBody},
59 keys::{
60 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
61 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
62 },
63 message::send_message_event,
64 to_device::send_event_to_device::v3::{
65 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
66 },
67 uiaa::{AuthData, AuthType, OAuthParams, UiaaInfo},
68 },
69 assign,
70 events::room::{
71 MediaSource, ThumbnailInfo,
72 member::{MembershipChange, OriginalSyncRoomMemberEvent},
73 },
74};
75#[cfg(feature = "experimental-send-custom-to-device")]
76use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
77use serde::{Deserialize, de::Error as _};
78use tasks::BundleReceiverTask;
79use tokio::sync::{Mutex, RwLockReadGuard};
80use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
81use tracing::{debug, error, instrument, warn};
82use url::Url;
83use vodozemac::Curve25519PublicKey;
84
85use self::{
86 backups::{Backups, types::BackupClientState},
87 futures::UploadEncryptedFile,
88 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
89 recovery::{Recovery, RecoveryState},
90 secret_storage::SecretStorage,
91 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
92 verification::{SasVerification, Verification, VerificationRequest},
93};
94use crate::{
95 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
96 attachment::Thumbnail,
97 client::{ClientInner, WeakClient},
98 cross_process_lock::CrossProcessLockGuard,
99 error::HttpResult,
100};
101
102pub mod backups;
103pub mod futures;
104pub mod identities;
105pub mod recovery;
106pub mod secret_storage;
107pub(crate) mod tasks;
108pub mod verification;
109
110pub use matrix_sdk_base::crypto::{
111 CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError, LocalTrust,
112 MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
113 SessionCreationError, SignatureError, VERSION,
114 olm::{
115 SessionCreationError as MegolmSessionCreationError,
116 SessionExportError as OlmSessionExportError,
117 },
118 vodozemac,
119};
120use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
121
122#[cfg(feature = "experimental-send-custom-to-device")]
123use crate::config::RequestConfig;
124pub use crate::error::RoomKeyImportError;
125
/// Client-wide end-to-end-encryption state: the background tasks, the
/// configured settings and the backup/recovery state.
pub(crate) struct EncryptionData {
    /// Background tasks related to encryption, guarded by a synchronous
    /// mutex. Populated by [`EncryptionData::initialize_tasks`] and
    /// [`EncryptionData::initialize_recovery_state_update_task`].
    pub tasks: StdMutex<ClientTasks>,

    /// The settings this value was created with.
    pub encryption_settings: EncryptionSettings,

    /// State of the backups support; starts out as its `Default`.
    pub backup_state: BackupClientState,

    /// Observable holding the current [`RecoveryState`]; starts out as its
    /// `Default`.
    pub recovery_state: SharedObservable<RecoveryState>,
}
141
142impl EncryptionData {
143 pub fn new(encryption_settings: EncryptionSettings) -> Self {
144 Self {
145 encryption_settings,
146
147 tasks: StdMutex::new(Default::default()),
148 backup_state: Default::default(),
149 recovery_state: Default::default(),
150 }
151 }
152
153 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
154 let weak_client = WeakClient::from_inner(client);
155
156 let mut tasks = self.tasks.lock();
157 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
158
159 if self.encryption_settings.backup_download_strategy
160 == BackupDownloadStrategy::AfterDecryptionFailure
161 {
162 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
163 }
164 }
165
166 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
172 let mut guard = self.tasks.lock();
173
174 let future = Recovery::update_state_after_backup_state_change(client);
175 let join_handle = spawn(future);
176
177 guard.update_recovery_state_after_backup = Some(join_handle);
178 }
179}
180
/// Settings controlling the end-to-end-encryption behaviour of a client.
///
/// All fields default to their type's `Default` (`false` for the flags).
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// NOTE(review): presumably enables automatic cross-signing bootstrap;
    /// the consumer of this flag is not in this part of the file.
    pub auto_enable_cross_signing: bool,

    /// Strategy for downloading room keys from the backup. Read by
    /// `EncryptionData::initialize_tasks` to decide whether a download task
    /// is created.
    pub backup_download_strategy: BackupDownloadStrategy,

    /// NOTE(review): presumably enables backups automatically; the consumer
    /// of this flag is not in this part of the file.
    pub auto_enable_backups: bool,
}
200
/// Strategy for downloading room keys from the backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// NOTE(review): presumably a single bulk download of the backup; the
    /// code handling this variant is not in this part of the file.
    OneShot,

    /// With this strategy `EncryptionData::initialize_tasks` creates a
    /// `BackupDownloadTask`.
    AfterDecryptionFailure,

    /// The default strategy. `EncryptionData::initialize_tasks` creates no
    /// download task for it.
    #[default]
    Manual,
}
227
/// The verification state observable through
/// `Encryption::verification_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// The verification state is not known (yet).
    Unknown,
    /// Verified.
    Verified,
    /// Not verified.
    Unverified,
}
242
/// A held cross-process crypto store lock together with the crypto store
/// generation number obtained when the lock was acquired.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The underlying lock guard; only kept so that the lock stays held for
    /// as long as this value lives.
    _guard: CrossProcessLockGuard,
    /// Generation number reported when the lock was acquired.
    generation: u64,
}
249
250impl CrossProcessLockStoreGuardWithGeneration {
251 pub fn generation(&self) -> u64 {
253 self.generation
254 }
255}
256
/// Handle used to finish a cross-signing reset that needs additional
/// authentication.
///
/// It keeps the prepared requests so that [`CrossSigningResetHandle::auth`]
/// can (re)send them.
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// Client used to send the requests.
    client: Client,
    /// The prepared request uploading the new cross-signing keys.
    upload_request: UploadSigningKeysRequest,
    /// The prepared request uploading the signatures; sent once the key
    /// upload has succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication the server asked for.
    auth_type: CrossSigningResetAuthType,
    /// Set to `true` by [`CrossSigningResetHandle::cancel`]; checked by the
    /// retry loop in [`CrossSigningResetHandle::auth`].
    is_cancelled: Mutex<bool>,
}
275
276impl CrossSigningResetHandle {
277 pub fn new(
279 client: Client,
280 upload_request: UploadSigningKeysRequest,
281 signatures_request: UploadSignaturesRequest,
282 auth_type: CrossSigningResetAuthType,
283 ) -> Self {
284 Self {
285 client,
286 upload_request,
287 signatures_request,
288 auth_type,
289 is_cancelled: Mutex::new(false),
290 }
291 }
292
293 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
296 &self.auth_type
297 }
298
299 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
303 const RETRY_EVERY: Duration = Duration::from_millis(500);
305
306 const TIMEOUT: Duration = Duration::from_mins(2);
308
309 tokio::time::timeout(TIMEOUT, async {
310 let mut upload_request = self.upload_request.clone();
311 upload_request.auth = auth;
312
313 debug!(
314 "Repeatedly PUTting to keys/device_signing/upload until it works \
315 or we hit a permanent failure."
316 );
317 while let Err(e) = self.client.send(upload_request.clone()).await {
318 if *self.is_cancelled.lock().await {
319 return Ok(());
320 }
321
322 match e.as_uiaa_response() {
323 Some(uiaa_info) => {
324 if uiaa_info.auth_error.is_some() {
325 return Err(e.into());
326 }
327 }
328 None => return Err(e.into()),
329 }
330
331 debug!(
332 "PUT to keys/device_signing/upload failed with 401. Retrying after \
333 a short delay."
334 );
335 sleep(RETRY_EVERY).await;
336 }
337
338 self.client.send(self.signatures_request.clone()).await?;
339
340 Ok(())
341 })
342 .await
343 .unwrap_or_else(|_| {
344 warn!("Timed out waiting for keys/device_signing/upload to succeed.");
345 Err(Error::Timeout)
346 })
347 }
348
349 pub async fn cancel(&self) {
351 *self.is_cancelled.lock().await = true;
352 }
353}
354
/// The kind of authentication the server requires to reset cross-signing.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// User-interactive authentication, described by the contained
    /// [`UiaaInfo`].
    Uiaa(UiaaInfo),
    /// OAuth-based approval, described by the contained
    /// [`OAuthCrossSigningResetInfo`].
    OAuth(OAuthCrossSigningResetInfo),
}
365
366impl CrossSigningResetAuthType {
367 fn new(error: &HttpError) -> Result<Option<Self>> {
368 if let Some(auth_info) = error.as_uiaa_response() {
369 if let Ok(Some(auth_info)) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
370 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
371 } else {
372 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
373 }
374 } else {
375 Ok(None)
376 }
377 }
378}
379
/// Information needed to approve a cross-signing reset through OAuth.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL taken from the OAuth parameters of the UIAA response.
    pub approval_url: Url,
}
387
388impl OAuthCrossSigningResetInfo {
389 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Option<Self>> {
390 let Some(parameters) = auth_info.params::<OAuthParams>(&AuthType::OAuth)? else {
391 return Ok(None);
392 };
393
394 Ok(Some(OAuthCrossSigningResetInfo { approval_url: parameters.url.as_str().try_into()? }))
395 }
396}
397
/// The two keys extracted from a "duplicate one-time key" error message.
///
/// Values are produced by the `FromStr` implementation of this type.
#[derive(Clone, Debug)]
pub struct DuplicateOneTimeKeyErrorMessage {
    /// The key found after the `Old key:` marker of the message.
    pub old_key: Curve25519PublicKey,
    /// The key found after the `new key:` marker of the message.
    pub new_key: Curve25519PublicKey,
}
407
408impl FromStr for DuplicateOneTimeKeyErrorMessage {
409 type Err = serde_json::Error;
410
411 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
412 let mut split = s.split_terminator(';');
416
417 let old_key = split
418 .next()
419 .ok_or(serde_json::Error::custom("Old key is missing in the error message"))?;
420 let new_key = split
421 .next()
422 .ok_or(serde_json::Error::custom("New key is missing in the error message"))?;
423
424 let old_key_index = old_key
427 .find("Old key:")
428 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
429
430 let old_key = old_key[old_key_index..]
431 .trim()
432 .strip_prefix("Old key:")
433 .ok_or(serde_json::Error::custom("Old key is missing the prefix"))?;
434
435 let new_key = new_key
438 .trim()
439 .strip_prefix("new key:")
440 .ok_or(serde_json::Error::custom("New key is missing the prefix"))?;
441
442 let new_key = new_key.replace("'", "\"");
445
446 let old_key: SignedKey = serde_json::from_str(old_key)?;
448 let new_key: SignedKey = serde_json::from_str(&new_key)?;
449
450 let old_key = old_key.key();
452 let new_key = new_key.key();
453
454 Ok(Self { old_key, new_key })
455 }
456}
457
458impl Client {
459 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
460 self.base_client().olm_machine().await
461 }
462
463 pub(crate) async fn mark_request_as_sent(
464 &self,
465 request_id: &TransactionId,
466 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
467 ) -> Result<(), matrix_sdk_base::Error> {
468 Ok(self
469 .olm_machine()
470 .await
471 .as_ref()
472 .expect(
473 "We should have an olm machine once we try to mark E2EE related requests as sent",
474 )
475 .mark_request_as_sent(request_id, response)
476 .await?)
477 }
478
479 #[instrument(skip(self, device_keys))]
485 pub(crate) async fn keys_query(
486 &self,
487 request_id: &TransactionId,
488 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
489 ) -> Result<get_keys::v3::Response> {
490 let request = assign!(get_keys::v3::Request::new(), { device_keys });
491
492 let response = self.send(request).await?;
493 self.mark_request_as_sent(request_id, &response).await?;
494 self.encryption().update_state_after_keys_query(&response).await;
495
496 Ok(response)
497 }
498
    /// Creates an [`UploadEncryptedFile`] request for the data provided by
    /// `reader`.
    ///
    /// Nothing is read or sent by this method itself; it only constructs the
    /// returned value, which borrows both the client and the reader.
    pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
        &'a self,
        reader: &'a mut R,
    ) -> UploadEncryptedFile<'a, R> {
        UploadEncryptedFile::new(self, reader)
    }
538
539 pub(crate) async fn upload_encrypted_media_and_thumbnail(
542 &self,
543 data: &[u8],
544 thumbnail: Option<Thumbnail>,
545 send_progress: SharedObservable<TransmissionProgress>,
546 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
547 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
548
549 let upload_attachment = async {
550 let mut cursor = Cursor::new(data);
551 self.upload_encrypted_file(&mut cursor)
552 .with_send_progress_observable(send_progress)
553 .await
554 };
555
556 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
557
558 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
559 }
560
561 async fn upload_encrypted_thumbnail(
564 &self,
565 thumbnail: Option<Thumbnail>,
566 send_progress: SharedObservable<TransmissionProgress>,
567 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
568 let Some(thumbnail) = thumbnail else {
569 return Ok(None);
570 };
571
572 let (data, _, thumbnail_info) = thumbnail.into_parts();
573 let mut cursor = Cursor::new(data);
574
575 let file = self
576 .upload_encrypted_file(&mut cursor)
577 .with_send_progress_observable(send_progress)
578 .await?;
579
580 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
581 }
582
583 pub(crate) async fn claim_one_time_keys(
589 &self,
590 users: impl Iterator<Item = &UserId>,
591 ) -> Result<()> {
592 let _lock = self.locks().key_claim_lock.lock().await;
593
594 if let Some((request_id, request)) = self
595 .olm_machine()
596 .await
597 .as_ref()
598 .ok_or(Error::NoOlmMachine)?
599 .get_missing_sessions(users)
600 .await?
601 {
602 let response = self.send(request).await?;
603 self.mark_request_as_sent(&request_id, &response).await?;
604 }
605
606 Ok(())
607 }
608
609 #[instrument(skip(self, request))]
619 pub(crate) async fn keys_upload(
620 &self,
621 request_id: &TransactionId,
622 request: &upload_keys::v3::Request,
623 ) -> Result<upload_keys::v3::Response> {
624 debug!(
625 device_keys = request.device_keys.is_some(),
626 one_time_key_count = request.one_time_keys.len(),
627 "Uploading public encryption keys",
628 );
629
630 let response = self.send(request.clone()).await?;
631 self.mark_request_as_sent(request_id, &response).await?;
632
633 Ok(response)
634 }
635
636 pub(crate) async fn room_send_helper(
637 &self,
638 request: &RoomMessageRequest,
639 ) -> Result<send_message_event::v3::Response> {
640 let content = request.content.clone();
641 let txn_id = request.txn_id.clone();
642 let room_id = &request.room_id;
643
644 self.get_room(room_id)
645 .expect("Can't send a message to a room that isn't known to the store")
646 .send(*content)
647 .with_transaction_id(txn_id)
648 .await
649 .map(|result| result.response)
650 }
651
652 pub(crate) async fn send_to_device(
653 &self,
654 request: &ToDeviceRequest,
655 ) -> HttpResult<ToDeviceResponse> {
656 let request = RumaToDeviceRequest::new_raw(
657 request.event_type.clone(),
658 request.txn_id.clone(),
659 request.messages.clone(),
660 );
661
662 self.send(request).await
663 }
664
665 pub(crate) async fn send_verification_request(
666 &self,
667 request: OutgoingVerificationRequest,
668 ) -> Result<()> {
669 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
670
671 match request {
672 ToDevice(t) => {
673 self.send_to_device(&t).await?;
674 }
675 InRoom(r) => {
676 self.room_send_helper(&r).await?;
677 }
678 }
679
680 Ok(())
681 }
682
683 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
684 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
685
686 match r.request() {
687 AnyOutgoingRequest::KeysQuery(request) => {
688 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
689 }
690 AnyOutgoingRequest::KeysUpload(request) => {
691 let response = self.keys_upload(r.request_id(), request).await;
692
693 if let Err(e) = &response {
694 match e.as_ruma_api_error() {
695 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
696 if let ErrorBody::Standard(StandardErrorBody { message, .. }) = &e.body
697 {
698 {
703 let already_reported = self
704 .state_store()
705 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
706 .await?
707 .is_some();
708
709 if message.starts_with("One time key") && !already_reported {
710 let error_message =
711 DuplicateOneTimeKeyErrorMessage::from_str(message);
712
713 if let Ok(message) = &error_message {
714 error!(
715 sentry = true,
716 old_key = %message.old_key,
717 new_key = %message.new_key,
718 "Duplicate one-time keys have been uploaded"
719 );
720 } else {
721 error!(
722 sentry = true,
723 "Duplicate one-time keys have been uploaded"
724 );
725 }
726
727 self.state_store()
728 .set_kv_data(
729 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
730 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
731 )
732 .await?;
733
734 if let Err(e) = self
735 .inner
736 .duplicate_key_upload_error_sender
737 .send(error_message.ok())
738 {
739 error!(
740 "Failed to dispatch duplicate key upload error notification: {}",
741 e
742 );
743 }
744 }
745 }
746 }
747 }
748 _ => {}
749 }
750
751 response?;
752 }
753 }
754 AnyOutgoingRequest::ToDeviceRequest(request) => {
755 let response = self.send_to_device(request).await?;
756 self.mark_request_as_sent(r.request_id(), &response).await?;
757 }
758 AnyOutgoingRequest::SignatureUpload(request) => {
759 let response = self.send(request.clone()).await?;
760 self.mark_request_as_sent(r.request_id(), &response).await?;
761 }
762 AnyOutgoingRequest::RoomMessage(request) => {
763 let response = self.room_send_helper(request).await?;
764 self.mark_request_as_sent(r.request_id(), &response).await?;
765 }
766 AnyOutgoingRequest::KeysClaim(request) => {
767 let response = self.send(request.clone()).await?;
768 self.mark_request_as_sent(r.request_id(), &response).await?;
769 }
770 }
771
772 Ok(())
773 }
774
775 #[instrument(skip_all)]
776 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
777 const MAX_CONCURRENT_REQUESTS: usize = 20;
778
779 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
782 warn!("Error while claiming one-time keys {:?}", e);
783 }
784
785 let outgoing_requests = stream::iter(
786 self.olm_machine()
787 .await
788 .as_ref()
789 .ok_or(Error::NoOlmMachine)?
790 .outgoing_requests()
791 .await?,
792 )
793 .map(|r| self.send_outgoing_request(r));
794
795 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
796
797 requests
798 .for_each(|r| async move {
799 match r {
800 Ok(_) => (),
801 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
802 }
803 })
804 .await;
805
806 Ok(())
807 }
808}
809
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Public, test-only access to the read guard over the `OlmMachine`.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        let guard = self.olm_machine().await;
        guard
    }

    /// Aborts the task receiving historic room key bundles, if it exists.
    pub fn abort_bundle_receiver_task(&self) {
        let tasks = self.inner.e2ee.tasks.lock();

        match tasks.receive_historic_room_key_bundles.as_ref() {
            Some(task) => task.abort(),
            None => {}
        }
    }
}
825
/// Entry point to the end-to-end-encryption functionality of a [`Client`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The client all operations are performed on.
    client: Client,
}
834
835impl Encryption {
    /// Wraps the given client.
    pub(crate) fn new(client: Client) -> Self {
        Self { client }
    }
839
    /// Returns a copy of the encryption settings the client was configured
    /// with.
    pub(crate) fn settings(&self) -> EncryptionSettings {
        self.client.inner.e2ee.encryption_settings
    }
844
845 pub async fn ed25519_key(&self) -> Option<String> {
848 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
849 }
850
851 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
853 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
854 }
855
856 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
858 match self.get_own_device().await {
859 Ok(Some(device)) => device.first_time_seen_ts(),
860 _ => MilliSecondsSinceUnixEpoch::now(),
862 }
863 }
864
865 pub(crate) async fn import_secrets_bundle(
866 &self,
867 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
868 ) -> Result<(), SecretImportError> {
869 let olm_machine = self.client.olm_machine().await;
870 let olm_machine =
871 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
872
873 olm_machine.store().import_secrets_bundle(bundle).await
874 }
875
876 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
881 let olm = self.client.olm_machine().await;
882 let machine = olm.as_ref()?;
883 Some(machine.cross_signing_status().await)
884 }
885
886 pub async fn has_devices_to_verify_against(&self) -> Result<bool> {
892 let olm_machine = self.client.olm_machine().await;
893 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
894 let user_id = olm_machine.user_id();
895
896 self.ensure_initial_key_query().await?;
897
898 let devices = self.get_user_devices(user_id).await?;
899
900 let ret = devices.devices().any(|device| {
901 device.is_cross_signed_by_owner()
902 && device.curve25519_key().is_some()
903 && !device.is_dehydrated()
904 });
905
906 Ok(ret)
907 }
908
909 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
914 if let Some(machine) = self.client.olm_machine().await.as_ref() {
915 machine.tracked_users().await
916 } else {
917 Ok(HashSet::new())
918 }
919 }
920
    /// Returns a [`Subscriber`] to the client's [`VerificationState`],
    /// obtained through `subscribe_reset`.
    pub fn verification_state(&self) -> Subscriber<VerificationState> {
        self.client.inner.verification_state.subscribe_reset()
    }
946
947 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
949 let olm = self.client.olm_machine().await;
950 let olm = olm.as_ref()?;
951 #[allow(clippy::bind_instead_of_map)]
952 olm.get_verification(user_id, flow_id).and_then(|v| match v {
953 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
954 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
955 }
956 #[cfg(feature = "qrcode")]
957 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
958 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
959 }
960 _ => None,
961 })
962 }
963
964 pub async fn get_verification_request(
967 &self,
968 user_id: &UserId,
969 flow_id: impl AsRef<str>,
970 ) -> Option<VerificationRequest> {
971 let olm = self.client.olm_machine().await;
972 let olm = olm.as_ref()?;
973
974 olm.get_verification_request(user_id, flow_id)
975 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
976 }
977
978 pub async fn get_device(
1012 &self,
1013 user_id: &UserId,
1014 device_id: &DeviceId,
1015 ) -> Result<Option<Device>, CryptoStoreError> {
1016 let olm = self.client.olm_machine().await;
1017 let Some(machine) = olm.as_ref() else { return Ok(None) };
1018 let device = machine.get_device(user_id, device_id, None).await?;
1019 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1020 }
1021
1022 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
1029 let olm = self.client.olm_machine().await;
1030 let Some(machine) = olm.as_ref() else { return Ok(None) };
1031 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
1032 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
1033 }
1034
1035 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
1061 let devices = self
1062 .client
1063 .olm_machine()
1064 .await
1065 .as_ref()
1066 .ok_or(Error::NoOlmMachine)?
1067 .get_user_devices(user_id, None)
1068 .await?;
1069
1070 Ok(UserDevices { inner: devices, client: self.client.clone() })
1071 }
1072
1073 pub async fn get_user_identity(
1109 &self,
1110 user_id: &UserId,
1111 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
1112 let olm = self.client.olm_machine().await;
1113 let Some(olm) = olm.as_ref() else { return Ok(None) };
1114 let identity = olm.get_identity(user_id, None).await?;
1115
1116 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1117 }
1118
1119 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1157 let olm = self.client.olm_machine().await;
1158 let Some(olm) = olm.as_ref() else { return Ok(None) };
1159
1160 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1161 self.client.keys_query(&request_id, request.device_keys).await?;
1162
1163 let identity = olm.get_identity(user_id, None).await?;
1164 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1165 }
1166
1167 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates> + use<>> {
1198 let olm = self.client.olm_machine().await;
1199 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1200 let client = self.client.to_owned();
1201
1202 Ok(olm
1203 .store()
1204 .devices_stream()
1205 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1206 }
1207
1208 pub async fn user_identities_stream(
1236 &self,
1237 ) -> Result<impl Stream<Item = IdentityUpdates> + use<>> {
1238 let olm = self.client.olm_machine().await;
1239 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1240 let client = self.client.to_owned();
1241
1242 Ok(olm
1243 .store()
1244 .user_identities_stream()
1245 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1246 }
1247
1248 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1287 let olm = self.client.olm_machine().await;
1288 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1289
1290 let CrossSigningBootstrapRequests {
1291 upload_signing_keys_req,
1292 upload_keys_req,
1293 upload_signatures_req,
1294 } = olm.bootstrap_cross_signing(false).await?;
1295
1296 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1297 auth: auth_data,
1298 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1299 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1300 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1301 });
1302
1303 if let Some(req) = upload_keys_req {
1304 self.client.send_outgoing_request(req).await?;
1305 }
1306 self.client.send(upload_signing_keys_req).await?;
1307 self.client.send(upload_signatures_req).await?;
1308
1309 Ok(())
1310 }
1311
1312 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1349 let olm = self.client.olm_machine().await;
1350 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1351
1352 let CrossSigningBootstrapRequests {
1353 upload_keys_req,
1354 upload_signing_keys_req,
1355 upload_signatures_req,
1356 } = olm.bootstrap_cross_signing(true).await?;
1357
1358 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1359 auth: None,
1360 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1361 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1362 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1363 });
1364
1365 if let Some(req) = upload_keys_req {
1366 self.client.send_outgoing_request(req).await?;
1367 }
1368
1369 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1370 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1371 let client = self.client.clone();
1372
1373 Ok(Some(CrossSigningResetHandle::new(
1374 client,
1375 upload_signing_keys_req,
1376 upload_signatures_req,
1377 auth_type,
1378 )))
1379 } else {
1380 Err(error.into())
1381 }
1382 } else {
1383 self.client.send(upload_signatures_req).await?;
1384
1385 Ok(None)
1386 }
1387 }
1388
1389 async fn ensure_initial_key_query(&self) -> Result<()> {
1392 let olm_machine = self.client.olm_machine().await;
1393 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1394
1395 let user_id = olm_machine.user_id();
1396
1397 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1398 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1399 self.client.keys_query(&request_id, request.device_keys).await?;
1400 }
1401
1402 Ok(())
1403 }
1404
1405 pub async fn bootstrap_cross_signing_if_needed(
1452 &self,
1453 auth_data: Option<AuthData>,
1454 ) -> Result<()> {
1455 let olm_machine = self.client.olm_machine().await;
1456 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1457 let user_id = olm_machine.user_id();
1458
1459 self.ensure_initial_key_query().await?;
1460
1461 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1462 self.bootstrap_cross_signing(auth_data).await?;
1463 }
1464
1465 Ok(())
1466 }
1467
1468 #[cfg(not(target_family = "wasm"))]
1520 pub async fn export_room_keys(
1521 &self,
1522 path: PathBuf,
1523 passphrase: &str,
1524 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1525 ) -> Result<()> {
1526 let olm = self.client.olm_machine().await;
1527 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1528
1529 let keys = olm.store().export_room_keys(predicate).await?;
1530 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1531
1532 let encrypt = move || -> Result<()> {
1533 let export: String =
1534 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1535 let mut file = std::fs::File::create(path)?;
1536 file.write_all(&export.into_bytes())?;
1537 Ok(())
1538 };
1539
1540 let task = tokio::task::spawn_blocking(encrypt);
1541 task.await.expect("Task join error")
1542 }
1543
1544 #[cfg(not(target_family = "wasm"))]
1582 pub async fn import_room_keys(
1583 &self,
1584 path: PathBuf,
1585 passphrase: &str,
1586 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1587 let olm = self.client.olm_machine().await;
1588 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1589 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1590
1591 let decrypt = move || {
1592 let file = std::fs::File::open(path)?;
1593 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1594 };
1595
1596 let task = tokio::task::spawn_blocking(decrypt);
1597 let import = task.await.expect("Task join error")?;
1598
1599 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1600
1601 self.backups().maybe_trigger_backup();
1602
1603 Ok(ret)
1604 }
1605
1606 pub async fn room_keys_received_stream(
1637 &self,
1638 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<>>
1639 {
1640 let olm = self.client.olm_machine().await;
1641 let olm = olm.as_ref()?;
1642
1643 Some(olm.store().room_keys_received_stream())
1644 }
1645
1646 pub async fn historic_room_key_stream(
1677 &self,
1678 ) -> Option<impl Stream<Item = RoomKeyBundleInfo> + use<>> {
1679 let olm = self.client.olm_machine().await;
1680 let olm = olm.as_ref()?;
1681
1682 Some(olm.store().historic_room_key_stream())
1683 }
1684
    /// Returns a [`SecretStorage`] handle that holds its own clone of the
    /// client.
    pub fn secret_storage(&self) -> SecretStorage {
        SecretStorage { client: self.client.to_owned() }
    }
1689
    /// Returns a [`Backups`] handle that holds its own clone of the client.
    pub fn backups(&self) -> Backups {
        Backups { client: self.client.to_owned() }
    }
1694
    /// Returns a [`Recovery`] handle that holds its own clone of the client.
    pub fn recovery(&self) -> Recovery {
        Recovery { client: self.client.to_owned() }
    }
1699
1700 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1712 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1714 let prev_holder = prev_lock.lock_holder();
1715 if prev_holder.is_some() && prev_holder.unwrap() == lock_value {
1716 return Ok(());
1717 }
1718 warn!(
1719 "Recreating cross-process store lock with a different holder value: \
1720 prev was {prev_holder:?}, new is {lock_value}"
1721 );
1722 }
1723
1724 let olm_machine = self.client.base_client().olm_machine().await;
1725 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1726
1727 let lock = olm_machine.store().create_store_lock(
1728 "cross_process_lock".to_owned(),
1729 CrossProcessLockConfig::multi_process(lock_value.to_owned()),
1730 );
1731
1732 {
1737 let lock_result = lock.try_lock_once().await?;
1738
1739 if lock_result.is_ok() {
1740 olm_machine
1741 .initialize_crypto_store_generation(
1742 &self.client.locks().crypto_store_generation,
1743 )
1744 .await?;
1745 }
1746 }
1747
1748 self.client
1749 .locks()
1750 .cross_process_crypto_store_lock
1751 .set(lock)
1752 .map_err(|_| Error::BadCryptoStoreState)?;
1753
1754 Ok(())
1755 }
1756
1757 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1762 let olm_machine_guard = self.client.olm_machine().await;
1763 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1764 let (new_gen, generation_number) = olm_machine
1765 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1766 .await?;
1767 if new_gen {
1769 drop(olm_machine_guard);
1771 self.client.base_client().regenerate_olm(None).await?;
1773 }
1774 Ok(generation_number)
1775 } else {
1776 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1781 Ok(0)
1782 }
1783 }
1784
1785 pub async fn spin_lock_store(
1791 &self,
1792 max_backoff: Option<u32>,
1793 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1794 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1795 let guard = lock
1796 .spin_lock(max_backoff)
1797 .await
1798 .map_err(|err| {
1799 Error::CrossProcessLockError(Box::new(CrossProcessLockError::TryLock(
1800 Arc::new(err),
1801 )))
1802 })?
1803 .map_err(|err| Error::CrossProcessLockError(Box::new(err.into())))?;
1804
1805 let generation = self.on_lock_newly_acquired().await?;
1806
1807 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1808 _guard: guard.into_guard(),
1809 generation,
1810 }))
1811 } else {
1812 Ok(None)
1813 }
1814 }
1815
1816 pub async fn try_lock_store_once(
1821 &self,
1822 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1823 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1824 let lock_result = lock.try_lock_once().await?;
1825
1826 let Some(guard) = lock_result.ok() else {
1827 return Ok(None);
1828 };
1829
1830 let generation = self.on_lock_newly_acquired().await?;
1831
1832 Ok(Some(CrossProcessLockStoreGuardWithGeneration {
1833 _guard: guard.into_guard(),
1834 generation,
1835 }))
1836 } else {
1837 Ok(None)
1838 }
1839 }
1840
/// Testing helper: the uploaded key count as reported by the
/// `OlmMachine`.
///
/// Fails with [`Error::AuthenticationRequired`] when the client has no
/// `OlmMachine`.
#[cfg(any(test, feature = "testing"))]
pub async fn uploaded_key_count(&self) -> Result<u64> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().ok_or(Error::AuthenticationRequired)?;

    let count = machine.uploaded_key_count().await?;
    Ok(count)
}
1848
1849 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1873 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1877 Some(BundleReceiverTask::new(&self.client).await)
1878 } else {
1879 None
1880 };
1881
1882 let mut tasks = self.client.inner.e2ee.tasks.lock();
1883
1884 let this = self.clone();
1885
1886 tasks.setup_e2ee = Some(spawn(async move {
1887 this.update_verification_state().await;
1890
1891 if this.settings().auto_enable_cross_signing
1892 && let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await
1893 {
1894 error!("Couldn't bootstrap cross signing {e:?}");
1895 }
1896
1897 if let Err(e) = this.backups().setup_and_resume().await {
1898 error!("Couldn't setup and resume backups {e:?}");
1899 }
1900 if let Err(e) = this.recovery().setup().await {
1901 error!("Couldn't setup and resume recovery {e:?}");
1902 }
1903 }));
1904
1905 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1906
1907 self.setup_room_membership_session_discard_handler();
1908 }
1909
1910 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1913 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1914
1915 if let Some(task) = task
1916 && let Err(err) = task.await
1917 {
1918 warn!("Error when initializing backups: {err}");
1919 }
1920 }
1921
1922 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1932 let olm = self.client.olm_machine().await;
1933 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1934
1935 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1936 self.client.keys_upload(&request_id, &request).await?;
1937
1938 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1939 self.client.keys_query(&request_id, request.device_keys).await?;
1940 }
1941
1942 Ok(())
1943 }
1944
1945 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1946 self.recovery().update_state_after_keys_query(response).await;
1947
1948 if let Some(user_id) = self.client.user_id() {
1950 let contains_own_device = response.device_keys.contains_key(user_id);
1951
1952 if contains_own_device {
1953 self.update_verification_state().await;
1954 }
1955 }
1956 }
1957
1958 async fn update_verification_state(&self) {
1959 match self.get_own_device().await {
1960 Ok(device) => {
1961 if let Some(device) = device {
1962 let is_verified = device.is_cross_signed_by_owner();
1963
1964 if is_verified {
1965 self.client.inner.verification_state.set(VerificationState::Verified);
1966 } else {
1967 self.client.inner.verification_state.set(VerificationState::Unverified);
1968 }
1969 } else {
1970 warn!("Couldn't find out own device in the store.");
1971 self.client.inner.verification_state.set(VerificationState::Unknown);
1972 }
1973 }
1974 Err(error) => {
1975 warn!("Failed retrieving own device: {error}");
1976 self.client.inner.verification_state.set(VerificationState::Unknown);
1977 }
1978 }
1979 }
1980
1981 fn setup_room_membership_session_discard_handler(&self) {
2001 let client = WeakClient::from_client(&self.client);
2002 self.client.add_event_handler(|ev: OriginalSyncRoomMemberEvent, room: Room| async move {
2003 let Some(client) = client.get() else {
2004 return;
2006 };
2007 let Some(user_id) = client.user_id() else {
2008 return;
2010 };
2011 let olm = client.olm_machine().await;
2012 let Some(olm) = olm.as_ref() else {
2013 warn!("Cannot discard session - Olm machine is not available");
2014 return;
2015 };
2016
2017 if !matches!(ev.membership_change(), MembershipChange::Left) || ev.sender == user_id {
2018 return;
2020 }
2021
2022 debug!(room_id = ?room.room_id(), member_id = ?ev.sender, "Discarding session as a user left the room");
2023
2024 if let Err(e) = olm.discard_room_key(room.room_id()).await {
2027 warn!(
2028 room_id = ?room.room_id(),
2029 "Error discarding room key after member leave: {e:?}"
2030 );
2031 }
2032 });
2033 }
2034
/// Encrypt a raw to-device event for the given devices and send it.
///
/// One-time keys are first claimed for the users owning
/// `recipient_devices`. The content is then encrypted per device by the
/// `OlmMachine`, and the resulting to-device requests are sent one by one.
///
/// # Arguments
///
/// * `recipient_devices` - The devices the event should be sent to.
/// * `event_type` - The type of the to-device event.
/// * `content` - The raw content of the event.
/// * `share_strategy` - Passed through to the `OlmMachine` when
///   encrypting.
///
/// # Returns
///
/// The `(user, device)` pairs the event could **not** be delivered to:
/// devices the `OlmMachine` reported as withheld, plus every explicitly
/// addressed device of any request whose sending failed. A failed request
/// does not abort the remaining ones.
///
/// # Errors
///
/// * [`Error::NoOlmMachine`] if the client has no `OlmMachine`.
/// * Any error from claiming one-time keys or from encrypting.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    self.client.claim_one_time_keys(users).await?;

    let olm = self.client.olm_machine().await;
    // Report a missing machine as an error, like the other methods of this
    // type do, rather than panicking inside a fallible public API.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Devices the machine refused to encrypt for count as failures.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(d, _)| (d.user_id().to_owned(), d.device_id().to_owned()))
        .collect();

    for request in requests {
        // `messages` is still needed below to report failures, so it is the
        // only field that has to be cloned.
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type,
            request.txn_id,
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        if send_result.is_err() {
            // Mark every explicitly addressed device of this request as
            // failed, then carry on with the next request.
            for (user_id, device_map) in request.messages {
                failures.extend(device_map.keys().filter_map(|device_id| match device_id {
                    DeviceIdOrAllDevices::DeviceId(device_id) => {
                        Some((user_id.clone(), device_id.to_owned()))
                    }
                    // Wildcard entries name no concrete device to report.
                    DeviceIdOrAllDevices::AllDevices => None,
                }));
            }
        }
    }

    Ok(failures)
}
2112}
2113
2114#[cfg(all(test, not(target_family = "wasm")))]
2115mod tests {
2116 use std::{
2117 ops::Not,
2118 str::FromStr,
2119 sync::{
2120 Arc,
2121 atomic::{AtomicBool, Ordering},
2122 },
2123 time::Duration,
2124 };
2125
2126 use matrix_sdk_test::{
2127 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
2128 event_factory::EventFactory,
2129 };
2130 use ruma::{
2131 event_id,
2132 events::{reaction::ReactionEventContent, relation::Annotation},
2133 user_id,
2134 };
2135 use serde_json::json;
2136 use wiremock::{
2137 Mock, MockServer, Request, ResponseTemplate,
2138 matchers::{header, method, path_regex},
2139 };
2140
2141 use crate::{
2142 Client, assert_next_matches_with_timeout,
2143 config::RequestConfig,
2144 encryption::{
2145 DuplicateOneTimeKeyErrorMessage, OAuthCrossSigningResetInfo, VerificationState,
2146 },
2147 test_utils::{
2148 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
2149 },
2150 };
2151
2152 #[async_test]
2153 async fn test_reaction_sending() {
2154 let server = MockServer::start().await;
2155 let client = logged_in_client(Some(server.uri())).await;
2156
2157 let event_id = event_id!("$2:example.org");
2158
2159 Mock::given(method("GET"))
2160 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
2161 .and(header("authorization", "Bearer 1234"))
2162 .respond_with(
2163 ResponseTemplate::new(200)
2164 .set_body_json(EventFactory::new().room_encryption().into_content()),
2165 )
2166 .mount(&server)
2167 .await;
2168
2169 Mock::given(method("PUT"))
2170 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
2171 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2172 "event_id": event_id,
2173 })))
2174 .mount(&server)
2175 .await;
2176
2177 let f = EventFactory::new().sender(user_id!("@example:localhost"));
2178 let response = SyncResponseBuilder::default()
2179 .add_joined_room(
2180 JoinedRoomBuilder::default()
2181 .add_state_event(
2182 f.member(user_id!("@example:localhost")).display_name("example"),
2183 )
2184 .add_state_event(f.default_power_levels())
2185 .add_state_event(f.room_encryption()),
2186 )
2187 .build_sync_response();
2188
2189 client.base_client().receive_sync_response(response).await.unwrap();
2190
2191 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
2192 assert!(
2193 room.latest_encryption_state().await.expect("Getting encryption state").is_encrypted()
2194 );
2195
2196 let event_id = event_id!("$1:example.org");
2197 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2198 room.send(reaction).await.expect("Sending the reaction should not fail");
2199
2200 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2201 }
2202
/// Two clients sharing one SQLite store: after the second client has held
/// the cross-process lock, the first client must end up with a different
/// `OlmMachine` that still has the backup decryption key and enabled
/// backups.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::store::RoomLoadSettings;
    // Both clients below are built on this same database path.
    let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(sqlite_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    // Without an enabled cross-process lock, trying to lock yields no guard.
    let guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(guard.is_none());

    client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // With the lock enabled, the first client obtains it.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    // Keep a copy of the current machine to compare against later on.
    let initial_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Save a backup decryption key and enable backups through that machine.
    let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
        .expect("Can't create new recovery key");
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());
    initial_olm_machine
        .backup_machine()
        .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
        .await
        .expect("Should save");

    initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(client1.encryption().backups().are_enabled().await);

    // The second client cannot take the lock while the first one holds it.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_none());

    // Release the guard; the pause presumably gives the release time to
    // take effect - TODO confirm.
    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Re-acquiring from the same client ...
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    // ... must leave the machine untouched.
    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&olm_machine));

    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Now the second client takes the lock and releases it again.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_some());

    drop(acquired2);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // The first client takes the lock back ...
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

    // ... and this time its machine must no longer be the initial one.
    assert!(!initial_olm_machine.same_as(&olm_machine));

    // The new machine still has the decryption key saved earlier, and
    // backups are still reported as enabled.
    let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(backup_key_new.decryption_key.is_some());
    assert_eq!(
        backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(client1.encryption().backups().are_enabled().await);
}
2309
/// The `OlmMachine` must only be replaced when another client took the
/// cross-process lock in between: enabling the lock or re-taking it from
/// the same client must keep the machine as it is.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;
    // Both clients below are built on this same database path.
    let sqlite_path =
        std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    // Keep a copy of the machine as it is before the lock gets enabled.
    let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Enabling the lock alone must not replace the machine.
    let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(initial_olm_machine.same_as(&after_enabling_lock));

    {
        // A second client on the same store enables its own lock, takes it
        // and releases it again.
        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        client2
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client2.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // The pause presumably gives the release time to take effect -
        // TODO confirm.
        drop(guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    {
        // First acquisition by the first client after the second one held
        // the lock; the guard is released at the end of this scope.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // The machine must have been replaced by that acquisition.
    let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

    {
        // Second acquisition, with nobody else having taken the lock in
        // between.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // This time the machine must have been left alone.
    let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
}
2387
2388 #[async_test]
2389 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2390 let client = no_retry_test_client(None).await;
2392 let server = MockServer::start().await;
2393
2394 let mut verification_state = client.encryption().verification_state();
2396
2397 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2399
2400 let keys_requested = Arc::new(AtomicBool::new(false));
2403 let inner_bool = keys_requested.clone();
2404
2405 Mock::given(method("GET"))
2406 .and(path_regex(
2407 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2408 ))
2409 .respond_with(move |_req: &Request| {
2410 inner_bool.fetch_or(true, Ordering::SeqCst);
2411 ResponseTemplate::new(200).set_body_json(json!({}))
2412 })
2413 .mount(&server)
2414 .await;
2415
2416 set_client_session(&client).await;
2418
2419 assert!(keys_requested.load(Ordering::SeqCst).not());
2421 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2422 }
2423
/// A UIAA response advertising the `org.matrix.cross_signing_reset` stage
/// must deserialize and yield an [`OAuthCrossSigningResetInfo`].
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let raw_auth_info = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    let auth_info = serde_json::from_value(raw_auth_info)
        .expect("We should be able to deserialize the UiaaInfo");

    OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2448
/// A complete "one time key already exists" error message must parse
/// into its old and new keys, while a truncated message must be rejected.
#[test]
fn test_duplicate_one_time_key_error_parsing() {
    // The old key is embedded as JSON, the new one in a single-quoted form.
    let full_message = concat!(
        r#"One time key signed_curve25519:AAAAAAAAAAA already exists. "#,
        r#"Old key: {"key":"dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs","#,
        r#""signatures":{"@example:matrix.org":{"ed25519:AAAAAAAAAA":""#,
        r#"Fk45zHAbrd+1j9wZXLjL2Y/+DU/Mnz9yuvlfYBOOT7qExN2Jdud+5BAuNs8nZ/caS4wTF39Kg3zQpzaGERoCBg"}}};"#,
        r#" new key: {'key': 'CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno', "#,
        r#"'signatures': {'@example:matrix.org': {'ed25519:AAAAAAAAAA': "#,
        r#"'BQ9Gp0p+6srF+c8OyruqKKd9R4yaub3THYAyyBB/7X/rG8BwcAqFynzl1aGyFYun4Q+087a5OSiglCXI+/kQAA'}}}"#
    );

    let parsed = DuplicateOneTimeKeyErrorMessage::from_str(full_message)
        .expect("We should be able to parse the error message");

    assert_eq!(parsed.old_key.to_base64(), "dBcZBzQaiQYWf6rBPh2QypIOB/dxSoTeyaFaxNNbeHs");
    assert_eq!(parsed.new_key.to_base64(), "CY0TWVK1/Kj3ZADuBcGe3UKvpT+IKAPMUsMeJhSDqno");

    let truncated_message = "One time key already exists.";
    DuplicateOneTimeKeyErrorMessage::from_str(truncated_message)
        .expect_err("We shouldn't be able to parse an incomplete error message");
}
2469
2470 fn devices_to_verify_against_keys_query_response(
2474 devices: Vec<serde_json::Value>,
2475 ) -> serde_json::Value {
2476 let device_keys: serde_json::Map<String, serde_json::Value> = devices
2477 .into_iter()
2478 .map(|device| (device.get("device_id").unwrap().as_str().unwrap().to_owned(), device))
2479 .collect();
2480 json!({
2481 "device_keys": {
2482 "@example:localhost": device_keys,
2483 },
2484 "master_keys": {
2485 "@example:localhost": {
2486 "keys": {
2487 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU",
2488 },
2489 "usage": ["master"],
2490 "user_id": "@example:localhost",
2491 },
2492 },
2493 "self_signing_keys": {
2494 "@example:localhost": {
2495 "keys": {
2496 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM",
2497 },
2498 "usage": ["self_signing"],
2499 "user_id": "@example:localhost",
2500 "signatures": {
2501 "@example:localhost": {
2502 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "etO1bB+rCk+TQ/FcjQ8eWu/RsRNQNNQ1Ek+PD6//j8yz6igRjfvuHZaMvr/quAFrirfgExph2TdOwlDgN5bFCQ",
2503 },
2504 },
2505 },
2506 },
2507 "user_signing_keys": {
2508 "@example:localhost": {
2509 "keys": {
2510 "ed25519:CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0": "CBaovtekFxzf2Ijjhk4B49drOH0/qmhBbptFlVW7HC0",
2511 },
2512 "usage": ["user_signing"],
2513 "user_id": "@example:localhost",
2514 "signatures": {
2515 "@example:localhost": {
2516 "ed25519:PJklDgml7Xtt1Wr8jsWvB+lC5YD/bVDpHL+fYuItNxU": "E/DFi/hQTIb/7eSB+HbCXeTLFaLjqWHzLO9GwjL1qdhfO7ew4p6YdtXSH3T2YYr1dKCPteH/4nMYVwOhww2CBg",
2517 },
2518 },
2519 },
2520 }
2521 })
2522 }
2523
2524 #[async_test]
2527 async fn test_devices_to_verify_against_no_devices() {
2530 let server = MockServer::start().await;
2531 let client = logged_in_client(Some(server.uri())).await;
2532
2533 Mock::given(method("POST"))
2534 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2535 .respond_with(
2536 ResponseTemplate::new(200)
2537 .set_body_json(devices_to_verify_against_keys_query_response(vec![])),
2538 )
2539 .mount(&server)
2540 .await;
2541
2542 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2543 }
2544
2545 #[async_test]
2546 async fn test_devices_to_verify_against_cross_signed() {
2549 let server = MockServer::start().await;
2550 let client = logged_in_client(Some(server.uri())).await;
2551
2552 Mock::given(method("POST"))
2553 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2554 .respond_with(ResponseTemplate::new(200).set_body_json(
2555 devices_to_verify_against_keys_query_response(vec![
2556 json!({
2557 "algorithms": [
2558 "m.olm.v1.curve25519-aes-sha2",
2559 "m.megolm.v1.aes-sha2",
2560 ],
2561 "user_id": "@example:localhost",
2562 "device_id": "SIGNEDDEVICE",
2563 "keys": {
2564 "curve25519:SIGNEDDEVICE": "o1LqUtH/sqd3WF+BB2Qr77uw3sDmZhMOz68/IV9aHxs",
2565 "ed25519:SIGNEDDEVICE": "iVoEfMOoUqxXVMLdpZCOgvQuCrT3/kQWkBmB3Phi/lo",
2566 },
2567 "signatures": {
2568 "@example:localhost": {
2569 "ed25519:SIGNEDDEVICE": "C7yRu1fNrdD2EobVdtANMqk3LBtWtTRWrIU22xVS8/Om1kmA/luzek64R3N6JsZhYczVmZYBKhUC9kRvHHwOBg",
2570 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "frfh2HP28GclmGvwTic00Fj4nZCvm4RlRA6U56mnD5920hOi04+L055ojzp6ybZXvC/GQYfyTHwQXlUN1nvxBA",
2571 },
2572 },
2573 })
2574 ])
2575 ))
2576 .mount(&server)
2577 .await;
2578
2579 assert!(client.encryption().has_devices_to_verify_against().await.unwrap());
2580 }
2581
2582 #[async_test]
2583 async fn test_devices_to_verify_against_dehydrated_and_unsigned() {
2586 let server = MockServer::start().await;
2587 let client = logged_in_client(Some(server.uri())).await;
2588 let user_id = client.user_id().unwrap();
2589 let olm_machine = client.olm_machine().await;
2590 let olm_machine = olm_machine.as_ref().unwrap();
2591
2592 Mock::given(method("POST"))
2593 .and(path_regex(r"^/_matrix/client/r0/keys/query".to_owned()))
2594 .respond_with(ResponseTemplate::new(200).set_body_json(
2595 devices_to_verify_against_keys_query_response(vec![
2596 json!({
2597 "algorithms": [
2598 "m.olm.v1.curve25519-aes-sha2",
2599 "m.megolm.v1.aes-sha2",
2600 ],
2601 "user_id": "@example:localhost",
2602 "device_id": "DEHYDRATEDDEVICE",
2603 "keys": {
2604 "curve25519:DEHYDRATEDDEVICE": "XOn5VguAgokZ3p9mBz2yOB395fn6j75G8jIPcXEWQGY",
2605 "ed25519:DEHYDRATEDDEVICE": "4GG5xmBT7z4rgUgmWNlKZ+ABE3QlGgTorF+luCnKfYI",
2606 },
2607 "dehydrated": true,
2608 "signatures": {
2609 "@example:localhost": {
2610 "ed25519:DEHYDRATEDDEVICE": "+OMasB7nzVlMV+zRDxkh4h8h/Q0bY42P1SPv7X2IURIelT5G+d+AYSmg30N4maphxEDBqt/vI8/lIr71exc3Dg",
2611 "ed25519:jobZVcxG+PBLwZMsF4XEJSJTVqOgDxd0Ud3J/bw3HYM": "8DzynAgbYgXX1Md5d4Vw91Zstpoi4dpG7levFeVhi4psCAWuBnV76Qu1s2TGjQQ0CLDXEqcxxuX9X4eUK5TGCg",
2612 },
2613 },
2614 }),
2615 json!({
2616 "algorithms": [
2617 "m.olm.v1.curve25519-aes-sha2",
2618 "m.megolm.v1.aes-sha2",
2619 ],
2620 "user_id": "@example:localhost",
2621 "device_id": "UNSIGNEDDEVICE",
2622 "keys": {
2623 "curve25519:UNSIGNEDDEVICE": "mMby6NpprkHxj+ONfO9Z5lBqVUHJBMkrPFSNJhogBkg",
2624 "ed25519:UNSIGNEDDEVICE": "Zifq39ZDrlIaSRf0Hh22owEqXCPE+1JSSgs6LDlubwQ",
2625 },
2626 "signatures": {
2627 "@example:localhost": {
2628 "ed25519:UNSIGNEDDEVICE": "+L29RoDKoTufPGm/Bae65KHno7Z1H7GYhxSKpB4RQZRS7NrR29AMW1PVhEsIozYuDVEFuMZ0L8H3dlcaHxagBA",
2629 },
2630 },
2631 }),
2632 ])
2633 ))
2634 .mount(&server)
2635 .await;
2636
2637 let (request_id, request) = olm_machine.query_keys_for_users([user_id]);
2638 client.keys_query(&request_id, request.device_keys).await.unwrap();
2639
2640 assert!(!client.encryption().has_devices_to_verify_against().await.unwrap());
2641 }
2642}