1use std::{
42 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43 fmt::Debug,
44 ops::Deref,
45 pin::pin,
46 sync::{Arc, atomic::Ordering},
47 time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, UserId,
56 encryption::KeyUsage, events::secret::request::SecretName,
57};
58use serde::{Serialize, de::DeserializeOwned};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
65
66use self::types::{
67 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, RoomPendingKeyBundleDetails,
69 UserKeyQueryResult,
70};
71use crate::{
72 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
73 gossiping::GossippedSecret,
74 identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
75 olm::{
76 Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
77 SenderData, Session, StaticAccountData,
78 },
79 store::types::RoomKeyWithheldEntry,
80 types::{
81 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
82 SecretsBundle,
83 },
84 verification::VerificationMachine,
85};
86#[cfg(doc)]
87use crate::{backups::BackupMachine, identities::OwnUserIdentity};
88
89pub mod caches;
90mod crypto_store_wrapper;
91mod error;
92mod memorystore;
93mod traits;
94pub mod types;
95
96#[cfg(any(test, feature = "testing"))]
97#[macro_use]
98#[allow(missing_docs)]
99pub mod integration_tests;
100
101pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
102pub use error::{CryptoStoreError, Result};
103use matrix_sdk_common::{
104 cross_process_lock::{CrossProcessLock, CrossProcessLockConfig, CrossProcessLockGeneration},
105 deserialized_responses::WithheldCode,
106 timeout::timeout,
107};
108pub use memorystore::MemoryStore;
109pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
110
111use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
112use crate::types::{
113 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
114};
115pub use crate::{
116 dehydrated_devices::DehydrationError,
117 gossiping::{GossipRequest, SecretInfo},
118};
119
/// A handle to the crypto store and the state that goes along with it.
///
/// All of the state lives behind a shared [`Arc`], so cloning a `Store`
/// produces another handle to the same underlying data.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state every clone of this handle points to.
    inner: Arc<StoreInner>,
}
130
/// Bookkeeping for the users that need to be part of a `/keys/query` request.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// The users that are waiting for a key query, guarded by an async mutex.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Notifier used to wake up the tasks that are waiting in
    /// [`KeyQueryManager::wait_if_user_key_query_pending`].
    users_for_key_query_notify: Notify,
}
139
impl KeyQueryManager {
    /// Make sure the tracked users have been loaded into the given cache and
    /// return a [`SyncedKeyQueryManager`] borrowing this manager and the
    /// cache.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the tracked users from the store fails.
    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
        self.ensure_sync_tracked_users(cache).await?;
        Ok(SyncedKeyQueryManager { cache, manager: self })
    }

    /// Load the tracked users from the store into the cache, unless the
    /// `loaded_tracked_users` flag of the cache says this already happened.
    ///
    /// Every loaded user is inserted into the cache's set of tracked users;
    /// users that the store reports as dirty are additionally queued in
    /// `users_for_key_query`.
    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
        // Fast path: a read lock is enough to see that the users are loaded.
        let loaded = cache.loaded_tracked_users.read().await;
        if *loaded {
            return Ok(());
        }

        // Release the read lock and acquire the write lock instead.
        drop(loaded);
        let mut loaded = cache.loaded_tracked_users.write().await;

        // Check the flag again: it may have been set by somebody else between
        // releasing the read lock and acquiring the write lock.
        if *loaded {
            return Ok(());
        }

        let tracked_users = cache.store.load_tracked_users().await?;

        let mut query_users_lock = self.users_for_key_query.lock().await;
        let mut tracked_users_cache = cache.tracked_users.write();
        for user in tracked_users {
            tracked_users_cache.insert(user.user_id.to_owned());

            if user.dirty {
                query_users_lock.insert_user(&user.user_id);
            }
        }

        // Only flag the users as loaded once the cache has been populated.
        *loaded = true;

        Ok(())
    }

    /// Wait, for at most `timeout_duration`, until a pending key query for
    /// the given user has completed.
    ///
    /// Returns [`UserKeyQueryResult::WasNotPending`] if no waiter could be
    /// registered for the user, [`UserKeyQueryResult::TimeoutExpired`] if the
    /// timeout elapsed before the waiter was flagged as completed, and
    /// [`UserKeyQueryResult::WasPending`] otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the tracked users from the store fails.
    pub async fn wait_if_user_key_query_pending(
        &self,
        cache: StoreCacheGuard,
        timeout_duration: Duration,
        user: &UserId,
    ) -> Result<UserKeyQueryResult> {
        {
            self.ensure_sync_tracked_users(&cache).await?;
            // The cache guard is only needed for the sync above, release it
            // so it isn't held while we wait below.
            drop(cache);
        }

        let mut users_for_key_query = self.users_for_key_query.lock().await;
        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
            return Ok(UserKeyQueryResult::WasNotPending);
        };

        let wait_for_completion = async {
            while !waiter.completed.load(Ordering::Relaxed) {
                // Enable the notification future while the mutex is still
                // held, and only then release the mutex.
                let mut notified = pin!(self.users_for_key_query_notify.notified());
                notified.as_mut().enable();
                drop(users_for_key_query);

                // Wait to be woken up.
                notified.await;

                // Take the mutex again before the `completed` flag is checked
                // by the loop condition.
                users_for_key_query = self.users_for_key_query.lock().await;
            }
        };

        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
            Err(_) => {
                warn!(
                    user_id = ?user,
                    "The user has a pending `/keys/query` request which did \
                     not finish yet, some devices might be missing."
                );

                Ok(UserKeyQueryResult::TimeoutExpired)
            }
            _ => Ok(UserKeyQueryResult::WasPending),
        }
    }
}
247
/// A view of a [`KeyQueryManager`] paired with the [`StoreCache`] it operates
/// on.
///
/// Within this file it is only constructed by [`KeyQueryManager::synced`],
/// after the tracked users have been loaded into the cache.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the set of tracked users and the store handle.
    cache: &'a StoreCache,
    /// The manager holding the users that are waiting for a key query.
    manager: &'a KeyQueryManager,
}
252
253impl SyncedKeyQueryManager<'_> {
254 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
259 let mut store_updates = Vec::new();
260 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
261
262 {
263 let mut tracked_users = self.cache.tracked_users.write();
264 for user_id in users {
265 if tracked_users.insert(user_id.to_owned()) {
266 key_query_lock.insert_user(user_id);
267 store_updates.push((user_id, true))
268 }
269 }
270 }
271
272 self.cache.store.save_tracked_users(&store_updates).await
273 }
274
275 pub async fn mark_tracked_users_as_changed(
282 &self,
283 users: impl Iterator<Item = &UserId>,
284 ) -> Result<()> {
285 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
286 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
287
288 {
289 let tracked_users = &self.cache.tracked_users.read();
290 for user_id in users {
291 if tracked_users.contains(user_id) {
292 key_query_lock.insert_user(user_id);
293 store_updates.push((user_id, true));
294 }
295 }
296 }
297
298 self.cache.store.save_tracked_users(&store_updates).await
299 }
300
301 pub async fn mark_tracked_users_as_up_to_date(
307 &self,
308 users: impl Iterator<Item = &UserId>,
309 sequence_number: SequenceNumber,
310 ) -> Result<()> {
311 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
312 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
313
314 {
315 let tracked_users = self.cache.tracked_users.read();
316 for user_id in users {
317 if tracked_users.contains(user_id) {
318 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
319 store_updates.push((user_id, !clean));
320 }
321 }
322 }
323
324 self.cache.store.save_tracked_users(&store_updates).await?;
325 self.manager.users_for_key_query_notify.notify_waiters();
327
328 Ok(())
329 }
330
331 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
343 self.manager.users_for_key_query.lock().await.users_for_key_query()
344 }
345
346 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
348 self.cache.tracked_users.read().iter().cloned().collect()
349 }
350
351 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
357 self.manager.users_for_key_query.lock().await.insert_user(user);
358 self.cache.tracked_users.write().insert(user.to_owned());
359
360 self.cache.store.save_tracked_users(&[(user, true)]).await
361 }
362}
363
364fn collect_device_updates(
370 verification_machine: VerificationMachine,
371 own_identity: Option<OwnUserIdentityData>,
372 identities: IdentityChanges,
373 devices: DeviceChanges,
374) -> DeviceUpdates {
375 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
376 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
377
378 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
379
380 let map_device = |device: DeviceData| {
381 let device_owner_identity = new_identities
382 .get(device.user_id())
383 .or_else(|| changed_identities.get(device.user_id()))
384 .or_else(|| unchanged_identities.get(device.user_id()))
385 .cloned();
386
387 Device {
388 inner: device,
389 verification_machine: verification_machine.to_owned(),
390 own_identity: own_identity.to_owned(),
391 device_owner_identity,
392 }
393 };
394
395 for device in devices.new {
396 let device = map_device(device);
397
398 new.entry(device.user_id().to_owned())
399 .or_default()
400 .insert(device.device_id().to_owned(), device);
401 }
402
403 for device in devices.changed {
404 let device = map_device(device);
405
406 changed
407 .entry(device.user_id().to_owned())
408 .or_default()
409 .insert(device.device_id().to_owned(), device.to_owned());
410 }
411
412 DeviceUpdates { new, changed }
413}
414
/// A set of pending changes for the store, together with exclusive (write)
/// access to the store cache for as long as the transaction is alive.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store the pending changes are saved to on commit.
    store: Store,
    /// The changes that have been accumulated so far.
    changes: PendingChanges,
    /// Owned write guard for the store cache.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
