matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
//! The storage layer for the [`OlmMachine`] can be customized by implementing
//! your own [`CryptoStore`].
//!
//! An in-memory only store is provided, as well as an SQLite-based one.
//! Depending on your needs and targets, a custom store may be implemented;
//! e.g. for `wasm32-unknown-unknown` an IndexedDB store would be needed.
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
//! [`CryptoStore`]: trait.CryptoStore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{atomic::Ordering, Arc},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55    encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56    OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
65
66use self::types::{
67    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70#[cfg(doc)]
71use crate::{backups::BackupMachine, identities::OwnUserIdentity};
72use crate::{
73    gossiping::GossippedSecret,
74    identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
75    olm::{
76        Account, ExportedRoomKey, InboundGroupSession, PrivateCrossSigningIdentity, SenderData,
77        Session, StaticAccountData,
78    },
79    types::{
80        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
81        SecretsBundle,
82    },
83    verification::VerificationMachine,
84    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
85};
86
87pub mod caches;
88mod crypto_store_wrapper;
89mod error;
90mod memorystore;
91mod traits;
92pub mod types;
93
94#[cfg(any(test, feature = "testing"))]
95#[macro_use]
96#[allow(missing_docs)]
97pub mod integration_tests;
98
99pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
100pub use error::{CryptoStoreError, Result};
101use matrix_sdk_common::{
102    cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout,
103};
104pub use memorystore::MemoryStore;
105pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
106
107use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
108use crate::types::{
109    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
110};
111pub use crate::{
112    dehydrated_devices::DehydrationError,
113    gossiping::{GossipRequest, SecretInfo},
114};
115
/// A wrapper for our CryptoStore trait object.
///
/// This is needed because we want to have a generic interface so we can
/// store/restore objects that we can serialize. Since trait objects and
/// generics don't mix, we let the CryptoStore store strings and this wrapper
/// adds the generic interface on top.
///
/// All state lives behind a shared [`Arc`], so clones of a `Store` refer to
/// the same underlying `StoreInner`.
#[derive(Debug, Clone)]
pub struct Store {
    /// The shared state of the store.
    inner: Arc<StoreInner>,
}
126
/// Bookkeeping for the users that need a `/keys/query`, together with the
/// notifier used to wake up tasks that wait for such a query to complete.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// Record of the users that are waiting for a /keys/query.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Notifier that is triggered each time an update is received for a user.
    users_for_key_query_notify: Notify,
}
135
136impl KeyQueryManager {
    /// Make sure the tracked users have been loaded into `cache`, then return
    /// a [`SyncedKeyQueryManager`] that borrows both this manager and the
    /// cache.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while loading the tracked users.
    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
        self.ensure_sync_tracked_users(cache).await?;
        Ok(SyncedKeyQueryManager { cache, manager: self })
    }
141
142    /// Load the list of users for whom we are tracking their device lists and
143    /// fill out our caches.
144    ///
145    /// This method ensures that we're only going to load the users from the
146    /// actual [`CryptoStore`] once, it will also make sure that any
147    /// concurrent calls to this method get deduplicated.
148    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
149        // Check if the users are loaded, and in that case do nothing.
150        let loaded = cache.loaded_tracked_users.read().await;
151        if *loaded {
152            return Ok(());
153        }
154
155        // Otherwise, we may load the users.
156        drop(loaded);
157        let mut loaded = cache.loaded_tracked_users.write().await;
158
159        // Check again if the users have been loaded, in case another call to this
160        // method loaded the tracked users between the time we tried to
161        // acquire the lock and the time we actually acquired the lock.
162        if *loaded {
163            return Ok(());
164        }
165
166        let tracked_users = cache.store.load_tracked_users().await?;
167
168        let mut query_users_lock = self.users_for_key_query.lock().await;
169        let mut tracked_users_cache = cache.tracked_users.write();
170        for user in tracked_users {
171            tracked_users_cache.insert(user.user_id.to_owned());
172
173            if user.dirty {
174                query_users_lock.insert_user(&user.user_id);
175            }
176        }
177
178        *loaded = true;
179
180        Ok(())
181    }
182
    /// Wait for a `/keys/query` response to be received if one is expected for
    /// the given user.
    ///
    /// If the given timeout elapses, the method will stop waiting and return
    /// [`UserKeyQueryResult::TimeoutExpired`].
    ///
    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
    /// query is pending are up to date, but doesn't hold on to it
    /// thereafter: the lock is short-lived in this case.
    ///
    /// # Returns
    ///
    /// * [`UserKeyQueryResult::WasNotPending`] if no waiter could be
    ///   registered for the user, in which case the method doesn't wait.
    /// * [`UserKeyQueryResult::WasPending`] if the wait finished before the
    ///   timeout.
    /// * [`UserKeyQueryResult::TimeoutExpired`] if the timeout elapsed first.
    ///
    /// # Errors
    ///
    /// Propagates any error raised while loading the tracked users.
    pub async fn wait_if_user_key_query_pending(
        &self,
        cache: StoreCacheGuard,
        timeout_duration: Duration,
        user: &UserId,
    ) -> Result<UserKeyQueryResult> {
        {
            // Drop the cache early, so we don't keep it while waiting (since writing the
            // results requires to write in the cache, thus take another lock).
            self.ensure_sync_tracked_users(&cache).await?;
            drop(cache);
        }

        let mut users_for_key_query = self.users_for_key_query.lock().await;
        // No waiter means there is nothing to wait for; bail out right away.
        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
            return Ok(UserKeyQueryResult::WasNotPending);
        };

        let wait_for_completion = async {
            while !waiter.completed.load(Ordering::Relaxed) {
                // Register for being notified before releasing the mutex, so
                // it's impossible to miss a wakeup between the last check for
                // whether we should wait, and starting to wait.
                let mut notified = pin!(self.users_for_key_query_notify.notified());
                notified.as_mut().enable();
                drop(users_for_key_query);

                // Wait for a notification
                notified.await;

                // Reclaim the lock before checking the flag to avoid races
                // when two notifications happen right after each other and the
                // second one sets the flag we want to wait for.
                users_for_key_query = self.users_for_key_query.lock().await;
            }
        };

        // Bound the wait: an `Err` from `timeout` means the duration elapsed
        // before the waiter's `completed` flag was observed as set.
        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
            Err(_) => {
                warn!(
                    user_id = ?user,
                    "The user has a pending `/keys/query` request which did \
                    not finish yet, some devices might be missing."
                );

                Ok(UserKeyQueryResult::TimeoutExpired)
            }
            _ => Ok(UserKeyQueryResult::WasPending),
        }
    }
