1use std::{
16 collections::HashMap,
17 fmt,
18 path::Path,
19 sync::{Arc, RwLock},
20};
21
22use async_trait::async_trait;
23use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
24use matrix_sdk_crypto::{
25 olm::{
26 InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession,
27 PrivateCrossSigningIdentity, SenderDataType, Session, StaticAccountData,
28 },
29 store::{
30 types::{
31 BackupKeys, Changes, DehydratedDeviceKey, PendingChanges, RoomKeyCounts,
32 RoomKeyWithheldEntry, RoomSettings, StoredRoomKeyBundleData,
33 },
34 CryptoStore,
35 },
36 Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, TrackedUser, UserIdentityData,
37};
38use matrix_sdk_store_encryption::StoreCipher;
39use ruma::{
40 events::secret::request::SecretName, DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
41 RoomId, TransactionId, UserId,
42};
43use rusqlite::{named_params, params_from_iter, OptionalExtension};
44use tokio::{fs, sync::Mutex};
45use tracing::{debug, instrument, warn};
46use vodozemac::Curve25519PublicKey;
47
48use crate::{
49 error::{Error, Result},
50 utils::{
51 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt,
53 },
54 OpenStoreError, Secret, SqliteStoreConfig,
55};
56
57const DATABASE_NAME: &str = "matrix-sdk-crypto.sqlite3";
59
60#[derive(Clone)]
62pub struct SqliteCryptoStore {
63 store_cipher: Option<Arc<StoreCipher>>,
64 pool: SqlitePool,
65
66 static_account: Arc<RwLock<Option<StaticAccountData>>>,
68 save_changes_lock: Arc<Mutex<()>>,
69}
70
71#[cfg(not(tarpaulin_include))]
72impl fmt::Debug for SqliteCryptoStore {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("SqliteCryptoStore").finish_non_exhaustive()
75 }
76}
77
78impl EncryptableStore for SqliteCryptoStore {
79 fn get_cypher(&self) -> Option<&StoreCipher> {
80 self.store_cipher.as_deref()
81 }
82}
83
84impl SqliteCryptoStore {
85 pub async fn open(
88 path: impl AsRef<Path>,
89 passphrase: Option<&str>,
90 ) -> Result<Self, OpenStoreError> {
91 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
92 }
93
94 pub async fn open_with_key(
97 path: impl AsRef<Path>,
98 key: Option<&[u8; 32]>,
99 ) -> Result<Self, OpenStoreError> {
100 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
101 }
102
103 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
105 let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;
106
107 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
108
109 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
110 config.pool = Some(pool_config);
111
112 let pool = config.create_pool(Runtime::Tokio1)?;
113
114 let this = Self::open_with_pool(pool, secret).await?;
115 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
116
117 Ok(this)
118 }
119
120 async fn open_with_pool(
123 pool: SqlitePool,
124 secret: Option<Secret>,
125 ) -> Result<Self, OpenStoreError> {
126 let conn = pool.get().await?;
127
128 let version = conn.db_version().await?;
129 debug!("Opened sqlite store with version {}", version);
130 run_migrations(&conn, version).await?;
131
132 let store_cipher = match secret {
133 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
134 None => None,
135 };
136
137 Ok(SqliteCryptoStore {
138 store_cipher,
139 pool,
140 static_account: Arc::new(RwLock::new(None)),
141 save_changes_lock: Default::default(),
142 })
143 }
144
145 fn deserialize_and_unpickle_inbound_group_session(
146 &self,
147 value: Vec<u8>,
148 backed_up: bool,
149 ) -> Result<InboundGroupSession> {
150 let mut pickle: PickledInboundGroupSession = self.deserialize_value(&value)?;
151
152 pickle.backed_up = backed_up;
157
158 Ok(InboundGroupSession::from_pickle(pickle)?)
159 }
160
161 fn deserialize_key_request(&self, value: &[u8], sent_out: bool) -> Result<GossipRequest> {
162 let mut request: GossipRequest = self.deserialize_value(value)?;
163 request.sent_out = sent_out;
166 Ok(request)
167 }
168
169 fn get_static_account(&self) -> Option<StaticAccountData> {
170 self.static_account.read().unwrap().clone()
171 }
172
173 async fn acquire(&self) -> Result<SqliteAsyncConn> {
174 Ok(self.pool.get().await?)
175 }
176}
177
178const DATABASE_VERSION: u8 = 12;
179
180const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
182
183async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
185 if version == 0 {
186 debug!("Creating database");
187 } else if version < DATABASE_VERSION {
188 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
189 } else {
190 return Ok(());
191 }
192
193 if version < 1 {
194 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
197 conn.with_transaction(|txn| {
198 txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))?;
199 txn.set_db_version(1)
200 })
201 .await?;
202 }
203
204 if version < 2 {
205 conn.with_transaction(|txn| {
206 txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))?;
207 txn.set_db_version(2)
208 })
209 .await?;
210 }
211
212 if version < 3 {
213 conn.with_transaction(|txn| {
214 txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))?;
215 txn.set_db_version(3)
216 })
217 .await?;
218 }
219
220 if version < 4 {
221 conn.with_transaction(|txn| {
222 txn.execute_batch(include_str!(
223 "../migrations/crypto_store/004_drop_outbound_group_sessions.sql"
224 ))?;
225 txn.set_db_version(4)
226 })
227 .await?;
228 }
229
230 if version < 5 {
231 conn.with_transaction(|txn| {
232 txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))?;
233 txn.set_db_version(5)
234 })
235 .await?;
236 }
237
238 if version < 6 {
239 conn.with_transaction(|txn| {
240 txn.execute_batch(include_str!(
241 "../migrations/crypto_store/006_drop_outbound_group_sessions.sql"
242 ))?;
243 txn.set_db_version(6)
244 })
245 .await?;
246 }
247
248 if version < 7 {
249 conn.with_transaction(|txn| {
250 txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))?;
251 txn.set_db_version(7)
252 })
253 .await?;
254 }
255
256 if version < 8 {
257 conn.with_transaction(|txn| {
258 txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))?;
259 txn.set_db_version(8)
260 })
261 .await?;
262 }
263
264 if version < 9 {
265 conn.with_transaction(|txn| {
266 txn.execute_batch(include_str!(
267 "../migrations/crypto_store/009_inbound_group_session_sender_key_sender_data_type.sql"
268 ))?;
269 txn.set_db_version(9)
270 })
271 .await?;
272 }
273
274 if version < 10 {
275 conn.with_transaction(|txn| {
276 txn.execute_batch(include_str!(
277 "../migrations/crypto_store/010_received_room_key_bundles.sql"
278 ))?;
279 txn.set_db_version(10)
280 })
281 .await?;
282 }
283
284 if version < 11 {
285 conn.with_transaction(|txn| {
286 txn.execute_batch(include_str!(
287 "../migrations/crypto_store/011_received_room_key_bundles_with_curve_key.sql"
288 ))?;
289 txn.set_db_version(11)
290 })
291 .await?;
292 }
293
294 if version < 12 {
295 conn.with_transaction(|txn| {
296 txn.execute_batch(include_str!(
297 "../migrations/crypto_store/012_withheld_code_by_room.sql"
298 ))?;
299 txn.set_db_version(12)
300 })
301 .await?;
302 }
303
304 Ok(())
305}
306
307trait SqliteConnectionExt {
308 fn set_session(
309 &self,
310 session_id: &[u8],
311 sender_key: &[u8],
312 data: &[u8],
313 ) -> rusqlite::Result<()>;
314
315 fn set_inbound_group_session(
316 &self,
317 room_id: &[u8],
318 session_id: &[u8],
319 data: &[u8],
320 backed_up: bool,
321 sender_key: Option<&[u8]>,
322 sender_data_type: Option<u8>,
323 ) -> rusqlite::Result<()>;
324
325 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
326
327 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
328 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()>;
329
330 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
331
332 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()>;
333
334 fn set_key_request(
335 &self,
336 request_id: &[u8],
337 sent_out: bool,
338 data: &[u8],
339 ) -> rusqlite::Result<()>;
340
341 fn set_direct_withheld(
342 &self,
343 session_id: &[u8],
344 room_id: &[u8],
345 data: &[u8],
346 ) -> rusqlite::Result<()>;
347
348 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
349
350 fn set_secret(&self, request_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
351
352 fn set_received_room_key_bundle(
353 &self,
354 room_id: &[u8],
355 user_id: &[u8],
356 data: &[u8],
357 ) -> rusqlite::Result<()>;
358}
359
360impl SqliteConnectionExt for rusqlite::Connection {
361 fn set_session(
362 &self,
363 session_id: &[u8],
364 sender_key: &[u8],
365 data: &[u8],
366 ) -> rusqlite::Result<()> {
367 self.execute(
368 "INSERT INTO session (session_id, sender_key, data)
369 VALUES (?1, ?2, ?3)
370 ON CONFLICT (session_id) DO UPDATE SET data = ?3",
371 (session_id, sender_key, data),
372 )?;
373 Ok(())
374 }
375
376 fn set_inbound_group_session(
377 &self,
378 room_id: &[u8],
379 session_id: &[u8],
380 data: &[u8],
381 backed_up: bool,
382 sender_key: Option<&[u8]>,
383 sender_data_type: Option<u8>,
384 ) -> rusqlite::Result<()> {
385 self.execute(
386 "INSERT INTO inbound_group_session (session_id, room_id, data, backed_up, sender_key, sender_data_type) \
387 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
388 ON CONFLICT (session_id) DO UPDATE SET data = ?3, backed_up = ?4, sender_key = ?5, sender_data_type = ?6",
389 (session_id, room_id, data, backed_up, sender_key, sender_data_type),
390 )?;
391 Ok(())
392 }
393
394 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
395 self.execute(
396 "INSERT INTO outbound_group_session (room_id, data) \
397 VALUES (?1, ?2)
398 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
399 (room_id, data),
400 )?;
401 Ok(())
402 }
403
404 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
405 self.execute(
406 "INSERT INTO device (user_id, device_id, data) \
407 VALUES (?1, ?2, ?3)
408 ON CONFLICT (user_id, device_id) DO UPDATE SET data = ?3",
409 (user_id, device_id, data),
410 )?;
411 Ok(())
412 }
413
414 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()> {
415 self.execute(
416 "DELETE FROM device WHERE user_id = ? AND device_id = ?",
417 (user_id, device_id),
418 )?;
419 Ok(())
420 }
421
422 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
423 self.execute(
424 "INSERT INTO identity (user_id, data) \
425 VALUES (?1, ?2)
426 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
427 (user_id, data),
428 )?;
429 Ok(())
430 }
431
432 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()> {
433 self.execute("INSERT INTO olm_hash (data) VALUES (?) ON CONFLICT DO NOTHING", (data,))?;
434 Ok(())
435 }
436
437 fn set_key_request(
438 &self,
439 request_id: &[u8],
440 sent_out: bool,
441 data: &[u8],
442 ) -> rusqlite::Result<()> {
443 self.execute(
444 "INSERT INTO key_requests (request_id, sent_out, data)
445 VALUES (?1, ?2, ?3)
446 ON CONFLICT (request_id) DO UPDATE SET sent_out = ?2, data = ?3",
447 (request_id, sent_out, data),
448 )?;
449 Ok(())
450 }
451
452 fn set_direct_withheld(
453 &self,
454 session_id: &[u8],
455 room_id: &[u8],
456 data: &[u8],
457 ) -> rusqlite::Result<()> {
458 self.execute(
459 "INSERT INTO direct_withheld_info (session_id, room_id, data)
460 VALUES (?1, ?2, ?3)
461 ON CONFLICT (session_id) DO UPDATE SET room_id = ?2, data = ?3",
462 (session_id, room_id, data),
463 )?;
464 Ok(())
465 }
466
467 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
468 self.execute(
469 "INSERT INTO room_settings (room_id, data)
470 VALUES (?1, ?2)
471 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
472 (room_id, data),
473 )?;
474 Ok(())
475 }
476
477 fn set_secret(&self, secret_name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
478 self.execute(
479 "INSERT INTO secrets (secret_name, data)
480 VALUES (?1, ?2)",
481 (secret_name, data),
482 )?;
483
484 Ok(())
485 }
486
487 fn set_received_room_key_bundle(
488 &self,
489 room_id: &[u8],
490 sender_user_id: &[u8],
491 data: &[u8],
492 ) -> rusqlite::Result<()> {
493 self.execute(
494 "INSERT INTO received_room_key_bundle(room_id, sender_user_id, bundle_data)
495 VALUES (?1, ?2, ?3)
496 ON CONFLICT (room_id, sender_user_id) DO UPDATE SET bundle_data = ?3",
497 (room_id, sender_user_id, data),
498 )?;
499 Ok(())
500 }
501}
502
503#[async_trait]
504trait SqliteObjectCryptoStoreExt: SqliteAsyncConnExt {
505 async fn get_sessions_for_sender_key(&self, sender_key: Key) -> Result<Vec<Vec<u8>>> {
506 Ok(self
507 .prepare("SELECT data FROM session WHERE sender_key = ?", |mut stmt| {
508 stmt.query((sender_key,))?.mapped(|row| row.get(0)).collect()
509 })
510 .await?)
511 }
512
513 async fn get_inbound_group_session(
514 &self,
515 session_id: Key,
516 ) -> Result<Option<(Vec<u8>, Vec<u8>, bool)>> {
517 Ok(self
518 .query_row(
519 "SELECT room_id, data, backed_up FROM inbound_group_session WHERE session_id = ?",
520 (session_id,),
521 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
522 )
523 .await
524 .optional()?)
525 }
526
527 async fn get_inbound_group_sessions(&self) -> Result<Vec<(Vec<u8>, bool)>> {
528 Ok(self
529 .prepare("SELECT data, backed_up FROM inbound_group_session", |mut stmt| {
530 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
531 })
532 .await?)
533 }
534
535 async fn get_inbound_group_session_counts(
536 &self,
537 _backup_version: Option<&str>,
538 ) -> Result<RoomKeyCounts> {
539 let total = self
540 .query_row("SELECT count(*) FROM inbound_group_session", (), |row| row.get(0))
541 .await?;
542 let backed_up = self
543 .query_row(
544 "SELECT count(*) FROM inbound_group_session WHERE backed_up = TRUE",
545 (),
546 |row| row.get(0),
547 )
548 .await?;
549 Ok(RoomKeyCounts { total, backed_up })
550 }
551
552 async fn get_inbound_group_sessions_by_room_id(
553 &self,
554 room_id: Key,
555 ) -> Result<Vec<(Vec<u8>, bool)>> {
556 Ok(self
557 .prepare(
558 "SELECT data, backed_up FROM inbound_group_session WHERE room_id = :room_id",
559 move |mut stmt| {
560 stmt.query(named_params! {
561 ":room_id": room_id,
562 })?
563 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
564 .collect()
565 },
566 )
567 .await?)
568 }
569
570 async fn get_inbound_group_sessions_for_device_batch(
571 &self,
572 sender_key: Key,
573 sender_data_type: SenderDataType,
574 after_session_id: Option<Key>,
575 limit: usize,
576 ) -> Result<Vec<(Vec<u8>, bool)>> {
577 Ok(self
578 .prepare(
579 "
580 SELECT data, backed_up
581 FROM inbound_group_session
582 WHERE sender_key = :sender_key
583 AND sender_data_type = :sender_data_type
584 AND session_id > :after_session_id
585 ORDER BY session_id
586 LIMIT :limit
587 ",
588 move |mut stmt| {
589 let sender_data_type = sender_data_type as u8;
590
591 let after_session_id = after_session_id.unwrap_or(Key::Plain(Vec::new()));
594
595 stmt.query(named_params! {
596 ":sender_key": sender_key,
597 ":sender_data_type": sender_data_type,
598 ":after_session_id": after_session_id,
599 ":limit": limit,
600 })?
601 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
602 .collect()
603 },
604 )
605 .await?)
606 }
607
608 async fn get_inbound_group_sessions_for_backup(&self, limit: usize) -> Result<Vec<Vec<u8>>> {
609 Ok(self
610 .prepare(
611 "SELECT data FROM inbound_group_session WHERE backed_up = FALSE LIMIT ?",
612 move |mut stmt| stmt.query((limit,))?.mapped(|row| row.get(0)).collect(),
613 )
614 .await?)
615 }
616
617 async fn mark_inbound_group_sessions_as_backed_up(&self, session_ids: Vec<Key>) -> Result<()> {
618 if session_ids.is_empty() {
619 warn!("No sessions to mark as backed up!");
621 return Ok(());
622 }
623
624 let session_ids_len = session_ids.len();
625
626 self.chunk_large_query_over(session_ids, None, move |txn, session_ids| {
627 let sql_params = repeat_vars(session_ids_len);
630 let query = format!("UPDATE inbound_group_session SET backed_up = TRUE where session_id IN ({sql_params})");
631 txn.prepare(&query)?.execute(params_from_iter(session_ids.iter()))?;
632 Ok(Vec::<()>::new())
633 }).await?;
634
635 Ok(())
636 }
637
638 async fn reset_inbound_group_session_backup_state(&self) -> Result<()> {
639 self.execute("UPDATE inbound_group_session SET backed_up = FALSE", ()).await?;
640 Ok(())
641 }
642
643 async fn get_outbound_group_session(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
644 Ok(self
645 .query_row(
646 "SELECT data FROM outbound_group_session WHERE room_id = ?",
647 (room_id,),
648 |row| row.get(0),
649 )
650 .await
651 .optional()?)
652 }
653
654 async fn get_device(&self, user_id: Key, device_id: Key) -> Result<Option<Vec<u8>>> {
655 Ok(self
656 .query_row(
657 "SELECT data FROM device WHERE user_id = ? AND device_id = ?",
658 (user_id, device_id),
659 |row| row.get(0),
660 )
661 .await
662 .optional()?)
663 }
664
665 async fn get_user_devices(&self, user_id: Key) -> Result<Vec<Vec<u8>>> {
666 Ok(self
667 .prepare("SELECT data FROM device WHERE user_id = ?", |mut stmt| {
668 stmt.query((user_id,))?.mapped(|row| row.get(0)).collect()
669 })
670 .await?)
671 }
672
673 async fn get_user_identity(&self, user_id: Key) -> Result<Option<Vec<u8>>> {
674 Ok(self
675 .query_row("SELECT data FROM identity WHERE user_id = ?", (user_id,), |row| row.get(0))
676 .await
677 .optional()?)
678 }
679
680 async fn has_olm_hash(&self, data: Vec<u8>) -> Result<bool> {
681 Ok(self
682 .query_row("SELECT count(*) FROM olm_hash WHERE data = ?", (data,), |row| {
683 row.get::<_, i32>(0)
684 })
685 .await?
686 > 0)
687 }
688
689 async fn get_tracked_users(&self) -> Result<Vec<Vec<u8>>> {
690 Ok(self
691 .prepare("SELECT data FROM tracked_user", |mut stmt| {
692 stmt.query(())?.mapped(|row| row.get(0)).collect()
693 })
694 .await?)
695 }
696
697 async fn add_tracked_users(&self, users: Vec<(Key, Vec<u8>)>) -> Result<()> {
698 Ok(self
699 .prepare(
700 "INSERT INTO tracked_user (user_id, data) \
701 VALUES (?1, ?2) \
702 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
703 |mut stmt| {
704 for (user_id, data) in users {
705 stmt.execute((user_id, data))?;
706 }
707
708 Ok(())
709 },
710 )
711 .await?)
712 }
713
714 async fn get_outgoing_secret_request(
715 &self,
716 request_id: Key,
717 ) -> Result<Option<(Vec<u8>, bool)>> {
718 Ok(self
719 .query_row(
720 "SELECT data, sent_out FROM key_requests WHERE request_id = ?",
721 (request_id,),
722 |row| Ok((row.get(0)?, row.get(1)?)),
723 )
724 .await
725 .optional()?)
726 }
727
728 async fn get_outgoing_secret_requests(&self) -> Result<Vec<(Vec<u8>, bool)>> {
729 Ok(self
730 .prepare("SELECT data, sent_out FROM key_requests", |mut stmt| {
731 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
732 })
733 .await?)
734 }
735
736 async fn get_unsent_secret_requests(&self) -> Result<Vec<Vec<u8>>> {
737 Ok(self
738 .prepare("SELECT data FROM key_requests WHERE sent_out = FALSE", |mut stmt| {
739 stmt.query(())?.mapped(|row| row.get(0)).collect()
740 })
741 .await?)
742 }
743
744 async fn delete_key_request(&self, request_id: Key) -> Result<()> {
745 self.execute("DELETE FROM key_requests WHERE request_id = ?", (request_id,)).await?;
746 Ok(())
747 }
748
749 async fn get_secrets_from_inbox(&self, secret_name: Key) -> Result<Vec<Vec<u8>>> {
750 Ok(self
751 .prepare("SELECT data FROM secrets WHERE secret_name = ?", |mut stmt| {
752 stmt.query((secret_name,))?.mapped(|row| row.get(0)).collect()
753 })
754 .await?)
755 }
756
757 async fn delete_secrets_from_inbox(&self, secret_name: Key) -> Result<()> {
758 self.execute("DELETE FROM secrets WHERE secret_name = ?", (secret_name,)).await?;
759 Ok(())
760 }
761
762 async fn get_direct_withheld_info(
763 &self,
764 session_id: Key,
765 room_id: Key,
766 ) -> Result<Option<Vec<u8>>> {
767 Ok(self
768 .query_row(
769 "SELECT data FROM direct_withheld_info WHERE session_id = ?1 AND room_id = ?2",
770 (session_id, room_id),
771 |row| row.get(0),
772 )
773 .await
774 .optional()?)
775 }
776
777 async fn get_withheld_sessions_by_room_id(&self, room_id: Key) -> Result<Vec<Vec<u8>>> {
778 Ok(self
779 .prepare("SELECT data FROM direct_withheld_info WHERE room_id = ?1", |mut stmt| {
780 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
781 })
782 .await?)
783 }
784
785 async fn get_room_settings(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
786 Ok(self
787 .query_row("SELECT data FROM room_settings WHERE room_id = ?", (room_id,), |row| {
788 row.get(0)
789 })
790 .await
791 .optional()?)
792 }
793
794 async fn get_received_room_key_bundle(
795 &self,
796 room_id: Key,
797 sender_user: Key,
798 ) -> Result<Option<Vec<u8>>> {
799 Ok(self
800 .query_row(
801 "SELECT bundle_data FROM received_room_key_bundle WHERE room_id = ? AND sender_user_id = ?",
802 (room_id, sender_user),
803 |row| { row.get(0) },
804 )
805 .await
806 .optional()?)
807 }
808}
809
810#[async_trait]
811impl SqliteObjectCryptoStoreExt for SqliteAsyncConn {}
812
813#[async_trait]
814impl CryptoStore for SqliteCryptoStore {
815 type Error = Error;
816
817 async fn load_account(&self) -> Result<Option<Account>> {
818 let conn = self.acquire().await?;
819 if let Some(pickle) = conn.get_kv("account").await? {
820 let pickle = self.deserialize_value(&pickle)?;
821
822 let account = Account::from_pickle(pickle).map_err(|_| Error::Unpickle)?;
823
824 *self.static_account.write().unwrap() = Some(account.static_data().clone());
825
826 Ok(Some(account))
827 } else {
828 Ok(None)
829 }
830 }
831
832 async fn load_identity(&self) -> Result<Option<PrivateCrossSigningIdentity>> {
833 let conn = self.acquire().await?;
834 if let Some(i) = conn.get_kv("identity").await? {
835 let pickle = self.deserialize_value(&i)?;
836 Ok(Some(PrivateCrossSigningIdentity::from_pickle(pickle).map_err(|_| Error::Unpickle)?))
837 } else {
838 Ok(None)
839 }
840 }
841
842 async fn save_pending_changes(&self, changes: PendingChanges) -> Result<()> {
843 let _guard = self.save_changes_lock.lock().await;
848
849 let pickled_account = if let Some(account) = changes.account {
850 *self.static_account.write().unwrap() = Some(account.static_data().clone());
851 Some(account.pickle())
852 } else {
853 None
854 };
855
856 let this = self.clone();
857 self.acquire()
858 .await?
859 .with_transaction(move |txn| {
860 if let Some(pickled_account) = pickled_account {
861 let serialized_account = this.serialize_value(&pickled_account)?;
862 txn.set_kv("account", &serialized_account)?;
863 }
864
865 Ok::<_, Error>(())
866 })
867 .await?;
868
869 Ok(())
870 }
871
872 async fn save_changes(&self, changes: Changes) -> Result<()> {
873 let _guard = self.save_changes_lock.lock().await;
878
879 let pickled_private_identity =
880 if let Some(i) = changes.private_identity { Some(i.pickle().await) } else { None };
881
882 let mut session_changes = Vec::new();
883
884 for session in changes.sessions {
885 let session_id = self.encode_key("session", session.session_id());
886 let sender_key = self.encode_key("session", session.sender_key().to_base64());
887 let pickle = session.pickle().await;
888 session_changes.push((session_id, sender_key, pickle));
889 }
890
891 let mut inbound_session_changes = Vec::new();
892 for session in changes.inbound_group_sessions {
893 let room_id = self.encode_key("inbound_group_session", session.room_id().as_bytes());
894 let session_id = self.encode_key("inbound_group_session", session.session_id());
895 let pickle = session.pickle().await;
896 let sender_key =
897 self.encode_key("inbound_group_session", session.sender_key().to_base64());
898 inbound_session_changes.push((room_id, session_id, pickle, sender_key));
899 }
900
901 let mut outbound_session_changes = Vec::new();
902 for session in changes.outbound_group_sessions {
903 let room_id = self.encode_key("outbound_group_session", session.room_id().as_bytes());
904 let pickle = session.pickle().await;
905 outbound_session_changes.push((room_id, pickle));
906 }
907
908 let this = self.clone();
909 self.acquire()
910 .await?
911 .with_transaction(move |txn| {
912 if let Some(pickled_private_identity) = &pickled_private_identity {
913 let serialized_private_identity =
914 this.serialize_value(pickled_private_identity)?;
915 txn.set_kv("identity", &serialized_private_identity)?;
916 }
917
918 if let Some(token) = &changes.next_batch_token {
919 let serialized_token = this.serialize_value(token)?;
920 txn.set_kv("next_batch_token", &serialized_token)?;
921 }
922
923 if let Some(decryption_key) = &changes.backup_decryption_key {
924 let serialized_decryption_key = this.serialize_value(decryption_key)?;
925 txn.set_kv("recovery_key_v1", &serialized_decryption_key)?;
926 }
927
928 if let Some(backup_version) = &changes.backup_version {
929 let serialized_backup_version = this.serialize_value(backup_version)?;
930 txn.set_kv("backup_version_v1", &serialized_backup_version)?;
931 }
932
933 if let Some(pickle_key) = &changes.dehydrated_device_pickle_key {
934 let serialized_pickle_key = this.serialize_value(pickle_key)?;
935 txn.set_kv(DEHYDRATED_DEVICE_PICKLE_KEY, &serialized_pickle_key)?;
936 }
937
938 for device in changes.devices.new.iter().chain(&changes.devices.changed) {
939 let user_id = this.encode_key("device", device.user_id().as_bytes());
940 let device_id = this.encode_key("device", device.device_id().as_bytes());
941 let data = this.serialize_value(&device)?;
942 txn.set_device(&user_id, &device_id, &data)?;
943 }
944
945 for device in &changes.devices.deleted {
946 let user_id = this.encode_key("device", device.user_id().as_bytes());
947 let device_id = this.encode_key("device", device.device_id().as_bytes());
948 txn.delete_device(&user_id, &device_id)?;
949 }
950
951 for identity in changes.identities.changed.iter().chain(&changes.identities.new) {
952 let user_id = this.encode_key("identity", identity.user_id().as_bytes());
953 let data = this.serialize_value(&identity)?;
954 txn.set_identity(&user_id, &data)?;
955 }
956
957 for (session_id, sender_key, pickle) in &session_changes {
958 let serialized_session = this.serialize_value(&pickle)?;
959 txn.set_session(session_id, sender_key, &serialized_session)?;
960 }
961
962 for (room_id, session_id, pickle, sender_key) in &inbound_session_changes {
963 let serialized_session = this.serialize_value(&pickle)?;
964 txn.set_inbound_group_session(
965 room_id,
966 session_id,
967 &serialized_session,
968 pickle.backed_up,
969 Some(sender_key),
970 Some(pickle.sender_data.to_type() as u8),
971 )?;
972 }
973
974 for (room_id, pickle) in &outbound_session_changes {
975 let serialized_session = this.serialize_json(&pickle)?;
976 txn.set_outbound_group_session(room_id, &serialized_session)?;
977 }
978
979 for hash in &changes.message_hashes {
980 let hash = rmp_serde::to_vec(hash)?;
981 txn.add_olm_hash(&hash)?;
982 }
983
984 for request in changes.key_requests {
985 let request_id = this.encode_key("key_requests", request.request_id.as_bytes());
986 let serialized_request = this.serialize_value(&request)?;
987 txn.set_key_request(&request_id, request.sent_out, &serialized_request)?;
988 }
989
990 for (room_id, data) in changes.withheld_session_info {
991 for (session_id, event) in data {
992 let session_id = this.encode_key("direct_withheld_info", session_id);
993 let room_id = this.encode_key("direct_withheld_info", &room_id);
994 let serialized_info = this.serialize_json(&event)?;
995 txn.set_direct_withheld(&session_id, &room_id, &serialized_info)?;
996 }
997 }
998
999 for (room_id, settings) in changes.room_settings {
1000 let room_id = this.encode_key("room_settings", room_id.as_bytes());
1001 let value = this.serialize_value(&settings)?;
1002 txn.set_room_settings(&room_id, &value)?;
1003 }
1004
1005 for secret in changes.secrets {
1006 let secret_name = this.encode_key("secrets", secret.secret_name.to_string());
1007 let value = this.serialize_json(&secret)?;
1008 txn.set_secret(&secret_name, &value)?;
1009 }
1010
1011 for bundle in changes.received_room_key_bundles {
1012 let room_id =
1013 this.encode_key("received_room_key_bundle", &bundle.bundle_data.room_id);
1014 let user_id = this.encode_key("received_room_key_bundle", &bundle.sender_user);
1015 let value = this.serialize_value(&bundle)?;
1016 txn.set_received_room_key_bundle(&room_id, &user_id, &value)?;
1017 }
1018
1019 Ok::<_, Error>(())
1020 })
1021 .await?;
1022
1023 Ok(())
1024 }
1025
1026 async fn save_inbound_group_sessions(
1027 &self,
1028 sessions: Vec<InboundGroupSession>,
1029 backed_up_to_version: Option<&str>,
1030 ) -> matrix_sdk_crypto::store::Result<(), Self::Error> {
1031 sessions.iter().for_each(|s| {
1033 let backed_up = s.backed_up();
1034 if backed_up != backed_up_to_version.is_some() {
1035 warn!(
1036 backed_up,
1037 backed_up_to_version,
1038 "Session backed-up flag does not correspond to backup version setting",
1039 );
1040 }
1041 });
1042
1043 self.save_changes(Changes { inbound_group_sessions: sessions, ..Changes::default() }).await
1046 }
1047
1048 async fn get_sessions(&self, sender_key: &str) -> Result<Option<Vec<Session>>> {
1049 let device_keys = self.get_own_device().await?.as_device_keys().clone();
1050
1051 let sessions: Vec<_> = self
1052 .acquire()
1053 .await?
1054 .get_sessions_for_sender_key(self.encode_key("session", sender_key.as_bytes()))
1055 .await?
1056 .into_iter()
1057 .map(|bytes| {
1058 let pickle = self.deserialize_value(&bytes)?;
1059 Session::from_pickle(device_keys.clone(), pickle).map_err(|_| Error::AccountUnset)
1060 })
1061 .collect::<Result<_>>()?;
1062
1063 if sessions.is_empty() {
1064 Ok(None)
1065 } else {
1066 Ok(Some(sessions))
1067 }
1068 }
1069
1070 #[instrument(skip(self))]
1071 async fn get_inbound_group_session(
1072 &self,
1073 room_id: &RoomId,
1074 session_id: &str,
1075 ) -> Result<Option<InboundGroupSession>> {
1076 let session_id = self.encode_key("inbound_group_session", session_id);
1077 let Some((room_id_from_db, value, backed_up)) =
1078 self.acquire().await?.get_inbound_group_session(session_id).await?
1079 else {
1080 return Ok(None);
1081 };
1082
1083 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1084 if *room_id != room_id_from_db {
1085 warn!("expected room_id for session_id doesn't match what's in the DB");
1086 return Ok(None);
1087 }
1088
1089 Ok(Some(self.deserialize_and_unpickle_inbound_group_session(value, backed_up)?))
1090 }
1091
1092 async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
1093 self.acquire()
1094 .await?
1095 .get_inbound_group_sessions()
1096 .await?
1097 .into_iter()
1098 .map(|(value, backed_up)| {
1099 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1100 })
1101 .collect()
1102 }
1103
1104 async fn get_inbound_group_sessions_by_room_id(
1105 &self,
1106 room_id: &RoomId,
1107 ) -> Result<Vec<InboundGroupSession>> {
1108 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1109 self.acquire()
1110 .await?
1111 .get_inbound_group_sessions_by_room_id(room_id)
1112 .await?
1113 .into_iter()
1114 .map(|(value, backed_up)| {
1115 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1116 })
1117 .collect()
1118 }
1119
1120 async fn get_inbound_group_sessions_for_device_batch(
1121 &self,
1122 sender_key: Curve25519PublicKey,
1123 sender_data_type: SenderDataType,
1124 after_session_id: Option<String>,
1125 limit: usize,
1126 ) -> Result<Vec<InboundGroupSession>, Self::Error> {
1127 let after_session_id =
1128 after_session_id.map(|session_id| self.encode_key("inbound_group_session", session_id));
1129 let sender_key = self.encode_key("inbound_group_session", sender_key.to_base64());
1130
1131 self.acquire()
1132 .await?
1133 .get_inbound_group_sessions_for_device_batch(
1134 sender_key,
1135 sender_data_type,
1136 after_session_id,
1137 limit,
1138 )
1139 .await?
1140 .into_iter()
1141 .map(|(value, backed_up)| {
1142 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1143 })
1144 .collect()
1145 }
1146
1147 async fn inbound_group_session_counts(
1148 &self,
1149 backup_version: Option<&str>,
1150 ) -> Result<RoomKeyCounts> {
1151 Ok(self.acquire().await?.get_inbound_group_session_counts(backup_version).await?)
1152 }
1153
1154 async fn inbound_group_sessions_for_backup(
1155 &self,
1156 _backup_version: &str,
1157 limit: usize,
1158 ) -> Result<Vec<InboundGroupSession>> {
1159 self.acquire()
1160 .await?
1161 .get_inbound_group_sessions_for_backup(limit)
1162 .await?
1163 .into_iter()
1164 .map(|value| self.deserialize_and_unpickle_inbound_group_session(value, false))
1165 .collect()
1166 }
1167
1168 async fn mark_inbound_group_sessions_as_backed_up(
1169 &self,
1170 _backup_version: &str,
1171 session_ids: &[(&RoomId, &str)],
1172 ) -> Result<()> {
1173 Ok(self
1174 .acquire()
1175 .await?
1176 .mark_inbound_group_sessions_as_backed_up(
1177 session_ids
1178 .iter()
1179 .map(|(_, s)| self.encode_key("inbound_group_session", s))
1180 .collect(),
1181 )
1182 .await?)
1183 }
1184
1185 async fn reset_backup_state(&self) -> Result<()> {
1186 Ok(self.acquire().await?.reset_inbound_group_session_backup_state().await?)
1187 }
1188
1189 async fn load_backup_keys(&self) -> Result<BackupKeys> {
1190 let conn = self.acquire().await?;
1191
1192 let backup_version = conn
1193 .get_kv("backup_version_v1")
1194 .await?
1195 .map(|value| self.deserialize_value(&value))
1196 .transpose()?;
1197
1198 let decryption_key = conn
1199 .get_kv("recovery_key_v1")
1200 .await?
1201 .map(|value| self.deserialize_value(&value))
1202 .transpose()?;
1203
1204 Ok(BackupKeys { backup_version, decryption_key })
1205 }
1206
1207 async fn load_dehydrated_device_pickle_key(&self) -> Result<Option<DehydratedDeviceKey>> {
1208 let conn = self.acquire().await?;
1209
1210 conn.get_kv(DEHYDRATED_DEVICE_PICKLE_KEY)
1211 .await?
1212 .map(|value| self.deserialize_value(&value))
1213 .transpose()
1214 }
1215
1216 async fn delete_dehydrated_device_pickle_key(&self) -> Result<(), Self::Error> {
1217 let conn = self.acquire().await?;
1218 conn.clear_kv(DEHYDRATED_DEVICE_PICKLE_KEY).await?;
1219
1220 Ok(())
1221 }
1222 async fn get_outbound_group_session(
1223 &self,
1224 room_id: &RoomId,
1225 ) -> Result<Option<OutboundGroupSession>> {
1226 let room_id = self.encode_key("outbound_group_session", room_id.as_bytes());
1227 let Some(value) = self.acquire().await?.get_outbound_group_session(room_id).await? else {
1228 return Ok(None);
1229 };
1230
1231 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1232
1233 let pickle = self.deserialize_json(&value)?;
1234 let session = OutboundGroupSession::from_pickle(
1235 account_info.device_id,
1236 account_info.identity_keys,
1237 pickle,
1238 )
1239 .map_err(|_| Error::Unpickle)?;
1240
1241 return Ok(Some(session));
1242 }
1243
1244 async fn load_tracked_users(&self) -> Result<Vec<TrackedUser>> {
1245 self.acquire()
1246 .await?
1247 .get_tracked_users()
1248 .await?
1249 .iter()
1250 .map(|value| self.deserialize_value(value))
1251 .collect()
1252 }
1253
1254 async fn save_tracked_users(&self, tracked_users: &[(&UserId, bool)]) -> Result<()> {
1255 let users: Vec<(Key, Vec<u8>)> = tracked_users
1256 .iter()
1257 .map(|(u, d)| {
1258 let user_id = self.encode_key("tracked_users", u.as_bytes());
1259 let data =
1260 self.serialize_value(&TrackedUser { user_id: (*u).into(), dirty: *d })?;
1261 Ok((user_id, data))
1262 })
1263 .collect::<Result<_>>()?;
1264
1265 Ok(self.acquire().await?.add_tracked_users(users).await?)
1266 }
1267
1268 async fn get_device(
1269 &self,
1270 user_id: &UserId,
1271 device_id: &DeviceId,
1272 ) -> Result<Option<DeviceData>> {
1273 let user_id = self.encode_key("device", user_id.as_bytes());
1274 let device_id = self.encode_key("device", device_id.as_bytes());
1275 Ok(self
1276 .acquire()
1277 .await?
1278 .get_device(user_id, device_id)
1279 .await?
1280 .map(|value| self.deserialize_value(&value))
1281 .transpose()?)
1282 }
1283
1284 async fn get_user_devices(
1285 &self,
1286 user_id: &UserId,
1287 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1288 let user_id = self.encode_key("device", user_id.as_bytes());
1289 self.acquire()
1290 .await?
1291 .get_user_devices(user_id)
1292 .await?
1293 .into_iter()
1294 .map(|value| {
1295 let device: DeviceData = self.deserialize_value(&value)?;
1296 Ok((device.device_id().to_owned(), device))
1297 })
1298 .collect()
1299 }
1300
1301 async fn get_own_device(&self) -> Result<DeviceData> {
1302 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1303
1304 Ok(self
1305 .get_device(&account_info.user_id, &account_info.device_id)
1306 .await?
1307 .expect("We should be able to find our own device."))
1308 }
1309
1310 async fn get_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentityData>> {
1311 let user_id = self.encode_key("identity", user_id.as_bytes());
1312 Ok(self
1313 .acquire()
1314 .await?
1315 .get_user_identity(user_id)
1316 .await?
1317 .map(|value| self.deserialize_value(&value))
1318 .transpose()?)
1319 }
1320
1321 async fn is_message_known(
1322 &self,
1323 message_hash: &matrix_sdk_crypto::olm::OlmMessageHash,
1324 ) -> Result<bool> {
1325 let value = rmp_serde::to_vec(message_hash)?;
1326 Ok(self.acquire().await?.has_olm_hash(value).await?)
1327 }
1328
1329 async fn get_outgoing_secret_requests(
1330 &self,
1331 request_id: &TransactionId,
1332 ) -> Result<Option<GossipRequest>> {
1333 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1334 Ok(self
1335 .acquire()
1336 .await?
1337 .get_outgoing_secret_request(request_id)
1338 .await?
1339 .map(|(value, sent_out)| self.deserialize_key_request(&value, sent_out))
1340 .transpose()?)
1341 }
1342
1343 async fn get_secret_request_by_info(
1344 &self,
1345 key_info: &SecretInfo,
1346 ) -> Result<Option<GossipRequest>> {
1347 let requests = self.acquire().await?.get_outgoing_secret_requests().await?;
1348 for (request, sent_out) in requests {
1349 let request = self.deserialize_key_request(&request, sent_out)?;
1350 if request.info == *key_info {
1351 return Ok(Some(request));
1352 }
1353 }
1354 Ok(None)
1355 }
1356
1357 async fn get_unsent_secret_requests(&self) -> Result<Vec<GossipRequest>> {
1358 self.acquire()
1359 .await?
1360 .get_unsent_secret_requests()
1361 .await?
1362 .iter()
1363 .map(|value| {
1364 let request = self.deserialize_key_request(value, false)?;
1365 Ok(request)
1366 })
1367 .collect()
1368 }
1369
1370 async fn delete_outgoing_secret_requests(&self, request_id: &TransactionId) -> Result<()> {
1371 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1372 Ok(self.acquire().await?.delete_key_request(request_id).await?)
1373 }
1374
1375 async fn get_secrets_from_inbox(
1376 &self,
1377 secret_name: &SecretName,
1378 ) -> Result<Vec<GossippedSecret>> {
1379 let secret_name = self.encode_key("secrets", secret_name.to_string());
1380
1381 self.acquire()
1382 .await?
1383 .get_secrets_from_inbox(secret_name)
1384 .await?
1385 .into_iter()
1386 .map(|value| self.deserialize_json(value.as_ref()))
1387 .collect()
1388 }
1389
1390 async fn delete_secrets_from_inbox(&self, secret_name: &SecretName) -> Result<()> {
1391 let secret_name = self.encode_key("secrets", secret_name.to_string());
1392 self.acquire().await?.delete_secrets_from_inbox(secret_name).await
1393 }
1394
1395 async fn get_withheld_info(
1396 &self,
1397 room_id: &RoomId,
1398 session_id: &str,
1399 ) -> Result<Option<RoomKeyWithheldEntry>> {
1400 let room_id = self.encode_key("direct_withheld_info", room_id);
1401 let session_id = self.encode_key("direct_withheld_info", session_id);
1402
1403 self.acquire()
1404 .await?
1405 .get_direct_withheld_info(session_id, room_id)
1406 .await?
1407 .map(|value| {
1408 let info = self.deserialize_json::<RoomKeyWithheldEntry>(&value)?;
1409 Ok(info)
1410 })
1411 .transpose()
1412 }
1413
1414 async fn get_withheld_sessions_by_room_id(
1415 &self,
1416 room_id: &RoomId,
1417 ) -> matrix_sdk_crypto::store::Result<Vec<RoomKeyWithheldEntry>, Self::Error> {
1418 let room_id = self.encode_key("direct_withheld_info", room_id);
1419
1420 self.acquire()
1421 .await?
1422 .get_withheld_sessions_by_room_id(room_id)
1423 .await?
1424 .into_iter()
1425 .map(|value| self.deserialize_json(&value))
1426 .collect()
1427 }
1428
1429 async fn get_room_settings(&self, room_id: &RoomId) -> Result<Option<RoomSettings>> {
1430 let room_id = self.encode_key("room_settings", room_id.as_bytes());
1431 let Some(value) = self.acquire().await?.get_room_settings(room_id).await? else {
1432 return Ok(None);
1433 };
1434
1435 let settings = self.deserialize_value(&value)?;
1436
1437 return Ok(Some(settings));
1438 }
1439
1440 async fn get_received_room_key_bundle_data(
1441 &self,
1442 room_id: &RoomId,
1443 user_id: &UserId,
1444 ) -> Result<Option<StoredRoomKeyBundleData>> {
1445 let room_id = self.encode_key("received_room_key_bundle", room_id);
1446 let user_id = self.encode_key("received_room_key_bundle", user_id);
1447 self.acquire()
1448 .await?
1449 .get_received_room_key_bundle(room_id, user_id)
1450 .await?
1451 .map(|value| self.deserialize_value(&value))
1452 .transpose()
1453 }
1454
1455 async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
1456 let Some(serialized) = self.acquire().await?.get_kv(key).await? else {
1457 return Ok(None);
1458 };
1459 let value = if let Some(cipher) = &self.store_cipher {
1460 let encrypted = rmp_serde::from_slice(&serialized)?;
1461 cipher.decrypt_value_data(encrypted)?
1462 } else {
1463 serialized
1464 };
1465
1466 Ok(Some(value))
1467 }
1468
1469 async fn set_custom_value(&self, key: &str, value: Vec<u8>) -> Result<()> {
1470 let serialized = if let Some(cipher) = &self.store_cipher {
1471 let encrypted = cipher.encrypt_value_data(value)?;
1472 rmp_serde::to_vec_named(&encrypted)?
1473 } else {
1474 value
1475 };
1476
1477 self.acquire().await?.set_kv(key, serialized).await?;
1478 Ok(())
1479 }
1480
1481 async fn remove_custom_value(&self, key: &str) -> Result<()> {
1482 let key = key.to_owned();
1483 self.acquire()
1484 .await?
1485 .interact(move |conn| conn.execute("DELETE FROM kv WHERE key = ?1", (&key,)))
1486 .await
1487 .unwrap()?;
1488 Ok(())
1489 }
1490
1491 async fn try_take_leased_lock(
1492 &self,
1493 lease_duration_ms: u32,
1494 key: &str,
1495 holder: &str,
1496 ) -> Result<bool> {
1497 let key = key.to_owned();
1498 let holder = holder.to_owned();
1499
1500 let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
1501 let expiration_ts = now_ts + lease_duration_ms as u64;
1502
1503 let num_touched = self
1504 .acquire()
1505 .await?
1506 .with_transaction(move |txn| {
1507 txn.execute(
1508 "INSERT INTO lease_locks (key, holder, expiration_ts)
1509 VALUES (?1, ?2, ?3)
1510 ON CONFLICT (key)
1511 DO
1512 UPDATE SET holder = ?2, expiration_ts = ?3
1513 WHERE holder = ?2
1514 OR expiration_ts < ?4
1515 ",
1516 (key, holder, expiration_ts, now_ts),
1517 )
1518 })
1519 .await?;
1520
1521 Ok(num_touched == 1)
1522 }
1523
1524 async fn next_batch_token(&self) -> Result<Option<String>, Self::Error> {
1525 let conn = self.acquire().await?;
1526 if let Some(token) = conn.get_kv("next_batch_token").await? {
1527 let maybe_token: Option<String> = self.deserialize_value(&token)?;
1528 Ok(maybe_token)
1529 } else {
1530 Ok(None)
1531 }
1532 }
1533}
1534
1535#[cfg(test)]
1536mod tests {
1537 use std::path::Path;
1538
1539 use matrix_sdk_common::deserialized_responses::WithheldCode;
1540 use matrix_sdk_crypto::{
1541 cryptostore_integration_tests, cryptostore_integration_tests_time, olm::SenderDataType,
1542 store::CryptoStore,
1543 };
1544 use matrix_sdk_test::async_test;
1545 use once_cell::sync::Lazy;
1546 use ruma::{device_id, room_id, user_id};
1547 use similar_asserts::assert_eq;
1548 use tempfile::{tempdir, TempDir};
1549 use tokio::fs;
1550
1551 use super::SqliteCryptoStore;
1552 use crate::SqliteStoreConfig;
1553
1554 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1555
1556 struct TestDb {
1557 _dir: TempDir,
1560 database: SqliteCryptoStore,
1561 }
1562
1563 fn copy_db(data_path: &str) -> TempDir {
1564 let db_name = super::DATABASE_NAME;
1565
1566 let manifest_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../..");
1567 let database_path = manifest_path.join(data_path).join(db_name);
1568
1569 let tmpdir = tempdir().unwrap();
1570 let destination = tmpdir.path().join(db_name);
1571
1572 std::fs::copy(&database_path, destination).unwrap();
1574
1575 tmpdir
1576 }
1577
1578 async fn get_test_db(data_path: &str, passphrase: Option<&str>) -> TestDb {
1579 let tmpdir = copy_db(data_path);
1580
1581 let database = SqliteCryptoStore::open(tmpdir.path(), passphrase)
1582 .await
1583 .expect("Can't open the test store");
1584
1585 TestDb { _dir: tmpdir, database }
1586 }
1587
1588 #[async_test]
1589 async fn test_pool_size() {
1590 let store_open_config =
1591 SqliteStoreConfig::new(TMP_DIR.path().join("test_pool_size")).pool_max_size(42);
1592
1593 let store = SqliteCryptoStore::open_with_config(store_open_config).await.unwrap();
1594
1595 assert_eq!(store.pool.status().max_size, 42);
1596 }
1597
1598 #[async_test]
1601 async fn test_open_test_vector_store() {
1602 let TestDb { _dir: _, database } = get_test_db("testing/data/storage", None).await;
1603
1604 let account = database
1605 .load_account()
1606 .await
1607 .unwrap()
1608 .expect("The test database is prefilled with data, we should find an account");
1609
1610 let user_id = account.user_id();
1611 let device_id = account.device_id();
1612
1613 assert_eq!(
1614 user_id.as_str(),
1615 "@pjtest:synapse-oidc.element.dev",
1616 "The user ID should match to the one we expect."
1617 );
1618
1619 assert_eq!(
1620 device_id.as_str(),
1621 "v4TqgcuIH6",
1622 "The device ID should match to the one we expect."
1623 );
1624
1625 let device = database
1626 .get_device(user_id, device_id)
1627 .await
1628 .unwrap()
1629 .expect("Our own device should be found in the store.");
1630
1631 assert_eq!(device.device_id(), device_id);
1632 assert_eq!(device.user_id(), user_id);
1633
1634 assert_eq!(
1635 device.ed25519_key().expect("The device should have a Ed25519 key.").to_base64(),
1636 "+cxl1Gl3du5i7UJwfWnoRDdnafFF+xYdAiTYYhYLr8s"
1637 );
1638
1639 assert_eq!(
1640 device.curve25519_key().expect("The device should have a Curve25519 key.").to_base64(),
1641 "4SL9eEUlpyWSUvjljC5oMjknHQQJY7WZKo5S1KL/5VU"
1642 );
1643
1644 let identity = database
1645 .get_user_identity(user_id)
1646 .await
1647 .unwrap()
1648 .expect("The store should contain an identity.");
1649
1650 assert_eq!(identity.user_id(), user_id);
1651
1652 let identity = identity
1653 .own()
1654 .expect("The identity should be of the correct type, it should be our own identity.");
1655
1656 let master_key = identity
1657 .master_key()
1658 .get_first_key()
1659 .expect("Our own identity should have a master key");
1660
1661 assert_eq!(master_key.to_base64(), "iCUEtB1RwANeqRa5epDrblLk4mer/36sylwQ5hYY3oE");
1662 }
1663
1664 #[async_test]
1667 async fn test_open_test_vector_encrypted_store() {
1668 let TestDb { _dir: _, database } = get_test_db(
1669 "testing/data/storage/alice",
1670 Some(concat!(
1671 "/rCia2fYAJ+twCZ1Xm2mxFCYcmJdyzkdJjwtgXsziWpYS/UeNxnixuSieuwZXm+x1VsJHmWpl",
1672 "H+QIQBZpEGZtC9/S/l8xK+WOCesmET0o6yJ/KP73ofDtjBlnNpPwuHLKFpyTbyicpCgQ4UT+5E",
1673 "UBuJ08TY9Ujdf1D13k5kr5tSZUefDKKCuG1fCRqlU8ByRas1PMQsZxT2W8t7QgBrQiiGmhpo/O",
1674 "Ti4hfx97GOxncKcxTzppiYQNoHs/f15+XXQD7/oiCcqRIuUlXNsU6hRpFGmbYx2Pi1eyQViQCt",
1675 "B5dAEiSD0N8U81wXYnpynuTPtnL+hfnOJIn7Sy7mkERQeKg"
1676 )),
1677 )
1678 .await;
1679
1680 let account = database
1681 .load_account()
1682 .await
1683 .unwrap()
1684 .expect("The test database is prefilled with data, we should find an account");
1685
1686 let user_id = account.user_id();
1687 let device_id = account.device_id();
1688
1689 assert_eq!(
1690 user_id.as_str(),
1691 "@alice:localhost",
1692 "The user ID should match to the one we expect."
1693 );
1694
1695 assert_eq!(
1696 device_id.as_str(),
1697 "JVVORTHFXY",
1698 "The device ID should match to the one we expect."
1699 );
1700
1701 let tracked_users =
1702 database.load_tracked_users().await.expect("Should be tracking some users");
1703
1704 assert_eq!(tracked_users.len(), 6);
1705
1706 let known_users = vec![
1707 user_id!("@alice:localhost"),
1708 user_id!("@dehydration3:localhost"),
1709 user_id!("@eve:localhost"),
1710 user_id!("@bob:localhost"),
1711 user_id!("@malo:localhost"),
1712 user_id!("@carl:localhost"),
1713 ];
1714
1715 for user_id in known_users {
1717 database.get_user_identity(user_id).await.expect("Should load this identity").unwrap();
1718 }
1719
1720 let carl_identity =
1721 database.get_user_identity(user_id!("@carl:localhost")).await.unwrap().unwrap();
1722
1723 assert_eq!(
1724 carl_identity.master_key().get_first_key().unwrap().to_base64(),
1725 "CdhKYYDeBDQveOioXEGWhTPCyzc63Irpar3CNyfun2Q"
1726 );
1727 assert!(!carl_identity.was_previously_verified());
1728
1729 let bob_identity =
1730 database.get_user_identity(user_id!("@bob:localhost")).await.unwrap().unwrap();
1731
1732 assert_eq!(
1733 bob_identity.master_key().get_first_key().unwrap().to_base64(),
1734 "COh2GYOJWSjem5QPRCaGp9iWV83IELG1IzLKW2S3pFY"
1735 );
1736 assert!(bob_identity.was_previously_verified());
1738
1739 let known_devices = vec![
1740 (device_id!("OPXQHCZSKW"), user_id!("@alice:localhost")),
1741 (
1743 device_id!("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho"),
1744 user_id!("@alice:localhost"),
1745 ),
1746 (device_id!("HEEFRFQENV"), user_id!("@alice:localhost")),
1747 (device_id!("JVVORTHFXY"), user_id!("@alice:localhost")),
1748 (device_id!("NQUWWSKKHS"), user_id!("@alice:localhost")),
1749 (device_id!("ORBLPFYCPG"), user_id!("@alice:localhost")),
1750 (device_id!("YXOWENSEGM"), user_id!("@dehydration3:localhost")),
1751 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1752 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1753 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1754 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1755 (device_id!("QKUKWJTTQC"), user_id!("@malo:localhost")),
1756 (device_id!("LOUXJECTFG"), user_id!("@malo:localhost")),
1757 (device_id!("MKKMAEVLPB"), user_id!("@carl:localhost")),
1758 ];
1759
1760 for (device_id, user_id) in known_devices {
1761 database.get_device(user_id, device_id).await.expect("Should load the device").unwrap();
1762 }
1763
1764 let known_sender_key_to_session_count = vec![
1765 ("FfYcYfDF4nWy+LHdK6CEpIMlFAQDORc30WUkghL06kM", 1),
1766 ("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho", 1),
1767 ("hAGsoA4a9M6wwEUX5Q1jux1i+tUngLi01n5AmhDoHTY", 1),
1768 ("aKqtSJymLzuoglWFwPGk1r/Vm2LE2hFESzXxn4RNjRM", 0),
1769 ("zHK1psCrgeMn0kaz8hcdvA3INyar9jg1yfrSp0p1pHo", 1),
1770 ("1QmBA316Wj5jIFRwNOti6N6Xh/vW0bsYCcR4uPfy8VQ", 1),
1771 ("g5ef2vZF3VXgSPyODIeXpyHIRkuthvLhGvd6uwYggWU", 1),
1772 ("o7hfupPd1VsNkRIvdlH6ujrEJFSKjFCGbxhAd31XxjI", 1),
1773 ("Z3RxKQLxY7xpP+ZdOGR2SiNE37SrvmRhW7GPu1UGdm8", 1),
1774 ("GDomaav8NiY3J+dNEeApJm+O0FooJ3IpVaIyJzCN4w4", 1),
1775 ("7m7fqkHyEr47V5s/KjaxtJMOr3pSHrrns2q2lWpAQi8", 0),
1776 ("9psAkPUIF8vNbWbnviX3PlwRcaeO53EHJdNtKpTY1X0", 0),
1777 ("mqanh+ztw5oRtpqYQgLGW864i6NY2zpoKMIlrcyC+Aw", 0),
1778 ("fJU/TJdbsv7tVbbpHw1Ke73ziElnM32cNhP2WIg4T10", 0),
1779 ("sUIeFeFcCZoa5IC6nJ6Vrbvztcyx09m8BBg57XKRClg", 1),
1780 ];
1781
1782 for (id, count) in known_sender_key_to_session_count {
1783 let olm_sessions =
1784 database.get_sessions(id).await.expect("Should have some olm sessions");
1785
1786 println!("### Session id: {id:?}");
1787 assert_eq!(olm_sessions.map_or(0, |v| v.len()), count);
1788 }
1789
1790 let inbound_group_sessions = database.get_inbound_group_sessions().await.unwrap();
1791 assert_eq!(inbound_group_sessions.len(), 15);
1792 let known_inbound_group_sessions = vec![
1793 (
1794 "5hNAxrLai3VI0LKBwfh3wLfksfBFWds0W1a5X5/vSXA",
1795 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1796 ),
1797 (
1798 "M6d2eU3y54gaYTbvGSlqa/xc1Az35l56Cp9sxzHWO4g",
1799 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1800 ),
1801 (
1802 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1803 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1804 ),
1805 (
1806 "Y74+l9jTo7N5UF+GQwdpgJGe4sn1+QtWITq7BxulHIE",
1807 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1808 ),
1809 (
1810 "HpJxQR57WbQGdY6w2Q+C16znVvbXGa+JvQdRoMpWbXg",
1811 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1812 ),
1813 (
1814 "Xetvi+ydFkZt8dpONGFbEusQb/Chc2V0XlLByZhsbgE",
1815 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1816 ),
1817 (
1818 "wv/WN/39akyerIXczTaIpjAuLnwgXKRtbXFSEHiJqxo",
1819 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1820 ),
1821 (
1822 "nA4gQwL//Cm8OdlyjABl/jChbPT/cP5V4Sd8iuE6H0s",
1823 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1824 ),
1825 (
1826 "bAAgqFeRDTjfEqL6Qf/c9mk55zoNDCSlboAIRd6b0hw",
1827 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1828 ),
1829 (
1830 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1831 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1832 ),
1833 (
1834 "h+om7oSw/ZV94fcKaoe8FGXJwQXWOfKQfzbGgNWQILI",
1835 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1836 ),
1837 (
1838 "ul3VXonpgk4lO2L3fEWubP/nxsTmLHqu5v8ZM9vHEcw",
1839 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1840 ),
1841 (
1842 "JXY15UxC3az2mwg8uX4qwgxfvCM4aygiIWMcdNiVQoc",
1843 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1844 ),
1845 (
1846 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1847 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1848 ),
1849 (
1850 "SFkHcbxjUOYF7mUAYI/oEMDZFaXszQbCN6Jza7iemj0",
1851 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1852 ),
1853 ];
1854
1855 for (session_id, room_id) in &known_inbound_group_sessions {
1857 database
1858 .get_inbound_group_session(room_id, session_id)
1859 .await
1860 .expect("Should be able to load inbound group session")
1861 .unwrap();
1862 }
1863
1864 let bob_sender_verified = database
1865 .get_inbound_group_session(
1866 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1867 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1868 )
1869 .await
1870 .unwrap()
1871 .unwrap();
1872
1873 assert_eq!(bob_sender_verified.sender_data.to_type(), SenderDataType::SenderVerified);
1874 assert!(bob_sender_verified.backed_up());
1875 assert!(!bob_sender_verified.has_been_imported());
1876
1877 let alice_unknown_device = database
1878 .get_inbound_group_session(
1879 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1880 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1881 )
1882 .await
1883 .unwrap()
1884 .unwrap();
1885
1886 assert_eq!(alice_unknown_device.sender_data.to_type(), SenderDataType::UnknownDevice);
1887 assert!(alice_unknown_device.backed_up());
1888 assert!(alice_unknown_device.has_been_imported());
1889
1890 let carl_tofu_session = database
1891 .get_inbound_group_session(
1892 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1893 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1894 )
1895 .await
1896 .unwrap()
1897 .unwrap();
1898
1899 assert_eq!(carl_tofu_session.sender_data.to_type(), SenderDataType::SenderUnverified);
1900 assert!(carl_tofu_session.backed_up());
1901 assert!(!carl_tofu_session.has_been_imported());
1902
1903 database
1905 .get_outbound_group_session(room_id!("!OgRiTRMaUzLdpCeDBM:localhost"))
1906 .await
1907 .unwrap()
1908 .unwrap();
1909 database
1910 .get_outbound_group_session(room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"))
1911 .await
1912 .unwrap()
1913 .unwrap();
1914 database
1915 .get_outbound_group_session(room_id!("!SRstFdydzrGwJYtVfm:localhost"))
1916 .await
1917 .unwrap()
1918 .unwrap();
1919
1920 let withheld_info = database
1921 .get_withheld_info(
1922 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1923 "SASgZ+EklvAF4QxJclMlDRlmL0fAMjAJJIKFMdb4Ht0",
1924 )
1925 .await
1926 .expect("This session should be withheld")
1927 .unwrap();
1928
1929 assert_eq!(withheld_info.content.withheld_code(), WithheldCode::Unverified);
1930
1931 let backup_keys = database.load_backup_keys().await.expect("backup key should be cached");
1932 assert_eq!(backup_keys.backup_version.unwrap(), "6");
1933 assert!(backup_keys.decryption_key.is_some());
1934 }
1935
1936 async fn get_store(
1937 name: &str,
1938 passphrase: Option<&str>,
1939 clear_data: bool,
1940 ) -> SqliteCryptoStore {
1941 let tmpdir_path = TMP_DIR.path().join(name);
1942
1943 if clear_data {
1944 let _ = fs::remove_dir_all(&tmpdir_path).await;
1945 }
1946
1947 SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), passphrase)
1948 .await
1949 .expect("Can't create a secret protected store")
1950 }
1951
1952 cryptostore_integration_tests!();
1953 cryptostore_integration_tests_time!();
1954}
1955
1956#[cfg(test)]
1957mod encrypted_tests {
1958 use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time};
1959 use once_cell::sync::Lazy;
1960 use tempfile::{tempdir, TempDir};
1961 use tokio::fs;
1962
1963 use super::SqliteCryptoStore;
1964
1965 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1966
1967 async fn get_store(
1968 name: &str,
1969 passphrase: Option<&str>,
1970 clear_data: bool,
1971 ) -> SqliteCryptoStore {
1972 let tmpdir_path = TMP_DIR.path().join(name);
1973 let pass = passphrase.unwrap_or("default_test_password");
1974
1975 if clear_data {
1976 let _ = fs::remove_dir_all(&tmpdir_path).await;
1977 }
1978
1979 SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), Some(pass))
1980 .await
1981 .expect("Can't create a secret protected store")
1982 }
1983
1984 cryptostore_integration_tests!();
1985 cryptostore_integration_tests_time!();
1986}