423
424impl StoreTransaction {
425 async fn new(store: Store) -> Self {
427 let cache = store.inner.cache.clone();
428
429 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
430 }
431
432 pub(crate) fn cache(&self) -> &StoreCache {
433 &self.cache
434 }
435
436 pub fn store(&self) -> &Store {
438 &self.store
439 }
440
441 pub async fn account(&mut self) -> Result<&mut Account> {
448 if self.changes.account.is_none() {
449 let _ = self.cache.account().await?;
451 self.changes.account = self.cache.account.lock().await.take();
452 }
453 Ok(self.changes.account.as_mut().unwrap())
454 }
455
456 pub async fn commit(self) -> Result<()> {
459 if self.changes.is_empty() {
460 return Ok(());
461 }
462
463 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
465
466 self.store.save_pending_changes(self.changes).await?;
467
468 if let Some(account) = account {
470 *self.cache.account.lock().await = Some(account);
471 }
472
473 Ok(())
474 }
475}
476
/// The state shared between all clones of a [`Store`].
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity, shared behind an async mutex.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapper around the underlying crypto store.
    store: Arc<CryptoStoreWrapper>,

    /// The in-memory cache; readers go through [`Store::cache`] while a
    /// [`StoreTransaction`] holds the write lock.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine that gets attached to devices and identities
    /// created by the store.
    verification_machine: VerificationMachine,

    /// Static data about our own account, e.g. the user ID.
    static_account: StaticAccountData,
}
493
/// Error describing what went wrong while importing a secret, e.g. a private
/// cross-signing key.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The key material of the secret was rejected by vodozemac.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that was being imported.
        name: SecretName,
        /// The key error that was reported.
        error: vodozemac::KeyError,
    },
    /// The public key of the imported private key doesn't match the public
    /// key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
         key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret that was being imported.
        name: SecretName,
    },
    /// The secret could not be persisted because of a store error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
520
/// Error describing what went wrong while exporting a [`SecretsBundle`], see
/// [`Store::export_secrets_bundle`].
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The store itself reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// Some, but not all, cross-signing keys are available; the payload names
    /// the usage of the key that was found to be missing.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// None of the cross-signing keys are available.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// A backup decryption key is available, but the backup version it
    /// belongs to is not.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