242}
243
/// A [`KeyQueryManager`] paired with the [`StoreCache`] it operates on.
///
/// Values of this type are handed out by [`KeyQueryManager::synced()`], which
/// only constructs one after the tracked users have been loaded into the
/// cache.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The store cache holding the set of tracked users.
    cache: &'a StoreCache,
    /// The manager holding the users that await a key query.
    manager: &'a KeyQueryManager,
}
248
249impl SyncedKeyQueryManager<'_> {
250    /// Add entries to the list of users being tracked for device changes
251    ///
252    /// Any users not already on the list are flagged as awaiting a key query.
253    /// Users that were already in the list are unaffected.
254    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
255        let mut store_updates = Vec::new();
256        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
257
258        {
259            let mut tracked_users = self.cache.tracked_users.write();
260            for user_id in users {
261                if tracked_users.insert(user_id.to_owned()) {
262                    key_query_lock.insert_user(user_id);
263                    store_updates.push((user_id, true))
264                }
265            }
266        }
267
268        self.cache.store.save_tracked_users(&store_updates).await
269    }
270
271    /// Process notifications that users have changed devices.
272    ///
273    /// This is used to handle the list of device-list updates that is received
274    /// from the `/sync` response. Any users *whose device lists we are
275    /// tracking* are flagged as needing a key query. Users whose devices we
276    /// are not tracking are ignored.
277    pub async fn mark_tracked_users_as_changed(
278        &self,
279        users: impl Iterator<Item = &UserId>,
280    ) -> Result<()> {
281        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
282        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
283
284        {
285            let tracked_users = &self.cache.tracked_users.read();
286            for user_id in users {
287                if tracked_users.contains(user_id) {
288                    key_query_lock.insert_user(user_id);
289                    store_updates.push((user_id, true));
290                }
291            }
292        }
293
294        self.cache.store.save_tracked_users(&store_updates).await
295    }
296
297    /// Flag that the given users devices are now up-to-date.
298    ///
299    /// This is called after processing the response to a /keys/query request.
300    /// Any users whose device lists we are tracking are removed from the
301    /// list of those pending a /keys/query.
302    pub async fn mark_tracked_users_as_up_to_date(
303        &self,
304        users: impl Iterator<Item = &UserId>,
305        sequence_number: SequenceNumber,
306    ) -> Result<()> {
307        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
308        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
309
310        {
311            let tracked_users = self.cache.tracked_users.read();
312            for user_id in users {
313                if tracked_users.contains(user_id) {
314                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
315                    store_updates.push((user_id, !clean));
316                }
317            }
318        }
319
320        self.cache.store.save_tracked_users(&store_updates).await?;
321        // wake up any tasks that may have been waiting for updates
322        self.manager.users_for_key_query_notify.notify_waiters();
323
324        Ok(())
325    }
326
    /// Get the set of users that have the outdated/dirty flag set for their
    /// list of devices.
    ///
    /// This set should be included in a `/keys/query` request which will update
    /// the device list.
    ///
    /// # Returns
    ///
    /// A pair `(users, sequence_number)`, where `users` is the list of users to
    /// be queried, and `sequence_number` is the current sequence number,
    /// which should be returned in `mark_tracked_users_as_up_to_date`.
    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
        self.manager.users_for_key_query.lock().await.users_for_key_query()
    }
341
    /// See the docs for [`crate::OlmMachine::tracked_users()`].
    ///
    /// The returned set is a copy of the cached set of tracked users.
    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
        self.cache.tracked_users.read().iter().cloned().collect()
    }
346
347    /// Mark the given user as being tracked for device lists, and mark that it
348    /// has an outdated device list.
349    ///
350    /// This means that the user will be considered for a `/keys/query` request
351    /// next time [`Store::users_for_key_query()`] is called.
352    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
353        self.manager.users_for_key_query.lock().await.insert_user(user);
354        self.cache.tracked_users.write().insert(user.to_owned());
355
356        self.cache.store.save_tracked_users(&[(user, true)]).await
357    }
358}
359
360/// Convert the devices and vectors contained in the [`DeviceChanges`] into
361/// a [`DeviceUpdates`] struct.
362///
363/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
364/// we want to convert to a [`Device`].
365fn collect_device_updates(
366    verification_machine: VerificationMachine,
367    own_identity: Option<OwnUserIdentityData>,
368    identities: IdentityChanges,
369    devices: DeviceChanges,
370) -> DeviceUpdates {
371    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
372    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
373
374    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
375
376    let map_device = |device: DeviceData| {
377        let device_owner_identity = new_identities
378            .get(device.user_id())
379            .or_else(|| changed_identities.get(device.user_id()))
380            .or_else(|| unchanged_identities.get(device.user_id()))
381            .cloned();
382
383        Device {
384            inner: device,
385            verification_machine: verification_machine.to_owned(),
386            own_identity: own_identity.to_owned(),
387            device_owner_identity,
388        }
389    };
390
391    for device in devices.new {
392        let device = map_device(device);
393
394        new.entry(device.user_id().to_owned())
395            .or_default()
396            .insert(device.device_id().to_owned(), device);
397    }
398
399    for device in devices.changed {
400        let device = map_device(device);
401
402        changed
403            .entry(device.user_id().to_owned())
404            .or_default()
405            .insert(device.device_id().to_owned(), device.to_owned());
406    }
407
408    DeviceUpdates { new, changed }
409}
410
/// A temporary transaction (that implies a write) to the underlying store.
///
/// The transaction holds the write guard of the store cache for as long as it
/// is alive.
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store this transaction was started on.
    store: Store,
    /// The changes collected so far; they are written out by `commit()`.
    changes: PendingChanges,
    // TODO hold onto the cross-process crypto store lock + cache.
    /// Write guard on the in-memory cache of the store.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
419
420impl StoreTransaction {
421    /// Starts a new `StoreTransaction`.
422    async fn new(store: Store) -> Self {
423        let cache = store.inner.cache.clone();
424
425        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
426    }
427
    /// Returns a reference to the `StoreCache` that is write-locked by this
    /// transaction.
    pub(crate) fn cache(&self) -> &StoreCache {
        &self.cache
    }
431
    /// Returns a reference to the `Store` this transaction was started on.
    pub fn store(&self) -> &Store {
        &self.store
    }
436
    /// Gets a `Account` for update.
    ///
    /// Note: since it's guaranteed that one can't have both a
    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
    /// that we can't have two copies of an `Account` alive at the same time.
    ///
    /// The first call moves the account out of the cache and into the pending
    /// changes of this transaction; later calls hand out the same account.
    ///
    /// # Panics
    ///
    /// Panics if the cache holds no account after it was asked to load one.
    pub async fn account(&mut self) -> Result<&mut Account> {
        if self.changes.account.is_none() {
            // Make sure the cache loaded the account.
            // The returned value is discarded right away with `let _`, before
            // the account slot of the cache is locked on the next line.
            let _ = self.cache.account().await?;
            // Take the account out of the cache; `commit()` puts a copy back.
            self.changes.account = self.cache.account.lock().await.take();
        }
        Ok(self.changes.account.as_mut().unwrap())
    }
451
452    /// Commits all dirty fields to the store, and maintains the cache so it
453    /// reflects the current state of the database.
454    pub async fn commit(self) -> Result<()> {
455        if self.changes.is_empty() {
456            return Ok(());
457        }
458
459        // Save changes in the database.
460        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
461
462        self.store.save_pending_changes(self.changes).await?;
463
464        // Make the cache coherent with the database.
465        if let Some(account) = account {
466            *self.cache.account.lock().await = Some(account);
467        }
468
469        Ok(())
470    }
471}
472
/// The state that is shared between all the clones of a [`Store`].
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity associated with the store.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store that the data is read from and written to.
    store: Arc<CryptoStoreWrapper>,

    /// In-memory cache for the current crypto store.
    ///
    /// ⚠ Must remain private.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine that is handed to the devices and identities
    /// this store creates.
    verification_machine: VerificationMachine,

    /// Static account data that never changes (and thus can be loaded once and
    /// for all when creating the store).
    static_account: StaticAccountData,
}
489
/// Error describing what went wrong when importing private cross signing keys
/// or the key backup key.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The key that we tried to import was invalid.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that was being imported.
        name: SecretName,
        /// The key error that occurred.
        error: vodozemac::KeyError,
    },
    /// The public key of the imported private key doesn't match the public
    /// key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
            key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret that was being imported.
        name: SecretName,
    },
    /// The new version of the identity couldn't be stored.
    ///
    /// The message of the wrapped [`CryptoStoreError`] is used as is.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
