1use std::{
42 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43 fmt::Debug,
44 ops::Deref,
45 pin::pin,
46 sync::{Arc, atomic::Ordering},
47 time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55 DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId, encryption::KeyUsage,
56 events::secret::request::SecretName,
57};
58use serde::{Serialize, de::DeserializeOwned};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
65
66use self::types::{
67 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70use crate::{
71 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
72 gossiping::GossippedSecret,
73 identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
74 olm::{
75 Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
76 SenderData, Session, StaticAccountData,
77 },
78 store::types::RoomKeyWithheldEntry,
79 types::{
80 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
81 SecretsBundle,
82 },
83 verification::VerificationMachine,
84};
85#[cfg(doc)]
86use crate::{backups::BackupMachine, identities::OwnUserIdentity};
87
88pub mod caches;
89mod crypto_store_wrapper;
90mod error;
91mod memorystore;
92mod traits;
93pub mod types;
94
95#[cfg(any(test, feature = "testing"))]
96#[macro_use]
97#[allow(missing_docs)]
98pub mod integration_tests;
99
100pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
101pub use error::{CryptoStoreError, Result};
102use matrix_sdk_common::{
103 cross_process_lock::{CrossProcessLock, CrossProcessLockGeneration},
104 deserialized_responses::WithheldCode,
105 timeout::timeout,
106};
107pub use memorystore::MemoryStore;
108pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
109
110use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
111use crate::types::{
112 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
113};
114pub use crate::{
115 dehydrated_devices::DehydrationError,
116 gossiping::{GossipRequest, SecretInfo},
117};
118
/// A handle to the crypto store and the state that accompanies it.
///
/// All state lives in a reference-counted [`StoreInner`], so cloning a
/// `Store` yields another handle to the same underlying data.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state backing every clone of this handle.
    inner: Arc<StoreInner>,
}
129
/// Keeps track of the users that need a `/keys/query` and lets tasks wait
/// until such a query has been processed.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// The users that still need to be queried, guarded by an async mutex.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Woken with `notify_waiters` once users have been marked as up to date,
    /// so that tasks parked in `wait_if_user_key_query_pending` can re-check.
    users_for_key_query_notify: Notify,
}
138
139impl KeyQueryManager {
140 pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
141 self.ensure_sync_tracked_users(cache).await?;
142 Ok(SyncedKeyQueryManager { cache, manager: self })
143 }
144
145 async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
152 let loaded = cache.loaded_tracked_users.read().await;
154 if *loaded {
155 return Ok(());
156 }
157
158 drop(loaded);
160 let mut loaded = cache.loaded_tracked_users.write().await;
161
162 if *loaded {
166 return Ok(());
167 }
168
169 let tracked_users = cache.store.load_tracked_users().await?;
170
171 let mut query_users_lock = self.users_for_key_query.lock().await;
172 let mut tracked_users_cache = cache.tracked_users.write();
173 for user in tracked_users {
174 tracked_users_cache.insert(user.user_id.to_owned());
175
176 if user.dirty {
177 query_users_lock.insert_user(&user.user_id);
178 }
179 }
180
181 *loaded = true;
182
183 Ok(())
184 }
185
186 pub async fn wait_if_user_key_query_pending(
196 &self,
197 cache: StoreCacheGuard,
198 timeout_duration: Duration,
199 user: &UserId,
200 ) -> Result<UserKeyQueryResult> {
201 {
202 self.ensure_sync_tracked_users(&cache).await?;
205 drop(cache);
206 }
207
208 let mut users_for_key_query = self.users_for_key_query.lock().await;
209 let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
210 return Ok(UserKeyQueryResult::WasNotPending);
211 };
212
213 let wait_for_completion = async {
214 while !waiter.completed.load(Ordering::Relaxed) {
215 let mut notified = pin!(self.users_for_key_query_notify.notified());
219 notified.as_mut().enable();
220 drop(users_for_key_query);
221
222 notified.await;
224
225 users_for_key_query = self.users_for_key_query.lock().await;
229 }
230 };
231
232 match timeout(Box::pin(wait_for_completion), timeout_duration).await {
233 Err(_) => {
234 warn!(
235 user_id = ?user,
236 "The user has a pending `/keys/query` request which did \
237 not finish yet, some devices might be missing."
238 );
239
240 Ok(UserKeyQueryResult::TimeoutExpired)
241 }
242 _ => Ok(UserKeyQueryResult::WasPending),
243 }
244 }
245}
246
/// A [`KeyQueryManager`] paired with a [`StoreCache`].
///
/// Values of this type are created by `KeyQueryManager::synced`, which runs
/// the tracked-users load on the cache before constructing it.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the set of tracked users and the backing store.
    cache: &'a StoreCache,
    /// The manager holding the queue of users awaiting a key query.
    manager: &'a KeyQueryManager,
}
251
252impl SyncedKeyQueryManager<'_> {
253 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
258 let mut store_updates = Vec::new();
259 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
260
261 {
262 let mut tracked_users = self.cache.tracked_users.write();
263 for user_id in users {
264 if tracked_users.insert(user_id.to_owned()) {
265 key_query_lock.insert_user(user_id);
266 store_updates.push((user_id, true))
267 }
268 }
269 }
270
271 self.cache.store.save_tracked_users(&store_updates).await
272 }
273
274 pub async fn mark_tracked_users_as_changed(
281 &self,
282 users: impl Iterator<Item = &UserId>,
283 ) -> Result<()> {
284 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
285 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
286
287 {
288 let tracked_users = &self.cache.tracked_users.read();
289 for user_id in users {
290 if tracked_users.contains(user_id) {
291 key_query_lock.insert_user(user_id);
292 store_updates.push((user_id, true));
293 }
294 }
295 }
296
297 self.cache.store.save_tracked_users(&store_updates).await
298 }
299
300 pub async fn mark_tracked_users_as_up_to_date(
306 &self,
307 users: impl Iterator<Item = &UserId>,
308 sequence_number: SequenceNumber,
309 ) -> Result<()> {
310 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
311 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
312
313 {
314 let tracked_users = self.cache.tracked_users.read();
315 for user_id in users {
316 if tracked_users.contains(user_id) {
317 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
318 store_updates.push((user_id, !clean));
319 }
320 }
321 }
322
323 self.cache.store.save_tracked_users(&store_updates).await?;
324 self.manager.users_for_key_query_notify.notify_waiters();
326
327 Ok(())
328 }
329
330 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
342 self.manager.users_for_key_query.lock().await.users_for_key_query()
343 }
344
345 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
347 self.cache.tracked_users.read().iter().cloned().collect()
348 }
349
350 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
356 self.manager.users_for_key_query.lock().await.insert_user(user);
357 self.cache.tracked_users.write().insert(user.to_owned());
358
359 self.cache.store.save_tracked_users(&[(user, true)]).await
360 }
361}
362
363fn collect_device_updates(
369 verification_machine: VerificationMachine,
370 own_identity: Option<OwnUserIdentityData>,
371 identities: IdentityChanges,
372 devices: DeviceChanges,
373) -> DeviceUpdates {
374 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
375 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
376
377 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
378
379 let map_device = |device: DeviceData| {
380 let device_owner_identity = new_identities
381 .get(device.user_id())
382 .or_else(|| changed_identities.get(device.user_id()))
383 .or_else(|| unchanged_identities.get(device.user_id()))
384 .cloned();
385
386 Device {
387 inner: device,
388 verification_machine: verification_machine.to_owned(),
389 own_identity: own_identity.to_owned(),
390 device_owner_identity,
391 }
392 };
393
394 for device in devices.new {
395 let device = map_device(device);
396
397 new.entry(device.user_id().to_owned())
398 .or_default()
399 .insert(device.device_id().to_owned(), device);
400 }
401
402 for device in devices.changed {
403 let device = map_device(device);
404
405 changed
406 .entry(device.user_id().to_owned())
407 .or_default()
408 .insert(device.device_id().to_owned(), device.to_owned());
409 }
410
411 DeviceUpdates { new, changed }
412}
413
/// A set of pending changes against a [`Store`], held together with an
/// exclusive write guard on the store's cache.
///
/// The pending changes are persisted by calling `commit`.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store the changes will be saved to.
    store: Store,
    /// The changes accumulated so far.
    changes: PendingChanges,
    /// Write guard on the store cache, held for the lifetime of the
    /// transaction.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
