Skip to main content

matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13    RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15    store::{
16        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18        StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19    },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25    canonical_json::{RedactedBecause, redact},
26    events::{
27        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29        presence::PresenceEvent,
30        receipt::{Receipt, ReceiptThread, ReceiptType},
31        room::{
32            create::RoomCreateEventContent,
33            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34        },
35    },
36    serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41    fs,
42    sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47    OpenStoreError, Secret, SqliteStoreConfig,
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt, repeat_vars,
53    },
54};
55
/// Names of the SQLite tables used by the state store.
///
/// These are passed as the table name when encoding keys, so every key is
/// encoded in the context of the table it belongs to.
mod keys {
    /// Table holding generic key/value blobs.
    pub const KV_BLOB: &str = "kv_blob";
    /// Table holding the room infos.
    pub const ROOM_INFO: &str = "room_info";
    /// Table holding the state events.
    pub const STATE_EVENT: &str = "state_event";
    /// Table holding the global account data events.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    /// Table holding the room account data events.
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    /// Table holding the room members.
    pub const MEMBER: &str = "member";
    /// Table holding the profiles.
    pub const PROFILE: &str = "profile";
    /// Table holding the receipts.
    pub const RECEIPT: &str = "receipt";
    /// Table holding the display names.
    pub const DISPLAY_NAME: &str = "display_name";
    /// Table holding the send queue events.
    pub const SEND_QUEUE: &str = "send_queue_events";
    /// Table holding the events that depend on send queue events.
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    /// Table holding the thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
71
/// The filename used for the SQLite database file used by the state store.
///
/// This name is handed to the connection pool builder when the store is
/// opened from a configuration.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
/// An SQLite-based state store.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// The cipher returned by `get_cypher`; `None` when the store was opened
    /// without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// The single connection reserved for write operations.
    ///
    /// Read operations use any other connection from the pool, whereas all
    /// write operations go through this one. The lock ensures that it has
    /// only one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
89
90#[cfg(not(tarpaulin_include))]
91impl fmt::Debug for SqliteStateStore {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
94    }
95}
96
97impl SqliteStateStore {
98    /// Open the SQLite-based state store at the given path using the given
99    /// given passphrase to encrypt private data.
100    pub async fn open(
101        path: impl AsRef<Path>,
102        passphrase: Option<&str>,
103    ) -> Result<Self, OpenStoreError> {
104        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
105    }
106
107    /// Open the SQLite-based state store at the given path using the given
108    /// key to encrypt private data.
109    pub async fn open_with_key(
110        path: impl AsRef<Path>,
111        key: Option<&[u8; 32]>,
112    ) -> Result<Self, OpenStoreError> {
113        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
114    }
115
116    /// Open the SQLite-based state store with the config open config.
117    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
118        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
119
120        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
121
122        let this = Self::open_with_pool(pool, config.secret).await?;
123        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
124
125        Ok(this)
126    }
127
128    /// Create an SQLite-based state store using the given SQLite database pool.
129    /// The given secret will be used to encrypt private data.
130    pub async fn open_with_pool(
131        pool: SqlitePool,
132        secret: Option<Secret>,
133    ) -> Result<Self, OpenStoreError> {
134        let conn = pool.get().await?;
135
136        let mut version = conn.db_version().await?;
137
138        if version == 0 {
139            init(&conn).await?;
140            version = 1;
141        }
142
143        let store_cipher = match secret {
144            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
145            None => None,
146        };
147        let this = Self {
148            store_cipher,
149            pool,
150            // Use `conn` as our selected write connections.
151            write_connection: Arc::new(Mutex::new(conn)),
152        };
153        this.run_migrations(version, None).await?;
154
155        this.read().await?.wal_checkpoint().await;
156
157        Ok(this)
158    }
159
160    /// Run database migrations from the given `from` version to the given `to`
161    /// version
162    ///
163    /// If `to` is `None`, the current database version will be used.
164    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
165        if to == Some(1) {
166            return Ok(());
167        }
168
169        let conn = self.write().await;
170
171        if from < 2 {
172            debug!("Upgrading database to version 2");
173            let this = self.clone();
174            conn.with_transaction(move |txn| {
175                // Create new table.
176                txn.execute_batch(include_str!(
177                    "../migrations/state_store/002_a_create_new_room_info.sql"
178                ))?;
179
180                // Migrate data to new table.
181                for data in txn
182                    .prepare("SELECT data FROM room_info")?
183                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
184                {
185                    let data = data?;
186                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
187
188                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
189                    let state = this
190                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
191                    txn.prepare_cached(
192                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
193                         VALUES (?, ?, ?)",
194                    )?
195                    .execute((room_id, state, data))?;
196                }
197
198                // Replace old table.
199                txn.execute_batch(include_str!(
200                    "../migrations/state_store/002_b_replace_room_info.sql"
201                ))?;
202
203                txn.set_db_version(2)?;
204                Result::<_, Error>::Ok(())
205            })
206            .await?;
207        }
208
209        if to == Some(2) {
210            return Ok(());
211        }
212
213        // Migration to v3: RoomInfo format has changed.
214        if from < 3 {
215            debug!("Upgrading database to version 3");
216            let this = self.clone();
217            conn.with_transaction(move |txn| {
218                // Migrate data .
219                for data in txn
220                    .prepare("SELECT data FROM room_info")?
221                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
222                {
223                    let data = data?;
224                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
225
226                    // Get the `m.room.create` event from the room state.
227                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
228                    let event_type =
229                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
230                    let create_res = txn
231                        .prepare(
232                            "SELECT stripped, data FROM state_event
233                             WHERE room_id = ? AND event_type = ?",
234                        )?
235                        .query_row([room_id, event_type], |row| {
236                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
237                        })
238                        .optional()?;
239
240                    let create = create_res.and_then(|(stripped, data)| {
241                        let create = if stripped {
242                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
243                                this.deserialize_json(&data).ok()?,
244                            )
245                        } else {
246                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
247                        };
248                        Some(create)
249                    });
250
251                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
252
253                    let data = this.serialize_json(&migrated_room_info)?;
254                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
255                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
256                        .execute((data, room_id))?;
257                }
258
259                txn.set_db_version(3)?;
260                Result::<_, Error>::Ok(())
261            })
262            .await?;
263        }
264
265        if to == Some(3) {
266            return Ok(());
267        }
268
269        if from < 4 {
270            debug!("Upgrading database to version 4");
271            conn.with_transaction(move |txn| {
272                // Create new table.
273                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274                txn.set_db_version(4)
275            })
276            .await?;
277        }
278
279        if to == Some(4) {
280            return Ok(());
281        }
282
283        if from < 5 {
284            debug!("Upgrading database to version 5");
285            conn.with_transaction(move |txn| {
286                // Create new table.
287                txn.execute_batch(include_str!(
288                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
289                ))?;
290                txn.set_db_version(4)
291            })
292            .await?;
293        }
294
295        if to == Some(5) {
296            return Ok(());
297        }
298
299        if from < 6 {
300            debug!("Upgrading database to version 6");
301            conn.with_transaction(move |txn| {
302                // Create new table.
303                txn.execute_batch(include_str!(
304                    "../migrations/state_store/005_send_queue_dependent_events.sql"
305                ))?;
306                txn.set_db_version(6)
307            })
308            .await?;
309        }
310
311        if to == Some(6) {
312            return Ok(());
313        }
314
315        if from < 7 {
316            debug!("Upgrading database to version 7");
317            conn.with_transaction(move |txn| {
318                // Drop media table.
319                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
320                txn.set_db_version(7)
321            })
322            .await?;
323        }
324
325        if to == Some(7) {
326            return Ok(());
327        }
328
329        if from < 8 {
330            debug!("Upgrading database to version 8");
331            // Replace all existing wedged events with a generic error.
332            let error = QueueWedgeError::GenericApiError {
333                msg: "local echo failed to send in a previous session".into(),
334            };
335            let default_err = self.serialize_value(&error)?;
336
337            conn.with_transaction(move |txn| {
338                // Update send queue table to persist the wedge reason if any.
339                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
340
341                // Migrate the data, add a generic error for currently wedged events
342
343                for wedged_entries in txn
344                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
345                    .query_map((), |row| {
346                        Ok(
347                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
348                        )
349                    })? {
350
351                    let (room_id, transaction_id) = wedged_entries?;
352
353                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
354                        .execute((default_err.clone(), room_id, transaction_id))?;
355                }
356
357
358                // Clean up the table now that data is migrated
359                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
360
361                txn.set_db_version(8)
362            })
363                .await?;
364        }
365
366        if to == Some(8) {
367            return Ok(());
368        }
369
370        if from < 9 {
371            debug!("Upgrading database to version 9");
372            conn.with_transaction(move |txn| {
373                // Run the migration.
374                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
375                txn.set_db_version(9)
376            })
377            .await?;
378        }
379
380        if to == Some(9) {
381            return Ok(());
382        }
383
384        if from < 10 {
385            debug!("Upgrading database to version 10");
386            conn.with_transaction(move |txn| {
387                // Run the migration.
388                txn.execute_batch(include_str!(
389                    "../migrations/state_store/009_send_queue_priority.sql"
390                ))?;
391                txn.set_db_version(10)
392            })
393            .await?;
394        }
395
396        if to == Some(10) {
397            return Ok(());
398        }
399
400        if from < 11 {
401            debug!("Upgrading database to version 11");
402            conn.with_transaction(move |txn| {
403                // Run the migration.
404                txn.execute_batch(include_str!(
405                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
406                ))?;
407                txn.set_db_version(11)
408            })
409            .await?;
410        }
411
412        if to == Some(11) {
413            return Ok(());
414        }
415
416        if from < 12 {
417            debug!("Upgrading database to version 12");
418            // Defragment the DB and optimize its size on the filesystem.
419            // This should have been run in the migration for version 7, to reduce the size
420            // of the DB as we removed the media cache.
421            conn.vacuum().await?;
422            conn.set_kv("version", vec![12]).await?;
423        }
424
425        if to == Some(12) {
426            return Ok(());
427        }
428
429        if from < 13 {
430            debug!("Upgrading database to version 13");
431            conn.with_transaction(move |txn| {
432                // Run the migration.
433                txn.execute_batch(include_str!(
434                    "../migrations/state_store/011_thread_subscriptions.sql"
435                ))?;
436                txn.set_db_version(13)
437            })
438            .await?;
439        }
440
441        if to == Some(13) {
442            return Ok(());
443        }
444
445        if from < 14 {
446            debug!("Upgrading database to version 14");
447            conn.with_transaction(move |txn| {
448                // Run the migration.
449                txn.execute_batch(include_str!(
450                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
451                ))?;
452                txn.set_db_version(14)
453            })
454            .await?;
455        }
456
457        if to == Some(14) {
458            return Ok(());
459        }
460
461        if from < 15 {
462            debug!("Upgrading database to version 15");
463            conn.with_transaction(move |txn| {
464                // Run the migration.
465                txn.execute_batch(include_str!(
466                    "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
467                ))?;
468                txn.set_db_version(15)
469            })
470            .await?;
471        }
472
473        if to == Some(15) {
474            return Ok(());
475        }
476
477        Ok(())
478    }
479
480    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
481        let key_s = match key {
482            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
483            StateStoreDataKey::SupportedVersions => {
484                Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
485            }
486            StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
487            StateStoreDataKey::Filter(f) => {
488                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
489            }
490            StateStoreDataKey::UserAvatarUrl(u) => {
491                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
492            }
493            StateStoreDataKey::RecentlyVisitedRooms(b) => {
494                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
495            }
496            StateStoreDataKey::UtdHookManagerData => {
497                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
498            }
499            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
500                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
501            }
502            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
503                if let Some(thread_root) = thread_root {
504                    Cow::Owned(format!(
505                        "{}:{room_id}:{thread_root}",
506                        StateStoreDataKey::COMPOSER_DRAFT
507                    ))
508                } else {
509                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
510                }
511            }
512            StateStoreDataKey::SeenKnockRequests(room_id) => {
513                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
514            }
515            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
516                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
517            }
518            StateStoreDataKey::HomeserverCapabilities => {
519                Cow::Borrowed(StateStoreDataKey::HOMESERVER_CAPABILITIES)
520            }
521        };
522
523        self.encode_key(keys::KV_BLOB, &*key_s)
524    }
525
526    fn encode_presence_key(&self, user_id: &UserId) -> Key {
527        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
528    }
529
530    fn encode_custom_key(&self, key: &[u8]) -> Key {
531        let mut full_key = b"custom:".to_vec();
532        full_key.extend(key);
533        self.encode_key(keys::KV_BLOB, full_key)
534    }
535
536    /// Acquire a connection for executing read operations.
537    #[instrument(skip_all)]
538    async fn read(&self) -> Result<SqliteAsyncConn> {
539        Ok(self.pool.get().await?)
540    }
541
542    /// Acquire a connection for executing write operations.
543    #[instrument(skip_all)]
544    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
545        self.write_connection.clone().lock_owned().await
546    }
547
548    fn remove_maybe_stripped_room_data(
549        &self,
550        txn: &Transaction<'_>,
551        room_id: &RoomId,
552        stripped: bool,
553    ) -> rusqlite::Result<()> {
554        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
555        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
556
557        let member_room_id = self.encode_key(keys::MEMBER, room_id);
558        txn.remove_room_members(&member_room_id, Some(stripped))
559    }
560
561    pub async fn vacuum(&self) -> Result<()> {
562        self.write_connection.lock().await.vacuum().await
563    }
564
565    pub async fn get_db_size(&self) -> Result<Option<usize>> {
566        let read_conn = self.pool.get().await?;
567        Ok(Some(read_conn.get_db_size().await?))
568    }
569}
570
571impl EncryptableStore for SqliteStateStore {
572    fn get_cypher(&self) -> Option<&StoreCipher> {
573        self.store_cipher.as_deref()
574    }
575}
576
577/// Initialize the database.
578async fn init(conn: &SqliteAsyncConn) -> Result<()> {
579    // First turn on WAL mode, this can't be done in the transaction, it fails with
580    // the error message: "cannot change into wal mode from within a transaction".
581    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
582    conn.with_transaction(|txn| {
583        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
584        txn.set_db_version(1)?;
585
586        Ok(())
587    })
588    .await
589}
590
591trait SqliteConnectionStateStoreExt {
592    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
593
594    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
595
596    fn set_room_account_data(
597        &self,
598        room_id: &[u8],
599        event_type: &[u8],
600        data: &[u8],
601    ) -> rusqlite::Result<()>;
602    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
603
604    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
605    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
606    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
607
608    fn set_state_event(
609        &self,
610        room_id: &[u8],
611        event_type: &[u8],
612        state_key: &[u8],
613        stripped: bool,
614        event_id: Option<&[u8]>,
615        data: &[u8],
616    ) -> rusqlite::Result<()>;
617    fn get_state_event_by_id(
618        &self,
619        room_id: &[u8],
620        event_id: &[u8],
621    ) -> rusqlite::Result<Option<Vec<u8>>>;
622    fn remove_room_state_events(
623        &self,
624        room_id: &[u8],
625        stripped: Option<bool>,
626    ) -> rusqlite::Result<()>;
627
628    fn set_member(
629        &self,
630        room_id: &[u8],
631        user_id: &[u8],
632        membership: &[u8],
633        stripped: bool,
634        data: &[u8],
635    ) -> rusqlite::Result<()>;
636    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
637
638    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
639    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
640    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
641
642    fn set_receipt(
643        &self,
644        room_id: &[u8],
645        user_id: &[u8],
646        receipt_type: &[u8],
647        thread_id: &[u8],
648        event_id: &[u8],
649        data: &[u8],
650    ) -> rusqlite::Result<()>;
651    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
652
653    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
654    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
655    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
656    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
657    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
658}
659
660impl SqliteConnectionStateStoreExt for rusqlite::Connection {
661    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
662        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
663        Ok(())
664    }
665
666    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
667        self.prepare_cached(
668            "INSERT OR REPLACE INTO global_account_data (event_type, data)
669             VALUES (?, ?)",
670        )?
671        .execute((event_type, data))?;
672        Ok(())
673    }
674
675    fn set_room_account_data(
676        &self,
677        room_id: &[u8],
678        event_type: &[u8],
679        data: &[u8],
680    ) -> rusqlite::Result<()> {
681        self.prepare_cached(
682            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
683             VALUES (?, ?, ?)",
684        )?
685        .execute((room_id, event_type, data))?;
686        Ok(())
687    }
688
689    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
690        self.prepare(
691            "DELETE FROM room_account_data
692             WHERE room_id = ?",
693        )?
694        .execute((room_id,))?;
695        Ok(())
696    }
697
698    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
699        self.prepare_cached(
700            "INSERT OR REPLACE INTO room_info (room_id, state, data)
701             VALUES (?, ?, ?)",
702        )?
703        .execute((room_id, state, data))?;
704        Ok(())
705    }
706
707    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
708        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
709            .optional()
710    }
711
712    /// Remove the room info for the given room.
713    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
714        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
715        Ok(())
716    }
717
718    fn set_state_event(
719        &self,
720        room_id: &[u8],
721        event_type: &[u8],
722        state_key: &[u8],
723        stripped: bool,
724        event_id: Option<&[u8]>,
725        data: &[u8],
726    ) -> rusqlite::Result<()> {
727        self.prepare_cached(
728            "INSERT OR REPLACE
729             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
730             VALUES (?, ?, ?, ?, ?, ?)",
731        )?
732        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
733        Ok(())
734    }
735
736    fn get_state_event_by_id(
737        &self,
738        room_id: &[u8],
739        event_id: &[u8],
740    ) -> rusqlite::Result<Option<Vec<u8>>> {
741        self.query_row(
742            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
743            (room_id, event_id),
744            |row| row.get(0),
745        )
746        .optional()
747    }
748
749    /// Remove state events for the given room.
750    ///
751    /// If `stripped` is `Some()`, only removes state events for the given
752    /// stripped state. Otherwise, state events are removed regardless of the
753    /// stripped state.
754    fn remove_room_state_events(
755        &self,
756        room_id: &[u8],
757        stripped: Option<bool>,
758    ) -> rusqlite::Result<()> {
759        if let Some(stripped) = stripped {
760            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
761                .execute((room_id, stripped))?;
762        } else {
763            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
764                .execute((room_id,))?;
765        }
766        Ok(())
767    }
768
769    fn set_member(
770        &self,
771        room_id: &[u8],
772        user_id: &[u8],
773        membership: &[u8],
774        stripped: bool,
775        data: &[u8],
776    ) -> rusqlite::Result<()> {
777        self.prepare_cached(
778            "INSERT OR REPLACE
779             INTO member (room_id, user_id, membership, stripped, data)
780             VALUES (?, ?, ?, ?, ?)",
781        )?
782        .execute((room_id, user_id, membership, stripped, data))?;
783        Ok(())
784    }
785
786    /// Remove members for the given room.
787    ///
788    /// If `stripped` is `Some()`, only removes members for the given stripped
789    /// state. Otherwise, members are removed regardless of the stripped state.
790    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
791        if let Some(stripped) = stripped {
792            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
793                .execute((room_id, stripped))?;
794        } else {
795            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
796        }
797        Ok(())
798    }
799
800    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
801        self.prepare_cached(
802            "INSERT OR REPLACE
803             INTO profile (room_id, user_id, data)
804             VALUES (?, ?, ?)",
805        )?
806        .execute((room_id, user_id, data))?;
807        Ok(())
808    }
809
810    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
811        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
812        Ok(())
813    }
814
815    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
816        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
817            .execute((room_id, user_id))?;
818        Ok(())
819    }
820
821    fn set_receipt(
822        &self,
823        room_id: &[u8],
824        user_id: &[u8],
825        receipt_type: &[u8],
826        thread: &[u8],
827        event_id: &[u8],
828        data: &[u8],
829    ) -> rusqlite::Result<()> {
830        self.prepare_cached(
831            "INSERT OR REPLACE
832             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
833             VALUES (?, ?, ?, ?, ?, ?)",
834        )?
835        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
836        Ok(())
837    }
838
839    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
840        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
841        Ok(())
842    }
843
844    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
845        self.prepare_cached(
846            "INSERT OR REPLACE
847             INTO display_name (room_id, name, data)
848             VALUES (?, ?, ?)",
849        )?
850        .execute((room_id, name, data))?;
851        Ok(())
852    }
853
854    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
855        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
856            .execute((room_id, name))?;
857        Ok(())
858    }
859
860    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
861        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
862        Ok(())
863    }
864
865    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
866        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
867        Ok(())
868    }
869
870    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
871        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
872            .execute((room_id,))?;
873        Ok(())
874    }
875}
876
877#[async_trait]
878trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
879    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
880        Ok(self
881            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
882            .await
883            .optional()?)
884    }
885
886    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
887        let keys_length = keys.len();
888
889        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
890            let sql_params = repeat_vars(keys.len());
891            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
892
893            let params = rusqlite::params_from_iter(keys);
894
895            Ok(txn
896                .prepare(&sql)?
897                .query(params)?
898                .mapped(|row| row.get(0))
899                .collect::<Result<_, _>>()?)
900        })
901        .await
902    }
903
    /// Stores `value` under `key`.
    ///
    /// This is the only method of the trait without a default body, so every
    /// implementor has to provide it. The sibling methods read from and delete
    /// from the `kv_blob` table.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
