Skip to main content

matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13    RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15    store::{
16        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18        StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19    },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25    canonical_json::{RedactedBecause, redact},
26    events::{
27        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29        presence::PresenceEvent,
30        receipt::{Receipt, ReceiptThread, ReceiptType},
31        room::{
32            create::RoomCreateEventContent,
33            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34        },
35    },
36    serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41    fs,
42    sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47    OpenStoreError, Secret, SqliteStoreConfig,
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt, repeat_vars,
53    },
54};
55
/// Names of the SQL tables used by the state store.
///
/// These names are handed to `encode_key` as the table name when a key is
/// encoded.
mod keys {
    // Key/value and account data tables.
    pub const KV_BLOB: &str = "kv_blob";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";

    // Room tables.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const RECEIPT: &str = "receipt";

    // Room member tables.
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const DISPLAY_NAME: &str = "display_name";

    // Send queue tables.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";

    // Thread tables.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
71
/// The filename used for the SQLite database file of the state store.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
75/// An SQLite-based state store.
76#[derive(Clone)]
77pub struct SqliteStateStore {
78    store_cipher: Option<Arc<StoreCipher>>,
79
80    /// The pool of connections.
81    pool: SqlitePool,
82
83    /// We make the difference between connections for read operations, and for
84    /// write operations. We keep a single connection apart from write
85    /// operations. All other connections are used for read operations. The
86    /// lock is used to ensure there is one owner at a time.
87    write_connection: Arc<Mutex<SqliteAsyncConn>>,
88}
89
90#[cfg(not(tarpaulin_include))]
91impl fmt::Debug for SqliteStateStore {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
94    }
95}
96
97impl SqliteStateStore {
98    /// Open the SQLite-based state store at the given path using the given
99    /// given passphrase to encrypt private data.
100    pub async fn open(
101        path: impl AsRef<Path>,
102        passphrase: Option<&str>,
103    ) -> Result<Self, OpenStoreError> {
104        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
105    }
106
107    /// Open the SQLite-based state store at the given path using the given
108    /// key to encrypt private data.
109    pub async fn open_with_key(
110        path: impl AsRef<Path>,
111        key: Option<&[u8; 32]>,
112    ) -> Result<Self, OpenStoreError> {
113        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
114    }
115
116    /// Open the SQLite-based state store with the config open config.
117    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
118        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
119
120        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
121
122        let this = Self::open_with_pool(pool, config.secret).await?;
123        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
124
125        Ok(this)
126    }
127
128    /// Create an SQLite-based state store using the given SQLite database pool.
129    /// The given secret will be used to encrypt private data.
130    pub async fn open_with_pool(
131        pool: SqlitePool,
132        secret: Option<Secret>,
133    ) -> Result<Self, OpenStoreError> {
134        let conn = pool.get().await?;
135
136        let mut version = conn.db_version().await?;
137
138        if version == 0 {
139            init(&conn).await?;
140            version = 1;
141        }
142
143        let store_cipher = match secret {
144            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
145            None => None,
146        };
147        let this = Self {
148            store_cipher,
149            pool,
150            // Use `conn` as our selected write connections.
151            write_connection: Arc::new(Mutex::new(conn)),
152        };
153        this.run_migrations(version, None).await?;
154
155        this.read().await?.wal_checkpoint().await;
156
157        Ok(this)
158    }
159
160    /// Run database migrations from the given `from` version to the given `to`
161    /// version
162    ///
163    /// If `to` is `None`, the current database version will be used.
164    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
165        if to == Some(1) {
166            return Ok(());
167        }
168
169        let conn = self.write().await;
170
171        if from < 2 {
172            debug!("Upgrading database to version 2");
173            let this = self.clone();
174            conn.with_transaction(move |txn| {
175                // Create new table.
176                txn.execute_batch(include_str!(
177                    "../migrations/state_store/002_a_create_new_room_info.sql"
178                ))?;
179
180                // Migrate data to new table.
181                for data in txn
182                    .prepare("SELECT data FROM room_info")?
183                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
184                {
185                    let data = data?;
186                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
187
188                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
189                    let state = this
190                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
191                    txn.prepare_cached(
192                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
193                         VALUES (?, ?, ?)",
194                    )?
195                    .execute((room_id, state, data))?;
196                }
197
198                // Replace old table.
199                txn.execute_batch(include_str!(
200                    "../migrations/state_store/002_b_replace_room_info.sql"
201                ))?;
202
203                txn.set_db_version(2)?;
204                Result::<_, Error>::Ok(())
205            })
206            .await?;
207        }
208
209        if to == Some(2) {
210            return Ok(());
211        }
212
213        // Migration to v3: RoomInfo format has changed.
214        if from < 3 {
215            debug!("Upgrading database to version 3");
216            let this = self.clone();
217            conn.with_transaction(move |txn| {
218                // Migrate data .
219                for data in txn
220                    .prepare("SELECT data FROM room_info")?
221                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
222                {
223                    let data = data?;
224                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
225
226                    // Get the `m.room.create` event from the room state.
227                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
228                    let event_type =
229                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
230                    let create_res = txn
231                        .prepare(
232                            "SELECT stripped, data FROM state_event
233                             WHERE room_id = ? AND event_type = ?",
234                        )?
235                        .query_row([room_id, event_type], |row| {
236                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
237                        })
238                        .optional()?;
239
240                    let create = create_res.and_then(|(stripped, data)| {
241                        let create = if stripped {
242                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
243                                this.deserialize_json(&data).ok()?,
244                            )
245                        } else {
246                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
247                        };
248                        Some(create)
249                    });
250
251                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
252
253                    let data = this.serialize_json(&migrated_room_info)?;
254                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
255                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
256                        .execute((data, room_id))?;
257                }
258
259                txn.set_db_version(3)?;
260                Result::<_, Error>::Ok(())
261            })
262            .await?;
263        }
264
265        if to == Some(3) {
266            return Ok(());
267        }
268
269        if from < 4 {
270            debug!("Upgrading database to version 4");
271            conn.with_transaction(move |txn| {
272                // Create new table.
273                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274                txn.set_db_version(4)
275            })
276            .await?;
277        }
278
279        if to == Some(4) {
280            return Ok(());
281        }
282
283        if from < 5 {
284            debug!("Upgrading database to version 5");
285            conn.with_transaction(move |txn| {
286                // Create new table.
287                txn.execute_batch(include_str!(
288                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
289                ))?;
290                txn.set_db_version(4)
291            })
292            .await?;
293        }
294
295        if to == Some(5) {
296            return Ok(());
297        }
298
299        if from < 6 {
300            debug!("Upgrading database to version 6");
301            conn.with_transaction(move |txn| {
302                // Create new table.
303                txn.execute_batch(include_str!(
304                    "../migrations/state_store/005_send_queue_dependent_events.sql"
305                ))?;
306                txn.set_db_version(6)
307            })
308            .await?;
309        }
310
311        if to == Some(6) {
312            return Ok(());
313        }
314
315        if from < 7 {
316            debug!("Upgrading database to version 7");
317            conn.with_transaction(move |txn| {
318                // Drop media table.
319                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
320                txn.set_db_version(7)
321            })
322            .await?;
323        }
324
325        if to == Some(7) {
326            return Ok(());
327        }
328
329        if from < 8 {
330            debug!("Upgrading database to version 8");
331            // Replace all existing wedged events with a generic error.
332            let error = QueueWedgeError::GenericApiError {
333                msg: "local echo failed to send in a previous session".into(),
334            };
335            let default_err = self.serialize_value(&error)?;
336
337            conn.with_transaction(move |txn| {
338                // Update send queue table to persist the wedge reason if any.
339                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
340
341                // Migrate the data, add a generic error for currently wedged events
342
343                for wedged_entries in txn
344                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
345                    .query_map((), |row| {
346                        Ok(
347                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
348                        )
349                    })? {
350
351                    let (room_id, transaction_id) = wedged_entries?;
352
353                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
354                        .execute((default_err.clone(), room_id, transaction_id))?;
355                }
356
357
358                // Clean up the table now that data is migrated
359                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
360
361                txn.set_db_version(8)
362            })
363                .await?;
364        }
365
366        if to == Some(8) {
367            return Ok(());
368        }
369
370        if from < 9 {
371            debug!("Upgrading database to version 9");
372            conn.with_transaction(move |txn| {
373                // Run the migration.
374                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
375                txn.set_db_version(9)
376            })
377            .await?;
378        }
379
380        if to == Some(9) {
381            return Ok(());
382        }
383
384        if from < 10 {
385            debug!("Upgrading database to version 10");
386            conn.with_transaction(move |txn| {
387                // Run the migration.
388                txn.execute_batch(include_str!(
389                    "../migrations/state_store/009_send_queue_priority.sql"
390                ))?;
391                txn.set_db_version(10)
392            })
393            .await?;
394        }
395
396        if to == Some(10) {
397            return Ok(());
398        }
399
400        if from < 11 {
401            debug!("Upgrading database to version 11");
402            conn.with_transaction(move |txn| {
403                // Run the migration.
404                txn.execute_batch(include_str!(
405                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
406                ))?;
407                txn.set_db_version(11)
408            })
409            .await?;
410        }
411
412        if to == Some(11) {
413            return Ok(());
414        }
415
416        if from < 12 {
417            debug!("Upgrading database to version 12");
418            // Defragment the DB and optimize its size on the filesystem.
419            // This should have been run in the migration for version 7, to reduce the size
420            // of the DB as we removed the media cache.
421            conn.vacuum().await?;
422            conn.set_kv("version", vec![12]).await?;
423        }
424
425        if to == Some(12) {
426            return Ok(());
427        }
428
429        if from < 13 {
430            debug!("Upgrading database to version 13");
431            conn.with_transaction(move |txn| {
432                // Run the migration.
433                txn.execute_batch(include_str!(
434                    "../migrations/state_store/011_thread_subscriptions.sql"
435                ))?;
436                txn.set_db_version(13)
437            })
438            .await?;
439        }
440
441        if to == Some(13) {
442            return Ok(());
443        }
444
445        if from < 14 {
446            debug!("Upgrading database to version 14");
447            conn.with_transaction(move |txn| {
448                // Run the migration.
449                txn.execute_batch(include_str!(
450                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
451                ))?;
452                txn.set_db_version(14)
453            })
454            .await?;
455        }
456
457        if to == Some(14) {
458            return Ok(());
459        }
460
461        if from < 15 {
462            debug!("Upgrading database to version 15");
463            conn.with_transaction(move |txn| {
464                // Run the migration.
465                txn.execute_batch(include_str!(
466                    "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
467                ))?;
468                txn.set_db_version(15)
469            })
470            .await?;
471        }
472
473        if to == Some(15) {
474            return Ok(());
475        }
476
477        Ok(())
478    }
479
480    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
481        let key_s = match key {
482            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
483            StateStoreDataKey::SupportedVersions => {
484                Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
485            }
486            StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
487            StateStoreDataKey::Filter(f) => {
488                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
489            }
490            StateStoreDataKey::UserAvatarUrl(u) => {
491                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
492            }
493            StateStoreDataKey::RecentlyVisitedRooms(b) => {
494                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
495            }
496            StateStoreDataKey::UtdHookManagerData => {
497                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
498            }
499            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
500                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
501            }
502            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
503                if let Some(thread_root) = thread_root {
504                    Cow::Owned(format!(
505                        "{}:{room_id}:{thread_root}",
506                        StateStoreDataKey::COMPOSER_DRAFT
507                    ))
508                } else {
509                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
510                }
511            }
512            StateStoreDataKey::SeenKnockRequests(room_id) => {
513                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
514            }
515            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
516                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
517            }
518        };
519
520        self.encode_key(keys::KV_BLOB, &*key_s)
521    }
522
523    fn encode_presence_key(&self, user_id: &UserId) -> Key {
524        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
525    }
526
527    fn encode_custom_key(&self, key: &[u8]) -> Key {
528        let mut full_key = b"custom:".to_vec();
529        full_key.extend(key);
530        self.encode_key(keys::KV_BLOB, full_key)
531    }
532
533    /// Acquire a connection for executing read operations.
534    #[instrument(skip_all)]
535    async fn read(&self) -> Result<SqliteAsyncConn> {
536        Ok(self.pool.get().await?)
537    }
538
539    /// Acquire a connection for executing write operations.
540    #[instrument(skip_all)]
541    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
542        self.write_connection.clone().lock_owned().await
543    }
544
545    fn remove_maybe_stripped_room_data(
546        &self,
547        txn: &Transaction<'_>,
548        room_id: &RoomId,
549        stripped: bool,
550    ) -> rusqlite::Result<()> {
551        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
552        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
553
554        let member_room_id = self.encode_key(keys::MEMBER, room_id);
555        txn.remove_room_members(&member_room_id, Some(stripped))
556    }
557
558    pub async fn vacuum(&self) -> Result<()> {
559        self.write_connection.lock().await.vacuum().await
560    }
561
562    pub async fn get_db_size(&self) -> Result<Option<usize>> {
563        let read_conn = self.pool.get().await?;
564        Ok(Some(read_conn.get_db_size().await?))
565    }
566}
567
568impl EncryptableStore for SqliteStateStore {
569    fn get_cypher(&self) -> Option<&StoreCipher> {
570        self.store_cipher.as_deref()
571    }
572}
573
574/// Initialize the database.
575async fn init(conn: &SqliteAsyncConn) -> Result<()> {
576    // First turn on WAL mode, this can't be done in the transaction, it fails with
577    // the error message: "cannot change into wal mode from within a transaction".
578    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
579    conn.with_transaction(|txn| {
580        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
581        txn.set_db_version(1)?;
582
583        Ok(())
584    })
585    .await
586}
587
588trait SqliteConnectionStateStoreExt {
589    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
590
591    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
592
593    fn set_room_account_data(
594        &self,
595        room_id: &[u8],
596        event_type: &[u8],
597        data: &[u8],
598    ) -> rusqlite::Result<()>;
599    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
600
601    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
602    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
603    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
604
605    fn set_state_event(
606        &self,
607        room_id: &[u8],
608        event_type: &[u8],
609        state_key: &[u8],
610        stripped: bool,
611        event_id: Option<&[u8]>,
612        data: &[u8],
613    ) -> rusqlite::Result<()>;
614    fn get_state_event_by_id(
615        &self,
616        room_id: &[u8],
617        event_id: &[u8],
618    ) -> rusqlite::Result<Option<Vec<u8>>>;
619    fn remove_room_state_events(
620        &self,
621        room_id: &[u8],
622        stripped: Option<bool>,
623    ) -> rusqlite::Result<()>;
624
625    fn set_member(
626        &self,
627        room_id: &[u8],
628        user_id: &[u8],
629        membership: &[u8],
630        stripped: bool,
631        data: &[u8],
632    ) -> rusqlite::Result<()>;
633    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
634
635    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
636    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
637    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
638
639    fn set_receipt(
640        &self,
641        room_id: &[u8],
642        user_id: &[u8],
643        receipt_type: &[u8],
644        thread_id: &[u8],
645        event_id: &[u8],
646        data: &[u8],
647    ) -> rusqlite::Result<()>;
648    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
649
650    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
651    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
652    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
653    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
654    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
655}
656
657impl SqliteConnectionStateStoreExt for rusqlite::Connection {
658    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
659        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
660        Ok(())
661    }
662
663    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
664        self.prepare_cached(
665            "INSERT OR REPLACE INTO global_account_data (event_type, data)
666             VALUES (?, ?)",
667        )?
668        .execute((event_type, data))?;
669        Ok(())
670    }
671
672    fn set_room_account_data(
673        &self,
674        room_id: &[u8],
675        event_type: &[u8],
676        data: &[u8],
677    ) -> rusqlite::Result<()> {
678        self.prepare_cached(
679            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
680             VALUES (?, ?, ?)",
681        )?
682        .execute((room_id, event_type, data))?;
683        Ok(())
684    }
685
686    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
687        self.prepare(
688            "DELETE FROM room_account_data
689             WHERE room_id = ?",
690        )?
691        .execute((room_id,))?;
692        Ok(())
693    }
694
695    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
696        self.prepare_cached(
697            "INSERT OR REPLACE INTO room_info (room_id, state, data)
698             VALUES (?, ?, ?)",
699        )?
700        .execute((room_id, state, data))?;
701        Ok(())
702    }
703
704    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
705        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
706            .optional()
707    }
708
709    /// Remove the room info for the given room.
710    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
711        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
712        Ok(())
713    }
714
715    fn set_state_event(
716        &self,
717        room_id: &[u8],
718        event_type: &[u8],
719        state_key: &[u8],
720        stripped: bool,
721        event_id: Option<&[u8]>,
722        data: &[u8],
723    ) -> rusqlite::Result<()> {
724        self.prepare_cached(
725            "INSERT OR REPLACE
726             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
727             VALUES (?, ?, ?, ?, ?, ?)",
728        )?
729        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
730        Ok(())
731    }
732
733    fn get_state_event_by_id(
734        &self,
735        room_id: &[u8],
736        event_id: &[u8],
737    ) -> rusqlite::Result<Option<Vec<u8>>> {
738        self.query_row(
739            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
740            (room_id, event_id),
741            |row| row.get(0),
742        )
743        .optional()
744    }
745
746    /// Remove state events for the given room.
747    ///
748    /// If `stripped` is `Some()`, only removes state events for the given
749    /// stripped state. Otherwise, state events are removed regardless of the
750    /// stripped state.
751    fn remove_room_state_events(
752        &self,
753        room_id: &[u8],
754        stripped: Option<bool>,
755    ) -> rusqlite::Result<()> {
756        if let Some(stripped) = stripped {
757            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
758                .execute((room_id, stripped))?;
759        } else {
760            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
761                .execute((room_id,))?;
762        }
763        Ok(())
764    }
765
766    fn set_member(
767        &self,
768        room_id: &[u8],
769        user_id: &[u8],
770        membership: &[u8],
771        stripped: bool,
772        data: &[u8],
773    ) -> rusqlite::Result<()> {
774        self.prepare_cached(
775            "INSERT OR REPLACE
776             INTO member (room_id, user_id, membership, stripped, data)
777             VALUES (?, ?, ?, ?, ?)",
778        )?
779        .execute((room_id, user_id, membership, stripped, data))?;
780        Ok(())
781    }
782
783    /// Remove members for the given room.
784    ///
785    /// If `stripped` is `Some()`, only removes members for the given stripped
786    /// state. Otherwise, members are removed regardless of the stripped state.
787    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
788        if let Some(stripped) = stripped {
789            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
790                .execute((room_id, stripped))?;
791        } else {
792            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
793        }
794        Ok(())
795    }
796
797    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
798        self.prepare_cached(
799            "INSERT OR REPLACE
800             INTO profile (room_id, user_id, data)
801             VALUES (?, ?, ?)",
802        )?
803        .execute((room_id, user_id, data))?;
804        Ok(())
805    }
806
807    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
808        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
809        Ok(())
810    }
811
812    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
813        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
814            .execute((room_id, user_id))?;
815        Ok(())
816    }
817
818    fn set_receipt(
819        &self,
820        room_id: &[u8],
821        user_id: &[u8],
822        receipt_type: &[u8],
823        thread: &[u8],
824        event_id: &[u8],
825        data: &[u8],
826    ) -> rusqlite::Result<()> {
827        self.prepare_cached(
828            "INSERT OR REPLACE
829             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
830             VALUES (?, ?, ?, ?, ?, ?)",
831        )?
832        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
833        Ok(())
834    }
835
836    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
837        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
838        Ok(())
839    }
840
841    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
842        self.prepare_cached(
843            "INSERT OR REPLACE
844             INTO display_name (room_id, name, data)
845             VALUES (?, ?, ?)",
846        )?
847        .execute((room_id, name, data))?;
848        Ok(())
849    }
850
851    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
852        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
853            .execute((room_id, name))?;
854        Ok(())
855    }
856
857    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
858        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
859        Ok(())
860    }
861
862    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
863        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
864        Ok(())
865    }
866
867    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
868        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
869            .execute((room_id,))?;
870        Ok(())
871    }
872}
873
874#[async_trait]
875trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
876    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
877        Ok(self
878            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
879            .await
880            .optional()?)
881    }
882
883    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
884        let keys_length = keys.len();
885
886        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
887            let sql_params = repeat_vars(keys.len());
888            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
889
890            let params = rusqlite::params_from_iter(keys);
891
892            Ok(txn
893                .prepare(&sql)?
894                .query(params)?
895                .mapped(|row| row.get(0))
896                .collect::<Result<_, _>>()?)
897        })
898        .await
899    }
900
901    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
902
903    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
904        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
905        Ok(())
906    }
907
908    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
909        Ok(match room_id {
910            None => {
911                self.prepare("SELECT data FROM room_info", move |mut stmt| {
912                    stmt.query_map((), |row| row.get(0))?.collect()
913                })
914                .await?
915            }
916
917            Some(room_id) => {
918                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
919                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
920                })
921                .await?
922            }
923        })
924    }
925
926    async fn get_maybe_stripped_state_events_for_keys(
927        &self,
928        room_id: Key,
929        event_type: Key,
930        state_keys: Vec<Key>,
931    ) -> Result<Vec<(bool, Vec<u8>)>> {
932        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
933            let sql_params = repeat_vars(state_keys.len());
934            let sql = format!(
935                "SELECT stripped, data FROM state_event
936                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
937            );
938
939            let params = rusqlite::params_from_iter(
940                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
941            );
942
943            Ok(txn
944                .prepare(&sql)?
945                .query(params)?
946                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
947                .collect::<Result<_, _>>()?)
948        })
949        .await
950    }
951
952    async fn get_maybe_stripped_state_events(
953        &self,
954        room_id: Key,
955        event_type: Key,
956    ) -> Result<Vec<(bool, Vec<u8>)>> {
957        Ok(self
958            .prepare(
959                "SELECT stripped, data FROM state_event
960                 WHERE room_id = ? AND event_type = ?",
961                |mut stmt| {
962                    stmt.query((room_id, event_type))?
963                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
964                        .collect()
965                },
966            )
967            .await?)
968    }
969
970    async fn get_profiles(
971        &self,
972        room_id: Key,
973        user_ids: Vec<Key>,
974    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
975        let user_ids_length = user_ids.len();
976
977        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
978            let sql_params = repeat_vars(user_ids.len());
979            let sql = format!(
980                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
981            );
982
983            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
984
985            Ok(txn
986                .prepare(&sql)?
987                .query(params)?
988                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
989                .collect::<Result<_, _>>()?)
990        })
991        .await
992    }
993
994    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
995        let res = if memberships.is_empty() {
996            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
997                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
998            })
999            .await?
1000        } else {
1001            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
1002                let sql_params = repeat_vars(memberships.len());
1003                let sql = format!(
1004                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
1005                );
1006
1007                let params =
1008                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
1009
1010                Ok(txn
1011                    .prepare(&sql)?
1012                    .query(params)?
1013                    .mapped(|row| row.get(0))
1014                    .collect::<Result<_, _>>()?)
1015            })
1016            .await?
1017        };
1018
1019        Ok(res)
1020    }
1021
1022    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
1023        Ok(self
1024            .query_row(
1025                "SELECT data FROM global_account_data WHERE event_type = ?",
1026                (event_type,),
1027                |row| row.get(0),
1028            )
1029            .await
1030            .optional()?)
1031    }
1032
1033    async fn get_room_account_data(
1034        &self,
1035        room_id: Key,
1036        event_type: Key,
1037    ) -> Result<Option<Vec<u8>>> {
1038        Ok(self
1039            .query_row(
1040                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
1041                (room_id, event_type),
1042                |row| row.get(0),
1043            )
1044            .await
1045            .optional()?)
1046    }
1047
1048    async fn get_display_names(
1049        &self,
1050        room_id: Key,
1051        names: Vec<Key>,
1052    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1053        let names_length = names.len();
1054
1055        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
1056            let sql_params = repeat_vars(names.len());
1057            let sql = format!(
1058                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
1059            );
1060
1061            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1062
1063            Ok(txn
1064                .prepare(&sql)?
1065                .query(params)?
1066                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1067                .collect::<Result<_, _>>()?)
1068        })
1069        .await
1070    }
1071
1072    async fn get_user_receipt(
1073        &self,
1074        room_id: Key,
1075        receipt_type: Key,
1076        thread: Key,
1077        user_id: Key,
1078    ) -> Result<Option<Vec<u8>>> {
1079        Ok(self
1080            .query_row(
1081                "SELECT data FROM receipt
1082                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1083                (room_id, receipt_type, thread, user_id),
1084                |row| row.get(0),
1085            )
1086            .await
1087            .optional()?)
1088    }
1089
1090    async fn get_event_receipts(
1091        &self,
1092        room_id: Key,
1093        receipt_type: Key,
1094        thread: Key,
1095        event_id: Key,
1096    ) -> Result<Vec<Vec<u8>>> {
1097        Ok(self
1098            .prepare(
1099                "SELECT data FROM receipt
1100                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1101                |mut stmt| {
1102                    stmt.query((room_id, receipt_type, thread, event_id))?
1103                        .mapped(|row| row.get(0))
1104                        .collect()
1105                },
1106            )
1107            .await?)
1108    }
1109}
1110
1111#[async_trait]
1112impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1113    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1114        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1115    }
1116}
1117
1118#[async_trait]
1119impl StateStore for SqliteStateStore {
1120    type Error = Error;
1121
1122    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1123        self.read()
1124            .await?
1125            .get_kv_blob(self.encode_state_store_data_key(key))
1126            .await?
1127            .map(|data| {
1128                Ok(match key {
1129                    StateStoreDataKey::SyncToken => {
1130                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1131                    }
1132                    StateStoreDataKey::SupportedVersions => {
1133                        StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1134                    }
1135                    StateStoreDataKey::WellKnown => {
1136                        StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1137                    }
1138                    StateStoreDataKey::Filter(_) => {
1139                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1140                    }
1141                    StateStoreDataKey::UserAvatarUrl(_) => {
1142                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1143                    }
1144                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1145                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1146                    }
1147                    StateStoreDataKey::UtdHookManagerData => {
1148                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1149                    }
1150                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1151                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1152                    }
1153                    StateStoreDataKey::ComposerDraft(_, _) => {
1154                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1155                    }
1156                    StateStoreDataKey::SeenKnockRequests(_) => {
1157                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1158                    }
1159                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1160                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1161                            self.deserialize_value(&data)?,
1162                        )
1163                    }
1164                })
1165            })
1166            .transpose()
1167    }
1168
1169    async fn set_kv_data(
1170        &self,
1171        key: StateStoreDataKey<'_>,
1172        value: StateStoreDataValue,
1173    ) -> Result<()> {
1174        let serialized_value = match key {
1175            StateStoreDataKey::SyncToken => self.serialize_value(
1176                &value.into_sync_token().expect("Session data not a sync token"),
1177            )?,
1178            StateStoreDataKey::SupportedVersions => self.serialize_value(
1179                &value
1180                    .into_supported_versions()
1181                    .expect("Session data not containing supported versions"),
1182            )?,
1183            StateStoreDataKey::WellKnown => self.serialize_value(
1184                &value.into_well_known().expect("Session data not containing well-known"),
1185            )?,
1186            StateStoreDataKey::Filter(_) => {
1187                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1188            }
1189            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1190                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1191            )?,
1192            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1193                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1194            )?,
1195            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1196                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1197            )?,
1198            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1199                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1200            }
1201            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1202                &value.into_composer_draft().expect("Session data not a composer draft"),
1203            )?,
1204            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1205                &value
1206                    .into_seen_knock_requests()
1207                    .expect("Session data is not a set of seen knock request ids"),
1208            )?,
1209            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1210                &value
1211                    .into_thread_subscriptions_catchup_tokens()
1212                    .expect("Session data is not a list of thread subscription catchup tokens"),
1213            )?,
1214        };
1215
1216        self.write()
1217            .await
1218            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1219            .await
1220    }
1221
1222    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1223        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1224    }
1225
    /// Persists every part of `changes` inside a single `with_transaction` call on the write
    /// connection.
    ///
    /// The parts are written in this order: sync token, global account data, room account
    /// data, presence, room infos, profile deletions, state events (together with the member
    /// and profile rows derived from member events), stripped state events, receipts,
    /// redactions and display-name ambiguity maps.
    ///
    /// # Errors
    ///
    /// Any error raised by a serialization or database step is propagated with `?` out of the
    /// transaction closure and returned.
    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
        // The transaction closure is `move`, so it works on owned copies of the changes and of
        // the store handle.
        let changes = changes.to_owned();
        let this = self.clone();
        self.write()
            .await
            .with_transaction(move |txn| {
                // Exhaustive destructuring: every field of `StateChanges` is named here.
                let StateChanges {
                    sync_token,
                    account_data,
                    presence,
                    profiles,
                    profiles_to_delete,
                    state,
                    room_account_data,
                    room_infos,
                    receipts,
                    redactions,
                    stripped_state,
                    ambiguity_maps,
                } = changes;

                // The sync token lives in the key-value blob store.
                if let Some(sync_token) = sync_token {
                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
                    let value = this.serialize_value(&sync_token)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                for (event_type, event) in account_data {
                    let event_type =
                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
                    let data = this.serialize_json(&event)?;
                    txn.set_global_account_data(&event_type, &data)?;
                }

                for (room_id, events) in room_account_data {
                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
                    for (event_type, event) in events {
                        let event_type =
                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
                        let data = this.serialize_json(&event)?;
                        txn.set_room_account_data(&room_id, &event_type, &data)?;
                    }
                }

                // Presence events are stored as key-value blobs as well, one per user.
                for (user_id, event) in presence {
                    let key = this.encode_presence_key(&user_id);
                    let value = this.serialize_json(&event)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                for (room_id, room_info) in room_infos {
                    let stripped = room_info.state() == RoomState::Invited;
                    // Remove non-stripped data for stripped rooms and vice-versa.
                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;

                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                    let state = this
                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
                    let data = this.serialize_json(&room_info)?;
                    txn.set_room_info(&room_id, &state, &data)?;
                }

                // Profile deletions run before the state loop below, which may store profiles
                // again.
                for (room_id, user_ids) in profiles_to_delete {
                    let room_id = this.encode_key(keys::PROFILE, room_id);
                    for user_id in user_ids {
                        let user_id = this.encode_key(keys::PROFILE, user_id);
                        txn.remove_room_profile(&room_id, &user_id)?;
                    }
                }

                // State events: `set_state_event` and `set_member` are called with `false` here
                // and with `true` in the stripped-state loop further down.
                for (room_id, state_event_types) in state {
                    let profiles = profiles.get(&room_id);
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, state_events) in state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_state_event) in state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_state_event)?;

                            // The event ID is optional: a missing or unreadable `event_id`
                            // field yields `None`.
                            let event_id: Option<String> =
                                raw_state_event.get_field("event_id").ok().flatten();
                            let encoded_event_id =
                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));

                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                false,
                                encoded_event_id.as_deref(),
                                &data,
                            )?;

                            if event_type == StateEventType::RoomMember {
                                // A member event that fails to deserialize is logged and
                                // skipped; its raw state event has already been stored above.
                                let member_event = match raw_state_event
                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
                                {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        debug!(event_id, "Failed to deserialize member event: {e}");
                                        continue;
                                    }
                                };

                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this
                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
                                // The payload of the member row is the serialized state key.
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(
                                    &encoded_room_id,
                                    &user_id,
                                    &membership,
                                    false,
                                    &data,
                                )?;

                                // A profile is stored only when `changes.profiles` has an
                                // entry for this room and this member.
                                if let Some(profile) =
                                    profiles.and_then(|p| p.get(member_event.state_key()))
                                {
                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
                                    let data = this.serialize_json(&profile)?;
                                    txn.set_profile(&room_id, &user_id, &data)?;
                                }
                            }
                        }
                    }
                }

                // Stripped state events: stored without an event ID (`None`).
                for (room_id, stripped_state_event_types) in stripped_state {
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, stripped_state_events) in stripped_state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_stripped_state_event) in stripped_state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_stripped_state_event)?;
                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                true,
                                None,
                                &data,
                            )?;

                            if event_type == StateEventType::RoomMember {
                                // As above, an undeserializable member event is logged and
                                // skipped after its raw event has been stored.
                                let member_event = match raw_stripped_state_event
                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
                                ) {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        debug!("Failed to deserialize stripped member event: {e}");
                                        continue;
                                    }
                                };

                                let room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this.encode_key(
                                    keys::MEMBER,
                                    member_event.content.membership.as_str(),
                                );
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
                            }
                        }
                    }
                }

                // Receipts: one row per (room, user, receipt type, thread), with the event ID
                // stored both as an encoded column and inside the serialized `ReceiptData`.
                for (room_id, receipt_event) in receipts {
                    let room_id = this.encode_key(keys::RECEIPT, room_id);

                    for (event_id, receipt_types) in receipt_event {
                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);

                        for (receipt_type, receipt_users) in receipt_types {
                            let receipt_type =
                                this.encode_key(keys::RECEIPT, receipt_type.as_str());

                            for (user_id, receipt) in receipt_users {
                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
                                // We cannot have a NULL primary key so we rely on serialization
                                // instead of the string representation.
                                let thread = this.encode_key(
                                    keys::RECEIPT,
                                    rmp_serde::to_vec_named(&receipt.thread)?,
                                );
                                let data = this.serialize_json(&ReceiptData {
                                    receipt,
                                    event_id: event_id.clone(),
                                    user_id,
                                })?;

                                txn.set_receipt(
                                    &room_id,
                                    &encoded_user_id,
                                    &receipt_type,
                                    &thread,
                                    &encoded_event_id,
                                    &data,
                                )?;
                            }
                        }
                    }
                }

                for (room_id, redactions) in redactions {
                    // Reads the redaction rules from the stored room info. Any failure to load
                    // or deserialize the room info falls back to `ROOM_VERSION_RULES_FALLBACK`
                    // with a warning.
                    let make_redaction_rules = || {
                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
                        txn.get_room_info(&encoded_room_id)
                            .ok()
                            .flatten()
                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
                            .map(|info| info.room_version_rules_or_default())
                            .unwrap_or_else(|| {
                                warn!(
                                    ?room_id,
                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
                                );
                                ROOM_VERSION_RULES_FALLBACK
                            }).redaction
                    };

                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
                    // Filled lazily by `get_or_insert_with` below, so the rules are computed at
                    // most once per room and only if an event actually gets redacted.
                    let mut redaction_rules = None;

                    for (event_id, redaction) in redactions {
                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);

                        // A redaction is skipped when no state event is stored under the target
                        // ID, or when the stored bytes fail to deserialize.
                        if let Some(Ok(raw_event)) = txn
                            .get_state_event_by_id(&encoded_room_id, &event_id)?
                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
                        {
                            let event = raw_event.deserialize()?;
                            let redacted = redact(
                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
                                redaction_rules.get_or_insert_with(make_redaction_rules),
                                Some(RedactedBecause::from_raw_event(&redaction)?),
                            )
                            .map_err(Error::Redaction)?;
                            let data = this.serialize_json(&redacted)?;

                            let event_type =
                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());

                            // Write the redacted event back under the same type, state key and
                            // event ID.
                            txn.set_state_event(
                                &encoded_room_id,
                                &event_type,
                                &state_key,
                                false,
                                Some(&event_id),
                                &data,
                            )?;
                        }
                    }
                }

                for (room_id, display_names) in ambiguity_maps {
                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);

                    for (name, user_ids) in display_names {
                        // Prefer the normalized form of the name; fall back to the raw string
                        // when there is no normalized form.
                        let encoded_name = this.encode_key(
                            keys::DISPLAY_NAME,
                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
                        );
                        let data = this.serialize_json(&user_ids)?;

                        if user_ids.is_empty() {
                            txn.remove_display_name(&room_id, &encoded_name)?;

                            // We can't do a migration to merge the previously distinct buckets of
                            // user IDs since the display names themselves are hashed before they
                            // are persisted in the store. So the store will always retain two
                            // buckets: one for raw display names and one for normalised ones.
                            //
                            // We therefore do the next best thing, which is a sort of a soft
                            // migration: we fetch both the raw and normalised buckets, then merge
                            // the user IDs contained in them into a separate, temporary merged
                            // bucket. The SDK then operates on the merged buckets exclusively. See
                            // the comment in `get_users_with_display_names` for details.
                            //
                            // If the merged bucket is empty, that must mean that both the raw and
                            // normalised buckets were also empty, so we can remove both from the
                            // store.
                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
                            txn.remove_display_name(&room_id, &raw_name)?;
                        } else {
                            // We only create new buckets with the normalized display name.
                            txn.set_display_name(&room_id, &encoded_name, &data)?;
                        }
                    }
                }

                Ok::<_, Error>(())
            })
            .await?;

        Ok(())
    }
