1use std::{
16 collections::HashMap,
17 fmt,
18 path::Path,
19 sync::{Arc, RwLock},
20};
21
22use async_trait::async_trait;
23use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
24use matrix_sdk_crypto::{
25 olm::{
26 InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession,
27 PrivateCrossSigningIdentity, SenderDataType, Session, StaticAccountData,
28 },
29 store::{
30 types::{
31 BackupKeys, Changes, DehydratedDeviceKey, PendingChanges, RoomKeyCounts, RoomSettings,
32 StoredRoomKeyBundleData,
33 },
34 CryptoStore,
35 },
36 types::events::room_key_withheld::RoomKeyWithheldEvent,
37 Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, TrackedUser, UserIdentityData,
38};
39use matrix_sdk_store_encryption::StoreCipher;
40use ruma::{
41 events::secret::request::SecretName, DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId,
42 RoomId, TransactionId, UserId,
43};
44use rusqlite::{named_params, params_from_iter, OptionalExtension};
45use tokio::{fs, sync::Mutex};
46use tracing::{debug, instrument, warn};
47use vodozemac::Curve25519PublicKey;
48
49use crate::{
50 error::{Error, Result},
51 utils::{
52 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
53 SqliteKeyValueStoreConnExt,
54 },
55 OpenStoreError, Secret, SqliteStoreConfig,
56};
57
/// File name of the SQLite database; it is joined onto the store directory
/// from the configuration when the store is opened.
const DATABASE_NAME: &str = "matrix-sdk-crypto.sqlite3";
60
/// A crypto store backed by an SQLite database.
#[derive(Clone)]
pub struct SqliteCryptoStore {
    /// Optional store cipher, exposed through `EncryptableStore::get_cypher`.
    /// It is `None` when the store was opened without a secret.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Connection pool for the underlying SQLite database.
    pool: SqlitePool,

    /// Cached static account data; written whenever an account is loaded
    /// from or saved to the store.
    static_account: Arc<RwLock<Option<StaticAccountData>>>,
    /// Lock held for the whole duration of `save_changes` and
    /// `save_pending_changes`.
    save_changes_lock: Arc<Mutex<()>>,
}
71
72#[cfg(not(tarpaulin_include))]
73impl fmt::Debug for SqliteCryptoStore {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct("SqliteCryptoStore").finish_non_exhaustive()
76 }
77}
78
79impl EncryptableStore for SqliteCryptoStore {
80 fn get_cypher(&self) -> Option<&StoreCipher> {
81 self.store_cipher.as_deref()
82 }
83}
84
85impl SqliteCryptoStore {
86 pub async fn open(
89 path: impl AsRef<Path>,
90 passphrase: Option<&str>,
91 ) -> Result<Self, OpenStoreError> {
92 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
93 }
94
95 pub async fn open_with_key(
98 path: impl AsRef<Path>,
99 key: Option<&[u8; 32]>,
100 ) -> Result<Self, OpenStoreError> {
101 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
102 }
103
104 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
106 let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;
107
108 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
109
110 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
111 config.pool = Some(pool_config);
112
113 let pool = config.create_pool(Runtime::Tokio1)?;
114
115 let this = Self::open_with_pool(pool, secret).await?;
116 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
117
118 Ok(this)
119 }
120
121 async fn open_with_pool(
124 pool: SqlitePool,
125 secret: Option<Secret>,
126 ) -> Result<Self, OpenStoreError> {
127 let conn = pool.get().await?;
128
129 let version = conn.db_version().await?;
130 debug!("Opened sqlite store with version {}", version);
131 run_migrations(&conn, version).await?;
132
133 let store_cipher = match secret {
134 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
135 None => None,
136 };
137
138 Ok(SqliteCryptoStore {
139 store_cipher,
140 pool,
141 static_account: Arc::new(RwLock::new(None)),
142 save_changes_lock: Default::default(),
143 })
144 }
145
146 fn deserialize_and_unpickle_inbound_group_session(
147 &self,
148 value: Vec<u8>,
149 backed_up: bool,
150 ) -> Result<InboundGroupSession> {
151 let mut pickle: PickledInboundGroupSession = self.deserialize_value(&value)?;
152
153 pickle.backed_up = backed_up;
158
159 Ok(InboundGroupSession::from_pickle(pickle)?)
160 }
161
162 fn deserialize_key_request(&self, value: &[u8], sent_out: bool) -> Result<GossipRequest> {
163 let mut request: GossipRequest = self.deserialize_value(value)?;
164 request.sent_out = sent_out;
167 Ok(request)
168 }
169
170 fn get_static_account(&self) -> Option<StaticAccountData> {
171 self.static_account.read().unwrap().clone()
172 }
173
174 async fn acquire(&self) -> Result<SqliteAsyncConn> {
175 Ok(self.pool.get().await?)
176 }
177}
178
/// Schema version that `run_migrations` brings the database up to.
const DATABASE_VERSION: u8 = 11;