905
906    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
907        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
908        Ok(())
909    }
910
911    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
912        Ok(match room_id {
913            None => {
914                self.prepare("SELECT data FROM room_info", move |mut stmt| {
915                    stmt.query_map((), |row| row.get(0))?.collect()
916                })
917                .await?
918            }
919
920            Some(room_id) => {
921                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
922                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
923                })
924                .await?
925            }
926        })
927    }
928
929    async fn get_maybe_stripped_state_events_for_keys(
930        &self,
931        room_id: Key,
932        event_type: Key,
933        state_keys: Vec<Key>,
934    ) -> Result<Vec<(bool, Vec<u8>)>> {
935        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
936            let sql_params = repeat_vars(state_keys.len());
937            let sql = format!(
938                "SELECT stripped, data FROM state_event
939                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
940            );
941
942            let params = rusqlite::params_from_iter(
943                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
944            );
945
946            Ok(txn
947                .prepare(&sql)?
948                .query(params)?
949                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
950                .collect::<Result<_, _>>()?)
951        })
952        .await
953    }
954
955    async fn get_maybe_stripped_state_events(
956        &self,
957        room_id: Key,
958        event_type: Key,
959    ) -> Result<Vec<(bool, Vec<u8>)>> {
960        Ok(self
961            .prepare(
962                "SELECT stripped, data FROM state_event
963                 WHERE room_id = ? AND event_type = ?",
964                |mut stmt| {
965                    stmt.query((room_id, event_type))?
966                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
967                        .collect()
968                },
969            )
970            .await?)
971    }
972
973    async fn get_profiles(
974        &self,
975        room_id: Key,
976        user_ids: Vec<Key>,
977    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
978        let user_ids_length = user_ids.len();
979
980        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
981            let sql_params = repeat_vars(user_ids.len());
982            let sql = format!(
983                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
984            );
985
986            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
987
988            Ok(txn
989                .prepare(&sql)?
990                .query(params)?
991                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
992                .collect::<Result<_, _>>()?)
993        })
994        .await
995    }
996
997    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
998        let res = if memberships.is_empty() {
999            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
1000                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
1001            })
1002            .await?
1003        } else {
1004            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
1005                let sql_params = repeat_vars(memberships.len());
1006                let sql = format!(
1007                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
1008                );
1009
1010                let params =
1011                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
1012
1013                Ok(txn
1014                    .prepare(&sql)?
1015                    .query(params)?
1016                    .mapped(|row| row.get(0))
1017                    .collect::<Result<_, _>>()?)
1018            })
1019            .await?
1020        };
1021
1022        Ok(res)
1023    }
1024
1025    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
1026        Ok(self
1027            .query_row(
1028                "SELECT data FROM global_account_data WHERE event_type = ?",
1029                (event_type,),
1030                |row| row.get(0),
1031            )
1032            .await
1033            .optional()?)
1034    }
1035
1036    async fn get_room_account_data(
1037        &self,
1038        room_id: Key,
1039        event_type: Key,
1040    ) -> Result<Option<Vec<u8>>> {
1041        Ok(self
1042            .query_row(
1043                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
1044                (room_id, event_type),
1045                |row| row.get(0),
1046            )
1047            .await
1048            .optional()?)
1049    }
1050
1051    async fn get_display_names(
1052        &self,
1053        room_id: Key,
1054        names: Vec<Key>,
1055    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1056        let names_length = names.len();
1057
1058        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
1059            let sql_params = repeat_vars(names.len());
1060            let sql = format!(
1061                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
1062            );
1063
1064            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1065
1066            Ok(txn
1067                .prepare(&sql)?
1068                .query(params)?
1069                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1070                .collect::<Result<_, _>>()?)
1071        })
1072        .await
1073    }
1074
1075    async fn get_user_receipt(
1076        &self,
1077        room_id: Key,
1078        receipt_type: Key,
1079        thread: Key,
1080        user_id: Key,
1081    ) -> Result<Option<Vec<u8>>> {
1082        Ok(self
1083            .query_row(
1084                "SELECT data FROM receipt
1085                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1086                (room_id, receipt_type, thread, user_id),
1087                |row| row.get(0),
1088            )
1089            .await
1090            .optional()?)
1091    }
1092
1093    async fn get_event_receipts(
1094        &self,
1095        room_id: Key,
1096        receipt_type: Key,
1097        thread: Key,
1098        event_id: Key,
1099    ) -> Result<Vec<Vec<u8>>> {
1100        Ok(self
1101            .prepare(
1102                "SELECT data FROM receipt
1103                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1104                |mut stmt| {
1105                    stmt.query((room_id, receipt_type, thread, event_id))?
1106                        .mapped(|row| row.get(0))
1107                        .collect()
1108                },
1109            )
1110            .await?)
1111    }
1112}
1113
1114#[async_trait]
1115impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1116    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1117        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1118    }
1119}
1120
1121#[async_trait]
1122impl StateStore for SqliteStateStore {
1123    type Error = Error;
1124
1125    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1126        self.read()
1127            .await?
1128            .get_kv_blob(self.encode_state_store_data_key(key))
1129            .await?
1130            .map(|data| {
1131                Ok(match key {
1132                    StateStoreDataKey::SyncToken => {
1133                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1134                    }
1135                    StateStoreDataKey::SupportedVersions => {
1136                        StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1137                    }
1138                    StateStoreDataKey::WellKnown => {
1139                        StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1140                    }
1141                    StateStoreDataKey::Filter(_) => {
1142                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1143                    }
1144                    StateStoreDataKey::UserAvatarUrl(_) => {
1145                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1146                    }
1147                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1148                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1149                    }
1150                    StateStoreDataKey::UtdHookManagerData => {
1151                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1152                    }
1153                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1154                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1155                    }
1156                    StateStoreDataKey::ComposerDraft(_, _) => {
1157                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1158                    }
1159                    StateStoreDataKey::SeenKnockRequests(_) => {
1160                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1161                    }
1162                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1163                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1164                            self.deserialize_value(&data)?,
1165                        )
1166                    }
1167                    StateStoreDataKey::HomeserverCapabilities => {
1168                        StateStoreDataValue::HomeserverCapabilities(self.deserialize_value(&data)?)
1169                    }
1170                })
1171            })
1172            .transpose()
1173    }
1174
1175    async fn set_kv_data(
1176        &self,
1177        key: StateStoreDataKey<'_>,
1178        value: StateStoreDataValue,
1179    ) -> Result<()> {
1180        let serialized_value = match key {
1181            StateStoreDataKey::SyncToken => self.serialize_value(
1182                &value.into_sync_token().expect("Session data not a sync token"),
1183            )?,
1184            StateStoreDataKey::SupportedVersions => self.serialize_value(
1185                &value
1186                    .into_supported_versions()
1187                    .expect("Session data not containing supported versions"),
1188            )?,
1189            StateStoreDataKey::WellKnown => self.serialize_value(
1190                &value.into_well_known().expect("Session data not containing well-known"),
1191            )?,
1192            StateStoreDataKey::Filter(_) => {
1193                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1194            }
1195            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1196                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1197            )?,
1198            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1199                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1200            )?,
1201            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1202                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1203            )?,
1204            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1205                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1206            }
1207            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1208                &value.into_composer_draft().expect("Session data not a composer draft"),
1209            )?,
1210            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1211                &value
1212                    .into_seen_knock_requests()
1213                    .expect("Session data is not a set of seen knock request ids"),
1214            )?,
1215            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1216                &value
1217                    .into_thread_subscriptions_catchup_tokens()
1218                    .expect("Session data is not a list of thread subscription catchup tokens"),
1219            )?,
1220            StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
1221                &value
1222                    .into_homeserver_capabilities()
1223                    .expect("Session data is not the homeserver capabilities"),
1224            )?,
1225        };
1226
1227        self.write()
1228            .await
1229            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1230            .await
1231    }
1232
1233    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1234        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1235    }
1236
1237    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1238        let changes = changes.to_owned();
1239        let this = self.clone();
1240        self.write()
1241            .await
1242            .with_transaction(move |txn| {
1243                let StateChanges {
1244                    sync_token,
1245                    account_data,
1246                    presence,
1247                    profiles,
1248                    profiles_to_delete,
1249                    state,
1250                    room_account_data,
1251                    room_infos,
1252                    receipts,
1253                    redactions,
1254                    stripped_state,
1255                    ambiguity_maps,
1256                } = changes;
1257
1258                if let Some(sync_token) = sync_token {
1259                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1260                    let value = this.serialize_value(&sync_token)?;
1261                    txn.set_kv_blob(&key, &value)?;
1262                }
1263
1264                for (event_type, event) in account_data {
1265                    let event_type =
1266                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1267                    let data = this.serialize_json(&event)?;
1268                    txn.set_global_account_data(&event_type, &data)?;
1269                }
1270
1271                for (room_id, events) in room_account_data {
1272                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1273                    for (event_type, event) in events {
1274                        let event_type =
1275                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1276                        let data = this.serialize_json(&event)?;
1277                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1278                    }
1279                }
1280
1281                for (user_id, event) in presence {
1282                    let key = this.encode_presence_key(&user_id);
1283                    let value = this.serialize_json(&event)?;
1284                    txn.set_kv_blob(&key, &value)?;
1285                }
1286
1287                for (room_id, room_info) in room_infos {
1288                    let stripped = room_info.state() == RoomState::Invited;
1289                    // Remove non-stripped data for stripped rooms and vice-versa.
1290                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1291
1292                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1293                    let state = this
1294                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1295                    let data = this.serialize_json(&room_info)?;
1296                    txn.set_room_info(&room_id, &state, &data)?;
1297                }
1298
1299                for (room_id, user_ids) in profiles_to_delete {
1300                    let room_id = this.encode_key(keys::PROFILE, room_id);
1301                    for user_id in user_ids {
1302                        let user_id = this.encode_key(keys::PROFILE, user_id);
1303                        txn.remove_room_profile(&room_id, &user_id)?;
1304                    }
1305                }
1306
1307                for (room_id, state_event_types) in state {
1308                    let profiles = profiles.get(&room_id);
1309                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1310
1311                    for (event_type, state_events) in state_event_types {
1312                        let encoded_event_type =
1313                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1314
1315                        for (state_key, raw_state_event) in state_events {
1316                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1317                            let data = this.serialize_json(&raw_state_event)?;
1318
1319                            let event_id: Option<String> =
1320                                raw_state_event.get_field("event_id").ok().flatten();
1321                            let encoded_event_id =
1322                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1323
1324                            txn.set_state_event(
1325                                &encoded_room_id,
1326                                &encoded_event_type,
1327                                &encoded_state_key,
1328                                false,
1329                                encoded_event_id.as_deref(),
1330                                &data,
1331                            )?;
1332
1333                            if event_type == StateEventType::RoomMember {
1334                                let member_event = match raw_state_event
1335                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1336                                {
1337                                    Ok(ev) => ev,
1338                                    Err(e) => {
1339                                        debug!(event_id, "Failed to deserialize member event: {e}");
1340                                        continue;
1341                                    }
1342                                };
1343
1344                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1345                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1346                                let membership = this
1347                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1348                                let data = this.serialize_value(&state_key)?;
1349
1350                                txn.set_member(
1351                                    &encoded_room_id,
1352                                    &user_id,
1353                                    &membership,
1354                                    false,
1355                                    &data,
1356                                )?;
1357
1358                                if let Some(profile) =
1359                                    profiles.and_then(|p| p.get(member_event.state_key()))
1360                                {
1361                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1362                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1363                                    let data = this.serialize_json(&profile)?;
1364                                    txn.set_profile(&room_id, &user_id, &data)?;
1365                                }
1366                            }
1367                        }
1368                    }
1369                }
1370
1371                for (room_id, stripped_state_event_types) in stripped_state {
1372                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1373
1374                    for (event_type, stripped_state_events) in stripped_state_event_types {
1375                        let encoded_event_type =
1376                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1377
1378                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1379                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1380                            let data = this.serialize_json(&raw_stripped_state_event)?;
1381                            txn.set_state_event(
1382                                &encoded_room_id,
1383                                &encoded_event_type,
1384                                &encoded_state_key,
1385                                true,
1386                                None,
1387                                &data,
1388                            )?;
1389
1390                            if event_type == StateEventType::RoomMember {
1391                                let member_event = match raw_stripped_state_event
1392                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1393                                ) {
1394                                    Ok(ev) => ev,
1395                                    Err(e) => {
1396                                        debug!("Failed to deserialize stripped member event: {e}");
1397                                        continue;
1398                                    }
1399                                };
1400
1401                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1402                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1403                                let membership = this.encode_key(
1404                                    keys::MEMBER,
1405                                    member_event.content.membership.as_str(),
1406                                );
1407                                let data = this.serialize_value(&state_key)?;
1408
1409                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1410                            }
1411                        }
1412                    }
1413                }
1414
1415                for (room_id, receipt_event) in receipts {
1416                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1417
1418                    for (event_id, receipt_types) in receipt_event {
1419                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1420
1421                        for (receipt_type, receipt_users) in receipt_types {
1422                            let receipt_type =
1423                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1424
1425                            for (user_id, receipt) in receipt_users {
1426                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1427                                // We cannot have a NULL primary key so we rely on serialization
1428                                // instead of the string representation.
1429                                let thread = this.encode_key(
1430                                    keys::RECEIPT,
1431                                    rmp_serde::to_vec_named(&receipt.thread)?,
1432                                );
1433                                let data = this.serialize_json(&ReceiptData {
1434                                    receipt,
1435                                    event_id: event_id.clone(),
1436                                    user_id,
1437                                })?;
1438
1439                                txn.set_receipt(
1440                                    &room_id,
1441                                    &encoded_user_id,
1442                                    &receipt_type,
1443                                    &thread,
1444                                    &encoded_event_id,
1445                                    &data,
1446                                )?;
1447                            }
1448                        }
1449                    }
1450                }
1451
1452                for (room_id, redactions) in redactions {
1453                    let make_redaction_rules = || {
1454                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1455                        txn.get_room_info(&encoded_room_id)
1456                            .ok()
1457                            .flatten()
1458                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1459                            .map(|info| info.room_version_rules_or_default())
1460                            .unwrap_or_else(|| {
1461                                warn!(
1462                                    ?room_id,
1463                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1464                                );
1465                                ROOM_VERSION_RULES_FALLBACK
1466                            }).redaction
1467                    };
1468
1469                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1470                    let mut redaction_rules = None;
1471
1472                    for (event_id, redaction) in redactions {
1473                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1474
1475                        if let Some(Ok(raw_event)) = txn
1476                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1477                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1478                        {
1479                            let event = raw_event.deserialize()?;
1480                            let redacted = redact(
1481                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1482                                redaction_rules.get_or_insert_with(make_redaction_rules),
1483                                Some(RedactedBecause::from_raw_event(&redaction)?),
1484                            )
1485                            .map_err(Error::Redaction)?;
1486                            let data = this.serialize_json(&redacted)?;
1487
1488                            let event_type =
1489                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1490                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1491
1492                            txn.set_state_event(
1493                                &encoded_room_id,
1494                                &event_type,
1495                                &state_key,
1496                                false,
1497                                Some(&event_id),
1498                                &data,
1499                            )?;
1500                        }
1501                    }
1502                }
1503
1504                for (room_id, display_names) in ambiguity_maps {
1505                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1506
1507                    for (name, user_ids) in display_names {
1508                        let encoded_name = this.encode_key(
1509                            keys::DISPLAY_NAME,
1510                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1511                        );
1512                        let data = this.serialize_json(&user_ids)?;
1513
1514                        if user_ids.is_empty() {
1515                            txn.remove_display_name(&room_id, &encoded_name)?;
1516
1517                            // We can't do a migration to merge the previously distinct buckets of
1518                            // user IDs since the display names themselves are hashed before they
1519                            // are persisted in the store. So the store will always retain two
1520                            // buckets: one for raw display names and one for normalised ones.
1521                            //
1522                            // We therefore do the next best thing, which is a sort of a soft
1523                            // migration: we fetch both the raw and normalised buckets, then merge
1524                            // the user IDs contained in them into a separate, temporary merged
1525                            // bucket. The SDK then operates on the merged buckets exclusively. See
1526                            // the comment in `get_users_with_display_names` for details.
1527                            //
1528                            // If the merged bucket is empty, that must mean that both the raw and
1529                            // normalised buckets were also empty, so we can remove both from the
1530                            // store.
1531                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1532                            txn.remove_display_name(&room_id, &raw_name)?;
1533                        } else {
1534                            // We only create new buckets with the normalized display name.
1535                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1536                        }
1537                    }
1538                }
1539
1540                Ok::<_, Error>(())
1541            })
1542            .await?;
1543
1544        Ok(())
1545    }
1546
1547    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1548        self.read()
1549            .await?
1550            .get_kv_blob(self.encode_presence_key(user_id))
1551            .await?
1552            .map(|data| self.deserialize_json(&data))
1553            .transpose()
1554    }
1555
1556    async fn get_presence_events(
1557        &self,
1558        user_ids: &[OwnedUserId],
1559    ) -> Result<Vec<Raw<PresenceEvent>>> {
1560        if user_ids.is_empty() {
1561            return Ok(Vec::new());
1562        }
1563
1564        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1565        self.read()
1566            .await?
1567            .get_kv_blobs(user_ids)
1568            .await?
1569            .into_iter()
1570            .map(|data| self.deserialize_json(&data))
1571            .collect()
1572    }
1573
1574    async fn get_state_event(
1575        &self,
1576        room_id: &RoomId,
1577        event_type: StateEventType,
1578        state_key: &str,
1579    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1580        Ok(self
1581            .get_state_events_for_keys(room_id, event_type, &[state_key])
1582            .await?
1583            .into_iter()
1584            .next())
1585    }
1586
1587    async fn get_state_events(
1588        &self,
1589        room_id: &RoomId,
1590        event_type: StateEventType,
1591    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1592        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1593        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1594        self.read()
1595            .await?
1596            .get_maybe_stripped_state_events(room_id, event_type)
1597            .await?
1598            .into_iter()
1599            .map(|(stripped, data)| {
1600                let ev = if stripped {
1601                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1602                } else {
1603                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1604                };
1605
1606                Ok(ev)
1607            })
1608            .collect()
1609    }
1610
1611    async fn get_state_events_for_keys(
1612        &self,
1613        room_id: &RoomId,
1614        event_type: StateEventType,
1615        state_keys: &[&str],
1616    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1617        if state_keys.is_empty() {
1618            return Ok(Vec::new());
1619        }
1620
1621        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1622        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1623        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1624        self.read()
1625            .await?
1626            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1627            .await?
1628            .into_iter()
1629            .map(|(stripped, data)| {
1630                let ev = if stripped {
1631                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1632                } else {
1633                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1634                };
1635
1636                Ok(ev)
1637            })
1638            .collect()
1639    }
1640
1641    async fn get_profile(
1642        &self,
1643        room_id: &RoomId,
1644        user_id: &UserId,
1645    ) -> Result<Option<MinimalRoomMemberEvent>> {
1646        let room_id = self.encode_key(keys::PROFILE, room_id);
1647        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1648
1649        self.read()
1650            .await?
1651            .get_profiles(room_id, user_ids)
1652            .await?
1653            .into_iter()
1654            .next()
1655            .map(|(_, data)| self.deserialize_json(&data))
1656            .transpose()
1657    }
1658
1659    async fn get_profiles<'a>(
1660        &self,
1661        room_id: &RoomId,
1662        user_ids: &'a [OwnedUserId],
1663    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1664        if user_ids.is_empty() {
1665            return Ok(BTreeMap::new());
1666        }
1667
1668        let room_id = self.encode_key(keys::PROFILE, room_id);
1669        let mut user_ids_map = user_ids
1670            .iter()
1671            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1672            .collect::<BTreeMap<_, _>>();
1673        let user_ids = user_ids_map.keys().cloned().collect();
1674
1675        self.read()
1676            .await?
1677            .get_profiles(room_id, user_ids)
1678            .await?
1679            .into_iter()
1680            .map(|(user_id, data)| {
1681                Ok((
1682                    user_ids_map
1683                        .remove(user_id.as_slice())
1684                        .expect("returned user IDs were requested"),
1685                    self.deserialize_json(&data)?,
1686                ))
1687            })
1688            .collect()
1689    }
1690
1691    async fn get_user_ids(
1692        &self,
1693        room_id: &RoomId,
1694        membership: RoomMemberships,
1695    ) -> Result<Vec<OwnedUserId>> {
1696        let room_id = self.encode_key(keys::MEMBER, room_id);
1697        let memberships = membership
1698            .as_vec()
1699            .into_iter()
1700            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1701            .collect();
1702        self.read()
1703            .await?
1704            .get_user_ids(room_id, memberships)
1705            .await?
1706            .iter()
1707            .map(|data| self.deserialize_value(data))
1708            .collect()
1709    }
1710
1711    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1712        self.read()
1713            .await?
1714            .get_room_infos(match room_load_settings {
1715                RoomLoadSettings::All => None,
1716                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1717            })
1718            .await?
1719            .into_iter()
1720            .map(|data| self.deserialize_json(&data))
1721            .collect()
1722    }
1723
1724    async fn get_users_with_display_name(
1725        &self,
1726        room_id: &RoomId,
1727        display_name: &DisplayName,
1728    ) -> Result<BTreeSet<OwnedUserId>> {
1729        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1730        let names = vec![self.encode_key(
1731            keys::DISPLAY_NAME,
1732            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1733        )];
1734
1735        Ok(self
1736            .read()
1737            .await?
1738            .get_display_names(room_id, names)
1739            .await?
1740            .into_iter()
1741            .next()
1742            .map(|(_, data)| self.deserialize_json(&data))
1743            .transpose()?
1744            .unwrap_or_default())
1745    }
1746
1747    async fn get_users_with_display_names<'a>(
1748        &self,
1749        room_id: &RoomId,
1750        display_names: &'a [DisplayName],
1751    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1752        let mut result = HashMap::new();
1753
1754        if display_names.is_empty() {
1755            return Ok(result);
1756        }
1757
1758        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1759        let mut names_map = display_names
1760            .iter()
1761            .flat_map(|display_name| {
1762                // We encode the display name as the `raw_str()` and the normalized string.
1763                //
1764                // This is for compatibility reasons since:
1765                //  1. Previously "Alice" and "alice" were considered to be distinct display
1766                //     names, while we now consider them to be the same so we need to merge the
1767                //     previously distinct buckets of user IDs.
1768                //  2. We can't do a migration to merge the previously distinct buckets of user
1769                //     IDs since the display names itself are hashed before they are persisted
1770                //     in the store.
1771                let raw =
1772                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1773                let normalized = display_name.as_normalized_str().map(|normalized| {
1774                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1775                });
1776
1777                iter::once(raw).chain(normalized)
1778            })
1779            .collect::<BTreeMap<_, _>>();
1780        let names = names_map.keys().cloned().collect();
1781
1782        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1783        {
1784            let display_name =
1785                names_map.remove(name.as_slice()).expect("returned display names were requested");
1786            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1787
1788            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1789        }
1790
1791        Ok(result)
1792    }
1793
1794    async fn get_account_data_event(
1795        &self,
1796        event_type: GlobalAccountDataEventType,
1797    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1798        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1799        self.read()
1800            .await?
1801            .get_global_account_data(event_type)
1802            .await?
1803            .map(|value| self.deserialize_json(&value))
1804            .transpose()
1805    }
1806
1807    async fn get_room_account_data_event(
1808        &self,
1809        room_id: &RoomId,
1810        event_type: RoomAccountDataEventType,
1811    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1812        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1813        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1814        self.read()
1815            .await?
1816            .get_room_account_data(room_id, event_type)
1817            .await?
1818            .map(|value| self.deserialize_json(&value))
1819            .transpose()
1820    }
1821
1822    async fn get_user_room_receipt_event(
1823        &self,
1824        room_id: &RoomId,
1825        receipt_type: ReceiptType,
1826        thread: ReceiptThread,
1827        user_id: &UserId,
1828    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1829        let room_id = self.encode_key(keys::RECEIPT, room_id);
1830        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1831        // We cannot have a NULL primary key so we rely on serialization instead of the
1832        // string representation.
1833        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1834        let user_id = self.encode_key(keys::RECEIPT, user_id);
1835
1836        self.read()
1837            .await?
1838            .get_user_receipt(room_id, receipt_type, thread, user_id)
1839            .await?
1840            .map(|value| {
1841                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1842            })
1843            .transpose()
1844    }
1845
1846    async fn get_event_room_receipt_events(
1847        &self,
1848        room_id: &RoomId,
1849        receipt_type: ReceiptType,
1850        thread: ReceiptThread,
1851        event_id: &EventId,
1852    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1853        let room_id = self.encode_key(keys::RECEIPT, room_id);
1854        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1855        // We cannot have a NULL primary key so we rely on serialization instead of the
1856        // string representation.
1857        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1858        let event_id = self.encode_key(keys::RECEIPT, event_id);
1859
1860        self.read()
1861            .await?
1862            .get_event_receipts(room_id, receipt_type, thread, event_id)
1863            .await?
1864            .iter()
1865            .map(|value| {
1866                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1867            })
1868            .collect()
1869    }
1870
1871    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1872        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1873    }
1874
1875    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1876        let conn = self.write().await;
1877        let key = self.encode_custom_key(key);
1878        conn.set_kv_blob(key, value).await?;
1879        Ok(())
1880    }
1881
1882    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1883        let conn = self.write().await;
1884        let key = self.encode_custom_key(key);
1885        let previous = conn.get_kv_blob(key.clone()).await?;
1886        conn.set_kv_blob(key, value).await?;
1887        Ok(previous)
1888    }
1889
1890    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1891        let conn = self.write().await;
1892        let key = self.encode_custom_key(key);
1893        let previous = conn.get_kv_blob(key.clone()).await?;
1894        if previous.is_some() {
1895            conn.delete_kv_blob(key).await?;
1896        }
1897        Ok(previous)
1898    }
1899
1900    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1901        let this = self.clone();
1902        let room_id = room_id.to_owned();
1903
1904        let conn = self.write().await;
1905
1906        conn.with_transaction(move |txn| -> Result<()> {
1907            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1908            txn.remove_room_info(&room_info_room_id)?;
1909
1910            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1911            txn.remove_room_state_events(&state_event_room_id, None)?;
1912
1913            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1914            txn.remove_room_members(&member_room_id, None)?;
1915
1916            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1917            txn.remove_room_profiles(&profile_room_id)?;
1918
1919            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1920            txn.remove_room_account_data(&room_account_data_room_id)?;
1921
1922            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1923            txn.remove_room_receipts(&receipt_room_id)?;
1924
1925            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1926            txn.remove_room_display_names(&display_name_room_id)?;
1927
1928            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1929            txn.remove_room_send_queue(&send_queue_room_id)?;
1930
1931            let dependent_send_queue_room_id =
1932                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1933            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1934
1935            let thread_subscriptions_room_id =
1936                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1937            txn.execute(
1938                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1939                (thread_subscriptions_room_id,),
1940            )?;
1941
1942            Ok(())
1943        })
1944        .await?;
1945
1946        conn.vacuum().await
1947    }
1948
1949    async fn save_send_queue_request(
1950        &self,
1951        room_id: &RoomId,
1952        transaction_id: OwnedTransactionId,
1953        created_at: MilliSecondsSinceUnixEpoch,
1954        content: QueuedRequestKind,
1955        priority: usize,
1956    ) -> Result<(), Self::Error> {
1957        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1958        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1959
1960        let content = self.serialize_json(&content)?;
1961        // The transaction id is used both as a key (in remove/update) and a value (as
1962        // it's useful for the callers), so we keep it as is, and neither hash
1963        // it (with encode_key) or encrypt it (through serialize_value). After
1964        // all, it carries no personal information, so this is considered fine.
1965
1966        let created_at_ts: u64 = created_at.0.into();
1967        self.write()
1968            .await
1969            .with_transaction(move |txn| {
1970                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1971                Ok(())
1972            })
1973            .await
1974    }
1975
1976    async fn update_send_queue_request(
1977        &self,
1978        room_id: &RoomId,
1979        transaction_id: &TransactionId,
1980        content: QueuedRequestKind,
1981    ) -> Result<bool, Self::Error> {
1982        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1983
1984        let content = self.serialize_json(&content)?;
1985        // See comment in [`Self::save_send_queue_request`] to understand why the
1986        // transaction id is neither encrypted or hashed.
1987        let transaction_id = transaction_id.to_string();
1988
1989        let num_updated = self.write()
1990            .await
1991            .with_transaction(move |txn| {
1992                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1993            })
1994            .await?;
1995
1996        Ok(num_updated > 0)
1997    }
1998
1999    async fn remove_send_queue_request(
2000        &self,
2001        room_id: &RoomId,
2002        transaction_id: &TransactionId,
2003    ) -> Result<bool, Self::Error> {
2004        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2005
2006        // See comment in `save_send_queue_request`.
2007        let transaction_id = transaction_id.to_string();
2008
2009        let num_deleted = self
2010            .write()
2011            .await
2012            .with_transaction(move |txn| {
2013                txn.prepare_cached(
2014                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
2015                )?
2016                .execute((room_id, &transaction_id))
2017            })
2018            .await?;
2019
2020        Ok(num_deleted > 0)
2021    }
2022
2023    async fn load_send_queue_requests(
2024        &self,
2025        room_id: &RoomId,
2026    ) -> Result<Vec<QueuedRequest>, Self::Error> {
2027        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2028
2029        // Note: ROWID is always present and is an auto-incremented integer counter. We
2030        // want to maintain the insertion order, so we can sort using it.
2031        // Note 2: transaction_id is not encoded, see why in `save_send_queue_request`.
2032        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
2033            .read()
2034            .await?
2035            .prepare(
2036                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
2037                |mut stmt| {
2038                    stmt.query((room_id,))?
2039                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2040                        .collect()
2041                },
2042            )
2043            .await?;
2044
2045        let mut requests = Vec::with_capacity(res.len());
2046
2047        for entry in res {
2048            let created_at = entry
2049                .4
2050                .and_then(UInt::new)
2051                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2052
2053            requests.push(QueuedRequest {
2054                transaction_id: entry.0.into(),
2055                kind: self.deserialize_json(&entry.1)?,
2056                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
2057                priority: entry.3,
2058                created_at,
2059            });
2060        }
2061
2062        Ok(requests)
2063    }
2064
2065    async fn update_send_queue_request_status(
2066        &self,
2067        room_id: &RoomId,
2068        transaction_id: &TransactionId,
2069        error: Option<QueueWedgeError>,
2070    ) -> Result<(), Self::Error> {
2071        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2072
2073        // See comment in `save_send_queue_request`.
2074        let transaction_id = transaction_id.to_string();
2075
2076        // Serialize the error to json bytes (encrypted if option is enabled) if set.
2077        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2078
2079        self.write()
2080            .await
2081            .with_transaction(move |txn| {
2082                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2083                Ok(())
2084            })
2085            .await
2086    }
2087
2088    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2089        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
2090        // have to manually do the deduplication: indeed, for all X, encrypt(X)
2091        // != encrypted(X), since we use a nonce in the encryption process.
2092
2093        let res: Vec<Vec<u8>> = self
2094            .read()
2095            .await?
2096            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2097                stmt.query(())?.mapped(|row| row.get(0)).collect()
2098            })
2099            .await?;
2100
2101        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2102        // then rejigger that into a vector.
2103        Ok(res
2104            .into_iter()
2105            .map(|entry| self.deserialize_value(&entry))
2106            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2107            .into_iter()
2108            .collect())
2109    }
2110
2111    async fn save_dependent_queued_request(
2112        &self,
2113        room_id: &RoomId,
2114        parent_txn_id: &TransactionId,
2115        own_txn_id: ChildTransactionId,
2116        created_at: MilliSecondsSinceUnixEpoch,
2117        content: DependentQueuedRequestKind,
2118    ) -> Result<()> {
2119        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2120        let content = self.serialize_json(&content)?;
2121
2122        // See comment in `save_send_queue_request`.
2123        let parent_txn_id = parent_txn_id.to_string();
2124        let own_txn_id = own_txn_id.to_string();
2125
2126        let created_at_ts: u64 = created_at.0.into();
2127        self.write()
2128            .await
2129            .with_transaction(move |txn| {
2130                txn.prepare_cached(
2131                    r#"INSERT INTO dependent_send_queue_events
2132                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2133                       VALUES (?, ?, ?, ?, ?)"#,
2134                )?
2135                .execute((
2136                    room_id,
2137                    parent_txn_id,
2138                    own_txn_id,
2139                    content,
2140                    created_at_ts,
2141                ))?;
2142                Ok(())
2143            })
2144            .await
2145    }
2146
2147    async fn update_dependent_queued_request(
2148        &self,
2149        room_id: &RoomId,
2150        own_transaction_id: &ChildTransactionId,
2151        new_content: DependentQueuedRequestKind,
2152    ) -> Result<bool> {
2153        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2154        let content = self.serialize_json(&new_content)?;
2155
2156        // See comment in `save_send_queue_request`.
2157        let own_txn_id = own_transaction_id.to_string();
2158
2159        let num_updated = self
2160            .write()
2161            .await
2162            .with_transaction(move |txn| {
2163                txn.prepare_cached(
2164                    r#"UPDATE dependent_send_queue_events
2165                       SET content = ?
2166                       WHERE own_transaction_id = ?
2167                       AND room_id = ?"#,
2168                )?
2169                .execute((content, own_txn_id, room_id))
2170            })
2171            .await?;
2172
2173        if num_updated > 1 {
2174            return Err(Error::InconsistentUpdate);
2175        }
2176
2177        Ok(num_updated == 1)
2178    }
2179
2180    async fn mark_dependent_queued_requests_as_ready(
2181        &self,
2182        room_id: &RoomId,
2183        parent_txn_id: &TransactionId,
2184        parent_key: SentRequestKey,
2185    ) -> Result<usize> {
2186        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2187        let parent_key = self.serialize_json(&parent_key)?;
2188
2189        // See comment in `save_send_queue_request`.
2190        let parent_txn_id = parent_txn_id.to_string();
2191
2192        self.write()
2193            .await
2194            .with_transaction(move |txn| {
2195                Ok(txn.prepare_cached(
2196                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2197                )?
2198                .execute((parent_key, parent_txn_id, room_id))?)
2199            })
2200            .await
2201    }
2202
2203    async fn remove_dependent_queued_request(
2204        &self,
2205        room_id: &RoomId,
2206        txn_id: &ChildTransactionId,
2207    ) -> Result<bool> {
2208        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2209
2210        // See comment in `save_send_queue_request`.
2211        let txn_id = txn_id.to_string();
2212
2213        let num_deleted = self
2214            .write()
2215            .await
2216            .with_transaction(move |txn| {
2217                txn.prepare_cached(
2218                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2219                )?
2220                .execute((txn_id, room_id))
2221            })
2222            .await?;
2223
2224        Ok(num_deleted > 0)
2225    }
2226
2227    async fn load_dependent_queued_requests(
2228        &self,
2229        room_id: &RoomId,
2230    ) -> Result<Vec<DependentQueuedRequest>> {
2231        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2232
2233        // Note: transaction_id is not encoded, see why in `save_send_queue_request`.
2234        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2235            .read()
2236            .await?
2237            .prepare(
2238                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2239                |mut stmt| {
2240                    stmt.query((room_id,))?
2241                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2242                        .collect()
2243                },
2244            )
2245            .await?;
2246
2247        let mut dependent_events = Vec::with_capacity(res.len());
2248
2249        for entry in res {
2250            let created_at = entry
2251                .4
2252                .and_then(UInt::new)
2253                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2254
2255            dependent_events.push(DependentQueuedRequest {
2256                own_transaction_id: entry.0.into(),
2257                parent_transaction_id: entry.1.into(),
2258                parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2259                kind: self.deserialize_json(&entry.3)?,
2260                created_at,
2261            });
2262        }
2263
2264        Ok(dependent_events)
2265    }
2266
2267    async fn upsert_thread_subscriptions(
2268        &self,
2269        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
2270    ) -> Result<(), Self::Error> {
2271        let values: Vec<_> = updates
2272            .into_iter()
2273            .map(|(room_id, thread_id, subscription)| {
2274                (
2275                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
2276                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
2277                    subscription.status.as_str(),
2278                    subscription.bump_stamp,
2279                )
2280            })
2281            .collect();
2282
2283        self.write()
2284            .await
2285            .with_transaction(move |txn| {
2286                let mut txn = txn.prepare_cached(
2287                    "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2288                    VALUES (?, ?, ?, ?)
2289                    ON CONFLICT (room_id, event_id) DO UPDATE
2290                    SET
2291                        status =
2292                            CASE
2293                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
2294                                WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
2295                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
2296                                ELSE thread_subscriptions.status
2297                            END,
2298                        bump_stamp =
2299                            CASE
2300                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
2301                                WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
2302                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
2303                                ELSE thread_subscriptions.bump_stamp
2304                            END",
2305                )?;
2306
2307                for value in values {
2308                    txn.execute(value)?;
2309                }
2310
2311                Result::<_, Error>::Ok(())
2312            })
2313            .await?;
2314
2315        Ok(())
2316    }
2317
2318    async fn load_thread_subscription(
2319        &self,
2320        room_id: &RoomId,
2321        thread_id: &EventId,
2322    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2323        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2324        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2325
2326        Ok(self
2327            .read()
2328            .await?
2329            .query_row(
2330                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2331                (room_id, thread_id),
2332                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2333            )
2334            .await
2335            .optional()?
2336            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2337                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2338                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2339                })?;
2340                Ok(StoredThreadSubscription { status, bump_stamp })
2341            })
2342            .transpose()?)
2343    }
2344
2345    async fn remove_thread_subscription(
2346        &self,
2347        room_id: &RoomId,
2348        thread_id: &EventId,
2349    ) -> Result<(), Self::Error> {
2350        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2351        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2352
2353        self.write()
2354            .await
2355            .execute(
2356                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2357                (room_id, thread_id),
2358            )
2359            .await?;
2360
2361        Ok(())
2362    }
2363
2364    async fn optimize(&self) -> Result<(), Self::Error> {
2365        Ok(self.vacuum().await?)
2366    }
2367
2368    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2369        self.get_db_size().await
2370    }
2371}
2372
2373#[derive(Debug, Clone, Serialize, Deserialize)]
2374struct ReceiptData {
2375    receipt: Receipt,
2376    event_id: OwnedEventId,
2377    user_id: OwnedUserId,
2378}
2379
/// Runs the shared state-store integration test suite against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::{
        LazyLock,
        atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;

    /// Temporary directory under which every store of this module is created.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store without a passphrase in a fresh sub-directory of
    /// [`TMP_DIR`].
    ///
    /// This is the entry point the `statestore_integration_tests!` macro
    /// invoked below relies on.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        // Log through `display()` and hand the path over as-is: going through
        // `to_str().unwrap()` would panic on a non-UTF-8 temporary directory.
        tracing::info!("using store @ {}", tmpdir_path.display());

        Ok(SqliteStateStore::open(tmpdir_path, None).await.unwrap())
    }

    statestore_integration_tests!();
}
2406
/// Runs the shared state-store integration test suite against a store opened
/// with a passphrase, plus a few checks on `SqliteStoreConfig` options.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::{
            LazyLock,
            atomic::{AtomicU32, Ordering::SeqCst},
        },
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use matrix_sdk_test::async_test;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;
    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};

    /// Temporary directory under which every store of this module is created.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per store.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a path, unique within this module, inside [`TMP_DIR`].
    fn new_state_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    /// Opens a passphrase-protected store in a fresh workspace.
    ///
    /// This is the entry point the `statestore_integration_tests!` macro
    /// invoked below relies on.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let tmpdir_path = new_state_store_workspace();

        // Log through `display()` and hand the path over as-is: going through
        // `to_str().unwrap()` would panic on a non-UTF-8 temporary directory.
        tracing::info!("using store @ {}", tmpdir_path.display());

        Ok(SqliteStateStore::open(tmpdir_path, Some("default_test_password")).await.unwrap())
    }

    /// The `pool_max_size` option must be reflected by the connection pool.
    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_state_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    /// The `cache_size` option must be reflected by `PRAGMA cache_size`.
    #[async_test]
    async fn test_cache_size() {
        let tmpdir_path = new_state_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);

        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
        // converted to kibibytes. Also, it must be a negative value because it
        // _is_ the size in kibibytes, not in page size.
        assert_eq!(cache_size, -(1500 / 1024));
    }

    /// The `journal_size_limit` option must be reflected by
    /// `PRAGMA journal_size_limit`.
    #[async_test]
    async fn test_journal_size_limit() {
        let tmpdir_path = new_state_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);

        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let journal_size_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
        // bytes in SQLite.
        assert_eq!(journal_size_limit, 1500);
    }

    statestore_integration_tests!();
}
2489
/// Tests that populate a database at an older schema version, reopen it with
/// `SqliteStateStore::open` and check the data that comes out.
#[cfg(test)]
mod migration_tests {
    use std::{
        path::{Path, PathBuf},
        sync::{
            Arc, LazyLock,
            atomic::{AtomicU32, Ordering::SeqCst},
        },
    };

