1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 sync::Arc,
27};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_core::Stream;
31use futures_util::{
32 future::try_join,
33 stream::{self, StreamExt},
34};
35#[cfg(feature = "experimental-send-custom-to-device")]
36use matrix_sdk_base::crypto::CollectStrategy;
37use matrix_sdk_base::{
38 crypto::{
39 store::types::{RoomKeyBundleInfo, RoomKeyInfo},
40 types::requests::{
41 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
42 },
43 CrossSigningBootstrapRequests, OlmMachine,
44 },
45 StateStoreDataKey, StateStoreDataValue,
46};
47use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
48use ruma::{
49 api::client::{
50 error::ErrorBody,
51 keys::{
52 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
53 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
54 },
55 message::send_message_event,
56 to_device::send_event_to_device::v3::{
57 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
58 },
59 uiaa::{AuthData, UiaaInfo},
60 },
61 assign,
62 events::{
63 direct::DirectUserIdentifier,
64 room::{MediaSource, ThumbnailInfo},
65 },
66 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
67};
68#[cfg(feature = "experimental-send-custom-to-device")]
69use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
70use serde::Deserialize;
71use tasks::BundleReceiverTask;
72use tokio::sync::{Mutex, RwLockReadGuard};
73use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
74use tracing::{debug, error, instrument, trace, warn};
75use url::Url;
76use vodozemac::Curve25519PublicKey;
77
78use self::{
79 backups::{types::BackupClientState, Backups},
80 futures::UploadEncryptedFile,
81 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
82 recovery::{Recovery, RecoveryState},
83 secret_storage::SecretStorage,
84 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
85 verification::{SasVerification, Verification, VerificationRequest},
86};
87use crate::{
88 attachment::Thumbnail,
89 client::{ClientInner, WeakClient},
90 error::HttpResult,
91 store_locks::CrossProcessStoreLockGuard,
92 Client, Error, HttpError, Result, Room, RumaApiError, TransmissionProgress,
93};
94
95pub mod backups;
96pub mod futures;
97pub mod identities;
98pub mod recovery;
99pub mod secret_storage;
100pub(crate) mod tasks;
101pub mod verification;
102
103pub use matrix_sdk_base::crypto::{
104 olm::{
105 SessionCreationError as MegolmSessionCreationError,
106 SessionExportError as OlmSessionExportError,
107 },
108 vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
109 LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
110 SessionCreationError, SignatureError, VERSION,
111};
112
113#[cfg(feature = "experimental-send-custom-to-device")]
114use crate::config::RequestConfig;
115pub use crate::error::RoomKeyImportError;
116
/// Encryption-related state that the client keeps around.
pub(crate) struct EncryptionData {
    /// Handles of the background tasks used by the encryption module,
    /// protected by a synchronous mutex.
    pub tasks: StdMutex<ClientTasks>,

    /// The settings this state was created with.
    pub encryption_settings: EncryptionSettings,

    /// State of the backup client; starts out as its `Default` value.
    pub backup_state: BackupClientState,

