matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
15        DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
16        QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
17        ThreadSubscriptionStatus,
18    },
19    timer, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
20    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
21};
22use matrix_sdk_store_encryption::StoreCipher;
23use ruma::{
24    canonical_json::{redact, RedactedBecause},
25    events::{
26        presence::PresenceEvent,
27        receipt::{Receipt, ReceiptThread, ReceiptType},
28        room::{
29            create::RoomCreateEventContent,
30            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
31        },
32        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
33        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
34    },
35    serde::Raw,
36    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
37    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
38};
39use rusqlite::{OptionalExtension, Transaction};
40use serde::{Deserialize, Serialize};
41use tokio::{
42    fs,
43    sync::{Mutex, OwnedMutexGuard},
44};
45use tracing::{debug, instrument, trace, warn};
46
47use crate::{
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt,
53    },
54    OpenStoreError, Secret, SqliteStoreConfig,
55};
56
/// Table-name constants.
///
/// In this file they are passed as the first argument to `encode_key` when
/// a key is encoded for the corresponding table.
mod keys {
    // Tables
    pub const KV_BLOB: &str = "kv_blob";
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
72
/// The filename used for the SQLite database file used by the state store.
///
/// This is the name handed to `build_pool_of_connections` when the store is
/// opened from a [`SqliteStoreConfig`].
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
75
/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// a matching new step in the [`SqliteStateStore::run_migrations`] function.
const DATABASE_VERSION: u8 = 14;
82
/// An SQLite-based state store.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// The store cipher, present only when the store was opened with a
    /// secret (see `open_with_pool`).
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and for write
    /// operations. A single connection is set apart for write operations;
    /// all other connections are used for read operations. The lock is used
    /// to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102    }
103}
104
105impl SqliteStateStore {
106    /// Open the SQLite-based state store at the given path using the given
107    /// given passphrase to encrypt private data.
108    pub async fn open(
109        path: impl AsRef<Path>,
110        passphrase: Option<&str>,
111    ) -> Result<Self, OpenStoreError> {
112        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113    }
114
115    /// Open the SQLite-based state store at the given path using the given
116    /// key to encrypt private data.
117    pub async fn open_with_key(
118        path: impl AsRef<Path>,
119        key: Option<&[u8; 32]>,
120    ) -> Result<Self, OpenStoreError> {
121        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122    }
123
124    /// Open the SQLite-based state store with the config open config.
125    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
127
128        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
129
130        let this = Self::open_with_pool(pool, config.secret).await?;
131        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
132
133        Ok(this)
134    }
135
136    /// Create an SQLite-based state store using the given SQLite database pool.
137    /// The given secret will be used to encrypt private data.
138    pub async fn open_with_pool(
139        pool: SqlitePool,
140        secret: Option<Secret>,
141    ) -> Result<Self, OpenStoreError> {
142        let conn = pool.get().await?;
143
144        let mut version = conn.db_version().await?;
145
146        if version == 0 {
147            init(&conn).await?;
148            version = 1;
149        }
150
151        let store_cipher = match secret {
152            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
153            None => None,
154        };
155        let this = Self {
156            store_cipher,
157            pool,
158            // Use `conn` as our selected write connections.
159            write_connection: Arc::new(Mutex::new(conn)),
160        };
161        this.run_migrations(version, None).await?;
162
163        Ok(this)
164    }
165
166    /// Run database migrations from the given `from` version to the given `to`
167    /// version
168    ///
169    /// If `to` is `None`, the current database version will be used.
170    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
171        let to = to.unwrap_or(DATABASE_VERSION);
172
173        if from < to {
174            debug!(version = from, new_version = to, "Upgrading database");
175        } else {
176            return Ok(());
177        }
178
179        let conn = self.write().await;
180
181        if from < 2 && to >= 2 {
182            let this = self.clone();
183            conn.with_transaction(move |txn| {
184                // Create new table.
185                txn.execute_batch(include_str!(
186                    "../migrations/state_store/002_a_create_new_room_info.sql"
187                ))?;
188
189                // Migrate data to new table.
190                for data in txn
191                    .prepare("SELECT data FROM room_info")?
192                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
193                {
194                    let data = data?;
195                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
196
197                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
198                    let state = this
199                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
200                    txn.prepare_cached(
201                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
202                         VALUES (?, ?, ?)",
203                    )?
204                    .execute((room_id, state, data))?;
205                }
206
207                // Replace old table.
208                txn.execute_batch(include_str!(
209                    "../migrations/state_store/002_b_replace_room_info.sql"
210                ))?;
211
212                txn.set_db_version(2)?;
213                Result::<_, Error>::Ok(())
214            })
215            .await?;
216        }
217
218        // Migration to v3: RoomInfo format has changed.
219        if from < 3 && to >= 3 {
220            let this = self.clone();
221            conn.with_transaction(move |txn| {
222                // Migrate data .
223                for data in txn
224                    .prepare("SELECT data FROM room_info")?
225                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
226                {
227                    let data = data?;
228                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
229
230                    // Get the `m.room.create` event from the room state.
231                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
232                    let event_type =
233                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
234                    let create_res = txn
235                        .prepare(
236                            "SELECT stripped, data FROM state_event
237                             WHERE room_id = ? AND event_type = ?",
238                        )?
239                        .query_row([room_id, event_type], |row| {
240                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
241                        })
242                        .optional()?;
243
244                    let create = create_res.and_then(|(stripped, data)| {
245                        let create = if stripped {
246                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
247                                this.deserialize_json(&data).ok()?,
248                            )
249                        } else {
250                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
251                        };
252                        Some(create)
253                    });
254
255                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
256
257                    let data = this.serialize_json(&migrated_room_info)?;
258                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
259                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
260                        .execute((data, room_id))?;
261                }
262
263                txn.set_db_version(3)?;
264                Result::<_, Error>::Ok(())
265            })
266            .await?;
267        }
268
269        if from < 4 && to >= 4 {
270            conn.with_transaction(move |txn| {
271                // Create new table.
272                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
273                txn.set_db_version(4)
274            })
275            .await?;
276        }
277
278        if from < 5 && to >= 5 {
279            conn.with_transaction(move |txn| {
280                // Create new table.
281                txn.execute_batch(include_str!(
282                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
283                ))?;
284                txn.set_db_version(4)
285            })
286            .await?;
287        }
288
289        if from < 6 && to >= 6 {
290            conn.with_transaction(move |txn| {
291                // Create new table.
292                txn.execute_batch(include_str!(
293                    "../migrations/state_store/005_send_queue_dependent_events.sql"
294                ))?;
295                txn.set_db_version(6)
296            })
297            .await?;
298        }
299
300        if from < 7 && to >= 7 {
301            conn.with_transaction(move |txn| {
302                // Drop media table.
303                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
304                txn.set_db_version(7)
305            })
306            .await?;
307        }
308
309        if from < 8 && to >= 8 {
310            // Replace all existing wedged events with a generic error.
311            let error = QueueWedgeError::GenericApiError {
312                msg: "local echo failed to send in a previous session".into(),
313            };
314            let default_err = self.serialize_value(&error)?;
315
316            conn.with_transaction(move |txn| {
317                // Update send queue table to persist the wedge reason if any.
318                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
319
320                // Migrate the data, add a generic error for currently wedged events
321
322                for wedged_entries in txn
323                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
324                    .query_map((), |row| {
325                        Ok(
326                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
327                        )
328                    })? {
329
330                    let (room_id, transaction_id) = wedged_entries?;
331
332                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
333                        .execute((default_err.clone(), room_id, transaction_id))?;
334                }
335
336
337                // Clean up the table now that data is migrated
338                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
339
340                txn.set_db_version(8)
341            })
342                .await?;
343        }
344
345        if from < 9 && to >= 9 {
346            conn.with_transaction(move |txn| {
347                // Run the migration.
348                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
349                txn.set_db_version(9)
350            })
351            .await?;
352        }
353
354        if from < 10 && to >= 10 {
355            conn.with_transaction(move |txn| {
356                // Run the migration.
357                txn.execute_batch(include_str!(
358                    "../migrations/state_store/009_send_queue_priority.sql"
359                ))?;
360                txn.set_db_version(10)
361            })
362            .await?;
363        }
364
365        if from < 11 && to >= 11 {
366            conn.with_transaction(move |txn| {
367                // Run the migration.
368                txn.execute_batch(include_str!(
369                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
370                ))?;
371                txn.set_db_version(11)
372            })
373            .await?;
374        }
375
376        if from < 12 && to >= 12 {
377            // Defragment the DB and optimize its size on the filesystem.
378            // This should have been run in the migration for version 7, to reduce the size
379            // of the DB as we removed the media cache.
380            conn.vacuum().await?;
381            conn.set_kv("version", vec![12]).await?;
382        }
383
384        if from < 13 && to >= 13 {
385            conn.with_transaction(move |txn| {
386                // Run the migration.
387                txn.execute_batch(include_str!(
388                    "../migrations/state_store/011_thread_subscriptions.sql"
389                ))?;
390                txn.set_db_version(13)
391            })
392            .await?;
393        }
394
395        if from < 14 && to >= 14 {
396            conn.with_transaction(move |txn| {
397                // Run the migration.
398                txn.execute_batch(include_str!(
399                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
400                ))?;
401                txn.set_db_version(14)
402            })
403            .await?;
404        }
405
406        Ok(())
407    }
408
409    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410        let key_s = match key {
411            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
413            StateStoreDataKey::Filter(f) => {
414                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
415            }
416            StateStoreDataKey::UserAvatarUrl(u) => {
417                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
418            }
419            StateStoreDataKey::RecentlyVisitedRooms(b) => {
420                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
421            }
422            StateStoreDataKey::UtdHookManagerData => {
423                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
424            }
425            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
426                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
427            }
428            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
429                if let Some(thread_root) = thread_root {
430                    Cow::Owned(format!(
431                        "{}:{room_id}:{thread_root}",
432                        StateStoreDataKey::COMPOSER_DRAFT
433                    ))
434                } else {
435                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
436                }
437            }
438            StateStoreDataKey::SeenKnockRequests(room_id) => {
439                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
440            }
441            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
442                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
443            }
444        };
445
446        self.encode_key(keys::KV_BLOB, &*key_s)
447    }
448
449    fn encode_presence_key(&self, user_id: &UserId) -> Key {
450        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
451    }
452
453    fn encode_custom_key(&self, key: &[u8]) -> Key {
454        let mut full_key = b"custom:".to_vec();
455        full_key.extend(key);
456        self.encode_key(keys::KV_BLOB, full_key)
457    }
458
459    /// Acquire a connection for executing read operations.
460    #[instrument(skip_all)]
461    async fn read(&self) -> Result<SqliteAsyncConn> {
462        trace!("Taking a `read` connection");
463        let _timer = timer!("connection");
464
465        Ok(self.pool.get().await?)
466    }
467
468    /// Acquire a connection for executing write operations.
469    #[instrument(skip_all)]
470    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
471        trace!("Taking a `write` connection");
472        let _timer = timer!("connection");
473
474        self.write_connection.clone().lock_owned().await
475    }
476
477    fn remove_maybe_stripped_room_data(
478        &self,
479        txn: &Transaction<'_>,
480        room_id: &RoomId,
481        stripped: bool,
482    ) -> rusqlite::Result<()> {
483        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
484        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
485
486        let member_room_id = self.encode_key(keys::MEMBER, room_id);
487        txn.remove_room_members(&member_room_id, Some(stripped))
488    }
489}
490
491impl EncryptableStore for SqliteStateStore {
492    fn get_cypher(&self) -> Option<&StoreCipher> {
493        self.store_cipher.as_deref()
494    }
495}
496
497/// Initialize the database.
498async fn init(conn: &SqliteAsyncConn) -> Result<()> {
499    // First turn on WAL mode, this can't be done in the transaction, it fails with
500    // the error message: "cannot change into wal mode from within a transaction".
501    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
502    conn.with_transaction(|txn| {
503        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
504        txn.set_db_version(1)?;
505
506        Ok(())
507    })
508    .await
509}
510
/// Synchronous helpers for reading and writing the state store tables.
///
/// All keys and values are raw byte slices; callers are expected to have
/// encoded / serialized them already.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the value stored under `key` in `kv_blob`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data of the given event type.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data of the given event type for the
    /// given room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the room info of the given room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the room info data of the given room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the room info of the given room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the data of the state event with the given event ID in the given
    /// room, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the state events of the given room; when `stripped` is
    /// `Some(_)`, only those with that stripped flag.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a room member.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove the members of the given room; when `stripped` is `Some(_)`,
    /// only those with that stripped flag.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of the given user in the given room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the profiles of the given room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove the profile of the given user in the given room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the receipts of the given room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for the given display name in the
    /// given room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove the entry of the given display name in the given room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the display name entries of the given room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the send queue events of the given room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the dependent send queue events of the given room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
