matrix_sdk_crypto/store/
crypto_store_wrapper.rs

1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::cross_process_lock::CrossProcessLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{Mutex, broadcast};
8use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
9use tracing::{debug, trace, warn};
10
11use super::{
12    DeviceChanges, IdentityChanges, LockableCryptoStore, caches::SessionStore,
13    types::RoomKeyBundleInfo,
14};
15use crate::{
16    CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
17    olm::InboundGroupSession,
18    store,
19    store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
20};
21
/// A wrapper for crypto store implementations that adds update notifiers.
///
/// Besides holding the wrapped store, the wrapper keeps an in-memory cache of
/// Olm sessions and owns the sender halves of the broadcast channels that are
/// used to tell listeners about changes which were saved through it.
///
/// This is shared between [`StoreInner`] and
/// [`crate::verification::VerificationStore`].
#[derive(Debug)]
pub(crate) struct CryptoStoreWrapper {
    /// The ID of our own user. Used to look up our own user identity and to
    /// recognise our own device among changed devices.
    user_id: OwnedUserId,
    /// The ID of our own device. Together with `user_id` it identifies the
    /// device whose update invalidates the session cache.
    device_id: OwnedDeviceId,

    /// The wrapped crypto store implementation.
    store: Arc<DynCryptoStore>,

    /// A cache for the Olm Sessions.
    sessions: SessionStore,

    /// The sender side of a broadcast stream that is notified whenever we get
    /// an update to an inbound group session.
    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,

    /// The sender side of a broadcast stream that is notified whenever we
    /// receive an `m.room_key.withheld` message.
    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,

    /// The sender side of a broadcast channel which sends out secrets we
    /// received as a `m.secret.send` event.
    secrets_broadcaster: broadcast::Sender<GossippedSecret>,

    /// The sender side of a broadcast channel which sends out devices and user
    /// identities which got updated or newly created.
    ///
    /// Each message also carries our own identity, if we have one, as its
    /// first element.
    identities_broadcaster:
        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,

