Skip to main content

matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
//! The storage layer for the [`OlmMachine`] can be customized by implementing
//! the [`CryptoStore`] trait for your own store type.
19//!
//! An in-memory only store is provided, as well as an SQLite-based one.
//! Depending on your needs and targets, a custom store may be implemented;
//! e.g. for `wasm32-unknown-unknown` an IndexedDB store would be needed.
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
//! [`CryptoStore`]: trait.CryptoStore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{Arc, atomic::Ordering},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, UserId,
56    encryption::KeyUsage, events::secret::request::SecretName,
57};
58use serde::{Serialize, de::DeserializeOwned};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
65
66use self::types::{
67    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, RoomPendingKeyBundleDetails,
69    UserKeyQueryResult,
70};
71use crate::{
72    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
73    gossiping::GossippedSecret,
74    identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
75    olm::{
76        Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
77        SenderData, Session, StaticAccountData,
78    },
79    store::types::RoomKeyWithheldEntry,
80    types::{
81        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
82        SecretsBundle,
83    },
84    verification::VerificationMachine,
85};
86#[cfg(doc)]
87use crate::{backups::BackupMachine, identities::OwnUserIdentity};
88
89pub mod caches;
90mod crypto_store_wrapper;
91mod error;
92mod memorystore;
93mod traits;
94pub mod types;
95
96#[cfg(any(test, feature = "testing"))]
97#[macro_use]
98#[allow(missing_docs)]
99pub mod integration_tests;
100
101pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
102pub use error::{CryptoStoreError, Result};
103use matrix_sdk_common::{
104    cross_process_lock::{CrossProcessLock, CrossProcessLockConfig, CrossProcessLockGeneration},
105    deserialized_responses::WithheldCode,
106    timeout::timeout,
107};
108pub use memorystore::MemoryStore;
109pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
110
111use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
112use crate::types::{
113    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
114};
115pub use crate::{
116    dehydrated_devices::DehydrationError,
117    gossiping::{GossipRequest, SecretInfo},
118};
119
/// A wrapper for our CryptoStore trait object.
///
/// This is needed because we want to have a generic interface so we can
/// store/restore objects that we can serialize. Since trait objects and
/// generics don't mix, we let the CryptoStore store strings and this wrapper
/// adds the generic interface on top.
///
/// Cloning a `Store` only clones the inner `Arc`, so every clone shares the
/// same underlying state.
#[derive(Debug, Clone)]
pub struct Store {
    /// Shared state backing this store and all of its clones.
    inner: Arc<StoreInner>,
}
130
/// Bookkeeping for users that are waiting for a `/keys/query`, together with
/// a notifier that lets tasks wait until an update for a user was received.
#[derive(Debug, Default)]
pub(crate) struct KeyQueryManager {
    /// Record of the users that are waiting for a /keys/query.
    users_for_key_query: Mutex<UsersForKeyQuery>,

    /// Notifier that is triggered each time an update is received for a user.
    users_for_key_query_notify: Notify,
}
139
140impl KeyQueryManager {
141    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
142        self.ensure_sync_tracked_users(cache).await?;
143        Ok(SyncedKeyQueryManager { cache, manager: self })
144    }
145
146    /// Load the list of users for whom we are tracking their device lists and
147    /// fill out our caches.
148    ///
149    /// This method ensures that we're only going to load the users from the
150    /// actual [`CryptoStore`] once, it will also make sure that any
151    /// concurrent calls to this method get deduplicated.
152    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
153        // Check if the users are loaded, and in that case do nothing.
154        let loaded = cache.loaded_tracked_users.read().await;
155        if *loaded {
156            return Ok(());
157        }
158
159        // Otherwise, we may load the users.
160        drop(loaded);
161        let mut loaded = cache.loaded_tracked_users.write().await;
162
163        // Check again if the users have been loaded, in case another call to this
164        // method loaded the tracked users between the time we tried to
165        // acquire the lock and the time we actually acquired the lock.
166        if *loaded {
167            return Ok(());
168        }
169
170        let tracked_users = cache.store.load_tracked_users().await?;
171
172        let mut query_users_lock = self.users_for_key_query.lock().await;
173        let mut tracked_users_cache = cache.tracked_users.write();
174        for user in tracked_users {
175            tracked_users_cache.insert(user.user_id.to_owned());
176
177            if user.dirty {
178                query_users_lock.insert_user(&user.user_id);
179            }
180        }
181
182        *loaded = true;
183
184        Ok(())
185    }
186
187    /// Wait for a `/keys/query` response to be received if one is expected for
188    /// the given user.
189    ///
190    /// If the given timeout elapses, the method will stop waiting and return
191    /// [`UserKeyQueryResult::TimeoutExpired`].
192    ///
193    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
194    /// query is pending are up to date, but doesn't hold on to it
195    /// thereafter: the lock is short-lived in this case.
196    pub async fn wait_if_user_key_query_pending(
197        &self,
198        cache: StoreCacheGuard,
199        timeout_duration: Duration,
200        user: &UserId,
201    ) -> Result<UserKeyQueryResult> {
202        {
203            // Drop the cache early, so we don't keep it while waiting (since writing the
204            // results requires to write in the cache, thus take another lock).
205            self.ensure_sync_tracked_users(&cache).await?;
206            drop(cache);
207        }
208
209        let mut users_for_key_query = self.users_for_key_query.lock().await;
210        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
211            return Ok(UserKeyQueryResult::WasNotPending);
212        };
213
214        let wait_for_completion = async {
215            while !waiter.completed.load(Ordering::Relaxed) {
216                // Register for being notified before releasing the mutex, so
217                // it's impossible to miss a wakeup between the last check for
218                // whether we should wait, and starting to wait.
219                let mut notified = pin!(self.users_for_key_query_notify.notified());
220                notified.as_mut().enable();
221                drop(users_for_key_query);
222
223                // Wait for a notification
224                notified.await;
225
226                // Reclaim the lock before checking the flag to avoid races
227                // when two notifications happen right after each other and the
228                // second one sets the flag we want to wait for.
229                users_for_key_query = self.users_for_key_query.lock().await;
230            }
231        };
232
233        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
234            Err(_) => {
235                warn!(
236                    user_id = ?user,
237                    "The user has a pending `/keys/query` request which did \
238                    not finish yet, some devices might be missing."
239                );
240
241                Ok(UserKeyQueryResult::TimeoutExpired)
242            }
243            _ => Ok(UserKeyQueryResult::WasPending),
244        }
245    }
246}
247
/// A borrowed view pairing a [`KeyQueryManager`] with the [`StoreCache`] it
/// operates on.
///
/// Obtained from [`KeyQueryManager::synced`], which first makes sure the
/// tracked users have been loaded into the cache.
pub(crate) struct SyncedKeyQueryManager<'a> {
    /// The cache holding the set of tracked users and the backing store.
    cache: &'a StoreCache,
    /// The manager holding the users that are pending a key query.
    manager: &'a KeyQueryManager,
}
252
253impl SyncedKeyQueryManager<'_> {
254    /// Add entries to the list of users being tracked for device changes
255    ///
256    /// Any users not already on the list are flagged as awaiting a key query.
257    /// Users that were already in the list are unaffected.
258    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
259        let mut store_updates = Vec::new();
260        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
261
262        {
263            let mut tracked_users = self.cache.tracked_users.write();
264            for user_id in users {
265                if tracked_users.insert(user_id.to_owned()) {
266                    key_query_lock.insert_user(user_id);
267                    store_updates.push((user_id, true))
268                }
269            }
270        }
271
272        self.cache.store.save_tracked_users(&store_updates).await
273    }
274
275    /// Process notifications that users have changed devices.
276    ///
277    /// This is used to handle the list of device-list updates that is received
278    /// from the `/sync` response. Any users *whose device lists we are
279    /// tracking* are flagged as needing a key query. Users whose devices we
280    /// are not tracking are ignored.
281    pub async fn mark_tracked_users_as_changed(
282        &self,
283        users: impl Iterator<Item = &UserId>,
284    ) -> Result<()> {
285        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
286        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
287
288        {
289            let tracked_users = &self.cache.tracked_users.read();
290            for user_id in users {
291                if tracked_users.contains(user_id) {
292                    key_query_lock.insert_user(user_id);
293                    store_updates.push((user_id, true));
294                }
295            }
296        }
297
298        self.cache.store.save_tracked_users(&store_updates).await
299    }
300
301    /// Flag that the given users devices are now up-to-date.
302    ///
303    /// This is called after processing the response to a /keys/query request.
304    /// Any users whose device lists we are tracking are removed from the
305    /// list of those pending a /keys/query.
306    pub async fn mark_tracked_users_as_up_to_date(
307        &self,
308        users: impl Iterator<Item = &UserId>,
309        sequence_number: SequenceNumber,
310    ) -> Result<()> {
311        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
312        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
313
314        {
315            let tracked_users = self.cache.tracked_users.read();
316            for user_id in users {
317                if tracked_users.contains(user_id) {
318                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
319                    store_updates.push((user_id, !clean));
320                }
321            }
322        }
323
324        self.cache.store.save_tracked_users(&store_updates).await?;
325        // wake up any tasks that may have been waiting for updates
326        self.manager.users_for_key_query_notify.notify_waiters();
327
328        Ok(())
329    }
330
331    /// Get the set of users that has the outdate/dirty flag set for their list
332    /// of devices.
333    ///
334    /// This set should be included in a `/keys/query` request which will update
335    /// the device list.
336    ///
337    /// # Returns
338    ///
339    /// A pair `(users, sequence_number)`, where `users` is the list of users to
340    /// be queried, and `sequence_number` is the current sequence number,
341    /// which should be returned in `mark_tracked_users_as_up_to_date`.
342    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
343        self.manager.users_for_key_query.lock().await.users_for_key_query()
344    }
345
346    /// See the docs for [`crate::OlmMachine::tracked_users()`].
347    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
348        self.cache.tracked_users.read().iter().cloned().collect()
349    }
350
351    /// Mark the given user as being tracked for device lists, and mark that it
352    /// has an outdated device list.
353    ///
354    /// This means that the user will be considered for a `/keys/query` request
355    /// next time [`Store::users_for_key_query()`] is called.
356    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
357        self.manager.users_for_key_query.lock().await.insert_user(user);
358        self.cache.tracked_users.write().insert(user.to_owned());
359
360        self.cache.store.save_tracked_users(&[(user, true)]).await
361    }
362}
363
364/// Convert the devices and vectors contained in the [`DeviceChanges`] into
365/// a [`DeviceUpdates`] struct.
366///
367/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
368/// we want to convert to a [`Device`].
369fn collect_device_updates(
370    verification_machine: VerificationMachine,
371    own_identity: Option<OwnUserIdentityData>,
372    identities: IdentityChanges,
373    devices: DeviceChanges,
374) -> DeviceUpdates {
375    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
376    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
377
378    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
379
380    let map_device = |device: DeviceData| {
381        let device_owner_identity = new_identities
382            .get(device.user_id())
383            .or_else(|| changed_identities.get(device.user_id()))
384            .or_else(|| unchanged_identities.get(device.user_id()))
385            .cloned();
386
387        Device {
388            inner: device,
389            verification_machine: verification_machine.to_owned(),
390            own_identity: own_identity.to_owned(),
391            device_owner_identity,
392        }
393    };
394
395    for device in devices.new {
396        let device = map_device(device);
397
398        new.entry(device.user_id().to_owned())
399            .or_default()
400            .insert(device.device_id().to_owned(), device);
401    }
402
403    for device in devices.changed {
404        let device = map_device(device);
405
406        changed
407            .entry(device.user_id().to_owned())
408            .or_default()
409            .insert(device.device_id().to_owned(), device.to_owned());
410    }
411
412    DeviceUpdates { new, changed }
413}
414
/// A temporary transaction (that implies a write) to the underlying store.
///
/// The transaction holds the write guard of the store cache for as long as it
/// is alive, and accumulates its modifications in a [`PendingChanges`].
#[allow(missing_debug_implementations)]
pub struct StoreTransaction {
    /// The store this transaction was started from.
    store: Store,
    /// The changes accumulated so far, not yet persisted.
    changes: PendingChanges,
    // TODO hold onto the cross-process crypto store lock + cache.
    /// Write guard over the store cache, held for the transaction's lifetime.
    cache: OwnedRwLockWriteGuard<StoreCache>,
}
423
424impl StoreTransaction {
425    /// Starts a new `StoreTransaction`.
426    async fn new(store: Store) -> Self {
427        let cache = store.inner.cache.clone();
428
429        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
430    }
431
432    pub(crate) fn cache(&self) -> &StoreCache {
433        &self.cache
434    }
435
436    /// Returns a reference to the current `Store`.
437    pub fn store(&self) -> &Store {
438        &self.store
439    }
440
441    /// Gets a `Account` for update.
442    ///
443    /// Note: since it's guaranteed that one can't have both a
444    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
445    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
446    /// that we can't have two copies of an `Account` alive at the same time.
447    pub async fn account(&mut self) -> Result<&mut Account> {
448        if self.changes.account.is_none() {
449            // Make sure the cache loaded the account.
450            let _ = self.cache.account().await?;
451            self.changes.account = self.cache.account.lock().await.take();
452        }
453        Ok(self.changes.account.as_mut().unwrap())
454    }
455
456    /// Commits all dirty fields to the store, and maintains the cache so it
457    /// reflects the current state of the database.
458    pub async fn commit(self) -> Result<()> {
459        if self.changes.is_empty() {
460            return Ok(());
461        }
462
463        // Save changes in the database.
464        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
465
466        self.store.save_pending_changes(self.changes).await?;
467
468        // Make the cache coherent with the database.
469        if let Some(account) = account {
470            *self.cache.account.lock().await = Some(account);
471        }
472
473        Ok(())
474    }
475}
476
/// The shared state behind a [`Store`]; every clone of a `Store` points at the
/// same `StoreInner`.
#[derive(Debug)]
struct StoreInner {
    /// The private cross-signing identity, shared behind an async mutex.
    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The wrapped crypto store that the `Store` methods delegate to.
    store: Arc<CryptoStoreWrapper>,