/// Key under which the dehydrated device pickle key is stored via the
/// key/value helpers (`set_kv` / `get_kv` / `clear_kv`).
const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
183
184async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
186 if version == 0 {
187 debug!("Creating database");
188 } else if version < DATABASE_VERSION {
189 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
190 } else {
191 return Ok(());
192 }
193
194 if version < 1 {
195 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
198 conn.with_transaction(|txn| {
199 txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))?;
200 txn.set_db_version(1)
201 })
202 .await?;
203 }
204
205 if version < 2 {
206 conn.with_transaction(|txn| {
207 txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))?;
208 txn.set_db_version(2)
209 })
210 .await?;
211 }
212
213 if version < 3 {
214 conn.with_transaction(|txn| {
215 txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))?;
216 txn.set_db_version(3)
217 })
218 .await?;
219 }
220
221 if version < 4 {
222 conn.with_transaction(|txn| {
223 txn.execute_batch(include_str!(
224 "../migrations/crypto_store/004_drop_outbound_group_sessions.sql"
225 ))?;
226 txn.set_db_version(4)
227 })
228 .await?;
229 }
230
231 if version < 5 {
232 conn.with_transaction(|txn| {
233 txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))?;
234 txn.set_db_version(5)
235 })
236 .await?;
237 }
238
239 if version < 6 {
240 conn.with_transaction(|txn| {
241 txn.execute_batch(include_str!(
242 "../migrations/crypto_store/006_drop_outbound_group_sessions.sql"
243 ))?;
244 txn.set_db_version(6)
245 })
246 .await?;
247 }
248
249 if version < 7 {
250 conn.with_transaction(|txn| {
251 txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))?;
252 txn.set_db_version(7)
253 })
254 .await?;
255 }
256
257 if version < 8 {
258 conn.with_transaction(|txn| {
259 txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))?;
260 txn.set_db_version(8)
261 })
262 .await?;
263 }
264
265 if version < 9 {
266 conn.with_transaction(|txn| {
267 txn.execute_batch(include_str!(
268 "../migrations/crypto_store/009_inbound_group_session_sender_key_sender_data_type.sql"
269 ))?;
270 txn.set_db_version(9)
271 })
272 .await?;
273 }
274
275 if version < 10 {
276 conn.with_transaction(|txn| {
277 txn.execute_batch(include_str!(
278 "../migrations/crypto_store/010_received_room_key_bundles.sql"
279 ))?;
280 txn.set_db_version(10)
281 })
282 .await?;
283 }
284
285 if version < 11 {
286 conn.with_transaction(|txn| {
287 txn.execute_batch(include_str!(
288 "../migrations/crypto_store/011_received_room_key_bundles_with_curve_key.sql"
289 ))?;
290 txn.set_db_version(11)
291 })
292 .await?;
293 }
294
295 Ok(())
296}
297
298trait SqliteConnectionExt {
299 fn set_session(
300 &self,
301 session_id: &[u8],
302 sender_key: &[u8],
303 data: &[u8],
304 ) -> rusqlite::Result<()>;
305
306 fn set_inbound_group_session(
307 &self,
308 room_id: &[u8],
309 session_id: &[u8],
310 data: &[u8],
311 backed_up: bool,
312 sender_key: Option<&[u8]>,
313 sender_data_type: Option<u8>,
314 ) -> rusqlite::Result<()>;
315
316 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
317
318 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
319 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()>;
320
321 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
322
323 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()>;
324
325 fn set_key_request(
326 &self,
327 request_id: &[u8],
328 sent_out: bool,
329 data: &[u8],
330 ) -> rusqlite::Result<()>;
331
332 fn set_direct_withheld(
333 &self,
334 session_id: &[u8],
335 room_id: &[u8],
336 data: &[u8],
337 ) -> rusqlite::Result<()>;
338
339 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
340
341 fn set_secret(&self, request_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
342
343 fn set_received_room_key_bundle(
344 &self,
345 room_id: &[u8],
346 user_id: &[u8],
347 data: &[u8],
348 ) -> rusqlite::Result<()>;
349}
350
351impl SqliteConnectionExt for rusqlite::Connection {
352 fn set_session(
353 &self,
354 session_id: &[u8],
355 sender_key: &[u8],
356 data: &[u8],
357 ) -> rusqlite::Result<()> {
358 self.execute(
359 "INSERT INTO session (session_id, sender_key, data)
360 VALUES (?1, ?2, ?3)
361 ON CONFLICT (session_id) DO UPDATE SET data = ?3",
362 (session_id, sender_key, data),
363 )?;
364 Ok(())
365 }
366
367 fn set_inbound_group_session(
368 &self,
369 room_id: &[u8],
370 session_id: &[u8],
371 data: &[u8],
372 backed_up: bool,
373 sender_key: Option<&[u8]>,
374 sender_data_type: Option<u8>,
375 ) -> rusqlite::Result<()> {
376 self.execute(
377 "INSERT INTO inbound_group_session (session_id, room_id, data, backed_up, sender_key, sender_data_type) \
378 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
379 ON CONFLICT (session_id) DO UPDATE SET data = ?3, backed_up = ?4, sender_key = ?5, sender_data_type = ?6",
380 (session_id, room_id, data, backed_up, sender_key, sender_data_type),
381 )?;
382 Ok(())
383 }
384
385 fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
386 self.execute(
387 "INSERT INTO outbound_group_session (room_id, data) \
388 VALUES (?1, ?2)
389 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
390 (room_id, data),
391 )?;
392 Ok(())
393 }
394
395 fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
396 self.execute(
397 "INSERT INTO device (user_id, device_id, data) \
398 VALUES (?1, ?2, ?3)
399 ON CONFLICT (user_id, device_id) DO UPDATE SET data = ?3",
400 (user_id, device_id, data),
401 )?;
402 Ok(())
403 }
404
405 fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()> {
406 self.execute(
407 "DELETE FROM device WHERE user_id = ? AND device_id = ?",
408 (user_id, device_id),
409 )?;
410 Ok(())
411 }
412
413 fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
414 self.execute(
415 "INSERT INTO identity (user_id, data) \
416 VALUES (?1, ?2)
417 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
418 (user_id, data),
419 )?;
420 Ok(())
421 }
422
423 fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()> {
424 self.execute("INSERT INTO olm_hash (data) VALUES (?) ON CONFLICT DO NOTHING", (data,))?;
425 Ok(())
426 }
427
428 fn set_key_request(
429 &self,
430 request_id: &[u8],
431 sent_out: bool,
432 data: &[u8],
433 ) -> rusqlite::Result<()> {
434 self.execute(
435 "INSERT INTO key_requests (request_id, sent_out, data)
436 VALUES (?1, ?2, ?3)
437 ON CONFLICT (request_id) DO UPDATE SET sent_out = ?2, data = ?3",
438 (request_id, sent_out, data),
439 )?;
440 Ok(())
441 }
442
443 fn set_direct_withheld(
444 &self,
445 session_id: &[u8],
446 room_id: &[u8],
447 data: &[u8],
448 ) -> rusqlite::Result<()> {
449 self.execute(
450 "INSERT INTO direct_withheld_info (session_id, room_id, data)
451 VALUES (?1, ?2, ?3)
452 ON CONFLICT (session_id) DO UPDATE SET room_id = ?2, data = ?3",
453 (session_id, room_id, data),
454 )?;
455 Ok(())
456 }
457
458 fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
459 self.execute(
460 "INSERT INTO room_settings (room_id, data)
461 VALUES (?1, ?2)
462 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
463 (room_id, data),
464 )?;
465 Ok(())
466 }
467
468 fn set_secret(&self, secret_name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
469 self.execute(
470 "INSERT INTO secrets (secret_name, data)
471 VALUES (?1, ?2)",
472 (secret_name, data),
473 )?;
474
475 Ok(())
476 }
477
478 fn set_received_room_key_bundle(
479 &self,
480 room_id: &[u8],
481 sender_user_id: &[u8],
482 data: &[u8],
483 ) -> rusqlite::Result<()> {
484 self.execute(
485 "INSERT INTO received_room_key_bundle(room_id, sender_user_id, bundle_data)
486 VALUES (?1, ?2, ?3)
487 ON CONFLICT (room_id, sender_user_id) DO UPDATE SET bundle_data = ?3",
488 (room_id, sender_user_id, data),
489 )?;
490 Ok(())
491 }
492}
493
494#[async_trait]
495trait SqliteObjectCryptoStoreExt: SqliteAsyncConnExt {
496 async fn get_sessions_for_sender_key(&self, sender_key: Key) -> Result<Vec<Vec<u8>>> {
497 Ok(self
498 .prepare("SELECT data FROM session WHERE sender_key = ?", |mut stmt| {
499 stmt.query((sender_key,))?.mapped(|row| row.get(0)).collect()
500 })
501 .await?)
502 }
503
504 async fn get_inbound_group_session(
505 &self,
506 session_id: Key,
507 ) -> Result<Option<(Vec<u8>, Vec<u8>, bool)>> {
508 Ok(self
509 .query_row(
510 "SELECT room_id, data, backed_up FROM inbound_group_session WHERE session_id = ?",
511 (session_id,),
512 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
513 )
514 .await
515 .optional()?)
516 }
517
518 async fn get_inbound_group_sessions(&self) -> Result<Vec<(Vec<u8>, bool)>> {
519 Ok(self
520 .prepare("SELECT data, backed_up FROM inbound_group_session", |mut stmt| {
521 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
522 })
523 .await?)
524 }
525
526 async fn get_inbound_group_session_counts(
527 &self,
528 _backup_version: Option<&str>,
529 ) -> Result<RoomKeyCounts> {
530 let total = self
531 .query_row("SELECT count(*) FROM inbound_group_session", (), |row| row.get(0))
532 .await?;
533 let backed_up = self
534 .query_row(
535 "SELECT count(*) FROM inbound_group_session WHERE backed_up = TRUE",
536 (),
537 |row| row.get(0),
538 )
539 .await?;
540 Ok(RoomKeyCounts { total, backed_up })
541 }
542
543 async fn get_inbound_group_sessions_by_room_id(
544 &self,
545 room_id: Key,
546 ) -> Result<Vec<(Vec<u8>, bool)>> {
547 Ok(self
548 .prepare(
549 "SELECT data, backed_up FROM inbound_group_session WHERE room_id = :room_id",
550 move |mut stmt| {
551 stmt.query(named_params! {
552 ":room_id": room_id,
553 })?
554 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
555 .collect()
556 },
557 )
558 .await?)
559 }
560
561 async fn get_inbound_group_sessions_for_device_batch(
562 &self,
563 sender_key: Key,
564 sender_data_type: SenderDataType,
565 after_session_id: Option<Key>,
566 limit: usize,
567 ) -> Result<Vec<(Vec<u8>, bool)>> {
568 Ok(self
569 .prepare(
570 "
571 SELECT data, backed_up
572 FROM inbound_group_session
573 WHERE sender_key = :sender_key
574 AND sender_data_type = :sender_data_type
575 AND session_id > :after_session_id
576 ORDER BY session_id
577 LIMIT :limit
578 ",
579 move |mut stmt| {
580 let sender_data_type = sender_data_type as u8;
581
582 let after_session_id = after_session_id.unwrap_or(Key::Plain(Vec::new()));
585
586 stmt.query(named_params! {
587 ":sender_key": sender_key,
588 ":sender_data_type": sender_data_type,
589 ":after_session_id": after_session_id,
590 ":limit": limit,
591 })?
592 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
593 .collect()
594 },
595 )
596 .await?)
597 }
598
599 async fn get_inbound_group_sessions_for_backup(&self, limit: usize) -> Result<Vec<Vec<u8>>> {
600 Ok(self
601 .prepare(
602 "SELECT data FROM inbound_group_session WHERE backed_up = FALSE LIMIT ?",
603 move |mut stmt| stmt.query((limit,))?.mapped(|row| row.get(0)).collect(),
604 )
605 .await?)
606 }
607
608 async fn mark_inbound_group_sessions_as_backed_up(&self, session_ids: Vec<Key>) -> Result<()> {
609 if session_ids.is_empty() {
610 warn!("No sessions to mark as backed up!");
612 return Ok(());
613 }
614
615 let session_ids_len = session_ids.len();
616
617 self.chunk_large_query_over(session_ids, None, move |txn, session_ids| {
618 let sql_params = repeat_vars(session_ids_len);
621 let query = format!("UPDATE inbound_group_session SET backed_up = TRUE where session_id IN ({sql_params})");
622 txn.prepare(&query)?.execute(params_from_iter(session_ids.iter()))?;
623 Ok(Vec::<()>::new())
624 }).await?;
625
626 Ok(())
627 }
628
629 async fn reset_inbound_group_session_backup_state(&self) -> Result<()> {
630 self.execute("UPDATE inbound_group_session SET backed_up = FALSE", ()).await?;
631 Ok(())
632 }
633
634 async fn get_outbound_group_session(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
635 Ok(self
636 .query_row(
637 "SELECT data FROM outbound_group_session WHERE room_id = ?",
638 (room_id,),
639 |row| row.get(0),
640 )
641 .await
642 .optional()?)
643 }
644
645 async fn get_device(&self, user_id: Key, device_id: Key) -> Result<Option<Vec<u8>>> {
646 Ok(self
647 .query_row(
648 "SELECT data FROM device WHERE user_id = ? AND device_id = ?",
649 (user_id, device_id),
650 |row| row.get(0),
651 )
652 .await
653 .optional()?)
654 }
655
656 async fn get_user_devices(&self, user_id: Key) -> Result<Vec<Vec<u8>>> {
657 Ok(self
658 .prepare("SELECT data FROM device WHERE user_id = ?", |mut stmt| {
659 stmt.query((user_id,))?.mapped(|row| row.get(0)).collect()
660 })
661 .await?)
662 }
663
664 async fn get_user_identity(&self, user_id: Key) -> Result<Option<Vec<u8>>> {
665 Ok(self
666 .query_row("SELECT data FROM identity WHERE user_id = ?", (user_id,), |row| row.get(0))
667 .await
668 .optional()?)
669 }
670
671 async fn has_olm_hash(&self, data: Vec<u8>) -> Result<bool> {
672 Ok(self
673 .query_row("SELECT count(*) FROM olm_hash WHERE data = ?", (data,), |row| {
674 row.get::<_, i32>(0)
675 })
676 .await?
677 > 0)
678 }
679
680 async fn get_tracked_users(&self) -> Result<Vec<Vec<u8>>> {
681 Ok(self
682 .prepare("SELECT data FROM tracked_user", |mut stmt| {
683 stmt.query(())?.mapped(|row| row.get(0)).collect()
684 })
685 .await?)
686 }
687
688 async fn add_tracked_users(&self, users: Vec<(Key, Vec<u8>)>) -> Result<()> {
689 Ok(self
690 .prepare(
691 "INSERT INTO tracked_user (user_id, data) \
692 VALUES (?1, ?2) \
693 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
694 |mut stmt| {
695 for (user_id, data) in users {
696 stmt.execute((user_id, data))?;
697 }
698
699 Ok(())
700 },
701 )
702 .await?)
703 }
704
705 async fn get_outgoing_secret_request(
706 &self,
707 request_id: Key,
708 ) -> Result<Option<(Vec<u8>, bool)>> {
709 Ok(self
710 .query_row(
711 "SELECT data, sent_out FROM key_requests WHERE request_id = ?",
712 (request_id,),
713 |row| Ok((row.get(0)?, row.get(1)?)),
714 )
715 .await
716 .optional()?)
717 }
718
719 async fn get_outgoing_secret_requests(&self) -> Result<Vec<(Vec<u8>, bool)>> {
720 Ok(self
721 .prepare("SELECT data, sent_out FROM key_requests", |mut stmt| {
722 stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
723 })
724 .await?)
725 }
726
727 async fn get_unsent_secret_requests(&self) -> Result<Vec<Vec<u8>>> {
728 Ok(self
729 .prepare("SELECT data FROM key_requests WHERE sent_out = FALSE", |mut stmt| {
730 stmt.query(())?.mapped(|row| row.get(0)).collect()
731 })
732 .await?)
733 }
734
735 async fn delete_key_request(&self, request_id: Key) -> Result<()> {
736 self.execute("DELETE FROM key_requests WHERE request_id = ?", (request_id,)).await?;
737 Ok(())
738 }
739
740 async fn get_secrets_from_inbox(&self, secret_name: Key) -> Result<Vec<Vec<u8>>> {
741 Ok(self
742 .prepare("SELECT data FROM secrets WHERE secret_name = ?", |mut stmt| {
743 stmt.query((secret_name,))?.mapped(|row| row.get(0)).collect()
744 })
745 .await?)
746 }
747
748 async fn delete_secrets_from_inbox(&self, secret_name: Key) -> Result<()> {
749 self.execute("DELETE FROM secrets WHERE secret_name = ?", (secret_name,)).await?;
750 Ok(())
751 }
752
753 async fn get_direct_withheld_info(
754 &self,
755 session_id: Key,
756 room_id: Key,
757 ) -> Result<Option<Vec<u8>>> {
758 Ok(self
759 .query_row(
760 "SELECT data FROM direct_withheld_info WHERE session_id = ?1 AND room_id = ?2",
761 (session_id, room_id),
762 |row| row.get(0),
763 )
764 .await
765 .optional()?)
766 }
767
768 async fn get_room_settings(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
769 Ok(self
770 .query_row("SELECT data FROM room_settings WHERE room_id = ?", (room_id,), |row| {
771 row.get(0)
772 })
773 .await
774 .optional()?)
775 }
776
777 async fn get_received_room_key_bundle(
778 &self,
779 room_id: Key,
780 sender_user: Key,
781 ) -> Result<Option<Vec<u8>>> {
782 Ok(self
783 .query_row(
784 "SELECT bundle_data FROM received_room_key_bundle WHERE room_id = ? AND sender_user_id = ?",
785 (room_id, sender_user),
786 |row| { row.get(0) },
787 )
788 .await
789 .optional()?)
790 }
791}
792
// The trait only has provided methods, so the impl body stays empty: pooled
// async connections simply inherit all default implementations.
#[async_trait]
impl SqliteObjectCryptoStoreExt for SqliteAsyncConn {}
795
796#[async_trait]
797impl CryptoStore for SqliteCryptoStore {
798 type Error = Error;
799
800 async fn load_account(&self) -> Result<Option<Account>> {
801 let conn = self.acquire().await?;
802 if let Some(pickle) = conn.get_kv("account").await? {
803 let pickle = self.deserialize_value(&pickle)?;
804
805 let account = Account::from_pickle(pickle).map_err(|_| Error::Unpickle)?;
806
807 *self.static_account.write().unwrap() = Some(account.static_data().clone());
808
809 Ok(Some(account))
810 } else {
811 Ok(None)
812 }
813 }
814
815 async fn load_identity(&self) -> Result<Option<PrivateCrossSigningIdentity>> {
816 let conn = self.acquire().await?;
817 if let Some(i) = conn.get_kv("identity").await? {
818 let pickle = self.deserialize_value(&i)?;
819 Ok(Some(PrivateCrossSigningIdentity::from_pickle(pickle).map_err(|_| Error::Unpickle)?))
820 } else {
821 Ok(None)
822 }
823 }
824
825 async fn save_pending_changes(&self, changes: PendingChanges) -> Result<()> {
826 let _guard = self.save_changes_lock.lock().await;
831
832 let pickled_account = if let Some(account) = changes.account {
833 *self.static_account.write().unwrap() = Some(account.static_data().clone());
834 Some(account.pickle())
835 } else {
836 None
837 };
838
839 let this = self.clone();
840 self.acquire()
841 .await?
842 .with_transaction(move |txn| {
843 if let Some(pickled_account) = pickled_account {
844 let serialized_account = this.serialize_value(&pickled_account)?;
845 txn.set_kv("account", &serialized_account)?;
846 }
847
848 Ok::<_, Error>(())
849 })
850 .await?;
851
852 Ok(())
853 }
854
    /// Persist a batch of [`Changes`] in a single transaction.
    ///
    /// Everything that needs `.await` (pickling the private identity and the
    /// sessions) is done up front; the transaction closure then serializes
    /// and writes all values. The save lock is held until the function
    /// returns.
    ///
    /// # Errors
    ///
    /// Fails if a connection cannot be acquired, a value cannot be
    /// serialized, or any of the writes fails.
    async fn save_changes(&self, changes: Changes) -> Result<()> {
        // Held until the end of the function; `save_pending_changes` takes
        // the same lock.
        let _guard = self.save_changes_lock.lock().await;

        let pickled_private_identity =
            if let Some(i) = changes.private_identity { Some(i.pickle().await) } else { None };

        // Pickle the Olm sessions and encode their keys ahead of the
        // transaction.
        let mut session_changes = Vec::new();

        for session in changes.sessions {
            let session_id = self.encode_key("session", session.session_id());
            let sender_key = self.encode_key("session", session.sender_key().to_base64());
            let pickle = session.pickle().await;
            session_changes.push((session_id, sender_key, pickle));
        }

        // Same for the inbound group sessions.
        let mut inbound_session_changes = Vec::new();
        for session in changes.inbound_group_sessions {
            let room_id = self.encode_key("inbound_group_session", session.room_id().as_bytes());
            let session_id = self.encode_key("inbound_group_session", session.session_id());
            let pickle = session.pickle().await;
            let sender_key =
                self.encode_key("inbound_group_session", session.sender_key().to_base64());
            inbound_session_changes.push((room_id, session_id, pickle, sender_key));
        }

        // And for the outbound group sessions.
        let mut outbound_session_changes = Vec::new();
        for session in changes.outbound_group_sessions {
            let room_id = self.encode_key("outbound_group_session", session.room_id().as_bytes());
            let pickle = session.pickle().await;
            outbound_session_changes.push((room_id, pickle));
        }

        // A clone of the store is moved into the transaction closure so the
        // closure can serialize values and encode keys.
        let this = self.clone();
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Single-valued entries go through the key/value helpers.
                if let Some(pickled_private_identity) = &pickled_private_identity {
                    let serialized_private_identity =
                        this.serialize_value(pickled_private_identity)?;
                    txn.set_kv("identity", &serialized_private_identity)?;
                }

                if let Some(token) = &changes.next_batch_token {
                    let serialized_token = this.serialize_value(token)?;
                    txn.set_kv("next_batch_token", &serialized_token)?;
                }

                if let Some(decryption_key) = &changes.backup_decryption_key {
                    let serialized_decryption_key = this.serialize_value(decryption_key)?;
                    txn.set_kv("recovery_key_v1", &serialized_decryption_key)?;
                }

                if let Some(backup_version) = &changes.backup_version {
                    let serialized_backup_version = this.serialize_value(backup_version)?;
                    txn.set_kv("backup_version_v1", &serialized_backup_version)?;
                }

                if let Some(pickle_key) = &changes.dehydrated_device_pickle_key {
                    let serialized_pickle_key = this.serialize_value(pickle_key)?;
                    txn.set_kv(DEHYDRATED_DEVICE_PICKLE_KEY, &serialized_pickle_key)?;
                }

                // New and changed devices are both upserted.
                for device in changes.devices.new.iter().chain(&changes.devices.changed) {
                    let user_id = this.encode_key("device", device.user_id().as_bytes());
                    let device_id = this.encode_key("device", device.device_id().as_bytes());
                    let data = this.serialize_value(&device)?;
                    txn.set_device(&user_id, &device_id, &data)?;
                }

                for device in &changes.devices.deleted {
                    let user_id = this.encode_key("device", device.user_id().as_bytes());
                    let device_id = this.encode_key("device", device.device_id().as_bytes());
                    txn.delete_device(&user_id, &device_id)?;
                }

                for identity in changes.identities.changed.iter().chain(&changes.identities.new) {
                    let user_id = this.encode_key("identity", identity.user_id().as_bytes());
                    let data = this.serialize_value(&identity)?;
                    txn.set_identity(&user_id, &data)?;
                }

                for (session_id, sender_key, pickle) in &session_changes {
                    let serialized_session = this.serialize_value(&pickle)?;
                    txn.set_session(session_id, sender_key, &serialized_session)?;
                }

                // The backup flag and the sender data type are stored in
                // their own columns in addition to the serialized pickle.
                for (room_id, session_id, pickle, sender_key) in &inbound_session_changes {
                    let serialized_session = this.serialize_value(&pickle)?;
                    txn.set_inbound_group_session(
                        room_id,
                        session_id,
                        &serialized_session,
                        pickle.backed_up,
                        Some(sender_key),
                        Some(pickle.sender_data.to_type() as u8),
                    )?;
                }

                // Note: outbound group sessions are written with
                // `serialize_json`, not `serialize_value`.
                for (room_id, pickle) in &outbound_session_changes {
                    let serialized_session = this.serialize_json(&pickle)?;
                    txn.set_outbound_group_session(room_id, &serialized_session)?;
                }

                // Message hashes are encoded with MessagePack directly.
                for hash in &changes.message_hashes {
                    let hash = rmp_serde::to_vec(hash)?;
                    txn.add_olm_hash(&hash)?;
                }

                for request in changes.key_requests {
                    let request_id = this.encode_key("key_requests", request.request_id.as_bytes());
                    let serialized_request = this.serialize_value(&request)?;
                    txn.set_key_request(&request_id, request.sent_out, &serialized_request)?;
                }

                // Withheld info is grouped per room, then per session.
                for (room_id, data) in changes.withheld_session_info {
                    for (session_id, event) in data {
                        let session_id = this.encode_key("direct_withheld_info", session_id);
                        let room_id = this.encode_key("direct_withheld_info", &room_id);
                        let serialized_info = this.serialize_json(&event)?;
                        txn.set_direct_withheld(&session_id, &room_id, &serialized_info)?;
                    }
                }

                for (room_id, settings) in changes.room_settings {
                    let room_id = this.encode_key("room_settings", room_id.as_bytes());
                    let value = this.serialize_value(&settings)?;
                    txn.set_room_settings(&room_id, &value)?;
                }

                for secret in changes.secrets {
                    let secret_name = this.encode_key("secrets", secret.secret_name.to_string());
                    let value = this.serialize_json(&secret)?;
                    txn.set_secret(&secret_name, &value)?;
                }

                for bundle in changes.received_room_key_bundles {
                    let room_id =
                        this.encode_key("received_room_key_bundle", &bundle.bundle_data.room_id);
                    let user_id = this.encode_key("received_room_key_bundle", &bundle.sender_user);
                    let value = this.serialize_value(&bundle)?;
                    txn.set_received_room_key_bundle(&room_id, &user_id, &value)?;
                }

                Ok::<_, Error>(())
            })
            .await?;

        Ok(())
    }