    /// Observable holding the current [`RecoveryState`]; starts out as its
    /// `Default` value.
    pub recovery_state: SharedObservable<RecoveryState>,
}
132
133impl EncryptionData {
134 pub fn new(encryption_settings: EncryptionSettings) -> Self {
135 Self {
136 encryption_settings,
137
138 tasks: StdMutex::new(Default::default()),
139 backup_state: Default::default(),
140 recovery_state: Default::default(),
141 }
142 }
143
144 pub fn initialize_tasks(&self, client: &Arc<ClientInner>) {
145 let weak_client = WeakClient::from_inner(client);
146
147 let mut tasks = self.tasks.lock();
148 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
149
150 if self.encryption_settings.backup_download_strategy
151 == BackupDownloadStrategy::AfterDecryptionFailure
152 {
153 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
154 }
155 }
156
157 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
163 let mut guard = self.tasks.lock();
164
165 let future = Recovery::update_state_after_backup_state_change(client);
166 let join_handle = spawn(future);
167
168 guard.update_recovery_state_after_backup = Some(join_handle);
169 }
170}
171
/// Settings controlling how the client sets up end-to-end encryption.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncryptionSettings {
    /// When `true`, the E2EE initialization task bootstraps cross-signing if
    /// it is needed.
    pub auto_enable_cross_signing: bool,

    /// Selects the backup download behaviour; the backup download task is
    /// only created for [`BackupDownloadStrategy::AfterDecryptionFailure`].
    pub backup_download_strategy: BackupDownloadStrategy,

    /// NOTE(review): presumably enables backups automatically; this flag is
    /// not read in this part of the file — confirm against the backups
    /// module.
    pub auto_enable_backups: bool,
}
191
/// Strategy deciding when room keys are downloaded from the backup.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackupDownloadStrategy {
    /// NOTE(review): presumably downloads all keys in a single pass; the
    /// handling of this variant is not visible in this part of the file.
    OneShot,

    /// Selecting this variant makes `EncryptionData::initialize_tasks`
    /// create the backup download task.
    AfterDecryptionFailure,

    /// The default variant; no download task is created for it in
    /// `EncryptionData::initialize_tasks`.
    #[default]
    Manual,
}
218
/// The verification state of our own device, as published through
/// `Encryption::verification_state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum VerificationState {
    /// Our own device could not be found or retrieved, so the state cannot
    /// be determined.
    Unknown,
    /// Our own device is cross-signed by its owner.
    Verified,
    /// Our own device is not cross-signed by its owner.
    Unverified,
}
233
/// A guard for the cross-process crypto store lock, bundled with the crypto
/// store generation number obtained right after the lock was acquired.
#[derive(Debug)]
pub struct CrossProcessLockStoreGuardWithGeneration {
    /// The underlying lock guard; never read, only kept so that it lives as
    /// long as this value.
    _guard: CrossProcessStoreLockGuard,
    /// The generation number returned when the lock was acquired.
    generation: u64,
}
240
241impl CrossProcessLockStoreGuardWithGeneration {
242 pub fn generation(&self) -> u64 {
244 self.generation
245 }
246}
247
/// Handle used to finish (or cancel) a cross-signing reset that needs
/// additional authentication.
#[derive(Debug)]
pub struct CrossSigningResetHandle {
    /// Client used to send the requests.
    client: Client,
    /// The signing-keys upload request that is (re)sent by `auth()`.
    upload_request: UploadSigningKeysRequest,
    /// The signatures upload request sent once the keys upload succeeded.
    signatures_request: UploadSignaturesRequest,
    /// The kind of authentication required to complete the reset.
    auth_type: CrossSigningResetAuthType,
    /// Set to `true` by `cancel()`; checked by `auth()` after a failed
    /// upload attempt.
    is_cancelled: Mutex<bool>,
}
266
267impl CrossSigningResetHandle {
268 pub fn new(
270 client: Client,
271 upload_request: UploadSigningKeysRequest,
272 signatures_request: UploadSignaturesRequest,
273 auth_type: CrossSigningResetAuthType,
274 ) -> Self {
275 Self {
276 client,
277 upload_request,
278 signatures_request,
279 auth_type,
280 is_cancelled: Mutex::new(false),
281 }
282 }
283
284 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
287 &self.auth_type
288 }
289
290 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
294 let mut upload_request = self.upload_request.clone();
295 upload_request.auth = auth;
296
297 while let Err(e) = self.client.send(upload_request.clone()).await {
298 if *self.is_cancelled.lock().await {
299 return Ok(());
300 }
301
302 match e.as_uiaa_response() {
303 Some(uiaa_info) => {
304 if uiaa_info.auth_error.is_some() {
305 return Err(e.into());
306 }
307 }
308 None => return Err(e.into()),
309 }
310 }
311
312 self.client.send(self.signatures_request.clone()).await?;
313
314 Ok(())
315 }
316
317 pub async fn cancel(&self) {
319 *self.is_cancelled.lock().await = true;
320 }
321}
322
/// The kind of authentication required to complete a cross-signing reset.
#[derive(Debug, Clone)]
pub enum CrossSigningResetAuthType {
    /// The server replied with a UIAA response that could not be parsed as
    /// an OAuth cross-signing reset; the raw UIAA info is provided.
    Uiaa(UiaaInfo),
    /// The UIAA response carried the OAuth cross-signing reset parameters.
    OAuth(OAuthCrossSigningResetInfo),
}
333
334impl CrossSigningResetAuthType {
335 fn new(error: &HttpError) -> Result<Option<Self>> {
336 if let Some(auth_info) = error.as_uiaa_response() {
337 if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
338 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
339 } else {
340 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
341 }
342 } else {
343 Ok(None)
344 }
345 }
346}
347
/// Information needed to complete a cross-signing reset through OAuth.
#[derive(Debug, Clone, Deserialize)]
pub struct OAuthCrossSigningResetInfo {
    /// The URL taken from the `org.matrix.cross_signing_reset` UIAA
    /// parameter.
    pub approval_url: Url,
}
355
356impl OAuthCrossSigningResetInfo {
357 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
358 let parameters = serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(
359 auth_info.params.as_ref().map(|value| value.get()).unwrap_or_default(),
360 )?;
361
362 Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
363 }
364}
365
/// Deserialization helper for the UIAA parameters of a cross-signing reset.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaParameters {
    /// The reset parameter, read from the `org.matrix.cross_signing_reset`
    /// JSON key.
    #[serde(rename = "org.matrix.cross_signing_reset")]
    reset: OAuthCrossSigningResetUiaaResetParameter,
}
374
/// Deserialization helper for the content of the
/// `org.matrix.cross_signing_reset` UIAA parameter.
#[derive(Debug, Deserialize)]
struct OAuthCrossSigningResetUiaaResetParameter {
    /// The URL that ends up as `OAuthCrossSigningResetInfo::approval_url`.
    url: Url,
}
382
383impl Client {
384 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
385 self.base_client().olm_machine().await
386 }
387
388 pub(crate) async fn mark_request_as_sent(
389 &self,
390 request_id: &TransactionId,
391 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
392 ) -> Result<(), matrix_sdk_base::Error> {
393 Ok(self
394 .olm_machine()
395 .await
396 .as_ref()
397 .expect(
398 "We should have an olm machine once we try to mark E2EE related requests as sent",
399 )
400 .mark_request_as_sent(request_id, response)
401 .await?)
402 }
403
404 #[instrument(skip(self, device_keys))]
410 pub(crate) async fn keys_query(
411 &self,
412 request_id: &TransactionId,
413 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
414 ) -> Result<get_keys::v3::Response> {
415 let request = assign!(get_keys::v3::Request::new(), { device_keys });
416
417 let response = self.send(request).await?;
418 self.mark_request_as_sent(request_id, &response).await?;
419 self.encryption().update_state_after_keys_query(&response).await;
420
421 Ok(response)
422 }
423
424 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
458 &'a self,
459 reader: &'a mut R,
460 ) -> UploadEncryptedFile<'a, R> {
461 UploadEncryptedFile::new(self, reader)
462 }
463
464 pub(crate) async fn upload_encrypted_media_and_thumbnail(
467 &self,
468 data: &[u8],
469 thumbnail: Option<Thumbnail>,
470 send_progress: SharedObservable<TransmissionProgress>,
471 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
472 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
473
474 let upload_attachment = async {
475 let mut cursor = Cursor::new(data);
476 self.upload_encrypted_file(&mut cursor)
477 .with_send_progress_observable(send_progress)
478 .await
479 };
480
481 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
482
483 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
484 }
485
486 async fn upload_encrypted_thumbnail(
489 &self,
490 thumbnail: Option<Thumbnail>,
491 send_progress: SharedObservable<TransmissionProgress>,
492 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
493 let Some(thumbnail) = thumbnail else {
494 return Ok(None);
495 };
496
497 let (data, _, thumbnail_info) = thumbnail.into_parts();
498 let mut cursor = Cursor::new(data);
499
500 let file = self
501 .upload_encrypted_file(&mut cursor)
502 .with_send_progress_observable(send_progress)
503 .await?;
504
505 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
506 }
507
508 pub(crate) async fn claim_one_time_keys(
514 &self,
515 users: impl Iterator<Item = &UserId>,
516 ) -> Result<()> {
517 let _lock = self.locks().key_claim_lock.lock().await;
518
519 if let Some((request_id, request)) = self
520 .olm_machine()
521 .await
522 .as_ref()
523 .ok_or(Error::NoOlmMachine)?
524 .get_missing_sessions(users)
525 .await?
526 {
527 let response = self.send(request).await?;
528 self.mark_request_as_sent(&request_id, &response).await?;
529 }
530
531 Ok(())
532 }
533
534 #[instrument(skip(self, request))]
544 pub(crate) async fn keys_upload(
545 &self,
546 request_id: &TransactionId,
547 request: &upload_keys::v3::Request,
548 ) -> Result<upload_keys::v3::Response> {
549 debug!(
550 device_keys = request.device_keys.is_some(),
551 one_time_key_count = request.one_time_keys.len(),
552 "Uploading public encryption keys",
553 );
554
555 let response = self.send(request.clone()).await?;
556 self.mark_request_as_sent(request_id, &response).await?;
557
558 Ok(response)
559 }
560
561 pub(crate) async fn room_send_helper(
562 &self,
563 request: &RoomMessageRequest,
564 ) -> Result<send_message_event::v3::Response> {
565 let content = request.content.clone();
566 let txn_id = request.txn_id.clone();
567 let room_id = &request.room_id;
568
569 self.get_room(room_id)
570 .expect("Can't send a message to a room that isn't known to the store")
571 .send(*content)
572 .with_transaction_id(txn_id)
573 .await
574 }
575
576 pub(crate) async fn send_to_device(
577 &self,
578 request: &ToDeviceRequest,
579 ) -> HttpResult<ToDeviceResponse> {
580 let request = RumaToDeviceRequest::new_raw(
581 request.event_type.clone(),
582 request.txn_id.clone(),
583 request.messages.clone(),
584 );
585
586 self.send(request).await
587 }
588
589 pub(crate) async fn send_verification_request(
590 &self,
591 request: OutgoingVerificationRequest,
592 ) -> Result<()> {
593 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
594
595 match request {
596 ToDevice(t) => {
597 self.send_to_device(&t).await?;
598 }
599 InRoom(r) => {
600 self.room_send_helper(&r).await?;
601 }
602 }
603
604 Ok(())
605 }
606
607 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
609 let rooms = self.joined_rooms();
610
611 let room = rooms.into_iter().find(|r| {
613 let targets = r.direct_targets();
614 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
615 });
616
617 trace!(?room, "Found room");
618 room
619 }
620
621 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
622 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
623
624 match r.request() {
625 AnyOutgoingRequest::KeysQuery(request) => {
626 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
627 }
628 AnyOutgoingRequest::KeysUpload(request) => {
629 let response = self.keys_upload(r.request_id(), request).await;
630
631 if let Err(e) = &response {
632 match e.as_ruma_api_error() {
633 Some(RumaApiError::ClientApi(e)) if e.status_code == 400 => {
634 if let ErrorBody::Standard { message, .. } = &e.body {
635 {
640 let already_reported = self
641 .state_store()
642 .get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded)
643 .await?
644 .is_some();
645
646 if message.starts_with("One time key") && !already_reported {
647 tracing::error!(
648 sentry = true,
649 error_message = message,
650 "Duplicate one-time keys have been uploaded"
651 );
652
653 self.state_store()
654 .set_kv_data(
655 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
656 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
657 )
658 .await?;
659 }
660 }
661 }
662 }
663 _ => {}
664 }
665
666 response?;
667 }
668 }
669 AnyOutgoingRequest::ToDeviceRequest(request) => {
670 let response = self.send_to_device(request).await?;
671 self.mark_request_as_sent(r.request_id(), &response).await?;
672 }
673 AnyOutgoingRequest::SignatureUpload(request) => {
674 let response = self.send(request.clone()).await?;
675 self.mark_request_as_sent(r.request_id(), &response).await?;
676 }
677 AnyOutgoingRequest::RoomMessage(request) => {
678 let response = self.room_send_helper(request).await?;
679 self.mark_request_as_sent(r.request_id(), &response).await?;
680 }
681 AnyOutgoingRequest::KeysClaim(request) => {
682 let response = self.send(request.clone()).await?;
683 self.mark_request_as_sent(r.request_id(), &response).await?;
684 }
685 }
686
687 Ok(())
688 }
689
690 #[instrument(skip_all)]
691 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
692 const MAX_CONCURRENT_REQUESTS: usize = 20;
693
694 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
697 warn!("Error while claiming one-time keys {:?}", e);
698 }
699
700 let outgoing_requests = stream::iter(
701 self.olm_machine()
702 .await
703 .as_ref()
704 .ok_or(Error::NoOlmMachine)?
705 .outgoing_requests()
706 .await?,
707 )
708 .map(|r| self.send_outgoing_request(r));
709
710 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
711
712 requests
713 .for_each(|r| async move {
714 match r {
715 Ok(_) => (),
716 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
717 }
718 })
719 .await;
720
721 Ok(())
722 }
723}
724
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test-only accessor for the read guard protecting the optional
    /// `OlmMachine`.
    pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        Self::olm_machine(self).await
    }
}
732
/// Entry point for the encryption-related functionality of a [`Client`].
#[derive(Debug, Clone)]
pub struct Encryption {
    /// The client all operations are performed with.
    client: Client,
}
741
742impl Encryption {
743 pub(crate) fn new(client: Client) -> Self {
744 Self { client }
745 }
746
747 pub(crate) fn settings(&self) -> EncryptionSettings {
749 self.client.inner.e2ee.encryption_settings
750 }
751
752 pub async fn ed25519_key(&self) -> Option<String> {
755 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
756 }
757
758 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
760 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
761 }
762
763 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
765 match self.get_own_device().await {
766 Ok(Some(device)) => device.first_time_seen_ts(),
767 _ => MilliSecondsSinceUnixEpoch::now(),
769 }
770 }
771
772 pub(crate) async fn import_secrets_bundle(
773 &self,
774 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
775 ) -> Result<(), SecretImportError> {
776 let olm_machine = self.client.olm_machine().await;
777 let olm_machine =
778 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
779
780 olm_machine.store().import_secrets_bundle(bundle).await
781 }
782
783 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
788 let olm = self.client.olm_machine().await;
789 let machine = olm.as_ref()?;
790 Some(machine.cross_signing_status().await)
791 }
792
793 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
798 if let Some(machine) = self.client.olm_machine().await.as_ref() {
799 machine.tracked_users().await
800 } else {
801 Ok(HashSet::new())
802 }
803 }
804
805 pub fn verification_state(&self) -> Subscriber<VerificationState> {
828 self.client.inner.verification_state.subscribe_reset()
829 }
830
831 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
833 let olm = self.client.olm_machine().await;
834 let olm = olm.as_ref()?;
835 #[allow(clippy::bind_instead_of_map)]
836 olm.get_verification(user_id, flow_id).and_then(|v| match v {
837 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
838 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
839 }
840 #[cfg(feature = "qrcode")]
841 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
842 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
843 }
844 _ => None,
845 })
846 }
847
848 pub async fn get_verification_request(
851 &self,
852 user_id: &UserId,
853 flow_id: impl AsRef<str>,
854 ) -> Option<VerificationRequest> {
855 let olm = self.client.olm_machine().await;
856 let olm = olm.as_ref()?;
857
858 olm.get_verification_request(user_id, flow_id)
859 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
860 }
861
862 pub async fn get_device(
896 &self,
897 user_id: &UserId,
898 device_id: &DeviceId,
899 ) -> Result<Option<Device>, CryptoStoreError> {
900 let olm = self.client.olm_machine().await;
901 let Some(machine) = olm.as_ref() else { return Ok(None) };
902 let device = machine.get_device(user_id, device_id, None).await?;
903 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
904 }
905
906 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
913 let olm = self.client.olm_machine().await;
914 let Some(machine) = olm.as_ref() else { return Ok(None) };
915 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
916 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
917 }
918
919 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
945 let devices = self
946 .client
947 .olm_machine()
948 .await
949 .as_ref()
950 .ok_or(Error::NoOlmMachine)?
951 .get_user_devices(user_id, None)
952 .await?;
953
954 Ok(UserDevices { inner: devices, client: self.client.clone() })
955 }
956
957 pub async fn get_user_identity(
993 &self,
994 user_id: &UserId,
995 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
996 let olm = self.client.olm_machine().await;
997 let Some(olm) = olm.as_ref() else { return Ok(None) };
998 let identity = olm.get_identity(user_id, None).await?;
999
1000 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1001 }
1002
1003 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1041 let olm = self.client.olm_machine().await;
1042 let Some(olm) = olm.as_ref() else { return Ok(None) };
1043
1044 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
1045 self.client.keys_query(&request_id, request.device_keys).await?;
1046
1047 let identity = olm.get_identity(user_id, None).await?;
1048 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1049 }
1050
1051 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1082 let olm = self.client.olm_machine().await;
1083 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1084 let client = self.client.to_owned();
1085
1086 Ok(olm
1087 .store()
1088 .devices_stream()
1089 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1090 }
1091
1092 pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1120 let olm = self.client.olm_machine().await;
1121 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1122 let client = self.client.to_owned();
1123
1124 Ok(olm
1125 .store()
1126 .user_identities_stream()
1127 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1128 }
1129
1130 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1169 let olm = self.client.olm_machine().await;
1170 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1171
1172 let CrossSigningBootstrapRequests {
1173 upload_signing_keys_req,
1174 upload_keys_req,
1175 upload_signatures_req,
1176 } = olm.bootstrap_cross_signing(false).await?;
1177
1178 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1179 auth: auth_data,
1180 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1181 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1182 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1183 });
1184
1185 if let Some(req) = upload_keys_req {
1186 self.client.send_outgoing_request(req).await?;
1187 }
1188 self.client.send(upload_signing_keys_req).await?;
1189 self.client.send(upload_signatures_req).await?;
1190
1191 Ok(())
1192 }
1193
1194 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1231 let olm = self.client.olm_machine().await;
1232 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1233
1234 let CrossSigningBootstrapRequests {
1235 upload_keys_req,
1236 upload_signing_keys_req,
1237 upload_signatures_req,
1238 } = olm.bootstrap_cross_signing(true).await?;
1239
1240 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1241 auth: None,
1242 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1243 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1244 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1245 });
1246
1247 if let Some(req) = upload_keys_req {
1248 self.client.send_outgoing_request(req).await?;
1249 }
1250
1251 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1252 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1253 let client = self.client.clone();
1254
1255 Ok(Some(CrossSigningResetHandle::new(
1256 client,
1257 upload_signing_keys_req,
1258 upload_signatures_req,
1259 auth_type,
1260 )))
1261 } else {
1262 Err(error.into())
1263 }
1264 } else {
1265 self.client.send(upload_signatures_req).await?;
1266
1267 Ok(None)
1268 }
1269 }
1270
1271 async fn ensure_initial_key_query(&self) -> Result<()> {
1274 let olm_machine = self.client.olm_machine().await;
1275 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1276
1277 let user_id = olm_machine.user_id();
1278
1279 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1280 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1281 self.client.keys_query(&request_id, request.device_keys).await?;
1282 }
1283
1284 Ok(())
1285 }
1286
1287 pub async fn bootstrap_cross_signing_if_needed(
1334 &self,
1335 auth_data: Option<AuthData>,
1336 ) -> Result<()> {
1337 let olm_machine = self.client.olm_machine().await;
1338 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1339 let user_id = olm_machine.user_id();
1340
1341 self.ensure_initial_key_query().await?;
1342
1343 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1344 self.bootstrap_cross_signing(auth_data).await?;
1345 }
1346
1347 Ok(())
1348 }
1349
1350 #[cfg(not(target_family = "wasm"))]
1402 pub async fn export_room_keys(
1403 &self,
1404 path: PathBuf,
1405 passphrase: &str,
1406 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1407 ) -> Result<()> {
1408 let olm = self.client.olm_machine().await;
1409 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1410
1411 let keys = olm.store().export_room_keys(predicate).await?;
1412 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1413
1414 let encrypt = move || -> Result<()> {
1415 let export: String =
1416 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1417 let mut file = std::fs::File::create(path)?;
1418 file.write_all(&export.into_bytes())?;
1419 Ok(())
1420 };
1421
1422 let task = tokio::task::spawn_blocking(encrypt);
1423 task.await.expect("Task join error")
1424 }
1425
1426 #[cfg(not(target_family = "wasm"))]
1464 pub async fn import_room_keys(
1465 &self,
1466 path: PathBuf,
1467 passphrase: &str,
1468 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1469 let olm = self.client.olm_machine().await;
1470 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1471 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1472
1473 let decrypt = move || {
1474 let file = std::fs::File::open(path)?;
1475 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1476 };
1477
1478 let task = tokio::task::spawn_blocking(decrypt);
1479 let import = task.await.expect("Task join error")?;
1480
1481 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1482
1483 self.backups().maybe_trigger_backup();
1484
1485 Ok(ret)
1486 }
1487
1488 pub async fn room_keys_received_stream(
1519 &self,
1520 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1521 let olm = self.client.olm_machine().await;
1522 let olm = olm.as_ref()?;
1523
1524 Some(olm.store().room_keys_received_stream())
1525 }
1526
1527 pub async fn historic_room_key_stream(&self) -> Option<impl Stream<Item = RoomKeyBundleInfo>> {
1558 let olm = self.client.olm_machine().await;
1559 let olm = olm.as_ref()?;
1560
1561 Some(olm.store().historic_room_key_stream())
1562 }
1563
1564 pub fn secret_storage(&self) -> SecretStorage {
1566 SecretStorage { client: self.client.to_owned() }
1567 }
1568
1569 pub fn backups(&self) -> Backups {
1571 Backups { client: self.client.to_owned() }
1572 }
1573
1574 pub fn recovery(&self) -> Recovery {
1576 Recovery { client: self.client.to_owned() }
1577 }
1578
1579 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1591 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1593 let prev_holder = prev_lock.lock_holder();
1594 if prev_holder == lock_value {
1595 return Ok(());
1596 }
1597 warn!(
1598 "Recreating cross-process store lock with a different holder value: \
1599 prev was {prev_holder}, new is {lock_value}"
1600 );
1601 }
1602
1603 let olm_machine = self.client.base_client().olm_machine().await;
1604 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1605
1606 let lock =
1607 olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1608
1609 {
1614 let guard = lock.try_lock_once().await?;
1615 if guard.is_some() {
1616 olm_machine
1617 .initialize_crypto_store_generation(
1618 &self.client.locks().crypto_store_generation,
1619 )
1620 .await?;
1621 }
1622 }
1623
1624 self.client
1625 .locks()
1626 .cross_process_crypto_store_lock
1627 .set(lock)
1628 .map_err(|_| Error::BadCryptoStoreState)?;
1629
1630 Ok(())
1631 }
1632
1633 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1638 let olm_machine_guard = self.client.olm_machine().await;
1639 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1640 let (new_gen, generation_number) = olm_machine
1641 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1642 .await?;
1643 if new_gen {
1645 drop(olm_machine_guard);
1647 self.client.base_client().regenerate_olm(None).await?;
1649 }
1650 Ok(generation_number)
1651 } else {
1652 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1657 Ok(0)
1658 }
1659 }
1660
1661 pub async fn spin_lock_store(
1667 &self,
1668 max_backoff: Option<u32>,
1669 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1670 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1671 let guard = lock.spin_lock(max_backoff).await?;
1672
1673 let generation = self.on_lock_newly_acquired().await?;
1674
1675 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1676 } else {
1677 Ok(None)
1678 }
1679 }
1680
1681 pub async fn try_lock_store_once(
1686 &self,
1687 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1688 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1689 let maybe_guard = lock.try_lock_once().await?;
1690
1691 let Some(guard) = maybe_guard else {
1692 return Ok(None);
1693 };
1694
1695 let generation = self.on_lock_newly_acquired().await?;
1696
1697 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1698 } else {
1699 Ok(None)
1700 }
1701 }
1702
1703 #[cfg(any(test, feature = "testing"))]
1705 pub async fn uploaded_key_count(&self) -> Result<u64> {
1706 let olm_machine = self.client.olm_machine().await;
1707 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1708 Ok(olm_machine.uploaded_key_count().await?)
1709 }
1710
1711 pub(crate) async fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1735 let bundle_receiver_task = if self.client.inner.enable_share_history_on_invite {
1739 Some(BundleReceiverTask::new(&self.client).await)
1740 } else {
1741 None
1742 };
1743
1744 let mut tasks = self.client.inner.e2ee.tasks.lock();
1745
1746 let this = self.clone();
1747
1748 tasks.setup_e2ee = Some(spawn(async move {
1749 this.update_verification_state().await;
1752
1753 if this.settings().auto_enable_cross_signing {
1754 if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1755 error!("Couldn't bootstrap cross signing {e:?}");
1756 }
1757 }
1758
1759 if let Err(e) = this.backups().setup_and_resume().await {
1760 error!("Couldn't setup and resume backups {e:?}");
1761 }
1762 if let Err(e) = this.recovery().setup().await {
1763 error!("Couldn't setup and resume recovery {e:?}");
1764 }
1765 }));
1766
1767 tasks.receive_historic_room_key_bundles = bundle_receiver_task;
1768 }
1769
1770 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1773 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1774
1775 if let Some(task) = task {
1776 if let Err(err) = task.await {
1777 warn!("Error when initializing backups: {err}");
1778 }
1779 }
1780 }
1781
1782 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1792 let olm = self.client.olm_machine().await;
1793 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1794
1795 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1796 self.client.keys_upload(&request_id, &request).await?;
1797
1798 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1799 self.client.keys_query(&request_id, request.device_keys).await?;
1800 }
1801
1802 Ok(())
1803 }
1804
1805 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1806 self.recovery().update_state_after_keys_query(response).await;
1807
1808 if let Some(user_id) = self.client.user_id() {
1810 let contains_own_device = response.device_keys.contains_key(user_id);
1811
1812 if contains_own_device {
1813 self.update_verification_state().await;
1814 }
1815 }
1816 }
1817
1818 async fn update_verification_state(&self) {
1819 match self.get_own_device().await {
1820 Ok(device) => {
1821 if let Some(device) = device {
1822 let is_verified = device.is_cross_signed_by_owner();
1823
1824 if is_verified {
1825 self.client.inner.verification_state.set(VerificationState::Verified);
1826 } else {
1827 self.client.inner.verification_state.set(VerificationState::Unverified);
1828 }
1829 } else {
1830 warn!("Couldn't find out own device in the store.");
1831 self.client.inner.verification_state.set(VerificationState::Unknown);
1832 }
1833 }
1834 Err(error) => {
1835 warn!("Failed retrieving own device: {error}");
1836 self.client.inner.verification_state.set(VerificationState::Unknown);
1837 }
1838 }
1839 }
1840
/// Encrypts a raw to-device event for the given devices and sends it.
///
/// One-time keys are claimed for the users owning the recipient devices
/// first, then the content is encrypted per device and the resulting
/// to-device requests are sent one after another.
///
/// # Arguments
///
/// * `recipient_devices` - The devices the event should be sent to.
/// * `event_type` - The type of the event to encrypt.
/// * `content` - The raw content of the event to encrypt.
/// * `share_strategy` - Passed on to the Olm machine when encrypting.
///
/// # Returns
///
/// The `(user, device)` pairs the event could **not** be delivered to:
/// devices for which the Olm machine reported the content as withheld, plus
/// every device addressed by a request that failed to send. An empty list
/// means every request was sent successfully.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when the client has no Olm machine, and
/// forwards errors from claiming one-time keys or from encrypting the
/// content. Failures while sending individual requests are not errors; they
/// are reported through the returned list.
///
/// # Panics
///
/// Panics if `content` cannot be deserialized as a `serde_json::Value`.
#[cfg(feature = "experimental-send-custom-to-device")]
pub async fn encrypt_and_send_raw_to_device(
    &self,
    recipient_devices: Vec<&Device>,
    event_type: &str,
    content: Raw<AnyToDeviceEventContent>,
    share_strategy: CollectStrategy,
) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
    let users = recipient_devices.iter().map(|device| device.user_id());

    // Claim one-time keys for the owners of the recipient devices before
    // encrypting for them.
    self.client.claim_one_time_keys(users).await?;

    let olm = self.client.olm_machine().await;
    // Report a missing Olm machine as an error, like the other methods of
    // this type do, instead of panicking inside a fallible public API.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let (requests, withhelds) = olm
        .encrypt_content_for_devices(
            recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
            event_type,
            &content
                .deserialize_as::<serde_json::Value>()
                .expect("Deserialize as Value will always work"),
            share_strategy,
        )
        .await?;

    // Every withheld device counts as a failure from the start.
    let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = withhelds
        .iter()
        .map(|(d, _)| (d.user_id().to_owned(), d.device_id().to_owned()))
        .collect();

    for request in requests {
        let ruma_request = RumaToDeviceRequest::new_raw(
            request.event_type.clone(),
            request.txn_id.clone(),
            request.messages.clone(),
        );

        let send_result = self
            .client
            .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
            .await;

        // A failed request means none of the devices it addressed received
        // the event, so all of them are reported.
        if send_result.is_err() {
            for (user_id, device_map) in request.messages {
                for device_id in device_map.keys() {
                    match device_id {
                        DeviceIdOrAllDevices::DeviceId(device_id) => {
                            failures.push((user_id.clone(), device_id.to_owned()));
                        }
                        DeviceIdOrAllDevices::AllDevices => {
                            // A wildcard entry names no concrete device, so
                            // there is nothing to report for it.
                        }
                    }
                }
            }
        }
    }

    Ok(failures)
}
1918}
1919
1920#[cfg(all(test, not(target_family = "wasm")))]
1921mod tests {
1922 use std::{
1923 ops::Not,
1924 sync::{
1925 atomic::{AtomicBool, Ordering},
1926 Arc,
1927 },
1928 time::Duration,
1929 };
1930
1931 use matrix_sdk_test::{
1932 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
1933 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1934 };
1935 use ruma::{
1936 event_id,
1937 events::{reaction::ReactionEventContent, relation::Annotation},
1938 user_id,
1939 };
1940 use serde_json::json;
1941 use wiremock::{
1942 matchers::{header, method, path_regex},
1943 Mock, MockServer, Request, ResponseTemplate,
1944 };
1945
1946 use crate::{
1947 assert_next_matches_with_timeout,
1948 config::RequestConfig,
1949 encryption::{OAuthCrossSigningResetInfo, VerificationState},
1950 test_utils::{
1951 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1952 },
1953 Client,
1954 };
1955
1956 #[async_test]
1957 async fn test_reaction_sending() {
1958 let server = MockServer::start().await;
1959 let client = logged_in_client(Some(server.uri())).await;
1960
1961 let event_id = event_id!("$2:example.org");
1962
1963 Mock::given(method("GET"))
1964 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1965 .and(header("authorization", "Bearer 1234"))
1966 .respond_with(
1967 ResponseTemplate::new(200)
1968 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1969 )
1970 .mount(&server)
1971 .await;
1972
1973 Mock::given(method("PUT"))
1974 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1975 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1976 "event_id": event_id,
1977 })))
1978 .mount(&server)
1979 .await;
1980
1981 let response = SyncResponseBuilder::default()
1982 .add_joined_room(
1983 JoinedRoomBuilder::default()
1984 .add_state_event(StateTestEvent::Member)
1985 .add_state_event(StateTestEvent::PowerLevels)
1986 .add_state_event(StateTestEvent::Encryption),
1987 )
1988 .build_sync_response();
1989
1990 client.base_client().receive_sync_response(response).await.unwrap();
1991
1992 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1993 assert!(room
1994 .latest_encryption_state()
1995 .await
1996 .expect("Getting encryption state")
1997 .is_encrypted());
1998
1999 let event_id = event_id!("$1:example.org");
2000 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
2001 room.send(reaction).await.expect("Sending the reaction should not fail");
2002
2003 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
2004 }
2005
2006 #[async_test]
2007 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
2008 let server = MockServer::start().await;
2009 let client = logged_in_client(Some(server.uri())).await;
2010 let user_id = user_id!("@invited:localhost");
2014
2015 let f = EventFactory::new();
2017 let response = SyncResponseBuilder::default()
2018 .add_joined_room(
2019 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
2020 )
2021 .add_global_account_data(
2022 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2023 )
2024 .build_sync_response();
2025 client.base_client().receive_sync_response(response).await.unwrap();
2026
2027 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2029 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2030 }
2031
2032 #[async_test]
2033 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
2034 let server = MockServer::start().await;
2035 let client = logged_in_client(Some(server.uri())).await;
2036 let user_id = user_id!("@invited:localhost");
2038
2039 let f = EventFactory::new();
2041 let response = SyncResponseBuilder::default()
2042 .add_joined_room(
2043 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
2044 )
2045 .add_global_account_data(
2046 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2047 )
2048 .build_sync_response();
2049 client.base_client().receive_sync_response(response).await.unwrap();
2050
2051 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2053 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2054 }
2055
2056 #[async_test]
2057 async fn test_get_dm_room_still_finds_left_room() {
2058 let server = MockServer::start().await;
2062 let client = logged_in_client(Some(server.uri())).await;
2063 let user_id = user_id!("@invited:localhost");
2067
2068 let f = EventFactory::new();
2070 let response = SyncResponseBuilder::default()
2071 .add_joined_room(
2072 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
2073 )
2074 .add_global_account_data(
2075 f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
2076 )
2077 .build_sync_response();
2078 client.base_client().receive_sync_response(response).await.unwrap();
2079
2080 let found_room = client.get_dm_room(user_id).expect("DM not found!");
2082 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
2083 }
2084
/// Two clients share one sqlite store and take turns holding the
/// cross-process store lock.
///
/// The test asserts that the first client keeps the same Olm machine when it
/// re-takes the lock with nobody else in between, and ends up with a
/// different Olm machine once the second client has held the lock. It also
/// checks that the backup decryption key and the "backups enabled" state are
/// still there on the new machine.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_invalidates_olm_machine() {
    use matrix_sdk_base::store::RoomLoadSettings;
    // Both clients are built on the same sqlite file and restored from the
    // same session.
    let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
    let session = mock_matrix_session();

    let client1 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client1
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    let client2 = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(sqlite_path, None)
        .build()
        .await
        .unwrap();
    client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();

    // The cross-process lock has not been enabled yet, so trying to lock
    // yields no guard.
    let guard = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(guard.is_none());

    client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
    client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();

    // With the lock enabled, the first client can take it.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    // Keep a handle to the current Olm machine to compare against later ones
    // with `same_as`.
    let initial_olm_machine =
        client1.olm_machine().await.clone().expect("must have an olm machine");

    // Store a backup decryption key and enable backups on this machine; both
    // are re-checked at the end of the test on the replacement machine.
    let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
        .expect("Can't create new recovery key");
    let backup_key = decryption_key.megolm_v1_public_key();
    backup_key.set_version("1".to_owned());
    initial_olm_machine
        .backup_machine()
        .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
        .await
        .expect("Should save");

    initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();

    assert!(client1.encryption().backups().are_enabled().await);

    // While the first client holds the lock, the second one cannot take it.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_none());

    // Release the lock in the first client.
    // NOTE(review): the sleep presumably gives the release time to take
    // effect before the next attempt — confirm.
    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Re-taking the lock in the same client, with no other holder in
    // between, keeps the same Olm machine.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
    assert!(initial_olm_machine.same_as(&olm_machine));

    drop(acquired1);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Now the second client takes the lock and releases it again.
    let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired2.is_some());

    drop(acquired2);
    tokio::time::sleep(Duration::from_millis(100)).await;

    // After the other client has held the lock, re-taking it in the first
    // client must result in a different Olm machine.
    let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
    assert!(acquired1.is_some());

    let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");

    assert!(!initial_olm_machine.same_as(&olm_machine));

    // The new machine still knows the decryption key saved earlier, and
    // backups are still reported as enabled.
    let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
    assert!(backup_key_new.decryption_key.is_some());
    assert_eq!(
        backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
        backup_key.to_base64()
    );
    assert!(client1.encryption().backups().are_enabled().await);
}
2191
/// Checks when a client's Olm machine is and is not replaced around the
/// cross-process store lock.
///
/// Asserted behavior: enabling the lock keeps the Olm machine; the first
/// lock acquisition after another client on the same store has held the lock
/// yields a different machine; a further acquisition with no other holder in
/// between keeps that machine.
#[cfg(feature = "sqlite")]
#[async_test]
async fn test_generation_counter_no_spurious_invalidation() {
    use matrix_sdk_base::store::RoomLoadSettings;
    // Both clients in this test are built on the same sqlite file.
    let sqlite_path =
        std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    // Reference machine, captured before the lock is enabled.
    let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Merely enabling the lock must not replace the Olm machine.
    let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(initial_olm_machine.same_as(&after_enabling_lock));

    {
        // A second client on the same store enables the lock, takes it once
        // and releases it again.
        let client2 = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(sqlite_path, None)
            .build()
            .await
            .unwrap();
        client2
            .matrix_auth()
            .restore_session(session, RoomLoadSettings::default())
            .await
            .unwrap();

        client2
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client2.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // NOTE(review): the sleep presumably gives the release time to take
        // effect before the first client tries to lock — confirm.
        drop(guard);
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    {
        // First acquisition by the original client; the guard is dropped at
        // the end of this scope.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // Another client held the lock in between, so the machine must differ
    // from the initial one.
    let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));

    {
        // Second acquisition, with no other holder in between.
        let acquired = client.encryption().try_lock_store_once().await.unwrap();
        assert!(acquired.is_some());
    }

    // Nobody else took the lock, so the machine must be unchanged.
    let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
    assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
}
2269
2270 #[async_test]
2271 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2272 let client = no_retry_test_client(None).await;
2274 let server = MockServer::start().await;
2275
2276 let mut verification_state = client.encryption().verification_state();
2278
2279 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2281
2282 let keys_requested = Arc::new(AtomicBool::new(false));
2285 let inner_bool = keys_requested.clone();
2286
2287 Mock::given(method("GET"))
2288 .and(path_regex(
2289 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2290 ))
2291 .respond_with(move |_req: &Request| {
2292 inner_bool.fetch_or(true, Ordering::SeqCst);
2293 ResponseTemplate::new(200).set_body_json(json!({}))
2294 })
2295 .mount(&server)
2296 .await;
2297
2298 set_client_session(&client).await;
2300
2301 assert!(keys_requested.load(Ordering::SeqCst).not());
2303 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2304 }
2305
/// A UIAA payload carrying an `org.matrix.cross_signing_reset` stage and
/// matching params can be deserialized and turned into an
/// `OAuthCrossSigningResetInfo`.
#[test]
fn test_oauth_reset_info_from_uiaa_info() {
    let raw_uiaa = json!({
        "session": "dummy",
        "flows": [
            {
                "stages": [
                    "org.matrix.cross_signing_reset"
                ]
            }
        ],
        "params": {
            "org.matrix.cross_signing_reset": {
                "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
            }
        },
        "msg": "To reset..."
    });

    // The target type is inferred from the `from_auth_info` call below.
    let uiaa_info = serde_json::from_value(raw_uiaa)
        .expect("We should be able to deserialize the UiaaInfo");

    let reset_info = OAuthCrossSigningResetInfo::from_auth_info(&uiaa_info);
    reset_info
        .expect("We should be able to fetch the cross-signing reset info from the auth info");
}
2330}