matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
12use matrix_sdk_base::{
13    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
14    store::{
15        compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
16        DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
17        QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
18        ThreadSubscriptionStatus,
19    },
20    timer, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
21    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
22};
23use matrix_sdk_store_encryption::StoreCipher;
24use ruma::{
25    canonical_json::{redact, RedactedBecause},
26    events::{
27        presence::PresenceEvent,
28        receipt::{Receipt, ReceiptThread, ReceiptType},
29        room::{
30            create::RoomCreateEventContent,
31            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
32        },
33        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
34        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
35    },
36    serde::Raw,
37    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
38    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
39};
40use rusqlite::{OptionalExtension, Transaction};
41use serde::{Deserialize, Serialize};
42use tokio::{
43    fs,
44    sync::{Mutex, OwnedMutexGuard},
45};
46use tracing::{debug, instrument, trace, warn};
47
48use crate::{
49    error::{Error, Result},
50    utils::{
51        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt,
53    },
54    OpenStoreError, Secret, SqliteStoreConfig,
55};
56
/// Names of the SQLite tables used by the state store.
///
/// These constants are also what the store passes as the first argument of
/// `encode_key` when it derives the key for a value stored in that table.
mod keys {
    // Generic key/value storage.
    pub const KV_BLOB: &str = "kv_blob";

    // Account data tables.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";

    // Room tables.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";

    // Send queue tables.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";

