1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::store_locks::CrossProcessStoreLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{
8 broadcast::{self},
9 Mutex,
10};
11use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
12use tracing::{debug, trace, warn};
13
14use super::{caches::SessionStore, DeviceChanges, IdentityChanges, LockableCryptoStore};
15use crate::{
16 olm::InboundGroupSession,
17 store,
18 store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
19 CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
20};
21
22#[derive(Debug)]
27pub(crate) struct CryptoStoreWrapper {
28 user_id: OwnedUserId,
29 device_id: OwnedDeviceId,
30
31 store: Arc<DynCryptoStore>,
32
33 sessions: SessionStore,
35
36 room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,
39
40 room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,
43
44 secrets_broadcaster: broadcast::Sender<GossippedSecret>,
47
48 identities_broadcaster:
51 broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
52}
53
54impl CryptoStoreWrapper {
55 pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
56 let room_keys_received_sender = broadcast::Sender::new(10);
57 let room_keys_withheld_received_sender = broadcast::Sender::new(10);
58 let secrets_broadcaster = broadcast::Sender::new(10);
59 let identities_broadcaster = broadcast::Sender::new(20);
62
63 Self {
64 user_id: user_id.to_owned(),
65 device_id: device_id.to_owned(),
66 store: store.into_crypto_store(),
67 sessions: SessionStore::new(),
68 room_keys_received_sender,
69 room_keys_withheld_received_sender,
70 secrets_broadcaster,
71 identities_broadcaster,
72 }
73 }
74
75 pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
84 let room_key_updates: Vec<_> =
85 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
86
87 let withheld_session_updates: Vec<_> = changes
88 .withheld_session_info
89 .iter()
90 .flat_map(|(room_id, session_map)| {
91 session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
92 room_id: room_id.to_owned(),
93 session_id: session_id.to_owned(),
94 withheld_event: withheld_event.clone(),
95 })
96 })
97 .collect();
98
99 let own_identity_was_verified_before_change = self
103 .store
104 .get_user_identity(self.user_id.as_ref())
105 .await?
106 .as_ref()
107 .and_then(|i| i.own())
108 .is_some_and(|own| own.is_verified());
109
110 let secrets = changes.secrets.to_owned();
111 let devices = changes.devices.to_owned();
112 let identities = changes.identities.to_owned();
113
114 if devices
115 .changed
116 .iter()
117 .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
118 {
119 self.sessions.clear().await;
123 } else {
124 for session in &changes.sessions {
126 self.sessions.add(session.clone()).await;
127 }
128 }
129
130 self.store.save_changes(changes).await?;
131
132 if tracing::level_enabled!(tracing::Level::DEBUG) {
134 for updated_identity in
135 identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
136 {
137 let master_key = updated_identity.master_key().get_first_key();
138 let user_signing_key = updated_identity.user_signing_key().get_first_key();
139 let self_signing_key = updated_identity.self_signing_key().get_first_key();
140
141 debug!(
142 ?master_key,
143 ?user_signing_key,
144 ?self_signing_key,
145 previously_verified = updated_identity.was_previously_verified(),
146 verified = updated_identity.is_verified(),
147 "Stored our own identity"
148 );
149 }
150 }
151
152 if !room_key_updates.is_empty() {
153 let _ = self.room_keys_received_sender.send(room_key_updates);
155 }
156
157 if !withheld_session_updates.is_empty() {
158 let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
159 }
160
161 for secret in secrets {
162 let _ = self.secrets_broadcaster.send(secret);
163 }
164
165 if !devices.is_empty() || !identities.is_empty() {
166 let maybe_own_identity =
170 self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
171
172 if let Some(own_identity_after) = maybe_own_identity.as_ref() {
176 let own_identity_is_verified = own_identity_after.is_verified();
179
180 if !own_identity_was_verified_before_change && own_identity_is_verified {
181 debug!("Own identity is now verified, check all known identities for verification status changes");
182 self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
185 own_identity_after,
186 )
187 .await?;
188 } else if own_identity_was_verified_before_change != own_identity_is_verified {
189 debug!(
191 own_identity_is_verified,
192 "The verification state of our own identity has changed",
193 );
194 }
195 }
196
197 let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
198 }
199
200 Ok(())
201 }
202
203 async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
204 &self,
205 own_identity_after: &OwnUserIdentityData,
206 ) -> Result<(), CryptoStoreError> {
207 let tracked_users = self.store.load_tracked_users().await?;
208 let mut updated_identities: Vec<UserIdentityData> = Default::default();
209 for tracked_user in tracked_users {
210 if let Some(other_identity) = self
211 .store
212 .get_user_identity(tracked_user.user_id.as_ref())
213 .await?
214 .as_ref()
215 .and_then(|i| i.other())
216 {
217 if !other_identity.was_previously_verified()
218 && own_identity_after.is_identity_signed(other_identity)
219 {
220 trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
221 other_identity.mark_as_previously_verified();
222 updated_identities.push(other_identity.clone().into());
223 }
224 }
225 }
226
227 if !updated_identities.is_empty() {
228 let identity_changes =
229 IdentityChanges { changed: updated_identities, ..Default::default() };
230 self.store
231 .save_changes(Changes {
232 identities: identity_changes.clone(),
233 ..Default::default()
234 })
235 .await?;
236
237 let _ = self.identities_broadcaster.send((
238 Some(own_identity_after.clone()),
239 identity_changes,
240 DeviceChanges::default(),
241 ));
242 }
243
244 Ok(())
245 }
246
247 pub async fn get_sessions(
248 &self,
249 sender_key: &str,
250 ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
251 let sessions = self.sessions.get(sender_key).await;
252
253 let sessions = if sessions.is_none() {
254 let mut entries = self.sessions.entries.write().await;
255
256 let sessions = entries.get(sender_key);
257
258 if sessions.is_some() {
259 sessions.cloned()
260 } else {
261 let sessions = self.store.get_sessions(sender_key).await?;
262 let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
263
264 entries.insert(sender_key.to_owned(), sessions.clone());
265
266 Some(sessions)
267 }
268 } else {
269 sessions
270 };
271
272 Ok(sessions)
273 }
274
275 pub async fn save_inbound_group_sessions(
286 &self,
287 sessions: Vec<InboundGroupSession>,
288 backed_up_to_version: Option<&str>,
289 ) -> store::Result<()> {
290 let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
291 self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
292
293 if !room_key_updates.is_empty() {
294 let _ = self.room_keys_received_sender.send(room_key_updates);
296 }
297 Ok(())
298 }
299
300 pub fn room_keys_received_stream(
309 &self,
310 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
311 BroadcastStream::new(self.room_keys_received_sender.subscribe())
312 }
313
314 pub fn room_keys_withheld_received_stream(
323 &self,
324 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
325 let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
326 Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
327 }
328
329 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
332 let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
333 Self::filter_errors_out_of_stream(stream, "secrets_stream")
334 }
335
336 pub(super) fn identities_stream(
341 &self,
342 ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> {
343 let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
344 Self::filter_errors_out_of_stream(stream, "identities_stream")
345 }
346
347 fn filter_errors_out_of_stream<ItemType>(
354 stream: BroadcastStream<ItemType>,
355 stream_name: &str,
356 ) -> impl Stream<Item = ItemType>
357 where
358 ItemType: 'static + Clone + Send,
359 {
360 let stream_name = stream_name.to_owned();
361 stream.filter_map(move |result| {
362 future::ready(match result {
363 Ok(r) => Some(r),
364 Err(BroadcastStreamRecvError::Lagged(lag)) => {
365 warn!("{stream_name} missed {lag} updates");
366 None
367 }
368 })
369 })
370 }
371
372 pub(crate) fn create_store_lock(
375 &self,
376 lock_key: String,
377 lock_value: String,
378 ) -> CrossProcessStoreLock<LockableCryptoStore> {
379 CrossProcessStoreLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
380 }
381}
382
383impl Deref for CryptoStoreWrapper {
384 type Target = DynCryptoStore;
385
386 fn deref(&self) -> &Self::Target {
387 self.store.deref()
388 }
389}
390
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a change to our own device must empty the session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        let alice = user_id!("@alice:example.com");
        let (machine, other_machine) =
            get_machine_pair_with_setup_sessions_test_helper(alice, alice, false).await;

        let sender_key = other_machine.identity_keys().curve25519.to_base64();

        // Precondition: the helper left a session for the other machine in
        // the cache.
        let cached_before = machine.store().inner.store.sessions.get(&sender_key).await;
        cached_before.expect("We should have a session in the cache.");

        let own_device = machine
            .get_device(alice, machine.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.")
            .inner;

        // Save our own device as changed.
        let changes = Changes {
            devices: DeviceChanges { changed: vec![own_device], ..Default::default() },
            ..Default::default()
        };
        machine.store().save_changes(changes).await.unwrap();

        let cached_after = machine.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached_after.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}