1535
1536    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1537        self.read()
1538            .await?
1539            .get_kv_blob(self.encode_presence_key(user_id))
1540            .await?
1541            .map(|data| self.deserialize_json(&data))
1542            .transpose()
1543    }
1544
1545    async fn get_presence_events(
1546        &self,
1547        user_ids: &[OwnedUserId],
1548    ) -> Result<Vec<Raw<PresenceEvent>>> {
1549        if user_ids.is_empty() {
1550            return Ok(Vec::new());
1551        }
1552
1553        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1554        self.read()
1555            .await?
1556            .get_kv_blobs(user_ids)
1557            .await?
1558            .into_iter()
1559            .map(|data| self.deserialize_json(&data))
1560            .collect()
1561    }
1562
1563    async fn get_state_event(
1564        &self,
1565        room_id: &RoomId,
1566        event_type: StateEventType,
1567        state_key: &str,
1568    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1569        Ok(self
1570            .get_state_events_for_keys(room_id, event_type, &[state_key])
1571            .await?
1572            .into_iter()
1573            .next())
1574    }
1575
1576    async fn get_state_events(
1577        &self,
1578        room_id: &RoomId,
1579        event_type: StateEventType,
1580    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1581        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1582        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1583        self.read()
1584            .await?
1585            .get_maybe_stripped_state_events(room_id, event_type)
1586            .await?
1587            .into_iter()
1588            .map(|(stripped, data)| {
1589                let ev = if stripped {
1590                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1591                } else {
1592                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1593                };
1594
1595                Ok(ev)
1596            })
1597            .collect()
1598    }
1599
1600    async fn get_state_events_for_keys(
1601        &self,
1602        room_id: &RoomId,
1603        event_type: StateEventType,
1604        state_keys: &[&str],
1605    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1606        if state_keys.is_empty() {
1607            return Ok(Vec::new());
1608        }
1609
1610        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1611        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1612        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1613        self.read()
1614            .await?
1615            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1616            .await?
1617            .into_iter()
1618            .map(|(stripped, data)| {
1619                let ev = if stripped {
1620                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1621                } else {
1622                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1623                };
1624
1625                Ok(ev)
1626            })
1627            .collect()
1628    }
1629
1630    async fn get_profile(
1631        &self,
1632        room_id: &RoomId,
1633        user_id: &UserId,
1634    ) -> Result<Option<MinimalRoomMemberEvent>> {
1635        let room_id = self.encode_key(keys::PROFILE, room_id);
1636        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1637
1638        self.read()
1639            .await?
1640            .get_profiles(room_id, user_ids)
1641            .await?
1642            .into_iter()
1643            .next()
1644            .map(|(_, data)| self.deserialize_json(&data))
1645            .transpose()
1646    }
1647
1648    async fn get_profiles<'a>(
1649        &self,
1650        room_id: &RoomId,
1651        user_ids: &'a [OwnedUserId],
1652    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1653        if user_ids.is_empty() {
1654            return Ok(BTreeMap::new());
1655        }
1656
1657        let room_id = self.encode_key(keys::PROFILE, room_id);
1658        let mut user_ids_map = user_ids
1659            .iter()
1660            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1661            .collect::<BTreeMap<_, _>>();
1662        let user_ids = user_ids_map.keys().cloned().collect();
1663
1664        self.read()
1665            .await?
1666            .get_profiles(room_id, user_ids)
1667            .await?
1668            .into_iter()
1669            .map(|(user_id, data)| {
1670                Ok((
1671                    user_ids_map
1672                        .remove(user_id.as_slice())
1673                        .expect("returned user IDs were requested"),
1674                    self.deserialize_json(&data)?,
1675                ))
1676            })
1677            .collect()
1678    }
1679
1680    async fn get_user_ids(
1681        &self,
1682        room_id: &RoomId,
1683        membership: RoomMemberships,
1684    ) -> Result<Vec<OwnedUserId>> {
1685        let room_id = self.encode_key(keys::MEMBER, room_id);
1686        let memberships = membership
1687            .as_vec()
1688            .into_iter()
1689            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1690            .collect();
1691        self.read()
1692            .await?
1693            .get_user_ids(room_id, memberships)
1694            .await?
1695            .iter()
1696            .map(|data| self.deserialize_value(data))
1697            .collect()
1698    }
1699
1700    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1701        self.read()
1702            .await?
1703            .get_room_infos(match room_load_settings {
1704                RoomLoadSettings::All => None,
1705                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1706            })
1707            .await?
1708            .into_iter()
1709            .map(|data| self.deserialize_json(&data))
1710            .collect()
1711    }
1712
1713    async fn get_users_with_display_name(
1714        &self,
1715        room_id: &RoomId,
1716        display_name: &DisplayName,
1717    ) -> Result<BTreeSet<OwnedUserId>> {
1718        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1719        let names = vec![self.encode_key(
1720            keys::DISPLAY_NAME,
1721            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1722        )];
1723
1724        Ok(self
1725            .read()
1726            .await?
1727            .get_display_names(room_id, names)
1728            .await?
1729            .into_iter()
1730            .next()
1731            .map(|(_, data)| self.deserialize_json(&data))
1732            .transpose()?
1733            .unwrap_or_default())
1734    }
1735
1736    async fn get_users_with_display_names<'a>(
1737        &self,
1738        room_id: &RoomId,
1739        display_names: &'a [DisplayName],
1740    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1741        let mut result = HashMap::new();
1742
1743        if display_names.is_empty() {
1744            return Ok(result);
1745        }
1746
1747        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1748        let mut names_map = display_names
1749            .iter()
1750            .flat_map(|display_name| {
1751                // We encode the display name as the `raw_str()` and the normalized string.
1752                //
1753                // This is for compatibility reasons since:
1754                //  1. Previously "Alice" and "alice" were considered to be distinct display
1755                //     names, while we now consider them to be the same so we need to merge the
1756                //     previously distinct buckets of user IDs.
1757                //  2. We can't do a migration to merge the previously distinct buckets of user
1758                //     IDs since the display names itself are hashed before they are persisted
1759                //     in the store.
1760                let raw =
1761                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1762                let normalized = display_name.as_normalized_str().map(|normalized| {
1763                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1764                });
1765
1766                iter::once(raw).chain(normalized)
1767            })
1768            .collect::<BTreeMap<_, _>>();
1769        let names = names_map.keys().cloned().collect();
1770
1771        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1772        {
1773            let display_name =
1774                names_map.remove(name.as_slice()).expect("returned display names were requested");
1775            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1776
1777            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1778        }
1779
1780        Ok(result)
1781    }
1782
1783    async fn get_account_data_event(
1784        &self,
1785        event_type: GlobalAccountDataEventType,
1786    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1787        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1788        self.read()
1789            .await?
1790            .get_global_account_data(event_type)
1791            .await?
1792            .map(|value| self.deserialize_json(&value))
1793            .transpose()
1794    }
1795
1796    async fn get_room_account_data_event(
1797        &self,
1798        room_id: &RoomId,
1799        event_type: RoomAccountDataEventType,
1800    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1801        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1802        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1803        self.read()
1804            .await?
1805            .get_room_account_data(room_id, event_type)
1806            .await?
1807            .map(|value| self.deserialize_json(&value))
1808            .transpose()
1809    }
1810
1811    async fn get_user_room_receipt_event(
1812        &self,
1813        room_id: &RoomId,
1814        receipt_type: ReceiptType,
1815        thread: ReceiptThread,
1816        user_id: &UserId,
1817    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1818        let room_id = self.encode_key(keys::RECEIPT, room_id);
1819        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1820        // We cannot have a NULL primary key so we rely on serialization instead of the
1821        // string representation.
1822        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1823        let user_id = self.encode_key(keys::RECEIPT, user_id);
1824
1825        self.read()
1826            .await?
1827            .get_user_receipt(room_id, receipt_type, thread, user_id)
1828            .await?
1829            .map(|value| {
1830                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1831            })
1832            .transpose()
1833    }
1834
1835    async fn get_event_room_receipt_events(
1836        &self,
1837        room_id: &RoomId,
1838        receipt_type: ReceiptType,
1839        thread: ReceiptThread,
1840        event_id: &EventId,
1841    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1842        let room_id = self.encode_key(keys::RECEIPT, room_id);
1843        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1844        // We cannot have a NULL primary key so we rely on serialization instead of the
1845        // string representation.
1846        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1847        let event_id = self.encode_key(keys::RECEIPT, event_id);
1848
1849        self.read()
1850            .await?
1851            .get_event_receipts(room_id, receipt_type, thread, event_id)
1852            .await?
1853            .iter()
1854            .map(|value| {
1855                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1856            })
1857            .collect()
1858    }
1859
1860    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1861        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1862    }
1863
1864    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1865        let conn = self.write().await;
1866        let key = self.encode_custom_key(key);
1867        conn.set_kv_blob(key, value).await?;
1868        Ok(())
1869    }
1870
1871    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1872        let conn = self.write().await;
1873        let key = self.encode_custom_key(key);
1874        let previous = conn.get_kv_blob(key.clone()).await?;
1875        conn.set_kv_blob(key, value).await?;
1876        Ok(previous)
1877    }
1878
1879    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1880        let conn = self.write().await;
1881        let key = self.encode_custom_key(key);
1882        let previous = conn.get_kv_blob(key.clone()).await?;
1883        if previous.is_some() {
1884            conn.delete_kv_blob(key).await?;
1885        }
1886        Ok(previous)
1887    }
1888
1889    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1890        let this = self.clone();
1891        let room_id = room_id.to_owned();
1892
1893        let conn = self.write().await;
1894
1895        conn.with_transaction(move |txn| -> Result<()> {
1896            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1897            txn.remove_room_info(&room_info_room_id)?;
1898
1899            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1900            txn.remove_room_state_events(&state_event_room_id, None)?;
1901
1902            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1903            txn.remove_room_members(&member_room_id, None)?;
1904
1905            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1906            txn.remove_room_profiles(&profile_room_id)?;
1907
1908            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1909            txn.remove_room_account_data(&room_account_data_room_id)?;
1910
1911            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1912            txn.remove_room_receipts(&receipt_room_id)?;
1913
1914            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1915            txn.remove_room_display_names(&display_name_room_id)?;
1916
1917            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1918            txn.remove_room_send_queue(&send_queue_room_id)?;
1919
1920            let dependent_send_queue_room_id =
1921                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1922            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1923
1924            let thread_subscriptions_room_id =
1925                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1926            txn.execute(
1927                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1928                (thread_subscriptions_room_id,),
1929            )?;
1930
1931            Ok(())
1932        })
1933        .await?;
1934
1935        conn.vacuum().await
1936    }
1937
1938    async fn save_send_queue_request(
1939        &self,
1940        room_id: &RoomId,
1941        transaction_id: OwnedTransactionId,
1942        created_at: MilliSecondsSinceUnixEpoch,
1943        content: QueuedRequestKind,
1944        priority: usize,
1945    ) -> Result<(), Self::Error> {
1946        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1947        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1948
1949        let content = self.serialize_json(&content)?;
1950        // The transaction id is used both as a key (in remove/update) and a value (as
1951        // it's useful for the callers), so we keep it as is, and neither hash
1952        // it (with encode_key) or encrypt it (through serialize_value). After
1953        // all, it carries no personal information, so this is considered fine.
1954
1955        let created_at_ts: u64 = created_at.0.into();
1956        self.write()
1957            .await
1958            .with_transaction(move |txn| {
1959                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1960                Ok(())
1961            })
1962            .await
1963    }
1964
1965    async fn update_send_queue_request(
1966        &self,
1967        room_id: &RoomId,
1968        transaction_id: &TransactionId,
1969        content: QueuedRequestKind,
1970    ) -> Result<bool, Self::Error> {
1971        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1972
1973        let content = self.serialize_json(&content)?;
1974        // See comment in [`Self::save_send_queue_request`] to understand why the
1975        // transaction id is neither encrypted or hashed.
1976        let transaction_id = transaction_id.to_string();
1977
1978        let num_updated = self.write()
1979            .await
1980            .with_transaction(move |txn| {
1981                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1982            })
1983            .await?;
1984
1985        Ok(num_updated > 0)
1986    }
1987
1988    async fn remove_send_queue_request(
1989        &self,
1990        room_id: &RoomId,
1991        transaction_id: &TransactionId,
1992    ) -> Result<bool, Self::Error> {
1993        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1994
1995        // See comment in `save_send_queue_request`.
1996        let transaction_id = transaction_id.to_string();
1997
1998        let num_deleted = self
1999            .write()
2000            .await
2001            .with_transaction(move |txn| {
2002                txn.prepare_cached(
2003                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
2004                )?
2005                .execute((room_id, &transaction_id))
2006            })
2007            .await?;
2008
2009        Ok(num_deleted > 0)
2010    }
2011
2012    async fn load_send_queue_requests(
2013        &self,
2014        room_id: &RoomId,
2015    ) -> Result<Vec<QueuedRequest>, Self::Error> {
2016        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2017
2018        // Note: ROWID is always present and is an auto-incremented integer counter. We
2019        // want to maintain the insertion order, so we can sort using it.
2020        // Note 2: transaction_id is not encoded, see why in `save_send_queue_request`.
2021        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
2022            .read()
2023            .await?
2024            .prepare(
2025                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
2026                |mut stmt| {
2027                    stmt.query((room_id,))?
2028                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2029                        .collect()
2030                },
2031            )
2032            .await?;
2033
2034        let mut requests = Vec::with_capacity(res.len());
2035
2036        for entry in res {
2037            let created_at = entry
2038                .4
2039                .and_then(UInt::new)
2040                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2041
2042            requests.push(QueuedRequest {
2043                transaction_id: entry.0.into(),
2044                kind: self.deserialize_json(&entry.1)?,
2045                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
2046                priority: entry.3,
2047                created_at,
2048            });
2049        }
2050
2051        Ok(requests)
2052    }
2053
2054    async fn update_send_queue_request_status(
2055        &self,
2056        room_id: &RoomId,
2057        transaction_id: &TransactionId,
2058        error: Option<QueueWedgeError>,
2059    ) -> Result<(), Self::Error> {
2060        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2061
2062        // See comment in `save_send_queue_request`.
2063        let transaction_id = transaction_id.to_string();
2064
2065        // Serialize the error to json bytes (encrypted if option is enabled) if set.
2066        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2067
2068        self.write()
2069            .await
2070            .with_transaction(move |txn| {
2071                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2072                Ok(())
2073            })
2074            .await
2075    }
2076
2077    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2078        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
2079        // have to manually do the deduplication: indeed, for all X, encrypt(X)
2080        // != encrypted(X), since we use a nonce in the encryption process.
2081
2082        let res: Vec<Vec<u8>> = self
2083            .read()
2084            .await?
2085            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2086                stmt.query(())?.mapped(|row| row.get(0)).collect()
2087            })
2088            .await?;
2089
2090        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2091        // then rejigger that into a vector.
2092        Ok(res
2093            .into_iter()
2094            .map(|entry| self.deserialize_value(&entry))
2095            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2096            .into_iter()
2097            .collect())
2098    }
2099
2100    async fn save_dependent_queued_request(
2101        &self,
2102        room_id: &RoomId,
2103        parent_txn_id: &TransactionId,
2104        own_txn_id: ChildTransactionId,
2105        created_at: MilliSecondsSinceUnixEpoch,
2106        content: DependentQueuedRequestKind,
2107    ) -> Result<()> {
2108        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2109        let content = self.serialize_json(&content)?;
2110
2111        // See comment in `save_send_queue_request`.
2112        let parent_txn_id = parent_txn_id.to_string();
2113        let own_txn_id = own_txn_id.to_string();
2114
2115        let created_at_ts: u64 = created_at.0.into();
2116        self.write()
2117            .await
2118            .with_transaction(move |txn| {
2119                txn.prepare_cached(
2120                    r#"INSERT INTO dependent_send_queue_events
2121                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2122                       VALUES (?, ?, ?, ?, ?)"#,
2123                )?
2124                .execute((
2125                    room_id,
2126                    parent_txn_id,
2127                    own_txn_id,
2128                    content,
2129                    created_at_ts,
2130                ))?;
2131                Ok(())
2132            })
2133            .await
2134    }
2135
2136    async fn update_dependent_queued_request(
2137        &self,
2138        room_id: &RoomId,
2139        own_transaction_id: &ChildTransactionId,
2140        new_content: DependentQueuedRequestKind,
2141    ) -> Result<bool> {
2142        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2143        let content = self.serialize_json(&new_content)?;
2144
2145        // See comment in `save_send_queue_request`.
2146        let own_txn_id = own_transaction_id.to_string();
2147
2148        let num_updated = self
2149            .write()
2150            .await
2151            .with_transaction(move |txn| {
2152                txn.prepare_cached(
2153                    r#"UPDATE dependent_send_queue_events
2154                       SET content = ?
2155                       WHERE own_transaction_id = ?
2156                       AND room_id = ?"#,
2157                )?
2158                .execute((content, own_txn_id, room_id))
2159            })
2160            .await?;
2161
2162        if num_updated > 1 {
2163            return Err(Error::InconsistentUpdate);
2164        }
2165
2166        Ok(num_updated == 1)
2167    }
2168
2169    async fn mark_dependent_queued_requests_as_ready(
2170        &self,
2171        room_id: &RoomId,
2172        parent_txn_id: &TransactionId,
2173        parent_key: SentRequestKey,
2174    ) -> Result<usize> {
2175        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2176        let parent_key = self.serialize_json(&parent_key)?;
2177
2178        // See comment in `save_send_queue_request`.
2179        let parent_txn_id = parent_txn_id.to_string();
2180
2181        self.write()
2182            .await
2183            .with_transaction(move |txn| {
2184                Ok(txn.prepare_cached(
2185                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2186                )?
2187                .execute((parent_key, parent_txn_id, room_id))?)
2188            })
2189            .await
2190    }
2191
2192    async fn remove_dependent_queued_request(
2193        &self,
2194        room_id: &RoomId,
2195        txn_id: &ChildTransactionId,
2196    ) -> Result<bool> {
2197        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2198
2199        // See comment in `save_send_queue_request`.
2200        let txn_id = txn_id.to_string();
2201
2202        let num_deleted = self
2203            .write()
2204            .await
2205            .with_transaction(move |txn| {
2206                txn.prepare_cached(
2207                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2208                )?
2209                .execute((txn_id, room_id))
2210            })
2211            .await?;
2212
2213        Ok(num_deleted > 0)
2214    }
2215
2216    async fn load_dependent_queued_requests(
2217        &self,
2218        room_id: &RoomId,
2219    ) -> Result<Vec<DependentQueuedRequest>> {
2220        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2221
2222        // Note: transaction_id is not encoded, see why in `save_send_queue_request`.
2223        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2224            .read()
2225            .await?
2226            .prepare(
2227                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2228                |mut stmt| {
2229                    stmt.query((room_id,))?
2230                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2231                        .collect()
2232                },
2233            )
2234            .await?;
2235
2236        let mut dependent_events = Vec::with_capacity(res.len());
2237
2238        for entry in res {
2239            let created_at = entry
2240                .4
2241                .and_then(UInt::new)
2242                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2243
2244            dependent_events.push(DependentQueuedRequest {
2245                own_transaction_id: entry.0.into(),
2246                parent_transaction_id: entry.1.into(),
2247                parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2248                kind: self.deserialize_json(&entry.3)?,
2249                created_at,
2250            });
2251        }
2252
2253        Ok(dependent_events)
2254    }
2255
2256    async fn upsert_thread_subscriptions(
2257        &self,
2258        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
2259    ) -> Result<(), Self::Error> {
2260        let values: Vec<_> = updates
2261            .into_iter()
2262            .map(|(room_id, thread_id, subscription)| {
2263                (
2264                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
2265                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
2266                    subscription.status.as_str(),
2267                    subscription.bump_stamp,
2268                )
2269            })
2270            .collect();
2271
2272        self.write()
2273            .await
2274            .with_transaction(move |txn| {
2275                let mut txn = txn.prepare_cached(
2276                    "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2277                    VALUES (?, ?, ?, ?)
2278                    ON CONFLICT (room_id, event_id) DO UPDATE
2279                    SET
2280                        status =
2281                            CASE
2282                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
2283                                WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
2284                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
2285                                ELSE thread_subscriptions.status
2286                            END,
2287                        bump_stamp =
2288                            CASE
2289                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
2290                                WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
2291                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
2292                                ELSE thread_subscriptions.bump_stamp
2293                            END",
2294                )?;
2295
2296                for value in values {
2297                    txn.execute(value)?;
2298                }
2299
2300                Result::<_, Error>::Ok(())
2301            })
2302            .await?;
2303
2304        Ok(())
2305    }
2306
2307    async fn load_thread_subscription(
2308        &self,
2309        room_id: &RoomId,
2310        thread_id: &EventId,
2311    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2312        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2313        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2314
2315        Ok(self
2316            .read()
2317            .await?
2318            .query_row(
2319                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2320                (room_id, thread_id),
2321                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2322            )
2323            .await
2324            .optional()?
2325            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2326                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2327                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2328                })?;
2329                Ok(StoredThreadSubscription { status, bump_stamp })
2330            })
2331            .transpose()?)
2332    }
2333
2334    async fn remove_thread_subscription(
2335        &self,
2336        room_id: &RoomId,
2337        thread_id: &EventId,
2338    ) -> Result<(), Self::Error> {
2339        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2340        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2341
2342        self.write()
2343            .await
2344            .execute(
2345                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2346                (room_id, thread_id),
2347            )
2348            .await?;
2349
2350        Ok(())
2351    }
2352
2353    async fn optimize(&self) -> Result<(), Self::Error> {
2354        Ok(self.vacuum().await?)
2355    }
2356
2357    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2358        self.get_db_size().await
2359    }
2360}
2361
/// Serializable record bundling a receipt with an event ID and a user ID.
///
/// NOTE(review): presumably `event_id` is the event the receipt points at and
/// `user_id` is the user who sent the receipt — confirm against the code that
/// builds and reads this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt itself.
    receipt: Receipt,
    /// The event ID stored alongside the receipt.
    event_id: OwnedEventId,
    /// The user ID stored alongside the receipt.
    user_id: OwnedUserId,
}
2368
#[cfg(test)]
mod tests {
    use std::sync::{
        LazyLock,
        atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store opened in this module.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store, without any secret, in a fresh sub-directory of
    /// `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(store_index.to_string());
        let store_dir_str = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_dir_str);