    /// In-memory cache for the current crypto store.
    ///
    /// ⚠ Must remain private.
    cache: Arc<RwLock<StoreCache>>,

    /// The verification machine handed to the [`Device`] and [`UserDevices`]
    /// objects created by this store.
    verification_machine: VerificationMachine,

    /// Static account data that never changes (and thus can be loaded once and
    /// for all when creating the store).
    static_account: StaticAccountData,
}
493
/// Error describing what went wrong when importing private cross signing keys
/// or the key backup key.
#[derive(Debug, Error)]
pub enum SecretImportError {
    /// The key that we tried to import was invalid.
    #[error("Error while importing {name}: {error}")]
    Key {
        /// The name of the secret that was being imported.
        name: SecretName,
        /// The key error that occurred.
        error: vodozemac::KeyError,
    },
    /// The public key of the imported private key doesn't match the public
    /// key that was uploaded to the server.
    #[error(
        "Error while importing {name}: The public key of the imported private \
            key doesn't match the public key that was uploaded to the server"
    )]
    MismatchedPublicKeys {
        /// The name of the secret that was being imported.
        name: SecretName,
    },
    /// The new version of the identity couldn't be stored.
    ///
    /// Wraps the underlying [`CryptoStoreError`], which is also used as the
    /// displayed message.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
}
520
/// Error describing what went wrong when exporting a [`SecretsBundle`].
///
/// The [`SecretsBundle`] can only be exported if we have all cross-signing
/// private keys in the store.
#[derive(Debug, Error)]
pub enum SecretsBundleExportError {
    /// The store itself had an error.
    ///
    /// Wraps the underlying [`CryptoStoreError`], which is also used as the
    /// displayed message.
    #[error(transparent)]
    Store(#[from] CryptoStoreError),
    /// We're missing one or multiple cross-signing keys.
    ///
    /// NOTE(review): the [`KeyUsage`] payload presumably names the missing
    /// key; it is not part of the displayed message — confirm.
    #[error("The store is missing one or multiple cross-signing keys")]
    MissingCrossSigningKey(KeyUsage),
    /// We're missing all cross-signing keys.
    #[error("The store doesn't contain any cross-signing keys")]
    MissingCrossSigningKeys,
    /// We have a backup key stored, but we don't know the version of the
    /// backup.
    #[error("The store contains a backup key, but no backup version")]
    MissingBackupVersion,
}
541
542impl Store {
543    /// Create a new Store.
544    pub(crate) fn new(
545        account: StaticAccountData,
546        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
547        store: Arc<CryptoStoreWrapper>,
548        verification_machine: VerificationMachine,
549    ) -> Self {
550        Self {
551            inner: Arc::new(StoreInner {
552                static_account: account,
553                identity,
554                store: store.clone(),
555                verification_machine,
556                cache: Arc::new(RwLock::new(StoreCache {
557                    store,
558                    tracked_users: Default::default(),
559                    loaded_tracked_users: Default::default(),
560                    account: Default::default(),
561                })),
562            }),
563        }
564    }
565
566    /// UserId associated with this store
567    pub(crate) fn user_id(&self) -> &UserId {
568        &self.inner.static_account.user_id
569    }
570
571    /// DeviceId associated with this store
572    pub(crate) fn device_id(&self) -> &DeviceId {
573        self.inner.verification_machine.own_device_id()
574    }
575
576    /// The static data for the account associated with this store.
577    pub(crate) fn static_account(&self) -> &StaticAccountData {
578        &self.inner.static_account
579    }
580
581    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
582        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
583        // - try to take the lock,
584        // - if acquired, look if another process touched the underlying storage,
585        // - if yes, reload everything; if no, return current cache
586        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
587    }
588
589    pub(crate) async fn transaction(&self) -> StoreTransaction {
590        StoreTransaction::new(self.clone()).await
591    }
592
593    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
594    // take a `&StoreTransaction`, but callers didn't quite like that.
595    pub(crate) async fn with_transaction<
596        T,
597        Fut: Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
598        F: FnOnce(StoreTransaction) -> Fut,
599    >(
600        &self,
601        func: F,
602    ) -> Result<T, crate::OlmError> {
603        let tr = self.transaction().await;
604        let (tr, res) = func(tr).await?;
605        tr.commit().await?;
606        Ok(res)
607    }
608
    #[cfg(test)]
    /// Test helper to reset the cross signing identity.
    ///
    /// Locks the shared private identity and calls `reset()` on it.
    pub(crate) async fn reset_cross_signing_identity(&self) {
        self.inner.identity.lock().await.reset();
    }
614
615    /// PrivateCrossSigningIdentity associated with this store
616    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
617        self.inner.identity.clone()
618    }
619
620    /// Save the given Sessions to the store
621    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
622        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
623
624        self.save_changes(changes).await
625    }
626
627    pub(crate) async fn get_sessions(
628        &self,
629        sender_key: &str,
630    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
631        self.inner.store.get_sessions(sender_key).await
632    }
633
634    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
635        self.inner.store.save_changes(changes).await
636    }
637
638    /// Given an `InboundGroupSession` which we have just received, see if we
639    /// have a matching session already in the store, and determine how to
640    /// handle it.
641    ///
642    /// If the store already has everything we can gather from the new session,
643    /// returns `None`. Otherwise, returns a merged session which should be
644    /// persisted to the store.
645    pub(crate) async fn merge_received_group_session(
646        &self,
647        session: InboundGroupSession,
648    ) -> Result<Option<InboundGroupSession>> {
649        let old_session = self
650            .inner
651            .store
652            .get_inbound_group_session(session.room_id(), session.session_id())
653            .await?;
654
655        // If there is no old session, just use the new session.
656        let Some(old_session) = old_session else {
657            info!("Received a new megolm room key");
658            return Ok(Some(session));
659        };
660
661        let index_comparison = session.compare_ratchet(&old_session).await;
662        let trust_level_comparison =
663            session.sender_data.compare_trust_level(&old_session.sender_data);
664
665        let result = match (index_comparison, trust_level_comparison) {
666            (SessionOrdering::Unconnected, _) => {
667                // If this happens, it means that we have two sessions purporting to have the
668                // same session id, but where the ratchets do not match up.
669                // In other words, someone is playing silly buggers.
670                warn!(
671                    "Received a group session with an ratchet that does not connect to the one in the store, discarding"
672                );
673                None
674            }
675
676            (SessionOrdering::Better, std::cmp::Ordering::Greater)
677            | (SessionOrdering::Better, std::cmp::Ordering::Equal)
678            | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
679                // The new session is unambiguously better than what we have in the store.
680                info!(
681                    ?index_comparison,
682                    ?trust_level_comparison,
683                    "Received a megolm room key that we have a worse version of, merging"
684                );
685                Some(session)
686            }
687
688            (SessionOrdering::Worse, std::cmp::Ordering::Less)
689            | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
690            | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
691                // The new session is unambiguously worse than the one we have in the store.
692                warn!(
693                    ?index_comparison,
694                    ?trust_level_comparison,
695                    "Received a megolm room key that we already have a better version \
696                     of, discarding"
697                );
698                None
699            }
700
701            (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
702                // The new session is the same as what we have.
703                info!("Received a megolm room key that we already have, discarding");
704                None
705            }
706
707            (SessionOrdering::Better, std::cmp::Ordering::Less) => {
708                // We need to take the ratchet from the new session, and the
709                // sender data from the old session.
710                info!("Upgrading a previously-received megolm session with new ratchet");
711                let result = old_session.with_ratchet(&session);
712                // We'll need to back it up again.
713                result.reset_backup_state();
714                Some(result)
715            }
716
717            (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
718                // We need to take the ratchet from the old session, and the
719                // sender data from the new session.
720                info!("Upgrading a previously-received megolm session with new sender data");
721                Some(session.with_ratchet(&old_session))
722            }
723        };
724
725        Ok(result)
726    }
727
728    #[cfg(test)]
729    /// Testing helper to allow to save only a set of devices
730    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
731        use types::DeviceChanges;
732
733        let changes = Changes {
734            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
735            ..Default::default()
736        };
737
738        self.save_changes(changes).await
739    }
740
741    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
742    pub(crate) async fn save_inbound_group_sessions(
743        &self,
744        sessions: &[InboundGroupSession],
745    ) -> Result<()> {
746        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
747
748        self.save_changes(changes).await
749    }
750
751    /// Get the display name of our own device.
752    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
753        Ok(self
754            .inner
755            .store
756            .get_device(self.user_id(), self.device_id())
757            .await?
758            .and_then(|d| d.display_name().map(|d| d.to_owned())))
759    }
760
761    /// Get the device data for the given [`UserId`] and [`DeviceId`].
762    ///
763    /// *Note*: This method will include our own device which is always present
764    /// in the store.
765    pub(crate) async fn get_device_data(
766        &self,
767        user_id: &UserId,
768        device_id: &DeviceId,
769    ) -> Result<Option<DeviceData>> {
770        self.inner.store.get_device(user_id, device_id).await
771    }
772
773    /// Get the device data for the given [`UserId`] and [`DeviceId`].
774    ///
775    /// *Note*: This method will **not** include our own device.
776    ///
777    /// Use this method if you need a list of recipients for a given user, since
778    /// we don't want to encrypt for our own device, otherwise take a look at
779    /// the [`Store::get_device_data_for_user`] method.
780    pub(crate) async fn get_device_data_for_user_filtered(
781        &self,
782        user_id: &UserId,
783    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
784        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
785            if user_id == self.user_id() {
786                d.remove(self.device_id());
787            }
788            d
789        })
790    }
791
792    /// Get the [`DeviceData`] for all the devices a user has.
793    ///
794    /// *Note*: This method will include our own device which is always present
795    /// in the store.
796    ///
797    /// Use this method if you need to operate on or update all devices of a
798    /// user, otherwise take a look at the
799    /// [`Store::get_device_data_for_user_filtered`] method.
800    pub(crate) async fn get_device_data_for_user(
801        &self,
802        user_id: &UserId,
803    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
804        self.inner.store.get_user_devices(user_id).await
805    }
806
807    /// Get a [`Device`] for the given user with the given
808    /// [`Curve25519PublicKey`] key.
809    ///
810    /// *Note*: This method will include our own device which is always present
811    /// in the store.
812    pub(crate) async fn get_device_from_curve_key(
813        &self,
814        user_id: &UserId,
815        curve_key: Curve25519PublicKey,
816    ) -> Result<Option<Device>> {
817        self.get_user_devices(user_id)
818            .await
819            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
820    }
821
822    /// Get all devices associated with the given [`UserId`].
823    ///
824    /// This method is more expensive than the
825    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
826    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
827    /// device owner to be fetched from the store as well.
828    ///
829    /// *Note*: This method will include our own device which is always present
830    /// in the store.
831    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
832        let devices = self.get_device_data_for_user(user_id).await?;
833
834        let own_identity = self
835            .inner
836            .store
837            .get_user_identity(self.user_id())
838            .await?
839            .and_then(|i| i.own().cloned());
840        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
841
842        Ok(UserDevices {
843            inner: devices,
844            verification_machine: self.inner.verification_machine.clone(),
845            own_identity,
846            device_owner_identity,
847        })
848    }
849
850    /// Get a [`Device`] for the given user with the given [`DeviceId`].
851    ///
852    /// This method is more expensive than the [`Store::get_device_data`] method
853    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
854    /// [`UserIdentityData`] of the device owner to be fetched from the
855    /// store as well.
856    ///
857    /// *Note*: This method will include our own device which is always present
858    /// in the store.
859    pub(crate) async fn get_device(
860        &self,
861        user_id: &UserId,
862        device_id: &DeviceId,
863    ) -> Result<Option<Device>> {
864        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
865            Ok(Some(self.wrap_device_data(device_data).await?))
866        } else {
867            Ok(None)
868        }
869    }
870
871    /// Create a new device using the supplied [`DeviceData`]. Normally we would
872    /// call [`Self::get_device`] to find an existing device inside this
873    /// store. Only call this if you have some existing DeviceData and want
874    /// to wrap it with the extra information provided by a [`Device`].
875    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
876        let own_identity = self
877            .inner
878            .store
879            .get_user_identity(self.user_id())
880            .await?
881            .and_then(|i| i.own().cloned());
882
883        let device_owner_identity =
884            self.inner.store.get_user_identity(device_data.user_id()).await?;
885
886        Ok(Device {
887            inner: device_data,
888            verification_machine: self.inner.verification_machine.clone(),
889            own_identity,
890            device_owner_identity,
891        })
892    }
893
894    ///  Get the Identity of `user_id`
895    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
896        let own_identity = self
897            .inner
898            .store
899            .get_user_identity(self.user_id())
900            .await?
901            .and_then(as_variant!(UserIdentityData::Own));
902
903        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
904            UserIdentity::new(
905                self.clone(),
906                i,
907                self.inner.verification_machine.to_owned(),
908                own_identity,
909            )
910        }))
911    }
912
913    /// Try to export the secret with the given secret name.
914    ///
915    /// The exported secret will be encoded as unpadded base64. Returns `Null`
916    /// if the secret can't be found.
917    ///
918    /// # Arguments
919    ///
920    /// * `secret_name` - The name of the secret that should be exported.
921    pub async fn export_secret(
922        &self,
923        secret_name: &SecretName,
924    ) -> Result<Option<String>, CryptoStoreError> {
925        Ok(match secret_name {
926            SecretName::CrossSigningMasterKey
927            | SecretName::CrossSigningUserSigningKey
928            | SecretName::CrossSigningSelfSigningKey => {
929                self.inner.identity.lock().await.export_secret(secret_name).await
930            }
931            SecretName::RecoveryKey => {
932                if let Some(key) = self.load_backup_keys().await?.decryption_key {
933                    let exported = key.to_base64();
934                    Some(exported)
935                } else {
936                    None
937                }
938            }
939            name => {
940                warn!(secret = ?name, "Unknown secret was requested");
941                None
942            }
943        })
944    }
945
946    /// Export all the private cross signing keys we have.
947    ///
948    /// The export will contain the seed for the ed25519 keys as a unpadded
949    /// base64 encoded string.
950    ///
951    /// This method returns `None` if we don't have any private cross signing
952    /// keys.
953    pub async fn export_cross_signing_keys(
954        &self,
955    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
956        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
957        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
958        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
959
960        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
961            None
962        } else {
963            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
964        })
965    }
966
967    /// Import our private cross signing keys.
968    ///
969    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
970    /// base64 encoded string.
971    pub async fn import_cross_signing_keys(
972        &self,
973        export: CrossSigningKeyExport,
974    ) -> Result<CrossSigningStatus, SecretImportError> {
975        if let Some(public_identity) =
976            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
977        {
978            let identity = self.inner.identity.lock().await;
979
980            identity
981                .import_secrets(
982                    public_identity.to_owned(),
983                    export.master_key.as_deref(),
984                    export.self_signing_key.as_deref(),
985                    export.user_signing_key.as_deref(),
986                )
987                .await?;
988
989            let status = identity.status().await;
990
991            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
992
993            let mut changes =
994                Changes { private_identity: Some(identity.clone()), ..Default::default() };
995
996            if diff.none_differ() {
997                public_identity.mark_as_verified();
998                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
999            }
1000
1001            info!(?status, "Successfully imported the private cross-signing keys");
1002
1003            self.save_changes(changes).await?;
1004        } else {
1005            warn!(
1006                "No public identity found while importing cross-signing keys, \
1007                 a /keys/query needs to be done"
1008            );
1009        }
1010
1011        Ok(self.inner.identity.lock().await.status().await)
1012    }
1013
1014    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1015    ///
1016    /// This method will export all the private cross-signing keys and, if
1017    /// available, the private part of a backup key and its accompanying
1018    /// version.
1019    ///
1020    /// The method will fail if we don't have all three private cross-signing
1021    /// keys available.
1022    ///
1023    /// **Warning**: Only export this and share it with a trusted recipient,
1024    /// i.e. if an existing device is sharing this with a new device.
1025    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1026        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1027            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1028        };
1029
1030        let Some(master_key) = cross_signing.master_key.clone() else {
1031            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1032        };
1033
1034        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1035            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1036        };
1037
1038        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1039            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1040        };
1041
1042        let backup_keys = self.load_backup_keys().await?;
1043
1044        let backup = if let Some(key) = backup_keys.decryption_key {
1045            if let Some(backup_version) = backup_keys.backup_version {
1046                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1047                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1048                ))
1049            } else {
1050                return Err(SecretsBundleExportError::MissingBackupVersion);
1051            }
1052        } else {
1053            None
1054        };
1055
1056        Ok(SecretsBundle {
1057            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1058            backup,
1059        })
1060    }
1061
1062    /// Import and persists secrets from a [`SecretsBundle`].
1063    ///
1064    /// This method will import all the private cross-signing keys and, if
1065    /// available, the private part of a backup key and its accompanying
1066    /// version into the store.
1067    ///
1068    /// **Warning**: Only import this from a trusted source, i.e. if an existing
1069    /// device is sharing this with a new device. The imported cross-signing
1070    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1071    ///
1072    /// The backup key will be persisted in the store and can be enabled using
1073    /// the [`BackupMachine`].
1074    pub async fn import_secrets_bundle(
1075        &self,
1076        bundle: &SecretsBundle,
1077    ) -> Result<(), SecretImportError> {
1078        let mut changes = Changes::default();
1079
1080        if let Some(backup_bundle) = &bundle.backup {
1081            match backup_bundle {
1082                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1083                    changes.backup_decryption_key = Some(bundle.key.clone());
1084                    changes.backup_version = Some(bundle.backup_version.clone());
1085                }
1086            }
1087        }
1088
1089        let identity = self.inner.identity.lock().await;
1090
1091        identity
1092            .import_secrets_unchecked(
1093                Some(&bundle.cross_signing.master_key),
1094                Some(&bundle.cross_signing.self_signing_key),
1095                Some(&bundle.cross_signing.user_signing_key),
1096            )
1097            .await?;
1098
1099        let public_identity = identity.to_public_identity().await.expect(
1100            "We should be able to create a new public identity since we just imported \
1101             all the private cross-signing keys",
1102        );
1103
1104        changes.private_identity = Some(identity.clone());
1105        changes.identities.new.push(UserIdentityData::Own(public_identity));
1106
1107        Ok(self.save_changes(changes).await?)
1108    }
1109
1110    /// Import the given `secret` named `secret_name` into the keystore.
1111    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1112        match &secret.secret_name {
1113            SecretName::CrossSigningMasterKey
1114            | SecretName::CrossSigningUserSigningKey
1115            | SecretName::CrossSigningSelfSigningKey => {
1116                if let Some(public_identity) =
1117                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1118                {
1119                    let identity = self.inner.identity.lock().await;
1120
1121                    identity
1122                        .import_secret(
1123                            public_identity,
1124                            &secret.secret_name,
1125                            &secret.event.content.secret,
1126                        )
1127                        .await?;
1128                    info!(
1129                        secret_name = ?secret.secret_name,
1130                        "Successfully imported a private cross signing key"
1131                    );
1132
1133                    let changes =
1134                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1135
1136                    self.save_changes(changes).await?;
1137                }
1138            }
1139            SecretName::RecoveryKey => {
1140                // We don't import the decryption key here since we'll want to
1141                // check if the public key matches to the latest version on the
1142                // server. We instead put the secret into a secret inbox where
1143                // it will stay until it either gets overwritten
1144                // or the user accepts the secret.
1145            }
1146            name => {
1147                warn!(secret = ?name, "Tried to import an unknown secret");
1148            }
1149        }
1150
1151        Ok(())
1152    }
1153
1154    /// Check whether there is a global flag to only encrypt messages for
1155    /// trusted devices or for everyone.
1156    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1157        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1158        Ok(value)
1159    }
1160
1161    /// Set global flag whether to encrypt messages for untrusted devices, or
1162    /// whether they should be excluded from the conversation.
1163    pub async fn set_only_allow_trusted_devices(
1164        &self,
1165        block_untrusted_devices: bool,
1166    ) -> Result<()> {
1167        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1168    }
1169
1170    /// Get custom stored value associated with a key
1171    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1172        let Some(value) = self.get_custom_value(key).await? else {
1173            return Ok(None);
1174        };
1175        let deserialized = self.deserialize_value(&value)?;
1176        Ok(Some(deserialized))
1177    }
1178
1179    /// Store custom value associated with a key
1180    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1181        let serialized = self.serialize_value(value)?;
1182        self.set_custom_value(key, serialized).await?;
1183        Ok(())
1184    }
1185
1186    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1187        let serialized =
1188            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1189        Ok(serialized)
1190    }
1191
1192    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1193        let deserialized =
1194            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1195        Ok(deserialized)
1196    }
1197
1198    /// Receive notifications of room keys being received as a [`Stream`].
1199    ///
1200    /// Each time a room key is updated in any way, an update will be sent to
1201    /// the stream. Updates that happen at the same time are batched into a
1202    /// [`Vec`].
1203    ///
1204    /// If the reader of the stream lags too far behind an error will be sent to
1205    /// the reader.
1206    ///
1207    /// The stream will terminate once all references to the underlying
1208    /// `CryptoStoreWrapper` are dropped.
1209    pub fn room_keys_received_stream(
1210        &self,
1211    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1212        self.inner.store.room_keys_received_stream()
1213    }
1214
1215    /// Receive notifications of received `m.room_key.withheld` messages.
1216    ///
1217    /// Each time an `m.room_key.withheld` is received and stored, an update
1218    /// will be sent to the stream. Updates that happen at the same time are
1219    /// batched into a [`Vec`].
1220    ///
1221    /// If the reader of the stream lags too far behind, a warning will be
1222    /// logged and items will be dropped.
1223    pub fn room_keys_withheld_received_stream(
1224        &self,
1225    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1226        self.inner.store.room_keys_withheld_received_stream()
1227    }
1228
1229    /// Returns a stream of user identity updates, allowing users to listen for
1230    /// notifications about new or changed user identities.
1231    ///
1232    /// The stream produced by this method emits updates whenever a new user
1233    /// identity is discovered or when an existing identities information is
1234    /// changed. Users can subscribe to this stream and receive updates in
1235    /// real-time.
1236    ///
1237    /// Caution: the returned stream will never terminate, and it holds a
1238    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1239    /// resource leaks.
1240    ///
1241    /// # Examples
1242    ///
1243    /// ```no_run
1244    /// # use matrix_sdk_crypto::OlmMachine;
1245    /// # use ruma::{device_id, user_id};
1246    /// # use futures_util::{pin_mut, StreamExt};
1247    /// # let machine: OlmMachine = unimplemented!();
1248    /// # futures_executor::block_on(async {
1249    /// let identities_stream = machine.store().user_identities_stream();
1250    /// pin_mut!(identities_stream);
1251    ///
1252    /// for identity_updates in identities_stream.next().await {
1253    ///     for (_, identity) in identity_updates.new {
1254    ///         println!("A new identity has been added {}", identity.user_id());
1255    ///     }
1256    /// }
1257    /// # });
1258    /// ```
1259    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1260        let verification_machine = self.inner.verification_machine.to_owned();
1261
1262        let this = self.clone();
1263        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1264            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1265
1266            let map_identity = |(user_id, identity)| {
1267                (
1268                    user_id,
1269                    UserIdentity::new(
1270                        this.clone(),
1271                        identity,
1272                        verification_machine.to_owned(),
1273                        own_identity.to_owned(),
1274                    ),
1275                )
1276            };
1277
1278            let new = new_identities.into_iter().map(map_identity).collect();
1279            let changed = changed_identities.into_iter().map(map_identity).collect();
1280            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1281
1282            IdentityUpdates { new, changed, unchanged }
1283        })
1284    }
1285
1286    /// Returns a stream of device updates, allowing users to listen for
1287    /// notifications about new or changed devices.
1288    ///
1289    /// The stream produced by this method emits updates whenever a new device
1290    /// is discovered or when an existing device's information is changed. Users
1291    /// can subscribe to this stream and receive updates in real-time.
1292    ///
1293    /// Caution: the returned stream will never terminate, and it holds a
1294    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1295    /// resource leaks.
1296    ///
1297    /// # Examples
1298    ///
1299    /// ```no_run
1300    /// # use matrix_sdk_crypto::OlmMachine;
1301    /// # use ruma::{device_id, user_id};
1302    /// # use futures_util::{pin_mut, StreamExt};
1303    /// # let machine: OlmMachine = unimplemented!();
1304    /// # futures_executor::block_on(async {
1305    /// let devices_stream = machine.store().devices_stream();
1306    /// pin_mut!(devices_stream);
1307    ///
1308    /// for device_updates in devices_stream.next().await {
1309    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1310    ///         for device in user_devices.values() {
1311    ///             println!("A new device has been added {}", device.device_id());
1312    ///         }
1313    ///     }
1314    /// }
1315    /// # });
1316    /// ```
1317    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1318        let verification_machine = self.inner.verification_machine.to_owned();
1319
1320        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1321            collect_device_updates(
1322                verification_machine.to_owned(),
1323                own_identity,
1324                identities,
1325                devices,
1326            )
1327        })
1328    }
1329
1330    /// Returns a [`Stream`] of user identity and device updates
1331    ///
1332    /// The stream returned by this method returns the same data as
1333    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1334    /// not include references to the `VerificationMachine`. It is therefore a
1335    /// lower-level view on that data.
1336    ///
1337    /// The stream will terminate once all references to the underlying
1338    /// `CryptoStoreWrapper` are dropped.
1339    pub fn identities_stream_raw(
1340        &self,
1341    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1342        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1343    }
1344
1345    /// Creates a [`CrossProcessLock`] for this store, that will contain the
1346    /// given key and value when hold.
1347    pub fn create_store_lock(
1348        &self,
1349        lock_key: String,
1350        config: CrossProcessLockConfig,
1351    ) -> CrossProcessLock<LockableCryptoStore> {
1352        self.inner.store.create_store_lock(lock_key, config)
1353    }
1354
1355    /// Receive notifications of gossipped secrets being received and stored in
1356    /// the secret inbox as a [`Stream`].
1357    ///
1358    /// The gossipped secrets are received using the `m.secret.send` event type
1359    /// and are guaranteed to have been received over a 1-to-1 Olm
1360    /// [`Session`] from a verified [`Device`].
1361    ///
1362    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1363    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1364    ///
1365    /// After a suitable secret of a certain type has been found it can be
1366    /// removed from the store
1367    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1368    ///
1369    /// The only secret this will currently broadcast is the
1370    /// `m.megolm_backup.v1`.
1371    ///
1372    /// If the reader of the stream lags too far behind, a warning will be
1373    /// logged and items will be dropped.
1374    ///
1375    /// # Examples
1376    ///
1377    /// ```no_run
1378    /// # use matrix_sdk_crypto::OlmMachine;
1379    /// # use ruma::{device_id, owned_user_id};
1380    /// # use futures_util::{pin_mut, StreamExt};
1381    /// # let alice = owned_user_id!("@alice:example.org");
1382    /// # futures_executor::block_on(async {
1383    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1384    ///
1385    /// let secret_stream = machine.store().secrets_stream();
1386    /// pin_mut!(secret_stream);
1387    ///
1388    /// for secret in secret_stream.next().await {
1389    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1390    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name);
1391    /// }
1392    /// # });
1393    /// ```
1394    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1395        self.inner.store.secrets_stream()
1396    }
1397
1398    /// Receive notifications of historic room key bundles as a [`Stream`].
1399    ///
1400    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1401    ///
1402    /// Each time a historic room key bundle was received, an update will be
1403    /// sent to the stream. This stream can be used to accept historic room key
1404    /// bundles that arrive out of order, i.e. the bundle arrives after the
1405    /// user has already accepted a room invitation.
1406    ///
1407    /// # Examples
1408    ///
1409    /// ```no_run
1410    /// # use matrix_sdk_crypto::{
1411    /// #    OlmMachine,
1412    /// #    store::types::StoredRoomKeyBundleData,
1413    /// #    types::room_history::RoomKeyBundle
1414    /// # };
1415    /// # use ruma::{device_id, owned_user_id};
1416    /// # use futures_util::{pin_mut, StreamExt};
1417    /// # let alice = owned_user_id!("@alice:example.org");
1418    /// # async {
1419    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1420    /// let bundle_stream = machine.store().historic_room_key_stream();
1421    /// pin_mut!(bundle_stream);
1422    ///
1423    /// while let Some(bundle_info) = bundle_stream.next().await {
1424    ///     // Try to find the bundle content in the store and if it's valid accept it.
1425    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
1426    ///         // Download the bundle now and import it.
1427    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
1428    ///         machine.store().receive_room_key_bundle(
1429    ///             &bundle_data,
1430    ///             bundle,
1431    ///             |_, _| {},
1432    ///         ).await?;
1433    ///     }
1434    /// }
1435    /// # anyhow::Ok(()) };
1436    /// ```
1437    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1438        self.inner.store.historic_room_key_stream()
1439    }
1440
1441    /// Import the given room keys into the store.
1442    ///
1443    /// # Arguments
1444    ///
1445    /// * `exported_keys` - The keys to be imported.
1446    /// * `from_backup_version` - If the keys came from key backup, the key
1447    ///   backup version. This will cause the keys to be marked as already
1448    ///   backed up, and therefore not requiring another backup.
1449    /// * `progress_listener` - Callback which will be called after each key is
1450    ///   processed. Called with arguments `(processed, total)` where
1451    ///   `processed` is the number of keys processed so far, and `total` is the
1452    ///   total number of keys (i.e., `exported_keys.len()`).
1453    pub async fn import_room_keys(
1454        &self,
1455        exported_keys: Vec<ExportedRoomKey>,
1456        from_backup_version: Option<&str>,
1457        progress_listener: impl Fn(usize, usize),
1458    ) -> Result<RoomKeyImportResult> {
1459        let exported_keys = exported_keys.iter().filter_map(|key| {
1460            key.try_into()
1461                .map_err(|e| {
1462                    warn!(
1463                        sender_key = key.sender_key().to_base64(),
1464                        room_id = ?key.room_id(),
1465                        session_id = key.session_id(),
1466                        error = ?e,
1467                        "Couldn't import a room key from a file export."
1468                    );
1469                })
1470                .ok()
1471        });
1472        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1473    }
1474
1475    /// Import the given room keys into our store.
1476    ///
1477    /// # Arguments
1478    ///
1479    /// * `exported_keys` - A list of previously exported keys that should be
1480    ///   imported into our store. If we already have a better version of a key
1481    ///   the key will *not* be imported.
1482    ///
1483    /// Returns a tuple of numbers that represent the number of sessions that
1484    /// were imported and the total number of sessions that were found in the
1485    /// key export.
1486    ///
1487    /// # Examples
1488    ///
1489    /// ```no_run
1490    /// # use std::io::Cursor;
1491    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1492    /// # use ruma::{device_id, user_id};
1493    /// # let alice = user_id!("@alice:example.org");
1494    /// # async {
1495    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1496    /// # let export = Cursor::new("".to_owned());
1497    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1498    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1499    /// # };
1500    /// ```
1501    pub async fn import_exported_room_keys(
1502        &self,
1503        exported_keys: Vec<ExportedRoomKey>,
1504        progress_listener: impl Fn(usize, usize),
1505    ) -> Result<RoomKeyImportResult> {
1506        self.import_room_keys(exported_keys, None, progress_listener).await
1507    }
1508
1509    async fn import_sessions_impl(
1510        &self,
1511        sessions: impl Iterator<Item = InboundGroupSession>,
1512        from_backup_version: Option<&str>,
1513        progress_listener: impl Fn(usize, usize),
1514    ) -> Result<RoomKeyImportResult> {
1515        let sessions: Vec<_> = sessions.collect();
1516        let mut imported_sessions = Vec::new();
1517
1518        let total_count = sessions.len();
1519        let mut keys = BTreeMap::new();
1520
1521        for (i, session) in sessions.into_iter().enumerate() {
1522            // Only import the session if we didn't have this session or
1523            // if it's a better version of the same session.
1524            if let Some(merged) = self.merge_received_group_session(session).await? {
1525                if from_backup_version.is_some() {
1526                    merged.mark_as_backed_up();
1527                }
1528
1529                keys.entry(merged.room_id().to_owned())
1530                    .or_insert_with(BTreeMap::new)
1531                    .entry(merged.sender_key().to_base64())
1532                    .or_insert_with(BTreeSet::new)
1533                    .insert(merged.session_id().to_owned());
1534
1535                imported_sessions.push(merged);
1536            }
1537
1538            progress_listener(i, total_count);
1539        }
1540
1541        let imported_count = imported_sessions.len();
1542
1543        self.inner
1544            .store
1545            .save_inbound_group_sessions(imported_sessions, from_backup_version)
1546            .await?;
1547
1548        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1549
1550        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1551    }
1552
1553    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1554        self.inner.store.clone()
1555    }
1556
1557    /// Export the keys that match the given predicate.
1558    ///
1559    /// # Arguments
1560    ///
1561    /// * `predicate` - A closure that will be called for every known
1562    ///   `InboundGroupSession`, which represents a room key. If the closure
1563    ///   returns `true` the `InboundGroupSession` will be included in the
1564    ///   export, if the closure returns `false` it will not be included.
1565    ///
1566    /// # Examples
1567    ///
1568    /// ```no_run
1569    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1570    /// # use ruma::{device_id, user_id, room_id};
1571    /// # let alice = user_id!("@alice:example.org");
1572    /// # async {
1573    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1574    /// let room_id = room_id!("!test:localhost");
1575    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1576    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1577    /// # };
1578    /// ```
1579    pub async fn export_room_keys(
1580        &self,
1581        predicate: impl FnMut(&InboundGroupSession) -> bool,
1582    ) -> Result<Vec<ExportedRoomKey>> {
1583        let mut exported = Vec::new();
1584
1585        let mut sessions = self.get_inbound_group_sessions().await?;
1586        sessions.retain(predicate);
1587
1588        for session in sessions {
1589            let export = session.export().await;
1590            exported.push(export);
1591        }
1592
1593        Ok(exported)
1594    }
1595
1596    /// Export room keys matching a predicate, providing them as an async
1597    /// `Stream`.
1598    ///
1599    /// # Arguments
1600    ///
1601    /// * `predicate` - A closure that will be called for every known
1602    ///   `InboundGroupSession`, which represents a room key. If the closure
1603    ///   returns `true` the `InboundGroupSession` will be included in the
1604    ///   export, if the closure returns `false` it will not be included.
1605    ///
1606    /// # Examples
1607    ///
1608    /// ```no_run
1609    /// use std::pin::pin;
1610    ///
1611    /// use matrix_sdk_crypto::{OlmMachine, olm::ExportedRoomKey};
1612    /// use ruma::{device_id, room_id, user_id};
1613    /// use tokio_stream::StreamExt;
1614    /// # async {
1615    /// let alice = user_id!("@alice:example.org");
1616    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1617    /// let room_id = room_id!("!test:localhost");
1618    /// let mut keys = pin!(
1619    ///     machine
1620    ///         .store()
1621    ///         .export_room_keys_stream(|s| s.room_id() == room_id)
1622    ///         .await
1623    ///         .unwrap()
1624    /// );
1625    /// while let Some(key) = keys.next().await {
1626    ///     println!("{}", key.room_id);
1627    /// }
1628    /// # };
1629    /// ```
1630    pub async fn export_room_keys_stream(
1631        &self,
1632        predicate: impl FnMut(&InboundGroupSession) -> bool,
1633    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1634        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1635        let sessions = self.get_inbound_group_sessions().await?;
1636        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1637            .then(|session| async move { session.export().await }))
1638    }
1639
1640    /// Assemble a room key bundle for sharing encrypted history, as per
1641    /// [MSC4268].
1642    ///
1643    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1644    pub async fn build_room_key_bundle(
1645        &self,
1646        room_id: &RoomId,
1647    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1648        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1649
1650        let mut bundle = RoomKeyBundle::default();
1651        for session in sessions {
1652            if session.shared_history() {
1653                bundle.room_keys.push(session.export().await.into());
1654            } else {
1655                bundle.withheld.push(RoomKeyWithheldContent::new(
1656                    session.algorithm().to_owned(),
1657                    WithheldCode::HistoryNotShared,
1658                    session.room_id().to_owned(),
1659                    session.session_id().to_owned(),
1660                    session.sender_key().to_owned(),
1661                    self.device_id().to_owned(),
1662                ));
1663            }
1664        }
1665
1666        // If we received a key bundle ourselves, in which one or more sessions was
1667        // marked as "history not shared", pass that on to the new user.
1668        let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1669        for withheld in withhelds {
1670            if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1671                bundle.withheld.push(withheld.content);
1672            }
1673        }
1674
1675        Ok(bundle)
1676    }
1677
1678    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1679    ///
1680    /// # Arguments
1681    ///
1682    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1683    ///   being received.
1684    /// * `bundle` - The decrypted and deserialized bundle itself.
1685    ///
1686    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1687    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1688    pub async fn receive_room_key_bundle(
1689        &self,
1690        bundle_info: &StoredRoomKeyBundleData,
1691        bundle: RoomKeyBundle,
1692        progress_listener: impl Fn(usize, usize),
1693    ) -> Result<(), CryptoStoreError> {
1694        let sender_data = if bundle_info.sender_data.should_recalculate() {
1695            let device = self
1696                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1697                .await?;
1698
1699            device
1700                .as_ref()
1701                .map(SenderData::from_device)
1702                .unwrap_or_else(|| bundle_info.sender_data.clone())
1703        } else {
1704            bundle_info.sender_data.clone()
1705        };
1706
1707        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1708
1709        // The sender's device must be either `SenderData::SenderUnverified` (i.e.,
1710        // TOFU-trusted) or `SenderData::SenderVerified` (i.e., fully verified
1711        // via user verification and cross-signing).
1712        let Ok(forwarder_data) = (&sender_data).try_into() else {
1713            warn!(
1714                "Not accepting a historic room key bundle due to insufficient trust in the sender"
1715            );
1716            return Ok(());
1717        };
1718
1719        self.import_room_key_bundle_sessions(
1720            bundle_info,
1721            &bundle,
1722            &forwarder_data,
1723            progress_listener,
1724        )
1725        .await?;
1726        self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1727
1728        Ok(())
1729    }
1730
1731    async fn import_room_key_bundle_sessions(
1732        &self,
1733        bundle_info: &StoredRoomKeyBundleData,
1734        bundle: &RoomKeyBundle,
1735        forwarder_data: &ForwarderData,
1736        progress_listener: impl Fn(usize, usize),
1737    ) -> Result<(), CryptoStoreError> {
1738        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1739            if key.room_id != bundle_info.bundle_data.room_id {
1740                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1741                Either::Right(key)
1742            } else {
1743                Either::Left(key)
1744            }
1745        });
1746
1747        match (bad.is_empty(), good.is_empty()) {
1748            // Case 1: Completely empty bundle.
1749            (true, true) => {
1750                warn!("Received a completely empty room key bundle");
1751            }
1752
1753            // Case 2: A bundle for the wrong room.
1754            (false, true) => {
1755                let bad_keys: Vec<_> =
1756                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1757
1758                warn!(
1759                    ?bad_keys,
1760                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1761                );
1762            }
1763
1764            // Case 3: A bundle containing useful room keys.
1765            (_, false) => {
1766                // We have at least some good keys, if we also have some bad ones let's
1767                // mention that here.
1768                if !bad.is_empty() {
1769                    warn!(
1770                        bad_key_count = bad.len(),
1771                        "The room key bundle contained some room keys \
1772                         that were meant for a different room"
1773                    );
1774                }
1775
1776                let keys = good.iter().filter_map(|key| {
1777                    key.try_into_inbound_group_session(forwarder_data)
1778                        .map_err(|e| {
1779                            warn!(
1780                                sender_key = ?key.sender_key().to_base64(),
1781                                room_id = ?key.room_id(),
1782                                session_id = key.session_id(),
1783                                error = ?e,
1784                                "Couldn't import a room key from a key bundle."
1785                            );
1786                        })
1787                        .ok()
1788                });
1789
1790                self.import_sessions_impl(keys, None, progress_listener).await?;
1791            }
1792        }
1793
1794        Ok(())
1795    }
1796
1797    async fn import_room_key_bundle_withheld_info(
1798        &self,
1799        bundle_info: &StoredRoomKeyBundleData,
1800        bundle: &RoomKeyBundle,
1801    ) -> Result<(), CryptoStoreError> {
1802        let mut session_id_to_withheld_code_map = BTreeMap::new();
1803
1804        let mut changes = Changes::default();
1805        for withheld in &bundle.withheld {
1806            let (room_id, session_id) = match withheld {
1807                RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1808                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1809                    _ => continue,
1810                },
1811                #[cfg(feature = "experimental-algorithms")]
1812                RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1813                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1814                    _ => continue,
1815                },
1816                RoomKeyWithheldContent::Unknown(_) => continue,
1817            };
1818
1819            if room_id != bundle_info.bundle_data.room_id {
1820                trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1821                continue;
1822            }
1823
1824            changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1825                session_id.to_owned(),
1826                RoomKeyWithheldEntry {
1827                    sender: bundle_info.sender_user.clone(),
1828                    content: withheld.to_owned(),
1829                },
1830            );
1831            session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1832        }
1833
1834        self.save_changes(changes).await?;
1835
1836        info!(
1837            room_id = ?bundle_info.bundle_data.room_id,
1838            ?session_id_to_withheld_code_map,
1839            "Successfully imported withheld info from room key bundle",
1840        );
1841
1842        Ok(())
1843    }
1844
1845    /// Store the fact that we have accepted an invite for a given room on this
1846    /// client, so should accept an [MSC4268] key bundle if one arrives
1847    /// soon.
1848    ///
1849    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1850    pub async fn store_room_pending_key_bundle(
1851        &self,
1852        room_id: &RoomId,
1853        inviter: &UserId,
1854    ) -> Result<(), CryptoStoreError> {
1855        let invite_accepted_at = MilliSecondsSinceUnixEpoch::now();
1856        self.save_changes(Changes {
1857            rooms_pending_key_bundle: HashMap::from([(
1858                room_id.to_owned(),
1859                Some(RoomPendingKeyBundleDetails {
1860                    room_id: room_id.to_owned(),
1861                    invite_accepted_at,
1862                    inviter: inviter.to_owned(),
1863                }),
1864            )]),
1865            ..Default::default()
1866        })
1867        .await?;
1868
1869        Ok(())
1870    }
1871
1872    /// Clear the record of accepting an invite for a specific room.
1873    ///
1874    /// The counterpart of [`Store::store_room_pending_key_bundle`].
1875    pub async fn clear_room_pending_key_bundle(&self, room_id: &RoomId) -> Result<()> {
1876        self.save_changes(Changes {
1877            rooms_pending_key_bundle: HashMap::from([(room_id.to_owned(), None)]),
1878            ..Default::default()
1879        })
1880        .await
1881    }
1882}
1883
1884impl Deref for Store {
1885    type Target = DynCryptoStore;
1886
1887    fn deref(&self) -> &Self::Target {
1888        self.inner.store.deref().deref()
1889    }
1890}
1891
1892/// A crypto store that implements primitives for cross-process locking.
1893#[derive(Clone, Debug)]
1894pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1895
1896impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1897    type LockError = CryptoStoreError;
1898
1899    async fn try_lock(
1900        &self,
1901        lease_duration_ms: u32,
1902        key: &str,
1903        holder: &str,
1904    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1905        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1906    }
1907}
1908
1909#[cfg(test)]
1910mod tests {
1911    use std::{collections::BTreeMap, pin::pin};
1912
1913    use assert_matches2::{assert_let, assert_matches};
1914    use futures_util::StreamExt;
1915    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1916    use matrix_sdk_test::async_test;
1917    use ruma::{
1918        RoomId, device_id,
1919        events::room::{EncryptedFileInit, JsonWebKeyInit},
1920        owned_device_id, owned_mxc_uri, room_id,
1921        serde::Base64,
1922        user_id,
1923    };
1924    use serde_json::json;
1925    use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1926
1927    use crate::{
1928        Account, OlmMachine,
1929        machine::test_helpers::get_machine_pair,
1930        olm::{InboundGroupSession, SenderData},
1931        store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1932        types::{
1933            EventEncryptionAlgorithm,
1934            events::{
1935                room_key_bundle::RoomKeyBundleContent,
1936                room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1937            },
1938        },
1939    };
1940
1941    #[async_test]
1942    async fn test_merge_received_group_session() {
1943        let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1944        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1945
1946        let room_id = room_id!("!test:localhost");
1947
1948        let megolm_signing_key = Ed25519Keypair::new();
1949        let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1950
1951        // Bob already knows about the session, at index 5, with the device keys.
1952        let mut inbound_at_index_5 =
1953            InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1954        inbound_at_index_5.sender_data = inbound.sender_data.clone();
1955        bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1956
1957        // No changes if we get a disconnected session.
1958        let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1959        assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1960
1961        // No changes needed when we receive a worse copy of the session
1962        let mut worse =
1963            InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1964        worse.sender_data = inbound.sender_data.clone();
1965        assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1966
1967        // Nor when we receive an exact copy of what we already have
1968        let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1969        copy.sender_data = inbound.sender_data.clone();
1970        assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1971
1972        // But when we receive a better copy of the session, we should get it back
1973        let mut better =
1974            InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1975        better.sender_data = inbound.sender_data.clone();
1976        assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1977        assert_eq!(update.first_known_index(), 0);
1978
1979        // A worse copy of the ratchet, but better trust data
1980        {
1981            let mut worse_ratchet_better_trust =
1982                InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1983            let updated_sender_data = SenderData::sender_verified(
1984                alice_account.user_id(),
1985                alice_account.device_id(),
1986                Ed25519Keypair::new().public_key(),
1987            );
1988            worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1989            assert_let!(
1990                Some(update) = bob
1991                    .store()
1992                    .merge_received_group_session(worse_ratchet_better_trust)
1993                    .await
1994                    .unwrap()
1995            );
1996            assert_eq!(update.sender_data, updated_sender_data);
1997            assert_eq!(update.first_known_index(), 5);
1998            assert_eq!(
1999                update.export_at_index(0).await.session_key.to_bytes(),
2000                inbound.export_at_index(5).await.session_key.to_bytes()
2001            );
2002        }
2003
2004        // A better copy of the ratchet, but worse trust data
2005        {
2006            let mut better_ratchet_worse_trust =
2007                InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
2008            let updated_sender_data = SenderData::unknown();
2009            better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
2010            assert_let!(
2011                Some(update) = bob
2012                    .store()
2013                    .merge_received_group_session(better_ratchet_worse_trust)
2014                    .await
2015                    .unwrap()
2016            );
2017            assert_eq!(update.sender_data, inbound.sender_data);
2018            assert_eq!(update.first_known_index(), 0);
2019            assert_eq!(
2020                update.export_at_index(0).await.session_key.to_bytes(),
2021                inbound.export_at_index(0).await.session_key.to_bytes()
2022            );
2023        }
2024    }
2025
2026    /// Create an [`InboundGroupSession`] for the given room, using the given
2027    /// Ed25519 key as the signing key/session ID.
2028    fn make_inbound_group_session(
2029        sender_account: &Account,
2030        signing_key: &Ed25519Keypair,
2031        room_id: &RoomId,
2032    ) -> InboundGroupSession {
2033        InboundGroupSession::new(
2034            sender_account.identity_keys.curve25519,
2035            sender_account.identity_keys.ed25519,
2036            room_id,
2037            &make_session_key(signing_key),
2038            SenderData::device_info(crate::types::DeviceKeys::new(
2039                sender_account.user_id().to_owned(),
2040                sender_account.device_id().to_owned(),
2041                vec![],
2042                BTreeMap::new(),
2043                crate::types::Signatures::new(),
2044            )),
2045            None,
2046            EventEncryptionAlgorithm::MegolmV1AesSha2,
2047            Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2048            true,
2049        )
2050        .unwrap()
2051    }
2052
2053    /// Make a Megolm [`SessionKey`] using the given Ed25519 key as a signing
2054    /// key/session ID.
2055    fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2056        use rand::Rng;
2057
2058        // `SessionKey::new` is not public, so the easiest way to construct a Megolm
2059        // session using a known Ed25519 key is to build a byte array in the export
2060        // format.
2061
2062        let mut session_key_bytes = vec![0u8; 229];
2063        // 0: version
2064        session_key_bytes[0] = 2;
2065        // 1..5: index
2066        // 5..133: ratchet key
2067        rand::thread_rng().fill(&mut session_key_bytes[5..133]);
2068        // 133..165: public ed25519 key
2069        session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2070        // 165..229: signature
2071        let sig = signing_key.sign(&session_key_bytes[0..165]);
2072        session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2073
2074        SessionKey::from_bytes(&session_key_bytes).unwrap()
2075    }
2076
2077    #[async_test]
2078    async fn test_import_room_keys_notifies_stream() {
2079        use futures_util::FutureExt;
2080
2081        let (alice, bob, _) =
2082            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2083
2084        let room1_id = room_id!("!room1:localhost");
2085        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2086        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2087
2088        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2089        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2090
2091        let room_keys = room_keys_received_stream
2092            .next()
2093            .now_or_never()
2094            .flatten()
2095            .expect("We should have received an update of room key infos")
2096            .unwrap();
2097        assert_eq!(room_keys.len(), 1);
2098        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2099    }
2100
2101    #[async_test]
2102    async fn test_export_room_keys_provides_selected_keys() {
2103        // Given an OlmMachine with room keys in it
2104        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2105        let room1_id = room_id!("!room1:localhost");
2106        let room2_id = room_id!("!room2:localhost");
2107        let room3_id = room_id!("!room3:localhost");
2108        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2109        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2110        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2111
2112        // When I export some of the keys
2113        let keys = alice
2114            .store()
2115            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2116            .await
2117            .unwrap();
2118
2119        // Then the requested keys were provided
2120        assert_eq!(keys.len(), 2);
2121        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2122        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2123        assert_eq!(keys[0].room_id, "!room2:localhost");
2124        assert_eq!(keys[1].room_id, "!room3:localhost");
2125        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2126        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2127    }
2128
2129    #[async_test]
2130    async fn test_export_room_keys_stream_can_provide_all_keys() {
2131        // Given an OlmMachine with room keys in it
2132        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2133        let room1_id = room_id!("!room1:localhost");
2134        let room2_id = room_id!("!room2:localhost");
2135        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2136        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2137
2138        // When I export the keys as a stream
2139        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2140
2141        // And collect them
2142        let mut collected = vec![];
2143        while let Some(key) = keys.next().await {
2144            collected.push(key);
2145        }
2146
2147        // Then all the keys were provided
2148        assert_eq!(collected.len(), 2);
2149        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2150        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2151        assert_eq!(collected[0].room_id, "!room1:localhost");
2152        assert_eq!(collected[1].room_id, "!room2:localhost");
2153        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2154        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2155    }
2156
2157    #[async_test]
2158    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2159        // Given an OlmMachine with room keys in it
2160        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2161        let room1_id = room_id!("!room1:localhost");
2162        let room2_id = room_id!("!room2:localhost");
2163        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2164        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2165
2166        // When I export the keys as a stream
2167        let mut keys =
2168            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2169
2170        // And collect them
2171        let mut collected = vec![];
2172        while let Some(key) = keys.next().await {
2173            collected.push(key);
2174        }
2175
2176        // Then all the keys matching our predicate were provided, and no others
2177        assert_eq!(collected.len(), 1);
2178        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2179        assert_eq!(collected[0].room_id, "!room1:localhost");
2180        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2181    }
2182
2183    #[async_test]
2184    async fn test_export_secrets_bundle() {
2185        let user_id = user_id!("@alice:example.com");
2186        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2187
2188        let _ = first
2189            .bootstrap_cross_signing(false)
2190            .await
2191            .expect("We should be able to bootstrap cross-signing");
2192
2193        let bundle = first.store().export_secrets_bundle().await.expect(
2194            "We should be able to export the secrets bundle, now that we \
2195             have the cross-signing keys",
2196        );
2197
2198        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2199
2200        second
2201            .store()
2202            .import_secrets_bundle(&bundle)
2203            .await
2204            .expect("We should be able to import the secrets bundle");
2205
2206        let status = second.cross_signing_status().await;
2207        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2208
2209        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2210
2211        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2212    }
2213
2214    #[async_test]
2215    async fn test_create_dehydrated_device_key() {
2216        let pickle_key = DehydratedDeviceKey::new()
2217            .expect("Should be able to create a random dehydrated device key");
2218
2219        let to_vec = pickle_key.inner.to_vec();
2220        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2221            .expect("Should be able to create a dehydrated device key from slice");
2222
2223        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2224    }
2225
2226    #[async_test]
2227    async fn test_create_dehydrated_errors() {
2228        let too_small = [0u8; 22];
2229        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2230
2231        assert!(pickle_key.is_err());
2232
2233        let too_big = [0u8; 40];
2234        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2235
2236        assert!(pickle_key.is_err());
2237    }
2238
2239    #[async_test]
2240    async fn test_build_room_key_bundle() {
2241        // Given: Alice has sent a number of room keys to Bob, including some in the
2242        // wrong room, and some that are not marked as shared...
2243        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2244        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2245
2246        let room1_id = room_id!("!room1:localhost");
2247        let room2_id = room_id!("!room2:localhost");
2248
2249        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2250
2251           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2252        */
2253        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2254        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2255        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2256        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2257
2258        let sessions = [
2259            create_inbound_group_session_with_visibility(
2260                &alice,
2261                room1_id,
2262                &SessionKey::from_base64(session_key1).unwrap(),
2263                true,
2264            ),
2265            create_inbound_group_session_with_visibility(
2266                &alice,
2267                room1_id,
2268                &SessionKey::from_base64(session_key2).unwrap(),
2269                true,
2270            ),
2271            create_inbound_group_session_with_visibility(
2272                &alice,
2273                room1_id,
2274                &SessionKey::from_base64(session_key3).unwrap(),
2275                false,
2276            ),
2277            create_inbound_group_session_with_visibility(
2278                &alice,
2279                room2_id,
2280                &SessionKey::from_base64(session_key4).unwrap(),
2281                true,
2282            ),
2283        ];
2284        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2285
2286        // When I build the bundle
2287        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2288
2289        // Then the bundle matches the snapshot.
2290
2291        // We sort the sessions in the bundle, so that the snapshot is stable.
2292        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2293
2294        // We substitute the algorithm, since this changes based on feature flags.
2295        let algorithm = if cfg!(feature = "experimental-algorithms") {
2296            "m.megolm.v2.aes-sha2"
2297        } else {
2298            "m.megolm.v1.aes-sha2"
2299        };
2300        let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2301            assert_eq!(value.as_str().unwrap(), algorithm);
2302            "[algorithm]"
2303        };
2304
2305        // We also substitute alice's keys in the snapshot with placeholders
2306        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2307        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2308            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2309            "[alice curve key]"
2310        };
2311        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2312        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2313            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2314            "[alice ed25519 key]"
2315        };
2316
2317        insta::with_settings!({ sort_maps => true }, {
2318            assert_json_snapshot!(bundle, {
2319                ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2320                ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2321                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2322                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2323                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2324            });
2325        });
2326    }
2327
2328    #[async_test]
2329    async fn test_receive_room_key_bundle() {
2330        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2331        let alice_key = alice.identity_keys().curve25519;
2332        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2333
2334        let room_id = room_id!("!room1:localhost");
2335
2336        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2337        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2338
2339        let sessions = [
2340            create_inbound_group_session_with_visibility(
2341                &alice,
2342                room_id,
2343                &SessionKey::from_base64(session_key1).unwrap(),
2344                true,
2345            ),
2346            create_inbound_group_session_with_visibility(
2347                &alice,
2348                room_id,
2349                &SessionKey::from_base64(session_key2).unwrap(),
2350                false,
2351            ),
2352        ];
2353
2354        alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2355        let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2356
2357        bob.store()
2358            .receive_room_key_bundle(
2359                &StoredRoomKeyBundleData {
2360                    sender_user: alice.user_id().to_owned(),
2361                    sender_key: alice_key,
2362                    sender_data: SenderData::sender_verified(
2363                        alice.user_id(),
2364                        device_id!("ALICE"),
2365                        alice.identity_keys().ed25519,
2366                    ),
2367
2368                    bundle_data: RoomKeyBundleContent {
2369                        room_id: room_id.to_owned(),
2370                        // This isn't used at all in the method call, so we can fill it with
2371                        // garbage.
2372                        file: EncryptedFileInit {
2373                            url: owned_mxc_uri!("mxc://example.com/0"),
2374                            key: JsonWebKeyInit {
2375                                kty: "oct".to_owned(),
2376                                key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2377                                alg: "A256CTR.".to_owned(),
2378                                k: Base64::new(vec![0u8; 128]),
2379                                ext: true,
2380                            }
2381                            .into(),
2382                            iv: Base64::new(vec![0u8; 128]),
2383                            hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2384                                .into_iter()
2385                                .collect(),
2386                            v: "v2".to_owned(),
2387                        }
2388                        .into(),
2389                    },
2390                },
2391                bundle,
2392                |_, _| {},
2393            )
2394            .await
2395            .unwrap();
2396
2397        // The room key should be imported successfully
2398        let imported_sessions =
2399            bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2400
2401        assert_eq!(imported_sessions.len(), 1);
2402        assert_eq!(imported_sessions[0].room_id(), room_id);
2403
2404        // The session forwarder data should be set correctly.
2405        assert_eq!(
2406            imported_sessions[0]
2407                .forwarder_data
2408                .as_ref()
2409                .expect("Session should contain forwarder data.")
2410                .user_id(),
2411            alice.user_id()
2412        );
2413
2414        assert_matches!(
2415            bob.store()
2416                .get_withheld_info(room_id, sessions[1].session_id())
2417                .await
2418                .unwrap()
2419                .expect("Withheld info should be present in the store."),
2420            RoomKeyWithheldEntry {
2421                #[cfg(not(feature = "experimental-algorithms"))]
2422                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2423                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2424                ),
2425                #[cfg(feature = "experimental-algorithms")]
2426                content: RoomKeyWithheldContent::MegolmV2AesSha2(
2427                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2428                ),
2429                ..
2430            }
2431        );
2432    }
2433
2434    /// Tests that the new store format introduced in [#5737][#5737] does not
2435    /// conflict with items already in the store that were serialised with the
2436    /// older format.
2437    ///
2438    /// [#5737]: https://github.com/matrix-org/matrix-rust-sdk/pull/5737
2439    #[async_test]
2440    async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2441        let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2442            {
2443              "content": {
2444                "algorithm": "m.megolm.v1.aes-sha2",
2445                "code": "m.unauthorised",
2446                "from_device": "ALICE",
2447                "reason": "You are not authorised to read the message.",
2448                "room_id": "!roomid:s.co",
2449                "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2450                "session_id": "session123"
2451              },
2452              "sender": "@alice:s.co",
2453              "type": "m.room_key.withheld"
2454            }
2455        ))
2456        .unwrap();
2457
2458        assert_matches!(
2459            entry,
2460            RoomKeyWithheldEntry {
2461                sender,
2462                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2463                    MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2464                ),
2465            }
2466        );
2467
2468        assert_eq!(sender, "@alice:s.co");
2469        assert_eq!(withheld_content.room_id, "!roomid:s.co");
2470        assert_eq!(withheld_content.session_id, "session123");
2471        assert_eq!(
2472            withheld_content.sender_key.to_base64(),
2473            "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2474        );
2475        assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2476    }
2477
2478    /// Create an inbound Megolm session for the given room.
2479    ///
2480    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2481    /// fields of the resultant session.
2482    ///
2483    /// The encryption algorithm used for the session depends on the
2484    /// `experimental-algorithms` feature flag:
2485    ///
2486    /// - When not set, the session uses `m.megolm.v1.aes-sha2`.
2487    /// - When set, the session uses `m.megolm.v2.aes-sha2`.
2488    fn create_inbound_group_session_with_visibility(
2489        olm_machine: &OlmMachine,
2490        room_id: &RoomId,
2491        session_key: &SessionKey,
2492        shared_history: bool,
2493    ) -> InboundGroupSession {
2494        let identity_keys = &olm_machine.store().static_account().identity_keys;
2495        InboundGroupSession::new(
2496            identity_keys.curve25519,
2497            identity_keys.ed25519,
2498            room_id,
2499            session_key,
2500            SenderData::unknown(),
2501            None,
2502            #[cfg(not(feature = "experimental-algorithms"))]
2503            EventEncryptionAlgorithm::MegolmV1AesSha2,
2504            #[cfg(feature = "experimental-algorithms")]
2505            EventEncryptionAlgorithm::MegolmV2AesSha2,
2506            None,
2507            shared_history,
2508        )
2509        .unwrap()
2510    }
2511}