matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized using a trait.
18//! Implementing your own [`CryptoStore`]
19//!
20//! An in-memory only store is provided as well as an SQLite-based one,
21//! depending on your needs and targets a custom store may be implemented, e.g.
22//! for `wasm-unknown-unknown` an indexeddb store would be needed
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
39//! [`CryptoStore`]: trait.Cryptostore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{Arc, atomic::Ordering},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55    DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UserId, encryption::KeyUsage,
56    events::secret::request::SecretName,
57};
58use serde::{Serialize, de::DeserializeOwned};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
65
66use self::types::{
67    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, UserKeyQueryResult,
69};
70use crate::{
71    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
72    gossiping::GossippedSecret,
73    identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
74    olm::{
75        Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
76        SenderData, Session, StaticAccountData,
77    },
78    store::types::RoomKeyWithheldEntry,
79    types::{
80        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
81        SecretsBundle,
82    },
83    verification::VerificationMachine,
84};
85#[cfg(doc)]
86use crate::{backups::BackupMachine, identities::OwnUserIdentity};
87
88pub mod caches;
89mod crypto_store_wrapper;
90mod error;
91mod memorystore;
92mod traits;
93pub mod types;
94
95#[cfg(any(test, feature = "testing"))]
96#[macro_use]
97#[allow(missing_docs)]
98pub mod integration_tests;
99
100pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
101pub use error::{CryptoStoreError, Result};
102use matrix_sdk_common::{
103    cross_process_lock::{CrossProcessLock, CrossProcessLockGeneration},
104    deserialized_responses::WithheldCode,
105    timeout::timeout,
106};
107pub use memorystore::MemoryStore;
108pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
109
110use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
111use crate::types::{
112    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
113};
114pub use crate::{
115    dehydrated_devices::DehydrationError,
116    gossiping::{GossipRequest, SecretInfo},
117};
118
119/// A wrapper for our CryptoStore trait object.
120///
121/// This is needed because we want to have a generic interface so we can
122/// store/restore objects that we can serialize. Since trait objects and
123/// generics don't mix let the CryptoStore store strings and this wrapper
124/// adds the generic interface on top.
125#[derive(Debug, Clone)]
126pub struct Store {
127    inner: Arc<StoreInner>,
128}
129
130#[derive(Debug, Default)]
131pub(crate) struct KeyQueryManager {
132    /// Record of the users that are waiting for a /keys/query.
133    users_for_key_query: Mutex<UsersForKeyQuery>,
134
135    /// Notifier that is triggered each time an update is received for a user.
136    users_for_key_query_notify: Notify,
137}
138
139impl KeyQueryManager {
140    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
141        self.ensure_sync_tracked_users(cache).await?;
142        Ok(SyncedKeyQueryManager { cache, manager: self })
143    }
144
145    /// Load the list of users for whom we are tracking their device lists and
146    /// fill out our caches.
147    ///
148    /// This method ensures that we're only going to load the users from the
149    /// actual [`CryptoStore`] once, it will also make sure that any
150    /// concurrent calls to this method get deduplicated.
151    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
152        // Check if the users are loaded, and in that case do nothing.
153        let loaded = cache.loaded_tracked_users.read().await;
154        if *loaded {
155            return Ok(());
156        }
157
158        // Otherwise, we may load the users.
159        drop(loaded);
160        let mut loaded = cache.loaded_tracked_users.write().await;
161
162        // Check again if the users have been loaded, in case another call to this
163        // method loaded the tracked users between the time we tried to
164        // acquire the lock and the time we actually acquired the lock.
165        if *loaded {
166            return Ok(());
167        }
168
169        let tracked_users = cache.store.load_tracked_users().await?;
170
171        let mut query_users_lock = self.users_for_key_query.lock().await;
172        let mut tracked_users_cache = cache.tracked_users.write();
173        for user in tracked_users {
174            tracked_users_cache.insert(user.user_id.to_owned());
175
176            if user.dirty {
177                query_users_lock.insert_user(&user.user_id);
178            }
179        }
180
181        *loaded = true;
182
183        Ok(())
184    }
185
186    /// Wait for a `/keys/query` response to be received if one is expected for
187    /// the given user.
188    ///
189    /// If the given timeout elapses, the method will stop waiting and return
190    /// [`UserKeyQueryResult::TimeoutExpired`].
191    ///
192    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
193    /// query is pending are up to date, but doesn't hold on to it
194    /// thereafter: the lock is short-lived in this case.
195    pub async fn wait_if_user_key_query_pending(
196        &self,
197        cache: StoreCacheGuard,
198        timeout_duration: Duration,
199        user: &UserId,
200    ) -> Result<UserKeyQueryResult> {
201        {
202            // Drop the cache early, so we don't keep it while waiting (since writing the
203            // results requires to write in the cache, thus take another lock).
204            self.ensure_sync_tracked_users(&cache).await?;
205            drop(cache);
206        }
207
208        let mut users_for_key_query = self.users_for_key_query.lock().await;
209        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
210            return Ok(UserKeyQueryResult::WasNotPending);
211        };
212
213        let wait_for_completion = async {
214            while !waiter.completed.load(Ordering::Relaxed) {
215                // Register for being notified before releasing the mutex, so
216                // it's impossible to miss a wakeup between the last check for
217                // whether we should wait, and starting to wait.
218                let mut notified = pin!(self.users_for_key_query_notify.notified());
219                notified.as_mut().enable();
220                drop(users_for_key_query);
221
222                // Wait for a notification
223                notified.await;
224
225                // Reclaim the lock before checking the flag to avoid races
226                // when two notifications happen right after each other and the
227                // second one sets the flag we want to wait for.
228                users_for_key_query = self.users_for_key_query.lock().await;
229            }
230        };
231
232        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
233            Err(_) => {
234                warn!(
235                    user_id = ?user,
236                    "The user has a pending `/keys/query` request which did \
237                    not finish yet, some devices might be missing."
238                );
239
240                Ok(UserKeyQueryResult::TimeoutExpired)
241            }
242            _ => Ok(UserKeyQueryResult::WasPending),
243        }
244    }
245}
246
247pub(crate) struct SyncedKeyQueryManager<'a> {
248    cache: &'a StoreCache,
249    manager: &'a KeyQueryManager,
250}
251
252impl SyncedKeyQueryManager<'_> {
253    /// Add entries to the list of users being tracked for device changes
254    ///
255    /// Any users not already on the list are flagged as awaiting a key query.
256    /// Users that were already in the list are unaffected.
257    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
258        let mut store_updates = Vec::new();
259        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
260
261        {
262            let mut tracked_users = self.cache.tracked_users.write();
263            for user_id in users {
264                if tracked_users.insert(user_id.to_owned()) {
265                    key_query_lock.insert_user(user_id);
266                    store_updates.push((user_id, true))
267                }
268            }
269        }
270
271        self.cache.store.save_tracked_users(&store_updates).await
272    }
273
274    /// Process notifications that users have changed devices.
275    ///
276    /// This is used to handle the list of device-list updates that is received
277    /// from the `/sync` response. Any users *whose device lists we are
278    /// tracking* are flagged as needing a key query. Users whose devices we
279    /// are not tracking are ignored.
280    pub async fn mark_tracked_users_as_changed(
281        &self,
282        users: impl Iterator<Item = &UserId>,
283    ) -> Result<()> {
284        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
285        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
286
287        {
288            let tracked_users = &self.cache.tracked_users.read();
289            for user_id in users {
290                if tracked_users.contains(user_id) {
291                    key_query_lock.insert_user(user_id);
292                    store_updates.push((user_id, true));
293                }
294            }
295        }
296
297        self.cache.store.save_tracked_users(&store_updates).await
298    }
299
300    /// Flag that the given users devices are now up-to-date.
301    ///
302    /// This is called after processing the response to a /keys/query request.
303    /// Any users whose device lists we are tracking are removed from the
304    /// list of those pending a /keys/query.
305    pub async fn mark_tracked_users_as_up_to_date(
306        &self,
307        users: impl Iterator<Item = &UserId>,
308        sequence_number: SequenceNumber,
309    ) -> Result<()> {
310        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
311        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
312
313        {
314            let tracked_users = self.cache.tracked_users.read();
315            for user_id in users {
316                if tracked_users.contains(user_id) {
317                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
318                    store_updates.push((user_id, !clean));
319                }
320            }
321        }
322
323        self.cache.store.save_tracked_users(&store_updates).await?;
324        // wake up any tasks that may have been waiting for updates
325        self.manager.users_for_key_query_notify.notify_waiters();
326
327        Ok(())
328    }
329
330    /// Get the set of users that has the outdate/dirty flag set for their list
331    /// of devices.
332    ///
333    /// This set should be included in a `/keys/query` request which will update
334    /// the device list.
335    ///
336    /// # Returns
337    ///
338    /// A pair `(users, sequence_number)`, where `users` is the list of users to
339    /// be queried, and `sequence_number` is the current sequence number,
340    /// which should be returned in `mark_tracked_users_as_up_to_date`.
341    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
342        self.manager.users_for_key_query.lock().await.users_for_key_query()
343    }
344
345    /// See the docs for [`crate::OlmMachine::tracked_users()`].
346    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
347        self.cache.tracked_users.read().iter().cloned().collect()
348    }
349
350    /// Mark the given user as being tracked for device lists, and mark that it
351    /// has an outdated device list.
352    ///
353    /// This means that the user will be considered for a `/keys/query` request
354    /// next time [`Store::users_for_key_query()`] is called.
355    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
356        self.manager.users_for_key_query.lock().await.insert_user(user);
357        self.cache.tracked_users.write().insert(user.to_owned());
358
359        self.cache.store.save_tracked_users(&[(user, true)]).await
360    }
361}
362
363/// Convert the devices and vectors contained in the [`DeviceChanges`] into
364/// a [`DeviceUpdates`] struct.
365///
366/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
367/// we want to convert to a [`Device`].
368fn collect_device_updates(
369    verification_machine: VerificationMachine,
370    own_identity: Option<OwnUserIdentityData>,
371    identities: IdentityChanges,
372    devices: DeviceChanges,
373) -> DeviceUpdates {
374    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
375    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
376
377    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
378
379    let map_device = |device: DeviceData| {
380        let device_owner_identity = new_identities
381            .get(device.user_id())
382            .or_else(|| changed_identities.get(device.user_id()))
383            .or_else(|| unchanged_identities.get(device.user_id()))
384            .cloned();
385
386        Device {
387            inner: device,
388            verification_machine: verification_machine.to_owned(),
389            own_identity: own_identity.to_owned(),
390            device_owner_identity,
391        }
392    };
393
394    for device in devices.new {
395        let device = map_device(device);
396
397        new.entry(device.user_id().to_owned())
398            .or_default()
399            .insert(device.device_id().to_owned(), device);
400    }
401
402    for device in devices.changed {
403        let device = map_device(device);
404
405        changed
406            .entry(device.user_id().to_owned())
407            .or_default()
408            .insert(device.device_id().to_owned(), device.to_owned());
409    }
410
411    DeviceUpdates { new, changed }
412}
413
414/// A temporary transaction (that implies a write) to the underlying store.
415#[allow(missing_debug_implementations)]
416pub struct StoreTransaction {
417    store: Store,
418    changes: PendingChanges,
419    // TODO hold onto the cross-process crypto store lock + cache.
420    cache: OwnedRwLockWriteGuard<StoreCache>,
421}
422
423impl StoreTransaction {
424    /// Starts a new `StoreTransaction`.
425    async fn new(store: Store) -> Self {
426        let cache = store.inner.cache.clone();
427
428        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
429    }
430
431    pub(crate) fn cache(&self) -> &StoreCache {
432        &self.cache
433    }
434
435    /// Returns a reference to the current `Store`.
436    pub fn store(&self) -> &Store {
437        &self.store
438    }
439
440    /// Gets a `Account` for update.
441    ///
442    /// Note: since it's guaranteed that one can't have both a
443    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
444    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
445    /// that we can't have two copies of an `Account` alive at the same time.
446    pub async fn account(&mut self) -> Result<&mut Account> {
447        if self.changes.account.is_none() {
448            // Make sure the cache loaded the account.
449            let _ = self.cache.account().await?;
450            self.changes.account = self.cache.account.lock().await.take();
451        }
452        Ok(self.changes.account.as_mut().unwrap())
453    }
454
455    /// Commits all dirty fields to the store, and maintains the cache so it
456    /// reflects the current state of the database.
457    pub async fn commit(self) -> Result<()> {
458        if self.changes.is_empty() {
459            return Ok(());
460        }
461
462        // Save changes in the database.
463        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
464
465        self.store.save_pending_changes(self.changes).await?;
466
467        // Make the cache coherent with the database.
468        if let Some(account) = account {
469            *self.cache.account.lock().await = Some(account);
470        }
471
472        Ok(())
473    }
474}
475
476#[derive(Debug)]
477struct StoreInner {
478    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
479    store: Arc<CryptoStoreWrapper>,
480
481    /// In-memory cache for the current crypto store.
482    ///
483    /// ⚠ Must remain private.
484    cache: Arc<RwLock<StoreCache>>,
485
486    verification_machine: VerificationMachine,
487
488    /// Static account data that never changes (and thus can be loaded once and
489    /// for all when creating the store).
490    static_account: StaticAccountData,
491}
492
493/// Error describing what went wrong when importing private cross signing keys
494/// or the key backup key.
495#[derive(Debug, Error)]
496pub enum SecretImportError {
497    /// The key that we tried to import was invalid.
498    #[error("Error while importing {name}: {error}")]
499    Key {
500        /// The name of the secret that was being imported.
501        name: SecretName,
502        /// The key error that occurred.
503        error: vodozemac::KeyError,
504    },
505    /// The public key of the imported private key doesn't match the public
506    /// key that was uploaded to the server.
507    #[error(
508        "Error while importing {name}: The public key of the imported private \
509            key doesn't match the public key that was uploaded to the server"
510    )]
511    MismatchedPublicKeys {
512        /// The name of the secret that was being imported.
513        name: SecretName,
514    },
515    /// The new version of the identity couldn't be stored.
516    #[error(transparent)]
517    Store(#[from] CryptoStoreError),
518}
519
520/// Error describing what went wrong when exporting a [`SecretsBundle`].
521///
522/// The [`SecretsBundle`] can only be exported if we have all cross-signing
523/// private keys in the store.
524#[derive(Debug, Error)]
525pub enum SecretsBundleExportError {
526    /// The store itself had an error.
527    #[error(transparent)]
528    Store(#[from] CryptoStoreError),
529    /// We're missing one or multiple cross-signing keys.
530    #[error("The store is missing one or multiple cross-signing keys")]
531    MissingCrossSigningKey(KeyUsage),
532    /// We're missing all cross-signing keys.
533    #[error("The store doesn't contain any cross-signing keys")]
534    MissingCrossSigningKeys,
535    /// We have a backup key stored, but we don't know the version of the
536    /// backup.
537    #[error("The store contains a backup key, but no backup version")]
538    MissingBackupVersion,
539}
540
541impl Store {
542    /// Create a new Store.
543    pub(crate) fn new(
544        account: StaticAccountData,
545        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
546        store: Arc<CryptoStoreWrapper>,
547        verification_machine: VerificationMachine,
548    ) -> Self {
549        Self {
550            inner: Arc::new(StoreInner {
551                static_account: account,
552                identity,
553                store: store.clone(),
554                verification_machine,
555                cache: Arc::new(RwLock::new(StoreCache {
556                    store,
557                    tracked_users: Default::default(),
558                    loaded_tracked_users: Default::default(),
559                    account: Default::default(),
560                })),
561            }),
562        }
563    }
564
565    /// UserId associated with this store
566    pub(crate) fn user_id(&self) -> &UserId {
567        &self.inner.static_account.user_id
568    }
569
570    /// DeviceId associated with this store
571    pub(crate) fn device_id(&self) -> &DeviceId {
572        self.inner.verification_machine.own_device_id()
573    }
574
575    /// The static data for the account associated with this store.
576    pub(crate) fn static_account(&self) -> &StaticAccountData {
577        &self.inner.static_account
578    }
579
580    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
581        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
582        // - try to take the lock,
583        // - if acquired, look if another process touched the underlying storage,
584        // - if yes, reload everything; if no, return current cache
585        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
586    }
587
588    pub(crate) async fn transaction(&self) -> StoreTransaction {
589        StoreTransaction::new(self.clone()).await
590    }
591
592    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
593    // take a `&StoreTransaction`, but callers didn't quite like that.
594    pub(crate) async fn with_transaction<
595        T,
596        Fut: Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
597        F: FnOnce(StoreTransaction) -> Fut,
598    >(
599        &self,
600        func: F,
601    ) -> Result<T, crate::OlmError> {
602        let tr = self.transaction().await;
603        let (tr, res) = func(tr).await?;
604        tr.commit().await?;
605        Ok(res)
606    }
607
608    #[cfg(test)]
609    /// test helper to reset the cross signing identity
610    pub(crate) async fn reset_cross_signing_identity(&self) {
611        self.inner.identity.lock().await.reset();
612    }
613
614    /// PrivateCrossSigningIdentity associated with this store
615    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
616        self.inner.identity.clone()
617    }
618
619    /// Save the given Sessions to the store
620    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
621        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
622
623        self.save_changes(changes).await
624    }
625
626    pub(crate) async fn get_sessions(
627        &self,
628        sender_key: &str,
629    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
630        self.inner.store.get_sessions(sender_key).await
631    }
632
633    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
634        self.inner.store.save_changes(changes).await
635    }
636
637    /// Given an `InboundGroupSession` which we have just received, see if we
638    /// have a matching session already in the store, and determine how to
639    /// handle it.
640    ///
641    /// If the store already has everything we can gather from the new session,
642    /// returns `None`. Otherwise, returns a merged session which should be
643    /// persisted to the store.
644    pub(crate) async fn merge_received_group_session(
645        &self,
646        session: InboundGroupSession,
647    ) -> Result<Option<InboundGroupSession>> {
648        let old_session = self
649            .inner
650            .store
651            .get_inbound_group_session(session.room_id(), session.session_id())
652            .await?;
653
654        // If there is no old session, just use the new session.
655        let Some(old_session) = old_session else {
656            info!("Received a new megolm room key");
657            return Ok(Some(session));
658        };
659
660        let index_comparison = session.compare_ratchet(&old_session).await;
661        let trust_level_comparison =
662            session.sender_data.compare_trust_level(&old_session.sender_data);
663
664        let result = match (index_comparison, trust_level_comparison) {
665            (SessionOrdering::Unconnected, _) => {
666                // If this happens, it means that we have two sessions purporting to have the
667                // same session id, but where the ratchets do not match up.
668                // In other words, someone is playing silly buggers.
669                warn!(
670                    "Received a group session with an ratchet that does not connect to the one in the store, discarding"
671                );
672                None
673            }
674
675            (SessionOrdering::Better, std::cmp::Ordering::Greater)
676            | (SessionOrdering::Better, std::cmp::Ordering::Equal)
677            | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
678                // The new session is unambiguously better than what we have in the store.
679                info!(
680                    ?index_comparison,
681                    ?trust_level_comparison,
682                    "Received a megolm room key that we have a worse version of, merging"
683                );
684                Some(session)
685            }
686
687            (SessionOrdering::Worse, std::cmp::Ordering::Less)
688            | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
689            | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
690                // The new session is unambiguously worse than the one we have in the store.
691                warn!(
692                    ?index_comparison,
693                    ?trust_level_comparison,
694                    "Received a megolm room key that we already have a better version \
695                     of, discarding"
696                );
697                None
698            }
699
700            (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
701                // The new session is the same as what we have.
702                info!("Received a megolm room key that we already have, discarding");
703                None
704            }
705
706            (SessionOrdering::Better, std::cmp::Ordering::Less) => {
707                // We need to take the ratchet from the new session, and the
708                // sender data from the old session.
709                info!("Upgrading a previously-received megolm session with new ratchet");
710                let result = old_session.with_ratchet(&session);
711                // We'll need to back it up again.
712                result.reset_backup_state();
713                Some(result)
714            }
715
716            (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
717                // We need to take the ratchet from the old session, and the
718                // sender data from the new session.
719                info!("Upgrading a previously-received megolm session with new sender data");
720                Some(session.with_ratchet(&old_session))
721            }
722        };
723
724        Ok(result)
725    }
726
727    #[cfg(test)]
728    /// Testing helper to allow to save only a set of devices
729    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
730        use types::DeviceChanges;
731
732        let changes = Changes {
733            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
734            ..Default::default()
735        };
736
737        self.save_changes(changes).await
738    }
739
740    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
741    pub(crate) async fn save_inbound_group_sessions(
742        &self,
743        sessions: &[InboundGroupSession],
744    ) -> Result<()> {
745        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
746
747        self.save_changes(changes).await
748    }
749
750    /// Get the display name of our own device.
751    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
752        Ok(self
753            .inner
754            .store
755            .get_device(self.user_id(), self.device_id())
756            .await?
757            .and_then(|d| d.display_name().map(|d| d.to_owned())))
758    }
759
760    /// Get the device data for the given [`UserId`] and [`DeviceId`].
761    ///
762    /// *Note*: This method will include our own device which is always present
763    /// in the store.
764    pub(crate) async fn get_device_data(
765        &self,
766        user_id: &UserId,
767        device_id: &DeviceId,
768    ) -> Result<Option<DeviceData>> {
769        self.inner.store.get_device(user_id, device_id).await
770    }
771
772    /// Get the device data for the given [`UserId`] and [`DeviceId`].
773    ///
774    /// *Note*: This method will **not** include our own device.
775    ///
776    /// Use this method if you need a list of recipients for a given user, since
777    /// we don't want to encrypt for our own device, otherwise take a look at
778    /// the [`Store::get_device_data_for_user`] method.
779    pub(crate) async fn get_device_data_for_user_filtered(
780        &self,
781        user_id: &UserId,
782    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
783        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
784            if user_id == self.user_id() {
785                d.remove(self.device_id());
786            }
787            d
788        })
789    }
790
791    /// Get the [`DeviceData`] for all the devices a user has.
792    ///
793    /// *Note*: This method will include our own device which is always present
794    /// in the store.
795    ///
796    /// Use this method if you need to operate on or update all devices of a
797    /// user, otherwise take a look at the
798    /// [`Store::get_device_data_for_user_filtered`] method.
799    pub(crate) async fn get_device_data_for_user(
800        &self,
801        user_id: &UserId,
802    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
803        self.inner.store.get_user_devices(user_id).await
804    }
805
806    /// Get a [`Device`] for the given user with the given
807    /// [`Curve25519PublicKey`] key.
808    ///
809    /// *Note*: This method will include our own device which is always present
810    /// in the store.
811    pub(crate) async fn get_device_from_curve_key(
812        &self,
813        user_id: &UserId,
814        curve_key: Curve25519PublicKey,
815    ) -> Result<Option<Device>> {
816        self.get_user_devices(user_id)
817            .await
818            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
819    }
820
821    /// Get all devices associated with the given [`UserId`].
822    ///
823    /// This method is more expensive than the
824    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
825    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
826    /// device owner to be fetched from the store as well.
827    ///
828    /// *Note*: This method will include our own device which is always present
829    /// in the store.
830    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
831        let devices = self.get_device_data_for_user(user_id).await?;
832
833        let own_identity = self
834            .inner
835            .store
836            .get_user_identity(self.user_id())
837            .await?
838            .and_then(|i| i.own().cloned());
839        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
840
841        Ok(UserDevices {
842            inner: devices,
843            verification_machine: self.inner.verification_machine.clone(),
844            own_identity,
845            device_owner_identity,
846        })
847    }
848
849    /// Get a [`Device`] for the given user with the given [`DeviceId`].
850    ///
851    /// This method is more expensive than the [`Store::get_device_data`] method
852    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
853    /// [`UserIdentityData`] of the device owner to be fetched from the
854    /// store as well.
855    ///
856    /// *Note*: This method will include our own device which is always present
857    /// in the store.
858    pub(crate) async fn get_device(
859        &self,
860        user_id: &UserId,
861        device_id: &DeviceId,
862    ) -> Result<Option<Device>> {
863        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
864            Ok(Some(self.wrap_device_data(device_data).await?))
865        } else {
866            Ok(None)
867        }
868    }
869
870    /// Create a new device using the supplied [`DeviceData`]. Normally we would
871    /// call [`Self::get_device`] to find an existing device inside this
872    /// store. Only call this if you have some existing DeviceData and want
873    /// to wrap it with the extra information provided by a [`Device`].
874    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
875        let own_identity = self
876            .inner
877            .store
878            .get_user_identity(self.user_id())
879            .await?
880            .and_then(|i| i.own().cloned());
881
882        let device_owner_identity =
883            self.inner.store.get_user_identity(device_data.user_id()).await?;
884
885        Ok(Device {
886            inner: device_data,
887            verification_machine: self.inner.verification_machine.clone(),
888            own_identity,
889            device_owner_identity,
890        })
891    }
892
893    ///  Get the Identity of `user_id`
894    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
895        let own_identity = self
896            .inner
897            .store
898            .get_user_identity(self.user_id())
899            .await?
900            .and_then(as_variant!(UserIdentityData::Own));
901
902        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
903            UserIdentity::new(
904                self.clone(),
905                i,
906                self.inner.verification_machine.to_owned(),
907                own_identity,
908            )
909        }))
910    }
911
912    /// Try to export the secret with the given secret name.
913    ///
914    /// The exported secret will be encoded as unpadded base64. Returns `Null`
915    /// if the secret can't be found.
916    ///
917    /// # Arguments
918    ///
919    /// * `secret_name` - The name of the secret that should be exported.
920    pub async fn export_secret(
921        &self,
922        secret_name: &SecretName,
923    ) -> Result<Option<String>, CryptoStoreError> {
924        Ok(match secret_name {
925            SecretName::CrossSigningMasterKey
926            | SecretName::CrossSigningUserSigningKey
927            | SecretName::CrossSigningSelfSigningKey => {
928                self.inner.identity.lock().await.export_secret(secret_name).await
929            }
930            SecretName::RecoveryKey => {
931                if let Some(key) = self.load_backup_keys().await?.decryption_key {
932                    let exported = key.to_base64();
933                    Some(exported)
934                } else {
935                    None
936                }
937            }
938            name => {
939                warn!(secret = ?name, "Unknown secret was requested");
940                None
941            }
942        })
943    }
944
945    /// Export all the private cross signing keys we have.
946    ///
947    /// The export will contain the seed for the ed25519 keys as a unpadded
948    /// base64 encoded string.
949    ///
950    /// This method returns `None` if we don't have any private cross signing
951    /// keys.
952    pub async fn export_cross_signing_keys(
953        &self,
954    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
955        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
956        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
957        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
958
959        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
960            None
961        } else {
962            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
963        })
964    }
965
966    /// Import our private cross signing keys.
967    ///
968    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
969    /// base64 encoded string.
970    pub async fn import_cross_signing_keys(
971        &self,
972        export: CrossSigningKeyExport,
973    ) -> Result<CrossSigningStatus, SecretImportError> {
974        if let Some(public_identity) =
975            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
976        {
977            let identity = self.inner.identity.lock().await;
978
979            identity
980                .import_secrets(
981                    public_identity.to_owned(),
982                    export.master_key.as_deref(),
983                    export.self_signing_key.as_deref(),
984                    export.user_signing_key.as_deref(),
985                )
986                .await?;
987
988            let status = identity.status().await;
989
990            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
991
992            let mut changes =
993                Changes { private_identity: Some(identity.clone()), ..Default::default() };
994
995            if diff.none_differ() {
996                public_identity.mark_as_verified();
997                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
998            }
999
1000            info!(?status, "Successfully imported the private cross-signing keys");
1001
1002            self.save_changes(changes).await?;
1003        } else {
1004            warn!(
1005                "No public identity found while importing cross-signing keys, \
1006                 a /keys/query needs to be done"
1007            );
1008        }
1009
1010        Ok(self.inner.identity.lock().await.status().await)
1011    }
1012
1013    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1014    ///
1015    /// This method will export all the private cross-signing keys and, if
1016    /// available, the private part of a backup key and its accompanying
1017    /// version.
1018    ///
1019    /// The method will fail if we don't have all three private cross-signing
1020    /// keys available.
1021    ///
1022    /// **Warning**: Only export this and share it with a trusted recipient,
1023    /// i.e. if an existing device is sharing this with a new device.
1024    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1025        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1026            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1027        };
1028
1029        let Some(master_key) = cross_signing.master_key.clone() else {
1030            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1031        };
1032
1033        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1034            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1035        };
1036
1037        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1038            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1039        };
1040
1041        let backup_keys = self.load_backup_keys().await?;
1042
1043        let backup = if let Some(key) = backup_keys.decryption_key {
1044            if let Some(backup_version) = backup_keys.backup_version {
1045                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1046                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1047                ))
1048            } else {
1049                return Err(SecretsBundleExportError::MissingBackupVersion);
1050            }
1051        } else {
1052            None
1053        };
1054
1055        Ok(SecretsBundle {
1056            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1057            backup,
1058        })
1059    }
1060
1061    /// Import and persists secrets from a [`SecretsBundle`].
1062    ///
1063    /// This method will import all the private cross-signing keys and, if
1064    /// available, the private part of a backup key and its accompanying
1065    /// version into the store.
1066    ///
1067    /// **Warning**: Only import this from a trusted source, i.e. if an existing
1068    /// device is sharing this with a new device. The imported cross-signing
1069    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1070    ///
1071    /// The backup key will be persisted in the store and can be enabled using
1072    /// the [`BackupMachine`].
1073    pub async fn import_secrets_bundle(
1074        &self,
1075        bundle: &SecretsBundle,
1076    ) -> Result<(), SecretImportError> {
1077        let mut changes = Changes::default();
1078
1079        if let Some(backup_bundle) = &bundle.backup {
1080            match backup_bundle {
1081                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1082                    changes.backup_decryption_key = Some(bundle.key.clone());
1083                    changes.backup_version = Some(bundle.backup_version.clone());
1084                }
1085            }
1086        }
1087
1088        let identity = self.inner.identity.lock().await;
1089
1090        identity
1091            .import_secrets_unchecked(
1092                Some(&bundle.cross_signing.master_key),
1093                Some(&bundle.cross_signing.self_signing_key),
1094                Some(&bundle.cross_signing.user_signing_key),
1095            )
1096            .await?;
1097
1098        let public_identity = identity.to_public_identity().await.expect(
1099            "We should be able to create a new public identity since we just imported \
1100             all the private cross-signing keys",
1101        );
1102
1103        changes.private_identity = Some(identity.clone());
1104        changes.identities.new.push(UserIdentityData::Own(public_identity));
1105
1106        Ok(self.save_changes(changes).await?)
1107    }
1108
1109    /// Import the given `secret` named `secret_name` into the keystore.
1110    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1111        match &secret.secret_name {
1112            SecretName::CrossSigningMasterKey
1113            | SecretName::CrossSigningUserSigningKey
1114            | SecretName::CrossSigningSelfSigningKey => {
1115                if let Some(public_identity) =
1116                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1117                {
1118                    let identity = self.inner.identity.lock().await;
1119
1120                    identity
1121                        .import_secret(
1122                            public_identity,
1123                            &secret.secret_name,
1124                            &secret.event.content.secret,
1125                        )
1126                        .await?;
1127                    info!(
1128                        secret_name = ?secret.secret_name,
1129                        "Successfully imported a private cross signing key"
1130                    );
1131
1132                    let changes =
1133                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1134
1135                    self.save_changes(changes).await?;
1136                }
1137            }
1138            SecretName::RecoveryKey => {
1139                // We don't import the decryption key here since we'll want to
1140                // check if the public key matches to the latest version on the
1141                // server. We instead put the secret into a secret inbox where
1142                // it will stay until it either gets overwritten
1143                // or the user accepts the secret.
1144            }
1145            name => {
1146                warn!(secret = ?name, "Tried to import an unknown secret");
1147            }
1148        }
1149
1150        Ok(())
1151    }
1152
1153    /// Check whether there is a global flag to only encrypt messages for
1154    /// trusted devices or for everyone.
1155    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1156        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1157        Ok(value)
1158    }
1159
1160    /// Set global flag whether to encrypt messages for untrusted devices, or
1161    /// whether they should be excluded from the conversation.
1162    pub async fn set_only_allow_trusted_devices(
1163        &self,
1164        block_untrusted_devices: bool,
1165    ) -> Result<()> {
1166        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1167    }
1168
1169    /// Get custom stored value associated with a key
1170    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1171        let Some(value) = self.get_custom_value(key).await? else {
1172            return Ok(None);
1173        };
1174        let deserialized = self.deserialize_value(&value)?;
1175        Ok(Some(deserialized))
1176    }
1177
1178    /// Store custom value associated with a key
1179    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1180        let serialized = self.serialize_value(value)?;
1181        self.set_custom_value(key, serialized).await?;
1182        Ok(())
1183    }
1184
1185    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1186        let serialized =
1187            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1188        Ok(serialized)
1189    }
1190
1191    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1192        let deserialized =
1193            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1194        Ok(deserialized)
1195    }
1196
1197    /// Receive notifications of room keys being received as a [`Stream`].
1198    ///
1199    /// Each time a room key is updated in any way, an update will be sent to
1200    /// the stream. Updates that happen at the same time are batched into a
1201    /// [`Vec`].
1202    ///
1203    /// If the reader of the stream lags too far behind an error will be sent to
1204    /// the reader.
1205    ///
1206    /// The stream will terminate once all references to the underlying
1207    /// `CryptoStoreWrapper` are dropped.
1208    pub fn room_keys_received_stream(
1209        &self,
1210    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1211        self.inner.store.room_keys_received_stream()
1212    }
1213
1214    /// Receive notifications of received `m.room_key.withheld` messages.
1215    ///
1216    /// Each time an `m.room_key.withheld` is received and stored, an update
1217    /// will be sent to the stream. Updates that happen at the same time are
1218    /// batched into a [`Vec`].
1219    ///
1220    /// If the reader of the stream lags too far behind, a warning will be
1221    /// logged and items will be dropped.
1222    pub fn room_keys_withheld_received_stream(
1223        &self,
1224    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1225        self.inner.store.room_keys_withheld_received_stream()
1226    }
1227
1228    /// Returns a stream of user identity updates, allowing users to listen for
1229    /// notifications about new or changed user identities.
1230    ///
1231    /// The stream produced by this method emits updates whenever a new user
1232    /// identity is discovered or when an existing identities information is
1233    /// changed. Users can subscribe to this stream and receive updates in
1234    /// real-time.
1235    ///
1236    /// Caution: the returned stream will never terminate, and it holds a
1237    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1238    /// resource leaks.
1239    ///
1240    /// # Examples
1241    ///
1242    /// ```no_run
1243    /// # use matrix_sdk_crypto::OlmMachine;
1244    /// # use ruma::{device_id, user_id};
1245    /// # use futures_util::{pin_mut, StreamExt};
1246    /// # let machine: OlmMachine = unimplemented!();
1247    /// # futures_executor::block_on(async {
1248    /// let identities_stream = machine.store().user_identities_stream();
1249    /// pin_mut!(identities_stream);
1250    ///
1251    /// for identity_updates in identities_stream.next().await {
1252    ///     for (_, identity) in identity_updates.new {
1253    ///         println!("A new identity has been added {}", identity.user_id());
1254    ///     }
1255    /// }
1256    /// # });
1257    /// ```
1258    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1259        let verification_machine = self.inner.verification_machine.to_owned();
1260
1261        let this = self.clone();
1262        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1263            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1264
1265            let map_identity = |(user_id, identity)| {
1266                (
1267                    user_id,
1268                    UserIdentity::new(
1269                        this.clone(),
1270                        identity,
1271                        verification_machine.to_owned(),
1272                        own_identity.to_owned(),
1273                    ),
1274                )
1275            };
1276
1277            let new = new_identities.into_iter().map(map_identity).collect();
1278            let changed = changed_identities.into_iter().map(map_identity).collect();
1279            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1280
1281            IdentityUpdates { new, changed, unchanged }
1282        })
1283    }
1284
1285    /// Returns a stream of device updates, allowing users to listen for
1286    /// notifications about new or changed devices.
1287    ///
1288    /// The stream produced by this method emits updates whenever a new device
1289    /// is discovered or when an existing device's information is changed. Users
1290    /// can subscribe to this stream and receive updates in real-time.
1291    ///
1292    /// Caution: the returned stream will never terminate, and it holds a
1293    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1294    /// resource leaks.
1295    ///
1296    /// # Examples
1297    ///
1298    /// ```no_run
1299    /// # use matrix_sdk_crypto::OlmMachine;
1300    /// # use ruma::{device_id, user_id};
1301    /// # use futures_util::{pin_mut, StreamExt};
1302    /// # let machine: OlmMachine = unimplemented!();
1303    /// # futures_executor::block_on(async {
1304    /// let devices_stream = machine.store().devices_stream();
1305    /// pin_mut!(devices_stream);
1306    ///
1307    /// for device_updates in devices_stream.next().await {
1308    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1309    ///         for device in user_devices.values() {
1310    ///             println!("A new device has been added {}", device.device_id());
1311    ///         }
1312    ///     }
1313    /// }
1314    /// # });
1315    /// ```
1316    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1317        let verification_machine = self.inner.verification_machine.to_owned();
1318
1319        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1320            collect_device_updates(
1321                verification_machine.to_owned(),
1322                own_identity,
1323                identities,
1324                devices,
1325            )
1326        })
1327    }
1328
1329    /// Returns a [`Stream`] of user identity and device updates
1330    ///
1331    /// The stream returned by this method returns the same data as
1332    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1333    /// not include references to the `VerificationMachine`. It is therefore a
1334    /// lower-level view on that data.
1335    ///
1336    /// The stream will terminate once all references to the underlying
1337    /// `CryptoStoreWrapper` are dropped.
1338    pub fn identities_stream_raw(
1339        &self,
1340    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1341        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1342    }
1343
1344    /// Creates a [`CrossProcessLock`] for this store, that will contain the
1345    /// given key and value when hold.
1346    pub fn create_store_lock(
1347        &self,
1348        lock_key: String,
1349        lock_value: String,
1350    ) -> CrossProcessLock<LockableCryptoStore> {
1351        self.inner.store.create_store_lock(lock_key, lock_value)
1352    }
1353
1354    /// Receive notifications of gossipped secrets being received and stored in
1355    /// the secret inbox as a [`Stream`].
1356    ///
1357    /// The gossipped secrets are received using the `m.secret.send` event type
1358    /// and are guaranteed to have been received over a 1-to-1 Olm
1359    /// [`Session`] from a verified [`Device`].
1360    ///
1361    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1362    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1363    ///
1364    /// After a suitable secret of a certain type has been found it can be
1365    /// removed from the store
1366    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1367    ///
1368    /// The only secret this will currently broadcast is the
1369    /// `m.megolm_backup.v1`.
1370    ///
1371    /// If the reader of the stream lags too far behind, a warning will be
1372    /// logged and items will be dropped.
1373    ///
1374    /// # Examples
1375    ///
1376    /// ```no_run
1377    /// # use matrix_sdk_crypto::OlmMachine;
1378    /// # use ruma::{device_id, user_id};
1379    /// # use futures_util::{pin_mut, StreamExt};
1380    /// # let alice = user_id!("@alice:example.org").to_owned();
1381    /// # futures_executor::block_on(async {
1382    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1383    ///
1384    /// let secret_stream = machine.store().secrets_stream();
1385    /// pin_mut!(secret_stream);
1386    ///
1387    /// for secret in secret_stream.next().await {
1388    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1389    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name).await;
1390    /// }
1391    /// # });
1392    /// ```
1393    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1394        self.inner.store.secrets_stream()
1395    }
1396
1397    /// Receive notifications of historic room key bundles as a [`Stream`].
1398    ///
1399    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1400    ///
1401    /// Each time a historic room key bundle is received, an update will be
1402    /// sent to the stream. This stream can be used to accept historic room key
1403    /// bundles that arrive out of order, i.e. the bundle arrives after the
1404    /// user has already accepted a room invitation.
1405    ///
1406    /// # Examples
1407    ///
1408    /// ```no_run
1409    /// # use matrix_sdk_crypto::{
1410    /// #    OlmMachine,
1411    /// #    store::types::StoredRoomKeyBundleData,
1412    /// #    types::room_history::RoomKeyBundle
1413    /// # };
1414    /// # use ruma::{device_id, user_id};
1415    /// # use futures_util::{pin_mut, StreamExt};
1416    /// # let alice = user_id!("@alice:example.org").to_owned();
1417    /// # async {
1418    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1419    /// let bundle_stream = machine.store().historic_room_key_stream();
1420    /// pin_mut!(bundle_stream);
1421    ///
1422    /// while let Some(bundle_info) = bundle_stream.next().await {
1423    ///     // Try to find the bundle content in the store and if it's valid accept it.
1424    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
1425    ///         // Download the bundle now and import it.
1426    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
1427    ///         machine.store().receive_room_key_bundle(
1428    ///             &bundle_data,
1429    ///             bundle,
1430    ///             |_, _| {},
1431    ///         ).await?;
1432    ///     }
1433    /// }
1434    /// # anyhow::Ok(()) };
1435    /// ```
1436    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1437        self.inner.store.historic_room_key_stream()
1438    }
1439
1440    /// Import the given room keys into the store.
1441    ///
1442    /// # Arguments
1443    ///
1444    /// * `exported_keys` - The keys to be imported.
1445    /// * `from_backup_version` - If the keys came from key backup, the key
1446    ///   backup version. This will cause the keys to be marked as already
1447    ///   backed up, and therefore not requiring another backup.
1448    /// * `progress_listener` - Callback which will be called after each key is
1449    ///   processed. Called with arguments `(processed, total)` where
1450    ///   `processed` is the number of keys processed so far, and `total` is the
1451    ///   total number of keys (i.e., `exported_keys.len()`).
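    ///
    /// # Examples
    ///
    /// A sketch based on the [`Store::import_exported_room_keys`] example below;
    /// the backup version `"1"` is a placeholder:
    ///
    /// ```no_run
    /// # use std::io::Cursor;
    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
    /// # use ruma::{device_id, user_id};
    /// # async {
    /// # let machine = OlmMachine::new(user_id!("@alice:example.org"), device_id!("DEVICEID")).await;
    /// # let export = Cursor::new("".to_owned());
    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
    /// // Passing a backup version marks the imported keys as already backed up.
    /// machine
    ///     .store()
    ///     .import_room_keys(exported_keys, Some("1"), |_, _| {})
    ///     .await
    ///     .unwrap();
    /// # };
    /// ```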
1452    pub async fn import_room_keys(
1453        &self,
1454        exported_keys: Vec<ExportedRoomKey>,
1455        from_backup_version: Option<&str>,
1456        progress_listener: impl Fn(usize, usize),
1457    ) -> Result<RoomKeyImportResult> {
1458        let exported_keys = exported_keys.iter().filter_map(|key| {
1459            key.try_into()
1460                .map_err(|e| {
1461                    warn!(
1462                        sender_key = key.sender_key().to_base64(),
1463                        room_id = ?key.room_id(),
1464                        session_id = key.session_id(),
1465                        error = ?e,
1466                        "Couldn't import a room key from a file export."
1467                    );
1468                })
1469                .ok()
1470        });
1471        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1472    }
1473
1474    /// Import the given room keys into our store.
1475    ///
1476    /// # Arguments
1477    ///
1478    /// * `exported_keys` - A list of previously exported keys that should be
1479    ///   imported into our store. If we already have a better version of a key,
1480    ///   the key will *not* be imported.
1481    ///
1482    /// Returns a [`RoomKeyImportResult`] giving the number of sessions that
1483    /// were imported and the total number of sessions that were found in the
1484    /// key export.
1485    ///
1486    /// # Examples
1487    ///
1488    /// ```no_run
1489    /// # use std::io::Cursor;
1490    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1491    /// # use ruma::{device_id, user_id};
1492    /// # let alice = user_id!("@alice:example.org");
1493    /// # async {
1494    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1495    /// # let export = Cursor::new("".to_owned());
1496    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1497    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1498    /// # };
1499    /// ```
1500    pub async fn import_exported_room_keys(
1501        &self,
1502        exported_keys: Vec<ExportedRoomKey>,
1503        progress_listener: impl Fn(usize, usize),
1504    ) -> Result<RoomKeyImportResult> {
1505        self.import_room_keys(exported_keys, None, progress_listener).await
1506    }
1507
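    /// Shared implementation for importing room keys from a file export, a key
    /// backup or a room key bundle: merge each incoming session with what is
    /// already in the store and persist the result.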
1508    async fn import_sessions_impl(
1509        &self,
1510        sessions: impl Iterator<Item = InboundGroupSession>,
1511        from_backup_version: Option<&str>,
1512        progress_listener: impl Fn(usize, usize),
1513    ) -> Result<RoomKeyImportResult> {
1514        let sessions: Vec<_> = sessions.collect();
1515        let mut imported_sessions = Vec::new();
1516
1517        let total_count = sessions.len();
1518        let mut keys = BTreeMap::new();
1519
1520        for (i, session) in sessions.into_iter().enumerate() {
1521            // Only import the session if we didn't have this session or
1522            // if it's a better version of the same session.
1523            if let Some(merged) = self.merge_received_group_session(session).await? {
1524                if from_backup_version.is_some() {
1525                    merged.mark_as_backed_up();
1526                }
1527
1528                keys.entry(merged.room_id().to_owned())
1529                    .or_insert_with(BTreeMap::new)
1530                    .entry(merged.sender_key().to_base64())
1531                    .or_insert_with(BTreeSet::new)
1532                    .insert(merged.session_id().to_owned());
1533
1534                imported_sessions.push(merged);
1535            }
1536
1537            progress_listener(i, total_count);
1538        }
1539
1540        let imported_count = imported_sessions.len();
1541
1542        self.inner
1543            .store
1544            .save_inbound_group_sessions(imported_sessions, from_backup_version)
1545            .await?;
1546
1547        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1548
1549        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1550    }
1551
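    /// Get a clone of the `Arc` wrapping the inner `CryptoStoreWrapper`.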
1552    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1553        self.inner.store.clone()
1554    }
1555
1556    /// Export the keys that match the given predicate.
1557    ///
1558    /// # Arguments
1559    ///
1560    /// * `predicate` - A closure that will be called for every known
1561    ///   `InboundGroupSession`, which represents a room key. If the closure
1562    ///   returns `true` the `InboundGroupSession` will be included in the
1563    ///   export, if the closure returns `false` it will not be included.
1564    ///
1565    /// # Examples
1566    ///
1567    /// ```no_run
1568    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1569    /// # use ruma::{device_id, user_id, room_id};
1570    /// # let alice = user_id!("@alice:example.org");
1571    /// # async {
1572    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1573    /// let room_id = room_id!("!test:localhost");
1574    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1575    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1576    /// # };
1577    /// ```
1578    pub async fn export_room_keys(
1579        &self,
1580        predicate: impl FnMut(&InboundGroupSession) -> bool,
1581    ) -> Result<Vec<ExportedRoomKey>> {
1582        let mut exported = Vec::new();
1583
1584        let mut sessions = self.get_inbound_group_sessions().await?;
1585        sessions.retain(predicate);
1586
1587        for session in sessions {
1588            let export = session.export().await;
1589            exported.push(export);
1590        }
1591
1592        Ok(exported)
1593    }
1594
1595    /// Export room keys matching a predicate, providing them as an async
1596    /// `Stream`.
1597    ///
1598    /// # Arguments
1599    ///
1600    /// * `predicate` - A closure that will be called for every known
1601    ///   `InboundGroupSession`, which represents a room key. If the closure
1602    ///   returns `true` the `InboundGroupSession` will be included in the
1603    ///   export, if the closure returns `false` it will not be included.
1604    ///
1605    /// # Examples
1606    ///
1607    /// ```no_run
1608    /// use std::pin::pin;
1609    ///
1610    /// use matrix_sdk_crypto::{OlmMachine, olm::ExportedRoomKey};
1611    /// use ruma::{device_id, room_id, user_id};
1612    /// use tokio_stream::StreamExt;
1613    /// # async {
1614    /// let alice = user_id!("@alice:example.org");
1615    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1616    /// let room_id = room_id!("!test:localhost");
1617    /// let mut keys = pin!(
1618    ///     machine
1619    ///         .store()
1620    ///         .export_room_keys_stream(|s| s.room_id() == room_id)
1621    ///         .await
1622    ///         .unwrap()
1623    /// );
1624    /// while let Some(key) = keys.next().await {
1625    ///     println!("{}", key.room_id);
1626    /// }
1627    /// # };
1628    /// ```
1629    pub async fn export_room_keys_stream(
1630        &self,
1631        predicate: impl FnMut(&InboundGroupSession) -> bool,
1632    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1633        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1634        let sessions = self.get_inbound_group_sessions().await?;
1635        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1636            .then(|session| async move { session.export().await }))
1637    }
1638
1639    /// Assemble a room key bundle for sharing encrypted history, as per
1640    /// [MSC4268].
1641    ///
1642    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
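    ///
    /// # Examples
    ///
    /// A minimal sketch; encrypting and uploading the resulting bundle, as
    /// described in MSC4268, is out of scope here, and the room ID is a
    /// placeholder:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::{device_id, room_id, user_id};
    /// # async {
    /// # let machine = OlmMachine::new(user_id!("@alice:example.org"), device_id!("DEVICEID")).await;
    /// let bundle = machine
    ///     .store()
    ///     .build_room_key_bundle(room_id!("!room:localhost"))
    ///     .await?;
    /// // Serialize, encrypt and upload `bundle` as described in MSC4268.
    /// # anyhow::Ok(()) };
    /// ```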
1643    pub async fn build_room_key_bundle(
1644        &self,
1645        room_id: &RoomId,
1646    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1647        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1648
1649        let mut bundle = RoomKeyBundle::default();
1650        for session in sessions {
1651            if session.shared_history() {
1652                bundle.room_keys.push(session.export().await.into());
1653            } else {
1654                bundle.withheld.push(RoomKeyWithheldContent::new(
1655                    session.algorithm().to_owned(),
1656                    WithheldCode::HistoryNotShared,
1657                    session.room_id().to_owned(),
1658                    session.session_id().to_owned(),
1659                    session.sender_key().to_owned(),
1660                    self.device_id().to_owned(),
1661                ));
1662            }
1663        }
1664
1665        // If we received a key bundle ourselves in which one or more sessions were
1666        // marked as "history not shared", pass that on to the new user.
1667        let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1668        for withheld in withhelds {
1669            if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1670                bundle.withheld.push(withheld.content);
1671            }
1672        }
1673
1674        Ok(bundle)
1675    }
1676
1677    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1678    ///
1679    /// # Arguments
1680    ///
1681    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1682    ///   being received.
1683    /// * `bundle` - The decrypted and deserialized bundle itself.
1684    ///
1685    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
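    ///
    /// # Examples
    ///
    /// A sketch mirroring the [`Store::historic_room_key_stream`] example above;
    /// the room and user identifiers are placeholders, and downloading the
    /// bundle itself is left out:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::{OlmMachine, types::room_history::RoomKeyBundle};
    /// # use ruma::{device_id, room_id, user_id};
    /// # async {
    /// # let machine = OlmMachine::new(user_id!("@alice:example.org"), device_id!("DEVICEID")).await;
    /// # let room_id = room_id!("!room:localhost");
    /// # let sender = user_id!("@bob:example.org");
    /// if let Some(bundle_data) =
    ///     machine.store().get_received_room_key_bundle_data(room_id, sender).await?
    /// {
    ///     let bundle: RoomKeyBundle = todo!("Download and decrypt the bundle");
    ///     machine
    ///         .store()
    ///         .receive_room_key_bundle(&bundle_data, bundle, |_, _| {})
    ///         .await?;
    /// }
    /// # anyhow::Ok(()) };
    /// ```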
1686    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1687    pub async fn receive_room_key_bundle(
1688        &self,
1689        bundle_info: &StoredRoomKeyBundleData,
1690        bundle: RoomKeyBundle,
1691        progress_listener: impl Fn(usize, usize),
1692    ) -> Result<(), CryptoStoreError> {
1693        let sender_data = if bundle_info.sender_data.should_recalculate() {
1694            let device = self
1695                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1696                .await?;
1697
1698            device
1699                .as_ref()
1700                .map(SenderData::from_device)
1701                .unwrap_or_else(|| bundle_info.sender_data.clone())
1702        } else {
1703            bundle_info.sender_data.clone()
1704        };
1705
1706        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1707
1708        // The sender's device must be either `SenderData::SenderUnverified` (i.e.,
1709        // TOFU-trusted) or `SenderData::SenderVerified` (i.e., fully verified
1710        // via user verification and cross-signing).
1711        let Ok(forwarder_data) = (&sender_data).try_into() else {
1712            warn!(
1713                "Not accepting a historic room key bundle due to insufficient trust in the sender"
1714            );
1715            return Ok(());
1716        };
1717
1718        self.import_room_key_bundle_sessions(
1719            bundle_info,
1720            &bundle,
1721            &forwarder_data,
1722            progress_listener,
1723        )
1724        .await?;
1725        self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1726
1727        Ok(())
1728    }
1729
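    /// Import the room keys contained in a received room key bundle, skipping
    /// any keys that belong to a different room than the bundle itself.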
1730    async fn import_room_key_bundle_sessions(
1731        &self,
1732        bundle_info: &StoredRoomKeyBundleData,
1733        bundle: &RoomKeyBundle,
1734        forwarder_data: &ForwarderData,
1735        progress_listener: impl Fn(usize, usize),
1736    ) -> Result<(), CryptoStoreError> {
1737        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1738            if key.room_id != bundle_info.bundle_data.room_id {
1739                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1740                Either::Right(key)
1741            } else {
1742                Either::Left(key)
1743            }
1744        });
1745
1746        match (bad.is_empty(), good.is_empty()) {
1747            // Case 1: Completely empty bundle.
1748            (true, true) => {
1749                warn!("Received a completely empty room key bundle");
1750            }
1751
1752            // Case 2: A bundle for the wrong room.
1753            (false, true) => {
1754                let bad_keys: Vec<_> =
1755                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1756
1757                warn!(
1758                    ?bad_keys,
1759                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1760                );
1761            }
1762
1763            // Case 3: A bundle containing useful room keys.
1764            (_, false) => {
1765                // We have at least some good keys, if we also have some bad ones let's
1766                // mention that here.
1767                if !bad.is_empty() {
1768                    warn!(
1769                        bad_key_count = bad.len(),
1770                        "The room key bundle contained some room keys \
1771                         that were meant for a different room"
1772                    );
1773                }
1774
1775                let keys = good.iter().filter_map(|key| {
1776                    key.try_into_inbound_group_session(forwarder_data)
1777                        .map_err(|e| {
1778                            warn!(
1779                                sender_key = ?key.sender_key().to_base64(),
1780                                room_id = ?key.room_id(),
1781                                session_id = key.session_id(),
1782                                error = ?e,
1783                                "Couldn't import a room key from a key bundle."
1784                            );
1785                        })
1786                        .ok()
1787                });
1788
1789                self.import_sessions_impl(keys, None, progress_listener).await?;
1790            }
1791        }
1792
1793        Ok(())
1794    }
1795
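    /// Store the withheld-key information from a received room key bundle,
    /// skipping entries that belong to a different room than the bundle itself.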
1796    async fn import_room_key_bundle_withheld_info(
1797        &self,
1798        bundle_info: &StoredRoomKeyBundleData,
1799        bundle: &RoomKeyBundle,
1800    ) -> Result<(), CryptoStoreError> {
1801        let mut session_id_to_withheld_code_map = BTreeMap::new();
1802
1803        let mut changes = Changes::default();
1804        for withheld in &bundle.withheld {
1805            let (room_id, session_id) = match withheld {
1806                RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1807                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1808                    _ => continue,
1809                },
1810                #[cfg(feature = "experimental-algorithms")]
1811                RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1812                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1813                    _ => continue,
1814                },
1815                RoomKeyWithheldContent::Unknown(_) => continue,
1816            };
1817
1818            if room_id != bundle_info.bundle_data.room_id {
1819                trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1820                continue;
1821            }
1822
1823            changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1824                session_id.to_owned(),
1825                RoomKeyWithheldEntry {
1826                    sender: bundle_info.sender_user.clone(),
1827                    content: withheld.to_owned(),
1828                },
1829            );
1830            session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1831        }
1832
1833        self.save_changes(changes).await?;
1834
1835        info!(
1836            room_id = ?bundle_info.bundle_data.room_id,
1837            ?session_id_to_withheld_code_map,
1838            "Successfully imported withheld info from room key bundle",
1839        );
1840
1841        Ok(())
1842    }
1843}
1844
1845impl Deref for Store {
1846    type Target = DynCryptoStore;
1847
1848    fn deref(&self) -> &Self::Target {
1849        self.inner.store.deref().deref()
1850    }
1851}
1852
1853/// A crypto store that implements primitives for cross-process locking.
1854#[derive(Clone, Debug)]
1855pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1856
1857impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1858    type LockError = CryptoStoreError;
1859
1860    async fn try_lock(
1861        &self,
1862        lease_duration_ms: u32,
1863        key: &str,
1864        holder: &str,
1865    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1866        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1867    }
1868}
1869
1870#[cfg(test)]
1871mod tests {
1872    use std::{collections::BTreeMap, pin::pin};
1873
1874    use assert_matches2::{assert_let, assert_matches};
1875    use futures_util::StreamExt;
1876    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1877    use matrix_sdk_test::async_test;
1878    use ruma::{
1879        RoomId, device_id,
1880        events::room::{EncryptedFileInit, JsonWebKeyInit},
1881        owned_device_id, owned_mxc_uri, room_id,
1882        serde::Base64,
1883        user_id,
1884    };
1885    use serde_json::json;
1886    use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1887
1888    use crate::{
1889        Account, OlmMachine,
1890        machine::test_helpers::get_machine_pair,
1891        olm::{InboundGroupSession, SenderData},
1892        store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1893        types::{
1894            EventEncryptionAlgorithm,
1895            events::{
1896                room_key_bundle::RoomKeyBundleContent,
1897                room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1898            },
1899        },
1900    };
1901
1902    #[async_test]
1903    async fn test_merge_received_group_session() {
1904        let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1905        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1906
1907        let room_id = room_id!("!test:localhost");
1908
1909        let megolm_signing_key = Ed25519Keypair::new();
1910        let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1911
1912        // Bob already knows about the session, at index 5, with the device keys.
1913        let mut inbound_at_index_5 =
1914            InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1915        inbound_at_index_5.sender_data = inbound.sender_data.clone();
1916        bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1917
1918        // No changes if we get a disconnected session.
1919        let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1920        assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1921
1922        // No changes needed when we receive a worse copy of the session
1923        let mut worse =
1924            InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1925        worse.sender_data = inbound.sender_data.clone();
1926        assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1927
1928        // Nor when we receive an exact copy of what we already have
1929        let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1930        copy.sender_data = inbound.sender_data.clone();
1931        assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1932
1933        // But when we receive a better copy of the session, we should get it back
1934        let mut better =
1935            InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1936        better.sender_data = inbound.sender_data.clone();
1937        assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1938        assert_eq!(update.first_known_index(), 0);
1939
1940        // A worse copy of the ratchet, but better trust data
1941        {
1942            let mut worse_ratchet_better_trust =
1943                InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1944            let updated_sender_data = SenderData::sender_verified(
1945                alice_account.user_id(),
1946                alice_account.device_id(),
1947                Ed25519Keypair::new().public_key(),
1948            );
1949            worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1950            assert_let!(
1951                Some(update) = bob
1952                    .store()
1953                    .merge_received_group_session(worse_ratchet_better_trust)
1954                    .await
1955                    .unwrap()
1956            );
1957            assert_eq!(update.sender_data, updated_sender_data);
1958            assert_eq!(update.first_known_index(), 5);
1959            assert_eq!(
1960                update.export_at_index(0).await.session_key.to_bytes(),
1961                inbound.export_at_index(5).await.session_key.to_bytes()
1962            );
1963        }
1964
1965        // A better copy of the ratchet, but worse trust data
1966        {
1967            let mut better_ratchet_worse_trust =
1968                InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1969            let updated_sender_data = SenderData::unknown();
1970            better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
1971            assert_let!(
1972                Some(update) = bob
1973                    .store()
1974                    .merge_received_group_session(better_ratchet_worse_trust)
1975                    .await
1976                    .unwrap()
1977            );
1978            assert_eq!(update.sender_data, inbound.sender_data);
1979            assert_eq!(update.first_known_index(), 0);
1980            assert_eq!(
1981                update.export_at_index(0).await.session_key.to_bytes(),
1982                inbound.export_at_index(0).await.session_key.to_bytes()
1983            );
1984        }
1985    }
1986
1987    /// Create an [`InboundGroupSession`] for the given room, using the given
1988    /// Ed25519 key as the signing key/session ID.
1989    fn make_inbound_group_session(
1990        sender_account: &Account,
1991        signing_key: &Ed25519Keypair,
1992        room_id: &RoomId,
1993    ) -> InboundGroupSession {
1994        InboundGroupSession::new(
1995            sender_account.identity_keys.curve25519,
1996            sender_account.identity_keys.ed25519,
1997            room_id,
1998            &make_session_key(signing_key),
1999            SenderData::device_info(crate::types::DeviceKeys::new(
2000                sender_account.user_id().to_owned(),
2001                sender_account.device_id().to_owned(),
2002                vec![],
2003                BTreeMap::new(),
2004                crate::types::Signatures::new(),
2005            )),
2006            None,
2007            EventEncryptionAlgorithm::MegolmV1AesSha2,
2008            Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2009            true,
2010        )
2011        .unwrap()
2012    }
2013
2014    /// Make a Megolm [`SessionKey`] using the given Ed25519 key as a signing
2015    /// key/session ID.
2016    fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2017        use rand::Rng;
2018
2019        // `SessionKey::new` is not public, so the easiest way to construct a Megolm
2020        // session using a known Ed25519 key is to build a byte array in the export
2021        // format.
2022
2023        let mut session_key_bytes = vec![0u8; 229];
2024        // 0: version
2025        session_key_bytes[0] = 2;
2026        // 1..5: index
2027        // 5..133: ratchet key
2028        rand::thread_rng().fill(&mut session_key_bytes[5..133]);
2029        // 133..165: public ed25519 key
2030        session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2031        // 165..229: signature
2032        let sig = signing_key.sign(&session_key_bytes[0..165]);
2033        session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2034
2035        SessionKey::from_bytes(&session_key_bytes).unwrap()
2036    }
2037
2038    #[async_test]
2039    async fn test_import_room_keys_notifies_stream() {
2040        use futures_util::FutureExt;
2041
2042        let (alice, bob, _) =
2043            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2044
2045        let room1_id = room_id!("!room1:localhost");
2046        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2047        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2048
2049        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2050        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2051
2052        let room_keys = room_keys_received_stream
2053            .next()
2054            .now_or_never()
2055            .flatten()
2056            .expect("We should have received an update of room key infos")
2057            .unwrap();
2058        assert_eq!(room_keys.len(), 1);
2059        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2060    }
2061
2062    #[async_test]
2063    async fn test_export_room_keys_provides_selected_keys() {
2064        // Given an OlmMachine with room keys in it
2065        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2066        let room1_id = room_id!("!room1:localhost");
2067        let room2_id = room_id!("!room2:localhost");
2068        let room3_id = room_id!("!room3:localhost");
2069        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2070        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2071        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2072
2073        // When I export some of the keys
2074        let keys = alice
2075            .store()
2076            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2077            .await
2078            .unwrap();
2079
2080        // Then the requested keys were provided
2081        assert_eq!(keys.len(), 2);
2082        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2083        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2084        assert_eq!(keys[0].room_id, "!room2:localhost");
2085        assert_eq!(keys[1].room_id, "!room3:localhost");
2086        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2087        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2088    }
2089
2090    #[async_test]
2091    async fn test_export_room_keys_stream_can_provide_all_keys() {
2092        // Given an OlmMachine with room keys in it
2093        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2094        let room1_id = room_id!("!room1:localhost");
2095        let room2_id = room_id!("!room2:localhost");
2096        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2097        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2098
2099        // When I export the keys as a stream
2100        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2101
2102        // And collect them
2103        let mut collected = vec![];
2104        while let Some(key) = keys.next().await {
2105            collected.push(key);
2106        }
2107
2108        // Then all the keys were provided
2109        assert_eq!(collected.len(), 2);
2110        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2111        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2112        assert_eq!(collected[0].room_id, "!room1:localhost");
2113        assert_eq!(collected[1].room_id, "!room2:localhost");
2114        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2115        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2116    }
2117
2118    #[async_test]
2119    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2120        // Given an OlmMachine with room keys in it
2121        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2122        let room1_id = room_id!("!room1:localhost");
2123        let room2_id = room_id!("!room2:localhost");
2124        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2125        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2126
2127        // When I export the keys as a stream
2128        let mut keys =
2129            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2130
2131        // And collect them
2132        let mut collected = vec![];
2133        while let Some(key) = keys.next().await {
2134            collected.push(key);
2135        }
2136
2137        // Then all the keys matching our predicate were provided, and no others
2138        assert_eq!(collected.len(), 1);
2139        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2140        assert_eq!(collected[0].room_id, "!room1:localhost");
2141        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2142    }
2143
2144    #[async_test]
2145    async fn test_export_secrets_bundle() {
2146        let user_id = user_id!("@alice:example.com");
2147        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2148
2149        let _ = first
2150            .bootstrap_cross_signing(false)
2151            .await
2152            .expect("We should be able to bootstrap cross-signing");
2153
2154        let bundle = first.store().export_secrets_bundle().await.expect(
2155            "We should be able to export the secrets bundle, now that we \
2156             have the cross-signing keys",
2157        );
2158
2159        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2160
2161        second
2162            .store()
2163            .import_secrets_bundle(&bundle)
2164            .await
2165            .expect("We should be able to import the secrets bundle");
2166
2167        let status = second.cross_signing_status().await;
2168        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2169
2170        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2171
2172        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2173    }
2174
2175    #[async_test]
2176    async fn test_create_dehydrated_device_key() {
2177        let pickle_key = DehydratedDeviceKey::new()
2178            .expect("Should be able to create a random dehydrated device key");
2179
2180        let to_vec = pickle_key.inner.to_vec();
2181        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2182            .expect("Should be able to create a dehydrated device key from slice");
2183
2184        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2185    }
2186
2187    #[async_test]
2188    async fn test_create_dehydrated_errors() {
2189        let too_small = [0u8; 22];
2190        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2191
2192        assert!(pickle_key.is_err());
2193
2194        let too_big = [0u8; 40];
2195        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2196
2197        assert!(pickle_key.is_err());
2198    }
2199
2200    #[async_test]
2201    async fn test_build_room_key_bundle() {
2202        // Given: Alice has sent a number of room keys to Bob, including some in the
2203        // wrong room, and some that are not marked as shared...
2204        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2205        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2206
2207        let room1_id = room_id!("!room1:localhost");
2208        let room2_id = room_id!("!room2:localhost");
2209
2210        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2211
2212           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2213        */
2214        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2215        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2216        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2217        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2218
2219        let sessions = [
2220            create_inbound_group_session_with_visibility(
2221                &alice,
2222                room1_id,
2223                &SessionKey::from_base64(session_key1).unwrap(),
2224                true,
2225            ),
2226            create_inbound_group_session_with_visibility(
2227                &alice,
2228                room1_id,
2229                &SessionKey::from_base64(session_key2).unwrap(),
2230                true,
2231            ),
2232            create_inbound_group_session_with_visibility(
2233                &alice,
2234                room1_id,
2235                &SessionKey::from_base64(session_key3).unwrap(),
2236                false,
2237            ),
2238            create_inbound_group_session_with_visibility(
2239                &alice,
2240                room2_id,
2241                &SessionKey::from_base64(session_key4).unwrap(),
2242                true,
2243            ),
2244        ];
2245        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2246
2247        // When I build the bundle
2248        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2249
2250        // Then the bundle matches the snapshot.
2251
2252        // We sort the sessions in the bundle, so that the snapshot is stable.
2253        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2254
2255        // We substitute the algorithm, since this changes based on feature flags.
2256        let algorithm = if cfg!(feature = "experimental-algorithms") {
2257            "m.megolm.v2.aes-sha2"
2258        } else {
2259            "m.megolm.v1.aes-sha2"
2260        };
2261        let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2262            assert_eq!(value.as_str().unwrap(), algorithm);
2263            "[algorithm]"
2264        };
2265
2266        // We also substitute alice's keys in the snapshot with placeholders
2267        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2268        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2269            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2270            "[alice curve key]"
2271        };
2272        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2273        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2274            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2275            "[alice ed25519 key]"
2276        };
2277
2278        insta::with_settings!({ sort_maps => true }, {
2279            assert_json_snapshot!(bundle, {
2280                ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2281                ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2282                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2283                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2284                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2285            });
2286        });
2287    }
2288
2289    #[async_test]
2290    async fn test_receive_room_key_bundle() {
2291        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2292        let alice_key = alice.identity_keys().curve25519;
2293        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2294
2295        let room_id = room_id!("!room1:localhost");
2296
2297        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2298        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2299
2300        let sessions = [
2301            create_inbound_group_session_with_visibility(
2302                &alice,
2303                room_id,
2304                &SessionKey::from_base64(session_key1).unwrap(),
2305                true,
2306            ),
2307            create_inbound_group_session_with_visibility(
2308                &alice,
2309                room_id,
2310                &SessionKey::from_base64(session_key2).unwrap(),
2311                false,
2312            ),
2313        ];
2314
2315        alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2316        let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2317
2318        bob.store()
2319            .receive_room_key_bundle(
2320                &StoredRoomKeyBundleData {
2321                    sender_user: alice.user_id().to_owned(),
2322                    sender_key: alice_key,
2323                    sender_data: SenderData::sender_verified(
2324                        alice.user_id(),
2325                        device_id!("ALICE"),
2326                        alice.identity_keys().ed25519,
2327                    ),
2328
2329                    bundle_data: RoomKeyBundleContent {
2330                        room_id: room_id.to_owned(),
2331                        // This isn't used at all in the method call, so we can fill it with
2332                        // garbage.
2333                        file: EncryptedFileInit {
2334                            url: owned_mxc_uri!("mxc://example.com/0"),
2335                            key: JsonWebKeyInit {
2336                                kty: "oct".to_owned(),
2337                                key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2338                                alg: "A256CTR.".to_owned(),
2339                                k: Base64::new(vec![0u8; 128]),
2340                                ext: true,
2341                            }
2342                            .into(),
2343                            iv: Base64::new(vec![0u8; 128]),
2344                            hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2345                                .into_iter()
2346                                .collect(),
2347                            v: "v2".to_owned(),
2348                        }
2349                        .into(),
2350                    },
2351                },
2352                bundle,
2353                |_, _| {},
2354            )
2355            .await
2356            .unwrap();
2357
2358        // The room key should be imported successfully
2359        let imported_sessions =
2360            bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2361
2362        assert_eq!(imported_sessions.len(), 1);
2363        assert_eq!(imported_sessions[0].room_id(), room_id);
2364
2365        // The session forwarder data should be set correctly.
2366        assert_eq!(
2367            imported_sessions[0]
2368                .forwarder_data
2369                .as_ref()
2370                .expect("Session should contain forwarder data.")
2371                .user_id(),
2372            alice.user_id()
2373        );
2374
2375        assert_matches!(
2376            bob.store()
2377                .get_withheld_info(room_id, sessions[1].session_id())
2378                .await
2379                .unwrap()
2380                .expect("Withheld info should be present in the store."),
2381            RoomKeyWithheldEntry {
2382                #[cfg(not(feature = "experimental-algorithms"))]
2383                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2384                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2385                ),
2386                #[cfg(feature = "experimental-algorithms")]
2387                content: RoomKeyWithheldContent::MegolmV2AesSha2(
2388                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2389                ),
2390                ..
2391            }
2392        );
2393    }
2394
2395    /// Tests that the new store format introduced in [#5737][#5737] does not
2396    /// conflict with items already in the store that were serialised with the
2397    /// older format.
2398    ///
2399    /// [#5737]: https://github.com/matrix-org/matrix-rust-sdk/pull/5737
2400    #[async_test]
2401    async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2402        let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2403            {
2404              "content": {
2405                "algorithm": "m.megolm.v1.aes-sha2",
2406                "code": "m.unauthorised",
2407                "from_device": "ALICE",
2408                "reason": "You are not authorised to read the message.",
2409                "room_id": "!roomid:s.co",
2410                "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2411                "session_id": "session123"
2412              },
2413              "sender": "@alice:s.co",
2414              "type": "m.room_key.withheld"
2415            }
2416        ))
2417        .unwrap();
2418
2419        assert_matches!(
2420            entry,
2421            RoomKeyWithheldEntry {
2422                sender,
2423                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2424                    MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2425                ),
2426            }
2427        );
2428
2429        assert_eq!(sender, "@alice:s.co");
2430        assert_eq!(withheld_content.room_id, "!roomid:s.co");
2431        assert_eq!(withheld_content.session_id, "session123");
2432        assert_eq!(
2433            withheld_content.sender_key.to_base64(),
2434            "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2435        );
2436        assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2437    }
2438
2439    /// Create an inbound Megolm session for the given room.
2440    ///
2441    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2442    /// fields of the resultant session.
2443    ///
2444    /// The encryption algorithm used for the session depends on the
2445    /// `experimental-algorithms` feature flag:
2446    ///
2447    /// - When not set, the session uses `m.megolm.v1.aes-sha2`.
2448    /// - When set, the session uses `m.megolm.v2.aes-sha2`.
2449    fn create_inbound_group_session_with_visibility(
2450        olm_machine: &OlmMachine,
2451        room_id: &RoomId,
2452        session_key: &SessionKey,
2453        shared_history: bool,
2454    ) -> InboundGroupSession {
2455        let identity_keys = &olm_machine.store().static_account().identity_keys;
2456        InboundGroupSession::new(
2457            identity_keys.curve25519,
2458            identity_keys.ed25519,
2459            room_id,
2460            session_key,
2461            SenderData::unknown(),
2462            None,
2463            #[cfg(not(feature = "experimental-algorithms"))]
2464            EventEncryptionAlgorithm::MegolmV1AesSha2,
2465            #[cfg(feature = "experimental-algorithms")]
2466            EventEncryptionAlgorithm::MegolmV2AesSha2,
2467            None,
2468            shared_history,
2469        )
2470        .unwrap()
2471    }
2472}