1use std::{
45 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
46 fmt::Debug,
47 ops::Deref,
48 pin::pin,
49 sync::{Arc, atomic::Ordering},
50 time::Duration,
51};
52
53use as_variant::as_variant;
54use futures_core::Stream;
55use futures_util::StreamExt;
56use itertools::{Either, Itertools};
57use ruma::{
58 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, UserId,
59 encryption::KeyUsage, events::secret::request::SecretName,
60};
61use serde::{Serialize, de::DeserializeOwned};
62use thiserror::Error;
63use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
64use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
65use tracing::{info, instrument, trace, warn};
66use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
67use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
68
69use self::types::{
70 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
71 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, RoomPendingKeyBundleDetails,
72 UserKeyQueryResult,
73};
74use crate::{
75 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
76 gossiping::GossippedSecret,
77 identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
78 olm::{
79 Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
80 SenderData, Session, StaticAccountData,
81 },
82 store::types::{RoomKeyWithheldEntry, SecretsInboxItem},
83 types::{
84 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
85 SecretsBundle,
86 },
87 verification::VerificationMachine,
88};
89#[cfg(doc)]
90use crate::{backups::BackupMachine, identities::OwnUserIdentity};
91
92pub mod caches;
93mod crypto_store_wrapper;
94mod error;
95mod memorystore;
96mod traits;
97pub mod types;
98
99#[cfg(any(test, feature = "testing"))]
100#[macro_use]
101#[allow(missing_docs)]
102pub mod integration_tests;
103
104pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
105pub use error::{CryptoStoreError, Result};
106use matrix_sdk_common::{
107 cross_process_lock::{CrossProcessLock, CrossProcessLockConfig, CrossProcessLockGeneration},
108 deserialized_responses::WithheldCode,
109 timeout::timeout,
110};
111pub use memorystore::MemoryStore;
112pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
113
114use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
115use crate::types::{
116 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
117};
118pub use crate::{
119 dehydrated_devices::DehydrationError,
120 gossiping::{GossipRequest, SecretInfo},
121};
122
/// Handle to the crypto store that the rest of the crate works with.
///
/// All state lives behind a shared [`Arc`], so cloning a `Store` only clones
/// the pointer.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state backing every clone of this handle.
    inner: Arc<StoreInner>,
}
133
/// Bookkeeping for users that still need a `/keys/query` request.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// The users that are pending a key query, plus the tasks registered as
    /// waiting for those queries.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Signalled after tracked users have been marked as up to date, so that
    /// waiting tasks can re-check whether their query has completed.
    users_for_key_query_notify: Notify,
}
142
impl KeyQueryManager {
    /// Make sure the tracked users have been loaded from the store, then hand
    /// out a [`SyncedKeyQueryManager`] borrowing this manager and `cache`.
    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
        self.ensure_sync_tracked_users(cache).await?;
        Ok(SyncedKeyQueryManager { cache, manager: self })
    }

    /// Load the tracked users from the store into `cache`, unless that has
    /// already been done.
    ///
    /// Users that the store reports as dirty are additionally queued for a
    /// key query.
    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
        // Fast path: a read lock is enough to see that loading already happened.
        let loaded = cache.loaded_tracked_users.read().await;
        if *loaded {
            return Ok(());
        }

        // Release the read guard before requesting the write lock.
        drop(loaded);
        let mut loaded = cache.loaded_tracked_users.write().await;

        // Another task may have finished loading while we were waiting for the
        // write lock, so the flag has to be checked again.
        if *loaded {
            return Ok(());
        }

        let tracked_users = cache.store.load_tracked_users().await?;

        let mut query_users_lock = self.users_for_key_query.lock().await;
        let mut tracked_users_cache = cache.tracked_users.write();
        for user in tracked_users {
            tracked_users_cache.insert(user.user_id.to_owned());

            if user.dirty {
                query_users_lock.insert_user(&user.user_id);
            }
        }

        // Only flip the flag once the cache has actually been populated.
        *loaded = true;

        Ok(())
    }

    /// Wait for a pending `/keys/query` for `user` to finish, if there is one.
    ///
    /// # Arguments
    ///
    /// * `cache` - Guard used to make sure the tracked users are loaded; it is
    ///   dropped before any waiting happens.
    /// * `timeout_duration` - Upper bound on how long to wait.
    /// * `user` - The user whose key query we are interested in.
    ///
    /// # Returns
    ///
    /// * [`UserKeyQueryResult::WasNotPending`] if no waiter could be
    ///   registered for the user.
    /// * [`UserKeyQueryResult::WasPending`] if the wait finished in time.
    /// * [`UserKeyQueryResult::TimeoutExpired`] if `timeout_duration` elapsed
    ///   first.
    pub async fn wait_if_user_key_query_pending(
        &self,
        cache: StoreCacheGuard,
        timeout_duration: Duration,
        user: &UserId,
    ) -> Result<UserKeyQueryResult> {
        {
            self.ensure_sync_tracked_users(&cache).await?;
            // The cache guard is not needed past this point; don't hold it
            // while waiting.
            drop(cache);
        }

        let mut users_for_key_query = self.users_for_key_query.lock().await;
        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
            return Ok(UserKeyQueryResult::WasNotPending);
        };

        let wait_for_completion = async {
            while !waiter.completed.load(Ordering::Relaxed) {
                let mut notified = pin!(self.users_for_key_query_notify.notified());
                // Enable the notification future while the lock is still held,
                // i.e. before `drop(users_for_key_query)` below.
                // NOTE(review): presumably so that a `notify_waiters()` call
                // racing with the unlock is not missed — confirm against the
                // tokio `Notify` documentation.
                notified.as_mut().enable();
                drop(users_for_key_query);

                notified.await;

                // Re-acquire the lock before the completion flag is checked
                // again (and before the next `drop` in this loop).
                users_for_key_query = self.users_for_key_query.lock().await;
            }
        };

        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
            Err(_) => {
                warn!(
                    user_id = ?user,
                    "The user has a pending `/keys/query` request which did \
                    not finish yet, some devices might be missing."
                );

                Ok(UserKeyQueryResult::TimeoutExpired)
            }
            _ => Ok(UserKeyQueryResult::WasPending),
        }
    }
}
250
/// A [`KeyQueryManager`] paired with the [`StoreCache`] it was synced against.
///
/// Instances are created by [`KeyQueryManager::synced`], which loads the
/// tracked users before handing one out.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the set of tracked users and the underlying store.
    cache: &'a StoreCache,
    /// The manager holding the users pending a key query.
    manager: &'a KeyQueryManager,
}
255
256impl SyncedKeyQueryManager<'_> {
257 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
262 let mut store_updates = Vec::new();
263 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
264
265 {
266 let mut tracked_users = self.cache.tracked_users.write();
267 for user_id in users {
268 if tracked_users.insert(user_id.to_owned()) {
269 key_query_lock.insert_user(user_id);
270 store_updates.push((user_id, true))
271 }
272 }
273 }
274
275 self.cache.store.save_tracked_users(&store_updates).await
276 }
277
278 pub async fn mark_tracked_users_as_changed(
285 &self,
286 users: impl Iterator<Item = &UserId>,
287 ) -> Result<()> {
288 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
289 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
290
291 {
292 let tracked_users = &self.cache.tracked_users.read();
293 for user_id in users {
294 if tracked_users.contains(user_id) {
295 key_query_lock.insert_user(user_id);
296 store_updates.push((user_id, true));
297 }
298 }
299 }
300
301 self.cache.store.save_tracked_users(&store_updates).await
302 }
303
304 pub async fn mark_tracked_users_as_up_to_date(
310 &self,
311 users: impl Iterator<Item = &UserId>,
312 sequence_number: SequenceNumber,
313 ) -> Result<()> {
314 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
315 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
316
317 {
318 let tracked_users = self.cache.tracked_users.read();
319 for user_id in users {
320 if tracked_users.contains(user_id) {
321 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
322 store_updates.push((user_id, !clean));
323 }
324 }
325 }
326
327 self.cache.store.save_tracked_users(&store_updates).await?;
328 self.manager.users_for_key_query_notify.notify_waiters();
330
331 Ok(())
332 }
333
334 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
346 self.manager.users_for_key_query.lock().await.users_for_key_query()
347 }
348
349 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
351 self.cache.tracked_users.read().iter().cloned().collect()
352 }
353
354 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
360 self.manager.users_for_key_query.lock().await.insert_user(user);
361 self.cache.tracked_users.write().insert(user.to_owned());
362
363 self.cache.store.save_tracked_users(&[(user, true)]).await
364 }
365}
366
367fn collect_device_updates(
373 verification_machine: VerificationMachine,
374 own_identity: Option<OwnUserIdentityData>,
375 identities: IdentityChanges,
376 devices: DeviceChanges,
377) -> DeviceUpdates {
378 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
379 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
380
381 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
382
383 let map_device = |device: DeviceData| {
384 let device_owner_identity = new_identities
385 .get(device.user_id())
386 .or_else(|| changed_identities.get(device.user_id()))
387 .or_else(|| unchanged_identities.get(device.user_id()))
388 .cloned();
389
390 Device {
391 inner: device,
392 verification_machine: verification_machine.to_owned(),
393 own_identity: own_identity.to_owned(),
394 device_owner_identity,
395 }
396 };
397
398 for device in devices.new {
399 let device = map_device(device);
400
401 new.entry(device.user_id().to_owned())
402 .or_default()
403 .insert(device.device_id().to_owned(), device);
404 }
405
406 for device in devices.changed {
407 let device = map_device(device);
408
409 changed
410 .entry(device.user_id().to_owned())
411 .or_default()
412 .insert(device.device_id().to_owned(), device.to_owned());
413 }
414
415 DeviceUpdates { new, changed }
416}
417
/// A set of pending store changes, collected while holding the write lock on
/// the store cache.
///
/// Changes are only persisted when [`StoreTransaction::commit`] is called.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store the changes will be written to.
    store: Store,
    /// The changes accumulated so far.
    changes: PendingChanges,
    /// Owned write guard on the store cache, held for the transaction's
    /// whole lifetime.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