    /// The sender side of a broadcast channel which sends out information about
    /// historic room key bundles we have received.
    historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
}
57
58impl CryptoStoreWrapper {
59    pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
60        let room_keys_received_sender = broadcast::Sender::new(10);
61        let room_keys_withheld_received_sender = broadcast::Sender::new(10);
62        let secrets_broadcaster = broadcast::Sender::new(10);
63        // The identities broadcaster is responsible for user identities as well as
64        // devices, that's why we increase the capacity here.
65        let identities_broadcaster = broadcast::Sender::new(20);
66        let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
67
68        Self {
69            user_id: user_id.to_owned(),
70            device_id: device_id.to_owned(),
71            store: store.into_crypto_store(),
72            sessions: SessionStore::new(),
73            room_keys_received_sender,
74            room_keys_withheld_received_sender,
75            secrets_broadcaster,
76            identities_broadcaster,
77            historic_room_key_bundles_broadcaster,
78        }
79    }
80
    /// Save the set of changes to the store.
    ///
    /// Also responsible for sending updates to the broadcast streams such as
    /// `room_keys_received_sender` and `secrets_broadcaster`, and for keeping
    /// the Olm session cache in sync with the saved changes.
    ///
    /// The notifications are only sent after the changes have been persisted
    /// successfully.
    ///
    /// # Arguments
    ///
    /// * `changes` - The set of changes that should be stored.
    ///
    /// # Errors
    ///
    /// Returns an error if reading our own identity, persisting the changes,
    /// or the follow-up update of other identities fails in the wrapped store.
    pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
        // `changes` is consumed by the store below, so everything we want to
        // broadcast afterwards has to be extracted up front.
        let room_key_updates: Vec<_> =
            changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();

        // Flatten the room -> session -> event map into one entry per session.
        let withheld_session_updates: Vec<_> = changes
            .withheld_session_info
            .iter()
            .flat_map(|(room_id, session_map)| {
                session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
                    room_id: room_id.to_owned(),
                    session_id: session_id.to_owned(),
                    withheld_event: withheld_event.clone(),
                })
            })
            .collect();

        // If our own identity verified status changes we need to do some checks on
        // other identities. So remember the verification status before
        // processing the changes
        let own_identity_was_verified_before_change = self
            .store
            .get_user_identity(self.user_id.as_ref())
            .await?
            .as_ref()
            .and_then(|i| i.own())
            .is_some_and(|own| own.is_verified());

        let secrets = changes.secrets.to_owned();
        let devices = changes.devices.to_owned();
        let identities = changes.identities.to_owned();
        let room_key_bundle_updates: Vec<_> =
            changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();

        // Note that the session cache is updated here, before the changes are
        // handed to the store.
        if devices
            .changed
            .iter()
            .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
        {
            // If our own device key changes, we need to clear the
            // session cache because the sessions contain a copy of our
            // device key.
            self.sessions.clear().await;
        } else {
            // Otherwise add the sessions to the cache.
            for session in &changes.sessions {
                self.sessions.add(session.clone()).await;
            }
        }

        self.store.save_changes(changes).await?;

        // If we updated our own public identity, log it for debugging purposes.
        // The level check avoids collecting the keys when nobody would see them.
        if tracing::level_enabled!(tracing::Level::DEBUG) {
            for updated_identity in
                identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
            {
                let master_key = updated_identity.master_key().get_first_key();
                let user_signing_key = updated_identity.user_signing_key().get_first_key();
                let self_signing_key = updated_identity.self_signing_key().get_first_key();

                debug!(
                    ?master_key,
                    ?user_signing_key,
                    ?self_signing_key,
                    previously_verified = updated_identity.was_previously_verified(),
                    verified = updated_identity.is_verified(),
                    "Stored our own identity"
                );
            }
        }

        if !room_key_updates.is_empty() {
            // Ignore the result. It can only fail if there are no listeners.
            let _ = self.room_keys_received_sender.send(room_key_updates);
        }

        if !withheld_session_updates.is_empty() {
            // As above, the result of the send is deliberately ignored.
            let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
        }

        // Secrets and bundle infos are sent one message per item rather than
        // as a batch.
        for secret in secrets {
            let _ = self.secrets_broadcaster.send(secret);
        }

        for bundle_info in room_key_bundle_updates {
            let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
        }

        if !devices.is_empty() || !identities.is_empty() {
            // Mapping the devices and user identities from the read-only variant to ones
            // that contain side-effects requires our own identity. This is
            // guaranteed to be up-to-date since we just persisted it.
            let maybe_own_identity =
                self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());

            // If our identity was not verified before the change and is now, that means
            // this could impact the verification chain of other known
            // identities.
            if let Some(own_identity_after) = maybe_own_identity.as_ref() {
                // Only do this if our identity is passing from not verified to verified,
                // the previously_verified can only change in that case.
                let own_identity_is_verified = own_identity_after.is_verified();

                if !own_identity_was_verified_before_change && own_identity_is_verified {
                    debug!(
                        "Own identity is now verified, check all known identities for verification status changes"
                    );
                    // We need to review all the other identities to see if they are verified now
                    // and mark them as such
                    self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
                        own_identity_after,
                    )
                    .await?;
                } else if own_identity_was_verified_before_change != own_identity_is_verified {
                    // Log that the verification state of the identity changed.
                    // Given the branch above, this is the verified -> not
                    // verified transition.
                    debug!(
                        own_identity_is_verified,
                        "The verification state of our own identity has changed",
                    );
                }
            }

            // Sent after the check above, which may already have broadcast its
            // own update for the identities it modified.
            let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
        }

        Ok(())
    }
