Skip to main content

matrix_sdk_sqlite/
crypto_store.rs

1// Copyright 2022, 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    fmt,
18    ops::Deref,
19    path::Path,
20    sync::{Arc, RwLock},
21};
22
23use async_trait::async_trait;
24use matrix_sdk_base::cross_process_lock::CrossProcessLockGeneration;
25use matrix_sdk_crypto::{
26    Account, DeviceData, GossipRequest, GossippedSecret, SecretInfo, TrackedUser, UserIdentityData,
27    olm::{
28        InboundGroupSession, OutboundGroupSession, PickledInboundGroupSession,
29        PrivateCrossSigningIdentity, SenderDataType, Session, StaticAccountData,
30    },
31    store::{
32        CryptoStore,
33        types::{
34            BackupKeys, Changes, DehydratedDeviceKey, PendingChanges, RoomKeyCounts,
35            RoomKeyWithheldEntry, RoomPendingKeyBundleDetails, RoomSettings,
36            StoredRoomKeyBundleData,
37        },
38    },
39};
40use matrix_sdk_store_encryption::StoreCipher;
41use ruma::{
42    DeviceId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, RoomId, TransactionId, UserId,
43    events::secret::request::SecretName,
44};
45use rusqlite::{OptionalExtension, named_params, params_from_iter};
46use tokio::{
47    fs,
48    sync::{Mutex, OwnedMutexGuard},
49};
50use tracing::{debug, instrument, warn};
51use vodozemac::Curve25519PublicKey;
52use zeroize::Zeroizing;
53
54use crate::{
55    OpenStoreError, Secret, SqliteStoreConfig,
56    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
57    error::{Error, Result},
58    utils::{
59        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
60        SqliteKeyValueStoreConnExt, repeat_vars,
61    },
62};
63
/// The database name.
///
/// Passed to `SqliteStoreConfig::build_pool_of_connections` when the store
/// is opened through [`SqliteCryptoStore::open_with_config`].
const DATABASE_NAME: &str = "matrix-sdk-crypto.sqlite3";
66
/// An SQLite-based crypto store.
///
/// Cloning is cheap with respect to the shared state: the cipher, the write
/// connection, the cached account data and the save lock all sit behind
/// `Arc`s.
#[derive(Clone)]
pub struct SqliteCryptoStore {
    /// The cipher returned by `EncryptableStore::get_cypher`. It is `None`
    /// when the store was created without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections used for read operations and the
    /// connection used for write operations: a single connection is kept
    /// apart for writes, and all other connections are used for reads. The
    /// lock ensures the write connection has only one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,