541
542impl Store {
543 pub(crate) fn new(
545 account: StaticAccountData,
546 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
547 store: Arc<CryptoStoreWrapper>,
548 verification_machine: VerificationMachine,
549 ) -> Self {
550 Self {
551 inner: Arc::new(StoreInner {
552 static_account: account,
553 identity,
554 store: store.clone(),
555 verification_machine,
556 cache: Arc::new(RwLock::new(StoreCache {
557 store,
558 tracked_users: Default::default(),
559 loaded_tracked_users: Default::default(),
560 account: Default::default(),
561 })),
562 }),
563 }
564 }
565
566 pub(crate) fn user_id(&self) -> &UserId {
568 &self.inner.static_account.user_id
569 }
570
571 pub(crate) fn device_id(&self) -> &DeviceId {
573 self.inner.verification_machine.own_device_id()
574 }
575
576 pub(crate) fn static_account(&self) -> &StaticAccountData {
578 &self.inner.static_account
579 }
580
581 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
582 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
587 }
588
589 pub(crate) async fn transaction(&self) -> StoreTransaction {
590 StoreTransaction::new(self.clone()).await
591 }
592
593 pub(crate) async fn with_transaction<
596 T,
597 Fut: Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
598 F: FnOnce(StoreTransaction) -> Fut,
599 >(
600 &self,
601 func: F,
602 ) -> Result<T, crate::OlmError> {
603 let tr = self.transaction().await;
604 let (tr, res) = func(tr).await?;
605 tr.commit().await?;
606 Ok(res)
607 }
608
609 #[cfg(test)]
610 pub(crate) async fn reset_cross_signing_identity(&self) {
612 self.inner.identity.lock().await.reset();
613 }
614
615 pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
617 self.inner.identity.clone()
618 }
619
620 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
622 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
623
624 self.save_changes(changes).await
625 }
626
627 pub(crate) async fn get_sessions(
628 &self,
629 sender_key: &str,
630 ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
631 self.inner.store.get_sessions(sender_key).await
632 }
633
634 pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
635 self.inner.store.save_changes(changes).await
636 }
637
    /// Decide what to do with a newly received inbound group session, given
    /// the session with the same room and session ID that may already be in
    /// the store.
    ///
    /// The ratchets and the trust levels of the sender data of both sessions
    /// are compared. Returns the session that should be saved, or `None` if
    /// there is nothing to save. This method doesn't save anything itself.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the existing session from the store fails.
    pub(crate) async fn merge_received_group_session(
        &self,
        session: InboundGroupSession,
    ) -> Result<Option<InboundGroupSession>> {
        let old_session = self
            .inner
            .store
            .get_inbound_group_session(session.room_id(), session.session_id())
            .await?;

        // Nothing to compare against, the received session is kept as is.
        let Some(old_session) = old_session else {
            info!("Received a new megolm room key");
            return Ok(Some(session));
        };

        let index_comparison = session.compare_ratchet(&old_session).await;
        let trust_level_comparison =
            session.sender_data.compare_trust_level(&old_session.sender_data);

        let result = match (index_comparison, trust_level_comparison) {
            (SessionOrdering::Unconnected, _) => {
                // Both sessions claim the same session ID, but their ratchets
                // can't be related to each other.
                warn!(
                    "Received a group session with an ratchet that does not connect to the one in the store, discarding"
                );
                None
            }

            (SessionOrdering::Better, std::cmp::Ordering::Greater)
            | (SessionOrdering::Better, std::cmp::Ordering::Equal)
            | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
                // The received session is at least as good on both axes and
                // better on at least one of them.
                info!(
                    ?index_comparison,
                    ?trust_level_comparison,
                    "Received a megolm room key that we have a worse version of, merging"
                );
                Some(session)
            }

            (SessionOrdering::Worse, std::cmp::Ordering::Less)
            | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
            | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
                // The received session is at most as good on both axes and
                // worse on at least one of them.
                warn!(
                    ?index_comparison,
                    ?trust_level_comparison,
                    "Received a megolm room key that we already have a better version \
                     of, discarding"
                );
                None
            }

            (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
                // Both sessions compare as equal, nothing new to save.
                info!("Received a megolm room key that we already have, discarding");
                None
            }

            (SessionOrdering::Better, std::cmp::Ordering::Less) => {
                // Better ratchet, worse sender data: combine the stored
                // session with the ratchet of the received one.
                info!("Upgrading a previously-received megolm session with new ratchet");
                let result = old_session.with_ratchet(&session);
                // The combined session has its backup state reset.
                result.reset_backup_state();
                Some(result)
            }

            (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
                // Worse ratchet, better sender data: combine the received
                // session with the ratchet of the stored one.
                info!("Upgrading a previously-received megolm session with new sender data");
                Some(session.with_ratchet(&old_session))
            }
        };

        Ok(result)
    }
727
728 #[cfg(test)]
729 pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
731 use types::DeviceChanges;
732
733 let changes = Changes {
734 devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
735 ..Default::default()
736 };
737
738 self.save_changes(changes).await
739 }
740
741 pub(crate) async fn save_inbound_group_sessions(
743 &self,
744 sessions: &[InboundGroupSession],
745 ) -> Result<()> {
746 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
747
748 self.save_changes(changes).await
749 }
750
751 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
753 Ok(self
754 .inner
755 .store
756 .get_device(self.user_id(), self.device_id())
757 .await?
758 .and_then(|d| d.display_name().map(|d| d.to_owned())))
759 }
760
761 pub(crate) async fn get_device_data(
766 &self,
767 user_id: &UserId,
768 device_id: &DeviceId,
769 ) -> Result<Option<DeviceData>> {
770 self.inner.store.get_device(user_id, device_id).await
771 }
772
773 pub(crate) async fn get_device_data_for_user_filtered(
781 &self,
782 user_id: &UserId,
783 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
784 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
785 if user_id == self.user_id() {
786 d.remove(self.device_id());
787 }
788 d
789 })
790 }
791
792 pub(crate) async fn get_device_data_for_user(
801 &self,
802 user_id: &UserId,
803 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
804 self.inner.store.get_user_devices(user_id).await
805 }
806
807 pub(crate) async fn get_device_from_curve_key(
813 &self,
814 user_id: &UserId,
815 curve_key: Curve25519PublicKey,
816 ) -> Result<Option<Device>> {
817 self.get_user_devices(user_id)
818 .await
819 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
820 }
821
822 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
832 let devices = self.get_device_data_for_user(user_id).await?;
833
834 let own_identity = self
835 .inner
836 .store
837 .get_user_identity(self.user_id())
838 .await?
839 .and_then(|i| i.own().cloned());
840 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
841
842 Ok(UserDevices {
843 inner: devices,
844 verification_machine: self.inner.verification_machine.clone(),
845 own_identity,
846 device_owner_identity,
847 })
848 }
849
850 pub(crate) async fn get_device(
860 &self,
861 user_id: &UserId,
862 device_id: &DeviceId,
863 ) -> Result<Option<Device>> {
864 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
865 Ok(Some(self.wrap_device_data(device_data).await?))
866 } else {
867 Ok(None)
868 }
869 }
870
871 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
876 let own_identity = self
877 .inner
878 .store
879 .get_user_identity(self.user_id())
880 .await?
881 .and_then(|i| i.own().cloned());
882
883 let device_owner_identity =
884 self.inner.store.get_user_identity(device_data.user_id()).await?;
885
886 Ok(Device {
887 inner: device_data,
888 verification_machine: self.inner.verification_machine.clone(),
889 own_identity,
890 device_owner_identity,
891 })
892 }
893
894 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
896 let own_identity = self
897 .inner
898 .store
899 .get_user_identity(self.user_id())
900 .await?
901 .and_then(as_variant!(UserIdentityData::Own));
902
903 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
904 UserIdentity::new(
905 self.clone(),
906 i,
907 self.inner.verification_machine.to_owned(),
908 own_identity,
909 )
910 }))
911 }
912
913 pub async fn export_secret(
922 &self,
923 secret_name: &SecretName,
924 ) -> Result<Option<String>, CryptoStoreError> {
925 Ok(match secret_name {
926 SecretName::CrossSigningMasterKey
927 | SecretName::CrossSigningUserSigningKey
928 | SecretName::CrossSigningSelfSigningKey => {
929 self.inner.identity.lock().await.export_secret(secret_name).await
930 }
931 SecretName::RecoveryKey => {
932 if let Some(key) = self.load_backup_keys().await?.decryption_key {
933 let exported = key.to_base64();
934 Some(exported)
935 } else {
936 None
937 }
938 }
939 name => {
940 warn!(secret = ?name, "Unknown secret was requested");
941 None
942 }
943 })
944 }
945
946 pub async fn export_cross_signing_keys(
954 &self,
955 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
956 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
957 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
958 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
959
960 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
961 None
962 } else {
963 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
964 })
965 }
966
    /// Import the given private cross-signing keys.
    ///
    /// The import only happens if the store contains our own public identity,
    /// otherwise a warning is logged and nothing is imported. In both cases
    /// the current status of the private cross-signing identity is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if loading our own identity, importing the secrets or
    /// saving the resulting changes fails.
    pub async fn import_cross_signing_keys(
        &self,
        export: CrossSigningKeyExport,
    ) -> Result<CrossSigningStatus, SecretImportError> {
        if let Some(public_identity) =
            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
        {
            // This guard is held until the end of this branch, which includes
            // the save of the changes.
            let identity = self.inner.identity.lock().await;

            identity
                .import_secrets(
                    public_identity.to_owned(),
                    export.master_key.as_deref(),
                    export.self_signing_key.as_deref(),
                    export.user_signing_key.as_deref(),
                )
                .await?;

            let status = identity.status().await;

            let diff = identity.get_public_identity_diff(&public_identity.inner).await;

            let mut changes =
                Changes { private_identity: Some(identity.clone()), ..Default::default() };

            // Our public identity is only marked as verified if the diff
            // reports no differences to the private identity.
            if diff.none_differ() {
                public_identity.mark_as_verified();
                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
            }

            info!(?status, "Successfully imported the private cross-signing keys");

            self.save_changes(changes).await?;
        } else {
            warn!(
                "No public identity found while importing cross-signing keys, \
                 a /keys/query needs to be done"
            );
        }

        // The guard from the branch above is gone by now, so the identity is
        // locked again to read the status.
        Ok(self.inner.identity.lock().await.status().await)
    }
