matrix_sdk_crypto/store/
crypto_store_wrapper.rs

1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::store_locks::CrossProcessStoreLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{
8    broadcast::{self},
9    Mutex,
10};
11use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
12use tracing::{debug, trace, warn};
13
14use super::{caches::SessionStore, DeviceChanges, IdentityChanges, LockableCryptoStore};
15use crate::{
16    olm::InboundGroupSession,
17    store,
18    store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
19    CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
20};
21
22/// A wrapper for crypto store implementations that adds update notifiers.
23///
24/// This is shared between [`StoreInner`] and
25/// [`crate::verification::VerificationStore`].
26#[derive(Debug)]
27pub(crate) struct CryptoStoreWrapper {
28    user_id: OwnedUserId,
29    device_id: OwnedDeviceId,
30
31    store: Arc<DynCryptoStore>,
32
33    /// A cache for the Olm Sessions.
34    sessions: SessionStore,
35
36    /// The sender side of a broadcast stream that is notified whenever we get
37    /// an update to an inbound group session.
38    room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,
39
40    /// The sender side of a broadcast stream that is notified whenever we
41    /// receive an `m.room_key.withheld` message.
42    room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,
43
44    /// The sender side of a broadcast channel which sends out secrets we
45    /// received as a `m.secret.send` event.
46    secrets_broadcaster: broadcast::Sender<GossippedSecret>,
47
48    /// The sender side of a broadcast channel which sends out devices and user
49    /// identities which got updated or newly created.
50    identities_broadcaster:
51        broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
52}
53
54impl CryptoStoreWrapper {
55    pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
56        let room_keys_received_sender = broadcast::Sender::new(10);
57        let room_keys_withheld_received_sender = broadcast::Sender::new(10);
58        let secrets_broadcaster = broadcast::Sender::new(10);
59        // The identities broadcaster is responsible for user identities as well as
60        // devices, that's why we increase the capacity here.
61        let identities_broadcaster = broadcast::Sender::new(20);
62
63        Self {
64            user_id: user_id.to_owned(),
65            device_id: device_id.to_owned(),
66            store: store.into_crypto_store(),
67            sessions: SessionStore::new(),
68            room_keys_received_sender,
69            room_keys_withheld_received_sender,
70            secrets_broadcaster,
71            identities_broadcaster,
72        }
73    }
74
75    /// Save the set of changes to the store.
76    ///
77    /// Also responsible for sending updates to the broadcast streams such as
78    /// `room_keys_received_sender` and `secrets_broadcaster`.
79    ///
80    /// # Arguments
81    ///
82    /// * `changes` - The set of changes that should be stored.
83    pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
84        let room_key_updates: Vec<_> =
85            changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
86
87        let withheld_session_updates: Vec<_> = changes
88            .withheld_session_info
89            .iter()
90            .flat_map(|(room_id, session_map)| {
91                session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
92                    room_id: room_id.to_owned(),
93                    session_id: session_id.to_owned(),
94                    withheld_event: withheld_event.clone(),
95                })
96            })
97            .collect();
98
99        // If our own identity verified status changes we need to do some checks on
100        // other identities. So remember the verification status before
101        // processing the changes
102        let own_identity_was_verified_before_change = self
103            .store
104            .get_user_identity(self.user_id.as_ref())
105            .await?
106            .as_ref()
107            .and_then(|i| i.own())
108            .is_some_and(|own| own.is_verified());
109
110        let secrets = changes.secrets.to_owned();
111        let devices = changes.devices.to_owned();
112        let identities = changes.identities.to_owned();
113
114        if devices
115            .changed
116            .iter()
117            .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
118        {
119            // If our own device key changes, we need to clear the
120            // session cache because the sessions contain a copy of our
121            // device key.
122            self.sessions.clear().await;
123        } else {
124            // Otherwise add the sessions to the cache.
125            for session in &changes.sessions {
126                self.sessions.add(session.clone()).await;
127            }
128        }
129
130        self.store.save_changes(changes).await?;
131
132        // If we updated our own public identity, log it for debugging purposes
133        if tracing::level_enabled!(tracing::Level::DEBUG) {
134            for updated_identity in
135                identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
136            {
137                let master_key = updated_identity.master_key().get_first_key();
138                let user_signing_key = updated_identity.user_signing_key().get_first_key();
139                let self_signing_key = updated_identity.self_signing_key().get_first_key();
140
141                debug!(
142                    ?master_key,
143                    ?user_signing_key,
144                    ?self_signing_key,
145                    previously_verified = updated_identity.was_previously_verified(),
146                    verified = updated_identity.is_verified(),
147                    "Stored our own identity"
148                );
149            }
150        }
151
152        if !room_key_updates.is_empty() {
153            // Ignore the result. It can only fail if there are no listeners.
154            let _ = self.room_keys_received_sender.send(room_key_updates);
155        }
156
157        if !withheld_session_updates.is_empty() {
158            let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
159        }
160
161        for secret in secrets {
162            let _ = self.secrets_broadcaster.send(secret);
163        }
164
165        if !devices.is_empty() || !identities.is_empty() {
166            // Mapping the devices and user identities from the read-only variant to one's
167            // that contain side-effects requires our own identity. This is
168            // guaranteed to be up-to-date since we just persisted it.
169            let maybe_own_identity =
170                self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
171
172            // If our identity was not verified before the change and is now, that means
173            // this could impact the verification chain of other known
174            // identities.
175            if let Some(own_identity_after) = maybe_own_identity.as_ref() {
176                // Only do this if our identity is passing from not verified to verified,
177                // the previously_verified can only change in that case.
178                let own_identity_is_verified = own_identity_after.is_verified();
179
180                if !own_identity_was_verified_before_change && own_identity_is_verified {
181                    debug!("Own identity is now verified, check all known identities for verification status changes");
182                    // We need to review all the other identities to see if they are verified now
183                    // and mark them as such
184                    self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
185                        own_identity_after,
186                    )
187                    .await?;
188                } else if own_identity_was_verified_before_change != own_identity_is_verified {
189                    // Log that the verification state of the identity changed.
190                    debug!(
191                        own_identity_is_verified,
192                        "The verification state of our own identity has changed",
193                    );
194                }
195            }
196
197            let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
198        }
199
200        Ok(())
201    }
202
203    async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
204        &self,
205        own_identity_after: &OwnUserIdentityData,
206    ) -> Result<(), CryptoStoreError> {
207        let tracked_users = self.store.load_tracked_users().await?;
208        let mut updated_identities: Vec<UserIdentityData> = Default::default();
209        for tracked_user in tracked_users {
210            if let Some(other_identity) = self
211                .store
212                .get_user_identity(tracked_user.user_id.as_ref())
213                .await?
214                .as_ref()
215                .and_then(|i| i.other())
216            {
217                if !other_identity.was_previously_verified()
218                    && own_identity_after.is_identity_signed(other_identity)
219                {
220                    trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
221                    other_identity.mark_as_previously_verified();
222                    updated_identities.push(other_identity.clone().into());
223                }
224            }
225        }
226
227        if !updated_identities.is_empty() {
228            let identity_changes =
229                IdentityChanges { changed: updated_identities, ..Default::default() };
230            self.store
231                .save_changes(Changes {
232                    identities: identity_changes.clone(),
233                    ..Default::default()
234                })
235                .await?;
236
237            let _ = self.identities_broadcaster.send((
238                Some(own_identity_after.clone()),
239                identity_changes,
240                DeviceChanges::default(),
241            ));
242        }
243
244        Ok(())
245    }
246
247    pub async fn get_sessions(
248        &self,
249        sender_key: &str,
250    ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
251        let sessions = self.sessions.get(sender_key).await;
252
253        let sessions = if sessions.is_none() {
254            let mut entries = self.sessions.entries.write().await;
255
256            let sessions = entries.get(sender_key);
257
258            if sessions.is_some() {
259                sessions.cloned()
260            } else {
261                let sessions = self.store.get_sessions(sender_key).await?;
262                let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
263
264                entries.insert(sender_key.to_owned(), sessions.clone());
265
266                Some(sessions)
267            }
268        } else {
269            sessions
270        };
271
272        Ok(sessions)
273    }
274
275    /// Save a list of inbound group sessions to the store.
276    ///
277    /// # Arguments
278    ///
279    /// * `sessions` - The sessions to be saved.
280    /// * `backed_up_to_version` - If the keys should be marked as having been
281    ///   backed up, the version of the backup.
282    ///
283    /// Note: some implementations ignore `backup_version` and assume the
284    /// current backup version, which is normally the same.
285    pub async fn save_inbound_group_sessions(
286        &self,
287        sessions: Vec<InboundGroupSession>,
288        backed_up_to_version: Option<&str>,
289    ) -> store::Result<()> {
290        let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
291        self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
292
293        if !room_key_updates.is_empty() {
294            // Ignore the result. It can only fail if there are no listeners.
295            let _ = self.room_keys_received_sender.send(room_key_updates);
296        }
297        Ok(())
298    }
299
300    /// Receive notifications of room keys being received as a [`Stream`].
301    ///
302    /// Each time a room key is updated in any way, an update will be sent to
303    /// the stream. Updates that happen at the same time are batched into a
304    /// [`Vec`].
305    ///
306    /// If the reader of the stream lags too far behind an error will be sent to
307    /// the reader.
308    pub fn room_keys_received_stream(
309        &self,
310    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
311        BroadcastStream::new(self.room_keys_received_sender.subscribe())
312    }
313
314    /// Receive notifications of received `m.room_key.withheld` messages.
315    ///
316    /// Each time an `m.room_key.withheld` is received and stored, an update
317    /// will be sent to the stream. Updates that happen at the same time are
318    /// batched into a [`Vec`].
319    ///
320    /// If the reader of the stream lags too far behind, a warning will be
321    /// logged and items will be dropped.
322    pub fn room_keys_withheld_received_stream(
323        &self,
324    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
325        let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
326        Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
327    }
328
329    /// Receive notifications of gossipped secrets being received and stored in
330    /// the secret inbox as a [`Stream`].
331    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
332        let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
333        Self::filter_errors_out_of_stream(stream, "secrets_stream")
334    }
335
336    /// Returns a stream of newly created or updated cryptographic identities.
337    ///
338    /// This is just a helper method which allows us to build higher level
339    /// device and user identity streams.
340    pub(super) fn identities_stream(
341        &self,
342    ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> {
343        let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
344        Self::filter_errors_out_of_stream(stream, "identities_stream")
345    }
346
347    /// Helper for *_stream functions: filters errors out of the stream,
348    /// creating a new Stream.
349    ///
350    /// `BroadcastStream`s gives us `Result`s which can fail with
351    /// `BroadcastStreamRecvError` if the reader falls behind. That's annoying
352    /// to work with, so here we just emit a warning and drop the errors.
353    fn filter_errors_out_of_stream<ItemType>(
354        stream: BroadcastStream<ItemType>,
355        stream_name: &str,
356    ) -> impl Stream<Item = ItemType>
357    where
358        ItemType: 'static + Clone + Send,
359    {
360        let stream_name = stream_name.to_owned();
361        stream.filter_map(move |result| {
362            future::ready(match result {
363                Ok(r) => Some(r),
364                Err(BroadcastStreamRecvError::Lagged(lag)) => {
365                    warn!("{stream_name} missed {lag} updates");
366                    None
367                }
368            })
369        })
370    }
371
372    /// Creates a `CrossProcessStoreLock` for this store, that will contain the
373    /// given key and value when hold.
374    pub(crate) fn create_store_lock(
375        &self,
376        lock_key: String,
377        lock_value: String,
378    ) -> CrossProcessStoreLock<LockableCryptoStore> {
379        CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
380    }
381}
382
383impl Deref for CryptoStoreWrapper {
384    type Target = DynCryptoStore;
385
386    fn deref(&self) -> &Self::Target {
387        self.store.deref()
388    }
389}
390
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a new version of our own device must empty the session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        // Given two machines of the same user that share an Olm session
        let user_id = user_id!("@alice:example.com");
        let (machine, other_machine) =
            get_machine_pair_with_setup_sessions_test_helper(user_id, user_id, false).await;

        let sender_key = other_machine.identity_keys().curve25519.to_base64();

        machine
            .store()
            .inner
            .store
            .sessions
            .get(&sender_key)
            .await
            .expect("We should have a session in the cache.");

        let own_device = machine
            .get_device(user_id, machine.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // When we save a new version of our device keys
        let changes = Changes {
            devices: DeviceChanges { changed: vec![own_device], ..Default::default() },
            ..Default::default()
        };
        machine.store().save_changes(changes).await.unwrap();

        // Then the session is no longer in the cache
        let cached_session = machine.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached_session.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}