516
/// Error describing what went wrong when exporting a [`SecretsBundle`].
///
/// The [`SecretsBundle`] can only be exported if we have all cross-signing
/// private keys in the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The store itself had an error.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// We're missing one or multiple cross-signing keys.
    ///
    /// NOTE(review): the [`KeyUsage`] presumably names a key that is missing;
    /// it is not part of the error message.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// We're missing all cross-signing keys.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// We have a backup key stored, but we don't know the version of the
    /// backup.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
537
538impl Store {
539    /// Create a new Store.
540    pub(crate) fn new(
541        account: StaticAccountData,
542        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
543        store: Arc<CryptoStoreWrapper>,
544        verification_machine: VerificationMachine,
545    ) -> Self {
546        Self {
547            inner: Arc::new(StoreInner {
548                static_account: account,
549                identity,
550                store: store.clone(),
551                verification_machine,
552                cache: Arc::new(RwLock::new(StoreCache {
553                    store,
554                    tracked_users: Default::default(),
555                    loaded_tracked_users: Default::default(),
556                    account: Default::default(),
557                })),
558            }),
559        }
560    }
561
    /// UserId associated with this store, as recorded in the static account
    /// data.
    pub(crate) fn user_id(&self) -> &UserId {
        &self.inner.static_account.user_id
    }
566
    /// DeviceId associated with this store.
    ///
    /// This is the own device ID as reported by the verification machine.
    pub(crate) fn device_id(&self) -> &DeviceId {
        self.inner.verification_machine.own_device_id()
    }
571
    /// The static data for the account associated with this store, i.e. the
    /// data that was passed to the store when it was created.
    pub(crate) fn static_account(&self) -> &StaticAccountData {
        &self.inner.static_account
    }
576
577    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
578        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
579        // - try to take the lock,
580        // - if acquired, look if another process touched the underlying storage,
581        // - if yes, reload everything; if no, return current cache
582        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
583    }
584
    /// Start a new [`StoreTransaction`] on a clone of this store.
    pub(crate) async fn transaction(&self) -> StoreTransaction {
        StoreTransaction::new(self.clone()).await
    }
588
589    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
590    // take a `&StoreTransaction`, but callers didn't quite like that.
591    pub(crate) async fn with_transaction<
592        T,
593        Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
594        F: FnOnce(StoreTransaction) -> Fut,
595    >(
596        &self,
597        func: F,
598    ) -> Result<T, crate::OlmError> {
599        let tr = self.transaction().await;
600        let (tr, res) = func(tr).await?;
601        tr.commit().await?;
602        Ok(res)
603    }
604
    #[cfg(test)]
    /// Test helper to reset the cross signing identity.
    ///
    /// Locks the private identity and calls `reset()` on it.
    pub(crate) async fn reset_cross_signing_identity(&self) {
        self.inner.identity.lock().await.reset();
    }
610
611    /// PrivateCrossSigningIdentity associated with this store
612    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
613        self.inner.identity.clone()
614    }
615
616    /// Save the given Sessions to the store
617    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
618        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
619
620        self.save_changes(changes).await
621    }
622
    /// Get the Olm sessions for the given sender key.
    ///
    /// This forwards the call to the underlying store.
    pub(crate) async fn get_sessions(
        &self,
        sender_key: &str,
    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
        self.inner.store.get_sessions(sender_key).await
    }
629
    /// Save the given [`Changes`] by forwarding them to the underlying store.
    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
        self.inner.store.save_changes(changes).await
    }
633
634    /// Compare the given `InboundGroupSession` with an existing session we have
635    /// in the store.
636    ///
637    /// This method returns `SessionOrdering::Better` if the given session is
638    /// better than the one we already have or if we don't have such a
639    /// session in the store.
640    pub(crate) async fn compare_group_session(
641        &self,
642        session: &InboundGroupSession,
643    ) -> Result<SessionOrdering> {
644        let old_session = self
645            .inner
646            .store
647            .get_inbound_group_session(session.room_id(), session.session_id())
648            .await?;
649
650        Ok(if let Some(old_session) = old_session {
651            session.compare(&old_session).await
652        } else {
653            SessionOrdering::Better
654        })
655    }
656
657    #[cfg(test)]
658    /// Testing helper to allow to save only a set of devices
659    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
660        use types::DeviceChanges;
661
662        let changes = Changes {
663            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
664            ..Default::default()
665        };
666
667        self.save_changes(changes).await
668    }
669
670    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
671    pub(crate) async fn save_inbound_group_sessions(
672        &self,
673        sessions: &[InboundGroupSession],
674    ) -> Result<()> {
675        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
676
677        self.save_changes(changes).await
678    }
679
680    /// Get the display name of our own device.
681    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
682        Ok(self
683            .inner
684            .store
685            .get_device(self.user_id(), self.device_id())
686            .await?
687            .and_then(|d| d.display_name().map(|d| d.to_owned())))
688    }
689
    /// Get the device data for the given [`UserId`] and [`DeviceId`].
    ///
    /// *Note*: This method will include our own device which is always present
    /// in the store.
    ///
    /// Returns `None` if the store doesn't have such a device.
    pub(crate) async fn get_device_data(
        &self,
        user_id: &UserId,
        device_id: &DeviceId,
    ) -> Result<Option<DeviceData>> {
        self.inner.store.get_device(user_id, device_id).await
    }
701
702    /// Get the device data for the given [`UserId`] and [`DeviceId`].
703    ///
704    /// *Note*: This method will **not** include our own device.
705    ///
706    /// Use this method if you need a list of recipients for a given user, since
707    /// we don't want to encrypt for our own device, otherwise take a look at
708    /// the [`Store::get_device_data_for_user`] method.
709    pub(crate) async fn get_device_data_for_user_filtered(
710        &self,
711        user_id: &UserId,
712    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
713        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
714            if user_id == self.user_id() {
715                d.remove(self.device_id());
716            }
717            d
718        })
719    }
720
    /// Get the [`DeviceData`] for all the devices a user has.
    ///
    /// *Note*: This method will include our own device which is always present
    /// in the store.
    ///
    /// Use this method if you need to operate on or update all devices of a
    /// user, otherwise take a look at the
    /// [`Store::get_device_data_for_user_filtered`] method.
    ///
    /// The result is whatever the underlying store returns for the user, keyed
    /// by device ID.
    pub(crate) async fn get_device_data_for_user(
        &self,
        user_id: &UserId,
    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
        self.inner.store.get_user_devices(user_id).await
    }
