1use std::{
16 collections::HashMap,
17 fmt,
18 path::Path,
19 sync::{Arc, RwLock},
20};
21
22use async_trait::async_trait;
23use matrix_sdk_base::cross_process_lock::CrossProcessLockGeneration;
24use matrix_sdk_crypto::{
25 Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, TrackedUser, UserIdentityData,
26 olm::{
27 InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession,
28 PrivateCrossSigningIdentity, SenderDataType, Session, StaticAccountData,
29 },
30 store::{
31 CryptoStore,
32 types::{
33 BackupKeys, Changes, DehydratedDeviceKey, PendingChanges, RoomKeyCounts,
34 RoomKeyWithheldEntry, RoomSettings, StoredRoomKeyBundleData,
35 },
36 },
37};
38use matrix_sdk_store_encryption::StoreCipher;
39use ruma::{
40 DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, RoomId, TransactionId, UserId,
41 events::secret::request::SecretName,
42};
43use rusqlite::{OptionalExtension, named_params, params_from_iter};
44use tokio::{fs, sync::Mutex};
45use tracing::{debug, instrument, warn};
46use vodozemac::Curve25519PublicKey;
47
48use crate::{
49 OpenStoreError, Secret, SqliteStoreConfig,
50 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
51 error::{Error, Result},
52 utils::{
53 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
54 SqliteKeyValueStoreConnExt, repeat_vars,
55 },
56};
57
58const DATABASE_NAME: &str = "matrix-sdk-crypto.sqlite3";
60
61#[derive(Clone)]
63pub struct SqliteCryptoStore {
64 store_cipher: Option<Arc<StoreCipher>>,
65 pool: SqlitePool,
66
67 static_account: Arc<RwLock<Option<StaticAccountData>>>,
69 save_changes_lock: Arc<Mutex<()>>,
70}
71
72#[cfg(not(tarpaulin_include))]
73impl fmt::Debug for SqliteCryptoStore {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("SqliteCryptoStore").finish_non_exhaustive()
76 }
77}
78
79impl EncryptableStore for SqliteCryptoStore {
80 fn get_cypher(&self) -> Option<&StoreCipher> {
81 self.store_cipher.as_deref()
82 }
83}
84
85impl SqliteCryptoStore {
86 pub async fn open(
89 path: impl AsRef<Path>,
90 passphrase: Option<&str>,
91 ) -> Result<Self, OpenStoreError> {
92 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
93 }
94
95 pub async fn open_with_key(
98 path: impl AsRef<Path>,
99 key: Option<&[u8; 32]>,
100 ) -> Result<Self, OpenStoreError> {
101 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
102 }
103
104 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
106 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
107
108 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
109
110 let this = Self::open_with_pool(pool, config.secret).await?;
111 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
112
113 Ok(this)
114 }
115
116 async fn open_with_pool(
119 pool: SqlitePool,
120 secret: Option<Secret>,
121 ) -> Result<Self, OpenStoreError> {
122 let conn = pool.get().await?;
123
124 let version = conn.db_version().await?;
125 debug!("Opened sqlite store with version {}", version);
126 run_migrations(&conn, version).await?;
127
128 let store_cipher = match secret {
129 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
130 None => None,
131 };
132
133 Ok(SqliteCryptoStore {
134 store_cipher,
135 pool,
136 static_account: Arc::new(RwLock::new(None)),
137 save_changes_lock: Default::default(),
138 })
139 }
140
141 fn deserialize_and_unpickle_inbound_group_session(
142 &self,
143 value: Vec<u8>,
144 backed_up: bool,
145 ) -> Result<InboundGroupSession> {
146 let mut pickle: PickledInboundGroupSession = self.deserialize_value(&value)?;
147
148 pickle.backed_up = backed_up;
153
154 Ok(InboundGroupSession::from_pickle(pickle)?)
155 }
156
157 fn deserialize_key_request(&self, value: &[u8], sent_out: bool) -> Result<GossipRequest> {
158 let mut request: GossipRequest = self.deserialize_value(value)?;
159 request.sent_out = sent_out;
162 Ok(request)
163 }
164
165 fn get_static_account(&self) -> Option<StaticAccountData> {
166 self.static_account.read().unwrap().clone()
167 }
168
169 async fn acquire(&self) -> Result<SqliteAsyncConn> {
170 Ok(self.pool.get().await?)
171 }
172}
173
174const DATABASE_VERSION: u8 = 13;
175
176const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
178
179async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
181 if version == 0 {
182 debug!("Creating database");
183 } else if version < DATABASE_VERSION {
184 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
185 } else {
186 return Ok(());
187 }
188
189 if version < 1 {
190 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
193 conn.with_transaction(|txn| {
194 txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))?;
195 txn.set_db_version(1)
196 })
197 .await?;
198 }
199
200 if version < 2 {
201 conn.with_transaction(|txn| {
202 txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))?;
203 txn.set_db_version(2)
204 })
205 .await?;
206 }
207
208 if version < 3 {
209 conn.with_transaction(|txn| {
210 txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))?;
211 txn.set_db_version(3)
212 })
213 .await?;
214 }
215
216 if version < 4 {
217 conn.with_transaction(|txn| {
218 txn.execute_batch(include_str!(
219 "../migrations/crypto_store/004_drop_outbound_group_sessions.sql"
220 ))?;
221 txn.set_db_version(4)
222 })
223 .await?;
224 }
225
226 if version < 5 {
227 conn.with_transaction(|txn| {
228 txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))?;
229 txn.set_db_version(5)
230 })
231 .await?;
232 }
233
234 if version < 6 {
235 conn.with_transaction(|txn| {
236 txn.execute_batch(include_str!(
237 "../migrations/crypto_store/006_drop_outbound_group_sessions.sql"
238 ))?;
239 txn.set_db_version(6)
240 })
241 .await?;
242 }
243
244 if version < 7 {
245 conn.with_transaction(|txn| {
246 txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))?;
247 txn.set_db_version(7)
248 })
249 .await?;
250 }
251
252 if version < 8 {
253 conn.with_transaction(|txn| {
254 txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))?;
255 txn.set_db_version(8)
256 })
257 .await?;
258 }
259
260 if version < 9 {
261 conn.with_transaction(|txn| {
262 txn.execute_batch(include_str!(
263 "../migrations/crypto_store/009_inbound_group_session_sender_key_sender_data_type.sql"
264 ))?;
265 txn.set_db_version(9)
266 })
267 .await?;
268 }
269
270 if version < 10 {
271 conn.with_transaction(|txn| {
272 txn.execute_batch(include_str!(
273 "../migrations/crypto_store/010_received_room_key_bundles.sql"
274 ))?;
275 txn.set_db_version(10)
276 })
277 .await?;
278 }
279
280 if version < 11 {
281 conn.with_transaction(|txn| {
282 txn.execute_batch(include_str!(
283 "../migrations/crypto_store/011_received_room_key_bundles_with_curve_key.sql"
284 ))?;
285 txn.set_db_version(11)
286 })
287 .await?;
288 }
289
290 if version < 12 {
291 conn.with_transaction(|txn| {
292 txn.execute_batch(include_str!(
293 "../migrations/crypto_store/012_withheld_code_by_room.sql"
294 ))?;
295 txn.set_db_version(12)
296 })
297 .await?;
298 }
299
300 if version < 13 {
301 conn.with_transaction(|txn| {
302 txn.execute_batch(include_str!(
303 "../migrations/crypto_store/013_lease_locks_with_generation.sql"
304 ))?;
305 txn.set_db_version(13)
306 })
307 .await?;
308 }
309
310 Ok(())
311}
312
313trait SqliteConnectionExt {
314 fn set_session(
315 &self,
316 session_id: &[u8],
317 sender_key: &[u8],
318 data: &[u8],
319 ) -> rusqlite::Result<()>;
320
321 fn set_inbound_group_session(
322 &self,
323 room_id: &[u8],
324 session_id: &[u8],
325 data: &[u8],
326 backed_up: bool,
327 sender_key: Option<&[u8]>,
328 sender_data_type: Option<u8>,
329 ) -> rusqlite::Result<()>;
330
331 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
332
333 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
334 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()>;
335
336 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
337
338 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()>;
339
340 fn set_key_request(
341 &self,
342 request_id: &[u8],
343 sent_out: bool,
344 data: &[u8],
345 ) -> rusqlite::Result<()>;
346
347 fn set_direct_withheld(
348 &self,
349 session_id: &[u8],
350 room_id: &[u8],
351 data: &[u8],
352 ) -> rusqlite::Result<()>;
353
354 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
355
356 fn set_secret(&self, request_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
357
358 fn set_received_room_key_bundle(
359 &self,
360 room_id: &[u8],
361 user_id: &[u8],
362 data: &[u8],
363 ) -> rusqlite::Result<()>;
364}
365
366impl SqliteConnectionExt for rusqlite::Connection {
367 fn set_session(
368 &self,
369 session_id: &[u8],
370 sender_key: &[u8],
371 data: &[u8],
372 ) -> rusqlite::Result<()> {
373 self.execute(
374 "INSERT INTO session (session_id, sender_key, data)
375 VALUES (?1, ?2, ?3)
376 ON CONFLICT (session_id) DO UPDATE SET data = ?3",
377 (session_id, sender_key, data),
378 )?;
379 Ok(())
380 }
381
382 fn set_inbound_group_session(
383 &self,
384 room_id: &[u8],
385 session_id: &[u8],
386 data: &[u8],
387 backed_up: bool,
388 sender_key: Option<&[u8]>,
389 sender_data_type: Option<u8>,
390 ) -> rusqlite::Result<()> {
391 self.execute(
392 "INSERT INTO inbound_group_session (session_id, room_id, data, backed_up, sender_key, sender_data_type) \
393 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
394 ON CONFLICT (session_id) DO UPDATE SET data = ?3, backed_up = ?4, sender_key = ?5, sender_data_type = ?6",
395 (session_id, room_id, data, backed_up, sender_key, sender_data_type),
396 )?;
397 Ok(())
398 }
399
400 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
401 self.execute(
402 "INSERT INTO outbound_group_session (room_id, data) \
403 VALUES (?1, ?2)
404 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
405 (room_id, data),
406 )?;
407 Ok(())
408 }
409
410 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
411 self.execute(
412 "INSERT INTO device (user_id, device_id, data) \
413 VALUES (?1, ?2, ?3)
414 ON CONFLICT (user_id, device_id) DO UPDATE SET data = ?3",
415 (user_id, device_id, data),
416 )?;
417 Ok(())
418 }
419
420 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()> {
421 self.execute(
422 "DELETE FROM device WHERE user_id = ? AND device_id = ?",
423 (user_id, device_id),
424 )?;
425 Ok(())
426 }
427
428 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
429 self.execute(
430 "INSERT INTO identity (user_id, data) \
431 VALUES (?1, ?2)
432 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
433 (user_id, data),
434 )?;
435 Ok(())
436 }
437
438 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()> {
439 self.execute("INSERT INTO olm_hash (data) VALUES (?) ON CONFLICT DO NOTHING", (data,))?;
440 Ok(())
441 }
442
443 fn set_key_request(
444 &self,
445 request_id: &[u8],
446 sent_out: bool,
447 data: &[u8],
448 ) -> rusqlite::Result<()> {
449 self.execute(
450 "INSERT INTO key_requests (request_id, sent_out, data)
451 VALUES (?1, ?2, ?3)
452 ON CONFLICT (request_id) DO UPDATE SET sent_out = ?2, data = ?3",
453 (request_id, sent_out, data),
454 )?;
455 Ok(())
456 }
457
458 fn set_direct_withheld(
459 &self,
460 session_id: &[u8],
461 room_id: &[u8],
462 data: &[u8],
463 ) -> rusqlite::Result<()> {
464 self.execute(
465 "INSERT INTO direct_withheld_info (session_id, room_id, data)
466 VALUES (?1, ?2, ?3)
467 ON CONFLICT (session_id) DO UPDATE SET room_id = ?2, data = ?3",
468 (session_id, room_id, data),
469 )?;
470 Ok(())
471 }
472
473 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
474 self.execute(
475 "INSERT INTO room_settings (room_id, data)
476 VALUES (?1, ?2)
477 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
478 (room_id, data),
479 )?;
480 Ok(())
481 }
482
483 fn set_secret(&self, secret_name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
484 self.execute(
485 "INSERT INTO secrets (secret_name, data)
486 VALUES (?1, ?2)",
487 (secret_name, data),
488 )?;
489
490 Ok(())
491 }
492
493 fn set_received_room_key_bundle(
494 &self,
495 room_id: &[u8],
496 sender_user_id: &[u8],
497 data: &[u8],
498 ) -> rusqlite::Result<()> {
499 self.execute(
500 "INSERT INTO received_room_key_bundle(room_id, sender_user_id, bundle_data)
501 VALUES (?1, ?2, ?3)
502 ON CONFLICT (room_id, sender_user_id) DO UPDATE SET bundle_data = ?3",
503 (room_id, sender_user_id, data),
504 )?;
505 Ok(())
506 }
507}
508
509#[async_trait]
510trait SqliteObjectCryptoStoreExt: SqliteAsyncConnExt {
511 async fn get_sessions_for_sender_key(&self, sender_key: Key) -> Result<Vec<Vec<u8>>> {
512 Ok(self
513 .prepare("SELECT data FROM session WHERE sender_key = ?", |mut stmt| {
514 stmt.query((sender_key,))?.mapped(|row| row.get(0)).collect()
515 })
516 .await?)
517 }
518
519 async fn get_inbound_group_session(
520 &self,
521 session_id: Key,
522 ) -> Result<Option<(Vec<u8>, Vec<u8>, bool)>> {
523 Ok(self
524 .query_row(
525 "SELECT room_id, data, backed_up FROM inbound_group_session WHERE session_id = ?",
526 (session_id,),
527 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
528 )
529 .await
530 .optional()?)
531 }
532
533 async fn get_inbound_group_sessions(&self) -> Result<Vec<(Vec<u8>, bool)>> {
534 Ok(self
535 .prepare("SELECT data, backed_up FROM inbound_group_session", |mut stmt| {
536 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
537 })
538 .await?)
539 }
540
541 async fn get_inbound_group_session_counts(
542 &self,
543 _backup_version: Option<&str>,
544 ) -> Result<RoomKeyCounts> {
545 let total = self
546 .query_row("SELECT count(*) FROM inbound_group_session", (), |row| row.get(0))
547 .await?;
548 let backed_up = self
549 .query_row(
550 "SELECT count(*) FROM inbound_group_session WHERE backed_up = TRUE",
551 (),
552 |row| row.get(0),
553 )
554 .await?;
555 Ok(RoomKeyCounts { total, backed_up })
556 }
557
558 async fn get_inbound_group_sessions_by_room_id(
559 &self,
560 room_id: Key,
561 ) -> Result<Vec<(Vec<u8>, bool)>> {
562 Ok(self
563 .prepare(
564 "SELECT data, backed_up FROM inbound_group_session WHERE room_id = :room_id",
565 move |mut stmt| {
566 stmt.query(named_params! {
567 ":room_id": room_id,
568 })?
569 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
570 .collect()
571 },
572 )
573 .await?)
574 }
575
576 async fn get_inbound_group_sessions_for_device_batch(
577 &self,
578 sender_key: Key,
579 sender_data_type: SenderDataType,
580 after_session_id: Option<Key>,
581 limit: usize,
582 ) -> Result<Vec<(Vec<u8>, bool)>> {
583 Ok(self
584 .prepare(
585 "
586 SELECT data, backed_up
587 FROM inbound_group_session
588 WHERE sender_key = :sender_key
589 AND sender_data_type = :sender_data_type
590 AND session_id > :after_session_id
591 ORDER BY session_id
592 LIMIT :limit
593 ",
594 move |mut stmt| {
595 let sender_data_type = sender_data_type as u8;
596
597 let after_session_id = after_session_id.unwrap_or(Key::Plain(Vec::new()));
600
601 stmt.query(named_params! {
602 ":sender_key": sender_key,
603 ":sender_data_type": sender_data_type,
604 ":after_session_id": after_session_id,
605 ":limit": limit,
606 })?
607 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
608 .collect()
609 },
610 )
611 .await?)
612 }
613
614 async fn get_inbound_group_sessions_for_backup(&self, limit: usize) -> Result<Vec<Vec<u8>>> {
615 Ok(self
616 .prepare(
617 "SELECT data FROM inbound_group_session WHERE backed_up = FALSE LIMIT ?",
618 move |mut stmt| stmt.query((limit,))?.mapped(|row| row.get(0)).collect(),
619 )
620 .await?)
621 }
622
623 async fn mark_inbound_group_sessions_as_backed_up(&self, session_ids: Vec<Key>) -> Result<()> {
624 if session_ids.is_empty() {
625 warn!("No sessions to mark as backed up!");
627 return Ok(());
628 }
629
630 let session_ids_len = session_ids.len();
631
632 self.chunk_large_query_over(session_ids, None, move |txn, session_ids| {
633 let sql_params = repeat_vars(session_ids_len);
636 let query = format!("UPDATE inbound_group_session SET backed_up = TRUE where session_id IN ({sql_params})");
637 txn.prepare(&query)?.execute(params_from_iter(session_ids.iter()))?;
638 Ok(Vec::<()>::new())
639 }).await?;
640
641 Ok(())
642 }
643
644 async fn reset_inbound_group_session_backup_state(&self) -> Result<()> {
645 self.execute("UPDATE inbound_group_session SET backed_up = FALSE", ()).await?;
646 Ok(())
647 }
648
649 async fn get_outbound_group_session(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
650 Ok(self
651 .query_row(
652 "SELECT data FROM outbound_group_session WHERE room_id = ?",
653 (room_id,),
654 |row| row.get(0),
655 )
656 .await
657 .optional()?)
658 }
659
660 async fn get_device(&self, user_id: Key, device_id: Key) -> Result<Option<Vec<u8>>> {
661 Ok(self
662 .query_row(
663 "SELECT data FROM device WHERE user_id = ? AND device_id = ?",
664 (user_id, device_id),
665 |row| row.get(0),
666 )
667 .await
668 .optional()?)
669 }
670
671 async fn get_user_devices(&self, user_id: Key) -> Result<Vec<Vec<u8>>> {
672 Ok(self
673 .prepare("SELECT data FROM device WHERE user_id = ?", |mut stmt| {
674 stmt.query((user_id,))?.mapped(|row| row.get(0)).collect()
675 })
676 .await?)
677 }
678
679 async fn get_user_identity(&self, user_id: Key) -> Result<Option<Vec<u8>>> {
680 Ok(self
681 .query_row("SELECT data FROM identity WHERE user_id = ?", (user_id,), |row| row.get(0))
682 .await
683 .optional()?)
684 }
685
686 async fn has_olm_hash(&self, data: Vec<u8>) -> Result<bool> {
687 Ok(self
688 .query_row("SELECT count(*) FROM olm_hash WHERE data = ?", (data,), |row| {
689 row.get::<_, i32>(0)
690 })
691 .await?
692 > 0)
693 }
694
695 async fn get_tracked_users(&self) -> Result<Vec<Vec<u8>>> {
696 Ok(self
697 .prepare("SELECT data FROM tracked_user", |mut stmt| {
698 stmt.query(())?.mapped(|row| row.get(0)).collect()
699 })
700 .await?)
701 }
702
703 async fn add_tracked_users(&self, users: Vec<(Key, Vec<u8>)>) -> Result<()> {
704 Ok(self
705 .prepare(
706 "INSERT INTO tracked_user (user_id, data) \
707 VALUES (?1, ?2) \
708 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
709 |mut stmt| {
710 for (user_id, data) in users {
711 stmt.execute((user_id, data))?;
712 }
713
714 Ok(())
715 },
716 )
717 .await?)
718 }
719
720 async fn get_outgoing_secret_request(
721 &self,
722 request_id: Key,
723 ) -> Result<Option<(Vec<u8>, bool)>> {
724 Ok(self
725 .query_row(
726 "SELECT data, sent_out FROM key_requests WHERE request_id = ?",
727 (request_id,),
728 |row| Ok((row.get(0)?, row.get(1)?)),
729 )
730 .await
731 .optional()?)
732 }
733
734 async fn get_outgoing_secret_requests(&self) -> Result<Vec<(Vec<u8>, bool)>> {
735 Ok(self
736 .prepare("SELECT data, sent_out FROM key_requests", |mut stmt| {
737 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
738 })
739 .await?)
740 }
741
742 async fn get_unsent_secret_requests(&self) -> Result<Vec<Vec<u8>>> {
743 Ok(self
744 .prepare("SELECT data FROM key_requests WHERE sent_out = FALSE", |mut stmt| {
745 stmt.query(())?.mapped(|row| row.get(0)).collect()
746 })
747 .await?)
748 }
749
750 async fn delete_key_request(&self, request_id: Key) -> Result<()> {
751 self.execute("DELETE FROM key_requests WHERE request_id = ?", (request_id,)).await?;
752 Ok(())
753 }
754
755 async fn get_secrets_from_inbox(&self, secret_name: Key) -> Result<Vec<Vec<u8>>> {
756 Ok(self
757 .prepare("SELECT data FROM secrets WHERE secret_name = ?", |mut stmt| {
758 stmt.query((secret_name,))?.mapped(|row| row.get(0)).collect()
759 })
760 .await?)
761 }
762
763 async fn delete_secrets_from_inbox(&self, secret_name: Key) -> Result<()> {
764 self.execute("DELETE FROM secrets WHERE secret_name = ?", (secret_name,)).await?;
765 Ok(())
766 }
767
768 async fn get_direct_withheld_info(
769 &self,
770 session_id: Key,
771 room_id: Key,
772 ) -> Result<Option<Vec<u8>>> {
773 Ok(self
774 .query_row(
775 "SELECT data FROM direct_withheld_info WHERE session_id = ?1 AND room_id = ?2",
776 (session_id, room_id),
777 |row| row.get(0),
778 )
779 .await
780 .optional()?)
781 }
782
783 async fn get_withheld_sessions_by_room_id(&self, room_id: Key) -> Result<Vec<Vec<u8>>> {
784 Ok(self
785 .prepare("SELECT data FROM direct_withheld_info WHERE room_id = ?1", |mut stmt| {
786 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
787 })
788 .await?)
789 }
790
791 async fn get_room_settings(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
792 Ok(self
793 .query_row("SELECT data FROM room_settings WHERE room_id = ?", (room_id,), |row| {
794 row.get(0)
795 })
796 .await
797 .optional()?)
798 }
799
800 async fn get_received_room_key_bundle(
801 &self,
802 room_id: Key,
803 sender_user: Key,
804 ) -> Result<Option<Vec<u8>>> {
805 Ok(self
806 .query_row(
807 "SELECT bundle_data FROM received_room_key_bundle WHERE room_id = ? AND sender_user_id = ?",
808 (room_id, sender_user),
809 |row| { row.get(0) },
810 )
811 .await
812 .optional()?)
813 }
814}
815
816#[async_trait]
817impl SqliteObjectCryptoStoreExt for SqliteAsyncConn {}
818
819#[async_trait]
820impl CryptoStore for SqliteCryptoStore {
821 type Error = Error;
822
823 async fn load_account(&self) -> Result<Option<Account>> {
824 let conn = self.acquire().await?;
825 if let Some(pickle) = conn.get_kv("account").await? {
826 let pickle = self.deserialize_value(&pickle)?;
827
828 let account = Account::from_pickle(pickle).map_err(|_| Error::Unpickle)?;
829
830 *self.static_account.write().unwrap() = Some(account.static_data().clone());
831
832 Ok(Some(account))
833 } else {
834 Ok(None)
835 }
836 }
837
838 async fn load_identity(&self) -> Result<Option<PrivateCrossSigningIdentity>> {
839 let conn = self.acquire().await?;
840 if let Some(i) = conn.get_kv("identity").await? {
841 let pickle = self.deserialize_value(&i)?;
842 Ok(Some(PrivateCrossSigningIdentity::from_pickle(pickle).map_err(|_| Error::Unpickle)?))
843 } else {
844 Ok(None)
845 }
846 }
847
848 async fn save_pending_changes(&self, changes: PendingChanges) -> Result<()> {
849 let _guard = self.save_changes_lock.lock().await;
854
855 let pickled_account = if let Some(account) = changes.account {
856 *self.static_account.write().unwrap() = Some(account.static_data().clone());
857 Some(account.pickle())
858 } else {
859 None
860 };
861
862 let this = self.clone();
863 self.acquire()
864 .await?
865 .with_transaction(move |txn| {
866 if let Some(pickled_account) = pickled_account {
867 let serialized_account = this.serialize_value(&pickled_account)?;
868 txn.set_kv("account", &serialized_account)?;
869 }
870
871 Ok::<_, Error>(())
872 })
873 .await?;
874
875 Ok(())
876 }
877
878 async fn save_changes(&self, changes: Changes) -> Result<()> {
879 let _guard = self.save_changes_lock.lock().await;
884
885 let pickled_private_identity =
886 if let Some(i) = changes.private_identity { Some(i.pickle().await) } else { None };
887
888 let mut session_changes = Vec::new();
889
890 for session in changes.sessions {
891 let session_id = self.encode_key("session", session.session_id());
892 let sender_key = self.encode_key("session", session.sender_key().to_base64());
893 let pickle = session.pickle().await;
894 session_changes.push((session_id, sender_key, pickle));
895 }
896
897 let mut inbound_session_changes = Vec::new();
898 for session in changes.inbound_group_sessions {
899 let room_id = self.encode_key("inbound_group_session", session.room_id().as_bytes());
900 let session_id = self.encode_key("inbound_group_session", session.session_id());
901 let pickle = session.pickle().await;
902 let sender_key =
903 self.encode_key("inbound_group_session", session.sender_key().to_base64());
904 inbound_session_changes.push((room_id, session_id, pickle, sender_key));
905 }
906
907 let mut outbound_session_changes = Vec::new();
908 for session in changes.outbound_group_sessions {
909 let room_id = self.encode_key("outbound_group_session", session.room_id().as_bytes());
910 let pickle = session.pickle().await;
911 outbound_session_changes.push((room_id, pickle));
912 }
913
914 let this = self.clone();
915 self.acquire()
916 .await?
917 .with_transaction(move |txn| {
918 if let Some(pickled_private_identity) = &pickled_private_identity {
919 let serialized_private_identity =
920 this.serialize_value(pickled_private_identity)?;
921 txn.set_kv("identity", &serialized_private_identity)?;
922 }
923
924 if let Some(token) = &changes.next_batch_token {
925 let serialized_token = this.serialize_value(token)?;
926 txn.set_kv("next_batch_token", &serialized_token)?;
927 }
928
929 if let Some(decryption_key) = &changes.backup_decryption_key {
930 let serialized_decryption_key = this.serialize_value(decryption_key)?;
931 txn.set_kv("recovery_key_v1", &serialized_decryption_key)?;
932 }
933
934 if let Some(backup_version) = &changes.backup_version {
935 let serialized_backup_version = this.serialize_value(backup_version)?;
936 txn.set_kv("backup_version_v1", &serialized_backup_version)?;
937 }
938
939 if let Some(pickle_key) = &changes.dehydrated_device_pickle_key {
940 let serialized_pickle_key = this.serialize_value(pickle_key)?;
941 txn.set_kv(DEHYDRATED_DEVICE_PICKLE_KEY, &serialized_pickle_key)?;
942 }
943
944 for device in changes.devices.new.iter().chain(&changes.devices.changed) {
945 let user_id = this.encode_key("device", device.user_id().as_bytes());
946 let device_id = this.encode_key("device", device.device_id().as_bytes());
947 let data = this.serialize_value(&device)?;
948 txn.set_device(&user_id, &device_id, &data)?;
949 }
950
951 for device in &changes.devices.deleted {
952 let user_id = this.encode_key("device", device.user_id().as_bytes());
953 let device_id = this.encode_key("device", device.device_id().as_bytes());
954 txn.delete_device(&user_id, &device_id)?;
955 }
956
957 for identity in changes.identities.changed.iter().chain(&changes.identities.new) {
958 let user_id = this.encode_key("identity", identity.user_id().as_bytes());
959 let data = this.serialize_value(&identity)?;
960 txn.set_identity(&user_id, &data)?;
961 }
962
963 for (session_id, sender_key, pickle) in &session_changes {
964 let serialized_session = this.serialize_value(&pickle)?;
965 txn.set_session(session_id, sender_key, &serialized_session)?;
966 }
967
968 for (room_id, session_id, pickle, sender_key) in &inbound_session_changes {
969 let serialized_session = this.serialize_value(&pickle)?;
970 txn.set_inbound_group_session(
971 room_id,
972 session_id,
973 &serialized_session,
974 pickle.backed_up,
975 Some(sender_key),
976 Some(pickle.sender_data.to_type() as u8),
977 )?;
978 }
979
980 for (room_id, pickle) in &outbound_session_changes {
981 let serialized_session = this.serialize_json(&pickle)?;
982 txn.set_outbound_group_session(room_id, &serialized_session)?;
983 }
984
985 for hash in &changes.message_hashes {
986 let hash = rmp_serde::to_vec(hash)?;
987 txn.add_olm_hash(&hash)?;
988 }
989
990 for request in changes.key_requests {
991 let request_id = this.encode_key("key_requests", request.request_id.as_bytes());
992 let serialized_request = this.serialize_value(&request)?;
993 txn.set_key_request(&request_id, request.sent_out, &serialized_request)?;
994 }
995
996 for (room_id, data) in changes.withheld_session_info {
997 for (session_id, event) in data {
998 let session_id = this.encode_key("direct_withheld_info", session_id);
999 let room_id = this.encode_key("direct_withheld_info", &room_id);
1000 let serialized_info = this.serialize_json(&event)?;
1001 txn.set_direct_withheld(&session_id, &room_id, &serialized_info)?;
1002 }
1003 }
1004
1005 for (room_id, settings) in changes.room_settings {
1006 let room_id = this.encode_key("room_settings", room_id.as_bytes());
1007 let value = this.serialize_value(&settings)?;
1008 txn.set_room_settings(&room_id, &value)?;
1009 }
1010
1011 for secret in changes.secrets {
1012 let secret_name = this.encode_key("secrets", secret.secret_name.to_string());
1013 let value = this.serialize_json(&secret)?;
1014 txn.set_secret(&secret_name, &value)?;
1015 }
1016
1017 for bundle in changes.received_room_key_bundles {
1018 let room_id =
1019 this.encode_key("received_room_key_bundle", &bundle.bundle_data.room_id);
1020 let user_id = this.encode_key("received_room_key_bundle", &bundle.sender_user);
1021 let value = this.serialize_value(&bundle)?;
1022 txn.set_received_room_key_bundle(&room_id, &user_id, &value)?;
1023 }
1024
1025 Ok::<_, Error>(())
1026 })
1027 .await?;
1028
1029 Ok(())
1030 }
1031
1032 async fn save_inbound_group_sessions(
1033 &self,
1034 sessions: Vec<InboundGroupSession>,
1035 backed_up_to_version: Option<&str>,
1036 ) -> matrix_sdk_crypto::store::Result<(), Self::Error> {
1037 sessions.iter().for_each(|s| {
1039 let backed_up = s.backed_up();
1040 if backed_up != backed_up_to_version.is_some() {
1041 warn!(
1042 backed_up,
1043 backed_up_to_version,
1044 "Session backed-up flag does not correspond to backup version setting",
1045 );
1046 }
1047 });
1048
1049 self.save_changes(Changes { inbound_group_sessions: sessions, ..Changes::default() }).await
1052 }
1053
1054 async fn get_sessions(&self, sender_key: &str) -> Result<Option<Vec<Session>>> {
1055 let device_keys = self.get_own_device().await?.as_device_keys().clone();
1056
1057 let sessions: Vec<_> = self
1058 .acquire()
1059 .await?
1060 .get_sessions_for_sender_key(self.encode_key("session", sender_key.as_bytes()))
1061 .await?
1062 .into_iter()
1063 .map(|bytes| {
1064 let pickle = self.deserialize_value(&bytes)?;
1065 Session::from_pickle(device_keys.clone(), pickle).map_err(|_| Error::AccountUnset)
1066 })
1067 .collect::<Result<_>>()?;
1068
1069 if sessions.is_empty() { Ok(None) } else { Ok(Some(sessions)) }
1070 }
1071
1072 #[instrument(skip(self))]
1073 async fn get_inbound_group_session(
1074 &self,
1075 room_id: &RoomId,
1076 session_id: &str,
1077 ) -> Result<Option<InboundGroupSession>> {
1078 let session_id = self.encode_key("inbound_group_session", session_id);
1079 let Some((room_id_from_db, value, backed_up)) =
1080 self.acquire().await?.get_inbound_group_session(session_id).await?
1081 else {
1082 return Ok(None);
1083 };
1084
1085 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1086 if *room_id != room_id_from_db {
1087 warn!("expected room_id for session_id doesn't match what's in the DB");
1088 return Ok(None);
1089 }
1090
1091 Ok(Some(self.deserialize_and_unpickle_inbound_group_session(value, backed_up)?))
1092 }
1093
1094 async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
1095 self.acquire()
1096 .await?
1097 .get_inbound_group_sessions()
1098 .await?
1099 .into_iter()
1100 .map(|(value, backed_up)| {
1101 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1102 })
1103 .collect()
1104 }
1105
1106 async fn get_inbound_group_sessions_by_room_id(
1107 &self,
1108 room_id: &RoomId,
1109 ) -> Result<Vec<InboundGroupSession>> {
1110 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1111 self.acquire()
1112 .await?
1113 .get_inbound_group_sessions_by_room_id(room_id)
1114 .await?
1115 .into_iter()
1116 .map(|(value, backed_up)| {
1117 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1118 })
1119 .collect()
1120 }
1121
1122 async fn get_inbound_group_sessions_for_device_batch(
1123 &self,
1124 sender_key: Curve25519PublicKey,
1125 sender_data_type: SenderDataType,
1126 after_session_id: Option<String>,
1127 limit: usize,
1128 ) -> Result<Vec<InboundGroupSession>, Self::Error> {
1129 let after_session_id =
1130 after_session_id.map(|session_id| self.encode_key("inbound_group_session", session_id));
1131 let sender_key = self.encode_key("inbound_group_session", sender_key.to_base64());
1132
1133 self.acquire()
1134 .await?
1135 .get_inbound_group_sessions_for_device_batch(
1136 sender_key,
1137 sender_data_type,
1138 after_session_id,
1139 limit,
1140 )
1141 .await?
1142 .into_iter()
1143 .map(|(value, backed_up)| {
1144 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1145 })
1146 .collect()
1147 }
1148
1149 async fn inbound_group_session_counts(
1150 &self,
1151 backup_version: Option<&str>,
1152 ) -> Result<RoomKeyCounts> {
1153 Ok(self.acquire().await?.get_inbound_group_session_counts(backup_version).await?)
1154 }
1155
1156 async fn inbound_group_sessions_for_backup(
1157 &self,
1158 _backup_version: &str,
1159 limit: usize,
1160 ) -> Result<Vec<InboundGroupSession>> {
1161 self.acquire()
1162 .await?
1163 .get_inbound_group_sessions_for_backup(limit)
1164 .await?
1165 .into_iter()
1166 .map(|value| self.deserialize_and_unpickle_inbound_group_session(value, false))
1167 .collect()
1168 }
1169
1170 async fn mark_inbound_group_sessions_as_backed_up(
1171 &self,
1172 _backup_version: &str,
1173 session_ids: &[(&RoomId, &str)],
1174 ) -> Result<()> {
1175 Ok(self
1176 .acquire()
1177 .await?
1178 .mark_inbound_group_sessions_as_backed_up(
1179 session_ids
1180 .iter()
1181 .map(|(_, s)| self.encode_key("inbound_group_session", s))
1182 .collect(),
1183 )
1184 .await?)
1185 }
1186
1187 async fn reset_backup_state(&self) -> Result<()> {
1188 Ok(self.acquire().await?.reset_inbound_group_session_backup_state().await?)
1189 }
1190
1191 async fn load_backup_keys(&self) -> Result<BackupKeys> {
1192 let conn = self.acquire().await?;
1193
1194 let backup_version = conn
1195 .get_kv("backup_version_v1")
1196 .await?
1197 .map(|value| self.deserialize_value(&value))
1198 .transpose()?;
1199
1200 let decryption_key = conn
1201 .get_kv("recovery_key_v1")
1202 .await?
1203 .map(|value| self.deserialize_value(&value))
1204 .transpose()?;
1205
1206 Ok(BackupKeys { backup_version, decryption_key })
1207 }
1208
1209 async fn load_dehydrated_device_pickle_key(&self) -> Result<Option<DehydratedDeviceKey>> {
1210 let conn = self.acquire().await?;
1211
1212 conn.get_kv(DEHYDRATED_DEVICE_PICKLE_KEY)
1213 .await?
1214 .map(|value| self.deserialize_value(&value))
1215 .transpose()
1216 }
1217
1218 async fn delete_dehydrated_device_pickle_key(&self) -> Result<(), Self::Error> {
1219 let conn = self.acquire().await?;
1220 conn.clear_kv(DEHYDRATED_DEVICE_PICKLE_KEY).await?;
1221
1222 Ok(())
1223 }
1224 async fn get_outbound_group_session(
1225 &self,
1226 room_id: &RoomId,
1227 ) -> Result<Option<OutboundGroupSession>> {
1228 let room_id = self.encode_key("outbound_group_session", room_id.as_bytes());
1229 let Some(value) = self.acquire().await?.get_outbound_group_session(room_id).await? else {
1230 return Ok(None);
1231 };
1232
1233 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1234
1235 let pickle = self.deserialize_json(&value)?;
1236 let session = OutboundGroupSession::from_pickle(
1237 account_info.device_id,
1238 account_info.identity_keys,
1239 pickle,
1240 )
1241 .map_err(|_| Error::Unpickle)?;
1242
1243 return Ok(Some(session));
1244 }
1245
1246 async fn load_tracked_users(&self) -> Result<Vec<TrackedUser>> {
1247 self.acquire()
1248 .await?
1249 .get_tracked_users()
1250 .await?
1251 .iter()
1252 .map(|value| self.deserialize_value(value))
1253 .collect()
1254 }
1255
1256 async fn save_tracked_users(&self, tracked_users: &[(&UserId, bool)]) -> Result<()> {
1257 let users: Vec<(Key, Vec<u8>)> = tracked_users
1258 .iter()
1259 .map(|(u, d)| {
1260 let user_id = self.encode_key("tracked_users", u.as_bytes());
1261 let data =
1262 self.serialize_value(&TrackedUser { user_id: (*u).into(), dirty: *d })?;
1263 Ok((user_id, data))
1264 })
1265 .collect::<Result<_>>()?;
1266
1267 Ok(self.acquire().await?.add_tracked_users(users).await?)
1268 }
1269
1270 async fn get_device(
1271 &self,
1272 user_id: &UserId,
1273 device_id: &DeviceId,
1274 ) -> Result<Option<DeviceData>> {
1275 let user_id = self.encode_key("device", user_id.as_bytes());
1276 let device_id = self.encode_key("device", device_id.as_bytes());
1277 Ok(self
1278 .acquire()
1279 .await?
1280 .get_device(user_id, device_id)
1281 .await?
1282 .map(|value| self.deserialize_value(&value))
1283 .transpose()?)
1284 }
1285
1286 async fn get_user_devices(
1287 &self,
1288 user_id: &UserId,
1289 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1290 let user_id = self.encode_key("device", user_id.as_bytes());
1291 self.acquire()
1292 .await?
1293 .get_user_devices(user_id)
1294 .await?
1295 .into_iter()
1296 .map(|value| {
1297 let device: DeviceData = self.deserialize_value(&value)?;
1298 Ok((device.device_id().to_owned(), device))
1299 })
1300 .collect()
1301 }
1302
1303 async fn get_own_device(&self) -> Result<DeviceData> {
1304 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1305
1306 Ok(self
1307 .get_device(&account_info.user_id, &account_info.device_id)
1308 .await?
1309 .expect("We should be able to find our own device."))
1310 }
1311
1312 async fn get_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentityData>> {
1313 let user_id = self.encode_key("identity", user_id.as_bytes());
1314 Ok(self
1315 .acquire()
1316 .await?
1317 .get_user_identity(user_id)
1318 .await?
1319 .map(|value| self.deserialize_value(&value))
1320 .transpose()?)
1321 }
1322
1323 async fn is_message_known(
1324 &self,
1325 message_hash: &matrix_sdk_crypto::olm::OlmMessageHash,
1326 ) -> Result<bool> {
1327 let value = rmp_serde::to_vec(message_hash)?;
1328 Ok(self.acquire().await?.has_olm_hash(value).await?)
1329 }
1330
1331 async fn get_outgoing_secret_requests(
1332 &self,
1333 request_id: &TransactionId,
1334 ) -> Result<Option<GossipRequest>> {
1335 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1336 Ok(self
1337 .acquire()
1338 .await?
1339 .get_outgoing_secret_request(request_id)
1340 .await?
1341 .map(|(value, sent_out)| self.deserialize_key_request(&value, sent_out))
1342 .transpose()?)
1343 }
1344
1345 async fn get_secret_request_by_info(
1346 &self,
1347 key_info: &SecretInfo,
1348 ) -> Result<Option<GossipRequest>> {
1349 let requests = self.acquire().await?.get_outgoing_secret_requests().await?;
1350 for (request, sent_out) in requests {
1351 let request = self.deserialize_key_request(&request, sent_out)?;
1352 if request.info == *key_info {
1353 return Ok(Some(request));
1354 }
1355 }
1356 Ok(None)
1357 }
1358
1359 async fn get_unsent_secret_requests(&self) -> Result<Vec<GossipRequest>> {
1360 self.acquire()
1361 .await?
1362 .get_unsent_secret_requests()
1363 .await?
1364 .iter()
1365 .map(|value| {
1366 let request = self.deserialize_key_request(value, false)?;
1367 Ok(request)
1368 })
1369 .collect()
1370 }
1371
1372 async fn delete_outgoing_secret_requests(&self, request_id: &TransactionId) -> Result<()> {
1373 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1374 Ok(self.acquire().await?.delete_key_request(request_id).await?)
1375 }
1376
1377 async fn get_secrets_from_inbox(
1378 &self,
1379 secret_name: &SecretName,
1380 ) -> Result<Vec<GossippedSecret>> {
1381 let secret_name = self.encode_key("secrets", secret_name.to_string());
1382
1383 self.acquire()
1384 .await?
1385 .get_secrets_from_inbox(secret_name)
1386 .await?
1387 .into_iter()
1388 .map(|value| self.deserialize_json(value.as_ref()))
1389 .collect()
1390 }
1391
1392 async fn delete_secrets_from_inbox(&self, secret_name: &SecretName) -> Result<()> {
1393 let secret_name = self.encode_key("secrets", secret_name.to_string());
1394 self.acquire().await?.delete_secrets_from_inbox(secret_name).await
1395 }
1396
1397 async fn get_withheld_info(
1398 &self,
1399 room_id: &RoomId,
1400 session_id: &str,
1401 ) -> Result<Option<RoomKeyWithheldEntry>> {
1402 let room_id = self.encode_key("direct_withheld_info", room_id);
1403 let session_id = self.encode_key("direct_withheld_info", session_id);
1404
1405 self.acquire()
1406 .await?
1407 .get_direct_withheld_info(session_id, room_id)
1408 .await?
1409 .map(|value| {
1410 let info = self.deserialize_json::<RoomKeyWithheldEntry>(&value)?;
1411 Ok(info)
1412 })
1413 .transpose()
1414 }
1415
1416 async fn get_withheld_sessions_by_room_id(
1417 &self,
1418 room_id: &RoomId,
1419 ) -> matrix_sdk_crypto::store::Result<Vec<RoomKeyWithheldEntry>, Self::Error> {
1420 let room_id = self.encode_key("direct_withheld_info", room_id);
1421
1422 self.acquire()
1423 .await?
1424 .get_withheld_sessions_by_room_id(room_id)
1425 .await?
1426 .into_iter()
1427 .map(|value| self.deserialize_json(&value))
1428 .collect()
1429 }
1430
1431 async fn get_room_settings(&self, room_id: &RoomId) -> Result<Option<RoomSettings>> {
1432 let room_id = self.encode_key("room_settings", room_id.as_bytes());
1433 let Some(value) = self.acquire().await?.get_room_settings(room_id).await? else {
1434 return Ok(None);
1435 };
1436
1437 let settings = self.deserialize_value(&value)?;
1438
1439 return Ok(Some(settings));
1440 }
1441
1442 async fn get_received_room_key_bundle_data(
1443 &self,
1444 room_id: &RoomId,
1445 user_id: &UserId,
1446 ) -> Result<Option<StoredRoomKeyBundleData>> {
1447 let room_id = self.encode_key("received_room_key_bundle", room_id);
1448 let user_id = self.encode_key("received_room_key_bundle", user_id);
1449 self.acquire()
1450 .await?
1451 .get_received_room_key_bundle(room_id, user_id)
1452 .await?
1453 .map(|value| self.deserialize_value(&value))
1454 .transpose()
1455 }
1456
1457 async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
1458 let Some(serialized) = self.acquire().await?.get_kv(key).await? else {
1459 return Ok(None);
1460 };
1461 let value = if let Some(cipher) = &self.store_cipher {
1462 let encrypted = rmp_serde::from_slice(&serialized)?;
1463 cipher.decrypt_value_data(encrypted)?
1464 } else {
1465 serialized
1466 };
1467
1468 Ok(Some(value))
1469 }
1470
1471 async fn set_custom_value(&self, key: &str, value: Vec<u8>) -> Result<()> {
1472 let serialized = if let Some(cipher) = &self.store_cipher {
1473 let encrypted = cipher.encrypt_value_data(value)?;
1474 rmp_serde::to_vec_named(&encrypted)?
1475 } else {
1476 value
1477 };
1478
1479 self.acquire().await?.set_kv(key, serialized).await?;
1480 Ok(())
1481 }
1482
1483 async fn remove_custom_value(&self, key: &str) -> Result<()> {
1484 let key = key.to_owned();
1485 self.acquire()
1486 .await?
1487 .interact(move |conn| conn.execute("DELETE FROM kv WHERE key = ?1", (&key,)))
1488 .await
1489 .unwrap()?;
1490 Ok(())
1491 }
1492
1493 #[instrument(skip(self))]
1494 async fn try_take_leased_lock(
1495 &self,
1496 lease_duration_ms: u32,
1497 key: &str,
1498 holder: &str,
1499 ) -> Result<Option<CrossProcessLockGeneration>> {
1500 let key = key.to_owned();
1501 let holder = holder.to_owned();
1502
1503 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
1504 let expiration = now + lease_duration_ms as u64;
1505
1506 let generation = self
1508 .acquire()
1509 .await?
1510 .with_transaction(move |txn| {
1511 txn.query_row(
1512 "INSERT INTO lease_locks (key, holder, expiration)
1513 VALUES (?1, ?2, ?3)
1514 ON CONFLICT (key)
1515 DO
1516 UPDATE SET
1517 holder = excluded.holder,
1518 expiration = excluded.expiration,
1519 generation =
1520 CASE holder
1521 WHEN excluded.holder THEN generation
1522 ELSE generation + 1
1523 END
1524 WHERE
1525 holder = excluded.holder
1526 OR expiration < ?4
1527 RETURNING generation
1528 ",
1529 (key, holder, expiration, now),
1530 |row| row.get(0),
1531 )
1532 .optional()
1533 })
1534 .await?;
1535
1536 Ok(generation)
1537 }
1538
1539 async fn next_batch_token(&self) -> Result<Option<String>, Self::Error> {
1540 let conn = self.acquire().await?;
1541 if let Some(token) = conn.get_kv("next_batch_token").await? {
1542 let maybe_token: Option<String> = self.deserialize_value(&token)?;
1543 Ok(maybe_token)
1544 } else {
1545 Ok(None)
1546 }
1547 }
1548
1549 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1550 Ok(Some(self.pool.get().await?.get_db_size().await?))
1551 }
1552}
1553
1554#[cfg(test)]
1555mod tests {
1556 use std::path::Path;
1557
1558 use matrix_sdk_common::deserialized_responses::WithheldCode;
1559 use matrix_sdk_crypto::{
1560 cryptostore_integration_tests, cryptostore_integration_tests_time, olm::SenderDataType,
1561 store::CryptoStore,
1562 };
1563 use matrix_sdk_test::async_test;
1564 use once_cell::sync::Lazy;
1565 use ruma::{device_id, room_id, user_id};
1566 use similar_asserts::assert_eq;
1567 use tempfile::{TempDir, tempdir};
1568 use tokio::fs;
1569
1570 use super::SqliteCryptoStore;
1571 use crate::SqliteStoreConfig;
1572
1573 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1574
1575 struct TestDb {
1576 _dir: TempDir,
1579 database: SqliteCryptoStore,
1580 }
1581
1582 fn copy_db(data_path: &str) -> TempDir {
1583 let db_name = super::DATABASE_NAME;
1584
1585 let manifest_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../..");
1586 let database_path = manifest_path.join(data_path).join(db_name);
1587
1588 let tmpdir = tempdir().unwrap();
1589 let destination = tmpdir.path().join(db_name);
1590
1591 std::fs::copy(&database_path, destination).unwrap();
1593
1594 tmpdir
1595 }
1596
1597 async fn get_test_db(data_path: &str, passphrase: Option<&str>) -> TestDb {
1598 let tmpdir = copy_db(data_path);
1599
1600 let database = SqliteCryptoStore::open(tmpdir.path(), passphrase)
1601 .await
1602 .expect("Can't open the test store");
1603
1604 TestDb { _dir: tmpdir, database }
1605 }
1606
1607 #[async_test]
1608 async fn test_pool_size() {
1609 let store_open_config =
1610 SqliteStoreConfig::new(TMP_DIR.path().join("test_pool_size")).pool_max_size(42);
1611
1612 let store = SqliteCryptoStore::open_with_config(store_open_config).await.unwrap();
1613
1614 assert_eq!(store.pool.status().max_size, 42);
1615 }
1616
1617 #[async_test]
1620 async fn test_open_test_vector_store() {
1621 let TestDb { _dir: _, database } = get_test_db("testing/data/storage", None).await;
1622
1623 let account = database
1624 .load_account()
1625 .await
1626 .unwrap()
1627 .expect("The test database is prefilled with data, we should find an account");
1628
1629 let user_id = account.user_id();
1630 let device_id = account.device_id();
1631
1632 assert_eq!(
1633 user_id.as_str(),
1634 "@pjtest:synapse-oidc.element.dev",
1635 "The user ID should match to the one we expect."
1636 );
1637
1638 assert_eq!(
1639 device_id.as_str(),
1640 "v4TqgcuIH6",
1641 "The device ID should match to the one we expect."
1642 );
1643
1644 let device = database
1645 .get_device(user_id, device_id)
1646 .await
1647 .unwrap()
1648 .expect("Our own device should be found in the store.");
1649
1650 assert_eq!(device.device_id(), device_id);
1651 assert_eq!(device.user_id(), user_id);
1652
1653 assert_eq!(
1654 device.ed25519_key().expect("The device should have a Ed25519 key.").to_base64(),
1655 "+cxl1Gl3du5i7UJwfWnoRDdnafFF+xYdAiTYYhYLr8s"
1656 );
1657
1658 assert_eq!(
1659 device.curve25519_key().expect("The device should have a Curve25519 key.").to_base64(),
1660 "4SL9eEUlpyWSUvjljC5oMjknHQQJY7WZKo5S1KL/5VU"
1661 );
1662
1663 let identity = database
1664 .get_user_identity(user_id)
1665 .await
1666 .unwrap()
1667 .expect("The store should contain an identity.");
1668
1669 assert_eq!(identity.user_id(), user_id);
1670
1671 let identity = identity
1672 .own()
1673 .expect("The identity should be of the correct type, it should be our own identity.");
1674
1675 let master_key = identity
1676 .master_key()
1677 .get_first_key()
1678 .expect("Our own identity should have a master key");
1679
1680 assert_eq!(master_key.to_base64(), "iCUEtB1RwANeqRa5epDrblLk4mer/36sylwQ5hYY3oE");
1681 }
1682
1683 #[async_test]
1686 async fn test_open_test_vector_encrypted_store() {
1687 let TestDb { _dir: _, database } = get_test_db(
1688 "testing/data/storage/alice",
1689 Some(concat!(
1690 "/rCia2fYAJ+twCZ1Xm2mxFCYcmJdyzkdJjwtgXsziWpYS/UeNxnixuSieuwZXm+x1VsJHmWpl",
1691 "H+QIQBZpEGZtC9/S/l8xK+WOCesmET0o6yJ/KP73ofDtjBlnNpPwuHLKFpyTbyicpCgQ4UT+5E",
1692 "UBuJ08TY9Ujdf1D13k5kr5tSZUefDKKCuG1fCRqlU8ByRas1PMQsZxT2W8t7QgBrQiiGmhpo/O",
1693 "Ti4hfx97GOxncKcxTzppiYQNoHs/f15+XXQD7/oiCcqRIuUlXNsU6hRpFGmbYx2Pi1eyQViQCt",
1694 "B5dAEiSD0N8U81wXYnpynuTPtnL+hfnOJIn7Sy7mkERQeKg"
1695 )),
1696 )
1697 .await;
1698
1699 let account = database
1700 .load_account()
1701 .await
1702 .unwrap()
1703 .expect("The test database is prefilled with data, we should find an account");
1704
1705 let user_id = account.user_id();
1706 let device_id = account.device_id();
1707
1708 assert_eq!(
1709 user_id.as_str(),
1710 "@alice:localhost",
1711 "The user ID should match to the one we expect."
1712 );
1713
1714 assert_eq!(
1715 device_id.as_str(),
1716 "JVVORTHFXY",
1717 "The device ID should match to the one we expect."
1718 );
1719
1720 let tracked_users =
1721 database.load_tracked_users().await.expect("Should be tracking some users");
1722
1723 assert_eq!(tracked_users.len(), 6);
1724
1725 let known_users = vec![
1726 user_id!("@alice:localhost"),
1727 user_id!("@dehydration3:localhost"),
1728 user_id!("@eve:localhost"),
1729 user_id!("@bob:localhost"),
1730 user_id!("@malo:localhost"),
1731 user_id!("@carl:localhost"),
1732 ];
1733
1734 for user_id in known_users {
1736 database.get_user_identity(user_id).await.expect("Should load this identity").unwrap();
1737 }
1738
1739 let carl_identity =
1740 database.get_user_identity(user_id!("@carl:localhost")).await.unwrap().unwrap();
1741
1742 assert_eq!(
1743 carl_identity.master_key().get_first_key().unwrap().to_base64(),
1744 "CdhKYYDeBDQveOioXEGWhTPCyzc63Irpar3CNyfun2Q"
1745 );
1746 assert!(!carl_identity.was_previously_verified());
1747
1748 let bob_identity =
1749 database.get_user_identity(user_id!("@bob:localhost")).await.unwrap().unwrap();
1750
1751 assert_eq!(
1752 bob_identity.master_key().get_first_key().unwrap().to_base64(),
1753 "COh2GYOJWSjem5QPRCaGp9iWV83IELG1IzLKW2S3pFY"
1754 );
1755 assert!(bob_identity.was_previously_verified());
1757
1758 let known_devices = vec![
1759 (device_id!("OPXQHCZSKW"), user_id!("@alice:localhost")),
1760 (
1762 device_id!("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho"),
1763 user_id!("@alice:localhost"),
1764 ),
1765 (device_id!("HEEFRFQENV"), user_id!("@alice:localhost")),
1766 (device_id!("JVVORTHFXY"), user_id!("@alice:localhost")),
1767 (device_id!("NQUWWSKKHS"), user_id!("@alice:localhost")),
1768 (device_id!("ORBLPFYCPG"), user_id!("@alice:localhost")),
1769 (device_id!("YXOWENSEGM"), user_id!("@dehydration3:localhost")),
1770 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1771 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1772 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1773 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1774 (device_id!("QKUKWJTTQC"), user_id!("@malo:localhost")),
1775 (device_id!("LOUXJECTFG"), user_id!("@malo:localhost")),
1776 (device_id!("MKKMAEVLPB"), user_id!("@carl:localhost")),
1777 ];
1778
1779 for (device_id, user_id) in known_devices {
1780 database.get_device(user_id, device_id).await.expect("Should load the device").unwrap();
1781 }
1782
1783 let known_sender_key_to_session_count = vec![
1784 ("FfYcYfDF4nWy+LHdK6CEpIMlFAQDORc30WUkghL06kM", 1),
1785 ("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho", 1),
1786 ("hAGsoA4a9M6wwEUX5Q1jux1i+tUngLi01n5AmhDoHTY", 1),
1787 ("aKqtSJymLzuoglWFwPGk1r/Vm2LE2hFESzXxn4RNjRM", 0),
1788 ("zHK1psCrgeMn0kaz8hcdvA3INyar9jg1yfrSp0p1pHo", 1),
1789 ("1QmBA316Wj5jIFRwNOti6N6Xh/vW0bsYCcR4uPfy8VQ", 1),
1790 ("g5ef2vZF3VXgSPyODIeXpyHIRkuthvLhGvd6uwYggWU", 1),
1791 ("o7hfupPd1VsNkRIvdlH6ujrEJFSKjFCGbxhAd31XxjI", 1),
1792 ("Z3RxKQLxY7xpP+ZdOGR2SiNE37SrvmRhW7GPu1UGdm8", 1),
1793 ("GDomaav8NiY3J+dNEeApJm+O0FooJ3IpVaIyJzCN4w4", 1),
1794 ("7m7fqkHyEr47V5s/KjaxtJMOr3pSHrrns2q2lWpAQi8", 0),
1795 ("9psAkPUIF8vNbWbnviX3PlwRcaeO53EHJdNtKpTY1X0", 0),
1796 ("mqanh+ztw5oRtpqYQgLGW864i6NY2zpoKMIlrcyC+Aw", 0),
1797 ("fJU/TJdbsv7tVbbpHw1Ke73ziElnM32cNhP2WIg4T10", 0),
1798 ("sUIeFeFcCZoa5IC6nJ6Vrbvztcyx09m8BBg57XKRClg", 1),
1799 ];
1800
1801 for (id, count) in known_sender_key_to_session_count {
1802 let olm_sessions =
1803 database.get_sessions(id).await.expect("Should have some olm sessions");
1804
1805 println!("### Session id: {id:?}");
1806 assert_eq!(olm_sessions.map_or(0, |v| v.len()), count);
1807 }
1808
1809 let inbound_group_sessions = database.get_inbound_group_sessions().await.unwrap();
1810 assert_eq!(inbound_group_sessions.len(), 15);
1811 let known_inbound_group_sessions = vec![
1812 (
1813 "5hNAxrLai3VI0LKBwfh3wLfksfBFWds0W1a5X5/vSXA",
1814 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1815 ),
1816 (
1817 "M6d2eU3y54gaYTbvGSlqa/xc1Az35l56Cp9sxzHWO4g",
1818 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1819 ),
1820 (
1821 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1822 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1823 ),
1824 (
1825 "Y74+l9jTo7N5UF+GQwdpgJGe4sn1+QtWITq7BxulHIE",
1826 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1827 ),
1828 (
1829 "HpJxQR57WbQGdY6w2Q+C16znVvbXGa+JvQdRoMpWbXg",
1830 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1831 ),
1832 (
1833 "Xetvi+ydFkZt8dpONGFbEusQb/Chc2V0XlLByZhsbgE",
1834 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1835 ),
1836 (
1837 "wv/WN/39akyerIXczTaIpjAuLnwgXKRtbXFSEHiJqxo",
1838 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1839 ),
1840 (
1841 "nA4gQwL//Cm8OdlyjABl/jChbPT/cP5V4Sd8iuE6H0s",
1842 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1843 ),
1844 (
1845 "bAAgqFeRDTjfEqL6Qf/c9mk55zoNDCSlboAIRd6b0hw",
1846 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1847 ),
1848 (
1849 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1850 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1851 ),
1852 (
1853 "h+om7oSw/ZV94fcKaoe8FGXJwQXWOfKQfzbGgNWQILI",
1854 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1855 ),
1856 (
1857 "ul3VXonpgk4lO2L3fEWubP/nxsTmLHqu5v8ZM9vHEcw",
1858 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1859 ),
1860 (
1861 "JXY15UxC3az2mwg8uX4qwgxfvCM4aygiIWMcdNiVQoc",
1862 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1863 ),
1864 (
1865 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1866 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1867 ),
1868 (
1869 "SFkHcbxjUOYF7mUAYI/oEMDZFaXszQbCN6Jza7iemj0",
1870 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1871 ),
1872 ];
1873
1874 for (session_id, room_id) in &known_inbound_group_sessions {
1876 database
1877 .get_inbound_group_session(room_id, session_id)
1878 .await
1879 .expect("Should be able to load inbound group session")
1880 .unwrap();
1881 }
1882
1883 let bob_sender_verified = database
1884 .get_inbound_group_session(
1885 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1886 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1887 )
1888 .await
1889 .unwrap()
1890 .unwrap();
1891
1892 assert_eq!(bob_sender_verified.sender_data.to_type(), SenderDataType::SenderVerified);
1893 assert!(bob_sender_verified.backed_up());
1894 assert!(!bob_sender_verified.has_been_imported());
1895
1896 let alice_unknown_device = database
1897 .get_inbound_group_session(
1898 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1899 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1900 )
1901 .await
1902 .unwrap()
1903 .unwrap();
1904
1905 assert_eq!(alice_unknown_device.sender_data.to_type(), SenderDataType::UnknownDevice);
1906 assert!(alice_unknown_device.backed_up());
1907 assert!(alice_unknown_device.has_been_imported());
1908
1909 let carl_tofu_session = database
1910 .get_inbound_group_session(
1911 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1912 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1913 )
1914 .await
1915 .unwrap()
1916 .unwrap();
1917
1918 assert_eq!(carl_tofu_session.sender_data.to_type(), SenderDataType::SenderUnverified);
1919 assert!(carl_tofu_session.backed_up());
1920 assert!(!carl_tofu_session.has_been_imported());
1921
1922 database
1924 .get_outbound_group_session(room_id!("!OgRiTRMaUzLdpCeDBM:localhost"))
1925 .await
1926 .unwrap()
1927 .unwrap();
1928 database
1929 .get_outbound_group_session(room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"))
1930 .await
1931 .unwrap()
1932 .unwrap();
1933 database
1934 .get_outbound_group_session(room_id!("!SRstFdydzrGwJYtVfm:localhost"))
1935 .await
1936 .unwrap()
1937 .unwrap();
1938
1939 let withheld_info = database
1940 .get_withheld_info(
1941 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1942 "SASgZ+EklvAF4QxJclMlDRlmL0fAMjAJJIKFMdb4Ht0",
1943 )
1944 .await
1945 .expect("This session should be withheld")
1946 .unwrap();
1947
1948 assert_eq!(withheld_info.content.withheld_code(), WithheldCode::Unverified);
1949
1950 let backup_keys = database.load_backup_keys().await.expect("backup key should be cached");
1951 assert_eq!(backup_keys.backup_version.unwrap(), "6");
1952 assert!(backup_keys.decryption_key.is_some());
1953 }
1954
1955 async fn get_store(
1956 name: &str,
1957 passphrase: Option<&str>,
1958 clear_data: bool,
1959 ) -> SqliteCryptoStore {
1960 let tmpdir_path = TMP_DIR.path().join(name);
1961
1962 if clear_data {
1963 let _ = fs::remove_dir_all(&tmpdir_path).await;
1964 }
1965
1966 SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), passphrase)
1967 .await
1968 .expect("Can't create a secret protected store")
1969 }
1970
1971 cryptostore_integration_tests!();
1972 cryptostore_integration_tests_time!();
1973}
1974
1975#[cfg(test)]
1976mod encrypted_tests {
1977 use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time};
1978 use once_cell::sync::Lazy;
1979 use tempfile::{TempDir, tempdir};
1980 use tokio::fs;
1981
1982 use super::SqliteCryptoStore;
1983
1984 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1985
1986 async fn get_store(
1987 name: &str,
1988 passphrase: Option<&str>,
1989 clear_data: bool,
1990 ) -> SqliteCryptoStore {
1991 let tmpdir_path = TMP_DIR.path().join(name);
1992 let pass = passphrase.unwrap_or("default_test_password");
1993
1994 if clear_data {
1995 let _ = fs::remove_dir_all(&tmpdir_path).await;
1996 }
1997
1998 SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), Some(pass))
1999 .await
2000 .expect("Can't create a secret protected store")
2001 }
2002
2003 cryptostore_integration_tests!();
2004 cryptostore_integration_tests_time!();
2005}