matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    str::FromStr as _,
7    sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13    RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15    store::{
16        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18        StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19    },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25    canonical_json::{RedactedBecause, redact},
26    events::{
27        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29        presence::PresenceEvent,
30        receipt::{Receipt, ReceiptThread, ReceiptType},
31        room::{
32            create::RoomCreateEventContent,
33            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34        },
35    },
36    serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41    fs,
42    sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47    OpenStoreError, Secret, SqliteStoreConfig,
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt, repeat_vars,
53    },
54};
55
/// Names of the SQLite tables, used as the table argument when encoding keys.
mod keys {
    /// Generic key/value blobs.
    pub const KV_BLOB: &str = "kv_blob";

    /// Room infos.
    pub const ROOM_INFO: &str = "room_info";

    /// State events.
    pub const STATE_EVENT: &str = "state_event";

    /// Global account data events.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";

    /// Per-room account data events.
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";

    /// Room members.
    pub const MEMBER: &str = "member";

    /// Member profiles.
    pub const PROFILE: &str = "profile";

    /// Receipts.
    pub const RECEIPT: &str = "receipt";

    /// Display names.
    pub const DISPLAY_NAME: &str = "display_name";

    /// Send queue requests.
    pub const SEND_QUEUE: &str = "send_queue_events";

    /// Requests depending on a send queue request.
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";

    /// Thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
71
/// The filename used for the SQLite database file used by the state store.
73pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
/// Identifier of the latest database version.
///
/// This is used to figure whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function.
///
/// This must match the highest version handled by `run_migrations`, otherwise
/// the most recent migration steps are never applied when opening a store.
const DATABASE_VERSION: u8 = 15;
81
/// An SQLite-based state store.
///
/// Cloning is cheap with respect to the write connection and the cipher, which
/// are both shared behind an [`Arc`].
#[derive(Clone)]
pub struct SqliteStateStore {
    /// The cipher handed out by `EncryptableStore::get_cypher`. It is `None`
    /// when the store has been opened without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and the
    /// connection for write operations. A single connection is kept apart for
    /// write operations. All other connections are used for read operations.
    /// The lock is used to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
96
97#[cfg(not(tarpaulin_include))]
98impl fmt::Debug for SqliteStateStore {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
101    }
102}
103
104impl SqliteStateStore {
105    /// Open the SQLite-based state store at the given path using the given
106    /// given passphrase to encrypt private data.
107    pub async fn open(
108        path: impl AsRef<Path>,
109        passphrase: Option<&str>,
110    ) -> Result<Self, OpenStoreError> {
111        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
112    }
113
114    /// Open the SQLite-based state store at the given path using the given
115    /// key to encrypt private data.
116    pub async fn open_with_key(
117        path: impl AsRef<Path>,
118        key: Option<&[u8; 32]>,
119    ) -> Result<Self, OpenStoreError> {
120        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
121    }
122
123    /// Open the SQLite-based state store with the config open config.
124    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
125        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
126
127        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
128
129        let this = Self::open_with_pool(pool, config.secret).await?;
130        this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
131
132        Ok(this)
133    }
134
135    /// Create an SQLite-based state store using the given SQLite database pool.
136    /// The given secret will be used to encrypt private data.
137    pub async fn open_with_pool(
138        pool: SqlitePool,
139        secret: Option<Secret>,
140    ) -> Result<Self, OpenStoreError> {
141        let conn = pool.get().await?;
142
143        let mut version = conn.db_version().await?;
144
145        if version == 0 {
146            init(&conn).await?;
147            version = 1;
148        }
149
150        let store_cipher = match secret {
151            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
152            None => None,
153        };
154        let this = Self {
155            store_cipher,
156            pool,
157            // Use `conn` as our selected write connections.
158            write_connection: Arc::new(Mutex::new(conn)),
159        };
160        this.run_migrations(version, None).await?;
161
162        this.read().await?.wal_checkpoint().await;
163
164        Ok(this)
165    }
166
167    /// Run database migrations from the given `from` version to the given `to`
168    /// version
169    ///
170    /// If `to` is `None`, the current database version will be used.
171    async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
172        let to = to.unwrap_or(DATABASE_VERSION);
173
174        if from < to {
175            debug!(version = from, new_version = to, "Upgrading database");
176        } else {
177            return Ok(());
178        }
179
180        let conn = self.write().await;
181
182        if from < 2 && to >= 2 {
183            let this = self.clone();
184            conn.with_transaction(move |txn| {
185                // Create new table.
186                txn.execute_batch(include_str!(
187                    "../migrations/state_store/002_a_create_new_room_info.sql"
188                ))?;
189
190                // Migrate data to new table.
191                for data in txn
192                    .prepare("SELECT data FROM room_info")?
193                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
194                {
195                    let data = data?;
196                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
197
198                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
199                    let state = this
200                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
201                    txn.prepare_cached(
202                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
203                         VALUES (?, ?, ?)",
204                    )?
205                    .execute((room_id, state, data))?;
206                }
207
208                // Replace old table.
209                txn.execute_batch(include_str!(
210                    "../migrations/state_store/002_b_replace_room_info.sql"
211                ))?;
212
213                txn.set_db_version(2)?;
214                Result::<_, Error>::Ok(())
215            })
216            .await?;
217        }
218
219        // Migration to v3: RoomInfo format has changed.
220        if from < 3 && to >= 3 {
221            let this = self.clone();
222            conn.with_transaction(move |txn| {
223                // Migrate data .
224                for data in txn
225                    .prepare("SELECT data FROM room_info")?
226                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
227                {
228                    let data = data?;
229                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
230
231                    // Get the `m.room.create` event from the room state.
232                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
233                    let event_type =
234                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
235                    let create_res = txn
236                        .prepare(
237                            "SELECT stripped, data FROM state_event
238                             WHERE room_id = ? AND event_type = ?",
239                        )?
240                        .query_row([room_id, event_type], |row| {
241                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
242                        })
243                        .optional()?;
244
245                    let create = create_res.and_then(|(stripped, data)| {
246                        let create = if stripped {
247                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
248                                this.deserialize_json(&data).ok()?,
249                            )
250                        } else {
251                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
252                        };
253                        Some(create)
254                    });
255
256                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
257
258                    let data = this.serialize_json(&migrated_room_info)?;
259                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
260                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
261                        .execute((data, room_id))?;
262                }
263
264                txn.set_db_version(3)?;
265                Result::<_, Error>::Ok(())
266            })
267            .await?;
268        }
269
270        if from < 4 && to >= 4 {
271            conn.with_transaction(move |txn| {
272                // Create new table.
273                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274                txn.set_db_version(4)
275            })
276            .await?;
277        }
278
279        if from < 5 && to >= 5 {
280            conn.with_transaction(move |txn| {
281                // Create new table.
282                txn.execute_batch(include_str!(
283                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
284                ))?;
285                txn.set_db_version(4)
286            })
287            .await?;
288        }
289
290        if from < 6 && to >= 6 {
291            conn.with_transaction(move |txn| {
292                // Create new table.
293                txn.execute_batch(include_str!(
294                    "../migrations/state_store/005_send_queue_dependent_events.sql"
295                ))?;
296                txn.set_db_version(6)
297            })
298            .await?;
299        }
300
301        if from < 7 && to >= 7 {
302            conn.with_transaction(move |txn| {
303                // Drop media table.
304                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
305                txn.set_db_version(7)
306            })
307            .await?;
308        }
309
310        if from < 8 && to >= 8 {
311            // Replace all existing wedged events with a generic error.
312            let error = QueueWedgeError::GenericApiError {
313                msg: "local echo failed to send in a previous session".into(),
314            };
315            let default_err = self.serialize_value(&error)?;
316
317            conn.with_transaction(move |txn| {
318                // Update send queue table to persist the wedge reason if any.
319                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
320
321                // Migrate the data, add a generic error for currently wedged events
322
323                for wedged_entries in txn
324                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
325                    .query_map((), |row| {
326                        Ok(
327                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
328                        )
329                    })? {
330
331                    let (room_id, transaction_id) = wedged_entries?;
332
333                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
334                        .execute((default_err.clone(), room_id, transaction_id))?;
335                }
336
337
338                // Clean up the table now that data is migrated
339                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
340
341                txn.set_db_version(8)
342            })
343                .await?;
344        }
345
346        if from < 9 && to >= 9 {
347            conn.with_transaction(move |txn| {
348                // Run the migration.
349                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
350                txn.set_db_version(9)
351            })
352            .await?;
353        }
354
355        if from < 10 && to >= 10 {
356            conn.with_transaction(move |txn| {
357                // Run the migration.
358                txn.execute_batch(include_str!(
359                    "../migrations/state_store/009_send_queue_priority.sql"
360                ))?;
361                txn.set_db_version(10)
362            })
363            .await?;
364        }
365
366        if from < 11 && to >= 11 {
367            conn.with_transaction(move |txn| {
368                // Run the migration.
369                txn.execute_batch(include_str!(
370                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
371                ))?;
372                txn.set_db_version(11)
373            })
374            .await?;
375        }
376
377        if from < 12 && to >= 12 {
378            // Defragment the DB and optimize its size on the filesystem.
379            // This should have been run in the migration for version 7, to reduce the size
380            // of the DB as we removed the media cache.
381            conn.vacuum().await?;
382            conn.set_kv("version", vec![12]).await?;
383        }
384
385        if from < 13 && to >= 13 {
386            conn.with_transaction(move |txn| {
387                // Run the migration.
388                txn.execute_batch(include_str!(
389                    "../migrations/state_store/011_thread_subscriptions.sql"
390                ))?;
391                txn.set_db_version(13)
392            })
393            .await?;
394        }
395
396        if from < 14 && to >= 14 {
397            conn.with_transaction(move |txn| {
398                // Run the migration.
399                txn.execute_batch(include_str!(
400                    "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
401                ))?;
402                txn.set_db_version(14)
403            })
404            .await?;
405        }
406
407        if from < 15 && to >= 15 {
408            conn.with_transaction(move |txn| {
409                // Run the migration.
410                txn.execute_batch(include_str!(
411                    "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
412                ))?;
413                txn.set_db_version(15)
414            })
415            .await?;
416        }
417
418        Ok(())
419    }
420
421    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
422        let key_s = match key {
423            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
424            StateStoreDataKey::SupportedVersions => {
425                Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
426            }
427            StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
428            StateStoreDataKey::Filter(f) => {
429                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
430            }
431            StateStoreDataKey::UserAvatarUrl(u) => {
432                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
433            }
434            StateStoreDataKey::RecentlyVisitedRooms(b) => {
435                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
436            }
437            StateStoreDataKey::UtdHookManagerData => {
438                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
439            }
440            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
441                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
442            }
443            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
444                if let Some(thread_root) = thread_root {
445                    Cow::Owned(format!(
446                        "{}:{room_id}:{thread_root}",
447                        StateStoreDataKey::COMPOSER_DRAFT
448                    ))
449                } else {
450                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
451                }
452            }
453            StateStoreDataKey::SeenKnockRequests(room_id) => {
454                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
455            }
456            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
457                Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
458            }
459        };
460
461        self.encode_key(keys::KV_BLOB, &*key_s)
462    }
463
464    fn encode_presence_key(&self, user_id: &UserId) -> Key {
465        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
466    }
467
468    fn encode_custom_key(&self, key: &[u8]) -> Key {
469        let mut full_key = b"custom:".to_vec();
470        full_key.extend(key);
471        self.encode_key(keys::KV_BLOB, full_key)
472    }
473
474    /// Acquire a connection for executing read operations.
475    #[instrument(skip_all)]
476    async fn read(&self) -> Result<SqliteAsyncConn> {
477        Ok(self.pool.get().await?)
478    }
479
480    /// Acquire a connection for executing write operations.
481    #[instrument(skip_all)]
482    async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
483        self.write_connection.clone().lock_owned().await
484    }
485
486    fn remove_maybe_stripped_room_data(
487        &self,
488        txn: &Transaction<'_>,
489        room_id: &RoomId,
490        stripped: bool,
491    ) -> rusqlite::Result<()> {
492        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
493        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
494
495        let member_room_id = self.encode_key(keys::MEMBER, room_id);
496        txn.remove_room_members(&member_room_id, Some(stripped))
497    }
498
499    pub async fn vacuum(&self) -> Result<()> {
500        self.write_connection.lock().await.vacuum().await
501    }
502
503    pub async fn get_db_size(&self) -> Result<Option<usize>> {
504        let read_conn = self.pool.get().await?;
505        Ok(Some(read_conn.get_db_size().await?))
506    }
507}
508
509impl EncryptableStore for SqliteStateStore {
510    fn get_cypher(&self) -> Option<&StoreCipher> {
511        self.store_cipher.as_deref()
512    }
513}
514
515/// Initialize the database.
516async fn init(conn: &SqliteAsyncConn) -> Result<()> {
517    // First turn on WAL mode, this can't be done in the transaction, it fails with
518    // the error message: "cannot change into wal mode from within a transaction".
519    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
520    conn.with_transaction(|txn| {
521        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
522        txn.set_db_version(1)?;
523
524        Ok(())
525    })
526    .await
527}
528
/// Synchronous helpers to write to, read from and clean up the state store
/// tables. All keys and values are passed as raw bytes.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the `kv_blob` row for `key`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data of the given event type.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data of the given event type in the given
    /// room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the room info of the given room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the room info data of the given room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the room info of the given room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the data of the state event with the given event ID in the given
    /// room, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Remove the state events of the given room. If `stripped` is `Some(_)`,
    /// only the events with that stripped flag are removed.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a room member.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove the members of the given room. If `stripped` is `Some(_)`, only
    /// the members with that stripped flag are removed.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of the given user in the given room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the profiles of the given room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove the profile of the given user in the given room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Remove all the receipts of the given room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for the given display name in the
    /// given room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Remove the given display name from the given room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the display names of the given room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the send queue entries of the given room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Remove all the dependent send queue entries of the given room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