    use as_variant::as_variant;
    use matrix_sdk_base::{
        RoomState, StateStore,
        media::{MediaFormat, MediaRequestParameters},
        store::{
            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
            SerializableEventContent,
        },
        sync::UnreadNotificationsCount,
    };
    use matrix_sdk_test::async_test;
    use ruma::{
        EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
        events::{
            StateEventType,
            room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
        },
        room_id, server_name, user_id,
    };
    use rusqlite::Transaction;
    use serde::{Deserialize, Serialize};
    use serde_json::json;
    use tempfile::{TempDir, tempdir};
    use tokio::{fs, sync::Mutex};
    use zeroize::Zeroizing;

    use super::{DATABASE_NAME, SqliteStateStore, init, keys};
    use crate::{
        OpenStoreError, Secret, SqliteStoreConfig,
        error::{Error, Result},
        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
    };

    /// Temporary directory under which every database of this module is created.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    /// Counter handing out a distinct sub-directory name per database.
    static NUM: AtomicU32 = AtomicU32::new(0);
    /// Passphrase used both when creating the fake databases and when
    /// reopening them.
    const SECRET: &str = "secret";

    /// Returns a path, unique within this module, inside [`TMP_DIR`].
    fn new_path() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    /// Creates a store at `path`, protected by [`SECRET`], and brings its
    /// schema to `version` by calling `run_migrations(1, Some(version))`.
    ///
    /// The store is assembled by hand instead of going through
    /// `SqliteStateStore::open`, which (as the tests below rely on) migrates
    /// to the latest version.
    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
        let config = SqliteStoreConfig::new(path);

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();