735
736    /// Get a [`Device`] for the given user with the given
737    /// [`Curve25519PublicKey`] key.
738    ///
739    /// *Note*: This method will include our own device which is always present
740    /// in the store.
741    pub(crate) async fn get_device_from_curve_key(
742        &self,
743        user_id: &UserId,
744        curve_key: Curve25519PublicKey,
745    ) -> Result<Option<Device>> {
746        self.get_user_devices(user_id)
747            .await
748            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
749    }
750
751    /// Get all devices associated with the given [`UserId`].
752    ///
753    /// This method is more expensive than the
754    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
755    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
756    /// device owner to be fetched from the store as well.
757    ///
758    /// *Note*: This method will include our own device which is always present
759    /// in the store.
760    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
761        let devices = self.get_device_data_for_user(user_id).await?;
762
763        let own_identity = self
764            .inner
765            .store
766            .get_user_identity(self.user_id())
767            .await?
768            .and_then(|i| i.own().cloned());
769        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
770
771        Ok(UserDevices {
772            inner: devices,
773            verification_machine: self.inner.verification_machine.clone(),
774            own_identity,
775            device_owner_identity,
776        })
777    }
778
779    /// Get a [`Device`] for the given user with the given [`DeviceId`].
780    ///
781    /// This method is more expensive than the [`Store::get_device_data`] method
782    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
783    /// [`UserIdentityData`] of the device owner to be fetched from the
784    /// store as well.
785    ///
786    /// *Note*: This method will include our own device which is always present
787    /// in the store.
788    pub(crate) async fn get_device(
789        &self,
790        user_id: &UserId,
791        device_id: &DeviceId,
792    ) -> Result<Option<Device>> {
793        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
794            Ok(Some(self.wrap_device_data(device_data).await?))
795        } else {
796            Ok(None)
797        }
798    }
799
800    /// Create a new device using the supplied [`DeviceData`]. Normally we would
801    /// call [`Self::get_device`] to find an existing device inside this
802    /// store. Only call this if you have some existing DeviceData and want
803    /// to wrap it with the extra information provided by a [`Device`].
804    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
805        let own_identity = self
806            .inner
807            .store
808            .get_user_identity(self.user_id())
809            .await?
810            .and_then(|i| i.own().cloned());
811
812        let device_owner_identity =
813            self.inner.store.get_user_identity(device_data.user_id()).await?;
814
815        Ok(Device {
816            inner: device_data,
817            verification_machine: self.inner.verification_machine.clone(),
818            own_identity,
819            device_owner_identity,
820        })
821    }
822
823    ///  Get the Identity of `user_id`
824    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
825        let own_identity = self
826            .inner
827            .store
828            .get_user_identity(self.user_id())
829            .await?
830            .and_then(as_variant!(UserIdentityData::Own));
831
832        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
833            UserIdentity::new(
834                self.clone(),
835                i,
836                self.inner.verification_machine.to_owned(),
837                own_identity,
838            )
839        }))
840    }
841
842    /// Try to export the secret with the given secret name.
843    ///
844    /// The exported secret will be encoded as unpadded base64. Returns `Null`
845    /// if the secret can't be found.
846    ///
847    /// # Arguments
848    ///
849    /// * `secret_name` - The name of the secret that should be exported.
850    pub async fn export_secret(
851        &self,
852        secret_name: &SecretName,
853    ) -> Result<Option<String>, CryptoStoreError> {
854        Ok(match secret_name {
855            SecretName::CrossSigningMasterKey
856            | SecretName::CrossSigningUserSigningKey
857            | SecretName::CrossSigningSelfSigningKey => {
858                self.inner.identity.lock().await.export_secret(secret_name).await
859            }
860            SecretName::RecoveryKey => {
861                if let Some(key) = self.load_backup_keys().await?.decryption_key {
862                    let exported = key.to_base64();
863                    Some(exported)
864                } else {
865                    None
866                }
867            }
868            name => {
869                warn!(secret = ?name, "Unknown secret was requested");
870                None
871            }
872        })
873    }
874
875    /// Export all the private cross signing keys we have.
876    ///
877    /// The export will contain the seed for the ed25519 keys as a unpadded
878    /// base64 encoded string.
879    ///
880    /// This method returns `None` if we don't have any private cross signing
881    /// keys.
882    pub async fn export_cross_signing_keys(
883        &self,
884    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
885        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
886        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
887        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
888
889        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
890            None
891        } else {
892            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
893        })
894    }
895
    /// Import our private cross signing keys.
    ///
    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
    /// base64 encoded string.
    ///
    /// The keys are only imported if a public identity for our own user is
    /// found in the store. If there is none, a warning is logged and nothing
    /// is imported.
    ///
    /// Returns the [`CrossSigningStatus`] of the private identity, read after
    /// the import attempt.
    pub async fn import_cross_signing_keys(
        &self,
        export: CrossSigningKeyExport,
    ) -> Result<CrossSigningStatus, SecretImportError> {
        if let Some(public_identity) =
            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
        {
            // This guard lives until the end of this branch, so it covers the
            // import, the diff and the save below.
            let identity = self.inner.identity.lock().await;

            identity
                .import_secrets(
                    public_identity.to_owned(),
                    export.master_key.as_deref(),
                    export.self_signing_key.as_deref(),
                    export.user_signing_key.as_deref(),
                )
                .await?;

            let status = identity.status().await;

            let diff = identity.get_public_identity_diff(&public_identity.inner).await;

            // The private identity is always persisted after a successful
            // import.
            let mut changes =
                Changes { private_identity: Some(identity.clone()), ..Default::default() };

            // Only when nothing differs between the private identity and the
            // public one do we mark the public identity as verified and
            // persist it too.
            if diff.none_differ() {
                public_identity.mark_as_verified();
                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
            }

            info!(?status, "Successfully imported the private cross-signing keys");

            self.save_changes(changes).await?;
        } else {
            warn!(
                "No public identity found while importing cross-signing keys, \
                 a /keys/query needs to be done"
            );
        }

        // The guard taken in the branch above has been dropped by now; lock
        // again to report the current status in both cases.
        Ok(self.inner.identity.lock().await.status().await)
    }
942
943    /// Export all the secrets we have in the store into a [`SecretsBundle`].
944    ///
945    /// This method will export all the private cross-signing keys and, if
946    /// available, the private part of a backup key and its accompanying
947    /// version.
948    ///
949    /// The method will fail if we don't have all three private cross-signing
950    /// keys available.
951    ///
952    /// **Warning**: Only export this and share it with a trusted recipient,
953    /// i.e. if an existing device is sharing this with a new device.
954    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
955        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
956            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
957        };
958
959        let Some(master_key) = cross_signing.master_key.clone() else {
960            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
961        };
962
963        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
964            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
965        };
966
967        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
968            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
969        };
970
971        let backup_keys = self.load_backup_keys().await?;
972
973        let backup = if let Some(key) = backup_keys.decryption_key {
974            if let Some(backup_version) = backup_keys.backup_version {
975                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
976                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
977                ))
978            } else {
979                return Err(SecretsBundleExportError::MissingBackupVersion);
980            }
981        } else {
982            None
983        };
984
985        Ok(SecretsBundle {
986            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
987            backup,
988        })
989    }
990
991    /// Import and persists secrets from a [`SecretsBundle`].
992    ///
993    /// This method will import all the private cross-signing keys and, if
994    /// available, the private part of a backup key and its accompanying
995    /// version into the store.
996    ///
997    /// **Warning**: Only import this from a trusted source, i.e. if an existing
998    /// device is sharing this with a new device. The imported cross-signing
999    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1000    ///
1001    /// The backup key will be persisted in the store and can be enabled using
1002    /// the [`BackupMachine`].
1003    pub async fn import_secrets_bundle(
1004        &self,
1005        bundle: &SecretsBundle,
1006    ) -> Result<(), SecretImportError> {
1007        let mut changes = Changes::default();
1008
1009        if let Some(backup_bundle) = &bundle.backup {
1010            match backup_bundle {
1011                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1012                    changes.backup_decryption_key = Some(bundle.key.clone());
1013                    changes.backup_version = Some(bundle.backup_version.clone());
1014                }
1015            }
1016        }
1017
1018        let identity = self.inner.identity.lock().await;
1019
1020        identity
1021            .import_secrets_unchecked(
1022                Some(&bundle.cross_signing.master_key),
1023                Some(&bundle.cross_signing.self_signing_key),
1024                Some(&bundle.cross_signing.user_signing_key),
1025            )
1026            .await?;
1027
1028        let public_identity = identity.to_public_identity().await.expect(
1029            "We should be able to create a new public identity since we just imported \
1030             all the private cross-signing keys",
1031        );
1032
1033        changes.private_identity = Some(identity.clone());
1034        changes.identities.new.push(UserIdentityData::Own(public_identity));
1035
1036        Ok(self.save_changes(changes).await?)
1037    }
1038
1039    /// Import the given `secret` named `secret_name` into the keystore.
1040    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1041        match &secret.secret_name {
1042            SecretName::CrossSigningMasterKey
1043            | SecretName::CrossSigningUserSigningKey
1044            | SecretName::CrossSigningSelfSigningKey => {
1045                if let Some(public_identity) =
1046                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1047                {
1048                    let identity = self.inner.identity.lock().await;
1049
1050                    identity
1051                        .import_secret(
1052                            public_identity,
1053                            &secret.secret_name,
1054                            &secret.event.content.secret,
1055                        )
1056                        .await?;
1057                    info!(
1058                        secret_name = ?secret.secret_name,
1059                        "Successfully imported a private cross signing key"
1060                    );
1061
1062                    let changes =
1063                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1064
1065                    self.save_changes(changes).await?;
1066                }
1067            }
1068            SecretName::RecoveryKey => {
1069                // We don't import the decryption key here since we'll want to
1070                // check if the public key matches to the latest version on the
1071                // server. We instead put the secret into a secret inbox where
1072                // it will stay until it either gets overwritten
1073                // or the user accepts the secret.
1074            }
1075            name => {
1076                warn!(secret = ?name, "Tried to import an unknown secret");
1077            }
1078        }
1079
1080        Ok(())
1081    }
1082
1083    /// Check whether there is a global flag to only encrypt messages for
1084    /// trusted devices or for everyone.
1085    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1086        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1087        Ok(value)
1088    }
1089
    /// Set the global flag that decides whether messages are encrypted for
    /// untrusted devices, or whether those devices should be excluded from the
    /// conversation.
    ///
    /// # Arguments
    ///
    /// * `block_untrusted_devices` - The new value of the flag. It is stored as
    ///   a custom value under the `only_allow_trusted_devices` key, which is
    ///   the key [`Self::get_only_allow_trusted_devices`] reads.
    pub async fn set_only_allow_trusted_devices(
        &self,
        block_untrusted_devices: bool,
    ) -> Result<()> {
        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
    }
