1#![doc = include_str!("../docs/encryption.md")]
17#![cfg_attr(target_family = "wasm", allow(unused_imports))]
18
19#[cfg(feature = "experimental-send-custom-to-device")]
20use std::ops::Deref;
21use std::{
22 collections::{BTreeMap, HashSet},
23 io::{Cursor, Read, Write},
24 iter,
25 path::PathBuf,
26 sync::Arc,
27};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_core::Stream;
31use futures_util::{
32 future::try_join,
33 stream::{self, StreamExt},
34};
35use matrix_sdk_base::crypto::{
36 store::types::RoomKeyInfo,
37 types::requests::{
38 OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
39 },
40 CrossSigningBootstrapRequests, OlmMachine,
41};
42use matrix_sdk_common::{executor::spawn, locks::Mutex as StdMutex};
43use ruma::{
44 api::client::{
45 keys::{
46 get_keys, upload_keys, upload_signatures::v3::Request as UploadSignaturesRequest,
47 upload_signing_keys::v3::Request as UploadSigningKeysRequest,
48 },
49 message::send_message_event,
50 to_device::send_event_to_device::v3::{
51 Request as RumaToDeviceRequest, Response as ToDeviceResponse,
52 },
53 uiaa::{AuthData, UiaaInfo},
54 },
55 assign,
56 events::{
57 direct::DirectUserIdentifier,
58 room::{MediaSource, ThumbnailInfo},
59 },
60 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, TransactionId, UserId,
61};
62#[cfg(feature = "experimental-send-custom-to-device")]
63use ruma::{events::AnyToDeviceEventContent, serde::Raw, to_device::DeviceIdOrAllDevices};
64use serde::Deserialize;
65use tokio::sync::{Mutex, RwLockReadGuard};
66use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
67use tracing::{debug, error, instrument, trace, warn};
68use url::Url;
69use vodozemac::Curve25519PublicKey;
70
71use self::{
72 backups::{types::BackupClientState, Backups},
73 futures::UploadEncryptedFile,
74 identities::{Device, DeviceUpdates, IdentityUpdates, UserDevices, UserIdentity},
75 recovery::{Recovery, RecoveryState},
76 secret_storage::SecretStorage,
77 tasks::{BackupDownloadTask, BackupUploadingTask, ClientTasks},
78 verification::{SasVerification, Verification, VerificationRequest},
79};
80use crate::{
81 attachment::Thumbnail,
82 client::{ClientInner, WeakClient},
83 error::HttpResult,
84 store_locks::CrossProcessStoreLockGuard,
85 Client, Error, HttpError, Result, Room, TransmissionProgress,
86};
87
88pub mod backups;
89pub mod futures;
90pub mod identities;
91pub mod recovery;
92pub mod secret_storage;
93pub(crate) mod tasks;
94pub mod verification;
95
96pub use matrix_sdk_base::crypto::{
97 olm::{
98 SessionCreationError as MegolmSessionCreationError,
99 SessionExportError as OlmSessionExportError,
100 },
101 vodozemac, CrossSigningStatus, CryptoStoreError, DecryptorError, EventError, KeyExportError,
102 LocalTrust, MediaEncryptionInfo, MegolmError, OlmError, RoomKeyImportResult, SecretImportError,
103 SessionCreationError, SignatureError, VERSION,
104};
105
106#[cfg(feature = "experimental-send-custom-to-device")]
107use crate::config::RequestConfig;
108pub use crate::error::RoomKeyImportError;
109
110pub(crate) struct EncryptionData {
112 pub tasks: StdMutex<ClientTasks>,
115
116 pub encryption_settings: EncryptionSettings,
118
119 pub backup_state: BackupClientState,
121
122 pub recovery_state: SharedObservable<RecoveryState>,
124}
125
126impl EncryptionData {
127 pub fn new(encryption_settings: EncryptionSettings) -> Self {
128 Self {
129 encryption_settings,
130
131 tasks: StdMutex::new(Default::default()),
132 backup_state: Default::default(),
133 recovery_state: Default::default(),
134 }
135 }
136
137 pub fn initialize_room_key_tasks(&self, client: &Arc<ClientInner>) {
138 let weak_client = WeakClient::from_inner(client);
139
140 let mut tasks = self.tasks.lock();
141 tasks.upload_room_keys = Some(BackupUploadingTask::new(weak_client.clone()));
142
143 if self.encryption_settings.backup_download_strategy
144 == BackupDownloadStrategy::AfterDecryptionFailure
145 {
146 tasks.download_room_keys = Some(BackupDownloadTask::new(weak_client));
147 }
148 }
149
150 pub fn initialize_recovery_state_update_task(&self, client: &Client) {
156 let mut guard = self.tasks.lock();
157
158 let future = Recovery::update_state_after_backup_state_change(client);
159 let join_handle = spawn(future);
160
161 guard.update_recovery_state_after_backup = Some(join_handle);
162 }
163}
164
165#[derive(Clone, Copy, Debug, Default)]
167pub struct EncryptionSettings {
168 pub auto_enable_cross_signing: bool,
174
175 pub backup_download_strategy: BackupDownloadStrategy,
180
181 pub auto_enable_backups: bool,
183}
184
185#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
187#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
188pub enum BackupDownloadStrategy {
189 OneShot,
200
201 AfterDecryptionFailure,
203
204 #[default]
209 Manual,
210}
211
212#[derive(Clone, Copy, Debug, Eq, PartialEq)]
217pub enum VerificationState {
218 Unknown,
220 Verified,
223 Unverified,
225}
226
227#[derive(Debug)]
229pub struct CrossProcessLockStoreGuardWithGeneration {
230 _guard: CrossProcessStoreLockGuard,
231 generation: u64,
232}
233
234impl CrossProcessLockStoreGuardWithGeneration {
235 pub fn generation(&self) -> u64 {
237 self.generation
238 }
239}
240
241#[derive(Debug)]
252pub struct CrossSigningResetHandle {
253 client: Client,
254 upload_request: UploadSigningKeysRequest,
255 signatures_request: UploadSignaturesRequest,
256 auth_type: CrossSigningResetAuthType,
257 is_cancelled: Mutex<bool>,
258}
259
260impl CrossSigningResetHandle {
261 pub fn new(
263 client: Client,
264 upload_request: UploadSigningKeysRequest,
265 signatures_request: UploadSignaturesRequest,
266 auth_type: CrossSigningResetAuthType,
267 ) -> Self {
268 Self {
269 client,
270 upload_request,
271 signatures_request,
272 auth_type,
273 is_cancelled: Mutex::new(false),
274 }
275 }
276
277 pub fn auth_type(&self) -> &CrossSigningResetAuthType {
280 &self.auth_type
281 }
282
283 pub async fn auth(&self, auth: Option<AuthData>) -> Result<()> {
287 let mut upload_request = self.upload_request.clone();
288 upload_request.auth = auth;
289
290 while let Err(e) = self.client.send(upload_request.clone()).await {
291 if *self.is_cancelled.lock().await {
292 return Ok(());
293 }
294
295 match e.as_uiaa_response() {
296 Some(uiaa_info) => {
297 if uiaa_info.auth_error.is_some() {
298 return Err(e.into());
299 }
300 }
301 None => return Err(e.into()),
302 }
303 }
304
305 self.client.send(self.signatures_request.clone()).await?;
306
307 Ok(())
308 }
309
310 pub async fn cancel(&self) {
312 *self.is_cancelled.lock().await = true;
313 }
314}
315
316#[derive(Debug, Clone)]
319pub enum CrossSigningResetAuthType {
320 Uiaa(UiaaInfo),
322 OAuth(OAuthCrossSigningResetInfo),
325}
326
327impl CrossSigningResetAuthType {
328 fn new(error: &HttpError) -> Result<Option<Self>> {
329 if let Some(auth_info) = error.as_uiaa_response() {
330 if let Ok(auth_info) = OAuthCrossSigningResetInfo::from_auth_info(auth_info) {
331 Ok(Some(CrossSigningResetAuthType::OAuth(auth_info)))
332 } else {
333 Ok(Some(CrossSigningResetAuthType::Uiaa(auth_info.clone())))
334 }
335 } else {
336 Ok(None)
337 }
338 }
339}
340
341#[derive(Debug, Clone, Deserialize)]
344pub struct OAuthCrossSigningResetInfo {
345 pub approval_url: Url,
347}
348
349impl OAuthCrossSigningResetInfo {
350 fn from_auth_info(auth_info: &UiaaInfo) -> Result<Self> {
351 let parameters =
352 serde_json::from_str::<OAuthCrossSigningResetUiaaParameters>(auth_info.params.get())?;
353
354 Ok(OAuthCrossSigningResetInfo { approval_url: parameters.reset.url })
355 }
356}
357
358#[derive(Debug, Deserialize)]
361struct OAuthCrossSigningResetUiaaParameters {
362 #[serde(rename = "org.matrix.cross_signing_reset")]
364 reset: OAuthCrossSigningResetUiaaResetParameter,
365}
366
367#[derive(Debug, Deserialize)]
370struct OAuthCrossSigningResetUiaaResetParameter {
371 url: Url,
373}
374
375impl Client {
376 pub(crate) async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
377 self.base_client().olm_machine().await
378 }
379
380 pub(crate) async fn mark_request_as_sent(
381 &self,
382 request_id: &TransactionId,
383 response: impl Into<matrix_sdk_base::crypto::types::requests::AnyIncomingResponse<'_>>,
384 ) -> Result<(), matrix_sdk_base::Error> {
385 Ok(self
386 .olm_machine()
387 .await
388 .as_ref()
389 .expect(
390 "We should have an olm machine once we try to mark E2EE related requests as sent",
391 )
392 .mark_request_as_sent(request_id, response)
393 .await?)
394 }
395
396 #[instrument(skip(self, device_keys))]
402 pub(crate) async fn keys_query(
403 &self,
404 request_id: &TransactionId,
405 device_keys: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
406 ) -> Result<get_keys::v3::Response> {
407 let request = assign!(get_keys::v3::Request::new(), { device_keys });
408
409 let response = self.send(request).await?;
410 self.mark_request_as_sent(request_id, &response).await?;
411 self.encryption().update_state_after_keys_query(&response).await;
412
413 Ok(response)
414 }
415
416 pub fn upload_encrypted_file<'a, R: Read + ?Sized + 'a>(
450 &'a self,
451 reader: &'a mut R,
452 ) -> UploadEncryptedFile<'a, R> {
453 UploadEncryptedFile::new(self, reader)
454 }
455
456 pub(crate) async fn upload_encrypted_media_and_thumbnail(
459 &self,
460 data: &[u8],
461 thumbnail: Option<Thumbnail>,
462 send_progress: SharedObservable<TransmissionProgress>,
463 ) -> Result<(MediaSource, Option<(MediaSource, Box<ThumbnailInfo>)>)> {
464 let upload_thumbnail = self.upload_encrypted_thumbnail(thumbnail, send_progress.clone());
465
466 let upload_attachment = async {
467 let mut cursor = Cursor::new(data);
468 self.upload_encrypted_file(&mut cursor)
469 .with_send_progress_observable(send_progress)
470 .await
471 };
472
473 let (thumbnail, file) = try_join(upload_thumbnail, upload_attachment).await?;
474
475 Ok((MediaSource::Encrypted(Box::new(file)), thumbnail))
476 }
477
478 async fn upload_encrypted_thumbnail(
481 &self,
482 thumbnail: Option<Thumbnail>,
483 send_progress: SharedObservable<TransmissionProgress>,
484 ) -> Result<Option<(MediaSource, Box<ThumbnailInfo>)>> {
485 let Some(thumbnail) = thumbnail else {
486 return Ok(None);
487 };
488
489 let (data, _, thumbnail_info) = thumbnail.into_parts();
490 let mut cursor = Cursor::new(data);
491
492 let file = self
493 .upload_encrypted_file(&mut cursor)
494 .with_send_progress_observable(send_progress)
495 .await?;
496
497 Ok(Some((MediaSource::Encrypted(Box::new(file)), thumbnail_info)))
498 }
499
500 pub(crate) async fn claim_one_time_keys(
506 &self,
507 users: impl Iterator<Item = &UserId>,
508 ) -> Result<()> {
509 let _lock = self.locks().key_claim_lock.lock().await;
510
511 if let Some((request_id, request)) = self
512 .olm_machine()
513 .await
514 .as_ref()
515 .ok_or(Error::NoOlmMachine)?
516 .get_missing_sessions(users)
517 .await?
518 {
519 let response = self.send(request).await?;
520 self.mark_request_as_sent(&request_id, &response).await?;
521 }
522
523 Ok(())
524 }
525
526 #[instrument(skip(self, request))]
536 pub(crate) async fn keys_upload(
537 &self,
538 request_id: &TransactionId,
539 request: &upload_keys::v3::Request,
540 ) -> Result<upload_keys::v3::Response> {
541 debug!(
542 device_keys = request.device_keys.is_some(),
543 one_time_key_count = request.one_time_keys.len(),
544 "Uploading public encryption keys",
545 );
546
547 let response = self.send(request.clone()).await?;
548 self.mark_request_as_sent(request_id, &response).await?;
549
550 Ok(response)
551 }
552
553 pub(crate) async fn room_send_helper(
554 &self,
555 request: &RoomMessageRequest,
556 ) -> Result<send_message_event::v3::Response> {
557 let content = request.content.clone();
558 let txn_id = request.txn_id.clone();
559 let room_id = &request.room_id;
560
561 self.get_room(room_id)
562 .expect("Can't send a message to a room that isn't known to the store")
563 .send(*content)
564 .with_transaction_id(txn_id)
565 .await
566 }
567
568 pub(crate) async fn send_to_device(
569 &self,
570 request: &ToDeviceRequest,
571 ) -> HttpResult<ToDeviceResponse> {
572 let request = RumaToDeviceRequest::new_raw(
573 request.event_type.clone(),
574 request.txn_id.clone(),
575 request.messages.clone(),
576 );
577
578 self.send(request).await
579 }
580
581 pub(crate) async fn send_verification_request(
582 &self,
583 request: OutgoingVerificationRequest,
584 ) -> Result<()> {
585 use matrix_sdk_base::crypto::types::requests::OutgoingVerificationRequest::*;
586
587 match request {
588 ToDevice(t) => {
589 self.send_to_device(&t).await?;
590 }
591 InRoom(r) => {
592 self.room_send_helper(&r).await?;
593 }
594 }
595
596 Ok(())
597 }
598
599 pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
601 let rooms = self.joined_rooms();
602
603 let room = rooms.into_iter().find(|r| {
605 let targets = r.direct_targets();
606 targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
607 });
608
609 trace!(?room, "Found room");
610 room
611 }
612
613 async fn send_outgoing_request(&self, r: OutgoingRequest) -> Result<()> {
614 use matrix_sdk_base::crypto::types::requests::AnyOutgoingRequest;
615
616 match r.request() {
617 AnyOutgoingRequest::KeysQuery(request) => {
618 self.keys_query(r.request_id(), request.device_keys.clone()).await?;
619 }
620 AnyOutgoingRequest::KeysUpload(request) => {
621 self.keys_upload(r.request_id(), request).await?;
622 }
623 AnyOutgoingRequest::ToDeviceRequest(request) => {
624 let response = self.send_to_device(request).await?;
625 self.mark_request_as_sent(r.request_id(), &response).await?;
626 }
627 AnyOutgoingRequest::SignatureUpload(request) => {
628 let response = self.send(request.clone()).await?;
629 self.mark_request_as_sent(r.request_id(), &response).await?;
630 }
631 AnyOutgoingRequest::RoomMessage(request) => {
632 let response = self.room_send_helper(request).await?;
633 self.mark_request_as_sent(r.request_id(), &response).await?;
634 }
635 AnyOutgoingRequest::KeysClaim(request) => {
636 let response = self.send(request.clone()).await?;
637 self.mark_request_as_sent(r.request_id(), &response).await?;
638 }
639 }
640
641 Ok(())
642 }
643
644 #[instrument(skip_all)]
645 pub(crate) async fn send_outgoing_requests(&self) -> Result<()> {
646 const MAX_CONCURRENT_REQUESTS: usize = 20;
647
648 if let Err(e) = self.claim_one_time_keys(iter::empty()).await {
651 warn!("Error while claiming one-time keys {:?}", e);
652 }
653
654 let outgoing_requests = stream::iter(
655 self.olm_machine()
656 .await
657 .as_ref()
658 .ok_or(Error::NoOlmMachine)?
659 .outgoing_requests()
660 .await?,
661 )
662 .map(|r| self.send_outgoing_request(r));
663
664 let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
665
666 requests
667 .for_each(|r| async move {
668 match r {
669 Ok(_) => (),
670 Err(e) => warn!(error = ?e, "Error when sending out an outgoing E2EE request"),
671 }
672 })
673 .await;
674
675 Ok(())
676 }
677}
678
679#[cfg(any(feature = "testing", test))]
680impl Client {
681 pub async fn olm_machine_for_testing(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
683 self.olm_machine().await
684 }
685}
686
687#[derive(Debug, Clone)]
691pub struct Encryption {
692 client: Client,
694}
695
696impl Encryption {
697 pub(crate) fn new(client: Client) -> Self {
698 Self { client }
699 }
700
701 pub(crate) fn settings(&self) -> EncryptionSettings {
703 self.client.inner.e2ee.encryption_settings
704 }
705
706 pub async fn ed25519_key(&self) -> Option<String> {
709 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().ed25519.to_base64())
710 }
711
712 pub async fn curve25519_key(&self) -> Option<Curve25519PublicKey> {
714 self.client.olm_machine().await.as_ref().map(|o| o.identity_keys().curve25519)
715 }
716
717 pub async fn device_creation_timestamp(&self) -> MilliSecondsSinceUnixEpoch {
719 match self.get_own_device().await {
720 Ok(Some(device)) => device.first_time_seen_ts(),
721 _ => MilliSecondsSinceUnixEpoch::now(),
723 }
724 }
725
726 pub(crate) async fn import_secrets_bundle(
727 &self,
728 bundle: &matrix_sdk_base::crypto::types::SecretsBundle,
729 ) -> Result<(), SecretImportError> {
730 let olm_machine = self.client.olm_machine().await;
731 let olm_machine =
732 olm_machine.as_ref().expect("This should only be called once we have an OlmMachine");
733
734 olm_machine.store().import_secrets_bundle(bundle).await
735 }
736
737 pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
742 let olm = self.client.olm_machine().await;
743 let machine = olm.as_ref()?;
744 Some(machine.cross_signing_status().await)
745 }
746
747 pub async fn tracked_users(&self) -> Result<HashSet<OwnedUserId>, CryptoStoreError> {
752 if let Some(machine) = self.client.olm_machine().await.as_ref() {
753 machine.tracked_users().await
754 } else {
755 Ok(HashSet::new())
756 }
757 }
758
759 pub fn verification_state(&self) -> Subscriber<VerificationState> {
782 self.client.inner.verification_state.subscribe_reset()
783 }
784
785 pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
787 let olm = self.client.olm_machine().await;
788 let olm = olm.as_ref()?;
789 #[allow(clippy::bind_instead_of_map)]
790 olm.get_verification(user_id, flow_id).and_then(|v| match v {
791 matrix_sdk_base::crypto::Verification::SasV1(sas) => {
792 Some(SasVerification { inner: sas, client: self.client.clone() }.into())
793 }
794 #[cfg(feature = "qrcode")]
795 matrix_sdk_base::crypto::Verification::QrV1(qr) => {
796 Some(verification::QrVerification { inner: qr, client: self.client.clone() }.into())
797 }
798 _ => None,
799 })
800 }
801
802 pub async fn get_verification_request(
805 &self,
806 user_id: &UserId,
807 flow_id: impl AsRef<str>,
808 ) -> Option<VerificationRequest> {
809 let olm = self.client.olm_machine().await;
810 let olm = olm.as_ref()?;
811
812 olm.get_verification_request(user_id, flow_id)
813 .map(|r| VerificationRequest { inner: r, client: self.client.clone() })
814 }
815
816 pub async fn get_device(
850 &self,
851 user_id: &UserId,
852 device_id: &DeviceId,
853 ) -> Result<Option<Device>, CryptoStoreError> {
854 let olm = self.client.olm_machine().await;
855 let Some(machine) = olm.as_ref() else { return Ok(None) };
856 let device = machine.get_device(user_id, device_id, None).await?;
857 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
858 }
859
860 pub async fn get_own_device(&self) -> Result<Option<Device>, CryptoStoreError> {
867 let olm = self.client.olm_machine().await;
868 let Some(machine) = olm.as_ref() else { return Ok(None) };
869 let device = machine.get_device(machine.user_id(), machine.device_id(), None).await?;
870 Ok(device.map(|d| Device { inner: d, client: self.client.clone() }))
871 }
872
873 pub async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices, Error> {
899 let devices = self
900 .client
901 .olm_machine()
902 .await
903 .as_ref()
904 .ok_or(Error::NoOlmMachine)?
905 .get_user_devices(user_id, None)
906 .await?;
907
908 Ok(UserDevices { inner: devices, client: self.client.clone() })
909 }
910
911 pub async fn get_user_identity(
947 &self,
948 user_id: &UserId,
949 ) -> Result<Option<UserIdentity>, CryptoStoreError> {
950 let olm = self.client.olm_machine().await;
951 let Some(olm) = olm.as_ref() else { return Ok(None) };
952 let identity = olm.get_identity(user_id, None).await?;
953
954 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
955 }
956
957 pub async fn request_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
995 let olm = self.client.olm_machine().await;
996 let Some(olm) = olm.as_ref() else { return Ok(None) };
997
998 let (request_id, request) = olm.query_keys_for_users(iter::once(user_id));
999 self.client.keys_query(&request_id, request.device_keys).await?;
1000
1001 let identity = olm.get_identity(user_id, None).await?;
1002 Ok(identity.map(|i| UserIdentity::new(self.client.clone(), i)))
1003 }
1004
1005 pub async fn devices_stream(&self) -> Result<impl Stream<Item = DeviceUpdates>> {
1036 let olm = self.client.olm_machine().await;
1037 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1038 let client = self.client.to_owned();
1039
1040 Ok(olm
1041 .store()
1042 .devices_stream()
1043 .map(move |updates| DeviceUpdates::new(client.to_owned(), updates)))
1044 }
1045
1046 pub async fn user_identities_stream(&self) -> Result<impl Stream<Item = IdentityUpdates>> {
1074 let olm = self.client.olm_machine().await;
1075 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1076 let client = self.client.to_owned();
1077
1078 Ok(olm
1079 .store()
1080 .user_identities_stream()
1081 .map(move |updates| IdentityUpdates::new(client.to_owned(), updates)))
1082 }
1083
1084 pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData>) -> Result<()> {
1123 let olm = self.client.olm_machine().await;
1124 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1125
1126 let CrossSigningBootstrapRequests {
1127 upload_signing_keys_req,
1128 upload_keys_req,
1129 upload_signatures_req,
1130 } = olm.bootstrap_cross_signing(false).await?;
1131
1132 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1133 auth: auth_data,
1134 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1135 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1136 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1137 });
1138
1139 if let Some(req) = upload_keys_req {
1140 self.client.send_outgoing_request(req).await?;
1141 }
1142 self.client.send(upload_signing_keys_req).await?;
1143 self.client.send(upload_signatures_req).await?;
1144
1145 Ok(())
1146 }
1147
1148 pub async fn reset_cross_signing(&self) -> Result<Option<CrossSigningResetHandle>> {
1185 let olm = self.client.olm_machine().await;
1186 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1187
1188 let CrossSigningBootstrapRequests {
1189 upload_keys_req,
1190 upload_signing_keys_req,
1191 upload_signatures_req,
1192 } = olm.bootstrap_cross_signing(true).await?;
1193
1194 let upload_signing_keys_req = assign!(UploadSigningKeysRequest::new(), {
1195 auth: None,
1196 master_key: upload_signing_keys_req.master_key.map(|c| c.to_raw()),
1197 self_signing_key: upload_signing_keys_req.self_signing_key.map(|c| c.to_raw()),
1198 user_signing_key: upload_signing_keys_req.user_signing_key.map(|c| c.to_raw()),
1199 });
1200
1201 if let Some(req) = upload_keys_req {
1202 self.client.send_outgoing_request(req).await?;
1203 }
1204
1205 if let Err(error) = self.client.send(upload_signing_keys_req.clone()).await {
1206 if let Ok(Some(auth_type)) = CrossSigningResetAuthType::new(&error) {
1207 let client = self.client.clone();
1208
1209 Ok(Some(CrossSigningResetHandle::new(
1210 client,
1211 upload_signing_keys_req,
1212 upload_signatures_req,
1213 auth_type,
1214 )))
1215 } else {
1216 Err(error.into())
1217 }
1218 } else {
1219 self.client.send(upload_signatures_req).await?;
1220
1221 Ok(None)
1222 }
1223 }
1224
1225 async fn ensure_initial_key_query(&self) -> Result<()> {
1228 let olm_machine = self.client.olm_machine().await;
1229 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1230
1231 let user_id = olm_machine.user_id();
1232
1233 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1234 let (request_id, request) = olm_machine.query_keys_for_users([olm_machine.user_id()]);
1235 self.client.keys_query(&request_id, request.device_keys).await?;
1236 }
1237
1238 Ok(())
1239 }
1240
1241 pub async fn bootstrap_cross_signing_if_needed(
1288 &self,
1289 auth_data: Option<AuthData>,
1290 ) -> Result<()> {
1291 let olm_machine = self.client.olm_machine().await;
1292 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1293 let user_id = olm_machine.user_id();
1294
1295 self.ensure_initial_key_query().await?;
1296
1297 if self.client.encryption().get_user_identity(user_id).await?.is_none() {
1298 self.bootstrap_cross_signing(auth_data).await?;
1299 }
1300
1301 Ok(())
1302 }
1303
1304 #[cfg(not(target_family = "wasm"))]
1356 pub async fn export_room_keys(
1357 &self,
1358 path: PathBuf,
1359 passphrase: &str,
1360 predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
1361 ) -> Result<()> {
1362 let olm = self.client.olm_machine().await;
1363 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1364
1365 let keys = olm.store().export_room_keys(predicate).await?;
1366 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1367
1368 let encrypt = move || -> Result<()> {
1369 let export: String =
1370 matrix_sdk_base::crypto::encrypt_room_key_export(&keys, &passphrase, 500_000)?;
1371 let mut file = std::fs::File::create(path)?;
1372 file.write_all(&export.into_bytes())?;
1373 Ok(())
1374 };
1375
1376 let task = tokio::task::spawn_blocking(encrypt);
1377 task.await.expect("Task join error")
1378 }
1379
1380 #[cfg(not(target_family = "wasm"))]
1418 pub async fn import_room_keys(
1419 &self,
1420 path: PathBuf,
1421 passphrase: &str,
1422 ) -> Result<RoomKeyImportResult, RoomKeyImportError> {
1423 let olm = self.client.olm_machine().await;
1424 let olm = olm.as_ref().ok_or(RoomKeyImportError::StoreClosed)?;
1425 let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
1426
1427 let decrypt = move || {
1428 let file = std::fs::File::open(path)?;
1429 matrix_sdk_base::crypto::decrypt_room_key_export(file, &passphrase)
1430 };
1431
1432 let task = tokio::task::spawn_blocking(decrypt);
1433 let import = task.await.expect("Task join error")?;
1434
1435 let ret = olm.store().import_exported_room_keys(import, |_, _| {}).await?;
1436
1437 self.backups().maybe_trigger_backup();
1438
1439 Ok(ret)
1440 }
1441
1442 pub async fn room_keys_received_stream(
1473 &self,
1474 ) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
1475 let olm = self.client.olm_machine().await;
1476 let olm = olm.as_ref()?;
1477
1478 Some(olm.store().room_keys_received_stream())
1479 }
1480
1481 pub fn secret_storage(&self) -> SecretStorage {
1483 SecretStorage { client: self.client.to_owned() }
1484 }
1485
1486 pub fn backups(&self) -> Backups {
1488 Backups { client: self.client.to_owned() }
1489 }
1490
1491 pub fn recovery(&self) -> Recovery {
1493 Recovery { client: self.client.to_owned() }
1494 }
1495
1496 pub async fn enable_cross_process_store_lock(&self, lock_value: String) -> Result<(), Error> {
1508 if let Some(prev_lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1510 let prev_holder = prev_lock.lock_holder();
1511 if prev_holder == lock_value {
1512 return Ok(());
1513 }
1514 warn!("Recreating cross-process store lock with a different holder value: prev was {prev_holder}, new is {lock_value}");
1515 }
1516
1517 let olm_machine = self.client.base_client().olm_machine().await;
1518 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
1519
1520 let lock =
1521 olm_machine.store().create_store_lock("cross_process_lock".to_owned(), lock_value);
1522
1523 {
1528 let guard = lock.try_lock_once().await?;
1529 if guard.is_some() {
1530 olm_machine
1531 .initialize_crypto_store_generation(
1532 &self.client.locks().crypto_store_generation,
1533 )
1534 .await?;
1535 }
1536 }
1537
1538 self.client
1539 .locks()
1540 .cross_process_crypto_store_lock
1541 .set(lock)
1542 .map_err(|_| Error::BadCryptoStoreState)?;
1543
1544 Ok(())
1545 }
1546
1547 async fn on_lock_newly_acquired(&self) -> Result<u64, Error> {
1552 let olm_machine_guard = self.client.olm_machine().await;
1553 if let Some(olm_machine) = olm_machine_guard.as_ref() {
1554 let (new_gen, generation_number) = olm_machine
1555 .maintain_crypto_store_generation(&self.client.locks().crypto_store_generation)
1556 .await?;
1557 if new_gen {
1559 drop(olm_machine_guard);
1561 self.client.base_client().regenerate_olm(None).await?;
1563 }
1564 Ok(generation_number)
1565 } else {
1566 warn!("Encryption::on_lock_newly_acquired: called before OlmMachine initialised");
1571 Ok(0)
1572 }
1573 }
1574
1575 pub async fn spin_lock_store(
1581 &self,
1582 max_backoff: Option<u32>,
1583 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1584 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1585 let guard = lock.spin_lock(max_backoff).await?;
1586
1587 let generation = self.on_lock_newly_acquired().await?;
1588
1589 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1590 } else {
1591 Ok(None)
1592 }
1593 }
1594
1595 pub async fn try_lock_store_once(
1600 &self,
1601 ) -> Result<Option<CrossProcessLockStoreGuardWithGeneration>, Error> {
1602 if let Some(lock) = self.client.locks().cross_process_crypto_store_lock.get() {
1603 let maybe_guard = lock.try_lock_once().await?;
1604
1605 let Some(guard) = maybe_guard else {
1606 return Ok(None);
1607 };
1608
1609 let generation = self.on_lock_newly_acquired().await?;
1610
1611 Ok(Some(CrossProcessLockStoreGuardWithGeneration { _guard: guard, generation }))
1612 } else {
1613 Ok(None)
1614 }
1615 }
1616
1617 #[cfg(any(test, feature = "testing"))]
1619 pub async fn uploaded_key_count(&self) -> Result<u64> {
1620 let olm_machine = self.client.olm_machine().await;
1621 let olm_machine = olm_machine.as_ref().ok_or(Error::AuthenticationRequired)?;
1622 Ok(olm_machine.uploaded_key_count().await?)
1623 }
1624
1625 pub(crate) fn spawn_initialization_task(&self, auth_data: Option<AuthData>) {
1649 let mut tasks = self.client.inner.e2ee.tasks.lock();
1650
1651 let this = self.clone();
1652 tasks.setup_e2ee = Some(spawn(async move {
1653 this.update_verification_state().await;
1656
1657 if this.settings().auto_enable_cross_signing {
1658 if let Err(e) = this.bootstrap_cross_signing_if_needed(auth_data).await {
1659 error!("Couldn't bootstrap cross signing {e:?}");
1660 }
1661 }
1662
1663 if let Err(e) = this.backups().setup_and_resume().await {
1664 error!("Couldn't setup and resume backups {e:?}");
1665 }
1666 if let Err(e) = this.recovery().setup().await {
1667 error!("Couldn't setup and resume recovery {e:?}");
1668 }
1669 }));
1670 }
1671
1672 pub async fn wait_for_e2ee_initialization_tasks(&self) {
1675 let task = self.client.inner.e2ee.tasks.lock().setup_e2ee.take();
1676
1677 if let Some(task) = task {
1678 if let Err(err) = task.await {
1679 warn!("Error when initializing backups: {err}");
1680 }
1681 }
1682 }
1683
1684 pub(crate) async fn ensure_device_keys_upload(&self) -> Result<()> {
1694 let olm = self.client.olm_machine().await;
1695 let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;
1696
1697 if let Some((request_id, request)) = olm.upload_device_keys().await? {
1698 self.client.keys_upload(&request_id, &request).await?;
1699
1700 let (request_id, request) = olm.query_keys_for_users([olm.user_id()]);
1701 self.client.keys_query(&request_id, request.device_keys).await?;
1702 }
1703
1704 Ok(())
1705 }
1706
1707 pub(crate) async fn update_state_after_keys_query(&self, response: &get_keys::v3::Response) {
1708 self.recovery().update_state_after_keys_query(response).await;
1709
1710 if let Some(user_id) = self.client.user_id() {
1712 let contains_own_device = response.device_keys.contains_key(user_id);
1713
1714 if contains_own_device {
1715 self.update_verification_state().await;
1716 }
1717 }
1718 }
1719
1720 async fn update_verification_state(&self) {
1721 match self.get_own_device().await {
1722 Ok(device) => {
1723 if let Some(device) = device {
1724 let is_verified = device.is_cross_signed_by_owner();
1725
1726 if is_verified {
1727 self.client.inner.verification_state.set(VerificationState::Verified);
1728 } else {
1729 self.client.inner.verification_state.set(VerificationState::Unverified);
1730 }
1731 } else {
1732 warn!("Couldn't find out own device in the store.");
1733 self.client.inner.verification_state.set(VerificationState::Unknown);
1734 }
1735 }
1736 Err(error) => {
1737 warn!("Failed retrieving own device: {error}");
1738 self.client.inner.verification_state.set(VerificationState::Unknown);
1739 }
1740 }
1741 }
1742
1743 #[cfg(feature = "experimental-send-custom-to-device")]
1752 pub async fn encrypt_and_send_raw_to_device(
1753 &self,
1754 recipient_devices: Vec<&Device>,
1755 event_type: &str,
1756 content: Raw<AnyToDeviceEventContent>,
1757 ) -> Result<Vec<(OwnedUserId, OwnedDeviceId)>> {
1758 let users = recipient_devices.iter().map(|device| device.user_id());
1759
1760 self.client.claim_one_time_keys(users).await?;
1764
1765 let olm = self.client.olm_machine().await;
1766 let olm = olm.as_ref().expect("Olm machine wasn't started");
1767
1768 let (requests, withhelds) = olm
1769 .encrypt_content_for_devices(
1770 recipient_devices.into_iter().map(|d| d.deref().clone()).collect(),
1771 event_type,
1772 &content
1773 .deserialize_as::<serde_json::Value>()
1774 .expect("Deserialize as Value will always work"),
1775 )
1776 .await?;
1777
1778 let mut failures: Vec<(OwnedUserId, OwnedDeviceId)> = Default::default();
1779
1780 withhelds.iter().for_each(|(d, _)| {
1782 failures.push((d.user_id().to_owned(), d.device_id().to_owned()));
1783 });
1784
1785 for request in requests {
1787 let ruma_request = RumaToDeviceRequest::new_raw(
1788 request.event_type.clone(),
1789 request.txn_id.clone(),
1790 request.messages.clone(),
1791 );
1792
1793 let send_result = self
1794 .client
1795 .send_inner(ruma_request, Some(RequestConfig::short_retry()), Default::default())
1796 .await;
1797
1798 if send_result.is_err() {
1800 for (user_id, device_map) in request.messages {
1802 for device_id in device_map.keys() {
1803 match device_id {
1804 DeviceIdOrAllDevices::DeviceId(device_id) => {
1805 failures.push((user_id.clone(), device_id.to_owned()));
1806 }
1807 DeviceIdOrAllDevices::AllDevices => {
1808 }
1810 }
1811 }
1812 }
1813 }
1814 }
1815
1816 Ok(failures)
1817 }
1818}
1819
1820#[cfg(all(test, not(target_family = "wasm")))]
1821mod tests {
1822 use std::{
1823 ops::Not,
1824 sync::{
1825 atomic::{AtomicBool, Ordering},
1826 Arc,
1827 },
1828 time::Duration,
1829 };
1830
1831 use matrix_sdk_test::{
1832 async_test, test_json, GlobalAccountDataTestEvent, JoinedRoomBuilder, StateTestEvent,
1833 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
1834 };
1835 use ruma::{
1836 event_id,
1837 events::{reaction::ReactionEventContent, relation::Annotation},
1838 user_id,
1839 };
1840 use serde_json::json;
1841 use wiremock::{
1842 matchers::{header, method, path_regex},
1843 Mock, MockServer, Request, ResponseTemplate,
1844 };
1845
1846 use crate::{
1847 assert_next_matches_with_timeout,
1848 config::RequestConfig,
1849 encryption::{OAuthCrossSigningResetInfo, VerificationState},
1850 test_utils::{
1851 client::mock_matrix_session, logged_in_client, no_retry_test_client, set_client_session,
1852 },
1853 Client,
1854 };
1855
1856 #[async_test]
1857 async fn test_reaction_sending() {
1858 let server = MockServer::start().await;
1859 let client = logged_in_client(Some(server.uri())).await;
1860
1861 let event_id = event_id!("$2:example.org");
1862
1863 Mock::given(method("GET"))
1864 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
1865 .and(header("authorization", "Bearer 1234"))
1866 .respond_with(
1867 ResponseTemplate::new(200)
1868 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
1869 )
1870 .mount(&server)
1871 .await;
1872
1873 Mock::given(method("PUT"))
1874 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/m\.reaction/.*".to_owned()))
1875 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1876 "event_id": event_id,
1877 })))
1878 .mount(&server)
1879 .await;
1880
1881 let response = SyncResponseBuilder::default()
1882 .add_joined_room(
1883 JoinedRoomBuilder::default()
1884 .add_state_event(StateTestEvent::Member)
1885 .add_state_event(StateTestEvent::PowerLevels)
1886 .add_state_event(StateTestEvent::Encryption),
1887 )
1888 .build_sync_response();
1889
1890 client.base_client().receive_sync_response(response).await.unwrap();
1891
1892 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
1893 assert!(room
1894 .latest_encryption_state()
1895 .await
1896 .expect("Getting encryption state")
1897 .is_encrypted());
1898
1899 let event_id = event_id!("$1:example.org");
1900 let reaction = ReactionEventContent::new(Annotation::new(event_id.into(), "🐈".to_owned()));
1901 room.send(reaction).await.expect("Sending the reaction should not fail");
1902
1903 room.send_raw("m.reaction", json!({})).await.expect("Sending the reaction should not fail");
1904 }
1905
1906 #[async_test]
1907 async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
1908 let server = MockServer::start().await;
1909 let client = logged_in_client(Some(server.uri())).await;
1910 let user_id = user_id!("@invited:localhost");
1914
1915 let response = SyncResponseBuilder::default()
1917 .add_joined_room(
1918 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
1919 )
1920 .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1921 .build_sync_response();
1922 client.base_client().receive_sync_response(response).await.unwrap();
1923
1924 let found_room = client.get_dm_room(user_id).expect("DM not found!");
1926 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1927 }
1928
1929 #[async_test]
1930 async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
1931 let server = MockServer::start().await;
1932 let client = logged_in_client(Some(server.uri())).await;
1933 let user_id = user_id!("@invited:localhost");
1935
1936 let response = SyncResponseBuilder::default()
1938 .add_joined_room(
1939 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
1940 )
1941 .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1942 .build_sync_response();
1943 client.base_client().receive_sync_response(response).await.unwrap();
1944
1945 let found_room = client.get_dm_room(user_id).expect("DM not found!");
1947 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1948 }
1949
1950 #[async_test]
1951 async fn test_get_dm_room_still_finds_left_room() {
1952 let server = MockServer::start().await;
1956 let client = logged_in_client(Some(server.uri())).await;
1957 let user_id = user_id!("@invited:localhost");
1961
1962 let response = SyncResponseBuilder::default()
1964 .add_joined_room(
1965 JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
1966 )
1967 .add_global_account_data_event(GlobalAccountDataTestEvent::Direct)
1968 .build_sync_response();
1969 client.base_client().receive_sync_response(response).await.unwrap();
1970
1971 let found_room = client.get_dm_room(user_id).expect("DM not found!");
1973 assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
1974 }
1975
1976 #[cfg(feature = "sqlite")]
1977 #[async_test]
1978 async fn test_generation_counter_invalidates_olm_machine() {
1979 use matrix_sdk_base::store::RoomLoadSettings;
1982 let sqlite_path = std::env::temp_dir().join("generation_counter_sqlite.db");
1983 let session = mock_matrix_session();
1984
1985 let client1 = Client::builder()
1986 .homeserver_url("http://localhost:1234")
1987 .request_config(RequestConfig::new().disable_retry())
1988 .sqlite_store(&sqlite_path, None)
1989 .build()
1990 .await
1991 .unwrap();
1992 client1
1993 .matrix_auth()
1994 .restore_session(session.clone(), RoomLoadSettings::default())
1995 .await
1996 .unwrap();
1997
1998 let client2 = Client::builder()
1999 .homeserver_url("http://localhost:1234")
2000 .request_config(RequestConfig::new().disable_retry())
2001 .sqlite_store(sqlite_path, None)
2002 .build()
2003 .await
2004 .unwrap();
2005 client2.matrix_auth().restore_session(session, RoomLoadSettings::default()).await.unwrap();
2006
2007 let guard = client1.encryption().try_lock_store_once().await.unwrap();
2009 assert!(guard.is_none());
2010
2011 client1.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2012 client2.encryption().enable_cross_process_store_lock("client2".to_owned()).await.unwrap();
2013
2014 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2016 assert!(acquired1.is_some());
2017
2018 let initial_olm_machine =
2020 client1.olm_machine().await.clone().expect("must have an olm machine");
2021
2022 let decryption_key = matrix_sdk_base::crypto::store::types::BackupDecryptionKey::new()
2024 .expect("Can't create new recovery key");
2025 let backup_key = decryption_key.megolm_v1_public_key();
2026 backup_key.set_version("1".to_owned());
2027 initial_olm_machine
2028 .backup_machine()
2029 .save_decryption_key(Some(decryption_key.to_owned()), Some("1".to_owned()))
2030 .await
2031 .expect("Should save");
2032
2033 initial_olm_machine.backup_machine().enable_backup_v1(backup_key.clone()).await.unwrap();
2034
2035 assert!(client1.encryption().backups().are_enabled().await);
2036
2037 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2039 assert!(acquired2.is_none());
2040
2041 drop(acquired1);
2043 tokio::time::sleep(Duration::from_millis(100)).await;
2044
2045 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2047 assert!(acquired1.is_some());
2048
2049 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2051 assert!(initial_olm_machine.same_as(&olm_machine));
2052
2053 drop(acquired1);
2055 tokio::time::sleep(Duration::from_millis(100)).await;
2056
2057 let acquired2 = client2.encryption().try_lock_store_once().await.unwrap();
2059 assert!(acquired2.is_some());
2060
2061 drop(acquired2);
2063 tokio::time::sleep(Duration::from_millis(100)).await;
2064
2065 let acquired1 = client1.encryption().try_lock_store_once().await.unwrap();
2067 assert!(acquired1.is_some());
2068
2069 let olm_machine = client1.olm_machine().await.clone().expect("must have an olm machine");
2071
2072 assert!(!initial_olm_machine.same_as(&olm_machine));
2073
2074 let backup_key_new = olm_machine.backup_machine().get_backup_keys().await.unwrap();
2075 assert!(backup_key_new.decryption_key.is_some());
2076 assert_eq!(
2077 backup_key_new.decryption_key.unwrap().megolm_v1_public_key().to_base64(),
2078 backup_key.to_base64()
2079 );
2080 assert!(client1.encryption().backups().are_enabled().await);
2081 }
2082
2083 #[cfg(feature = "sqlite")]
2084 #[async_test]
2085 async fn test_generation_counter_no_spurious_invalidation() {
2086 use matrix_sdk_base::store::RoomLoadSettings;
2089 let sqlite_path =
2090 std::env::temp_dir().join("generation_counter_no_spurious_invalidations.db");
2091 let session = mock_matrix_session();
2092
2093 let client = Client::builder()
2094 .homeserver_url("http://localhost:1234")
2095 .request_config(RequestConfig::new().disable_retry())
2096 .sqlite_store(&sqlite_path, None)
2097 .build()
2098 .await
2099 .unwrap();
2100 client
2101 .matrix_auth()
2102 .restore_session(session.clone(), RoomLoadSettings::default())
2103 .await
2104 .unwrap();
2105
2106 let initial_olm_machine = client.olm_machine().await.as_ref().unwrap().clone();
2107
2108 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
2109
2110 let after_enabling_lock = client.olm_machine().await.as_ref().unwrap().clone();
2112 assert!(initial_olm_machine.same_as(&after_enabling_lock));
2113
2114 {
2115 let client2 = Client::builder()
2117 .homeserver_url("http://localhost:1234")
2118 .request_config(RequestConfig::new().disable_retry())
2119 .sqlite_store(sqlite_path, None)
2120 .build()
2121 .await
2122 .unwrap();
2123 client2
2124 .matrix_auth()
2125 .restore_session(session, RoomLoadSettings::default())
2126 .await
2127 .unwrap();
2128
2129 client2
2130 .encryption()
2131 .enable_cross_process_store_lock("client2".to_owned())
2132 .await
2133 .unwrap();
2134
2135 let guard = client2.encryption().spin_lock_store(None).await.unwrap();
2136 assert!(guard.is_some());
2137
2138 drop(guard);
2139 tokio::time::sleep(Duration::from_millis(100)).await;
2140 }
2141
2142 {
2143 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2144 assert!(acquired.is_some());
2145 }
2146
2147 let after_taking_lock_first_time = client.olm_machine().await.as_ref().unwrap().clone();
2149 assert!(!initial_olm_machine.same_as(&after_taking_lock_first_time));
2150
2151 {
2152 let acquired = client.encryption().try_lock_store_once().await.unwrap();
2153 assert!(acquired.is_some());
2154 }
2155
2156 let after_taking_lock_second_time = client.olm_machine().await.as_ref().unwrap().clone();
2158 assert!(after_taking_lock_first_time.same_as(&after_taking_lock_second_time));
2159 }
2160
2161 #[async_test]
2162 async fn test_update_verification_state_is_updated_before_any_requests_happen() {
2163 let client = no_retry_test_client(None).await;
2165 let server = MockServer::start().await;
2166
2167 let mut verification_state = client.encryption().verification_state();
2169
2170 assert_next_matches_with_timeout!(verification_state, VerificationState::Unknown);
2172
2173 let keys_requested = Arc::new(AtomicBool::new(false));
2176 let inner_bool = keys_requested.clone();
2177
2178 Mock::given(method("GET"))
2179 .and(path_regex(
2180 r"/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
2181 ))
2182 .respond_with(move |_req: &Request| {
2183 inner_bool.fetch_or(true, Ordering::SeqCst);
2184 ResponseTemplate::new(200).set_body_json(json!({}))
2185 })
2186 .mount(&server)
2187 .await;
2188
2189 set_client_session(&client).await;
2191
2192 assert!(keys_requested.load(Ordering::SeqCst).not());
2194 assert_next_matches_with_timeout!(verification_state, VerificationState::Unverified);
2195 }
2196
2197 #[test]
2198 fn test_oauth_reset_info_from_uiaa_info() {
2199 let auth_info = json!({
2200 "session": "dummy",
2201 "flows": [
2202 {
2203 "stages": [
2204 "org.matrix.cross_signing_reset"
2205 ]
2206 }
2207 ],
2208 "params": {
2209 "org.matrix.cross_signing_reset": {
2210 "url": "https://example.org/account/account?action=org.matrix.cross_signing_reset"
2211 }
2212 },
2213 "msg": "To reset..."
2214 });
2215
2216 let auth_info = serde_json::from_value(auth_info)
2217 .expect("We should be able to deserialize the UiaaInfo");
2218 OAuthCrossSigningResetInfo::from_auth_info(&auth_info)
2219 .expect("We should be able to fetch the cross-signing reset info from the auth info");
2220 }
2221}