1use std::{
42 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43 fmt::Debug,
44 ops::Deref,
45 pin::pin,
46 sync::{atomic::Ordering, Arc},
47 time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55 encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56 OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
65
66use self::types::{
67 Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68 PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70#[cfg(doc)]
71use crate::{backups::BackupMachine, identities::OwnUserIdentity};
72use crate::{
73 gossiping::GossippedSecret,
74 identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
75 olm::{
76 Account, ExportedRoomKey, InboundGroupSession, PrivateCrossSigningIdentity, SenderData,
77 Session, StaticAccountData,
78 },
79 types::{
80 BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
81 SecretsBundle,
82 },
83 verification::VerificationMachine,
84 CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
85};
86
87pub mod caches;
88mod crypto_store_wrapper;
89mod error;
90mod memorystore;
91mod traits;
92pub mod types;
93
94#[cfg(any(test, feature = "testing"))]
95#[macro_use]
96#[allow(missing_docs)]
97pub mod integration_tests;
98
99pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
100pub use error::{CryptoStoreError, Result};
101use matrix_sdk_common::{
102 cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout,
103};
104pub use memorystore::MemoryStore;
105pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
106
107use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
108use crate::types::{
109 events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
110};
111pub use crate::{
112 dehydrated_devices::DehydrationError,
113 gossiping::{GossipRequest, SecretInfo},
114};
115
116#[derive(Debug, Clone)]
123pub struct Store {
124 inner: Arc<StoreInner>,
125}
126
127#[derive(Debug, Default)]
128pub(crate) struct KeyQueryManager {
129 users_for_key_query: Mutex<UsersForKeyQuery>,
131
132 users_for_key_query_notify: Notify,
134}
135
136impl KeyQueryManager {
137 pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
138 self.ensure_sync_tracked_users(cache).await?;
139 Ok(SyncedKeyQueryManager { cache, manager: self })
140 }
141
142 async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
149 let loaded = cache.loaded_tracked_users.read().await;
151 if *loaded {
152 return Ok(());
153 }
154
155 drop(loaded);
157 let mut loaded = cache.loaded_tracked_users.write().await;
158
159 if *loaded {
163 return Ok(());
164 }
165
166 let tracked_users = cache.store.load_tracked_users().await?;
167
168 let mut query_users_lock = self.users_for_key_query.lock().await;
169 let mut tracked_users_cache = cache.tracked_users.write();
170 for user in tracked_users {
171 tracked_users_cache.insert(user.user_id.to_owned());
172
173 if user.dirty {
174 query_users_lock.insert_user(&user.user_id);
175 }
176 }
177
178 *loaded = true;
179
180 Ok(())
181 }
182
183 pub async fn wait_if_user_key_query_pending(
193 &self,
194 cache: StoreCacheGuard,
195 timeout_duration: Duration,
196 user: &UserId,
197 ) -> Result<UserKeyQueryResult> {
198 {
199 self.ensure_sync_tracked_users(&cache).await?;
202 drop(cache);
203 }
204
205 let mut users_for_key_query = self.users_for_key_query.lock().await;
206 let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
207 return Ok(UserKeyQueryResult::WasNotPending);
208 };
209
210 let wait_for_completion = async {
211 while !waiter.completed.load(Ordering::Relaxed) {
212 let mut notified = pin!(self.users_for_key_query_notify.notified());
216 notified.as_mut().enable();
217 drop(users_for_key_query);
218
219 notified.await;
221
222 users_for_key_query = self.users_for_key_query.lock().await;
226 }
227 };
228
229 match timeout(Box::pin(wait_for_completion), timeout_duration).await {
230 Err(_) => {
231 warn!(
232 user_id = ?user,
233 "The user has a pending `/keys/query` request which did \
234 not finish yet, some devices might be missing."
235 );
236
237 Ok(UserKeyQueryResult::TimeoutExpired)
238 }
239 _ => Ok(UserKeyQueryResult::WasPending),
240 }
241 }
242}
243
244pub(crate) struct SyncedKeyQueryManager<'a> {
245 cache: &'a StoreCache,
246 manager: &'a KeyQueryManager,
247}
248
249impl SyncedKeyQueryManager<'_> {
250 pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
255 let mut store_updates = Vec::new();
256 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
257
258 {
259 let mut tracked_users = self.cache.tracked_users.write();
260 for user_id in users {
261 if tracked_users.insert(user_id.to_owned()) {
262 key_query_lock.insert_user(user_id);
263 store_updates.push((user_id, true))
264 }
265 }
266 }
267
268 self.cache.store.save_tracked_users(&store_updates).await
269 }
270
271 pub async fn mark_tracked_users_as_changed(
278 &self,
279 users: impl Iterator<Item = &UserId>,
280 ) -> Result<()> {
281 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
282 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
283
284 {
285 let tracked_users = &self.cache.tracked_users.read();
286 for user_id in users {
287 if tracked_users.contains(user_id) {
288 key_query_lock.insert_user(user_id);
289 store_updates.push((user_id, true));
290 }
291 }
292 }
293
294 self.cache.store.save_tracked_users(&store_updates).await
295 }
296
297 pub async fn mark_tracked_users_as_up_to_date(
303 &self,
304 users: impl Iterator<Item = &UserId>,
305 sequence_number: SequenceNumber,
306 ) -> Result<()> {
307 let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
308 let mut key_query_lock = self.manager.users_for_key_query.lock().await;
309
310 {
311 let tracked_users = self.cache.tracked_users.read();
312 for user_id in users {
313 if tracked_users.contains(user_id) {
314 let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
315 store_updates.push((user_id, !clean));
316 }
317 }
318 }
319
320 self.cache.store.save_tracked_users(&store_updates).await?;
321 self.manager.users_for_key_query_notify.notify_waiters();
323
324 Ok(())
325 }
326
327 pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
339 self.manager.users_for_key_query.lock().await.users_for_key_query()
340 }
341
342 pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
344 self.cache.tracked_users.read().iter().cloned().collect()
345 }
346
347 pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
353 self.manager.users_for_key_query.lock().await.insert_user(user);
354 self.cache.tracked_users.write().insert(user.to_owned());
355
356 self.cache.store.save_tracked_users(&[(user, true)]).await
357 }
358}
359
360fn collect_device_updates(
366 verification_machine: VerificationMachine,
367 own_identity: Option<OwnUserIdentityData>,
368 identities: IdentityChanges,
369 devices: DeviceChanges,
370) -> DeviceUpdates {
371 let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
372 let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
373
374 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
375
376 let map_device = |device: DeviceData| {
377 let device_owner_identity = new_identities
378 .get(device.user_id())
379 .or_else(|| changed_identities.get(device.user_id()))
380 .or_else(|| unchanged_identities.get(device.user_id()))
381 .cloned();
382
383 Device {
384 inner: device,
385 verification_machine: verification_machine.to_owned(),
386 own_identity: own_identity.to_owned(),
387 device_owner_identity,
388 }
389 };
390
391 for device in devices.new {
392 let device = map_device(device);
393
394 new.entry(device.user_id().to_owned())
395 .or_default()
396 .insert(device.device_id().to_owned(), device);
397 }
398
399 for device in devices.changed {
400 let device = map_device(device);
401
402 changed
403 .entry(device.user_id().to_owned())
404 .or_default()
405 .insert(device.device_id().to_owned(), device.to_owned());
406 }
407
408 DeviceUpdates { new, changed }
409}
410
411#[allow(missing_debug_implementations)]
413pub struct StoreTransaction {
414 store: Store,
415 changes: PendingChanges,
416 cache: OwnedRwLockWriteGuard<StoreCache>,
418}
419
420impl StoreTransaction {
421 async fn new(store: Store) -> Self {
423 let cache = store.inner.cache.clone();
424
425 Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
426 }
427
428 pub(crate) fn cache(&self) -> &StoreCache {
429 &self.cache
430 }
431
432 pub fn store(&self) -> &Store {
434 &self.store
435 }
436
437 pub async fn account(&mut self) -> Result<&mut Account> {
444 if self.changes.account.is_none() {
445 let _ = self.cache.account().await?;
447 self.changes.account = self.cache.account.lock().await.take();
448 }
449 Ok(self.changes.account.as_mut().unwrap())
450 }
451
452 pub async fn commit(self) -> Result<()> {
455 if self.changes.is_empty() {
456 return Ok(());
457 }
458
459 let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
461
462 self.store.save_pending_changes(self.changes).await?;
463
464 if let Some(account) = account {
466 *self.cache.account.lock().await = Some(account);
467 }
468
469 Ok(())
470 }
471}
472
473#[derive(Debug)]
474struct StoreInner {
475 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
476 store: Arc<CryptoStoreWrapper>,
477
478 cache: Arc<RwLock<StoreCache>>,
482
483 verification_machine: VerificationMachine,
484
485 static_account: StaticAccountData,
488}
489
490#[derive(Debug, Error)]
493pub enum SecretImportError {
494 #[error("Error while importing {name}: {error}")]
496 Key {
497 name: SecretName,
499 error: vodozemac::KeyError,
501 },
502 #[error(
505 "Error while importing {name}: The public key of the imported private \
506 key doesn't match the public key that was uploaded to the server"
507 )]
508 MismatchedPublicKeys {
509 name: SecretName,
511 },
512 #[error(transparent)]
514 Store(#[from] CryptoStoreError),
515}
516
517#[derive(Debug, Error)]
522pub enum SecretsBundleExportError {
523 #[error(transparent)]
525 Store(#[from] CryptoStoreError),
526 #[error("The store is missing one or multiple cross-signing keys")]
528 MissingCrossSigningKey(KeyUsage),
529 #[error("The store doesn't contain any cross-signing keys")]
531 MissingCrossSigningKeys,
532 #[error("The store contains a backup key, but no backup version")]
535 MissingBackupVersion,
536}
537
538impl Store {
539 pub(crate) fn new(
541 account: StaticAccountData,
542 identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
543 store: Arc<CryptoStoreWrapper>,
544 verification_machine: VerificationMachine,
545 ) -> Self {
546 Self {
547 inner: Arc::new(StoreInner {
548 static_account: account,
549 identity,
550 store: store.clone(),
551 verification_machine,
552 cache: Arc::new(RwLock::new(StoreCache {
553 store,
554 tracked_users: Default::default(),
555 loaded_tracked_users: Default::default(),
556 account: Default::default(),
557 })),
558 }),
559 }
560 }
561
562 pub(crate) fn user_id(&self) -> &UserId {
564 &self.inner.static_account.user_id
565 }
566
567 pub(crate) fn device_id(&self) -> &DeviceId {
569 self.inner.verification_machine.own_device_id()
570 }
571
572 pub(crate) fn static_account(&self) -> &StaticAccountData {
574 &self.inner.static_account
575 }
576
577 pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
578 Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
583 }
584
585 pub(crate) async fn transaction(&self) -> StoreTransaction {
586 StoreTransaction::new(self.clone()).await
587 }
588
589 pub(crate) async fn with_transaction<
592 T,
593 Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
594 F: FnOnce(StoreTransaction) -> Fut,
595 >(
596 &self,
597 func: F,
598 ) -> Result<T, crate::OlmError> {
599 let tr = self.transaction().await;
600 let (tr, res) = func(tr).await?;
601 tr.commit().await?;
602 Ok(res)
603 }
604
605 #[cfg(test)]
606 pub(crate) async fn reset_cross_signing_identity(&self) {
608 self.inner.identity.lock().await.reset();
609 }
610
611 pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
613 self.inner.identity.clone()
614 }
615
616 pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
618 let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
619
620 self.save_changes(changes).await
621 }
622
623 pub(crate) async fn get_sessions(
624 &self,
625 sender_key: &str,
626 ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
627 self.inner.store.get_sessions(sender_key).await
628 }
629
630 pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
631 self.inner.store.save_changes(changes).await
632 }
633
634 pub(crate) async fn compare_group_session(
641 &self,
642 session: &InboundGroupSession,
643 ) -> Result<SessionOrdering> {
644 let old_session = self
645 .inner
646 .store
647 .get_inbound_group_session(session.room_id(), session.session_id())
648 .await?;
649
650 Ok(if let Some(old_session) = old_session {
651 session.compare(&old_session).await
652 } else {
653 SessionOrdering::Better
654 })
655 }
656
657 #[cfg(test)]
658 pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
660 use types::DeviceChanges;
661
662 let changes = Changes {
663 devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
664 ..Default::default()
665 };
666
667 self.save_changes(changes).await
668 }
669
670 pub(crate) async fn save_inbound_group_sessions(
672 &self,
673 sessions: &[InboundGroupSession],
674 ) -> Result<()> {
675 let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
676
677 self.save_changes(changes).await
678 }
679
680 pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
682 Ok(self
683 .inner
684 .store
685 .get_device(self.user_id(), self.device_id())
686 .await?
687 .and_then(|d| d.display_name().map(|d| d.to_owned())))
688 }
689
690 pub(crate) async fn get_device_data(
695 &self,
696 user_id: &UserId,
697 device_id: &DeviceId,
698 ) -> Result<Option<DeviceData>> {
699 self.inner.store.get_device(user_id, device_id).await
700 }
701
702 pub(crate) async fn get_device_data_for_user_filtered(
710 &self,
711 user_id: &UserId,
712 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
713 self.inner.store.get_user_devices(user_id).await.map(|mut d| {
714 if user_id == self.user_id() {
715 d.remove(self.device_id());
716 }
717 d
718 })
719 }
720
721 pub(crate) async fn get_device_data_for_user(
730 &self,
731 user_id: &UserId,
732 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
733 self.inner.store.get_user_devices(user_id).await
734 }
735
736 pub(crate) async fn get_device_from_curve_key(
742 &self,
743 user_id: &UserId,
744 curve_key: Curve25519PublicKey,
745 ) -> Result<Option<Device>> {
746 self.get_user_devices(user_id)
747 .await
748 .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
749 }
750
751 pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
761 let devices = self.get_device_data_for_user(user_id).await?;
762
763 let own_identity = self
764 .inner
765 .store
766 .get_user_identity(self.user_id())
767 .await?
768 .and_then(|i| i.own().cloned());
769 let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
770
771 Ok(UserDevices {
772 inner: devices,
773 verification_machine: self.inner.verification_machine.clone(),
774 own_identity,
775 device_owner_identity,
776 })
777 }
778
779 pub(crate) async fn get_device(
789 &self,
790 user_id: &UserId,
791 device_id: &DeviceId,
792 ) -> Result<Option<Device>> {
793 if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
794 Ok(Some(self.wrap_device_data(device_data).await?))
795 } else {
796 Ok(None)
797 }
798 }
799
800 pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
805 let own_identity = self
806 .inner
807 .store
808 .get_user_identity(self.user_id())
809 .await?
810 .and_then(|i| i.own().cloned());
811
812 let device_owner_identity =
813 self.inner.store.get_user_identity(device_data.user_id()).await?;
814
815 Ok(Device {
816 inner: device_data,
817 verification_machine: self.inner.verification_machine.clone(),
818 own_identity,
819 device_owner_identity,
820 })
821 }
822
823 pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
825 let own_identity = self
826 .inner
827 .store
828 .get_user_identity(self.user_id())
829 .await?
830 .and_then(as_variant!(UserIdentityData::Own));
831
832 Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
833 UserIdentity::new(
834 self.clone(),
835 i,
836 self.inner.verification_machine.to_owned(),
837 own_identity,
838 )
839 }))
840 }
841
842 pub async fn export_secret(
851 &self,
852 secret_name: &SecretName,
853 ) -> Result<Option<String>, CryptoStoreError> {
854 Ok(match secret_name {
855 SecretName::CrossSigningMasterKey
856 | SecretName::CrossSigningUserSigningKey
857 | SecretName::CrossSigningSelfSigningKey => {
858 self.inner.identity.lock().await.export_secret(secret_name).await
859 }
860 SecretName::RecoveryKey => {
861 if let Some(key) = self.load_backup_keys().await?.decryption_key {
862 let exported = key.to_base64();
863 Some(exported)
864 } else {
865 None
866 }
867 }
868 name => {
869 warn!(secret = ?name, "Unknown secret was requested");
870 None
871 }
872 })
873 }
874
875 pub async fn export_cross_signing_keys(
883 &self,
884 ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
885 let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
886 let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
887 let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
888
889 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
890 None
891 } else {
892 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
893 })
894 }
895
896 pub async fn import_cross_signing_keys(
901 &self,
902 export: CrossSigningKeyExport,
903 ) -> Result<CrossSigningStatus, SecretImportError> {
904 if let Some(public_identity) =
905 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
906 {
907 let identity = self.inner.identity.lock().await;
908
909 identity
910 .import_secrets(
911 public_identity.to_owned(),
912 export.master_key.as_deref(),
913 export.self_signing_key.as_deref(),
914 export.user_signing_key.as_deref(),
915 )
916 .await?;
917
918 let status = identity.status().await;
919
920 let diff = identity.get_public_identity_diff(&public_identity.inner).await;
921
922 let mut changes =
923 Changes { private_identity: Some(identity.clone()), ..Default::default() };
924
925 if diff.none_differ() {
926 public_identity.mark_as_verified();
927 changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
928 }
929
930 info!(?status, "Successfully imported the private cross-signing keys");
931
932 self.save_changes(changes).await?;
933 } else {
934 warn!(
935 "No public identity found while importing cross-signing keys, \
936 a /keys/query needs to be done"
937 );
938 }
939
940 Ok(self.inner.identity.lock().await.status().await)
941 }
942
943 pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
955 let Some(cross_signing) = self.export_cross_signing_keys().await? else {
956 return Err(SecretsBundleExportError::MissingCrossSigningKeys);
957 };
958
959 let Some(master_key) = cross_signing.master_key.clone() else {
960 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
961 };
962
963 let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
964 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
965 };
966
967 let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
968 return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
969 };
970
971 let backup_keys = self.load_backup_keys().await?;
972
973 let backup = if let Some(key) = backup_keys.decryption_key {
974 if let Some(backup_version) = backup_keys.backup_version {
975 Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
976 MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
977 ))
978 } else {
979 return Err(SecretsBundleExportError::MissingBackupVersion);
980 }
981 } else {
982 None
983 };
984
985 Ok(SecretsBundle {
986 cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
987 backup,
988 })
989 }
990
991 pub async fn import_secrets_bundle(
1004 &self,
1005 bundle: &SecretsBundle,
1006 ) -> Result<(), SecretImportError> {
1007 let mut changes = Changes::default();
1008
1009 if let Some(backup_bundle) = &bundle.backup {
1010 match backup_bundle {
1011 BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1012 changes.backup_decryption_key = Some(bundle.key.clone());
1013 changes.backup_version = Some(bundle.backup_version.clone());
1014 }
1015 }
1016 }
1017
1018 let identity = self.inner.identity.lock().await;
1019
1020 identity
1021 .import_secrets_unchecked(
1022 Some(&bundle.cross_signing.master_key),
1023 Some(&bundle.cross_signing.self_signing_key),
1024 Some(&bundle.cross_signing.user_signing_key),
1025 )
1026 .await?;
1027
1028 let public_identity = identity.to_public_identity().await.expect(
1029 "We should be able to create a new public identity since we just imported \
1030 all the private cross-signing keys",
1031 );
1032
1033 changes.private_identity = Some(identity.clone());
1034 changes.identities.new.push(UserIdentityData::Own(public_identity));
1035
1036 Ok(self.save_changes(changes).await?)
1037 }
1038
1039 pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1041 match &secret.secret_name {
1042 SecretName::CrossSigningMasterKey
1043 | SecretName::CrossSigningUserSigningKey
1044 | SecretName::CrossSigningSelfSigningKey => {
1045 if let Some(public_identity) =
1046 self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1047 {
1048 let identity = self.inner.identity.lock().await;
1049
1050 identity
1051 .import_secret(
1052 public_identity,
1053 &secret.secret_name,
1054 &secret.event.content.secret,
1055 )
1056 .await?;
1057 info!(
1058 secret_name = ?secret.secret_name,
1059 "Successfully imported a private cross signing key"
1060 );
1061
1062 let changes =
1063 Changes { private_identity: Some(identity.clone()), ..Default::default() };
1064
1065 self.save_changes(changes).await?;
1066 }
1067 }
1068 SecretName::RecoveryKey => {
1069 }
1075 name => {
1076 warn!(secret = ?name, "Tried to import an unknown secret");
1077 }
1078 }
1079
1080 Ok(())
1081 }
1082
1083 pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1086 let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1087 Ok(value)
1088 }
1089
1090 pub async fn set_only_allow_trusted_devices(
1093 &self,
1094 block_untrusted_devices: bool,
1095 ) -> Result<()> {
1096 self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1097 }
1098
1099 pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1101 let Some(value) = self.get_custom_value(key).await? else {
1102 return Ok(None);
1103 };
1104 let deserialized = self.deserialize_value(&value)?;
1105 Ok(Some(deserialized))
1106 }
1107
1108 pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1110 let serialized = self.serialize_value(value)?;
1111 self.set_custom_value(key, serialized).await?;
1112 Ok(())
1113 }
1114
1115 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1116 let serialized =
1117 rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1118 Ok(serialized)
1119 }
1120
1121 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1122 let deserialized =
1123 rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1124 Ok(deserialized)
1125 }
1126
1127 pub fn room_keys_received_stream(
1139 &self,
1140 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1141 self.inner.store.room_keys_received_stream()
1142 }
1143
1144 pub fn room_keys_withheld_received_stream(
1153 &self,
1154 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1155 self.inner.store.room_keys_withheld_received_stream()
1156 }
1157
1158 pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1189 let verification_machine = self.inner.verification_machine.to_owned();
1190
1191 let this = self.clone();
1192 self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1193 let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1194
1195 let map_identity = |(user_id, identity)| {
1196 (
1197 user_id,
1198 UserIdentity::new(
1199 this.clone(),
1200 identity,
1201 verification_machine.to_owned(),
1202 own_identity.to_owned(),
1203 ),
1204 )
1205 };
1206
1207 let new = new_identities.into_iter().map(map_identity).collect();
1208 let changed = changed_identities.into_iter().map(map_identity).collect();
1209 let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1210
1211 IdentityUpdates { new, changed, unchanged }
1212 })
1213 }
1214
1215 pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1247 let verification_machine = self.inner.verification_machine.to_owned();
1248
1249 self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1250 collect_device_updates(
1251 verification_machine.to_owned(),
1252 own_identity,
1253 identities,
1254 devices,
1255 )
1256 })
1257 }
1258
1259 pub fn identities_stream_raw(
1269 &self,
1270 ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1271 self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1272 }
1273
1274 pub fn create_store_lock(
1277 &self,
1278 lock_key: String,
1279 lock_value: String,
1280 ) -> CrossProcessLock<LockableCryptoStore> {
1281 self.inner.store.create_store_lock(lock_key, lock_value)
1282 }
1283
1284 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1324 self.inner.store.secrets_stream()
1325 }
1326
1327 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1367 self.inner.store.historic_room_key_stream()
1368 }
1369
1370 pub async fn import_room_keys(
1383 &self,
1384 exported_keys: Vec<ExportedRoomKey>,
1385 from_backup_version: Option<&str>,
1386 progress_listener: impl Fn(usize, usize),
1387 ) -> Result<RoomKeyImportResult> {
1388 let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1389 self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1390 }
1391
1392 pub async fn import_exported_room_keys(
1419 &self,
1420 exported_keys: Vec<ExportedRoomKey>,
1421 progress_listener: impl Fn(usize, usize),
1422 ) -> Result<RoomKeyImportResult> {
1423 self.import_room_keys(exported_keys, None, progress_listener).await
1424 }
1425
1426 async fn import_sessions_impl<T>(
1427 &self,
1428 room_keys: Vec<T>,
1429 from_backup_version: Option<&str>,
1430 progress_listener: impl Fn(usize, usize),
1431 ) -> Result<RoomKeyImportResult>
1432 where
1433 T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1434 T::Error: Debug,
1435 {
1436 let mut sessions = Vec::new();
1437
1438 async fn new_session_better(
1439 session: &InboundGroupSession,
1440 old_session: Option<InboundGroupSession>,
1441 ) -> bool {
1442 if let Some(old_session) = &old_session {
1443 session.compare(old_session).await == SessionOrdering::Better
1444 } else {
1445 true
1446 }
1447 }
1448
1449 let total_count = room_keys.len();
1450 let mut keys = BTreeMap::new();
1451
1452 for (i, key) in room_keys.into_iter().enumerate() {
1453 match key.try_into() {
1454 Ok(session) => {
1455 let old_session = self
1456 .inner
1457 .store
1458 .get_inbound_group_session(session.room_id(), session.session_id())
1459 .await?;
1460
1461 if new_session_better(&session, old_session).await {
1464 if from_backup_version.is_some() {
1465 session.mark_as_backed_up();
1466 }
1467
1468 keys.entry(session.room_id().to_owned())
1469 .or_insert_with(BTreeMap::new)
1470 .entry(session.sender_key().to_base64())
1471 .or_insert_with(BTreeSet::new)
1472 .insert(session.session_id().to_owned());
1473
1474 sessions.push(session);
1475 }
1476 }
1477 Err(e) => {
1478 warn!(
1479 sender_key = key.sender_key().to_base64(),
1480 room_id = ?key.room_id(),
1481 session_id = key.session_id(),
1482 error = ?e,
1483 "Couldn't import a room key from a file export."
1484 );
1485 }
1486 }
1487
1488 progress_listener(i, total_count);
1489 }
1490
1491 let imported_count = sessions.len();
1492
1493 self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1494
1495 info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1496
1497 Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1498 }
1499
1500 pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1501 self.inner.store.clone()
1502 }
1503
1504 pub async fn export_room_keys(
1527 &self,
1528 predicate: impl FnMut(&InboundGroupSession) -> bool,
1529 ) -> Result<Vec<ExportedRoomKey>> {
1530 let mut exported = Vec::new();
1531
1532 let mut sessions = self.get_inbound_group_sessions().await?;
1533 sessions.retain(predicate);
1534
1535 for session in sessions {
1536 let export = session.export().await;
1537 exported.push(export);
1538 }
1539
1540 Ok(exported)
1541 }
1542
1543 pub async fn export_room_keys_stream(
1576 &self,
1577 predicate: impl FnMut(&InboundGroupSession) -> bool,
1578 ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1579 let sessions = self.get_inbound_group_sessions().await?;
1581 Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1582 .then(|session| async move { session.export().await }))
1583 }
1584
1585 pub async fn build_room_key_bundle(
1590 &self,
1591 room_id: &RoomId,
1592 ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1593 let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1594
1595 let mut bundle = RoomKeyBundle::default();
1596 for session in sessions {
1597 if session.shared_history() {
1598 bundle.room_keys.push(session.export().await.into());
1599 } else {
1600 bundle.withheld.push(RoomKeyWithheldContent::new(
1601 session.algorithm().to_owned(),
1602 WithheldCode::Unauthorised,
1603 session.room_id().to_owned(),
1604 session.session_id().to_owned(),
1605 session.sender_key().to_owned(),
1606 self.device_id().to_owned(),
1607 ));
1608 }
1609 }
1610
1611 Ok(bundle)
1612 }
1613
1614 #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1624 pub async fn receive_room_key_bundle(
1625 &self,
1626 bundle_info: &StoredRoomKeyBundleData,
1627 bundle: RoomKeyBundle,
1628 progress_listener: impl Fn(usize, usize),
1629 ) -> Result<(), CryptoStoreError> {
1630 let sender_data = if bundle_info.sender_data.should_recalculate() {
1631 let device = self
1632 .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1633 .await?;
1634
1635 device
1636 .as_ref()
1637 .map(SenderData::from_device)
1638 .unwrap_or_else(|| bundle_info.sender_data.clone())
1639 } else {
1640 bundle_info.sender_data.clone()
1641 };
1642
1643 tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1644
1645 match sender_data {
1646 SenderData::UnknownDevice { .. }
1647 | SenderData::VerificationViolation(_)
1648 | SenderData::DeviceInfo { .. } => {
1649 warn!("Not accepting a historic room key bundle due to insufficient trust in the sender");
1650 Ok(())
1651 }
1652 SenderData::SenderUnverified(_) | SenderData::SenderVerified(_) => {
1653 let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1654 if key.room_id != bundle_info.bundle_data.room_id {
1655 trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1656 Either::Right(key)
1657 } else {
1658 Either::Left(key)
1659 }
1660 });
1661
1662 match (bad.is_empty(), good.is_empty()) {
1663 (true, true) => {
1665 warn!("Received a completely empty room key bundle");
1666 }
1667
1668 (false, true) => {
1670 let bad_keys: Vec<_> =
1671 bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1672
1673 warn!(
1674 ?bad_keys,
1675 "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1676 );
1677 }
1678
1679 (_, false) => {
1681 if !bad.is_empty() {
1684 warn!(
1685 bad_key_count = bad.len(),
1686 "The room key bundle contained some room keys \
1687 that were meant for a different room"
1688 );
1689 }
1690
1691 self.import_sessions_impl(good, None, progress_listener).await?;
1692 }
1693 }
1694
1695 Ok(())
1696 }
1697 }
1698 }
1699}
1700
1701impl Deref for Store {
1702 type Target = DynCryptoStore;
1703
1704 fn deref(&self) -> &Self::Target {
1705 self.inner.store.deref().deref()
1706 }
1707}
1708
1709#[derive(Clone, Debug)]
1711pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1712
1713impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1714 type LockError = CryptoStoreError;
1715
1716 async fn try_lock(
1717 &self,
1718 lease_duration_ms: u32,
1719 key: &str,
1720 holder: &str,
1721 ) -> std::result::Result<bool, Self::LockError> {
1722 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1723 }
1724}
1725
1726#[cfg(test)]
1727mod tests {
1728 use std::pin::pin;
1729
1730 use futures_util::StreamExt;
1731 use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1732 use matrix_sdk_test::async_test;
1733 use ruma::{device_id, room_id, user_id, RoomId};
1734 use vodozemac::megolm::SessionKey;
1735
1736 use crate::{
1737 machine::test_helpers::get_machine_pair,
1738 olm::{InboundGroupSession, SenderData},
1739 store::types::DehydratedDeviceKey,
1740 types::EventEncryptionAlgorithm,
1741 OlmMachine,
1742 };
1743
1744 #[async_test]
1745 async fn test_import_room_keys_notifies_stream() {
1746 use futures_util::FutureExt;
1747
1748 let (alice, bob, _) =
1749 get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1750
1751 let room1_id = room_id!("!room1:localhost");
1752 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1753 let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
1754
1755 let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
1756 bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
1757
1758 let room_keys = room_keys_received_stream
1759 .next()
1760 .now_or_never()
1761 .flatten()
1762 .expect("We should have received an update of room key infos")
1763 .unwrap();
1764 assert_eq!(room_keys.len(), 1);
1765 assert_eq!(room_keys[0].room_id, "!room1:localhost");
1766 }
1767
1768 #[async_test]
1769 async fn test_export_room_keys_provides_selected_keys() {
1770 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1772 let room1_id = room_id!("!room1:localhost");
1773 let room2_id = room_id!("!room2:localhost");
1774 let room3_id = room_id!("!room3:localhost");
1775 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1776 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1777 alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
1778
1779 let keys = alice
1781 .store()
1782 .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
1783 .await
1784 .unwrap();
1785
1786 assert_eq!(keys.len(), 2);
1788 assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1789 assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1790 assert_eq!(keys[0].room_id, "!room2:localhost");
1791 assert_eq!(keys[1].room_id, "!room3:localhost");
1792 assert_eq!(keys[0].session_key.to_base64().len(), 220);
1793 assert_eq!(keys[1].session_key.to_base64().len(), 220);
1794 }
1795
1796 #[async_test]
1797 async fn test_export_room_keys_stream_can_provide_all_keys() {
1798 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1800 let room1_id = room_id!("!room1:localhost");
1801 let room2_id = room_id!("!room2:localhost");
1802 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1803 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1804
1805 let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
1807
1808 let mut collected = vec![];
1810 while let Some(key) = keys.next().await {
1811 collected.push(key);
1812 }
1813
1814 assert_eq!(collected.len(), 2);
1816 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1817 assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1818 assert_eq!(collected[0].room_id, "!room1:localhost");
1819 assert_eq!(collected[1].room_id, "!room2:localhost");
1820 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1821 assert_eq!(collected[1].session_key.to_base64().len(), 220);
1822 }
1823
1824 #[async_test]
1825 async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
1826 let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1828 let room1_id = room_id!("!room1:localhost");
1829 let room2_id = room_id!("!room2:localhost");
1830 alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1831 alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1832
1833 let mut keys =
1835 pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
1836
1837 let mut collected = vec![];
1839 while let Some(key) = keys.next().await {
1840 collected.push(key);
1841 }
1842
1843 assert_eq!(collected.len(), 1);
1845 assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1846 assert_eq!(collected[0].room_id, "!room1:localhost");
1847 assert_eq!(collected[0].session_key.to_base64().len(), 220);
1848 }
1849
1850 #[async_test]
1851 async fn test_export_secrets_bundle() {
1852 let user_id = user_id!("@alice:example.com");
1853 let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
1854
1855 let _ = first
1856 .bootstrap_cross_signing(false)
1857 .await
1858 .expect("We should be able to bootstrap cross-signing");
1859
1860 let bundle = first.store().export_secrets_bundle().await.expect(
1861 "We should be able to export the secrets bundle, now that we \
1862 have the cross-signing keys",
1863 );
1864
1865 assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
1866
1867 second
1868 .store()
1869 .import_secrets_bundle(&bundle)
1870 .await
1871 .expect("We should be able to import the secrets bundle");
1872
1873 let status = second.cross_signing_status().await;
1874 let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
1875
1876 assert!(identity.is_verified(), "The public identity should be marked as verified.");
1877
1878 assert!(status.is_complete(), "We should have imported all the cross-signing keys");
1879 }
1880
1881 #[async_test]
1882 async fn test_create_dehydrated_device_key() {
1883 let pickle_key = DehydratedDeviceKey::new()
1884 .expect("Should be able to create a random dehydrated device key");
1885
1886 let to_vec = pickle_key.inner.to_vec();
1887 let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
1888 .expect("Should be able to create a dehydrated device key from slice");
1889
1890 assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
1891 }
1892
1893 #[async_test]
1894 async fn test_create_dehydrated_errors() {
1895 let too_small = [0u8; 22];
1896 let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
1897
1898 assert!(pickle_key.is_err());
1899
1900 let too_big = [0u8; 40];
1901 let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
1902
1903 assert!(pickle_key.is_err());
1904 }
1905
1906 #[async_test]
1907 async fn test_build_room_key_bundle() {
1908 let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
1911 let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
1912
1913 let room1_id = room_id!("!room1:localhost");
1914 let room2_id = room_id!("!room2:localhost");
1915
1916 let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
1921 let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
1922 let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
1923 let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
1924
1925 let sessions = [
1926 create_inbound_group_session_with_visibility(
1927 &alice,
1928 room1_id,
1929 &SessionKey::from_base64(session_key1).unwrap(),
1930 true,
1931 ),
1932 create_inbound_group_session_with_visibility(
1933 &alice,
1934 room1_id,
1935 &SessionKey::from_base64(session_key2).unwrap(),
1936 true,
1937 ),
1938 create_inbound_group_session_with_visibility(
1939 &alice,
1940 room1_id,
1941 &SessionKey::from_base64(session_key3).unwrap(),
1942 false,
1943 ),
1944 create_inbound_group_session_with_visibility(
1945 &alice,
1946 room2_id,
1947 &SessionKey::from_base64(session_key4).unwrap(),
1948 true,
1949 ),
1950 ];
1951 bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
1952
1953 let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
1955
1956 bundle.room_keys.sort_by_key(|session| session.session_id.clone());
1960
1961 let alice_curve_key = alice.identity_keys().curve25519.to_base64();
1963 let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
1964 assert_eq!(value.as_str().unwrap(), alice_curve_key);
1965 "[alice curve key]"
1966 };
1967 let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
1968 let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
1969 assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
1970 "[alice ed25519 key]"
1971 };
1972
1973 insta::with_settings!({ sort_maps => true }, {
1974 assert_json_snapshot!(bundle, {
1975 ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
1976 ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
1977 ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
1978 });
1979 });
1980 }
1981
1982 fn create_inbound_group_session_with_visibility(
1987 olm_machine: &OlmMachine,
1988 room_id: &RoomId,
1989 session_key: &SessionKey,
1990 shared_history: bool,
1991 ) -> InboundGroupSession {
1992 let identity_keys = &olm_machine.store().static_account().identity_keys;
1993 InboundGroupSession::new(
1994 identity_keys.curve25519,
1995 identity_keys.ed25519,
1996 room_id,
1997 session_key,
1998 SenderData::unknown(),
1999 EventEncryptionAlgorithm::MegolmV1AesSha2,
2000 None,
2001 shared_history,
2002 )
2003 .unwrap()
2004 }
2005}