1098
1099    /// Get custom stored value associated with a key
1100    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1101        let Some(value) = self.get_custom_value(key).await? else {
1102            return Ok(None);
1103        };
1104        let deserialized = self.deserialize_value(&value)?;
1105        Ok(Some(deserialized))
1106    }
1107
1108    /// Store custom value associated with a key
1109    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1110        let serialized = self.serialize_value(value)?;
1111        self.set_custom_value(key, serialized).await?;
1112        Ok(())
1113    }
1114
1115    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1116        let serialized =
1117            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1118        Ok(serialized)
1119    }
1120
1121    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1122        let deserialized =
1123            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1124        Ok(deserialized)
1125    }
1126
    /// Receive notifications of room keys being received as a [`Stream`].
    ///
    /// Each time a room key is updated in any way, an update will be sent to
    /// the stream. Updates that happen at the same time are batched into a
    /// [`Vec`].
    ///
    /// If the reader of the stream lags too far behind, an error will be sent
    /// to the reader.
    ///
    /// The stream will terminate once all references to the underlying
    /// `CryptoStoreWrapper` are dropped.
    pub fn room_keys_received_stream(
        &self,
    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
        // Plain delegation: the stream is produced by the wrapped store.
        self.inner.store.room_keys_received_stream()
    }
1143
    /// Receive notifications of received `m.room_key.withheld` messages as a
    /// [`Stream`].
    ///
    /// Each time an `m.room_key.withheld` is received and stored, an update
    /// will be sent to the stream. Updates that happen at the same time are
    /// batched into a [`Vec`].
    ///
    /// If the reader of the stream lags too far behind, a warning will be
    /// logged and items will be dropped.
    pub fn room_keys_withheld_received_stream(
        &self,
    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
        // Plain delegation: the stream is produced by the wrapped store.
        self.inner.store.room_keys_withheld_received_stream()
    }
1157
1158    /// Returns a stream of user identity updates, allowing users to listen for
1159    /// notifications about new or changed user identities.
1160    ///
1161    /// The stream produced by this method emits updates whenever a new user
1162    /// identity is discovered or when an existing identities information is
1163    /// changed. Users can subscribe to this stream and receive updates in
1164    /// real-time.
1165    ///
1166    /// Caution: the returned stream will never terminate, and it holds a
1167    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1168    /// resource leaks.
1169    ///
1170    /// # Examples
1171    ///
1172    /// ```no_run
1173    /// # use matrix_sdk_crypto::OlmMachine;
1174    /// # use ruma::{device_id, user_id};
1175    /// # use futures_util::{pin_mut, StreamExt};
1176    /// # let machine: OlmMachine = unimplemented!();
1177    /// # futures_executor::block_on(async {
1178    /// let identities_stream = machine.store().user_identities_stream();
1179    /// pin_mut!(identities_stream);
1180    ///
1181    /// for identity_updates in identities_stream.next().await {
1182    ///     for (_, identity) in identity_updates.new {
1183    ///         println!("A new identity has been added {}", identity.user_id());
1184    ///     }
1185    /// }
1186    /// # });
1187    /// ```
1188    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1189        let verification_machine = self.inner.verification_machine.to_owned();
1190
1191        let this = self.clone();
1192        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1193            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1194
1195            let map_identity = |(user_id, identity)| {
1196                (
1197                    user_id,
1198                    UserIdentity::new(
1199                        this.clone(),
1200                        identity,
1201                        verification_machine.to_owned(),
1202                        own_identity.to_owned(),
1203                    ),
1204                )
1205            };
1206
1207            let new = new_identities.into_iter().map(map_identity).collect();
1208            let changed = changed_identities.into_iter().map(map_identity).collect();
1209            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1210
1211            IdentityUpdates { new, changed, unchanged }
1212        })
1213    }
1214
1215    /// Returns a stream of device updates, allowing users to listen for
1216    /// notifications about new or changed devices.
1217    ///
1218    /// The stream produced by this method emits updates whenever a new device
1219    /// is discovered or when an existing device's information is changed. Users
1220    /// can subscribe to this stream and receive updates in real-time.
1221    ///
1222    /// Caution: the returned stream will never terminate, and it holds a
1223    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1224    /// resource leaks.
1225    ///
1226    /// # Examples
1227    ///
1228    /// ```no_run
1229    /// # use matrix_sdk_crypto::OlmMachine;
1230    /// # use ruma::{device_id, user_id};
1231    /// # use futures_util::{pin_mut, StreamExt};
1232    /// # let machine: OlmMachine = unimplemented!();
1233    /// # futures_executor::block_on(async {
1234    /// let devices_stream = machine.store().devices_stream();
1235    /// pin_mut!(devices_stream);
1236    ///
1237    /// for device_updates in devices_stream.next().await {
1238    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1239    ///         for device in user_devices.values() {
1240    ///             println!("A new device has been added {}", device.device_id());
1241    ///         }
1242    ///     }
1243    /// }
1244    /// # });
1245    /// ```
1246    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1247        let verification_machine = self.inner.verification_machine.to_owned();
1248
1249        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1250            collect_device_updates(
1251                verification_machine.to_owned(),
1252                own_identity,
1253                identities,
1254                devices,
1255            )
1256        })
1257    }
1258
    /// Returns a [`Stream`] of user identity and device updates.
    ///
    /// The stream returned by this method returns the same data as
    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
    /// not include references to the `VerificationMachine`. It is therefore a
    /// lower-level view on that data.
    ///
    /// The stream will terminate once all references to the underlying
    /// `CryptoStoreWrapper` are dropped.
    pub fn identities_stream_raw(
        &self,
    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
        // The first tuple element of each update is discarded; only the
        // identity and device changes are passed on.
        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
    }
