1use std::{
42 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43 fmt::Debug,
44 ops::Deref,
45 pin::pin,
46 sync::{atomic::Ordering, Arc},
47 time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55 encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56 OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
65
66use self::types::{
67 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70#[cfg(doc)]
71use crate::{backups::BackupMachine, identities::OwnUserIdentity};
72use crate::{
73 gossiping::GossippedSecret,
74 identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
75 olm::{
76 Account, ExportedRoomKey, InboundGroupSession, PrivateCrossSigningIdentity, SenderData,
77 Session, StaticAccountData,
78 },
79 store::types::RoomKeyWithheldEntry,
80 types::{
81 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
82 SecretsBundle,
83 },
84 verification::VerificationMachine,
85 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
86};
87
88pub mod caches;
89mod crypto_store_wrapper;
90mod error;
91mod memorystore;
92mod traits;
93pub mod types;
94
95#[cfg(any(test, feature = "testing"))]
96#[macro_use]
97#[allow(missing_docs)]
98pub mod integration_tests;
99
100pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
101pub use error::{CryptoStoreError, Result};
102use matrix_sdk_common::{
103 cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout,
104};
105pub use memorystore::MemoryStore;
106pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
107
108use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
109use crate::types::{
110 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
111};
112pub use crate::{
113 dehydrated_devices::DehydrationError,
114 gossiping::{GossipRequest, SecretInfo},
115};
116
/// A cloneable handle to the crypto store state.
///
/// All state lives behind a single `Arc`, so cloning a `Store` only bumps a
/// reference count and every clone observes the same underlying data.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state backing every clone of this handle.
    inner: Arc<StoreInner>,
}
127
/// Keeps track of the users for which a key query is pending, and lets tasks
/// wait until such a query has been marked as completed.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// The set of users awaiting a key query, guarded by an async mutex.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Notifier used to wake tasks waiting in
    /// `wait_if_user_key_query_pending`; it is signalled with
    /// `notify_waiters()` after users have been marked as up to date.
    users_for_key_query_notify: Notify,
}
136
impl KeyQueryManager {
    /// Makes sure the tracked users have been loaded into `cache` and returns
    /// a `SyncedKeyQueryManager` borrowing both the cache and this manager.
    ///
    /// # Errors
    ///
    /// Fails if loading the tracked users from the store fails.
    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
        self.ensure_sync_tracked_users(cache).await?;
        Ok(SyncedKeyQueryManager { cache, manager: self })
    }

    /// Loads the tracked users from the store into the cache, unless the
    /// `loaded_tracked_users` flag of the cache says this already happened.
    ///
    /// Every loaded user is inserted into the cache's tracked-user set; users
    /// flagged as `dirty` are additionally queued for a key query.
    ///
    /// # Errors
    ///
    /// Fails if `load_tracked_users` on the underlying store fails; the
    /// `loaded` flag is left unset in that case.
    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
        // Fast path: only a read lock is needed when the users were already
        // loaded.
        let loaded = cache.loaded_tracked_users.read().await;
        if *loaded {
            return Ok(());
        }

        // Release the read lock before asking for the write lock.
        drop(loaded);
        let mut loaded = cache.loaded_tracked_users.write().await;

        // Check again: another task may have completed the load while we
        // were waiting for the write lock.
        if *loaded {
            return Ok(());
        }

        let tracked_users = cache.store.load_tracked_users().await?;

        let mut query_users_lock = self.users_for_key_query.lock().await;
        let mut tracked_users_cache = cache.tracked_users.write();
        for user in tracked_users {
            tracked_users_cache.insert(user.user_id.to_owned());

            if user.dirty {
                query_users_lock.insert_user(&user.user_id);
            }
        }

        // Only mark the users as loaded once the cache was fully populated.
        *loaded = true;

        Ok(())
    }

    /// Waits, for at most `timeout_duration`, until a pending key query for
    /// `user` has been marked as completed.
    ///
    /// Returns:
    /// * `UserKeyQueryResult::WasNotPending` if no waiter could be registered
    ///   for the user,
    /// * `UserKeyQueryResult::WasPending` if the wait finished before the
    ///   timeout,
    /// * `UserKeyQueryResult::TimeoutExpired` if the timeout fired first (a
    ///   warning is logged in that case).
    ///
    /// The `cache` guard is consumed and released before any waiting starts.
    ///
    /// # Errors
    ///
    /// Fails if the tracked users could not be loaded from the store.
    pub async fn wait_if_user_key_query_pending(
        &self,
        cache: StoreCacheGuard,
        timeout_duration: Duration,
        user: &UserId,
    ) -> Result<UserKeyQueryResult> {
        {
            // The cache guard is only needed to make sure the tracked users
            // are loaded; drop it right away so it isn't held while waiting.
            self.ensure_sync_tracked_users(&cache).await?;
            drop(cache);
        }

        let mut users_for_key_query = self.users_for_key_query.lock().await;
        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
            return Ok(UserKeyQueryResult::WasNotPending);
        };

        let wait_for_completion = async {
            while !waiter.completed.load(Ordering::Relaxed) {
                // Enable the `Notified` future while the mutex is still held,
                // so that a `notify_waiters()` call issued after the lock is
                // released below cannot be missed.
                let mut notified = pin!(self.users_for_key_query_notify.notified());
                notified.as_mut().enable();
                drop(users_for_key_query);

                notified.await;

                // Take the lock again before re-checking the `completed`
                // flag in the loop condition.
                users_for_key_query = self.users_for_key_query.lock().await;
            }
        };

        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
            Err(_) => {
                warn!(
                    user_id = ?user,
                    "The user has a pending `/keys/query` request which did \
                    not finish yet, some devices might be missing."
                );

                Ok(UserKeyQueryResult::TimeoutExpired)
            }
            _ => Ok(UserKeyQueryResult::WasPending),
        }
    }
}
244
/// A view over a `KeyQueryManager` together with the `StoreCache` it operates
/// on.
///
/// Values of this type are created by `KeyQueryManager::synced`, which loads
/// the tracked users into the cache before constructing it.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the in-memory set of tracked users and the store.
    cache: &'a StoreCache,
    /// The manager holding the users that need a key query.
    manager: &'a KeyQueryManager,
}
249
250impl SyncedKeyQueryManager<'_> {
251 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
256 let mut store_updates = Vec::new();
257 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
258
259 {
260 let mut tracked_users = self.cache.tracked_users.write();
261 for user_id in users {
262 if tracked_users.insert(user_id.to_owned()) {
263 key_query_lock.insert_user(user_id);
264 store_updates.push((user_id, true))
265 }
266 }
267 }
268
269 self.cache.store.save_tracked_users(&store_updates).await
270 }
271
272 pub async fn mark_tracked_users_as_changed(
279 &self,
280 users: impl Iterator<Item = &UserId>,
281 ) -> Result<()> {
282 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
283 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
284
285 {
286 let tracked_users = &self.cache.tracked_users.read();
287 for user_id in users {
288 if tracked_users.contains(user_id) {
289 key_query_lock.insert_user(user_id);
290 store_updates.push((user_id, true));
291 }
292 }
293 }
294
295 self.cache.store.save_tracked_users(&store_updates).await
296 }
297
298 pub async fn mark_tracked_users_as_up_to_date(
304 &self,
305 users: impl Iterator<Item = &UserId>,
306 sequence_number: SequenceNumber,
307 ) -> Result<()> {
308 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
309 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
310
311 {
312 let tracked_users = self.cache.tracked_users.read();
313 for user_id in users {
314 if tracked_users.contains(user_id) {
315 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
316 store_updates.push((user_id, !clean));
317 }
318 }
319 }
320
321 self.cache.store.save_tracked_users(&store_updates).await?;
322 self.manager.users_for_key_query_notify.notify_waiters();
324
325 Ok(())
326 }
327
328 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
340 self.manager.users_for_key_query.lock().await.users_for_key_query()
341 }
342
343 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
345 self.cache.tracked_users.read().iter().cloned().collect()
346 }
347
348 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
354 self.manager.users_for_key_query.lock().await.insert_user(user);
355 self.cache.tracked_users.write().insert(user.to_owned());
356
357 self.cache.store.save_tracked_users(&[(user, true)]).await
358 }
359}
360
361fn collect_device_updates(
367 verification_machine: VerificationMachine,
368 own_identity: Option<OwnUserIdentityData>,
369 identities: IdentityChanges,
370 devices: DeviceChanges,
371) -> DeviceUpdates {
372 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
373 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
374
375 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
376
377 let map_device = |device: DeviceData| {
378 let device_owner_identity = new_identities
379 .get(device.user_id())
380 .or_else(|| changed_identities.get(device.user_id()))
381 .or_else(|| unchanged_identities.get(device.user_id()))
382 .cloned();
383
384 Device {
385 inner: device,
386 verification_machine: verification_machine.to_owned(),
387 own_identity: own_identity.to_owned(),
388 device_owner_identity,
389 }
390 };
391
392 for device in devices.new {
393 let device = map_device(device);
394
395 new.entry(device.user_id().to_owned())
396 .or_default()
397 .insert(device.device_id().to_owned(), device);
398 }
399
400 for device in devices.changed {
401 let device = map_device(device);
402
403 changed
404 .entry(device.user_id().to_owned())
405 .or_default()
406 .insert(device.device_id().to_owned(), device.to_owned());
407 }
408
409 DeviceUpdates { new, changed }
410}
411
/// A set of pending changes for a [`Store`], collected while holding the
/// write lock on the store's cache.
///
/// The changes are written to the store when [`StoreTransaction::commit`] is
/// called.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store this transaction was created from.
    store: Store,
    /// The changes accumulated so far.
    changes: PendingChanges,
    /// Owned write guard on the store cache, held for the whole lifetime of
    /// the transaction.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