    // Thread subscription table.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
72
73/// The filename used for the SQLite database file used by the state store.
///
/// `SqliteStateStore::open_with_config` joins this name onto the configured
/// directory path to locate the database file.
74pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
75
76/// Identifier of the latest database version.
77///
78/// This is used to figure out whether the SQLite database requires a migration.
79/// Every new SQL migration should imply a bump of this number, and changes in
80/// the [`SqliteStateStore::run_migrations`] function.
81const DATABASE_VERSION: u8 = 14;
82
83/// An SQLite-based state store.
///
/// Clones of a store share the same write connection, because it is held
/// behind an `Arc<Mutex<_>>`.
84#[derive(Clone)]
85pub struct SqliteStateStore {
    /// The cipher used to encrypt private data, or `None` when the store was
    /// opened without a secret.
86    store_cipher: Option<Arc<StoreCipher>>,
87
88    /// The pool of connections.
89    pool: SqlitePool,
90
91    /// We make the difference between connections for read operations, and for
92    /// write operations. We keep a single connection apart from write
93    /// operations. All other connections are used for read operations. The
94    /// lock is used to ensure there is one owner at a time.
95    write_connection: Arc<Mutex<SqliteAsyncConn>>,
96}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102    }
103}
104
105impl SqliteStateStore {
106    /// Open the SQLite-based state store at the given path using the given
107    /// given passphrase to encrypt private data.
108    pub async fn open(
109        path: impl AsRef<Path>,
110        passphrase: Option<&str>,
111    ) -> Result<Self, OpenStoreError> {
112        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113    }
114
115    /// Open the SQLite-based state store at the given path using the given
116    /// key to encrypt private data.
117    pub async fn open_with_key(
118        path: impl AsRef<Path>,
119        key: Option<&[u8; 32]>,
120    ) -> Result<Self, OpenStoreError> {
121        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122    }
123
124    /// Open the SQLite-based state store with the config open config.
125    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126        let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;
127
128        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
129
130        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
131        config.pool = Some(pool_config);
132
133        let pool = config.create_pool(Runtime::Tokio1)?;
134
135        let this = Self::open_with_pool(pool, secret).await?;
136        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
137
138        Ok(this)
139    }
140
141    /// Create an SQLite-based state store using the given SQLite database pool.
142    /// The given secret will be used to encrypt private data.
143    pub async fn open_with_pool(
144        pool: SqlitePool,
145        secret: Option<Secret>,
146    ) -> Result<Self, OpenStoreError> {
147        let conn = pool.get().await?;
148
149        let mut version = conn.db_version().await?;
150
151        if version == 0 {
152            init(&conn).await?;
153            version = 1;
154        }
155
156        let store_cipher = match secret {
157            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
158            None => None,
159        };
160        let this = Self {
161            store_cipher,
162            pool,
163            // Use `conn` as our selected write connections.
164            write_connection: Arc::new(Mutex::new(conn)),
165        };
166        this.run_migrations(version, None).await?;
167
168        Ok(this)
169    }
170
171    /// Run database migrations from the given `from` version to the given `to`
172    /// version
173    ///
174    /// If `to` is `None`, the current database version will be used.
175    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
176        let to = to.unwrap_or(DATABASE_VERSION);
177
178        if from < to {
179            debug!(version = from, new_version = to, "Upgrading database");
180        } else {
181            return Ok(());
182        }
183
184        let conn = self.write().await;
185
186        if from < 2 && to >= 2 {
187            let this = self.clone();
188            conn.with_transaction(move |txn| {
189                // Create new table.
190                txn.execute_batch(include_str!(
191                    "../migrations/state_store/002_a_create_new_room_info.sql"
192                ))?;
193
194                // Migrate data to new table.
195                for data in txn
196                    .prepare("SELECT data FROM room_info")?
197                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
198                {
199                    let data = data?;
200                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
201
202                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
203                    let state = this
204                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
205                    txn.prepare_cached(
206                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
207                         VALUES (?, ?, ?)",
208                    )?
209                    .execute((room_id, state, data))?;
210                }
211
212                // Replace old table.
213                txn.execute_batch(include_str!(
214                    "../migrations/state_store/002_b_replace_room_info.sql"
215                ))?;
216
217                txn.set_db_version(2)?;
218                Result::<_, Error>::Ok(())
219            })
220            .await?;
221        }
222
223        // Migration to v3: RoomInfo format has changed.
224        if from < 3 && to >= 3 {
225            let this = self.clone();
226            conn.with_transaction(move |txn| {
227                // Migrate data .
228                for data in txn
229                    .prepare("SELECT data FROM room_info")?
230                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
231                {
232                    let data = data?;
233                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
234
235                    // Get the `m.room.create` event from the room state.
236                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
237                    let event_type =
238                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
239                    let create_res = txn
240                        .prepare(
241                            "SELECT stripped, data FROM state_event
242                             WHERE room_id = ? AND event_type = ?",
243                        )?
244                        .query_row([room_id, event_type], |row| {
245                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
246                        })
247                        .optional()?;
248
249                    let create = create_res.and_then(|(stripped, data)| {
250                        let create = if stripped {
251                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
252                                this.deserialize_json(&data).ok()?,
253                            )
254                        } else {
255                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
256                        };
257                        Some(create)
258                    });
259
260                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
261
262                    let data = this.serialize_json(&migrated_room_info)?;
263                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
264                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
265                        .execute((data, room_id))?;
266                }
267
268                txn.set_db_version(3)?;
269                Result::<_, Error>::Ok(())
270            })
271            .await?;
272        }
273
274        if from < 4 && to >= 4 {
275            conn.with_transaction(move |txn| {
276                // Create new table.
277                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
278                txn.set_db_version(4)
279            })
280            .await?;
281        }
282
283        if from < 5 && to >= 5 {
284            conn.with_transaction(move |txn| {
285                // Create new table.
286                txn.execute_batch(include_str!(
287                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
288                ))?;
289                txn.set_db_version(4)
290            })
291            .await?;
292        }
293
294        if from < 6 && to >= 6 {
295            conn.with_transaction(move |txn| {
296                // Create new table.
297                txn.execute_batch(include_str!(
298                    "../migrations/state_store/005_send_queue_dependent_events.sql"
299                ))?;
300                txn.set_db_version(6)
301            })
302            .await?;
303        }
304
305        if from < 7 && to >= 7 {
306            conn.with_transaction(move |txn| {
307                // Drop media table.
308                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
309                txn.set_db_version(7)
310            })
311            .await?;
312        }
313
314        if from < 8 && to >= 8 {
315            // Replace all existing wedged events with a generic error.
316            let error = QueueWedgeError::GenericApiError {
317                msg: "local echo failed to send in a previous session".into(),
318            };
319            let default_err = self.serialize_value(&error)?;
320
321            conn.with_transaction(move |txn| {
322                // Update send queue table to persist the wedge reason if any.
323                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
324
325                // Migrate the data, add a generic error for currently wedged events
326
327                for wedged_entries in txn
328                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
329                    .query_map((), |row| {
330                        Ok(
331                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
332                        )
333                    })? {
334
335                    let (room_id, transaction_id) = wedged_entries?;
336
337                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
338                        .execute((default_err.clone(), room_id, transaction_id))?;
339                }
340
341
342                // Clean up the table now that data is migrated
343                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
344
345                txn.set_db_version(8)
346            })
347                .await?;
348        }
349
350        if from < 9 && to >= 9 {
351            conn.with_transaction(move |txn| {
352                // Run the migration.
353                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
354                txn.set_db_version(9)
355            })
356            .await?;
357        }
358
359        if from < 10 && to >= 10 {
360            conn.with_transaction(move |txn| {
361                // Run the migration.
362                txn.execute_batch(include_str!(
363                    "../migrations/state_store/009_send_queue_priority.sql"
364                ))?;
365                txn.set_db_version(10)
366            })
367            .await?;
368        }
369
370        if from < 11 && to >= 11 {
371            conn.with_transaction(move |txn| {
372                // Run the migration.
373                txn.execute_batch(include_str!(
374                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
375                ))?;
376                txn.set_db_version(11)
377            })
378            .await?;
379        }
380
381        if from < 12 && to >= 12 {
382            // Defragment the DB and optimize its size on the filesystem.
383            // This should have been run in the migration for version 7, to reduce the size
384            // of the DB as we removed the media cache.
385            conn.vacuum().await?;
386            conn.set_kv("version", vec![12]).await?;
387        }
388
389        if from < 13 && to >= 13 {
390            conn.with_transaction(move |txn| {
391                // Run the migration.
392                txn.execute_batch(include_str!(
393                    "../migrations/state_store/011_thread_subscriptions.sql"
394                ))?;
395                txn.set_db_version(13)
396            })
397            .await?;
398        }
399
400        if from < 14 && to >= 14 {
401            conn.with_transaction(move |txn| {
402                // Run the migration.
403                txn.execute_batch(include_str!(
404                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
405                ))?;
406                txn.set_db_version(14)
407            })
408            .await?;
409        }
410
411        Ok(())
412    }
413
414    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
415        let key_s = match key {
416            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
417            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
418            StateStoreDataKey::Filter(f) => {
419                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
420            }
421            StateStoreDataKey::UserAvatarUrl(u) => {
422                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
423            }
424            StateStoreDataKey::RecentlyVisitedRooms(b) => {
425                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
426            }
427            StateStoreDataKey::UtdHookManagerData => {
428                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
429            }
430            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
431                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
432            }
433            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
434                if let Some(thread_root) = thread_root {
435                    Cow::Owned(format!(
436                        "{}:{room_id}:{thread_root}",
437                        StateStoreDataKey::COMPOSER_DRAFT
438                    ))
439                } else {
440                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
441                }
442            }
443            StateStoreDataKey::SeenKnockRequests(room_id) => {
444                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
445            }
446            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
447                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
448            }
449        };
450
451        self.encode_key(keys::KV_BLOB, &*key_s)
452    }
453
454    fn encode_presence_key(&self, user_id: &UserId) -> Key {
455        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
456    }
457
458    fn encode_custom_key(&self, key: &[u8]) -> Key {
459        let mut full_key = b"custom:".to_vec();
460        full_key.extend(key);
461        self.encode_key(keys::KV_BLOB, full_key)
462    }
463
464    /// Acquire a connection for executing read operations.
465    #[instrument(skip_all)]
466    async fn read(&self) -> Result<SqliteAsyncConn> {
467        trace!("Taking a `read` connection");
468        let _timer = timer!("connection");
469
470        Ok(self.pool.get().await?)
471    }
472
473    /// Acquire a connection for executing write operations.
474    #[instrument(skip_all)]
475    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
476        trace!("Taking a `write` connection");
477        let _timer = timer!("connection");
478
479        self.write_connection.clone().lock_owned().await
480    }
481
482    fn remove_maybe_stripped_room_data(
483        &self,
484        txn: &Transaction<'_>,
485        room_id: &RoomId,
486        stripped: bool,
487    ) -> rusqlite::Result<()> {
488        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
489        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
490
491        let member_room_id = self.encode_key(keys::MEMBER, room_id);
492        txn.remove_room_members(&member_room_id, Some(stripped))
493    }
494}
495
496impl EncryptableStore for SqliteStateStore {
497    fn get_cypher(&self) -> Option<&StoreCipher> {
498        self.store_cipher.as_deref()
499    }
500}
501
502/// Initialize the database.
503async fn init(conn: &SqliteAsyncConn) -> Result<()> {
504    // First turn on WAL mode, this can't be done in the transaction, it fails with
505    // the error message: "cannot change into wal mode from within a transaction".
506    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
507    conn.with_transaction(|txn| {
508        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
509        txn.set_db_version(1)?;
510
511        Ok(())
512    })
513    .await
514}
515
516trait SqliteConnectionStateStoreExt {
517    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
518
519    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
520
521    fn set_room_account_data(
522        &self,
523        room_id: &[u8],
524        event_type: &[u8],
525        data: &[u8],
526    ) -> rusqlite::Result<()>;
527    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
528
529    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
530    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
531    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
532
533    fn set_state_event(
534        &self,
535        room_id: &[u8],
536        event_type: &[u8],
537        state_key: &[u8],
538        stripped: bool,
539        event_id: Option<&[u8]>,
540        data: &[u8],
541    ) -> rusqlite::Result<()>;
542    fn get_state_event_by_id(
543        &self,
544        room_id: &[u8],
545        event_id: &[u8],
546    ) -> rusqlite::Result<Option<Vec<u8>>>;
547    fn remove_room_state_events(
548        &self,
549        room_id: &[u8],
550        stripped: Option<bool>,
551    ) -> rusqlite::Result<()>;
552
553    fn set_member(
554        &self,
555        room_id: &[u8],
556        user_id: &[u8],
557        membership: &[u8],
558        stripped: bool,
559        data: &[u8],
560    ) -> rusqlite::Result<()>;
561    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
562
563    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
564    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
565    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
566
567    fn set_receipt(
568        &self,
569        room_id: &[u8],
570        user_id: &[u8],
571        receipt_type: &[u8],
572        thread_id: &[u8],
573        event_id: &[u8],
574        data: &[u8],
575    ) -> rusqlite::Result<()>;
576    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
577
578    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
579    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
580    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
581    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
582    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
583}
584
585impl SqliteConnectionStateStoreExt for rusqlite::Connection {
586    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
587        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
588        Ok(())
589    }
590
591    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
592        self.prepare_cached(
593            "INSERT OR REPLACE INTO global_account_data (event_type, data)
594             VALUES (?, ?)",
595        )?
596        .execute((event_type, data))?;
597        Ok(())
598    }
599
600    fn set_room_account_data(
601        &self,
602        room_id: &[u8],
603        event_type: &[u8],
604        data: &[u8],
605    ) -> rusqlite::Result<()> {
606        self.prepare_cached(
607            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
608             VALUES (?, ?, ?)",
609        )?
610        .execute((room_id, event_type, data))?;
611        Ok(())
612    }
613
614    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
615        self.prepare(
616            "DELETE FROM room_account_data
617             WHERE room_id = ?",
618        )?
619        .execute((room_id,))?;
620        Ok(())
621    }
622
623    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
624        self.prepare_cached(
625            "INSERT OR REPLACE INTO room_info (room_id, state, data)
626             VALUES (?, ?, ?)",
627        )?
628        .execute((room_id, state, data))?;
629        Ok(())
630    }
631
632    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
633        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
634            .optional()
635    }
636
637    /// Remove the room info for the given room.
638    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
639        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
640        Ok(())
641    }
642
643    fn set_state_event(
644        &self,
645        room_id: &[u8],
646        event_type: &[u8],
647        state_key: &[u8],
648        stripped: bool,
649        event_id: Option<&[u8]>,
650        data: &[u8],
651    ) -> rusqlite::Result<()> {
652        self.prepare_cached(
653            "INSERT OR REPLACE
654             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
655             VALUES (?, ?, ?, ?, ?, ?)",
656        )?
657        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
658        Ok(())
659    }
660
661    fn get_state_event_by_id(
662        &self,
663        room_id: &[u8],
664        event_id: &[u8],
665    ) -> rusqlite::Result<Option<Vec<u8>>> {
666        self.query_row(
667            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
668            (room_id, event_id),
669            |row| row.get(0),
670        )
671        .optional()
672    }
673
674    /// Remove state events for the given room.
675    ///
676    /// If `stripped` is `Some()`, only removes state events for the given
677    /// stripped state. Otherwise, state events are removed regardless of the
678    /// stripped state.
679    fn remove_room_state_events(
680        &self,
681        room_id: &[u8],
682        stripped: Option<bool>,
683    ) -> rusqlite::Result<()> {
684        if let Some(stripped) = stripped {
685            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
686                .execute((room_id, stripped))?;
687        } else {
688            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
689                .execute((room_id,))?;
690        }
691        Ok(())
692    }
693
694    fn set_member(
695        &self,
696        room_id: &[u8],
697        user_id: &[u8],
698        membership: &[u8],
699        stripped: bool,
700        data: &[u8],
701    ) -> rusqlite::Result<()> {
702        self.prepare_cached(
703            "INSERT OR REPLACE
704             INTO member (room_id, user_id, membership, stripped, data)
705             VALUES (?, ?, ?, ?, ?)",
706        )?
707        .execute((room_id, user_id, membership, stripped, data))?;
708        Ok(())
709    }
710
711    /// Remove members for the given room.
712    ///
713    /// If `stripped` is `Some()`, only removes members for the given stripped
714    /// state. Otherwise, members are removed regardless of the stripped state.
715    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
716        if let Some(stripped) = stripped {
717            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
718                .execute((room_id, stripped))?;
719        } else {
720            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
721        }
722        Ok(())
723    }
724
725    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
726        self.prepare_cached(
727            "INSERT OR REPLACE
728             INTO profile (room_id, user_id, data)
729             VALUES (?, ?, ?)",
730        )?
731        .execute((room_id, user_id, data))?;
732        Ok(())
733    }
734
735    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
736        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
737        Ok(())
738    }
739
740    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
741        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
742            .execute((room_id, user_id))?;
743        Ok(())
744    }
745
746    fn set_receipt(
747        &self,
748        room_id: &[u8],
749        user_id: &[u8],
750        receipt_type: &[u8],
751        thread: &[u8],
752        event_id: &[u8],
753        data: &[u8],
754    ) -> rusqlite::Result<()> {
755        self.prepare_cached(
756            "INSERT OR REPLACE
757             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
758             VALUES (?, ?, ?, ?, ?, ?)",
759        )?
760        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
761        Ok(())
762    }
763
764    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
765        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
766        Ok(())
767    }
768
769    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
770        self.prepare_cached(
771            "INSERT OR REPLACE
772             INTO display_name (room_id, name, data)
773             VALUES (?, ?, ?)",
774        )?
775        .execute((room_id, name, data))?;
776        Ok(())
777    }
778
779    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
780        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
781            .execute((room_id, name))?;
782        Ok(())
783    }
784
785    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
786        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
787        Ok(())
788    }
789
790    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
791        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
792        Ok(())
793    }
794
795    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
796        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
797            .execute((room_id,))?;
798        Ok(())
799    }
800}
801
802#[async_trait]
803trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
804    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
805        Ok(self
806            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
807            .await
808            .optional()?)
809    }
810
811    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
812        let keys_length = keys.len();
813
814        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
815            let sql_params = repeat_vars(keys.len());
816            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
817
818            let params = rusqlite::params_from_iter(keys);
819
820            Ok(txn
821                .prepare(&sql)?
822                .query(params)?
823                .mapped(|row| row.get(0))
824                .collect::<Result<_, _>>()?)
825        })
826        .await
827    }
828
829    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
830
831    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
832        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
833        Ok(())
834    }
835
836    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
837        Ok(match room_id {
838            None => {
839                self.prepare("SELECT data FROM room_info", move |mut stmt| {
840                    stmt.query_map((), |row| row.get(0))?.collect()
841                })
842                .await?
843            }
844
845            Some(room_id) => {
846                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
847                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
848                })
849                .await?
850            }
851        })
852    }
853
854    async fn get_maybe_stripped_state_events_for_keys(
855        &self,
856        room_id: Key,
857        event_type: Key,
858        state_keys: Vec<Key>,
859    ) -> Result<Vec<(bool, Vec<u8>)>> {
860        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
861            let sql_params = repeat_vars(state_keys.len());
862            let sql = format!(
863                "SELECT stripped, data FROM state_event
864                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
865            );
866
867            let params = rusqlite::params_from_iter(
868                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
869            );
870
871            Ok(txn
872                .prepare(&sql)?
873                .query(params)?
874                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
875                .collect::<Result<_, _>>()?)
876        })
877        .await
878    }
879
880    async fn get_maybe_stripped_state_events(
881        &self,
882        room_id: Key,
883        event_type: Key,
884    ) -> Result<Vec<(bool, Vec<u8>)>> {
885        Ok(self
886            .prepare(
887                "SELECT stripped, data FROM state_event
888                 WHERE room_id = ? AND event_type = ?",
889                |mut stmt| {
890                    stmt.query((room_id, event_type))?
891                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
892                        .collect()
893                },
894            )
895            .await?)
896    }
897
898    async fn get_profiles(
899        &self,
900        room_id: Key,
901        user_ids: Vec<Key>,
902    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
903        let user_ids_length = user_ids.len();
904
905        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
906            let sql_params = repeat_vars(user_ids.len());
907            let sql = format!(
908                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
909            );
910
911            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
912
913            Ok(txn
914                .prepare(&sql)?
915                .query(params)?
916                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
917                .collect::<Result<_, _>>()?)
918        })
919        .await
920    }
921
922    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
923        let res = if memberships.is_empty() {
924            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
925                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
926            })
927            .await?
928        } else {
929            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
930                let sql_params = repeat_vars(memberships.len());
931                let sql = format!(
932                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
933                );
934
935                let params =
936                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
937
938                Ok(txn
939                    .prepare(&sql)?
940                    .query(params)?
941                    .mapped(|row| row.get(0))
942                    .collect::<Result<_, _>>()?)
943            })
944            .await?
945        };
946
947        Ok(res)
948    }
949
950    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
951        Ok(self
952            .query_row(
953                "SELECT data FROM global_account_data WHERE event_type = ?",
954                (event_type,),
955                |row| row.get(0),
956            )
957            .await
958            .optional()?)
959    }
960
961    async fn get_room_account_data(
962        &self,
963        room_id: Key,
964        event_type: Key,
965    ) -> Result<Option<Vec<u8>>> {
966        Ok(self
967            .query_row(
968                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
969                (room_id, event_type),
970                |row| row.get(0),
971            )
972            .await
973            .optional()?)
974    }
975
976    async fn get_display_names(
977        &self,
978        room_id: Key,
979        names: Vec<Key>,
980    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
981        let names_length = names.len();
982
983        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
984            let sql_params = repeat_vars(names.len());
985            let sql = format!(
986                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
987            );
988
989            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
990
991            Ok(txn
992                .prepare(&sql)?
993                .query(params)?
994                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
995                .collect::<Result<_, _>>()?)
996        })
997        .await
998    }
999
1000    async fn get_user_receipt(
1001        &self,
1002        room_id: Key,
1003        receipt_type: Key,
1004        thread: Key,
1005        user_id: Key,
1006    ) -> Result<Option<Vec<u8>>> {
1007        Ok(self
1008            .query_row(
1009                "SELECT data FROM receipt
1010                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1011                (room_id, receipt_type, thread, user_id),
1012                |row| row.get(0),
1013            )
1014            .await
1015            .optional()?)
1016    }
1017
1018    async fn get_event_receipts(
1019        &self,
1020        room_id: Key,
1021        receipt_type: Key,
1022        thread: Key,
1023        event_id: Key,
1024    ) -> Result<Vec<Vec<u8>>> {
1025        Ok(self
1026            .prepare(
1027                "SELECT data FROM receipt
1028                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1029                |mut stmt| {
1030                    stmt.query((room_id, receipt_type, thread, event_id))?
1031                        .mapped(|row| row.get(0))
1032                        .collect()
1033                },
1034            )
1035            .await?)
1036    }
1037}
1038
1039#[async_trait]
1040impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1041    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1042        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1043    }
1044}
1045
1046#[async_trait]
1047impl StateStore for SqliteStateStore {
1048    type Error = Error;
1049
1050    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1051        self.read()
1052            .await?
1053            .get_kv_blob(self.encode_state_store_data_key(key))
1054            .await?
1055            .map(|data| {
1056                Ok(match key {
1057                    StateStoreDataKey::SyncToken => {
1058                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1059                    }
1060                    StateStoreDataKey::ServerInfo => {
1061                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1062                    }
1063                    StateStoreDataKey::Filter(_) => {
1064                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1065                    }
1066                    StateStoreDataKey::UserAvatarUrl(_) => {
1067                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1068                    }
1069                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1070                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1071                    }
1072                    StateStoreDataKey::UtdHookManagerData => {
1073                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1074                    }
1075                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1076                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1077                    }
1078                    StateStoreDataKey::ComposerDraft(_, _) => {
1079                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1080                    }
1081                    StateStoreDataKey::SeenKnockRequests(_) => {
1082                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1083                    }
1084                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1085                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1086                            self.deserialize_value(&data)?,
1087                        )
1088                    }
1089                })
1090            })
1091            .transpose()
1092    }
1093
1094    async fn set_kv_data(
1095        &self,
1096        key: StateStoreDataKey<'_>,
1097        value: StateStoreDataValue,
1098    ) -> Result<()> {
1099        let serialized_value = match key {
1100            StateStoreDataKey::SyncToken => self.serialize_value(
1101                &value.into_sync_token().expect("Session data not a sync token"),
1102            )?,
1103            StateStoreDataKey::ServerInfo => self.serialize_value(
1104                &value.into_server_info().expect("Session data not containing server info"),
1105            )?,
1106            StateStoreDataKey::Filter(_) => {
1107                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1108            }
1109            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1110                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1111            )?,
1112            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1113                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1114            )?,
1115            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1116                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1117            )?,
1118            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1119                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1120            }
1121            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1122                &value.into_composer_draft().expect("Session data not a composer draft"),
1123            )?,
1124            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1125                &value
1126                    .into_seen_knock_requests()
1127                    .expect("Session data is not a set of seen knock request ids"),
1128            )?,
1129            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1130                &value
1131                    .into_thread_subscriptions_catchup_tokens()
1132                    .expect("Session data is not a list of thread subscription catchup tokens"),
1133            )?,
1134        };
1135
1136        self.write()
1137            .await
1138            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1139            .await
1140    }
1141
1142    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1143        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1144    }
1145
1146    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1147        let changes = changes.to_owned();
1148        let this = self.clone();
1149        self.write()
1150            .await
1151            .with_transaction(move |txn| {
1152                let StateChanges {
1153                    sync_token,
1154                    account_data,
1155                    presence,
1156                    profiles,
1157                    profiles_to_delete,
1158                    state,
1159                    room_account_data,
1160                    room_infos,
1161                    receipts,
1162                    redactions,
1163                    stripped_state,
1164                    ambiguity_maps,
1165                } = changes;
1166
1167                if let Some(sync_token) = sync_token {
1168                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1169                    let value = this.serialize_value(&sync_token)?;
1170                    txn.set_kv_blob(&key, &value)?;
1171                }
1172
1173                for (event_type, event) in account_data {
1174                    let event_type =
1175                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1176                    let data = this.serialize_json(&event)?;
1177                    txn.set_global_account_data(&event_type, &data)?;
1178                }
1179
1180                for (room_id, events) in room_account_data {
1181                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1182                    for (event_type, event) in events {
1183                        let event_type =
1184                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1185                        let data = this.serialize_json(&event)?;
1186                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1187                    }
1188                }
1189
1190                for (user_id, event) in presence {
1191                    let key = this.encode_presence_key(&user_id);
1192                    let value = this.serialize_json(&event)?;
1193                    txn.set_kv_blob(&key, &value)?;
1194                }
1195
1196                for (room_id, room_info) in room_infos {
1197                    let stripped = room_info.state() == RoomState::Invited;
1198                    // Remove non-stripped data for stripped rooms and vice-versa.
1199                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1200
1201                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1202                    let state = this
1203                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1204                    let data = this.serialize_json(&room_info)?;
1205                    txn.set_room_info(&room_id, &state, &data)?;
1206                }
1207
1208                for (room_id, user_ids) in profiles_to_delete {
1209                    let room_id = this.encode_key(keys::PROFILE, room_id);
1210                    for user_id in user_ids {
1211                        let user_id = this.encode_key(keys::PROFILE, user_id);
1212                        txn.remove_room_profile(&room_id, &user_id)?;
1213                    }
1214                }
1215
1216                for (room_id, state_event_types) in state {
1217                    let profiles = profiles.get(&room_id);
1218                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1219
1220                    for (event_type, state_events) in state_event_types {
1221                        let encoded_event_type =
1222                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1223
1224                        for (state_key, raw_state_event) in state_events {
1225                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1226                            let data = this.serialize_json(&raw_state_event)?;
1227
1228                            let event_id: Option<String> =
1229                                raw_state_event.get_field("event_id").ok().flatten();
1230                            let encoded_event_id =
1231                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1232
1233                            txn.set_state_event(
1234                                &encoded_room_id,
1235                                &encoded_event_type,
1236                                &encoded_state_key,
1237                                false,
1238                                encoded_event_id.as_deref(),
1239                                &data,
1240                            )?;
1241
1242                            if event_type == StateEventType::RoomMember {
1243                                let member_event = match raw_state_event
1244                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1245                                {
1246                                    Ok(ev) => ev,
1247                                    Err(e) => {
1248                                        debug!(event_id, "Failed to deserialize member event: {e}");
1249                                        continue;
1250                                    }
1251                                };
1252
1253                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1254                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1255                                let membership = this
1256                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1257                                let data = this.serialize_value(&state_key)?;
1258
1259                                txn.set_member(
1260                                    &encoded_room_id,
1261                                    &user_id,
1262                                    &membership,
1263                                    false,
1264                                    &data,
1265                                )?;
1266
1267                                if let Some(profile) =
1268                                    profiles.and_then(|p| p.get(member_event.state_key()))
1269                                {
1270                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1271                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1272                                    let data = this.serialize_json(&profile)?;
1273                                    txn.set_profile(&room_id, &user_id, &data)?;
1274                                }
1275                            }
1276                        }
1277                    }
1278                }
1279
1280                for (room_id, stripped_state_event_types) in stripped_state {
1281                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1282
1283                    for (event_type, stripped_state_events) in stripped_state_event_types {
1284                        let encoded_event_type =
1285                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1286
1287                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1288                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1289                            let data = this.serialize_json(&raw_stripped_state_event)?;
1290                            txn.set_state_event(
1291                                &encoded_room_id,
1292                                &encoded_event_type,
1293                                &encoded_state_key,
1294                                true,
1295                                None,
1296                                &data,
1297                            )?;
1298
1299                            if event_type == StateEventType::RoomMember {
1300                                let member_event = match raw_stripped_state_event
1301                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1302                                ) {
1303                                    Ok(ev) => ev,
1304                                    Err(e) => {
1305                                        debug!("Failed to deserialize stripped member event: {e}");
1306                                        continue;
1307                                    }
1308                                };
1309
1310                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1311                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1312                                let membership = this.encode_key(
1313                                    keys::MEMBER,
1314                                    member_event.content.membership.as_str(),
1315                                );
1316                                let data = this.serialize_value(&state_key)?;
1317
1318                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1319                            }
1320                        }
1321                    }
1322                }
1323
1324                for (room_id, receipt_event) in receipts {
1325                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1326
1327                    for (event_id, receipt_types) in receipt_event {
1328                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1329
1330                        for (receipt_type, receipt_users) in receipt_types {
1331                            let receipt_type =
1332                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1333
1334                            for (user_id, receipt) in receipt_users {
1335                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1336                                // We cannot have a NULL primary key so we rely on serialization
1337                                // instead of the string representation.
1338                                let thread = this.encode_key(
1339                                    keys::RECEIPT,
1340                                    rmp_serde::to_vec_named(&receipt.thread)?,
1341                                );
1342                                let data = this.serialize_json(&ReceiptData {
1343                                    receipt,
1344                                    event_id: event_id.clone(),
1345                                    user_id,
1346                                })?;
1347
1348                                txn.set_receipt(
1349                                    &room_id,
1350                                    &encoded_user_id,
1351                                    &receipt_type,
1352                                    &thread,
1353                                    &encoded_event_id,
1354                                    &data,
1355                                )?;
1356                            }
1357                        }
1358                    }
1359                }
1360
1361                for (room_id, redactions) in redactions {
1362                    let make_redaction_rules = || {
1363                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1364                        txn.get_room_info(&encoded_room_id)
1365                            .ok()
1366                            .flatten()
1367                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1368                            .map(|info| info.room_version_rules_or_default())
1369                            .unwrap_or_else(|| {
1370                                warn!(
1371                                    ?room_id,
1372                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1373                                );
1374                                ROOM_VERSION_RULES_FALLBACK
1375                            }).redaction
1376                    };
1377
1378                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1379                    let mut redaction_rules = None;
1380
1381                    for (event_id, redaction) in redactions {
1382                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1383
1384                        if let Some(Ok(raw_event)) = txn
1385                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1386                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1387                        {
1388                            let event = raw_event.deserialize()?;
1389                            let redacted = redact(
1390                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1391                                redaction_rules.get_or_insert_with(make_redaction_rules),
1392                                Some(RedactedBecause::from_raw_event(&redaction)?),
1393                            )
1394                            .map_err(Error::Redaction)?;
1395                            let data = this.serialize_json(&redacted)?;
1396
1397                            let event_type =
1398                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1399                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1400
1401                            txn.set_state_event(
1402                                &encoded_room_id,
1403                                &event_type,
1404                                &state_key,
1405                                false,
1406                                Some(&event_id),
1407                                &data,
1408                            )?;
1409                        }
1410                    }
1411                }
1412
1413                for (room_id, display_names) in ambiguity_maps {
1414                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1415
1416                    for (name, user_ids) in display_names {
1417                        let encoded_name = this.encode_key(
1418                            keys::DISPLAY_NAME,
1419                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1420                        );
1421                        let data = this.serialize_json(&user_ids)?;
1422
1423                        if user_ids.is_empty() {
1424                            txn.remove_display_name(&room_id, &encoded_name)?;
1425
1426                            // We can't do a migration to merge the previously distinct buckets of
1427                            // user IDs since the display names themselves are hashed before they
1428                            // are persisted in the store. So the store will always retain two
1429                            // buckets: one for raw display names and one for normalised ones.
1430                            //
1431                            // We therefore do the next best thing, which is a sort of a soft
1432                            // migration: we fetch both the raw and normalised buckets, then merge
1433                            // the user IDs contained in them into a separate, temporary merged
1434                            // bucket. The SDK then operates on the merged buckets exclusively. See
1435                            // the comment in `get_users_with_display_names` for details.
1436                            //
1437                            // If the merged bucket is empty, that must mean that both the raw and
1438                            // normalised buckets were also empty, so we can remove both from the
1439                            // store.
1440                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1441                            txn.remove_display_name(&room_id, &raw_name)?;
1442                        } else {
1443                            // We only create new buckets with the normalized display name.
1444                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1445                        }
1446                    }
1447                }
1448
1449                Ok::<_, Error>(())
1450            })
1451            .await?;
1452
1453        Ok(())
1454    }
1455
1456    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1457        self.read()
1458            .await?
1459            .get_kv_blob(self.encode_presence_key(user_id))
1460            .await?
1461            .map(|data| self.deserialize_json(&data))
1462            .transpose()
1463    }
1464
1465    async fn get_presence_events(
1466        &self,
1467        user_ids: &[OwnedUserId],
1468    ) -> Result<Vec<Raw<PresenceEvent>>> {
1469        if user_ids.is_empty() {
1470            return Ok(Vec::new());
1471        }
1472
1473        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1474        self.read()
1475            .await?
1476            .get_kv_blobs(user_ids)
1477            .await?
1478            .into_iter()
1479            .map(|data| self.deserialize_json(&data))
1480            .collect()
1481    }
1482
1483    async fn get_state_event(
1484        &self,
1485        room_id: &RoomId,
1486        event_type: StateEventType,
1487        state_key: &str,
1488    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1489        Ok(self
1490            .get_state_events_for_keys(room_id, event_type, &[state_key])
1491            .await?
1492            .into_iter()
1493            .next())
1494    }
1495
1496    async fn get_state_events(
1497        &self,
1498        room_id: &RoomId,
1499        event_type: StateEventType,
1500    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1501        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1502        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1503        self.read()
1504            .await?
1505            .get_maybe_stripped_state_events(room_id, event_type)
1506            .await?
1507            .into_iter()
1508            .map(|(stripped, data)| {
1509                let ev = if stripped {
1510                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1511                } else {
1512                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1513                };
1514
1515                Ok(ev)
1516            })
1517            .collect()
1518    }
1519
1520    async fn get_state_events_for_keys(
1521        &self,
1522        room_id: &RoomId,
1523        event_type: StateEventType,
1524        state_keys: &[&str],
1525    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1526        if state_keys.is_empty() {
1527            return Ok(Vec::new());
1528        }
1529
1530        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1531        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1532        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1533        self.read()
1534            .await?
1535            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1536            .await?
1537            .into_iter()
1538            .map(|(stripped, data)| {
1539                let ev = if stripped {
1540                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1541                } else {
1542                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1543                };
1544
1545                Ok(ev)
1546            })
1547            .collect()
1548    }
1549
1550    async fn get_profile(
1551        &self,
1552        room_id: &RoomId,
1553        user_id: &UserId,
1554    ) -> Result<Option<MinimalRoomMemberEvent>> {
1555        let room_id = self.encode_key(keys::PROFILE, room_id);
1556        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1557
1558        self.read()
1559            .await?
1560            .get_profiles(room_id, user_ids)
1561            .await?
1562            .into_iter()
1563            .next()
1564            .map(|(_, data)| self.deserialize_json(&data))
1565            .transpose()
1566    }
1567
1568    async fn get_profiles<'a>(
1569        &self,
1570        room_id: &RoomId,
1571        user_ids: &'a [OwnedUserId],
1572    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1573        if user_ids.is_empty() {
1574            return Ok(BTreeMap::new());
1575        }
1576
1577        let room_id = self.encode_key(keys::PROFILE, room_id);
1578        let mut user_ids_map = user_ids
1579            .iter()
1580            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1581            .collect::<BTreeMap<_, _>>();
1582        let user_ids = user_ids_map.keys().cloned().collect();
1583
1584        self.read()
1585            .await?
1586            .get_profiles(room_id, user_ids)
1587            .await?
1588            .into_iter()
1589            .map(|(user_id, data)| {
1590                Ok((
1591                    user_ids_map
1592                        .remove(user_id.as_slice())
1593                        .expect("returned user IDs were requested"),
1594                    self.deserialize_json(&data)?,
1595                ))
1596            })
1597            .collect()
1598    }
1599
1600    async fn get_user_ids(
1601        &self,
1602        room_id: &RoomId,
1603        membership: RoomMemberships,
1604    ) -> Result<Vec<OwnedUserId>> {
1605        let room_id = self.encode_key(keys::MEMBER, room_id);
1606        let memberships = membership
1607            .as_vec()
1608            .into_iter()
1609            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1610            .collect();
1611        self.read()
1612            .await?
1613            .get_user_ids(room_id, memberships)
1614            .await?
1615            .iter()
1616            .map(|data| self.deserialize_value(data))
1617            .collect()
1618    }
1619
1620    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1621        self.read()
1622            .await?
1623            .get_room_infos(match room_load_settings {
1624                RoomLoadSettings::All => None,
1625                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1626            })
1627            .await?
1628            .into_iter()
1629            .map(|data| self.deserialize_json(&data))
1630            .collect()
1631    }
1632
1633    async fn get_users_with_display_name(
1634        &self,
1635        room_id: &RoomId,
1636        display_name: &DisplayName,
1637    ) -> Result<BTreeSet<OwnedUserId>> {
1638        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1639        let names = vec![self.encode_key(
1640            keys::DISPLAY_NAME,
1641            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1642        )];
1643
1644        Ok(self
1645            .read()
1646            .await?
1647            .get_display_names(room_id, names)
1648            .await?
1649            .into_iter()
1650            .next()
1651            .map(|(_, data)| self.deserialize_json(&data))
1652            .transpose()?
1653            .unwrap_or_default())
1654    }
1655
1656    async fn get_users_with_display_names<'a>(
1657        &self,
1658        room_id: &RoomId,
1659        display_names: &'a [DisplayName],
1660    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1661        let mut result = HashMap::new();
1662
1663        if display_names.is_empty() {
1664            return Ok(result);
1665        }
1666
1667        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1668        let mut names_map = display_names
1669            .iter()
1670            .flat_map(|display_name| {
1671                // We encode the display name as the `raw_str()` and the normalized string.
1672                //
1673                // This is for compatibility reasons since:
1674                //  1. Previously "Alice" and "alice" were considered to be distinct display
1675                //     names, while we now consider them to be the same so we need to merge the
1676                //     previously distinct buckets of user IDs.
1677                //  2. We can't do a migration to merge the previously distinct buckets of user
1678                //     IDs since the display names itself are hashed before they are persisted
1679                //     in the store.
1680                let raw =
1681                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1682                let normalized = display_name.as_normalized_str().map(|normalized| {
1683                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1684                });
1685
1686                iter::once(raw).chain(normalized)
1687            })
1688            .collect::<BTreeMap<_, _>>();
1689        let names = names_map.keys().cloned().collect();
1690
1691        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1692        {
1693            let display_name =
1694                names_map.remove(name.as_slice()).expect("returned display names were requested");
1695            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1696
1697            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1698        }
1699
1700        Ok(result)
1701    }
1702
1703    async fn get_account_data_event(
1704        &self,
1705        event_type: GlobalAccountDataEventType,
1706    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1707        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1708        self.read()
1709            .await?
1710            .get_global_account_data(event_type)
1711            .await?
1712            .map(|value| self.deserialize_json(&value))
1713            .transpose()
1714    }
1715
1716    async fn get_room_account_data_event(
1717        &self,
1718        room_id: &RoomId,
1719        event_type: RoomAccountDataEventType,
1720    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1721        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1722        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1723        self.read()
1724            .await?
1725            .get_room_account_data(room_id, event_type)
1726            .await?
1727            .map(|value| self.deserialize_json(&value))
1728            .transpose()
1729    }
1730
1731    async fn get_user_room_receipt_event(
1732        &self,
1733        room_id: &RoomId,
1734        receipt_type: ReceiptType,
1735        thread: ReceiptThread,
1736        user_id: &UserId,
1737    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1738        let room_id = self.encode_key(keys::RECEIPT, room_id);
1739        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1740        // We cannot have a NULL primary key so we rely on serialization instead of the
1741        // string representation.
1742        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1743        let user_id = self.encode_key(keys::RECEIPT, user_id);
1744
1745        self.read()
1746            .await?
1747            .get_user_receipt(room_id, receipt_type, thread, user_id)
1748            .await?
1749            .map(|value| {
1750                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1751            })
1752            .transpose()
1753    }
1754
1755    async fn get_event_room_receipt_events(
1756        &self,
1757        room_id: &RoomId,
1758        receipt_type: ReceiptType,
1759        thread: ReceiptThread,
1760        event_id: &EventId,
1761    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1762        let room_id = self.encode_key(keys::RECEIPT, room_id);
1763        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1764        // We cannot have a NULL primary key so we rely on serialization instead of the
1765        // string representation.
1766        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1767        let event_id = self.encode_key(keys::RECEIPT, event_id);
1768
1769        self.read()
1770            .await?
1771            .get_event_receipts(room_id, receipt_type, thread, event_id)
1772            .await?
1773            .iter()
1774            .map(|value| {
1775                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1776            })
1777            .collect()
1778    }
1779
1780    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1781        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1782    }
1783
1784    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1785        let conn = self.write().await;
1786        let key = self.encode_custom_key(key);
1787        conn.set_kv_blob(key, value).await?;
1788        Ok(())
1789    }
1790
1791    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1792        let conn = self.write().await;
1793        let key = self.encode_custom_key(key);
1794        let previous = conn.get_kv_blob(key.clone()).await?;
1795        conn.set_kv_blob(key, value).await?;
1796        Ok(previous)
1797    }
1798
1799    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1800        let conn = self.write().await;
1801        let key = self.encode_custom_key(key);
1802        let previous = conn.get_kv_blob(key.clone()).await?;
1803        if previous.is_some() {
1804            conn.delete_kv_blob(key).await?;
1805        }
1806        Ok(previous)
1807    }
1808
1809    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1810        let this = self.clone();
1811        let room_id = room_id.to_owned();
1812
1813        let conn = self.write().await;
1814
1815        conn.with_transaction(move |txn| -> Result<()> {
1816            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1817            txn.remove_room_info(&room_info_room_id)?;
1818
1819            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1820            txn.remove_room_state_events(&state_event_room_id, None)?;
1821
1822            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1823            txn.remove_room_members(&member_room_id, None)?;
1824
1825            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1826            txn.remove_room_profiles(&profile_room_id)?;
1827
1828            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1829            txn.remove_room_account_data(&room_account_data_room_id)?;
1830
1831            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1832            txn.remove_room_receipts(&receipt_room_id)?;
1833
1834            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1835            txn.remove_room_display_names(&display_name_room_id)?;
1836
1837            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1838            txn.remove_room_send_queue(&send_queue_room_id)?;
1839
1840            let dependent_send_queue_room_id =
1841                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1842            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1843
1844            let thread_subscriptions_room_id =
1845                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1846            txn.execute(
1847                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1848                (thread_subscriptions_room_id,),
1849            )?;
1850
1851            Ok(())
1852        })
1853        .await?;
1854
1855        conn.vacuum().await
1856    }
1857
1858    async fn save_send_queue_request(
1859        &self,
1860        room_id: &RoomId,
1861        transaction_id: OwnedTransactionId,
1862        created_at: MilliSecondsSinceUnixEpoch,
1863        content: QueuedRequestKind,
1864        priority: usize,
1865    ) -> Result<(), Self::Error> {
1866        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1867        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1868
1869        let content = self.serialize_json(&content)?;
1870        // The transaction id is used both as a key (in remove/update) and a value (as
1871        // it's useful for the callers), so we keep it as is, and neither hash
1872        // it (with encode_key) or encrypt it (through serialize_value). After
1873        // all, it carries no personal information, so this is considered fine.
1874
1875        let created_at_ts: u64 = created_at.0.into();
1876        self.write()
1877            .await
1878            .with_transaction(move |txn| {
1879                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1880                Ok(())
1881            })
1882            .await
1883    }
1884
1885    async fn update_send_queue_request(
1886        &self,
1887        room_id: &RoomId,
1888        transaction_id: &TransactionId,
1889        content: QueuedRequestKind,
1890    ) -> Result<bool, Self::Error> {
1891        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1892
1893        let content = self.serialize_json(&content)?;
1894        // See comment in [`Self::save_send_queue_event`] to understand why the
1895        // transaction id is neither encrypted or hashed.
1896        let transaction_id = transaction_id.to_string();
1897
1898        let num_updated = self.write()
1899            .await
1900            .with_transaction(move |txn| {
1901                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1902            })
1903            .await?;
1904
1905        Ok(num_updated > 0)
1906    }
1907
1908    async fn remove_send_queue_request(
1909        &self,
1910        room_id: &RoomId,
1911        transaction_id: &TransactionId,
1912    ) -> Result<bool, Self::Error> {
1913        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1914
1915        // See comment in `save_send_queue_event`.
1916        let transaction_id = transaction_id.to_string();
1917
1918        let num_deleted = self
1919            .write()
1920            .await
1921            .with_transaction(move |txn| {
1922                txn.prepare_cached(
1923                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1924                )?
1925                .execute((room_id, &transaction_id))
1926            })
1927            .await?;
1928
1929        Ok(num_deleted > 0)
1930    }
1931
1932    async fn load_send_queue_requests(
1933        &self,
1934        room_id: &RoomId,
1935    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1936        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1937
1938        // Note: ROWID is always present and is an auto-incremented integer counter. We
1939        // want to maintain the insertion order, so we can sort using it.
1940        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1941        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1942            .read()
1943            .await?
1944            .prepare(
1945                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1946                |mut stmt| {
1947                    stmt.query((room_id,))?
1948                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1949                        .collect()
1950                },
1951            )
1952            .await?;
1953
1954        let mut requests = Vec::with_capacity(res.len());
1955
1956        for entry in res {
1957            let created_at = entry
1958                .4
1959                .and_then(UInt::new)
1960                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1961
1962            requests.push(QueuedRequest {
1963                transaction_id: entry.0.into(),
1964                kind: self.deserialize_json(&entry.1)?,
1965                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1966                priority: entry.3,
1967                created_at,
1968            });
1969        }
1970
1971        Ok(requests)
1972    }
1973
1974    async fn update_send_queue_request_status(
1975        &self,
1976        room_id: &RoomId,
1977        transaction_id: &TransactionId,
1978        error: Option<QueueWedgeError>,
1979    ) -> Result<(), Self::Error> {
1980        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1981
1982        // See comment in `save_send_queue_event`.
1983        let transaction_id = transaction_id.to_string();
1984
1985        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1986        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1987
1988        self.write()
1989            .await
1990            .with_transaction(move |txn| {
1991                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1992                Ok(())
1993            })
1994            .await
1995    }
1996
1997    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1998        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1999        // have to manually do the deduplication: indeed, for all X, encrypt(X)
2000        // != encrypted(X), since we use a nonce in the encryption process.
2001
2002        let res: Vec<Vec<u8>> = self
2003            .read()
2004            .await?
2005            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2006                stmt.query(())?.mapped(|row| row.get(0)).collect()
2007            })
2008            .await?;
2009
2010        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2011        // then rejigger that into a vector.
2012        Ok(res
2013            .into_iter()
2014            .map(|entry| self.deserialize_value(&entry))
2015            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2016            .into_iter()
2017            .collect())
2018    }
2019
2020    async fn save_dependent_queued_request(
2021        &self,
2022        room_id: &RoomId,
2023        parent_txn_id: &TransactionId,
2024        own_txn_id: ChildTransactionId,
2025        created_at: MilliSecondsSinceUnixEpoch,
2026        content: DependentQueuedRequestKind,
2027    ) -> Result<()> {
2028        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2029        let content = self.serialize_json(&content)?;
2030
2031        // See comment in `save_send_queue_event`.
2032        let parent_txn_id = parent_txn_id.to_string();
2033        let own_txn_id = own_txn_id.to_string();
2034
2035        let created_at_ts: u64 = created_at.0.into();
2036        self.write()
2037            .await
2038            .with_transaction(move |txn| {
2039                txn.prepare_cached(
2040                    r#"INSERT INTO dependent_send_queue_events
2041                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2042                       VALUES (?, ?, ?, ?, ?)"#,
2043                )?
2044                .execute((
2045                    room_id,
2046                    parent_txn_id,
2047                    own_txn_id,
2048                    content,
2049                    created_at_ts,
2050                ))?;
2051                Ok(())
2052            })
2053            .await
2054    }
2055
2056    async fn update_dependent_queued_request(
2057        &self,
2058        room_id: &RoomId,
2059        own_transaction_id: &ChildTransactionId,
2060        new_content: DependentQueuedRequestKind,
2061    ) -> Result<bool> {
2062        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2063        let content = self.serialize_json(&new_content)?;
2064
2065        // See comment in `save_send_queue_event`.
2066        let own_txn_id = own_transaction_id.to_string();
2067
2068        let num_updated = self
2069            .write()
2070            .await
2071            .with_transaction(move |txn| {
2072                txn.prepare_cached(
2073                    r#"UPDATE dependent_send_queue_events
2074                       SET content = ?
2075                       WHERE own_transaction_id = ?
2076                       AND room_id = ?"#,
2077                )?
2078                .execute((content, own_txn_id, room_id))
2079            })
2080            .await?;
2081
2082        if num_updated > 1 {
2083            return Err(Error::InconsistentUpdate);
2084        }
2085
2086        Ok(num_updated == 1)
2087    }
2088
2089    async fn mark_dependent_queued_requests_as_ready(
2090        &self,
2091        room_id: &RoomId,
2092        parent_txn_id: &TransactionId,
2093        parent_key: SentRequestKey,
2094    ) -> Result<usize> {
2095        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2096        let parent_key = self.serialize_value(&parent_key)?;
2097
2098        // See comment in `save_send_queue_event`.
2099        let parent_txn_id = parent_txn_id.to_string();
2100
2101        self.write()
2102            .await
2103            .with_transaction(move |txn| {
2104                Ok(txn.prepare_cached(
2105                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2106                )?
2107                .execute((parent_key, parent_txn_id, room_id))?)
2108            })
2109            .await
2110    }
2111
2112    async fn remove_dependent_queued_request(
2113        &self,
2114        room_id: &RoomId,
2115        txn_id: &ChildTransactionId,
2116    ) -> Result<bool> {
2117        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2118
2119        // See comment in `save_send_queue_event`.
2120        let txn_id = txn_id.to_string();
2121
2122        let num_deleted = self
2123            .write()
2124            .await
2125            .with_transaction(move |txn| {
2126                txn.prepare_cached(
2127                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2128                )?
2129                .execute((txn_id, room_id))
2130            })
2131            .await?;
2132
2133        Ok(num_deleted > 0)
2134    }
2135
2136    async fn load_dependent_queued_requests(
2137        &self,
2138        room_id: &RoomId,
2139    ) -> Result<Vec<DependentQueuedRequest>> {
2140        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2141
2142        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2143        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2144            .read()
2145            .await?
2146            .prepare(
2147                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2148                |mut stmt| {
2149                    stmt.query((room_id,))?
2150                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2151                        .collect()
2152                },
2153            )
2154            .await?;
2155
2156        let mut dependent_events = Vec::with_capacity(res.len());
2157
2158        for entry in res {
2159            let created_at = entry
2160                .4
2161                .and_then(UInt::new)
2162                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2163
2164            dependent_events.push(DependentQueuedRequest {
2165                own_transaction_id: entry.0.into(),
2166                parent_transaction_id: entry.1.into(),
2167                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2168                kind: self.deserialize_json(&entry.3)?,
2169                created_at,
2170            });
2171        }
2172
2173        Ok(dependent_events)
2174    }
2175
2176    async fn upsert_thread_subscription(
2177        &self,
2178        room_id: &RoomId,
2179        thread_id: &EventId,
2180        mut new: StoredThreadSubscription,
2181    ) -> Result<(), Self::Error> {
2182        if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2183            if previous == new {
2184                // No need to update anything.
2185                trace!("not saving thread subscription because the subscription is the same");
2186                return Ok(());
2187            }
2188
2189            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2190                trace!("not saving thread subscription because we have a newer bump stamp");
2191                return Ok(());
2192            }
2193        }
2194
2195        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2196        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2197        let status = new.status.as_str();
2198
2199        self.write()
2200            .await
2201            .with_transaction(move |txn| {
2202                // Try to find a previous value.
2203                txn.prepare_cached(
2204                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2205                         VALUES (?, ?, ?, ?)",
2206                )?
2207                .execute((room_id, thread_id, status, new.bump_stamp))
2208            })
2209            .await?;
2210        Ok(())
2211    }
2212
2213    async fn load_thread_subscription(
2214        &self,
2215        room_id: &RoomId,
2216        thread_id: &EventId,
2217    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2218        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2219        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2220
2221        Ok(self
2222            .read()
2223            .await?
2224            .query_row(
2225                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2226                (room_id, thread_id),
2227                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2228            )
2229            .await
2230            .optional()?
2231            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2232                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2233                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2234                })?;
2235                Ok(StoredThreadSubscription { status, bump_stamp })
2236            })
2237            .transpose()?)
2238    }
2239
2240    async fn remove_thread_subscription(
2241        &self,
2242        room_id: &RoomId,
2243        thread_id: &EventId,
2244    ) -> Result<(), Self::Error> {
2245        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2246        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2247
2248        self.write()
2249            .await
2250            .execute(
2251                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2252                (room_id, thread_id),
2253            )
2254            .await?;
2255
2256        Ok(())
2257    }
2258}
2259
2260#[derive(Debug, Clone, Serialize, Deserialize)]
2261struct ReceiptData {
2262    receipt: Receipt,
2263    event_id: OwnedEventId,
2264    user_id: OwnedUserId,
2265}
2266
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by all the stores opened in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter used to give every store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store without a passphrase, in its own sub-directory of
    /// [`TMP_DIR`]. Used by the `statestore_integration_tests!` suite.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(store_index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        Ok(SqliteStateStore::open(store_path, None).await.unwrap())
    }

    statestore_integration_tests!();
}
2291
2292#[cfg(test)]
2293mod encrypted_tests {
2294    use std::{
2295        path::PathBuf,
2296        sync::atomic::{AtomicU32, Ordering::SeqCst},
2297    };
2298
2299    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2300    use matrix_sdk_test::async_test;
2301    use once_cell::sync::Lazy;
2302    use tempfile::{tempdir, TempDir};
2303
2304    use super::SqliteStateStore;
2305    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2306
2307    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2308    static NUM: AtomicU32 = AtomicU32::new(0);
2309
2310    fn new_state_store_workspace() -> PathBuf {
2311        let name = NUM.fetch_add(1, SeqCst).to_string();
2312        TMP_DIR.path().join(name)
2313    }
2314
2315    async fn get_store() -> Result<impl StateStore, StoreError> {
2316        let tmpdir_path = new_state_store_workspace();
2317
2318        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2319
2320        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2321            .await
2322            .unwrap())
2323    }
2324
2325    #[async_test]
2326    async fn test_pool_size() {
2327        let tmpdir_path = new_state_store_workspace();
2328        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2329
2330        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2331
2332        assert_eq!(store.pool.status().max_size, 42);
2333    }
2334
2335    #[async_test]
2336    async fn test_cache_size() {
2337        let tmpdir_path = new_state_store_workspace();
2338        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2339
2340        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2341
2342        let conn = store.pool.get().await.unwrap();
2343        let cache_size =
2344            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2345
2346        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
2347        // converted to kibibytes. Also, it must be a negative value because it
2348        // _is_ the size in kibibytes, not in page size.
2349        assert_eq!(cache_size, -(1500 / 1024));
2350    }
2351
2352    #[async_test]
2353    async fn test_journal_size_limit() {
2354        let tmpdir_path = new_state_store_workspace();
2355        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2356
2357        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2358
2359        let conn = store.pool.get().await.unwrap();
2360        let journal_size_limit = conn
2361            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2362            .await
2363            .unwrap();
2364
2365        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
2366        // bytes in SQLite.
2367        assert_eq!(journal_size_limit, 1500);
2368    }
2369
2370    statestore_integration_tests!();
2371}
2372
2373#[cfg(test)]
2374mod migration_tests {
2375    use std::{
2376        path::{Path, PathBuf},
2377        sync::{
2378            atomic::{AtomicU32, Ordering::SeqCst},
2379            Arc,
2380        },
2381    };
2382
2383    use as_variant::as_variant;
2384    use deadpool_sqlite::Runtime;
2385    use matrix_sdk_base::{
2386        media::{MediaFormat, MediaRequestParameters},
2387        store::{
2388            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2389            SerializableEventContent,
2390        },
2391        sync::UnreadNotificationsCount,
2392        RoomState, StateStore,
2393    };
2394    use matrix_sdk_test::async_test;
2395    use once_cell::sync::Lazy;
2396    use ruma::{
2397        events::{
2398            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2399            StateEventType,
2400        },
2401        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2402        RoomId, TransactionId, UserId,
2403    };
2404    use rusqlite::Transaction;
2405    use serde::{Deserialize, Serialize};
2406    use serde_json::json;
2407    use tempfile::{tempdir, TempDir};
2408    use tokio::{fs, sync::Mutex};
2409    use zeroize::Zeroizing;
2410
2411    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2412    use crate::{
2413        error::{Error, Result},
2414        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2415        OpenStoreError, Secret,
2416    };
2417
2418    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2419    static NUM: AtomicU32 = AtomicU32::new(0);
2420    const SECRET: &str = "secret";
2421
2422    fn new_path() -> PathBuf {
2423        let name = NUM.fetch_add(1, SeqCst).to_string();
2424        TMP_DIR.path().join(name)
2425    }
2426
2427    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2428        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2429
2430        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2431        // use default pool config
2432
2433        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2434        let conn = pool.get().await?;
2435
2436        init(&conn).await?;
2437
2438        let store_cipher = Some(Arc::new(
2439            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2440                .await
2441                .unwrap(),
2442        ));
2443        let this = SqliteStateStore {
2444            store_cipher,
2445            pool,
2446            // Use `conn` as our selected write connections.
2447            write_connection: Arc::new(Mutex::new(conn)),
2448        };
2449        this.run_migrations(1, Some(version)).await?;
2450
2451        Ok(this)
2452    }
2453
2454    fn room_info_v1_json(
2455        room_id: &RoomId,
2456        state: RoomState,
2457        name: Option<&str>,
2458        creator: Option<&UserId>,
2459    ) -> serde_json::Value {
2460        // Test with name set or not.
2461        let name_content = match name {
2462            Some(name) => json!({ "name": name }),
2463            None => json!({ "name": null }),
2464        };
2465        // Test with creator set or not.
2466        let create_content = match creator {
2467            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2468            None => RoomCreateEventContent::new_v11(),
2469        };
2470
2471        json!({
2472            "room_id": room_id,
2473            "room_type": state,
2474            "notification_counts": UnreadNotificationsCount::default(),
2475            "summary": {
2476                "heroes": [],
2477                "joined_member_count": 0,
2478                "invited_member_count": 0,
2479            },
2480            "members_synced": false,
2481            "base_info": {
2482                "dm_targets": [],
2483                "max_power_level": 100,
2484                "name": {
2485                    "Original": {
2486                        "content": name_content,
2487                    },
2488                },
2489                "create": {
2490                    "Original": {
2491                        "content": create_content,
2492                    }
2493                }
2494            },
2495        })
2496    }
2497
2498    #[async_test]
2499    pub async fn test_migrating_v1_to_v2() {
2500        let path = new_path();
2501        // Create and populate db.
2502        {
2503            let db = create_fake_db(&path, 1).await.unwrap();
2504            let conn = db.pool.get().await.unwrap();
2505
2506            let this = db.clone();
2507            conn.with_transaction(move |txn| {
2508                for i in 0..5 {
2509                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2510                    let (state, stripped) =
2511                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2512                    let info = room_info_v1_json(&room_id, state, None, None);
2513
2514                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2515                    let data = this.serialize_json(&info)?;
2516
2517                    txn.prepare_cached(
2518                        "INSERT INTO room_info (room_id, stripped, data)
2519                         VALUES (?, ?, ?)",
2520                    )?
2521                    .execute((room_id, stripped, data))?;
2522                }
2523
2524                Result::<_, Error>::Ok(())
2525            })
2526            .await
2527            .unwrap();
2528        }
2529
2530        // This transparently migrates to the latest version.
2531        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2532
2533        // Check all room infos are there.
2534        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2535    }
2536
2537    // Add a room in version 2 format of the state store.
2538    fn add_room_v2(
2539        this: &SqliteStateStore,
2540        txn: &Transaction<'_>,
2541        room_id: &RoomId,
2542        name: Option<&str>,
2543        create_creator: Option<&UserId>,
2544        create_sender: Option<&UserId>,
2545    ) -> Result<(), Error> {
2546        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2547
2548        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2549        let encoded_state =
2550            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2551        let data = this.serialize_json(&room_info_json)?;
2552
2553        txn.prepare_cached(
2554            "INSERT INTO room_info (room_id, state, data)
2555             VALUES (?, ?, ?)",
2556        )?
2557        .execute((encoded_room_id, encoded_state, data))?;
2558
2559        // Test with or without `m.room.create` event in the room state.
2560        let Some(create_sender) = create_sender else {
2561            return Ok(());
2562        };
2563
2564        let create_content = match create_creator {
2565            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2566            None => RoomCreateEventContent::new_v11(),
2567        };
2568
2569        let event_id = EventId::new(server_name!("dummy.local"));
2570        let create_event = json!({
2571            "content": create_content,
2572            "event_id": event_id,
2573            "sender": create_sender.to_owned(),
2574            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2575            "state_key": "",
2576            "type": "m.room.create",
2577            "unsigned": {},
2578        });
2579
2580        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2581        let encoded_event_type =
2582            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2583        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2584        let stripped = false;
2585        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2586        let data = this.serialize_json(&create_event)?;
2587
2588        txn.prepare_cached(
2589            "INSERT
2590             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2591             VALUES (?, ?, ?, ?, ?, ?)",
2592        )?
2593        .execute((
2594            encoded_room_id,
2595            encoded_event_type,
2596            encoded_state_key,
2597            stripped,
2598            encoded_event_id,
2599            data,
2600        ))?;
2601
2602        Ok(())
2603    }
2604
2605    #[async_test]
2606    pub async fn test_migrating_v2_to_v3() {
2607        let path = new_path();
2608
2609        // Room A: with name, creator and sender.
2610        let room_a_id = room_id!("!room_a:dummy.local");
2611        let room_a_name = "Room A";
2612        let room_a_creator = user_id!("@creator:dummy.local");
2613        // Use a different sender to check that sender is used over creator in
2614        // migration.
2615        let room_a_create_sender = user_id!("@sender:dummy.local");
2616
2617        // Room B: without name, creator and sender.
2618        let room_b_id = room_id!("!room_b:dummy.local");
2619
2620        // Room C: only with sender.
2621        let room_c_id = room_id!("!room_c:dummy.local");
2622        let room_c_create_sender = user_id!("@creator:dummy.local");
2623
2624        // Create and populate db.
2625        {
2626            let db = create_fake_db(&path, 2).await.unwrap();
2627            let conn = db.pool.get().await.unwrap();
2628
2629            let this = db.clone();
2630            conn.with_transaction(move |txn| {
2631                add_room_v2(
2632                    &this,
2633                    txn,
2634                    room_a_id,
2635                    Some(room_a_name),
2636                    Some(room_a_creator),
2637                    Some(room_a_create_sender),
2638                )?;
2639                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2640                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2641
2642                Result::<_, Error>::Ok(())
2643            })
2644            .await
2645            .unwrap();
2646        }
2647
2648        // This transparently migrates to the latest version.
2649        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2650
2651        // Check all room infos are there.
2652        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2653        assert_eq!(room_infos.len(), 3);
2654
2655        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2656        assert_eq!(room_a.name(), Some(room_a_name));
2657        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2658
2659        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2660        assert_eq!(room_b.name(), None);
2661        assert_eq!(room_b.creators(), None);
2662
2663        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2664        assert_eq!(room_c.name(), None);
2665        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2666    }
2667
2668    #[async_test]
2669    pub async fn test_migrating_v7_to_v9() {
2670        let path = new_path();
2671
2672        let room_id = room_id!("!room_a:dummy.local");
2673        let wedged_event_transaction_id = TransactionId::new();
2674        let local_event_transaction_id = TransactionId::new();
2675
2676        // Create and populate db.
2677        {
2678            let db = create_fake_db(&path, 7).await.unwrap();
2679            let conn = db.pool.get().await.unwrap();
2680
2681            let wedge_tx = wedged_event_transaction_id.clone();
2682            let local_tx = local_event_transaction_id.clone();
2683
2684            conn.with_transaction(move |txn| {
2685                add_dependent_send_queue_event_v7(
2686                    &db,
2687                    txn,
2688                    room_id,
2689                    &local_tx,
2690                    ChildTransactionId::new(),
2691                    DependentQueuedRequestKind::RedactEvent,
2692                )?;
2693                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2694                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2695                Result::<_, Error>::Ok(())
2696            })
2697            .await
2698            .unwrap();
2699        }
2700
2701        // This transparently migrates to the latest version, which clears up all
2702        // requests and dependent requests.
2703        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2704
2705        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2706        assert!(requests.is_empty());
2707
2708        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2709        assert!(dependent_requests.is_empty());
2710    }
2711
2712    fn add_send_queue_event_v7(
2713        this: &SqliteStateStore,
2714        txn: &Transaction<'_>,
2715        transaction_id: &TransactionId,
2716        room_id: &RoomId,
2717        is_wedged: bool,
2718    ) -> Result<(), Error> {
2719        let content =
2720            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2721
2722        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2723        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2724
2725        let content = this.serialize_json(&content)?;
2726
2727        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2728            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2729
2730        Ok(())
2731    }
2732
2733    fn add_dependent_send_queue_event_v7(
2734        this: &SqliteStateStore,
2735        txn: &Transaction<'_>,
2736        room_id: &RoomId,
2737        parent_txn_id: &TransactionId,
2738        own_txn_id: ChildTransactionId,
2739        content: DependentQueuedRequestKind,
2740    ) -> Result<(), Error> {
2741        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2742
2743        let parent_txn_id = parent_txn_id.to_string();
2744        let own_txn_id = own_txn_id.to_string();
2745        let content = this.serialize_json(&content)?;
2746
2747        txn.prepare_cached(
2748            "INSERT INTO dependent_send_queue_events
2749                         (room_id, parent_transaction_id, own_transaction_id, content)
2750                       VALUES (?, ?, ?, ?)",
2751        )?
2752        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2753
2754        Ok(())
2755    }
2756
2757    #[derive(Clone, Debug, Serialize, Deserialize)]
2758    pub enum LegacyDependentQueuedRequestKind {
2759        UploadFileWithThumbnail {
2760            content_type: String,
2761            cache_key: MediaRequestParameters,
2762            related_to: OwnedTransactionId,
2763        },
2764    }
2765
2766    #[async_test]
2767    pub async fn test_dependent_queued_request_variant_renaming() {
2768        let path = new_path();
2769        let db = create_fake_db(&path, 7).await.unwrap();
2770
2771        let cache_key = MediaRequestParameters {
2772            format: MediaFormat::File,
2773            source: MediaSource::Plain("https://server.local/foobar".into()),
2774        };
2775        let related_to = TransactionId::new();
2776        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2777            content_type: "image/png".to_owned(),
2778            cache_key,
2779            related_to: related_to.clone(),
2780        };
2781
2782        let data = db
2783            .serialize_json(&request)
2784            .expect("should be able to serialize legacy dependent request");
2785        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2786            "should be able to deserialize dependent request from legacy dependent request",
2787        );
2788
2789        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2790            assert_eq!(de_related_to, related_to);
2791        });
2792    }
2793}