matrix_sdk_crypto/store/mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized by providing
18//! your own implementation of the [`CryptoStore`] trait.
19//!
20//! An in-memory store is provided, as well as a SQLite-based one. Depending on
21//! your needs and targets, a custom store may be implemented, e.g. an IndexedDB
22//! store would be needed for the `wasm32-unknown-unknown` target.
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! # let futures = async {
34//! let store = Arc::new(MemoryStore::new());
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None).await.unwrap();
36//! # };
37//! ```
38//!
39//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
40//! [`CryptoStore`]: trait.CryptoStore.html
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{atomic::Ordering, Arc},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use matrix_sdk_common::locks::RwLock as StdRwLock;
54use ruma::{
55    encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56    OwnedRoomId, OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Deserialize, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, MutexGuard, Notify, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, warn};
63use vodozemac::{base64_encode, megolm::SessionOrdering, Curve25519PublicKey};
64use zeroize::{Zeroize, ZeroizeOnDrop};
65
66#[cfg(doc)]
67use crate::{backups::BackupMachine, identities::OwnUserIdentity};
68use crate::{
69    gossiping::GossippedSecret,
70    identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
71    olm::{
72        Account, ExportedRoomKey, InboundGroupSession, OlmMessageHash, OutboundGroupSession,
73        PrivateCrossSigningIdentity, Session, StaticAccountData,
74    },
75    types::{
76        events::room_key_withheld::RoomKeyWithheldEvent, BackupSecrets, CrossSigningSecrets,
77        EventEncryptionAlgorithm, MegolmBackupV1Curve25519AesSha2Secrets, SecretsBundle,
78    },
79    verification::VerificationMachine,
80    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
81};
82
83pub mod caches;
84mod crypto_store_wrapper;
85mod error;
86mod memorystore;
87mod traits;
88
89#[cfg(any(test, feature = "testing"))]
90#[macro_use]
91#[allow(missing_docs)]
92pub mod integration_tests;
93
94use caches::{SequenceNumber, UsersForKeyQuery};
95pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
96pub use error::{CryptoStoreError, Result};
97use matrix_sdk_common::{
98    deserialized_responses::WithheldCode, store_locks::CrossProcessStoreLock, timeout::timeout,
99};
100pub use memorystore::MemoryStore;
101pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
102
103use crate::types::{
104    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
105};
106pub use crate::{
107    dehydrated_devices::DehydrationError,
108    gossiping::{GossipRequest, SecretInfo},
109};
110
111/// A wrapper for our CryptoStore trait object.
112///
113/// This is needed because we want to have a generic interface so we can
114/// store/restore objects that we can serialize. Since trait objects and
115/// generics don't mix, we let the CryptoStore store strings, and this wrapper
116/// adds the generic interface on top.
117#[derive(Debug, Clone)]
118pub struct Store {
119    inner: Arc<StoreInner>,
120}
121
122#[derive(Debug, Default)]
123pub(crate) struct KeyQueryManager {
124    /// Record of the users that are waiting for a /keys/query.
125    users_for_key_query: Mutex<UsersForKeyQuery>,
126
127    /// Notifier that is triggered each time an update is received for a user.
128    users_for_key_query_notify: Notify,
129}
130
131impl KeyQueryManager {
132    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
133        self.ensure_sync_tracked_users(cache).await?;
134        Ok(SyncedKeyQueryManager { cache, manager: self })
135    }
136
137    /// Load the list of users for whom we are tracking their device lists and
138    /// fill out our caches.
139    ///
140    /// This method ensures that we're only going to load the users from the
141    /// actual [`CryptoStore`] once; it will also make sure that any
142    /// concurrent calls to this method get deduplicated.
143    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
144        // Check if the users are loaded, and in that case do nothing.
145        let loaded = cache.loaded_tracked_users.read().await;
146        if *loaded {
147            return Ok(());
148        }
149
150        // Otherwise, we may load the users.
151        drop(loaded);
152        let mut loaded = cache.loaded_tracked_users.write().await;
153
154        // Check again if the users have been loaded, in case another call to this
155        // method loaded the tracked users between the time we tried to
156        // acquire the lock and the time we actually acquired the lock.
157        if *loaded {
158            return Ok(());
159        }
160
161        let tracked_users = cache.store.load_tracked_users().await?;
162
163        let mut query_users_lock = self.users_for_key_query.lock().await;
164        let mut tracked_users_cache = cache.tracked_users.write();
165        for user in tracked_users {
166            tracked_users_cache.insert(user.user_id.to_owned());
167
168            if user.dirty {
169                query_users_lock.insert_user(&user.user_id);
170            }
171        }
172
173        *loaded = true;
174
175        Ok(())
176    }
177
178    /// Wait for a `/keys/query` response to be received if one is expected for
179    /// the given user.
180    ///
181    /// If the given timeout elapses, the method will stop waiting and return
182    /// `UserKeyQueryResult::TimeoutExpired`.
183    ///
184    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
185    /// query is pending are up to date, but doesn't hold on to it
186    /// thereafter: the lock is short-lived in this case.
187    pub async fn wait_if_user_key_query_pending(
188        &self,
189        cache: StoreCacheGuard,
190        timeout_duration: Duration,
191        user: &UserId,
192    ) -> Result<UserKeyQueryResult> {
193        {
194            // Drop the cache early, so we don't keep it while waiting (writing the
195            // results requires writing to the cache, i.e. taking another lock).
196            self.ensure_sync_tracked_users(&cache).await?;
197            drop(cache);
198        }
199
200        let mut users_for_key_query = self.users_for_key_query.lock().await;
201        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
202            return Ok(UserKeyQueryResult::WasNotPending);
203        };
204
205        let wait_for_completion = async {
206            while !waiter.completed.load(Ordering::Relaxed) {
207                // Register for being notified before releasing the mutex, so
208                // it's impossible to miss a wakeup between the last check for
209                // whether we should wait, and starting to wait.
210                let mut notified = pin!(self.users_for_key_query_notify.notified());
211                notified.as_mut().enable();
212                drop(users_for_key_query);
213
214                // Wait for a notification
215                notified.await;
216
217                // Reclaim the lock before checking the flag to avoid races
218                // when two notifications happen right after each other and the
219                // second one sets the flag we want to wait for.
220                users_for_key_query = self.users_for_key_query.lock().await;
221            }
222        };
223
224        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
225            Err(_) => {
226                warn!(
227                    user_id = ?user,
228                    "The user has a pending `/keys/query` request which did \
229                    not finish yet, some devices might be missing."
230                );
231
232                Ok(UserKeyQueryResult::TimeoutExpired)
233            }
234            _ => Ok(UserKeyQueryResult::WasPending),
235        }
236    }
237}
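
// A hedged usage sketch for `wait_if_user_key_query_pending` (not taken from
// the real callers): the `store`, `manager` and `user_id` handles as well as
// the 10-second timeout are illustrative.
//
//     let cache = store.cache().await?;
//     match manager
//         .wait_if_user_key_query_pending(cache, Duration::from_secs(10), user_id)
//         .await?
//     {
//         UserKeyQueryResult::WasPending | UserKeyQueryResult::WasNotPending => {
//             // The device list for `user_id` should now be up to date.
//         }
//         UserKeyQueryResult::TimeoutExpired => {
//             // Proceed anyway; some devices of `user_id` might be missing.
//         }
//     }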
238
239pub(crate) struct SyncedKeyQueryManager<'a> {
240    cache: &'a StoreCache,
241    manager: &'a KeyQueryManager,
242}
243
244impl SyncedKeyQueryManager<'_> {
245    /// Add entries to the list of users being tracked for device changes
246    ///
247    /// Any users not already on the list are flagged as awaiting a key query.
248    /// Users that were already in the list are unaffected.
249    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
250        let mut store_updates = Vec::new();
251        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
252
253        {
254            let mut tracked_users = self.cache.tracked_users.write();
255            for user_id in users {
256                if tracked_users.insert(user_id.to_owned()) {
257                    key_query_lock.insert_user(user_id);
258                    store_updates.push((user_id, true))
259                }
260            }
261        }
262
263        self.cache.store.save_tracked_users(&store_updates).await
264    }
265
266    /// Process notifications that users have changed devices.
267    ///
268    /// This is used to handle the list of device-list updates that is received
269    /// from the `/sync` response. Any users *whose device lists we are
270    /// tracking* are flagged as needing a key query. Users whose devices we
271    /// are not tracking are ignored.
272    pub async fn mark_tracked_users_as_changed(
273        &self,
274        users: impl Iterator<Item = &UserId>,
275    ) -> Result<()> {
276        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
277        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
278
279        {
280            let tracked_users = &self.cache.tracked_users.read();
281            for user_id in users {
282                if tracked_users.contains(user_id) {
283                    key_query_lock.insert_user(user_id);
284                    store_updates.push((user_id, true));
285                }
286            }
287        }
288
289        self.cache.store.save_tracked_users(&store_updates).await
290    }
291
292    /// Flag that the given users' devices are now up-to-date.
293    ///
294    /// This is called after processing the response to a /keys/query request.
295    /// Any users whose device lists we are tracking are removed from the
296    /// list of those pending a /keys/query.
297    pub async fn mark_tracked_users_as_up_to_date(
298        &self,
299        users: impl Iterator<Item = &UserId>,
300        sequence_number: SequenceNumber,
301    ) -> Result<()> {
302        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
303        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
304
305        {
306            let tracked_users = self.cache.tracked_users.read();
307            for user_id in users {
308                if tracked_users.contains(user_id) {
309                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
310                    store_updates.push((user_id, !clean));
311                }
312            }
313        }
314
315        self.cache.store.save_tracked_users(&store_updates).await?;
316        // wake up any tasks that may have been waiting for updates
317        self.manager.users_for_key_query_notify.notify_waiters();
318
319        Ok(())
320    }
321
322    /// Get the set of users that have the outdated/dirty flag set for their
323    /// list of devices.
324    ///
325    /// This set should be included in a `/keys/query` request which will update
326    /// the device list.
327    ///
328    /// # Returns
329    ///
330    /// A pair `(users, sequence_number)`, where `users` is the list of users to
331    /// be queried, and `sequence_number` is the current sequence number,
332    /// which should be returned in `mark_tracked_users_as_up_to_date`.
333    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
334        self.manager.users_for_key_query.lock().await.users_for_key_query()
335    }
336
337    /// See the docs for [`crate::OlmMachine::tracked_users()`].
338    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
339        self.cache.tracked_users.read().iter().cloned().collect()
340    }
341
342    /// Mark the given user as being tracked for device lists, and mark that it
343    /// has an outdated device list.
344    ///
345    /// This means that the user will be considered for a `/keys/query` request
346    /// next time [`Store::users_for_key_query()`] is called.
347    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
348        self.manager.users_for_key_query.lock().await.insert_user(user);
349        self.cache.tracked_users.write().insert(user.to_owned());
350
351        self.cache.store.save_tracked_users(&[(user, true)]).await
352    }
353}
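
// A hedged sketch of the intended `/keys/query` round-trip using the methods
// above (the `manager` and `cache` handles and the request/response handling
// are illustrative):
//
//     let synced = manager.synced(&cache).await?;
//     let (users, sequence_number) = synced.users_for_key_query().await;
//     // ... send a `/keys/query` request for `users` and process the response ...
//     synced
//         .mark_tracked_users_as_up_to_date(users.iter().map(std::ops::Deref::deref), sequence_number)
//         .await?;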
354
355#[derive(Debug)]
356pub(crate) struct StoreCache {
357    store: Arc<CryptoStoreWrapper>,
358    tracked_users: StdRwLock<BTreeSet<OwnedUserId>>,
359    loaded_tracked_users: RwLock<bool>,
360    account: Mutex<Option<Account>>,
361}
362
363impl StoreCache {
364    pub(crate) fn store_wrapper(&self) -> &CryptoStoreWrapper {
365        self.store.as_ref()
366    }
367
368    /// Returns a reference to the `Account`.
369    ///
370    /// Either load the account from the cache, or from the store if it is
371    /// missing from the cache.
372    ///
373    /// Note there should always be an account stored at least in the store, so
374    /// this doesn't return an `Option`.
375    ///
376    /// Note: this method should remain private, otherwise it's possible to ask
377    /// for a `StoreTransaction`, then get the `StoreTransaction::cache()`
378    /// and thus have two different live copies of the `Account` at once.
379    async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
380        let mut guard = self.account.lock().await;
381        if guard.is_some() {
382            Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
383        } else {
384            match self.store.load_account().await? {
385                Some(account) => {
386                    *guard = Some(account);
387                    Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
388                }
389                None => Err(CryptoStoreError::AccountUnset),
390            }
391        }
392    }
393}
394
395/// Read-only store cache guard.
396///
397/// This type should hold all the methods that are available when the cache is
398/// borrowed in read-only mode, while all the write operations on those fields
399/// should happen as part of a `StoreTransaction`.
400pub(crate) struct StoreCacheGuard {
401    cache: OwnedRwLockReadGuard<StoreCache>,
402    // TODO: (bnjbvr, #2624) add cross-process lock guard here.
403}
404
405impl StoreCacheGuard {
406    /// Returns a reference to the `Account`.
407    ///
408    /// Either load the account from the cache, or from the store if it is
409    /// missing from the cache.
410    ///
411    /// Note there should always be an account stored at least in the store, so
412    /// this doesn't return an `Option`.
413    pub async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
414        self.cache.account().await
415    }
416}
417
418impl Deref for StoreCacheGuard {
419    type Target = StoreCache;
420
421    fn deref(&self) -> &Self::Target {
422        &self.cache
423    }
424}
425
426/// A temporary transaction (that implies a write) to the underlying store.
427#[allow(missing_debug_implementations)]
428pub struct StoreTransaction {
429    store: Store,
430    changes: PendingChanges,
431    // TODO hold onto the cross-process crypto store lock + cache.
432    cache: OwnedRwLockWriteGuard<StoreCache>,
433}
434
435impl StoreTransaction {
436    /// Starts a new `StoreTransaction`.
437    async fn new(store: Store) -> Self {
438        let cache = store.inner.cache.clone();
439
440        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
441    }
442
443    pub(crate) fn cache(&self) -> &StoreCache {
444        &self.cache
445    }
446
447    /// Returns a reference to the current `Store`.
448    pub fn store(&self) -> &Store {
449        &self.store
450    }
451
452    /// Gets an `Account` for update.
453    ///
454    /// Note: since it's guaranteed that one can't have both a
455    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
456    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
457    /// that we can't have two copies of an `Account` alive at the same time.
458    pub async fn account(&mut self) -> Result<&mut Account> {
459        if self.changes.account.is_none() {
460            // Make sure the cache loaded the account.
461            let _ = self.cache.account().await?;
462            self.changes.account = self.cache.account.lock().await.take();
463        }
464        Ok(self.changes.account.as_mut().unwrap())
465    }
466
467    /// Commits all dirty fields to the store, and maintains the cache so it
468    /// reflects the current state of the database.
469    pub async fn commit(self) -> Result<()> {
470        if self.changes.is_empty() {
471            return Ok(());
472        }
473
474        // Save changes in the database.
475        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
476
477        self.store.save_pending_changes(self.changes).await?;
478
479        // Make the cache coherent with the database.
480        if let Some(account) = account {
481            *self.cache.account.lock().await = Some(account);
482        }
483
484        Ok(())
485    }
486}
487
488#[derive(Debug)]
489struct StoreInner {
490    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
491    store: Arc<CryptoStoreWrapper>,
492
493    /// In-memory cache for the current crypto store.
494    ///
495    /// ⚠ Must remain private.
496    cache: Arc<RwLock<StoreCache>>,
497
498    verification_machine: VerificationMachine,
499
500    /// Static account data that never changes (and thus can be loaded once and
501    /// for all when creating the store).
502    static_account: StaticAccountData,
503}
504
505/// Aggregated changes to be saved in the database.
506///
507/// This is an updated version of `Changes` that will replace it as #2624
508/// progresses.
509// If you ever add a field here, make sure to update `PendingChanges::is_empty` too.
510#[derive(Default, Debug)]
511#[allow(missing_docs)]
512pub struct PendingChanges {
513    pub account: Option<Account>,
514}
515
516impl PendingChanges {
517    /// Are there any changes stored or is this an empty `Changes` struct?
518    pub fn is_empty(&self) -> bool {
519        self.account.is_none()
520    }
521}
522
523/// Aggregated changes to be saved in the database.
524// If you ever add a field here, make sure to update `Changes::is_empty` too.
525#[derive(Default, Debug)]
526#[allow(missing_docs)]
527pub struct Changes {
528    pub private_identity: Option<PrivateCrossSigningIdentity>,
529    pub backup_version: Option<String>,
530    pub backup_decryption_key: Option<BackupDecryptionKey>,
531    pub dehydrated_device_pickle_key: Option<DehydratedDeviceKey>,
532    pub sessions: Vec<Session>,
533    pub message_hashes: Vec<OlmMessageHash>,
534    pub inbound_group_sessions: Vec<InboundGroupSession>,
535    pub outbound_group_sessions: Vec<OutboundGroupSession>,
536    pub key_requests: Vec<GossipRequest>,
537    pub identities: IdentityChanges,
538    pub devices: DeviceChanges,
539    /// Stores the `m.room_key.withheld` events that were received, per room
540    pub withheld_session_info: BTreeMap<OwnedRoomId, BTreeMap<String, RoomKeyWithheldEvent>>,
541    pub room_settings: HashMap<OwnedRoomId, RoomSettings>,
542    pub secrets: Vec<GossippedSecret>,
543    pub next_batch_token: Option<String>,
544}
545
546/// A user for which we are tracking the list of devices.
547#[derive(Clone, Debug, Serialize, Deserialize)]
548pub struct TrackedUser {
549    /// The user ID of the user.
550    pub user_id: OwnedUserId,
551    /// The outdated/dirty flag of the user; it remembers whether the list of
552    /// devices for the user is considered to be out of date. If the list of
553    /// devices is out of date, a `/keys/query` request should be sent out for
554    /// this user.
555    pub dirty: bool,
556}
557
558impl Changes {
559    /// Are there any changes stored or is this an empty `Changes` struct?
560    pub fn is_empty(&self) -> bool {
561        self.private_identity.is_none()
562            && self.backup_version.is_none()
563            && self.backup_decryption_key.is_none()
564            && self.dehydrated_device_pickle_key.is_none()
565            && self.sessions.is_empty()
566            && self.message_hashes.is_empty()
567            && self.inbound_group_sessions.is_empty()
568            && self.outbound_group_sessions.is_empty()
569            && self.key_requests.is_empty()
570            && self.identities.is_empty()
571            && self.devices.is_empty()
572            && self.withheld_session_info.is_empty()
573            && self.room_settings.is_empty()
574            && self.secrets.is_empty()
575            && self.next_batch_token.is_none()
576    }
577}
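
// A hedged example of how a `Changes` value is typically built, using struct
// update syntax (the backup version string is made up):
//
//     let changes = Changes { backup_version: Some("1".to_owned()), ..Default::default() };
//     assert!(!changes.is_empty());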
578
579/// This struct is used to remember whether an identity has undergone a change
580/// or remains the same as the one we already know about.
581///
582/// When the homeserver informs us of a potential change in a user's identity or
583/// device during a `/sync` response, it triggers a `/keys/query` request from
584/// our side. In response to this query, the server provides a comprehensive
585/// snapshot of all the user's devices and identities.
586///
587/// Our responsibility is to discern whether a device or identity is new,
588/// changed, or unchanged.
589#[derive(Debug, Clone, Default)]
590#[allow(missing_docs)]
591pub struct IdentityChanges {
592    pub new: Vec<UserIdentityData>,
593    pub changed: Vec<UserIdentityData>,
594    pub unchanged: Vec<UserIdentityData>,
595}
596
597impl IdentityChanges {
598    fn is_empty(&self) -> bool {
599        self.new.is_empty() && self.changed.is_empty()
600    }
601
602    /// Convert the vectors contained in the [`IdentityChanges`] into
603    /// three maps from user id to user identity (new, updated, unchanged).
604    fn into_maps(
605        self,
606    ) -> (
607        BTreeMap<OwnedUserId, UserIdentityData>,
608        BTreeMap<OwnedUserId, UserIdentityData>,
609        BTreeMap<OwnedUserId, UserIdentityData>,
610    ) {
611        let new: BTreeMap<_, _> = self
612            .new
613            .into_iter()
614            .map(|identity| (identity.user_id().to_owned(), identity))
615            .collect();
616
617        let changed: BTreeMap<_, _> = self
618            .changed
619            .into_iter()
620            .map(|identity| (identity.user_id().to_owned(), identity))
621            .collect();
622
623        let unchanged: BTreeMap<_, _> = self
624            .unchanged
625            .into_iter()
626            .map(|identity| (identity.user_id().to_owned(), identity))
627            .collect();
628
629        (new, changed, unchanged)
630    }
631}
632
633#[derive(Debug, Clone, Default)]
634#[allow(missing_docs)]
635pub struct DeviceChanges {
636    pub new: Vec<DeviceData>,
637    pub changed: Vec<DeviceData>,
638    pub deleted: Vec<DeviceData>,
639}
640
641/// Convert the device vectors contained in the [`DeviceChanges`] into
642/// a [`DeviceUpdates`] struct.
643///
644/// The [`DeviceChanges`] will contain vectors of [`DeviceData`] which
645/// we want to convert to [`Device`]s.
646fn collect_device_updates(
647    verification_machine: VerificationMachine,
648    own_identity: Option<OwnUserIdentityData>,
649    identities: IdentityChanges,
650    devices: DeviceChanges,
651) -> DeviceUpdates {
652    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
653    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
654
655    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
656
657    let map_device = |device: DeviceData| {
658        let device_owner_identity = new_identities
659            .get(device.user_id())
660            .or_else(|| changed_identities.get(device.user_id()))
661            .or_else(|| unchanged_identities.get(device.user_id()))
662            .cloned();
663
664        Device {
665            inner: device,
666            verification_machine: verification_machine.to_owned(),
667            own_identity: own_identity.to_owned(),
668            device_owner_identity,
669        }
670    };
671
672    for device in devices.new {
673        let device = map_device(device);
674
675        new.entry(device.user_id().to_owned())
676            .or_default()
677            .insert(device.device_id().to_owned(), device);
678    }
679
680    for device in devices.changed {
681        let device = map_device(device);
682
683        changed
684            .entry(device.user_id().to_owned())
685            .or_default()
686            .insert(device.device_id().to_owned(), device.to_owned());
687    }
688
689    DeviceUpdates { new, changed }
690}
691
692/// Updates about [`Device`]s which got received over the `/keys/query`
693/// endpoint.
694#[derive(Clone, Debug, Default)]
695pub struct DeviceUpdates {
696    /// The list of newly discovered devices.
697    ///
698    /// A device being in this list does not necessarily mean that the device
699    /// was just created; it just means that it's the first time we're
700    /// seeing this device.
701    pub new: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
702    /// The list of changed devices.
703    pub changed: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
704}
705
706/// Updates about [`UserIdentity`]s which got received over the `/keys/query`
707/// endpoint.
708#[derive(Clone, Debug, Default)]
709pub struct IdentityUpdates {
710    /// The list of newly discovered user identities.
711    ///
712    /// An identity being in this list does not necessarily mean that the
713    /// identity was just created; it just means that it's the first time
714    /// we're seeing this identity.
715    pub new: BTreeMap<OwnedUserId, UserIdentity>,
716    /// The list of changed identities.
717    pub changed: BTreeMap<OwnedUserId, UserIdentity>,
718    /// The list of unchanged identities.
719    pub unchanged: BTreeMap<OwnedUserId, UserIdentity>,
720}
721
722/// The private part of a backup key.
723///
724/// The private part of the key is not used on a regular basis. Rather, it is
725/// used only when we need to *recover* the backup.
726///
727/// Typically, this private key is itself encrypted and stored in server-side
728/// secret storage (SSSS), whence it can be retrieved when it is needed for a
729/// recovery operation. Alternatively, the key can be "gossiped" between devices
730/// via "secret sharing".
731#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
732#[serde(transparent)]
733pub struct BackupDecryptionKey {
734    pub(crate) inner: Box<[u8; BackupDecryptionKey::KEY_SIZE]>,
735}
736
737impl BackupDecryptionKey {
738    /// The number of bytes the decryption key will hold.
739    pub const KEY_SIZE: usize = 32;
740
741    /// Create a new random decryption key.
742    pub fn new() -> Result<Self, rand::Error> {
743        let mut rng = rand::thread_rng();
744
745        let mut key = Box::new([0u8; Self::KEY_SIZE]);
746        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
747
748        Ok(Self { inner: key })
749    }
750
751    /// Export the [`BackupDecryptionKey`] as a base64 encoded string.
752    pub fn to_base64(&self) -> String {
753        base64_encode(self.inner.as_slice())
754    }
755}
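
// A hedged sketch: generate a fresh backup decryption key and export it as
// base64, e.g. to stash it in secret storage (error handling elided):
//
//     let decryption_key = BackupDecryptionKey::new()?;
//     let exported = decryption_key.to_base64();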
756
757#[cfg(not(tarpaulin_include))]
758impl Debug for BackupDecryptionKey {
759    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
760        f.debug_tuple("BackupDecryptionKey").field(&"...").finish()
761    }
762}
763
764/// The pickle key used to safely store the dehydrated device pickle.
765///
766/// This input key material will be expanded using HKDF into an AES key, MAC
767/// key, and an initialization vector (IV).
768#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
769#[serde(transparent)]
770pub struct DehydratedDeviceKey {
771    pub(crate) inner: Box<[u8; DehydratedDeviceKey::KEY_SIZE]>,
772}
773
774impl DehydratedDeviceKey {
775    /// The number of bytes the encryption key will hold.
776    pub const KEY_SIZE: usize = 32;
777
778    /// Generates a new random pickle key.
779    pub fn new() -> Result<Self, rand::Error> {
780        let mut rng = rand::thread_rng();
781
782        let mut key = Box::new([0u8; Self::KEY_SIZE]);
783        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
784
785        Ok(Self { inner: key })
786    }
787
788    /// Creates a new dehydration pickle key from the given slice.
789    ///
790    /// Fails if the slice length is not 32.
791    pub fn from_slice(slice: &[u8]) -> Result<Self, DehydrationError> {
792        if slice.len() == 32 {
793            let mut key = Box::new([0u8; 32]);
794            key.copy_from_slice(slice);
795            Ok(DehydratedDeviceKey { inner: key })
796        } else {
797            Err(DehydrationError::PickleKeyLength(slice.len()))
798        }
799    }
800
801    /// Creates a dehydration pickle key from the given bytes.
802    pub fn from_bytes(raw_key: &[u8; 32]) -> Self {
803        let mut inner = Box::new([0u8; Self::KEY_SIZE]);
804        inner.copy_from_slice(raw_key);
805
806        Self { inner }
807    }
808
809    /// Export the [`DehydratedDeviceKey`] as a base64 encoded string.
810    pub fn to_base64(&self) -> String {
811        base64_encode(self.inner.as_slice())
812    }
813}
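
// A hedged sketch of the constructors above; `raw` is an illustrative 32-byte
// buffer, and passing a slice of any other length to `from_slice` yields
// `DehydrationError::PickleKeyLength`:
//
//     let raw = [0u8; 32];
//     let key = DehydratedDeviceKey::from_bytes(&raw);
//     let same_key = DehydratedDeviceKey::from_slice(raw.as_slice())?;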
814
815impl From<&[u8; 32]> for DehydratedDeviceKey {
816    fn from(value: &[u8; 32]) -> Self {
817        DehydratedDeviceKey { inner: Box::new(*value) }
818    }
819}
820
821impl From<DehydratedDeviceKey> for Vec<u8> {
822    fn from(key: DehydratedDeviceKey) -> Self {
823        key.inner.to_vec()
824    }
825}
826
827#[cfg(not(tarpaulin_include))]
828impl Debug for DehydratedDeviceKey {
829    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
830        f.debug_tuple("DehydratedDeviceKey").field(&"...").finish()
831    }
832}
833
834impl DeviceChanges {
835    /// Merge the given `DeviceChanges` into this instance of `DeviceChanges`.
836    pub fn extend(&mut self, other: DeviceChanges) {
837        self.new.extend(other.new);
838        self.changed.extend(other.changed);
839        self.deleted.extend(other.deleted);
840    }
841
842    fn is_empty(&self) -> bool {
843        self.new.is_empty() && self.changed.is_empty() && self.deleted.is_empty()
844    }
845}
846
847/// Struct holding info about how many room keys the store has.
848#[derive(Debug, Clone, Default)]
849pub struct RoomKeyCounts {
850    /// The total number of room keys the store has.
851    pub total: usize,
852    /// The number of backed up room keys the store has.
853    pub backed_up: usize,
854}
855
856/// Stored versions of the backup keys.
857#[derive(Default, Clone, Debug)]
858pub struct BackupKeys {
859    /// The key used to decrypt backed up room keys.
860    pub decryption_key: Option<BackupDecryptionKey>,
861    /// The version that we are using for backups.
862    pub backup_version: Option<String>,
863}
864
865/// A struct containing private cross signing keys that can be backed up or
866/// uploaded to the secret store.
867#[derive(Default, Zeroize, ZeroizeOnDrop)]
868pub struct CrossSigningKeyExport {
869    /// The seed of the master key encoded as unpadded base64.
870    pub master_key: Option<String>,
871    /// The seed of the self signing key encoded as unpadded base64.
872    pub self_signing_key: Option<String>,
873    /// The seed of the user signing key encoded as unpadded base64.
874    pub user_signing_key: Option<String>,
875}
876
877#[cfg(not(tarpaulin_include))]
878impl Debug for CrossSigningKeyExport {
879    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
880        f.debug_struct("CrossSigningKeyExport")
881            .field("master_key", &self.master_key.is_some())
882            .field("self_signing_key", &self.self_signing_key.is_some())
883            .field("user_signing_key", &self.user_signing_key.is_some())
884            .finish_non_exhaustive()
885    }
886}
887
888/// Error describing what went wrong when importing private cross signing keys
889/// or the key backup key.
890#[derive(Debug, Error)]
891pub enum SecretImportError {
892    /// The key that we tried to import was invalid.
893    #[error(transparent)]
894    Key(#[from] vodozemac::KeyError),
895    /// The public key of the imported private key doesn't match the public
896    /// key that was uploaded to the server.
897    #[error(
898        "The public key of the imported private key doesn't match to the \
899            public key that was uploaded to the server"
900    )]
901    MismatchedPublicKeys,
902    /// The new version of the identity couldn't be stored.
903    #[error(transparent)]
904    Store(#[from] CryptoStoreError),
905}
906
907/// Error describing what went wrong when exporting a [`SecretsBundle`].
908///
909/// The [`SecretsBundle`] can only be exported if we have all cross-signing
910/// private keys in the store.
911#[derive(Debug, Error)]
912pub enum SecretsBundleExportError {
913    /// The store itself had an error.
914    #[error(transparent)]
915    Store(#[from] CryptoStoreError),
916    /// We're missing one or multiple cross-signing keys.
917    #[error("The store is missing one or multiple cross-signing keys")]
918    MissingCrossSigningKey(KeyUsage),
919    /// We're missing all cross-signing keys.
920    #[error("The store doesn't contain any cross-signing keys")]
921    MissingCrossSigningKeys,
922    /// We have a backup key stored, but we don't know the version of the
923    /// backup.
924    #[error("The store contains a backup key, but no backup version")]
925    MissingBackupVersion,
926}
927
928/// Result type telling us if a `/keys/query` response was expected for a given
929/// user.
930#[derive(Clone, Copy, Debug, PartialEq, Eq)]
931pub(crate) enum UserKeyQueryResult {
932    WasPending,
933    WasNotPending,
934
935    /// A query was pending, but we gave up waiting
936    TimeoutExpired,
937}
938
939/// Room encryption settings which are modified by state events or user options
940#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
941pub struct RoomSettings {
942    /// The encryption algorithm that should be used in the room.
943    pub algorithm: EventEncryptionAlgorithm,
944
945    /// Should untrusted devices receive the room key, or should they be
946    /// excluded from the conversation.
947    pub only_allow_trusted_devices: bool,
948
949    /// The maximum time an encryption session should be used for, before it is
950    /// rotated.
951    pub session_rotation_period: Option<Duration>,
952
953    /// The maximum number of messages an encryption session should be used for,
954    /// before it is rotated.
955    pub session_rotation_period_messages: Option<usize>,
956}
957
958impl Default for RoomSettings {
959    fn default() -> Self {
960        Self {
961            algorithm: EventEncryptionAlgorithm::MegolmV1AesSha2,
962            only_allow_trusted_devices: false,
963            session_rotation_period: None,
964            session_rotation_period_messages: None,
965        }
966    }
967}
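
// A hedged example of overriding a single room setting while keeping the
// defaults (Megolm V1, no rotation limits):
//
//     let settings = RoomSettings { only_allow_trusted_devices: true, ..Default::default() };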
968
969/// Information on a room key that has been received or imported.
970#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
971pub struct RoomKeyInfo {
972    /// The [messaging algorithm] that this key is used for. Will be one of the
973    /// `m.megolm.*` algorithms.
974    ///
975    /// [messaging algorithm]: https://spec.matrix.org/v1.6/client-server-api/#messaging-algorithms
976    pub algorithm: EventEncryptionAlgorithm,
977
978    /// The room where the key is used.
979    pub room_id: OwnedRoomId,
980
981    /// The Curve25519 key of the device which initiated the session originally.
982    pub sender_key: Curve25519PublicKey,
983
984    /// The ID of the session that the key is for.
985    pub session_id: String,
986}
987
988impl From<&InboundGroupSession> for RoomKeyInfo {
989    fn from(group_session: &InboundGroupSession) -> Self {
990        RoomKeyInfo {
991            algorithm: group_session.algorithm().clone(),
992            room_id: group_session.room_id().to_owned(),
993            sender_key: group_session.sender_key(),
994            session_id: group_session.session_id().to_owned(),
995        }
996    }
997}
998
999/// Information on a room key that has been withheld
1000#[derive(Clone, Debug, Deserialize, Serialize)]
1001pub struct RoomKeyWithheldInfo {
1002    /// The room where the key is used.
1003    pub room_id: OwnedRoomId,
1004
1005    /// The ID of the session that the key is for.
1006    pub session_id: String,
1007
1008    /// The `m.room_key.withheld` event that notified us that the key is being
1009    /// withheld.
1010    pub withheld_event: RoomKeyWithheldEvent,
1011}
1012
1013impl Store {
1014    /// Create a new Store.
1015    pub(crate) fn new(
1016        account: StaticAccountData,
1017        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
1018        store: Arc<CryptoStoreWrapper>,
1019        verification_machine: VerificationMachine,
1020    ) -> Self {
1021        Self {
1022            inner: Arc::new(StoreInner {
1023                static_account: account,
1024                identity,
1025                store: store.clone(),
1026                verification_machine,
1027                cache: Arc::new(RwLock::new(StoreCache {
1028                    store,
1029                    tracked_users: Default::default(),
1030                    loaded_tracked_users: Default::default(),
1031                    account: Default::default(),
1032                })),
1033            }),
1034        }
1035    }
1036
1037    /// UserId associated with this store
1038    pub(crate) fn user_id(&self) -> &UserId {
1039        &self.inner.static_account.user_id
1040    }
1041
1042    /// DeviceId associated with this store
1043    pub(crate) fn device_id(&self) -> &DeviceId {
1044        self.inner.verification_machine.own_device_id()
1045    }
1046
1047    /// The static data for the account associated with this store.
1048    pub(crate) fn static_account(&self) -> &StaticAccountData {
1049        &self.inner.static_account
1050    }
1051
1052    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
1053        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
1054        // - try to take the lock,
1055        // - if acquired, look if another process touched the underlying storage,
1056        // - if yes, reload everything; if no, return current cache
1057        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
1058    }
1059
1060    pub(crate) async fn transaction(&self) -> StoreTransaction {
1061        StoreTransaction::new(self.clone()).await
1062    }
1063
1064    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
1065    // take a `&StoreTransaction`, but callers didn't quite like that.
1066    pub(crate) async fn with_transaction<
1067        T,
1068        Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
1069        F: FnOnce(StoreTransaction) -> Fut,
1070    >(
1071        &self,
1072        func: F,
1073    ) -> Result<T, crate::OlmError> {
1074        let tr = self.transaction().await;
1075        let (tr, res) = func(tr).await?;
1076        tr.commit().await?;
1077        Ok(res)
1078    }
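
    // A hedged usage sketch for `with_transaction` (the closure body is
    // illustrative; `mark_keys_as_published` stands in for any mutation of the
    // cached `Account`):
    //
    //     store
    //         .with_transaction(|mut tr| async {
    //             let account = tr.account().await?;
    //             account.mark_keys_as_published();
    //             Ok((tr, ()))
    //         })
    //         .await?;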
1079
1080    #[cfg(test)]
1081    /// Test helper to reset the cross-signing identity.
1082    pub(crate) async fn reset_cross_signing_identity(&self) {
1083        self.inner.identity.lock().await.reset();
1084    }
1085
1086    /// PrivateCrossSigningIdentity associated with this store
1087    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
1088        self.inner.identity.clone()
1089    }
1090
1091    /// Save the given Sessions to the store
1092    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
1093        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
1094
1095        self.save_changes(changes).await
1096    }
1097
1098    pub(crate) async fn get_sessions(
1099        &self,
1100        sender_key: &str,
1101    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
1102        self.inner.store.get_sessions(sender_key).await
1103    }
1104
1105    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
1106        self.inner.store.save_changes(changes).await
1107    }
1108
1109    /// Compare the given `InboundGroupSession` with an existing session we have
1110    /// in the store.
1111    ///
1112    /// This method returns `SessionOrdering::Better` if the given session is
1113    /// better than the one we already have or if we don't have such a
1114    /// session in the store.
1115    pub(crate) async fn compare_group_session(
1116        &self,
1117        session: &InboundGroupSession,
1118    ) -> Result<SessionOrdering> {
1119        let old_session = self
1120            .inner
1121            .store
1122            .get_inbound_group_session(session.room_id(), session.session_id())
1123            .await?;
1124
1125        Ok(if let Some(old_session) = old_session {
1126            session.compare(&old_session).await
1127        } else {
1128            SessionOrdering::Better
1129        })
1130    }
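
    // A hedged sketch of how callers in this crate typically use the ordering
    // (the `session` value is illustrative): only persist the incoming key if
    // it is better than the one already in the store.
    //
    //     if let SessionOrdering::Better = store.compare_group_session(&session).await? {
    //         store.save_inbound_group_sessions(&[session]).await?;
    //     }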
1131
1132    #[cfg(test)]
1133    /// Testing helper that allows saving only a set of devices.
1134    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
1135        let changes = Changes {
1136            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
1137            ..Default::default()
1138        };
1139
1140        self.save_changes(changes).await
1141    }
1142
1143    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
1144    pub(crate) async fn save_inbound_group_sessions(
1145        &self,
1146        sessions: &[InboundGroupSession],
1147    ) -> Result<()> {
1148        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
1149
1150        self.save_changes(changes).await
1151    }
1152
1153    /// Get the display name of our own device.
1154    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
1155        Ok(self
1156            .inner
1157            .store
1158            .get_device(self.user_id(), self.device_id())
1159            .await?
1160            .and_then(|d| d.display_name().map(|d| d.to_owned())))
1161    }
1162
1163    /// Get the device data for the given [`UserId`] and [`DeviceId`].
1164    ///
1165    /// *Note*: This method will include our own device which is always present
1166    /// in the store.
1167    pub(crate) async fn get_device_data(
1168        &self,
1169        user_id: &UserId,
1170        device_id: &DeviceId,
1171    ) -> Result<Option<DeviceData>> {
1172        self.inner.store.get_device(user_id, device_id).await
1173    }
1174
1175    /// Get the device data for the given [`UserId`] and [`DeviceId`].
1176    ///
1177    /// *Note*: This method will **not** include our own device.
1178    ///
1179    /// Use this method if you need a list of recipients for a given user, since
1180    /// we don't want to encrypt for our own device. Otherwise, take a look at
1181    /// the [`Store::get_device_data_for_user`] method.
1182    pub(crate) async fn get_device_data_for_user_filtered(
1183        &self,
1184        user_id: &UserId,
1185    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1186        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
1187            if user_id == self.user_id() {
1188                d.remove(self.device_id());
1189            }
1190            d
1191        })
1192    }
1193
1194    /// Get the [`DeviceData`] for all the devices a user has.
1195    ///
1196    /// *Note*: This method will include our own device which is always present
1197    /// in the store.
1198    ///
1199    /// Use this method if you need to operate on or update all devices of a
1200    /// user, otherwise take a look at the
1201    /// [`Store::get_device_data_for_user_filtered`] method.
1202    pub(crate) async fn get_device_data_for_user(
1203        &self,
1204        user_id: &UserId,
1205    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1206        self.inner.store.get_user_devices(user_id).await
1207    }
1208
1209    /// Get a [`Device`] for the given user with the given
1210    /// [`Curve25519PublicKey`] key.
1211    ///
1212    /// *Note*: This method will include our own device which is always present
1213    /// in the store.
1214    pub(crate) async fn get_device_from_curve_key(
1215        &self,
1216        user_id: &UserId,
1217        curve_key: Curve25519PublicKey,
1218    ) -> Result<Option<Device>> {
1219        self.get_user_devices(user_id)
1220            .await
1221            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
1222    }
1223
1224    /// Get all devices associated with the given [`UserId`].
1225    ///
1226    /// This method is more expensive than the
1227    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
1228    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
1229    /// device owner to be fetched from the store as well.
1230    ///
1231    /// *Note*: This method will include our own device which is always present
1232    /// in the store.
1233    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
1234        let devices = self.get_device_data_for_user(user_id).await?;
1235
1236        let own_identity = self
1237            .inner
1238            .store
1239            .get_user_identity(self.user_id())
1240            .await?
1241            .and_then(|i| i.own().cloned());
1242        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
1243
1244        Ok(UserDevices {
1245            inner: devices,
1246            verification_machine: self.inner.verification_machine.clone(),
1247            own_identity,
1248            device_owner_identity,
1249        })
1250    }
1251
1252    /// Get a [`Device`] for the given user with the given [`DeviceId`].
1253    ///
1254    /// This method is more expensive than the [`Store::get_device_data`] method
1255    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
1256    /// [`UserIdentityData`] of the device owner to be fetched from the
1257    /// store as well.
1258    ///
1259    /// *Note*: This method will include our own device which is always present
1260    /// in the store.
1261    pub(crate) async fn get_device(
1262        &self,
1263        user_id: &UserId,
1264        device_id: &DeviceId,
1265    ) -> Result<Option<Device>> {
1266        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
1267            Ok(Some(self.wrap_device_data(device_data).await?))
1268        } else {
1269            Ok(None)
1270        }
1271    }
1272
1273    /// Create a new device using the supplied [`DeviceData`]. Normally we would
1274    /// call [`Self::get_device`] to find an existing device inside this
1275    /// store. Only call this if you have some existing DeviceData and want
1276    /// to wrap it with the extra information provided by a [`Device`].
1277    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
1278        let own_identity = self
1279            .inner
1280            .store
1281            .get_user_identity(self.user_id())
1282            .await?
1283            .and_then(|i| i.own().cloned());
1284
1285        let device_owner_identity =
1286            self.inner.store.get_user_identity(device_data.user_id()).await?;
1287
1288        Ok(Device {
1289            inner: device_data,
1290            verification_machine: self.inner.verification_machine.clone(),
1291            own_identity,
1292            device_owner_identity,
1293        })
1294    }
1295
1296    /// Get the identity of `user_id`.
1297    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1298        let own_identity = self
1299            .inner
1300            .store
1301            .get_user_identity(self.user_id())
1302            .await?
1303            .and_then(as_variant!(UserIdentityData::Own));
1304
1305        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
1306            UserIdentity::new(
1307                self.clone(),
1308                i,
1309                self.inner.verification_machine.to_owned(),
1310                own_identity,
1311            )
1312        }))
1313    }
1314
1315    /// Try to export the secret with the given secret name.
1316    ///
1317    /// The exported secret will be encoded as unpadded base64. Returns `None`
1318    /// if the secret can't be found.
1319    ///
1320    /// # Arguments
1321    ///
1322    /// * `secret_name` - The name of the secret that should be exported.
1323    pub async fn export_secret(
1324        &self,
1325        secret_name: &SecretName,
1326    ) -> Result<Option<String>, CryptoStoreError> {
1327        Ok(match secret_name {
1328            SecretName::CrossSigningMasterKey
1329            | SecretName::CrossSigningUserSigningKey
1330            | SecretName::CrossSigningSelfSigningKey => {
1331                self.inner.identity.lock().await.export_secret(secret_name).await
1332            }
1333            SecretName::RecoveryKey => {
1334                if let Some(key) = self.load_backup_keys().await?.decryption_key {
1335                    let exported = key.to_base64();
1336                    Some(exported)
1337                } else {
1338                    None
1339                }
1340            }
1341            name => {
1342                warn!(secret = ?name, "Unknown secret was requested");
1343                None
1344            }
1345        })
1346    }
1347
1348    /// Export all the private cross signing keys we have.
1349    ///
1350    /// The export will contain the seed for the Ed25519 keys as an unpadded
1351    /// base64 encoded string.
1352    ///
1353    /// This method returns `None` if we don't have any private cross signing
1354    /// keys.
1355    pub async fn export_cross_signing_keys(
1356        &self,
1357    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
1358        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
1359        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
1360        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
1361
1362        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
1363            None
1364        } else {
1365            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
1366        })
1367    }
1368
1369    /// Import our private cross signing keys.
1370    ///
1371    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
1372    /// base64 encoded string.
1373    pub async fn import_cross_signing_keys(
1374        &self,
1375        export: CrossSigningKeyExport,
1376    ) -> Result<CrossSigningStatus, SecretImportError> {
1377        if let Some(public_identity) =
1378            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1379        {
1380            let identity = self.inner.identity.lock().await;
1381
1382            identity
1383                .import_secrets(
1384                    public_identity.to_owned(),
1385                    export.master_key.as_deref(),
1386                    export.self_signing_key.as_deref(),
1387                    export.user_signing_key.as_deref(),
1388                )
1389                .await?;
1390
1391            let status = identity.status().await;
1392
1393            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
1394
1395            let mut changes =
1396                Changes { private_identity: Some(identity.clone()), ..Default::default() };
1397
1398            if diff.none_differ() {
1399                public_identity.mark_as_verified();
1400                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
1401            }
1402
1403            info!(?status, "Successfully imported the private cross-signing keys");
1404
1405            self.save_changes(changes).await?;
1406        } else {
1407            warn!("No public identity found while importing cross-signing keys, a /keys/query needs to be done");
1408        }
1409
1410        Ok(self.inner.identity.lock().await.status().await)
1411    }
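
    // A hedged end-to-end sketch for the two methods above: export the private
    // cross-signing keys on an existing device and import them on a new one
    // (the `old_store`/`new_store` handles are illustrative, and the new device
    // needs to have seen a `/keys/query` response for its own user first):
    //
    //     let export = old_store.export_cross_signing_keys().await?.expect("keys are present");
    //     let status = new_store.import_cross_signing_keys(export).await?;
    //     // `status` now reports which private cross-signing keys are available.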
1412
1413    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1414    ///
1415    /// This method will export all the private cross-signing keys and, if
1416    /// available, the private part of a backup key and its accompanying
1417    /// version.
1418    ///
1419    /// The method will fail if we don't have all three private cross-signing
1420    /// keys available.
1421    ///
1422    /// **Warning**: Only export this and share it with a trusted recipient,
1423    /// i.e. if an existing device is sharing this with a new device.
1424    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1425        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1426            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1427        };
1428
1429        let Some(master_key) = cross_signing.master_key.clone() else {
1430            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1431        };
1432
1433        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1434            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1435        };
1436
1437        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1438            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1439        };
1440
1441        let backup_keys = self.load_backup_keys().await?;
1442
1443        let backup = if let Some(key) = backup_keys.decryption_key {
1444            if let Some(backup_version) = backup_keys.backup_version {
1445                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1446                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1447                ))
1448            } else {
1449                return Err(SecretsBundleExportError::MissingBackupVersion);
1450            }
1451        } else {
1452            None
1453        };
1454
1455        Ok(SecretsBundle {
1456            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1457            backup,
1458        })
1459    }
1460
1461    /// Import and persist the secrets from a [`SecretsBundle`].
1462    ///
1463    /// This method will import all the private cross-signing keys and, if
1464    /// available, the private part of a backup key and its accompanying
1465    /// version into the store.
1466    ///
1467    /// **Warning**: Only import this from a trusted source, for example when an
1468    /// existing device is sharing this with a new device. The imported cross-signing
1469    /// keys will create an [`OwnUserIdentity`] and mark it as verified.
1470    ///
1471    /// The backup key will be persisted in the store and can be enabled using
1472    /// the [`BackupMachine`].
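    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming the bundle is produced by an existing, trusted
    /// device; in a real application it would arrive over a secure channel rather
    /// than being exported locally as shown here.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::{device_id, user_id};
    /// # let alice = user_id!("@alice:example.org");
    /// # async {
    /// # let existing_machine = OlmMachine::new(&alice, device_id!("EXISTING")).await;
    /// # let new_machine = OlmMachine::new(&alice, device_id!("NEWDEVICE")).await;
    /// // The existing, trusted device exports its secrets...
    /// let bundle = existing_machine.store().export_secrets_bundle().await.unwrap();
    /// // ...and the new device imports and persists them.
    /// new_machine.store().import_secrets_bundle(&bundle).await.unwrap();
    /// # };
    /// ```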
1473    pub async fn import_secrets_bundle(
1474        &self,
1475        bundle: &SecretsBundle,
1476    ) -> Result<(), SecretImportError> {
1477        let mut changes = Changes::default();
1478
1479        if let Some(backup_bundle) = &bundle.backup {
1480            match backup_bundle {
1481                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1482                    changes.backup_decryption_key = Some(bundle.key.clone());
1483                    changes.backup_version = Some(bundle.backup_version.clone());
1484                }
1485            }
1486        }
1487
1488        let identity = self.inner.identity.lock().await;
1489
1490        identity
1491            .import_secrets_unchecked(
1492                Some(&bundle.cross_signing.master_key),
1493                Some(&bundle.cross_signing.self_signing_key),
1494                Some(&bundle.cross_signing.user_signing_key),
1495            )
1496            .await?;
1497
1498        let public_identity = identity.to_public_identity().await.expect(
1499            "We should be able to create a new public identity since we just imported \
1500             all the private cross-signing keys",
1501        );
1502
1503        changes.private_identity = Some(identity.clone());
1504        changes.identities.new.push(UserIdentityData::Own(public_identity));
1505
1506        Ok(self.save_changes(changes).await?)
1507    }
1508
1509    /// Import the given gossipped `secret`, identified by its `secret_name`, into the keystore.
1510    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1511        match &secret.secret_name {
1512            SecretName::CrossSigningMasterKey
1513            | SecretName::CrossSigningUserSigningKey
1514            | SecretName::CrossSigningSelfSigningKey => {
1515                if let Some(public_identity) =
1516                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1517                {
1518                    let identity = self.inner.identity.lock().await;
1519
1520                    identity
1521                        .import_secret(
1522                            public_identity,
1523                            &secret.secret_name,
1524                            &secret.event.content.secret,
1525                        )
1526                        .await?;
1527                    info!(
1528                        secret_name = ?secret.secret_name,
1529                        "Successfully imported a private cross signing key"
1530                    );
1531
1532                    let changes =
1533                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1534
1535                    self.save_changes(changes).await?;
1536                }
1537            }
1538            SecretName::RecoveryKey => {
1539                // We don't import the decryption key here since we'll want to
1540                // check if the public key matches to the latest version on the
1541                // server. We instead put the secret into a secret inbox where
1542                // it will stay until it either gets overwritten
1543                // or the user accepts the secret.
1544            }
1545            name => {
1546                warn!(secret = ?name, "Tried to import an unknown secret");
1547            }
1548        }
1549
1550        Ok(())
1551    }
1552
1553    /// Check the global flag that determines whether messages should be
1554    /// encrypted only for trusted devices, or for everyone.
1555    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1556        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1557        Ok(value)
1558    }
1559
1560    /// Set the global flag that determines whether to encrypt messages for
1561    /// untrusted devices, or whether they should be excluded from the conversation.
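    ///
    /// # Examples
    ///
    /// A short sketch of flipping the flag and reading it back with
    /// [`Store::get_only_allow_trusted_devices()`]:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// // From now on, only encrypt messages for trusted devices.
    /// machine.store().set_only_allow_trusted_devices(true).await.unwrap();
    /// assert!(machine.store().get_only_allow_trusted_devices().await.unwrap());
    /// # });
    /// ```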
1562    pub async fn set_only_allow_trusted_devices(
1563        &self,
1564        block_untrusted_devices: bool,
1565    ) -> Result<()> {
1566        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1567    }
1568
1569    /// Get a custom stored value associated with a key.
1570    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1571        let Some(value) = self.get_custom_value(key).await? else {
1572            return Ok(None);
1573        };
1574        let deserialized = self.deserialize_value(&value)?;
1575        Ok(Some(deserialized))
1576    }
1577
1578    /// Store a custom value associated with a key.
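    ///
    /// # Examples
    ///
    /// A minimal sketch of persisting an application-defined value and reading it
    /// back with [`Store::get_value()`]; the `"my_app.sync_token"` key is an
    /// arbitrary, application-chosen name.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// // The key and the value here are purely illustrative.
    /// machine.store().set_value("my_app.sync_token", &"s72594_4483".to_owned()).await.unwrap();
    ///
    /// let token: Option<String> =
    ///     machine.store().get_value("my_app.sync_token").await.unwrap();
    /// assert_eq!(token.as_deref(), Some("s72594_4483"));
    /// # });
    /// ```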
1579    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1580        let serialized = self.serialize_value(value)?;
1581        self.set_custom_value(key, serialized).await?;
1582        Ok(())
1583    }
1584
1585    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1586        let serialized =
1587            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1588        Ok(serialized)
1589    }
1590
1591    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1592        let deserialized =
1593            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1594        Ok(deserialized)
1595    }
1596
1597    /// Receive notifications of room keys being received as a [`Stream`].
1598    ///
1599    /// Each time a room key is updated in any way, an update will be sent to
1600    /// the stream. Updates that happen at the same time are batched into a
1601    /// [`Vec`].
1602    ///
1603    /// If the reader of the stream lags too far behind, an error will be sent to
1604    /// the reader.
1605    ///
1606    /// The stream will terminate once all references to the underlying
1607    /// `CryptoStoreWrapper` are dropped.
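    ///
    /// # Examples
    ///
    /// A minimal sketch of listening for incoming room keys; how the lag error is
    /// handled is up to the application.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let room_keys_stream = machine.store().room_keys_received_stream();
    /// pin_mut!(room_keys_stream);
    ///
    /// while let Some(update) = room_keys_stream.next().await {
    ///     match update {
    ///         Ok(infos) => {
    ///             for info in infos {
    ///                 println!("Received a room key for {}", info.room_id);
    ///             }
    ///         }
    ///         // The receiver lagged too far behind and missed some updates.
    ///         Err(lag) => eprintln!("Missed room key updates: {lag}"),
    ///     }
    /// }
    /// # });
    /// ```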
1608    pub fn room_keys_received_stream(
1609        &self,
1610    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
1611        self.inner.store.room_keys_received_stream()
1612    }
1613
1614    /// Receive notifications of received `m.room_key.withheld` messages.
1615    ///
1616    /// Each time an `m.room_key.withheld` is received and stored, an update
1617    /// will be sent to the stream. Updates that happen at the same time are
1618    /// batched into a [`Vec`].
1619    ///
1620    /// If the reader of the stream lags too far behind, a warning will be
1621    /// logged and items will be dropped.
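    ///
    /// # Examples
    ///
    /// A short sketch that only counts the received withheld notices:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let withheld_stream = machine.store().room_keys_withheld_received_stream();
    /// pin_mut!(withheld_stream);
    ///
    /// while let Some(notices) = withheld_stream.next().await {
    ///     println!("Received {} `m.room_key.withheld` notices", notices.len());
    /// }
    /// # });
    /// ```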
1622    pub fn room_keys_withheld_received_stream(
1623        &self,
1624    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
1625        self.inner.store.room_keys_withheld_received_stream()
1626    }
1627
1628    /// Returns a stream of user identity updates, allowing users to listen for
1629    /// notifications about new or changed user identities.
1630    ///
1631    /// The stream produced by this method emits updates whenever a new user
1632    /// identity is discovered or when an existing identity's information is
1633    /// changed. Users can subscribe to this stream and receive updates in
1634    /// real time.
1635    ///
1636    /// Caution: the returned stream will never terminate, and it holds a
1637    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1638    /// resource leaks.
1639    ///
1640    /// # Examples
1641    ///
1642    /// ```no_run
1643    /// # use matrix_sdk_crypto::OlmMachine;
1644    /// # use ruma::{device_id, user_id};
1645    /// # use futures_util::{pin_mut, StreamExt};
1646    /// # let machine: OlmMachine = unimplemented!();
1647    /// # futures_executor::block_on(async {
1648    /// let identities_stream = machine.store().user_identities_stream();
1649    /// pin_mut!(identities_stream);
1650    ///
1651    /// while let Some(identity_updates) = identities_stream.next().await {
1652    ///     for (_, identity) in identity_updates.new {
1653    ///         println!("A new identity has been added {}", identity.user_id());
1654    ///     }
1655    /// }
1656    /// # });
1657    /// ```
1658    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> {
1659        let verification_machine = self.inner.verification_machine.to_owned();
1660
1661        let this = self.clone();
1662        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1663            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1664
1665            let map_identity = |(user_id, identity)| {
1666                (
1667                    user_id,
1668                    UserIdentity::new(
1669                        this.clone(),
1670                        identity,
1671                        verification_machine.to_owned(),
1672                        own_identity.to_owned(),
1673                    ),
1674                )
1675            };
1676
1677            let new = new_identities.into_iter().map(map_identity).collect();
1678            let changed = changed_identities.into_iter().map(map_identity).collect();
1679            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1680
1681            IdentityUpdates { new, changed, unchanged }
1682        })
1683    }
1684
1685    /// Returns a stream of device updates, allowing users to listen for
1686    /// notifications about new or changed devices.
1687    ///
1688    /// The stream produced by this method emits updates whenever a new device
1689    /// is discovered or when an existing device's information is changed. Users
1690    /// can subscribe to this stream and receive updates in real time.
1691    ///
1692    /// Caution: the returned stream will never terminate, and it holds a
1693    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1694    /// resource leaks.
1695    ///
1696    /// # Examples
1697    ///
1698    /// ```no_run
1699    /// # use matrix_sdk_crypto::OlmMachine;
1700    /// # use ruma::{device_id, user_id};
1701    /// # use futures_util::{pin_mut, StreamExt};
1702    /// # let machine: OlmMachine = unimplemented!();
1703    /// # futures_executor::block_on(async {
1704    /// let devices_stream = machine.store().devices_stream();
1705    /// pin_mut!(devices_stream);
1706    ///
1707    /// while let Some(device_updates) = devices_stream.next().await {
1708    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1709    ///         for device in user_devices.values() {
1710    ///             println!("A new device has been added {}", device.device_id());
1711    ///         }
1712    ///     }
1713    /// }
1714    /// # });
1715    /// ```
1716    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> {
1717        let verification_machine = self.inner.verification_machine.to_owned();
1718
1719        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1720            collect_device_updates(
1721                verification_machine.to_owned(),
1722                own_identity,
1723                identities,
1724                devices,
1725            )
1726        })
1727    }
1728
1729    /// Returns a [`Stream`] of user identity and device updates
1730    ///
1731    /// The stream returned by this method returns the same data as
1732    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1733    /// not include references to the `VerificationMachine`. It is therefore a
1734    /// lower-level view on that data.
1735    ///
1736    /// The stream will terminate once all references to the underlying
1737    /// `CryptoStoreWrapper` are dropped.
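    ///
    /// # Examples
    ///
    /// A minimal sketch that only counts the new entries in each batch of raw
    /// changes; real consumers would typically persist or forward them.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let stream = machine.store().identities_stream_raw();
    /// pin_mut!(stream);
    ///
    /// while let Some((identity_changes, device_changes)) = stream.next().await {
    ///     println!(
    ///         "Got {} new identities and {} new devices",
    ///         identity_changes.new.len(),
    ///         device_changes.new.len()
    ///     );
    /// }
    /// # });
    /// ```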
1738    pub fn identities_stream_raw(&self) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> {
1739        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1740    }
1741
1742    /// Creates a `CrossProcessStoreLock` for this store, which will contain the
1743    /// given key and value when held.
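    ///
    /// # Examples
    ///
    /// A minimal sketch; the lock key and holder value shown here are arbitrary,
    /// and acquiring the lock is done through the returned
    /// `CrossProcessStoreLock` itself.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # let machine: OlmMachine = unimplemented!();
    /// // Both strings are application-chosen placeholders.
    /// let lock = machine
    ///     .store()
    ///     .create_store_lock("cross_process_lock".to_owned(), "my-process-id".to_owned());
    /// // `lock` can now be used to coordinate access to the store between
    /// // processes.
    /// ```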
1744    pub fn create_store_lock(
1745        &self,
1746        lock_key: String,
1747        lock_value: String,
1748    ) -> CrossProcessStoreLock<LockableCryptoStore> {
1749        self.inner.store.create_store_lock(lock_key, lock_value)
1750    }
1751
1752    /// Receive notifications of gossipped secrets being received and stored in
1753    /// the secret inbox as a [`Stream`].
1754    ///
1755    /// The gossipped secrets are received using the `m.secret.send` event type
1756    /// and are guaranteed to have been received over a 1-to-1 Olm
1757    /// [`Session`] from a verified [`Device`].
1758    ///
1759    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1760    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1761    ///
1762    /// After a suitable secret of a certain type has been found, it can be
1763    /// removed from the store using the
1764    /// [`CryptoStore::delete_secrets_from_inbox()`] method.
1765    ///
1766    /// The only secret this will currently broadcast is the
1767    /// `m.megolm_backup.v1` secret.
1768    ///
1769    /// If the reader of the stream lags too far behind, a warning will be
1770    /// logged and items will be dropped.
1771    ///
1772    /// # Examples
1773    ///
1774    /// ```no_run
1775    /// # use matrix_sdk_crypto::OlmMachine;
1776    /// # use ruma::{device_id, user_id};
1777    /// # use futures_util::{pin_mut, StreamExt};
1778    /// # let alice = user_id!("@alice:example.org").to_owned();
1779    /// # futures_executor::block_on(async {
1780    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1781    ///
1782    /// let secret_stream = machine.store().secrets_stream();
1783    /// pin_mut!(secret_stream);
1784    ///
1785    /// while let Some(secret) = secret_stream.next().await {
1786    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1787    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name).await.unwrap();
1788    /// }
1789    /// # });
1790    /// ```
1791    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
1792        self.inner.store.secrets_stream()
1793    }
1794
1795    /// Import the given room keys into the store.
1796    ///
1797    /// # Arguments
1798    ///
1799    /// * `exported_keys` - The keys to be imported.
1800    /// * `from_backup_version` - If the keys came from key backup, the key
1801    ///   backup version. This will cause the keys to be marked as already
1802    ///   backed up, and therefore not requiring another backup.
1803    /// * `progress_listener` - Callback which will be called after each key is
1804    ///   processed. Called with arguments `(processed, total)` where `processed`
1805    ///   is the zero-based index of the key that was just processed, and `total`
1806    ///   is the total number of keys (i.e., `exported_keys.len()`).
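    ///
    /// # Examples
    ///
    /// A minimal sketch; the empty export and the `"1234"` passphrase are
    /// placeholders, and `None` is passed because the keys in this example do not
    /// come from a server-side key backup.
    ///
    /// ```no_run
    /// # use std::io::Cursor;
    /// # use matrix_sdk_crypto::{decrypt_room_key_export, OlmMachine};
    /// # use ruma::{device_id, user_id};
    /// # let alice = user_id!("@alice:example.org");
    /// # async {
    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
    /// # let export = Cursor::new("".to_owned());
    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
    /// let result = machine
    ///     .store()
    ///     .import_room_keys(exported_keys, None, |processed, total| {
    ///         println!("Processed {processed} of {total} room keys");
    ///     })
    ///     .await
    ///     .unwrap();
    /// println!("Imported {} room keys", result.imported_count);
    /// # };
    /// ```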
1807    pub async fn import_room_keys(
1808        &self,
1809        exported_keys: Vec<ExportedRoomKey>,
1810        from_backup_version: Option<&str>,
1811        progress_listener: impl Fn(usize, usize),
1812    ) -> Result<RoomKeyImportResult> {
1813        let mut sessions = Vec::new();
1814
1815        async fn new_session_better(
1816            session: &InboundGroupSession,
1817            old_session: Option<InboundGroupSession>,
1818        ) -> bool {
1819            if let Some(old_session) = &old_session {
1820                session.compare(old_session).await == SessionOrdering::Better
1821            } else {
1822                true
1823            }
1824        }
1825
1826        let total_count = exported_keys.len();
1827        let mut keys = BTreeMap::new();
1828
1829        for (i, key) in exported_keys.into_iter().enumerate() {
1830            match InboundGroupSession::from_export(&key) {
1831                Ok(session) => {
1832                    let old_session = self
1833                        .inner
1834                        .store
1835                        .get_inbound_group_session(session.room_id(), session.session_id())
1836                        .await?;
1837
1838                    // Only import the session if we didn't have this session or
1839                    // if it's a better version of the same session.
1840                    if new_session_better(&session, old_session).await {
1841                        if from_backup_version.is_some() {
1842                            session.mark_as_backed_up();
1843                        }
1844
1845                        keys.entry(session.room_id().to_owned())
1846                            .or_insert_with(BTreeMap::new)
1847                            .entry(session.sender_key().to_base64())
1848                            .or_insert_with(BTreeSet::new)
1849                            .insert(session.session_id().to_owned());
1850
1851                        sessions.push(session);
1852                    }
1853                }
1854                Err(e) => {
1855                    warn!(
1856                        sender_key = key.sender_key.to_base64(),
1857                        room_id = ?key.room_id,
1858                        session_id = key.session_id,
1859                        error = ?e,
1860                        "Couldn't import a room key from a file export."
1861                    );
1862                }
1863            }
1864
1865            progress_listener(i, total_count);
1866        }
1867
1868        let imported_count = sessions.len();
1869
1870        self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1871
1872        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1873
1874        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1875    }
1876
1877    /// Import the given room keys into our store.
1878    ///
1879    /// # Arguments
1880    ///
1881    /// * `exported_keys` - A list of previously exported keys that should be
1882    ///   imported into our store. If we already have a better version of a key,
1883    ///   the key will *not* be imported.
1884    ///
1885    /// Returns a tuple of numbers that represent the number of sessions that
1886    /// were imported and the total number of sessions that were found in the
1887    /// key export.
1888    ///
1889    /// # Examples
1890    ///
1891    /// ```no_run
1892    /// # use std::io::Cursor;
1893    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1894    /// # use ruma::{device_id, user_id};
1895    /// # let alice = user_id!("@alice:example.org");
1896    /// # async {
1897    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1898    /// # let export = Cursor::new("".to_owned());
1899    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1900    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1901    /// # };
1902    /// ```
1903    pub async fn import_exported_room_keys(
1904        &self,
1905        exported_keys: Vec<ExportedRoomKey>,
1906        progress_listener: impl Fn(usize, usize),
1907    ) -> Result<RoomKeyImportResult> {
1908        self.import_room_keys(exported_keys, None, progress_listener).await
1909    }
1910
1911    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1912        self.inner.store.clone()
1913    }
1914
1915    /// Export the keys that match the given predicate.
1916    ///
1917    /// # Arguments
1918    ///
1919    /// * `predicate` - A closure that will be called for every known
1920    ///   `InboundGroupSession`, which represents a room key. If the closure
1921    ///   returns `true` the `InboundGroupSession` will be included in the
1922    ///   export, if the closure returns `false` it will not be included.
1923    ///
1924    /// # Examples
1925    ///
1926    /// ```no_run
1927    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1928    /// # use ruma::{device_id, user_id, room_id};
1929    /// # let alice = user_id!("@alice:example.org");
1930    /// # async {
1931    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1932    /// let room_id = room_id!("!test:localhost");
1933    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1934    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1935    /// # };
1936    /// ```
1937    pub async fn export_room_keys(
1938        &self,
1939        predicate: impl FnMut(&InboundGroupSession) -> bool,
1940    ) -> Result<Vec<ExportedRoomKey>> {
1941        let mut exported = Vec::new();
1942
1943        let mut sessions = self.get_inbound_group_sessions().await?;
1944        sessions.retain(predicate);
1945
1946        for session in sessions {
1947            let export = session.export().await;
1948            exported.push(export);
1949        }
1950
1951        Ok(exported)
1952    }
1953
1954    /// Export room keys matching a predicate, providing them as an async
1955    /// `Stream`.
1956    ///
1957    /// # Arguments
1958    ///
1959    /// * `predicate` - A closure that will be called for every known
1960    ///   `InboundGroupSession`, which represents a room key. If the closure
1961    ///   returns `true` the `InboundGroupSession` will be included in the
1962    ///   export, if the closure returns `false` it will not be included.
1963    ///
1964    /// # Examples
1965    ///
1966    /// ```no_run
1967    /// use std::pin::pin;
1968    ///
1969    /// use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
1970    /// use ruma::{device_id, room_id, user_id};
1971    /// use tokio_stream::StreamExt;
1972    /// # async {
1973    /// let alice = user_id!("@alice:example.org");
1974    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1975    /// let room_id = room_id!("!test:localhost");
1976    /// let mut keys = pin!(machine
1977    ///     .store()
1978    ///     .export_room_keys_stream(|s| s.room_id() == room_id)
1979    ///     .await
1980    ///     .unwrap());
1981    /// while let Some(key) = keys.next().await {
1982    ///     println!("{}", key.room_id);
1983    /// }
1984    /// # };
1985    /// ```
1986    pub async fn export_room_keys_stream(
1987        &self,
1988        predicate: impl FnMut(&InboundGroupSession) -> bool,
1989    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1990        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1991        let sessions = self.get_inbound_group_sessions().await?;
1992        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1993            .then(|session| async move { session.export().await }))
1994    }
1995
1996    /// Assemble a room key bundle for sharing encrypted history, as per
1997    /// [MSC4268].
1998    ///
1999    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
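    ///
    /// # Examples
    ///
    /// A short sketch of building a bundle for one room; encrypting and uploading
    /// the bundle, as described in the MSC, is not covered here.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::{device_id, room_id, user_id};
    /// # let alice = user_id!("@alice:example.org");
    /// # async {
    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
    /// let room_id = room_id!("!room:localhost");
    /// let bundle = machine.store().build_room_key_bundle(room_id).await.unwrap();
    /// println!(
    ///     "Bundled {} shareable room keys and {} withheld entries",
    ///     bundle.room_keys.len(),
    ///     bundle.withheld.len()
    /// );
    /// # };
    /// ```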
2000    pub async fn build_room_key_bundle(
2001        &self,
2002        room_id: &RoomId,
2003    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
2004        // TODO: make this WAY more efficient. We should only fetch sessions for the
2005        //   correct room.
2006        let mut sessions = self.get_inbound_group_sessions().await?;
2007        sessions.retain(|session| session.room_id == room_id);
2008
2009        let mut bundle = RoomKeyBundle::default();
2010        for session in sessions {
2011            if session.shared_history() {
2012                bundle.room_keys.push(session.export().await.into());
2013            } else {
2014                bundle.withheld.push(RoomKeyWithheldContent::new(
2015                    session.algorithm().to_owned(),
2016                    WithheldCode::Unauthorised,
2017                    session.room_id().to_owned(),
2018                    session.session_id().to_owned(),
2019                    session.sender_key().to_owned(),
2020                    self.device_id().to_owned(),
2021                ));
2022            }
2023        }
2024
2025        Ok(bundle)
2026    }
2027}
2028
2029impl Deref for Store {
2030    type Target = DynCryptoStore;
2031
2032    fn deref(&self) -> &Self::Target {
2033        self.inner.store.deref().deref()
2034    }
2035}
2036
2037/// A crypto store that implements primitives for cross-process locking.
2038#[derive(Clone, Debug)]
2039pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
2040
2041#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
2042#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
2043impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore {
2044    type LockError = CryptoStoreError;
2045
2046    async fn try_lock(
2047        &self,
2048        lease_duration_ms: u32,
2049        key: &str,
2050        holder: &str,
2051    ) -> std::result::Result<bool, Self::LockError> {
2052        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
2053    }
2054}
2055
2056#[cfg(test)]
2057mod tests {
2058    use std::pin::pin;
2059
2060    use futures_util::StreamExt;
2061    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
2062    use matrix_sdk_test::async_test;
2063    use ruma::{device_id, room_id, user_id, RoomId};
2064    use vodozemac::megolm::SessionKey;
2065
2066    use crate::{
2067        machine::test_helpers::get_machine_pair,
2068        olm::{InboundGroupSession, SenderData},
2069        store::DehydratedDeviceKey,
2070        types::EventEncryptionAlgorithm,
2071        OlmMachine,
2072    };
2073
2074    #[async_test]
2075    async fn test_import_room_keys_notifies_stream() {
2076        use futures_util::FutureExt;
2077
2078        let (alice, bob, _) =
2079            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2080
2081        let room1_id = room_id!("!room1:localhost");
2082        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2083        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2084
2085        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2086        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2087
2088        let room_keys = room_keys_received_stream
2089            .next()
2090            .now_or_never()
2091            .flatten()
2092            .expect("We should have received an update of room key infos")
2093            .unwrap();
2094        assert_eq!(room_keys.len(), 1);
2095        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2096    }
2097
2098    #[async_test]
2099    async fn test_export_room_keys_provides_selected_keys() {
2100        // Given an OlmMachine with room keys in it
2101        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2102        let room1_id = room_id!("!room1:localhost");
2103        let room2_id = room_id!("!room2:localhost");
2104        let room3_id = room_id!("!room3:localhost");
2105        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2106        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2107        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2108
2109        // When I export some of the keys
2110        let keys = alice
2111            .store()
2112            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2113            .await
2114            .unwrap();
2115
2116        // Then the requested keys were provided
2117        assert_eq!(keys.len(), 2);
2118        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2119        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2120        assert_eq!(keys[0].room_id, "!room2:localhost");
2121        assert_eq!(keys[1].room_id, "!room3:localhost");
2122        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2123        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2124    }
2125
2126    #[async_test]
2127    async fn test_export_room_keys_stream_can_provide_all_keys() {
2128        // Given an OlmMachine with room keys in it
2129        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2130        let room1_id = room_id!("!room1:localhost");
2131        let room2_id = room_id!("!room2:localhost");
2132        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2133        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2134
2135        // When I export the keys as a stream
2136        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2137
2138        // And collect them
2139        let mut collected = vec![];
2140        while let Some(key) = keys.next().await {
2141            collected.push(key);
2142        }
2143
2144        // Then all the keys were provided
2145        assert_eq!(collected.len(), 2);
2146        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2147        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2148        assert_eq!(collected[0].room_id, "!room1:localhost");
2149        assert_eq!(collected[1].room_id, "!room2:localhost");
2150        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2151        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2152    }
2153
2154    #[async_test]
2155    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2156        // Given an OlmMachine with room keys in it
2157        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2158        let room1_id = room_id!("!room1:localhost");
2159        let room2_id = room_id!("!room2:localhost");
2160        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2161        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2162
2163        // When I export the keys as a stream
2164        let mut keys =
2165            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2166
2167        // And collect them
2168        let mut collected = vec![];
2169        while let Some(key) = keys.next().await {
2170            collected.push(key);
2171        }
2172
2173        // Then all the keys matching our predicate were provided, and no others
2174        assert_eq!(collected.len(), 1);
2175        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2176        assert_eq!(collected[0].room_id, "!room1:localhost");
2177        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2178    }
2179
2180    #[async_test]
2181    async fn test_export_secrets_bundle() {
2182        let user_id = user_id!("@alice:example.com");
2183        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2184
2185        let _ = first
2186            .bootstrap_cross_signing(false)
2187            .await
2188            .expect("We should be able to bootstrap cross-signing");
2189
2190        let bundle = first.store().export_secrets_bundle().await.expect(
2191            "We should be able to export the secrets bundle, now that we \
2192             have the cross-signing keys",
2193        );
2194
2195        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2196
2197        second
2198            .store()
2199            .import_secrets_bundle(&bundle)
2200            .await
2201            .expect("We should be able to import the secrets bundle");
2202
2203        let status = second.cross_signing_status().await;
2204        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2205
2206        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2207
2208        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2209    }
2210
2211    #[async_test]
2212    async fn test_create_dehydrated_device_key() {
2213        let pickle_key = DehydratedDeviceKey::new()
2214            .expect("Should be able to create a random dehydrated device key");
2215
2216        let to_vec = pickle_key.inner.to_vec();
2217        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2218            .expect("Should be able to create a dehydrated device key from slice");
2219
2220        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2221    }
2222
2223    #[async_test]
2224    async fn test_create_dehydrated_errors() {
2225        let too_small = [0u8; 22];
2226        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2227
2228        assert!(pickle_key.is_err());
2229
2230        let too_big = [0u8; 40];
2231        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2232
2233        assert!(pickle_key.is_err());
2234    }
2235
2236    #[async_test]
2237    async fn test_build_room_key_bundle() {
2238        // Given: Alice has sent a number of room keys to Bob, including some in the
2239        // wrong room, and some that are not marked as shared...
2240        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2241        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2242
2243        let room1_id = room_id!("!room1:localhost");
2244        let room2_id = room_id!("!room2:localhost");
2245
2246        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2247
2248           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2249        */
2250        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2251        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2252        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2253        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2254
2255        let sessions = [
2256            create_inbound_group_session_with_visibility(
2257                &alice,
2258                room1_id,
2259                &SessionKey::from_base64(session_key1).unwrap(),
2260                true,
2261            ),
2262            create_inbound_group_session_with_visibility(
2263                &alice,
2264                room1_id,
2265                &SessionKey::from_base64(session_key2).unwrap(),
2266                true,
2267            ),
2268            create_inbound_group_session_with_visibility(
2269                &alice,
2270                room1_id,
2271                &SessionKey::from_base64(session_key3).unwrap(),
2272                false,
2273            ),
2274            create_inbound_group_session_with_visibility(
2275                &alice,
2276                room2_id,
2277                &SessionKey::from_base64(session_key4).unwrap(),
2278                true,
2279            ),
2280        ];
2281        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2282
2283        // When I build the bundle
2284        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2285
2286        // Then the bundle matches the snapshot.
2287
2288        // We sort the sessions in the bundle, so that the snapshot is stable.
2289        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2290
2291        // We also substitute alice's keys in the snapshot with placeholders
2292        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2293        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2294            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2295            "[alice curve key]"
2296        };
2297        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2298        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2299            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2300            "[alice ed25519 key]"
2301        };
2302
2303        insta::with_settings!({ sort_maps => true }, {
2304            assert_json_snapshot!(bundle, {
2305                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2306                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2307                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2308            });
2309        });
2310    }
2311
2312    /// Create an inbound Megolm session for the given room.
2313    ///
2314    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2315    /// fields of the resultant session.
2316    fn create_inbound_group_session_with_visibility(
2317        olm_machine: &OlmMachine,
2318        room_id: &RoomId,
2319        session_key: &SessionKey,
2320        shared_history: bool,
2321    ) -> InboundGroupSession {
2322        let identity_keys = &olm_machine.store().static_account().identity_keys;
2323        InboundGroupSession::new(
2324            identity_keys.curve25519,
2325            identity_keys.ed25519,
2326            room_id,
2327            session_key,
2328            SenderData::unknown(),
2329            EventEncryptionAlgorithm::MegolmV1AesSha2,
2330            None,
2331            shared_history,
2332        )
2333        .unwrap()
2334    }
2335}