579
580impl SqliteConnectionStateStoreExt for rusqlite::Connection {
581    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
582        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
583        Ok(())
584    }
585
586    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
587        self.prepare_cached(
588            "INSERT OR REPLACE INTO global_account_data (event_type, data)
589             VALUES (?, ?)",
590        )?
591        .execute((event_type, data))?;
592        Ok(())
593    }
594
595    fn set_room_account_data(
596        &self,
597        room_id: &[u8],
598        event_type: &[u8],
599        data: &[u8],
600    ) -> rusqlite::Result<()> {
601        self.prepare_cached(
602            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
603             VALUES (?, ?, ?)",
604        )?
605        .execute((room_id, event_type, data))?;
606        Ok(())
607    }
608
609    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
610        self.prepare(
611            "DELETE FROM room_account_data
612             WHERE room_id = ?",
613        )?
614        .execute((room_id,))?;
615        Ok(())
616    }
617
618    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
619        self.prepare_cached(
620            "INSERT OR REPLACE INTO room_info (room_id, state, data)
621             VALUES (?, ?, ?)",
622        )?
623        .execute((room_id, state, data))?;
624        Ok(())
625    }
626
627    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
628        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
629            .optional()
630    }
631
632    /// Remove the room info for the given room.
633    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
634        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
635        Ok(())
636    }
637
638    fn set_state_event(
639        &self,
640        room_id: &[u8],
641        event_type: &[u8],
642        state_key: &[u8],
643        stripped: bool,
644        event_id: Option<&[u8]>,
645        data: &[u8],
646    ) -> rusqlite::Result<()> {
647        self.prepare_cached(
648            "INSERT OR REPLACE
649             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
650             VALUES (?, ?, ?, ?, ?, ?)",
651        )?
652        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
653        Ok(())
654    }
655
656    fn get_state_event_by_id(
657        &self,
658        room_id: &[u8],
659        event_id: &[u8],
660    ) -> rusqlite::Result<Option<Vec<u8>>> {
661        self.query_row(
662            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
663            (room_id, event_id),
664            |row| row.get(0),
665        )
666        .optional()
667    }
668
669    /// Remove state events for the given room.
670    ///
671    /// If `stripped` is `Some()`, only removes state events for the given
672    /// stripped state. Otherwise, state events are removed regardless of the
673    /// stripped state.
674    fn remove_room_state_events(
675        &self,
676        room_id: &[u8],
677        stripped: Option<bool>,
678    ) -> rusqlite::Result<()> {
679        if let Some(stripped) = stripped {
680            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
681                .execute((room_id, stripped))?;
682        } else {
683            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
684                .execute((room_id,))?;
685        }
686        Ok(())
687    }
688
689    fn set_member(
690        &self,
691        room_id: &[u8],
692        user_id: &[u8],
693        membership: &[u8],
694        stripped: bool,
695        data: &[u8],
696    ) -> rusqlite::Result<()> {
697        self.prepare_cached(
698            "INSERT OR REPLACE
699             INTO member (room_id, user_id, membership, stripped, data)
700             VALUES (?, ?, ?, ?, ?)",
701        )?
702        .execute((room_id, user_id, membership, stripped, data))?;
703        Ok(())
704    }
705
706    /// Remove members for the given room.
707    ///
708    /// If `stripped` is `Some()`, only removes members for the given stripped
709    /// state. Otherwise, members are removed regardless of the stripped state.
710    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
711        if let Some(stripped) = stripped {
712            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
713                .execute((room_id, stripped))?;
714        } else {
715            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
716        }
717        Ok(())
718    }
719
720    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
721        self.prepare_cached(
722            "INSERT OR REPLACE
723             INTO profile (room_id, user_id, data)
724             VALUES (?, ?, ?)",
725        )?
726        .execute((room_id, user_id, data))?;
727        Ok(())
728    }
729
730    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
731        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
732        Ok(())
733    }
734
735    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
736        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
737            .execute((room_id, user_id))?;
738        Ok(())
739    }
740
741    fn set_receipt(
742        &self,
743        room_id: &[u8],
744        user_id: &[u8],
745        receipt_type: &[u8],
746        thread: &[u8],
747        event_id: &[u8],
748        data: &[u8],
749    ) -> rusqlite::Result<()> {
750        self.prepare_cached(
751            "INSERT OR REPLACE
752             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
753             VALUES (?, ?, ?, ?, ?, ?)",
754        )?
755        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
756        Ok(())
757    }
758
759    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
761        Ok(())
762    }
763
764    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
765        self.prepare_cached(
766            "INSERT OR REPLACE
767             INTO display_name (room_id, name, data)
768             VALUES (?, ?, ?)",
769        )?
770        .execute((room_id, name, data))?;
771        Ok(())
772    }
773
774    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
775        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
776            .execute((room_id, name))?;
777        Ok(())
778    }
779
780    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
781        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
782        Ok(())
783    }
784
785    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
786        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
787        Ok(())
788    }
789
790    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
791        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
792            .execute((room_id,))?;
793        Ok(())
794    }
795}
796
797#[async_trait]
798trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
799    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
800        Ok(self
801            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
802            .await
803            .optional()?)
804    }
805
806    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
807        let keys_length = keys.len();
808
809        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
810            let sql_params = repeat_vars(keys.len());
811            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
812
813            let params = rusqlite::params_from_iter(keys);
814
815            Ok(txn
816                .prepare(&sql)?
817                .query(params)?
818                .mapped(|row| row.get(0))
819                .collect::<Result<_, _>>()?)
820        })
821        .await
822    }
823
824    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
825
826    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
827        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
828        Ok(())
829    }
830
831    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
832        Ok(match room_id {
833            None => {
834                self.prepare("SELECT data FROM room_info", move |mut stmt| {
835                    stmt.query_map((), |row| row.get(0))?.collect()
836                })
837                .await?
838            }
839
840            Some(room_id) => {
841                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
842                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
843                })
844                .await?
845            }
846        })
847    }
848
849    async fn get_maybe_stripped_state_events_for_keys(
850        &self,
851        room_id: Key,
852        event_type: Key,
853        state_keys: Vec<Key>,
854    ) -> Result<Vec<(bool, Vec<u8>)>> {
855        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
856            let sql_params = repeat_vars(state_keys.len());
857            let sql = format!(
858                "SELECT stripped, data FROM state_event
859                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
860            );
861
862            let params = rusqlite::params_from_iter(
863                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
864            );
865
866            Ok(txn
867                .prepare(&sql)?
868                .query(params)?
869                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
870                .collect::<Result<_, _>>()?)
871        })
872        .await
873    }
874
875    async fn get_maybe_stripped_state_events(
876        &self,
877        room_id: Key,
878        event_type: Key,
879    ) -> Result<Vec<(bool, Vec<u8>)>> {
880        Ok(self
881            .prepare(
882                "SELECT stripped, data FROM state_event
883                 WHERE room_id = ? AND event_type = ?",
884                |mut stmt| {
885                    stmt.query((room_id, event_type))?
886                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
887                        .collect()
888                },
889            )
890            .await?)
891    }
892
893    async fn get_profiles(
894        &self,
895        room_id: Key,
896        user_ids: Vec<Key>,
897    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
898        let user_ids_length = user_ids.len();
899
900        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
901            let sql_params = repeat_vars(user_ids.len());
902            let sql = format!(
903                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
904            );
905
906            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
907
908            Ok(txn
909                .prepare(&sql)?
910                .query(params)?
911                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
912                .collect::<Result<_, _>>()?)
913        })
914        .await
915    }
916
917    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
918        let res = if memberships.is_empty() {
919            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
920                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
921            })
922            .await?
923        } else {
924            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
925                let sql_params = repeat_vars(memberships.len());
926                let sql = format!(
927                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
928                );
929
930                let params =
931                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
932
933                Ok(txn
934                    .prepare(&sql)?
935                    .query(params)?
936                    .mapped(|row| row.get(0))
937                    .collect::<Result<_, _>>()?)
938            })
939            .await?
940        };
941
942        Ok(res)
943    }
944
945    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
946        Ok(self
947            .query_row(
948                "SELECT data FROM global_account_data WHERE event_type = ?",
949                (event_type,),
950                |row| row.get(0),
951            )
952            .await
953            .optional()?)
954    }
955
956    async fn get_room_account_data(
957        &self,
958        room_id: Key,
959        event_type: Key,
960    ) -> Result<Option<Vec<u8>>> {
961        Ok(self
962            .query_row(
963                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
964                (room_id, event_type),
965                |row| row.get(0),
966            )
967            .await
968            .optional()?)
969    }
970
971    async fn get_display_names(
972        &self,
973        room_id: Key,
974        names: Vec<Key>,
975    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
976        let names_length = names.len();
977
978        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
979            let sql_params = repeat_vars(names.len());
980            let sql = format!(
981                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
982            );
983
984            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
985
986            Ok(txn
987                .prepare(&sql)?
988                .query(params)?
989                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
990                .collect::<Result<_, _>>()?)
991        })
992        .await
993    }
994
995    async fn get_user_receipt(
996        &self,
997        room_id: Key,
998        receipt_type: Key,
999        thread: Key,
1000        user_id: Key,
1001    ) -> Result<Option<Vec<u8>>> {
1002        Ok(self
1003            .query_row(
1004                "SELECT data FROM receipt
1005                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1006                (room_id, receipt_type, thread, user_id),
1007                |row| row.get(0),
1008            )
1009            .await
1010            .optional()?)
1011    }
1012
1013    async fn get_event_receipts(
1014        &self,
1015        room_id: Key,
1016        receipt_type: Key,
1017        thread: Key,
1018        event_id: Key,
1019    ) -> Result<Vec<Vec<u8>>> {
1020        Ok(self
1021            .prepare(
1022                "SELECT data FROM receipt
1023                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1024                |mut stmt| {
1025                    stmt.query((room_id, receipt_type, thread, event_id))?
1026                        .mapped(|row| row.get(0))
1027                        .collect()
1028                },
1029            )
1030            .await?)
1031    }
1032}
1033
1034#[async_trait]
1035impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1036    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1037        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1038    }
1039}
1040
1041#[async_trait]
1042impl StateStore for SqliteStateStore {
1043    type Error = Error;
1044
1045    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1046        self.read()
1047            .await?
1048            .get_kv_blob(self.encode_state_store_data_key(key))
1049            .await?
1050            .map(|data| {
1051                Ok(match key {
1052                    StateStoreDataKey::SyncToken => {
1053                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1054                    }
1055                    StateStoreDataKey::ServerInfo => {
1056                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1057                    }
1058                    StateStoreDataKey::Filter(_) => {
1059                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1060                    }
1061                    StateStoreDataKey::UserAvatarUrl(_) => {
1062                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1063                    }
1064                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1065                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1066                    }
1067                    StateStoreDataKey::UtdHookManagerData => {
1068                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1069                    }
1070                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1071                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1072                    }
1073                    StateStoreDataKey::ComposerDraft(_, _) => {
1074                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1075                    }
1076                    StateStoreDataKey::SeenKnockRequests(_) => {
1077                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1078                    }
1079                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1080                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1081                            self.deserialize_value(&data)?,
1082                        )
1083                    }
1084                })
1085            })
1086            .transpose()
1087    }
1088
1089    async fn set_kv_data(
1090        &self,
1091        key: StateStoreDataKey<'_>,
1092        value: StateStoreDataValue,
1093    ) -> Result<()> {
1094        let serialized_value = match key {
1095            StateStoreDataKey::SyncToken => self.serialize_value(
1096                &value.into_sync_token().expect("Session data not a sync token"),
1097            )?,
1098            StateStoreDataKey::ServerInfo => self.serialize_value(
1099                &value.into_server_info().expect("Session data not containing server info"),
1100            )?,
1101            StateStoreDataKey::Filter(_) => {
1102                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1103            }
1104            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1105                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1106            )?,
1107            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1108                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1109            )?,
1110            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1111                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1112            )?,
1113            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1114                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1115            }
1116            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1117                &value.into_composer_draft().expect("Session data not a composer draft"),
1118            )?,
1119            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1120                &value
1121                    .into_seen_knock_requests()
1122                    .expect("Session data is not a set of seen knock request ids"),
1123            )?,
1124            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1125                &value
1126                    .into_thread_subscriptions_catchup_tokens()
1127                    .expect("Session data is not a list of thread subscription catchup tokens"),
1128            )?,
1129        };
1130
1131        self.write()
1132            .await
1133            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1134            .await
1135    }
1136
1137    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1138        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1139    }
1140
1141    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1142        let changes = changes.to_owned();
1143        let this = self.clone();
1144        self.write()
1145            .await
1146            .with_transaction(move |txn| {
1147                let StateChanges {
1148                    sync_token,
1149                    account_data,
1150                    presence,
1151                    profiles,
1152                    profiles_to_delete,
1153                    state,
1154                    room_account_data,
1155                    room_infos,
1156                    receipts,
1157                    redactions,
1158                    stripped_state,
1159                    ambiguity_maps,
1160                } = changes;
1161
1162                if let Some(sync_token) = sync_token {
1163                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1164                    let value = this.serialize_value(&sync_token)?;
1165                    txn.set_kv_blob(&key, &value)?;
1166                }
1167
1168                for (event_type, event) in account_data {
1169                    let event_type =
1170                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1171                    let data = this.serialize_json(&event)?;
1172                    txn.set_global_account_data(&event_type, &data)?;
1173                }
1174
1175                for (room_id, events) in room_account_data {
1176                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1177                    for (event_type, event) in events {
1178                        let event_type =
1179                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1180                        let data = this.serialize_json(&event)?;
1181                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1182                    }
1183                }
1184
1185                for (user_id, event) in presence {
1186                    let key = this.encode_presence_key(&user_id);
1187                    let value = this.serialize_json(&event)?;
1188                    txn.set_kv_blob(&key, &value)?;
1189                }
1190
1191                for (room_id, room_info) in room_infos {
1192                    let stripped = room_info.state() == RoomState::Invited;
1193                    // Remove non-stripped data for stripped rooms and vice-versa.
1194                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1195
1196                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1197                    let state = this
1198                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1199                    let data = this.serialize_json(&room_info)?;
1200                    txn.set_room_info(&room_id, &state, &data)?;
1201                }
1202
1203                for (room_id, user_ids) in profiles_to_delete {
1204                    let room_id = this.encode_key(keys::PROFILE, room_id);
1205                    for user_id in user_ids {
1206                        let user_id = this.encode_key(keys::PROFILE, user_id);
1207                        txn.remove_room_profile(&room_id, &user_id)?;
1208                    }
1209                }
1210
1211                for (room_id, state_event_types) in state {
1212                    let profiles = profiles.get(&room_id);
1213                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1214
1215                    for (event_type, state_events) in state_event_types {
1216                        let encoded_event_type =
1217                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1218
1219                        for (state_key, raw_state_event) in state_events {
1220                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1221                            let data = this.serialize_json(&raw_state_event)?;
1222
1223                            let event_id: Option<String> =
1224                                raw_state_event.get_field("event_id").ok().flatten();
1225                            let encoded_event_id =
1226                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1227
1228                            txn.set_state_event(
1229                                &encoded_room_id,
1230                                &encoded_event_type,
1231                                &encoded_state_key,
1232                                false,
1233                                encoded_event_id.as_deref(),
1234                                &data,
1235                            )?;
1236
1237                            if event_type == StateEventType::RoomMember {
1238                                let member_event = match raw_state_event
1239                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1240                                {
1241                                    Ok(ev) => ev,
1242                                    Err(e) => {
1243                                        debug!(event_id, "Failed to deserialize member event: {e}");
1244                                        continue;
1245                                    }
1246                                };
1247
1248                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1249                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1250                                let membership = this
1251                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1252                                let data = this.serialize_value(&state_key)?;
1253
1254                                txn.set_member(
1255                                    &encoded_room_id,
1256                                    &user_id,
1257                                    &membership,
1258                                    false,
1259                                    &data,
1260                                )?;
1261
1262                                if let Some(profile) =
1263                                    profiles.and_then(|p| p.get(member_event.state_key()))
1264                                {
1265                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1266                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1267                                    let data = this.serialize_json(&profile)?;
1268                                    txn.set_profile(&room_id, &user_id, &data)?;
1269                                }
1270                            }
1271                        }
1272                    }
1273                }
1274
1275                for (room_id, stripped_state_event_types) in stripped_state {
1276                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1277
1278                    for (event_type, stripped_state_events) in stripped_state_event_types {
1279                        let encoded_event_type =
1280                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1281
1282                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1283                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1284                            let data = this.serialize_json(&raw_stripped_state_event)?;
1285                            txn.set_state_event(
1286                                &encoded_room_id,
1287                                &encoded_event_type,
1288                                &encoded_state_key,
1289                                true,
1290                                None,
1291                                &data,
1292                            )?;
1293
1294                            if event_type == StateEventType::RoomMember {
1295                                let member_event = match raw_stripped_state_event
1296                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1297                                ) {
1298                                    Ok(ev) => ev,
1299                                    Err(e) => {
1300                                        debug!("Failed to deserialize stripped member event: {e}");
1301                                        continue;
1302                                    }
1303                                };
1304
1305                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1306                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1307                                let membership = this.encode_key(
1308                                    keys::MEMBER,
1309                                    member_event.content.membership.as_str(),
1310                                );
1311                                let data = this.serialize_value(&state_key)?;
1312
1313                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1314                            }
1315                        }
1316                    }
1317                }
1318
1319                for (room_id, receipt_event) in receipts {
1320                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1321
1322                    for (event_id, receipt_types) in receipt_event {
1323                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1324
1325                        for (receipt_type, receipt_users) in receipt_types {
1326                            let receipt_type =
1327                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1328
1329                            for (user_id, receipt) in receipt_users {
1330                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1331                                // We cannot have a NULL primary key so we rely on serialization
1332                                // instead of the string representation.
1333                                let thread = this.encode_key(
1334                                    keys::RECEIPT,
1335                                    rmp_serde::to_vec_named(&receipt.thread)?,
1336                                );
1337                                let data = this.serialize_json(&ReceiptData {
1338                                    receipt,
1339                                    event_id: event_id.clone(),
1340                                    user_id,
1341                                })?;
1342
1343                                txn.set_receipt(
1344                                    &room_id,
1345                                    &encoded_user_id,
1346                                    &receipt_type,
1347                                    &thread,
1348                                    &encoded_event_id,
1349                                    &data,
1350                                )?;
1351                            }
1352                        }
1353                    }
1354                }
1355
1356                for (room_id, redactions) in redactions {
1357                    let make_redaction_rules = || {
1358                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1359                        txn.get_room_info(&encoded_room_id)
1360                            .ok()
1361                            .flatten()
1362                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1363                            .map(|info| info.room_version_rules_or_default())
1364                            .unwrap_or_else(|| {
1365                                warn!(
1366                                    ?room_id,
1367                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1368                                );
1369                                ROOM_VERSION_RULES_FALLBACK
1370                            }).redaction
1371                    };
1372
1373                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1374                    let mut redaction_rules = None;
1375
1376                    for (event_id, redaction) in redactions {
1377                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1378
1379                        if let Some(Ok(raw_event)) = txn
1380                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1381                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1382                        {
1383                            let event = raw_event.deserialize()?;
1384                            let redacted = redact(
1385                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1386                                redaction_rules.get_or_insert_with(make_redaction_rules),
1387                                Some(RedactedBecause::from_raw_event(&redaction)?),
1388                            )
1389                            .map_err(Error::Redaction)?;
1390                            let data = this.serialize_json(&redacted)?;
1391
1392                            let event_type =
1393                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1394                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1395
1396                            txn.set_state_event(
1397                                &encoded_room_id,
1398                                &event_type,
1399                                &state_key,
1400                                false,
1401                                Some(&event_id),
1402                                &data,
1403                            )?;
1404                        }
1405                    }
1406                }
1407
1408                for (room_id, display_names) in ambiguity_maps {
1409                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1410
1411                    for (name, user_ids) in display_names {
1412                        let encoded_name = this.encode_key(
1413                            keys::DISPLAY_NAME,
1414                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1415                        );
1416                        let data = this.serialize_json(&user_ids)?;
1417
1418                        if user_ids.is_empty() {
1419                            txn.remove_display_name(&room_id, &encoded_name)?;
1420
1421                            // We can't do a migration to merge the previously distinct buckets of
1422                            // user IDs since the display names themselves are hashed before they
1423                            // are persisted in the store. So the store will always retain two
1424                            // buckets: one for raw display names and one for normalised ones.
1425                            //
1426                            // We therefore do the next best thing, which is a sort of a soft
1427                            // migration: we fetch both the raw and normalised buckets, then merge
1428                            // the user IDs contained in them into a separate, temporary merged
1429                            // bucket. The SDK then operates on the merged buckets exclusively. See
1430                            // the comment in `get_users_with_display_names` for details.
1431                            //
1432                            // If the merged bucket is empty, that must mean that both the raw and
1433                            // normalised buckets were also empty, so we can remove both from the
1434                            // store.
1435                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1436                            txn.remove_display_name(&room_id, &raw_name)?;
1437                        } else {
1438                            // We only create new buckets with the normalized display name.
1439                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1440                        }
1441                    }
1442                }
1443
1444                Ok::<_, Error>(())
1445            })
1446            .await?;
1447
1448        Ok(())
1449    }
1450
1451    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1452        self.read()
1453            .await?
1454            .get_kv_blob(self.encode_presence_key(user_id))
1455            .await?
1456            .map(|data| self.deserialize_json(&data))
1457            .transpose()
1458    }
1459
1460    async fn get_presence_events(
1461        &self,
1462        user_ids: &[OwnedUserId],
1463    ) -> Result<Vec<Raw<PresenceEvent>>> {
1464        if user_ids.is_empty() {
1465            return Ok(Vec::new());
1466        }
1467
1468        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1469        self.read()
1470            .await?
1471            .get_kv_blobs(user_ids)
1472            .await?
1473            .into_iter()
1474            .map(|data| self.deserialize_json(&data))
1475            .collect()
1476    }
1477
1478    async fn get_state_event(
1479        &self,
1480        room_id: &RoomId,
1481        event_type: StateEventType,
1482        state_key: &str,
1483    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1484        Ok(self
1485            .get_state_events_for_keys(room_id, event_type, &[state_key])
1486            .await?
1487            .into_iter()
1488            .next())
1489    }
1490
1491    async fn get_state_events(
1492        &self,
1493        room_id: &RoomId,
1494        event_type: StateEventType,
1495    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1496        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1497        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1498        self.read()
1499            .await?
1500            .get_maybe_stripped_state_events(room_id, event_type)
1501            .await?
1502            .into_iter()
1503            .map(|(stripped, data)| {
1504                let ev = if stripped {
1505                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1506                } else {
1507                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1508                };
1509
1510                Ok(ev)
1511            })
1512            .collect()
1513    }
1514
1515    async fn get_state_events_for_keys(
1516        &self,
1517        room_id: &RoomId,
1518        event_type: StateEventType,
1519        state_keys: &[&str],
1520    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1521        if state_keys.is_empty() {
1522            return Ok(Vec::new());
1523        }
1524
1525        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1526        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1527        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1528        self.read()
1529            .await?
1530            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1531            .await?
1532            .into_iter()
1533            .map(|(stripped, data)| {
1534                let ev = if stripped {
1535                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1536                } else {
1537                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1538                };
1539
1540                Ok(ev)
1541            })
1542            .collect()
1543    }
1544
1545    async fn get_profile(
1546        &self,
1547        room_id: &RoomId,
1548        user_id: &UserId,
1549    ) -> Result<Option<MinimalRoomMemberEvent>> {
1550        let room_id = self.encode_key(keys::PROFILE, room_id);
1551        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1552
1553        self.read()
1554            .await?
1555            .get_profiles(room_id, user_ids)
1556            .await?
1557            .into_iter()
1558            .next()
1559            .map(|(_, data)| self.deserialize_json(&data))
1560            .transpose()
1561    }
1562
1563    async fn get_profiles<'a>(
1564        &self,
1565        room_id: &RoomId,
1566        user_ids: &'a [OwnedUserId],
1567    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1568        if user_ids.is_empty() {
1569            return Ok(BTreeMap::new());
1570        }
1571
1572        let room_id = self.encode_key(keys::PROFILE, room_id);
1573        let mut user_ids_map = user_ids
1574            .iter()
1575            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1576            .collect::<BTreeMap<_, _>>();
1577        let user_ids = user_ids_map.keys().cloned().collect();
1578
1579        self.read()
1580            .await?
1581            .get_profiles(room_id, user_ids)
1582            .await?
1583            .into_iter()
1584            .map(|(user_id, data)| {
1585                Ok((
1586                    user_ids_map
1587                        .remove(user_id.as_slice())
1588                        .expect("returned user IDs were requested"),
1589                    self.deserialize_json(&data)?,
1590                ))
1591            })
1592            .collect()
1593    }
1594
1595    async fn get_user_ids(
1596        &self,
1597        room_id: &RoomId,
1598        membership: RoomMemberships,
1599    ) -> Result<Vec<OwnedUserId>> {
1600        let room_id = self.encode_key(keys::MEMBER, room_id);
1601        let memberships = membership
1602            .as_vec()
1603            .into_iter()
1604            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1605            .collect();
1606        self.read()
1607            .await?
1608            .get_user_ids(room_id, memberships)
1609            .await?
1610            .iter()
1611            .map(|data| self.deserialize_value(data))
1612            .collect()
1613    }
1614
1615    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1616        self.read()
1617            .await?
1618            .get_room_infos(match room_load_settings {
1619                RoomLoadSettings::All => None,
1620                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1621            })
1622            .await?
1623            .into_iter()
1624            .map(|data| self.deserialize_json(&data))
1625            .collect()
1626    }
1627
1628    async fn get_users_with_display_name(
1629        &self,
1630        room_id: &RoomId,
1631        display_name: &DisplayName,
1632    ) -> Result<BTreeSet<OwnedUserId>> {
1633        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1634        let names = vec![self.encode_key(
1635            keys::DISPLAY_NAME,
1636            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1637        )];
1638
1639        Ok(self
1640            .read()
1641            .await?
1642            .get_display_names(room_id, names)
1643            .await?
1644            .into_iter()
1645            .next()
1646            .map(|(_, data)| self.deserialize_json(&data))
1647            .transpose()?
1648            .unwrap_or_default())
1649    }
1650
1651    async fn get_users_with_display_names<'a>(
1652        &self,
1653        room_id: &RoomId,
1654        display_names: &'a [DisplayName],
1655    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1656        let mut result = HashMap::new();
1657
1658        if display_names.is_empty() {
1659            return Ok(result);
1660        }
1661
1662        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1663        let mut names_map = display_names
1664            .iter()
1665            .flat_map(|display_name| {
1666                // We encode the display name as the `raw_str()` and the normalized string.
1667                //
1668                // This is for compatibility reasons since:
1669                //  1. Previously "Alice" and "alice" were considered to be distinct display
1670                //     names, while we now consider them to be the same so we need to merge the
1671                //     previously distinct buckets of user IDs.
1672                //  2. We can't do a migration to merge the previously distinct buckets of user
1673                //     IDs since the display names itself are hashed before they are persisted
1674                //     in the store.
1675                let raw =
1676                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1677                let normalized = display_name.as_normalized_str().map(|normalized| {
1678                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1679                });
1680
1681                iter::once(raw).chain(normalized)
1682            })
1683            .collect::<BTreeMap<_, _>>();
1684        let names = names_map.keys().cloned().collect();
1685
1686        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1687        {
1688            let display_name =
1689                names_map.remove(name.as_slice()).expect("returned display names were requested");
1690            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1691
1692            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1693        }
1694
1695        Ok(result)
1696    }
1697
1698    async fn get_account_data_event(
1699        &self,
1700        event_type: GlobalAccountDataEventType,
1701    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1702        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1703        self.read()
1704            .await?
1705            .get_global_account_data(event_type)
1706            .await?
1707            .map(|value| self.deserialize_json(&value))
1708            .transpose()
1709    }
1710
1711    async fn get_room_account_data_event(
1712        &self,
1713        room_id: &RoomId,
1714        event_type: RoomAccountDataEventType,
1715    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1716        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1717        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1718        self.read()
1719            .await?
1720            .get_room_account_data(room_id, event_type)
1721            .await?
1722            .map(|value| self.deserialize_json(&value))
1723            .transpose()
1724    }
1725
1726    async fn get_user_room_receipt_event(
1727        &self,
1728        room_id: &RoomId,
1729        receipt_type: ReceiptType,
1730        thread: ReceiptThread,
1731        user_id: &UserId,
1732    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1733        let room_id = self.encode_key(keys::RECEIPT, room_id);
1734        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1735        // We cannot have a NULL primary key so we rely on serialization instead of the
1736        // string representation.
1737        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1738        let user_id = self.encode_key(keys::RECEIPT, user_id);
1739
1740        self.read()
1741            .await?
1742            .get_user_receipt(room_id, receipt_type, thread, user_id)
1743            .await?
1744            .map(|value| {
1745                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1746            })
1747            .transpose()
1748    }
1749
1750    async fn get_event_room_receipt_events(
1751        &self,
1752        room_id: &RoomId,
1753        receipt_type: ReceiptType,
1754        thread: ReceiptThread,
1755        event_id: &EventId,
1756    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1757        let room_id = self.encode_key(keys::RECEIPT, room_id);
1758        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1759        // We cannot have a NULL primary key so we rely on serialization instead of the
1760        // string representation.
1761        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1762        let event_id = self.encode_key(keys::RECEIPT, event_id);
1763
1764        self.read()
1765            .await?
1766            .get_event_receipts(room_id, receipt_type, thread, event_id)
1767            .await?
1768            .iter()
1769            .map(|value| {
1770                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1771            })
1772            .collect()
1773    }
1774
1775    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1776        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1777    }
1778
1779    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1780        let conn = self.write().await;
1781        let key = self.encode_custom_key(key);
1782        conn.set_kv_blob(key, value).await?;
1783        Ok(())
1784    }
1785
1786    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1787        let conn = self.write().await;
1788        let key = self.encode_custom_key(key);
1789        let previous = conn.get_kv_blob(key.clone()).await?;
1790        conn.set_kv_blob(key, value).await?;
1791        Ok(previous)
1792    }
1793
1794    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1795        let conn = self.write().await;
1796        let key = self.encode_custom_key(key);
1797        let previous = conn.get_kv_blob(key.clone()).await?;
1798        if previous.is_some() {
1799            conn.delete_kv_blob(key).await?;
1800        }
1801        Ok(previous)
1802    }
1803
1804    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1805        let this = self.clone();
1806        let room_id = room_id.to_owned();
1807
1808        let conn = self.write().await;
1809
1810        conn.with_transaction(move |txn| -> Result<()> {
1811            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1812            txn.remove_room_info(&room_info_room_id)?;
1813
1814            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1815            txn.remove_room_state_events(&state_event_room_id, None)?;
1816
1817            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1818            txn.remove_room_members(&member_room_id, None)?;
1819
1820            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1821            txn.remove_room_profiles(&profile_room_id)?;
1822
1823            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1824            txn.remove_room_account_data(&room_account_data_room_id)?;
1825
1826            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1827            txn.remove_room_receipts(&receipt_room_id)?;
1828
1829            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1830            txn.remove_room_display_names(&display_name_room_id)?;
1831
1832            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1833            txn.remove_room_send_queue(&send_queue_room_id)?;
1834
1835            let dependent_send_queue_room_id =
1836                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1837            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1838
1839            let thread_subscriptions_room_id =
1840                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1841            txn.execute(
1842                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1843                (thread_subscriptions_room_id,),
1844            )?;
1845
1846            Ok(())
1847        })
1848        .await?;
1849
1850        conn.vacuum().await
1851    }
1852
1853    async fn save_send_queue_request(
1854        &self,
1855        room_id: &RoomId,
1856        transaction_id: OwnedTransactionId,
1857        created_at: MilliSecondsSinceUnixEpoch,
1858        content: QueuedRequestKind,
1859        priority: usize,
1860    ) -> Result<(), Self::Error> {
1861        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1862        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1863
1864        let content = self.serialize_json(&content)?;
1865        // The transaction id is used both as a key (in remove/update) and a value (as
1866        // it's useful for the callers), so we keep it as is, and neither hash
1867        // it (with encode_key) or encrypt it (through serialize_value). After
1868        // all, it carries no personal information, so this is considered fine.
1869
1870        let created_at_ts: u64 = created_at.0.into();
1871        self.write()
1872            .await
1873            .with_transaction(move |txn| {
1874                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1875                Ok(())
1876            })
1877            .await
1878    }
1879
1880    async fn update_send_queue_request(
1881        &self,
1882        room_id: &RoomId,
1883        transaction_id: &TransactionId,
1884        content: QueuedRequestKind,
1885    ) -> Result<bool, Self::Error> {
1886        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1887
1888        let content = self.serialize_json(&content)?;
1889        // See comment in [`Self::save_send_queue_event`] to understand why the
1890        // transaction id is neither encrypted or hashed.
1891        let transaction_id = transaction_id.to_string();
1892
1893        let num_updated = self.write()
1894            .await
1895            .with_transaction(move |txn| {
1896                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1897            })
1898            .await?;
1899
1900        Ok(num_updated > 0)
1901    }
1902
1903    async fn remove_send_queue_request(
1904        &self,
1905        room_id: &RoomId,
1906        transaction_id: &TransactionId,
1907    ) -> Result<bool, Self::Error> {
1908        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1909
1910        // See comment in `save_send_queue_event`.
1911        let transaction_id = transaction_id.to_string();
1912
1913        let num_deleted = self
1914            .write()
1915            .await
1916            .with_transaction(move |txn| {
1917                txn.prepare_cached(
1918                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1919                )?
1920                .execute((room_id, &transaction_id))
1921            })
1922            .await?;
1923
1924        Ok(num_deleted > 0)
1925    }
1926
1927    async fn load_send_queue_requests(
1928        &self,
1929        room_id: &RoomId,
1930    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1931        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1932
1933        // Note: ROWID is always present and is an auto-incremented integer counter. We
1934        // want to maintain the insertion order, so we can sort using it.
1935        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1936        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1937            .read()
1938            .await?
1939            .prepare(
1940                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1941                |mut stmt| {
1942                    stmt.query((room_id,))?
1943                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1944                        .collect()
1945                },
1946            )
1947            .await?;
1948
1949        let mut requests = Vec::with_capacity(res.len());
1950
1951        for entry in res {
1952            let created_at = entry
1953                .4
1954                .and_then(UInt::new)
1955                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1956
1957            requests.push(QueuedRequest {
1958                transaction_id: entry.0.into(),
1959                kind: self.deserialize_json(&entry.1)?,
1960                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1961                priority: entry.3,
1962                created_at,
1963            });
1964        }
1965
1966        Ok(requests)
1967    }
1968
1969    async fn update_send_queue_request_status(
1970        &self,
1971        room_id: &RoomId,
1972        transaction_id: &TransactionId,
1973        error: Option<QueueWedgeError>,
1974    ) -> Result<(), Self::Error> {
1975        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1976
1977        // See comment in `save_send_queue_event`.
1978        let transaction_id = transaction_id.to_string();
1979
1980        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1981        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1982
1983        self.write()
1984            .await
1985            .with_transaction(move |txn| {
1986                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1987                Ok(())
1988            })
1989            .await
1990    }
1991
1992    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1993        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1994        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1995        // != encrypted(X), since we use a nonce in the encryption process.
1996
1997        let res: Vec<Vec<u8>> = self
1998            .read()
1999            .await?
2000            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2001                stmt.query(())?.mapped(|row| row.get(0)).collect()
2002            })
2003            .await?;
2004
2005        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2006        // then rejigger that into a vector.
2007        Ok(res
2008            .into_iter()
2009            .map(|entry| self.deserialize_value(&entry))
2010            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2011            .into_iter()
2012            .collect())
2013    }
2014
2015    async fn save_dependent_queued_request(
2016        &self,
2017        room_id: &RoomId,
2018        parent_txn_id: &TransactionId,
2019        own_txn_id: ChildTransactionId,
2020        created_at: MilliSecondsSinceUnixEpoch,
2021        content: DependentQueuedRequestKind,
2022    ) -> Result<()> {
2023        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2024        let content = self.serialize_json(&content)?;
2025
2026        // See comment in `save_send_queue_event`.
2027        let parent_txn_id = parent_txn_id.to_string();
2028        let own_txn_id = own_txn_id.to_string();
2029
2030        let created_at_ts: u64 = created_at.0.into();
2031        self.write()
2032            .await
2033            .with_transaction(move |txn| {
2034                txn.prepare_cached(
2035                    r#"INSERT INTO dependent_send_queue_events
2036                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2037                       VALUES (?, ?, ?, ?, ?)"#,
2038                )?
2039                .execute((
2040                    room_id,
2041                    parent_txn_id,
2042                    own_txn_id,
2043                    content,
2044                    created_at_ts,
2045                ))?;
2046                Ok(())
2047            })
2048            .await
2049    }
2050
2051    async fn update_dependent_queued_request(
2052        &self,
2053        room_id: &RoomId,
2054        own_transaction_id: &ChildTransactionId,
2055        new_content: DependentQueuedRequestKind,
2056    ) -> Result<bool> {
2057        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2058        let content = self.serialize_json(&new_content)?;
2059
2060        // See comment in `save_send_queue_event`.
2061        let own_txn_id = own_transaction_id.to_string();
2062
2063        let num_updated = self
2064            .write()
2065            .await
2066            .with_transaction(move |txn| {
2067                txn.prepare_cached(
2068                    r#"UPDATE dependent_send_queue_events
2069                       SET content = ?
2070                       WHERE own_transaction_id = ?
2071                       AND room_id = ?"#,
2072                )?
2073                .execute((content, own_txn_id, room_id))
2074            })
2075            .await?;
2076
2077        if num_updated > 1 {
2078            return Err(Error::InconsistentUpdate);
2079        }
2080
2081        Ok(num_updated == 1)
2082    }
2083
2084    async fn mark_dependent_queued_requests_as_ready(
2085        &self,
2086        room_id: &RoomId,
2087        parent_txn_id: &TransactionId,
2088        parent_key: SentRequestKey,
2089    ) -> Result<usize> {
2090        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2091        let parent_key = self.serialize_value(&parent_key)?;
2092
2093        // See comment in `save_send_queue_event`.
2094        let parent_txn_id = parent_txn_id.to_string();
2095
2096        self.write()
2097            .await
2098            .with_transaction(move |txn| {
2099                Ok(txn.prepare_cached(
2100                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2101                )?
2102                .execute((parent_key, parent_txn_id, room_id))?)
2103            })
2104            .await
2105    }
2106
2107    async fn remove_dependent_queued_request(
2108        &self,
2109        room_id: &RoomId,
2110        txn_id: &ChildTransactionId,
2111    ) -> Result<bool> {
2112        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2113
2114        // See comment in `save_send_queue_event`.
2115        let txn_id = txn_id.to_string();
2116
2117        let num_deleted = self
2118            .write()
2119            .await
2120            .with_transaction(move |txn| {
2121                txn.prepare_cached(
2122                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2123                )?
2124                .execute((txn_id, room_id))
2125            })
2126            .await?;
2127
2128        Ok(num_deleted > 0)
2129    }
2130
2131    async fn load_dependent_queued_requests(
2132        &self,
2133        room_id: &RoomId,
2134    ) -> Result<Vec<DependentQueuedRequest>> {
2135        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2136
2137        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2138        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2139            .read()
2140            .await?
2141            .prepare(
2142                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2143                |mut stmt| {
2144                    stmt.query((room_id,))?
2145                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2146                        .collect()
2147                },
2148            )
2149            .await?;
2150
2151        let mut dependent_events = Vec::with_capacity(res.len());
2152
2153        for entry in res {
2154            let created_at = entry
2155                .4
2156                .and_then(UInt::new)
2157                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2158
2159            dependent_events.push(DependentQueuedRequest {
2160                own_transaction_id: entry.0.into(),
2161                parent_transaction_id: entry.1.into(),
2162                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2163                kind: self.deserialize_json(&entry.3)?,
2164                created_at,
2165            });
2166        }
2167
2168        Ok(dependent_events)
2169    }
2170
2171    async fn upsert_thread_subscription(
2172        &self,
2173        room_id: &RoomId,
2174        thread_id: &EventId,
2175        mut new: StoredThreadSubscription,
2176    ) -> Result<(), Self::Error> {
2177        if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2178            if previous == new {
2179                // No need to update anything.
2180                trace!("not saving thread subscription because the subscription is the same");
2181                return Ok(());
2182            }
2183
2184            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2185                trace!("not saving thread subscription because we have a newer bump stamp");
2186                return Ok(());
2187            }
2188        }
2189
2190        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2191        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2192        let status = new.status.as_str();
2193
2194        self.write()
2195            .await
2196            .with_transaction(move |txn| {
2197                // Try to find a previous value.
2198                txn.prepare_cached(
2199                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2200                         VALUES (?, ?, ?, ?)",
2201                )?
2202                .execute((room_id, thread_id, status, new.bump_stamp))
2203            })
2204            .await?;
2205        Ok(())
2206    }
2207
2208    async fn load_thread_subscription(
2209        &self,
2210        room_id: &RoomId,
2211        thread_id: &EventId,
2212    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2213        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2214        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2215
2216        Ok(self
2217            .read()
2218            .await?
2219            .query_row(
2220                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2221                (room_id, thread_id),
2222                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2223            )
2224            .await
2225            .optional()?
2226            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2227                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2228                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2229                })?;
2230                Ok(StoredThreadSubscription { status, bump_stamp })
2231            })
2232            .transpose()?)
2233    }
2234
2235    async fn remove_thread_subscription(
2236        &self,
2237        room_id: &RoomId,
2238        thread_id: &EventId,
2239    ) -> Result<(), Self::Error> {
2240        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2241        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2242
2243        self.write()
2244            .await
2245            .execute(
2246                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2247                (room_id, thread_id),
2248            )
2249            .await?;
2250
2251        Ok(())
2252    }
2253}
2254
2255#[derive(Debug, Clone, Serialize, Deserialize)]
2256struct ReceiptData {
2257    receipt: Receipt,
2258    event_id: OwnedEventId,
2259    user_id: OwnedUserId,
2260}
2261
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by all the stores opened in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving each store its own sub-directory of [`TMP_DIR`].
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store, without a passphrase, in its own sub-directory.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store = SqliteStateStore::open(store_path, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2286
2287#[cfg(test)]
2288mod encrypted_tests {
2289    use std::{
2290        path::PathBuf,
2291        sync::atomic::{AtomicU32, Ordering::SeqCst},
2292    };
2293
2294    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2295    use matrix_sdk_test::async_test;
2296    use once_cell::sync::Lazy;
2297    use tempfile::{tempdir, TempDir};
2298
2299    use super::SqliteStateStore;
2300    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2301
2302    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2303    static NUM: AtomicU32 = AtomicU32::new(0);
2304
2305    fn new_state_store_workspace() -> PathBuf {
2306        let name = NUM.fetch_add(1, SeqCst).to_string();
2307        TMP_DIR.path().join(name)
2308    }
2309
2310    async fn get_store() -> Result<impl StateStore, StoreError> {
2311        let tmpdir_path = new_state_store_workspace();
2312
2313        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2314
2315        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2316            .await
2317            .unwrap())
2318    }
2319
2320    #[async_test]
2321    async fn test_pool_size() {
2322        let tmpdir_path = new_state_store_workspace();
2323        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2324
2325        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2326
2327        assert_eq!(store.pool.status().max_size, 42);
2328    }
2329
2330    #[async_test]
2331    async fn test_cache_size() {
2332        let tmpdir_path = new_state_store_workspace();
2333        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2334
2335        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2336
2337        let conn = store.pool.get().await.unwrap();
2338        let cache_size =
2339            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2340
2341        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
2342        // converted to kibibytes. Also, it must be a negative value because it
2343        // _is_ the size in kibibytes, not in page size.
2344        assert_eq!(cache_size, -(1500 / 1024));
2345    }
2346
2347    #[async_test]
2348    async fn test_journal_size_limit() {
2349        let tmpdir_path = new_state_store_workspace();
2350        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2351
2352        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2353
2354        let conn = store.pool.get().await.unwrap();
2355        let journal_size_limit = conn
2356            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2357            .await
2358            .unwrap();
2359
2360        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
2361        // bytes in SQLite.
2362        assert_eq!(journal_size_limit, 1500);
2363    }
2364
2365    statestore_integration_tests!();
2366}
2367
2368#[cfg(test)]
2369mod migration_tests {
2370    use std::{
2371        path::{Path, PathBuf},
2372        sync::{
2373            atomic::{AtomicU32, Ordering::SeqCst},
2374            Arc,
2375        },
2376    };
2377
2378    use as_variant::as_variant;
2379    use matrix_sdk_base::{
2380        media::{MediaFormat, MediaRequestParameters},
2381        store::{
2382            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2383            SerializableEventContent,
2384        },
2385        sync::UnreadNotificationsCount,
2386        RoomState, StateStore,
2387    };
2388    use matrix_sdk_test::async_test;
2389    use once_cell::sync::Lazy;
2390    use ruma::{
2391        events::{
2392            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2393            StateEventType,
2394        },
2395        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2396        RoomId, TransactionId, UserId,
2397    };
2398    use rusqlite::Transaction;
2399    use serde::{Deserialize, Serialize};
2400    use serde_json::json;
2401    use tempfile::{tempdir, TempDir};
2402    use tokio::{fs, sync::Mutex};
2403    use zeroize::Zeroizing;
2404
2405    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2406    use crate::{
2407        error::{Error, Result},
2408        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2409        OpenStoreError, Secret, SqliteStoreConfig,
2410    };
2411
2412    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2413    static NUM: AtomicU32 = AtomicU32::new(0);
2414    const SECRET: &str = "secret";
2415
2416    fn new_path() -> PathBuf {
2417        let name = NUM.fetch_add(1, SeqCst).to_string();
2418        TMP_DIR.path().join(name)
2419    }
2420
2421    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2422        let config = SqliteStoreConfig::new(path);
2423
2424        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2425
2426        let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2427        let conn = pool.get().await?;
2428
2429        init(&conn).await?;
2430
2431        let store_cipher = Some(Arc::new(
2432            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2433                .await
2434                .unwrap(),
2435        ));
2436        let this = SqliteStateStore {
2437            store_cipher,
2438            pool,
2439            // Use `conn` as our selected write connections.
2440            write_connection: Arc::new(Mutex::new(conn)),
2441        };
2442        this.run_migrations(1, Some(version)).await?;
2443
2444        Ok(this)
2445    }
2446
2447    fn room_info_v1_json(
2448        room_id: &RoomId,
2449        state: RoomState,
2450        name: Option<&str>,
2451        creator: Option<&UserId>,
2452    ) -> serde_json::Value {
2453        // Test with name set or not.
2454        let name_content = match name {
2455            Some(name) => json!({ "name": name }),
2456            None => json!({ "name": null }),
2457        };
2458        // Test with creator set or not.
2459        let create_content = match creator {
2460            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2461            None => RoomCreateEventContent::new_v11(),
2462        };
2463
2464        json!({
2465            "room_id": room_id,
2466            "room_type": state,
2467            "notification_counts": UnreadNotificationsCount::default(),
2468            "summary": {
2469                "heroes": [],
2470                "joined_member_count": 0,
2471                "invited_member_count": 0,
2472            },
2473            "members_synced": false,
2474            "base_info": {
2475                "dm_targets": [],
2476                "max_power_level": 100,
2477                "name": {
2478                    "Original": {
2479                        "content": name_content,
2480                    },
2481                },
2482                "create": {
2483                    "Original": {
2484                        "content": create_content,
2485                    }
2486                }
2487            },
2488        })
2489    }
2490
2491    #[async_test]
2492    pub async fn test_migrating_v1_to_v2() {
2493        let path = new_path();
2494        // Create and populate db.
2495        {
2496            let db = create_fake_db(&path, 1).await.unwrap();
2497            let conn = db.pool.get().await.unwrap();
2498
2499            let this = db.clone();
2500            conn.with_transaction(move |txn| {
2501                for i in 0..5 {
2502                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2503                    let (state, stripped) =
2504                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2505                    let info = room_info_v1_json(&room_id, state, None, None);
2506
2507                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2508                    let data = this.serialize_json(&info)?;
2509
2510                    txn.prepare_cached(
2511                        "INSERT INTO room_info (room_id, stripped, data)
2512                         VALUES (?, ?, ?)",
2513                    )?
2514                    .execute((room_id, stripped, data))?;
2515                }
2516
2517                Result::<_, Error>::Ok(())
2518            })
2519            .await
2520            .unwrap();
2521        }
2522
2523        // This transparently migrates to the latest version.
2524        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2525
2526        // Check all room infos are there.
2527        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2528    }
2529
2530    // Add a room in version 2 format of the state store.
2531    fn add_room_v2(
2532        this: &SqliteStateStore,
2533        txn: &Transaction<'_>,
2534        room_id: &RoomId,
2535        name: Option<&str>,
2536        create_creator: Option<&UserId>,
2537        create_sender: Option<&UserId>,
2538    ) -> Result<(), Error> {
2539        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2540
2541        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2542        let encoded_state =
2543            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2544        let data = this.serialize_json(&room_info_json)?;
2545
2546        txn.prepare_cached(
2547            "INSERT INTO room_info (room_id, state, data)
2548             VALUES (?, ?, ?)",
2549        )?
2550        .execute((encoded_room_id, encoded_state, data))?;
2551
2552        // Test with or without `m.room.create` event in the room state.
2553        let Some(create_sender) = create_sender else {
2554            return Ok(());
2555        };
2556
2557        let create_content = match create_creator {
2558            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2559            None => RoomCreateEventContent::new_v11(),
2560        };
2561
2562        let event_id = EventId::new(server_name!("dummy.local"));
2563        let create_event = json!({
2564            "content": create_content,
2565            "event_id": event_id,
2566            "sender": create_sender.to_owned(),
2567            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2568            "state_key": "",
2569            "type": "m.room.create",
2570            "unsigned": {},
2571        });
2572
2573        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2574        let encoded_event_type =
2575            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2576        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2577        let stripped = false;
2578        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2579        let data = this.serialize_json(&create_event)?;
2580
2581        txn.prepare_cached(
2582            "INSERT
2583             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2584             VALUES (?, ?, ?, ?, ?, ?)",
2585        )?
2586        .execute((
2587            encoded_room_id,
2588            encoded_event_type,
2589            encoded_state_key,
2590            stripped,
2591            encoded_event_id,
2592            data,
2593        ))?;
2594
2595        Ok(())
2596    }
2597
2598    #[async_test]
2599    pub async fn test_migrating_v2_to_v3() {
2600        let path = new_path();
2601
2602        // Room A: with name, creator and sender.
2603        let room_a_id = room_id!("!room_a:dummy.local");
2604        let room_a_name = "Room A";
2605        let room_a_creator = user_id!("@creator:dummy.local");
2606        // Use a different sender to check that sender is used over creator in
2607        // migration.
2608        let room_a_create_sender = user_id!("@sender:dummy.local");
2609
2610        // Room B: without name, creator and sender.
2611        let room_b_id = room_id!("!room_b:dummy.local");
2612
2613        // Room C: only with sender.
2614        let room_c_id = room_id!("!room_c:dummy.local");
2615        let room_c_create_sender = user_id!("@creator:dummy.local");
2616
2617        // Create and populate db.
2618        {
2619            let db = create_fake_db(&path, 2).await.unwrap();
2620            let conn = db.pool.get().await.unwrap();
2621
2622            let this = db.clone();
2623            conn.with_transaction(move |txn| {
2624                add_room_v2(
2625                    &this,
2626                    txn,
2627                    room_a_id,
2628                    Some(room_a_name),
2629                    Some(room_a_creator),
2630                    Some(room_a_create_sender),
2631                )?;
2632                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2633                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2634
2635                Result::<_, Error>::Ok(())
2636            })
2637            .await
2638            .unwrap();
2639        }
2640
2641        // This transparently migrates to the latest version.
2642        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2643
2644        // Check all room infos are there.
2645        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2646        assert_eq!(room_infos.len(), 3);
2647
2648        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2649        assert_eq!(room_a.name(), Some(room_a_name));
2650        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2651
2652        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2653        assert_eq!(room_b.name(), None);
2654        assert_eq!(room_b.creators(), None);
2655
2656        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2657        assert_eq!(room_c.name(), None);
2658        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2659    }
2660
2661    #[async_test]
2662    pub async fn test_migrating_v7_to_v9() {
2663        let path = new_path();
2664
2665        let room_id = room_id!("!room_a:dummy.local");
2666        let wedged_event_transaction_id = TransactionId::new();
2667        let local_event_transaction_id = TransactionId::new();
2668
2669        // Create and populate db.
2670        {
2671            let db = create_fake_db(&path, 7).await.unwrap();
2672            let conn = db.pool.get().await.unwrap();
2673
2674            let wedge_tx = wedged_event_transaction_id.clone();
2675            let local_tx = local_event_transaction_id.clone();
2676
2677            conn.with_transaction(move |txn| {
2678                add_dependent_send_queue_event_v7(
2679                    &db,
2680                    txn,
2681                    room_id,
2682                    &local_tx,
2683                    ChildTransactionId::new(),
2684                    DependentQueuedRequestKind::RedactEvent,
2685                )?;
2686                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2687                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2688                Result::<_, Error>::Ok(())
2689            })
2690            .await
2691            .unwrap();
2692        }
2693
2694        // This transparently migrates to the latest version, which clears up all
2695        // requests and dependent requests.
2696        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2697
2698        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2699        assert!(requests.is_empty());
2700
2701        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2702        assert!(dependent_requests.is_empty());
2703    }
2704
2705    fn add_send_queue_event_v7(
2706        this: &SqliteStateStore,
2707        txn: &Transaction<'_>,
2708        transaction_id: &TransactionId,
2709        room_id: &RoomId,
2710        is_wedged: bool,
2711    ) -> Result<(), Error> {
2712        let content =
2713            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2714
2715        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2716        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2717
2718        let content = this.serialize_json(&content)?;
2719
2720        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2721            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2722
2723        Ok(())
2724    }
2725
2726    fn add_dependent_send_queue_event_v7(
2727        this: &SqliteStateStore,
2728        txn: &Transaction<'_>,
2729        room_id: &RoomId,
2730        parent_txn_id: &TransactionId,
2731        own_txn_id: ChildTransactionId,
2732        content: DependentQueuedRequestKind,
2733    ) -> Result<(), Error> {
2734        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2735
2736        let parent_txn_id = parent_txn_id.to_string();
2737        let own_txn_id = own_txn_id.to_string();
2738        let content = this.serialize_json(&content)?;
2739
2740        txn.prepare_cached(
2741            "INSERT INTO dependent_send_queue_events
2742                         (room_id, parent_transaction_id, own_transaction_id, content)
2743                       VALUES (?, ?, ?, ?)",
2744        )?
2745        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2746
2747        Ok(())
2748    }
2749
    /// Test-only stand-in for a dependent request kind serialized under the
    /// `UploadFileWithThumbnail` variant name.
    ///
    /// `test_dependent_queued_request_variant_renaming` serializes a value of
    /// this type and checks that it deserializes as
    /// `DependentQueuedRequestKind::UploadFileOrThumbnail`.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub enum LegacyDependentQueuedRequestKind {
        /// The variant under the name being tested for compatibility.
        UploadFileWithThumbnail {
            content_type: String,
            cache_key: MediaRequestParameters,
            related_to: OwnedTransactionId,
        },
    }
2758
2759    #[async_test]
2760    pub async fn test_dependent_queued_request_variant_renaming() {
2761        let path = new_path();
2762        let db = create_fake_db(&path, 7).await.unwrap();
2763
2764        let cache_key = MediaRequestParameters {
2765            format: MediaFormat::File,
2766            source: MediaSource::Plain("https://server.local/foobar".into()),
2767        };
2768        let related_to = TransactionId::new();
2769        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2770            content_type: "image/png".to_owned(),
2771            cache_key,
2772            related_to: related_to.clone(),
2773        };
2774
2775        let data = db
2776            .serialize_json(&request)
2777            .expect("should be able to serialize legacy dependent request");
2778        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2779            "should be able to deserialize dependent request from legacy dependent request",
2780        );
2781
2782        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2783            assert_eq!(de_related_to, related_to);
2784        });
2785    }
2786}