422
423impl StoreTransaction {
424 async fn new(store: Store) -> Self {
426 let cache = store.inner.cache.clone();
427
428 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
429 }
430
431 pub(crate) fn cache(&self) -> &StoreCache {
432 &self.cache
433 }
434
435 pub fn store(&self) -> &Store {
437 &self.store
438 }
439
440 pub async fn account(&mut self) -> Result<&mut Account> {
447 if self.changes.account.is_none() {
448 let _ = self.cache.account().await?;
450 self.changes.account = self.cache.account.lock().await.take();
451 }
452 Ok(self.changes.account.as_mut().unwrap())
453 }
454
455 pub async fn commit(self) -> Result<()> {
458 if self.changes.is_empty() {
459 return Ok(());
460 }
461
462 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
464
465 self.store.save_pending_changes(self.changes).await?;
466
467 if let Some(account) = account {
469 *self.cache.account.lock().await = Some(account);
470 }
471
472 Ok(())
473 }
474}
475
/// The shared state behind every [`Store`] handle.
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity, behind an async mutex.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store that all reads and writes go through.
    store: Arc<CryptoStoreWrapper>,

    /// The store cache. `Store::cache` takes it for reading, a
    /// [`StoreTransaction`] holds it for writing.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine handed to the devices and identities created
    /// by the store.
    verification_machine: VerificationMachine,

    /// The static account data, the source of our own user ID.
    static_account: StaticAccountData,
}
492
/// Errors that can occur while importing a secret into the store.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The key of the named secret could not be imported.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that failed to import.
        name: SecretName,
        /// The underlying key error.
        error: vodozemac::KeyError,
    },
    /// The public key derived from the imported private key differs from the
    /// public key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
         key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret whose public keys did not match.
        name: SecretName,
    },
    /// The crypto store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