1273
    /// Creates a [`CrossProcessLock`] for this store, that will contain the
    /// given key and value when held.
    ///
    /// # Arguments
    ///
    /// * `lock_key` - The key of the lock.
    /// * `lock_value` - The value of the lock.
    pub fn create_store_lock(
        &self,
        lock_key: String,
        lock_value: String,
    ) -> CrossProcessLock<LockableCryptoStore> {
        self.inner.store.create_store_lock(lock_key, lock_value)
    }
1283
1284    /// Receive notifications of gossipped secrets being received and stored in
1285    /// the secret inbox as a [`Stream`].
1286    ///
1287    /// The gossipped secrets are received using the `m.secret.send` event type
1288    /// and are guaranteed to have been received over a 1-to-1 Olm
1289    /// [`Session`] from a verified [`Device`].
1290    ///
1291    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1292    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1293    ///
1294    /// After a suitable secret of a certain type has been found it can be
1295    /// removed from the store
1296    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1297    ///
1298    /// The only secret this will currently broadcast is the
1299    /// `m.megolm_backup.v1`.
1300    ///
1301    /// If the reader of the stream lags too far behind, a warning will be
1302    /// logged and items will be dropped.
1303    ///
1304    /// # Examples
1305    ///
1306    /// ```no_run
1307    /// # use matrix_sdk_crypto::OlmMachine;
1308    /// # use ruma::{device_id, user_id};
1309    /// # use futures_util::{pin_mut, StreamExt};
1310    /// # let alice = user_id!("@alice:example.org").to_owned();
1311    /// # futures_executor::block_on(async {
1312    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1313    ///
1314    /// let secret_stream = machine.store().secrets_stream();
1315    /// pin_mut!(secret_stream);
1316    ///
1317    /// for secret in secret_stream.next().await {
1318    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1319    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name);
1320    /// }
1321    /// # });
1322    /// ```
1323    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1324        self.inner.store.secrets_stream()
1325    }
1326
    /// Receive notifications of historic room key bundles as a [`Stream`].
    ///
    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
    ///
    /// Each time a historic room key bundle is received, an update will be
    /// sent to the stream. This stream can be used to accept historic room key
    /// bundles that arrive out of order, i.e. the bundle arrives after the
    /// user has already accepted a room invitation.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::{
    /// #    OlmMachine,
    /// #    store::types::StoredRoomKeyBundleData,
    /// #    types::room_history::RoomKeyBundle
    /// # };
    /// # use ruma::{device_id, user_id};
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let alice = user_id!("@alice:example.org").to_owned();
    /// # async {
    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
    /// let bundle_stream = machine.store().historic_room_key_stream();
    /// pin_mut!(bundle_stream);
    ///
    /// while let Some(bundle_info) = bundle_stream.next().await {
    ///     // Try to find the bundle content in the store and if it's valid accept it.
    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
    ///         // Download the bundle now and import it.
    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
    ///         machine.store().receive_room_key_bundle(
    ///             &bundle_data,
    ///             bundle,
    ///             |_, _| {},
    ///         ).await?;
    ///     }
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
        // Plain delegation: the stream is produced by the wrapped store.
        self.inner.store.historic_room_key_stream()
    }
1369
1370    /// Import the given room keys into the store.
1371    ///
1372    /// # Arguments
1373    ///
1374    /// * `exported_keys` - The keys to be imported.
1375    /// * `from_backup_version` - If the keys came from key backup, the key
1376    ///   backup version. This will cause the keys to be marked as already
1377    ///   backed up, and therefore not requiring another backup.
1378    /// * `progress_listener` - Callback which will be called after each key is
1379    ///   processed. Called with arguments `(processed, total)` where
1380    ///   `processed` is the number of keys processed so far, and `total` is the
1381    ///   total number of keys (i.e., `exported_keys.len()`).
1382    pub async fn import_room_keys(
1383        &self,
1384        exported_keys: Vec<ExportedRoomKey>,
1385        from_backup_version: Option<&str>,
1386        progress_listener: impl Fn(usize, usize),
1387    ) -> Result<RoomKeyImportResult> {
1388        let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1389        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1390    }
1391
    /// Import the given room keys into our store.
    ///
    /// This is [`Self::import_room_keys`] without a backup version, i.e. the
    /// imported keys are not marked as backed up.
    ///
    /// # Arguments
    ///
    /// * `exported_keys` - A list of previously exported keys that should be
    ///   imported into our store. If we already have a better version of a key
    ///   the key will *not* be imported.
    /// * `progress_listener` - Callback that is passed on to
    ///   [`Self::import_room_keys`] unchanged; see there for the arguments it
    ///   is called with.
    ///
    /// Returns a [`RoomKeyImportResult`] describing the number of sessions
    /// that were imported, the total number of sessions that were found in the
    /// key export, and which keys were imported.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use std::io::Cursor;
    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
    /// # use ruma::{device_id, user_id};
    /// # let alice = user_id!("@alice:example.org");
    /// # async {
    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
    /// # let export = Cursor::new("".to_owned());
    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
    /// # };
    /// ```
    pub async fn import_exported_room_keys(
        &self,
        exported_keys: Vec<ExportedRoomKey>,
        progress_listener: impl Fn(usize, usize),
    ) -> Result<RoomKeyImportResult> {
        self.import_room_keys(exported_keys, None, progress_listener).await
    }
