Skip to main content

matrix_sdk_crypto/store/
mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
17//! The storage layer for the [`OlmMachine`] can be customized using a trait.
18//! Implementing your own [`CryptoStore`]
19//!
20//! An in-memory only store is provided as well as an SQLite-based one,
21//! depending on your needs and targets a custom store may be implemented, e.g.
22//! for `wasm-unknown-unknown` an indexeddb store would be needed
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
39//! [`CryptoStore`]: trait.Cryptostore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{Arc, atomic::Ordering},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use itertools::{Either, Itertools};
54use ruma::{
55    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedUserId, RoomId, UserId,
56    encryption::KeyUsage, events::secret::request::SecretName,
57};
58use serde::{Serialize, de::DeserializeOwned};
59use thiserror::Error;
60use tokio::sync::{Mutex, Notify, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, instrument, trace, warn};
63use types::{RoomKeyBundleInfo, StoredRoomKeyBundleData};
64use vodozemac::{Curve25519PublicKey, megolm::SessionOrdering};
65
66use self::types::{
67    Changes, CrossSigningKeyExport, DeviceChanges, DeviceUpdates, IdentityChanges, IdentityUpdates,
68    PendingChanges, RoomKeyInfo, RoomKeyWithheldInfo, RoomPendingKeyBundleDetails,
69    UserKeyQueryResult,
70};
71use crate::{
72    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
73    gossiping::GossippedSecret,
74    identities::{Device, DeviceData, UserDevices, UserIdentityData, user::UserIdentity},
75    olm::{
76        Account, ExportedRoomKey, ForwarderData, InboundGroupSession, PrivateCrossSigningIdentity,
77        SenderData, Session, StaticAccountData,
78    },
79    store::types::RoomKeyWithheldEntry,
80    types::{
81        BackupSecrets, CrossSigningSecrets, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
82        SecretsBundle,
83    },
84    verification::VerificationMachine,
85};
86#[cfg(doc)]
87use crate::{backups::BackupMachine, identities::OwnUserIdentity};
88
89pub mod caches;
90mod crypto_store_wrapper;
91mod error;
92mod memorystore;
93mod traits;
94pub mod types;
95
96#[cfg(any(test, feature = "testing"))]
97#[macro_use]
98#[allow(missing_docs)]
99pub mod integration_tests;
100
101pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
102pub use error::{CryptoStoreError, Result};
103use matrix_sdk_common::{
104    cross_process_lock::{CrossProcessLock, CrossProcessLockConfig, CrossProcessLockGeneration},
105    deserialized_responses::WithheldCode,
106    timeout::timeout,
107};
108pub use memorystore::MemoryStore;
109pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
110
111use self::caches::{SequenceNumber, StoreCache, StoreCacheGuard, UsersForKeyQuery};
112use crate::types::{
113    events::room_key_withheld::RoomKeyWithheldContent, room_history::RoomKeyBundle,
114};
115pub use crate::{
116    dehydrated_devices::DehydrationError,
117    gossiping::{GossipRequest, SecretInfo},
118};
119
120/// A wrapper for our CryptoStore trait object.
121///
122/// This is needed because we want to have a generic interface so we can
123/// store/restore objects that we can serialize. Since trait objects and
124/// generics don't mix let the CryptoStore store strings and this wrapper
125/// adds the generic interface on top.
126#[derive(Debug, Clone)]
127pub struct Store {
128    inner: Arc<StoreInner>,
129}
130
131#[derive(Debug, Default)]
132pub(crate) struct KeyQueryManager {
133    /// Record of the users that are waiting for a /keys/query.
134    users_for_key_query: Mutex<UsersForKeyQuery>,
135
136    /// Notifier that is triggered each time an update is received for a user.
137    users_for_key_query_notify: Notify,
138}
139
140impl KeyQueryManager {
141    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
142        self.ensure_sync_tracked_users(cache).await?;
143        Ok(SyncedKeyQueryManager { cache, manager: self })
144    }
145
146    /// Load the list of users for whom we are tracking their device lists and
147    /// fill out our caches.
148    ///
149    /// This method ensures that we're only going to load the users from the
150    /// actual [`CryptoStore`] once, it will also make sure that any
151    /// concurrent calls to this method get deduplicated.
152    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
153        // Check if the users are loaded, and in that case do nothing.
154        let loaded = cache.loaded_tracked_users.read().await;
155        if *loaded {
156            return Ok(());
157        }
158
159        // Otherwise, we may load the users.
160        drop(loaded);
161        let mut loaded = cache.loaded_tracked_users.write().await;
162
163        // Check again if the users have been loaded, in case another call to this
164        // method loaded the tracked users between the time we tried to
165        // acquire the lock and the time we actually acquired the lock.
166        if *loaded {
167            return Ok(());
168        }
169
170        let tracked_users = cache.store.load_tracked_users().await?;
171
172        let mut query_users_lock = self.users_for_key_query.lock().await;
173        let mut tracked_users_cache = cache.tracked_users.write();
174        for user in tracked_users {
175            tracked_users_cache.insert(user.user_id.to_owned());
176
177            if user.dirty {
178                query_users_lock.insert_user(&user.user_id);
179            }
180        }
181
182        *loaded = true;
183
184        Ok(())
185    }
186
187    /// Wait for a `/keys/query` response to be received if one is expected for
188    /// the given user.
189    ///
190    /// If the given timeout elapses, the method will stop waiting and return
191    /// [`UserKeyQueryResult::TimeoutExpired`].
192    ///
193    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
194    /// query is pending are up to date, but doesn't hold on to it
195    /// thereafter: the lock is short-lived in this case.
196    pub async fn wait_if_user_key_query_pending(
197        &self,
198        cache: StoreCacheGuard,
199        timeout_duration: Duration,
200        user: &UserId,
201    ) -> Result<UserKeyQueryResult> {
202        {
203            // Drop the cache early, so we don't keep it while waiting (since writing the
204            // results requires to write in the cache, thus take another lock).
205            self.ensure_sync_tracked_users(&cache).await?;
206            drop(cache);
207        }
208
209        let mut users_for_key_query = self.users_for_key_query.lock().await;
210        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
211            return Ok(UserKeyQueryResult::WasNotPending);
212        };
213
214        let wait_for_completion = async {
215            while !waiter.completed.load(Ordering::Relaxed) {
216                // Register for being notified before releasing the mutex, so
217                // it's impossible to miss a wakeup between the last check for
218                // whether we should wait, and starting to wait.
219                let mut notified = pin!(self.users_for_key_query_notify.notified());
220                notified.as_mut().enable();
221                drop(users_for_key_query);
222
223                // Wait for a notification
224                notified.await;
225
226                // Reclaim the lock before checking the flag to avoid races
227                // when two notifications happen right after each other and the
228                // second one sets the flag we want to wait for.
229                users_for_key_query = self.users_for_key_query.lock().await;
230            }
231        };
232
233        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
234            Err(_) => {
235                warn!(
236                    user_id = ?user,
237                    "The user has a pending `/keys/query` request which did \
238                    not finish yet, some devices might be missing."
239                );
240
241                Ok(UserKeyQueryResult::TimeoutExpired)
242            }
243            _ => Ok(UserKeyQueryResult::WasPending),
244        }
245    }
246}
247
248pub(crate) struct SyncedKeyQueryManager<'a> {
249    cache: &'a StoreCache,
250    manager: &'a KeyQueryManager,
251}
252
253impl SyncedKeyQueryManager<'_> {
254    /// Add entries to the list of users being tracked for device changes
255    ///
256    /// Any users not already on the list are flagged as awaiting a key query.
257    /// Users that were already in the list are unaffected.
258    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
259        let mut store_updates = Vec::new();
260        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
261
262        {
263            let mut tracked_users = self.cache.tracked_users.write();
264            for user_id in users {
265                if tracked_users.insert(user_id.to_owned()) {
266                    key_query_lock.insert_user(user_id);
267                    store_updates.push((user_id, true))
268                }
269            }
270        }
271
272        self.cache.store.save_tracked_users(&store_updates).await
273    }
274
275    /// Process notifications that users have changed devices.
276    ///
277    /// This is used to handle the list of device-list updates that is received
278    /// from the `/sync` response. Any users *whose device lists we are
279    /// tracking* are flagged as needing a key query. Users whose devices we
280    /// are not tracking are ignored.
281    pub async fn mark_tracked_users_as_changed(
282        &self,
283        users: impl Iterator<Item = &UserId>,
284    ) -> Result<()> {
285        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
286        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
287
288        {
289            let tracked_users = &self.cache.tracked_users.read();
290            for user_id in users {
291                if tracked_users.contains(user_id) {
292                    key_query_lock.insert_user(user_id);
293                    store_updates.push((user_id, true));
294                }
295            }
296        }
297
298        self.cache.store.save_tracked_users(&store_updates).await
299    }
300
301    /// Flag that the given users devices are now up-to-date.
302    ///
303    /// This is called after processing the response to a /keys/query request.
304    /// Any users whose device lists we are tracking are removed from the
305    /// list of those pending a /keys/query.
306    pub async fn mark_tracked_users_as_up_to_date(
307        &self,
308        users: impl Iterator<Item = &UserId>,
309        sequence_number: SequenceNumber,
310    ) -> Result<()> {
311        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
312        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
313
314        {
315            let tracked_users = self.cache.tracked_users.read();
316            for user_id in users {
317                if tracked_users.contains(user_id) {
318                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
319                    store_updates.push((user_id, !clean));
320                }
321            }
322        }
323
324        self.cache.store.save_tracked_users(&store_updates).await?;
325        // wake up any tasks that may have been waiting for updates
326        self.manager.users_for_key_query_notify.notify_waiters();
327
328        Ok(())
329    }
330
331    /// Get the set of users that has the outdate/dirty flag set for their list
332    /// of devices.
333    ///
334    /// This set should be included in a `/keys/query` request which will update
335    /// the device list.
336    ///
337    /// # Returns
338    ///
339    /// A pair `(users, sequence_number)`, where `users` is the list of users to
340    /// be queried, and `sequence_number` is the current sequence number,
341    /// which should be returned in `mark_tracked_users_as_up_to_date`.
342    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
343        self.manager.users_for_key_query.lock().await.users_for_key_query()
344    }
345
346    /// See the docs for [`crate::OlmMachine::tracked_users()`].
347    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
348        self.cache.tracked_users.read().iter().cloned().collect()
349    }
350
351    /// Mark the given user as being tracked for device lists, and mark that it
352    /// has an outdated device list.
353    ///
354    /// This means that the user will be considered for a `/keys/query` request
355    /// next time [`Store::users_for_key_query()`] is called.
356    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
357        self.manager.users_for_key_query.lock().await.insert_user(user);
358        self.cache.tracked_users.write().insert(user.to_owned());
359
360        self.cache.store.save_tracked_users(&[(user, true)]).await
361    }
362}
363
364/// Convert the devices and vectors contained in the [`DeviceChanges`] into
365/// a [`DeviceUpdates`] struct.
366///
367/// The [`DeviceChanges`] will contain vectors of [`DeviceData`]s which
368/// we want to convert to a [`Device`].
369fn collect_device_updates(
370    verification_machine: VerificationMachine,
371    own_identity: Option<OwnUserIdentityData>,
372    identities: IdentityChanges,
373    devices: DeviceChanges,
374) -> DeviceUpdates {
375    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
376    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
377
378    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
379
380    let map_device = |device: DeviceData| {
381        let device_owner_identity = new_identities
382            .get(device.user_id())
383            .or_else(|| changed_identities.get(device.user_id()))
384            .or_else(|| unchanged_identities.get(device.user_id()))
385            .cloned();
386
387        Device {
388            inner: device,
389            verification_machine: verification_machine.to_owned(),
390            own_identity: own_identity.to_owned(),
391            device_owner_identity,
392        }
393    };
394
395    for device in devices.new {
396        let device = map_device(device);
397
398        new.entry(device.user_id().to_owned())
399            .or_default()
400            .insert(device.device_id().to_owned(), device);
401    }
402
403    for device in devices.changed {
404        let device = map_device(device);
405
406        changed
407            .entry(device.user_id().to_owned())
408            .or_default()
409            .insert(device.device_id().to_owned(), device.to_owned());
410    }
411
412    DeviceUpdates { new, changed }
413}
414
415/// A temporary transaction (that implies a write) to the underlying store.
416#[allow(missing_debug_implementations)]
417pub struct StoreTransaction {
418    store: Store,
419    changes: PendingChanges,
420    // TODO hold onto the cross-process crypto store lock + cache.
421    cache: OwnedRwLockWriteGuard<StoreCache>,
422}
423
424impl StoreTransaction {
425    /// Starts a new `StoreTransaction`.
426    async fn new(store: Store) -> Self {
427        let cache = store.inner.cache.clone();
428
429        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
430    }
431
432    pub(crate) fn cache(&self) -> &StoreCache {
433        &self.cache
434    }
435
436    /// Returns a reference to the current `Store`.
437    pub fn store(&self) -> &Store {
438        &self.store
439    }
440
441    /// Gets a `Account` for update.
442    ///
443    /// Note: since it's guaranteed that one can't have both a
444    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (since the
445    /// underlying `StoreCache` is guarded by a `RwLock` mutex), this ensures
446    /// that we can't have two copies of an `Account` alive at the same time.
447    pub async fn account(&mut self) -> Result<&mut Account> {
448        if self.changes.account.is_none() {
449            // Make sure the cache loaded the account.
450            let _ = self.cache.account().await?;
451            self.changes.account = self.cache.account.lock().await.take();
452        }
453        Ok(self.changes.account.as_mut().unwrap())
454    }
455
456    /// Commits all dirty fields to the store, and maintains the cache so it
457    /// reflects the current state of the database.
458    pub async fn commit(self) -> Result<()> {
459        if self.changes.is_empty() {
460            return Ok(());
461        }
462
463        // Save changes in the database.
464        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
465
466        self.store.save_pending_changes(self.changes).await?;
467
468        // Make the cache coherent with the database.
469        if let Some(account) = account {
470            *self.cache.account.lock().await = Some(account);
471        }
472
473        Ok(())
474    }
475}
476
477#[derive(Debug)]
478struct StoreInner {
479    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
480    store: Arc<CryptoStoreWrapper>,
481
482    /// In-memory cache for the current crypto store.
483    ///
484    /// ⚠ Must remain private.
485    cache: Arc<RwLock<StoreCache>>,
486
487    verification_machine: VerificationMachine,
488
489    /// Static account data that never changes (and thus can be loaded once and
490    /// for all when creating the store).
491    static_account: StaticAccountData,
492}
493
494/// Error describing what went wrong when importing private cross signing keys
495/// or the key backup key.
496#[derive(Debug, Error)]
497pub enum SecretImportError {
498    /// The key that we tried to import was invalid.
499    #[error("Error while importing {name}: {error}")]
500    Key {
501        /// The name of the secret that was being imported.
502        name: SecretName,
503        /// The key error that occurred.
504        error: vodozemac::KeyError,
505    },
506    /// The public key of the imported private key doesn't match the public
507    /// key that was uploaded to the server.
508    #[error(
509        "Error while importing {name}: The public key of the imported private \
510            key doesn't match the public key that was uploaded to the server"
511    )]
512    MismatchedPublicKeys {
513        /// The name of the secret that was being imported.
514        name: SecretName,
515    },
516    /// The new version of the identity couldn't be stored.
517    #[error(transparent)]
518    Store(#[from] CryptoStoreError),
519}
520
521/// Error describing what went wrong when exporting a [`SecretsBundle`].
522///
523/// The [`SecretsBundle`] can only be exported if we have all cross-signing
524/// private keys in the store.
525#[derive(Debug, Error)]
526pub enum SecretsBundleExportError {
527    /// The store itself had an error.
528    #[error(transparent)]
529    Store(#[from] CryptoStoreError),
530    /// We're missing one or multiple cross-signing keys.
531    #[error("The store is missing one or multiple cross-signing keys")]
532    MissingCrossSigningKey(KeyUsage),
533    /// We're missing all cross-signing keys.
534    #[error("The store doesn't contain any cross-signing keys")]
535    MissingCrossSigningKeys,
536    /// We have a backup key stored, but we don't know the version of the
537    /// backup.
538    #[error("The store contains a backup key, but no backup version")]
539    MissingBackupVersion,
540}
541
542impl Store {
543    /// Create a new Store.
544    pub(crate) fn new(
545        account: StaticAccountData,
546        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
547        store: Arc<CryptoStoreWrapper>,
548        verification_machine: VerificationMachine,
549    ) -> Self {
550        Self {
551            inner: Arc::new(StoreInner {
552                static_account: account,
553                identity,
554                store: store.clone(),
555                verification_machine,
556                cache: Arc::new(RwLock::new(StoreCache {
557                    store,
558                    tracked_users: Default::default(),
559                    loaded_tracked_users: Default::default(),
560                    account: Default::default(),
561                })),
562            }),
563        }
564    }
565
566    /// UserId associated with this store
567    pub(crate) fn user_id(&self) -> &UserId {
568        &self.inner.static_account.user_id
569    }
570
571    /// DeviceId associated with this store
572    pub(crate) fn device_id(&self) -> &DeviceId {
573        self.inner.verification_machine.own_device_id()
574    }
575
576    /// The static data for the account associated with this store.
577    pub(crate) fn static_account(&self) -> &StaticAccountData {
578        &self.inner.static_account
579    }
580
581    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
582        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
583    }
584
585    pub(crate) async fn transaction(&self) -> StoreTransaction {
586        StoreTransaction::new(self.clone()).await
587    }
588
589    pub(crate) async fn with_transaction<T>(
590        &self,
591        func: impl AsyncFnOnce(&mut StoreTransaction) -> Result<T, crate::OlmError>,
592    ) -> Result<T, crate::OlmError> {
593        let mut tr = self.transaction().await;
594        let res = func(&mut tr).await?;
595        tr.commit().await?;
596        Ok(res)
597    }
598
599    #[cfg(test)]
600    /// test helper to reset the cross signing identity
601    pub(crate) async fn reset_cross_signing_identity(&self) {
602        self.inner.identity.lock().await.reset();
603    }
604
605    /// PrivateCrossSigningIdentity associated with this store
606    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
607        self.inner.identity.clone()
608    }
609
610    /// Save the given Sessions to the store
611    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
612        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
613
614        self.save_changes(changes).await
615    }
616
617    pub(crate) async fn get_sessions(
618        &self,
619        sender_key: &str,
620    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
621        self.inner.store.get_sessions(sender_key).await
622    }
623
624    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
625        self.inner.store.save_changes(changes).await
626    }
627
628    /// Given an `InboundGroupSession` which we have just received, see if we
629    /// have a matching session already in the store, and determine how to
630    /// handle it.
631    ///
632    /// If the store already has everything we can gather from the new session,
633    /// returns `None`. Otherwise, returns a merged session which should be
634    /// persisted to the store.
635    pub(crate) async fn merge_received_group_session(
636        &self,
637        session: InboundGroupSession,
638    ) -> Result<Option<InboundGroupSession>> {
639        let old_session = self
640            .inner
641            .store
642            .get_inbound_group_session(session.room_id(), session.session_id())
643            .await?;
644
645        // If there is no old session, just use the new session.
646        let Some(old_session) = old_session else {
647            info!("Received a new megolm room key");
648            return Ok(Some(session));
649        };
650
651        let index_comparison = session.compare_ratchet(&old_session).await;
652        let trust_level_comparison =
653            session.sender_data.compare_trust_level(&old_session.sender_data);
654
655        let result = match (index_comparison, trust_level_comparison) {
656            (SessionOrdering::Unconnected, _) => {
657                // If this happens, it means that we have two sessions purporting to have the
658                // same session id, but where the ratchets do not match up.
659                // In other words, someone is playing silly buggers.
660                warn!(
661                    "Received a group session with an ratchet that does not connect to the one in the store, discarding"
662                );
663                None
664            }
665
666            (SessionOrdering::Better, std::cmp::Ordering::Greater)
667            | (SessionOrdering::Better, std::cmp::Ordering::Equal)
668            | (SessionOrdering::Equal, std::cmp::Ordering::Greater) => {
669                // The new session is unambiguously better than what we have in the store.
670                info!(
671                    ?index_comparison,
672                    ?trust_level_comparison,
673                    "Received a megolm room key that we have a worse version of, merging"
674                );
675                Some(session)
676            }
677
678            (SessionOrdering::Worse, std::cmp::Ordering::Less)
679            | (SessionOrdering::Worse, std::cmp::Ordering::Equal)
680            | (SessionOrdering::Equal, std::cmp::Ordering::Less) => {
681                // The new session is unambiguously worse than the one we have in the store.
682                warn!(
683                    ?index_comparison,
684                    ?trust_level_comparison,
685                    "Received a megolm room key that we already have a better version \
686                     of, discarding"
687                );
688                None
689            }
690
691            (SessionOrdering::Equal, std::cmp::Ordering::Equal) => {
692                // The new session is the same as what we have.
693                info!("Received a megolm room key that we already have, discarding");
694                None
695            }
696
697            (SessionOrdering::Better, std::cmp::Ordering::Less) => {
698                // We need to take the ratchet from the new session, and the
699                // sender data from the old session.
700                info!("Upgrading a previously-received megolm session with new ratchet");
701                let result = old_session.with_ratchet(&session);
702                // We'll need to back it up again.
703                result.reset_backup_state();
704                Some(result)
705            }
706
707            (SessionOrdering::Worse, std::cmp::Ordering::Greater) => {
708                // We need to take the ratchet from the old session, and the
709                // sender data from the new session.
710                info!("Upgrading a previously-received megolm session with new sender data");
711                Some(session.with_ratchet(&old_session))
712            }
713        };
714
715        Ok(result)
716    }
717
718    #[cfg(test)]
719    /// Testing helper to allow to save only a set of devices
720    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
721        use types::DeviceChanges;
722
723        let changes = Changes {
724            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
725            ..Default::default()
726        };
727
728        self.save_changes(changes).await
729    }
730
731    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
732    pub(crate) async fn save_inbound_group_sessions(
733        &self,
734        sessions: &[InboundGroupSession],
735    ) -> Result<()> {
736        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
737
738        self.save_changes(changes).await
739    }
740
741    /// Get the display name of our own device.
742    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
743        Ok(self
744            .inner
745            .store
746            .get_device(self.user_id(), self.device_id())
747            .await?
748            .and_then(|d| d.display_name().map(|d| d.to_owned())))
749    }
750
751    /// Get the device data for the given [`UserId`] and [`DeviceId`].
752    ///
753    /// *Note*: This method will include our own device which is always present
754    /// in the store.
755    pub(crate) async fn get_device_data(
756        &self,
757        user_id: &UserId,
758        device_id: &DeviceId,
759    ) -> Result<Option<DeviceData>> {
760        self.inner.store.get_device(user_id, device_id).await
761    }
762
763    /// Get the device data for the given [`UserId`] and [`DeviceId`].
764    ///
765    /// *Note*: This method will **not** include our own device.
766    ///
767    /// Use this method if you need a list of recipients for a given user, since
768    /// we don't want to encrypt for our own device, otherwise take a look at
769    /// the [`Store::get_device_data_for_user`] method.
770    pub(crate) async fn get_device_data_for_user_filtered(
771        &self,
772        user_id: &UserId,
773    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
774        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
775            if user_id == self.user_id() {
776                d.remove(self.device_id());
777            }
778            d
779        })
780    }
781
782    /// Get the [`DeviceData`] for all the devices a user has.
783    ///
784    /// *Note*: This method will include our own device which is always present
785    /// in the store.
786    ///
787    /// Use this method if you need to operate on or update all devices of a
788    /// user, otherwise take a look at the
789    /// [`Store::get_device_data_for_user_filtered`] method.
790    pub(crate) async fn get_device_data_for_user(
791        &self,
792        user_id: &UserId,
793    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
794        self.inner.store.get_user_devices(user_id).await
795    }
796
797    /// Get a [`Device`] for the given user with the given
798    /// [`Curve25519PublicKey`] key.
799    ///
800    /// *Note*: This method will include our own device which is always present
801    /// in the store.
802    pub(crate) async fn get_device_from_curve_key(
803        &self,
804        user_id: &UserId,
805        curve_key: Curve25519PublicKey,
806    ) -> Result<Option<Device>> {
807        self.get_user_devices(user_id)
808            .await
809            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
810    }
811
812    /// Get all devices associated with the given [`UserId`].
813    ///
814    /// This method is more expensive than the
815    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
816    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
817    /// device owner to be fetched from the store as well.
818    ///
819    /// *Note*: This method will include our own device which is always present
820    /// in the store.
821    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
822        let devices = self.get_device_data_for_user(user_id).await?;
823
824        let own_identity = self
825            .inner
826            .store
827            .get_user_identity(self.user_id())
828            .await?
829            .and_then(|i| i.own().cloned());
830        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
831
832        Ok(UserDevices {
833            inner: devices,
834            verification_machine: self.inner.verification_machine.clone(),
835            own_identity,
836            device_owner_identity,
837        })
838    }
839
840    /// Get a [`Device`] for the given user with the given [`DeviceId`].
841    ///
842    /// This method is more expensive than the [`Store::get_device_data`] method
843    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
844    /// [`UserIdentityData`] of the device owner to be fetched from the
845    /// store as well.
846    ///
847    /// *Note*: This method will include our own device which is always present
848    /// in the store.
849    pub(crate) async fn get_device(
850        &self,
851        user_id: &UserId,
852        device_id: &DeviceId,
853    ) -> Result<Option<Device>> {
854        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
855            Ok(Some(self.wrap_device_data(device_data).await?))
856        } else {
857            Ok(None)
858        }
859    }
860
861    /// Create a new device using the supplied [`DeviceData`]. Normally we would
862    /// call [`Self::get_device`] to find an existing device inside this
863    /// store. Only call this if you have some existing DeviceData and want
864    /// to wrap it with the extra information provided by a [`Device`].
865    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
866        let own_identity = self
867            .inner
868            .store
869            .get_user_identity(self.user_id())
870            .await?
871            .and_then(|i| i.own().cloned());
872
873        let device_owner_identity =
874            self.inner.store.get_user_identity(device_data.user_id()).await?;
875
876        Ok(Device {
877            inner: device_data,
878            verification_machine: self.inner.verification_machine.clone(),
879            own_identity,
880            device_owner_identity,
881        })
882    }
883
884    ///  Get the Identity of `user_id`
885    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
886        let own_identity = self
887            .inner
888            .store
889            .get_user_identity(self.user_id())
890            .await?
891            .and_then(as_variant!(UserIdentityData::Own));
892
893        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
894            UserIdentity::new(
895                self.clone(),
896                i,
897                self.inner.verification_machine.to_owned(),
898                own_identity,
899            )
900        }))
901    }
902
903    /// Try to export the secret with the given secret name.
904    ///
905    /// The exported secret will be encoded as unpadded base64. Returns `Null`
906    /// if the secret can't be found.
907    ///
908    /// # Arguments
909    ///
910    /// * `secret_name` - The name of the secret that should be exported.
911    pub async fn export_secret(
912        &self,
913        secret_name: &SecretName,
914    ) -> Result<Option<String>, CryptoStoreError> {
915        Ok(match secret_name {
916            SecretName::CrossSigningMasterKey
917            | SecretName::CrossSigningUserSigningKey
918            | SecretName::CrossSigningSelfSigningKey => {
919                self.inner.identity.lock().await.export_secret(secret_name).await
920            }
921            SecretName::RecoveryKey => {
922                if let Some(key) = self.load_backup_keys().await?.decryption_key {
923                    let exported = key.to_base64();
924                    Some(exported)
925                } else {
926                    None
927                }
928            }
929            name => {
930                warn!(secret = ?name, "Unknown secret was requested");
931                None
932            }
933        })
934    }
935
936    /// Export all the private cross signing keys we have.
937    ///
938    /// The export will contain the seed for the ed25519 keys as a unpadded
939    /// base64 encoded string.
940    ///
941    /// This method returns `None` if we don't have any private cross signing
942    /// keys.
943    pub async fn export_cross_signing_keys(
944        &self,
945    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
946        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
947        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
948        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
949
950        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
951            None
952        } else {
953            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
954        })
955    }
956
957    /// Import our private cross signing keys.
958    ///
959    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
960    /// base64 encoded string.
961    pub async fn import_cross_signing_keys(
962        &self,
963        export: CrossSigningKeyExport,
964    ) -> Result<CrossSigningStatus, SecretImportError> {
965        if let Some(public_identity) =
966            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
967        {
968            let identity = self.inner.identity.lock().await;
969
970            identity
971                .import_secrets(
972                    public_identity.to_owned(),
973                    export.master_key.as_deref(),
974                    export.self_signing_key.as_deref(),
975                    export.user_signing_key.as_deref(),
976                )
977                .await?;
978
979            let status = identity.status().await;
980
981            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
982
983            let mut changes =
984                Changes { private_identity: Some(identity.clone()), ..Default::default() };
985
986            if diff.none_differ() {
987                public_identity.mark_as_verified();
988                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
989            }
990
991            info!(?status, "Successfully imported the private cross-signing keys");
992
993            self.save_changes(changes).await?;
994        } else {
995            warn!(
996                "No public identity found while importing cross-signing keys, \
997                 a /keys/query needs to be done"
998            );
999        }
1000
1001        Ok(self.inner.identity.lock().await.status().await)
1002    }
1003
1004    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1005    ///
1006    /// This method will export all the private cross-signing keys and, if
1007    /// available, the private part of a backup key and its accompanying
1008    /// version.
1009    ///
1010    /// The method will fail if we don't have all three private cross-signing
1011    /// keys available.
1012    ///
1013    /// **Warning**: Only export this and share it with a trusted recipient,
1014    /// i.e. if an existing device is sharing this with a new device.
1015    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1016        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1017            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1018        };
1019
1020        let Some(master_key) = cross_signing.master_key.clone() else {
1021            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1022        };
1023
1024        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1025            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1026        };
1027
1028        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1029            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1030        };
1031
1032        let backup_keys = self.load_backup_keys().await?;
1033
1034        let backup = if let Some(key) = backup_keys.decryption_key {
1035            if let Some(backup_version) = backup_keys.backup_version {
1036                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1037                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1038                ))
1039            } else {
1040                return Err(SecretsBundleExportError::MissingBackupVersion);
1041            }
1042        } else {
1043            None
1044        };
1045
1046        Ok(SecretsBundle {
1047            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1048            backup,
1049        })
1050    }
1051
1052    /// Import and persists secrets from a [`SecretsBundle`].
1053    ///
1054    /// This method will import all the private cross-signing keys and, if
1055    /// available, the private part of a backup key and its accompanying
1056    /// version into the store.
1057    ///
1058    /// **Warning**: Only import this from a trusted source, i.e. if an existing
1059    /// device is sharing this with a new device. The imported cross-signing
1060    /// keys will create a [`OwnUserIdentity`] and mark it as verified.
1061    ///
1062    /// The backup key will be persisted in the store and can be enabled using
1063    /// the [`BackupMachine`].
1064    pub async fn import_secrets_bundle(
1065        &self,
1066        bundle: &SecretsBundle,
1067    ) -> Result<(), SecretImportError> {
1068        let mut changes = Changes::default();
1069
1070        if let Some(backup_bundle) = &bundle.backup {
1071            match backup_bundle {
1072                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1073                    changes.backup_decryption_key = Some(bundle.key.clone());
1074                    changes.backup_version = Some(bundle.backup_version.clone());
1075                }
1076            }
1077        }
1078
1079        let identity = self.inner.identity.lock().await;
1080
1081        identity
1082            .import_secrets_unchecked(
1083                Some(&bundle.cross_signing.master_key),
1084                Some(&bundle.cross_signing.self_signing_key),
1085                Some(&bundle.cross_signing.user_signing_key),
1086            )
1087            .await?;
1088
1089        let public_identity = identity.to_public_identity().await.expect(
1090            "We should be able to create a new public identity since we just imported \
1091             all the private cross-signing keys",
1092        );
1093
1094        changes.private_identity = Some(identity.clone());
1095        changes.identities.new.push(UserIdentityData::Own(public_identity));
1096
1097        Ok(self.save_changes(changes).await?)
1098    }
1099
1100    /// Import the given `secret` named `secret_name` into the keystore.
1101    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1102        match &secret.secret_name {
1103            SecretName::CrossSigningMasterKey
1104            | SecretName::CrossSigningUserSigningKey
1105            | SecretName::CrossSigningSelfSigningKey => {
1106                if let Some(public_identity) =
1107                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1108                {
1109                    let identity = self.inner.identity.lock().await;
1110
1111                    identity
1112                        .import_secret(
1113                            public_identity,
1114                            &secret.secret_name,
1115                            &secret.event.content.secret,
1116                        )
1117                        .await?;
1118                    info!(
1119                        secret_name = ?secret.secret_name,
1120                        "Successfully imported a private cross signing key"
1121                    );
1122
1123                    let changes =
1124                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1125
1126                    self.save_changes(changes).await?;
1127                }
1128            }
1129            SecretName::RecoveryKey => {
1130                // We don't import the decryption key here since we'll want to
1131                // check if the public key matches to the latest version on the
1132                // server. We instead put the secret into a secret inbox where
1133                // it will stay until it either gets overwritten
1134                // or the user accepts the secret.
1135            }
1136            name => {
1137                warn!(secret = ?name, "Tried to import an unknown secret");
1138            }
1139        }
1140
1141        Ok(())
1142    }
1143
1144    /// Check whether there is a global flag to only encrypt messages for
1145    /// trusted devices or for everyone.
1146    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1147        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1148        Ok(value)
1149    }
1150
1151    /// Set global flag whether to encrypt messages for untrusted devices, or
1152    /// whether they should be excluded from the conversation.
1153    pub async fn set_only_allow_trusted_devices(
1154        &self,
1155        block_untrusted_devices: bool,
1156    ) -> Result<()> {
1157        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1158    }
1159
1160    /// Get custom stored value associated with a key
1161    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1162        let Some(value) = self.get_custom_value(key).await? else {
1163            return Ok(None);
1164        };
1165        let deserialized = self.deserialize_value(&value)?;
1166        Ok(Some(deserialized))
1167    }
1168
1169    /// Store custom value associated with a key
1170    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1171        let serialized = self.serialize_value(value)?;
1172        self.set_custom_value(key, serialized).await?;
1173        Ok(())
1174    }
1175
1176    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1177        let serialized =
1178            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1179        Ok(serialized)
1180    }
1181
1182    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1183        let deserialized =
1184            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1185        Ok(deserialized)
1186    }
1187
1188    /// Receive notifications of room keys being received as a [`Stream`].
1189    ///
1190    /// Each time a room key is updated in any way, an update will be sent to
1191    /// the stream. Updates that happen at the same time are batched into a
1192    /// [`Vec`].
1193    ///
1194    /// If the reader of the stream lags too far behind an error will be sent to
1195    /// the reader.
1196    ///
1197    /// The stream will terminate once all references to the underlying
1198    /// `CryptoStoreWrapper` are dropped.
1199    pub fn room_keys_received_stream(
1200        &self,
1201    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
1202        self.inner.store.room_keys_received_stream()
1203    }
1204
1205    /// Receive notifications of received `m.room_key.withheld` messages.
1206    ///
1207    /// Each time an `m.room_key.withheld` is received and stored, an update
1208    /// will be sent to the stream. Updates that happen at the same time are
1209    /// batched into a [`Vec`].
1210    ///
1211    /// If the reader of the stream lags too far behind, a warning will be
1212    /// logged and items will be dropped.
1213    pub fn room_keys_withheld_received_stream(
1214        &self,
1215    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
1216        self.inner.store.room_keys_withheld_received_stream()
1217    }
1218
1219    /// Returns a stream of user identity updates, allowing users to listen for
1220    /// notifications about new or changed user identities.
1221    ///
1222    /// The stream produced by this method emits updates whenever a new user
1223    /// identity is discovered or when an existing identities information is
1224    /// changed. Users can subscribe to this stream and receive updates in
1225    /// real-time.
1226    ///
1227    /// Caution: the returned stream will never terminate, and it holds a
1228    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1229    /// resource leaks.
1230    ///
1231    /// # Examples
1232    ///
1233    /// ```no_run
1234    /// # use matrix_sdk_crypto::OlmMachine;
1235    /// # use ruma::{device_id, user_id};
1236    /// # use futures_util::{pin_mut, StreamExt};
1237    /// # let machine: OlmMachine = unimplemented!();
1238    /// # futures_executor::block_on(async {
1239    /// let identities_stream = machine.store().user_identities_stream();
1240    /// pin_mut!(identities_stream);
1241    ///
1242    /// for identity_updates in identities_stream.next().await {
1243    ///     for (_, identity) in identity_updates.new {
1244    ///         println!("A new identity has been added {}", identity.user_id());
1245    ///     }
1246    /// }
1247    /// # });
1248    /// ```
1249    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> + use<> {
1250        let verification_machine = self.inner.verification_machine.to_owned();
1251
1252        let this = self.clone();
1253        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1254            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1255
1256            let map_identity = |(user_id, identity)| {
1257                (
1258                    user_id,
1259                    UserIdentity::new(
1260                        this.clone(),
1261                        identity,
1262                        verification_machine.to_owned(),
1263                        own_identity.to_owned(),
1264                    ),
1265                )
1266            };
1267
1268            let new = new_identities.into_iter().map(map_identity).collect();
1269            let changed = changed_identities.into_iter().map(map_identity).collect();
1270            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1271
1272            IdentityUpdates { new, changed, unchanged }
1273        })
1274    }
1275
1276    /// Returns a stream of device updates, allowing users to listen for
1277    /// notifications about new or changed devices.
1278    ///
1279    /// The stream produced by this method emits updates whenever a new device
1280    /// is discovered or when an existing device's information is changed. Users
1281    /// can subscribe to this stream and receive updates in real-time.
1282    ///
1283    /// Caution: the returned stream will never terminate, and it holds a
1284    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1285    /// resource leaks.
1286    ///
1287    /// # Examples
1288    ///
1289    /// ```no_run
1290    /// # use matrix_sdk_crypto::OlmMachine;
1291    /// # use ruma::{device_id, user_id};
1292    /// # use futures_util::{pin_mut, StreamExt};
1293    /// # let machine: OlmMachine = unimplemented!();
1294    /// # futures_executor::block_on(async {
1295    /// let devices_stream = machine.store().devices_stream();
1296    /// pin_mut!(devices_stream);
1297    ///
1298    /// for device_updates in devices_stream.next().await {
1299    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1300    ///         for device in user_devices.values() {
1301    ///             println!("A new device has been added {}", device.device_id());
1302    ///         }
1303    ///     }
1304    /// }
1305    /// # });
1306    /// ```
1307    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> + use<> {
1308        let verification_machine = self.inner.verification_machine.to_owned();
1309
1310        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1311            collect_device_updates(
1312                verification_machine.to_owned(),
1313                own_identity,
1314                identities,
1315                devices,
1316            )
1317        })
1318    }
1319
1320    /// Returns a [`Stream`] of user identity and device updates
1321    ///
1322    /// The stream returned by this method returns the same data as
1323    /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
1324    /// not include references to the `VerificationMachine`. It is therefore a
1325    /// lower-level view on that data.
1326    ///
1327    /// The stream will terminate once all references to the underlying
1328    /// `CryptoStoreWrapper` are dropped.
1329    pub fn identities_stream_raw(
1330        &self,
1331    ) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> + use<> {
1332        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1333    }
1334
1335    /// Creates a [`CrossProcessLock`] for this store, that will contain the
1336    /// given key and value when hold.
1337    pub fn create_store_lock(
1338        &self,
1339        lock_key: String,
1340        config: CrossProcessLockConfig,
1341    ) -> CrossProcessLock<LockableCryptoStore> {
1342        self.inner.store.create_store_lock(lock_key, config)
1343    }
1344
1345    /// Receive notifications of gossipped secrets being received and stored in
1346    /// the secret inbox as a [`Stream`].
1347    ///
1348    /// The gossipped secrets are received using the `m.secret.send` event type
1349    /// and are guaranteed to have been received over a 1-to-1 Olm
1350    /// [`Session`] from a verified [`Device`].
1351    ///
1352    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1353    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1354    ///
1355    /// After a suitable secret of a certain type has been found it can be
1356    /// removed from the store
1357    /// using the [`CryptoStore::delete_secrets_from_inbox()`] method.
1358    ///
1359    /// The only secret this will currently broadcast is the
1360    /// `m.megolm_backup.v1`.
1361    ///
1362    /// If the reader of the stream lags too far behind, a warning will be
1363    /// logged and items will be dropped.
1364    ///
1365    /// # Examples
1366    ///
1367    /// ```no_run
1368    /// # use matrix_sdk_crypto::OlmMachine;
1369    /// # use ruma::{device_id, owned_user_id};
1370    /// # use futures_util::{pin_mut, StreamExt};
1371    /// # let alice = owned_user_id!("@alice:example.org");
1372    /// # futures_executor::block_on(async {
1373    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1374    ///
1375    /// let secret_stream = machine.store().secrets_stream();
1376    /// pin_mut!(secret_stream);
1377    ///
1378    /// for secret in secret_stream.next().await {
1379    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
1380    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name);
1381    /// }
1382    /// # });
1383    /// ```
1384    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
1385        self.inner.store.secrets_stream()
1386    }
1387
1388    /// Receive notifications of historic room key bundles as a [`Stream`].
1389    ///
1390    /// Historic room key bundles are defined in [MSC4268](https://github.com/matrix-org/matrix-spec-proposals/pull/4268).
1391    ///
1392    /// Each time a historic room key bundle was received, an update will be
1393    /// sent to the stream. This stream can be used to accept historic room key
1394    /// bundles that arrive out of order, i.e. the bundle arrives after the
1395    /// user has already accepted a room invitation.
1396    ///
1397    /// # Examples
1398    ///
1399    /// ```no_run
1400    /// # use matrix_sdk_crypto::{
1401    /// #    OlmMachine,
1402    /// #    store::types::StoredRoomKeyBundleData,
1403    /// #    types::room_history::RoomKeyBundle
1404    /// # };
1405    /// # use ruma::{device_id, owned_user_id};
1406    /// # use futures_util::{pin_mut, StreamExt};
1407    /// # let alice = owned_user_id!("@alice:example.org");
1408    /// # async {
1409    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1410    /// let bundle_stream = machine.store().historic_room_key_stream();
1411    /// pin_mut!(bundle_stream);
1412    ///
1413    /// while let Some(bundle_info) = bundle_stream.next().await {
1414    ///     // Try to find the bundle content in the store and if it's valid accept it.
1415    ///     if let Some(bundle_data) = machine.store().get_received_room_key_bundle_data(&bundle_info.room_id, &bundle_info.sender).await? {
1416    ///         // Download the bundle now and import it.
1417    ///         let bundle: RoomKeyBundle = todo!("Download the bundle");
1418    ///         machine.store().receive_room_key_bundle(
1419    ///             &bundle_data,
1420    ///             bundle,
1421    ///             |_, _| {},
1422    ///         ).await?;
1423    ///     }
1424    /// }
1425    /// # anyhow::Ok(()) };
1426    /// ```
1427    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
1428        self.inner.store.historic_room_key_stream()
1429    }
1430
1431    /// Import the given room keys into the store.
1432    ///
1433    /// # Arguments
1434    ///
1435    /// * `exported_keys` - The keys to be imported.
1436    /// * `from_backup_version` - If the keys came from key backup, the key
1437    ///   backup version. This will cause the keys to be marked as already
1438    ///   backed up, and therefore not requiring another backup.
1439    /// * `progress_listener` - Callback which will be called after each key is
1440    ///   processed. Called with arguments `(processed, total)` where
1441    ///   `processed` is the number of keys processed so far, and `total` is the
1442    ///   total number of keys (i.e., `exported_keys.len()`).
1443    pub async fn import_room_keys(
1444        &self,
1445        exported_keys: Vec<ExportedRoomKey>,
1446        from_backup_version: Option<&str>,
1447        progress_listener: impl Fn(usize, usize),
1448    ) -> Result<RoomKeyImportResult> {
1449        let exported_keys = exported_keys.iter().filter_map(|key| {
1450            key.try_into()
1451                .map_err(|e| {
1452                    warn!(
1453                        sender_key = key.sender_key().to_base64(),
1454                        room_id = ?key.room_id(),
1455                        session_id = key.session_id(),
1456                        error = ?e,
1457                        "Couldn't import a room key from a file export."
1458                    );
1459                })
1460                .ok()
1461        });
1462        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1463    }
1464
1465    /// Import the given room keys into our store.
1466    ///
1467    /// # Arguments
1468    ///
1469    /// * `exported_keys` - A list of previously exported keys that should be
1470    ///   imported into our store. If we already have a better version of a key
1471    ///   the key will *not* be imported.
1472    ///
1473    /// Returns a tuple of numbers that represent the number of sessions that
1474    /// were imported and the total number of sessions that were found in the
1475    /// key export.
1476    ///
1477    /// # Examples
1478    ///
1479    /// ```no_run
1480    /// # use std::io::Cursor;
1481    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1482    /// # use ruma::{device_id, user_id};
1483    /// # let alice = user_id!("@alice:example.org");
1484    /// # async {
1485    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1486    /// # let export = Cursor::new("".to_owned());
1487    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1488    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1489    /// # };
1490    /// ```
1491    pub async fn import_exported_room_keys(
1492        &self,
1493        exported_keys: Vec<ExportedRoomKey>,
1494        progress_listener: impl Fn(usize, usize),
1495    ) -> Result<RoomKeyImportResult> {
1496        self.import_room_keys(exported_keys, None, progress_listener).await
1497    }
1498
1499    async fn import_sessions_impl(
1500        &self,
1501        sessions: impl Iterator<Item = InboundGroupSession>,
1502        from_backup_version: Option<&str>,
1503        progress_listener: impl Fn(usize, usize),
1504    ) -> Result<RoomKeyImportResult> {
1505        let sessions: Vec<_> = sessions.collect();
1506        let mut imported_sessions = Vec::new();
1507
1508        let total_count = sessions.len();
1509        let mut keys = BTreeMap::new();
1510
1511        for (i, session) in sessions.into_iter().enumerate() {
1512            // Only import the session if we didn't have this session or
1513            // if it's a better version of the same session.
1514            if let Some(merged) = self.merge_received_group_session(session).await? {
1515                if from_backup_version.is_some() {
1516                    merged.mark_as_backed_up();
1517                }
1518
1519                keys.entry(merged.room_id().to_owned())
1520                    .or_insert_with(BTreeMap::new)
1521                    .entry(merged.sender_key().to_base64())
1522                    .or_insert_with(BTreeSet::new)
1523                    .insert(merged.session_id().to_owned());
1524
1525                imported_sessions.push(merged);
1526            }
1527
1528            progress_listener(i, total_count);
1529        }
1530
1531        let imported_count = imported_sessions.len();
1532
1533        self.inner
1534            .store
1535            .save_inbound_group_sessions(imported_sessions, from_backup_version)
1536            .await?;
1537
1538        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1539
1540        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1541    }
1542
1543    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1544        self.inner.store.clone()
1545    }
1546
1547    /// Export the keys that match the given predicate.
1548    ///
1549    /// # Arguments
1550    ///
1551    /// * `predicate` - A closure that will be called for every known
1552    ///   `InboundGroupSession`, which represents a room key. If the closure
1553    ///   returns `true` the `InboundGroupSession` will be included in the
1554    ///   export, if the closure returns `false` it will not be included.
1555    ///
1556    /// # Examples
1557    ///
1558    /// ```no_run
1559    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1560    /// # use ruma::{device_id, user_id, room_id};
1561    /// # let alice = user_id!("@alice:example.org");
1562    /// # async {
1563    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1564    /// let room_id = room_id!("!test:localhost");
1565    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1566    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1567    /// # };
1568    /// ```
1569    pub async fn export_room_keys(
1570        &self,
1571        predicate: impl FnMut(&InboundGroupSession) -> bool,
1572    ) -> Result<Vec<ExportedRoomKey>> {
1573        let mut exported = Vec::new();
1574
1575        let mut sessions = self.get_inbound_group_sessions().await?;
1576        sessions.retain(predicate);
1577
1578        for session in sessions {
1579            let export = session.export().await;
1580            exported.push(export);
1581        }
1582
1583        Ok(exported)
1584    }
1585
1586    /// Export room keys matching a predicate, providing them as an async
1587    /// `Stream`.
1588    ///
1589    /// # Arguments
1590    ///
1591    /// * `predicate` - A closure that will be called for every known
1592    ///   `InboundGroupSession`, which represents a room key. If the closure
1593    ///   returns `true` the `InboundGroupSession` will be included in the
1594    ///   export, if the closure returns `false` it will not be included.
1595    ///
1596    /// # Examples
1597    ///
1598    /// ```no_run
1599    /// use std::pin::pin;
1600    ///
1601    /// use matrix_sdk_crypto::{OlmMachine, olm::ExportedRoomKey};
1602    /// use ruma::{device_id, room_id, user_id};
1603    /// use tokio_stream::StreamExt;
1604    /// # async {
1605    /// let alice = user_id!("@alice:example.org");
1606    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1607    /// let room_id = room_id!("!test:localhost");
1608    /// let mut keys = pin!(
1609    ///     machine
1610    ///         .store()
1611    ///         .export_room_keys_stream(|s| s.room_id() == room_id)
1612    ///         .await
1613    ///         .unwrap()
1614    /// );
1615    /// while let Some(key) = keys.next().await {
1616    ///     println!("{}", key.room_id);
1617    /// }
1618    /// # };
1619    /// ```
1620    pub async fn export_room_keys_stream(
1621        &self,
1622        predicate: impl FnMut(&InboundGroupSession) -> bool,
1623    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
1624        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
1625        let sessions = self.get_inbound_group_sessions().await?;
1626        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
1627            .then(|session| async move { session.export().await }))
1628    }
1629
1630    /// Assemble a room key bundle for sharing encrypted history, as per
1631    /// [MSC4268].
1632    ///
1633    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1634    pub async fn build_room_key_bundle(
1635        &self,
1636        room_id: &RoomId,
1637    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
1638        let sessions = self.get_inbound_group_sessions_by_room_id(room_id).await?;
1639
1640        let mut bundle = RoomKeyBundle::default();
1641        for session in sessions {
1642            if session.shared_history() {
1643                bundle.room_keys.push(session.export().await.into());
1644            } else {
1645                bundle.withheld.push(RoomKeyWithheldContent::new(
1646                    session.algorithm().to_owned(),
1647                    WithheldCode::HistoryNotShared,
1648                    session.room_id().to_owned(),
1649                    session.session_id().to_owned(),
1650                    session.sender_key().to_owned(),
1651                    self.device_id().to_owned(),
1652                ));
1653            }
1654        }
1655
1656        // If we received a key bundle ourselves, in which one or more sessions was
1657        // marked as "history not shared", pass that on to the new user.
1658        let withhelds = self.get_withheld_sessions_by_room_id(room_id).await?;
1659        for withheld in withhelds {
1660            if withheld.content.withheld_code() == WithheldCode::HistoryNotShared {
1661                bundle.withheld.push(withheld.content);
1662            }
1663        }
1664
1665        Ok(bundle)
1666    }
1667
1668    /// Import the contents of a downloaded and decrypted [MSC4268] key bundle.
1669    ///
1670    /// # Arguments
1671    ///
1672    /// * `bundle_info` - The [`StoredRoomKeyBundleData`] of the bundle that is
1673    ///   being received.
1674    /// * `bundle` - The decrypted and deserialized bundle itself.
1675    ///
1676    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1677    #[instrument(skip(self, bundle, progress_listener), fields(bundle_size = bundle.room_keys.len(), sender_data))]
1678    pub async fn receive_room_key_bundle(
1679        &self,
1680        bundle_info: &StoredRoomKeyBundleData,
1681        bundle: RoomKeyBundle,
1682        progress_listener: impl Fn(usize, usize),
1683    ) -> Result<(), CryptoStoreError> {
1684        let sender_data = if bundle_info.sender_data.should_recalculate() {
1685            let device = self
1686                .get_device_from_curve_key(&bundle_info.sender_user, bundle_info.sender_key)
1687                .await?;
1688
1689            device
1690                .as_ref()
1691                .map(SenderData::from_device)
1692                .unwrap_or_else(|| bundle_info.sender_data.clone())
1693        } else {
1694            bundle_info.sender_data.clone()
1695        };
1696
1697        tracing::Span::current().record("sender_data", tracing::field::debug(&sender_data));
1698
1699        // The sender's device must be either `SenderData::SenderUnverified` (i.e.,
1700        // TOFU-trusted) or `SenderData::SenderVerified` (i.e., fully verified
1701        // via user verification and cross-signing).
1702        let Ok(forwarder_data) = (&sender_data).try_into() else {
1703            warn!(
1704                "Not accepting a historic room key bundle due to insufficient trust in the sender"
1705            );
1706            return Ok(());
1707        };
1708
1709        self.import_room_key_bundle_sessions(
1710            bundle_info,
1711            &bundle,
1712            &forwarder_data,
1713            progress_listener,
1714        )
1715        .await?;
1716        self.import_room_key_bundle_withheld_info(bundle_info, &bundle).await?;
1717
1718        Ok(())
1719    }
1720
1721    async fn import_room_key_bundle_sessions(
1722        &self,
1723        bundle_info: &StoredRoomKeyBundleData,
1724        bundle: &RoomKeyBundle,
1725        forwarder_data: &ForwarderData,
1726        progress_listener: impl Fn(usize, usize),
1727    ) -> Result<(), CryptoStoreError> {
1728        let (good, bad): (Vec<_>, Vec<_>) = bundle.room_keys.iter().partition_map(|key| {
1729            if key.room_id != bundle_info.bundle_data.room_id {
1730                trace!("Ignoring key for incorrect room {} in bundle", key.room_id);
1731                Either::Right(key)
1732            } else {
1733                Either::Left(key)
1734            }
1735        });
1736
1737        match (bad.is_empty(), good.is_empty()) {
1738            // Case 1: Completely empty bundle.
1739            (true, true) => {
1740                warn!("Received a completely empty room key bundle");
1741            }
1742
1743            // Case 2: A bundle for the wrong room.
1744            (false, true) => {
1745                let bad_keys: Vec<_> =
1746                    bad.iter().map(|&key| (&key.room_id, &key.session_id)).collect();
1747
1748                warn!(
1749                    ?bad_keys,
1750                    "Received a room key bundle for the wrong room, ignoring all room keys from the bundle"
1751                );
1752            }
1753
1754            // Case 3: A bundle containing useful room keys.
1755            (_, false) => {
1756                // We have at least some good keys, if we also have some bad ones let's
1757                // mention that here.
1758                if !bad.is_empty() {
1759                    warn!(
1760                        bad_key_count = bad.len(),
1761                        "The room key bundle contained some room keys \
1762                         that were meant for a different room"
1763                    );
1764                }
1765
1766                let keys = good.iter().filter_map(|key| {
1767                    key.try_into_inbound_group_session(forwarder_data)
1768                        .map_err(|e| {
1769                            warn!(
1770                                sender_key = ?key.sender_key().to_base64(),
1771                                room_id = ?key.room_id(),
1772                                session_id = key.session_id(),
1773                                error = ?e,
1774                                "Couldn't import a room key from a key bundle."
1775                            );
1776                        })
1777                        .ok()
1778                });
1779
1780                self.import_sessions_impl(keys, None, progress_listener).await?;
1781            }
1782        }
1783
1784        Ok(())
1785    }
1786
1787    async fn import_room_key_bundle_withheld_info(
1788        &self,
1789        bundle_info: &StoredRoomKeyBundleData,
1790        bundle: &RoomKeyBundle,
1791    ) -> Result<(), CryptoStoreError> {
1792        let mut session_id_to_withheld_code_map = BTreeMap::new();
1793
1794        let mut changes = Changes::default();
1795        for withheld in &bundle.withheld {
1796            let (room_id, session_id) = match withheld {
1797                RoomKeyWithheldContent::MegolmV1AesSha2(c) => match (c.room_id(), c.session_id()) {
1798                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1799                    _ => continue,
1800                },
1801                #[cfg(feature = "experimental-algorithms")]
1802                RoomKeyWithheldContent::MegolmV2AesSha2(c) => match (c.room_id(), c.session_id()) {
1803                    (Some(room_id), Some(session_id)) => (room_id, session_id),
1804                    _ => continue,
1805                },
1806                RoomKeyWithheldContent::Unknown(_) => continue,
1807            };
1808
1809            if room_id != bundle_info.bundle_data.room_id {
1810                trace!("Ignoring withheld info for incorrect room {} in bundle", room_id);
1811                continue;
1812            }
1813
1814            changes.withheld_session_info.entry(room_id.to_owned()).or_default().insert(
1815                session_id.to_owned(),
1816                RoomKeyWithheldEntry {
1817                    sender: bundle_info.sender_user.clone(),
1818                    content: withheld.to_owned(),
1819                },
1820            );
1821            session_id_to_withheld_code_map.insert(session_id, withheld.withheld_code());
1822        }
1823
1824        self.save_changes(changes).await?;
1825
1826        info!(
1827            room_id = ?bundle_info.bundle_data.room_id,
1828            ?session_id_to_withheld_code_map,
1829            "Successfully imported withheld info from room key bundle",
1830        );
1831
1832        Ok(())
1833    }
1834
1835    /// Store the fact that we have accepted an invite for a given room on this
1836    /// client, so should accept an [MSC4268] key bundle if one arrives
1837    /// soon.
1838    ///
1839    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1840    pub async fn store_room_pending_key_bundle(
1841        &self,
1842        room_id: &RoomId,
1843        inviter: &UserId,
1844    ) -> Result<(), CryptoStoreError> {
1845        let invite_accepted_at = MilliSecondsSinceUnixEpoch::now();
1846        self.save_changes(Changes {
1847            rooms_pending_key_bundle: HashMap::from([(
1848                room_id.to_owned(),
1849                Some(RoomPendingKeyBundleDetails {
1850                    room_id: room_id.to_owned(),
1851                    invite_accepted_at,
1852                    inviter: inviter.to_owned(),
1853                }),
1854            )]),
1855            ..Default::default()
1856        })
1857        .await?;
1858
1859        Ok(())
1860    }
1861
1862    /// Clear the record of accepting an invite for a specific room.
1863    ///
1864    /// The counterpart of [`Store::store_room_pending_key_bundle`].
1865    pub async fn clear_room_pending_key_bundle(&self, room_id: &RoomId) -> Result<()> {
1866        self.save_changes(Changes {
1867            rooms_pending_key_bundle: HashMap::from([(room_id.to_owned(), None)]),
1868            ..Default::default()
1869        })
1870        .await
1871    }
1872}
1873
1874impl Deref for Store {
1875    type Target = DynCryptoStore;
1876
1877    fn deref(&self) -> &Self::Target {
1878        self.inner.store.deref().deref()
1879    }
1880}
1881
1882/// A crypto store that implements primitives for cross-process locking.
1883#[derive(Clone, Debug)]
1884pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
1885
1886impl matrix_sdk_common::cross_process_lock::TryLock for LockableCryptoStore {
1887    type LockError = CryptoStoreError;
1888
1889    async fn try_lock(
1890        &self,
1891        lease_duration_ms: u32,
1892        key: &str,
1893        holder: &str,
1894    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
1895        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
1896    }
1897}
1898
1899#[cfg(test)]
1900mod tests {
1901    use std::{collections::BTreeMap, pin::pin};
1902
1903    use assert_matches2::{assert_let, assert_matches};
1904    use futures_util::StreamExt;
1905    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
1906    use matrix_sdk_test::async_test;
1907    use ruma::{
1908        RoomId, device_id,
1909        events::room::{EncryptedFileInit, JsonWebKeyInit},
1910        owned_device_id, owned_mxc_uri, room_id,
1911        serde::Base64,
1912        user_id,
1913    };
1914    use serde_json::json;
1915    use vodozemac::{Ed25519Keypair, megolm::SessionKey};
1916
1917    use crate::{
1918        Account, OlmMachine,
1919        machine::test_helpers::get_machine_pair,
1920        olm::{InboundGroupSession, SenderData},
1921        store::types::{DehydratedDeviceKey, RoomKeyWithheldEntry, StoredRoomKeyBundleData},
1922        types::{
1923            EventEncryptionAlgorithm,
1924            events::{
1925                room_key_bundle::RoomKeyBundleContent,
1926                room_key_withheld::{MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent},
1927            },
1928        },
1929    };
1930
1931    #[async_test]
1932    async fn test_merge_received_group_session() {
1933        let alice_account = Account::with_device_id(user_id!("@a:s.co"), device_id!("ABC"));
1934        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("DEF")).await;
1935
1936        let room_id = room_id!("!test:localhost");
1937
1938        let megolm_signing_key = Ed25519Keypair::new();
1939        let inbound = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1940
1941        // Bob already knows about the session, at index 5, with the device keys.
1942        let mut inbound_at_index_5 =
1943            InboundGroupSession::from_export(&inbound.export_at_index(5).await).unwrap();
1944        inbound_at_index_5.sender_data = inbound.sender_data.clone();
1945        bob.store().save_inbound_group_sessions(&[inbound_at_index_5.clone()]).await.unwrap();
1946
1947        // No changes if we get a disconnected session.
1948        let disconnected = make_inbound_group_session(&alice_account, &megolm_signing_key, room_id);
1949        assert_eq!(bob.store().merge_received_group_session(disconnected).await.unwrap(), None);
1950
1951        // No changes needed when we receive a worse copy of the session
1952        let mut worse =
1953            InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1954        worse.sender_data = inbound.sender_data.clone();
1955        assert_eq!(bob.store().merge_received_group_session(worse).await.unwrap(), None);
1956
1957        // Nor when we receive an exact copy of what we already have
1958        let mut copy = InboundGroupSession::from_pickle(inbound_at_index_5.pickle().await).unwrap();
1959        copy.sender_data = inbound.sender_data.clone();
1960        assert_eq!(bob.store().merge_received_group_session(copy).await.unwrap(), None);
1961
1962        // But when we receive a better copy of the session, we should get it back
1963        let mut better =
1964            InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1965        better.sender_data = inbound.sender_data.clone();
1966        assert_let!(Some(update) = bob.store().merge_received_group_session(better).await.unwrap());
1967        assert_eq!(update.first_known_index(), 0);
1968
1969        // A worse copy of the ratchet, but better trust data
1970        {
1971            let mut worse_ratchet_better_trust =
1972                InboundGroupSession::from_export(&inbound.export_at_index(10).await).unwrap();
1973            let updated_sender_data = SenderData::sender_verified(
1974                alice_account.user_id(),
1975                alice_account.device_id(),
1976                Ed25519Keypair::new().public_key(),
1977            );
1978            worse_ratchet_better_trust.sender_data = updated_sender_data.clone();
1979            assert_let!(
1980                Some(update) = bob
1981                    .store()
1982                    .merge_received_group_session(worse_ratchet_better_trust)
1983                    .await
1984                    .unwrap()
1985            );
1986            assert_eq!(update.sender_data, updated_sender_data);
1987            assert_eq!(update.first_known_index(), 5);
1988            assert_eq!(
1989                update.export_at_index(0).await.session_key.to_bytes(),
1990                inbound.export_at_index(5).await.session_key.to_bytes()
1991            );
1992        }
1993
1994        // A better copy of the ratchet, but worse trust data
1995        {
1996            let mut better_ratchet_worse_trust =
1997                InboundGroupSession::from_export(&inbound.export_at_index(0).await).unwrap();
1998            let updated_sender_data = SenderData::unknown();
1999            better_ratchet_worse_trust.sender_data = updated_sender_data.clone();
2000            assert_let!(
2001                Some(update) = bob
2002                    .store()
2003                    .merge_received_group_session(better_ratchet_worse_trust)
2004                    .await
2005                    .unwrap()
2006            );
2007            assert_eq!(update.sender_data, inbound.sender_data);
2008            assert_eq!(update.first_known_index(), 0);
2009            assert_eq!(
2010                update.export_at_index(0).await.session_key.to_bytes(),
2011                inbound.export_at_index(0).await.session_key.to_bytes()
2012            );
2013        }
2014    }
2015
2016    /// Create an [`InboundGroupSession`] for the given room, using the given
2017    /// Ed25519 key as the signing key/session ID.
2018    fn make_inbound_group_session(
2019        sender_account: &Account,
2020        signing_key: &Ed25519Keypair,
2021        room_id: &RoomId,
2022    ) -> InboundGroupSession {
2023        InboundGroupSession::new(
2024            sender_account.identity_keys.curve25519,
2025            sender_account.identity_keys.ed25519,
2026            room_id,
2027            &make_session_key(signing_key),
2028            SenderData::device_info(crate::types::DeviceKeys::new(
2029                sender_account.user_id().to_owned(),
2030                sender_account.device_id().to_owned(),
2031                vec![],
2032                BTreeMap::new(),
2033                crate::types::Signatures::new(),
2034            )),
2035            None,
2036            EventEncryptionAlgorithm::MegolmV1AesSha2,
2037            Some(ruma::events::room::history_visibility::HistoryVisibility::Shared),
2038            true,
2039        )
2040        .unwrap()
2041    }
2042
2043    /// Make a Megolm [`SessionKey`] using the given Ed25519 key as a signing
2044    /// key/session ID.
2045    fn make_session_key(signing_key: &Ed25519Keypair) -> SessionKey {
2046        use rand::Rng;
2047
2048        // `SessionKey::new` is not public, so the easiest way to construct a Megolm
2049        // session using a known Ed25519 key is to build a byte array in the export
2050        // format.
2051
2052        let mut session_key_bytes = vec![0u8; 229];
2053        // 0: version
2054        session_key_bytes[0] = 2;
2055        // 1..5: index
2056        // 5..133: ratchet key
2057        rand::thread_rng().fill(&mut session_key_bytes[5..133]);
2058        // 133..165: public ed25519 key
2059        session_key_bytes[133..165].copy_from_slice(signing_key.public_key().as_bytes());
2060        // 165..229: signature
2061        let sig = signing_key.sign(&session_key_bytes[0..165]);
2062        session_key_bytes[165..229].copy_from_slice(&sig.to_bytes());
2063
2064        SessionKey::from_bytes(&session_key_bytes).unwrap()
2065    }
2066
2067    #[async_test]
2068    async fn test_import_room_keys_notifies_stream() {
2069        use futures_util::FutureExt;
2070
2071        let (alice, bob, _) =
2072            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2073
2074        let room1_id = room_id!("!room1:localhost");
2075        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2076        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2077
2078        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2079        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2080
2081        let room_keys = room_keys_received_stream
2082            .next()
2083            .now_or_never()
2084            .flatten()
2085            .expect("We should have received an update of room key infos")
2086            .unwrap();
2087        assert_eq!(room_keys.len(), 1);
2088        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2089    }
2090
2091    #[async_test]
2092    async fn test_export_room_keys_provides_selected_keys() {
2093        // Given an OlmMachine with room keys in it
2094        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2095        let room1_id = room_id!("!room1:localhost");
2096        let room2_id = room_id!("!room2:localhost");
2097        let room3_id = room_id!("!room3:localhost");
2098        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2099        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2100        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2101
2102        // When I export some of the keys
2103        let keys = alice
2104            .store()
2105            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2106            .await
2107            .unwrap();
2108
2109        // Then the requested keys were provided
2110        assert_eq!(keys.len(), 2);
2111        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2112        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2113        assert_eq!(keys[0].room_id, "!room2:localhost");
2114        assert_eq!(keys[1].room_id, "!room3:localhost");
2115        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2116        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2117    }
2118
2119    #[async_test]
2120    async fn test_export_room_keys_stream_can_provide_all_keys() {
2121        // Given an OlmMachine with room keys in it
2122        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2123        let room1_id = room_id!("!room1:localhost");
2124        let room2_id = room_id!("!room2:localhost");
2125        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2126        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2127
2128        // When I export the keys as a stream
2129        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2130
2131        // And collect them
2132        let mut collected = vec![];
2133        while let Some(key) = keys.next().await {
2134            collected.push(key);
2135        }
2136
2137        // Then all the keys were provided
2138        assert_eq!(collected.len(), 2);
2139        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2140        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2141        assert_eq!(collected[0].room_id, "!room1:localhost");
2142        assert_eq!(collected[1].room_id, "!room2:localhost");
2143        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2144        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2145    }
2146
2147    #[async_test]
2148    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2149        // Given an OlmMachine with room keys in it
2150        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2151        let room1_id = room_id!("!room1:localhost");
2152        let room2_id = room_id!("!room2:localhost");
2153        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2154        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2155
2156        // When I export the keys as a stream
2157        let mut keys =
2158            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2159
2160        // And collect them
2161        let mut collected = vec![];
2162        while let Some(key) = keys.next().await {
2163            collected.push(key);
2164        }
2165
2166        // Then all the keys matching our predicate were provided, and no others
2167        assert_eq!(collected.len(), 1);
2168        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2169        assert_eq!(collected[0].room_id, "!room1:localhost");
2170        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2171    }
2172
2173    #[async_test]
2174    async fn test_export_secrets_bundle() {
2175        let user_id = user_id!("@alice:example.com");
2176        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2177
2178        let _ = first
2179            .bootstrap_cross_signing(false)
2180            .await
2181            .expect("We should be able to bootstrap cross-signing");
2182
2183        let bundle = first.store().export_secrets_bundle().await.expect(
2184            "We should be able to export the secrets bundle, now that we \
2185             have the cross-signing keys",
2186        );
2187
2188        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2189
2190        second
2191            .store()
2192            .import_secrets_bundle(&bundle)
2193            .await
2194            .expect("We should be able to import the secrets bundle");
2195
2196        let status = second.cross_signing_status().await;
2197        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2198
2199        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2200
2201        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2202    }
2203
2204    #[async_test]
2205    async fn test_create_dehydrated_device_key() {
2206        let pickle_key = DehydratedDeviceKey::new()
2207            .expect("Should be able to create a random dehydrated device key");
2208
2209        let to_vec = pickle_key.inner.to_vec();
2210        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2211            .expect("Should be able to create a dehydrated device key from slice");
2212
2213        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2214    }
2215
2216    #[async_test]
2217    async fn test_create_dehydrated_errors() {
2218        let too_small = [0u8; 22];
2219        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2220
2221        assert!(pickle_key.is_err());
2222
2223        let too_big = [0u8; 40];
2224        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2225
2226        assert!(pickle_key.is_err());
2227    }
2228
2229    #[async_test]
2230    async fn test_build_room_key_bundle() {
2231        // Given: Alice has sent a number of room keys to Bob, including some in the
2232        // wrong room, and some that are not marked as shared...
2233        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2234        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2235
2236        let room1_id = room_id!("!room1:localhost");
2237        let room2_id = room_id!("!room2:localhost");
2238
2239        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2240
2241           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2242        */
2243        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2244        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2245        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2246        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2247
2248        let sessions = [
2249            create_inbound_group_session_with_visibility(
2250                &alice,
2251                room1_id,
2252                &SessionKey::from_base64(session_key1).unwrap(),
2253                true,
2254            ),
2255            create_inbound_group_session_with_visibility(
2256                &alice,
2257                room1_id,
2258                &SessionKey::from_base64(session_key2).unwrap(),
2259                true,
2260            ),
2261            create_inbound_group_session_with_visibility(
2262                &alice,
2263                room1_id,
2264                &SessionKey::from_base64(session_key3).unwrap(),
2265                false,
2266            ),
2267            create_inbound_group_session_with_visibility(
2268                &alice,
2269                room2_id,
2270                &SessionKey::from_base64(session_key4).unwrap(),
2271                true,
2272            ),
2273        ];
2274        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2275
2276        // When I build the bundle
2277        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2278
2279        // Then the bundle matches the snapshot.
2280
2281        // We sort the sessions in the bundle, so that the snapshot is stable.
2282        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2283
2284        // We substitute the algorithm, since this changes based on feature flags.
2285        let algorithm = if cfg!(feature = "experimental-algorithms") {
2286            "m.megolm.v2.aes-sha2"
2287        } else {
2288            "m.megolm.v1.aes-sha2"
2289        };
2290        let map_algorithm = move |value: Content, _path: ContentPath<'_>| {
2291            assert_eq!(value.as_str().unwrap(), algorithm);
2292            "[algorithm]"
2293        };
2294
2295        // We also substitute alice's keys in the snapshot with placeholders
2296        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2297        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2298            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2299            "[alice curve key]"
2300        };
2301        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2302        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2303            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2304            "[alice ed25519 key]"
2305        };
2306
2307        insta::with_settings!({ sort_maps => true }, {
2308            assert_json_snapshot!(bundle, {
2309                ".withheld[].algorithm" => insta::dynamic_redaction(map_algorithm),
2310                ".room_keys[].algorithm" => insta::dynamic_redaction(map_algorithm),
2311                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2312                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2313                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2314            });
2315        });
2316    }
2317
2318    #[async_test]
2319    async fn test_receive_room_key_bundle() {
2320        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2321        let alice_key = alice.identity_keys().curve25519;
2322        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2323
2324        let room_id = room_id!("!room1:localhost");
2325
2326        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2327        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2328
2329        let sessions = [
2330            create_inbound_group_session_with_visibility(
2331                &alice,
2332                room_id,
2333                &SessionKey::from_base64(session_key1).unwrap(),
2334                true,
2335            ),
2336            create_inbound_group_session_with_visibility(
2337                &alice,
2338                room_id,
2339                &SessionKey::from_base64(session_key2).unwrap(),
2340                false,
2341            ),
2342        ];
2343
2344        alice.store().save_inbound_group_sessions(&sessions).await.unwrap();
2345        let bundle = alice.store().build_room_key_bundle(room_id).await.unwrap();
2346
2347        bob.store()
2348            .receive_room_key_bundle(
2349                &StoredRoomKeyBundleData {
2350                    sender_user: alice.user_id().to_owned(),
2351                    sender_key: alice_key,
2352                    sender_data: SenderData::sender_verified(
2353                        alice.user_id(),
2354                        device_id!("ALICE"),
2355                        alice.identity_keys().ed25519,
2356                    ),
2357
2358                    bundle_data: RoomKeyBundleContent {
2359                        room_id: room_id.to_owned(),
2360                        // This isn't used at all in the method call, so we can fill it with
2361                        // garbage.
2362                        file: EncryptedFileInit {
2363                            url: owned_mxc_uri!("mxc://example.com/0"),
2364                            key: JsonWebKeyInit {
2365                                kty: "oct".to_owned(),
2366                                key_ops: vec!["encrypt".to_owned(), "decrypt".to_owned()],
2367                                alg: "A256CTR.".to_owned(),
2368                                k: Base64::new(vec![0u8; 128]),
2369                                ext: true,
2370                            }
2371                            .into(),
2372                            iv: Base64::new(vec![0u8; 128]),
2373                            hashes: vec![("sha256".to_owned(), Base64::new(vec![0u8; 128]))]
2374                                .into_iter()
2375                                .collect(),
2376                            v: "v2".to_owned(),
2377                        }
2378                        .into(),
2379                    },
2380                },
2381                bundle,
2382                |_, _| {},
2383            )
2384            .await
2385            .unwrap();
2386
2387        // The room key should be imported successfully
2388        let imported_sessions =
2389            bob.store().get_inbound_group_sessions_by_room_id(room_id).await.unwrap();
2390
2391        assert_eq!(imported_sessions.len(), 1);
2392        assert_eq!(imported_sessions[0].room_id(), room_id);
2393
2394        // The session forwarder data should be set correctly.
2395        assert_eq!(
2396            imported_sessions[0]
2397                .forwarder_data
2398                .as_ref()
2399                .expect("Session should contain forwarder data.")
2400                .user_id(),
2401            alice.user_id()
2402        );
2403
2404        assert_matches!(
2405            bob.store()
2406                .get_withheld_info(room_id, sessions[1].session_id())
2407                .await
2408                .unwrap()
2409                .expect("Withheld info should be present in the store."),
2410            RoomKeyWithheldEntry {
2411                #[cfg(not(feature = "experimental-algorithms"))]
2412                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2413                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2414                ),
2415                #[cfg(feature = "experimental-algorithms")]
2416                content: RoomKeyWithheldContent::MegolmV2AesSha2(
2417                    MegolmV1AesSha2WithheldContent::HistoryNotShared(_)
2418                ),
2419                ..
2420            }
2421        );
2422    }
2423
2424    /// Tests that the new store format introduced in [#5737][#5737] does not
2425    /// conflict with items already in the store that were serialised with the
2426    /// older format.
2427    ///
2428    /// [#5737]: https://github.com/matrix-org/matrix-rust-sdk/pull/5737
2429    #[async_test]
2430    async fn test_deserialize_room_key_withheld_entry_from_to_device_event() {
2431        let entry: RoomKeyWithheldEntry = serde_json::from_value(json!(
2432            {
2433              "content": {
2434                "algorithm": "m.megolm.v1.aes-sha2",
2435                "code": "m.unauthorised",
2436                "from_device": "ALICE",
2437                "reason": "You are not authorised to read the message.",
2438                "room_id": "!roomid:s.co",
2439                "sender_key": "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0",
2440                "session_id": "session123"
2441              },
2442              "sender": "@alice:s.co",
2443              "type": "m.room_key.withheld"
2444            }
2445        ))
2446        .unwrap();
2447
2448        assert_matches!(
2449            entry,
2450            RoomKeyWithheldEntry {
2451                sender,
2452                content: RoomKeyWithheldContent::MegolmV1AesSha2(
2453                    MegolmV1AesSha2WithheldContent::Unauthorised(withheld_content,)
2454                ),
2455            }
2456        );
2457
2458        assert_eq!(sender, "@alice:s.co");
2459        assert_eq!(withheld_content.room_id, "!roomid:s.co");
2460        assert_eq!(withheld_content.session_id, "session123");
2461        assert_eq!(
2462            withheld_content.sender_key.to_base64(),
2463            "7hIcOrEroXYdzjtCBvBjUiqvT0Me7g+ymeXqoc65RS0"
2464        );
2465        assert_eq!(withheld_content.from_device, Some(owned_device_id!("ALICE")));
2466    }
2467
2468    /// Create an inbound Megolm session for the given room.
2469    ///
2470    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2471    /// fields of the resultant session.
2472    ///
2473    /// The encryption algorithm used for the session depends on the
2474    /// `experimental-algorithms` feature flag:
2475    ///
2476    /// - When not set, the session uses `m.megolm.v1.aes-sha2`.
2477    /// - When set, the session uses `m.megolm.v2.aes-sha2`.
2478    fn create_inbound_group_session_with_visibility(
2479        olm_machine: &OlmMachine,
2480        room_id: &RoomId,
2481        session_key: &SessionKey,
2482        shared_history: bool,
2483    ) -> InboundGroupSession {
2484        let identity_keys = &olm_machine.store().static_account().identity_keys;
2485        InboundGroupSession::new(
2486            identity_keys.curve25519,
2487            identity_keys.ed25519,
2488            room_id,
2489            session_key,
2490            SenderData::unknown(),
2491            None,
2492            #[cfg(not(feature = "experimental-algorithms"))]
2493            EventEncryptionAlgorithm::MegolmV1AesSha2,
2494            #[cfg(feature = "experimental-algorithms")]
2495            EventEncryptionAlgorithm::MegolmV2AesSha2,
2496            None,
2497            shared_history,
2498        )
2499        .unwrap()
2500    }
2501}