426
427impl StoreTransaction {
428 async fn new(store: Store) -> Self {
430 let cache = store.inner.cache.clone();
431
432 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
433 }
434
435 pub(crate) fn cache(&self) -> &StoreCache {
436 &self.cache
437 }
438
439 pub fn store(&self) -> &Store {
441 &self.store
442 }
443
444 pub async fn account(&mut self) -> Result<&mut Account> {
451 if self.changes.account.is_none() {
452 let _ = self.cache.account().await?;
454 self.changes.account = self.cache.account.lock().await.take();
455 }
456 Ok(self.changes.account.as_mut().unwrap())
457 }
458
459 pub async fn commit(self) -> Result<()> {
462 if self.changes.is_empty() {
463 return Ok(());
464 }
465
466 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
468
469 self.store.save_pending_changes(self.changes).await?;
470
471 if let Some(account) = account {
473 *self.cache.account.lock().await = Some(account);
474 }
475
476 Ok(())
477 }
478}
479
/// The state shared by all clones of a [`Store`].
#[derive(Debug)]
struct StoreInner {
    /// Our private cross-signing identity.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store that all reads and writes go to.
    store: Arc<CryptoStoreWrapper>,

    /// In-memory cache; read guards are handed out by [`Store::cache`] and an
    /// owned write guard is held by every [`StoreTransaction`].
    cache: Arc<RwLock<StoreCache>>,

    /// Handed to the [`Device`], [`UserDevices`] and [`UserIdentity`] values
    /// this store creates.
    verification_machine: VerificationMachine,

    /// Static account data; the source of our own user ID.
    static_account: StaticAccountData,
}
496
/// Errors that can occur while importing a secret into the store.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The secret could not be turned into a key.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that was being imported.
        name: SecretName,
        /// The underlying key error.
        error: vodozemac::KeyError,
    },
    /// The public key of the imported private key doesn't match the public
    /// key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
        key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret that was being imported.
        name: SecretName,
    },
    /// The store failed while persisting the imported secret.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
523
/// Errors that can occur while exporting a [`SecretsBundle`] from the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The store itself returned an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// At least one cross-signing key is missing; the payload names the
    /// usage of the first key found to be absent.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// The store holds no cross-signing keys at all.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// A backup decryption key is present, but no backup version is stored
    /// alongside it.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