597
598impl SqliteConnectionStateStoreExt for rusqlite::Connection {
599    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
600        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
601        Ok(())
602    }
603
604    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
605        self.prepare_cached(
606            "INSERT OR REPLACE INTO global_account_data (event_type, data)
607             VALUES (?, ?)",
608        )?
609        .execute((event_type, data))?;
610        Ok(())
611    }
612
613    fn set_room_account_data(
614        &self,
615        room_id: &[u8],
616        event_type: &[u8],
617        data: &[u8],
618    ) -> rusqlite::Result<()> {
619        self.prepare_cached(
620            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
621             VALUES (?, ?, ?)",
622        )?
623        .execute((room_id, event_type, data))?;
624        Ok(())
625    }
626
627    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
628        self.prepare(
629            "DELETE FROM room_account_data
630             WHERE room_id = ?",
631        )?
632        .execute((room_id,))?;
633        Ok(())
634    }
635
636    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
637        self.prepare_cached(
638            "INSERT OR REPLACE INTO room_info (room_id, state, data)
639             VALUES (?, ?, ?)",
640        )?
641        .execute((room_id, state, data))?;
642        Ok(())
643    }
644
645    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
646        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
647            .optional()
648    }
649
650    /// Remove the room info for the given room.
651    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
652        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
653        Ok(())
654    }
655
656    fn set_state_event(
657        &self,
658        room_id: &[u8],
659        event_type: &[u8],
660        state_key: &[u8],
661        stripped: bool,
662        event_id: Option<&[u8]>,
663        data: &[u8],
664    ) -> rusqlite::Result<()> {
665        self.prepare_cached(
666            "INSERT OR REPLACE
667             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
668             VALUES (?, ?, ?, ?, ?, ?)",
669        )?
670        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
671        Ok(())
672    }
673
674    fn get_state_event_by_id(
675        &self,
676        room_id: &[u8],
677        event_id: &[u8],
678    ) -> rusqlite::Result<Option<Vec<u8>>> {
679        self.query_row(
680            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
681            (room_id, event_id),
682            |row| row.get(0),
683        )
684        .optional()
685    }
686
687    /// Remove state events for the given room.
688    ///
689    /// If `stripped` is `Some()`, only removes state events for the given
690    /// stripped state. Otherwise, state events are removed regardless of the
691    /// stripped state.
692    fn remove_room_state_events(
693        &self,
694        room_id: &[u8],
695        stripped: Option<bool>,
696    ) -> rusqlite::Result<()> {
697        if let Some(stripped) = stripped {
698            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
699                .execute((room_id, stripped))?;
700        } else {
701            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
702                .execute((room_id,))?;
703        }
704        Ok(())
705    }
706
707    fn set_member(
708        &self,
709        room_id: &[u8],
710        user_id: &[u8],
711        membership: &[u8],
712        stripped: bool,
713        data: &[u8],
714    ) -> rusqlite::Result<()> {
715        self.prepare_cached(
716            "INSERT OR REPLACE
717             INTO member (room_id, user_id, membership, stripped, data)
718             VALUES (?, ?, ?, ?, ?)",
719        )?
720        .execute((room_id, user_id, membership, stripped, data))?;
721        Ok(())
722    }
723
724    /// Remove members for the given room.
725    ///
726    /// If `stripped` is `Some()`, only removes members for the given stripped
727    /// state. Otherwise, members are removed regardless of the stripped state.
728    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
729        if let Some(stripped) = stripped {
730            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
731                .execute((room_id, stripped))?;
732        } else {
733            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
734        }
735        Ok(())
736    }
737
738    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
739        self.prepare_cached(
740            "INSERT OR REPLACE
741             INTO profile (room_id, user_id, data)
742             VALUES (?, ?, ?)",
743        )?
744        .execute((room_id, user_id, data))?;
745        Ok(())
746    }
747
748    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
749        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
750        Ok(())
751    }
752
753    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
754        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
755            .execute((room_id, user_id))?;
756        Ok(())
757    }
758
759    fn set_receipt(
760        &self,
761        room_id: &[u8],
762        user_id: &[u8],
763        receipt_type: &[u8],
764        thread: &[u8],
765        event_id: &[u8],
766        data: &[u8],
767    ) -> rusqlite::Result<()> {
768        self.prepare_cached(
769            "INSERT OR REPLACE
770             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
771             VALUES (?, ?, ?, ?, ?, ?)",
772        )?
773        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
774        Ok(())
775    }
776
777    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
778        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
779        Ok(())
780    }
781
782    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
783        self.prepare_cached(
784            "INSERT OR REPLACE
785             INTO display_name (room_id, name, data)
786             VALUES (?, ?, ?)",
787        )?
788        .execute((room_id, name, data))?;
789        Ok(())
790    }
791
792    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
793        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
794            .execute((room_id, name))?;
795        Ok(())
796    }
797
798    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
799        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
800        Ok(())
801    }
802
803    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
804        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
805        Ok(())
806    }
807
808    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
809        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
810            .execute((room_id,))?;
811        Ok(())
812    }
813}
814
815#[async_trait]
816trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
817    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
818        Ok(self
819            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
820            .await
821            .optional()?)
822    }
823
824    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
825        let keys_length = keys.len();
826
827        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
828            let sql_params = repeat_vars(keys.len());
829            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
830
831            let params = rusqlite::params_from_iter(keys);
832
833            Ok(txn
834                .prepare(&sql)?
835                .query(params)?
836                .mapped(|row| row.get(0))
837                .collect::<Result<_, _>>()?)
838        })
839        .await
840    }
841
842    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
843
844    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
845        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
846        Ok(())
847    }
848
849    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
850        Ok(match room_id {
851            None => {
852                self.prepare("SELECT data FROM room_info", move |mut stmt| {
853                    stmt.query_map((), |row| row.get(0))?.collect()
854                })
855                .await?
856            }
857
858            Some(room_id) => {
859                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
860                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
861                })
862                .await?
863            }
864        })
865    }
866
867    async fn get_maybe_stripped_state_events_for_keys(
868        &self,
869        room_id: Key,
870        event_type: Key,
871        state_keys: Vec<Key>,
872    ) -> Result<Vec<(bool, Vec<u8>)>> {
873        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
874            let sql_params = repeat_vars(state_keys.len());
875            let sql = format!(
876                "SELECT stripped, data FROM state_event
877                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
878            );
879
880            let params = rusqlite::params_from_iter(
881                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
882            );
883
884            Ok(txn
885                .prepare(&sql)?
886                .query(params)?
887                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
888                .collect::<Result<_, _>>()?)
889        })
890        .await
891    }
892
893    async fn get_maybe_stripped_state_events(
894        &self,
895        room_id: Key,
896        event_type: Key,
897    ) -> Result<Vec<(bool, Vec<u8>)>> {
898        Ok(self
899            .prepare(
900                "SELECT stripped, data FROM state_event
901                 WHERE room_id = ? AND event_type = ?",
902                |mut stmt| {
903                    stmt.query((room_id, event_type))?
904                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
905                        .collect()
906                },
907            )
908            .await?)
909    }
910
911    async fn get_profiles(
912        &self,
913        room_id: Key,
914        user_ids: Vec<Key>,
915    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
916        let user_ids_length = user_ids.len();
917
918        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
919            let sql_params = repeat_vars(user_ids.len());
920            let sql = format!(
921                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
922            );
923
924            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
925
926            Ok(txn
927                .prepare(&sql)?
928                .query(params)?
929                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
930                .collect::<Result<_, _>>()?)
931        })
932        .await
933    }
934
935    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
936        let res = if memberships.is_empty() {
937            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
938                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
939            })
940            .await?
941        } else {
942            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
943                let sql_params = repeat_vars(memberships.len());
944                let sql = format!(
945                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
946                );
947
948                let params =
949                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
950
951                Ok(txn
952                    .prepare(&sql)?
953                    .query(params)?
954                    .mapped(|row| row.get(0))
955                    .collect::<Result<_, _>>()?)
956            })
957            .await?
958        };
959
960        Ok(res)
961    }
962
963    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
964        Ok(self
965            .query_row(
966                "SELECT data FROM global_account_data WHERE event_type = ?",
967                (event_type,),
968                |row| row.get(0),
969            )
970            .await
971            .optional()?)
972    }
973
974    async fn get_room_account_data(
975        &self,
976        room_id: Key,
977        event_type: Key,
978    ) -> Result<Option<Vec<u8>>> {
979        Ok(self
980            .query_row(
981                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
982                (room_id, event_type),
983                |row| row.get(0),
984            )
985            .await
986            .optional()?)
987    }
988
989    async fn get_display_names(
990        &self,
991        room_id: Key,
992        names: Vec<Key>,
993    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
994        let names_length = names.len();
995
996        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
997            let sql_params = repeat_vars(names.len());
998            let sql = format!(
999                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
1000            );
1001
1002            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1003
1004            Ok(txn
1005                .prepare(&sql)?
1006                .query(params)?
1007                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1008                .collect::<Result<_, _>>()?)
1009        })
1010        .await
1011    }
1012
1013    async fn get_user_receipt(
1014        &self,
1015        room_id: Key,
1016        receipt_type: Key,
1017        thread: Key,
1018        user_id: Key,
1019    ) -> Result<Option<Vec<u8>>> {
1020        Ok(self
1021            .query_row(
1022                "SELECT data FROM receipt
1023                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1024                (room_id, receipt_type, thread, user_id),
1025                |row| row.get(0),
1026            )
1027            .await
1028            .optional()?)
1029    }
1030
1031    async fn get_event_receipts(
1032        &self,
1033        room_id: Key,
1034        receipt_type: Key,
1035        thread: Key,
1036        event_id: Key,
1037    ) -> Result<Vec<Vec<u8>>> {
1038        Ok(self
1039            .prepare(
1040                "SELECT data FROM receipt
1041                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1042                |mut stmt| {
1043                    stmt.query((room_id, receipt_type, thread, event_id))?
1044                        .mapped(|row| row.get(0))
1045                        .collect()
1046                },
1047            )
1048            .await?)
1049    }
1050}
1051
1052#[async_trait]
1053impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1054    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1055        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1056    }
1057}
1058
1059#[async_trait]
1060impl StateStore for SqliteStateStore {
1061    type Error = Error;
1062
1063    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1064        self.read()
1065            .await?
1066            .get_kv_blob(self.encode_state_store_data_key(key))
1067            .await?
1068            .map(|data| {
1069                Ok(match key {
1070                    StateStoreDataKey::SyncToken => {
1071                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1072                    }
1073                    StateStoreDataKey::SupportedVersions => {
1074                        StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1075                    }
1076                    StateStoreDataKey::WellKnown => {
1077                        StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1078                    }
1079                    StateStoreDataKey::Filter(_) => {
1080                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1081                    }
1082                    StateStoreDataKey::UserAvatarUrl(_) => {
1083                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1084                    }
1085                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1086                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1087                    }
1088                    StateStoreDataKey::UtdHookManagerData => {
1089                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1090                    }
1091                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1092                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1093                    }
1094                    StateStoreDataKey::ComposerDraft(_, _) => {
1095                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1096                    }
1097                    StateStoreDataKey::SeenKnockRequests(_) => {
1098                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1099                    }
1100                    StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1101                        StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1102                            self.deserialize_value(&data)?,
1103                        )
1104                    }
1105                })
1106            })
1107            .transpose()
1108    }
1109
1110    async fn set_kv_data(
1111        &self,
1112        key: StateStoreDataKey<'_>,
1113        value: StateStoreDataValue,
1114    ) -> Result<()> {
1115        let serialized_value = match key {
1116            StateStoreDataKey::SyncToken => self.serialize_value(
1117                &value.into_sync_token().expect("Session data not a sync token"),
1118            )?,
1119            StateStoreDataKey::SupportedVersions => self.serialize_value(
1120                &value
1121                    .into_supported_versions()
1122                    .expect("Session data not containing supported versions"),
1123            )?,
1124            StateStoreDataKey::WellKnown => self.serialize_value(
1125                &value.into_well_known().expect("Session data not containing well-known"),
1126            )?,
1127            StateStoreDataKey::Filter(_) => {
1128                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1129            }
1130            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1131                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1132            )?,
1133            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1134                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1135            )?,
1136            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1137                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1138            )?,
1139            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1140                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1141            }
1142            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1143                &value.into_composer_draft().expect("Session data not a composer draft"),
1144            )?,
1145            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1146                &value
1147                    .into_seen_knock_requests()
1148                    .expect("Session data is not a set of seen knock request ids"),
1149            )?,
1150            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1151                &value
1152                    .into_thread_subscriptions_catchup_tokens()
1153                    .expect("Session data is not a list of thread subscription catchup tokens"),
1154            )?,
1155        };
1156
1157        self.write()
1158            .await
1159            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1160            .await
1161    }
1162
1163    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1164        self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1165    }
1166
1167    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1168        let changes = changes.to_owned();
1169        let this = self.clone();
1170        self.write()
1171            .await
1172            .with_transaction(move |txn| {
1173                let StateChanges {
1174                    sync_token,
1175                    account_data,
1176                    presence,
1177                    profiles,
1178                    profiles_to_delete,
1179                    state,
1180                    room_account_data,
1181                    room_infos,
1182                    receipts,
1183                    redactions,
1184                    stripped_state,
1185                    ambiguity_maps,
1186                } = changes;
1187
1188                if let Some(sync_token) = sync_token {
1189                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1190                    let value = this.serialize_value(&sync_token)?;
1191                    txn.set_kv_blob(&key, &value)?;
1192                }
1193
1194                for (event_type, event) in account_data {
1195                    let event_type =
1196                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1197                    let data = this.serialize_json(&event)?;
1198                    txn.set_global_account_data(&event_type, &data)?;
1199                }
1200
1201                for (room_id, events) in room_account_data {
1202                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1203                    for (event_type, event) in events {
1204                        let event_type =
1205                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1206                        let data = this.serialize_json(&event)?;
1207                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1208                    }
1209                }
1210
1211                for (user_id, event) in presence {
1212                    let key = this.encode_presence_key(&user_id);
1213                    let value = this.serialize_json(&event)?;
1214                    txn.set_kv_blob(&key, &value)?;
1215                }
1216
1217                for (room_id, room_info) in room_infos {
1218                    let stripped = room_info.state() == RoomState::Invited;
1219                    // Remove non-stripped data for stripped rooms and vice-versa.
1220                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1221
1222                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1223                    let state = this
1224                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1225                    let data = this.serialize_json(&room_info)?;
1226                    txn.set_room_info(&room_id, &state, &data)?;
1227                }
1228
1229                for (room_id, user_ids) in profiles_to_delete {
1230                    let room_id = this.encode_key(keys::PROFILE, room_id);
1231                    for user_id in user_ids {
1232                        let user_id = this.encode_key(keys::PROFILE, user_id);
1233                        txn.remove_room_profile(&room_id, &user_id)?;
1234                    }
1235                }
1236
1237                for (room_id, state_event_types) in state {
1238                    let profiles = profiles.get(&room_id);
1239                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1240
1241                    for (event_type, state_events) in state_event_types {
1242                        let encoded_event_type =
1243                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1244
1245                        for (state_key, raw_state_event) in state_events {
1246                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1247                            let data = this.serialize_json(&raw_state_event)?;
1248
1249                            let event_id: Option<String> =
1250                                raw_state_event.get_field("event_id").ok().flatten();
1251                            let encoded_event_id =
1252                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1253
1254                            txn.set_state_event(
1255                                &encoded_room_id,
1256                                &encoded_event_type,
1257                                &encoded_state_key,
1258                                false,
1259                                encoded_event_id.as_deref(),
1260                                &data,
1261                            )?;
1262
1263                            if event_type == StateEventType::RoomMember {
1264                                let member_event = match raw_state_event
1265                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1266                                {
1267                                    Ok(ev) => ev,
1268                                    Err(e) => {
1269                                        debug!(event_id, "Failed to deserialize member event: {e}");
1270                                        continue;
1271                                    }
1272                                };
1273
1274                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1275                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1276                                let membership = this
1277                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1278                                let data = this.serialize_value(&state_key)?;
1279
1280                                txn.set_member(
1281                                    &encoded_room_id,
1282                                    &user_id,
1283                                    &membership,
1284                                    false,
1285                                    &data,
1286                                )?;
1287
1288                                if let Some(profile) =
1289                                    profiles.and_then(|p| p.get(member_event.state_key()))
1290                                {
1291                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1292                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1293                                    let data = this.serialize_json(&profile)?;
1294                                    txn.set_profile(&room_id, &user_id, &data)?;
1295                                }
1296                            }
1297                        }
1298                    }
1299                }
1300
1301                for (room_id, stripped_state_event_types) in stripped_state {
1302                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1303
1304                    for (event_type, stripped_state_events) in stripped_state_event_types {
1305                        let encoded_event_type =
1306                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1307
1308                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1309                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1310                            let data = this.serialize_json(&raw_stripped_state_event)?;
1311                            txn.set_state_event(
1312                                &encoded_room_id,
1313                                &encoded_event_type,
1314                                &encoded_state_key,
1315                                true,
1316                                None,
1317                                &data,
1318                            )?;
1319
1320                            if event_type == StateEventType::RoomMember {
1321                                let member_event = match raw_stripped_state_event
1322                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1323                                ) {
1324                                    Ok(ev) => ev,
1325                                    Err(e) => {
1326                                        debug!("Failed to deserialize stripped member event: {e}");
1327                                        continue;
1328                                    }
1329                                };
1330
1331                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1332                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1333                                let membership = this.encode_key(
1334                                    keys::MEMBER,
1335                                    member_event.content.membership.as_str(),
1336                                );
1337                                let data = this.serialize_value(&state_key)?;
1338
1339                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1340                            }
1341                        }
1342                    }
1343                }
1344
1345                for (room_id, receipt_event) in receipts {
1346                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1347
1348                    for (event_id, receipt_types) in receipt_event {
1349                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1350
1351                        for (receipt_type, receipt_users) in receipt_types {
1352                            let receipt_type =
1353                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1354
1355                            for (user_id, receipt) in receipt_users {
1356                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1357                                // We cannot have a NULL primary key so we rely on serialization
1358                                // instead of the string representation.
1359                                let thread = this.encode_key(
1360                                    keys::RECEIPT,
1361                                    rmp_serde::to_vec_named(&receipt.thread)?,
1362                                );
1363                                let data = this.serialize_json(&ReceiptData {
1364                                    receipt,
1365                                    event_id: event_id.clone(),
1366                                    user_id,
1367                                })?;
1368
1369                                txn.set_receipt(
1370                                    &room_id,
1371                                    &encoded_user_id,
1372                                    &receipt_type,
1373                                    &thread,
1374                                    &encoded_event_id,
1375                                    &data,
1376                                )?;
1377                            }
1378                        }
1379                    }
1380                }
1381
1382                for (room_id, redactions) in redactions {
1383                    let make_redaction_rules = || {
1384                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1385                        txn.get_room_info(&encoded_room_id)
1386                            .ok()
1387                            .flatten()
1388                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1389                            .map(|info| info.room_version_rules_or_default())
1390                            .unwrap_or_else(|| {
1391                                warn!(
1392                                    ?room_id,
1393                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1394                                );
1395                                ROOM_VERSION_RULES_FALLBACK
1396                            }).redaction
1397                    };
1398
1399                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1400                    let mut redaction_rules = None;
1401
1402                    for (event_id, redaction) in redactions {
1403                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1404
1405                        if let Some(Ok(raw_event)) = txn
1406                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1407                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1408                        {
1409                            let event = raw_event.deserialize()?;
1410                            let redacted = redact(
1411                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1412                                redaction_rules.get_or_insert_with(make_redaction_rules),
1413                                Some(RedactedBecause::from_raw_event(&redaction)?),
1414                            )
1415                            .map_err(Error::Redaction)?;
1416                            let data = this.serialize_json(&redacted)?;
1417
1418                            let event_type =
1419                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1420                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1421
1422                            txn.set_state_event(
1423                                &encoded_room_id,
1424                                &event_type,
1425                                &state_key,
1426                                false,
1427                                Some(&event_id),
1428                                &data,
1429                            )?;
1430                        }
1431                    }
1432                }
1433
1434                for (room_id, display_names) in ambiguity_maps {
1435                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1436
1437                    for (name, user_ids) in display_names {
1438                        let encoded_name = this.encode_key(
1439                            keys::DISPLAY_NAME,
1440                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1441                        );
1442                        let data = this.serialize_json(&user_ids)?;
1443
1444                        if user_ids.is_empty() {
1445                            txn.remove_display_name(&room_id, &encoded_name)?;
1446
1447                            // We can't do a migration to merge the previously distinct buckets of
1448                            // user IDs since the display names themselves are hashed before they
1449                            // are persisted in the store. So the store will always retain two
1450                            // buckets: one for raw display names and one for normalised ones.
1451                            //
1452                            // We therefore do the next best thing, which is a sort of a soft
1453                            // migration: we fetch both the raw and normalised buckets, then merge
1454                            // the user IDs contained in them into a separate, temporary merged
1455                            // bucket. The SDK then operates on the merged buckets exclusively. See
1456                            // the comment in `get_users_with_display_names` for details.
1457                            //
1458                            // If the merged bucket is empty, that must mean that both the raw and
1459                            // normalised buckets were also empty, so we can remove both from the
1460                            // store.
1461                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1462                            txn.remove_display_name(&room_id, &raw_name)?;
1463                        } else {
1464                            // We only create new buckets with the normalized display name.
1465                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1466                        }
1467                    }
1468                }
1469
1470                Ok::<_, Error>(())
1471            })
1472            .await?;
1473
1474        Ok(())
1475    }
1476
1477    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1478        self.read()
1479            .await?
1480            .get_kv_blob(self.encode_presence_key(user_id))
1481            .await?
1482            .map(|data| self.deserialize_json(&data))
1483            .transpose()
1484    }
1485
1486    async fn get_presence_events(
1487        &self,
1488        user_ids: &[OwnedUserId],
1489    ) -> Result<Vec<Raw<PresenceEvent>>> {
1490        if user_ids.is_empty() {
1491            return Ok(Vec::new());
1492        }
1493
1494        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1495        self.read()
1496            .await?
1497            .get_kv_blobs(user_ids)
1498            .await?
1499            .into_iter()
1500            .map(|data| self.deserialize_json(&data))
1501            .collect()
1502    }
1503
1504    async fn get_state_event(
1505        &self,
1506        room_id: &RoomId,
1507        event_type: StateEventType,
1508        state_key: &str,
1509    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1510        Ok(self
1511            .get_state_events_for_keys(room_id, event_type, &[state_key])
1512            .await?
1513            .into_iter()
1514            .next())
1515    }
1516
1517    async fn get_state_events(
1518        &self,
1519        room_id: &RoomId,
1520        event_type: StateEventType,
1521    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1522        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1523        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1524        self.read()
1525            .await?
1526            .get_maybe_stripped_state_events(room_id, event_type)
1527            .await?
1528            .into_iter()
1529            .map(|(stripped, data)| {
1530                let ev = if stripped {
1531                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1532                } else {
1533                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1534                };
1535
1536                Ok(ev)
1537            })
1538            .collect()
1539    }
1540
1541    async fn get_state_events_for_keys(
1542        &self,
1543        room_id: &RoomId,
1544        event_type: StateEventType,
1545        state_keys: &[&str],
1546    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1547        if state_keys.is_empty() {
1548            return Ok(Vec::new());
1549        }
1550
1551        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1552        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1553        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1554        self.read()
1555            .await?
1556            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1557            .await?
1558            .into_iter()
1559            .map(|(stripped, data)| {
1560                let ev = if stripped {
1561                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1562                } else {
1563                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1564                };
1565
1566                Ok(ev)
1567            })
1568            .collect()
1569    }
1570
1571    async fn get_profile(
1572        &self,
1573        room_id: &RoomId,
1574        user_id: &UserId,
1575    ) -> Result<Option<MinimalRoomMemberEvent>> {
1576        let room_id = self.encode_key(keys::PROFILE, room_id);
1577        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1578
1579        self.read()
1580            .await?
1581            .get_profiles(room_id, user_ids)
1582            .await?
1583            .into_iter()
1584            .next()
1585            .map(|(_, data)| self.deserialize_json(&data))
1586            .transpose()
1587    }
1588
1589    async fn get_profiles<'a>(
1590        &self,
1591        room_id: &RoomId,
1592        user_ids: &'a [OwnedUserId],
1593    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1594        if user_ids.is_empty() {
1595            return Ok(BTreeMap::new());
1596        }
1597
1598        let room_id = self.encode_key(keys::PROFILE, room_id);
1599        let mut user_ids_map = user_ids
1600            .iter()
1601            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1602            .collect::<BTreeMap<_, _>>();
1603        let user_ids = user_ids_map.keys().cloned().collect();
1604
1605        self.read()
1606            .await?
1607            .get_profiles(room_id, user_ids)
1608            .await?
1609            .into_iter()
1610            .map(|(user_id, data)| {
1611                Ok((
1612                    user_ids_map
1613                        .remove(user_id.as_slice())
1614                        .expect("returned user IDs were requested"),
1615                    self.deserialize_json(&data)?,
1616                ))
1617            })
1618            .collect()
1619    }
1620
1621    async fn get_user_ids(
1622        &self,
1623        room_id: &RoomId,
1624        membership: RoomMemberships,
1625    ) -> Result<Vec<OwnedUserId>> {
1626        let room_id = self.encode_key(keys::MEMBER, room_id);
1627        let memberships = membership
1628            .as_vec()
1629            .into_iter()
1630            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1631            .collect();
1632        self.read()
1633            .await?
1634            .get_user_ids(room_id, memberships)
1635            .await?
1636            .iter()
1637            .map(|data| self.deserialize_value(data))
1638            .collect()
1639    }
1640
1641    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1642        self.read()
1643            .await?
1644            .get_room_infos(match room_load_settings {
1645                RoomLoadSettings::All => None,
1646                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1647            })
1648            .await?
1649            .into_iter()
1650            .map(|data| self.deserialize_json(&data))
1651            .collect()
1652    }
1653
1654    async fn get_users_with_display_name(
1655        &self,
1656        room_id: &RoomId,
1657        display_name: &DisplayName,
1658    ) -> Result<BTreeSet<OwnedUserId>> {
1659        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1660        let names = vec![self.encode_key(
1661            keys::DISPLAY_NAME,
1662            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1663        )];
1664
1665        Ok(self
1666            .read()
1667            .await?
1668            .get_display_names(room_id, names)
1669            .await?
1670            .into_iter()
1671            .next()
1672            .map(|(_, data)| self.deserialize_json(&data))
1673            .transpose()?
1674            .unwrap_or_default())
1675    }
1676
1677    async fn get_users_with_display_names<'a>(
1678        &self,
1679        room_id: &RoomId,
1680        display_names: &'a [DisplayName],
1681    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1682        let mut result = HashMap::new();
1683
1684        if display_names.is_empty() {
1685            return Ok(result);
1686        }
1687
1688        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1689        let mut names_map = display_names
1690            .iter()
1691            .flat_map(|display_name| {
1692                // We encode the display name as the `raw_str()` and the normalized string.
1693                //
1694                // This is for compatibility reasons since:
1695                //  1. Previously "Alice" and "alice" were considered to be distinct display
1696                //     names, while we now consider them to be the same so we need to merge the
1697                //     previously distinct buckets of user IDs.
1698                //  2. We can't do a migration to merge the previously distinct buckets of user
1699                //     IDs since the display names itself are hashed before they are persisted
1700                //     in the store.
1701                let raw =
1702                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1703                let normalized = display_name.as_normalized_str().map(|normalized| {
1704                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1705                });
1706
1707                iter::once(raw).chain(normalized)
1708            })
1709            .collect::<BTreeMap<_, _>>();
1710        let names = names_map.keys().cloned().collect();
1711
1712        for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1713        {
1714            let display_name =
1715                names_map.remove(name.as_slice()).expect("returned display names were requested");
1716            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1717
1718            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1719        }
1720
1721        Ok(result)
1722    }
1723
1724    async fn get_account_data_event(
1725        &self,
1726        event_type: GlobalAccountDataEventType,
1727    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1728        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1729        self.read()
1730            .await?
1731            .get_global_account_data(event_type)
1732            .await?
1733            .map(|value| self.deserialize_json(&value))
1734            .transpose()
1735    }
1736
1737    async fn get_room_account_data_event(
1738        &self,
1739        room_id: &RoomId,
1740        event_type: RoomAccountDataEventType,
1741    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1742        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1743        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1744        self.read()
1745            .await?
1746            .get_room_account_data(room_id, event_type)
1747            .await?
1748            .map(|value| self.deserialize_json(&value))
1749            .transpose()
1750    }
1751
1752    async fn get_user_room_receipt_event(
1753        &self,
1754        room_id: &RoomId,
1755        receipt_type: ReceiptType,
1756        thread: ReceiptThread,
1757        user_id: &UserId,
1758    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1759        let room_id = self.encode_key(keys::RECEIPT, room_id);
1760        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1761        // We cannot have a NULL primary key so we rely on serialization instead of the
1762        // string representation.
1763        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1764        let user_id = self.encode_key(keys::RECEIPT, user_id);
1765
1766        self.read()
1767            .await?
1768            .get_user_receipt(room_id, receipt_type, thread, user_id)
1769            .await?
1770            .map(|value| {
1771                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1772            })
1773            .transpose()
1774    }
1775
1776    async fn get_event_room_receipt_events(
1777        &self,
1778        room_id: &RoomId,
1779        receipt_type: ReceiptType,
1780        thread: ReceiptThread,
1781        event_id: &EventId,
1782    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1783        let room_id = self.encode_key(keys::RECEIPT, room_id);
1784        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1785        // We cannot have a NULL primary key so we rely on serialization instead of the
1786        // string representation.
1787        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1788        let event_id = self.encode_key(keys::RECEIPT, event_id);
1789
1790        self.read()
1791            .await?
1792            .get_event_receipts(room_id, receipt_type, thread, event_id)
1793            .await?
1794            .iter()
1795            .map(|value| {
1796                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1797            })
1798            .collect()
1799    }
1800
1801    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1802        self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1803    }
1804
1805    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1806        let conn = self.write().await;
1807        let key = self.encode_custom_key(key);
1808        conn.set_kv_blob(key, value).await?;
1809        Ok(())
1810    }
1811
1812    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1813        let conn = self.write().await;
1814        let key = self.encode_custom_key(key);
1815        let previous = conn.get_kv_blob(key.clone()).await?;
1816        conn.set_kv_blob(key, value).await?;
1817        Ok(previous)
1818    }
1819
1820    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1821        let conn = self.write().await;
1822        let key = self.encode_custom_key(key);
1823        let previous = conn.get_kv_blob(key.clone()).await?;
1824        if previous.is_some() {
1825            conn.delete_kv_blob(key).await?;
1826        }
1827        Ok(previous)
1828    }
1829
1830    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1831        let this = self.clone();
1832        let room_id = room_id.to_owned();
1833
1834        let conn = self.write().await;
1835
1836        conn.with_transaction(move |txn| -> Result<()> {
1837            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1838            txn.remove_room_info(&room_info_room_id)?;
1839
1840            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1841            txn.remove_room_state_events(&state_event_room_id, None)?;
1842
1843            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1844            txn.remove_room_members(&member_room_id, None)?;
1845
1846            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1847            txn.remove_room_profiles(&profile_room_id)?;
1848
1849            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1850            txn.remove_room_account_data(&room_account_data_room_id)?;
1851
1852            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1853            txn.remove_room_receipts(&receipt_room_id)?;
1854
1855            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1856            txn.remove_room_display_names(&display_name_room_id)?;
1857
1858            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1859            txn.remove_room_send_queue(&send_queue_room_id)?;
1860
1861            let dependent_send_queue_room_id =
1862                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1863            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1864
1865            let thread_subscriptions_room_id =
1866                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1867            txn.execute(
1868                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1869                (thread_subscriptions_room_id,),
1870            )?;
1871
1872            Ok(())
1873        })
1874        .await?;
1875
1876        conn.vacuum().await
1877    }
1878
1879    async fn save_send_queue_request(
1880        &self,
1881        room_id: &RoomId,
1882        transaction_id: OwnedTransactionId,
1883        created_at: MilliSecondsSinceUnixEpoch,
1884        content: QueuedRequestKind,
1885        priority: usize,
1886    ) -> Result<(), Self::Error> {
1887        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1888        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1889
1890        let content = self.serialize_json(&content)?;
1891        // The transaction id is used both as a key (in remove/update) and a value (as
1892        // it's useful for the callers), so we keep it as is, and neither hash
1893        // it (with encode_key) or encrypt it (through serialize_value). After
1894        // all, it carries no personal information, so this is considered fine.
1895
1896        let created_at_ts: u64 = created_at.0.into();
1897        self.write()
1898            .await
1899            .with_transaction(move |txn| {
1900                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1901                Ok(())
1902            })
1903            .await
1904    }
1905
1906    async fn update_send_queue_request(
1907        &self,
1908        room_id: &RoomId,
1909        transaction_id: &TransactionId,
1910        content: QueuedRequestKind,
1911    ) -> Result<bool, Self::Error> {
1912        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1913
1914        let content = self.serialize_json(&content)?;
1915        // See comment in [`Self::save_send_queue_request`] to understand why the
1916        // transaction id is neither encrypted or hashed.
1917        let transaction_id = transaction_id.to_string();
1918
1919        let num_updated = self.write()
1920            .await
1921            .with_transaction(move |txn| {
1922                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1923            })
1924            .await?;
1925
1926        Ok(num_updated > 0)
1927    }
1928
1929    async fn remove_send_queue_request(
1930        &self,
1931        room_id: &RoomId,
1932        transaction_id: &TransactionId,
1933    ) -> Result<bool, Self::Error> {
1934        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1935
1936        // See comment in `save_send_queue_request`.
1937        let transaction_id = transaction_id.to_string();
1938
1939        let num_deleted = self
1940            .write()
1941            .await
1942            .with_transaction(move |txn| {
1943                txn.prepare_cached(
1944                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1945                )?
1946                .execute((room_id, &transaction_id))
1947            })
1948            .await?;
1949
1950        Ok(num_deleted > 0)
1951    }
1952
1953    async fn load_send_queue_requests(
1954        &self,
1955        room_id: &RoomId,
1956    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1957        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1958
1959        // Note: ROWID is always present and is an auto-incremented integer counter. We
1960        // want to maintain the insertion order, so we can sort using it.
1961        // Note 2: transaction_id is not encoded, see why in `save_send_queue_request`.
1962        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1963            .read()
1964            .await?
1965            .prepare(
1966                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1967                |mut stmt| {
1968                    stmt.query((room_id,))?
1969                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1970                        .collect()
1971                },
1972            )
1973            .await?;
1974
1975        let mut requests = Vec::with_capacity(res.len());
1976
1977        for entry in res {
1978            let created_at = entry
1979                .4
1980                .and_then(UInt::new)
1981                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1982
1983            requests.push(QueuedRequest {
1984                transaction_id: entry.0.into(),
1985                kind: self.deserialize_json(&entry.1)?,
1986                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1987                priority: entry.3,
1988                created_at,
1989            });
1990        }
1991
1992        Ok(requests)
1993    }
1994
1995    async fn update_send_queue_request_status(
1996        &self,
1997        room_id: &RoomId,
1998        transaction_id: &TransactionId,
1999        error: Option<QueueWedgeError>,
2000    ) -> Result<(), Self::Error> {
2001        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2002
2003        // See comment in `save_send_queue_request`.
2004        let transaction_id = transaction_id.to_string();
2005
2006        // Serialize the error to json bytes (encrypted if option is enabled) if set.
2007        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2008
2009        self.write()
2010            .await
2011            .with_transaction(move |txn| {
2012                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2013                Ok(())
2014            })
2015            .await
2016    }
2017
2018    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2019        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
2020        // have to manually do the deduplication: indeed, for all X, encrypt(X)
2021        // != encrypted(X), since we use a nonce in the encryption process.
2022
2023        let res: Vec<Vec<u8>> = self
2024            .read()
2025            .await?
2026            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2027                stmt.query(())?.mapped(|row| row.get(0)).collect()
2028            })
2029            .await?;
2030
2031        // So we collect the results into a `BTreeSet` to perform the deduplication, and
2032        // then rejigger that into a vector.
2033        Ok(res
2034            .into_iter()
2035            .map(|entry| self.deserialize_value(&entry))
2036            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2037            .into_iter()
2038            .collect())
2039    }
2040
2041    async fn save_dependent_queued_request(
2042        &self,
2043        room_id: &RoomId,
2044        parent_txn_id: &TransactionId,
2045        own_txn_id: ChildTransactionId,
2046        created_at: MilliSecondsSinceUnixEpoch,
2047        content: DependentQueuedRequestKind,
2048    ) -> Result<()> {
2049        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2050        let content = self.serialize_json(&content)?;
2051
2052        // See comment in `save_send_queue_request`.
2053        let parent_txn_id = parent_txn_id.to_string();
2054        let own_txn_id = own_txn_id.to_string();
2055
2056        let created_at_ts: u64 = created_at.0.into();
2057        self.write()
2058            .await
2059            .with_transaction(move |txn| {
2060                txn.prepare_cached(
2061                    r#"INSERT INTO dependent_send_queue_events
2062                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2063                       VALUES (?, ?, ?, ?, ?)"#,
2064                )?
2065                .execute((
2066                    room_id,
2067                    parent_txn_id,
2068                    own_txn_id,
2069                    content,
2070                    created_at_ts,
2071                ))?;
2072                Ok(())
2073            })
2074            .await
2075    }
2076
2077    async fn update_dependent_queued_request(
2078        &self,
2079        room_id: &RoomId,
2080        own_transaction_id: &ChildTransactionId,
2081        new_content: DependentQueuedRequestKind,
2082    ) -> Result<bool> {
2083        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2084        let content = self.serialize_json(&new_content)?;
2085
2086        // See comment in `save_send_queue_request`.
2087        let own_txn_id = own_transaction_id.to_string();
2088
2089        let num_updated = self
2090            .write()
2091            .await
2092            .with_transaction(move |txn| {
2093                txn.prepare_cached(
2094                    r#"UPDATE dependent_send_queue_events
2095                       SET content = ?
2096                       WHERE own_transaction_id = ?
2097                       AND room_id = ?"#,
2098                )?
2099                .execute((content, own_txn_id, room_id))
2100            })
2101            .await?;
2102
2103        if num_updated > 1 {
2104            return Err(Error::InconsistentUpdate);
2105        }
2106
2107        Ok(num_updated == 1)
2108    }
2109
2110    async fn mark_dependent_queued_requests_as_ready(
2111        &self,
2112        room_id: &RoomId,
2113        parent_txn_id: &TransactionId,
2114        parent_key: SentRequestKey,
2115    ) -> Result<usize> {
2116        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2117        let parent_key = self.serialize_json(&parent_key)?;
2118
2119        // See comment in `save_send_queue_request`.
2120        let parent_txn_id = parent_txn_id.to_string();
2121
2122        self.write()
2123            .await
2124            .with_transaction(move |txn| {
2125                Ok(txn.prepare_cached(
2126                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2127                )?
2128                .execute((parent_key, parent_txn_id, room_id))?)
2129            })
2130            .await
2131    }
2132
2133    async fn remove_dependent_queued_request(
2134        &self,
2135        room_id: &RoomId,
2136        txn_id: &ChildTransactionId,
2137    ) -> Result<bool> {
2138        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2139
2140        // See comment in `save_send_queue_request`.
2141        let txn_id = txn_id.to_string();
2142
2143        let num_deleted = self
2144            .write()
2145            .await
2146            .with_transaction(move |txn| {
2147                txn.prepare_cached(
2148                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2149                )?
2150                .execute((txn_id, room_id))
2151            })
2152            .await?;
2153
2154        Ok(num_deleted > 0)
2155    }
2156
2157    async fn load_dependent_queued_requests(
2158        &self,
2159        room_id: &RoomId,
2160    ) -> Result<Vec<DependentQueuedRequest>> {
2161        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2162
2163        // Note: transaction_id is not encoded, see why in `save_send_queue_request`.
2164        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2165            .read()
2166            .await?
2167            .prepare(
2168                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2169                |mut stmt| {
2170                    stmt.query((room_id,))?
2171                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2172                        .collect()
2173                },
2174            )
2175            .await?;
2176
2177        let mut dependent_events = Vec::with_capacity(res.len());
2178
2179        for entry in res {
2180            let created_at = entry
2181                .4
2182                .and_then(UInt::new)
2183                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2184
2185            dependent_events.push(DependentQueuedRequest {
2186                own_transaction_id: entry.0.into(),
2187                parent_transaction_id: entry.1.into(),
2188                parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2189                kind: self.deserialize_json(&entry.3)?,
2190                created_at,
2191            });
2192        }
2193
2194        Ok(dependent_events)
2195    }
2196
2197    async fn upsert_thread_subscriptions(
2198        &self,
2199        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
2200    ) -> Result<(), Self::Error> {
2201        let values: Vec<_> = updates
2202            .into_iter()
2203            .map(|(room_id, thread_id, subscription)| {
2204                (
2205                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
2206                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
2207                    subscription.status.as_str(),
2208                    subscription.bump_stamp,
2209                )
2210            })
2211            .collect();
2212
2213        self.write()
2214            .await
2215            .with_transaction(move |txn| {
2216                let mut txn = txn.prepare_cached(
2217                    "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp) 
2218                    VALUES (?, ?, ?, ?)
2219                    ON CONFLICT (room_id, event_id) DO UPDATE 
2220                    SET 
2221                        status = 
2222                            CASE
2223                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
2224                                WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
2225                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
2226                                ELSE thread_subscriptions.status
2227                            END, 
2228                        bump_stamp = 
2229                            CASE
2230                                WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
2231                                WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
2232                                WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
2233                                ELSE thread_subscriptions.bump_stamp
2234                            END",
2235                )?;
2236
2237                for value in values {
2238                    txn.execute(value)?;
2239                }
2240
2241                Result::<_, Error>::Ok(())
2242            })
2243            .await?;
2244
2245        Ok(())
2246    }
2247
2248    async fn load_thread_subscription(
2249        &self,
2250        room_id: &RoomId,
2251        thread_id: &EventId,
2252    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2253        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2254        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2255
2256        Ok(self
2257            .read()
2258            .await?
2259            .query_row(
2260                "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2261                (room_id, thread_id),
2262                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2263            )
2264            .await
2265            .optional()?
2266            .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2267                let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2268                    Error::InvalidData { details: format!("Invalid thread status: {status}") }
2269                })?;
2270                Ok(StoredThreadSubscription { status, bump_stamp })
2271            })
2272            .transpose()?)
2273    }
2274
2275    async fn remove_thread_subscription(
2276        &self,
2277        room_id: &RoomId,
2278        thread_id: &EventId,
2279    ) -> Result<(), Self::Error> {
2280        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2281        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2282
2283        self.write()
2284            .await
2285            .execute(
2286                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2287                (room_id, thread_id),
2288            )
2289            .await?;
2290
2291        Ok(())
2292    }
2293
2294    async fn optimize(&self) -> Result<(), Self::Error> {
2295        Ok(self.vacuum().await?)
2296    }
2297
2298    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2299        self.get_db_size().await
2300    }
2301}
2302
2303#[derive(Debug, Clone, Serialize, Deserialize)]
2304struct ReceiptData {
2305    receipt: Receipt,
2306    event_id: OwnedEventId,
2307    user_id: OwnedUserId,
2308}
2309
/// Runs the shared state store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use once_cell::sync::Lazy;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;

    // One temporary directory shared by every test of this module; each store
    // gets its own numbered sub-directory.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a new store (no passphrase) in a fresh sub-directory of
    /// `TMP_DIR`. Used by `statestore_integration_tests!`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_dir = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_dir);

        let store = SqliteStateStore::open(store_dir, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2334
/// Runs the shared state store integration tests against a store opened with
/// a passphrase, plus a few checks on `SqliteStoreConfig` options.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;
    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};

    // One temporary directory shared by every test of this module; each store
    // gets its own numbered sub-directory.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a new numbered path under `TMP_DIR`.
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a new passphrase-protected store in a fresh workspace. Used by
    /// `statestore_integration_tests!`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace);

        let store =
            SqliteStateStore::open(workspace, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        // The configured maximum must be reflected by the pool status.
        let status = store.pool.status();
        assert_eq!(status.max_size, 42);
    }

    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let connection = store.pool.get().await.unwrap();
        let reported = connection
            .query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0))
            .await
            .unwrap();

        // `SqliteStoreConfig` takes the cache size in bytes, and it is
        // expected to be converted to kibibytes. The value read back is
        // negative because it _is_ a size in kibibytes, not a number of
        // pages.
        assert_eq!(reported, -(1500 / 1024));
    }

    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let connection = store.pool.get().await.unwrap();
        let reported = connection
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // `SqliteStoreConfig` takes the limit in bytes, and SQLite keeps it in
        // bytes too.
        assert_eq!(reported, 1500);
    }

    statestore_integration_tests!();
}
2415
2416#[cfg(test)]
2417mod migration_tests {
2418    use std::{
2419        path::{Path, PathBuf},
2420        sync::{
2421            Arc,
2422            atomic::{AtomicU32, Ordering::SeqCst},
2423        },
2424    };
2425
2426    use as_variant::as_variant;
2427    use matrix_sdk_base::{
2428        RoomState, StateStore,
2429        media::{MediaFormat, MediaRequestParameters},
2430        store::{
2431            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2432            SerializableEventContent,
2433        },
2434        sync::UnreadNotificationsCount,
2435    };
2436    use matrix_sdk_test::async_test;
2437    use once_cell::sync::Lazy;
2438    use ruma::{
2439        EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
2440        events::{
2441            StateEventType,
2442            room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
2443        },
2444        room_id, server_name, user_id,
2445    };
2446    use rusqlite::Transaction;
2447    use serde::{Deserialize, Serialize};
2448    use serde_json::json;
2449    use tempfile::{TempDir, tempdir};
2450    use tokio::{fs, sync::Mutex};
2451    use zeroize::Zeroizing;
2452
2453    use super::{DATABASE_NAME, SqliteStateStore, init, keys};
2454    use crate::{
2455        OpenStoreError, Secret, SqliteStoreConfig,
2456        error::{Error, Result},
2457        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2458    };
2459
2460    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2461    static NUM: AtomicU32 = AtomicU32::new(0);
2462    const SECRET: &str = "secret";
2463
2464    fn new_path() -> PathBuf {
2465        let name = NUM.fetch_add(1, SeqCst).to_string();
2466        TMP_DIR.path().join(name)
2467    }
2468
2469    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2470        let config = SqliteStoreConfig::new(path);
2471
2472        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2473
2474        let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2475        let conn = pool.get().await?;
2476
2477        init(&conn).await?;
2478
2479        let store_cipher = Some(Arc::new(
2480            conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2481                .await
2482                .unwrap(),
2483        ));
2484        let this = SqliteStateStore {
2485            store_cipher,
2486            pool,
2487            // Use `conn` as our selected write connections.
2488            write_connection: Arc::new(Mutex::new(conn)),
2489        };
2490        this.run_migrations(1, Some(version)).await?;
2491
2492        Ok(this)
2493    }
2494
2495    fn room_info_v1_json(
2496        room_id: &RoomId,
2497        state: RoomState,
2498        name: Option<&str>,
2499        creator: Option<&UserId>,
2500    ) -> serde_json::Value {
2501        // Test with name set or not.
2502        let name_content = match name {
2503            Some(name) => json!({ "name": name }),
2504            None => json!({ "name": null }),
2505        };
2506        // Test with creator set or not.
2507        let create_content = match creator {
2508            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2509            None => RoomCreateEventContent::new_v11(),
2510        };
2511
2512        json!({
2513            "room_id": room_id,
2514            "room_type": state,
2515            "notification_counts": UnreadNotificationsCount::default(),
2516            "summary": {
2517                "heroes": [],
2518                "joined_member_count": 0,
2519                "invited_member_count": 0,
2520            },
2521            "members_synced": false,
2522            "base_info": {
2523                "dm_targets": [],
2524                "max_power_level": 100,
2525                "name": {
2526                    "Original": {
2527                        "content": name_content,
2528                    },
2529                },
2530                "create": {
2531                    "Original": {
2532                        "content": create_content,
2533                    }
2534                }
2535            },
2536        })
2537    }
2538
2539    #[async_test]
2540    pub async fn test_migrating_v1_to_v2() {
2541        let path = new_path();
2542        // Create and populate db.
2543        {
2544            let db = create_fake_db(&path, 1).await.unwrap();
2545            let conn = db.pool.get().await.unwrap();
2546
2547            let this = db.clone();
2548            conn.with_transaction(move |txn| {
2549                for i in 0..5 {
2550                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2551                    let (state, stripped) =
2552                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2553                    let info = room_info_v1_json(&room_id, state, None, None);
2554
2555                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2556                    let data = this.serialize_json(&info)?;
2557
2558                    txn.prepare_cached(
2559                        "INSERT INTO room_info (room_id, stripped, data)
2560                         VALUES (?, ?, ?)",
2561                    )?
2562                    .execute((room_id, stripped, data))?;
2563                }
2564
2565                Result::<_, Error>::Ok(())
2566            })
2567            .await
2568            .unwrap();
2569        }
2570
2571        // This transparently migrates to the latest version.
2572        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2573
2574        // Check all room infos are there.
2575        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2576    }
2577
2578    // Add a room in version 2 format of the state store.
2579    fn add_room_v2(
2580        this: &SqliteStateStore,
2581        txn: &Transaction<'_>,
2582        room_id: &RoomId,
2583        name: Option<&str>,
2584        create_creator: Option<&UserId>,
2585        create_sender: Option<&UserId>,
2586    ) -> Result<(), Error> {
2587        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2588
2589        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2590        let encoded_state =
2591            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2592        let data = this.serialize_json(&room_info_json)?;
2593
2594        txn.prepare_cached(
2595            "INSERT INTO room_info (room_id, state, data)
2596             VALUES (?, ?, ?)",
2597        )?
2598        .execute((encoded_room_id, encoded_state, data))?;
2599
2600        // Test with or without `m.room.create` event in the room state.
2601        let Some(create_sender) = create_sender else {
2602            return Ok(());
2603        };
2604
2605        let create_content = match create_creator {
2606            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2607            None => RoomCreateEventContent::new_v11(),
2608        };
2609
2610        let event_id = EventId::new(server_name!("dummy.local"));
2611        let create_event = json!({
2612            "content": create_content,
2613            "event_id": event_id,
2614            "sender": create_sender.to_owned(),
2615            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2616            "state_key": "",
2617            "type": "m.room.create",
2618            "unsigned": {},
2619        });
2620
2621        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2622        let encoded_event_type =
2623            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2624        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2625        let stripped = false;
2626        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2627        let data = this.serialize_json(&create_event)?;
2628
2629        txn.prepare_cached(
2630            "INSERT
2631             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2632             VALUES (?, ?, ?, ?, ?, ?)",
2633        )?
2634        .execute((
2635            encoded_room_id,
2636            encoded_event_type,
2637            encoded_state_key,
2638            stripped,
2639            encoded_event_id,
2640            data,
2641        ))?;
2642
2643        Ok(())
2644    }
2645
2646    #[async_test]
2647    pub async fn test_migrating_v2_to_v3() {
2648        let path = new_path();
2649
2650        // Room A: with name, creator and sender.
2651        let room_a_id = room_id!("!room_a:dummy.local");
2652        let room_a_name = "Room A";
2653        let room_a_creator = user_id!("@creator:dummy.local");
2654        // Use a different sender to check that sender is used over creator in
2655        // migration.
2656        let room_a_create_sender = user_id!("@sender:dummy.local");
2657
2658        // Room B: without name, creator and sender.
2659        let room_b_id = room_id!("!room_b:dummy.local");
2660
2661        // Room C: only with sender.
2662        let room_c_id = room_id!("!room_c:dummy.local");
2663        let room_c_create_sender = user_id!("@creator:dummy.local");
2664
2665        // Create and populate db.
2666        {
2667            let db = create_fake_db(&path, 2).await.unwrap();
2668            let conn = db.pool.get().await.unwrap();
2669
2670            let this = db.clone();
2671            conn.with_transaction(move |txn| {
2672                add_room_v2(
2673                    &this,
2674                    txn,
2675                    room_a_id,
2676                    Some(room_a_name),
2677                    Some(room_a_creator),
2678                    Some(room_a_create_sender),
2679                )?;
2680                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2681                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2682
2683                Result::<_, Error>::Ok(())
2684            })
2685            .await
2686            .unwrap();
2687        }
2688
2689        // This transparently migrates to the latest version.
2690        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2691
2692        // Check all room infos are there.
2693        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2694        assert_eq!(room_infos.len(), 3);
2695
2696        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2697        assert_eq!(room_a.name(), Some(room_a_name));
2698        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2699
2700        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2701        assert_eq!(room_b.name(), None);
2702        assert_eq!(room_b.creators(), None);
2703
2704        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2705        assert_eq!(room_c.name(), None);
2706        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2707    }
2708
2709    #[async_test]
2710    pub async fn test_migrating_v7_to_v9() {
2711        let path = new_path();
2712
2713        let room_id = room_id!("!room_a:dummy.local");
2714        let wedged_event_transaction_id = TransactionId::new();
2715        let local_event_transaction_id = TransactionId::new();
2716
2717        // Create and populate db.
2718        {
2719            let db = create_fake_db(&path, 7).await.unwrap();
2720            let conn = db.pool.get().await.unwrap();
2721
2722            let wedge_tx = wedged_event_transaction_id.clone();
2723            let local_tx = local_event_transaction_id.clone();
2724
2725            conn.with_transaction(move |txn| {
2726                add_dependent_send_queue_event_v7(
2727                    &db,
2728                    txn,
2729                    room_id,
2730                    &local_tx,
2731                    ChildTransactionId::new(),
2732                    DependentQueuedRequestKind::RedactEvent,
2733                )?;
2734                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2735                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2736                Result::<_, Error>::Ok(())
2737            })
2738            .await
2739            .unwrap();
2740        }
2741
2742        // This transparently migrates to the latest version, which clears up all
2743        // requests and dependent requests.
2744        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2745
2746        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2747        assert!(requests.is_empty());
2748
2749        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2750        assert!(dependent_requests.is_empty());
2751    }
2752
2753    fn add_send_queue_event_v7(
2754        this: &SqliteStateStore,
2755        txn: &Transaction<'_>,
2756        transaction_id: &TransactionId,
2757        room_id: &RoomId,
2758        is_wedged: bool,
2759    ) -> Result<(), Error> {
2760        let content =
2761            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2762
2763        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2764        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2765
2766        let content = this.serialize_json(&content)?;
2767
2768        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2769            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2770
2771        Ok(())
2772    }
2773
2774    fn add_dependent_send_queue_event_v7(
2775        this: &SqliteStateStore,
2776        txn: &Transaction<'_>,
2777        room_id: &RoomId,
2778        parent_txn_id: &TransactionId,
2779        own_txn_id: ChildTransactionId,
2780        content: DependentQueuedRequestKind,
2781    ) -> Result<(), Error> {
2782        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2783
2784        let parent_txn_id = parent_txn_id.to_string();
2785        let own_txn_id = own_txn_id.to_string();
2786        let content = this.serialize_json(&content)?;
2787
2788        txn.prepare_cached(
2789            "INSERT INTO dependent_send_queue_events
2790                         (room_id, parent_transaction_id, own_transaction_id, content)
2791                       VALUES (?, ?, ?, ?)",
2792        )?
2793        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2794
2795        Ok(())
2796    }
2797
2798    #[derive(Clone, Debug, Serialize, Deserialize)]
2799    pub enum LegacyDependentQueuedRequestKind {
2800        UploadFileWithThumbnail {
2801            content_type: String,
2802            cache_key: MediaRequestParameters,
2803            related_to: OwnedTransactionId,
2804        },
2805    }
2806
2807    #[async_test]
2808    pub async fn test_dependent_queued_request_variant_renaming() {
2809        let path = new_path();
2810        let db = create_fake_db(&path, 7).await.unwrap();
2811
2812        let cache_key = MediaRequestParameters {
2813            format: MediaFormat::File,
2814            source: MediaSource::Plain("https://server.local/foobar".into()),
2815        };
2816        let related_to = TransactionId::new();
2817        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2818            content_type: "image/png".to_owned(),
2819            cache_key,
2820            related_to: related_to.clone(),
2821        };
2822
2823        let data = db
2824            .serialize_json(&request)
2825            .expect("should be able to serialize legacy dependent request");
2826        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2827            "should be able to deserialize dependent request from legacy dependent request",
2828        );
2829
2830        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2831            assert_eq!(de_related_to, related_to);
2832        });
2833    }
2834}