matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized using a trait.
18//! Implementing your own [`CryptoStore`]
19//!
20//! An in-memory only store is provided as well as an SQLite-based one,
21//! depending on your needs and targets a custom store may be implemented, e.g.
22//! for `wasm-unknown-unknown` an indexeddb store would be needed
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
39//! [`CryptoStore`]: trait.Cryptostore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{atomic::Ordering, Arc},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55    encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56    OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{megolm::SessionOrdering, Curve25519PublicKey};
65
66use self::types::{
67    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70#[cfg(doc)]
71use crate::{backups::BackupMachine, identities::OwnUserIdentity};
72use crate::{
73    gossiping::GossippedSecret,
74    identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
75    olm::{
76        Account, ExportedRoomKey, InboundGroupSession, PrivateCrossSigningIdentity, SenderData,
77        Session, StaticAccountData,
78    },
79    store::types::RoomKeyWithheldEntry,
80    types::{
81        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
82        SecretsBundle,
83    },
84    verification::VerificationMachine,
85    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
86};
87
88pub mod caches;
89mod crypto_store_wrapper;
90mod error;
91mod memorystore;
92mod traits;
93pub mod types;
94
95#[cfg(any(test, feature = "testing"))]
96#[macro_use]
97#[allow(missing_docs)]
98pub mod integration_tests;
99
100pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
101pub use error::{CryptoStoreError, Result};
102use matrix_sdk_common::{
103    cross_process_lock::CrossProcessLock, deserialized_responses::WithheldCode, timeout::timeout,
104};
105pub use memorystore::MemoryStore;
106pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
107
108use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
109use crate::types::{
110    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
111};
112pub use crate::{
113    dehydrated_devices::DehydrationError,
114    gossiping::{GossipRequest, SecretInfo},
115};
116
117/// A wrapper for our CryptoStore trait object.
118///
119/// This is needed because we want to have a generic interface so we can
120/// store/restore objects that we can serialize. Since trait objects and
121/// generics don't mix let the CryptoStore store strings and this wrapper
122/// adds the generic interface on top.
123#[derive(Debug, Clone)]
124pub struct Store {
125    inner: Arc<StoreInner>,
126}
127
128#[derive(Debug, Default)]
129pub(crate) struct KeyQueryManager {
130    /// Record of the users that are waiting for a /keys/query.
131    users_for_key_query: Mutex<UsersForKeyQuery>,
132
133    /// Notifier that is triggered each time an update is received for a user.
134    users_for_key_query_notify: Notify,
135}
136
137impl KeyQueryManager {
138    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
139        self.ensure_sync_tracked_users(cache).await?;
140        Ok(SyncedKeyQueryManager { cache, manager: self })
141    }
142
143    /// Load the list of users for whom we are tracking their device lists and
144    /// fill out our caches.
145    ///
146    /// This method ensures that we're only going to load the users from the
147    /// actual [`CryptoStore`] once, it will also make sure that any
148    /// concurrent calls to this method get deduplicated.
149    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
150        // Check if the users are loaded, and in that case do nothing.
151        let loaded = cache.loaded_tracked_users.read().await;
152        if *loaded {
153            return Ok(());
154        }
155
156        // Otherwise, we may load the users.
157        drop(loaded);
158        let mut loaded = cache.loaded_tracked_users.write().await;
159
160        // Check again if the users have been loaded, in case another call to this
161        // method loaded the tracked users between the time we tried to
162        // acquire the lock and the time we actually acquired the lock.
163        if *loaded {
164            return Ok(());
165        }
166
167        let tracked_users = cache.store.load_tracked_users().await?;
168
169        let mut query_users_lock = self.users_for_key_query.lock().await;
170        let mut tracked_users_cache = cache.tracked_users.write();
171        for user in tracked_users {
172            tracked_users_cache.insert(user.user_id.to_owned());
173
174            if user.dirty {
175                query_users_lock.insert_user(&user.user_id);
176            }
177        }
178
179        *loaded = true;
180
181        Ok(())
182    }
183
184    /// Wait for a `/keys/query` response to be received if one is expected for
185    /// the given user.
186    ///
187    /// If the given timeout elapses, the method will stop waiting and return
188    /// [`UserKeyQueryResult::TimeoutExpired`].
189    ///
190    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
191    /// query is pending are up to date, but doesn't hold on to it
192    /// thereafter: the lock is short-lived in this case.
193    pub async fn wait_if_user_key_query_pending(
194        &self,
195        cache: StoreCacheGuard,
196        timeout_duration: Duration,
197        user: &UserId,
198    ) -> Result<UserKeyQueryResult> {
199        {
200            // Drop the cache early, so we don't keep it while waiting (since writing the
201            // results requires to write in the cache, thus take another lock).
202            self.ensure_sync_tracked_users(&cache).await?;
203            drop(cache);
204        }
205
206        let mut users_for_key_query = self.users_for_key_query.lock().await;
207        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
208            return Ok(UserKeyQueryResult::WasNotPending);
209        };
210
211        let wait_for_completion = async {
212            while !waiter.completed.load(Ordering::Relaxed) {
213                // Register for being notified before releasing the mutex, so
214                // it's impossible to miss a wakeup between the last check for
215                // whether we should wait, and starting to wait.
216                let mut notified = pin!(self.users_for_key_query_notify.notified());
217                notified.as_mut().enable();
218                drop(users_for_key_query);
219
220                // Wait for a notification
221                notified.await;
222
223                // Reclaim the lock before checking the flag to avoid races
224                // when two notifications happen right after each other and the
225                // second one sets the flag we want to wait for.
226                users_for_key_query = self.users_for_key_query.lock().await;
227            }
228        };
229
230        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
231            Err(_) => {
232                warn!(
233                    user_id = ?user,
234                    "The user has a pending `/keys/query` request which did \
235                    not finish yet, some devices might be missing."
236                );
237
238                Ok(UserKeyQueryResult::TimeoutExpired)
239            }
240            _ => Ok(UserKeyQueryResult::WasPending),
241        }
242    }
243}
244
245pub(crate) struct SyncedKeyQueryManager<'a> {
246    cache: &'a StoreCache,
247    manager: &'a KeyQueryManager,
248}
249
250impl SyncedKeyQueryManager<'_> {
251    /// Add entries to the list of users being tracked for device changes
252    ///
253    /// Any users not already on the list are flagged as awaiting a key query.
254    /// Users that were already in the list are unaffected.
255    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
256        let mut store_updates = Vec::new();
257        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
258
259        {
260            let mut tracked_users = self.cache.tracked_users.write();
261            for user_id in users {
262                if tracked_users.insert(user_id.to_owned()) {
263                    key_query_lock.insert_user(user_id);
264                    store_updates.push((user_id, true))
265                }
266            }
267        }
268
269        self.cache.store.save_tracked_users(&store_updates).await
270    }
271
272    /// Process notifications that users have changed devices.
273    ///
274    /// This is used to handle the list of device-list updates that is received
275    /// from the `/sync` response. Any users *whose device lists we are
276    /// tracking* are flagged as needing a key query. Users whose devices we
277    /// are not tracking are ignored.
278    pub async fn mark_tracked_users_as_changed(
279        &self,
280        users: impl Iterator<Item = &UserId>,
281    ) -> Result<()> {
282        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
283        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
284
285        {
286            let tracked_users = &self.cache.tracked_users.read();
287            for user_id in users {
288                if tracked_users.contains(user_id) {
289                    key_query_lock.insert_user(user_id);
290                    store_updates.push((user_id, true));
291                }
292            }
293        }
294
295        self.cache.store.save_tracked_users(&store_updates).await
296    }
297
298    /// Flag that the given users devices are now up-to-date.
299    ///
300    /// This is called after processing the response to a /keys/query request.
301    /// Any users whose device lists we are tracking are removed from the
302    /// list of those pending a /keys/query.
303    pub async fn mark_tracked_users_as_up_to_date(
304        &self,
305        users: impl Iterator<Item = &UserId>,
306        sequence_number: SequenceNumber,
307    ) -> Result<()> {
308        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
309        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
310
311        {
312            let tracked_users = self.cache.tracked_users.read();
313            for user_id in users {
314                if tracked_users.contains(user_id) {
315                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
316                    store_updates.push((user_id, !clean));
317                }
318            }
319        }
320
321        self.cache.store.save_tracked_users(&store_updates).await?;
322        // wake up any tasks that may have been waiting for updates
323        self.manager.users_for_key_query_notify.notify_waiters();
324
325        Ok(())
326    }
327
328    /// Get the set of users that has the outdate/dirty flag set for their list
329    /// of devices.
330    ///
331    /// This set should be included in a `/keys/query` request which will update
332    /// the device list.
333    ///
334    /// # Returns
335    ///
336    /// A pair `(users, sequence_number)`, where `users` is the list of users to
337    /// be queried, and `sequence_number` is the current sequence number,
338    /// which should be returned in `mark_tracked_users_as_up_to_date`.
339    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
340        self.manager.users_for_key_query.lock().await.users_for_key_query()
341    }
342
343    /// See the docs for [`crate::OlmMachine::tracked_users()`].
344    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
345        self.cache.tracked_users.read().iter().cloned().collect()
346    }
347
348    /// Mark the given user as being tracked for device lists, and mark that it
349    /// has an outdated device list.
350    ///
351    /// This means that the user will be considered for a `/keys/query` request
352    /// next time [`Store::users_for_key_query()`] is called.
353    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
354        self.manager.users_for_key_query.lock().await.insert_user(user);
355        self.cache.tracked_users.write().insert(user.to_owned());
356
357        self.cache.store.save_tracked_users(&[(user, true)]).await
358    }
359}
360
361/// Convert the devices and vectors contained in the [`DeviceChanges`] into
362/// a [`DeviceUpdates`] struct.
363///
364/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
365/// we want to convert to a [`Device`].
366fn collect_device_updates(
367    verification_machine: VerificationMachine,
368    own_identity: Option<OwnUserIdentityData>,
369    identities: IdentityChanges,
370    devices: DeviceChanges,
371) -> DeviceUpdates {
372    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
373    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
374
375    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
376
377    let map_device = |device: DeviceData| {
378        let device_owner_identity = new_identities
379            .get(device.user_id())
380            .or_else(|| changed_identities.get(device.user_id()))
381            .or_else(|| unchanged_identities.get(device.user_id()))
382            .cloned();
383
384        Device {
385            inner: device,
386            verification_machine: verification_machine.to_owned(),
387            own_identity: own_identity.to_owned(),
388            device_owner_identity,
389        }
390    };
391
392    for device in devices.new {
393        let device = map_device(device);
394
395        new.entry(device.user_id().to_owned())
396            .or_default()
397            .insert(device.device_id().to_owned(), device);
398    }
399
400    for device in devices.changed {
401        let device = map_device(device);
402
403        changed
404            .entry(device.user_id().to_owned())
405            .or_default()
406            .insert(device.device_id().to_owned(), device.to_owned());
407    }
408
409    DeviceUpdates { new, changed }
410}
411
412/// A temporary transaction (that implies a write) to the underlying store.
413#[allow(missing_debug_implementations)]
414pub struct StoreTransaction {
415    store: Store,
416    changes: PendingChanges,
417    // TODO hold onto the cross-process crypto store lock + cache.
418    cache: OwnedRwLockWriteGuard<StoreCache>,
419}
420
421impl StoreTransaction {
422    /// Starts a new `StoreTransaction`.
423    async fn new(store: Store) -> Self {
424        let cache = store.inner.cache.clone();
425
426        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
427    }
428
429    pub(crate) fn cache(&self) -> &StoreCache {
430        &self.cache
431    }
432
433    /// Returns a reference to the current `Store`.
434    pub fn store(&self) -> &Store {
435        &self.store
436    }
437
438    /// Gets a `Account` for update.
439    ///
440    /// Note: since it's guaranteed that one can't have both a
441    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
442    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
443    /// that we can't have two copies of an `Account` alive at the same time.
444    pub async fn account(&mut self) -> Result<&mut Account> {
445        if self.changes.account.is_none() {
446            // Make sure the cache loaded the account.
447            let _ = self.cache.account().await?;
448            self.changes.account = self.cache.account.lock().await.take();
449        }
450        Ok(self.changes.account.as_mut().unwrap())
451    }
452
453    /// Commits all dirty fields to the store, and maintains the cache so it
454    /// reflects the current state of the database.
455    pub async fn commit(self) -> Result<()> {
456        if self.changes.is_empty() {
457            return Ok(());
458        }
459
460        // Save changes in the database.
461        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
462
463        self.store.save_pending_changes(self.changes).await?;
464
465        // Make the cache coherent with the database.
466        if let Some(account) = account {
467            *self.cache.account.lock().await = Some(account);
468        }
469
470        Ok(())
471    }
472}
473
474#[derive(Debug)]
475struct StoreInner {
476    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
477    store: Arc<CryptoStoreWrapper>,
478
479    /// In-memory cache for the current crypto store.
480    ///
481    /// ⚠ Must remain private.
482    cache: Arc<RwLock<StoreCache>>,
483
484    verification_machine: VerificationMachine,
485
486    /// Static account data that never changes (and thus can be loaded once and
487    /// for all when creating the store).
488    static_account: StaticAccountData,
489}
490
491/// Error describing what went wrong when importing private cross signing keys
492/// or the key backup key.
493#[derive(Debug, Error)]
494pub enum SecretImportError {
495    /// The key that we tried to import was invalid.
496    #[error("Error while importing {name}: {error}")]
497    Key {
498        /// The name of the secret that was being imported.
499        name: SecretName,
500        /// The key error that occurred.
501        error: vodozemac::KeyError,
502    },
503    /// The public key of the imported private key doesn't match the public
504    /// key that was uploaded to the server.
505    #[error(
506        "Error while importing {name}: The public key of the imported private \
507            key doesn't match the public key that was uploaded to the server"
508    )]
509    MismatchedPublicKeys {
510        /// The name of the secret that was being imported.
511        name: SecretName,
512    },
513    /// The new version of the identity couldn't be stored.
514    #[error(transparent)]
515    Store(#[from] CryptoStoreError),
516}
517
518/// Error describing what went wrong when exporting a [`SecretsBundle`].
519///
520/// The [`SecretsBundle`] can only be exported if we have all cross-signing
521/// private keys in the store.
522#[derive(Debug, Error)]
523pub enum SecretsBundleExportError {
524    /// The store itself had an error.
525    #[error(transparent)]
526    Store(#[from] CryptoStoreError),
527    /// We're missing one or multiple cross-signing keys.
528    #[error("The store is missing one or multiple cross-signing keys")]
529    MissingCrossSigningKey(KeyUsage),
530    /// We're missing all cross-signing keys.
531    #[error("The store doesn't contain any cross-signing keys")]
532    MissingCrossSigningKeys,
533    /// We have a backup key stored, but we don't know the version of the
534    /// backup.
535    #[error("The store contains a backup key, but no backup version")]
536    MissingBackupVersion,
537}
538
539impl Store {
540    /// Create a new Store.
541    pub(crate) fn new(
542        account: StaticAccountData,
543        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
544        store: Arc<CryptoStoreWrapper>,
545        verification_machine: VerificationMachine,
546    ) -> Self {
547        Self {
548            inner: Arc::new(StoreInner {
549                static_account: account,
550                identity,
551                store: store.clone(),
552                verification_machine,
553                cache: Arc::new(RwLock::new(StoreCache {
554                    store,
555                    tracked_users: Default::default(),
556                    loaded_tracked_users: Default::default(),
557                    account: Default::default(),
558                })),
559            }),
560        }
561    }
562
563    /// UserId associated with this store
564    pub(crate) fn user_id(&self) -> &UserId {
565        &self.inner.static_account.user_id
566    }
567
568    /// DeviceId associated with this store
569    pub(crate) fn device_id(&self) -> &DeviceId {
570        self.inner.verification_machine.own_device_id()
571    }
572
573    /// The static data for the account associated with this store.
574    pub(crate) fn static_account(&self) -> &StaticAccountData {
575        &self.inner.static_account
576    }
577
578    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
579        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
580        // - try to take the lock,
581        // - if acquired, look if another process touched the underlying storage,
582        // - if yes, reload everything; if no, return current cache
583        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
584    }
585
586    pub(crate) async fn transaction(&self) -> StoreTransaction {
587        StoreTransaction::new(self.clone()).await
588    }
589
590    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
591    // take a `&StoreTransaction`, but callers didn't quite like that.
592    pub(crate) async fn with_transaction<
593        T,
594        Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
595        F: FnOnce(StoreTransaction) -> Fut,
596    >(
597        &self,
598        func: F,
599    ) -> Result<T, crate::OlmError> {
600        let tr = self.transaction().await;
601        let (tr, res) = func(tr).await?;
602        tr.commit().await?;
603        Ok(res)
604    }
605
606    #[cfg(test)]
607    /// test helper to reset the cross signing identity
608    pub(crate) async fn reset_cross_signing_identity(&self) {
609        self.inner.identity.lock().await.reset();
610    }
611
612    /// PrivateCrossSigningIdentity associated with this store
613    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
614        self.inner.identity.clone()
615    }
616
617    /// Save the given Sessions to the store
618    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
619        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
620
621        self.save_changes(changes).await
622    }
623
624    pub(crate) async fn get_sessions(
625        &self,
626        sender_key: &str,
627    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
628        self.inner.store.get_sessions(sender_key).await
629    }
630
631    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
632        self.inner.store.save_changes(changes).await
633    }
634
635    /// Compare the given `InboundGroupSession` with an existing session we have
636    /// in the store.
637    ///
638    /// This method returns `SessionOrdering::Better` if the given session is
639    /// better than the one we already have or if we don't have such a
640    /// session in the store.
641    pub(crate) async fn compare_group_session(
642        &self,
643        session: &InboundGroupSession,
644    ) -> Result<SessionOrdering> {
645        let old_session = self
646            .inner
647            .store
648            .get_inbound_group_session(session.room_id(), session.session_id())
649            .await?;
650
651        Ok(if let Some(old_session) = old_session {
652            session.compare(&old_session).await
653        } else {
654            SessionOrdering::Better
655        })
656    }
657
658    #[cfg(test)]
659    /// Testing helper to allow to save only a set of devices
660    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
661        use types::DeviceChanges;
662
663        let changes = Changes {
664            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
665            ..Default::default()
666        };
667
668        self.save_changes(changes).await
669    }
670
671    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
672    pub(crate) async fn save_inbound_group_sessions(
673        &self,
674        sessions: &[InboundGroupSession],
675    ) -> Result<()> {
676        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
677
678        self.save_changes(changes).await
679    }
680
681    /// Get the display name of our own device.
682    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
683        Ok(self
684            .inner
685            .store
686            .get_device(self.user_id(), self.device_id())
687            .await?
688            .and_then(|d| d.display_name().map(|d| d.to_owned())))
689    }
690
691    /// Get the device data for the given [`UserId`] and [`DeviceId`].
692    ///
693    /// *Note*: This method will include our own device which is always present
694    /// in the store.
695    pub(crate) async fn get_device_data(
696        &self,
697        user_id: &UserId,
698        device_id: &DeviceId,
699    ) -> Result<Option<DeviceData>> {
700        self.inner.store.get_device(user_id, device_id).await
701    }
702
703    /// Get the device data for the given [`UserId`] and [`DeviceId`].
704    ///
705    /// *Note*: This method will **not** include our own device.
706    ///
707    /// Use this method if you need a list of recipients for a given user, since
708    /// we don't want to encrypt for our own device, otherwise take a look at
709    /// the [`Store::get_device_data_for_user`] method.
710    pub(crate) async fn get_device_data_for_user_filtered(
711        &self,
712        user_id: &UserId,
713    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
714        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
715            if user_id == self.user_id() {
716                d.remove(self.device_id());
717            }
718            d
719        })
720    }
721
722    /// Get the [`DeviceData`] for all the devices a user has.
723    ///
724    /// *Note*: This method will include our own device which is always present
725    /// in the store.
726    ///
727    /// Use this method if you need to operate on or update all devices of a
728    /// user, otherwise take a look at the
729    /// [`Store::get_device_data_for_user_filtered`] method.
730    pub(crate) async fn get_device_data_for_user(
731        &self,
732        user_id: &UserId,
733    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
734        self.inner.store.get_user_devices(user_id).await
735    }
736
737    /// Get a [`Device`] for the given user with the given
738    /// [`Curve25519PublicKey`] key.
739    ///
740    /// *Note*: This method will include our own device which is always present
741    /// in the store.
742    pub(crate) async fn get_device_from_curve_key(
743        &self,
744        user_id: &UserId,
745        curve_key: Curve25519PublicKey,
746    ) -> Result<Option<Device>> {
747        self.get_user_devices(user_id)
748            .await
749            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
750    }
751
752    /// Get all devices associated with the given [`UserId`].
753    ///
754    /// This method is more expensive than the
755    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
756    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
757    /// device owner to be fetched from the store as well.
758    ///
759    /// *Note*: This method will include our own device which is always present
760    /// in the store.
761    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
762        let devices = self.get_device_data_for_user(user_id).await?;
763
764        let own_identity = self
765            .inner
766            .store
767            .get_user_identity(self.user_id())
768            .await?
769            .and_then(|i| i.own().cloned());
770        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
771
772        Ok(UserDevices {
773            inner: devices,
774            verification_machine: self.inner.verification_machine.clone(),
775            own_identity,
776            device_owner_identity,
777        })
778    }
779
780    /// Get a [`Device`] for the given user with the given [`DeviceId`].
781    ///
782    /// This method is more expensive than the [`Store::get_device_data`] method
783    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
784    /// [`UserIdentityData`] of the device owner to be fetched from the
785    /// store as well.
786    ///
787    /// *Note*: This method will include our own device which is always present
788    /// in the store.
789    pub(crate) async fn get_device(
790        &self,
791        user_id: &UserId,
792        device_id: &DeviceId,
793    ) -> Result<Option<Device>> {
794        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
795            Ok(Some(self.wrap_device_data(device_data).await?))
796        } else {
797            Ok(None)
798        }
799    }
800
801    /// Create a new device using the supplied [`DeviceData`]. Normally we would
802    /// call [`Self::get_device`] to find an existing device inside this
803    /// store. Only call this if you have some existing DeviceData and want
804    /// to wrap it with the extra information provided by a [`Device`].
805    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
806        let own_identity = self
807            .inner
808            .store
809            .get_user_identity(self.user_id())
810            .await?
811            .and_then(|i| i.own().cloned());
812
813        let device_owner_identity =
814            self.inner.store.get_user_identity(device_data.user_id()).await?;
815
816        Ok(Device {
817            inner: device_data,
818            verification_machine: self.inner.verification_machine.clone(),
819            own_identity,
820            device_owner_identity,
821        })
822    }
823
824    ///  Get the Identity of `user_id`
825    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
826        let own_identity = self
827            .inner
828            .store
829            .get_user_identity(self.user_id())
830            .await?
831            .and_then(as_variant!(UserIdentityData::Own));
832
833        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
834            UserIdentity::new(
835                self.clone(),
836                i,
837                self.inner.verification_machine.to_owned(),
838                own_identity,
839            )
840        }))
841    }
842
843    /// Try to export the secret with the given secret name.
844    ///
845    /// The exported secret will be encoded as unpadded base64. Returns `Null`
846    /// if the secret can't be found.
847    ///
848    /// # Arguments
849    ///
850    /// * `secret_name` - The name of the secret that should be exported.
851    pub async fn export_secret(
852        &self,
853        secret_name: &SecretName,
854    ) -> Result<Option<String>, CryptoStoreError> {
855        Ok(match secret_name {
856            SecretName::CrossSigningMasterKey
857            | SecretName::CrossSigningUserSigningKey
858            | SecretName::CrossSigningSelfSigningKey => {
859                self.inner.identity.lock().await.export_secret(secret_name).await
860            }
861            SecretName::RecoveryKey => {
862                if let Some(key) = self.load_backup_keys().await?.decryption_key {
863                    let exported = key.to_base64();
864                    Some(exported)
865                } else {
866                    None
867                }
868            }
869            name => {
870                warn!(secret = ?name, "Unknown secret was requested");
871                None
872            }
873        })
874    }
875
876    /// Export all the private cross signing keys we have.
877    ///
878    /// The export will contain the seed for the ed25519 keys as a unpadded
879    /// base64 encoded string.
880    ///
881    /// This method returns `None` if we don't have any private cross signing
882    /// keys.
883    pub async fn export_cross_signing_keys(
884        &self,
885    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
886        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
887        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
888        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
889
890        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
891            None
892        } else {
893            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
894        })
895    }
896
897    /// Import our private cross signing keys.
898    ///
899    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
900    /// base64 encoded string.
901    pub async fn import_cross_signing_keys(
902        &self,
903        export: CrossSigningKeyExport,
904    ) -> Result<CrossSigningStatus, SecretImportError> {
905        if let Some(public_identity) =
906            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
907        {
908            let identity = self.inner.identity.lock().await;
909
910            identity
911                .import_secrets(
912                    public_identity.to_owned(),
913                    export.master_key.as_deref(),
914                    export.self_signing_key.as_deref(),
915                    export.user_signing_key.as_deref(),
916                )
917                .await?;
918
919            let status = identity.status().await;
920
921            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
922
923            let mut changes =
924                Changes { private_identity: Some(identity.clone()), ..Default::default() };
925
926            if diff.none_differ() {
927                public_identity.mark_as_verified();
928                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
929            }
930
931            info!(?status, "Successfully imported the private cross-signing keys");
932
933            self.save_changes(changes).await?;
934        } else {
935            warn!(
936                "No public identity found while importing cross-signing keys, \
937                 a /keys/query needs to be done"
938            );
939        }
940
941        Ok(self.inner.identity.lock().await.status().await)
942    }
943
944    /// Export all the secrets we have in the store into a [`SecretsBundle`].
945    ///
946    /// This method will export all the private cross-signing keys and, if
947    /// available, the private part of a backup key and its accompanying
948    /// version.
949    ///
950    /// The method will fail if we don't have all three private cross-signing
951    /// keys available.
952    ///
953    /// **Warning**: Only export this and share it with a trusted recipient,
954    /// i.e. if an existing device is sharing this with a new device.
955    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
956        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
957            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
958        };
959
960        let Some(master_key) = cross_signing.master_key.clone() else {
961            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
962        };
963
964        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
965            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
966        };
967
968        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
969            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
970        };
971
972        let backup_keys = self.load_backup_keys().await?;
973
974        let backup = if let Some(key) = backup_keys.decryption_key {
975            if let Some(backup_version) = backup_keys.backup_version {
976                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
977                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
978                ))
979            } else {
980                return Err(SecretsBundleExportError::MissingBackupVersion);
981            }
982        } else {
983            None
984        };
985
986        Ok(SecretsBundle {
987            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
988            backup,
989        })
990    }
991
992    /// Import and persists secrets from a [`SecretsBundle`].
993    ///
994    /// This method will import all the private cross-signing keys and, if
995    /// available, the private part of a backup key and its accompanying
996    /// version into the store.
997    ///
998    /// **Warning**: Only import this from a trusted source, i.e. if an existing
999    /// device is sharing this with a new device. The imported cross-signing
1000    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1001    ///
1002    /// The backup key will be persisted in the store and can be enabled using
1003    /// the [`BackupMachine`].
1004    pub async fn import_secrets_bundle(
1005        &self,
1006        bundle: &SecretsBundle,
1007    ) -> Result<(), SecretImportError> {
1008        let mut changes = Changes::default();
1009
1010        if let Some(backup_bundle) = &bundle.backup {
1011            match backup_bundle {
1012                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1013                    changes.backup_decryption_key = Some(bundle.key.clone());
1014                    changes.backup_version = Some(bundle.backup_version.clone());
1015                }
1016            }
1017        }
1018
1019        let identity = self.inner.identity.lock().await;
1020
1021        identity
1022            .import_secrets_unchecked(
1023                Some(&bundle.cross_signing.master_key),
1024                Some(&bundle.cross_signing.self_signing_key),
1025                Some(&bundle.cross_signing.user_signing_key),
1026            )
1027            .await?;
1028
1029        let public_identity = identity.to_public_identity().await.expect(
1030            "We should be able to create a new public identity since we just imported \
1031             all the private cross-signing keys",
1032        );
1033
1034        changes.private_identity = Some(identity.clone());
1035        changes.identities.new.push(UserIdentityData::Own(public_identity));
1036
1037        Ok(self.save_changes(changes).await?)
1038    }
1039
1040    /// Import the given `secret` named `secret_name` into the keystore.
1041    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1042        match &secret.secret_name {
1043            SecretName::CrossSigningMasterKey
1044            | SecretName::CrossSigningUserSigningKey
1045            | SecretName::CrossSigningSelfSigningKey => {
1046                if let Some(public_identity) =
1047                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1048                {
1049                    let identity = self.inner.identity.lock().await;
1050
1051                    identity
1052                        .import_secret(
1053                            public_identity,
1054                            &secret.secret_name,
1055                            &secret.event.content.secret,
1056                        )
1057                        .await?;
1058                    info!(
1059                        secret_name = ?secret.secret_name,
1060                        "Successfully imported a private cross signing key"
1061                    );
1062
1063                    let changes =
1064                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1065
1066                    self.save_changes(changes).await?;
1067                }
1068            }
1069            SecretName::RecoveryKey => {
1070                // We don't import the decryption key here since we'll want to
1071                // check if the public key matches to the latest version on the
1072                // server. We instead put the secret into a secret inbox where
1073                // it will stay until it either gets overwritten
1074                // or the user accepts the secret.
1075            }
1076            name => {
1077                warn!(secret = ?name, "Tried to import an unknown secret");
1078            }
1079        }
1080
1081        Ok(())
1082    }
1083
1084    /// Check whether there is a global flag to only encrypt messages for
1085    /// trusted devices or for everyone.
1086    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1087        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1088        Ok(value)
1089    }
1090
1091    /// Set global flag whether to encrypt messages for untrusted devices, or
1092    /// whether they should be excluded from the conversation.
1093    pub async fn set_only_allow_trusted_devices(
1094        &self,
1095        block_untrusted_devices: bool,
1096    ) -> Result<()> {
1097        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1098    }
1099
1100    /// Get custom stored value associated with a key
1101    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1102        let Some(value) = self.get_custom_value(key).await? else {
1103            return Ok(None);
1104        };
1105        let deserialized = self.deserialize_value(&value)?;
1106        Ok(Some(deserialized))
1107    }
1108
1109    /// Store custom value associated with a key
1110    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1111        let serialized = self.serialize_value(value)?;
1112        self.set_custom_value(key, serialized).await?;
1113        Ok(())
1114    }
1115
1116    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1117        let serialized =
1118            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1119        Ok(serialized)
1120    }
1121
1122    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1123        let deserialized =
1124            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1125        Ok(deserialized)
1126    }
1127
1128    /// Receive notifications of room keys being received as a [`Stream`].
1129    ///
1130    /// Each time a room key is updated in any way, an update will be sent to
1131    /// the stream. Updates that happen at the same time are batched into a
1132    /// [`Vec`].
1133    ///
1134    /// If the reader of the stream lags too far behind an error will be sent to
1135    /// the reader.
1136    ///
1137    /// The stream will terminate once all references to the underlying
1138    /// `CryptoStoreWrapper` are dropped.
1139    pub fn room_keys_received_stream(
1140        &self,
1141    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1142        self.inner.store.room_keys_received_stream()
1143    }
1144
1145    /// Receive notifications of received `m.room_key.withheld` messages.
1146    ///
1147    /// Each time an `m.room_key.withheld` is received and stored, an update
1148    /// will be sent to the stream. Updates that happen at the same time are
1149    /// batched into a [`Vec`].
1150    ///
1151    /// If the reader of the stream lags too far behind, a warning will be
1152    /// logged and items will be dropped.
1153    pub fn room_keys_withheld_received_stream(
1154        &self,
1155    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1156        self.inner.store.room_keys_withheld_received_stream()
1157    }
1158
1159    /// Returns a stream of user identity updates, allowing users to listen for
1160    /// notifications about new or changed user identities.
1161    ///
1162    /// The stream produced by this method emits updates whenever a new user
1163    /// identity is discovered or when an existing identities information is
1164    /// changed. Users can subscribe to this stream and receive updates in
1165    /// real-time.
1166    ///
1167    /// Caution: the returned stream will never terminate, and it holds a
1168    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1169    /// resource leaks.
1170    ///
1171    /// # Examples
1172    ///
1173    /// ```no_run
1174    /// # use matrix_sdk_crypto::OlmMachine;
1175    /// # use ruma::{device_id, user_id};
1176    /// # use futures_util::{pin_mut, StreamExt};
1177    /// # let machine: OlmMachine = unimplemented!();
1178    /// # futures_executor::block_on(async {
1179    /// let identities_stream = machine.store().user_identities_stream();
1180    /// pin_mut!(identities_stream);
1181    ///
1182    /// for identity_updates in identities_stream.next().await {
1183    ///     for (_, identity) in identity_updates.new {
1184    ///         println!("A new identity has been added {}", identity.user_id());
1185    ///     }
1186    /// }
1187    /// # });
1188    /// ```
1189    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1190        let verification_machine = self.inner.verification_machine.to_owned();
1191
1192        let this = self.clone();
1193        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1194            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1195
1196            let map_identity = |(user_id, identity)| {
1197                (
1198                    user_id,
1199                    UserIdentity::new(
1200                        this.clone(),
1201                        identity,
1202                        verification_machine.to_owned(),
1203                        own_identity.to_owned(),
1204                    ),
1205                )
1206            };
1207
1208            let new = new_identities.into_iter().map(map_identity).collect();
1209            let changed = changed_identities.into_iter().map(map_identity).collect();
1210            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1211
1212            IdentityUpdates { new, changed, unchanged }
1213        })
1214    }
1215
1216    /// Returns a stream of device updates, allowing users to listen for
1217    /// notifications about new or changed devices.
1218    ///
1219    /// The stream produced by this method emits updates whenever a new device
1220    /// is discovered or when an existing device's information is changed. Users
1221    /// can subscribe to this stream and receive updates in real-time.
1222    ///
1223    /// Caution: the returned stream will never terminate, and it holds a
1224    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1225    /// resource leaks.
1226    ///
1227    /// # Examples
1228    ///
1229    /// ```no_run
1230    /// # use matrix_sdk_crypto::OlmMachine;
1231    /// # use ruma::{device_id, user_id};
1232    /// # use futures_util::{pin_mut, StreamExt};
1233    /// # let machine: OlmMachine = unimplemented!();
1234    /// # futures_executor::block_on(async {
1235    /// let devices_stream = machine.store().devices_stream();
1236    /// pin_mut!(devices_stream);
1237    ///
1238    /// for device_updates in devices_stream.next().await {
1239    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1240    ///         for device in user_devices.values() {
1241    ///             println!("A new device has been added {}", device.device_id());
1242    ///         }
1243    ///     }
1244    /// }
1245    /// # });
1246    /// ```
1247    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1248        let verification_machine = self.inner.verification_machine.to_owned();
1249
1250        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1251            collect_device_updates(
1252                verification_machine.to_owned(),
1253                own_identity,
1254                identities,
1255                devices,
1256            )
1257        })
1258    }
1259
1260    /// Returns a [`Stream`] of user identity and device updates
1261    ///
1262    /// The stream returned by this method returns the same data as
1263    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1264    /// not include references to the `VerificationMachine`. It is therefore a
1265    /// lower-level view on that data.
1266    ///
1267    /// The stream will terminate once all references to the underlying
1268    /// `CryptoStoreWrapper` are dropped.
1269    pub fn identities_stream_raw(
1270        &self,
1271    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1272        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1273    }
1274
1275    /// Creates a [`CrossProcessLock`] for this store, that will contain the
1276    /// given key and value when hold.
1277    pub fn create_store_lock(
1278        &self,
1279        lock_key: String,
1280        lock_value: String,
1281    ) -> CrossProcessLock<LockableCryptoStore> {
1282        self.inner.store.create_store_lock(lock_key, lock_value)
1283    }
1284
1285    /// Receive notifications of gossipped secrets being received and stored in
1286    /// the secret inbox as a [`Stream`].
1287    ///
1288    /// The gossipped secrets are received using the `m.secret.send` event type
1289    /// and are guaranteed to have been received over a 1-to-1 Olm
1290    /// [`Session`] from a verified [`Device`].
1291    ///
1292    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1293    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1294    ///
1295    /// After a suitable secret of a certain type has been found it can be
1296    /// removed from the store
1297    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1298    ///
1299    /// The only secret this will currently broadcast is the
1300    /// `m.megolm_backup.v1`.
1301    ///
1302    /// If the reader of the stream lags too far behind, a warning will be
1303    /// logged and items will be dropped.
1304    ///
1305    /// # Examples
1306    ///
1307    /// ```no_run
1308    /// # use matrix_sdk_crypto::OlmMachine;
1309    /// # use ruma::{device_id, user_id};
1310    /// # use futures_util::{pin_mut, StreamExt};
1311    /// # let alice = user_id!("@alice:example.org").to_owned();
1312    /// # futures_executor::block_on(async {
1313    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1314    ///
1315    /// let secret_stream = machine.store().secrets_stream();
1316    /// pin_mut!(secret_stream);
1317    ///
1318    /// for secret in secret_stream.next().await {
1319    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1320    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name);
1321    /// }
1322    /// # });
1323    /// ```
1324    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1325        self.inner.store.secrets_stream()
1326    }
1327
1328    /// Receive notifications of historic room key bundles as a [`Stream`].
1329    ///
1330    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1331    ///
1332    /// Each time a historic room key bundle was received, an update will be
1333    /// sent to the stream. This stream can be used to accept historic room key
1334    /// bundles that arrive out of order, i.e. the bundle arrives after the
1335    /// user has already accepted a room invitation.
1336    ///
1337    /// # Examples
1338    ///
1339    /// ```no_run
1340    /// # use matrix_sdk_crypto::{
1341    /// #    OlmMachine,
1342    /// #    store::types::StoredRoomKeyBundleData,
1343    /// #    types::room_history::RoomKeyBundle
1344    /// # };
1345    /// # use ruma::{device_id, user_id};
1346    /// # use futures_util::{pin_mut, StreamExt};
1347    /// # let alice = user_id!("@alice:example.org").to_owned();
1348    /// # async {
1349    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1350    /// let bundle_stream = machine.store().historic_room_key_stream();
1351    /// pin_mut!(bundle_stream);
1352    ///
1353    /// while let Some(bundle_info) = bundle_stream.next().await {
1354    ///     // Try to find the bundle content in the store and if it's valid accept it.
1355    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
1356    ///         // Download the bundle now and import it.
1357    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
1358    ///         machine.store().receive_room_key_bundle(
1359    ///             &bundle_data,
1360    ///             bundle,
1361    ///             |_, _| {},
1362    ///         ).await?;
1363    ///     }
1364    /// }
1365    /// # anyhow::Ok(()) };
1366    /// ```
1367    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1368        self.inner.store.historic_room_key_stream()
1369    }
1370
1371    /// Import the given room keys into the store.
1372    ///
1373    /// # Arguments
1374    ///
1375    /// * `exported_keys` - The keys to be imported.
1376    /// * `from_backup_version` - If the keys came from key backup, the key
1377    ///   backup version. This will cause the keys to be marked as already
1378    ///   backed up, and therefore not requiring another backup.
1379    /// * `progress_listener` - Callback which will be called after each key is
1380    ///   processed. Called with arguments `(processed, total)` where
1381    ///   `processed` is the number of keys processed so far, and `total` is the
1382    ///   total number of keys (i.e., `exported_keys.len()`).
1383    pub async fn import_room_keys(
1384        &self,
1385        exported_keys: Vec<ExportedRoomKey>,
1386        from_backup_version: Option<&str>,
1387        progress_listener: impl Fn(usize, usize),
1388    ) -> Result<RoomKeyImportResult> {
1389        let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1390        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1391    }
1392
1393    /// Import the given room keys into our store.
1394    ///
1395    /// # Arguments
1396    ///
1397    /// * `exported_keys` - A list of previously exported keys that should be
1398    ///   imported into our store. If we already have a better version of a key
1399    ///   the key will *not* be imported.
1400    ///
1401    /// Returns a tuple of numbers that represent the number of sessions that
1402    /// were imported and the total number of sessions that were found in the
1403    /// key export.
1404    ///
1405    /// # Examples
1406    ///
1407    /// ```no_run
1408    /// # use std::io::Cursor;
1409    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1410    /// # use ruma::{device_id, user_id};
1411    /// # let alice = user_id!("@alice:example.org");
1412    /// # async {
1413    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1414    /// # let export = Cursor::new("".to_owned());
1415    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1416    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1417    /// # };
1418    /// ```
1419    pub async fn import_exported_room_keys(
1420        &self,
1421        exported_keys: Vec<ExportedRoomKey>,
1422        progress_listener: impl Fn(usize, usize),
1423    ) -> Result<RoomKeyImportResult> {
1424        self.import_room_keys(exported_keys, None, progress_listener).await
1425    }
1426
1427    async fn import_sessions_impl<T>(
1428        &self,
1429        room_keys: Vec<T>,
1430        from_backup_version: Option<&str>,
1431        progress_listener: impl Fn(usize, usize),
1432    ) -> Result<RoomKeyImportResult>
1433    where
1434        T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1435        T::Error: Debug,
1436    {
1437        let mut sessions = Vec::new();
1438
1439        async fn new_session_better(
1440            session: &InboundGroupSession,
1441            old_session: Option<InboundGroupSession>,
1442        ) -> bool {
1443            if let Some(old_session) = &old_session {
1444                session.compare(old_session).await == SessionOrdering::Better
1445            } else {
1446                true
1447            }
1448        }
1449
1450        let total_count = room_keys.len();
1451        let mut keys = BTreeMap::new();
1452
1453        for (i, key) in room_keys.into_iter().enumerate() {
1454            match key.try_into() {
1455                Ok(session) => {
1456                    let old_session = self
1457                        .inner
1458                        .store
1459                        .get_inbound_group_session(session.room_id(), session.session_id())
1460                        .await?;
1461
1462                    // Only import the session if we didn't have this session or
1463                    // if it's a better version of the same session.
1464                    if new_session_better(&session, old_session).await {
1465                        if from_backup_version.is_some() {
1466                            session.mark_as_backed_up();
1467                        }
1468
1469                        keys.entry(session.room_id().to_owned())
1470                            .or_insert_with(BTreeMap::new)
1471                            .entry(session.sender_key().to_base64())
1472                            .or_insert_with(BTreeSet::new)
1473                            .insert(session.session_id().to_owned());
1474
1475                        sessions.push(session);
1476                    }
1477                }
1478                Err(e) => {
1479                    warn!(
1480                        sender_key = key.sender_key().to_base64(),
1481                        room_id = ?key.room_id(),
1482                        session_id = key.session_id(),
1483                        error = ?e,
1484                        "Couldn't import a room key from a file export."
1485                    );
1486                }
1487            }
1488
1489            progress_listener(i, total_count);
1490        }
1491
1492        let imported_count = sessions.len();
1493
1494        self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1495
1496        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1497
1498        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1499    }
1500
1501    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1502        self.inner.store.clone()
1503    }
1504
1505    /// Export the keys that match the given predicate.
1506    ///
1507    /// # Arguments
1508    ///
1509    /// * `predicate` - A closure that will be called for every known
1510    ///   `InboundGroupSession`, which represents a room key. If the closure
1511    ///   returns `true` the `InboundGroupSession` will be included in the
1512    ///   export, if the closure returns `false` it will not be included.
1513    ///
1514    /// # Examples
1515    ///
1516    /// ```no_run
1517    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1518    /// # use ruma::{device_id, user_id, room_id};
1519    /// # let alice = user_id!("@alice:example.org");
1520    /// # async {
1521    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1522    /// let room_id = room_id!("!test:localhost");
1523    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1524    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1525    /// # };
1526    /// ```
1527    pub async fn export_room_keys(
1528        &self,
1529        predicate: impl FnMut(&InboundGroupSession) -> bool,
1530    ) -> Result<Vec<ExportedRoomKey>> {
1531        let mut exported = Vec::new();
1532
1533        let mut sessions = self.get_inbound_group_sessions().await?;
1534        sessions.retain(predicate);
1535
1536        for session in sessions {
1537            let export = session.export().await;
1538            exported.push(export);
1539        }
1540
1541        Ok(exported)
1542    }
1543
1544    /// Export room keys matching a predicate, providing them as an async
1545    /// `Stream`.
1546    ///
1547    /// # Arguments
1548    ///
1549    /// * `predicate` - A closure that will be called for every known
1550    ///   `InboundGroupSession`, which represents a room key. If the closure
1551    ///   returns `true` the `InboundGroupSession` will be included in the
1552    ///   export, if the closure returns `false` it will not be included.
1553    ///
1554    /// # Examples
1555    ///
1556    /// ```no_run
1557    /// use std::pin::pin;
1558    ///
1559    /// use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
1560    /// use ruma::{device_id, room_id, user_id};
1561    /// use tokio_stream::StreamExt;
1562    /// # async {
1563    /// let alice = user_id!("@alice:example.org");
1564    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1565    /// let room_id = room_id!("!test:localhost");
1566    /// let mut keys = pin!(machine
1567    ///     .store()
1568    ///     .export_room_keys_stream(|s| s.room_id() == room_id)
1569    ///     .await
1570    ///     .unwrap());
1571    /// while let Some(key) = keys.next().await {
1572    ///     println!("{}", key.room_id);
1573    /// }
1574    /// # };
1575    /// ```
1576    pub async fn export_room_keys_stream(
1577        &self,
1578        predicate: impl FnMut(&InboundGroupSession) -> bool,
1579    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1580        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1581        let sessions = self.get_inbound_group_sessions().await?;
1582        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1583            .then(|session| async move { session.export().await }))
1584    }
1585
1586    /// Assemble a room key bundle for sharing encrypted history, as per
1587    /// [MSC4268].
1588    ///
1589    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1590    pub async fn build_room_key_bundle(
1591        &self,
1592        room_id: &RoomId,
1593    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1594        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1595
1596        let mut bundle = RoomKeyBundle::default();
1597        for session in sessions {
1598            if session.shared_history() {
1599                bundle.room_keys.push(session.export().await.into());
1600            } else {
1601                bundle.withheld.push(RoomKeyWithheldContent::new(
1602                    session.algorithm().to_owned(),
1603                    WithheldCode::HistoryNotShared,
1604                    session.room_id().to_owned(),
1605                    session.session_id().to_owned(),
1606                    session.sender_key().to_owned(),
1607                    self.device_id().to_owned(),
1608                ));
1609            }
1610        }
1611
1612        // If we received a key bundle ourselves, in which one or more sessions was
1613        // marked as "history not shared", pass that on to the new user.
1614        let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1615        for withheld in withhelds {
1616            if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1617                bundle.withheld.push(withheld.content);
1618            }
1619        }
1620
1621        Ok(bundle)
1622    }
1623
1624    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1625    ///
1626    /// # Arguments
1627    ///
1628    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1629    ///   being received.
1630    /// * `bundle` - The decrypted and deserialized bundle itself.
1631    ///
1632    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1633    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1634    pub async fn receive_room_key_bundle(
1635        &self,
1636        bundle_info: &StoredRoomKeyBundleData,
1637        bundle: RoomKeyBundle,
1638        progress_listener: impl Fn(usize, usize),
1639    ) -> Result<(), CryptoStoreError> {
1640        let sender_data = if bundle_info.sender_data.should_recalculate() {
1641            let device = self
1642                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1643                .await?;
1644
1645            device
1646                .as_ref()
1647                .map(SenderData::from_device)
1648                .unwrap_or_else(|| bundle_info.sender_data.clone())
1649        } else {
1650            bundle_info.sender_data.clone()
1651        };
1652
1653        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1654
1655        if matches!(
1656            &sender_data,
1657            SenderData::UnknownDevice { .. }
1658                | SenderData::VerificationViolation(_)
1659                | SenderData::DeviceInfo { .. }
1660        ) {
1661            warn!(
1662                "Not accepting a historic room key bundle due to insufficient trust in the sender"
1663            );
1664            return Ok(());
1665        }
1666
1667        self.import_room_key_bundle_sessions(bundle_info, &bundle, progress_listener).await?;
1668        self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1669
1670        Ok(())
1671    }
1672
1673    async fn import_room_key_bundle_sessions(
1674        &self,
1675        bundle_info: &StoredRoomKeyBundleData,
1676        bundle: &RoomKeyBundle,
1677        progress_listener: impl Fn(usize, usize),
1678    ) -> Result<(), CryptoStoreError> {
1679        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1680            if key.room_id != bundle_info.bundle_data.room_id {
1681                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1682                Either::Right(key)
1683            } else {
1684                Either::Left(key)
1685            }
1686        });
1687
1688        match (bad.is_empty(), good.is_empty()) {
1689            // Case 1: Completely empty bundle.
1690            (true, true) => {
1691                warn!("Received a completely empty room key bundle");
1692            }
1693
1694            // Case 2: A bundle for the wrong room.
1695            (false, true) => {
1696                let bad_keys: Vec<_> =
1697                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1698
1699                warn!(
1700                    ?bad_keys,
1701                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1702                );
1703            }
1704
1705            // Case 3: A bundle containing useful room keys.
1706            (_, false) => {
1707                // We have at least some good keys, if we also have some bad ones let's
1708                // mention that here.
1709                if !bad.is_empty() {
1710                    warn!(
1711                        bad_key_count = bad.len(),
1712                        "The room key bundle contained some room keys \
1713                         that were meant for a different room"
1714                    );
1715                }
1716
1717                self.import_sessions_impl(good, None, progress_listener).await?;
1718            }
1719        }
1720
1721        Ok(())
1722    }
1723
1724    async fn import_room_key_bundle_withheld_info(
1725        &self,
1726        bundle_info: &StoredRoomKeyBundleData,
1727        bundle: &RoomKeyBundle,
1728    ) -> Result<(), CryptoStoreError> {
1729        let mut session_id_to_withheld_code_map = BTreeMap::new();
1730
1731        let mut changes = Changes::default();
1732        for withheld in &bundle.withheld {
1733            let (room_id, session_id) = match withheld {
1734                RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1735                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1736                    _ => continue,
1737                },
1738                #[cfg(feature = "experimental-algorithms")]
1739                RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1740                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1741                    _ => continue,
1742                },
1743                RoomKeyWithheldContent::Unknown(_) => continue,
1744            };
1745
1746            if room_id != bundle_info.bundle_data.room_id {
1747                trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1748                continue;
1749            }
1750
1751            changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1752                session_id.to_owned(),
1753                RoomKeyWithheldEntry {
1754                    sender: bundle_info.sender_user.clone(),
1755                    content: withheld.to_owned(),
1756                },
1757            );
1758            session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1759        }
1760
1761        self.save_changes(changes).await?;
1762
1763        info!(
1764            room_id = ?bundle_info.bundle_data.room_id,
1765            ?session_id_to_withheld_code_map,
1766            "Successfully imported withheld info from room key bundle",
1767        );
1768
1769        Ok(())
1770    }
1771}
1772
1773impl Deref for Store {
1774    type Target = DynCryptoStore;
1775
1776    fn deref(&self) -> &Self::Target {
1777        self.inner.store.deref().deref()
1778    }
1779}
1780
1781/// A crypto store that implements primitives for cross-process locking.
1782#[derive(Clone, Debug)]
1783pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1784
1785impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1786    type LockError = CryptoStoreError;
1787
1788    async fn try_lock(
1789        &self,
1790        lease_duration_ms: u32,
1791        key: &str,
1792        holder: &str,
1793    ) -> std::result::Result<bool, Self::LockError> {
1794        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1795    }
1796}
1797
1798#[cfg(test)]
1799mod tests {
1800    use std::pin::pin;
1801
1802    use assert_matches2::assert_matches;
1803    use futures_util::StreamExt;
1804    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1805    use matrix_sdk_test::async_test;
1806    use ruma::{
1807        device_id,
1808        events::room::{EncryptedFileInit, JsonWebKeyInit},
1809        owned_device_id, owned_mxc_uri, room_id,
1810        serde::Base64,
1811        user_id, RoomId,
1812    };
1813    use serde_json::json;
1814    use vodozemac::megolm::SessionKey;
1815
1816    use crate::{
1817        machine::test_helpers::get_machine_pair,
1818        olm::{InboundGroupSession, SenderData},
1819        store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1820        types::{
1821            events::{
1822                room_key_bundle::RoomKeyBundleContent,
1823                room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1824            },
1825            EventEncryptionAlgorithm,
1826        },
1827        OlmMachine,
1828    };
1829
1830    #[async_test]
1831    async fn test_import_room_keys_notifies_stream() {
1832        use futures_util::FutureExt;
1833
1834        let (alice, bob, _) =
1835            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1836
1837        let room1_id = room_id!("!room1:localhost");
1838        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1839        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
1840
1841        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
1842        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
1843
1844        let room_keys = room_keys_received_stream
1845            .next()
1846            .now_or_never()
1847            .flatten()
1848            .expect("We should have received an update of room key infos")
1849            .unwrap();
1850        assert_eq!(room_keys.len(), 1);
1851        assert_eq!(room_keys[0].room_id, "!room1:localhost");
1852    }
1853
1854    #[async_test]
1855    async fn test_export_room_keys_provides_selected_keys() {
1856        // Given an OlmMachine with room keys in it
1857        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1858        let room1_id = room_id!("!room1:localhost");
1859        let room2_id = room_id!("!room2:localhost");
1860        let room3_id = room_id!("!room3:localhost");
1861        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1862        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1863        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
1864
1865        // When I export some of the keys
1866        let keys = alice
1867            .store()
1868            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
1869            .await
1870            .unwrap();
1871
1872        // Then the requested keys were provided
1873        assert_eq!(keys.len(), 2);
1874        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1875        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1876        assert_eq!(keys[0].room_id, "!room2:localhost");
1877        assert_eq!(keys[1].room_id, "!room3:localhost");
1878        assert_eq!(keys[0].session_key.to_base64().len(), 220);
1879        assert_eq!(keys[1].session_key.to_base64().len(), 220);
1880    }
1881
1882    #[async_test]
1883    async fn test_export_room_keys_stream_can_provide_all_keys() {
1884        // Given an OlmMachine with room keys in it
1885        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1886        let room1_id = room_id!("!room1:localhost");
1887        let room2_id = room_id!("!room2:localhost");
1888        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1889        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1890
1891        // When I export the keys as a stream
1892        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
1893
1894        // And collect them
1895        let mut collected = vec![];
1896        while let Some(key) = keys.next().await {
1897            collected.push(key);
1898        }
1899
1900        // Then all the keys were provided
1901        assert_eq!(collected.len(), 2);
1902        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1903        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1904        assert_eq!(collected[0].room_id, "!room1:localhost");
1905        assert_eq!(collected[1].room_id, "!room2:localhost");
1906        assert_eq!(collected[0].session_key.to_base64().len(), 220);
1907        assert_eq!(collected[1].session_key.to_base64().len(), 220);
1908    }
1909
1910    #[async_test]
1911    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
1912        // Given an OlmMachine with room keys in it
1913        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
1914        let room1_id = room_id!("!room1:localhost");
1915        let room2_id = room_id!("!room2:localhost");
1916        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
1917        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
1918
1919        // When I export the keys as a stream
1920        let mut keys =
1921            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
1922
1923        // And collect them
1924        let mut collected = vec![];
1925        while let Some(key) = keys.next().await {
1926            collected.push(key);
1927        }
1928
1929        // Then all the keys matching our predicate were provided, and no others
1930        assert_eq!(collected.len(), 1);
1931        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
1932        assert_eq!(collected[0].room_id, "!room1:localhost");
1933        assert_eq!(collected[0].session_key.to_base64().len(), 220);
1934    }
1935
1936    #[async_test]
1937    async fn test_export_secrets_bundle() {
1938        let user_id = user_id!("@alice:example.com");
1939        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
1940
1941        let _ = first
1942            .bootstrap_cross_signing(false)
1943            .await
1944            .expect("We should be able to bootstrap cross-signing");
1945
1946        let bundle = first.store().export_secrets_bundle().await.expect(
1947            "We should be able to export the secrets bundle, now that we \
1948             have the cross-signing keys",
1949        );
1950
1951        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
1952
1953        second
1954            .store()
1955            .import_secrets_bundle(&bundle)
1956            .await
1957            .expect("We should be able to import the secrets bundle");
1958
1959        let status = second.cross_signing_status().await;
1960        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
1961
1962        assert!(identity.is_verified(), "The public identity should be marked as verified.");
1963
1964        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
1965    }
1966
1967    #[async_test]
1968    async fn test_create_dehydrated_device_key() {
1969        let pickle_key = DehydratedDeviceKey::new()
1970            .expect("Should be able to create a random dehydrated device key");
1971
1972        let to_vec = pickle_key.inner.to_vec();
1973        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
1974            .expect("Should be able to create a dehydrated device key from slice");
1975
1976        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
1977    }
1978
1979    #[async_test]
1980    async fn test_create_dehydrated_errors() {
1981        let too_small = [0u8; 22];
1982        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
1983
1984        assert!(pickle_key.is_err());
1985
1986        let too_big = [0u8; 40];
1987        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
1988
1989        assert!(pickle_key.is_err());
1990    }
1991
1992    #[async_test]
1993    async fn test_build_room_key_bundle() {
1994        // Given: Alice has sent a number of room keys to Bob, including some in the
1995        // wrong room, and some that are not marked as shared...
1996        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
1997        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
1998
1999        let room1_id = room_id!("!room1:localhost");
2000        let room2_id = room_id!("!room2:localhost");
2001
2002        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2003
2004           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2005        */
2006        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2007        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2008        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2009        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2010
2011        let sessions = [
2012            create_inbound_group_session_with_visibility(
2013                &alice,
2014                room1_id,
2015                &SessionKey::from_base64(session_key1).unwrap(),
2016                true,
2017            ),
2018            create_inbound_group_session_with_visibility(
2019                &alice,
2020                room1_id,
2021                &SessionKey::from_base64(session_key2).unwrap(),
2022                true,
2023            ),
2024            create_inbound_group_session_with_visibility(
2025                &alice,
2026                room1_id,
2027                &SessionKey::from_base64(session_key3).unwrap(),
2028                false,
2029            ),
2030            create_inbound_group_session_with_visibility(
2031                &alice,
2032                room2_id,
2033                &SessionKey::from_base64(session_key4).unwrap(),
2034                true,
2035            ),
2036        ];
2037        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2038
2039        // When I build the bundle
2040        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2041
2042        // Then the bundle matches the snapshot.
2043
2044        // We sort the sessions in the bundle, so that the snapshot is stable.
2045        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2046
2047        // We substitute the algorithm, since this changes based on feature flags.
2048        let algorithm = if cfg!(feature = "experimental-algorithms") {
2049            "m.megolm.v2.aes-sha2"
2050        } else {
2051            "m.megolm.v1.aes-sha2"
2052        };
2053        let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2054            assert_eq!(value.as_str().unwrap(), algorithm);
2055            "[algorithm]"
2056        };
2057
2058        // We also substitute alice's keys in the snapshot with placeholders
2059        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2060        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2061            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2062            "[alice curve key]"
2063        };
2064        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2065        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2066            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2067            "[alice ed25519 key]"
2068        };
2069
2070        insta::with_settings!({ sort_maps => true }, {
2071            assert_json_snapshot!(bundle, {
2072                ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2073                ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2074                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2075                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2076                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2077            });
2078        });
2079    }
2080
2081    #[async_test]
2082    async fn test_receive_room_key_bundle() {
2083        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2084        let alice_key = alice.identity_keys().curve25519;
2085        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2086
2087        let room_id = room_id!("!room1:localhost");
2088
2089        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2090        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2091
2092        let sessions = [
2093            create_inbound_group_session_with_visibility(
2094                &alice,
2095                room_id,
2096                &SessionKey::from_base64(session_key1).unwrap(),
2097                true,
2098            ),
2099            create_inbound_group_session_with_visibility(
2100                &alice,
2101                room_id,
2102                &SessionKey::from_base64(session_key2).unwrap(),
2103                false,
2104            ),
2105        ];
2106
2107        alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2108        let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2109
2110        bob.store()
2111            .receive_room_key_bundle(
2112                &StoredRoomKeyBundleData {
2113                    sender_user: alice.user_id().to_owned(),
2114                    sender_key: alice_key,
2115                    sender_data: SenderData::sender_verified(
2116                        alice.user_id(),
2117                        device_id!("ALICE"),
2118                        alice.identity_keys().ed25519,
2119                    ),
2120
2121                    bundle_data: RoomKeyBundleContent {
2122                        room_id: room_id.to_owned(),
2123                        // This isn't used at all in the method call, so we can fill it with
2124                        // garbage.
2125                        file: EncryptedFileInit {
2126                            url: owned_mxc_uri!("mxc://example.com/0"),
2127                            key: JsonWebKeyInit {
2128                                kty: "oct".to_owned(),
2129                                key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2130                                alg: "A256CTR.".to_owned(),
2131                                k: Base64::new(vec![0u8; 128]),
2132                                ext: true,
2133                            }
2134                            .into(),
2135                            iv: Base64::new(vec![0u8; 128]),
2136                            hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2137                                .into_iter()
2138                                .collect(),
2139                            v: "v2".to_owned(),
2140                        }
2141                        .into(),
2142                    },
2143                },
2144                bundle,
2145                |_, _| {},
2146            )
2147            .await
2148            .unwrap();
2149
2150        // The room key should be imported successfully
2151        let imported_sessions =
2152            bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2153
2154        assert_eq!(imported_sessions.len(), 1);
2155        assert_eq!(imported_sessions[0].room_id(), room_id);
2156
2157        assert_matches!(
2158            bob.store()
2159                .get_withheld_info(room_id, sessions[1].session_id())
2160                .await
2161                .unwrap()
2162                .expect("Withheld info should be present in the store."),
2163            RoomKeyWithheldEntry {
2164                #[cfg(not(feature = "experimental-algorithms"))]
2165                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2166                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2167                ),
2168                #[cfg(feature = "experimental-algorithms")]
2169                content: RoomKeyWithheldContent::MegolmV2AesSha2(
2170                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2171                ),
2172                ..
2173            }
2174        );
2175    }
2176
2177    /// Tests that the new store format introduced in [#5737][#5737] does not
2178    /// conflict with items already in the store that were serialised with the
2179    /// older format.
2180    ///
2181    /// [#5737]: https://github.com/matrix-org/matrix-rust-sdk/pull/5737
2182    #[async_test]
2183    async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2184        let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2185            {
2186              "content": {
2187                "algorithm": "m.megolm.v1.aes-sha2",
2188                "code": "m.unauthorised",
2189                "from_device": "ALICE",
2190                "reason": "You are not authorised to read the message.",
2191                "room_id": "!roomid:s.co",
2192                "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2193                "session_id": "session123"
2194              },
2195              "sender": "@alice:s.co",
2196              "type": "m.room_key.withheld"
2197            }
2198        ))
2199        .unwrap();
2200
2201        assert_matches!(
2202            entry,
2203            RoomKeyWithheldEntry {
2204                sender,
2205                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2206                    MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2207                ),
2208            }
2209        );
2210
2211        assert_eq!(sender, "@alice:s.co");
2212        assert_eq!(withheld_content.room_id, "!roomid:s.co");
2213        assert_eq!(withheld_content.session_id, "session123");
2214        assert_eq!(
2215            withheld_content.sender_key.to_base64(),
2216            "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2217        );
2218        assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2219    }
2220
2221    /// Create an inbound Megolm session for the given room.
2222    ///
2223    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2224    /// fields of the resultant session.
2225    ///
2226    /// The encryption algorithm used for the session depends on the
2227    /// `experimental-algorithms` feature flag:
2228    ///
2229    /// - When not set, the session uses `m.megolm.v1.aes-sha2`.
2230    /// - When set, the session uses `m.megolm.v2.aes-sha2`.
2231    fn create_inbound_group_session_with_visibility(
2232        olm_machine: &OlmMachine,
2233        room_id: &RoomId,
2234        session_key: &SessionKey,
2235        shared_history: bool,
2236    ) -> InboundGroupSession {
2237        let identity_keys = &olm_machine.store().static_account().identity_keys;
2238        InboundGroupSession::new(
2239            identity_keys.curve25519,
2240            identity_keys.ed25519,
2241            room_id,
2242            session_key,
2243            SenderData::unknown(),
2244            #[cfg(not(feature = "experimental-algorithms"))]
2245            EventEncryptionAlgorithm::MegolmV1AesSha2,
2246            #[cfg(feature = "experimental-algorithms")]
2247            EventEncryptionAlgorithm::MegolmV2AesSha2,
2248            None,
2249            shared_history,
2250        )
2251        .unwrap()
2252    }
2253}