544
545impl Store {
546 pub(crate) fn new(
548 account: StaticAccountData,
549 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
550 store: Arc<CryptoStoreWrapper>,
551 verification_machine: VerificationMachine,
552 ) -> Self {
553 Self {
554 inner: Arc::new(StoreInner {
555 static_account: account,
556 identity,
557 store: store.clone(),
558 verification_machine,
559 cache: Arc::new(RwLock::new(StoreCache {
560 store,
561 tracked_users: Default::default(),
562 loaded_tracked_users: Default::default(),
563 account: Default::default(),
564 })),
565 }),
566 }
567 }
568
/// The user ID taken from the static account data.
pub(crate) fn user_id(&self) -> &UserId {
    &self.inner.static_account.user_id
}
573
/// Our own device ID, as reported by the verification machine.
pub(crate) fn device_id(&self) -> &DeviceId {
    self.inner.verification_machine.own_device_id()
}
578
/// The static account data this store was created with.
pub(crate) fn static_account(&self) -> &StaticAccountData {
    &self.inner.static_account
}
583
584 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
585 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
586 }
587
/// Start a new [`StoreTransaction`] on a clone of this store.
///
/// The transaction holds the write lock on the store cache, so this waits
/// until that lock can be acquired.
pub(crate) async fn transaction(&self) -> StoreTransaction {
    StoreTransaction::new(self.clone()).await
}
591
592 pub(crate) async fn with_transaction<T>(
593 &self,
594 func: impl AsyncFnOnce(&mut StoreTransaction) -> Result<T, crate::OlmError>,
595 ) -> Result<T, crate::OlmError> {
596 let mut tr = self.transaction().await;
597 let res = func(&mut tr).await?;
598 tr.commit().await?;
599 Ok(res)
600 }
601
/// Test helper: call `reset()` on the private cross-signing identity.
#[cfg(test)]
pub(crate) async fn reset_cross_signing_identity(&self) {
    self.inner.identity.lock().await.reset();
}
607
/// A shared handle to the private cross-signing identity.
pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
    self.inner.identity.clone()
}
612
613 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
615 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
616
617 self.save_changes(changes).await
618 }
619
/// Fetch the sessions the store holds for the given sender key.
///
/// Returns `Ok(None)` when the store has nothing for that key.
pub(crate) async fn get_sessions(
    &self,
    sender_key: &str,
) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
    self.inner.store.get_sessions(sender_key).await
}
626
/// Forward the given changes to the wrapped store.
pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
    self.inner.store.save_changes(changes).await
}
630
/// Compare a received inbound group session with the copy in the store (if
/// any) and decide what, if anything, should be saved.
///
/// This method does not write to the store itself.
///
/// # Returns
///
/// * `Ok(Some(session))` with the session that should be stored: either the
///   received one as-is, or a combination of the received and the stored one.
/// * `Ok(None)` if the received session should be discarded.
pub(crate) async fn merge_received_group_session(
    &self,
    session: InboundGroupSession,
) -> Result<Option<InboundGroupSession>> {
    let old_session = self
        .inner
        .store
        .get_inbound_group_session(session.room_id(), session.session_id())
        .await?;

    let Some(old_session) = old_session else {
        // Nothing stored for this room/session ID: take the new one as-is.
        info!("Received a new megolm room key");
        return Ok(Some(session));
    };

    // Two independent comparisons of the new session against the stored one:
    // the ratchet state and the trust level of the sender data.
    let index_comparison = session.compare_ratchet(&old_session).await;
    let trust_level_comparison =
        session.sender_data.compare_trust_level(&old_session.sender_data);

    let result = match (index_comparison, trust_level_comparison) {
        (SessionOrdering::Unconnected, _) => {
            // The two ratchets cannot be related to each other; keep what we
            // have.
            warn!(
                "Received a group session with an ratchet that does not connect to the one in the store, discarding"
            );
            None
        }

        (SessionOrdering::Better, std::cmp::Ordering::Greater)
        | (SessionOrdering::Better, std::cmp::Ordering::Equal)
        | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
            // The new session is at least as good on both axes and strictly
            // better on one of them.
            info!(
                ?index_comparison,
                ?trust_level_comparison,
                "Received a megolm room key that we have a worse version of, merging"
            );
            Some(session)
        }

        (SessionOrdering::Worse, std::cmp::Ordering::Less)
        | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
        | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
            // The new session is no better on either axis and strictly worse
            // on one of them.
            warn!(
                ?index_comparison,
                ?trust_level_comparison,
                "Received a megolm room key that we already have a better version \
                of, discarding"
            );
            None
        }

        (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
            // Both comparisons are equal: nothing to gain.
            info!("Received a megolm room key that we already have, discarding");
            None
        }

        (SessionOrdering::Better, std::cmp::Ordering::Less) => {
            // Better ratchet but worse sender data: start from the stored
            // session and apply the ratchet of the new one.
            info!("Upgrading a previously-received megolm session with new ratchet");
            let result = old_session.with_ratchet(&session);
            // NOTE(review): presumably so the combined session gets backed up
            // again — confirm.
            result.reset_backup_state();
            Some(result)
        }

        (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
            // Worse ratchet but better sender data: start from the new session
            // and apply the ratchet of the stored one.
            info!("Upgrading a previously-received megolm session with new sender data");
            Some(session.with_ratchet(&old_session))
        }
    };

    Ok(result)
}
720
/// Test helper: persist the given devices as changed devices.
#[cfg(test)]
pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
    use types::DeviceChanges;

    let devices = DeviceChanges { changed: devices.to_vec(), ..Default::default() };

    self.save_changes(Changes { devices, ..Default::default() }).await
}
733
734 pub(crate) async fn save_inbound_group_sessions(
736 &self,
737 sessions: &[InboundGroupSession],
738 ) -> Result<()> {
739 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
740
741 self.save_changes(changes).await
742 }
743
744 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
746 Ok(self
747 .inner
748 .store
749 .get_device(self.user_id(), self.device_id())
750 .await?
751 .and_then(|d| d.display_name().map(|d| d.to_owned())))
752 }
753
/// Fetch the raw [`DeviceData`] for the given user and device ID.
///
/// Returns `Ok(None)` if the store does not know the device.
pub(crate) async fn get_device_data(
    &self,
    user_id: &UserId,
    device_id: &DeviceId,
) -> Result<Option<DeviceData>> {
    self.inner.store.get_device(user_id, device_id).await
}
765
766 pub(crate) async fn get_device_data_for_user_filtered(
774 &self,
775 user_id: &UserId,
776 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
777 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
778 if user_id == self.user_id() {
779 d.remove(self.device_id());
780 }
781 d
782 })
783 }
784
/// Fetch the raw device data of all devices of a user, keyed by device ID.
///
/// Unlike [`Store::get_device_data_for_user_filtered`], nothing is removed
/// from the result.
pub(crate) async fn get_device_data_for_user(
    &self,
    user_id: &UserId,
) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
    self.inner.store.get_user_devices(user_id).await
}
799
800 pub(crate) async fn get_device_from_curve_key(
806 &self,
807 user_id: &UserId,
808 curve_key: Curve25519PublicKey,
809 ) -> Result<Option<Device>> {
810 self.get_user_devices(user_id)
811 .await
812 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
813 }
814
815 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
825 let devices = self.get_device_data_for_user(user_id).await?;
826
827 let own_identity = self
828 .inner
829 .store
830 .get_user_identity(self.user_id())
831 .await?
832 .and_then(|i| i.own().cloned());
833 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
834
835 Ok(UserDevices {
836 inner: devices,
837 verification_machine: self.inner.verification_machine.clone(),
838 own_identity,
839 device_owner_identity,
840 })
841 }
842
843 pub(crate) async fn get_device(
853 &self,
854 user_id: &UserId,
855 device_id: &DeviceId,
856 ) -> Result<Option<Device>> {
857 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
858 Ok(Some(self.wrap_device_data(device_data).await?))
859 } else {
860 Ok(None)
861 }
862 }
863
864 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
869 let own_identity = self
870 .inner
871 .store
872 .get_user_identity(self.user_id())
873 .await?
874 .and_then(|i| i.own().cloned());
875
876 let device_owner_identity =
877 self.inner.store.get_user_identity(device_data.user_id()).await?;
878
879 Ok(Device {
880 inner: device_data,
881 verification_machine: self.inner.verification_machine.clone(),
882 own_identity,
883 device_owner_identity,
884 })
885 }
886
887 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
889 let own_identity = self
890 .inner
891 .store
892 .get_user_identity(self.user_id())
893 .await?
894 .and_then(as_variant!(UserIdentityData::Own));
895
896 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
897 UserIdentity::new(
898 self.clone(),
899 i,
900 self.inner.verification_machine.to_owned(),
901 own_identity,
902 )
903 }))
904 }
905
906 pub async fn export_secret(
915 &self,
916 secret_name: &SecretName,
917 ) -> Result<Option<String>, CryptoStoreError> {
918 Ok(match secret_name {
919 SecretName::CrossSigningMasterKey
920 | SecretName::CrossSigningUserSigningKey
921 | SecretName::CrossSigningSelfSigningKey => {
922 self.inner.identity.lock().await.export_secret(secret_name).await
923 }
924 SecretName::RecoveryKey => {
925 if let Some(key) = self.load_backup_keys().await?.decryption_key {
926 let exported = key.to_base64();
927 Some(exported)
928 } else {
929 None
930 }
931 }
932 name => {
933 warn!(secret = ?name, "Unknown secret was requested");
934 None
935 }
936 })
937 }
938
939 pub async fn export_cross_signing_keys(
947 &self,
948 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
949 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
950 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
951 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
952
953 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
954 None
955 } else {
956 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
957 })
958 }
959
/// Import private cross-signing keys from a previous export.
///
/// The import only happens if the store holds our own public identity;
/// otherwise a warning is logged and nothing is changed.
///
/// # Returns
///
/// The status of the private cross-signing identity after the call,
/// regardless of whether anything was imported.
///
/// # Errors
///
/// Returns a [`SecretImportError`] if the store fails or if importing the
/// secrets fails.
pub async fn import_cross_signing_keys(
    &self,
    export: CrossSigningKeyExport,
) -> Result<CrossSigningStatus, SecretImportError> {
    if let Some(public_identity) =
        self.get_identity(self.user_id()).await?.and_then(|i| i.own())
    {
        let identity = self.inner.identity.lock().await;

        identity
            .import_secrets(
                public_identity.to_owned(),
                export.master_key.as_deref(),
                export.self_signing_key.as_deref(),
                export.user_signing_key.as_deref(),
            )
            .await?;

        let status = identity.status().await;

        let diff = identity.get_public_identity_diff(&public_identity.inner).await;

        let mut changes =
            Changes { private_identity: Some(identity.clone()), ..Default::default() };

        // Only when the diff reports no differences is the public identity
        // marked as verified and saved along with the private identity.
        if diff.none_differ() {
            public_identity.mark_as_verified();
            changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
        }

        info!(?status, "Successfully imported the private cross-signing keys");

        self.save_changes(changes).await?;
    } else {
        warn!(
            "No public identity found while importing cross-signing keys, \
            a /keys/query needs to be done"
        );
    }

    // The guard taken above is scoped to the `if` block, so the identity is
    // locked again here to read the final status.
    Ok(self.inner.identity.lock().await.status().await)
}
1006
1007 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1019 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1020 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1021 };
1022
1023 let Some(master_key) = cross_signing.master_key.clone() else {
1024 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1025 };
1026
1027 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1028 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1029 };
1030
1031 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1032 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1033 };
1034
1035 let backup_keys = self.load_backup_keys().await?;
1036
1037 let backup = if let Some(key) = backup_keys.decryption_key {
1038 if let Some(backup_version) = backup_keys.backup_version {
1039 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1040 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1041 ))
1042 } else {
1043 return Err(SecretsBundleExportError::MissingBackupVersion);
1044 }
1045 } else {
1046 None
1047 };
1048
1049 Ok(SecretsBundle {
1050 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1051 backup,
1052 })
1053 }
1054
/// Import the cross-signing keys, and the backup key if present, from a
/// [`SecretsBundle`].
///
/// The cross-signing keys are imported with `import_secrets_unchecked`. A
/// public identity is then derived from the imported private keys and saved
/// as a new own identity together with the private identity.
///
/// # Errors
///
/// Returns a [`SecretImportError`] if importing the keys or saving the
/// changes fails.
///
/// # Panics
///
/// Panics if no public identity can be derived after the import succeeded.
pub async fn import_secrets_bundle(
    &self,
    bundle: &SecretsBundle,
) -> Result<(), SecretImportError> {
    let mut changes = Changes::default();

    if let Some(backup_bundle) = &bundle.backup {
        match backup_bundle {
            BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
                changes.backup_decryption_key = Some(bundle.key.clone());
                changes.backup_version = Some(bundle.backup_version.clone());
            }
        }
    }

    let identity = self.inner.identity.lock().await;

    identity
        .import_secrets_unchecked(
            Some(&bundle.cross_signing.master_key),
            Some(&bundle.cross_signing.self_signing_key),
            Some(&bundle.cross_signing.user_signing_key),
        )
        .await?;

    let public_identity = identity.to_public_identity().await.expect(
        "We should be able to create a new public identity since we just imported \
        all the private cross-signing keys",
    );

    changes.private_identity = Some(identity.clone());
    changes.identities.new.push(UserIdentityData::Own(public_identity));

    // The backup key, the private identity and the new public identity are
    // all written in a single `save_changes` call.
    Ok(self.save_changes(changes).await?)
}
1102
/// Import a secret that was gossiped to us.
///
/// Only the three cross-signing keys are imported here, and only if the
/// store holds our own public identity. Every other secret name leaves the
/// store untouched.
///
/// # Errors
///
/// Returns a [`SecretImportError`] if the store fails or if the secret
/// cannot be imported into the private cross-signing identity.
pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
    match &secret.secret_name {
        SecretName::CrossSigningMasterKey
        | SecretName::CrossSigningUserSigningKey
        | SecretName::CrossSigningSelfSigningKey => {
            // Without a public identity the secret is silently skipped.
            if let Some(public_identity) =
                self.get_identity(self.user_id()).await?.and_then(|i| i.own())
            {
                let identity = self.inner.identity.lock().await;

                identity
                    .import_secret(
                        public_identity,
                        &secret.secret_name,
                        &secret.event.content.secret,
                    )
                    .await?;
                info!(
                    secret_name = ?secret.secret_name,
                    "Successfully imported a private cross signing key"
                );

                let changes =
                    Changes { private_identity: Some(identity.clone()), ..Default::default() };

                self.save_changes(changes).await?;
            }
        }
        SecretName::RecoveryKey => {
            // Deliberately a no-op in this method.
            // NOTE(review): presumably the recovery key is handled by a
            // different code path — confirm.
        }
        name => {
            warn!(secret = ?name, "Tried to import an unknown secret");
        }
    }

    Ok(())
}
1146
1147 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1150 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1151 Ok(value)
1152 }
1153
/// Store the `only_allow_trusted_devices` setting.
///
/// The value is written under the same key that
/// [`Store::get_only_allow_trusted_devices`] reads.
pub async fn set_only_allow_trusted_devices(
    &self,
    block_untrusted_devices: bool,
) -> Result<()> {
    self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
}
1162
1163 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1165 let Some(value) = self.get_custom_value(key).await? else {
1166 return Ok(None);
1167 };
1168 let deserialized = self.deserialize_value(&value)?;
1169 Ok(Some(deserialized))
1170 }
1171
/// Serialize `value` and store it as a custom value under `key`.
///
/// # Errors
///
/// Returns an error if serialization fails or if the store fails.
pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
    let serialized = self.serialize_value(value)?;
    self.set_custom_value(key, serialized).await?;
    Ok(())
}
1178
1179 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1180 let serialized =
1181 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1182 Ok(serialized)
1183 }
1184
1185 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1186 let deserialized =
1187 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1188 Ok(deserialized)
1189 }
1190
/// Stream of room keys received by the wrapped store.
///
/// Each item is either a batch of [`RoomKeyInfo`] or a
/// [`BroadcastStreamRecvError`].
pub fn room_keys_received_stream(
    &self,
) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
    self.inner.store.room_keys_received_stream()
}
1207
/// Stream of withheld-room-key notices received by the wrapped store,
/// delivered in batches of [`RoomKeyWithheldInfo`].
pub fn room_keys_withheld_received_stream(
    &self,
) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
    self.inner.store.room_keys_withheld_received_stream()
}
1221
/// Stream of user identity updates.
///
/// Every item of the wrapped store's identities stream is converted into an
/// [`IdentityUpdates`] whose new, changed and unchanged identities are
/// wrapped in [`UserIdentity`]. The device part of the underlying item is
/// ignored.
pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
    let verification_machine = self.inner.verification_machine.to_owned();

    // The returned stream cannot borrow `self` (`use<>`), so it owns a clone
    // of the store handle.
    let this = self.clone();
    self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
        let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();

        // Shared by all three collections below.
        let map_identity = |(user_id, identity)| {
            (
                user_id,
                UserIdentity::new(
                    this.clone(),
                    identity,
                    verification_machine.to_owned(),
                    own_identity.to_owned(),
                ),
            )
        };

        let new = new_identities.into_iter().map(map_identity).collect();
        let changed = changed_identities.into_iter().map(map_identity).collect();
        let unchanged = unchanged_identities.into_iter().map(map_identity).collect();

        IdentityUpdates { new, changed, unchanged }
    })
}
1278
/// Stream of device updates.
///
/// Every item of the wrapped store's identities stream is converted into a
/// [`DeviceUpdates`] by `collect_device_updates`.
pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
    let verification_machine = self.inner.verification_machine.to_owned();

    self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
        collect_device_updates(
            verification_machine.to_owned(),
            own_identity,
            identities,
            devices,
        )
    })
}
1322
/// Stream of the raw identity and device changes.
///
/// Unlike [`Store::user_identities_stream`] and [`Store::devices_stream`],
/// the items are passed through unwrapped; only the own-identity part of the
/// underlying stream item is dropped.
pub fn identities_stream_raw(
    &self,
) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
    self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
}
1337
/// Create a [`CrossProcessLock`] for the wrapped store.
///
/// # Arguments
///
/// * `lock_key` - The key of the lock; passed through to the wrapped store.
/// * `config` - The lock configuration; passed through to the wrapped store.
pub fn create_store_lock(
    &self,
    lock_key: String,
    config: CrossProcessLockConfig,
) -> CrossProcessLock<LockableCryptoStore> {
    self.inner.store.create_store_lock(lock_key, config)
}
1347
/// Stream of [`SecretsInboxItem`]s emitted by the wrapped store.
pub fn secrets_stream(&self) -> impl Stream<Item = SecretsInboxItem> + use<> {
    self.inner.store.secrets_stream()
}
1390
/// Stream of [`RoomKeyBundleInfo`]s emitted by the wrapped store.
pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
    self.inner.store.historic_room_key_stream()
}
1433
1434 pub async fn import_room_keys(
1447 &self,
1448 exported_keys: Vec<ExportedRoomKey>,
1449 from_backup_version: Option<&str>,
1450 progress_listener: impl Fn(usize, usize),
1451 ) -> Result<RoomKeyImportResult> {
1452 let exported_keys = exported_keys.iter().filter_map(|key| {
1453 key.try_into()
1454 .map_err(|e| {
1455 warn!(
1456 sender_key = key.sender_key().to_base64(),
1457 room_id = ?key.room_id(),
1458 session_id = key.session_id(),
1459 error = ?e,
1460 "Couldn't import a room key from a file export."
1461 );
1462 })
1463 .ok()
1464 });
1465 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1466 }
1467
/// Import room keys that did not come from a backup.
///
/// Equivalent to [`Store::import_room_keys`] with `from_backup_version` set
/// to `None`.
pub async fn import_exported_room_keys(
    &self,
    exported_keys: Vec<ExportedRoomKey>,
    progress_listener: impl Fn(usize, usize),
) -> Result<RoomKeyImportResult> {
    self.import_room_keys(exported_keys, None, progress_listener).await
}
1501
/// Shared implementation for importing inbound group sessions.
///
/// Each session is run through [`Store::merge_received_group_session`];
/// only the sessions it returns are saved.
///
/// # Arguments
///
/// * `sessions` - The candidate sessions.
/// * `from_backup_version` - If set, every session that gets imported is
///   marked as backed up, and the version is passed on to the store.
/// * `progress_listener` - Called after each candidate with its zero-based
///   index and the total number of candidates.
///
/// # Returns
///
/// A [`RoomKeyImportResult`] built from the imported count, the total count
/// and the imported session IDs grouped by room ID and sender key.
async fn import_sessions_impl(
    &self,
    sessions: impl Iterator<Item = InboundGroupSession>,
    from_backup_version: Option<&str>,
    progress_listener: impl Fn(usize, usize),
) -> Result<RoomKeyImportResult> {
    // Collect up front so that the total is known for progress reporting.
    let sessions: Vec<_> = sessions.collect();
    let mut imported_sessions = Vec::new();

    let total_count = sessions.len();
    let mut keys = BTreeMap::new();

    for (i, session) in sessions.into_iter().enumerate() {
        // `None` means we already hold an equal or better copy.
        if let Some(merged) = self.merge_received_group_session(session).await? {
            if from_backup_version.is_some() {
                merged.mark_as_backed_up();
            }

            // Record the session ID under room ID -> sender key.
            keys.entry(merged.room_id().to_owned())
                .or_insert_with(BTreeMap::new)
                .entry(merged.sender_key().to_base64())
                .or_insert_with(BTreeSet::new)
                .insert(merged.session_id().to_owned());

            imported_sessions.push(merged);
        }

        // Reported for every candidate, imported or not.
        progress_listener(i, total_count);
    }

    let imported_count = imported_sessions.len();

    self.inner
        .store
        .save_inbound_group_sessions(imported_sessions, from_backup_version)
        .await?;

    info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");

    Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
}
1545
/// A shared handle to the wrapped crypto store.
pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
    self.inner.store.clone()
}
1549
1550 pub async fn export_room_keys(
1573 &self,
1574 predicate: impl FnMut(&InboundGroupSession) -> bool,
1575 ) -> Result<Vec<ExportedRoomKey>> {
1576 let mut exported = Vec::new();
1577
1578 let mut sessions = self.get_inbound_group_sessions().await?;
1579 sessions.retain(predicate);
1580
1581 for session in sessions {
1582 let export = session.export().await;
1583 exported.push(export);
1584 }
1585
1586 Ok(exported)
1587 }
1588
1589 pub async fn export_room_keys_stream(
1624 &self,
1625 predicate: impl FnMut(&InboundGroupSession) -> bool,
1626 ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1627 let sessions = self.get_inbound_group_sessions().await?;
1629 Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1630 .then(|session| async move { session.export().await }))
1631 }
1632
1633 pub async fn build_room_key_bundle(
1638 &self,
1639 room_id: &RoomId,
1640 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1641 let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1642
1643 let mut bundle = RoomKeyBundle::default();
1644 for session in sessions {
1645 if session.shared_history() {
1646 bundle.room_keys.push(session.export().await.into());
1647 } else {
1648 bundle.withheld.push(RoomKeyWithheldContent::new(
1649 session.algorithm().to_owned(),
1650 WithheldCode::HistoryNotShared,
1651 session.room_id().to_owned(),
1652 session.session_id().to_owned(),
1653 session.sender_key().to_owned(),
1654 self.device_id().to_owned(),
1655 ));
1656 }
1657 }
1658
1659 let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1662 for withheld in withhelds {
1663 if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1664 bundle.withheld.push(withheld.content);
1665 }
1666 }
1667
1668 Ok(bundle)
1669 }
1670
1671 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1681 pub async fn receive_room_key_bundle(
1682 &self,
1683 bundle_info: &StoredRoomKeyBundleData,
1684 bundle: RoomKeyBundle,
1685 progress_listener: impl Fn(usize, usize),
1686 ) -> Result<(), CryptoStoreError> {
1687 let sender_data = if bundle_info.sender_data.should_recalculate() {
1688 let device = self
1689 .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1690 .await?;
1691
1692 device
1693 .as_ref()
1694 .map(SenderData::from_device)
1695 .unwrap_or_else(|| bundle_info.sender_data.clone())
1696 } else {
1697 bundle_info.sender_data.clone()
1698 };
1699
1700 tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1701
1702 let Ok(forwarder_data) = (&sender_data).try_into() else {
1706 warn!(
1707 "Not accepting a historic room key bundle due to insufficient trust in the sender"
1708 );
1709 return Ok(());
1710 };
1711
1712 self.import_room_key_bundle_sessions(
1713 bundle_info,
1714 &bundle,
1715 &forwarder_data,
1716 progress_listener,
1717 )
1718 .await?;
1719 self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1720
1721 Ok(())
1722 }
1723
1724 async fn import_room_key_bundle_sessions(
1725 &self,
1726 bundle_info: &StoredRoomKeyBundleData,
1727 bundle: &RoomKeyBundle,
1728 forwarder_data: &ForwarderData,
1729 progress_listener: impl Fn(usize, usize),
1730 ) -> Result<(), CryptoStoreError> {
1731 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1732 if key.room_id != bundle_info.bundle_data.room_id {
1733 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1734 Either::Right(key)
1735 } else {
1736 Either::Left(key)
1737 }
1738 });
1739
1740 match (bad.is_empty(), good.is_empty()) {
1741 (true, true) => {
1743 warn!("Received a completely empty room key bundle");
1744 }
1745
1746 (false, true) => {
1748 let bad_keys: Vec<_> =
1749 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1750
1751 warn!(
1752 ?bad_keys,
1753 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1754 );
1755 }
1756
1757 (_, false) => {
1759 if !bad.is_empty() {
1762 warn!(
1763 bad_key_count = bad.len(),
1764 "The room key bundle contained some room keys \
1765 that were meant for a different room"
1766 );
1767 }
1768
1769 let keys = good.iter().filter_map(|key| {
1770 key.try_into_inbound_group_session(forwarder_data)
1771 .map_err(|e| {
1772 warn!(
1773 sender_key = ?key.sender_key().to_base64(),
1774 room_id = ?key.room_id(),
1775 session_id = key.session_id(),
1776 error = ?e,
1777 "Couldn't import a room key from a key bundle."
1778 );
1779 })
1780 .ok()
1781 });
1782
1783 self.import_sessions_impl(keys, None, progress_listener).await?;
1784 }
1785 }
1786
1787 Ok(())
1788 }
1789
1790 async fn import_room_key_bundle_withheld_info(
1791 &self,
1792 bundle_info: &StoredRoomKeyBundleData,
1793 bundle: &RoomKeyBundle,
1794 ) -> Result<(), CryptoStoreError> {
1795 let mut session_id_to_withheld_code_map = BTreeMap::new();
1796
1797 let mut changes = Changes::default();
1798 for withheld in &bundle.withheld {
1799 let (room_id, session_id) = match withheld {
1800 RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1801 (Some(room_id), Some(session_id)) => (room_id, session_id),
1802 _ => continue,
1803 },
1804 #[cfg(feature = "experimental-algorithms")]
1805 RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1806 (Some(room_id), Some(session_id)) => (room_id, session_id),
1807 _ => continue,
1808 },
1809 RoomKeyWithheldContent::Unknown(_) => continue,
1810 };
1811
1812 if room_id != bundle_info.bundle_data.room_id {
1813 trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1814 continue;
1815 }
1816
1817 changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1818 session_id.to_owned(),
1819 RoomKeyWithheldEntry {
1820 sender: bundle_info.sender_user.clone(),
1821 content: withheld.to_owned(),
1822 },
1823 );
1824 session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1825 }
1826
1827 self.save_changes(changes).await?;
1828
1829 info!(
1830 room_id = ?bundle_info.bundle_data.room_id,
1831 ?session_id_to_withheld_code_map,
1832 "Successfully imported withheld info from room key bundle",
1833 );
1834
1835 Ok(())
1836 }
1837
1838 pub async fn store_room_pending_key_bundle(
1844 &self,
1845 room_id: &RoomId,
1846 inviter: &UserId,
1847 ) -> Result<(), CryptoStoreError> {
1848 let invite_accepted_at = MilliSecondsSinceUnixEpoch::now();
1849 self.save_changes(Changes {
1850 rooms_pending_key_bundle: HashMap::from([(
1851 room_id.to_owned(),
1852 Some(RoomPendingKeyBundleDetails {
1853 room_id: room_id.to_owned(),
1854 invite_accepted_at,
1855 inviter: inviter.to_owned(),
1856 }),
1857 )]),
1858 ..Default::default()
1859 })
1860 .await?;
1861
1862 Ok(())
1863 }
1864
1865 pub async fn clear_room_pending_key_bundle(&self, room_id: &RoomId) -> Result<()> {
1869 self.save_changes(Changes {
1870 rooms_pending_key_bundle: HashMap::from([(room_id.to_owned(), None)]),
1871 ..Default::default()
1872 })
1873 .await
1874 }
1875}
1876
1877impl Deref for Store {
1878 type Target = DynCryptoStore;
1879
1880 fn deref(&self) -> &Self::Target {
1881 self.inner.store.deref().deref()
1882 }
1883}
1884
1885#[derive(Clone, Debug)]
1887pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1888
1889impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1890 type LockError = CryptoStoreError;
1891
1892 async fn try_lock(
1893 &self,
1894 lease_duration_ms: u32,
1895 key: &str,
1896 holder: &str,
1897 ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1898 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1899 }
1900}
1901
1902#[cfg(test)]
1903mod tests {
1904 use std::{collections::BTreeMap, pin::pin};
1905
1906 use assert_matches2::{assert_let, assert_matches};
1907 use futures_util::StreamExt;
1908 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1909 use matrix_sdk_test::async_test;
1910 use rand::RngExt;
1911 use ruma::{
1912 RoomId, device_id,
1913 events::room::{EncryptedFile, EncryptedFileHashes, V2EncryptedFileInfo},
1914 owned_device_id, owned_mxc_uri, room_id, user_id,
1915 };
1916 use serde_json::json;
1917 use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1918
1919 use crate::{
1920 Account, OlmMachine,
1921 machine::test_helpers::get_machine_pair,
1922 olm::{InboundGroupSession, SenderData},
1923 store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1924 types::{
1925 EventEncryptionAlgorithm,
1926 events::{
1927 room_key_bundle::RoomKeyBundleContent,
1928 room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1929 },
1930 },
1931 };
1932
1933 #[async_test]
1934 async fn test_merge_received_group_session() {
1935 let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1936 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1937
1938 let room_id = room_id!("!test:localhost");
1939
1940 let megolm_signing_key = Ed25519Keypair::new();
1941 let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1942
1943 let mut inbound_at_index_5 =
1945 InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1946 inbound_at_index_5.sender_data = inbound.sender_data.clone();
1947 bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1948
1949 let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1951 assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1952
1953 let mut worse =
1955 InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1956 worse.sender_data = inbound.sender_data.clone();
1957 assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1958
1959 let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1961 copy.sender_data = inbound.sender_data.clone();
1962 assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1963
1964 let mut better =
1966 InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1967 better.sender_data = inbound.sender_data.clone();
1968 assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1969 assert_eq!(update.first_known_index(), 0);
1970
1971 {
1973 let mut worse_ratchet_better_trust =
1974 InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1975 let updated_sender_data = SenderData::sender_verified(
1976 alice_account.user_id(),
1977 alice_account.device_id(),
1978 Ed25519Keypair::new().public_key(),
1979 );
1980 worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1981 assert_let!(
1982 Some(update) = bob
1983 .store()
1984 .merge_received_group_session(worse_ratchet_better_trust)
1985 .await
1986 .unwrap()
1987 );
1988 assert_eq!(update.sender_data, updated_sender_data);
1989 assert_eq!(update.first_known_index(), 5);
1990 assert_eq!(
1991 update.export_at_index(0).await.session_key.to_bytes(),
1992 inbound.export_at_index(5).await.session_key.to_bytes()
1993 );
1994 }
1995
1996 {
1998 let mut better_ratchet_worse_trust =
1999 InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
2000 let updated_sender_data = SenderData::unknown();
2001 better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
2002 assert_let!(
2003 Some(update) = bob
2004 .store()
2005 .merge_received_group_session(better_ratchet_worse_trust)
2006 .await
2007 .unwrap()
2008 );
2009 assert_eq!(update.sender_data, inbound.sender_data);
2010 assert_eq!(update.first_known_index(), 0);
2011 assert_eq!(
2012 update.export_at_index(0).await.session_key.to_bytes(),
2013 inbound.export_at_index(0).await.session_key.to_bytes()
2014 );
2015 }
2016 }
2017
2018 fn make_inbound_group_session(
2021 sender_account: &Account,
2022 signing_key: &Ed25519Keypair,
2023 room_id: &RoomId,
2024 ) -> InboundGroupSession {
2025 InboundGroupSession::new(
2026 sender_account.identity_keys.curve25519,
2027 sender_account.identity_keys.ed25519,
2028 room_id,
2029 &make_session_key(signing_key),
2030 SenderData::device_info(crate::types::DeviceKeys::new(
2031 sender_account.user_id().to_owned(),
2032 sender_account.device_id().to_owned(),
2033 vec![],
2034 BTreeMap::new(),
2035 crate::types::Signatures::new(),
2036 )),
2037 None,
2038 EventEncryptionAlgorithm::MegolmV1AesSha2,
2039 Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2040 true,
2041 )
2042 .unwrap()
2043 }
2044
2045 fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2048 let mut session_key_bytes = vec![0u8; 229];
2053 session_key_bytes[0] = 2;
2055 rand::rng().fill(&mut session_key_bytes[5..133]);
2058 session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2060 let sig = signing_key.sign(&session_key_bytes[0..165]);
2062 session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2063
2064 SessionKey::from_bytes(&session_key_bytes).unwrap()
2065 }
2066
2067 #[async_test]
2068 async fn test_import_room_keys_notifies_stream() {
2069 use futures_util::FutureExt;
2070
2071 let (alice, bob, _) =
2072 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2073
2074 let room1_id = room_id!("!room1:localhost");
2075 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2076 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2077
2078 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2079 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2080
2081 let room_keys = room_keys_received_stream
2082 .next()
2083 .now_or_never()
2084 .flatten()
2085 .expect("We should have received an update of room key infos")
2086 .unwrap();
2087 assert_eq!(room_keys.len(), 1);
2088 assert_eq!(room_keys[0].room_id, "!room1:localhost");
2089 }
2090
2091 #[async_test]
2092 async fn test_export_room_keys_provides_selected_keys() {
2093 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2095 let room1_id = room_id!("!room1:localhost");
2096 let room2_id = room_id!("!room2:localhost");
2097 let room3_id = room_id!("!room3:localhost");
2098 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2099 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2100 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2101
2102 let keys = alice
2104 .store()
2105 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2106 .await
2107 .unwrap();
2108
2109 assert_eq!(keys.len(), 2);
2111 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2112 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2113 assert_eq!(keys[0].room_id, "!room2:localhost");
2114 assert_eq!(keys[1].room_id, "!room3:localhost");
2115 assert_eq!(keys[0].session_key.to_base64().len(), 220);
2116 assert_eq!(keys[1].session_key.to_base64().len(), 220);
2117 }
2118
2119 #[async_test]
2120 async fn test_export_room_keys_stream_can_provide_all_keys() {
2121 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2123 let room1_id = room_id!("!room1:localhost");
2124 let room2_id = room_id!("!room2:localhost");
2125 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2126 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2127
2128 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2130
2131 let mut collected = vec![];
2133 while let Some(key) = keys.next().await {
2134 collected.push(key);
2135 }
2136
2137 assert_eq!(collected.len(), 2);
2139 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2140 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2141 assert_eq!(collected[0].room_id, "!room1:localhost");
2142 assert_eq!(collected[1].room_id, "!room2:localhost");
2143 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2144 assert_eq!(collected[1].session_key.to_base64().len(), 220);
2145 }
2146
2147 #[async_test]
2148 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2149 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2151 let room1_id = room_id!("!room1:localhost");
2152 let room2_id = room_id!("!room2:localhost");
2153 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2154 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2155
2156 let mut keys =
2158 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2159
2160 let mut collected = vec![];
2162 while let Some(key) = keys.next().await {
2163 collected.push(key);
2164 }
2165
2166 assert_eq!(collected.len(), 1);
2168 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2169 assert_eq!(collected[0].room_id, "!room1:localhost");
2170 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2171 }
2172
2173 #[async_test]
2174 async fn test_export_secrets_bundle() {
2175 let user_id = user_id!("@alice:example.com");
2176 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2177
2178 let _ = first
2179 .bootstrap_cross_signing(false)
2180 .await
2181 .expect("We should be able to bootstrap cross-signing");
2182
2183 let bundle = first.store().export_secrets_bundle().await.expect(
2184 "We should be able to export the secrets bundle, now that we \
2185 have the cross-signing keys",
2186 );
2187
2188 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2189
2190 second
2191 .store()
2192 .import_secrets_bundle(&bundle)
2193 .await
2194 .expect("We should be able to import the secrets bundle");
2195
2196 let status = second.cross_signing_status().await;
2197 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2198
2199 assert!(identity.is_verified(), "The public identity should be marked as verified.");
2200
2201 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2202 }
2203
2204 #[async_test]
2205 async fn test_create_dehydrated_device_key() {
2206 let pickle_key = DehydratedDeviceKey::new();
2207
2208 let to_vec = pickle_key.inner.to_vec();
2209 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2210 .expect("Should be able to create a dehydrated device key from slice");
2211
2212 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2213 }
2214
2215 #[async_test]
2216 async fn test_create_dehydrated_errors() {
2217 let too_small = [0u8; 22];
2218 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2219
2220 assert!(pickle_key.is_err());
2221
2222 let too_big = [0u8; 40];
2223 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2224
2225 assert!(pickle_key.is_err());
2226 }
2227
2228 #[async_test]
2229 async fn test_build_room_key_bundle() {
2230 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2233 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2234
2235 let room1_id = room_id!("!room1:localhost");
2236 let room2_id = room_id!("!room2:localhost");
2237
2238 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2243 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2244 let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2245 let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2246
2247 let sessions = [
2248 create_inbound_group_session_with_visibility(
2249 &alice,
2250 room1_id,
2251 &SessionKey::from_base64(session_key1).unwrap(),
2252 true,
2253 ),
2254 create_inbound_group_session_with_visibility(
2255 &alice,
2256 room1_id,
2257 &SessionKey::from_base64(session_key2).unwrap(),
2258 true,
2259 ),
2260 create_inbound_group_session_with_visibility(
2261 &alice,
2262 room1_id,
2263 &SessionKey::from_base64(session_key3).unwrap(),
2264 false,
2265 ),
2266 create_inbound_group_session_with_visibility(
2267 &alice,
2268 room2_id,
2269 &SessionKey::from_base64(session_key4).unwrap(),
2270 true,
2271 ),
2272 ];
2273 bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2274
2275 let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2277
2278 bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2282
2283 let algorithm = if cfg!(feature = "experimental-algorithms") {
2285 "m.megolm.v2.aes-sha2"
2286 } else {
2287 "m.megolm.v1.aes-sha2"
2288 };
2289 let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2290 assert_eq!(value.as_str().unwrap(), algorithm);
2291 "[algorithm]"
2292 };
2293
2294 let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2296 let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2297 assert_eq!(value.as_str().unwrap(), alice_curve_key);
2298 "[alice curve key]"
2299 };
2300 let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2301 let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2302 assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2303 "[alice ed25519 key]"
2304 };
2305
2306 insta::with_settings!({ sort_maps => true }, {
2307 assert_json_snapshot!(bundle, {
2308 ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2309 ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2310 ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2311 ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2312 ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2313 });
2314 });
2315 }
2316
2317 #[async_test]
2318 async fn test_receive_room_key_bundle() {
2319 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2320 let alice_key = alice.identity_keys().curve25519;
2321 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2322
2323 let room_id = room_id!("!room1:localhost");
2324
2325 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2326 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2327
2328 let sessions = [
2329 create_inbound_group_session_with_visibility(
2330 &alice,
2331 room_id,
2332 &SessionKey::from_base64(session_key1).unwrap(),
2333 true,
2334 ),
2335 create_inbound_group_session_with_visibility(
2336 &alice,
2337 room_id,
2338 &SessionKey::from_base64(session_key2).unwrap(),
2339 false,
2340 ),
2341 ];
2342
2343 alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2344 let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2345
2346 bob.store()
2347 .receive_room_key_bundle(
2348 &StoredRoomKeyBundleData {
2349 sender_user: alice.user_id().to_owned(),
2350 sender_key: alice_key,
2351 sender_data: SenderData::sender_verified(
2352 alice.user_id(),
2353 device_id!("ALICE"),
2354 alice.identity_keys().ed25519,
2355 ),
2356
2357 bundle_data: RoomKeyBundleContent {
2358 room_id: room_id.to_owned(),
2359 file: EncryptedFile::new(
2362 owned_mxc_uri!("mxc://example.com/0"),
2363 V2EncryptedFileInfo::encode([0; 32], [0; 16]).into(),
2364 EncryptedFileHashes::with_sha256([0; 32]),
2365 ),
2366 },
2367 },
2368 bundle,
2369 |_, _| {},
2370 )
2371 .await
2372 .unwrap();
2373
2374 let imported_sessions =
2376 bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2377
2378 assert_eq!(imported_sessions.len(), 1);
2379 assert_eq!(imported_sessions[0].room_id(), room_id);
2380
2381 assert_eq!(
2383 imported_sessions[0]
2384 .forwarder_data
2385 .as_ref()
2386 .expect("Session should contain forwarder data.")
2387 .user_id(),
2388 alice.user_id()
2389 );
2390
2391 assert_matches!(
2392 bob.store()
2393 .get_withheld_info(room_id, sessions[1].session_id())
2394 .await
2395 .unwrap()
2396 .expect("Withheld info should be present in the store."),
2397 RoomKeyWithheldEntry {
2398 #[cfg(not(feature = "experimental-algorithms"))]
2399 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2400 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2401 ),
2402 #[cfg(feature = "experimental-algorithms")]
2403 content: RoomKeyWithheldContent::MegolmV2AesSha2(
2404 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2405 ),
2406 ..
2407 }
2408 );
2409 }
2410
2411 #[async_test]
2417 async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2418 let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2419 {
2420 "content": {
2421 "algorithm": "m.megolm.v1.aes-sha2",
2422 "code": "m.unauthorised",
2423 "from_device": "ALICE",
2424 "reason": "You are not authorised to read the message.",
2425 "room_id": "!roomid:s.co",
2426 "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2427 "session_id": "session123"
2428 },
2429 "sender": "@alice:s.co",
2430 "type": "m.room_key.withheld"
2431 }
2432 ))
2433 .unwrap();
2434
2435 assert_matches!(
2436 entry,
2437 RoomKeyWithheldEntry {
2438 sender,
2439 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2440 MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2441 ),
2442 }
2443 );
2444
2445 assert_eq!(sender, "@alice:s.co");
2446 assert_eq!(withheld_content.room_id, "!roomid:s.co");
2447 assert_eq!(withheld_content.session_id, "session123");
2448 assert_eq!(
2449 withheld_content.sender_key.to_base64(),
2450 "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2451 );
2452 assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2453 }
2454
2455 fn create_inbound_group_session_with_visibility(
2466 olm_machine: &OlmMachine,
2467 room_id: &RoomId,
2468 session_key: &SessionKey,
2469 shared_history: bool,
2470 ) -> InboundGroupSession {
2471 let identity_keys = &olm_machine.store().static_account().identity_keys;
2472 InboundGroupSession::new(
2473 identity_keys.curve25519,
2474 identity_keys.ed25519,
2475 room_id,
2476 session_key,
2477 SenderData::unknown(),
2478 None,
2479 #[cfg(not(feature = "experimental-algorithms"))]
2480 EventEncryptionAlgorithm::MegolmV1AesSha2,
2481 #[cfg(feature = "experimental-algorithms")]
2482 EventEncryptionAlgorithm::MegolmV2AesSha2,
2483 None,
2484 shared_history,
2485 )
2486 .unwrap()
2487 }
2488}