        let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
        let conn = pool.get().await?;

        init(&conn).await?;

        let store_cipher = Some(Arc::new(
            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
                .await
                .unwrap(),
        ));
        let this = SqliteStateStore {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        };
        this.run_migrations(1, Some(version)).await?;

        Ok(this)
    }

    /// Builds the JSON value these tests store as a legacy ("v1") room info.
    ///
    /// `name` and `creator` are optional so callers can exercise both the
    /// "set" and "unset" shapes of the `m.room.name` and `m.room.create`
    /// contents.
    fn room_info_v1_json(
        room_id: &RoomId,
        state: RoomState,
        name: Option<&str>,
        creator: Option<&UserId>,
    ) -> serde_json::Value {
        // Test with name set or not.
        let name_content = match name {
            Some(name) => json!({ "name": name }),
            None => json!({ "name": null }),
        };
        // Test with creator set or not.
        let create_content = match creator {
            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
            None => RoomCreateEventContent::new_v11(),
        };

        json!({
            "room_id": room_id,
            "room_type": state,
            "notification_counts": UnreadNotificationsCount::default(),
            "summary": {
                "heroes": [],
                "joined_member_count": 0,
                "invited_member_count": 0,
            },
            "members_synced": false,
            "base_info": {
                "dm_targets": [],
                "max_power_level": 100,
                "name": {
                    "Original": {
                        "content": name_content,
                    },
                },
                "create": {
                    "Original": {
                        "content": create_content,
                    }
                }
            },
        })
    }

    /// Five rooms written with the version-1 `room_info` layout
    /// (`room_id`, `stripped`, `data`) must all still be loadable after
    /// reopening the store.
    #[async_test]
    pub async fn test_migrating_v1_to_v2() {
        let path = new_path();
        // Create and populate db.
        {
            let db = create_fake_db(&path, 1).await.unwrap();
            let conn = db.pool.get().await.unwrap();

            let this = db.clone();
            conn.with_transaction(move |txn| {
                for i in 0..5 {
                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
                    // The first three rooms are joined, the last two are invites.
                    let (state, stripped) =
                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
                    let info = room_info_v1_json(&room_id, state, None, None);

                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                    let data = this.serialize_json(&info)?;

                    txn.prepare_cached(
                        "INSERT INTO room_info (room_id, stripped, data)
                         VALUES (?, ?, ?)",
                    )?
                    .execute((room_id, stripped, data))?;
                }

                Result::<_, Error>::Ok(())
            })
            .await
            .unwrap();
        }

        // This transparently migrates to the latest version.
        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();

        // Check all room infos are there.
        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
    }

    /// Adds a room in version 2 format of the state store.
    ///
    /// A joined room info is always inserted. An `m.room.create` state event
    /// is inserted as well only when `create_sender` is `Some`; its content
    /// carries `create_creator` when that is set.
    fn add_room_v2(
        this: &SqliteStateStore,
        txn: &Transaction<'_>,
        room_id: &RoomId,
        name: Option<&str>,
        create_creator: Option<&UserId>,
        create_sender: Option<&UserId>,
    ) -> Result<(), Error> {
        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);

        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
        let encoded_state =
            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
        let data = this.serialize_json(&room_info_json)?;

        txn.prepare_cached(
            "INSERT INTO room_info (room_id, state, data)
             VALUES (?, ?, ?)",
        )?
        .execute((encoded_room_id, encoded_state, data))?;

        // Test with or without `m.room.create` event in the room state.
        let Some(create_sender) = create_sender else {
            return Ok(());
        };

        let create_content = match create_creator {
            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
            None => RoomCreateEventContent::new_v11(),
        };

        let event_id = EventId::new_v1(server_name!("dummy.local"));
        let create_event = json!({
            "content": create_content,
            "event_id": event_id,
            "sender": create_sender.to_owned(),
            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
            "state_key": "",
            "type": "m.room.create",
            "unsigned": {},
        });

        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
        let encoded_event_type =
            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
        let stripped = false;
        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
        let data = this.serialize_json(&create_event)?;

        txn.prepare_cached(
            "INSERT
             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
             VALUES (?, ?, ?, ?, ?, ?)",
        )?
        .execute((
            encoded_room_id,
            encoded_event_type,
            encoded_state_key,
            stripped,
            encoded_event_id,
            data,
        ))?;

        Ok(())
    }

    /// Rooms written in the version-2 layout must keep their name after
    /// reopening the store, and their creators must come from the sender of
    /// the stored `m.room.create` event, when there is one.
    #[async_test]
    pub async fn test_migrating_v2_to_v3() {
        let path = new_path();

        // Room A: with name, creator and sender.
        let room_a_id = room_id!("!room_a:dummy.local");
        let room_a_name = "Room A";
        let room_a_creator = user_id!("@creator:dummy.local");
        // Use a different sender to check that sender is used over creator in
        // migration.
        let room_a_create_sender = user_id!("@sender:dummy.local");

        // Room B: without name, creator and sender.
        let room_b_id = room_id!("!room_b:dummy.local");

        // Room C: only with sender.
        let room_c_id = room_id!("!room_c:dummy.local");
        let room_c_create_sender = user_id!("@creator:dummy.local");

        // Create and populate db.
        {
            let db = create_fake_db(&path, 2).await.unwrap();
            let conn = db.pool.get().await.unwrap();

            let this = db.clone();
            conn.with_transaction(move |txn| {
                add_room_v2(
                    &this,
                    txn,
                    room_a_id,
                    Some(room_a_name),
                    Some(room_a_creator),
                    Some(room_a_create_sender),
                )?;
                add_room_v2(&this, txn, room_b_id, None, None, None)?;
                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;

                Result::<_, Error>::Ok(())
            })
            .await
            .unwrap();
        }

        // This transparently migrates to the latest version.
        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();

        // Check all room infos are there.
        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
        assert_eq!(room_infos.len(), 3);

        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
        assert_eq!(room_a.name(), Some(room_a_name));
        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));

        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
        assert_eq!(room_b.name(), None);
        assert_eq!(room_b.creators(), None);

        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
        assert_eq!(room_c.name(), None);
        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
    }

    /// Send-queue requests and dependent requests written in the version-7
    /// layout must be gone after reopening the store.
    #[async_test]
    pub async fn test_migrating_v7_to_v9() {
        let path = new_path();

        let room_id = room_id!("!room_a:dummy.local");
        let wedged_event_transaction_id = TransactionId::new();
        let local_event_transaction_id = TransactionId::new();

        // Create and populate db.
        {
            let db = create_fake_db(&path, 7).await.unwrap();
            let conn = db.pool.get().await.unwrap();

            let wedge_tx = wedged_event_transaction_id.clone();
            let local_tx = local_event_transaction_id.clone();

            conn.with_transaction(move |txn| {
                add_dependent_send_queue_event_v7(
                    &db,
                    txn,
                    room_id,
                    &local_tx,
                    ChildTransactionId::new(),
                    DependentQueuedRequestKind::RedactEvent,
                )?;
                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
                Result::<_, Error>::Ok(())
            })
            .await
            .unwrap();
        }

        // This transparently migrates to the latest version, which clears up all
        // requests and dependent requests.
        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();

        let requests = store.load_send_queue_requests(room_id).await.unwrap();
        assert!(requests.is_empty());

        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
        assert!(dependent_requests.is_empty());
    }

    /// Inserts a plain-text "Hello" message into `send_queue_events` using
    /// the version-7 column set, optionally marked as wedged.
    fn add_send_queue_event_v7(
        this: &SqliteStateStore,
        txn: &Transaction<'_>,
        transaction_id: &TransactionId,
        room_id: &RoomId,
        is_wedged: bool,
    ) -> Result<(), Error> {
        let content =
            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;

        // The room id is stored twice: as an encoded key and as a serialized value.
        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
        let room_id_value = this.serialize_value(&room_id.to_owned())?;

        let content = this.serialize_json(&content)?;

        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;

        Ok(())
    }

    /// Inserts a dependent request into `dependent_send_queue_events` using
    /// the version-7 column set.
    fn add_dependent_send_queue_event_v7(
        this: &SqliteStateStore,
        txn: &Transaction<'_>,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        own_txn_id: ChildTransactionId,
        content: DependentQueuedRequestKind,
    ) -> Result<(), Error> {
        let room_id_value = this.serialize_value(&room_id.to_owned())?;

        let parent_txn_id = parent_txn_id.to_string();
        let own_txn_id = own_txn_id.to_string();
        let content = this.serialize_json(&content)?;

        txn.prepare_cached(
            "INSERT INTO dependent_send_queue_events
                         (room_id, parent_transaction_id, own_transaction_id, content)
                       VALUES (?, ?, ?, ?)",
        )?
        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;

        Ok(())
    }

    /// Test-only stand-in that serializes a dependent request under its old
    /// variant name, `UploadFileWithThumbnail`.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LegacyDependentQueuedRequestKind {
        UploadFileWithThumbnail {
            content_type: String,
            cache_key: MediaRequestParameters,
            related_to: OwnedTransactionId,
        },
    }

    /// A request serialized under the legacy `UploadFileWithThumbnail` name
    /// must deserialize as `DependentQueuedRequestKind::UploadFileOrThumbnail`
    /// with the same `related_to`.
    #[async_test]
    pub async fn test_dependent_queued_request_variant_renaming() {
        let path = new_path();
        let db = create_fake_db(&path, 7).await.unwrap();

        let cache_key = MediaRequestParameters {
            format: MediaFormat::File,
            source: MediaSource::Plain("https://server.local/foobar".into()),
        };
        let related_to = TransactionId::new();
        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
            content_type: "image/png".to_owned(),
            cache_key,
            related_to: related_to.clone(),
        };

        let data = db
            .serialize_json(&request)
            .expect("should be able to serialize legacy dependent request");
        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
            "should be able to deserialize dependent request from legacy dependent request",
        );

        // `as_variant!` evaluates to `None` when the variant does not match, so
        // the result has to be unwrapped: otherwise a wrong variant would skip
        // the assertion and the test would pass without checking anything.
        let de_related_to = as_variant!(
            deserialized,
            DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. }
                => de_related_to
        )
        .expect("legacy request should deserialize as `UploadFileOrThumbnail`");
        assert_eq!(de_related_to, related_to);
    }
}