1use std::{future, ops::Deref, sync::Arc};
2
3use futures_core::Stream;
4use futures_util::StreamExt;
5use matrix_sdk_common::cross_process_lock::CrossProcessLock;
6use ruma::{DeviceId, OwnedDeviceId, OwnedUserId, UserId};
7use tokio::sync::{Mutex, broadcast};
8use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
9use tracing::{debug, trace, warn};
10
11use super::{
12 DeviceChanges, IdentityChanges, LockableCryptoStore, caches::SessionStore,
13 types::RoomKeyBundleInfo,
14};
15use crate::{
16 CryptoStoreError, GossippedSecret, OwnUserIdentityData, Session, UserIdentityData,
17 olm::InboundGroupSession,
18 store,
19 store::{Changes, DynCryptoStore, IntoCryptoStore, RoomKeyInfo, RoomKeyWithheldInfo},
20};
21
22#[derive(Debug)]
27pub(crate) struct CryptoStoreWrapper {
28 user_id: OwnedUserId,
29 device_id: OwnedDeviceId,
30
31 store: Arc<DynCryptoStore>,
32
33 sessions: SessionStore,
35
36 room_keys_received_sender: broadcast::Sender<Vec<RoomKeyInfo>>,
39
40 room_keys_withheld_received_sender: broadcast::Sender<Vec<RoomKeyWithheldInfo>>,
43
44 secrets_broadcaster: broadcast::Sender<GossippedSecret>,
47
48 identities_broadcaster:
51 broadcast::Sender<(Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)>,
52
53 historic_room_key_bundles_broadcaster: broadcast::Sender<RoomKeyBundleInfo>,
56}
57
58impl CryptoStoreWrapper {
59 pub(crate) fn new(user_id: &UserId, device_id: &DeviceId, store: impl IntoCryptoStore) -> Self {
60 let room_keys_received_sender = broadcast::Sender::new(10);
61 let room_keys_withheld_received_sender = broadcast::Sender::new(10);
62 let secrets_broadcaster = broadcast::Sender::new(10);
63 let identities_broadcaster = broadcast::Sender::new(20);
66 let historic_room_key_bundles_broadcaster = broadcast::Sender::new(10);
67
68 Self {
69 user_id: user_id.to_owned(),
70 device_id: device_id.to_owned(),
71 store: store.into_crypto_store(),
72 sessions: SessionStore::new(),
73 room_keys_received_sender,
74 room_keys_withheld_received_sender,
75 secrets_broadcaster,
76 identities_broadcaster,
77 historic_room_key_bundles_broadcaster,
78 }
79 }
80
81 pub async fn save_changes(&self, changes: Changes) -> store::Result<()> {
90 let room_key_updates: Vec<_> =
91 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
92
93 let withheld_session_updates: Vec<_> = changes
94 .withheld_session_info
95 .iter()
96 .flat_map(|(room_id, session_map)| {
97 session_map.iter().map(|(session_id, withheld_event)| RoomKeyWithheldInfo {
98 room_id: room_id.to_owned(),
99 session_id: session_id.to_owned(),
100 withheld_event: withheld_event.clone(),
101 })
102 })
103 .collect();
104
105 let own_identity_was_verified_before_change = self
109 .store
110 .get_user_identity(self.user_id.as_ref())
111 .await?
112 .as_ref()
113 .and_then(|i| i.own())
114 .is_some_and(|own| own.is_verified());
115
116 let secrets = changes.secrets.to_owned();
117 let devices = changes.devices.to_owned();
118 let identities = changes.identities.to_owned();
119 let room_key_bundle_updates: Vec<_> =
120 changes.received_room_key_bundles.iter().map(RoomKeyBundleInfo::from).collect();
121
122 if devices
123 .changed
124 .iter()
125 .any(|d| d.user_id() == self.user_id && d.device_id() == self.device_id)
126 {
127 self.sessions.clear().await;
131 } else {
132 for session in &changes.sessions {
134 self.sessions.add(session.clone()).await;
135 }
136 }
137
138 self.store.save_changes(changes).await?;
139
140 if tracing::level_enabled!(tracing::Level::DEBUG) {
142 for updated_identity in
143 identities.new.iter().chain(identities.changed.iter()).filter_map(|id| id.own())
144 {
145 let master_key = updated_identity.master_key().get_first_key();
146 let user_signing_key = updated_identity.user_signing_key().get_first_key();
147 let self_signing_key = updated_identity.self_signing_key().get_first_key();
148
149 debug!(
150 ?master_key,
151 ?user_signing_key,
152 ?self_signing_key,
153 previously_verified = updated_identity.was_previously_verified(),
154 verified = updated_identity.is_verified(),
155 "Stored our own identity"
156 );
157 }
158 }
159
160 if !room_key_updates.is_empty() {
161 let _ = self.room_keys_received_sender.send(room_key_updates);
163 }
164
165 if !withheld_session_updates.is_empty() {
166 let _ = self.room_keys_withheld_received_sender.send(withheld_session_updates);
167 }
168
169 for secret in secrets {
170 let _ = self.secrets_broadcaster.send(secret);
171 }
172
173 for bundle_info in room_key_bundle_updates {
174 let _ = self.historic_room_key_bundles_broadcaster.send(bundle_info);
175 }
176
177 if !devices.is_empty() || !identities.is_empty() {
178 let maybe_own_identity =
182 self.store.get_user_identity(&self.user_id).await?.and_then(|i| i.into_own());
183
184 if let Some(own_identity_after) = maybe_own_identity.as_ref() {
188 let own_identity_is_verified = own_identity_after.is_verified();
191
192 if !own_identity_was_verified_before_change && own_identity_is_verified {
193 debug!(
194 "Own identity is now verified, check all known identities for verification status changes"
195 );
196 self.check_all_identities_and_update_was_previously_verified_flag_if_needed(
199 own_identity_after,
200 )
201 .await?;
202 } else if own_identity_was_verified_before_change != own_identity_is_verified {
203 debug!(
205 own_identity_is_verified,
206 "The verification state of our own identity has changed",
207 );
208 }
209 }
210
211 let _ = self.identities_broadcaster.send((maybe_own_identity, identities, devices));
212 }
213
214 Ok(())
215 }
216
217 async fn check_all_identities_and_update_was_previously_verified_flag_if_needed(
218 &self,
219 own_identity_after: &OwnUserIdentityData,
220 ) -> Result<(), CryptoStoreError> {
221 let tracked_users = self.store.load_tracked_users().await?;
222 let mut updated_identities: Vec<UserIdentityData> = Default::default();
223 for tracked_user in tracked_users {
224 if let Some(other_identity) = self
225 .store
226 .get_user_identity(tracked_user.user_id.as_ref())
227 .await?
228 .as_ref()
229 .and_then(|i| i.other())
230 && !other_identity.was_previously_verified()
231 && own_identity_after.is_identity_signed(other_identity)
232 {
233 trace!(?tracked_user.user_id, "Marking set verified_latch to true.");
234 other_identity.mark_as_previously_verified();
235 updated_identities.push(other_identity.clone().into());
236 }
237 }
238
239 if !updated_identities.is_empty() {
240 let identity_changes =
241 IdentityChanges { changed: updated_identities, ..Default::default() };
242 self.store
243 .save_changes(Changes {
244 identities: identity_changes.clone(),
245 ..Default::default()
246 })
247 .await?;
248
249 let _ = self.identities_broadcaster.send((
250 Some(own_identity_after.clone()),
251 identity_changes,
252 DeviceChanges::default(),
253 ));
254 }
255
256 Ok(())
257 }
258
259 pub async fn get_sessions(
260 &self,
261 sender_key: &str,
262 ) -> store::Result<Option<Arc<Mutex<Vec<Session>>>>> {
263 let sessions = self.sessions.get(sender_key).await;
264
265 let sessions = if sessions.is_none() {
266 let mut entries = self.sessions.entries.write().await;
267
268 let sessions = entries.get(sender_key);
269
270 if sessions.is_some() {
271 sessions.cloned()
272 } else {
273 let sessions = self.store.get_sessions(sender_key).await?;
274 let sessions = Arc::new(Mutex::new(sessions.unwrap_or_default()));
275
276 entries.insert(sender_key.to_owned(), sessions.clone());
277
278 Some(sessions)
279 }
280 } else {
281 sessions
282 };
283
284 Ok(sessions)
285 }
286
287 pub async fn save_inbound_group_sessions(
298 &self,
299 sessions: Vec<InboundGroupSession>,
300 backed_up_to_version: Option<&str>,
301 ) -> store::Result<()> {
302 let room_key_updates: Vec<_> = sessions.iter().map(RoomKeyInfo::from).collect();
303 self.store.save_inbound_group_sessions(sessions, backed_up_to_version).await?;
304
305 if !room_key_updates.is_empty() {
306 let _ = self.room_keys_received_sender.send(room_key_updates);
308 }
309 Ok(())
310 }
311
312 pub fn room_keys_received_stream(
321 &self,
322 ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> + use<> {
323 BroadcastStream::new(self.room_keys_received_sender.subscribe())
324 }
325
326 pub fn room_keys_withheld_received_stream(
335 &self,
336 ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> + use<> {
337 let stream = BroadcastStream::new(self.room_keys_withheld_received_sender.subscribe());
338 Self::filter_errors_out_of_stream(stream, "room_keys_withheld_received_stream")
339 }
340
341 pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> + use<> {
344 let stream = BroadcastStream::new(self.secrets_broadcaster.subscribe());
345 Self::filter_errors_out_of_stream(stream, "secrets_stream")
346 }
347
348 pub fn historic_room_key_stream(&self) -> impl Stream<Item = RoomKeyBundleInfo> + use<> {
351 let stream = BroadcastStream::new(self.historic_room_key_bundles_broadcaster.subscribe());
352 Self::filter_errors_out_of_stream(stream, "bundle_stream")
353 }
354
355 pub(super) fn identities_stream(
360 &self,
361 ) -> impl Stream<Item = (Option<OwnUserIdentityData>, IdentityChanges, DeviceChanges)> + use<>
362 {
363 let stream = BroadcastStream::new(self.identities_broadcaster.subscribe());
364 Self::filter_errors_out_of_stream(stream, "identities_stream")
365 }
366
367 fn filter_errors_out_of_stream<ItemType>(
374 stream: BroadcastStream<ItemType>,
375 stream_name: &str,
376 ) -> impl Stream<Item = ItemType> + use<ItemType>
377 where
378 ItemType: 'static + Clone + Send,
379 {
380 let stream_name = stream_name.to_owned();
381 stream.filter_map(move |result| {
382 future::ready(match result {
383 Ok(r) => Some(r),
384 Err(BroadcastStreamRecvError::Lagged(lag)) => {
385 warn!("{stream_name} missed {lag} updates");
386 None
387 }
388 })
389 })
390 }
391
392 pub(crate) fn create_store_lock(
395 &self,
396 lock_key: String,
397 lock_value: String,
398 ) -> CrossProcessLock<LockableCryptoStore> {
399 CrossProcessLock::new(LockableCryptoStore(self.store.clone()), lock_key, lock_value)
400 }
401}
402
403impl Deref for CryptoStoreWrapper {
404 type Target = DynCryptoStore;
405
406 fn deref(&self) -> &Self::Target {
407 self.store.deref()
408 }
409}
410
#[cfg(test)]
mod test {
    use matrix_sdk_test::async_test;
    use ruma::user_id;

    use super::*;
    use crate::machine::test_helpers::get_machine_pair_with_setup_sessions_test_helper;

    /// Saving a change to our own device must empty the session cache.
    #[async_test]
    async fn test_cache_cleared_after_device_update() {
        let user_id = user_id!("@alice:example.com");
        let (first, second) =
            get_machine_pair_with_setup_sessions_test_helper(user_id, user_id, false).await;

        let sender_key = second.identity_keys().curve25519.to_base64();

        // Precondition: the helper left a session for the second machine in
        // the first machine's cache.
        let cached_before = first.store().inner.store.sessions.get(&sender_key).await;
        cached_before.expect("We should have a session in the cache.");

        let own_device = first
            .get_device(user_id, first.device_id(), None)
            .await
            .unwrap()
            .expect("We should have access to our own device.");

        // Report our own device as changed through the regular save path.
        let changes = Changes {
            devices: DeviceChanges { changed: vec![own_device.inner], ..Default::default() },
            ..Default::default()
        };
        first.store().save_changes(changes).await.unwrap();

        // The cache must no longer hold the session.
        let cached_after = first.store().inner.store.sessions.get(&sender_key).await;
        assert!(
            cached_after.is_none(),
            "The session should no longer be in the cache after our own device keys changed"
        );
    }
}