420
421impl StoreTransaction {
422 async fn new(store: Store) -> Self {
424 let cache = store.inner.cache.clone();
425
426 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
427 }
428
429 pub(crate) fn cache(&self) -> &StoreCache {
430 &self.cache
431 }
432
433 pub fn store(&self) -> &Store {
435 &self.store
436 }
437
438 pub async fn account(&mut self) -> Result<&mut Account> {
445 if self.changes.account.is_none() {
446 let _ = self.cache.account().await?;
448 self.changes.account = self.cache.account.lock().await.take();
449 }
450 Ok(self.changes.account.as_mut().unwrap())
451 }
452
453 pub async fn commit(self) -> Result<()> {
456 if self.changes.is_empty() {
457 return Ok(());
458 }
459
460 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
462
463 self.store.save_pending_changes(self.changes).await?;
464
465 if let Some(account) = account {
467 *self.cache.account.lock().await = Some(account);
468 }
469
470 Ok(())
471 }
472}
473
/// The state shared between all clones of a `Store`.
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity, guarded by an async mutex.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store that persistent reads and writes go to.
    store: Arc<CryptoStoreWrapper>,

    /// The in-memory cache. Readers take a read guard through
    /// `Store::cache`, while a `StoreTransaction` holds the write guard.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine that is handed to the `Device`,
    /// `UserDevices` and `UserIdentity` objects created by the store.
    verification_machine: VerificationMachine,

    /// The static account data; the store's own user ID is read from it.
    static_account: StaticAccountData,
}
490
/// Errors that can occur while importing secrets into the store.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The secret could not be turned into a valid key.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that was being imported.
        name: SecretName,
        /// The key error reported by `vodozemac`.
        error: vodozemac::KeyError,
    },
    /// The public key derived from the imported private key does not match
    /// the public key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
        key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret that was being imported.
        name: SecretName,
    },
    /// The underlying store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