1425
1426    async fn import_sessions_impl<T>(
1427        &self,
1428        room_keys: Vec<T>,
1429        from_backup_version: Option<&str>,
1430        progress_listener: impl Fn(usize, usize),
1431    ) -> Result<RoomKeyImportResult>
1432    where
1433        T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1434        T::Error: Debug,
1435    {
1436        let mut sessions = Vec::new();
1437
1438        async fn new_session_better(
1439            session: &InboundGroupSession,
1440            old_session: Option<InboundGroupSession>,
1441        ) -> bool {
1442            if let Some(old_session) = &old_session {
1443                session.compare(old_session).await == SessionOrdering::Better
1444            } else {
1445                true
1446            }
1447        }
1448
1449        let total_count = room_keys.len();
1450        let mut keys = BTreeMap::new();
1451
1452        for (i, key) in room_keys.into_iter().enumerate() {
1453            match key.try_into() {
1454                Ok(session) => {
1455                    let old_session = self
1456                        .inner
1457                        .store
1458                        .get_inbound_group_session(session.room_id(), session.session_id())
1459                        .await?;
1460
1461                    // Only import the session if we didn't have this session or
1462                    // if it's a better version of the same session.
1463                    if new_session_better(&session, old_session).await {
1464                        if from_backup_version.is_some() {
1465                            session.mark_as_backed_up();
1466                        }
1467
1468                        keys.entry(session.room_id().to_owned())
1469                            .or_insert_with(BTreeMap::new)
1470                            .entry(session.sender_key().to_base64())
1471                            .or_insert_with(BTreeSet::new)
1472                            .insert(session.session_id().to_owned());
1473
1474                        sessions.push(session);
1475                    }
1476                }
1477                Err(e) => {
1478                    warn!(
1479                        sender_key = key.sender_key().to_base64(),
1480                        room_id = ?key.room_id(),
1481                        session_id = key.session_id(),
1482                        error = ?e,
1483                        "Couldn't import a room key from a file export."
1484                    );
1485                }
1486            }
1487
1488            progress_listener(i, total_count);
1489        }
1490
1491        let imported_count = sessions.len();
1492
1493        self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1494
1495        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1496
1497        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1498    }
1499
1500    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1501        self.inner.store.clone()
1502    }
1503
1504    /// Export the keys that match the given predicate.
1505    ///
1506    /// # Arguments
1507    ///
1508    /// * `predicate` - A closure that will be called for every known
1509    ///   `InboundGroupSession`, which represents a room key. If the closure
1510    ///   returns `true` the `InboundGroupSession` will be included in the
1511    ///   export, if the closure returns `false` it will not be included.
1512    ///
1513    /// # Examples
1514    ///
1515    /// ```no_run
1516    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1517    /// # use ruma::{device_id, user_id, room_id};
1518    /// # let alice = user_id!("@alice:example.org");
1519    /// # async {
1520    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1521    /// let room_id = room_id!("!test:localhost");
1522    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1523    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1524    /// # };
1525    /// ```
1526    pub async fn export_room_keys(
1527        &self,
1528        predicate: impl FnMut(&InboundGroupSession) -> bool,
1529    ) -> Result<Vec<ExportedRoomKey>> {
1530        let mut exported = Vec::new();
1531
1532        let mut sessions = self.get_inbound_group_sessions().await?;
1533        sessions.retain(predicate);
1534
1535        for session in sessions {
1536            let export = session.export().await;
1537            exported.push(export);
1538        }
1539
1540        Ok(exported)
1541    }
1542
1543    /// Export room keys matching a predicate, providing them as an async
1544    /// `Stream`.
1545    ///
1546    /// # Arguments
1547    ///
1548    /// * `predicate` - A closure that will be called for every known
1549    ///   `InboundGroupSession`, which represents a room key. If the closure
1550    ///   returns `true` the `InboundGroupSession` will be included in the
1551    ///   export, if the closure returns `false` it will not be included.
1552    ///
1553    /// # Examples
1554    ///
1555    /// ```no_run
1556    /// use std::pin::pin;
1557    ///
1558    /// use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
1559    /// use ruma::{device_id, room_id, user_id};
1560    /// use tokio_stream::StreamExt;
1561    /// # async {
1562    /// let alice = user_id!("@alice:example.org");
1563    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1564    /// let room_id = room_id!("!test:localhost");
1565    /// let mut keys = pin!(machine
1566    ///     .store()
1567    ///     .export_room_keys_stream(|s| s.room_id() == room_id)
1568    ///     .await
1569    ///     .unwrap());
1570    /// while let Some(key) = keys.next().await {
1571    ///     println!("{}", key.room_id);
1572    /// }
1573    /// # };
1574    /// ```
1575    pub async fn export_room_keys_stream(
1576        &self,
1577        predicate: impl FnMut(&InboundGroupSession) -> bool,
1578    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1579        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1580        let sessions = self.get_inbound_group_sessions().await?;
1581        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1582            .then(|session| async move { session.export().await }))
1583    }
1584
1585    /// Assemble a room key bundle for sharing encrypted history, as per
1586    /// [MSC4268].
1587    ///
1588    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1589    pub async fn build_room_key_bundle(
1590        &self,
1591        room_id: &RoomId,
1592    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1593        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1594
1595        let mut bundle = RoomKeyBundle::default();
1596        for session in sessions {
1597            if session.shared_history() {
1598                bundle.room_keys.push(session.export().await.into());
1599            } else {
1600                bundle.withheld.push(RoomKeyWithheldContent::new(
1601                    session.algorithm().to_owned(),
1602                    WithheldCode::Unauthorised,
1603                    session.room_id().to_owned(),
1604                    session.session_id().to_owned(),
1605                    session.sender_key().to_owned(),
1606                    self.device_id().to_owned(),
1607                ));
1608            }
1609        }
1610
1611        Ok(bundle)
1612    }
1613
1614    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1615    ///
1616    /// # Arguments
1617    ///
1618    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1619    ///   being received.
1620    /// * `bundle` - The decrypted and deserialized bundle itself.
1621    ///
1622    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1623    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1624    pub async fn receive_room_key_bundle(
1625        &self,
1626        bundle_info: &StoredRoomKeyBundleData,
1627        bundle: RoomKeyBundle,
1628        progress_listener: impl Fn(usize, usize),
1629    ) -> Result<(), CryptoStoreError> {
1630        let sender_data = if bundle_info.sender_data.should_recalculate() {
1631            let device = self
1632                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1633                .await?;
1634
1635            device
1636                .as_ref()
1637                .map(SenderData::from_device)
1638                .unwrap_or_else(|| bundle_info.sender_data.clone())
1639        } else {
1640            bundle_info.sender_data.clone()
1641        };
1642
1643        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1644
1645        match sender_data {
1646            SenderData::UnknownDevice { .. }
1647            | SenderData::VerificationViolation(_)
1648            | SenderData::DeviceInfo { .. } => {
1649                warn!("Not accepting a historic room key bundle due to insufficient trust in the sender");
1650                Ok(())
1651            }
1652            SenderData::SenderUnverified(_) | SenderData::SenderVerified(_) => {
1653                let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1654                    if key.room_id != bundle_info.bundle_data.room_id {
1655                        trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1656                        Either::Right(key)
1657                    } else {
1658                        Either::Left(key)
1659                    }
1660                });
1661
1662                match (bad.is_empty(), good.is_empty()) {
1663                    // Case 1: Completely empty bundle.
1664                    (true, true) => {
1665                        warn!("Received a completely empty room key bundle");
1666                    }
1667
1668                    // Case 2: A bundle for the wrong room.
1669                    (false, true) => {
1670                        let bad_keys: Vec<_> =
1671                            bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1672
1673                        warn!(
1674                    ?bad_keys,
1675                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1676                );
1677                    }
1678
1679                    // Case 3: A bundle containing useful room keys.
1680                    (_, false) => {
1681                        // We have at least some good keys, if we also have some bad ones let's
1682                        // mention that here.
1683                        if !bad.is_empty() {
1684                            warn!(
1685                                bad_key_count = bad.len(),
1686                                "The room key bundle contained some room keys \
1687                         that were meant for a different room"
1688                            );
1689                        }
1690
1691                        self.import_sessions_impl(good, None, progress_listener).await?;
1692                    }
1693                }
1694
1695                Ok(())
1696            }
1697        }
1698    }
1699}
1700
1701impl Deref for Store {
1702    type Target = DynCryptoStore;
1703
1704    fn deref(&self) -> &Self::Target {
1705        self.inner.store.deref().deref()
1706    }
1707}
1708
1709/// A crypto store that implements primitives for cross-process locking.
1710#[derive(Clone, Debug)]
1711pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1712
1713impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1714    type LockError = CryptoStoreError;
1715
1716    async fn try_lock(
1717        &self,
1718        lease_duration_ms: u32,
1719        key: &str,
1720        holder: &str,
1721    ) -> std::result::Result<bool, Self::LockError> {
1722        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1723    }
1724}
1725
1726#[cfg(test)]
1727mod tests {
1728    use std::pin::pin;
1729
1730    use futures_util::StreamExt;
1731    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1732    use matrix_sdk_test::async_test;
1733    use ruma::{device_id, room_id, user_id, RoomId};
1734    use vodozemac::megolm::SessionKey;
1735
1736    use crate::{
1737        machine::test_helpers::get_machine_pair,
1738        olm::{InboundGroupSession, SenderData},
1739        store::types::DehydratedDeviceKey,
1740        types::EventEncryptionAlgorithm,
1741        OlmMachine,
1742    };
1743
1744    #[async_test]
1745    async fn test_import_room_keys_notifies_stream() {
1746        use futures_util::FutureExt;
1747
1748        let (alice, bob, _) =
1749            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1750
1751        let room1_id = room_id!("!room1:localhost");
1752        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1753        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
1754
1755        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
1756        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
1757
1758        let room_keys = room_keys_received_stream
1759            .next()
1760            .now_or_never()
1761            .flatten()
1762            .expect("We should have received an update of room key infos")
1763            .unwrap();
1764        assert_eq!(room_keys.len(), 1);
1765        assert_eq!(room_keys[0].room_id, "!room1:localhost");
1766    }
1767
1768    #[async_test]
1769    async fn test_export_room_keys_provides_selected_keys() {
1770        // Given an OlmMachine with room keys in it
1771        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1772        let room1_id = room_id!("!room1:localhost");
1773        let room2_id = room_id!("!room2:localhost");
1774        let room3_id = room_id!("!room3:localhost");
1775        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1776        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1777        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
1778
1779        // When I export some of the keys
1780        let keys = alice
1781            .store()
1782            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
1783            .await
1784            .unwrap();
1785
1786        // Then the requested keys were provided
1787        assert_eq!(keys.len(), 2);
1788        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1789        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1790        assert_eq!(keys[0].room_id, "!room2:localhost");
1791        assert_eq!(keys[1].room_id, "!room3:localhost");
1792        assert_eq!(keys[0].session_key.to_base64().len(), 220);
1793        assert_eq!(keys[1].session_key.to_base64().len(), 220);
1794    }
1795
1796    #[async_test]
1797    async fn test_export_room_keys_stream_can_provide_all_keys() {
1798        // Given an OlmMachine with room keys in it
1799        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1800        let room1_id = room_id!("!room1:localhost");
1801        let room2_id = room_id!("!room2:localhost");
1802        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1803        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1804
1805        // When I export the keys as a stream
1806        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
1807
1808        // And collect them
1809        let mut collected = vec![];
1810        while let Some(key) = keys.next().await {
1811            collected.push(key);
1812        }
1813
1814        // Then all the keys were provided
1815        assert_eq!(collected.len(), 2);
1816        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1817        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1818        assert_eq!(collected[0].room_id, "!room1:localhost");
1819        assert_eq!(collected[1].room_id, "!room2:localhost");
1820        assert_eq!(collected[0].session_key.to_base64().len(), 220);
1821        assert_eq!(collected[1].session_key.to_base64().len(), 220);
1822    }
1823
1824    #[async_test]
1825    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
1826        // Given an OlmMachine with room keys in it
1827        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1828        let room1_id = room_id!("!room1:localhost");
1829        let room2_id = room_id!("!room2:localhost");
1830        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1831        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1832
1833        // When I export the keys as a stream
1834        let mut keys =
1835            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
1836
1837        // And collect them
1838        let mut collected = vec![];
1839        while let Some(key) = keys.next().await {
1840            collected.push(key);
1841        }
1842
1843        // Then all the keys matching our predicate were provided, and no others
1844        assert_eq!(collected.len(), 1);
1845        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1846        assert_eq!(collected[0].room_id, "!room1:localhost");
1847        assert_eq!(collected[0].session_key.to_base64().len(), 220);
1848    }
1849
1850    #[async_test]
1851    async fn test_export_secrets_bundle() {
1852        let user_id = user_id!("@alice:example.com");
1853        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
1854
1855        let _ = first
1856            .bootstrap_cross_signing(false)
1857            .await
1858            .expect("We should be able to bootstrap cross-signing");
1859
1860        let bundle = first.store().export_secrets_bundle().await.expect(
1861            "We should be able to export the secrets bundle, now that we \
1862             have the cross-signing keys",
1863        );
1864
1865        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
1866
1867        second
1868            .store()
1869            .import_secrets_bundle(&bundle)
1870            .await
1871            .expect("We should be able to import the secrets bundle");
1872
1873        let status = second.cross_signing_status().await;
1874        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
1875
1876        assert!(identity.is_verified(), "The public identity should be marked as verified.");
1877
1878        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
1879    }
1880
1881    #[async_test]
1882    async fn test_create_dehydrated_device_key() {
1883        let pickle_key = DehydratedDeviceKey::new()
1884            .expect("Should be able to create a random dehydrated device key");
1885
1886        let to_vec = pickle_key.inner.to_vec();
1887        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
1888            .expect("Should be able to create a dehydrated device key from slice");
1889
1890        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
1891    }
1892
1893    #[async_test]
1894    async fn test_create_dehydrated_errors() {
1895        let too_small = [0u8; 22];
1896        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
1897
1898        assert!(pickle_key.is_err());
1899
1900        let too_big = [0u8; 40];
1901        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
1902
1903        assert!(pickle_key.is_err());
1904    }
1905
1906    #[async_test]
1907    async fn test_build_room_key_bundle() {
1908        // Given: Alice has sent a number of room keys to Bob, including some in the
1909        // wrong room, and some that are not marked as shared...
1910        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
1911        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
1912
1913        let room1_id = room_id!("!room1:localhost");
1914        let room2_id = room_id!("!room2:localhost");
1915
1916        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
1917
1918           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
1919        */
1920        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
1921        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
1922        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
1923        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
1924
1925        let sessions = [
1926            create_inbound_group_session_with_visibility(
1927                &alice,
1928                room1_id,
1929                &SessionKey::from_base64(session_key1).unwrap(),
1930                true,
1931            ),
1932            create_inbound_group_session_with_visibility(
1933                &alice,
1934                room1_id,
1935                &SessionKey::from_base64(session_key2).unwrap(),
1936                true,
1937            ),
1938            create_inbound_group_session_with_visibility(
1939                &alice,
1940                room1_id,
1941                &SessionKey::from_base64(session_key3).unwrap(),
1942                false,
1943            ),
1944            create_inbound_group_session_with_visibility(
1945                &alice,
1946                room2_id,
1947                &SessionKey::from_base64(session_key4).unwrap(),
1948                true,
1949            ),
1950        ];
1951        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
1952
1953        // When I build the bundle
1954        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
1955
1956        // Then the bundle matches the snapshot.
1957
1958        // We sort the sessions in the bundle, so that the snapshot is stable.
1959        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
1960
1961        // We also substitute alice's keys in the snapshot with placeholders
1962        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
1963        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
1964            assert_eq!(value.as_str().unwrap(), alice_curve_key);
1965            "[alice curve key]"
1966        };
1967        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
1968        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
1969            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
1970            "[alice ed25519 key]"
1971        };
1972
1973        insta::with_settings!({ sort_maps => true }, {
1974            assert_json_snapshot!(bundle, {
1975                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
1976                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
1977                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
1978            });
1979        });
1980    }
1981
1982    /// Create an inbound Megolm session for the given room.
1983    ///
1984    /// `olm_machine` is used to set the `sender_key` and `signing_key`
1985    /// fields of the resultant session.
1986    fn create_inbound_group_session_with_visibility(
1987        olm_machine: &OlmMachine,
1988        room_id: &RoomId,
1989        session_key: &SessionKey,
1990        shared_history: bool,
1991    ) -> InboundGroupSession {
1992        let identity_keys = &olm_machine.store().static_account().identity_keys;
1993        InboundGroupSession::new(
1994            identity_keys.curve25519,
1995            identity_keys.ed25519,
1996            room_id,
1997            session_key,
1998            SenderData::unknown(),
1999            EventEncryptionAlgorithm::MegolmV1AesSha2,
2000            None,
2001            shared_history,
2002        )
2003        .unwrap()
2004    }
2005}