1008
1009 async fn save_inbound_group_sessions(
1010 &self,
1011 sessions: Vec<InboundGroupSession>,
1012 backed_up_to_version: Option<&str>,
1013 ) -> matrix_sdk_crypto::store::Result<(), Self::Error> {
1014 sessions.iter().for_each(|s| {
1016 let backed_up = s.backed_up();
1017 if backed_up != backed_up_to_version.is_some() {
1018 warn!(
1019 backed_up,
1020 backed_up_to_version,
1021 "Session backed-up flag does not correspond to backup version setting",
1022 );
1023 }
1024 });
1025
1026 self.save_changes(Changes { inbound_group_sessions: sessions, ..Changes::default() }).await
1029 }
1030
1031 async fn get_sessions(&self, sender_key: &str) -> Result<Option<Vec<Session>>> {
1032 let device_keys = self.get_own_device().await?.as_device_keys().clone();
1033
1034 let sessions: Vec<_> = self
1035 .acquire()
1036 .await?
1037 .get_sessions_for_sender_key(self.encode_key("session", sender_key.as_bytes()))
1038 .await?
1039 .into_iter()
1040 .map(|bytes| {
1041 let pickle = self.deserialize_value(&bytes)?;
1042 Session::from_pickle(device_keys.clone(), pickle).map_err(|_| Error::AccountUnset)
1043 })
1044 .collect::<Result<_>>()?;
1045
1046 if sessions.is_empty() {
1047 Ok(None)
1048 } else {
1049 Ok(Some(sessions))
1050 }
1051 }
1052
1053 #[instrument(skip(self))]
1054 async fn get_inbound_group_session(
1055 &self,
1056 room_id: &RoomId,
1057 session_id: &str,
1058 ) -> Result<Option<InboundGroupSession>> {
1059 let session_id = self.encode_key("inbound_group_session", session_id);
1060 let Some((room_id_from_db, value, backed_up)) =
1061 self.acquire().await?.get_inbound_group_session(session_id).await?
1062 else {
1063 return Ok(None);
1064 };
1065
1066 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1067 if *room_id != room_id_from_db {
1068 warn!("expected room_id for session_id doesn't match what's in the DB");
1069 return Ok(None);
1070 }
1071
1072 Ok(Some(self.deserialize_and_unpickle_inbound_group_session(value, backed_up)?))
1073 }
1074
1075 async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
1076 self.acquire()
1077 .await?
1078 .get_inbound_group_sessions()
1079 .await?
1080 .into_iter()
1081 .map(|(value, backed_up)| {
1082 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1083 })
1084 .collect()
1085 }
1086
1087 async fn get_inbound_group_sessions_by_room_id(
1088 &self,
1089 room_id: &RoomId,
1090 ) -> Result<Vec<InboundGroupSession>> {
1091 let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1092 self.acquire()
1093 .await?
1094 .get_inbound_group_sessions_by_room_id(room_id)
1095 .await?
1096 .into_iter()
1097 .map(|(value, backed_up)| {
1098 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1099 })
1100 .collect()
1101 }
1102
1103 async fn get_inbound_group_sessions_for_device_batch(
1104 &self,
1105 sender_key: Curve25519PublicKey,
1106 sender_data_type: SenderDataType,
1107 after_session_id: Option<String>,
1108 limit: usize,
1109 ) -> Result<Vec<InboundGroupSession>, Self::Error> {
1110 let after_session_id =
1111 after_session_id.map(|session_id| self.encode_key("inbound_group_session", session_id));
1112 let sender_key = self.encode_key("inbound_group_session", sender_key.to_base64());
1113
1114 self.acquire()
1115 .await?
1116 .get_inbound_group_sessions_for_device_batch(
1117 sender_key,
1118 sender_data_type,
1119 after_session_id,
1120 limit,
1121 )
1122 .await?
1123 .into_iter()
1124 .map(|(value, backed_up)| {
1125 self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1126 })
1127 .collect()
1128 }
1129
1130 async fn inbound_group_session_counts(
1131 &self,
1132 backup_version: Option<&str>,
1133 ) -> Result<RoomKeyCounts> {
1134 Ok(self.acquire().await?.get_inbound_group_session_counts(backup_version).await?)
1135 }
1136
1137 async fn inbound_group_sessions_for_backup(
1138 &self,
1139 _backup_version: &str,
1140 limit: usize,
1141 ) -> Result<Vec<InboundGroupSession>> {
1142 self.acquire()
1143 .await?
1144 .get_inbound_group_sessions_for_backup(limit)
1145 .await?
1146 .into_iter()
1147 .map(|value| self.deserialize_and_unpickle_inbound_group_session(value, false))
1148 .collect()
1149 }
1150
1151 async fn mark_inbound_group_sessions_as_backed_up(
1152 &self,
1153 _backup_version: &str,
1154 session_ids: &[(&RoomId, &str)],
1155 ) -> Result<()> {
1156 Ok(self
1157 .acquire()
1158 .await?
1159 .mark_inbound_group_sessions_as_backed_up(
1160 session_ids
1161 .iter()
1162 .map(|(_, s)| self.encode_key("inbound_group_session", s))
1163 .collect(),
1164 )
1165 .await?)
1166 }
1167
1168 async fn reset_backup_state(&self) -> Result<()> {
1169 Ok(self.acquire().await?.reset_inbound_group_session_backup_state().await?)
1170 }
1171
1172 async fn load_backup_keys(&self) -> Result<BackupKeys> {
1173 let conn = self.acquire().await?;
1174
1175 let backup_version = conn
1176 .get_kv("backup_version_v1")
1177 .await?
1178 .map(|value| self.deserialize_value(&value))
1179 .transpose()?;
1180
1181 let decryption_key = conn
1182 .get_kv("recovery_key_v1")
1183 .await?
1184 .map(|value| self.deserialize_value(&value))
1185 .transpose()?;
1186
1187 Ok(BackupKeys { backup_version, decryption_key })
1188 }
1189
1190 async fn load_dehydrated_device_pickle_key(&self) -> Result<Option<DehydratedDeviceKey>> {
1191 let conn = self.acquire().await?;
1192
1193 conn.get_kv(DEHYDRATED_DEVICE_PICKLE_KEY)
1194 .await?
1195 .map(|value| self.deserialize_value(&value))
1196 .transpose()
1197 }
1198
1199 async fn delete_dehydrated_device_pickle_key(&self) -> Result<(), Self::Error> {
1200 let conn = self.acquire().await?;
1201 conn.clear_kv(DEHYDRATED_DEVICE_PICKLE_KEY).await?;
1202
1203 Ok(())
1204 }
1205 async fn get_outbound_group_session(
1206 &self,
1207 room_id: &RoomId,
1208 ) -> Result<Option<OutboundGroupSession>> {
1209 let room_id = self.encode_key("outbound_group_session", room_id.as_bytes());
1210 let Some(value) = self.acquire().await?.get_outbound_group_session(room_id).await? else {
1211 return Ok(None);
1212 };
1213
1214 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1215
1216 let pickle = self.deserialize_json(&value)?;
1217 let session = OutboundGroupSession::from_pickle(
1218 account_info.device_id,
1219 account_info.identity_keys,
1220 pickle,
1221 )
1222 .map_err(|_| Error::Unpickle)?;
1223
1224 return Ok(Some(session));
1225 }
1226
1227 async fn load_tracked_users(&self) -> Result<Vec<TrackedUser>> {
1228 self.acquire()
1229 .await?
1230 .get_tracked_users()
1231 .await?
1232 .iter()
1233 .map(|value| self.deserialize_value(value))
1234 .collect()
1235 }
1236
1237 async fn save_tracked_users(&self, tracked_users: &[(&UserId, bool)]) -> Result<()> {
1238 let users: Vec<(Key, Vec<u8>)> = tracked_users
1239 .iter()
1240 .map(|(u, d)| {
1241 let user_id = self.encode_key("tracked_users", u.as_bytes());
1242 let data =
1243 self.serialize_value(&TrackedUser { user_id: (*u).into(), dirty: *d })?;
1244 Ok((user_id, data))
1245 })
1246 .collect::<Result<_>>()?;
1247
1248 Ok(self.acquire().await?.add_tracked_users(users).await?)
1249 }
1250
1251 async fn get_device(
1252 &self,
1253 user_id: &UserId,
1254 device_id: &DeviceId,
1255 ) -> Result<Option<DeviceData>> {
1256 let user_id = self.encode_key("device", user_id.as_bytes());
1257 let device_id = self.encode_key("device", device_id.as_bytes());
1258 Ok(self
1259 .acquire()
1260 .await?
1261 .get_device(user_id, device_id)
1262 .await?
1263 .map(|value| self.deserialize_value(&value))
1264 .transpose()?)
1265 }
1266
1267 async fn get_user_devices(
1268 &self,
1269 user_id: &UserId,
1270 ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1271 let user_id = self.encode_key("device", user_id.as_bytes());
1272 self.acquire()
1273 .await?
1274 .get_user_devices(user_id)
1275 .await?
1276 .into_iter()
1277 .map(|value| {
1278 let device: DeviceData = self.deserialize_value(&value)?;
1279 Ok((device.device_id().to_owned(), device))
1280 })
1281 .collect()
1282 }
1283
1284 async fn get_own_device(&self) -> Result<DeviceData> {
1285 let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1286
1287 Ok(self
1288 .get_device(&account_info.user_id, &account_info.device_id)
1289 .await?
1290 .expect("We should be able to find our own device."))
1291 }
1292
1293 async fn get_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentityData>> {
1294 let user_id = self.encode_key("identity", user_id.as_bytes());
1295 Ok(self
1296 .acquire()
1297 .await?
1298 .get_user_identity(user_id)
1299 .await?
1300 .map(|value| self.deserialize_value(&value))
1301 .transpose()?)
1302 }
1303
1304 async fn is_message_known(
1305 &self,
1306 message_hash: &matrix_sdk_crypto::olm::OlmMessageHash,
1307 ) -> Result<bool> {
1308 let value = rmp_serde::to_vec(message_hash)?;
1309 Ok(self.acquire().await?.has_olm_hash(value).await?)
1310 }
1311
1312 async fn get_outgoing_secret_requests(
1313 &self,
1314 request_id: &TransactionId,
1315 ) -> Result<Option<GossipRequest>> {
1316 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1317 Ok(self
1318 .acquire()
1319 .await?
1320 .get_outgoing_secret_request(request_id)
1321 .await?
1322 .map(|(value, sent_out)| self.deserialize_key_request(&value, sent_out))
1323 .transpose()?)
1324 }
1325
1326 async fn get_secret_request_by_info(
1327 &self,
1328 key_info: &SecretInfo,
1329 ) -> Result<Option<GossipRequest>> {
1330 let requests = self.acquire().await?.get_outgoing_secret_requests().await?;
1331 for (request, sent_out) in requests {
1332 let request = self.deserialize_key_request(&request, sent_out)?;
1333 if request.info == *key_info {
1334 return Ok(Some(request));
1335 }
1336 }
1337 Ok(None)
1338 }
1339
1340 async fn get_unsent_secret_requests(&self) -> Result<Vec<GossipRequest>> {
1341 self.acquire()
1342 .await?
1343 .get_unsent_secret_requests()
1344 .await?
1345 .iter()
1346 .map(|value| {
1347 let request = self.deserialize_key_request(value, false)?;
1348 Ok(request)
1349 })
1350 .collect()
1351 }
1352
1353 async fn delete_outgoing_secret_requests(&self, request_id: &TransactionId) -> Result<()> {
1354 let request_id = self.encode_key("key_requests", request_id.as_bytes());
1355 Ok(self.acquire().await?.delete_key_request(request_id).await?)
1356 }
1357
1358 async fn get_secrets_from_inbox(
1359 &self,
1360 secret_name: &SecretName,
1361 ) -> Result<Vec<GossippedSecret>> {
1362 let secret_name = self.encode_key("secrets", secret_name.to_string());
1363
1364 self.acquire()
1365 .await?
1366 .get_secrets_from_inbox(secret_name)
1367 .await?
1368 .into_iter()
1369 .map(|value| self.deserialize_json(value.as_ref()))
1370 .collect()
1371 }
1372
1373 async fn delete_secrets_from_inbox(&self, secret_name: &SecretName) -> Result<()> {
1374 let secret_name = self.encode_key("secrets", secret_name.to_string());
1375 self.acquire().await?.delete_secrets_from_inbox(secret_name).await
1376 }
1377
1378 async fn get_withheld_info(
1379 &self,
1380 room_id: &RoomId,
1381 session_id: &str,
1382 ) -> Result<Option<RoomKeyWithheldEvent>> {
1383 let room_id = self.encode_key("direct_withheld_info", room_id);
1384 let session_id = self.encode_key("direct_withheld_info", session_id);
1385
1386 self.acquire()
1387 .await?
1388 .get_direct_withheld_info(session_id, room_id)
1389 .await?
1390 .map(|value| {
1391 let info = self.deserialize_json::<RoomKeyWithheldEvent>(&value)?;
1392 Ok(info)
1393 })
1394 .transpose()
1395 }
1396
1397 async fn get_room_settings(&self, room_id: &RoomId) -> Result<Option<RoomSettings>> {
1398 let room_id = self.encode_key("room_settings", room_id.as_bytes());
1399 let Some(value) = self.acquire().await?.get_room_settings(room_id).await? else {
1400 return Ok(None);
1401 };
1402
1403 let settings = self.deserialize_value(&value)?;
1404
1405 return Ok(Some(settings));
1406 }
1407
1408 async fn get_received_room_key_bundle_data(
1409 &self,
1410 room_id: &RoomId,
1411 user_id: &UserId,
1412 ) -> Result<Option<StoredRoomKeyBundleData>> {
1413 let room_id = self.encode_key("received_room_key_bundle", room_id);
1414 let user_id = self.encode_key("received_room_key_bundle", user_id);
1415 self.acquire()
1416 .await?
1417 .get_received_room_key_bundle(room_id, user_id)
1418 .await?
1419 .map(|value| self.deserialize_value(&value))
1420 .transpose()
1421 }
1422
1423 async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
1424 let Some(serialized) = self.acquire().await?.get_kv(key).await? else {
1425 return Ok(None);
1426 };
1427 let value = if let Some(cipher) = &self.store_cipher {
1428 let encrypted = rmp_serde::from_slice(&serialized)?;
1429 cipher.decrypt_value_data(encrypted)?
1430 } else {
1431 serialized
1432 };
1433
1434 Ok(Some(value))
1435 }
1436
1437 async fn set_custom_value(&self, key: &str, value: Vec<u8>) -> Result<()> {
1438 let serialized = if let Some(cipher) = &self.store_cipher {
1439 let encrypted = cipher.encrypt_value_data(value)?;
1440 rmp_serde::to_vec_named(&encrypted)?
1441 } else {
1442 value
1443 };
1444
1445 self.acquire().await?.set_kv(key, serialized).await?;
1446 Ok(())
1447 }
1448
1449 async fn remove_custom_value(&self, key: &str) -> Result<()> {
1450 let key = key.to_owned();
1451 self.acquire()
1452 .await?
1453 .interact(move |conn| conn.execute("DELETE FROM kv WHERE key = ?1", (&key,)))
1454 .await
1455 .unwrap()?;
1456 Ok(())
1457 }
1458
1459 async fn try_take_leased_lock(
1460 &self,
1461 lease_duration_ms: u32,
1462 key: &str,
1463 holder: &str,
1464 ) -> Result<bool> {
1465 let key = key.to_owned();
1466 let holder = holder.to_owned();
1467
1468 let now_ts: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
1469 let expiration_ts = now_ts + lease_duration_ms as u64;
1470
1471 let num_touched = self
1472 .acquire()
1473 .await?
1474 .with_transaction(move |txn| {
1475 txn.execute(
1476 "INSERT INTO lease_locks (key, holder, expiration_ts)
1477 VALUES (?1, ?2, ?3)
1478 ON CONFLICT (key)
1479 DO
1480 UPDATE SET holder = ?2, expiration_ts = ?3
1481 WHERE holder = ?2
1482 OR expiration_ts < ?4
1483 ",
1484 (key, holder, expiration_ts, now_ts),
1485 )
1486 })
1487 .await?;
1488
1489 Ok(num_touched == 1)
1490 }
1491
1492 async fn next_batch_token(&self) -> Result<Option<String>, Self::Error> {
1493 let conn = self.acquire().await?;
1494 if let Some(token) = conn.get_kv("next_batch_token").await? {
1495 let maybe_token: Option<String> = self.deserialize_value(&token)?;
1496 Ok(maybe_token)
1497 } else {
1498 Ok(None)
1499 }
1500 }
1501}
1502
1503#[cfg(test)]
1504mod tests {
1505 use std::path::Path;
1506
1507 use matrix_sdk_common::deserialized_responses::WithheldCode;
1508 use matrix_sdk_crypto::{
1509 cryptostore_integration_tests, cryptostore_integration_tests_time, olm::SenderDataType,
1510 store::CryptoStore,
1511 };
1512 use matrix_sdk_test::async_test;
1513 use once_cell::sync::Lazy;
1514 use ruma::{device_id, room_id, user_id};
1515 use similar_asserts::assert_eq;
1516 use tempfile::{tempdir, TempDir};
1517 use tokio::fs;
1518
1519 use super::SqliteCryptoStore;
1520 use crate::SqliteStoreConfig;
1521
// Temporary directory created lazily on first access; the `test_pool_size`
// test and the `get_store` helper each place their store in a subdirectory
// of it.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1523
// A store opened on a private copy of a test-vector database.
struct TestDb {
    // Temporary directory holding the copied database file. It has to stay
    // alive for as long as `database` is used, because dropping a `TempDir`
    // deletes the directory.
    _dir: TempDir,
    // The store opened on the database file inside `_dir`.
    database: SqliteCryptoStore,
}
1530
1531 fn copy_db(data_path: &str) -> TempDir {
1532 let db_name = super::DATABASE_NAME;
1533
1534 let manifest_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../..");
1535 let database_path = manifest_path.join(data_path).join(db_name);
1536
1537 let tmpdir = tempdir().unwrap();
1538 let destination = tmpdir.path().join(db_name);
1539
1540 std::fs::copy(&database_path, destination).unwrap();
1542
1543 tmpdir
1544 }
1545
1546 async fn get_test_db(data_path: &str, passphrase: Option<&str>) -> TestDb {
1547 let tmpdir = copy_db(data_path);
1548
1549 let database = SqliteCryptoStore::open(tmpdir.path(), passphrase)
1550 .await
1551 .expect("Can't open the test store");
1552
1553 TestDb { _dir: tmpdir, database }
1554 }
1555
1556 #[async_test]
1557 async fn test_pool_size() {
1558 let store_open_config =
1559 SqliteStoreConfig::new(TMP_DIR.path().join("test_pool_size")).pool_max_size(42);
1560
1561 let store = SqliteCryptoStore::open_with_config(store_open_config).await.unwrap();
1562
1563 assert_eq!(store.pool.status().max_size, 42);
1564 }
1565
1566 #[async_test]
1569 async fn test_open_test_vector_store() {
1570 let TestDb { _dir: _, database } = get_test_db("testing/data/storage", None).await;
1571
1572 let account = database
1573 .load_account()
1574 .await
1575 .unwrap()
1576 .expect("The test database is prefilled with data, we should find an account");
1577
1578 let user_id = account.user_id();
1579 let device_id = account.device_id();
1580
1581 assert_eq!(
1582 user_id.as_str(),
1583 "@pjtest:synapse-oidc.element.dev",
1584 "The user ID should match to the one we expect."
1585 );
1586
1587 assert_eq!(
1588 device_id.as_str(),
1589 "v4TqgcuIH6",
1590 "The device ID should match to the one we expect."
1591 );
1592
1593 let device = database
1594 .get_device(user_id, device_id)
1595 .await
1596 .unwrap()
1597 .expect("Our own device should be found in the store.");
1598
1599 assert_eq!(device.device_id(), device_id);
1600 assert_eq!(device.user_id(), user_id);
1601
1602 assert_eq!(
1603 device.ed25519_key().expect("The device should have a Ed25519 key.").to_base64(),
1604 "+cxl1Gl3du5i7UJwfWnoRDdnafFF+xYdAiTYYhYLr8s"
1605 );
1606
1607 assert_eq!(
1608 device.curve25519_key().expect("The device should have a Curve25519 key.").to_base64(),
1609 "4SL9eEUlpyWSUvjljC5oMjknHQQJY7WZKo5S1KL/5VU"
1610 );
1611
1612 let identity = database
1613 .get_user_identity(user_id)
1614 .await
1615 .unwrap()
1616 .expect("The store should contain an identity.");
1617
1618 assert_eq!(identity.user_id(), user_id);
1619
1620 let identity = identity
1621 .own()
1622 .expect("The identity should be of the correct type, it should be our own identity.");
1623
1624 let master_key = identity
1625 .master_key()
1626 .get_first_key()
1627 .expect("Our own identity should have a master key");
1628
1629 assert_eq!(master_key.to_base64(), "iCUEtB1RwANeqRa5epDrblLk4mer/36sylwQ5hYY3oE");
1630 }
1631
1632 #[async_test]
1635 async fn test_open_test_vector_encrypted_store() {
1636 let TestDb { _dir: _, database } = get_test_db(
1637 "testing/data/storage/alice",
1638 Some(concat!(
1639 "/rCia2fYAJ+twCZ1Xm2mxFCYcmJdyzkdJjwtgXsziWpYS/UeNxnixuSieuwZXm+x1VsJHmWpl",
1640 "H+QIQBZpEGZtC9/S/l8xK+WOCesmET0o6yJ/KP73ofDtjBlnNpPwuHLKFpyTbyicpCgQ4UT+5E",
1641 "UBuJ08TY9Ujdf1D13k5kr5tSZUefDKKCuG1fCRqlU8ByRas1PMQsZxT2W8t7QgBrQiiGmhpo/O",
1642 "Ti4hfx97GOxncKcxTzppiYQNoHs/f15+XXQD7/oiCcqRIuUlXNsU6hRpFGmbYx2Pi1eyQViQCt",
1643 "B5dAEiSD0N8U81wXYnpynuTPtnL+hfnOJIn7Sy7mkERQeKg"
1644 )),
1645 )
1646 .await;
1647
1648 let account = database
1649 .load_account()
1650 .await
1651 .unwrap()
1652 .expect("The test database is prefilled with data, we should find an account");
1653
1654 let user_id = account.user_id();
1655 let device_id = account.device_id();
1656
1657 assert_eq!(
1658 user_id.as_str(),
1659 "@alice:localhost",
1660 "The user ID should match to the one we expect."
1661 );
1662
1663 assert_eq!(
1664 device_id.as_str(),
1665 "JVVORTHFXY",
1666 "The device ID should match to the one we expect."
1667 );
1668
1669 let tracked_users =
1670 database.load_tracked_users().await.expect("Should be tracking some users");
1671
1672 assert_eq!(tracked_users.len(), 6);
1673
1674 let known_users = vec![
1675 user_id!("@alice:localhost"),
1676 user_id!("@dehydration3:localhost"),
1677 user_id!("@eve:localhost"),
1678 user_id!("@bob:localhost"),
1679 user_id!("@malo:localhost"),
1680 user_id!("@carl:localhost"),
1681 ];
1682
1683 for user_id in known_users {
1685 database.get_user_identity(user_id).await.expect("Should load this identity").unwrap();
1686 }
1687
1688 let carl_identity =
1689 database.get_user_identity(user_id!("@carl:localhost")).await.unwrap().unwrap();
1690
1691 assert_eq!(
1692 carl_identity.master_key().get_first_key().unwrap().to_base64(),
1693 "CdhKYYDeBDQveOioXEGWhTPCyzc63Irpar3CNyfun2Q"
1694 );
1695 assert!(!carl_identity.was_previously_verified());
1696
1697 let bob_identity =
1698 database.get_user_identity(user_id!("@bob:localhost")).await.unwrap().unwrap();
1699
1700 assert_eq!(
1701 bob_identity.master_key().get_first_key().unwrap().to_base64(),
1702 "COh2GYOJWSjem5QPRCaGp9iWV83IELG1IzLKW2S3pFY"
1703 );
1704 assert!(bob_identity.was_previously_verified());
1706
1707 let known_devices = vec![
1708 (device_id!("OPXQHCZSKW"), user_id!("@alice:localhost")),
1709 (
1711 device_id!("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho"),
1712 user_id!("@alice:localhost"),
1713 ),
1714 (device_id!("HEEFRFQENV"), user_id!("@alice:localhost")),
1715 (device_id!("JVVORTHFXY"), user_id!("@alice:localhost")),
1716 (device_id!("NQUWWSKKHS"), user_id!("@alice:localhost")),
1717 (device_id!("ORBLPFYCPG"), user_id!("@alice:localhost")),
1718 (device_id!("YXOWENSEGM"), user_id!("@dehydration3:localhost")),
1719 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1720 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1721 (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
1722 (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
1723 (device_id!("QKUKWJTTQC"), user_id!("@malo:localhost")),
1724 (device_id!("LOUXJECTFG"), user_id!("@malo:localhost")),
1725 (device_id!("MKKMAEVLPB"), user_id!("@carl:localhost")),
1726 ];
1727
1728 for (device_id, user_id) in known_devices {
1729 database.get_device(user_id, device_id).await.expect("Should load the device").unwrap();
1730 }
1731
1732 let known_sender_key_to_session_count = vec![
1733 ("FfYcYfDF4nWy+LHdK6CEpIMlFAQDORc30WUkghL06kM", 1),
1734 ("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho", 1),
1735 ("hAGsoA4a9M6wwEUX5Q1jux1i+tUngLi01n5AmhDoHTY", 1),
1736 ("aKqtSJymLzuoglWFwPGk1r/Vm2LE2hFESzXxn4RNjRM", 0),
1737 ("zHK1psCrgeMn0kaz8hcdvA3INyar9jg1yfrSp0p1pHo", 1),
1738 ("1QmBA316Wj5jIFRwNOti6N6Xh/vW0bsYCcR4uPfy8VQ", 1),
1739 ("g5ef2vZF3VXgSPyODIeXpyHIRkuthvLhGvd6uwYggWU", 1),
1740 ("o7hfupPd1VsNkRIvdlH6ujrEJFSKjFCGbxhAd31XxjI", 1),
1741 ("Z3RxKQLxY7xpP+ZdOGR2SiNE37SrvmRhW7GPu1UGdm8", 1),
1742 ("GDomaav8NiY3J+dNEeApJm+O0FooJ3IpVaIyJzCN4w4", 1),
1743 ("7m7fqkHyEr47V5s/KjaxtJMOr3pSHrrns2q2lWpAQi8", 0),
1744 ("9psAkPUIF8vNbWbnviX3PlwRcaeO53EHJdNtKpTY1X0", 0),
1745 ("mqanh+ztw5oRtpqYQgLGW864i6NY2zpoKMIlrcyC+Aw", 0),
1746 ("fJU/TJdbsv7tVbbpHw1Ke73ziElnM32cNhP2WIg4T10", 0),
1747 ("sUIeFeFcCZoa5IC6nJ6Vrbvztcyx09m8BBg57XKRClg", 1),
1748 ];
1749
1750 for (id, count) in known_sender_key_to_session_count {
1751 let olm_sessions =
1752 database.get_sessions(id).await.expect("Should have some olm sessions");
1753
1754 println!("### Session id: {id:?}");
1755 assert_eq!(olm_sessions.map_or(0, |v| v.len()), count);
1756 }
1757
1758 let inbound_group_sessions = database.get_inbound_group_sessions().await.unwrap();
1759 assert_eq!(inbound_group_sessions.len(), 15);
1760 let known_inbound_group_sessions = vec![
1761 (
1762 "5hNAxrLai3VI0LKBwfh3wLfksfBFWds0W1a5X5/vSXA",
1763 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1764 ),
1765 (
1766 "M6d2eU3y54gaYTbvGSlqa/xc1Az35l56Cp9sxzHWO4g",
1767 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1768 ),
1769 (
1770 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1771 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1772 ),
1773 (
1774 "Y74+l9jTo7N5UF+GQwdpgJGe4sn1+QtWITq7BxulHIE",
1775 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1776 ),
1777 (
1778 "HpJxQR57WbQGdY6w2Q+C16znVvbXGa+JvQdRoMpWbXg",
1779 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1780 ),
1781 (
1782 "Xetvi+ydFkZt8dpONGFbEusQb/Chc2V0XlLByZhsbgE",
1783 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1784 ),
1785 (
1786 "wv/WN/39akyerIXczTaIpjAuLnwgXKRtbXFSEHiJqxo",
1787 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1788 ),
1789 (
1790 "nA4gQwL//Cm8OdlyjABl/jChbPT/cP5V4Sd8iuE6H0s",
1791 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1792 ),
1793 (
1794 "bAAgqFeRDTjfEqL6Qf/c9mk55zoNDCSlboAIRd6b0hw",
1795 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1796 ),
1797 (
1798 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1799 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1800 ),
1801 (
1802 "h+om7oSw/ZV94fcKaoe8FGXJwQXWOfKQfzbGgNWQILI",
1803 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1804 ),
1805 (
1806 "ul3VXonpgk4lO2L3fEWubP/nxsTmLHqu5v8ZM9vHEcw",
1807 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1808 ),
1809 (
1810 "JXY15UxC3az2mwg8uX4qwgxfvCM4aygiIWMcdNiVQoc",
1811 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1812 ),
1813 (
1814 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1815 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1816 ),
1817 (
1818 "SFkHcbxjUOYF7mUAYI/oEMDZFaXszQbCN6Jza7iemj0",
1819 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1820 ),
1821 ];
1822
1823 for (session_id, room_id) in &known_inbound_group_sessions {
1825 database
1826 .get_inbound_group_session(room_id, session_id)
1827 .await
1828 .expect("Should be able to load inbound group session")
1829 .unwrap();
1830 }
1831
1832 let bob_sender_verified = database
1833 .get_inbound_group_session(
1834 room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
1835 "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
1836 )
1837 .await
1838 .unwrap()
1839 .unwrap();
1840
1841 assert_eq!(bob_sender_verified.sender_data.to_type(), SenderDataType::SenderVerified);
1842 assert!(bob_sender_verified.backed_up());
1843 assert!(!bob_sender_verified.has_been_imported());
1844
1845 let alice_unknown_device = database
1846 .get_inbound_group_session(
1847 room_id!("!SRstFdydzrGwJYtVfm:localhost"),
1848 "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
1849 )
1850 .await
1851 .unwrap()
1852 .unwrap();
1853
1854 assert_eq!(alice_unknown_device.sender_data.to_type(), SenderDataType::UnknownDevice);
1855 assert!(alice_unknown_device.backed_up());
1856 assert!(alice_unknown_device.has_been_imported());
1857
1858 let carl_tofu_session = database
1859 .get_inbound_group_session(
1860 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1861 "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
1862 )
1863 .await
1864 .unwrap()
1865 .unwrap();
1866
1867 assert_eq!(carl_tofu_session.sender_data.to_type(), SenderDataType::SenderUnverified);
1868 assert!(carl_tofu_session.backed_up());
1869 assert!(!carl_tofu_session.has_been_imported());
1870
1871 database
1873 .get_outbound_group_session(room_id!("!OgRiTRMaUzLdpCeDBM:localhost"))
1874 .await
1875 .unwrap()
1876 .unwrap();
1877 database
1878 .get_outbound_group_session(room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"))
1879 .await
1880 .unwrap()
1881 .unwrap();
1882 database
1883 .get_outbound_group_session(room_id!("!SRstFdydzrGwJYtVfm:localhost"))
1884 .await
1885 .unwrap()
1886 .unwrap();
1887
1888 let withheld_info = database
1889 .get_withheld_info(
1890 room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
1891 "SASgZ+EklvAF4QxJclMlDRlmL0fAMjAJJIKFMdb4Ht0",
1892 )
1893 .await
1894 .expect("This session should be withheld")
1895 .unwrap();
1896
1897 assert_eq!(withheld_info.content.withheld_code(), WithheldCode::Unverified);
1898
1899 let backup_keys = database.load_backup_keys().await.expect("backup key should be cached");
1900 assert_eq!(backup_keys.backup_version.unwrap(), "6");
1901 assert!(backup_keys.decryption_key.is_some());
1902 }
1903
1904 async fn get_store(
1905 name: &str,
1906 passphrase: Option<&str>,
1907 clear_data: bool,
1908 ) -> SqliteCryptoStore {
1909 let tmpdir_path = TMP_DIR.path().join(name);
1910
1911 if clear_data {
1912 let _ = fs::remove_dir_all(&tmpdir_path).await;
1913 }
1914
1915 SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), passphrase)
1916 .await
1917 .expect("Can't create a secret protected store")
1918 }
1919
// Instantiate the shared `CryptoStore` integration test suites from
// `matrix_sdk_crypto` for this store.
// NOTE(review): presumably the macros obtain their stores through the
// `get_store` helper of this module — confirm against the macro definitions.
cryptostore_integration_tests!();
cryptostore_integration_tests_time!();
1922}
1923
/// Runs the shared `CryptoStore` integration test suites against stores that
/// are always opened with a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};
    use tokio::fs;

    use super::SqliteCryptoStore;

    // Temporary directory created lazily on first access; every store opened
    // by `get_store` lives in a subdirectory of it.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());

    /// Opens a passphrase-protected store in the `name` subdirectory of the
    /// shared temporary directory.
    ///
    /// If `passphrase` is `None`, `"default_test_password"` is used, so the
    /// store is always opened with a passphrase. When `clear_data` is set,
    /// the subdirectory is removed first. A failure to remove it is ignored
    /// on purpose, since the directory may not exist yet.
    ///
    /// Panics if the store cannot be opened.
    async fn get_store(
        name: &str,
        passphrase: Option<&str>,
        clear_data: bool,
    ) -> SqliteCryptoStore {
        let tmpdir_path = TMP_DIR.path().join(name);
        let pass = passphrase.unwrap_or("default_test_password");

        if clear_data {
            let _ = fs::remove_dir_all(&tmpdir_path).await;
        }

        // Pass the path itself: going through `to_str().unwrap()` would panic
        // on a temporary directory whose path is not valid UTF-8.
        SqliteCryptoStore::open(tmpdir_path.as_path(), Some(pass))
            .await
            .expect("Can't create a secret protected store")
    }

    cryptostore_integration_tests!();
    cryptostore_integration_tests_time!();
}