517
/// Errors that can occur while exporting a [`SecretsBundle`] from the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The underlying store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// Some cross-signing keys are available, but the key with the given
    /// usage is missing.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// None of the cross-signing keys is available.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// A backup decryption key is available, but the backup version that
    /// belongs to it is missing.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
538
539impl Store {
540 pub(crate) fn new(
542 account: StaticAccountData,
543 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
544 store: Arc<CryptoStoreWrapper>,
545 verification_machine: VerificationMachine,
546 ) -> Self {
547 Self {
548 inner: Arc::new(StoreInner {
549 static_account: account,
550 identity,
551 store: store.clone(),
552 verification_machine,
553 cache: Arc::new(RwLock::new(StoreCache {
554 store,
555 tracked_users: Default::default(),
556 loaded_tracked_users: Default::default(),
557 account: Default::default(),
558 })),
559 }),
560 }
561 }
562
563 pub(crate) fn user_id(&self) -> &UserId {
565 &self.inner.static_account.user_id
566 }
567
568 pub(crate) fn device_id(&self) -> &DeviceId {
570 self.inner.verification_machine.own_device_id()
571 }
572
573 pub(crate) fn static_account(&self) -> &StaticAccountData {
575 &self.inner.static_account
576 }
577
578 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
579 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
584 }
585
586 pub(crate) async fn transaction(&self) -> StoreTransaction {
587 StoreTransaction::new(self.clone()).await
588 }
589
590 pub(crate) async fn with_transaction<
593 T,
594 Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
595 F: FnOnce(StoreTransaction) -> Fut,
596 >(
597 &self,
598 func: F,
599 ) -> Result<T, crate::OlmError> {
600 let tr = self.transaction().await;
601 let (tr, res) = func(tr).await?;
602 tr.commit().await?;
603 Ok(res)
604 }
605
606 #[cfg(test)]
607 pub(crate) async fn reset_cross_signing_identity(&self) {
609 self.inner.identity.lock().await.reset();
610 }
611
612 pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
614 self.inner.identity.clone()
615 }
616
617 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
619 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
620
621 self.save_changes(changes).await
622 }
623
624 pub(crate) async fn get_sessions(
625 &self,
626 sender_key: &str,
627 ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
628 self.inner.store.get_sessions(sender_key).await
629 }
630
631 pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
632 self.inner.store.save_changes(changes).await
633 }
634
635 pub(crate) async fn compare_group_session(
642 &self,
643 session: &InboundGroupSession,
644 ) -> Result<SessionOrdering> {
645 let old_session = self
646 .inner
647 .store
648 .get_inbound_group_session(session.room_id(), session.session_id())
649 .await?;
650
651 Ok(if let Some(old_session) = old_session {
652 session.compare(&old_session).await
653 } else {
654 SessionOrdering::Better
655 })
656 }
657
658 #[cfg(test)]
659 pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
661 use types::DeviceChanges;
662
663 let changes = Changes {
664 devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
665 ..Default::default()
666 };
667
668 self.save_changes(changes).await
669 }
670
671 pub(crate) async fn save_inbound_group_sessions(
673 &self,
674 sessions: &[InboundGroupSession],
675 ) -> Result<()> {
676 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
677
678 self.save_changes(changes).await
679 }
680
681 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
683 Ok(self
684 .inner
685 .store
686 .get_device(self.user_id(), self.device_id())
687 .await?
688 .and_then(|d| d.display_name().map(|d| d.to_owned())))
689 }
690
691 pub(crate) async fn get_device_data(
696 &self,
697 user_id: &UserId,
698 device_id: &DeviceId,
699 ) -> Result<Option<DeviceData>> {
700 self.inner.store.get_device(user_id, device_id).await
701 }
702
703 pub(crate) async fn get_device_data_for_user_filtered(
711 &self,
712 user_id: &UserId,
713 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
714 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
715 if user_id == self.user_id() {
716 d.remove(self.device_id());
717 }
718 d
719 })
720 }
721
722 pub(crate) async fn get_device_data_for_user(
731 &self,
732 user_id: &UserId,
733 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
734 self.inner.store.get_user_devices(user_id).await
735 }
736
737 pub(crate) async fn get_device_from_curve_key(
743 &self,
744 user_id: &UserId,
745 curve_key: Curve25519PublicKey,
746 ) -> Result<Option<Device>> {
747 self.get_user_devices(user_id)
748 .await
749 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
750 }
751
752 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
762 let devices = self.get_device_data_for_user(user_id).await?;
763
764 let own_identity = self
765 .inner
766 .store
767 .get_user_identity(self.user_id())
768 .await?
769 .and_then(|i| i.own().cloned());
770 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
771
772 Ok(UserDevices {
773 inner: devices,
774 verification_machine: self.inner.verification_machine.clone(),
775 own_identity,
776 device_owner_identity,
777 })
778 }
779
780 pub(crate) async fn get_device(
790 &self,
791 user_id: &UserId,
792 device_id: &DeviceId,
793 ) -> Result<Option<Device>> {
794 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
795 Ok(Some(self.wrap_device_data(device_data).await?))
796 } else {
797 Ok(None)
798 }
799 }
800
801 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
806 let own_identity = self
807 .inner
808 .store
809 .get_user_identity(self.user_id())
810 .await?
811 .and_then(|i| i.own().cloned());
812
813 let device_owner_identity =
814 self.inner.store.get_user_identity(device_data.user_id()).await?;
815
816 Ok(Device {
817 inner: device_data,
818 verification_machine: self.inner.verification_machine.clone(),
819 own_identity,
820 device_owner_identity,
821 })
822 }
823
824 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
826 let own_identity = self
827 .inner
828 .store
829 .get_user_identity(self.user_id())
830 .await?
831 .and_then(as_variant!(UserIdentityData::Own));
832
833 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
834 UserIdentity::new(
835 self.clone(),
836 i,
837 self.inner.verification_machine.to_owned(),
838 own_identity,
839 )
840 }))
841 }
842
843 pub async fn export_secret(
852 &self,
853 secret_name: &SecretName,
854 ) -> Result<Option<String>, CryptoStoreError> {
855 Ok(match secret_name {
856 SecretName::CrossSigningMasterKey
857 | SecretName::CrossSigningUserSigningKey
858 | SecretName::CrossSigningSelfSigningKey => {
859 self.inner.identity.lock().await.export_secret(secret_name).await
860 }
861 SecretName::RecoveryKey => {
862 if let Some(key) = self.load_backup_keys().await?.decryption_key {
863 let exported = key.to_base64();
864 Some(exported)
865 } else {
866 None
867 }
868 }
869 name => {
870 warn!(secret = ?name, "Unknown secret was requested");
871 None
872 }
873 })
874 }
875
876 pub async fn export_cross_signing_keys(
884 &self,
885 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
886 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
887 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
888 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
889
890 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
891 None
892 } else {
893 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
894 })
895 }
896
897 pub async fn import_cross_signing_keys(
902 &self,
903 export: CrossSigningKeyExport,
904 ) -> Result<CrossSigningStatus, SecretImportError> {
905 if let Some(public_identity) =
906 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
907 {
908 let identity = self.inner.identity.lock().await;
909
910 identity
911 .import_secrets(
912 public_identity.to_owned(),
913 export.master_key.as_deref(),
914 export.self_signing_key.as_deref(),
915 export.user_signing_key.as_deref(),
916 )
917 .await?;
918
919 let status = identity.status().await;
920
921 let diff = identity.get_public_identity_diff(&public_identity.inner).await;
922
923 let mut changes =
924 Changes { private_identity: Some(identity.clone()), ..Default::default() };
925
926 if diff.none_differ() {
927 public_identity.mark_as_verified();
928 changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
929 }
930
931 info!(?status, "Successfully imported the private cross-signing keys");
932
933 self.save_changes(changes).await?;
934 } else {
935 warn!(
936 "No public identity found while importing cross-signing keys, \
937 a /keys/query needs to be done"
938 );
939 }
940
941 Ok(self.inner.identity.lock().await.status().await)
942 }
943
944 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
956 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
957 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
958 };
959
960 let Some(master_key) = cross_signing.master_key.clone() else {
961 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
962 };
963
964 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
965 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
966 };
967
968 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
969 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
970 };
971
972 let backup_keys = self.load_backup_keys().await?;
973
974 let backup = if let Some(key) = backup_keys.decryption_key {
975 if let Some(backup_version) = backup_keys.backup_version {
976 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
977 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
978 ))
979 } else {
980 return Err(SecretsBundleExportError::MissingBackupVersion);
981 }
982 } else {
983 None
984 };
985
986 Ok(SecretsBundle {
987 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
988 backup,
989 })
990 }
991
992 pub async fn import_secrets_bundle(
1005 &self,
1006 bundle: &SecretsBundle,
1007 ) -> Result<(), SecretImportError> {
1008 let mut changes = Changes::default();
1009
1010 if let Some(backup_bundle) = &bundle.backup {
1011 match backup_bundle {
1012 BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1013 changes.backup_decryption_key = Some(bundle.key.clone());
1014 changes.backup_version = Some(bundle.backup_version.clone());
1015 }
1016 }
1017 }
1018
1019 let identity = self.inner.identity.lock().await;
1020
1021 identity
1022 .import_secrets_unchecked(
1023 Some(&bundle.cross_signing.master_key),
1024 Some(&bundle.cross_signing.self_signing_key),
1025 Some(&bundle.cross_signing.user_signing_key),
1026 )
1027 .await?;
1028
1029 let public_identity = identity.to_public_identity().await.expect(
1030 "We should be able to create a new public identity since we just imported \
1031 all the private cross-signing keys",
1032 );
1033
1034 changes.private_identity = Some(identity.clone());
1035 changes.identities.new.push(UserIdentityData::Own(public_identity));
1036
1037 Ok(self.save_changes(changes).await?)
1038 }
1039
1040 pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1042 match &secret.secret_name {
1043 SecretName::CrossSigningMasterKey
1044 | SecretName::CrossSigningUserSigningKey
1045 | SecretName::CrossSigningSelfSigningKey => {
1046 if let Some(public_identity) =
1047 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1048 {
1049 let identity = self.inner.identity.lock().await;
1050
1051 identity
1052 .import_secret(
1053 public_identity,
1054 &secret.secret_name,
1055 &secret.event.content.secret,
1056 )
1057 .await?;
1058 info!(
1059 secret_name = ?secret.secret_name,
1060 "Successfully imported a private cross signing key"
1061 );
1062
1063 let changes =
1064 Changes { private_identity: Some(identity.clone()), ..Default::default() };
1065
1066 self.save_changes(changes).await?;
1067 }
1068 }
1069 SecretName::RecoveryKey => {
1070 }
1076 name => {
1077 warn!(secret = ?name, "Tried to import an unknown secret");
1078 }
1079 }
1080
1081 Ok(())
1082 }
1083
1084 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1087 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1088 Ok(value)
1089 }
1090
1091 pub async fn set_only_allow_trusted_devices(
1094 &self,
1095 block_untrusted_devices: bool,
1096 ) -> Result<()> {
1097 self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1098 }
1099
1100 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1102 let Some(value) = self.get_custom_value(key).await? else {
1103 return Ok(None);
1104 };
1105 let deserialized = self.deserialize_value(&value)?;
1106 Ok(Some(deserialized))
1107 }
1108
1109 pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1111 let serialized = self.serialize_value(value)?;
1112 self.set_custom_value(key, serialized).await?;
1113 Ok(())
1114 }
1115
1116 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1117 let serialized =
1118 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1119 Ok(serialized)
1120 }
1121
1122 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1123 let deserialized =
1124 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1125 Ok(deserialized)
1126 }
1127
1128 pub fn room_keys_received_stream(
1140 &self,
1141 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1142 self.inner.store.room_keys_received_stream()
1143 }
1144
1145 pub fn room_keys_withheld_received_stream(
1154 &self,
1155 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1156 self.inner.store.room_keys_withheld_received_stream()
1157 }
1158
1159 pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1190 let verification_machine = self.inner.verification_machine.to_owned();
1191
1192 let this = self.clone();
1193 self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1194 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1195
1196 let map_identity = |(user_id, identity)| {
1197 (
1198 user_id,
1199 UserIdentity::new(
1200 this.clone(),
1201 identity,
1202 verification_machine.to_owned(),
1203 own_identity.to_owned(),
1204 ),
1205 )
1206 };
1207
1208 let new = new_identities.into_iter().map(map_identity).collect();
1209 let changed = changed_identities.into_iter().map(map_identity).collect();
1210 let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1211
1212 IdentityUpdates { new, changed, unchanged }
1213 })
1214 }
1215
1216 pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1248 let verification_machine = self.inner.verification_machine.to_owned();
1249
1250 self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1251 collect_device_updates(
1252 verification_machine.to_owned(),
1253 own_identity,
1254 identities,
1255 devices,
1256 )
1257 })
1258 }
1259
1260 pub fn identities_stream_raw(
1270 &self,
1271 ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1272 self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1273 }
1274
1275 pub fn create_store_lock(
1278 &self,
1279 lock_key: String,
1280 lock_value: String,
1281 ) -> CrossProcessLock<LockableCryptoStore> {
1282 self.inner.store.create_store_lock(lock_key, lock_value)
1283 }
1284
1285 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1325 self.inner.store.secrets_stream()
1326 }
1327
1328 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1368 self.inner.store.historic_room_key_stream()
1369 }
1370
1371 pub async fn import_room_keys(
1384 &self,
1385 exported_keys: Vec<ExportedRoomKey>,
1386 from_backup_version: Option<&str>,
1387 progress_listener: impl Fn(usize, usize),
1388 ) -> Result<RoomKeyImportResult> {
1389 let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1390 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1391 }
1392
1393 pub async fn import_exported_room_keys(
1420 &self,
1421 exported_keys: Vec<ExportedRoomKey>,
1422 progress_listener: impl Fn(usize, usize),
1423 ) -> Result<RoomKeyImportResult> {
1424 self.import_room_keys(exported_keys, None, progress_listener).await
1425 }
1426
1427 async fn import_sessions_impl<T>(
1428 &self,
1429 room_keys: Vec<T>,
1430 from_backup_version: Option<&str>,
1431 progress_listener: impl Fn(usize, usize),
1432 ) -> Result<RoomKeyImportResult>
1433 where
1434 T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1435 T::Error: Debug,
1436 {
1437 let mut sessions = Vec::new();
1438
1439 async fn new_session_better(
1440 session: &InboundGroupSession,
1441 old_session: Option<InboundGroupSession>,
1442 ) -> bool {
1443 if let Some(old_session) = &old_session {
1444 session.compare(old_session).await == SessionOrdering::Better
1445 } else {
1446 true
1447 }
1448 }
1449
1450 let total_count = room_keys.len();
1451 let mut keys = BTreeMap::new();
1452
1453 for (i, key) in room_keys.into_iter().enumerate() {
1454 match key.try_into() {
1455 Ok(session) => {
1456 let old_session = self
1457 .inner
1458 .store
1459 .get_inbound_group_session(session.room_id(), session.session_id())
1460 .await?;
1461
1462 if new_session_better(&session, old_session).await {
1465 if from_backup_version.is_some() {
1466 session.mark_as_backed_up();
1467 }
1468
1469 keys.entry(session.room_id().to_owned())
1470 .or_insert_with(BTreeMap::new)
1471 .entry(session.sender_key().to_base64())
1472 .or_insert_with(BTreeSet::new)
1473 .insert(session.session_id().to_owned());
1474
1475 sessions.push(session);
1476 }
1477 }
1478 Err(e) => {
1479 warn!(
1480 sender_key = key.sender_key().to_base64(),
1481 room_id = ?key.room_id(),
1482 session_id = key.session_id(),
1483 error = ?e,
1484 "Couldn't import a room key from a file export."
1485 );
1486 }
1487 }
1488
1489 progress_listener(i, total_count);
1490 }
1491
1492 let imported_count = sessions.len();
1493
1494 self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1495
1496 info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1497
1498 Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1499 }
1500
1501 pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1502 self.inner.store.clone()
1503 }
1504
1505 pub async fn export_room_keys(
1528 &self,
1529 predicate: impl FnMut(&InboundGroupSession) -> bool,
1530 ) -> Result<Vec<ExportedRoomKey>> {
1531 let mut exported = Vec::new();
1532
1533 let mut sessions = self.get_inbound_group_sessions().await?;
1534 sessions.retain(predicate);
1535
1536 for session in sessions {
1537 let export = session.export().await;
1538 exported.push(export);
1539 }
1540
1541 Ok(exported)
1542 }
1543
1544 pub async fn export_room_keys_stream(
1577 &self,
1578 predicate: impl FnMut(&InboundGroupSession) -> bool,
1579 ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1580 let sessions = self.get_inbound_group_sessions().await?;
1582 Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1583 .then(|session| async move { session.export().await }))
1584 }
1585
1586 pub async fn build_room_key_bundle(
1591 &self,
1592 room_id: &RoomId,
1593 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1594 let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1595
1596 let mut bundle = RoomKeyBundle::default();
1597 for session in sessions {
1598 if session.shared_history() {
1599 bundle.room_keys.push(session.export().await.into());
1600 } else {
1601 bundle.withheld.push(RoomKeyWithheldContent::new(
1602 session.algorithm().to_owned(),
1603 WithheldCode::HistoryNotShared,
1604 session.room_id().to_owned(),
1605 session.session_id().to_owned(),
1606 session.sender_key().to_owned(),
1607 self.device_id().to_owned(),
1608 ));
1609 }
1610 }
1611
1612 let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1615 for withheld in withhelds {
1616 if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1617 bundle.withheld.push(withheld.content);
1618 }
1619 }
1620
1621 Ok(bundle)
1622 }
1623
1624 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1634 pub async fn receive_room_key_bundle(
1635 &self,
1636 bundle_info: &StoredRoomKeyBundleData,
1637 bundle: RoomKeyBundle,
1638 progress_listener: impl Fn(usize, usize),
1639 ) -> Result<(), CryptoStoreError> {
1640 let sender_data = if bundle_info.sender_data.should_recalculate() {
1641 let device = self
1642 .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1643 .await?;
1644
1645 device
1646 .as_ref()
1647 .map(SenderData::from_device)
1648 .unwrap_or_else(|| bundle_info.sender_data.clone())
1649 } else {
1650 bundle_info.sender_data.clone()
1651 };
1652
1653 tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1654
1655 if matches!(
1656 &sender_data,
1657 SenderData::UnknownDevice { .. }
1658 | SenderData::VerificationViolation(_)
1659 | SenderData::DeviceInfo { .. }
1660 ) {
1661 warn!(
1662 "Not accepting a historic room key bundle due to insufficient trust in the sender"
1663 );
1664 return Ok(());
1665 }
1666
1667 self.import_room_key_bundle_sessions(bundle_info, &bundle, progress_listener).await?;
1668 self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1669
1670 Ok(())
1671 }
1672
1673 async fn import_room_key_bundle_sessions(
1674 &self,
1675 bundle_info: &StoredRoomKeyBundleData,
1676 bundle: &RoomKeyBundle,
1677 progress_listener: impl Fn(usize, usize),
1678 ) -> Result<(), CryptoStoreError> {
1679 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1680 if key.room_id != bundle_info.bundle_data.room_id {
1681 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1682 Either::Right(key)
1683 } else {
1684 Either::Left(key)
1685 }
1686 });
1687
1688 match (bad.is_empty(), good.is_empty()) {
1689 (true, true) => {
1691 warn!("Received a completely empty room key bundle");
1692 }
1693
1694 (false, true) => {
1696 let bad_keys: Vec<_> =
1697 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1698
1699 warn!(
1700 ?bad_keys,
1701 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1702 );
1703 }
1704
1705 (_, false) => {
1707 if !bad.is_empty() {
1710 warn!(
1711 bad_key_count = bad.len(),
1712 "The room key bundle contained some room keys \
1713 that were meant for a different room"
1714 );
1715 }
1716
1717 self.import_sessions_impl(good, None, progress_listener).await?;
1718 }
1719 }
1720
1721 Ok(())
1722 }
1723
1724 async fn import_room_key_bundle_withheld_info(
1725 &self,
1726 bundle_info: &StoredRoomKeyBundleData,
1727 bundle: &RoomKeyBundle,
1728 ) -> Result<(), CryptoStoreError> {
1729 let mut session_id_to_withheld_code_map = BTreeMap::new();
1730
1731 let mut changes = Changes::default();
1732 for withheld in &bundle.withheld {
1733 let (room_id, session_id) = match withheld {
1734 RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1735 (Some(room_id), Some(session_id)) => (room_id, session_id),
1736 _ => continue,
1737 },
1738 #[cfg(feature = "experimental-algorithms")]
1739 RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1740 (Some(room_id), Some(session_id)) => (room_id, session_id),
1741 _ => continue,
1742 },
1743 RoomKeyWithheldContent::Unknown(_) => continue,
1744 };
1745
1746 if room_id != bundle_info.bundle_data.room_id {
1747 trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1748 continue;
1749 }
1750
1751 changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1752 session_id.to_owned(),
1753 RoomKeyWithheldEntry {
1754 sender: bundle_info.sender_user.clone(),
1755 content: withheld.to_owned(),
1756 },
1757 );
1758 session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1759 }
1760
1761 self.save_changes(changes).await?;
1762
1763 info!(
1764 room_id = ?bundle_info.bundle_data.room_id,
1765 ?session_id_to_withheld_code_map,
1766 "Successfully imported withheld info from room key bundle",
1767 );
1768
1769 Ok(())
1770 }
1771}
1772
1773impl Deref for Store {
1774 type Target = DynCryptoStore;
1775
1776 fn deref(&self) -> &Self::Target {
1777 self.inner.store.deref().deref()
1778 }
1779}
1780
/// A newtype around a shared, type-erased [`CryptoStore`] whose error type is
/// [`CryptoStoreError`].
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same store.
#[derive(Clone, Debug)]
pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1784
1785impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1786 type LockError = CryptoStoreError;
1787
1788 async fn try_lock(
1789 &self,
1790 lease_duration_ms: u32,
1791 key: &str,
1792 holder: &str,
1793 ) -> std::result::Result<bool, Self::LockError> {
1794 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1795 }
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800 use std::pin::pin;
1801
1802 use assert_matches2::assert_matches;
1803 use futures_util::StreamExt;
1804 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1805 use matrix_sdk_test::async_test;
1806 use ruma::{
1807 device_id,
1808 events::room::{EncryptedFileInit, JsonWebKeyInit},
1809 owned_device_id, owned_mxc_uri, room_id,
1810 serde::Base64,
1811 user_id, RoomId,
1812 };
1813 use serde_json::json;
1814 use vodozemac::megolm::SessionKey;
1815
1816 use crate::{
1817 machine::test_helpers::get_machine_pair,
1818 olm::{InboundGroupSession, SenderData},
1819 store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1820 types::{
1821 events::{
1822 room_key_bundle::RoomKeyBundleContent,
1823 room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1824 },
1825 EventEncryptionAlgorithm,
1826 },
1827 OlmMachine,
1828 };
1829
1830 #[async_test]
1831 async fn test_import_room_keys_notifies_stream() {
1832 use futures_util::FutureExt;
1833
1834 let (alice, bob, _) =
1835 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1836
1837 let room1_id = room_id!("!room1:localhost");
1838 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1839 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
1840
1841 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
1842 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
1843
1844 let room_keys = room_keys_received_stream
1845 .next()
1846 .now_or_never()
1847 .flatten()
1848 .expect("We should have received an update of room key infos")
1849 .unwrap();
1850 assert_eq!(room_keys.len(), 1);
1851 assert_eq!(room_keys[0].room_id, "!room1:localhost");
1852 }
1853
1854 #[async_test]
1855 async fn test_export_room_keys_provides_selected_keys() {
1856 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1858 let room1_id = room_id!("!room1:localhost");
1859 let room2_id = room_id!("!room2:localhost");
1860 let room3_id = room_id!("!room3:localhost");
1861 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1862 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1863 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
1864
1865 let keys = alice
1867 .store()
1868 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
1869 .await
1870 .unwrap();
1871
1872 assert_eq!(keys.len(), 2);
1874 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1875 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1876 assert_eq!(keys[0].room_id, "!room2:localhost");
1877 assert_eq!(keys[1].room_id, "!room3:localhost");
1878 assert_eq!(keys[0].session_key.to_base64().len(), 220);
1879 assert_eq!(keys[1].session_key.to_base64().len(), 220);
1880 }
1881
1882 #[async_test]
1883 async fn test_export_room_keys_stream_can_provide_all_keys() {
1884 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1886 let room1_id = room_id!("!room1:localhost");
1887 let room2_id = room_id!("!room2:localhost");
1888 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1889 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1890
1891 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
1893
1894 let mut collected = vec![];
1896 while let Some(key) = keys.next().await {
1897 collected.push(key);
1898 }
1899
1900 assert_eq!(collected.len(), 2);
1902 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1903 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1904 assert_eq!(collected[0].room_id, "!room1:localhost");
1905 assert_eq!(collected[1].room_id, "!room2:localhost");
1906 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1907 assert_eq!(collected[1].session_key.to_base64().len(), 220);
1908 }
1909
1910 #[async_test]
1911 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
1912 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1914 let room1_id = room_id!("!room1:localhost");
1915 let room2_id = room_id!("!room2:localhost");
1916 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1917 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1918
1919 let mut keys =
1921 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
1922
1923 let mut collected = vec![];
1925 while let Some(key) = keys.next().await {
1926 collected.push(key);
1927 }
1928
1929 assert_eq!(collected.len(), 1);
1931 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1932 assert_eq!(collected[0].room_id, "!room1:localhost");
1933 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1934 }
1935
1936 #[async_test]
1937 async fn test_export_secrets_bundle() {
1938 let user_id = user_id!("@alice:example.com");
1939 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
1940
1941 let _ = first
1942 .bootstrap_cross_signing(false)
1943 .await
1944 .expect("We should be able to bootstrap cross-signing");
1945
1946 let bundle = first.store().export_secrets_bundle().await.expect(
1947 "We should be able to export the secrets bundle, now that we \
1948 have the cross-signing keys",
1949 );
1950
1951 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
1952
1953 second
1954 .store()
1955 .import_secrets_bundle(&bundle)
1956 .await
1957 .expect("We should be able to import the secrets bundle");
1958
1959 let status = second.cross_signing_status().await;
1960 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
1961
1962 assert!(identity.is_verified(), "The public identity should be marked as verified.");
1963
1964 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
1965 }
1966
1967 #[async_test]
1968 async fn test_create_dehydrated_device_key() {
1969 let pickle_key = DehydratedDeviceKey::new()
1970 .expect("Should be able to create a random dehydrated device key");
1971
1972 let to_vec = pickle_key.inner.to_vec();
1973 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
1974 .expect("Should be able to create a dehydrated device key from slice");
1975
1976 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
1977 }
1978
1979 #[async_test]
1980 async fn test_create_dehydrated_errors() {
1981 let too_small = [0u8; 22];
1982 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
1983
1984 assert!(pickle_key.is_err());
1985
1986 let too_big = [0u8; 40];
1987 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
1988
1989 assert!(pickle_key.is_err());
1990 }
1991
1992 #[async_test]
1993 async fn test_build_room_key_bundle() {
1994 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
1997 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
1998
1999 let room1_id = room_id!("!room1:localhost");
2000 let room2_id = room_id!("!room2:localhost");
2001
2002 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2007 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2008 let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2009 let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2010
2011 let sessions = [
2012 create_inbound_group_session_with_visibility(
2013 &alice,
2014 room1_id,
2015 &SessionKey::from_base64(session_key1).unwrap(),
2016 true,
2017 ),
2018 create_inbound_group_session_with_visibility(
2019 &alice,
2020 room1_id,
2021 &SessionKey::from_base64(session_key2).unwrap(),
2022 true,
2023 ),
2024 create_inbound_group_session_with_visibility(
2025 &alice,
2026 room1_id,
2027 &SessionKey::from_base64(session_key3).unwrap(),
2028 false,
2029 ),
2030 create_inbound_group_session_with_visibility(
2031 &alice,
2032 room2_id,
2033 &SessionKey::from_base64(session_key4).unwrap(),
2034 true,
2035 ),
2036 ];
2037 bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2038
2039 let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2041
2042 bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2046
2047 let algorithm = if cfg!(feature = "experimental-algorithms") {
2049 "m.megolm.v2.aes-sha2"
2050 } else {
2051 "m.megolm.v1.aes-sha2"
2052 };
2053 let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2054 assert_eq!(value.as_str().unwrap(), algorithm);
2055 "[algorithm]"
2056 };
2057
2058 let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2060 let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2061 assert_eq!(value.as_str().unwrap(), alice_curve_key);
2062 "[alice curve key]"
2063 };
2064 let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2065 let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2066 assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2067 "[alice ed25519 key]"
2068 };
2069
2070 insta::with_settings!({ sort_maps => true }, {
2071 assert_json_snapshot!(bundle, {
2072 ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2073 ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2074 ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2075 ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2076 ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2077 });
2078 });
2079 }
2080
2081 #[async_test]
2082 async fn test_receive_room_key_bundle() {
2083 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2084 let alice_key = alice.identity_keys().curve25519;
2085 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2086
2087 let room_id = room_id!("!room1:localhost");
2088
2089 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2090 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2091
2092 let sessions = [
2093 create_inbound_group_session_with_visibility(
2094 &alice,
2095 room_id,
2096 &SessionKey::from_base64(session_key1).unwrap(),
2097 true,
2098 ),
2099 create_inbound_group_session_with_visibility(
2100 &alice,
2101 room_id,
2102 &SessionKey::from_base64(session_key2).unwrap(),
2103 false,
2104 ),
2105 ];
2106
2107 alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2108 let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2109
2110 bob.store()
2111 .receive_room_key_bundle(
2112 &StoredRoomKeyBundleData {
2113 sender_user: alice.user_id().to_owned(),
2114 sender_key: alice_key,
2115 sender_data: SenderData::sender_verified(
2116 alice.user_id(),
2117 device_id!("ALICE"),
2118 alice.identity_keys().ed25519,
2119 ),
2120
2121 bundle_data: RoomKeyBundleContent {
2122 room_id: room_id.to_owned(),
2123 file: EncryptedFileInit {
2126 url: owned_mxc_uri!("mxc://example.com/0"),
2127 key: JsonWebKeyInit {
2128 kty: "oct".to_owned(),
2129 key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2130 alg: "A256CTR.".to_owned(),
2131 k: Base64::new(vec![0u8; 128]),
2132 ext: true,
2133 }
2134 .into(),
2135 iv: Base64::new(vec![0u8; 128]),
2136 hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2137 .into_iter()
2138 .collect(),
2139 v: "v2".to_owned(),
2140 }
2141 .into(),
2142 },
2143 },
2144 bundle,
2145 |_, _| {},
2146 )
2147 .await
2148 .unwrap();
2149
2150 let imported_sessions =
2152 bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2153
2154 assert_eq!(imported_sessions.len(), 1);
2155 assert_eq!(imported_sessions[0].room_id(), room_id);
2156
2157 assert_matches!(
2158 bob.store()
2159 .get_withheld_info(room_id, sessions[1].session_id())
2160 .await
2161 .unwrap()
2162 .expect("Withheld info should be present in the store."),
2163 RoomKeyWithheldEntry {
2164 #[cfg(not(feature = "experimental-algorithms"))]
2165 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2166 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2167 ),
2168 #[cfg(feature = "experimental-algorithms")]
2169 content: RoomKeyWithheldContent::MegolmV2AesSha2(
2170 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2171 ),
2172 ..
2173 }
2174 );
2175 }
2176
2177 #[async_test]
2183 async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2184 let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2185 {
2186 "content": {
2187 "algorithm": "m.megolm.v1.aes-sha2",
2188 "code": "m.unauthorised",
2189 "from_device": "ALICE",
2190 "reason": "You are not authorised to read the message.",
2191 "room_id": "!roomid:s.co",
2192 "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2193 "session_id": "session123"
2194 },
2195 "sender": "@alice:s.co",
2196 "type": "m.room_key.withheld"
2197 }
2198 ))
2199 .unwrap();
2200
2201 assert_matches!(
2202 entry,
2203 RoomKeyWithheldEntry {
2204 sender,
2205 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2206 MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2207 ),
2208 }
2209 );
2210
2211 assert_eq!(sender, "@alice:s.co");
2212 assert_eq!(withheld_content.room_id, "!roomid:s.co");
2213 assert_eq!(withheld_content.session_id, "session123");
2214 assert_eq!(
2215 withheld_content.sender_key.to_base64(),
2216 "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2217 );
2218 assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2219 }
2220
2221 fn create_inbound_group_session_with_visibility(
2232 olm_machine: &OlmMachine,
2233 room_id: &RoomId,
2234 session_key: &SessionKey,
2235 shared_history: bool,
2236 ) -> InboundGroupSession {
2237 let identity_keys = &olm_machine.store().static_account().identity_keys;
2238 InboundGroupSession::new(
2239 identity_keys.curve25519,
2240 identity_keys.ed25519,
2241 room_id,
2242 session_key,
2243 SenderData::unknown(),
2244 #[cfg(not(feature = "experimental-algorithms"))]
2245 EventEncryptionAlgorithm::MegolmV1AesSha2,
2246 #[cfg(feature = "experimental-algorithms")]
2247 EventEncryptionAlgorithm::MegolmV2AesSha2,
2248 None,
2249 shared_history,
2250 )
2251 .unwrap()
2252 }
2253}