        let store = SqliteStateStore::open(store_dir_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2395
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::{
            LazyLock,
            atomic::{AtomicU32, Ordering::SeqCst},
        },
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use matrix_sdk_test::async_test;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;
    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};

    /// Temporary directory shared by every store opened in this module.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a path under `TMP_DIR` that no previous call has returned.
    fn new_state_store_workspace() -> PathBuf {
        let workspace_index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(workspace_index.to_string())
    }

    /// Opens a store protected by a fixed test password in a fresh workspace.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_str);

        let store =
            SqliteStateStore::open(workspace_str, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    /// The pool's maximum size must reflect the configured value.
    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let max_size = store.pool.status().max_size;
        assert_eq!(max_size, 42);
    }

    /// The configured cache size must be visible through `PRAGMA cache_size`.
    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // `SqliteStoreConfig` takes the size in bytes, whereas the pragma
        // reports kibibytes. The reported value is negative precisely because
        // it is a size in kibibytes rather than a number of pages.
        let expected_kibibytes = -(1500 / 1024);
        assert_eq!(cache_size, expected_kibibytes);
    }

    /// The configured journal size limit must be visible through
    /// `PRAGMA journal_size_limit`.
    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let journal_size_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // Both `SqliteStoreConfig` and SQLite express this limit in bytes, so
        // no conversion is expected.
        assert_eq!(journal_size_limit, 1500);
    }

    statestore_integration_tests!();
}
2478
2479#[cfg(test)]
2480mod migration_tests {
2481    use std::{
2482        path::{Path, PathBuf},
2483        sync::{
2484            Arc, LazyLock,
2485            atomic::{AtomicU32, Ordering::SeqCst},
2486        },
2487    };
2488
2489    use as_variant::as_variant;
2490    use matrix_sdk_base::{
2491        RoomState, StateStore,
2492        media::{MediaFormat, MediaRequestParameters},
2493        store::{
2494            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2495            SerializableEventContent,
2496        },
2497        sync::UnreadNotificationsCount,
2498    };
2499    use matrix_sdk_test::async_test;
2500    use ruma::{
2501        EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
2502        events::{
2503            StateEventType,
2504            room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
2505        },
2506        room_id, server_name, user_id,
2507    };
2508    use rusqlite::Transaction;
2509    use serde::{Deserialize, Serialize};
2510    use serde_json::json;
2511    use tempfile::{TempDir, tempdir};
2512    use tokio::{fs, sync::Mutex};
2513    use zeroize::Zeroizing;
2514
2515    use super::{DATABASE_NAME, SqliteStateStore, init, keys};
2516    use crate::{
2517        OpenStoreError, Secret, SqliteStoreConfig,
2518        error::{Error, Result},
2519        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2520    };
2521
2522    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2523    static NUM: AtomicU32 = AtomicU32::new(0);
2524    const SECRET: &str = "secret";
2525
2526    fn new_path() -> PathBuf {
2527        let name = NUM.fetch_add(1, SeqCst).to_string();
2528        TMP_DIR.path().join(name)
2529    }
2530
2531    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2532        let config = SqliteStoreConfig::new(path);
2533
2534        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2535
2536        let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2537        let conn = pool.get().await?;
2538
2539        init(&conn).await?;
2540
2541        let store_cipher = Some(Arc::new(
2542            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2543                .await
2544                .unwrap(),
2545        ));
2546        let this = SqliteStateStore {
2547            store_cipher,
2548            pool,
2549            // Use `conn` as our selected write connections.
2550            write_connection: Arc::new(Mutex::new(conn)),
2551        };
2552        this.run_migrations(1, Some(version)).await?;
2553
2554        Ok(this)
2555    }
2556
2557    fn room_info_v1_json(
2558        room_id: &RoomId,
2559        state: RoomState,
2560        name: Option<&str>,
2561        creator: Option<&UserId>,
2562    ) -> serde_json::Value {
2563        // Test with name set or not.
2564        let name_content = match name {
2565            Some(name) => json!({ "name": name }),
2566            None => json!({ "name": null }),
2567        };
2568        // Test with creator set or not.
2569        let create_content = match creator {
2570            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2571            None => RoomCreateEventContent::new_v11(),
2572        };
2573
2574        json!({
2575            "room_id": room_id,
2576            "room_type": state,
2577            "notification_counts": UnreadNotificationsCount::default(),
2578            "summary": {
2579                "heroes": [],
2580                "joined_member_count": 0,
2581                "invited_member_count": 0,
2582            },
2583            "members_synced": false,
2584            "base_info": {
2585                "dm_targets": [],
2586                "max_power_level": 100,
2587                "name": {
2588                    "Original": {
2589                        "content": name_content,
2590                    },
2591                },
2592                "create": {
2593                    "Original": {
2594                        "content": create_content,
2595                    }
2596                }
2597            },
2598        })
2599    }
2600
2601    #[async_test]
2602    pub async fn test_migrating_v1_to_v2() {
2603        let path = new_path();
2604        // Create and populate db.
2605        {
2606            let db = create_fake_db(&path, 1).await.unwrap();
2607            let conn = db.pool.get().await.unwrap();
2608
2609            let this = db.clone();
2610            conn.with_transaction(move |txn| {
2611                for i in 0..5 {
2612                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2613                    let (state, stripped) =
2614                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2615                    let info = room_info_v1_json(&room_id, state, None, None);
2616
2617                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2618                    let data = this.serialize_json(&info)?;
2619
2620                    txn.prepare_cached(
2621                        "INSERT INTO room_info (room_id, stripped, data)
2622                         VALUES (?, ?, ?)",
2623                    )?
2624                    .execute((room_id, stripped, data))?;
2625                }
2626
2627                Result::<_, Error>::Ok(())
2628            })
2629            .await
2630            .unwrap();
2631        }
2632
2633        // This transparently migrates to the latest version.
2634        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2635
2636        // Check all room infos are there.
2637        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2638    }
2639
2640    // Add a room in version 2 format of the state store.
2641    fn add_room_v2(
2642        this: &SqliteStateStore,
2643        txn: &Transaction<'_>,
2644        room_id: &RoomId,
2645        name: Option<&str>,
2646        create_creator: Option<&UserId>,
2647        create_sender: Option<&UserId>,
2648    ) -> Result<(), Error> {
2649        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2650
2651        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2652        let encoded_state =
2653            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2654        let data = this.serialize_json(&room_info_json)?;
2655
2656        txn.prepare_cached(
2657            "INSERT INTO room_info (room_id, state, data)
2658             VALUES (?, ?, ?)",
2659        )?
2660        .execute((encoded_room_id, encoded_state, data))?;
2661
2662        // Test with or without `m.room.create` event in the room state.
2663        let Some(create_sender) = create_sender else {
2664            return Ok(());
2665        };
2666
2667        let create_content = match create_creator {
2668            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2669            None => RoomCreateEventContent::new_v11(),
2670        };
2671
2672        let event_id = EventId::new_v1(server_name!("dummy.local"));
2673        let create_event = json!({
2674            "content": create_content,
2675            "event_id": event_id,
2676            "sender": create_sender.to_owned(),
2677            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2678            "state_key": "",
2679            "type": "m.room.create",
2680            "unsigned": {},
2681        });
2682
2683        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2684        let encoded_event_type =
2685            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2686        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2687        let stripped = false;
2688        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2689        let data = this.serialize_json(&create_event)?;
2690
2691        txn.prepare_cached(
2692            "INSERT
2693             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2694             VALUES (?, ?, ?, ?, ?, ?)",
2695        )?
2696        .execute((
2697            encoded_room_id,
2698            encoded_event_type,
2699            encoded_state_key,
2700            stripped,
2701            encoded_event_id,
2702            data,
2703        ))?;
2704
2705        Ok(())
2706    }
2707
2708    #[async_test]
2709    pub async fn test_migrating_v2_to_v3() {
2710        let path = new_path();
2711
2712        // Room A: with name, creator and sender.
2713        let room_a_id = room_id!("!room_a:dummy.local");
2714        let room_a_name = "Room A";
2715        let room_a_creator = user_id!("@creator:dummy.local");
2716        // Use a different sender to check that sender is used over creator in
2717        // migration.
2718        let room_a_create_sender = user_id!("@sender:dummy.local");
2719
2720        // Room B: without name, creator and sender.
2721        let room_b_id = room_id!("!room_b:dummy.local");
2722
2723        // Room C: only with sender.
2724        let room_c_id = room_id!("!room_c:dummy.local");
2725        let room_c_create_sender = user_id!("@creator:dummy.local");
2726
2727        // Create and populate db.
2728        {
2729            let db = create_fake_db(&path, 2).await.unwrap();
2730            let conn = db.pool.get().await.unwrap();
2731
2732            let this = db.clone();
2733            conn.with_transaction(move |txn| {
2734                add_room_v2(
2735                    &this,
2736                    txn,
2737                    room_a_id,
2738                    Some(room_a_name),
2739                    Some(room_a_creator),
2740                    Some(room_a_create_sender),
2741                )?;
2742                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2743                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2744
2745                Result::<_, Error>::Ok(())
2746            })
2747            .await
2748            .unwrap();
2749        }
2750
2751        // This transparently migrates to the latest version.
2752        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2753
2754        // Check all room infos are there.
2755        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2756        assert_eq!(room_infos.len(), 3);
2757
2758        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2759        assert_eq!(room_a.name(), Some(room_a_name));
2760        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2761
2762        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2763        assert_eq!(room_b.name(), None);
2764        assert_eq!(room_b.creators(), None);
2765
2766        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2767        assert_eq!(room_c.name(), None);
2768        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2769    }
2770
2771    #[async_test]
2772    pub async fn test_migrating_v7_to_v9() {
2773        let path = new_path();
2774
2775        let room_id = room_id!("!room_a:dummy.local");
2776        let wedged_event_transaction_id = TransactionId::new();
2777        let local_event_transaction_id = TransactionId::new();
2778
2779        // Create and populate db.
2780        {
2781            let db = create_fake_db(&path, 7).await.unwrap();
2782            let conn = db.pool.get().await.unwrap();
2783
2784            let wedge_tx = wedged_event_transaction_id.clone();
2785            let local_tx = local_event_transaction_id.clone();
2786
2787            conn.with_transaction(move |txn| {
2788                add_dependent_send_queue_event_v7(
2789                    &db,
2790                    txn,
2791                    room_id,
2792                    &local_tx,
2793                    ChildTransactionId::new(),
2794                    DependentQueuedRequestKind::RedactEvent,
2795                )?;
2796                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2797                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2798                Result::<_, Error>::Ok(())
2799            })
2800            .await
2801            .unwrap();
2802        }
2803
2804        // This transparently migrates to the latest version, which clears up all
2805        // requests and dependent requests.
2806        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2807
2808        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2809        assert!(requests.is_empty());
2810
2811        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2812        assert!(dependent_requests.is_empty());
2813    }
2814
2815    fn add_send_queue_event_v7(
2816        this: &SqliteStateStore,
2817        txn: &Transaction<'_>,
2818        transaction_id: &TransactionId,
2819        room_id: &RoomId,
2820        is_wedged: bool,
2821    ) -> Result<(), Error> {
2822        let content =
2823            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2824
2825        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2826        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2827
2828        let content = this.serialize_json(&content)?;
2829
2830        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2831            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2832
2833        Ok(())
2834    }
2835
2836    fn add_dependent_send_queue_event_v7(
2837        this: &SqliteStateStore,
2838        txn: &Transaction<'_>,
2839        room_id: &RoomId,
2840        parent_txn_id: &TransactionId,
2841        own_txn_id: ChildTransactionId,
2842        content: DependentQueuedRequestKind,
2843    ) -> Result<(), Error> {
2844        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2845
2846        let parent_txn_id = parent_txn_id.to_string();
2847        let own_txn_id = own_txn_id.to_string();
2848        let content = this.serialize_json(&content)?;
2849
2850        txn.prepare_cached(
2851            "INSERT INTO dependent_send_queue_events
2852                         (room_id, parent_transaction_id, own_transaction_id, content)
2853                       VALUES (?, ?, ?, ?)",
2854        )?
2855        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2856
2857        Ok(())
2858    }
2859
    /// Test-only enum declaring an `UploadFileWithThumbnail` variant.
    ///
    /// `test_dependent_queued_request_variant_renaming` serializes it and
    /// reads the result back as a `DependentQueuedRequestKind`, to check that
    /// data written under this variant name can still be deserialized.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LegacyDependentQueuedRequestKind {
        /// Variant carrying the legacy name.
        UploadFileWithThumbnail {
            /// Content type of the upload, as a string.
            content_type: String,
            /// Media request parameters stored under the `cache_key` field.
            cache_key: MediaRequestParameters,
            /// Transaction ID this request relates to.
            related_to: OwnedTransactionId,
        },
    }
2868
2869    #[async_test]
2870    pub async fn test_dependent_queued_request_variant_renaming() {
2871        let path = new_path();
2872        let db = create_fake_db(&path, 7).await.unwrap();
2873
2874        let cache_key = MediaRequestParameters {
2875            format: MediaFormat::File,
2876            source: MediaSource::Plain("https://server.local/foobar".into()),
2877        };
2878        let related_to = TransactionId::new();
2879        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2880            content_type: "image/png".to_owned(),
2881            cache_key,
2882            related_to: related_to.clone(),
2883        };
2884
2885        let data = db
2886            .serialize_json(&request)
2887            .expect("should be able to serialize legacy dependent request");
2888        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2889            "should be able to deserialize dependent request from legacy dependent request",
2890        );
2891
2892        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2893            assert_eq!(de_related_to, related_to);
2894        });
2895    }
2896}