Skip to main content

matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020, 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized using a trait.
18//! Implementing your own [`CryptoStore`]
19//!
20//! An in-memory only store is provided as well as an SQLite-based one,
21//! depending on your needs and targets a custom store may be implemented, e.g.
22//! for `wasm-unknown-unknown` an indexeddb store would be needed
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     OlmMachineBuilder,
29//! #     store::MemoryStore,
30//! # };
31//! # use ruma::{device_id, user_id};
32//! # let user_id = user_id!("@example:localhost");
33//! # let device_id = device_id!("TEST");
34//! let store = Arc::new(MemoryStore::new());
35//!
36//! let machine = OlmMachineBuilder::new(user_id, device_id)
37//!     .with_crypto_store(store)
38//!     .build();
39//! ```
40//!
41//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
42//! [`CryptoStore`]: trait.Cryptostore.html
43
44use std::{
45    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
46    fmt::Debug,
47    ops::Deref,
48    pin::pin,
49    sync::{Arc, atomic::Ordering},
50    time::Duration,
51};
52
53use as_variant::as_variant;
54use futures_core::Stream;
55use futures_util::StreamExt;
56use itertools::{Either, Itertools};
57use ruma::{
58    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, UserId,
59    encryption::KeyUsage, events::secret::request::SecretName,
60};
61use serde::{Serialize, de::DeserializeOwned};
62use thiserror::Error;
63use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
64use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
65use tracing::{info, instrument, trace, warn};
66use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
67use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
68
69use self::types::{
70    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
71    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, RoomPendingKeyBundleDetails,
72    UserKeyQueryResult,
73};
74use crate::{
75    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
76    gossiping::GossippedSecret,
77    identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
78    olm::{
79        Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
80        SenderData, Session, StaticAccountData,
81    },
82    store::types::{RoomKeyWithheldEntry, SecretsInboxItem},
83    types::{
84        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
85        SecretsBundle,
86    },
87    verification::VerificationMachine,
88};
89#[cfg(doc)]
90use crate::{backups::BackupMachine, identities::OwnUserIdentity};
91
92pub mod caches;
93mod crypto_store_wrapper;
94mod error;
95mod memorystore;
96mod traits;
97pub mod types;
98
99#[cfg(any(test, feature = "testing"))]
100#[macro_use]
101#[allow(missing_docs)]
102pub mod integration_tests;
103
104pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
105pub use error::{CryptoStoreError, Result};
106use matrix_sdk_common::{
107    cross_process_lock::{CrossProcessLock, CrossProcessLockConfig, CrossProcessLockGeneration},
108    deserialized_responses::WithheldCode,
109    timeout::timeout,
110};
111pub use memorystore::MemoryStore;
112pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
113
114use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
115use crate::types::{
116    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
117};
118pub use crate::{
119    dehydrated_devices::DehydrationError,
120    gossiping::{GossipRequest, SecretInfo},
121};
122
123/// A wrapper for our CryptoStore trait object.
124///
125/// This is needed because we want to have a generic interface so we can
126/// store/restore objects that we can serialize. Since trait objects and
127/// generics don't mix let the CryptoStore store strings and this wrapper
128/// adds the generic interface on top.
129#[derive(Debug, Clone)]
130pub struct Store {
131    inner: Arc<StoreInner>,
132}
133
134#[derive(Debug, Default)]
135pub(crate) struct KeyQueryManager {
136    /// Record of the users that are waiting for a /keys/query.
137    users_for_key_query: Mutex<UsersForKeyQuery>,
138
139    /// Notifier that is triggered each time an update is received for a user.
140    users_for_key_query_notify: Notify,
141}
142
143impl KeyQueryManager {
144    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
145        self.ensure_sync_tracked_users(cache).await?;
146        Ok(SyncedKeyQueryManager { cache, manager: self })
147    }
148
149    /// Load the list of users for whom we are tracking their device lists and
150    /// fill out our caches.
151    ///
152    /// This method ensures that we're only going to load the users from the
153    /// actual [`CryptoStore`] once, it will also make sure that any
154    /// concurrent calls to this method get deduplicated.
155    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
156        // Check if the users are loaded, and in that case do nothing.
157        let loaded = cache.loaded_tracked_users.read().await;
158        if *loaded {
159            return Ok(());
160        }
161
162        // Otherwise, we may load the users.
163        drop(loaded);
164        let mut loaded = cache.loaded_tracked_users.write().await;
165
166        // Check again if the users have been loaded, in case another call to this
167        // method loaded the tracked users between the time we tried to
168        // acquire the lock and the time we actually acquired the lock.
169        if *loaded {
170            return Ok(());
171        }
172
173        let tracked_users = cache.store.load_tracked_users().await?;
174
175        let mut query_users_lock = self.users_for_key_query.lock().await;
176        let mut tracked_users_cache = cache.tracked_users.write();
177        for user in tracked_users {
178            tracked_users_cache.insert(user.user_id.to_owned());
179
180            if user.dirty {
181                query_users_lock.insert_user(&user.user_id);
182            }
183        }
184
185        *loaded = true;
186
187        Ok(())
188    }
189
190    /// Wait for a `/keys/query` response to be received if one is expected for
191    /// the given user.
192    ///
193    /// If the given timeout elapses, the method will stop waiting and return
194    /// [`UserKeyQueryResult::TimeoutExpired`].
195    ///
196    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
197    /// query is pending are up to date, but doesn't hold on to it
198    /// thereafter: the lock is short-lived in this case.
199    pub async fn wait_if_user_key_query_pending(
200        &self,
201        cache: StoreCacheGuard,
202        timeout_duration: Duration,
203        user: &UserId,
204    ) -> Result<UserKeyQueryResult> {
205        {
206            // Drop the cache early, so we don't keep it while waiting (since writing the
207            // results requires to write in the cache, thus take another lock).
208            self.ensure_sync_tracked_users(&cache).await?;
209            drop(cache);
210        }
211
212        let mut users_for_key_query = self.users_for_key_query.lock().await;
213        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
214            return Ok(UserKeyQueryResult::WasNotPending);
215        };
216
217        let wait_for_completion = async {
218            while !waiter.completed.load(Ordering::Relaxed) {
219                // Register for being notified before releasing the mutex, so
220                // it's impossible to miss a wakeup between the last check for
221                // whether we should wait, and starting to wait.
222                let mut notified = pin!(self.users_for_key_query_notify.notified());
223                notified.as_mut().enable();
224                drop(users_for_key_query);
225
226                // Wait for a notification
227                notified.await;
228
229                // Reclaim the lock before checking the flag to avoid races
230                // when two notifications happen right after each other and the
231                // second one sets the flag we want to wait for.
232                users_for_key_query = self.users_for_key_query.lock().await;
233            }
234        };
235
236        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
237            Err(_) => {
238                warn!(
239                    user_id = ?user,
240                    "The user has a pending `/keys/query` request which did \
241                    not finish yet, some devices might be missing."
242                );
243
244                Ok(UserKeyQueryResult::TimeoutExpired)
245            }
246            _ => Ok(UserKeyQueryResult::WasPending),
247        }
248    }
249}
250
251pub(crate) struct SyncedKeyQueryManager<'a> {
252    cache: &'a StoreCache,
253    manager: &'a KeyQueryManager,
254}
255
256impl SyncedKeyQueryManager<'_> {
257    /// Add entries to the list of users being tracked for device changes
258    ///
259    /// Any users not already on the list are flagged as awaiting a key query.
260    /// Users that were already in the list are unaffected.
261    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
262        let mut store_updates = Vec::new();
263        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
264
265        {
266            let mut tracked_users = self.cache.tracked_users.write();
267            for user_id in users {
268                if tracked_users.insert(user_id.to_owned()) {
269                    key_query_lock.insert_user(user_id);
270                    store_updates.push((user_id, true))
271                }
272            }
273        }
274
275        self.cache.store.save_tracked_users(&store_updates).await
276    }
277
278    /// Process notifications that users have changed devices.
279    ///
280    /// This is used to handle the list of device-list updates that is received
281    /// from the `/sync` response. Any users *whose device lists we are
282    /// tracking* are flagged as needing a key query. Users whose devices we
283    /// are not tracking are ignored.
284    pub async fn mark_tracked_users_as_changed(
285        &self,
286        users: impl Iterator<Item = &UserId>,
287    ) -> Result<()> {
288        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
289        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
290
291        {
292            let tracked_users = &self.cache.tracked_users.read();
293            for user_id in users {
294                if tracked_users.contains(user_id) {
295                    key_query_lock.insert_user(user_id);
296                    store_updates.push((user_id, true));
297                }
298            }
299        }
300
301        self.cache.store.save_tracked_users(&store_updates).await
302    }
303
304    /// Flag that the given users devices are now up-to-date.
305    ///
306    /// This is called after processing the response to a /keys/query request.
307    /// Any users whose device lists we are tracking are removed from the
308    /// list of those pending a /keys/query.
309    pub async fn mark_tracked_users_as_up_to_date(
310        &self,
311        users: impl Iterator<Item = &UserId>,
312        sequence_number: SequenceNumber,
313    ) -> Result<()> {
314        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
315        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
316
317        {
318            let tracked_users = self.cache.tracked_users.read();
319            for user_id in users {
320                if tracked_users.contains(user_id) {
321                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
322                    store_updates.push((user_id, !clean));
323                }
324            }
325        }
326
327        self.cache.store.save_tracked_users(&store_updates).await?;
328        // wake up any tasks that may have been waiting for updates
329        self.manager.users_for_key_query_notify.notify_waiters();
330
331        Ok(())
332    }
333
334    /// Get the set of users that has the outdate/dirty flag set for their list
335    /// of devices.
336    ///
337    /// This set should be included in a `/keys/query` request which will update
338    /// the device list.
339    ///
340    /// # Returns
341    ///
342    /// A pair `(users, sequence_number)`, where `users` is the list of users to
343    /// be queried, and `sequence_number` is the current sequence number,
344    /// which should be returned in `mark_tracked_users_as_up_to_date`.
345    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
346        self.manager.users_for_key_query.lock().await.users_for_key_query()
347    }
348
349    /// See the docs for [`crate::OlmMachine::tracked_users()`].
350    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
351        self.cache.tracked_users.read().iter().cloned().collect()
352    }
353
354    /// Mark the given user as being tracked for device lists, and mark that it
355    /// has an outdated device list.
356    ///
357    /// This means that the user will be considered for a `/keys/query` request
358    /// next time [`Store::users_for_key_query()`] is called.
359    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
360        self.manager.users_for_key_query.lock().await.insert_user(user);
361        self.cache.tracked_users.write().insert(user.to_owned());
362
363        self.cache.store.save_tracked_users(&[(user, true)]).await
364    }
365}
366
367/// Convert the devices and vectors contained in the [`DeviceChanges`] into
368/// a [`DeviceUpdates`] struct.
369///
370/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
371/// we want to convert to a [`Device`].
372fn collect_device_updates(
373    verification_machine: VerificationMachine,
374    own_identity: Option<OwnUserIdentityData>,
375    identities: IdentityChanges,
376    devices: DeviceChanges,
377) -> DeviceUpdates {
378    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
379    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
380
381    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
382
383    let map_device = |device: DeviceData| {
384        let device_owner_identity = new_identities
385            .get(device.user_id())
386            .or_else(|| changed_identities.get(device.user_id()))
387            .or_else(|| unchanged_identities.get(device.user_id()))
388            .cloned();
389
390        Device {
391            inner: device,
392            verification_machine: verification_machine.to_owned(),
393            own_identity: own_identity.to_owned(),
394            device_owner_identity,
395        }
396    };
397
398    for device in devices.new {
399        let device = map_device(device);
400
401        new.entry(device.user_id().to_owned())
402            .or_default()
403            .insert(device.device_id().to_owned(), device);
404    }
405
406    for device in devices.changed {
407        let device = map_device(device);
408
409        changed
410            .entry(device.user_id().to_owned())
411            .or_default()
412            .insert(device.device_id().to_owned(), device.to_owned());
413    }
414
415    DeviceUpdates { new, changed }
416}
417
418/// A temporary transaction (that implies a write) to the underlying store.
419#[allow(missing_debug_implementations)]
420pub struct StoreTransaction {
421    store: Store,
422    changes: PendingChanges,
423    // TODO hold onto the cross-process crypto store lock + cache.
424    cache: OwnedRwLockWriteGuard<StoreCache>,
425}
426
427impl StoreTransaction {
428    /// Starts a new `StoreTransaction`.
429    async fn new(store: Store) -> Self {
430        let cache = store.inner.cache.clone();
431
432        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
433    }
434
435    pub(crate) fn cache(&self) -> &StoreCache {
436        &self.cache
437    }
438
439    /// Returns a reference to the current `Store`.
440    pub fn store(&self) -> &Store {
441        &self.store
442    }
443
444    /// Gets a `Account` for update.
445    ///
446    /// Note: since it's guaranteed that one can't have both a
447    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
448    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
449    /// that we can't have two copies of an `Account` alive at the same time.
450    pub async fn account(&mut self) -> Result<&mut Account> {
451        if self.changes.account.is_none() {
452            // Make sure the cache loaded the account.
453            let _ = self.cache.account().await?;
454            self.changes.account = self.cache.account.lock().await.take();
455        }
456        Ok(self.changes.account.as_mut().unwrap())
457    }
458
459    /// Commits all dirty fields to the store, and maintains the cache so it
460    /// reflects the current state of the database.
461    pub async fn commit(self) -> Result<()> {
462        if self.changes.is_empty() {
463            return Ok(());
464        }
465
466        // Save changes in the database.
467        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
468
469        self.store.save_pending_changes(self.changes).await?;
470
471        // Make the cache coherent with the database.
472        if let Some(account) = account {
473            *self.cache.account.lock().await = Some(account);
474        }
475
476        Ok(())
477    }
478}
479
480#[derive(Debug)]
481struct StoreInner {
482    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
483    store: Arc<CryptoStoreWrapper>,
484
485    /// In-memory cache for the current crypto store.
486    ///
487    /// ⚠ Must remain private.
488    cache: Arc<RwLock<StoreCache>>,
489
490    verification_machine: VerificationMachine,
491
492    /// Static account data that never changes (and thus can be loaded once and
493    /// for all when creating the store).
494    static_account: StaticAccountData,
495}
496
497/// Error describing what went wrong when importing private cross signing keys
498/// or the key backup key.
499#[derive(Debug, Error)]
500pub enum SecretImportError {
501    /// The key that we tried to import was invalid.
502    #[error("Error while importing {name}: {error}")]
503    Key {
504        /// The name of the secret that was being imported.
505        name: SecretName,
506        /// The key error that occurred.
507        error: vodozemac::KeyError,
508    },
509    /// The public key of the imported private key doesn't match the public
510    /// key that was uploaded to the server.
511    #[error(
512        "Error while importing {name}: The public key of the imported private \
513            key doesn't match the public key that was uploaded to the server"
514    )]
515    MismatchedPublicKeys {
516        /// The name of the secret that was being imported.
517        name: SecretName,
518    },
519    /// The new version of the identity couldn't be stored.
520    #[error(transparent)]
521    Store(#[from] CryptoStoreError),
522}
523
524/// Error describing what went wrong when exporting a [`SecretsBundle`].
525///
526/// The [`SecretsBundle`] can only be exported if we have all cross-signing
527/// private keys in the store.
528#[derive(Debug, Error)]
529pub enum SecretsBundleExportError {
530    /// The store itself had an error.
531    #[error(transparent)]
532    Store(#[from] CryptoStoreError),
533    /// We're missing one or multiple cross-signing keys.
534    #[error("The store is missing one or multiple cross-signing keys")]
535    MissingCrossSigningKey(KeyUsage),
536    /// We're missing all cross-signing keys.
537    #[error("The store doesn't contain any cross-signing keys")]
538    MissingCrossSigningKeys,
539    /// We have a backup key stored, but we don't know the version of the
540    /// backup.
541    #[error("The store contains a backup key, but no backup version")]
542    MissingBackupVersion,
543}
544
545impl Store {
546    /// Create a new Store.
547    pub(crate) fn new(
548        account: StaticAccountData,
549        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
550        store: Arc<CryptoStoreWrapper>,
551        verification_machine: VerificationMachine,
552    ) -> Self {
553        Self {
554            inner: Arc::new(StoreInner {
555                static_account: account,
556                identity,
557                store: store.clone(),
558                verification_machine,
559                cache: Arc::new(RwLock::new(StoreCache {
560                    store,
561                    tracked_users: Default::default(),
562                    loaded_tracked_users: Default::default(),
563                    account: Default::default(),
564                })),
565            }),
566        }
567    }
568
569    /// UserId associated with this store
570    pub(crate) fn user_id(&self) -> &UserId {
571        &self.inner.static_account.user_id
572    }
573
574    /// DeviceId associated with this store
575    pub(crate) fn device_id(&self) -> &DeviceId {
576        self.inner.verification_machine.own_device_id()
577    }
578
579    /// The static data for the account associated with this store.
580    pub(crate) fn static_account(&self) -> &StaticAccountData {
581        &self.inner.static_account
582    }
583
584    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
585        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
586    }
587
588    pub(crate) async fn transaction(&self) -> StoreTransaction {
589        StoreTransaction::new(self.clone()).await
590    }
591
592    pub(crate) async fn with_transaction<T>(
593        &self,
594        func: impl AsyncFnOnce(&mut StoreTransaction) -> Result<T, crate::OlmError>,
595    ) -> Result<T, crate::OlmError> {
596        let mut tr = self.transaction().await;
597        let res = func(&mut tr).await?;
598        tr.commit().await?;
599        Ok(res)
600    }
601
602    #[cfg(test)]
603    /// test helper to reset the cross signing identity
604    pub(crate) async fn reset_cross_signing_identity(&self) {
605        self.inner.identity.lock().await.reset();
606    }
607
608    /// PrivateCrossSigningIdentity associated with this store
609    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
610        self.inner.identity.clone()
611    }
612
613    /// Save the given Sessions to the store
614    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
615        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
616
617        self.save_changes(changes).await
618    }
619
620    pub(crate) async fn get_sessions(
621        &self,
622        sender_key: &str,
623    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
624        self.inner.store.get_sessions(sender_key).await
625    }
626
627    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
628        self.inner.store.save_changes(changes).await
629    }
630
631    /// Given an `InboundGroupSession` which we have just received, see if we
632    /// have a matching session already in the store, and determine how to
633    /// handle it.
634    ///
635    /// If the store already has everything we can gather from the new session,
636    /// returns `None`. Otherwise, returns a merged session which should be
637    /// persisted to the store.
638    pub(crate) async fn merge_received_group_session(
639        &self,
640        session: InboundGroupSession,
641    ) -> Result<Option<InboundGroupSession>> {
642        let old_session = self
643            .inner
644            .store
645            .get_inbound_group_session(session.room_id(), session.session_id())
646            .await?;
647
648        // If there is no old session, just use the new session.
649        let Some(old_session) = old_session else {
650            info!("Received a new megolm room key");
651            return Ok(Some(session));
652        };
653
654        let index_comparison = session.compare_ratchet(&old_session).await;
655        let trust_level_comparison =
656            session.sender_data.compare_trust_level(&old_session.sender_data);
657
658        let result = match (index_comparison, trust_level_comparison) {
659            (SessionOrdering::Unconnected, _) => {
660                // If this happens, it means that we have two sessions purporting to have the
661                // same session id, but where the ratchets do not match up.
662                // In other words, someone is playing silly buggers.
663                warn!(
664                    "Received a group session with an ratchet that does not connect to the one in the store, discarding"
665                );
666                None
667            }
668
669            (SessionOrdering::Better, std::cmp::Ordering::Greater)
670            | (SessionOrdering::Better, std::cmp::Ordering::Equal)
671            | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
672                // The new session is unambiguously better than what we have in the store.
673                info!(
674                    ?index_comparison,
675                    ?trust_level_comparison,
676                    "Received a megolm room key that we have a worse version of, merging"
677                );
678                Some(session)
679            }
680
681            (SessionOrdering::Worse, std::cmp::Ordering::Less)
682            | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
683            | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
684                // The new session is unambiguously worse than the one we have in the store.
685                warn!(
686                    ?index_comparison,
687                    ?trust_level_comparison,
688                    "Received a megolm room key that we already have a better version \
689                     of, discarding"
690                );
691                None
692            }
693
694            (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
695                // The new session is the same as what we have.
696                info!("Received a megolm room key that we already have, discarding");
697                None
698            }
699
700            (SessionOrdering::Better, std::cmp::Ordering::Less) => {
701                // We need to take the ratchet from the new session, and the
702                // sender data from the old session.
703                info!("Upgrading a previously-received megolm session with new ratchet");
704                let result = old_session.with_ratchet(&session);
705                // We'll need to back it up again.
706                result.reset_backup_state();
707                Some(result)
708            }
709
710            (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
711                // We need to take the ratchet from the old session, and the
712                // sender data from the new session.
713                info!("Upgrading a previously-received megolm session with new sender data");
714                Some(session.with_ratchet(&old_session))
715            }
716        };
717
718        Ok(result)
719    }
720
721    #[cfg(test)]
722    /// Testing helper to allow to save only a set of devices
723    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
724        use types::DeviceChanges;
725
726        let changes = Changes {
727            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
728            ..Default::default()
729        };
730
731        self.save_changes(changes).await
732    }
733
734    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
735    pub(crate) async fn save_inbound_group_sessions(
736        &self,
737        sessions: &[InboundGroupSession],
738    ) -> Result<()> {
739        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
740
741        self.save_changes(changes).await
742    }
743
744    /// Get the display name of our own device.
745    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
746        Ok(self
747            .inner
748            .store
749            .get_device(self.user_id(), self.device_id())
750            .await?
751            .and_then(|d| d.display_name().map(|d| d.to_owned())))
752    }
753
754    /// Get the device data for the given [`UserId`] and [`DeviceId`].
755    ///
756    /// *Note*: This method will include our own device which is always present
757    /// in the store.
758    pub(crate) async fn get_device_data(
759        &self,
760        user_id: &UserId,
761        device_id: &DeviceId,
762    ) -> Result<Option<DeviceData>> {
763        self.inner.store.get_device(user_id, device_id).await
764    }
765
766    /// Get the device data for the given [`UserId`] and [`DeviceId`].
767    ///
768    /// *Note*: This method will **not** include our own device.
769    ///
770    /// Use this method if you need a list of recipients for a given user, since
771    /// we don't want to encrypt for our own device, otherwise take a look at
772    /// the [`Store::get_device_data_for_user`] method.
773    pub(crate) async fn get_device_data_for_user_filtered(
774        &self,
775        user_id: &UserId,
776    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
777        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
778            if user_id == self.user_id() {
779                d.remove(self.device_id());
780            }
781            d
782        })
783    }
784
785    /// Get the [`DeviceData`] for all the devices a user has.
786    ///
787    /// *Note*: This method will include our own device which is always present
788    /// in the store.
789    ///
790    /// Use this method if you need to operate on or update all devices of a
791    /// user, otherwise take a look at the
792    /// [`Store::get_device_data_for_user_filtered`] method.
793    pub(crate) async fn get_device_data_for_user(
794        &self,
795        user_id: &UserId,
796    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
797        self.inner.store.get_user_devices(user_id).await
798    }
799
800    /// Get a [`Device`] for the given user with the given
801    /// [`Curve25519PublicKey`] key.
802    ///
803    /// *Note*: This method will include our own device which is always present
804    /// in the store.
805    pub(crate) async fn get_device_from_curve_key(
806        &self,
807        user_id: &UserId,
808        curve_key: Curve25519PublicKey,
809    ) -> Result<Option<Device>> {
810        self.get_user_devices(user_id)
811            .await
812            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
813    }
814
815    /// Get all devices associated with the given [`UserId`].
816    ///
817    /// This method is more expensive than the
818    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
819    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
820    /// device owner to be fetched from the store as well.
821    ///
822    /// *Note*: This method will include our own device which is always present
823    /// in the store.
824    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
825        let devices = self.get_device_data_for_user(user_id).await?;
826
827        let own_identity = self
828            .inner
829            .store
830            .get_user_identity(self.user_id())
831            .await?
832            .and_then(|i| i.own().cloned());
833        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
834
835        Ok(UserDevices {
836            inner: devices,
837            verification_machine: self.inner.verification_machine.clone(),
838            own_identity,
839            device_owner_identity,
840        })
841    }
842
843    /// Get a [`Device`] for the given user with the given [`DeviceId`].
844    ///
845    /// This method is more expensive than the [`Store::get_device_data`] method
846    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
847    /// [`UserIdentityData`] of the device owner to be fetched from the
848    /// store as well.
849    ///
850    /// *Note*: This method will include our own device which is always present
851    /// in the store.
852    pub(crate) async fn get_device(
853        &self,
854        user_id: &UserId,
855        device_id: &DeviceId,
856    ) -> Result<Option<Device>> {
857        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
858            Ok(Some(self.wrap_device_data(device_data).await?))
859        } else {
860            Ok(None)
861        }
862    }
863
864    /// Create a new device using the supplied [`DeviceData`]. Normally we would
865    /// call [`Self::get_device`] to find an existing device inside this
866    /// store. Only call this if you have some existing DeviceData and want
867    /// to wrap it with the extra information provided by a [`Device`].
868    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
869        let own_identity = self
870            .inner
871            .store
872            .get_user_identity(self.user_id())
873            .await?
874            .and_then(|i| i.own().cloned());
875
876        let device_owner_identity =
877            self.inner.store.get_user_identity(device_data.user_id()).await?;
878
879        Ok(Device {
880            inner: device_data,
881            verification_machine: self.inner.verification_machine.clone(),
882            own_identity,
883            device_owner_identity,
884        })
885    }
886
887    ///  Get the Identity of `user_id`
888    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
889        let own_identity = self
890            .inner
891            .store
892            .get_user_identity(self.user_id())
893            .await?
894            .and_then(as_variant!(UserIdentityData::Own));
895
896        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
897            UserIdentity::new(
898                self.clone(),
899                i,
900                self.inner.verification_machine.to_owned(),
901                own_identity,
902            )
903        }))
904    }
905
906    /// Try to export the secret with the given secret name.
907    ///
908    /// The exported secret will be encoded as unpadded base64. Returns `Null`
909    /// if the secret can't be found.
910    ///
911    /// # Arguments
912    ///
913    /// * `secret_name` - The name of the secret that should be exported.
914    pub async fn export_secret(
915        &self,
916        secret_name: &SecretName,
917    ) -> Result<Option<String>, CryptoStoreError> {
918        Ok(match secret_name {
919            SecretName::CrossSigningMasterKey
920            | SecretName::CrossSigningUserSigningKey
921            | SecretName::CrossSigningSelfSigningKey => {
922                self.inner.identity.lock().await.export_secret(secret_name).await
923            }
924            SecretName::RecoveryKey => {
925                if let Some(key) = self.load_backup_keys().await?.decryption_key {
926                    let exported = key.to_base64();
927                    Some(exported)
928                } else {
929                    None
930                }
931            }
932            name => {
933                warn!(secret = ?name, "Unknown secret was requested");
934                None
935            }
936        })
937    }
938
939    /// Export all the private cross signing keys we have.
940    ///
941    /// The export will contain the seed for the ed25519 keys as a unpadded
942    /// base64 encoded string.
943    ///
944    /// This method returns `None` if we don't have any private cross signing
945    /// keys.
946    pub async fn export_cross_signing_keys(
947        &self,
948    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
949        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
950        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
951        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
952
953        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
954            None
955        } else {
956            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
957        })
958    }
959
960    /// Import our private cross signing keys.
961    ///
962    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
963    /// base64 encoded string.
964    pub async fn import_cross_signing_keys(
965        &self,
966        export: CrossSigningKeyExport,
967    ) -> Result<CrossSigningStatus, SecretImportError> {
968        if let Some(public_identity) =
969            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
970        {
971            let identity = self.inner.identity.lock().await;
972
973            identity
974                .import_secrets(
975                    public_identity.to_owned(),
976                    export.master_key.as_deref(),
977                    export.self_signing_key.as_deref(),
978                    export.user_signing_key.as_deref(),
979                )
980                .await?;
981
982            let status = identity.status().await;
983
984            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
985
986            let mut changes =
987                Changes { private_identity: Some(identity.clone()), ..Default::default() };
988
989            if diff.none_differ() {
990                public_identity.mark_as_verified();
991                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
992            }
993
994            info!(?status, "Successfully imported the private cross-signing keys");
995
996            self.save_changes(changes).await?;
997        } else {
998            warn!(
999                "No public identity found while importing cross-signing keys, \
1000                 a /keys/query needs to be done"
1001            );
1002        }
1003
1004        Ok(self.inner.identity.lock().await.status().await)
1005    }
1006
1007    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1008    ///
1009    /// This method will export all the private cross-signing keys and, if
1010    /// available, the private part of a backup key and its accompanying
1011    /// version.
1012    ///
1013    /// The method will fail if we don't have all three private cross-signing
1014    /// keys available.
1015    ///
1016    /// **Warning**: Only export this and share it with a trusted recipient,
1017    /// i.e. if an existing device is sharing this with a new device.
1018    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1019        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1020            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1021        };
1022
1023        let Some(master_key) = cross_signing.master_key.clone() else {
1024            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1025        };
1026
1027        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1028            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1029        };
1030
1031        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1032            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1033        };
1034
1035        let backup_keys = self.load_backup_keys().await?;
1036
1037        let backup = if let Some(key) = backup_keys.decryption_key {
1038            if let Some(backup_version) = backup_keys.backup_version {
1039                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1040                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1041                ))
1042            } else {
1043                return Err(SecretsBundleExportError::MissingBackupVersion);
1044            }
1045        } else {
1046            None
1047        };
1048
1049        Ok(SecretsBundle {
1050            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1051            backup,
1052        })
1053    }
1054
1055    /// Import and persists secrets from a [`SecretsBundle`].
1056    ///
1057    /// This method will import all the private cross-signing keys and, if
1058    /// available, the private part of a backup key and its accompanying
1059    /// version into the store.
1060    ///
1061    /// **Warning**: Only import this from a trusted source, i.e. if an existing
1062    /// device is sharing this with a new device. The imported cross-signing
1063    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1064    ///
1065    /// The backup key will be persisted in the store and can be enabled using
1066    /// the [`BackupMachine`].
1067    pub async fn import_secrets_bundle(
1068        &self,
1069        bundle: &SecretsBundle,
1070    ) -> Result<(), SecretImportError> {
1071        let mut changes = Changes::default();
1072
1073        if let Some(backup_bundle) = &bundle.backup {
1074            match backup_bundle {
1075                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1076                    changes.backup_decryption_key = Some(bundle.key.clone());
1077                    changes.backup_version = Some(bundle.backup_version.clone());
1078                }
1079            }
1080        }
1081
1082        let identity = self.inner.identity.lock().await;
1083
1084        identity
1085            .import_secrets_unchecked(
1086                Some(&bundle.cross_signing.master_key),
1087                Some(&bundle.cross_signing.self_signing_key),
1088                Some(&bundle.cross_signing.user_signing_key),
1089            )
1090            .await?;
1091
1092        let public_identity = identity.to_public_identity().await.expect(
1093            "We should be able to create a new public identity since we just imported \
1094             all the private cross-signing keys",
1095        );
1096
1097        changes.private_identity = Some(identity.clone());
1098        changes.identities.new.push(UserIdentityData::Own(public_identity));
1099
1100        Ok(self.save_changes(changes).await?)
1101    }
1102
1103    /// Import the given `secret` named `secret_name` into the keystore.
1104    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1105        match &secret.secret_name {
1106            SecretName::CrossSigningMasterKey
1107            | SecretName::CrossSigningUserSigningKey
1108            | SecretName::CrossSigningSelfSigningKey => {
1109                if let Some(public_identity) =
1110                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1111                {
1112                    let identity = self.inner.identity.lock().await;
1113
1114                    identity
1115                        .import_secret(
1116                            public_identity,
1117                            &secret.secret_name,
1118                            &secret.event.content.secret,
1119                        )
1120                        .await?;
1121                    info!(
1122                        secret_name = ?secret.secret_name,
1123                        "Successfully imported a private cross signing key"
1124                    );
1125
1126                    let changes =
1127                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1128
1129                    self.save_changes(changes).await?;
1130                }
1131            }
1132            SecretName::RecoveryKey => {
1133                // We don't import the decryption key here since we'll want to
1134                // check if the public key matches to the latest version on the
1135                // server. We instead put the secret into a secret inbox where
1136                // it will stay until it either gets overwritten
1137                // or the user accepts the secret.
1138            }
1139            name => {
1140                warn!(secret = ?name, "Tried to import an unknown secret");
1141            }
1142        }
1143
1144        Ok(())
1145    }
1146
1147    /// Check whether there is a global flag to only encrypt messages for
1148    /// trusted devices or for everyone.
1149    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1150        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1151        Ok(value)
1152    }
1153
1154    /// Set global flag whether to encrypt messages for untrusted devices, or
1155    /// whether they should be excluded from the conversation.
1156    pub async fn set_only_allow_trusted_devices(
1157        &self,
1158        block_untrusted_devices: bool,
1159    ) -> Result<()> {
1160        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1161    }
1162
1163    /// Get custom stored value associated with a key
1164    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1165        let Some(value) = self.get_custom_value(key).await? else {
1166            return Ok(None);
1167        };
1168        let deserialized = self.deserialize_value(&value)?;
1169        Ok(Some(deserialized))
1170    }
1171
1172    /// Store custom value associated with a key
1173    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1174        let serialized = self.serialize_value(value)?;
1175        self.set_custom_value(key, serialized).await?;
1176        Ok(())
1177    }
1178
1179    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1180        let serialized =
1181            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1182        Ok(serialized)
1183    }
1184
1185    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1186        let deserialized =
1187            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1188        Ok(deserialized)
1189    }
1190
1191    /// Receive notifications of room keys being received as a [`Stream`].
1192    ///
1193    /// Each time a room key is updated in any way, an update will be sent to
1194    /// the stream. Updates that happen at the same time are batched into a
1195    /// [`Vec`].
1196    ///
1197    /// If the reader of the stream lags too far behind an error will be sent to
1198    /// the reader.
1199    ///
1200    /// The stream will terminate once all references to the underlying
1201    /// `CryptoStoreWrapper` are dropped.
1202    pub fn room_keys_received_stream(
1203        &self,
1204    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1205        self.inner.store.room_keys_received_stream()
1206    }
1207
1208    /// Receive notifications of received `m.room_key.withheld` messages.
1209    ///
1210    /// Each time an `m.room_key.withheld` is received and stored, an update
1211    /// will be sent to the stream. Updates that happen at the same time are
1212    /// batched into a [`Vec`].
1213    ///
1214    /// If the reader of the stream lags too far behind, a warning will be
1215    /// logged and items will be dropped.
1216    pub fn room_keys_withheld_received_stream(
1217        &self,
1218    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1219        self.inner.store.room_keys_withheld_received_stream()
1220    }
1221
1222    /// Returns a stream of user identity updates, allowing users to listen for
1223    /// notifications about new or changed user identities.
1224    ///
1225    /// The stream produced by this method emits updates whenever a new user
1226    /// identity is discovered or when an existing identities information is
1227    /// changed. Users can subscribe to this stream and receive updates in
1228    /// real-time.
1229    ///
1230    /// Caution: the returned stream will never terminate, and it holds a
1231    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1232    /// resource leaks.
1233    ///
1234    /// # Examples
1235    ///
1236    /// ```no_run
1237    /// # use matrix_sdk_crypto::OlmMachine;
1238    /// # use ruma::{device_id, user_id};
1239    /// # use futures_util::{pin_mut, StreamExt};
1240    /// # let machine: OlmMachine = unimplemented!();
1241    /// # futures_executor::block_on(async {
1242    /// let identities_stream = machine.store().user_identities_stream();
1243    /// pin_mut!(identities_stream);
1244    ///
1245    /// for identity_updates in identities_stream.next().await {
1246    ///     for (_, identity) in identity_updates.new {
1247    ///         println!("A new identity has been added {}", identity.user_id());
1248    ///     }
1249    /// }
1250    /// # });
1251    /// ```
1252    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1253        let verification_machine = self.inner.verification_machine.to_owned();
1254
1255        let this = self.clone();
1256        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1257            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1258
1259            let map_identity = |(user_id, identity)| {
1260                (
1261                    user_id,
1262                    UserIdentity::new(
1263                        this.clone(),
1264                        identity,
1265                        verification_machine.to_owned(),
1266                        own_identity.to_owned(),
1267                    ),
1268                )
1269            };
1270
1271            let new = new_identities.into_iter().map(map_identity).collect();
1272            let changed = changed_identities.into_iter().map(map_identity).collect();
1273            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1274
1275            IdentityUpdates { new, changed, unchanged }
1276        })
1277    }
1278
1279    /// Returns a stream of device updates, allowing users to listen for
1280    /// notifications about new or changed devices.
1281    ///
1282    /// The stream produced by this method emits updates whenever a new device
1283    /// is discovered or when an existing device's information is changed. Users
1284    /// can subscribe to this stream and receive updates in real-time.
1285    ///
1286    /// Caution: the returned stream will never terminate, and it holds a
1287    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1288    /// resource leaks.
1289    ///
1290    /// # Examples
1291    ///
1292    /// ```no_run
1293    /// # use matrix_sdk_crypto::OlmMachine;
1294    /// # use ruma::{device_id, user_id};
1295    /// # use futures_util::{pin_mut, StreamExt};
1296    /// # let machine: OlmMachine = unimplemented!();
1297    /// # futures_executor::block_on(async {
1298    /// let devices_stream = machine.store().devices_stream();
1299    /// pin_mut!(devices_stream);
1300    ///
1301    /// for device_updates in devices_stream.next().await {
1302    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1303    ///         for device in user_devices.values() {
1304    ///             println!("A new device has been added {}", device.device_id());
1305    ///         }
1306    ///     }
1307    /// }
1308    /// # });
1309    /// ```
1310    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1311        let verification_machine = self.inner.verification_machine.to_owned();
1312
1313        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1314            collect_device_updates(
1315                verification_machine.to_owned(),
1316                own_identity,
1317                identities,
1318                devices,
1319            )
1320        })
1321    }
1322
1323    /// Returns a [`Stream`] of user identity and device updates
1324    ///
1325    /// The stream returned by this method returns the same data as
1326    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1327    /// not include references to the `VerificationMachine`. It is therefore a
1328    /// lower-level view on that data.
1329    ///
1330    /// The stream will terminate once all references to the underlying
1331    /// `CryptoStoreWrapper` are dropped.
1332    pub fn identities_stream_raw(
1333        &self,
1334    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1335        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1336    }
1337
1338    /// Creates a [`CrossProcessLock`] for this store, that will contain the
1339    /// given key and value when hold.
1340    pub fn create_store_lock(
1341        &self,
1342        lock_key: String,
1343        config: CrossProcessLockConfig,
1344    ) -> CrossProcessLock<LockableCryptoStore> {
1345        self.inner.store.create_store_lock(lock_key, config)
1346    }
1347
1348    /// Receive notifications of gossipped secrets being received and stored in
1349    /// the secret inbox as a [`Stream`].
1350    ///
1351    /// The gossipped secrets are received using the `m.secret.send` event type
1352    /// and are guaranteed to have been received over a 1-to-1 Olm
1353    /// [`Session`] from a verified [`Device`].
1354    ///
1355    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1356    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1357    ///
1358    /// After a suitable secret of a certain type has been found it can be
1359    /// removed from the store
1360    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1361    ///
1362    /// The only secret this will currently broadcast is the
1363    /// `m.megolm_backup.v1`.
1364    ///
1365    /// If the reader of the stream lags too far behind, a warning will be
1366    /// logged and items will be dropped.
1367    ///
1368    /// # Examples
1369    ///
1370    /// ```no_run
1371    /// # use matrix_sdk_crypto::OlmMachine;
1372    /// # use ruma::{device_id, owned_user_id};
1373    /// # use futures_util::{pin_mut, StreamExt};
1374    /// # let alice = owned_user_id!("@alice:example.org");
1375    /// # futures_executor::block_on(async {
1376    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1377    ///
1378    /// let secret_stream = machine.store().secrets_stream();
1379    /// pin_mut!(secret_stream);
1380    ///
1381    /// for secret in secret_stream.next().await {
1382    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1383    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name);
1384    /// }
1385    /// # });
1386    /// ```
1387    pub fn secrets_stream(&self) -> impl Stream<Item = SecretsInboxItem> + use<> {
1388        self.inner.store.secrets_stream()
1389    }
1390
1391    /// Receive notifications of historic room key bundles as a [`Stream`].
1392    ///
1393    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1394    ///
1395    /// Each time a historic room key bundle was received, an update will be
1396    /// sent to the stream. This stream can be used to accept historic room key
1397    /// bundles that arrive out of order, i.e. the bundle arrives after the
1398    /// user has already accepted a room invitation.
1399    ///
1400    /// # Examples
1401    ///
1402    /// ```no_run
1403    /// # use matrix_sdk_crypto::{
1404    /// #    OlmMachine,
1405    /// #    store::types::StoredRoomKeyBundleData,
1406    /// #    types::room_history::RoomKeyBundle
1407    /// # };
1408    /// # use ruma::{device_id, owned_user_id};
1409    /// # use futures_util::{pin_mut, StreamExt};
1410    /// # let alice = owned_user_id!("@alice:example.org");
1411    /// # async {
1412    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1413    /// let bundle_stream = machine.store().historic_room_key_stream();
1414    /// pin_mut!(bundle_stream);
1415    ///
1416    /// while let Some(bundle_info) = bundle_stream.next().await {
1417    ///     // Try to find the bundle content in the store and if it's valid accept it.
1418    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
1419    ///         // Download the bundle now and import it.
1420    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
1421    ///         machine.store().receive_room_key_bundle(
1422    ///             &bundle_data,
1423    ///             bundle,
1424    ///             |_, _| {},
1425    ///         ).await?;
1426    ///     }
1427    /// }
1428    /// # anyhow::Ok(()) };
1429    /// ```
1430    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1431        self.inner.store.historic_room_key_stream()
1432    }
1433
1434    /// Import the given room keys into the store.
1435    ///
1436    /// # Arguments
1437    ///
1438    /// * `exported_keys` - The keys to be imported.
1439    /// * `from_backup_version` - If the keys came from key backup, the key
1440    ///   backup version. This will cause the keys to be marked as already
1441    ///   backed up, and therefore not requiring another backup.
1442    /// * `progress_listener` - Callback which will be called after each key is
1443    ///   processed. Called with arguments `(processed, total)` where
1444    ///   `processed` is the number of keys processed so far, and `total` is the
1445    ///   total number of keys (i.e., `exported_keys.len()`).
1446    pub async fn import_room_keys(
1447        &self,
1448        exported_keys: Vec<ExportedRoomKey>,
1449        from_backup_version: Option<&str>,
1450        progress_listener: impl Fn(usize, usize),
1451    ) -> Result<RoomKeyImportResult> {
1452        let exported_keys = exported_keys.iter().filter_map(|key| {
1453            key.try_into()
1454                .map_err(|e| {
1455                    warn!(
1456                        sender_key = key.sender_key().to_base64(),
1457                        room_id = ?key.room_id(),
1458                        session_id = key.session_id(),
1459                        error = ?e,
1460                        "Couldn't import a room key from a file export."
1461                    );
1462                })
1463                .ok()
1464        });
1465        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1466    }
1467
1468    /// Import the given room keys into our store.
1469    ///
1470    /// # Arguments
1471    ///
1472    /// * `exported_keys` - A list of previously exported keys that should be
1473    ///   imported into our store. If we already have a better version of a key
1474    ///   the key will *not* be imported.
1475    ///
1476    /// Returns a tuple of numbers that represent the number of sessions that
1477    /// were imported and the total number of sessions that were found in the
1478    /// key export.
1479    ///
1480    /// # Examples
1481    ///
1482    /// ```no_run
1483    /// # use std::io::Cursor;
1484    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1485    /// # use ruma::{device_id, user_id};
1486    /// # let alice = user_id!("@alice:example.org");
1487    /// # async {
1488    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1489    /// # let export = Cursor::new("".to_owned());
1490    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1491    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1492    /// # };
1493    /// ```
1494    pub async fn import_exported_room_keys(
1495        &self,
1496        exported_keys: Vec<ExportedRoomKey>,
1497        progress_listener: impl Fn(usize, usize),
1498    ) -> Result<RoomKeyImportResult> {
1499        self.import_room_keys(exported_keys, None, progress_listener).await
1500    }
1501
1502    async fn import_sessions_impl(
1503        &self,
1504        sessions: impl Iterator<Item = InboundGroupSession>,
1505        from_backup_version: Option<&str>,
1506        progress_listener: impl Fn(usize, usize),
1507    ) -> Result<RoomKeyImportResult> {
1508        let sessions: Vec<_> = sessions.collect();
1509        let mut imported_sessions = Vec::new();
1510
1511        let total_count = sessions.len();
1512        let mut keys = BTreeMap::new();
1513
1514        for (i, session) in sessions.into_iter().enumerate() {
1515            // Only import the session if we didn't have this session or
1516            // if it's a better version of the same session.
1517            if let Some(merged) = self.merge_received_group_session(session).await? {
1518                if from_backup_version.is_some() {
1519                    merged.mark_as_backed_up();
1520                }
1521
1522                keys.entry(merged.room_id().to_owned())
1523                    .or_insert_with(BTreeMap::new)
1524                    .entry(merged.sender_key().to_base64())
1525                    .or_insert_with(BTreeSet::new)
1526                    .insert(merged.session_id().to_owned());
1527
1528                imported_sessions.push(merged);
1529            }
1530
1531            progress_listener(i, total_count);
1532        }
1533
1534        let imported_count = imported_sessions.len();
1535
1536        self.inner
1537            .store
1538            .save_inbound_group_sessions(imported_sessions, from_backup_version)
1539            .await?;
1540
1541        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1542
1543        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1544    }
1545
1546    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1547        self.inner.store.clone()
1548    }
1549
1550    /// Export the keys that match the given predicate.
1551    ///
1552    /// # Arguments
1553    ///
1554    /// * `predicate` - A closure that will be called for every known
1555    ///   `InboundGroupSession`, which represents a room key. If the closure
1556    ///   returns `true` the `InboundGroupSession` will be included in the
1557    ///   export, if the closure returns `false` it will not be included.
1558    ///
1559    /// # Examples
1560    ///
1561    /// ```no_run
1562    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1563    /// # use ruma::{device_id, user_id, room_id};
1564    /// # let alice = user_id!("@alice:example.org");
1565    /// # async {
1566    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1567    /// let room_id = room_id!("!test:localhost");
1568    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1569    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1570    /// # };
1571    /// ```
1572    pub async fn export_room_keys(
1573        &self,
1574        predicate: impl FnMut(&InboundGroupSession) -> bool,
1575    ) -> Result<Vec<ExportedRoomKey>> {
1576        let mut exported = Vec::new();
1577
1578        let mut sessions = self.get_inbound_group_sessions().await?;
1579        sessions.retain(predicate);
1580
1581        for session in sessions {
1582            let export = session.export().await;
1583            exported.push(export);
1584        }
1585
1586        Ok(exported)
1587    }
1588
1589    /// Export room keys matching a predicate, providing them as an async
1590    /// `Stream`.
1591    ///
1592    /// # Arguments
1593    ///
1594    /// * `predicate` - A closure that will be called for every known
1595    ///   `InboundGroupSession`, which represents a room key. If the closure
1596    ///   returns `true` the `InboundGroupSession` will be included in the
1597    ///   export, if the closure returns `false` it will not be included.
1598    ///
1599    /// # Examples
1600    ///
1601    /// ```no_run
1602    /// use std::pin::pin;
1603    ///
1604    /// use matrix_sdk_crypto::{OlmMachine, olm::ExportedRoomKey};
1605    /// use ruma::{device_id, room_id, user_id};
1606    /// use tokio_stream::StreamExt;
1607    /// # async {
1608    /// let alice = user_id!("@alice:example.org");
1609    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1610    /// let room_id = room_id!("!test:localhost");
1611    /// let mut keys = pin!(
1612    ///     machine
1613    ///         .store()
1614    ///         .export_room_keys_stream(|s| s.room_id() == room_id)
1615    ///         .await
1616    ///         .unwrap()
1617    /// );
1618    /// while let Some(key) = keys.next().await {
1619    ///     println!("{}", key.room_id);
1620    /// }
1621    /// # };
1622    /// ```
1623    pub async fn export_room_keys_stream(
1624        &self,
1625        predicate: impl FnMut(&InboundGroupSession) -> bool,
1626    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1627        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1628        let sessions = self.get_inbound_group_sessions().await?;
1629        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1630            .then(|session| async move { session.export().await }))
1631    }
1632
1633    /// Assemble a room key bundle for sharing encrypted history, as per
1634    /// [MSC4268].
1635    ///
1636    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1637    pub async fn build_room_key_bundle(
1638        &self,
1639        room_id: &RoomId,
1640    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1641        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1642
1643        let mut bundle = RoomKeyBundle::default();
1644        for session in sessions {
1645            if session.shared_history() {
1646                bundle.room_keys.push(session.export().await.into());
1647            } else {
1648                bundle.withheld.push(RoomKeyWithheldContent::new(
1649                    session.algorithm().to_owned(),
1650                    WithheldCode::HistoryNotShared,
1651                    session.room_id().to_owned(),
1652                    session.session_id().to_owned(),
1653                    session.sender_key().to_owned(),
1654                    self.device_id().to_owned(),
1655                ));
1656            }
1657        }
1658
1659        // If we received a key bundle ourselves, in which one or more sessions was
1660        // marked as "history not shared", pass that on to the new user.
1661        let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1662        for withheld in withhelds {
1663            if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1664                bundle.withheld.push(withheld.content);
1665            }
1666        }
1667
1668        Ok(bundle)
1669    }
1670
1671    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1672    ///
1673    /// # Arguments
1674    ///
1675    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1676    ///   being received.
1677    /// * `bundle` - The decrypted and deserialized bundle itself.
1678    ///
1679    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1680    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1681    pub async fn receive_room_key_bundle(
1682        &self,
1683        bundle_info: &StoredRoomKeyBundleData,
1684        bundle: RoomKeyBundle,
1685        progress_listener: impl Fn(usize, usize),
1686    ) -> Result<(), CryptoStoreError> {
1687        let sender_data = if bundle_info.sender_data.should_recalculate() {
1688            let device = self
1689                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1690                .await?;
1691
1692            device
1693                .as_ref()
1694                .map(SenderData::from_device)
1695                .unwrap_or_else(|| bundle_info.sender_data.clone())
1696        } else {
1697            bundle_info.sender_data.clone()
1698        };
1699
1700        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1701
1702        // The sender's device must be either `SenderData::SenderUnverified` (i.e.,
1703        // TOFU-trusted) or `SenderData::SenderVerified` (i.e., fully verified
1704        // via user verification and cross-signing).
1705        let Ok(forwarder_data) = (&sender_data).try_into() else {
1706            warn!(
1707                "Not accepting a historic room key bundle due to insufficient trust in the sender"
1708            );
1709            return Ok(());
1710        };
1711
1712        self.import_room_key_bundle_sessions(
1713            bundle_info,
1714            &bundle,
1715            &forwarder_data,
1716            progress_listener,
1717        )
1718        .await?;
1719        self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1720
1721        Ok(())
1722    }
1723
1724    async fn import_room_key_bundle_sessions(
1725        &self,
1726        bundle_info: &StoredRoomKeyBundleData,
1727        bundle: &RoomKeyBundle,
1728        forwarder_data: &ForwarderData,
1729        progress_listener: impl Fn(usize, usize),
1730    ) -> Result<(), CryptoStoreError> {
1731        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1732            if key.room_id != bundle_info.bundle_data.room_id {
1733                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1734                Either::Right(key)
1735            } else {
1736                Either::Left(key)
1737            }
1738        });
1739
1740        match (bad.is_empty(), good.is_empty()) {
1741            // Case 1: Completely empty bundle.
1742            (true, true) => {
1743                warn!("Received a completely empty room key bundle");
1744            }
1745
1746            // Case 2: A bundle for the wrong room.
1747            (false, true) => {
1748                let bad_keys: Vec<_> =
1749                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1750
1751                warn!(
1752                    ?bad_keys,
1753                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1754                );
1755            }
1756
1757            // Case 3: A bundle containing useful room keys.
1758            (_, false) => {
1759                // We have at least some good keys, if we also have some bad ones let's
1760                // mention that here.
1761                if !bad.is_empty() {
1762                    warn!(
1763                        bad_key_count = bad.len(),
1764                        "The room key bundle contained some room keys \
1765                         that were meant for a different room"
1766                    );
1767                }
1768
1769                let keys = good.iter().filter_map(|key| {
1770                    key.try_into_inbound_group_session(forwarder_data)
1771                        .map_err(|e| {
1772                            warn!(
1773                                sender_key = ?key.sender_key().to_base64(),
1774                                room_id = ?key.room_id(),
1775                                session_id = key.session_id(),
1776                                error = ?e,
1777                                "Couldn't import a room key from a key bundle."
1778                            );
1779                        })
1780                        .ok()
1781                });
1782
1783                self.import_sessions_impl(keys, None, progress_listener).await?;
1784            }
1785        }
1786
1787        Ok(())
1788    }
1789
1790    async fn import_room_key_bundle_withheld_info(
1791        &self,
1792        bundle_info: &StoredRoomKeyBundleData,
1793        bundle: &RoomKeyBundle,
1794    ) -> Result<(), CryptoStoreError> {
1795        let mut session_id_to_withheld_code_map = BTreeMap::new();
1796
1797        let mut changes = Changes::default();
1798        for withheld in &bundle.withheld {
1799            let (room_id, session_id) = match withheld {
1800                RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1801                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1802                    _ => continue,
1803                },
1804                #[cfg(feature = "experimental-algorithms")]
1805                RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1806                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1807                    _ => continue,
1808                },
1809                RoomKeyWithheldContent::Unknown(_) => continue,
1810            };
1811
1812            if room_id != bundle_info.bundle_data.room_id {
1813                trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1814                continue;
1815            }
1816
1817            changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1818                session_id.to_owned(),
1819                RoomKeyWithheldEntry {
1820                    sender: bundle_info.sender_user.clone(),
1821                    content: withheld.to_owned(),
1822                },
1823            );
1824            session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1825        }
1826
1827        self.save_changes(changes).await?;
1828
1829        info!(
1830            room_id = ?bundle_info.bundle_data.room_id,
1831            ?session_id_to_withheld_code_map,
1832            "Successfully imported withheld info from room key bundle",
1833        );
1834
1835        Ok(())
1836    }
1837
1838    /// Store the fact that we have accepted an invite for a given room on this
1839    /// client, so should accept an [MSC4268] key bundle if one arrives
1840    /// soon.
1841    ///
1842    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1843    pub async fn store_room_pending_key_bundle(
1844        &self,
1845        room_id: &RoomId,
1846        inviter: &UserId,
1847    ) -> Result<(), CryptoStoreError> {
1848        let invite_accepted_at = MilliSecondsSinceUnixEpoch::now();
1849        self.save_changes(Changes {
1850            rooms_pending_key_bundle: HashMap::from([(
1851                room_id.to_owned(),
1852                Some(RoomPendingKeyBundleDetails {
1853                    room_id: room_id.to_owned(),
1854                    invite_accepted_at,
1855                    inviter: inviter.to_owned(),
1856                }),
1857            )]),
1858            ..Default::default()
1859        })
1860        .await?;
1861
1862        Ok(())
1863    }
1864
1865    /// Clear the record of accepting an invite for a specific room.
1866    ///
1867    /// The counterpart of [`Store::store_room_pending_key_bundle`].
1868    pub async fn clear_room_pending_key_bundle(&self, room_id: &RoomId) -> Result<()> {
1869        self.save_changes(Changes {
1870            rooms_pending_key_bundle: HashMap::from([(room_id.to_owned(), None)]),
1871            ..Default::default()
1872        })
1873        .await
1874    }
1875}
1876
1877impl Deref for Store {
1878    type Target = DynCryptoStore;
1879
1880    fn deref(&self) -> &Self::Target {
1881        self.inner.store.deref().deref()
1882    }
1883}
1884
1885/// A crypto store that implements primitives for cross-process locking.
1886#[derive(Clone, Debug)]
1887pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1888
1889impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1890    type LockError = CryptoStoreError;
1891
1892    async fn try_lock(
1893        &self,
1894        lease_duration_ms: u32,
1895        key: &str,
1896        holder: &str,
1897    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1898        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1899    }
1900}
1901
1902#[cfg(test)]
1903mod tests {
1904    use std::{collections::BTreeMap, pin::pin};
1905
1906    use assert_matches2::{assert_let, assert_matches};
1907    use futures_util::StreamExt;
1908    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1909    use matrix_sdk_test::async_test;
1910    use rand::RngExt;
1911    use ruma::{
1912        RoomId, device_id,
1913        events::room::{EncryptedFile, EncryptedFileHashes, V2EncryptedFileInfo},
1914        owned_device_id, owned_mxc_uri, room_id, user_id,
1915    };
1916    use serde_json::json;
1917    use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1918
1919    use crate::{
1920        Account, OlmMachine,
1921        machine::test_helpers::get_machine_pair,
1922        olm::{InboundGroupSession, SenderData},
1923        store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1924        types::{
1925            EventEncryptionAlgorithm,
1926            events::{
1927                room_key_bundle::RoomKeyBundleContent,
1928                room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1929            },
1930        },
1931    };
1932
1933    #[async_test]
1934    async fn test_merge_received_group_session() {
1935        let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1936        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1937
1938        let room_id = room_id!("!test:localhost");
1939
1940        let megolm_signing_key = Ed25519Keypair::new();
1941        let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1942
1943        // Bob already knows about the session, at index 5, with the device keys.
1944        let mut inbound_at_index_5 =
1945            InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1946        inbound_at_index_5.sender_data = inbound.sender_data.clone();
1947        bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1948
1949        // No changes if we get a disconnected session.
1950        let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1951        assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1952
1953        // No changes needed when we receive a worse copy of the session
1954        let mut worse =
1955            InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1956        worse.sender_data = inbound.sender_data.clone();
1957        assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1958
1959        // Nor when we receive an exact copy of what we already have
1960        let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1961        copy.sender_data = inbound.sender_data.clone();
1962        assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1963
1964        // But when we receive a better copy of the session, we should get it back
1965        let mut better =
1966            InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1967        better.sender_data = inbound.sender_data.clone();
1968        assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1969        assert_eq!(update.first_known_index(), 0);
1970
1971        // A worse copy of the ratchet, but better trust data
1972        {
1973            let mut worse_ratchet_better_trust =
1974                InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1975            let updated_sender_data = SenderData::sender_verified(
1976                alice_account.user_id(),
1977                alice_account.device_id(),
1978                Ed25519Keypair::new().public_key(),
1979            );
1980            worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1981            assert_let!(
1982                Some(update) = bob
1983                    .store()
1984                    .merge_received_group_session(worse_ratchet_better_trust)
1985                    .await
1986                    .unwrap()
1987            );
1988            assert_eq!(update.sender_data, updated_sender_data);
1989            assert_eq!(update.first_known_index(), 5);
1990            assert_eq!(
1991                update.export_at_index(0).await.session_key.to_bytes(),
1992                inbound.export_at_index(5).await.session_key.to_bytes()
1993            );
1994        }
1995
1996        // A better copy of the ratchet, but worse trust data
1997        {
1998            let mut better_ratchet_worse_trust =
1999                InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
2000            let updated_sender_data = SenderData::unknown();
2001            better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
2002            assert_let!(
2003                Some(update) = bob
2004                    .store()
2005                    .merge_received_group_session(better_ratchet_worse_trust)
2006                    .await
2007                    .unwrap()
2008            );
2009            assert_eq!(update.sender_data, inbound.sender_data);
2010            assert_eq!(update.first_known_index(), 0);
2011            assert_eq!(
2012                update.export_at_index(0).await.session_key.to_bytes(),
2013                inbound.export_at_index(0).await.session_key.to_bytes()
2014            );
2015        }
2016    }
2017
2018    /// Create an [`InboundGroupSession`] for the given room, using the given
2019    /// Ed25519 key as the signing key/session ID.
2020    fn make_inbound_group_session(
2021        sender_account: &Account,
2022        signing_key: &Ed25519Keypair,
2023        room_id: &RoomId,
2024    ) -> InboundGroupSession {
2025        InboundGroupSession::new(
2026            sender_account.identity_keys.curve25519,
2027            sender_account.identity_keys.ed25519,
2028            room_id,
2029            &make_session_key(signing_key),
2030            SenderData::device_info(crate::types::DeviceKeys::new(
2031                sender_account.user_id().to_owned(),
2032                sender_account.device_id().to_owned(),
2033                vec![],
2034                BTreeMap::new(),
2035                crate::types::Signatures::new(),
2036            )),
2037            None,
2038            EventEncryptionAlgorithm::MegolmV1AesSha2,
2039            Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2040            true,
2041        )
2042        .unwrap()
2043    }
2044
2045    /// Make a Megolm [`SessionKey`] using the given Ed25519 key as a signing
2046    /// key/session ID.
2047    fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2048        // `SessionKey::new` is not public, so the easiest way to construct a Megolm
2049        // session using a known Ed25519 key is to build a byte array in the export
2050        // format.
2051
2052        let mut session_key_bytes = vec![0u8; 229];
2053        // 0: version
2054        session_key_bytes[0] = 2;
2055        // 1..5: index
2056        // 5..133: ratchet key
2057        rand::rng().fill(&mut session_key_bytes[5..133]);
2058        // 133..165: public ed25519 key
2059        session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2060        // 165..229: signature
2061        let sig = signing_key.sign(&session_key_bytes[0..165]);
2062        session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2063
2064        SessionKey::from_bytes(&session_key_bytes).unwrap()
2065    }
2066
2067    #[async_test]
2068    async fn test_import_room_keys_notifies_stream() {
2069        use futures_util::FutureExt;
2070
2071        let (alice, bob, _) =
2072            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2073
2074        let room1_id = room_id!("!room1:localhost");
2075        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2076        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2077
2078        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2079        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2080
2081        let room_keys = room_keys_received_stream
2082            .next()
2083            .now_or_never()
2084            .flatten()
2085            .expect("We should have received an update of room key infos")
2086            .unwrap();
2087        assert_eq!(room_keys.len(), 1);
2088        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2089    }
2090
2091    #[async_test]
2092    async fn test_export_room_keys_provides_selected_keys() {
2093        // Given an OlmMachine with room keys in it
2094        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2095        let room1_id = room_id!("!room1:localhost");
2096        let room2_id = room_id!("!room2:localhost");
2097        let room3_id = room_id!("!room3:localhost");
2098        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2099        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2100        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2101
2102        // When I export some of the keys
2103        let keys = alice
2104            .store()
2105            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2106            .await
2107            .unwrap();
2108
2109        // Then the requested keys were provided
2110        assert_eq!(keys.len(), 2);
2111        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2112        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2113        assert_eq!(keys[0].room_id, "!room2:localhost");
2114        assert_eq!(keys[1].room_id, "!room3:localhost");
2115        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2116        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2117    }
2118
2119    #[async_test]
2120    async fn test_export_room_keys_stream_can_provide_all_keys() {
2121        // Given an OlmMachine with room keys in it
2122        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2123        let room1_id = room_id!("!room1:localhost");
2124        let room2_id = room_id!("!room2:localhost");
2125        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2126        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2127
2128        // When I export the keys as a stream
2129        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2130
2131        // And collect them
2132        let mut collected = vec![];
2133        while let Some(key) = keys.next().await {
2134            collected.push(key);
2135        }
2136
2137        // Then all the keys were provided
2138        assert_eq!(collected.len(), 2);
2139        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2140        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2141        assert_eq!(collected[0].room_id, "!room1:localhost");
2142        assert_eq!(collected[1].room_id, "!room2:localhost");
2143        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2144        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2145    }
2146
2147    #[async_test]
2148    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2149        // Given an OlmMachine with room keys in it
2150        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2151        let room1_id = room_id!("!room1:localhost");
2152        let room2_id = room_id!("!room2:localhost");
2153        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2154        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2155
2156        // When I export the keys as a stream
2157        let mut keys =
2158            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2159
2160        // And collect them
2161        let mut collected = vec![];
2162        while let Some(key) = keys.next().await {
2163            collected.push(key);
2164        }
2165
2166        // Then all the keys matching our predicate were provided, and no others
2167        assert_eq!(collected.len(), 1);
2168        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2169        assert_eq!(collected[0].room_id, "!room1:localhost");
2170        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2171    }
2172
2173    #[async_test]
2174    async fn test_export_secrets_bundle() {
2175        let user_id = user_id!("@alice:example.com");
2176        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2177
2178        let _ = first
2179            .bootstrap_cross_signing(false)
2180            .await
2181            .expect("We should be able to bootstrap cross-signing");
2182
2183        let bundle = first.store().export_secrets_bundle().await.expect(
2184            "We should be able to export the secrets bundle, now that we \
2185             have the cross-signing keys",
2186        );
2187
2188        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2189
2190        second
2191            .store()
2192            .import_secrets_bundle(&bundle)
2193            .await
2194            .expect("We should be able to import the secrets bundle");
2195
2196        let status = second.cross_signing_status().await;
2197        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2198
2199        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2200
2201        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2202    }
2203
2204    #[async_test]
2205    async fn test_create_dehydrated_device_key() {
2206        let pickle_key = DehydratedDeviceKey::new();
2207
2208        let to_vec = pickle_key.inner.to_vec();
2209        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2210            .expect("Should be able to create a dehydrated device key from slice");
2211
2212        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2213    }
2214
2215    #[async_test]
2216    async fn test_create_dehydrated_errors() {
2217        let too_small = [0u8; 22];
2218        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2219
2220        assert!(pickle_key.is_err());
2221
2222        let too_big = [0u8; 40];
2223        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2224
2225        assert!(pickle_key.is_err());
2226    }
2227
2228    #[async_test]
2229    async fn test_build_room_key_bundle() {
2230        // Given: Alice has sent a number of room keys to Bob, including some in the
2231        // wrong room, and some that are not marked as shared...
2232        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2233        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2234
2235        let room1_id = room_id!("!room1:localhost");
2236        let room2_id = room_id!("!room2:localhost");
2237
2238        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2239
2240           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2241        */
2242        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2243        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2244        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2245        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2246
2247        let sessions = [
2248            create_inbound_group_session_with_visibility(
2249                &alice,
2250                room1_id,
2251                &SessionKey::from_base64(session_key1).unwrap(),
2252                true,
2253            ),
2254            create_inbound_group_session_with_visibility(
2255                &alice,
2256                room1_id,
2257                &SessionKey::from_base64(session_key2).unwrap(),
2258                true,
2259            ),
2260            create_inbound_group_session_with_visibility(
2261                &alice,
2262                room1_id,
2263                &SessionKey::from_base64(session_key3).unwrap(),
2264                false,
2265            ),
2266            create_inbound_group_session_with_visibility(
2267                &alice,
2268                room2_id,
2269                &SessionKey::from_base64(session_key4).unwrap(),
2270                true,
2271            ),
2272        ];
2273        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2274
2275        // When I build the bundle
2276        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2277
2278        // Then the bundle matches the snapshot.
2279
2280        // We sort the sessions in the bundle, so that the snapshot is stable.
2281        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2282
2283        // We substitute the algorithm, since this changes based on feature flags.
2284        let algorithm = if cfg!(feature = "experimental-algorithms") {
2285            "m.megolm.v2.aes-sha2"
2286        } else {
2287            "m.megolm.v1.aes-sha2"
2288        };
2289        let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2290            assert_eq!(value.as_str().unwrap(), algorithm);
2291            "[algorithm]"
2292        };
2293
2294        // We also substitute alice's keys in the snapshot with placeholders
2295        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2296        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2297            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2298            "[alice curve key]"
2299        };
2300        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2301        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2302            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2303            "[alice ed25519 key]"
2304        };
2305
2306        insta::with_settings!({ sort_maps => true }, {
2307            assert_json_snapshot!(bundle, {
2308                ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2309                ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2310                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2311                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2312                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2313            });
2314        });
2315    }
2316
2317    #[async_test]
2318    async fn test_receive_room_key_bundle() {
2319        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2320        let alice_key = alice.identity_keys().curve25519;
2321        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2322
2323        let room_id = room_id!("!room1:localhost");
2324
2325        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2326        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2327
2328        let sessions = [
2329            create_inbound_group_session_with_visibility(
2330                &alice,
2331                room_id,
2332                &SessionKey::from_base64(session_key1).unwrap(),
2333                true,
2334            ),
2335            create_inbound_group_session_with_visibility(
2336                &alice,
2337                room_id,
2338                &SessionKey::from_base64(session_key2).unwrap(),
2339                false,
2340            ),
2341        ];
2342
2343        alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2344        let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2345
2346        bob.store()
2347            .receive_room_key_bundle(
2348                &StoredRoomKeyBundleData {
2349                    sender_user: alice.user_id().to_owned(),
2350                    sender_key: alice_key,
2351                    sender_data: SenderData::sender_verified(
2352                        alice.user_id(),
2353                        device_id!("ALICE"),
2354                        alice.identity_keys().ed25519,
2355                    ),
2356
2357                    bundle_data: RoomKeyBundleContent {
2358                        room_id: room_id.to_owned(),
2359                        // This isn't used at all in the method call, so we can fill it with
2360                        // garbage.
2361                        file: EncryptedFile::new(
2362                            owned_mxc_uri!("mxc://example.com/0"),
2363                            V2EncryptedFileInfo::encode([0; 32], [0; 16]).into(),
2364                            EncryptedFileHashes::with_sha256([0; 32]),
2365                        ),
2366                    },
2367                },
2368                bundle,
2369                |_, _| {},
2370            )
2371            .await
2372            .unwrap();
2373
2374        // The room key should be imported successfully
2375        let imported_sessions =
2376            bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2377
2378        assert_eq!(imported_sessions.len(), 1);
2379        assert_eq!(imported_sessions[0].room_id(), room_id);
2380
2381        // The session forwarder data should be set correctly.
2382        assert_eq!(
2383            imported_sessions[0]
2384                .forwarder_data
2385                .as_ref()
2386                .expect("Session should contain forwarder data.")
2387                .user_id(),
2388            alice.user_id()
2389        );
2390
2391        assert_matches!(
2392            bob.store()
2393                .get_withheld_info(room_id, sessions[1].session_id())
2394                .await
2395                .unwrap()
2396                .expect("Withheld info should be present in the store."),
2397            RoomKeyWithheldEntry {
2398                #[cfg(not(feature = "experimental-algorithms"))]
2399                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2400                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2401                ),
2402                #[cfg(feature = "experimental-algorithms")]
2403                content: RoomKeyWithheldContent::MegolmV2AesSha2(
2404                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2405                ),
2406                ..
2407            }
2408        );
2409    }
2410
2411    /// Tests that the new store format introduced in [#5737][#5737] does not
2412    /// conflict with items already in the store that were serialised with the
2413    /// older format.
2414    ///
2415    /// [#5737]: https://github.com/matrix-org/matrix-rust-sdk/pull/5737
2416    #[async_test]
2417    async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2418        let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2419            {
2420              "content": {
2421                "algorithm": "m.megolm.v1.aes-sha2",
2422                "code": "m.unauthorised",
2423                "from_device": "ALICE",
2424                "reason": "You are not authorised to read the message.",
2425                "room_id": "!roomid:s.co",
2426                "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2427                "session_id": "session123"
2428              },
2429              "sender": "@alice:s.co",
2430              "type": "m.room_key.withheld"
2431            }
2432        ))
2433        .unwrap();
2434
2435        assert_matches!(
2436            entry,
2437            RoomKeyWithheldEntry {
2438                sender,
2439                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2440                    MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2441                ),
2442            }
2443        );
2444
2445        assert_eq!(sender, "@alice:s.co");
2446        assert_eq!(withheld_content.room_id, "!roomid:s.co");
2447        assert_eq!(withheld_content.session_id, "session123");
2448        assert_eq!(
2449            withheld_content.sender_key.to_base64(),
2450            "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2451        );
2452        assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2453    }
2454
2455    /// Create an inbound Megolm session for the given room.
2456    ///
2457    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2458    /// fields of the resultant session.
2459    ///
2460    /// The encryption algorithm used for the session depends on the
2461    /// `experimental-algorithms` feature flag:
2462    ///
2463    /// - When not set, the session uses `m.megolm.v1.aes-sha2`.
2464    /// - When set, the session uses `m.megolm.v2.aes-sha2`.
2465    fn create_inbound_group_session_with_visibility(
2466        olm_machine: &OlmMachine,
2467        room_id: &RoomId,
2468        session_key: &SessionKey,
2469        shared_history: bool,
2470    ) -> InboundGroupSession {
2471        let identity_keys = &olm_machine.store().static_account().identity_keys;
2472        InboundGroupSession::new(
2473            identity_keys.curve25519,
2474            identity_keys.ed25519,
2475            room_id,
2476            session_key,
2477            SenderData::unknown(),
2478            None,
2479            #[cfg(not(feature = "experimental-algorithms"))]
2480            EventEncryptionAlgorithm::MegolmV1AesSha2,
2481            #[cfg(feature = "experimental-algorithms")]
2482            EventEncryptionAlgorithm::MegolmV2AesSha2,
2483            None,
2484            shared_history,
2485        )
2486        .unwrap()
2487    }
2488}