216
217    async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
218        &self,
219        own_identity_after: &OwnUserIdentityData,
220    ) -> Result<(), CryptoStoreError> {
221        let tracked_users = self.store.load_tracked_users().await?;
222        let mut updated_identities: Vec<UserIdentityData> = Default::default();
223        for tracked_user in tracked_users {
224            if let Some(other_identity) = self
225                .store
226                .get_user_identity(tracked_user.user_id.as_ref())
227                .await?
228                .as_ref()
229                .and_then(|i| i.other())
230                && !other_identity.was_previously_verified()
231                && own_identity_after.is_identity_signed(other_identity)
232            {
233                trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
234                other_identity.mark_as_previously_verified();
235                updated_identities.push(other_identity.clone().into());
236            }
237        }
238
239        if !updated_identities.is_empty() {
240            let identity_changes =
241                IdentityChanges { changed: updated_identities, ..Default::default() };
242            self.store
243                .save_changes(Changes {
244                    identities: identity_changes.clone(),
245                    ..Default::default()
246                })
247                .await?;
248
249            let _ = self.identities_broadcaster.send((
250                Some(own_identity_after.clone()),
251                identity_changes,
252                DeviceChanges::default(),
253            ));
254        }
255
256        Ok(())
257    }
258
259    pub async fn get_sessions(
260        &self,
261        sender_key: &str,
262    ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
263        let sessions = self.sessions.get(sender_key).await;
264
265        let sessions = if sessions.is_none() {
266            let mut entries = self.sessions.entries.write().await;
267
268            let sessions = entries.get(sender_key);
269
270            if sessions.is_some() {
271                sessions.cloned()
272            } else {
273                let sessions = self.store.get_sessions(sender_key).await?;
274                let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
275
276                entries.insert(sender_key.to_owned(), sessions.clone());
277
278                Some(sessions)
279            }
280        } else {
281            sessions
282        };
283
284        Ok(sessions)
285    }
286
287    /// Save a list of inbound group sessions to the store.
288    ///
289    /// # Arguments
290    ///
291    /// * `sessions` - The sessions to be saved.
292    /// * `backed_up_to_version` - If the keys should be marked as having been
293    ///   backed up, the version of the backup.
294    ///
295    /// Note: some implementations ignore `backup_version` and assume the
296    /// current backup version, which is normally the same.
297    pub async fn save_inbound_group_sessions(
298        &self,
299        sessions: Vec<InboundGroupSession>,
300        backed_up_to_version: Option<&str>,
301    ) -> store::Result<()> {
302        let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
303        self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
304
305        if !room_key_updates.is_empty() {
306            // Ignore the result. It can only fail if there are no listeners.
307            let _ = self.room_keys_received_sender.send(room_key_updates);
308        }
309        Ok(())
310    }
311
312    /// Receive notifications of room keys being received as a [`Stream`].
313    ///
314    /// Each time a room key is updated in any way, an update will be sent to
315    /// the stream. Updates that happen at the same time are batched into a
316    /// [`Vec`].
317    ///
318    /// If the reader of the stream lags too far behind an error will be sent to
319    /// the reader.
320    pub fn room_keys_received_stream(
321        &self,
322    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
323        BroadcastStream::new(self.room_keys_received_sender.subscribe())
324    }
325
326    /// Receive notifications of received `m.room_key.withheld` messages.
327    ///
328    /// Each time an `m.room_key.withheld` is received and stored, an update
329    /// will be sent to the stream. Updates that happen at the same time are
330    /// batched into a [`Vec`].
331    ///
332    /// If the reader of the stream lags too far behind, a warning will be
333    /// logged and items will be dropped.
334    pub fn room_keys_withheld_received_stream(
335        &self,
336    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
337        let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
338        Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
339    }
340
341    /// Receive notifications of gossipped secrets being received and stored in
342    /// the secret inbox as a [`Stream`].
343    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
344        let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
345        Self::filter_errors_out_of_stream(stream, "secrets_stream")
346    }
347
348    /// Receive notifications of historic room key bundles being received and
349    /// stored in the store as a [`Stream`].
350    pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
351        let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
352        Self::filter_errors_out_of_stream(stream, "bundle_stream")
353    }
354
355    /// Returns a stream of newly created or updated cryptographic identities.
356    ///
357    /// This is just a helper method which allows us to build higher level
358    /// device and user identity streams.
359    pub(super) fn identities_stream(
360        &self,
361    ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> + use<>
362    {
363        let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
364        Self::filter_errors_out_of_stream(stream, "identities_stream")
365    }
366
367    /// Helper for *_stream functions: filters errors out of the stream,
368    /// creating a new Stream.
369    ///
370    /// `BroadcastStream`s gives us `Result`s which can fail with
371    /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
372    /// to work with, so here we just emit a warning and drop the errors.
373    fn filter_errors_out_of_stream<ItemType>(
374        stream: BroadcastStream<ItemType>,
375        stream_name: &str,
376    ) -> impl Stream<Item = ItemType> + use<ItemType>
377    where
378        ItemType: 'static + Clone + Send,
379    {
380        let stream_name = stream_name.to_owned();
381        stream.filter_map(move |result| {
382            future::ready(match result {
383                Ok(r) => Some(r),
384                Err(BroadcastStreamRecvError::Lagged(lag)) => {
385                    warn!("{stream_name} missed {lag} updates");
386                    None
387                }
388            })
389        })
390    }
391
392    /// Creates a [`CrossProcessLock`] for this store, that will contain the
393    /// given key and value when hold.
394    pub(crate) fn create_store_lock(
395        &self,
396        lock_key: String,
397        lock_value: String,
398    ) -> CrossProcessLock<LockableCryptoStore> {
399        CrossProcessLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
400    }
401}
402
403impl Deref for CryptoStoreWrapper {
404    type Target = DynCryptoStore;
405
406    fn deref(&self) -> &Self::Target {
407        self.store.deref()
408    }
409}
410
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving an update to our own device must clear the Olm session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        // Given two machines of the same user, created by the test helper
        let user_id = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(user_id, user_id, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        // And the first machine has a cached session for the second one
        first
            .store()
            .inner
            .store
            .sessions
            .get(&sender_key)
            .await
            .expect("We should have a session in the cache.");

        // And we have the data of the first machine's own device
        let device_data = first
            .get_device(user_id, first.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // When we save a new version of our device keys
        first
            .store()
            .save_changes(Changes {
                devices: DeviceChanges { changed: vec![device_data], ..Default::default() },
                ..Default::default()
            })
            .await
            .unwrap();

        // Then the session is no longer in the cache
        assert!(
            first.store().inner.store.sessions.get(&sender_key).await.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}