519
/// Errors that can occur while exporting a [`SecretsBundle`] from the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The crypto store reported an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// A specific cross-signing key is missing; the payload names its usage.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// None of the cross-signing keys are available.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// A backup decryption key is stored, but no backup version goes with it.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
540
541impl Store {
542 pub(crate) fn new(
544 account: StaticAccountData,
545 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
546 store: Arc<CryptoStoreWrapper>,
547 verification_machine: VerificationMachine,
548 ) -> Self {
549 Self {
550 inner: Arc::new(StoreInner {
551 static_account: account,
552 identity,
553 store: store.clone(),
554 verification_machine,
555 cache: Arc::new(RwLock::new(StoreCache {
556 store,
557 tracked_users: Default::default(),
558 loaded_tracked_users: Default::default(),
559 account: Default::default(),
560 })),
561 }),
562 }
563 }
564
565 pub(crate) fn user_id(&self) -> &UserId {
567 &self.inner.static_account.user_id
568 }
569
570 pub(crate) fn device_id(&self) -> &DeviceId {
572 self.inner.verification_machine.own_device_id()
573 }
574
575 pub(crate) fn static_account(&self) -> &StaticAccountData {
577 &self.inner.static_account
578 }
579
580 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
581 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
586 }
587
588 pub(crate) async fn transaction(&self) -> StoreTransaction {
589 StoreTransaction::new(self.clone()).await
590 }
591
592 pub(crate) async fn with_transaction<
595 T,
596 Fut: Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
597 F: FnOnce(StoreTransaction) -> Fut,
598 >(
599 &self,
600 func: F,
601 ) -> Result<T, crate::OlmError> {
602 let tr = self.transaction().await;
603 let (tr, res) = func(tr).await?;
604 tr.commit().await?;
605 Ok(res)
606 }
607
/// Test helper: reset the private cross-signing identity held by the store.
#[cfg(test)]
pub(crate) async fn reset_cross_signing_identity(&self) {
    let identity = &self.inner.identity;
    identity.lock().await.reset();
}
613
614 pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
616 self.inner.identity.clone()
617 }
618
619 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
621 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
622
623 self.save_changes(changes).await
624 }
625
626 pub(crate) async fn get_sessions(
627 &self,
628 sender_key: &str,
629 ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
630 self.inner.store.get_sessions(sender_key).await
631 }
632
633 pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
634 self.inner.store.save_changes(changes).await
635 }
636
637 pub(crate) async fn merge_received_group_session(
645 &self,
646 session: InboundGroupSession,
647 ) -> Result<Option<InboundGroupSession>> {
648 let old_session = self
649 .inner
650 .store
651 .get_inbound_group_session(session.room_id(), session.session_id())
652 .await?;
653
654 let Some(old_session) = old_session else {
656 info!("Received a new megolm room key");
657 return Ok(Some(session));
658 };
659
660 let index_comparison = session.compare_ratchet(&old_session).await;
661 let trust_level_comparison =
662 session.sender_data.compare_trust_level(&old_session.sender_data);
663
664 let result = match (index_comparison, trust_level_comparison) {
665 (SessionOrdering::Unconnected, _) => {
666 warn!(
670 "Received a group session with an ratchet that does not connect to the one in the store, discarding"
671 );
672 None
673 }
674
675 (SessionOrdering::Better, std::cmp::Ordering::Greater)
676 | (SessionOrdering::Better, std::cmp::Ordering::Equal)
677 | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
678 info!(
680 ?index_comparison,
681 ?trust_level_comparison,
682 "Received a megolm room key that we have a worse version of, merging"
683 );
684 Some(session)
685 }
686
687 (SessionOrdering::Worse, std::cmp::Ordering::Less)
688 | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
689 | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
690 warn!(
692 ?index_comparison,
693 ?trust_level_comparison,
694 "Received a megolm room key that we already have a better version \
695 of, discarding"
696 );
697 None
698 }
699
700 (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
701 info!("Received a megolm room key that we already have, discarding");
703 None
704 }
705
706 (SessionOrdering::Better, std::cmp::Ordering::Less) => {
707 info!("Upgrading a previously-received megolm session with new ratchet");
710 let result = old_session.with_ratchet(&session);
711 result.reset_backup_state();
713 Some(result)
714 }
715
716 (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
717 info!("Upgrading a previously-received megolm session with new sender data");
720 Some(session.with_ratchet(&old_session))
721 }
722 };
723
724 Ok(result)
725 }
726
/// Test helper: persist the given devices, recorded as *changed* devices.
#[cfg(test)]
pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
    let device_changes = DeviceChanges { changed: devices.to_vec(), ..Default::default() };

    self.save_changes(Changes { devices: device_changes, ..Default::default() }).await
}
739
740 pub(crate) async fn save_inbound_group_sessions(
742 &self,
743 sessions: &[InboundGroupSession],
744 ) -> Result<()> {
745 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
746
747 self.save_changes(changes).await
748 }
749
750 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
752 Ok(self
753 .inner
754 .store
755 .get_device(self.user_id(), self.device_id())
756 .await?
757 .and_then(|d| d.display_name().map(|d| d.to_owned())))
758 }
759
760 pub(crate) async fn get_device_data(
765 &self,
766 user_id: &UserId,
767 device_id: &DeviceId,
768 ) -> Result<Option<DeviceData>> {
769 self.inner.store.get_device(user_id, device_id).await
770 }
771
772 pub(crate) async fn get_device_data_for_user_filtered(
780 &self,
781 user_id: &UserId,
782 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
783 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
784 if user_id == self.user_id() {
785 d.remove(self.device_id());
786 }
787 d
788 })
789 }
790
791 pub(crate) async fn get_device_data_for_user(
800 &self,
801 user_id: &UserId,
802 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
803 self.inner.store.get_user_devices(user_id).await
804 }
805
806 pub(crate) async fn get_device_from_curve_key(
812 &self,
813 user_id: &UserId,
814 curve_key: Curve25519PublicKey,
815 ) -> Result<Option<Device>> {
816 self.get_user_devices(user_id)
817 .await
818 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
819 }
820
821 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
831 let devices = self.get_device_data_for_user(user_id).await?;
832
833 let own_identity = self
834 .inner
835 .store
836 .get_user_identity(self.user_id())
837 .await?
838 .and_then(|i| i.own().cloned());
839 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
840
841 Ok(UserDevices {
842 inner: devices,
843 verification_machine: self.inner.verification_machine.clone(),
844 own_identity,
845 device_owner_identity,
846 })
847 }
848
849 pub(crate) async fn get_device(
859 &self,
860 user_id: &UserId,
861 device_id: &DeviceId,
862 ) -> Result<Option<Device>> {
863 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
864 Ok(Some(self.wrap_device_data(device_data).await?))
865 } else {
866 Ok(None)
867 }
868 }
869
870 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
875 let own_identity = self
876 .inner
877 .store
878 .get_user_identity(self.user_id())
879 .await?
880 .and_then(|i| i.own().cloned());
881
882 let device_owner_identity =
883 self.inner.store.get_user_identity(device_data.user_id()).await?;
884
885 Ok(Device {
886 inner: device_data,
887 verification_machine: self.inner.verification_machine.clone(),
888 own_identity,
889 device_owner_identity,
890 })
891 }
892
893 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
895 let own_identity = self
896 .inner
897 .store
898 .get_user_identity(self.user_id())
899 .await?
900 .and_then(as_variant!(UserIdentityData::Own));
901
902 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
903 UserIdentity::new(
904 self.clone(),
905 i,
906 self.inner.verification_machine.to_owned(),
907 own_identity,
908 )
909 }))
910 }
911
912 pub async fn export_secret(
921 &self,
922 secret_name: &SecretName,
923 ) -> Result<Option<String>, CryptoStoreError> {
924 Ok(match secret_name {
925 SecretName::CrossSigningMasterKey
926 | SecretName::CrossSigningUserSigningKey
927 | SecretName::CrossSigningSelfSigningKey => {
928 self.inner.identity.lock().await.export_secret(secret_name).await
929 }
930 SecretName::RecoveryKey => {
931 if let Some(key) = self.load_backup_keys().await?.decryption_key {
932 let exported = key.to_base64();
933 Some(exported)
934 } else {
935 None
936 }
937 }
938 name => {
939 warn!(secret = ?name, "Unknown secret was requested");
940 None
941 }
942 })
943 }
944
945 pub async fn export_cross_signing_keys(
953 &self,
954 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
955 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
956 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
957 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
958
959 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
960 None
961 } else {
962 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
963 })
964 }
965
966 pub async fn import_cross_signing_keys(
971 &self,
972 export: CrossSigningKeyExport,
973 ) -> Result<CrossSigningStatus, SecretImportError> {
974 if let Some(public_identity) =
975 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
976 {
977 let identity = self.inner.identity.lock().await;
978
979 identity
980 .import_secrets(
981 public_identity.to_owned(),
982 export.master_key.as_deref(),
983 export.self_signing_key.as_deref(),
984 export.user_signing_key.as_deref(),
985 )
986 .await?;
987
988 let status = identity.status().await;
989
990 let diff = identity.get_public_identity_diff(&public_identity.inner).await;
991
992 let mut changes =
993 Changes { private_identity: Some(identity.clone()), ..Default::default() };
994
995 if diff.none_differ() {
996 public_identity.mark_as_verified();
997 changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
998 }
999
1000 info!(?status, "Successfully imported the private cross-signing keys");
1001
1002 self.save_changes(changes).await?;
1003 } else {
1004 warn!(
1005 "No public identity found while importing cross-signing keys, \
1006 a /keys/query needs to be done"
1007 );
1008 }
1009
1010 Ok(self.inner.identity.lock().await.status().await)
1011 }
1012
1013 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1025 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1026 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1027 };
1028
1029 let Some(master_key) = cross_signing.master_key.clone() else {
1030 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1031 };
1032
1033 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1034 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1035 };
1036
1037 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1038 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1039 };
1040
1041 let backup_keys = self.load_backup_keys().await?;
1042
1043 let backup = if let Some(key) = backup_keys.decryption_key {
1044 if let Some(backup_version) = backup_keys.backup_version {
1045 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1046 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1047 ))
1048 } else {
1049 return Err(SecretsBundleExportError::MissingBackupVersion);
1050 }
1051 } else {
1052 None
1053 };
1054
1055 Ok(SecretsBundle {
1056 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1057 backup,
1058 })
1059 }
1060
1061 pub async fn import_secrets_bundle(
1074 &self,
1075 bundle: &SecretsBundle,
1076 ) -> Result<(), SecretImportError> {
1077 let mut changes = Changes::default();
1078
1079 if let Some(backup_bundle) = &bundle.backup {
1080 match backup_bundle {
1081 BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1082 changes.backup_decryption_key = Some(bundle.key.clone());
1083 changes.backup_version = Some(bundle.backup_version.clone());
1084 }
1085 }
1086 }
1087
1088 let identity = self.inner.identity.lock().await;
1089
1090 identity
1091 .import_secrets_unchecked(
1092 Some(&bundle.cross_signing.master_key),
1093 Some(&bundle.cross_signing.self_signing_key),
1094 Some(&bundle.cross_signing.user_signing_key),
1095 )
1096 .await?;
1097
1098 let public_identity = identity.to_public_identity().await.expect(
1099 "We should be able to create a new public identity since we just imported \
1100 all the private cross-signing keys",
1101 );
1102
1103 changes.private_identity = Some(identity.clone());
1104 changes.identities.new.push(UserIdentityData::Own(public_identity));
1105
1106 Ok(self.save_changes(changes).await?)
1107 }
1108
1109 pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1111 match &secret.secret_name {
1112 SecretName::CrossSigningMasterKey
1113 | SecretName::CrossSigningUserSigningKey
1114 | SecretName::CrossSigningSelfSigningKey => {
1115 if let Some(public_identity) =
1116 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1117 {
1118 let identity = self.inner.identity.lock().await;
1119
1120 identity
1121 .import_secret(
1122 public_identity,
1123 &secret.secret_name,
1124 &secret.event.content.secret,
1125 )
1126 .await?;
1127 info!(
1128 secret_name = ?secret.secret_name,
1129 "Successfully imported a private cross signing key"
1130 );
1131
1132 let changes =
1133 Changes { private_identity: Some(identity.clone()), ..Default::default() };
1134
1135 self.save_changes(changes).await?;
1136 }
1137 }
1138 SecretName::RecoveryKey => {
1139 }
1145 name => {
1146 warn!(secret = ?name, "Tried to import an unknown secret");
1147 }
1148 }
1149
1150 Ok(())
1151 }
1152
1153 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1156 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1157 Ok(value)
1158 }
1159
1160 pub async fn set_only_allow_trusted_devices(
1163 &self,
1164 block_untrusted_devices: bool,
1165 ) -> Result<()> {
1166 self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1167 }
1168
1169 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1171 let Some(value) = self.get_custom_value(key).await? else {
1172 return Ok(None);
1173 };
1174 let deserialized = self.deserialize_value(&value)?;
1175 Ok(Some(deserialized))
1176 }
1177
1178 pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1180 let serialized = self.serialize_value(value)?;
1181 self.set_custom_value(key, serialized).await?;
1182 Ok(())
1183 }
1184
1185 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1186 let serialized =
1187 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1188 Ok(serialized)
1189 }
1190
1191 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1192 let deserialized =
1193 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1194 Ok(deserialized)
1195 }
1196
1197 pub fn room_keys_received_stream(
1209 &self,
1210 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1211 self.inner.store.room_keys_received_stream()
1212 }
1213
1214 pub fn room_keys_withheld_received_stream(
1223 &self,
1224 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1225 self.inner.store.room_keys_withheld_received_stream()
1226 }
1227
1228 pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1259 let verification_machine = self.inner.verification_machine.to_owned();
1260
1261 let this = self.clone();
1262 self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1263 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1264
1265 let map_identity = |(user_id, identity)| {
1266 (
1267 user_id,
1268 UserIdentity::new(
1269 this.clone(),
1270 identity,
1271 verification_machine.to_owned(),
1272 own_identity.to_owned(),
1273 ),
1274 )
1275 };
1276
1277 let new = new_identities.into_iter().map(map_identity).collect();
1278 let changed = changed_identities.into_iter().map(map_identity).collect();
1279 let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1280
1281 IdentityUpdates { new, changed, unchanged }
1282 })
1283 }
1284
1285 pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1317 let verification_machine = self.inner.verification_machine.to_owned();
1318
1319 self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1320 collect_device_updates(
1321 verification_machine.to_owned(),
1322 own_identity,
1323 identities,
1324 devices,
1325 )
1326 })
1327 }
1328
1329 pub fn identities_stream_raw(
1339 &self,
1340 ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1341 self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1342 }
1343
1344 pub fn create_store_lock(
1347 &self,
1348 lock_key: String,
1349 lock_value: String,
1350 ) -> CrossProcessLock<LockableCryptoStore> {
1351 self.inner.store.create_store_lock(lock_key, lock_value)
1352 }
1353
1354 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1394 self.inner.store.secrets_stream()
1395 }
1396
1397 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1437 self.inner.store.historic_room_key_stream()
1438 }
1439
1440 pub async fn import_room_keys(
1453 &self,
1454 exported_keys: Vec<ExportedRoomKey>,
1455 from_backup_version: Option<&str>,
1456 progress_listener: impl Fn(usize, usize),
1457 ) -> Result<RoomKeyImportResult> {
1458 let exported_keys = exported_keys.iter().filter_map(|key| {
1459 key.try_into()
1460 .map_err(|e| {
1461 warn!(
1462 sender_key = key.sender_key().to_base64(),
1463 room_id = ?key.room_id(),
1464 session_id = key.session_id(),
1465 error = ?e,
1466 "Couldn't import a room key from a file export."
1467 );
1468 })
1469 .ok()
1470 });
1471 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1472 }
1473
1474 pub async fn import_exported_room_keys(
1501 &self,
1502 exported_keys: Vec<ExportedRoomKey>,
1503 progress_listener: impl Fn(usize, usize),
1504 ) -> Result<RoomKeyImportResult> {
1505 self.import_room_keys(exported_keys, None, progress_listener).await
1506 }
1507
1508 async fn import_sessions_impl(
1509 &self,
1510 sessions: impl Iterator<Item = InboundGroupSession>,
1511 from_backup_version: Option<&str>,
1512 progress_listener: impl Fn(usize, usize),
1513 ) -> Result<RoomKeyImportResult> {
1514 let sessions: Vec<_> = sessions.collect();
1515 let mut imported_sessions = Vec::new();
1516
1517 let total_count = sessions.len();
1518 let mut keys = BTreeMap::new();
1519
1520 for (i, session) in sessions.into_iter().enumerate() {
1521 if let Some(merged) = self.merge_received_group_session(session).await? {
1524 if from_backup_version.is_some() {
1525 merged.mark_as_backed_up();
1526 }
1527
1528 keys.entry(merged.room_id().to_owned())
1529 .or_insert_with(BTreeMap::new)
1530 .entry(merged.sender_key().to_base64())
1531 .or_insert_with(BTreeSet::new)
1532 .insert(merged.session_id().to_owned());
1533
1534 imported_sessions.push(merged);
1535 }
1536
1537 progress_listener(i, total_count);
1538 }
1539
1540 let imported_count = imported_sessions.len();
1541
1542 self.inner
1543 .store
1544 .save_inbound_group_sessions(imported_sessions, from_backup_version)
1545 .await?;
1546
1547 info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1548
1549 Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1550 }
1551
1552 pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1553 self.inner.store.clone()
1554 }
1555
1556 pub async fn export_room_keys(
1579 &self,
1580 predicate: impl FnMut(&InboundGroupSession) -> bool,
1581 ) -> Result<Vec<ExportedRoomKey>> {
1582 let mut exported = Vec::new();
1583
1584 let mut sessions = self.get_inbound_group_sessions().await?;
1585 sessions.retain(predicate);
1586
1587 for session in sessions {
1588 let export = session.export().await;
1589 exported.push(export);
1590 }
1591
1592 Ok(exported)
1593 }
1594
1595 pub async fn export_room_keys_stream(
1630 &self,
1631 predicate: impl FnMut(&InboundGroupSession) -> bool,
1632 ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1633 let sessions = self.get_inbound_group_sessions().await?;
1635 Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1636 .then(|session| async move { session.export().await }))
1637 }
1638
1639 pub async fn build_room_key_bundle(
1644 &self,
1645 room_id: &RoomId,
1646 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1647 let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1648
1649 let mut bundle = RoomKeyBundle::default();
1650 for session in sessions {
1651 if session.shared_history() {
1652 bundle.room_keys.push(session.export().await.into());
1653 } else {
1654 bundle.withheld.push(RoomKeyWithheldContent::new(
1655 session.algorithm().to_owned(),
1656 WithheldCode::HistoryNotShared,
1657 session.room_id().to_owned(),
1658 session.session_id().to_owned(),
1659 session.sender_key().to_owned(),
1660 self.device_id().to_owned(),
1661 ));
1662 }
1663 }
1664
1665 let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1668 for withheld in withhelds {
1669 if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1670 bundle.withheld.push(withheld.content);
1671 }
1672 }
1673
1674 Ok(bundle)
1675 }
1676
1677 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1687 pub async fn receive_room_key_bundle(
1688 &self,
1689 bundle_info: &StoredRoomKeyBundleData,
1690 bundle: RoomKeyBundle,
1691 progress_listener: impl Fn(usize, usize),
1692 ) -> Result<(), CryptoStoreError> {
1693 let sender_data = if bundle_info.sender_data.should_recalculate() {
1694 let device = self
1695 .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1696 .await?;
1697
1698 device
1699 .as_ref()
1700 .map(SenderData::from_device)
1701 .unwrap_or_else(|| bundle_info.sender_data.clone())
1702 } else {
1703 bundle_info.sender_data.clone()
1704 };
1705
1706 tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1707
1708 let Ok(forwarder_data) = (&sender_data).try_into() else {
1712 warn!(
1713 "Not accepting a historic room key bundle due to insufficient trust in the sender"
1714 );
1715 return Ok(());
1716 };
1717
1718 self.import_room_key_bundle_sessions(
1719 bundle_info,
1720 &bundle,
1721 &forwarder_data,
1722 progress_listener,
1723 )
1724 .await?;
1725 self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1726
1727 Ok(())
1728 }
1729
1730 async fn import_room_key_bundle_sessions(
1731 &self,
1732 bundle_info: &StoredRoomKeyBundleData,
1733 bundle: &RoomKeyBundle,
1734 forwarder_data: &ForwarderData,
1735 progress_listener: impl Fn(usize, usize),
1736 ) -> Result<(), CryptoStoreError> {
1737 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1738 if key.room_id != bundle_info.bundle_data.room_id {
1739 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1740 Either::Right(key)
1741 } else {
1742 Either::Left(key)
1743 }
1744 });
1745
1746 match (bad.is_empty(), good.is_empty()) {
1747 (true, true) => {
1749 warn!("Received a completely empty room key bundle");
1750 }
1751
1752 (false, true) => {
1754 let bad_keys: Vec<_> =
1755 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1756
1757 warn!(
1758 ?bad_keys,
1759 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1760 );
1761 }
1762
1763 (_, false) => {
1765 if !bad.is_empty() {
1768 warn!(
1769 bad_key_count = bad.len(),
1770 "The room key bundle contained some room keys \
1771 that were meant for a different room"
1772 );
1773 }
1774
1775 let keys = good.iter().filter_map(|key| {
1776 key.try_into_inbound_group_session(forwarder_data)
1777 .map_err(|e| {
1778 warn!(
1779 sender_key = ?key.sender_key().to_base64(),
1780 room_id = ?key.room_id(),
1781 session_id = key.session_id(),
1782 error = ?e,
1783 "Couldn't import a room key from a key bundle."
1784 );
1785 })
1786 .ok()
1787 });
1788
1789 self.import_sessions_impl(keys, None, progress_listener).await?;
1790 }
1791 }
1792
1793 Ok(())
1794 }
1795
1796 async fn import_room_key_bundle_withheld_info(
1797 &self,
1798 bundle_info: &StoredRoomKeyBundleData,
1799 bundle: &RoomKeyBundle,
1800 ) -> Result<(), CryptoStoreError> {
1801 let mut session_id_to_withheld_code_map = BTreeMap::new();
1802
1803 let mut changes = Changes::default();
1804 for withheld in &bundle.withheld {
1805 let (room_id, session_id) = match withheld {
1806 RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1807 (Some(room_id), Some(session_id)) => (room_id, session_id),
1808 _ => continue,
1809 },
1810 #[cfg(feature = "experimental-algorithms")]
1811 RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1812 (Some(room_id), Some(session_id)) => (room_id, session_id),
1813 _ => continue,
1814 },
1815 RoomKeyWithheldContent::Unknown(_) => continue,
1816 };
1817
1818 if room_id != bundle_info.bundle_data.room_id {
1819 trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1820 continue;
1821 }
1822
1823 changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1824 session_id.to_owned(),
1825 RoomKeyWithheldEntry {
1826 sender: bundle_info.sender_user.clone(),
1827 content: withheld.to_owned(),
1828 },
1829 );
1830 session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1831 }
1832
1833 self.save_changes(changes).await?;
1834
1835 info!(
1836 room_id = ?bundle_info.bundle_data.room_id,
1837 ?session_id_to_withheld_code_map,
1838 "Successfully imported withheld info from room key bundle",
1839 );
1840
1841 Ok(())
1842 }
1843}
1844
1845impl Deref for Store {
1846 type Target = DynCryptoStore;
1847
1848 fn deref(&self) -> &Self::Target {
1849 self.inner.store.deref().deref()
1850 }
1851}
1852
1853#[derive(Clone, Debug)]
1855pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1856
1857impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1858 type LockError = CryptoStoreError;
1859
1860 async fn try_lock(
1861 &self,
1862 lease_duration_ms: u32,
1863 key: &str,
1864 holder: &str,
1865 ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1866 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1867 }
1868}
1869
1870#[cfg(test)]
1871mod tests {
1872 use std::{collections::BTreeMap, pin::pin};
1873
1874 use assert_matches2::{assert_let, assert_matches};
1875 use futures_util::StreamExt;
1876 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1877 use matrix_sdk_test::async_test;
1878 use ruma::{
1879 RoomId, device_id,
1880 events::room::{EncryptedFileInit, JsonWebKeyInit},
1881 owned_device_id, owned_mxc_uri, room_id,
1882 serde::Base64,
1883 user_id,
1884 };
1885 use serde_json::json;
1886 use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1887
1888 use crate::{
1889 Account, OlmMachine,
1890 machine::test_helpers::get_machine_pair,
1891 olm::{InboundGroupSession, SenderData},
1892 store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1893 types::{
1894 EventEncryptionAlgorithm,
1895 events::{
1896 room_key_bundle::RoomKeyBundleContent,
1897 room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1898 },
1899 },
1900 };
1901
1902 #[async_test]
1903 async fn test_merge_received_group_session() {
1904 let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1905 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1906
1907 let room_id = room_id!("!test:localhost");
1908
1909 let megolm_signing_key = Ed25519Keypair::new();
1910 let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1911
1912 let mut inbound_at_index_5 =
1914 InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1915 inbound_at_index_5.sender_data = inbound.sender_data.clone();
1916 bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1917
1918 let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1920 assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1921
1922 let mut worse =
1924 InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1925 worse.sender_data = inbound.sender_data.clone();
1926 assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1927
1928 let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1930 copy.sender_data = inbound.sender_data.clone();
1931 assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1932
1933 let mut better =
1935 InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1936 better.sender_data = inbound.sender_data.clone();
1937 assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1938 assert_eq!(update.first_known_index(), 0);
1939
1940 {
1942 let mut worse_ratchet_better_trust =
1943 InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1944 let updated_sender_data = SenderData::sender_verified(
1945 alice_account.user_id(),
1946 alice_account.device_id(),
1947 Ed25519Keypair::new().public_key(),
1948 );
1949 worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1950 assert_let!(
1951 Some(update) = bob
1952 .store()
1953 .merge_received_group_session(worse_ratchet_better_trust)
1954 .await
1955 .unwrap()
1956 );
1957 assert_eq!(update.sender_data, updated_sender_data);
1958 assert_eq!(update.first_known_index(), 5);
1959 assert_eq!(
1960 update.export_at_index(0).await.session_key.to_bytes(),
1961 inbound.export_at_index(5).await.session_key.to_bytes()
1962 );
1963 }
1964
1965 {
1967 let mut better_ratchet_worse_trust =
1968 InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1969 let updated_sender_data = SenderData::unknown();
1970 better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
1971 assert_let!(
1972 Some(update) = bob
1973 .store()
1974 .merge_received_group_session(better_ratchet_worse_trust)
1975 .await
1976 .unwrap()
1977 );
1978 assert_eq!(update.sender_data, inbound.sender_data);
1979 assert_eq!(update.first_known_index(), 0);
1980 assert_eq!(
1981 update.export_at_index(0).await.session_key.to_bytes(),
1982 inbound.export_at_index(0).await.session_key.to_bytes()
1983 );
1984 }
1985 }
1986
1987 fn make_inbound_group_session(
1990 sender_account: &Account,
1991 signing_key: &Ed25519Keypair,
1992 room_id: &RoomId,
1993 ) -> InboundGroupSession {
1994 InboundGroupSession::new(
1995 sender_account.identity_keys.curve25519,
1996 sender_account.identity_keys.ed25519,
1997 room_id,
1998 &make_session_key(signing_key),
1999 SenderData::device_info(crate::types::DeviceKeys::new(
2000 sender_account.user_id().to_owned(),
2001 sender_account.device_id().to_owned(),
2002 vec![],
2003 BTreeMap::new(),
2004 crate::types::Signatures::new(),
2005 )),
2006 None,
2007 EventEncryptionAlgorithm::MegolmV1AesSha2,
2008 Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2009 true,
2010 )
2011 .unwrap()
2012 }
2013
2014 fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2017 use rand::Rng;
2018
2019 let mut session_key_bytes = vec![0u8; 229];
2024 session_key_bytes[0] = 2;
2026 rand::thread_rng().fill(&mut session_key_bytes[5..133]);
2029 session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2031 let sig = signing_key.sign(&session_key_bytes[0..165]);
2033 session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2034
2035 SessionKey::from_bytes(&session_key_bytes).unwrap()
2036 }
2037
2038 #[async_test]
2039 async fn test_import_room_keys_notifies_stream() {
2040 use futures_util::FutureExt;
2041
2042 let (alice, bob, _) =
2043 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2044
2045 let room1_id = room_id!("!room1:localhost");
2046 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2047 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2048
2049 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2050 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2051
2052 let room_keys = room_keys_received_stream
2053 .next()
2054 .now_or_never()
2055 .flatten()
2056 .expect("We should have received an update of room key infos")
2057 .unwrap();
2058 assert_eq!(room_keys.len(), 1);
2059 assert_eq!(room_keys[0].room_id, "!room1:localhost");
2060 }
2061
2062 #[async_test]
2063 async fn test_export_room_keys_provides_selected_keys() {
2064 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2066 let room1_id = room_id!("!room1:localhost");
2067 let room2_id = room_id!("!room2:localhost");
2068 let room3_id = room_id!("!room3:localhost");
2069 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2070 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2071 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2072
2073 let keys = alice
2075 .store()
2076 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2077 .await
2078 .unwrap();
2079
2080 assert_eq!(keys.len(), 2);
2082 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2083 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2084 assert_eq!(keys[0].room_id, "!room2:localhost");
2085 assert_eq!(keys[1].room_id, "!room3:localhost");
2086 assert_eq!(keys[0].session_key.to_base64().len(), 220);
2087 assert_eq!(keys[1].session_key.to_base64().len(), 220);
2088 }
2089
2090 #[async_test]
2091 async fn test_export_room_keys_stream_can_provide_all_keys() {
2092 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2094 let room1_id = room_id!("!room1:localhost");
2095 let room2_id = room_id!("!room2:localhost");
2096 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2097 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2098
2099 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2101
2102 let mut collected = vec![];
2104 while let Some(key) = keys.next().await {
2105 collected.push(key);
2106 }
2107
2108 assert_eq!(collected.len(), 2);
2110 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2111 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2112 assert_eq!(collected[0].room_id, "!room1:localhost");
2113 assert_eq!(collected[1].room_id, "!room2:localhost");
2114 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2115 assert_eq!(collected[1].session_key.to_base64().len(), 220);
2116 }
2117
2118 #[async_test]
2119 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2120 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2122 let room1_id = room_id!("!room1:localhost");
2123 let room2_id = room_id!("!room2:localhost");
2124 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2125 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2126
2127 let mut keys =
2129 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2130
2131 let mut collected = vec![];
2133 while let Some(key) = keys.next().await {
2134 collected.push(key);
2135 }
2136
2137 assert_eq!(collected.len(), 1);
2139 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2140 assert_eq!(collected[0].room_id, "!room1:localhost");
2141 assert_eq!(collected[0].session_key.to_base64().len(), 220);
2142 }
2143
2144 #[async_test]
2145 async fn test_export_secrets_bundle() {
2146 let user_id = user_id!("@alice:example.com");
2147 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2148
2149 let _ = first
2150 .bootstrap_cross_signing(false)
2151 .await
2152 .expect("We should be able to bootstrap cross-signing");
2153
2154 let bundle = first.store().export_secrets_bundle().await.expect(
2155 "We should be able to export the secrets bundle, now that we \
2156 have the cross-signing keys",
2157 );
2158
2159 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2160
2161 second
2162 .store()
2163 .import_secrets_bundle(&bundle)
2164 .await
2165 .expect("We should be able to import the secrets bundle");
2166
2167 let status = second.cross_signing_status().await;
2168 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2169
2170 assert!(identity.is_verified(), "The public identity should be marked as verified.");
2171
2172 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2173 }
2174
2175 #[async_test]
2176 async fn test_create_dehydrated_device_key() {
2177 let pickle_key = DehydratedDeviceKey::new()
2178 .expect("Should be able to create a random dehydrated device key");
2179
2180 let to_vec = pickle_key.inner.to_vec();
2181 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2182 .expect("Should be able to create a dehydrated device key from slice");
2183
2184 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2185 }
2186
2187 #[async_test]
2188 async fn test_create_dehydrated_errors() {
2189 let too_small = [0u8; 22];
2190 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2191
2192 assert!(pickle_key.is_err());
2193
2194 let too_big = [0u8; 40];
2195 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2196
2197 assert!(pickle_key.is_err());
2198 }
2199
2200 #[async_test]
2201 async fn test_build_room_key_bundle() {
2202 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2205 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2206
2207 let room1_id = room_id!("!room1:localhost");
2208 let room2_id = room_id!("!room2:localhost");
2209
2210 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2215 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2216 let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2217 let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2218
2219 let sessions = [
2220 create_inbound_group_session_with_visibility(
2221 &alice,
2222 room1_id,
2223 &SessionKey::from_base64(session_key1).unwrap(),
2224 true,
2225 ),
2226 create_inbound_group_session_with_visibility(
2227 &alice,
2228 room1_id,
2229 &SessionKey::from_base64(session_key2).unwrap(),
2230 true,
2231 ),
2232 create_inbound_group_session_with_visibility(
2233 &alice,
2234 room1_id,
2235 &SessionKey::from_base64(session_key3).unwrap(),
2236 false,
2237 ),
2238 create_inbound_group_session_with_visibility(
2239 &alice,
2240 room2_id,
2241 &SessionKey::from_base64(session_key4).unwrap(),
2242 true,
2243 ),
2244 ];
2245 bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2246
2247 let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2249
2250 bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2254
2255 let algorithm = if cfg!(feature = "experimental-algorithms") {
2257 "m.megolm.v2.aes-sha2"
2258 } else {
2259 "m.megolm.v1.aes-sha2"
2260 };
2261 let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2262 assert_eq!(value.as_str().unwrap(), algorithm);
2263 "[algorithm]"
2264 };
2265
2266 let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2268 let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2269 assert_eq!(value.as_str().unwrap(), alice_curve_key);
2270 "[alice curve key]"
2271 };
2272 let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2273 let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2274 assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2275 "[alice ed25519 key]"
2276 };
2277
2278 insta::with_settings!({ sort_maps => true }, {
2279 assert_json_snapshot!(bundle, {
2280 ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2281 ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2282 ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2283 ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2284 ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2285 });
2286 });
2287 }
2288
2289 #[async_test]
2290 async fn test_receive_room_key_bundle() {
2291 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2292 let alice_key = alice.identity_keys().curve25519;
2293 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2294
2295 let room_id = room_id!("!room1:localhost");
2296
2297 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2298 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2299
2300 let sessions = [
2301 create_inbound_group_session_with_visibility(
2302 &alice,
2303 room_id,
2304 &SessionKey::from_base64(session_key1).unwrap(),
2305 true,
2306 ),
2307 create_inbound_group_session_with_visibility(
2308 &alice,
2309 room_id,
2310 &SessionKey::from_base64(session_key2).unwrap(),
2311 false,
2312 ),
2313 ];
2314
2315 alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2316 let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2317
2318 bob.store()
2319 .receive_room_key_bundle(
2320 &StoredRoomKeyBundleData {
2321 sender_user: alice.user_id().to_owned(),
2322 sender_key: alice_key,
2323 sender_data: SenderData::sender_verified(
2324 alice.user_id(),
2325 device_id!("ALICE"),
2326 alice.identity_keys().ed25519,
2327 ),
2328
2329 bundle_data: RoomKeyBundleContent {
2330 room_id: room_id.to_owned(),
2331 file: EncryptedFileInit {
2334 url: owned_mxc_uri!("mxc://example.com/0"),
2335 key: JsonWebKeyInit {
2336 kty: "oct".to_owned(),
2337 key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2338 alg: "A256CTR.".to_owned(),
2339 k: Base64::new(vec![0u8; 128]),
2340 ext: true,
2341 }
2342 .into(),
2343 iv: Base64::new(vec![0u8; 128]),
2344 hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2345 .into_iter()
2346 .collect(),
2347 v: "v2".to_owned(),
2348 }
2349 .into(),
2350 },
2351 },
2352 bundle,
2353 |_, _| {},
2354 )
2355 .await
2356 .unwrap();
2357
2358 let imported_sessions =
2360 bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2361
2362 assert_eq!(imported_sessions.len(), 1);
2363 assert_eq!(imported_sessions[0].room_id(), room_id);
2364
2365 assert_eq!(
2367 imported_sessions[0]
2368 .forwarder_data
2369 .as_ref()
2370 .expect("Session should contain forwarder data.")
2371 .user_id(),
2372 alice.user_id()
2373 );
2374
2375 assert_matches!(
2376 bob.store()
2377 .get_withheld_info(room_id, sessions[1].session_id())
2378 .await
2379 .unwrap()
2380 .expect("Withheld info should be present in the store."),
2381 RoomKeyWithheldEntry {
2382 #[cfg(not(feature = "experimental-algorithms"))]
2383 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2384 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2385 ),
2386 #[cfg(feature = "experimental-algorithms")]
2387 content: RoomKeyWithheldContent::MegolmV2AesSha2(
2388 MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2389 ),
2390 ..
2391 }
2392 );
2393 }
2394
2395 #[async_test]
2401 async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2402 let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2403 {
2404 "content": {
2405 "algorithm": "m.megolm.v1.aes-sha2",
2406 "code": "m.unauthorised",
2407 "from_device": "ALICE",
2408 "reason": "You are not authorised to read the message.",
2409 "room_id": "!roomid:s.co",
2410 "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2411 "session_id": "session123"
2412 },
2413 "sender": "@alice:s.co",
2414 "type": "m.room_key.withheld"
2415 }
2416 ))
2417 .unwrap();
2418
2419 assert_matches!(
2420 entry,
2421 RoomKeyWithheldEntry {
2422 sender,
2423 content: RoomKeyWithheldContent::MegolmV1AesSha2(
2424 MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2425 ),
2426 }
2427 );
2428
2429 assert_eq!(sender, "@alice:s.co");
2430 assert_eq!(withheld_content.room_id, "!roomid:s.co");
2431 assert_eq!(withheld_content.session_id, "session123");
2432 assert_eq!(
2433 withheld_content.sender_key.to_base64(),
2434 "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2435 );
2436 assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2437 }
2438
2439 fn create_inbound_group_session_with_visibility(
2450 olm_machine: &OlmMachine,
2451 room_id: &RoomId,
2452 session_key: &SessionKey,
2453 shared_history: bool,
2454 ) -> InboundGroupSession {
2455 let identity_keys = &olm_machine.store().static_account().identity_keys;
2456 InboundGroupSession::new(
2457 identity_keys.curve25519,
2458 identity_keys.ed25519,
2459 room_id,
2460 session_key,
2461 SenderData::unknown(),
2462 None,
2463 #[cfg(not(feature = "experimental-algorithms"))]
2464 EventEncryptionAlgorithm::MegolmV1AesSha2,
2465 #[cfg(feature = "experimental-algorithms")]
2466 EventEncryptionAlgorithm::MegolmV2AesSha2,
2467 None,
2468 shared_history,
2469 )
2470 .unwrap()
2471 }
2472}