    // DB values cached in memory. `static_account` starts out as `None` when
    // the store is created.
    static_account: Arc<RwLock<Option<StaticAccountData>>>,
    // NOTE(review): presumably serialises `save_changes` calls; the users of
    // this lock are outside this part of the file, so confirm there.
    save_changes_lock: Arc<Mutex<()>>,
}
85
86#[cfg(not(tarpaulin_include))]
87impl fmt::Debug for SqliteCryptoStore {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("SqliteCryptoStore").finish_non_exhaustive()
90    }
91}
92
93impl EncryptableStore for SqliteCryptoStore {
94    fn get_cypher(&self) -> Option<&StoreCipher> {
95        self.store_cipher.as_deref()
96    }
97}
98
99impl SqliteCryptoStore {
100    /// Create an `SqliteCryptoStore` struct without trying to create the
101    /// database or migrate to a newer version.  This is only for use
102    /// internally, and for testing.
103    ///
104    /// # Arguments
105    ///
106    /// * `secret` - The secret used to encrypt the data.
107    ///
108    /// * `pool` - A connection pool to use for reading from the store.
109    ///
110    /// * `conn` - The connection to use for writing to the store.
111    pub(crate) async fn create_raw(
112        secret: Option<Secret>,
113        pool: SqlitePool,
114        conn: SqliteAsyncConn,
115    ) -> Result<Self, OpenStoreError> {
116        let store_cipher = match secret {
117            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
118            None => None,
119        };
120
121        Ok(Self {
122            store_cipher,
123            pool,
124            write_connection: Arc::new(Mutex::new(conn)),
125            static_account: Arc::new(RwLock::new(None)),
126            save_changes_lock: Default::default(),
127        })
128    }
129
130    /// Open the SQLite-based crypto store at the given path using the given
131    /// passphrase to encrypt private data.
132    pub async fn open(
133        path: impl AsRef<Path>,
134        passphrase: Option<&str>,
135    ) -> Result<Self, OpenStoreError> {
136        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
137    }
138
139    /// Open the SQLite-based crypto store at the given path using the given
140    /// key to encrypt private data.
141    pub async fn open_with_key(
142        path: impl AsRef<Path>,
143        key: Option<&[u8; 32]>,
144    ) -> Result<Self, OpenStoreError> {
145        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
146    }
147
148    /// Open the SQLite-based crypto store with the config open config.
149    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
150        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
151
152        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
153
154        let this = Self::open_with_pool(pool, config.secret).await?;
155        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
156
157        Ok(this)
158    }
159
160    /// Create an SQLite-based crypto store using the given SQLite database
161    /// pool. The given secret will be used to encrypt private data.
162    async fn open_with_pool(
163        pool: SqlitePool,
164        secret: Option<Secret>,
165    ) -> Result<Self, OpenStoreError> {
166        let conn = pool.get().await?;
167
168        let version = conn.db_version().await?;
169        debug!("Opened sqlite store with version {}", version);
170
171        let version = initialize_store(&conn, version).await?;
172
173        let store = Self::create_raw(secret, pool, conn).await?;
174
175        run_migrations(&store, version, None).await?;
176
177        store.write().await.wal_checkpoint().await;
178
179        Ok(store)
180    }
181
182    fn deserialize_and_unpickle_inbound_group_session(
183        &self,
184        value: Vec<u8>,
185        backed_up: bool,
186    ) -> Result<InboundGroupSession> {
187        let mut pickle: PickledInboundGroupSession = self.deserialize_value(&value)?;
188
189        // The `backed_up` SQL column is the source of truth, because we update it
190        // inside `mark_inbound_group_sessions_as_backed_up` and don't update
191        // the pickled value inside the `data` column (until now, when we are puling it
192        // out of the DB).
193        pickle.backed_up = backed_up;
194
195        Ok(InboundGroupSession::from_pickle(pickle)?)
196    }
197
198    fn deserialize_key_request(&self, value: &[u8], sent_out: bool) -> Result<GossipRequest> {
199        let mut request: GossipRequest = self.deserialize_value(value)?;
200        // sent_out SQL column is source of truth, sent_out field in serialized value
201        // needed for other stores though
202        request.sent_out = sent_out;
203        Ok(request)
204    }
205
206    fn get_static_account(&self) -> Option<StaticAccountData> {
207        self.static_account.read().unwrap().clone()
208    }
209
210    /// Acquire a connection for executing read operations.
211    #[instrument(skip_all)]
212    async fn read(&self) -> Result<SqliteAsyncConn> {
213        Ok(self.pool.get().await?)
214    }
215
216    /// Acquire a connection for executing write operations.
217    #[instrument(skip_all)]
218    pub(crate) async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
219        self.write_connection.clone().lock_owned().await
220    }
221}
222
/// The schema version of a fully migrated database.
///
/// Must match the highest version that `run_migrations` migrates to, so that
/// the "Upgrading database" log emitted by `initialize_store` reports the
/// real target version.
const DATABASE_VERSION: u8 = 17;
224
/// Key under which the dehydrated device pickle key is stored in the
/// key/value table.
const DEHYDRATED_DEVICE_PICKLE_KEY: &str = "dehydrated_device_pickle_key";
227
228/// Initialize the database to version 1
229///
230/// This must be done before creating the store cipher, because the store cipher
231/// requires the `kv` table.
232///
233/// # Arguments
234///
235/// * `conn` - The connection to use.
236///
237/// * `version` - the current version of the database.
238pub(crate) async fn initialize_store(conn: &SqliteAsyncConn, version: u8) -> Result<u8> {
239    if version == 0 {
240        debug!("Creating database");
241    } else if version < DATABASE_VERSION {
242        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
243    } else {
244        return Ok(version);
245    }
246
247    if version < 1 {
248        debug!("Creating database");
249        // First turn on WAL mode, this can't be done in the transaction, it fails with
250        // the error message: "cannot change into wal mode from within a transaction".
251        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
252        conn.with_transaction(|txn| {
253            txn.execute_batch(include_str!("../migrations/crypto_store/001_init.sql"))?;
254            txn.set_db_version(1)
255        })
256        .await?;
257        return Ok(1);
258    }
259
260    Ok(version)
261}
262
263/// Run migrations for the given version of the database.
264///
265/// # Arguments
266///
267/// * `store` - The store to run the migrations on
268///
269/// * `version` - The current version of the database.
270///
271/// * `max_version` - The maximum version that the database will be migrated to.
272///   Only used for testing, so will only be checked for the versions that are
273///   needed for tests.
274pub(crate) async fn run_migrations(
275    store: &SqliteCryptoStore,
276    version: u8,
277    max_version: Option<u8>,
278) -> Result<()> {
279    let conn = store.write().await;
280
281    if version < 2 {
282        debug!("Upgrading database to version 2");
283        conn.with_transaction(|txn| {
284            txn.execute_batch(include_str!("../migrations/crypto_store/002_reset_olm_hash.sql"))?;
285            txn.set_db_version(2)
286        })
287        .await?;
288    }
289
290    if version < 3 {
291        debug!("Upgrading database to version 3");
292        conn.with_transaction(|txn| {
293            txn.execute_batch(include_str!("../migrations/crypto_store/003_room_settings.sql"))?;
294            txn.set_db_version(3)
295        })
296        .await?;
297    }
298
299    if version < 4 {
300        debug!("Upgrading database to version 4");
301        conn.with_transaction(|txn| {
302            txn.execute_batch(include_str!(
303                "../migrations/crypto_store/004_drop_outbound_group_sessions.sql"
304            ))?;
305            txn.set_db_version(4)
306        })
307        .await?;
308    }
309
310    if version < 5 {
311        debug!("Upgrading database to version 5");
312        conn.with_transaction(|txn| {
313            txn.execute_batch(include_str!("../migrations/crypto_store/005_withheld_code.sql"))?;
314            txn.set_db_version(5)
315        })
316        .await?;
317    }
318
319    if version < 6 {
320        debug!("Upgrading database to version 6");
321        conn.with_transaction(|txn| {
322            txn.execute_batch(include_str!(
323                "../migrations/crypto_store/006_drop_outbound_group_sessions.sql"
324            ))?;
325            txn.set_db_version(6)
326        })
327        .await?;
328    }
329
330    if version < 7 {
331        debug!("Upgrading database to version 7");
332        conn.with_transaction(|txn| {
333            txn.execute_batch(include_str!("../migrations/crypto_store/007_lock_leases.sql"))?;
334            txn.set_db_version(7)
335        })
336        .await?;
337    }
338
339    if version < 8 {
340        debug!("Upgrading database to version 8");
341        conn.with_transaction(|txn| {
342            txn.execute_batch(include_str!("../migrations/crypto_store/008_secret_inbox.sql"))?;
343            txn.set_db_version(8)
344        })
345        .await?;
346    }
347
348    if version < 9 {
349        debug!("Upgrading database to version 9");
350        conn.with_transaction(|txn| {
351            txn.execute_batch(include_str!(
352                "../migrations/crypto_store/009_inbound_group_session_sender_key_sender_data_type.sql"
353            ))?;
354            txn.set_db_version(9)
355        })
356        .await?;
357    }
358
359    if version < 10 {
360        debug!("Upgrading database to version 10");
361        conn.with_transaction(|txn| {
362            txn.execute_batch(include_str!(
363                "../migrations/crypto_store/010_received_room_key_bundles.sql"
364            ))?;
365            txn.set_db_version(10)
366        })
367        .await?;
368    }
369
370    if version < 11 {
371        debug!("Upgrading database to version 11");
372        conn.with_transaction(|txn| {
373            txn.execute_batch(include_str!(
374                "../migrations/crypto_store/011_received_room_key_bundles_with_curve_key.sql"
375            ))?;
376            txn.set_db_version(11)
377        })
378        .await?;
379    }
380
381    if version < 12 {
382        debug!("Upgrading database to version 12");
383        conn.with_transaction(|txn| {
384            txn.execute_batch(include_str!(
385                "../migrations/crypto_store/012_withheld_code_by_room.sql"
386            ))?;
387            txn.set_db_version(12)
388        })
389        .await?;
390    }
391
392    if version < 13 {
393        debug!("Upgrading database to version 13");
394        conn.with_transaction(|txn| {
395            txn.execute_batch(include_str!(
396                "../migrations/crypto_store/013_lease_locks_with_generation.sql"
397            ))?;
398            txn.set_db_version(13)
399        })
400        .await?;
401    }
402
403    if version < 14 {
404        debug!("Upgrading database to version 14");
405        conn.with_transaction(|txn| {
406            txn.execute_batch(include_str!(
407                "../migrations/crypto_store/014_room_key_backups_fully_downloaded.sql"
408            ))?;
409            txn.set_db_version(14)
410        })
411        .await?;
412    }
413
414    if version < 15 {
415        debug!("Upgrading database to version 15");
416        conn.with_transaction(|txn| {
417            txn.execute_batch(include_str!(
418                "../migrations/crypto_store/015_rooms_pending_key_bundle.sql"
419            ))?;
420            txn.set_db_version(15)
421        })
422        .await?;
423    }
424
425    if version < 16 {
426        debug!("Upgrading database to version 16");
427        conn.with_transaction(|txn| {
428            txn.execute_batch(include_str!(
429                "../migrations/crypto_store/016_remove_old_generation_counter.sql"
430            ))?;
431            txn.set_db_version(16)
432        })
433        .await?;
434    }
435
436    if max_version.is_some_and(|max_version| max_version < 17) {
437        return Ok(());
438    }
439
440    if version < 17 {
441        let store = store.clone();
442        conn.with_transaction(move |txn| {
443            txn.execute_batch(include_str!(
444                "../migrations/crypto_store/017_add_new_secrets_inbox.sql"
445            ))?;
446            let mut select_query = txn.prepare("SELECT data FROM secrets")?;
447            let mut secrets = select_query.query([])?;
448            let mut insert_query = txn.prepare(
449                "INSERT OR IGNORE INTO secrets_inbox (secret_name, secret)
450            VALUES (?1, ?2)",
451            )?;
452            while let Some(row) = secrets.next()? {
453                let Ok(secret) =
454                    store.deserialize_json::<GossippedSecret>(row.get::<_, Vec<u8>>(0)?.as_ref())
455                else {
456                    continue;
457                };
458                let Ok(encoded_secret) = store.serialize_json(&secret.event.content.secret) else {
459                    continue;
460                };
461                insert_query.execute((
462                    store.encode_key("secrets_inbox", secret.secret_name.to_string()),
463                    &encoded_secret,
464                ))?;
465            }
466            txn.execute_batch(include_str!(
467                "../migrations/crypto_store/017_drop_old_secrets_inbox.sql"
468            ))?;
469            txn.set_db_version(17)
470        })
471        .await?;
472    }
473
474    Ok(())
475}
476
477trait SqliteConnectionExt {
478    fn set_session(
479        &self,
480        session_id: &[u8],
481        sender_key: &[u8],
482        data: &[u8],
483    ) -> rusqlite::Result<()>;
484
485    fn set_inbound_group_session(
486        &self,
487        room_id: &[u8],
488        session_id: &[u8],
489        data: &[u8],
490        backed_up: bool,
491        sender_key: Option<&[u8]>,
492        sender_data_type: Option<u8>,
493    ) -> rusqlite::Result<()>;
494
495    fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
496
497    fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
498    fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()>;
499
500    fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
501
502    fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()>;
503
504    fn set_key_request(
505        &self,
506        request_id: &[u8],
507        sent_out: bool,
508        data: &[u8],
509    ) -> rusqlite::Result<()>;
510
511    fn set_direct_withheld(
512        &self,
513        session_id: &[u8],
514        room_id: &[u8],
515        data: &[u8],
516    ) -> rusqlite::Result<()>;
517
518    fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
519
520    fn set_secret(&self, request_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
521
522    fn set_received_room_key_bundle(
523        &self,
524        room_id: &[u8],
525        user_id: &[u8],
526        data: &[u8],
527    ) -> rusqlite::Result<()>;
528
529    fn set_has_downloaded_all_room_keys(&self, room_id: &[u8]) -> rusqlite::Result<()>;
530
531    fn set_room_pending_key_bundle(
532        &self,
533        room_id: &[u8],
534        details: Option<&[u8]>,
535    ) -> rusqlite::Result<()>;
536}
537
538impl SqliteConnectionExt for rusqlite::Connection {
539    fn set_session(
540        &self,
541        session_id: &[u8],
542        sender_key: &[u8],
543        data: &[u8],
544    ) -> rusqlite::Result<()> {
545        self.execute(
546            "INSERT INTO session (session_id, sender_key, data)
547             VALUES (?1, ?2, ?3)
548             ON CONFLICT (session_id) DO UPDATE SET data = ?3",
549            (session_id, sender_key, data),
550        )?;
551        Ok(())
552    }
553
554    fn set_inbound_group_session(
555        &self,
556        room_id: &[u8],
557        session_id: &[u8],
558        data: &[u8],
559        backed_up: bool,
560        sender_key: Option<&[u8]>,
561        sender_data_type: Option<u8>,
562    ) -> rusqlite::Result<()> {
563        self.execute(
564            "INSERT INTO inbound_group_session (session_id, room_id, data, backed_up, sender_key, sender_data_type) \
565             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
566             ON CONFLICT (session_id) DO UPDATE SET data = ?3, backed_up = ?4, sender_key = ?5, sender_data_type = ?6",
567            (session_id, room_id, data, backed_up, sender_key, sender_data_type),
568        )?;
569        Ok(())
570    }
571
572    fn set_outbound_group_session(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
573        self.execute(
574            "INSERT INTO outbound_group_session (room_id, data) \
575             VALUES (?1, ?2)
576             ON CONFLICT (room_id) DO UPDATE SET data = ?2",
577            (room_id, data),
578        )?;
579        Ok(())
580    }
581
582    fn set_device(&self, user_id: &[u8], device_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
583        self.execute(
584            "INSERT INTO device (user_id, device_id, data) \
585             VALUES (?1, ?2, ?3)
586             ON CONFLICT (user_id, device_id) DO UPDATE SET data = ?3",
587            (user_id, device_id, data),
588        )?;
589        Ok(())
590    }
591
592    fn delete_device(&self, user_id: &[u8], device_id: &[u8]) -> rusqlite::Result<()> {
593        self.execute(
594            "DELETE FROM device WHERE user_id = ? AND device_id = ?",
595            (user_id, device_id),
596        )?;
597        Ok(())
598    }
599
600    fn set_identity(&self, user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
601        self.execute(
602            "INSERT INTO identity (user_id, data) \
603             VALUES (?1, ?2)
604             ON CONFLICT (user_id) DO UPDATE SET data = ?2",
605            (user_id, data),
606        )?;
607        Ok(())
608    }
609
610    fn add_olm_hash(&self, data: &[u8]) -> rusqlite::Result<()> {
611        self.execute("INSERT INTO olm_hash (data) VALUES (?) ON CONFLICT DO NOTHING", (data,))?;
612        Ok(())
613    }
614
615    fn set_key_request(
616        &self,
617        request_id: &[u8],
618        sent_out: bool,
619        data: &[u8],
620    ) -> rusqlite::Result<()> {
621        self.execute(
622            "INSERT INTO key_requests (request_id, sent_out, data)
623            VALUES (?1, ?2, ?3)
624            ON CONFLICT (request_id) DO UPDATE SET sent_out = ?2, data = ?3",
625            (request_id, sent_out, data),
626        )?;
627        Ok(())
628    }
629
630    fn set_direct_withheld(
631        &self,
632        session_id: &[u8],
633        room_id: &[u8],
634        data: &[u8],
635    ) -> rusqlite::Result<()> {
636        self.execute(
637            "INSERT INTO direct_withheld_info (session_id, room_id, data)
638            VALUES (?1, ?2, ?3)
639            ON CONFLICT (session_id) DO UPDATE SET room_id = ?2, data = ?3",
640            (session_id, room_id, data),
641        )?;
642        Ok(())
643    }
644
645    fn set_room_settings(&self, room_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
646        self.execute(
647            "INSERT INTO room_settings (room_id, data)
648            VALUES (?1, ?2)
649            ON CONFLICT (room_id) DO UPDATE SET data = ?2",
650            (room_id, data),
651        )?;
652        Ok(())
653    }
654
655    fn set_secret(&self, secret_name: &[u8], secret: &[u8]) -> rusqlite::Result<()> {
656        // Ignore duplicate values, since we may get set the same secret
657        // multiple times.
658        self.execute(
659            "INSERT OR IGNORE INTO secrets_inbox (secret_name, secret)
660            VALUES (?1, ?2)",
661            (secret_name, secret),
662        )?;
663
664        Ok(())
665    }
666
667    fn set_received_room_key_bundle(
668        &self,
669        room_id: &[u8],
670        sender_user_id: &[u8],
671        data: &[u8],
672    ) -> rusqlite::Result<()> {
673        self.execute(
674            "INSERT INTO received_room_key_bundle(room_id, sender_user_id, bundle_data)
675            VALUES (?1, ?2, ?3)
676            ON CONFLICT (room_id, sender_user_id) DO UPDATE SET bundle_data = ?3",
677            (room_id, sender_user_id, data),
678        )?;
679        Ok(())
680    }
681
682    fn set_room_pending_key_bundle(
683        &self,
684        room_id: &[u8],
685        data: Option<&[u8]>,
686    ) -> rusqlite::Result<()> {
687        if let Some(data) = data {
688            self.execute(
689                "INSERT INTO rooms_pending_key_bundle (room_id, data)
690                 VALUES (?1, ?2)
691                 ON CONFLICT (room_id) DO UPDATE SET data = ?2",
692                (room_id, data),
693            )?;
694        } else {
695            self.execute("DELETE FROM rooms_pending_key_bundle WHERE room_id = ?1", (room_id,))?;
696        }
697        Ok(())
698    }
699
700    fn set_has_downloaded_all_room_keys(&self, room_id: &[u8]) -> rusqlite::Result<()> {
701        self.execute(
702            "INSERT INTO room_key_backups_fully_downloaded(room_id)
703             VALUES (?1)
704             ON CONFLICT(room_id) DO NOTHING",
705            (room_id,),
706        )?;
707        Ok(())
708    }
709}
710
711#[async_trait]
712trait SqliteObjectCryptoStoreExt: SqliteAsyncConnExt {
713    async fn get_sessions_for_sender_key(&self, sender_key: Key) -> Result<Vec<Vec<u8>>> {
714        Ok(self
715            .prepare("SELECT data FROM session WHERE sender_key = ?", |mut stmt| {
716                stmt.query((sender_key,))?.mapped(|row| row.get(0)).collect()
717            })
718            .await?)
719    }
720
721    async fn get_inbound_group_session(
722        &self,
723        session_id: Key,
724    ) -> Result<Option<(Vec<u8>, Vec<u8>, bool)>> {
725        Ok(self
726            .query_row(
727                "SELECT room_id, data, backed_up FROM inbound_group_session WHERE session_id = ?",
728                (session_id,),
729                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
730            )
731            .await
732            .optional()?)
733    }
734
735    async fn get_inbound_group_sessions(&self) -> Result<Vec<(Vec<u8>, bool)>> {
736        Ok(self
737            .prepare("SELECT data, backed_up FROM inbound_group_session", |mut stmt| {
738                stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
739            })
740            .await?)
741    }
742
743    async fn get_inbound_group_session_counts(
744        &self,
745        _backup_version: Option<&str>,
746    ) -> Result<RoomKeyCounts> {
747        let total = self
748            .query_row("SELECT count(*) FROM inbound_group_session", (), |row| row.get(0))
749            .await?;
750        let backed_up = self
751            .query_row(
752                "SELECT count(*) FROM inbound_group_session WHERE backed_up = TRUE",
753                (),
754                |row| row.get(0),
755            )
756            .await?;
757        Ok(RoomKeyCounts { total, backed_up })
758    }
759
760    async fn get_inbound_group_sessions_by_room_id(
761        &self,
762        room_id: Key,
763    ) -> Result<Vec<(Vec<u8>, bool)>> {
764        Ok(self
765            .prepare(
766                "SELECT data, backed_up FROM inbound_group_session WHERE room_id = :room_id",
767                move |mut stmt| {
768                    stmt.query(named_params! {
769                        ":room_id": room_id,
770                    })?
771                    .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
772                    .collect()
773                },
774            )
775            .await?)
776    }
777
778    async fn get_inbound_group_sessions_for_device_batch(
779        &self,
780        sender_key: Key,
781        sender_data_type: SenderDataType,
782        after_session_id: Option<Key>,
783        limit: usize,
784    ) -> Result<Vec<(Vec<u8>, bool)>> {
785        Ok(self
786            .prepare(
787                "
788                SELECT data, backed_up
789                FROM inbound_group_session
790                WHERE sender_key = :sender_key
791                    AND sender_data_type = :sender_data_type
792                    AND session_id > :after_session_id
793                ORDER BY session_id
794                LIMIT :limit
795                ",
796                move |mut stmt| {
797                    let sender_data_type = sender_data_type as u8;
798
799                    // If we are not provided with an `after_session_id`, use a key which will sort
800                    // before all real keys: the empty string.
801                    let after_session_id = after_session_id.unwrap_or(Key::Plain(Vec::new()));
802
803                    stmt.query(named_params! {
804                        ":sender_key": sender_key,
805                        ":sender_data_type": sender_data_type,
806                        ":after_session_id": after_session_id,
807                        ":limit": limit,
808                    })?
809                    .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
810                    .collect()
811                },
812            )
813            .await?)
814    }
815
816    async fn get_inbound_group_sessions_for_backup(&self, limit: usize) -> Result<Vec<Vec<u8>>> {
817        Ok(self
818            .prepare(
819                "SELECT data FROM inbound_group_session WHERE backed_up = FALSE LIMIT ?",
820                move |mut stmt| stmt.query((limit,))?.mapped(|row| row.get(0)).collect(),
821            )
822            .await?)
823    }
824
825    async fn mark_inbound_group_sessions_as_backed_up(&self, session_ids: Vec<Key>) -> Result<()> {
826        if session_ids.is_empty() {
827            // We are not expecting to be called with an empty list of sessions
828            warn!("No sessions to mark as backed up!");
829            return Ok(());
830        }
831
832        let session_ids_len = session_ids.len();
833
834        self.chunk_large_query_over(session_ids, None, move |txn, session_ids| {
835            // Safety: placeholders is not generated using any user input except the number
836            // of session IDs, so it is safe from injection.
837            let sql_params = repeat_vars(session_ids_len);
838            let query = format!("UPDATE inbound_group_session SET backed_up = TRUE where session_id IN ({sql_params})");
839            txn.prepare(&query)?.execute(params_from_iter(session_ids.iter()))?;
840            Ok(Vec::<()>::new())
841        }).await?;
842
843        Ok(())
844    }
845
846    async fn reset_inbound_group_session_backup_state(&self) -> Result<()> {
847        self.execute("UPDATE inbound_group_session SET backed_up = FALSE", ()).await?;
848        Ok(())
849    }
850
851    async fn get_outbound_group_session(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
852        Ok(self
853            .query_row(
854                "SELECT data FROM outbound_group_session WHERE room_id = ?",
855                (room_id,),
856                |row| row.get(0),
857            )
858            .await
859            .optional()?)
860    }
861
862    async fn get_device(&self, user_id: Key, device_id: Key) -> Result<Option<Vec<u8>>> {
863        Ok(self
864            .query_row(
865                "SELECT data FROM device WHERE user_id = ? AND device_id = ?",
866                (user_id, device_id),
867                |row| row.get(0),
868            )
869            .await
870            .optional()?)
871    }
872
873    async fn get_user_devices(&self, user_id: Key) -> Result<Vec<Vec<u8>>> {
874        Ok(self
875            .prepare("SELECT data FROM device WHERE user_id = ?", |mut stmt| {
876                stmt.query((user_id,))?.mapped(|row| row.get(0)).collect()
877            })
878            .await?)
879    }
880
881    async fn get_user_identity(&self, user_id: Key) -> Result<Option<Vec<u8>>> {
882        Ok(self
883            .query_row("SELECT data FROM identity WHERE user_id = ?", (user_id,), |row| row.get(0))
884            .await
885            .optional()?)
886    }
887
888    async fn has_olm_hash(&self, data: Vec<u8>) -> Result<bool> {
889        Ok(self
890            .query_row("SELECT count(*) FROM olm_hash WHERE data = ?", (data,), |row| {
891                row.get::<_, i32>(0)
892            })
893            .await?
894            > 0)
895    }
896
897    async fn get_tracked_users(&self) -> Result<Vec<Vec<u8>>> {
898        Ok(self
899            .prepare("SELECT data FROM tracked_user", |mut stmt| {
900                stmt.query(())?.mapped(|row| row.get(0)).collect()
901            })
902            .await?)
903    }
904
905    async fn add_tracked_users(&self, users: Vec<(Key, Vec<u8>)>) -> Result<()> {
906        Ok(self
907            .prepare(
908                "INSERT INTO tracked_user (user_id, data) \
909                 VALUES (?1, ?2) \
910                 ON CONFLICT (user_id) DO UPDATE SET data = ?2",
911                |mut stmt| {
912                    for (user_id, data) in users {
913                        stmt.execute((user_id, data))?;
914                    }
915
916                    Ok(())
917                },
918            )
919            .await?)
920    }
921
922    async fn get_outgoing_secret_request(
923        &self,
924        request_id: Key,
925    ) -> Result<Option<(Vec<u8>, bool)>> {
926        Ok(self
927            .query_row(
928                "SELECT data, sent_out FROM key_requests WHERE request_id = ?",
929                (request_id,),
930                |row| Ok((row.get(0)?, row.get(1)?)),
931            )
932            .await
933            .optional()?)
934    }
935
936    async fn get_outgoing_secret_requests(&self) -> Result<Vec<(Vec<u8>, bool)>> {
937        Ok(self
938            .prepare("SELECT data, sent_out FROM key_requests", |mut stmt| {
939                stmt.query(())?.mapped(|row| Ok((row.get(0)?, row.get(1)?))).collect()
940            })
941            .await?)
942    }
943
944    async fn get_unsent_secret_requests(&self) -> Result<Vec<Vec<u8>>> {
945        Ok(self
946            .prepare("SELECT data FROM key_requests WHERE sent_out = FALSE", |mut stmt| {
947                stmt.query(())?.mapped(|row| row.get(0)).collect()
948            })
949            .await?)
950    }
951
952    async fn delete_key_request(&self, request_id: Key) -> Result<()> {
953        self.execute("DELETE FROM key_requests WHERE request_id = ?", (request_id,)).await?;
954        Ok(())
955    }
956
957    async fn get_secrets_from_inbox(&self, secret_name: Key) -> Result<Vec<Vec<u8>>> {
958        Ok(self
959            .prepare("SELECT secret FROM secrets_inbox WHERE secret_name = ?", |mut stmt| {
960                stmt.query((secret_name,))?.mapped(|row| row.get(0)).collect()
961            })
962            .await?)
963    }
964
965    async fn delete_secrets_from_inbox(&self, secret_name: Key) -> Result<()> {
966        self.execute("DELETE FROM secrets_inbox WHERE secret_name = ?", (secret_name,)).await?;
967        Ok(())
968    }
969
970    async fn get_direct_withheld_info(
971        &self,
972        session_id: Key,
973        room_id: Key,
974    ) -> Result<Option<Vec<u8>>> {
975        Ok(self
976            .query_row(
977                "SELECT data FROM direct_withheld_info WHERE session_id = ?1 AND room_id = ?2",
978                (session_id, room_id),
979                |row| row.get(0),
980            )
981            .await
982            .optional()?)
983    }
984
985    async fn get_withheld_sessions_by_room_id(&self, room_id: Key) -> Result<Vec<Vec<u8>>> {
986        Ok(self
987            .prepare("SELECT data FROM direct_withheld_info WHERE room_id = ?1", |mut stmt| {
988                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
989            })
990            .await?)
991    }
992
993    async fn get_room_settings(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
994        Ok(self
995            .query_row("SELECT data FROM room_settings WHERE room_id = ?", (room_id,), |row| {
996                row.get(0)
997            })
998            .await
999            .optional()?)
1000    }
1001
1002    async fn get_received_room_key_bundle(
1003        &self,
1004        room_id: Key,
1005        sender_user: Key,
1006    ) -> Result<Option<Vec<u8>>> {
1007        Ok(self
1008            .query_row(
1009                "SELECT bundle_data FROM received_room_key_bundle WHERE room_id = ? AND sender_user_id = ?",
1010                (room_id, sender_user),
1011                |row| { row.get(0) },
1012            )
1013            .await
1014            .optional()?)
1015    }
1016
1017    async fn get_room_pending_key_bundle(&self, room_id: Key) -> Result<Option<Vec<u8>>> {
1018        Ok(self
1019            .query_row(
1020                "SELECT data FROM rooms_pending_key_bundle WHERE room_id = ?",
1021                (room_id,),
1022                |row| row.get(0),
1023            )
1024            .await
1025            .optional()?)
1026    }
1027
1028    async fn get_all_rooms_pending_key_bundle(&self) -> Result<Vec<Vec<u8>>> {
1029        Ok(self
1030            .query_many("SELECT data FROM rooms_pending_key_bundle", (), |row| row.get(0))
1031            .await?)
1032    }
1033
1034    async fn has_downloaded_all_room_keys(&self, room_id: Key) -> Result<bool> {
1035        Ok(self
1036            .query_row(
1037                "SELECT EXISTS (SELECT 1 FROM room_key_backups_fully_downloaded WHERE room_id = ?)",
1038                (room_id,),
1039                |row| row.get(0),
1040            )
1041            .await?)
1042    }
1043}
1044
// The impl body is empty: `SqliteAsyncConn` relies entirely on the method
// bodies provided by `SqliteObjectCryptoStoreExt` itself.
#[async_trait]
impl SqliteObjectCryptoStoreExt for SqliteAsyncConn {}
1047
1048#[async_trait]
1049impl CryptoStore for SqliteCryptoStore {
1050    type Error = Error;
1051
1052    async fn load_account(&self) -> Result<Option<Account>> {
1053        let conn = self.read().await?;
1054        if let Some(pickle) = conn.get_kv("account").await? {
1055            let pickle = self.deserialize_value(&pickle)?;
1056
1057            let account = Account::from_pickle(pickle).map_err(|_| Error::Unpickle)?;
1058
1059            *self.static_account.write().unwrap() = Some(account.static_data().clone());
1060
1061            Ok(Some(account))
1062        } else {
1063            Ok(None)
1064        }
1065    }
1066
1067    async fn load_identity(&self) -> Result<Option<PrivateCrossSigningIdentity>> {
1068        let conn = self.read().await?;
1069        if let Some(i) = conn.get_kv("identity").await? {
1070            let pickle = self.deserialize_value(&i)?;
1071            Ok(Some(PrivateCrossSigningIdentity::from_pickle(pickle).map_err(|_| Error::Unpickle)?))
1072        } else {
1073            Ok(None)
1074        }
1075    }
1076
1077    async fn save_pending_changes(&self, changes: PendingChanges) -> Result<()> {
1078        // Serialize calls to `save_pending_changes`; there are multiple await points
1079        // below, and we're pickling data as we go, so we don't want to
1080        // invalidate data we've previously read and overwrite it in the store.
1081        // TODO: #2000 should make this lock go away, or change its shape.
1082        let _guard = self.save_changes_lock.lock().await;
1083
1084        let pickled_account = if let Some(account) = changes.account {
1085            *self.static_account.write().unwrap() = Some(account.static_data().clone());
1086            Some(account.pickle())
1087        } else {
1088            None
1089        };
1090
1091        let this = self.clone();
1092        self.write()
1093            .await
1094            .with_transaction(move |txn| {
1095                if let Some(pickled_account) = pickled_account {
1096                    let serialized_account = this.serialize_value(&pickled_account)?;
1097                    txn.set_kv("account", &serialized_account)?;
1098                }
1099
1100                Ok::<_, Error>(())
1101            })
1102            .await?;
1103
1104        Ok(())
1105    }
1106
    /// Persists a batch of [`Changes`] to the store.
    ///
    /// The method works in two phases:
    ///
    /// 1. Everything that must be pickled with an `.await` (the private
    ///    identity, Olm sessions, inbound and outbound group sessions) is
    ///    pickled up front and paired with its encoded lookup keys.
    /// 2. All writes are then issued inside one `with_transaction` closure on
    ///    the write connection.
    ///
    /// Most values are written through `serialize_value`; outbound group
    /// sessions, withheld-session info and secrets go through
    /// `serialize_json`, and message hashes are encoded with
    /// `rmp_serde::to_vec`.
    ///
    /// # Errors
    ///
    /// Any serialization or database error is propagated with `?` from inside
    /// the transaction closure.
    async fn save_changes(&self, changes: Changes) -> Result<()> {
        // Serialize calls to `save_changes`; there are multiple await points below, and
        // we're pickling data as we go, so we don't want to invalidate data
        // we've previously read and overwrite it in the store.
        // TODO: #2000 should make this lock go away, or change its shape.
        let _guard = self.save_changes_lock.lock().await;

        // Phase 1: pickle everything that needs an `.await` before entering the
        // (synchronous) transaction closure.
        let pickled_private_identity =
            if let Some(i) = changes.private_identity { Some(i.pickle().await) } else { None };

        let mut session_changes = Vec::new();

        for session in changes.sessions {
            let session_id = self.encode_key("session", session.session_id());
            let sender_key = self.encode_key("session", session.sender_key().to_base64());
            let pickle = session.pickle().await;
            session_changes.push((session_id, sender_key, pickle));
        }

        let mut inbound_session_changes = Vec::new();
        for session in changes.inbound_group_sessions {
            let room_id = self.encode_key("inbound_group_session", session.room_id().as_bytes());
            let session_id = self.encode_key("inbound_group_session", session.session_id());
            let pickle = session.pickle().await;
            let sender_key =
                self.encode_key("inbound_group_session", session.sender_key().to_base64());
            inbound_session_changes.push((room_id, session_id, pickle, sender_key));
        }

        let mut outbound_session_changes = Vec::new();
        for session in changes.outbound_group_sessions {
            let room_id = self.encode_key("outbound_group_session", session.room_id().as_bytes());
            let pickle = session.pickle().await;
            outbound_session_changes.push((room_id, pickle));
        }

        // Phase 2: write everything inside a single transaction. The closure is
        // `move`, so it works on a clone of the store and on the remaining
        // fields of `changes`.
        let this = self.clone();
        self.write()
            .await
            .with_transaction(move |txn| {
                // Single-value entries kept in the key-value table.
                if let Some(pickled_private_identity) = &pickled_private_identity {
                    let serialized_private_identity =
                        this.serialize_value(pickled_private_identity)?;
                    txn.set_kv("identity", &serialized_private_identity)?;
                }

                if let Some(token) = &changes.next_batch_token {
                    let serialized_token = this.serialize_value(token)?;
                    txn.set_kv("next_batch_token", &serialized_token)?;
                }

                if let Some(decryption_key) = &changes.backup_decryption_key {
                    let serialized_decryption_key = this.serialize_value(decryption_key)?;
                    txn.set_kv("recovery_key_v1", &serialized_decryption_key)?;
                }

                if let Some(backup_version) = &changes.backup_version {
                    let serialized_backup_version = this.serialize_value(backup_version)?;
                    txn.set_kv("backup_version_v1", &serialized_backup_version)?;
                }

                if let Some(pickle_key) = &changes.dehydrated_device_pickle_key {
                    let serialized_pickle_key = this.serialize_value(pickle_key)?;
                    txn.set_kv(DEHYDRATED_DEVICE_PICKLE_KEY, &serialized_pickle_key)?;
                }

                // New and changed devices are written the same way.
                for device in changes.devices.new.iter().chain(&changes.devices.changed) {
                    let user_id = this.encode_key("device", device.user_id().as_bytes());
                    let device_id = this.encode_key("device", device.device_id().as_bytes());
                    let data = this.serialize_value(&device)?;
                    txn.set_device(&user_id, &device_id, &data)?;
                }

                for device in &changes.devices.deleted {
                    let user_id = this.encode_key("device", device.user_id().as_bytes());
                    let device_id = this.encode_key("device", device.device_id().as_bytes());
                    txn.delete_device(&user_id, &device_id)?;
                }

                for identity in changes.identities.changed.iter().chain(&changes.identities.new) {
                    let user_id = this.encode_key("identity", identity.user_id().as_bytes());
                    let data = this.serialize_value(&identity)?;
                    txn.set_identity(&user_id, &data)?;
                }

                // Sessions pickled in phase 1.
                for (session_id, sender_key, pickle) in &session_changes {
                    let serialized_session = this.serialize_value(&pickle)?;
                    txn.set_session(session_id, sender_key, &serialized_session)?;
                }

                // The backed-up flag, sender key and sender-data type are passed
                // alongside the serialized pickle as separate arguments.
                for (room_id, session_id, pickle, sender_key) in &inbound_session_changes {
                    let serialized_session = this.serialize_value(&pickle)?;
                    txn.set_inbound_group_session(
                        room_id,
                        session_id,
                        &serialized_session,
                        pickle.backed_up,
                        Some(sender_key),
                        Some(pickle.sender_data.to_type() as u8),
                    )?;
                }

                // Outbound group sessions are stored as JSON, not via
                // `serialize_value`.
                for (room_id, pickle) in &outbound_session_changes {
                    let serialized_session = this.serialize_json(&pickle)?;
                    txn.set_outbound_group_session(room_id, &serialized_session)?;
                }

                // Message hashes are MessagePack-encoded directly with `rmp_serde`.
                for hash in &changes.message_hashes {
                    let hash = rmp_serde::to_vec(hash)?;
                    txn.add_olm_hash(&hash)?;
                }

                for request in changes.key_requests {
                    let request_id = this.encode_key("key_requests", request.request_id.as_bytes());
                    let serialized_request = this.serialize_value(&request)?;
                    txn.set_key_request(&request_id, request.sent_out, &serialized_request)?;
                }

                // Withheld info is grouped per room, then per session.
                for (room_id, data) in changes.withheld_session_info {
                    for (session_id, event) in data {
                        let session_id = this.encode_key("direct_withheld_info", session_id);
                        let room_id = this.encode_key("direct_withheld_info", &room_id);
                        let serialized_info = this.serialize_json(&event)?;
                        txn.set_direct_withheld(&session_id, &room_id, &serialized_info)?;
                    }
                }

                for (room_id, settings) in changes.room_settings {
                    let room_id = this.encode_key("room_settings", room_id.as_bytes());
                    let value = this.serialize_value(&settings)?;
                    txn.set_room_settings(&room_id, &value)?;
                }

                for secret in changes.secrets {
                    let secret_name =
                        this.encode_key("secrets_inbox", secret.secret_name.to_string());
                    let value = this.serialize_json(secret.secret.deref())?;
                    txn.set_secret(&secret_name, &value)?;
                }

                for bundle in changes.received_room_key_bundles {
                    let room_id =
                        this.encode_key("received_room_key_bundle", &bundle.bundle_data.room_id);
                    let user_id = this.encode_key("received_room_key_bundle", &bundle.sender_user);
                    let value = this.serialize_value(&bundle)?;
                    txn.set_received_room_key_bundle(&room_id, &user_id, &value)?;
                }

                for room in changes.room_key_backups_fully_downloaded {
                    let room_id = this.encode_key("room_key_backups_fully_downloaded", &room);
                    txn.set_has_downloaded_all_room_keys(&room_id)?;
                }

                // `details` is optional; a missing value is forwarded as `None`.
                // NOTE(review): presumably `None` clears the pending entry — confirm
                // against `set_room_pending_key_bundle`.
                for (room, details) in changes.rooms_pending_key_bundle {
                    let room_id = this.encode_key("rooms_pending_key_bundle", &room);
                    let value = details.as_ref().map(|d| this.serialize_value(d)).transpose()?;
                    txn.set_room_pending_key_bundle(&room_id, value.as_deref())?;
                }

                Ok::<_, Error>(())
            })
            .await?;

        Ok(())
    }
1272
1273    async fn save_inbound_group_sessions(
1274        &self,
1275        sessions: Vec<InboundGroupSession>,
1276        backed_up_to_version: Option<&str>,
1277    ) -> matrix_sdk_crypto::store::Result<(), Self::Error> {
1278        // Sanity-check that the data in the sessions corresponds to backed_up_version
1279        sessions.iter().for_each(|s| {
1280            let backed_up = s.backed_up();
1281            if backed_up != backed_up_to_version.is_some() {
1282                warn!(
1283                    backed_up,
1284                    backed_up_to_version,
1285                    "Session backed-up flag does not correspond to backup version setting",
1286                );
1287            }
1288        });
1289
1290        // Currently, this store doesn't save the backup version separately, so this
1291        // just delegates to save_changes.
1292        self.save_changes(Changes { inbound_group_sessions: sessions, ..Changes::default() }).await
1293    }
1294
1295    async fn get_sessions(&self, sender_key: &str) -> Result<Option<Vec<Session>>> {
1296        let device_keys = self.get_own_device().await?.as_device_keys().clone();
1297
1298        let sessions: Vec<_> = self
1299            .read()
1300            .await?
1301            .get_sessions_for_sender_key(self.encode_key("session", sender_key.as_bytes()))
1302            .await?
1303            .into_iter()
1304            .map(|bytes| {
1305                let pickle = self.deserialize_value(&bytes)?;
1306                Session::from_pickle(device_keys.clone(), pickle).map_err(|_| Error::AccountUnset)
1307            })
1308            .collect::<Result<_>>()?;
1309
1310        if sessions.is_empty() { Ok(None) } else { Ok(Some(sessions)) }
1311    }
1312
1313    #[instrument(skip(self))]
1314    async fn get_inbound_group_session(
1315        &self,
1316        room_id: &RoomId,
1317        session_id: &str,
1318    ) -> Result<Option<InboundGroupSession>> {
1319        let session_id = self.encode_key("inbound_group_session", session_id);
1320        let Some((room_id_from_db, value, backed_up)) =
1321            self.read().await?.get_inbound_group_session(session_id).await?
1322        else {
1323            return Ok(None);
1324        };
1325
1326        let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1327        if *room_id != room_id_from_db {
1328            warn!("expected room_id for session_id doesn't match what's in the DB");
1329            return Ok(None);
1330        }
1331
1332        Ok(Some(self.deserialize_and_unpickle_inbound_group_session(value, backed_up)?))
1333    }
1334
1335    async fn get_inbound_group_sessions(&self) -> Result<Vec<InboundGroupSession>> {
1336        self.read()
1337            .await?
1338            .get_inbound_group_sessions()
1339            .await?
1340            .into_iter()
1341            .map(|(value, backed_up)| {
1342                self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1343            })
1344            .collect()
1345    }
1346
1347    async fn get_inbound_group_sessions_by_room_id(
1348        &self,
1349        room_id: &RoomId,
1350    ) -> Result<Vec<InboundGroupSession>> {
1351        let room_id = self.encode_key("inbound_group_session", room_id.as_bytes());
1352        self.read()
1353            .await?
1354            .get_inbound_group_sessions_by_room_id(room_id)
1355            .await?
1356            .into_iter()
1357            .map(|(value, backed_up)| {
1358                self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1359            })
1360            .collect()
1361    }
1362
1363    async fn get_inbound_group_sessions_for_device_batch(
1364        &self,
1365        sender_key: Curve25519PublicKey,
1366        sender_data_type: SenderDataType,
1367        after_session_id: Option<String>,
1368        limit: usize,
1369    ) -> Result<Vec<InboundGroupSession>, Self::Error> {
1370        let after_session_id =
1371            after_session_id.map(|session_id| self.encode_key("inbound_group_session", session_id));
1372        let sender_key = self.encode_key("inbound_group_session", sender_key.to_base64());
1373
1374        self.read()
1375            .await?
1376            .get_inbound_group_sessions_for_device_batch(
1377                sender_key,
1378                sender_data_type,
1379                after_session_id,
1380                limit,
1381            )
1382            .await?
1383            .into_iter()
1384            .map(|(value, backed_up)| {
1385                self.deserialize_and_unpickle_inbound_group_session(value, backed_up)
1386            })
1387            .collect()
1388    }
1389
1390    async fn inbound_group_session_counts(
1391        &self,
1392        backup_version: Option<&str>,
1393    ) -> Result<RoomKeyCounts> {
1394        Ok(self.read().await?.get_inbound_group_session_counts(backup_version).await?)
1395    }
1396
1397    async fn inbound_group_sessions_for_backup(
1398        &self,
1399        _backup_version: &str,
1400        limit: usize,
1401    ) -> Result<Vec<InboundGroupSession>> {
1402        self.read()
1403            .await?
1404            .get_inbound_group_sessions_for_backup(limit)
1405            .await?
1406            .into_iter()
1407            .map(|value| self.deserialize_and_unpickle_inbound_group_session(value, false))
1408            .collect()
1409    }
1410
1411    async fn mark_inbound_group_sessions_as_backed_up(
1412        &self,
1413        _backup_version: &str,
1414        session_ids: &[(&RoomId, &str)],
1415    ) -> Result<()> {
1416        Ok(self
1417            .write()
1418            .await
1419            .mark_inbound_group_sessions_as_backed_up(
1420                session_ids
1421                    .iter()
1422                    .map(|(_, s)| self.encode_key("inbound_group_session", s))
1423                    .collect(),
1424            )
1425            .await?)
1426    }
1427
1428    async fn reset_backup_state(&self) -> Result<()> {
1429        Ok(self.write().await.reset_inbound_group_session_backup_state().await?)
1430    }
1431
1432    async fn load_backup_keys(&self) -> Result<BackupKeys> {
1433        let conn = self.read().await?;
1434
1435        let backup_version = conn
1436            .get_kv("backup_version_v1")
1437            .await?
1438            .map(|value| self.deserialize_value(&value))
1439            .transpose()?;
1440
1441        let decryption_key = conn
1442            .get_kv("recovery_key_v1")
1443            .await?
1444            .map(|value| self.deserialize_value(&value))
1445            .transpose()?;
1446
1447        Ok(BackupKeys { backup_version, decryption_key })
1448    }
1449
1450    async fn load_dehydrated_device_pickle_key(&self) -> Result<Option<DehydratedDeviceKey>> {
1451        let conn = self.read().await?;
1452
1453        conn.get_kv(DEHYDRATED_DEVICE_PICKLE_KEY)
1454            .await?
1455            .map(|value| self.deserialize_value(&value))
1456            .transpose()
1457    }
1458
1459    async fn delete_dehydrated_device_pickle_key(&self) -> Result<(), Self::Error> {
1460        Ok(self.write().await.clear_kv(DEHYDRATED_DEVICE_PICKLE_KEY).await?)
1461    }
1462    async fn get_outbound_group_session(
1463        &self,
1464        room_id: &RoomId,
1465    ) -> Result<Option<OutboundGroupSession>> {
1466        let room_id = self.encode_key("outbound_group_session", room_id.as_bytes());
1467        let Some(value) = self.read().await?.get_outbound_group_session(room_id).await? else {
1468            return Ok(None);
1469        };
1470
1471        let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1472
1473        let pickle = self.deserialize_json(&value)?;
1474        let session = OutboundGroupSession::from_pickle(
1475            account_info.device_id,
1476            account_info.identity_keys,
1477            pickle,
1478        )
1479        .map_err(|_| Error::Unpickle)?;
1480
1481        return Ok(Some(session));
1482    }
1483
1484    async fn load_tracked_users(&self) -> Result<Vec<TrackedUser>> {
1485        self.read()
1486            .await?
1487            .get_tracked_users()
1488            .await?
1489            .iter()
1490            .map(|value| self.deserialize_value(value))
1491            .collect()
1492    }
1493
1494    async fn save_tracked_users(&self, tracked_users: &[(&UserId, bool)]) -> Result<()> {
1495        let users: Vec<(Key, Vec<u8>)> = tracked_users
1496            .iter()
1497            .map(|(u, d)| {
1498                let user_id = self.encode_key("tracked_users", u.as_bytes());
1499                let data =
1500                    self.serialize_value(&TrackedUser { user_id: (*u).into(), dirty: *d })?;
1501                Ok((user_id, data))
1502            })
1503            .collect::<Result<_>>()?;
1504
1505        Ok(self.write().await.add_tracked_users(users).await?)
1506    }
1507
1508    async fn get_device(
1509        &self,
1510        user_id: &UserId,
1511        device_id: &DeviceId,
1512    ) -> Result<Option<DeviceData>> {
1513        let user_id = self.encode_key("device", user_id.as_bytes());
1514        let device_id = self.encode_key("device", device_id.as_bytes());
1515        Ok(self
1516            .read()
1517            .await?
1518            .get_device(user_id, device_id)
1519            .await?
1520            .map(|value| self.deserialize_value(&value))
1521            .transpose()?)
1522    }
1523
1524    async fn get_user_devices(
1525        &self,
1526        user_id: &UserId,
1527    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1528        let user_id = self.encode_key("device", user_id.as_bytes());
1529        self.read()
1530            .await?
1531            .get_user_devices(user_id)
1532            .await?
1533            .into_iter()
1534            .map(|value| {
1535                let device: DeviceData = self.deserialize_value(&value)?;
1536                Ok((device.device_id().to_owned(), device))
1537            })
1538            .collect()
1539    }
1540
1541    async fn get_own_device(&self) -> Result<DeviceData> {
1542        let account_info = self.get_static_account().ok_or(Error::AccountUnset)?;
1543
1544        Ok(self
1545            .get_device(&account_info.user_id, &account_info.device_id)
1546            .await?
1547            .expect("We should be able to find our own device."))
1548    }
1549
1550    async fn get_user_identity(&self, user_id: &UserId) -> Result<Option<UserIdentityData>> {
1551        let user_id = self.encode_key("identity", user_id.as_bytes());
1552        Ok(self
1553            .read()
1554            .await?
1555            .get_user_identity(user_id)
1556            .await?
1557            .map(|value| self.deserialize_value(&value))
1558            .transpose()?)
1559    }
1560
1561    async fn is_message_known(
1562        &self,
1563        message_hash: &matrix_sdk_crypto::olm::OlmMessageHash,
1564    ) -> Result<bool> {
1565        let value = rmp_serde::to_vec(message_hash)?;
1566        Ok(self.read().await?.has_olm_hash(value).await?)
1567    }
1568
1569    async fn get_outgoing_secret_requests(
1570        &self,
1571        request_id: &TransactionId,
1572    ) -> Result<Option<GossipRequest>> {
1573        let request_id = self.encode_key("key_requests", request_id.as_bytes());
1574        Ok(self
1575            .read()
1576            .await?
1577            .get_outgoing_secret_request(request_id)
1578            .await?
1579            .map(|(value, sent_out)| self.deserialize_key_request(&value, sent_out))
1580            .transpose()?)
1581    }
1582
1583    async fn get_secret_request_by_info(
1584        &self,
1585        key_info: &SecretInfo,
1586    ) -> Result<Option<GossipRequest>> {
1587        let requests = self.read().await?.get_outgoing_secret_requests().await?;
1588        for (request, sent_out) in requests {
1589            let request = self.deserialize_key_request(&request, sent_out)?;
1590            if request.info == *key_info {
1591                return Ok(Some(request));
1592            }
1593        }
1594        Ok(None)
1595    }
1596
1597    async fn get_unsent_secret_requests(&self) -> Result<Vec<GossipRequest>> {
1598        self.read()
1599            .await?
1600            .get_unsent_secret_requests()
1601            .await?
1602            .iter()
1603            .map(|value| {
1604                let request = self.deserialize_key_request(value, false)?;
1605                Ok(request)
1606            })
1607            .collect()
1608    }
1609
1610    async fn delete_outgoing_secret_requests(&self, request_id: &TransactionId) -> Result<()> {
1611        let request_id = self.encode_key("key_requests", request_id.as_bytes());
1612        Ok(self.write().await.delete_key_request(request_id).await?)
1613    }
1614
1615    async fn get_secrets_from_inbox(
1616        &self,
1617        secret_name: &SecretName,
1618    ) -> Result<Vec<Zeroizing<String>>> {
1619        let secret_name = self.encode_key("secrets_inbox", secret_name.to_string());
1620
1621        self.read()
1622            .await?
1623            .get_secrets_from_inbox(secret_name)
1624            .await?
1625            .into_iter()
1626            .map(|value| self.deserialize_json(value.as_ref()).map(|value: String| value.into()))
1627            .collect()
1628    }
1629
1630    async fn delete_secrets_from_inbox(&self, secret_name: &SecretName) -> Result<()> {
1631        let secret_name = self.encode_key("secrets_inbox", secret_name.to_string());
1632        self.write().await.delete_secrets_from_inbox(secret_name).await
1633    }
1634
1635    async fn get_withheld_info(
1636        &self,
1637        room_id: &RoomId,
1638        session_id: &str,
1639    ) -> Result<Option<RoomKeyWithheldEntry>> {
1640        let room_id = self.encode_key("direct_withheld_info", room_id);
1641        let session_id = self.encode_key("direct_withheld_info", session_id);
1642
1643        self.read()
1644            .await?
1645            .get_direct_withheld_info(session_id, room_id)
1646            .await?
1647            .map(|value| {
1648                let info = self.deserialize_json::<RoomKeyWithheldEntry>(&value)?;
1649                Ok(info)
1650            })
1651            .transpose()
1652    }
1653
1654    async fn get_withheld_sessions_by_room_id(
1655        &self,
1656        room_id: &RoomId,
1657    ) -> matrix_sdk_crypto::store::Result<Vec<RoomKeyWithheldEntry>, Self::Error> {
1658        let room_id = self.encode_key("direct_withheld_info", room_id);
1659
1660        self.read()
1661            .await?
1662            .get_withheld_sessions_by_room_id(room_id)
1663            .await?
1664            .into_iter()
1665            .map(|value| self.deserialize_json(&value))
1666            .collect()
1667    }
1668
1669    async fn get_room_settings(&self, room_id: &RoomId) -> Result<Option<RoomSettings>> {
1670        let room_id = self.encode_key("room_settings", room_id.as_bytes());
1671        let Some(value) = self.read().await?.get_room_settings(room_id).await? else {
1672            return Ok(None);
1673        };
1674
1675        let settings = self.deserialize_value(&value)?;
1676
1677        return Ok(Some(settings));
1678    }
1679
1680    async fn get_received_room_key_bundle_data(
1681        &self,
1682        room_id: &RoomId,
1683        user_id: &UserId,
1684    ) -> Result<Option<StoredRoomKeyBundleData>> {
1685        let room_id = self.encode_key("received_room_key_bundle", room_id);
1686        let user_id = self.encode_key("received_room_key_bundle", user_id);
1687        self.read()
1688            .await?
1689            .get_received_room_key_bundle(room_id, user_id)
1690            .await?
1691            .map(|value| self.deserialize_value(&value))
1692            .transpose()
1693    }
1694
1695    async fn has_downloaded_all_room_keys(&self, room_id: &RoomId) -> Result<bool> {
1696        let room_id = self.encode_key("room_key_backups_fully_downloaded", room_id);
1697        self.read().await?.has_downloaded_all_room_keys(room_id).await
1698    }
1699
1700    async fn get_pending_key_bundle_details_for_room(
1701        &self,
1702        room_id: &RoomId,
1703    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
1704        let room_id = self.encode_key("rooms_pending_key_bundle", room_id.as_bytes());
1705        let Some(value) = self.read().await?.get_room_pending_key_bundle(room_id).await? else {
1706            return Ok(None);
1707        };
1708
1709        let details = self.deserialize_value(&value)?;
1710        Ok(Some(details))
1711    }
1712
1713    async fn get_all_rooms_pending_key_bundles(&self) -> Result<Vec<RoomPendingKeyBundleDetails>> {
1714        let details = self.read().await?.get_all_rooms_pending_key_bundle().await?;
1715        let room_ids = details
1716            .into_iter()
1717            .map(|value| self.deserialize_value(&value))
1718            .collect::<Result<_, _>>()?;
1719        Ok(room_ids)
1720    }
1721
1722    async fn get_custom_value(&self, key: &str) -> Result<Option<Vec<u8>>> {
1723        let Some(serialized) = self.read().await?.get_kv(key).await? else {
1724            return Ok(None);
1725        };
1726        let value = if let Some(cipher) = &self.store_cipher {
1727            let encrypted = rmp_serde::from_slice(&serialized)?;
1728            cipher.decrypt_value_data(encrypted)?
1729        } else {
1730            serialized
1731        };
1732
1733        Ok(Some(value))
1734    }
1735
1736    async fn set_custom_value(&self, key: &str, value: Vec<u8>) -> Result<()> {
1737        let serialized = if let Some(cipher) = &self.store_cipher {
1738            let encrypted = cipher.encrypt_value_data(value)?;
1739            rmp_serde::to_vec_named(&encrypted)?
1740        } else {
1741            value
1742        };
1743
1744        self.write().await.set_kv(key, serialized).await?;
1745        Ok(())
1746    }
1747
1748    async fn remove_custom_value(&self, key: &str) -> Result<()> {
1749        let key = key.to_owned();
1750        self.write()
1751            .await
1752            .interact(move |conn| conn.execute("DELETE FROM kv WHERE key = ?1", (&key,)))
1753            .await
1754            .unwrap()?;
1755        Ok(())
1756    }
1757
1758    #[instrument(skip(self))]
1759    async fn try_take_leased_lock(
1760        &self,
1761        lease_duration_ms: u32,
1762        key: &str,
1763        holder: &str,
1764    ) -> Result<Option<CrossProcessLockGeneration>> {
1765        let key = key.to_owned();
1766        let holder = holder.to_owned();
1767
1768        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
1769        let expiration = now + lease_duration_ms as u64;
1770
1771        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
1772        let generation = self
1773            .write()
1774            .await
1775            .with_transaction(move |txn| {
1776                txn.query_row(
1777                    "INSERT INTO lease_locks (key, holder, expiration)
1778                    VALUES (?1, ?2, ?3)
1779                    ON CONFLICT (key)
1780                    DO
1781                        UPDATE SET
1782                            holder = excluded.holder,
1783                            expiration = excluded.expiration,
1784                            generation =
1785                                CASE holder
1786                                    WHEN excluded.holder THEN generation
1787                                    ELSE generation + 1
1788                                END
1789                        WHERE
1790                            holder = excluded.holder
1791                            OR expiration < ?4
1792                    RETURNING generation
1793                    ",
1794                    (key, holder, expiration, now),
1795                    |row| row.get(0),
1796                )
1797                .optional()
1798            })
1799            .await?;
1800
1801        Ok(generation)
1802    }
1803
1804    async fn next_batch_token(&self) -> Result<Option<String>, Self::Error> {
1805        let conn = self.read().await?;
1806        if let Some(token) = conn.get_kv("next_batch_token").await? {
1807            let maybe_token: Option<String> = self.deserialize_value(&token)?;
1808            Ok(maybe_token)
1809        } else {
1810            Ok(None)
1811        }
1812    }
1813
1814    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1815        Ok(Some(self.pool.get().await?.get_db_size().await?))
1816    }
1817}
1818
#[cfg(test)]
mod tests {
    use std::{path::Path, sync::LazyLock};