1013
1014 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1026 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1027 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1028 };
1029
1030 let Some(master_key) = cross_signing.master_key.clone() else {
1031 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1032 };
1033
1034 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1035 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1036 };
1037
1038 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1039 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1040 };
1041
1042 let backup_keys = self.load_backup_keys().await?;
1043
1044 let backup = if let Some(key) = backup_keys.decryption_key {
1045 if let Some(backup_version) = backup_keys.backup_version {
1046 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1047 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1048 ))
1049 } else {
1050 return Err(SecretsBundleExportError::MissingBackupVersion);
1051 }
1052 } else {
1053 None
1054 };
1055
1056 Ok(SecretsBundle {
1057 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1058 backup,
1059 })
1060 }
1061
1062 pub async fn import_secrets_bundle(
1075 &self,
1076 bundle: &SecretsBundle,
1077 ) -> Result<(), SecretImportError> {
1078 let mut changes = Changes::default();
1079
1080 if let Some(backup_bundle) = &bundle.backup {
1081 match backup_bundle {
1082 BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1083 changes.backup_decryption_key = Some(bundle.key.clone());
1084 changes.backup_version = Some(bundle.backup_version.clone());
1085 }
1086 }
1087 }
1088
1089 let identity = self.inner.identity.lock().await;
1090
1091 identity
1092 .import_secrets_unchecked(
1093 Some(&bundle.cross_signing.master_key),
1094 Some(&bundle.cross_signing.self_signing_key),
1095 Some(&bundle.cross_signing.user_signing_key),
1096 )
1097 .await?;
1098
1099 let public_identity = identity.to_public_identity().await.expect(
1100 "We should be able to create a new public identity since we just imported \
1101 all the private cross-signing keys",
1102 );
1103
1104 changes.private_identity = Some(identity.clone());
1105 changes.identities.new.push(UserIdentityData::Own(public_identity));
1106
1107 Ok(self.save_changes(changes).await?)
1108 }
1109
1110 pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1112 match &secret.secret_name {
1113 SecretName::CrossSigningMasterKey
1114 | SecretName::CrossSigningUserSigningKey
1115 | SecretName::CrossSigningSelfSigningKey => {
1116 if let Some(public_identity) =
1117 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1118 {
1119 let identity = self.inner.identity.lock().await;
1120
1121 identity
1122 .import_secret(
1123 public_identity,
1124 &secret.secret_name,
1125 &secret.event.content.secret,
1126 )
1127 .await?;
1128 info!(
1129 secret_name = ?secret.secret_name,
1130 "Successfully imported a private cross signing key"
1131 );
1132
1133 let changes =
1134 Changes { private_identity: Some(identity.clone()), ..Default::default() };
1135
1136 self.save_changes(changes).await?;
1137 }
1138 }
1139 SecretName::RecoveryKey => {
1140 }
1146 name => {
1147 warn!(secret = ?name, "Tried to import an unknown secret");
1148 }
1149 }
1150
1151 Ok(())
1152 }
1153
1154 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1157 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1158 Ok(value)
1159 }
1160
1161 pub async fn set_only_allow_trusted_devices(
1164 &self,
1165 block_untrusted_devices: bool,
1166 ) -> Result<()> {
1167 self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1168 }
1169
1170 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1172 let Some(value) = self.get_custom_value(key).await? else {
1173 return Ok(None);
1174 };
1175 let deserialized = self.deserialize_value(&value)?;
1176 Ok(Some(deserialized))
1177 }
1178
1179 pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1181 let serialized = self.serialize_value(value)?;
1182 self.set_custom_value(key, serialized).await?;
1183 Ok(())
1184 }
1185
1186 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1187 let serialized =
1188 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1189 Ok(serialized)
1190 }
1191
1192 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1193 let deserialized =
1194 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1195 Ok(deserialized)
1196 }
1197
1198 pub fn room_keys_received_stream(
1210 &self,
1211 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1212 self.inner.store.room_keys_received_stream()
1213 }
1214
1215 pub fn room_keys_withheld_received_stream(
1224 &self,
1225 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1226 self.inner.store.room_keys_withheld_received_stream()
1227 }
1228
1229 pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1260 let verification_machine = self.inner.verification_machine.to_owned();
1261
1262 let this = self.clone();
1263 self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1264 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1265
1266 let map_identity = |(user_id, identity)| {
1267 (
1268 user_id,
1269 UserIdentity::new(
1270 this.clone(),
1271 identity,
1272 verification_machine.to_owned(),
1273 own_identity.to_owned(),
1274 ),
1275 )
1276 };
1277
1278 let new = new_identities.into_iter().map(map_identity).collect();
1279 let changed = changed_identities.into_iter().map(map_identity).collect();
1280 let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1281
1282 IdentityUpdates { new, changed, unchanged }
1283 })
1284 }
1285
1286 pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1318 let verification_machine = self.inner.verification_machine.to_owned();
1319
1320 self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1321 collect_device_updates(
1322 verification_machine.to_owned(),
1323 own_identity,
1324 identities,
1325 devices,
1326 )
1327 })
1328 }
1329
1330 pub fn identities_stream_raw(
1340 &self,
1341 ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1342 self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1343 }
1344
1345 pub fn create_store_lock(
1348 &self,
1349 lock_key: String,
1350 config: CrossProcessLockConfig,
1351 ) -> CrossProcessLock<LockableCryptoStore> {
1352 self.inner.store.create_store_lock(lock_key, config)
1353 }
1354
1355 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1395 self.inner.store.secrets_stream()
1396 }
1397
1398 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1438 self.inner.store.historic_room_key_stream()
1439 }
1440
1441 pub async fn import_room_keys(
1454 &self,
1455 exported_keys: Vec<ExportedRoomKey>,
1456 from_backup_version: Option<&str>,
1457 progress_listener: impl Fn(usize, usize),
1458 ) -> Result<RoomKeyImportResult> {
1459 let exported_keys = exported_keys.iter().filter_map(|key| {
1460 key.try_into()
1461 .map_err(|e| {
1462 warn!(
1463 sender_key = key.sender_key().to_base64(),
1464 room_id = ?key.room_id(),
1465 session_id = key.session_id(),
1466 error = ?e,
1467 "Couldn't import a room key from a file export."
1468 );
1469 })
1470 .ok()
1471 });
1472 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1473 }
1474
1475 pub async fn import_exported_room_keys(
1502 &self,
1503 exported_keys: Vec<ExportedRoomKey>,
1504 progress_listener: impl Fn(usize, usize),
1505 ) -> Result<RoomKeyImportResult> {
1506 self.import_room_keys(exported_keys, None, progress_listener).await
1507 }
1508
1509 async fn import_sessions_impl(
1510 &self,
1511 sessions: impl Iterator<Item = InboundGroupSession>,
1512 from_backup_version: Option<&str>,
1513 progress_listener: impl Fn(usize, usize),
1514 ) -> Result<RoomKeyImportResult> {
1515 let sessions: Vec<_> = sessions.collect();
1516 let mut imported_sessions = Vec::new();
1517
1518 let total_count = sessions.len();
1519 let mut keys = BTreeMap::new();
1520
1521 for (i, session) in sessions.into_iter().enumerate() {
1522 if let Some(merged) = self.merge_received_group_session(session).await? {
1525 if from_backup_version.is_some() {
1526 merged.mark_as_backed_up();
1527 }
1528
1529 keys.entry(merged.room_id().to_owned())
1530 .or_insert_with(BTreeMap::new)
1531 .entry(merged.sender_key().to_base64())
1532 .or_insert_with(BTreeSet::new)
1533 .insert(merged.session_id().to_owned());
1534
1535 imported_sessions.push(merged);
1536 }
1537
1538 progress_listener(i, total_count);
1539 }
1540
1541 let imported_count = imported_sessions.len();
1542
1543 self.inner
1544 .store
1545 .save_inbound_group_sessions(imported_sessions, from_backup_version)
1546 .await?;
1547
1548 info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1549
1550 Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1551 }
1552
1553 pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1554 self.inner.store.clone()
1555 }
1556
1557 pub async fn export_room_keys(
1580 &self,
1581 predicate: impl FnMut(&InboundGroupSession) -> bool,
1582 ) -> Result<Vec<ExportedRoomKey>> {
1583 let mut exported = Vec::new();
1584
1585 let mut sessions = self.get_inbound_group_sessions().await?;
1586 sessions.retain(predicate);
1587
1588 for session in sessions {
1589 let export = session.export().await;
1590 exported.push(export);
1591 }
1592
1593 Ok(exported)
1594 }
1595
1596 pub async fn export_room_keys_stream(
1631 &self,
1632 predicate: impl FnMut(&InboundGroupSession) -> bool,
1633 ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1634 let sessions = self.get_inbound_group_sessions().await?;
1636 Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1637 .then(|session| async move { session.export().await }))
1638 }
1639
1640 pub async fn build_room_key_bundle(
1645 &self,
1646 room_id: &RoomId,
1647 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1648 let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1649
1650 let mut bundle = RoomKeyBundle::default();
1651 for session in sessions {
1652 if session.shared_history() {
1653 bundle.room_keys.push(session.export().await.into());
1654 } else {
1655 bundle.withheld.push(RoomKeyWithheldContent::new(
1656 session.algorithm().to_owned(),
1657 WithheldCode::HistoryNotShared,
1658 session.room_id().to_owned(),
1659 session.session_id().to_owned(),
1660 session.sender_key().to_owned(),
1661 self.device_id().to_owned(),
1662 ));
1663 }
1664 }
1665
1666 let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1669 for withheld in withhelds {
1670 if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1671 bundle.withheld.push(withheld.content);
1672 }
1673 }
1674
1675 Ok(bundle)
1676 }
1677
1678 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1688 pub async fn receive_room_key_bundle(
1689 &self,
1690 bundle_info: &StoredRoomKeyBundleData,
1691 bundle: RoomKeyBundle,
1692 progress_listener: impl Fn(usize, usize),
1693 ) -> Result<(), CryptoStoreError> {
1694 let sender_data = if bundle_info.sender_data.should_recalculate() {
1695 let device = self
1696 .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1697 .await?;
1698
1699 device
1700 .as_ref()
1701 .map(SenderData::from_device)
1702 .unwrap_or_else(|| bundle_info.sender_data.clone())
1703 } else {
1704 bundle_info.sender_data.clone()
1705 };
1706
1707 tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1708
1709 let Ok(forwarder_data) = (&sender_data).try_into() else {
1713 warn!(
1714 "Not accepting a historic room key bundle due to insufficient trust in the sender"
1715 );
1716 return Ok(());
1717 };
1718
1719 self.import_room_key_bundle_sessions(
1720 bundle_info,
1721 &bundle,
1722 &forwarder_data,
1723 progress_listener,
1724 )
1725 .await?;
1726 self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1727
1728 Ok(())
1729 }
1730
1731 async fn import_room_key_bundle_sessions(
1732 &self,
1733 bundle_info: &StoredRoomKeyBundleData,
1734 bundle: &RoomKeyBundle,
1735 forwarder_data: &ForwarderData,
1736 progress_listener: impl Fn(usize, usize),
1737 ) -> Result<(), CryptoStoreError> {
1738 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1739 if key.room_id != bundle_info.bundle_data.room_id {
1740 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1741 Either::Right(key)
1742 } else {
1743 Either::Left(key)
1744 }
1745 });
1746
1747 match (bad.is_empty(), good.is_empty()) {
1748 (true, true) => {
1750 warn!("Received a completely empty room key bundle");
1751 }
1752
1753 (false, true) => {
1755 let bad_keys: Vec<_> =
1756 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1757
1758 warn!(
1759 ?bad_keys,
1760 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1761 );
1762 }
1763
1764 (_, false) => {
1766 if !bad.is_empty() {
1769 warn!(
1770 bad_key_count = bad.len(),
1771 "The room key bundle contained some room keys \
1772 that were meant for a different room"
1773 );
1774 }
1775
1776 let keys = good.iter().filter_map(|key| {
1777 key.try_into_inbound_group_session(forwarder_data)
1778 .map_err(|e| {
1779 warn!(
1780 sender_key = ?key.sender_key().to_base64(),
1781 room_id = ?key.room_id(),
1782 session_id = key.session_id(),
1783 error = ?e,
1784 "Couldn't import a room key from a key bundle."
1785 );
1786 })
1787 .ok()
1788 });
1789
1790 self.import_sessions_impl(keys, None, progress_listener).await?;
1791 }
1792 }
1793
1794 Ok(())
1795 }
1796
1797 async fn import_room_key_bundle_withheld_info(
1798 &self,
1799 bundle_info: &StoredRoomKeyBundleData,
1800 bundle: &RoomKeyBundle,
1801 ) -> Result<(), CryptoStoreError> {
1802 let mut session_id_to_withheld_code_map = BTreeMap::new();
1803
1804 let mut changes = Changes::default();
1805 for withheld in &bundle.withheld {
1806 let (room_id, session_id) = match withheld {
1807 RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1808 (Some(room_id), Some(session_id)) => (room_id, session_id),
1809 _ => continue,
1810 },
1811 #[cfg(feature = "experimental-algorithms")]
1812 RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1813 (Some(room_id), Some(session_id)) => (room_id, session_id),
1814 _ => continue,
1815 },
1816 RoomKeyWithheldContent::Unknown(_) => continue,
1817 };
1818
1819 if room_id != bundle_info.bundle_data.room_id {
1820 trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1821 continue;
1822 }
1823
1824 changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1825 session_id.to_owned(),
1826 RoomKeyWithheldEntry {
1827 sender: bundle_info.sender_user.clone(),
1828 content: withheld.to_owned(),
1829 },
1830 );
1831 session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1832 }
1833
1834 self.save_changes(changes).await?;
1835
1836 info!(
1837 room_id = ?bundle_info.bundle_data.room_id,
1838 ?session_id_to_withheld_code_map,
1839 "Successfully imported withheld info from room key bundle",
1840 );
1841
1842 Ok(())
1843 }
1844
1845 pub async fn store_room_pending_key_bundle(
1851 &self,
1852 room_id: &RoomId,
1853 inviter: &UserId,
1854 ) -> Result<(), CryptoStoreError> {
1855 let invite_accepted_at = MilliSecondsSinceUnixEpoch::now();
1856 self.save_changes(Changes {
1857 rooms_pending_key_bundle: HashMap::from([(
1858 room_id.to_owned(),
1859 Some(RoomPendingKeyBundleDetails {
1860 room_id: room_id.to_owned(),
1861 invite_accepted_at,
1862 inviter: inviter.to_owned(),
1863 }),
1864 )]),
1865 ..Default::default()
1866 })
1867 .await?;
1868
1869 Ok(())
1870 }
1871
1872 pub async fn clear_room_pending_key_bundle(&self, room_id: &RoomId) -> Result<()> {
1876 self.save_changes(Changes {
1877 rooms_pending_key_bundle: HashMap::from([(room_id.to_owned(), None)]),
1878 ..Default::default()
1879 })
1880 .await
1881 }
1882}
1883
1884impl Deref for Store {
1885 type Target = DynCryptoStore;
1886
1887 fn deref(&self) -> &Self::Target {
1888 self.inner.store.deref().deref()
1889 }
1890}
1891
/// A cloneable newtype around a shared, type-erased [`CryptoStore`].
///
/// The `TryLock` implementation for this type forwards lock attempts to the
/// wrapped store's `try_take_leased_lock`.
#[derive(Clone, Debug)]
pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1895
1896impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1897 type LockError = CryptoStoreError;
1898
1899 async fn try_lock(
1900 &self,
1901 lease_duration_ms: u32,
1902 key: &str,
1903 holder: &str,
1904 ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1905 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1906 }
1907}
1908
1909#[cfg(test)]
1910mod tests {
1911 use std::{collections::BTreeMap, pin::pin};
1912
1913 use assert_matches2::{assert_let, assert_matches};
1914 use futures_util::StreamExt;
1915 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1916 use matrix_sdk_test::async_test;
1917 use ruma::{
1918 RoomId, device_id,
1919 events::room::{EncryptedFileInit, JsonWebKeyInit},
1920 owned_device_id, owned_mxc_uri, room_id,
1921 serde::Base64,
1922 user_id,
1923 };
1924 use serde_json::json;
1925 use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1926
1927 use crate::{
1928 Account, OlmMachine,
1929 machine::test_helpers::get_machine_pair,
1930 olm::{InboundGroupSession, SenderData},
1931 store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1932 types::{
1933 EventEncryptionAlgorithm,
1934 events::{
1935 room_key_bundle::RoomKeyBundleContent,
1936 room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1937 },
1938 },
1939 };
1940
/// Exercises `Store::merge_received_group_session` against a store that
/// already holds a copy of a session starting at message index 5.
///
/// Note that the test never saves the sessions returned by the merge, so
/// every merge below is compared against that same stored index-5 copy.
#[async_test]
async fn test_merge_received_group_session() {
    let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
    let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;

    let room_id = room_id!("!test:localhost");

    let megolm_signing_key = Ed25519Keypair::new();
    let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);

    // Bob already knows the session, but only from index 5 onwards.
    let mut inbound_at_index_5 =
        InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
    inbound_at_index_5.sender_data = inbound.sender_data.clone();
    bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();

    // A second session built from the same signing key but with fresh random
    // key material is not merged.
    // NOTE(review): presumably it shares the session ID with `inbound` while
    // its ratchet is unrelated — confirm.
    let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
    assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);

    // A copy exported at a later index (10), with the same sender data, is
    // not merged.
    let mut worse =
        InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
    worse.sender_data = inbound.sender_data.clone();
    assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);

    // A copy of exactly what is stored (round-tripped through a pickle) is
    // not merged either.
    let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
    copy.sender_data = inbound.sender_data.clone();
    assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);

    // A copy exported at an earlier index (0) yields an update starting at
    // index 0.
    let mut better =
        InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
    better.sender_data = inbound.sender_data.clone();
    assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
    assert_eq!(update.first_known_index(), 0);

    {
        // Later index (10) but `sender_verified` sender data: the update
        // carries the new sender data and the stored first known index (5).
        let mut worse_ratchet_better_trust =
            InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
        let updated_sender_data = SenderData::sender_verified(
            alice_account.user_id(),
            alice_account.device_id(),
            Ed25519Keypair::new().public_key(),
        );
        worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
        assert_let!(
            Some(update) = bob
                .store()
                .merge_received_group_session(worse_ratchet_better_trust)
                .await
                .unwrap()
        );
        assert_eq!(update.sender_data, updated_sender_data);
        assert_eq!(update.first_known_index(), 5);
        // The update's key material matches the original session at index 5.
        assert_eq!(
            update.export_at_index(0).await.session_key.to_bytes(),
            inbound.export_at_index(5).await.session_key.to_bytes()
        );
    }

    {
        // Earlier index (0) but `unknown` sender data: the update starts at
        // index 0 and keeps the sender data of the original session.
        let mut better_ratchet_worse_trust =
            InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
        let updated_sender_data = SenderData::unknown();
        better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
        assert_let!(
            Some(update) = bob
                .store()
                .merge_received_group_session(better_ratchet_worse_trust)
                .await
                .unwrap()
        );
        assert_eq!(update.sender_data, inbound.sender_data);
        assert_eq!(update.first_known_index(), 0);
        assert_eq!(
            update.export_at_index(0).await.session_key.to_bytes(),
            inbound.export_at_index(0).await.session_key.to_bytes()
        );
    }
}
2025
2026 fn make_inbound_group_session(
2029 sender_account: &Account,
2030 signing_key: &Ed25519Keypair,
2031 room_id: &RoomId,
2032 ) -> InboundGroupSession {
2033 InboundGroupSession::new(
2034 sender_account.identity_keys.curve25519,
2035 sender_account.identity_keys.ed25519,
2036 room_id,
2037 &make_session_key(signing_key),
2038 SenderData::device_info(crate::types::DeviceKeys::new(
2039 sender_account.user_id().to_owned(),
2040 sender_account.device_id().to_owned(),
2041 vec![],
2042 BTreeMap::new(),
2043 crate::types::Signatures::new(),
2044 )),
2045 None,
2046 EventEncryptionAlgorithm::MegolmV1AesSha2,
2047 Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2048 true,
2049 )
2050 .unwrap()
2051 }
2052
2053 fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2056 use rand::Rng;
2057
2058 let mut session_key_bytes = vec![0u8; 229];
2063 session_key_bytes[0] = 2;
2065 rand::thread_rng().fill(&mut session_key_bytes[5..133]);
2068 session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2070 let sig = signing_key.sign(&session_key_bytes[0..165]);
2072 session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2073
2074 SessionKey::from_bytes(&session_key_bytes).unwrap()
2075 }
2076
2077 #[async_test]
2078 async fn test_import_room_keys_notifies_stream() {
2079 use futures_util::FutureExt;
2080
2081 let (alice, bob, _) =
2082 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2083
2084 let room1_id = room_id!("!room1:localhost");
2085 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2086 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2087
2088 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2089 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2090
2091 let room_keys = room_keys_received_stream
2092 .next()
2093 .now_or_never()
2094 .flatten()
2095 .expect("We should have received an update of room key infos")
2096 .unwrap();
2097 assert_eq!(room_keys.len(), 1);
2098 assert_eq!(room_keys[0].room_id, "!room1:localhost");
2099 }
2100
2101 #[async_test]
2102 async fn test_export_room_keys_provides_selected_keys() {
2103 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2105 let room1_id = room_id!("!room1:localhost");
2106 let room2_id = room_id!("!room2:localhost");
2107 let room3_id = room_id!("!room3:localhost");
2108 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2109 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2110 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2111
2112 let keys = alice
2114 .store()
2115 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2116 .await
2117 .unwrap();
2118
2119 assert_eq!(keys.len(), 2);
2121 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2122 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2123 assert_eq!(keys[0].room_id, "!room2:localhost");
2124 assert_eq!(keys[1].room_id, "!room3:localhost");
2125 assert_eq!(keys[0].session_key.to_base64().len(), 220);
2126 assert_eq!(keys[1].session_key.to_base64().len(), 220);
2127 }
2128
2129 #[async_test]
2130 async fn test_export_room_keys_stream_can_provide_all_keys() {
2131 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2133 let room1_id = room_id!("!room1:localhost");
2134 let room2_id = room_id!("!room2:localhost");
2135 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2136 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2137
2138 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2140
2141 let mut collected = vec![];
2143 while let Some(key) = keys.next().await {
2144 collected.push(key);
2145 }
2146
2147 assert_eq!(collected.len(), 2);
2149 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2150 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2151 assert_eq!(collected[0].room_id, "!room1:localhost");
2152 assert_eq!(collected[1].room_id, "!room2:localhost");
2153 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2154 assert_eq!(collected[1].session_key.to_base64().len(), 220);
2155 }
2156
2157 #[async_test]
2158 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2159 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2161 let room1_id = room_id!("!room1:localhost");
2162 let room2_id = room_id!("!room2:localhost");
2163 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2164 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2165
2166 let mut keys =
2168 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2169
2170 let mut collected = vec![];
2172 while let Some(key) = keys.next().await {
2173 collected.push(key);
2174 }
2175
2176 assert_eq!(collected.len(), 1);
2178 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2179 assert_eq!(collected[0].room_id, "!room1:localhost");
2180 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2181 }
2182
2183 #[async_test]
2184 async fn test_export_secrets_bundle() {
2185 let user_id = user_id!("@alice:example.com");
2186 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2187
2188 let _ = first
2189 .bootstrap_cross_signing(false)
2190 .await
2191 .expect("We should be able to bootstrap cross-signing");
2192
2193 let bundle = first.store().export_secrets_bundle().await.expect(
2194 "We should be able to export the secrets bundle, now that we \
2195 have the cross-signing keys",
2196 );
2197
2198 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2199
2200 second
2201 .store()
2202 .import_secrets_bundle(&bundle)
2203 .await
2204 .expect("We should be able to import the secrets bundle");
2205
2206 let status = second.cross_signing_status().await;
2207 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2208
2209 assert!(identity.is_verified(), "The public identity should be marked as verified.");
2210
2211 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2212 }
2213
2214 #[async_test]
2215 async fn test_create_dehydrated_device_key() {
2216 let pickle_key = DehydratedDeviceKey::new()
2217 .expect("Should be able to create a random dehydrated device key");
2218
2219 let to_vec = pickle_key.inner.to_vec();
2220 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2221 .expect("Should be able to create a dehydrated device key from slice");
2222
2223 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2224 }
2225
2226 #[async_test]
2227 async fn test_create_dehydrated_errors() {
2228 let too_small = [0u8; 22];
2229 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2230
2231 assert!(pickle_key.is_err());
2232
2233 let too_big = [0u8; 40];
2234 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2235
2236 assert!(pickle_key.is_err());
2237 }
2238
/// Snapshot test for `Store::build_room_key_bundle`.
///
/// Four sessions created with Alice's identity keys are saved into Bob's
/// store: three in room 1 (two with shared history, one without) and one in
/// room 2. The bundle Bob builds for room 1 is then compared against a JSON
/// snapshot.
#[async_test]
async fn test_build_room_key_bundle() {
    // Alice only supplies the identity keys the sessions are created with;
    // the sessions are stored by Bob, who also builds the bundle.
    let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
    let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;

    let room1_id = room_id!("!room1:localhost");
    let room2_id = room_id!("!room2:localhost");

    // Fixed session keys, so that the snapshot below is stable.
    let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
    let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
    let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
    let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";

    let sessions = [
        // Room 1, shared history.
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key1).unwrap(),
            true,
        ),
        // Room 1, shared history.
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key2).unwrap(),
            true,
        ),
        // Room 1, history NOT shared.
        create_inbound_group_session_with_visibility(
            &alice,
            room1_id,
            &SessionKey::from_base64(session_key3).unwrap(),
            false,
        ),
        // Room 2, shared history.
        create_inbound_group_session_with_visibility(
            &alice,
            room2_id,
            &SessionKey::from_base64(session_key4).unwrap(),
            true,
        ),
    ];
    bob.store().save_inbound_group_sessions(&sessions).await.unwrap();

    let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();

    // Order the room keys by session ID before snapshotting.
    // NOTE(review): presumably needed because the store does not guarantee
    // an order — confirm.
    bundle.room_keys.sort_by_key(|session| session.session_id.clone());

    // The algorithm depends on the enabled features; check its value here
    // and replace it by a placeholder in the snapshot.
    let algorithm = if cfg!(feature = "experimental-algorithms") {
        "m.megolm.v2.aes-sha2"
    } else {
        "m.megolm.v1.aes-sha2"
    };
    let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
        assert_eq!(value.as_str().unwrap(), algorithm);
        "[algorithm]"
    };

    // Alice's keys differ on every run; check them and replace them by
    // placeholders in the snapshot as well.
    let alice_curve_key = alice.identity_keys().curve25519.to_base64();
    let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
        assert_eq!(value.as_str().unwrap(), alice_curve_key);
        "[alice curve key]"
    };
    let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
    let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
        assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
        "[alice ed25519 key]"
    };

    insta::with_settings!({ sort_maps => true }, {
        assert_json_snapshot!(bundle, {
            ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
            ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
            ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
            ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
            ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
        });
    });
}
2327
/// Round-trip test: Alice builds a room key bundle and Bob receives it.
///
/// Alice holds two sessions for the room, one with shared history and one
/// without. After Bob has received the bundle he must have exactly one
/// session for the room, with forwarder data naming Alice, plus a
/// `HistoryNotShared` withheld entry for the other session.
#[async_test]
async fn test_receive_room_key_bundle() {
    let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
    let alice_key = alice.identity_keys().curve25519;
    let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;

    let room_id = room_id!("!room1:localhost");

    let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
    let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";

    let sessions = [
        // Shared history: expected to be imported by Bob.
        create_inbound_group_session_with_visibility(
            &alice,
            room_id,
            &SessionKey::from_base64(session_key1).unwrap(),
            true,
        ),
        // History not shared: expected to show up as withheld info only.
        create_inbound_group_session_with_visibility(
            &alice,
            room_id,
            &SessionKey::from_base64(session_key2).unwrap(),
            false,
        ),
    ];

    alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
    let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();

    bob.store()
        .receive_room_key_bundle(
            &StoredRoomKeyBundleData {
                sender_user: alice.user_id().to_owned(),
                sender_key: alice_key,
                // Present Alice as a verified sender.
                sender_data: SenderData::sender_verified(
                    alice.user_id(),
                    device_id!("ALICE"),
                    alice.identity_keys().ed25519,
                ),

                bundle_data: RoomKeyBundleContent {
                    room_id: room_id.to_owned(),
                    // Placeholder file description made of dummy values; the
                    // bundle itself is passed directly as an argument below.
                    file: EncryptedFileInit {
                        url: owned_mxc_uri!("mxc://example.com/0"),
                        key: JsonWebKeyInit {
                            kty: "oct".to_owned(),
                            key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
                            alg: "A256CTR.".to_owned(),
                            k: Base64::new(vec![0u8; 128]),
                            ext: true,
                        }
                        .into(),
                        iv: Base64::new(vec![0u8; 128]),
                        hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
                            .into_iter()
                            .collect(),
                        v: "v2".to_owned(),
                    }
                    .into(),
                },
            },
            bundle,
            |_, _| {},
        )
        .await
        .unwrap();

    // Only the shared-history session must have been imported.
    let imported_sessions =
        bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();

    assert_eq!(imported_sessions.len(), 1);
    assert_eq!(imported_sessions[0].room_id(), room_id);

    // The imported session must name Alice as the forwarder.
    assert_eq!(
        imported_sessions[0]
            .forwarder_data
            .as_ref()
            .expect("Session should contain forwarder data.")
            .user_id(),
        alice.user_id()
    );

    // The other session must be recorded as withheld with the
    // `HistoryNotShared` code; the outer variant depends on the enabled
    // features.
    assert_matches!(
        bob.store()
            .get_withheld_info(room_id, sessions[1].session_id())
            .await
            .unwrap()
            .expect("Withheld info should be present in the store."),
        RoomKeyWithheldEntry {
            #[cfg(not(feature = "experimental-algorithms"))]
            content: RoomKeyWithheldContent::MegolmV1AesSha2(
                MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
            ),
            #[cfg(feature = "experimental-algorithms")]
            content: RoomKeyWithheldContent::MegolmV2AesSha2(
                MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
            ),
            ..
        }
    );
}
2433
2434 #[async_test]
2440 async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2441 let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2442 {
2443 "content": {
2444 "algorithm": "m.megolm.v1.aes-sha2",
2445 "code": "m.unauthorised",
2446 "from_device": "ALICE",
2447 "reason": "You are not authorised to read the message.",
2448 "room_id": "!roomid:s.co",
2449 "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2450 "session_id": "session123"
2451 },
2452 "sender": "@alice:s.co",
2453 "type": "m.room_key.withheld"
2454 }
2455 ))
2456 .unwrap();
2457
2458 assert_matches!(
2459 entry,
2460 RoomKeyWithheldEntry {
2461 sender,
2462 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2463 MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2464 ),
2465 }
2466 );
2467
2468 assert_eq!(sender, "@alice:s.co");
2469 assert_eq!(withheld_content.room_id, "!roomid:s.co");
2470 assert_eq!(withheld_content.session_id, "session123");
2471 assert_eq!(
2472 withheld_content.sender_key.to_base64(),
2473 "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2474 );
2475 assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2476 }
2477
2478 fn create_inbound_group_session_with_visibility(
2489 olm_machine: &OlmMachine,
2490 room_id: &RoomId,
2491 session_key: &SessionKey,
2492 shared_history: bool,
2493 ) -> InboundGroupSession {
2494 let identity_keys = &olm_machine.store().static_account().identity_keys;
2495 InboundGroupSession::new(
2496 identity_keys.curve25519,
2497 identity_keys.ed25519,
2498 room_id,
2499 session_key,
2500 SenderData::unknown(),
2501 None,
2502 #[cfg(not(feature = "experimental-algorithms"))]
2503 EventEncryptionAlgorithm::MegolmV1AesSha2,
2504 #[cfg(feature = "experimental-algorithms")]
2505 EventEncryptionAlgorithm::MegolmV2AesSha2,
2506 None,
2507 shared_history,
2508 )
2509 .unwrap()
2510 }
2511}