    use matrix_sdk_common::deserialized_responses::WithheldCode;
    use matrix_sdk_crypto::{
        cryptostore_integration_tests, cryptostore_integration_tests_time, olm::SenderDataType,
        store::CryptoStore,
    };
    use matrix_sdk_test::async_test;
    use ruma::{device_id, room_id, user_id};
    use similar_asserts::assert_eq;
    use tempfile::{TempDir, tempdir};
    use tokio::fs;

    use super::SqliteCryptoStore;
    use crate::SqliteStoreConfig;

    /// Scratch directory shared by the tests that create stores by name.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());

    /// A store opened on a private copy of a pre-filled test database.
    struct TestDb {
        // Needs to be kept alive because the Drop implementation for TempDir deletes the
        // directory.
        _dir: TempDir,
        database: SqliteCryptoStore,
    }

    /// Copies the database found under `data_path` (relative to the
    /// workspace root) into a fresh temporary directory and returns that
    /// directory.
    fn copy_db(data_path: &str) -> TempDir {
        let db_name = super::DATABASE_NAME;

        let manifest_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../..");
        let database_path = manifest_path.join(data_path).join(db_name);

        let tmpdir = tempdir().unwrap();
        let destination = tmpdir.path().join(db_name);

        // Copy the test database to the tempdir so our test runs are idempotent.
        std::fs::copy(&database_path, destination).unwrap();

        tmpdir
    }

    /// Opens a store on a copy of the database under `data_path`, using
    /// `passphrase` if one is given.
    async fn get_test_db(data_path: &str, passphrase: Option<&str>) -> TestDb {
        let tmpdir = copy_db(data_path);

        let database = SqliteCryptoStore::open(tmpdir.path(), passphrase)
            .await
            .expect("Can't open the test store");

        TestDb { _dir: tmpdir, database }
    }

    /// Test that the configured maximum pool size reaches the connection
    /// pool.
    #[async_test]
    async fn test_pool_size() {
        let store_open_config =
            SqliteStoreConfig::new(TMP_DIR.path().join("test_pool_size")).pool_max_size(42);

        let store = SqliteCryptoStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    /// Test that we didn't regress in our storage layer by loading data from a
    /// pre-filled database, or in other words use a test vector for this.
    #[async_test]
    async fn test_open_test_vector_store() {
        // `_dir` must be bound to a name: a `_dir: _` pattern would leave the `TempDir`
        // in the temporary, which is dropped (deleting the directory) as soon as this
        // statement ends.
        let TestDb { _dir, database } = get_test_db("testing/data/storage", None).await;

        let account = database
            .load_account()
            .await
            .unwrap()
            .expect("The test database is prefilled with data, we should find an account");

        let user_id = account.user_id();
        let device_id = account.device_id();

        assert_eq!(
            user_id.as_str(),
            "@pjtest:synapse-oidc.element.dev",
            "The user ID should match to the one we expect."
        );

        assert_eq!(
            device_id.as_str(),
            "v4TqgcuIH6",
            "The device ID should match to the one we expect."
        );

        let device = database
            .get_device(user_id, device_id)
            .await
            .unwrap()
            .expect("Our own device should be found in the store.");

        assert_eq!(device.device_id(), device_id);
        assert_eq!(device.user_id(), user_id);

        assert_eq!(
            device.ed25519_key().expect("The device should have a Ed25519 key.").to_base64(),
            "+cxl1Gl3du5i7UJwfWnoRDdnafFF+xYdAiTYYhYLr8s"
        );

        assert_eq!(
            device.curve25519_key().expect("The device should have a Curve25519 key.").to_base64(),
            "4SL9eEUlpyWSUvjljC5oMjknHQQJY7WZKo5S1KL/5VU"
        );

        let identity = database
            .get_user_identity(user_id)
            .await
            .unwrap()
            .expect("The store should contain an identity.");

        assert_eq!(identity.user_id(), user_id);

        let identity = identity
            .own()
            .expect("The identity should be of the correct type, it should be our own identity.");

        let master_key = identity
            .master_key()
            .get_first_key()
            .expect("Our own identity should have a master key");

        assert_eq!(master_key.to_base64(), "iCUEtB1RwANeqRa5epDrblLk4mer/36sylwQ5hYY3oE");
    }

    /// Test that we didn't regress in our storage layer by loading data from a
    /// pre-filled database, or in other words use a test vector for this.
    #[async_test]
    async fn test_open_test_vector_encrypted_store() {
        // `_dir` must be bound to a name: a `_dir: _` pattern would leave the `TempDir`
        // in the temporary, which is dropped (deleting the directory) as soon as this
        // statement ends.
        let TestDb { _dir, database } = get_test_db(
            "testing/data/storage/alice",
            Some(concat!(
                "/rCia2fYAJ+twCZ1Xm2mxFCYcmJdyzkdJjwtgXsziWpYS/UeNxnixuSieuwZXm+x1VsJHmWpl",
                "H+QIQBZpEGZtC9/S/l8xK+WOCesmET0o6yJ/KP73ofDtjBlnNpPwuHLKFpyTbyicpCgQ4UT+5E",
                "UBuJ08TY9Ujdf1D13k5kr5tSZUefDKKCuG1fCRqlU8ByRas1PMQsZxT2W8t7QgBrQiiGmhpo/O",
                "Ti4hfx97GOxncKcxTzppiYQNoHs/f15+XXQD7/oiCcqRIuUlXNsU6hRpFGmbYx2Pi1eyQViQCt",
                "B5dAEiSD0N8U81wXYnpynuTPtnL+hfnOJIn7Sy7mkERQeKg"
            )),
        )
        .await;

        let account = database
            .load_account()
            .await
            .unwrap()
            .expect("The test database is prefilled with data, we should find an account");

        let user_id = account.user_id();
        let device_id = account.device_id();

        assert_eq!(
            user_id.as_str(),
            "@alice:localhost",
            "The user ID should match to the one we expect."
        );

        assert_eq!(
            device_id.as_str(),
            "JVVORTHFXY",
            "The device ID should match to the one we expect."
        );

        let tracked_users =
            database.load_tracked_users().await.expect("Should be tracking some users");

        assert_eq!(tracked_users.len(), 6);

        let known_users = vec![
            user_id!("@alice:localhost"),
            user_id!("@dehydration3:localhost"),
            user_id!("@eve:localhost"),
            user_id!("@bob:localhost"),
            user_id!("@malo:localhost"),
            user_id!("@carl:localhost"),
        ];

        // load the identities
        for user_id in known_users {
            database.get_user_identity(user_id).await.expect("Should load this identity").unwrap();
        }

        let carl_identity =
            database.get_user_identity(user_id!("@carl:localhost")).await.unwrap().unwrap();

        assert_eq!(
            carl_identity.master_key().get_first_key().unwrap().to_base64(),
            "CdhKYYDeBDQveOioXEGWhTPCyzc63Irpar3CNyfun2Q"
        );
        assert!(!carl_identity.was_previously_verified());

        let bob_identity =
            database.get_user_identity(user_id!("@bob:localhost")).await.unwrap().unwrap();

        assert_eq!(
            bob_identity.master_key().get_first_key().unwrap().to_base64(),
            "COh2GYOJWSjem5QPRCaGp9iWV83IELG1IzLKW2S3pFY"
        );
        // Bob is verified so this flag should be set
        assert!(bob_identity.was_previously_verified());

        let known_devices = vec![
            (device_id!("OPXQHCZSKW"), user_id!("@alice:localhost")),
            // a dehydrated one
            (
                device_id!("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho"),
                user_id!("@alice:localhost"),
            ),
            (device_id!("HEEFRFQENV"), user_id!("@alice:localhost")),
            (device_id!("JVVORTHFXY"), user_id!("@alice:localhost")),
            (device_id!("NQUWWSKKHS"), user_id!("@alice:localhost")),
            (device_id!("ORBLPFYCPG"), user_id!("@alice:localhost")),
            (device_id!("YXOWENSEGM"), user_id!("@dehydration3:localhost")),
            (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
            (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
            (device_id!("VXLFMYCHXC"), user_id!("@bob:localhost")),
            (device_id!("FDGDQAEWOW"), user_id!("@bob:localhost")),
            (device_id!("QKUKWJTTQC"), user_id!("@malo:localhost")),
            (device_id!("LOUXJECTFG"), user_id!("@malo:localhost")),
            (device_id!("MKKMAEVLPB"), user_id!("@carl:localhost")),
        ];

        for (device_id, user_id) in known_devices {
            database.get_device(user_id, device_id).await.expect("Should load the device").unwrap();
        }

        let known_sender_key_to_session_count = vec![
            ("FfYcYfDF4nWy+LHdK6CEpIMlFAQDORc30WUkghL06kM", 1),
            ("EvW+9IrGR10KVgVeZP25/KaPfx4R86FofVMcaz7VOho", 1),
            ("hAGsoA4a9M6wwEUX5Q1jux1i+tUngLi01n5AmhDoHTY", 1),
            ("aKqtSJymLzuoglWFwPGk1r/Vm2LE2hFESzXxn4RNjRM", 0),
            ("zHK1psCrgeMn0kaz8hcdvA3INyar9jg1yfrSp0p1pHo", 1),
            ("1QmBA316Wj5jIFRwNOti6N6Xh/vW0bsYCcR4uPfy8VQ", 1),
            ("g5ef2vZF3VXgSPyODIeXpyHIRkuthvLhGvd6uwYggWU", 1),
            ("o7hfupPd1VsNkRIvdlH6ujrEJFSKjFCGbxhAd31XxjI", 1),
            ("Z3RxKQLxY7xpP+ZdOGR2SiNE37SrvmRhW7GPu1UGdm8", 1),
            ("GDomaav8NiY3J+dNEeApJm+O0FooJ3IpVaIyJzCN4w4", 1),
            ("7m7fqkHyEr47V5s/KjaxtJMOr3pSHrrns2q2lWpAQi8", 0),
            ("9psAkPUIF8vNbWbnviX3PlwRcaeO53EHJdNtKpTY1X0", 0),
            ("mqanh+ztw5oRtpqYQgLGW864i6NY2zpoKMIlrcyC+Aw", 0),
            ("fJU/TJdbsv7tVbbpHw1Ke73ziElnM32cNhP2WIg4T10", 0),
            ("sUIeFeFcCZoa5IC6nJ6Vrbvztcyx09m8BBg57XKRClg", 1),
        ];

        for (id, count) in known_sender_key_to_session_count {
            let olm_sessions =
                database.get_sessions(id).await.expect("Should have some olm sessions");

            println!("### Session id: {id:?}");
            assert_eq!(olm_sessions.map_or(0, |v| v.len()), count);
        }

        let inbound_group_sessions = database.get_inbound_group_sessions().await.unwrap();
        assert_eq!(inbound_group_sessions.len(), 15);
        let known_inbound_group_sessions = vec![
            (
                "5hNAxrLai3VI0LKBwfh3wLfksfBFWds0W1a5X5/vSXA",
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
            ),
            (
                "M6d2eU3y54gaYTbvGSlqa/xc1Az35l56Cp9sxzHWO4g",
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
            ),
            (
                "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
            ),
            (
                "Y74+l9jTo7N5UF+GQwdpgJGe4sn1+QtWITq7BxulHIE",
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
            ),
            (
                "HpJxQR57WbQGdY6w2Q+C16znVvbXGa+JvQdRoMpWbXg",
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
            ),
            (
                "Xetvi+ydFkZt8dpONGFbEusQb/Chc2V0XlLByZhsbgE",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "wv/WN/39akyerIXczTaIpjAuLnwgXKRtbXFSEHiJqxo",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "nA4gQwL//Cm8OdlyjABl/jChbPT/cP5V4Sd8iuE6H0s",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "bAAgqFeRDTjfEqL6Qf/c9mk55zoNDCSlboAIRd6b0hw",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "h+om7oSw/ZV94fcKaoe8FGXJwQXWOfKQfzbGgNWQILI",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "ul3VXonpgk4lO2L3fEWubP/nxsTmLHqu5v8ZM9vHEcw",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "JXY15UxC3az2mwg8uX4qwgxfvCM4aygiIWMcdNiVQoc",
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
            ),
            (
                "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
                room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
            ),
            (
                "SFkHcbxjUOYF7mUAYI/oEMDZFaXszQbCN6Jza7iemj0",
                room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
            ),
        ];

        // ensure we can load them all
        for (session_id, room_id) in &known_inbound_group_sessions {
            database
                .get_inbound_group_session(room_id, session_id)
                .await
                .expect("Should be able to load inbound group session")
                .unwrap();
        }

        let bob_sender_verified = database
            .get_inbound_group_session(
                room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"),
                "exPbsMMdGfAG2qmDdFtpAn+koVprfzS0Zip/RA9QRCE",
            )
            .await
            .unwrap()
            .unwrap();

        assert_eq!(bob_sender_verified.sender_data.to_type(), SenderDataType::SenderVerified);
        assert!(bob_sender_verified.backed_up());
        assert!(!bob_sender_verified.has_been_imported());

        let alice_unknown_device = database
            .get_inbound_group_session(
                room_id!("!SRstFdydzrGwJYtVfm:localhost"),
                "IrydwXkRk2N2AqUMIVmLL3oJgMq14R9KId0P/uSD100",
            )
            .await
            .unwrap()
            .unwrap();

        assert_eq!(alice_unknown_device.sender_data.to_type(), SenderDataType::UnknownDevice);
        assert!(alice_unknown_device.backed_up());
        assert!(alice_unknown_device.has_been_imported());

        let carl_tofu_session = database
            .get_inbound_group_session(
                room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
                "OGB9lObr9kWUvha9tB5sMfOF/Mztk24JwQz/nwg3iFQ",
            )
            .await
            .unwrap()
            .unwrap();

        assert_eq!(carl_tofu_session.sender_data.to_type(), SenderDataType::SenderUnverified);
        assert!(carl_tofu_session.backed_up());
        assert!(!carl_tofu_session.has_been_imported());

        // Load outbound sessions
        database
            .get_outbound_group_session(room_id!("!OgRiTRMaUzLdpCeDBM:localhost"))
            .await
            .unwrap()
            .unwrap();
        database
            .get_outbound_group_session(room_id!("!ZIwZcFqZVAYLAqVjfV:localhost"))
            .await
            .unwrap()
            .unwrap();
        database
            .get_outbound_group_session(room_id!("!SRstFdydzrGwJYtVfm:localhost"))
            .await
            .unwrap()
            .unwrap();

        let withheld_info = database
            .get_withheld_info(
                room_id!("!OgRiTRMaUzLdpCeDBM:localhost"),
                "SASgZ+EklvAF4QxJclMlDRlmL0fAMjAJJIKFMdb4Ht0",
            )
            .await
            .expect("This session should be withheld")
            .unwrap();

        assert_eq!(withheld_info.content.withheld_code(), WithheldCode::Unverified);

        let backup_keys = database.load_backup_keys().await.expect("backup key should be cached");
        assert_eq!(backup_keys.backup_version.unwrap(), "6");
        assert!(backup_keys.decryption_key.is_some());
    }

    /// Test that we migrate the secrets inbox properly.
    ///
    /// The format for the secrets inbox changed in version 15.  Previously, the
    /// secrets inbox stored a full `GossippedSecret` struct.  In version 15,
    /// the secrets inbox now stores only the secret.
    #[async_test]
    async fn test_secrets_inbox_migration() {
        use std::ops::Deref;

        use matrix_sdk_crypto::{
            GossipRequest, GossippedSecret, SecretInfo,
            types::events::{
                olm_v1::{DecryptedSecretSendEvent, OlmV1Keys},
                secret_send::SecretSendContent,
            },
            vodozemac::Ed25519SecretKey,
        };
        use ruma::{TransactionId, events::secret::request::SecretName, owned_user_id};

        use crate::utils::{EncryptableStore, SqliteAsyncConnExt};

        // Create a database with version 16
        let tmpdir = tempdir().unwrap();
        let config = SqliteStoreConfig::new(tmpdir.path());
        let pool = config.build_pool_of_connections(super::DATABASE_NAME).unwrap();
        let conn = pool.get().await.unwrap();
        let version = super::initialize_store(&conn, 0).await.unwrap();
        let old_data_store =
            SqliteCryptoStore::create_raw(config.secret.clone(), pool, conn).await.unwrap();
        super::run_migrations(&old_data_store, version, Some(16)).await.unwrap();
        old_data_store.write().await.wal_checkpoint().await;

        // Store a secret using the old format
        let secret = GossippedSecret {
            secret_name: SecretName::CrossSigningMasterKey,
            gossip_request: GossipRequest {
                request_recipient: owned_user_id!("@alice:example.com"),
                request_id: TransactionId::new(),
                info: SecretInfo::SecretRequest(SecretName::CrossSigningMasterKey),
                sent_out: true,
            },
            event: DecryptedSecretSendEvent {
                sender: owned_user_id!("@alice:example.com"),
                recipient: owned_user_id!("@alice:example.com"),
                keys: OlmV1Keys { ed25519: Ed25519SecretKey::new().public_key() },
                recipient_keys: OlmV1Keys { ed25519: Ed25519SecretKey::new().public_key() },
                sender_device_keys: None,
                content: SecretSendContent::new(
                    "abc".into(),
                    "It is a secret to everybody".to_owned(),
                ),
            },
        };
        let value = old_data_store.serialize_json(&secret).unwrap();
        old_data_store
            .write()
            .await
            .prepare("INSERT INTO secrets (secret_name, data) VALUES (?1, ?2)", |mut stmt| {
                stmt.execute((SecretName::CrossSigningMasterKey.to_string(), value))
            })
            .await
            .unwrap();

        // After we open the store, the data will be migrated
        let store = SqliteCryptoStore::open_with_config(config).await.unwrap();

        // and we should be able to read the secrets from the inbox
        let secrets =
            store.get_secrets_from_inbox(&SecretName::CrossSigningMasterKey).await.unwrap();
        assert_eq!(secrets.len(), 1);
        assert_eq!(secrets[0].deref(), "It is a secret to everybody");
    }

    /// Store factory used by the integration test macros invoked below:
    /// opens a store named `name` under the shared temporary directory,
    /// removing any previous data first when `clear_data` is set.
    async fn get_store(
        name: &str,
        passphrase: Option<&str>,
        clear_data: bool,
    ) -> SqliteCryptoStore {
        let tmpdir_path = TMP_DIR.path().join(name);

        if clear_data {
            // Best effort: the result of the removal is deliberately ignored.
            let _ = fs::remove_dir_all(&tmpdir_path).await;
        }

        SqliteCryptoStore::open(tmpdir_path.to_str().unwrap(), passphrase)
            .await
            .expect("Can't create a secret protected store")
    }

    cryptostore_integration_tests!();
    cryptostore_integration_tests_time!();
}
2311
#[cfg(test)]
mod encrypted_tests {
    use std::sync::LazyLock;

    use matrix_sdk_crypto::{cryptostore_integration_tests, cryptostore_integration_tests_time};
    use tempfile::{TempDir, tempdir};
    use tokio::fs;

    use super::SqliteCryptoStore;

    /// Scratch directory shared by every store created in this module.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());

    /// Store factory used by the integration test macros invoked below.
    ///
    /// The store is always opened with a passphrase: the given one, or
    /// `default_test_password` when `passphrase` is `None`. When `clear_data`
    /// is set, any previous data for `name` is removed first.
    async fn get_store(
        name: &str,
        passphrase: Option<&str>,
        clear_data: bool,
    ) -> SqliteCryptoStore {
        let store_path = TMP_DIR.path().join(name);

        if clear_data {
            // Best effort: the result of the removal is deliberately ignored.
            let _ = fs::remove_dir_all(&store_path).await;
        }

        let effective_passphrase = passphrase.unwrap_or("default_test_password");

        SqliteCryptoStore::open(store_path.to_str().unwrap(), Some(effective_passphrase))
            .await
            .expect("Can't create a secret protected store")
    }

    cryptostore_integration_tests!();
    cryptostore_integration_tests_time!();
}