matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        RoomLoadSettings, SentRequestKey, ThreadSubscription,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53    // Tables
54    pub const KV_BLOB: &str = "kv_blob";
55    pub const ROOM_INFO: &str = "room_info";
56    pub const STATE_EVENT: &str = "state_event";
57    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59    pub const MEMBER: &str = "member";
60    pub const PROFILE: &str = "profile";
61    pub const RECEIPT: &str = "receipt";
62    pub const DISPLAY_NAME: &str = "display_name";
63    pub const SEND_QUEUE: &str = "send_queue_events";
64    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
66}
67
68/// The filename used for the SQLITE database file used by the state store.
69pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
70
71/// Identifier of the latest database version.
72///
73/// This is used to figure whether the SQLite database requires a migration.
74/// Every new SQL migration should imply a bump of this number, and changes in
75/// the [`SqliteStateStore::run_migrations`] function.
76const DATABASE_VERSION: u8 = 13;
77
78/// An SQLite-based state store.
79#[derive(Clone)]
80pub struct SqliteStateStore {
81    store_cipher: Option<Arc<StoreCipher>>,
82    pool: SqlitePool,
83}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for SqliteStateStore {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
89    }
90}
91
92impl SqliteStateStore {
93    /// Open the SQLite-based state store at the given path using the given
94    /// passphrase to encrypt private data.
95    pub async fn open(
96        path: impl AsRef<Path>,
97        passphrase: Option<&str>,
98    ) -> Result<Self, OpenStoreError> {
99        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
100    }
101
102    /// Open the SQLite-based state store with the config open config.
103    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
104        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
105
106        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
107
108        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
109        config.pool = Some(pool_config);
110
111        let pool = config.create_pool(Runtime::Tokio1)?;
112
113        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
114        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
115
116        Ok(this)
117    }
118
119    /// Create an SQLite-based state store using the given SQLite database pool.
120    /// The given passphrase will be used to encrypt private data.
121    pub async fn open_with_pool(
122        pool: SqlitePool,
123        passphrase: Option<&str>,
124    ) -> Result<Self, OpenStoreError> {
125        let conn = pool.get().await?;
126
127        let mut version = conn.db_version().await?;
128
129        if version == 0 {
130            init(&conn).await?;
131            version = 1;
132        }
133
134        let store_cipher = match passphrase {
135            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
136            None => None,
137        };
138        let this = Self { store_cipher, pool };
139        this.run_migrations(&conn, version, None).await?;
140
141        Ok(this)
142    }
143
144    /// Run database migrations from the given `from` version to the given `to`
145    /// version
146    ///
147    /// If `to` is `None`, the current database version will be used.
148    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
149        let to = to.unwrap_or(DATABASE_VERSION);
150
151        if from < to {
152            debug!(version = from, new_version = to, "Upgrading database");
153        } else {
154            return Ok(());
155        }
156
157        if from < 2 && to >= 2 {
158            let this = self.clone();
159            conn.with_transaction(move |txn| {
160                // Create new table.
161                txn.execute_batch(include_str!(
162                    "../migrations/state_store/002_a_create_new_room_info.sql"
163                ))?;
164
165                // Migrate data to new table.
166                for data in txn
167                    .prepare("SELECT data FROM room_info")?
168                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
169                {
170                    let data = data?;
171                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
172
173                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
174                    let state = this
175                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
176                    txn.prepare_cached(
177                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
178                         VALUES (?, ?, ?)",
179                    )?
180                    .execute((room_id, state, data))?;
181                }
182
183                // Replace old table.
184                txn.execute_batch(include_str!(
185                    "../migrations/state_store/002_b_replace_room_info.sql"
186                ))?;
187
188                txn.set_db_version(2)?;
189                Result::<_, Error>::Ok(())
190            })
191            .await?;
192        }
193
194        // Migration to v3: RoomInfo format has changed.
195        if from < 3 && to >= 3 {
196            let this = self.clone();
197            conn.with_transaction(move |txn| {
198                // Migrate data .
199                for data in txn
200                    .prepare("SELECT data FROM room_info")?
201                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
202                {
203                    let data = data?;
204                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
205
206                    // Get the `m.room.create` event from the room state.
207                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
208                    let event_type =
209                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
210                    let create_res = txn
211                        .prepare(
212                            "SELECT stripped, data FROM state_event
213                             WHERE room_id = ? AND event_type = ?",
214                        )?
215                        .query_row([room_id, event_type], |row| {
216                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
217                        })
218                        .optional()?;
219
220                    let create = create_res.and_then(|(stripped, data)| {
221                        let create = if stripped {
222                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
223                                this.deserialize_json(&data).ok()?,
224                            )
225                        } else {
226                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
227                        };
228                        Some(create)
229                    });
230
231                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
232
233                    let data = this.serialize_json(&migrated_room_info)?;
234                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
235                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
236                        .execute((data, room_id))?;
237                }
238
239                txn.set_db_version(3)?;
240                Result::<_, Error>::Ok(())
241            })
242            .await?;
243        }
244
245        if from < 4 && to >= 4 {
246            conn.with_transaction(move |txn| {
247                // Create new table.
248                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
249                txn.set_db_version(4)
250            })
251            .await?;
252        }
253
254        if from < 5 && to >= 5 {
255            conn.with_transaction(move |txn| {
256                // Create new table.
257                txn.execute_batch(include_str!(
258                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
259                ))?;
260                txn.set_db_version(4)
261            })
262            .await?;
263        }
264
265        if from < 6 && to >= 6 {
266            conn.with_transaction(move |txn| {
267                // Create new table.
268                txn.execute_batch(include_str!(
269                    "../migrations/state_store/005_send_queue_dependent_events.sql"
270                ))?;
271                txn.set_db_version(6)
272            })
273            .await?;
274        }
275
276        if from < 7 && to >= 7 {
277            conn.with_transaction(move |txn| {
278                // Drop media table.
279                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
280                txn.set_db_version(7)
281            })
282            .await?;
283        }
284
285        if from < 8 && to >= 8 {
286            // Replace all existing wedged events with a generic error.
287            let error = QueueWedgeError::GenericApiError {
288                msg: "local echo failed to send in a previous session".into(),
289            };
290            let default_err = self.serialize_value(&error)?;
291
292            conn.with_transaction(move |txn| {
293                // Update send queue table to persist the wedge reason if any.
294                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
295
296                // Migrate the data, add a generic error for currently wedged events
297
298                for wedged_entries in txn
299                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
300                    .query_map((), |row| {
301                        Ok(
302                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
303                        )
304                    })? {
305
306                    let (room_id, transaction_id) = wedged_entries?;
307
308                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
309                        .execute((default_err.clone(), room_id, transaction_id))?;
310                }
311
312
313                // Clean up the table now that data is migrated
314                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
315
316                txn.set_db_version(8)
317            })
318                .await?;
319        }
320
321        if from < 9 && to >= 9 {
322            conn.with_transaction(move |txn| {
323                // Run the migration.
324                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
325                txn.set_db_version(9)
326            })
327            .await?;
328        }
329
330        if from < 10 && to >= 10 {
331            conn.with_transaction(move |txn| {
332                // Run the migration.
333                txn.execute_batch(include_str!(
334                    "../migrations/state_store/009_send_queue_priority.sql"
335                ))?;
336                txn.set_db_version(10)
337            })
338            .await?;
339        }
340
341        if from < 11 && to >= 11 {
342            conn.with_transaction(move |txn| {
343                // Run the migration.
344                txn.execute_batch(include_str!(
345                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
346                ))?;
347                txn.set_db_version(11)
348            })
349            .await?;
350        }
351
352        if from < 12 && to >= 12 {
353            // Defragment the DB and optimize its size on the filesystem.
354            // This should have been run in the migration for version 7, to reduce the size
355            // of the DB as we removed the media cache.
356            conn.vacuum().await?;
357            conn.set_kv("version", vec![12]).await?;
358        }
359
360        if from < 13 && to >= 13 {
361            conn.with_transaction(move |txn| {
362                // Run the migration.
363                txn.execute_batch(include_str!(
364                    "../migrations/state_store/011_thread_subscriptions.sql"
365                ))?;
366                txn.set_db_version(13)
367            })
368            .await?;
369        }
370
371        Ok(())
372    }
373
374    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
375        let key_s = match key {
376            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
377            StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
378            StateStoreDataKey::Filter(f) => {
379                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
380            }
381            StateStoreDataKey::UserAvatarUrl(u) => {
382                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
383            }
384            StateStoreDataKey::RecentlyVisitedRooms(b) => {
385                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
386            }
387            StateStoreDataKey::UtdHookManagerData => {
388                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
389            }
390            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
391                Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
392            }
393            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
394                if let Some(thread_root) = thread_root {
395                    Cow::Owned(format!(
396                        "{}:{room_id}:{thread_root}",
397                        StateStoreDataKey::COMPOSER_DRAFT
398                    ))
399                } else {
400                    Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
401                }
402            }
403            StateStoreDataKey::SeenKnockRequests(room_id) => {
404                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
405            }
406        };
407
408        self.encode_key(keys::KV_BLOB, &*key_s)
409    }
410
411    fn encode_presence_key(&self, user_id: &UserId) -> Key {
412        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
413    }
414
415    fn encode_custom_key(&self, key: &[u8]) -> Key {
416        let mut full_key = b"custom:".to_vec();
417        full_key.extend(key);
418        self.encode_key(keys::KV_BLOB, full_key)
419    }
420
421    async fn acquire(&self) -> Result<SqliteAsyncConn> {
422        Ok(self.pool.get().await?)
423    }
424
425    fn remove_maybe_stripped_room_data(
426        &self,
427        txn: &Transaction<'_>,
428        room_id: &RoomId,
429        stripped: bool,
430    ) -> rusqlite::Result<()> {
431        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
432        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
433
434        let member_room_id = self.encode_key(keys::MEMBER, room_id);
435        txn.remove_room_members(&member_room_id, Some(stripped))
436    }
437}
438
439impl EncryptableStore for SqliteStateStore {
440    fn get_cypher(&self) -> Option<&StoreCipher> {
441        self.store_cipher.as_deref()
442    }
443}
444
445/// Initialize the database.
446async fn init(conn: &SqliteAsyncConn) -> Result<()> {
447    // First turn on WAL mode, this can't be done in the transaction, it fails with
448    // the error message: "cannot change into wal mode from within a transaction".
449    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
450    conn.with_transaction(|txn| {
451        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
452        txn.set_db_version(1)?;
453
454        Ok(())
455    })
456    .await
457}
458
459trait SqliteConnectionStateStoreExt {
460    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
461
462    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
463
464    fn set_room_account_data(
465        &self,
466        room_id: &[u8],
467        event_type: &[u8],
468        data: &[u8],
469    ) -> rusqlite::Result<()>;
470    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
471
472    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
473    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
474    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
475
476    fn set_state_event(
477        &self,
478        room_id: &[u8],
479        event_type: &[u8],
480        state_key: &[u8],
481        stripped: bool,
482        event_id: Option<&[u8]>,
483        data: &[u8],
484    ) -> rusqlite::Result<()>;
485    fn get_state_event_by_id(
486        &self,
487        room_id: &[u8],
488        event_id: &[u8],
489    ) -> rusqlite::Result<Option<Vec<u8>>>;
490    fn remove_room_state_events(
491        &self,
492        room_id: &[u8],
493        stripped: Option<bool>,
494    ) -> rusqlite::Result<()>;
495
496    fn set_member(
497        &self,
498        room_id: &[u8],
499        user_id: &[u8],
500        membership: &[u8],
501        stripped: bool,
502        data: &[u8],
503    ) -> rusqlite::Result<()>;
504    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
505
506    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
507    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
508    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
509
510    fn set_receipt(
511        &self,
512        room_id: &[u8],
513        user_id: &[u8],
514        receipt_type: &[u8],
515        thread_id: &[u8],
516        event_id: &[u8],
517        data: &[u8],
518    ) -> rusqlite::Result<()>;
519    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
520
521    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
522    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
523    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
524    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
525    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
526}
527
528impl SqliteConnectionStateStoreExt for rusqlite::Connection {
529    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
530        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
531        Ok(())
532    }
533
534    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
535        self.prepare_cached(
536            "INSERT OR REPLACE INTO global_account_data (event_type, data)
537             VALUES (?, ?)",
538        )?
539        .execute((event_type, data))?;
540        Ok(())
541    }
542
543    fn set_room_account_data(
544        &self,
545        room_id: &[u8],
546        event_type: &[u8],
547        data: &[u8],
548    ) -> rusqlite::Result<()> {
549        self.prepare_cached(
550            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
551             VALUES (?, ?, ?)",
552        )?
553        .execute((room_id, event_type, data))?;
554        Ok(())
555    }
556
557    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
558        self.prepare(
559            "DELETE FROM room_account_data
560             WHERE room_id = ?",
561        )?
562        .execute((room_id,))?;
563        Ok(())
564    }
565
566    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
567        self.prepare_cached(
568            "INSERT OR REPLACE INTO room_info (room_id, state, data)
569             VALUES (?, ?, ?)",
570        )?
571        .execute((room_id, state, data))?;
572        Ok(())
573    }
574
575    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
576        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
577            .optional()
578    }
579
580    /// Remove the room info for the given room.
581    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
582        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
583        Ok(())
584    }
585
586    fn set_state_event(
587        &self,
588        room_id: &[u8],
589        event_type: &[u8],
590        state_key: &[u8],
591        stripped: bool,
592        event_id: Option<&[u8]>,
593        data: &[u8],
594    ) -> rusqlite::Result<()> {
595        self.prepare_cached(
596            "INSERT OR REPLACE
597             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
598             VALUES (?, ?, ?, ?, ?, ?)",
599        )?
600        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
601        Ok(())
602    }
603
604    fn get_state_event_by_id(
605        &self,
606        room_id: &[u8],
607        event_id: &[u8],
608    ) -> rusqlite::Result<Option<Vec<u8>>> {
609        self.query_row(
610            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
611            (room_id, event_id),
612            |row| row.get(0),
613        )
614        .optional()
615    }
616
617    /// Remove state events for the given room.
618    ///
619    /// If `stripped` is `Some()`, only removes state events for the given
620    /// stripped state. Otherwise, state events are removed regardless of the
621    /// stripped state.
622    fn remove_room_state_events(
623        &self,
624        room_id: &[u8],
625        stripped: Option<bool>,
626    ) -> rusqlite::Result<()> {
627        if let Some(stripped) = stripped {
628            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
629                .execute((room_id, stripped))?;
630        } else {
631            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
632                .execute((room_id,))?;
633        }
634        Ok(())
635    }
636
637    fn set_member(
638        &self,
639        room_id: &[u8],
640        user_id: &[u8],
641        membership: &[u8],
642        stripped: bool,
643        data: &[u8],
644    ) -> rusqlite::Result<()> {
645        self.prepare_cached(
646            "INSERT OR REPLACE
647             INTO member (room_id, user_id, membership, stripped, data)
648             VALUES (?, ?, ?, ?, ?)",
649        )?
650        .execute((room_id, user_id, membership, stripped, data))?;
651        Ok(())
652    }
653
654    /// Remove members for the given room.
655    ///
656    /// If `stripped` is `Some()`, only removes members for the given stripped
657    /// state. Otherwise, members are removed regardless of the stripped state.
658    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
659        if let Some(stripped) = stripped {
660            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
661                .execute((room_id, stripped))?;
662        } else {
663            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
664        }
665        Ok(())
666    }
667
668    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
669        self.prepare_cached(
670            "INSERT OR REPLACE
671             INTO profile (room_id, user_id, data)
672             VALUES (?, ?, ?)",
673        )?
674        .execute((room_id, user_id, data))?;
675        Ok(())
676    }
677
678    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
679        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
680        Ok(())
681    }
682
683    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
684        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
685            .execute((room_id, user_id))?;
686        Ok(())
687    }
688
689    fn set_receipt(
690        &self,
691        room_id: &[u8],
692        user_id: &[u8],
693        receipt_type: &[u8],
694        thread: &[u8],
695        event_id: &[u8],
696        data: &[u8],
697    ) -> rusqlite::Result<()> {
698        self.prepare_cached(
699            "INSERT OR REPLACE
700             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
701             VALUES (?, ?, ?, ?, ?, ?)",
702        )?
703        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
704        Ok(())
705    }
706
707    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
708        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
709        Ok(())
710    }
711
712    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
713        self.prepare_cached(
714            "INSERT OR REPLACE
715             INTO display_name (room_id, name, data)
716             VALUES (?, ?, ?)",
717        )?
718        .execute((room_id, name, data))?;
719        Ok(())
720    }
721
722    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
723        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
724            .execute((room_id, name))?;
725        Ok(())
726    }
727
728    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
730        Ok(())
731    }
732
733    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
734        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
735        Ok(())
736    }
737
738    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
739        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
740            .execute((room_id,))?;
741        Ok(())
742    }
743}
744
745#[async_trait]
746trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
747    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
748        Ok(self
749            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
750            .await
751            .optional()?)
752    }
753
754    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
755        let keys_length = keys.len();
756
757        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
758            let sql_params = repeat_vars(keys.len());
759            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
760
761            let params = rusqlite::params_from_iter(keys);
762
763            Ok(txn
764                .prepare(&sql)?
765                .query(params)?
766                .mapped(|row| row.get(0))
767                .collect::<Result<_, _>>()?)
768        })
769        .await
770    }
771
772    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
773
774    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
775        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
776        Ok(())
777    }
778
779    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
780        Ok(match room_id {
781            None => {
782                self.prepare("SELECT data FROM room_info", move |mut stmt| {
783                    stmt.query_map((), |row| row.get(0))?.collect()
784                })
785                .await?
786            }
787
788            Some(room_id) => {
789                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
790                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
791                })
792                .await?
793            }
794        })
795    }
796
797    async fn get_maybe_stripped_state_events_for_keys(
798        &self,
799        room_id: Key,
800        event_type: Key,
801        state_keys: Vec<Key>,
802    ) -> Result<Vec<(bool, Vec<u8>)>> {
803        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
804            let sql_params = repeat_vars(state_keys.len());
805            let sql = format!(
806                "SELECT stripped, data FROM state_event
807                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
808            );
809
810            let params = rusqlite::params_from_iter(
811                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
812            );
813
814            Ok(txn
815                .prepare(&sql)?
816                .query(params)?
817                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
818                .collect::<Result<_, _>>()?)
819        })
820        .await
821    }
822
823    async fn get_maybe_stripped_state_events(
824        &self,
825        room_id: Key,
826        event_type: Key,
827    ) -> Result<Vec<(bool, Vec<u8>)>> {
828        Ok(self
829            .prepare(
830                "SELECT stripped, data FROM state_event
831                 WHERE room_id = ? AND event_type = ?",
832                |mut stmt| {
833                    stmt.query((room_id, event_type))?
834                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
835                        .collect()
836                },
837            )
838            .await?)
839    }
840
841    async fn get_profiles(
842        &self,
843        room_id: Key,
844        user_ids: Vec<Key>,
845    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
846        let user_ids_length = user_ids.len();
847
848        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
849            let sql_params = repeat_vars(user_ids.len());
850            let sql = format!(
851                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
852            );
853
854            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
855
856            Ok(txn
857                .prepare(&sql)?
858                .query(params)?
859                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
860                .collect::<Result<_, _>>()?)
861        })
862        .await
863    }
864
865    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
866        let res = if memberships.is_empty() {
867            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
868                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
869            })
870            .await?
871        } else {
872            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
873                let sql_params = repeat_vars(memberships.len());
874                let sql = format!(
875                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
876                );
877
878                let params =
879                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
880
881                Ok(txn
882                    .prepare(&sql)?
883                    .query(params)?
884                    .mapped(|row| row.get(0))
885                    .collect::<Result<_, _>>()?)
886            })
887            .await?
888        };
889
890        Ok(res)
891    }
892
893    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
894        Ok(self
895            .query_row(
896                "SELECT data FROM global_account_data WHERE event_type = ?",
897                (event_type,),
898                |row| row.get(0),
899            )
900            .await
901            .optional()?)
902    }
903
904    async fn get_room_account_data(
905        &self,
906        room_id: Key,
907        event_type: Key,
908    ) -> Result<Option<Vec<u8>>> {
909        Ok(self
910            .query_row(
911                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
912                (room_id, event_type),
913                |row| row.get(0),
914            )
915            .await
916            .optional()?)
917    }
918
919    async fn get_display_names(
920        &self,
921        room_id: Key,
922        names: Vec<Key>,
923    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
924        let names_length = names.len();
925
926        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
927            let sql_params = repeat_vars(names.len());
928            let sql = format!(
929                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
930            );
931
932            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
933
934            Ok(txn
935                .prepare(&sql)?
936                .query(params)?
937                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
938                .collect::<Result<_, _>>()?)
939        })
940        .await
941    }
942
943    async fn get_user_receipt(
944        &self,
945        room_id: Key,
946        receipt_type: Key,
947        thread: Key,
948        user_id: Key,
949    ) -> Result<Option<Vec<u8>>> {
950        Ok(self
951            .query_row(
952                "SELECT data FROM receipt
953                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
954                (room_id, receipt_type, thread, user_id),
955                |row| row.get(0),
956            )
957            .await
958            .optional()?)
959    }
960
961    async fn get_event_receipts(
962        &self,
963        room_id: Key,
964        receipt_type: Key,
965        thread: Key,
966        event_id: Key,
967    ) -> Result<Vec<Vec<u8>>> {
968        Ok(self
969            .prepare(
970                "SELECT data FROM receipt
971                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
972                |mut stmt| {
973                    stmt.query((room_id, receipt_type, thread, event_id))?
974                        .mapped(|row| row.get(0))
975                        .collect()
976                },
977            )
978            .await?)
979    }
980}
981
982#[async_trait]
983impl SqliteObjectStateStoreExt for SqliteAsyncConn {
984    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
985        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
986    }
987}
988
989#[async_trait]
990impl StateStore for SqliteStateStore {
991    type Error = Error;
992
993    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
994        self.acquire()
995            .await?
996            .get_kv_blob(self.encode_state_store_data_key(key))
997            .await?
998            .map(|data| {
999                Ok(match key {
1000                    StateStoreDataKey::SyncToken => {
1001                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1002                    }
1003                    StateStoreDataKey::ServerInfo => {
1004                        StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1005                    }
1006                    StateStoreDataKey::Filter(_) => {
1007                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1008                    }
1009                    StateStoreDataKey::UserAvatarUrl(_) => {
1010                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1011                    }
1012                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1013                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1014                    }
1015                    StateStoreDataKey::UtdHookManagerData => {
1016                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1017                    }
1018                    StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1019                        StateStoreDataValue::OneTimeKeyAlreadyUploaded
1020                    }
1021                    StateStoreDataKey::ComposerDraft(_, _) => {
1022                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1023                    }
1024                    StateStoreDataKey::SeenKnockRequests(_) => {
1025                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1026                    }
1027                })
1028            })
1029            .transpose()
1030    }
1031
1032    async fn set_kv_data(
1033        &self,
1034        key: StateStoreDataKey<'_>,
1035        value: StateStoreDataValue,
1036    ) -> Result<()> {
1037        let serialized_value = match key {
1038            StateStoreDataKey::SyncToken => self.serialize_value(
1039                &value.into_sync_token().expect("Session data not a sync token"),
1040            )?,
1041            StateStoreDataKey::ServerInfo => self.serialize_value(
1042                &value.into_server_info().expect("Session data not containing server info"),
1043            )?,
1044            StateStoreDataKey::Filter(_) => {
1045                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1046            }
1047            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1048                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1049            )?,
1050            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1051                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1052            )?,
1053            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1054                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1055            )?,
1056            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1057                self.serialize_value(&true).expect("We should be able to serialize a boolean")
1058            }
1059            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1060                &value.into_composer_draft().expect("Session data not a composer draft"),
1061            )?,
1062            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1063                &value
1064                    .into_seen_knock_requests()
1065                    .expect("Session data is not a set of seen knock request ids"),
1066            )?,
1067        };
1068
1069        self.acquire()
1070            .await?
1071            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1072            .await
1073    }
1074
1075    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1076        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1077    }
1078
1079    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1080        let changes = changes.to_owned();
1081        let this = self.clone();
1082        self.acquire()
1083            .await?
1084            .with_transaction(move |txn| {
1085                let StateChanges {
1086                    sync_token,
1087                    account_data,
1088                    presence,
1089                    profiles,
1090                    profiles_to_delete,
1091                    state,
1092                    room_account_data,
1093                    room_infos,
1094                    receipts,
1095                    redactions,
1096                    stripped_state,
1097                    ambiguity_maps,
1098                } = changes;
1099
1100                if let Some(sync_token) = sync_token {
1101                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1102                    let value = this.serialize_value(&sync_token)?;
1103                    txn.set_kv_blob(&key, &value)?;
1104                }
1105
1106                for (event_type, event) in account_data {
1107                    let event_type =
1108                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1109                    let data = this.serialize_json(&event)?;
1110                    txn.set_global_account_data(&event_type, &data)?;
1111                }
1112
1113                for (room_id, events) in room_account_data {
1114                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1115                    for (event_type, event) in events {
1116                        let event_type =
1117                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1118                        let data = this.serialize_json(&event)?;
1119                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1120                    }
1121                }
1122
1123                for (user_id, event) in presence {
1124                    let key = this.encode_presence_key(&user_id);
1125                    let value = this.serialize_json(&event)?;
1126                    txn.set_kv_blob(&key, &value)?;
1127                }
1128
1129                for (room_id, room_info) in room_infos {
1130                    let stripped = room_info.state() == RoomState::Invited;
1131                    // Remove non-stripped data for stripped rooms and vice-versa.
1132                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1133
1134                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1135                    let state = this
1136                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1137                    let data = this.serialize_json(&room_info)?;
1138                    txn.set_room_info(&room_id, &state, &data)?;
1139                }
1140
1141                for (room_id, user_ids) in profiles_to_delete {
1142                    let room_id = this.encode_key(keys::PROFILE, room_id);
1143                    for user_id in user_ids {
1144                        let user_id = this.encode_key(keys::PROFILE, user_id);
1145                        txn.remove_room_profile(&room_id, &user_id)?;
1146                    }
1147                }
1148
1149                for (room_id, state_event_types) in state {
1150                    let profiles = profiles.get(&room_id);
1151                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1152
1153                    for (event_type, state_events) in state_event_types {
1154                        let encoded_event_type =
1155                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1156
1157                        for (state_key, raw_state_event) in state_events {
1158                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1159                            let data = this.serialize_json(&raw_state_event)?;
1160
1161                            let event_id: Option<String> =
1162                                raw_state_event.get_field("event_id").ok().flatten();
1163                            let encoded_event_id =
1164                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1165
1166                            txn.set_state_event(
1167                                &encoded_room_id,
1168                                &encoded_event_type,
1169                                &encoded_state_key,
1170                                false,
1171                                encoded_event_id.as_deref(),
1172                                &data,
1173                            )?;
1174
1175                            if event_type == StateEventType::RoomMember {
1176                                let member_event = match raw_state_event
1177                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1178                                {
1179                                    Ok(ev) => ev,
1180                                    Err(e) => {
1181                                        debug!(event_id, "Failed to deserialize member event: {e}");
1182                                        continue;
1183                                    }
1184                                };
1185
1186                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1187                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1188                                let membership = this
1189                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1190                                let data = this.serialize_value(&state_key)?;
1191
1192                                txn.set_member(
1193                                    &encoded_room_id,
1194                                    &user_id,
1195                                    &membership,
1196                                    false,
1197                                    &data,
1198                                )?;
1199
1200                                if let Some(profile) =
1201                                    profiles.and_then(|p| p.get(member_event.state_key()))
1202                                {
1203                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1204                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1205                                    let data = this.serialize_json(&profile)?;
1206                                    txn.set_profile(&room_id, &user_id, &data)?;
1207                                }
1208                            }
1209                        }
1210                    }
1211                }
1212
1213                for (room_id, stripped_state_event_types) in stripped_state {
1214                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1215
1216                    for (event_type, stripped_state_events) in stripped_state_event_types {
1217                        let encoded_event_type =
1218                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1219
1220                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1221                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1222                            let data = this.serialize_json(&raw_stripped_state_event)?;
1223                            txn.set_state_event(
1224                                &encoded_room_id,
1225                                &encoded_event_type,
1226                                &encoded_state_key,
1227                                true,
1228                                None,
1229                                &data,
1230                            )?;
1231
1232                            if event_type == StateEventType::RoomMember {
1233                                let member_event = match raw_stripped_state_event
1234                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1235                                ) {
1236                                    Ok(ev) => ev,
1237                                    Err(e) => {
1238                                        debug!("Failed to deserialize stripped member event: {e}");
1239                                        continue;
1240                                    }
1241                                };
1242
1243                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1244                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1245                                let membership = this.encode_key(
1246                                    keys::MEMBER,
1247                                    member_event.content.membership.as_str(),
1248                                );
1249                                let data = this.serialize_value(&state_key)?;
1250
1251                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1252                            }
1253                        }
1254                    }
1255                }
1256
1257                for (room_id, receipt_event) in receipts {
1258                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1259
1260                    for (event_id, receipt_types) in receipt_event {
1261                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1262
1263                        for (receipt_type, receipt_users) in receipt_types {
1264                            let receipt_type =
1265                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1266
1267                            for (user_id, receipt) in receipt_users {
1268                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1269                                // We cannot have a NULL primary key so we rely on serialization
1270                                // instead of the string representation.
1271                                let thread = this.encode_key(
1272                                    keys::RECEIPT,
1273                                    rmp_serde::to_vec_named(&receipt.thread)?,
1274                                );
1275                                let data = this.serialize_json(&ReceiptData {
1276                                    receipt,
1277                                    event_id: event_id.clone(),
1278                                    user_id,
1279                                })?;
1280
1281                                txn.set_receipt(
1282                                    &room_id,
1283                                    &encoded_user_id,
1284                                    &receipt_type,
1285                                    &thread,
1286                                    &encoded_event_id,
1287                                    &data,
1288                                )?;
1289                            }
1290                        }
1291                    }
1292                }
1293
1294                for (room_id, redactions) in redactions {
1295                    let make_redaction_rules = || {
1296                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1297                        txn.get_room_info(&encoded_room_id)
1298                            .ok()
1299                            .flatten()
1300                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1301                            .map(|info| info.room_version_rules_or_default())
1302                            .unwrap_or_else(|| {
1303                                warn!(
1304                                    ?room_id,
1305                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1306                                );
1307                                ROOM_VERSION_RULES_FALLBACK
1308                            }).redaction
1309                    };
1310
1311                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1312                    let mut redaction_rules = None;
1313
1314                    for (event_id, redaction) in redactions {
1315                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1316
1317                        if let Some(Ok(raw_event)) = txn
1318                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1319                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1320                        {
1321                            let event = raw_event.deserialize()?;
1322                            let redacted = redact(
1323                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1324                                redaction_rules.get_or_insert_with(make_redaction_rules),
1325                                Some(RedactedBecause::from_raw_event(&redaction)?),
1326                            )
1327                            .map_err(Error::Redaction)?;
1328                            let data = this.serialize_json(&redacted)?;
1329
1330                            let event_type =
1331                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1332                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1333
1334                            txn.set_state_event(
1335                                &encoded_room_id,
1336                                &event_type,
1337                                &state_key,
1338                                false,
1339                                Some(&event_id),
1340                                &data,
1341                            )?;
1342                        }
1343                    }
1344                }
1345
1346                for (room_id, display_names) in ambiguity_maps {
1347                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1348
1349                    for (name, user_ids) in display_names {
1350                        let encoded_name = this.encode_key(
1351                            keys::DISPLAY_NAME,
1352                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1353                        );
1354                        let data = this.serialize_json(&user_ids)?;
1355
1356                        if user_ids.is_empty() {
1357                            txn.remove_display_name(&room_id, &encoded_name)?;
1358
1359                            // We can't do a migration to merge the previously distinct buckets of
1360                            // user IDs since the display names themselves are hashed before they
1361                            // are persisted in the store. So the store will always retain two
1362                            // buckets: one for raw display names and one for normalised ones.
1363                            //
1364                            // We therefore do the next best thing, which is a sort of a soft
1365                            // migration: we fetch both the raw and normalised buckets, then merge
1366                            // the user IDs contained in them into a separate, temporary merged
1367                            // bucket. The SDK then operates on the merged buckets exclusively. See
1368                            // the comment in `get_users_with_display_names` for details.
1369                            //
1370                            // If the merged bucket is empty, that must mean that both the raw and
1371                            // normalised buckets were also empty, so we can remove both from the
1372                            // store.
1373                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1374                            txn.remove_display_name(&room_id, &raw_name)?;
1375                        } else {
1376                            // We only create new buckets with the normalized display name.
1377                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1378                        }
1379                    }
1380                }
1381
1382                Ok::<_, Error>(())
1383            })
1384            .await?;
1385
1386        Ok(())
1387    }
1388
1389    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1390        self.acquire()
1391            .await?
1392            .get_kv_blob(self.encode_presence_key(user_id))
1393            .await?
1394            .map(|data| self.deserialize_json(&data))
1395            .transpose()
1396    }
1397
1398    async fn get_presence_events(
1399        &self,
1400        user_ids: &[OwnedUserId],
1401    ) -> Result<Vec<Raw<PresenceEvent>>> {
1402        if user_ids.is_empty() {
1403            return Ok(Vec::new());
1404        }
1405
1406        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1407        self.acquire()
1408            .await?
1409            .get_kv_blobs(user_ids)
1410            .await?
1411            .into_iter()
1412            .map(|data| self.deserialize_json(&data))
1413            .collect()
1414    }
1415
1416    async fn get_state_event(
1417        &self,
1418        room_id: &RoomId,
1419        event_type: StateEventType,
1420        state_key: &str,
1421    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1422        Ok(self
1423            .get_state_events_for_keys(room_id, event_type, &[state_key])
1424            .await?
1425            .into_iter()
1426            .next())
1427    }
1428
1429    async fn get_state_events(
1430        &self,
1431        room_id: &RoomId,
1432        event_type: StateEventType,
1433    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1434        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1435        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1436        self.acquire()
1437            .await?
1438            .get_maybe_stripped_state_events(room_id, event_type)
1439            .await?
1440            .into_iter()
1441            .map(|(stripped, data)| {
1442                let ev = if stripped {
1443                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1444                } else {
1445                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1446                };
1447
1448                Ok(ev)
1449            })
1450            .collect()
1451    }
1452
1453    async fn get_state_events_for_keys(
1454        &self,
1455        room_id: &RoomId,
1456        event_type: StateEventType,
1457        state_keys: &[&str],
1458    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1459        if state_keys.is_empty() {
1460            return Ok(Vec::new());
1461        }
1462
1463        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1464        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1465        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1466        self.acquire()
1467            .await?
1468            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1469            .await?
1470            .into_iter()
1471            .map(|(stripped, data)| {
1472                let ev = if stripped {
1473                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1474                } else {
1475                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1476                };
1477
1478                Ok(ev)
1479            })
1480            .collect()
1481    }
1482
1483    async fn get_profile(
1484        &self,
1485        room_id: &RoomId,
1486        user_id: &UserId,
1487    ) -> Result<Option<MinimalRoomMemberEvent>> {
1488        let room_id = self.encode_key(keys::PROFILE, room_id);
1489        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1490
1491        self.acquire()
1492            .await?
1493            .get_profiles(room_id, user_ids)
1494            .await?
1495            .into_iter()
1496            .next()
1497            .map(|(_, data)| self.deserialize_json(&data))
1498            .transpose()
1499    }
1500
1501    async fn get_profiles<'a>(
1502        &self,
1503        room_id: &RoomId,
1504        user_ids: &'a [OwnedUserId],
1505    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1506        if user_ids.is_empty() {
1507            return Ok(BTreeMap::new());
1508        }
1509
1510        let room_id = self.encode_key(keys::PROFILE, room_id);
1511        let mut user_ids_map = user_ids
1512            .iter()
1513            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1514            .collect::<BTreeMap<_, _>>();
1515        let user_ids = user_ids_map.keys().cloned().collect();
1516
1517        self.acquire()
1518            .await?
1519            .get_profiles(room_id, user_ids)
1520            .await?
1521            .into_iter()
1522            .map(|(user_id, data)| {
1523                Ok((
1524                    user_ids_map
1525                        .remove(user_id.as_slice())
1526                        .expect("returned user IDs were requested"),
1527                    self.deserialize_json(&data)?,
1528                ))
1529            })
1530            .collect()
1531    }
1532
1533    async fn get_user_ids(
1534        &self,
1535        room_id: &RoomId,
1536        membership: RoomMemberships,
1537    ) -> Result<Vec<OwnedUserId>> {
1538        let room_id = self.encode_key(keys::MEMBER, room_id);
1539        let memberships = membership
1540            .as_vec()
1541            .into_iter()
1542            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1543            .collect();
1544        self.acquire()
1545            .await?
1546            .get_user_ids(room_id, memberships)
1547            .await?
1548            .iter()
1549            .map(|data| self.deserialize_value(data))
1550            .collect()
1551    }
1552
1553    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1554        self.acquire()
1555            .await?
1556            .get_room_infos(match room_load_settings {
1557                RoomLoadSettings::All => None,
1558                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1559            })
1560            .await?
1561            .into_iter()
1562            .map(|data| self.deserialize_json(&data))
1563            .collect()
1564    }
1565
1566    async fn get_users_with_display_name(
1567        &self,
1568        room_id: &RoomId,
1569        display_name: &DisplayName,
1570    ) -> Result<BTreeSet<OwnedUserId>> {
1571        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1572        let names = vec![self.encode_key(
1573            keys::DISPLAY_NAME,
1574            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1575        )];
1576
1577        Ok(self
1578            .acquire()
1579            .await?
1580            .get_display_names(room_id, names)
1581            .await?
1582            .into_iter()
1583            .next()
1584            .map(|(_, data)| self.deserialize_json(&data))
1585            .transpose()?
1586            .unwrap_or_default())
1587    }
1588
1589    async fn get_users_with_display_names<'a>(
1590        &self,
1591        room_id: &RoomId,
1592        display_names: &'a [DisplayName],
1593    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1594        let mut result = HashMap::new();
1595
1596        if display_names.is_empty() {
1597            return Ok(result);
1598        }
1599
1600        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1601        let mut names_map = display_names
1602            .iter()
1603            .flat_map(|display_name| {
1604                // We encode the display name as the `raw_str()` and the normalized string.
1605                //
1606                // This is for compatibility reasons since:
1607                //  1. Previously "Alice" and "alice" were considered to be distinct display
1608                //     names, while we now consider them to be the same so we need to merge the
1609                //     previously distinct buckets of user IDs.
1610                //  2. We can't do a migration to merge the previously distinct buckets of user
1611                //     IDs since the display names itself are hashed before they are persisted
1612                //     in the store.
1613                let raw =
1614                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1615                let normalized = display_name.as_normalized_str().map(|normalized| {
1616                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1617                });
1618
1619                iter::once(raw).chain(normalized.into_iter())
1620            })
1621            .collect::<BTreeMap<_, _>>();
1622        let names = names_map.keys().cloned().collect();
1623
1624        for (name, data) in
1625            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1626        {
1627            let display_name =
1628                names_map.remove(name.as_slice()).expect("returned display names were requested");
1629            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1630
1631            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1632        }
1633
1634        Ok(result)
1635    }
1636
1637    async fn get_account_data_event(
1638        &self,
1639        event_type: GlobalAccountDataEventType,
1640    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1641        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1642        self.acquire()
1643            .await?
1644            .get_global_account_data(event_type)
1645            .await?
1646            .map(|value| self.deserialize_json(&value))
1647            .transpose()
1648    }
1649
1650    async fn get_room_account_data_event(
1651        &self,
1652        room_id: &RoomId,
1653        event_type: RoomAccountDataEventType,
1654    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1655        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1656        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1657        self.acquire()
1658            .await?
1659            .get_room_account_data(room_id, event_type)
1660            .await?
1661            .map(|value| self.deserialize_json(&value))
1662            .transpose()
1663    }
1664
1665    async fn get_user_room_receipt_event(
1666        &self,
1667        room_id: &RoomId,
1668        receipt_type: ReceiptType,
1669        thread: ReceiptThread,
1670        user_id: &UserId,
1671    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1672        let room_id = self.encode_key(keys::RECEIPT, room_id);
1673        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1674        // We cannot have a NULL primary key so we rely on serialization instead of the
1675        // string representation.
1676        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1677        let user_id = self.encode_key(keys::RECEIPT, user_id);
1678
1679        self.acquire()
1680            .await?
1681            .get_user_receipt(room_id, receipt_type, thread, user_id)
1682            .await?
1683            .map(|value| {
1684                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1685            })
1686            .transpose()
1687    }
1688
1689    async fn get_event_room_receipt_events(
1690        &self,
1691        room_id: &RoomId,
1692        receipt_type: ReceiptType,
1693        thread: ReceiptThread,
1694        event_id: &EventId,
1695    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1696        let room_id = self.encode_key(keys::RECEIPT, room_id);
1697        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1698        // We cannot have a NULL primary key so we rely on serialization instead of the
1699        // string representation.
1700        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1701        let event_id = self.encode_key(keys::RECEIPT, event_id);
1702
1703        self.acquire()
1704            .await?
1705            .get_event_receipts(room_id, receipt_type, thread, event_id)
1706            .await?
1707            .iter()
1708            .map(|value| {
1709                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1710            })
1711            .collect()
1712    }
1713
1714    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1715        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1716    }
1717
1718    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1719        let conn = self.acquire().await?;
1720        let key = self.encode_custom_key(key);
1721        conn.set_kv_blob(key, value).await?;
1722        Ok(())
1723    }
1724
1725    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1726        let conn = self.acquire().await?;
1727        let key = self.encode_custom_key(key);
1728        let previous = conn.get_kv_blob(key.clone()).await?;
1729        conn.set_kv_blob(key, value).await?;
1730        Ok(previous)
1731    }
1732
1733    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1734        let conn = self.acquire().await?;
1735        let key = self.encode_custom_key(key);
1736        let previous = conn.get_kv_blob(key.clone()).await?;
1737        if previous.is_some() {
1738            conn.delete_kv_blob(key).await?;
1739        }
1740        Ok(previous)
1741    }
1742
1743    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1744        let this = self.clone();
1745        let room_id = room_id.to_owned();
1746
1747        let conn = self.acquire().await?;
1748
1749        conn.with_transaction(move |txn| -> Result<()> {
1750            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1751            txn.remove_room_info(&room_info_room_id)?;
1752
1753            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1754            txn.remove_room_state_events(&state_event_room_id, None)?;
1755
1756            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1757            txn.remove_room_members(&member_room_id, None)?;
1758
1759            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1760            txn.remove_room_profiles(&profile_room_id)?;
1761
1762            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1763            txn.remove_room_account_data(&room_account_data_room_id)?;
1764
1765            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1766            txn.remove_room_receipts(&receipt_room_id)?;
1767
1768            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1769            txn.remove_room_display_names(&display_name_room_id)?;
1770
1771            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1772            txn.remove_room_send_queue(&send_queue_room_id)?;
1773
1774            let dependent_send_queue_room_id =
1775                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1776            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1777
1778            let thread_subscriptions_room_id =
1779                this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1780            txn.execute(
1781                "DELETE FROM thread_subscriptions WHERE room_id = ?",
1782                (thread_subscriptions_room_id,),
1783            )?;
1784
1785            Ok(())
1786        })
1787        .await?;
1788
1789        conn.vacuum().await
1790    }
1791
1792    async fn save_send_queue_request(
1793        &self,
1794        room_id: &RoomId,
1795        transaction_id: OwnedTransactionId,
1796        created_at: MilliSecondsSinceUnixEpoch,
1797        content: QueuedRequestKind,
1798        priority: usize,
1799    ) -> Result<(), Self::Error> {
1800        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1801        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1802
1803        let content = self.serialize_json(&content)?;
1804        // The transaction id is used both as a key (in remove/update) and a value (as
1805        // it's useful for the callers), so we keep it as is, and neither hash
1806        // it (with encode_key) or encrypt it (through serialize_value). After
1807        // all, it carries no personal information, so this is considered fine.
1808
1809        let created_at_ts: u64 = created_at.0.into();
1810        self.acquire()
1811            .await?
1812            .with_transaction(move |txn| {
1813                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1814                Ok(())
1815            })
1816            .await
1817    }
1818
1819    async fn update_send_queue_request(
1820        &self,
1821        room_id: &RoomId,
1822        transaction_id: &TransactionId,
1823        content: QueuedRequestKind,
1824    ) -> Result<bool, Self::Error> {
1825        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1826
1827        let content = self.serialize_json(&content)?;
1828        // See comment in [`Self::save_send_queue_event`] to understand why the
1829        // transaction id is neither encrypted or hashed.
1830        let transaction_id = transaction_id.to_string();
1831
1832        let num_updated = self.acquire()
1833            .await?
1834            .with_transaction(move |txn| {
1835                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1836            })
1837            .await?;
1838
1839        Ok(num_updated > 0)
1840    }
1841
1842    async fn remove_send_queue_request(
1843        &self,
1844        room_id: &RoomId,
1845        transaction_id: &TransactionId,
1846    ) -> Result<bool, Self::Error> {
1847        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1848
1849        // See comment in `save_send_queue_event`.
1850        let transaction_id = transaction_id.to_string();
1851
1852        let num_deleted = self
1853            .acquire()
1854            .await?
1855            .with_transaction(move |txn| {
1856                txn.prepare_cached(
1857                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1858                )?
1859                .execute((room_id, &transaction_id))
1860            })
1861            .await?;
1862
1863        Ok(num_deleted > 0)
1864    }
1865
1866    async fn load_send_queue_requests(
1867        &self,
1868        room_id: &RoomId,
1869    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1870        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1871
1872        // Note: ROWID is always present and is an auto-incremented integer counter. We
1873        // want to maintain the insertion order, so we can sort using it.
1874        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1875        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1876            .acquire()
1877            .await?
1878            .prepare(
1879                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1880                |mut stmt| {
1881                    stmt.query((room_id,))?
1882                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1883                        .collect()
1884                },
1885            )
1886            .await?;
1887
1888        let mut requests = Vec::with_capacity(res.len());
1889        for entry in res {
1890            let created_at = entry
1891                .4
1892                .and_then(UInt::new)
1893                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1894            requests.push(QueuedRequest {
1895                transaction_id: entry.0.into(),
1896                kind: self.deserialize_json(&entry.1)?,
1897                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1898                priority: entry.3,
1899                created_at,
1900            });
1901        }
1902
1903        Ok(requests)
1904    }
1905
1906    async fn update_send_queue_request_status(
1907        &self,
1908        room_id: &RoomId,
1909        transaction_id: &TransactionId,
1910        error: Option<QueueWedgeError>,
1911    ) -> Result<(), Self::Error> {
1912        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1913
1914        // See comment in `save_send_queue_event`.
1915        let transaction_id = transaction_id.to_string();
1916
1917        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1918        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1919
1920        self.acquire()
1921            .await?
1922            .with_transaction(move |txn| {
1923                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1924                Ok(())
1925            })
1926            .await
1927    }
1928
1929    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1930        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1931        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1932        // != encrypted(X), since we use a nonce in the encryption process.
1933
1934        let res: Vec<Vec<u8>> = self
1935            .acquire()
1936            .await?
1937            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1938                stmt.query(())?.mapped(|row| row.get(0)).collect()
1939            })
1940            .await?;
1941
1942        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1943        // then rejigger that into a vector.
1944        Ok(res
1945            .into_iter()
1946            .map(|entry| self.deserialize_value(&entry))
1947            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1948            .into_iter()
1949            .collect())
1950    }
1951
1952    async fn save_dependent_queued_request(
1953        &self,
1954        room_id: &RoomId,
1955        parent_txn_id: &TransactionId,
1956        own_txn_id: ChildTransactionId,
1957        created_at: MilliSecondsSinceUnixEpoch,
1958        content: DependentQueuedRequestKind,
1959    ) -> Result<()> {
1960        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1961        let content = self.serialize_json(&content)?;
1962
1963        // See comment in `save_send_queue_event`.
1964        let parent_txn_id = parent_txn_id.to_string();
1965        let own_txn_id = own_txn_id.to_string();
1966
1967        let created_at_ts: u64 = created_at.0.into();
1968        self.acquire()
1969            .await?
1970            .with_transaction(move |txn| {
1971                txn.prepare_cached(
1972                    r#"INSERT INTO dependent_send_queue_events
1973                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1974                       VALUES (?, ?, ?, ?, ?)"#,
1975                )?
1976                .execute((
1977                    room_id,
1978                    parent_txn_id,
1979                    own_txn_id,
1980                    content,
1981                    created_at_ts,
1982                ))?;
1983                Ok(())
1984            })
1985            .await
1986    }
1987
1988    async fn update_dependent_queued_request(
1989        &self,
1990        room_id: &RoomId,
1991        own_transaction_id: &ChildTransactionId,
1992        new_content: DependentQueuedRequestKind,
1993    ) -> Result<bool> {
1994        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1995        let content = self.serialize_json(&new_content)?;
1996
1997        // See comment in `save_send_queue_event`.
1998        let own_txn_id = own_transaction_id.to_string();
1999
2000        let num_updated = self
2001            .acquire()
2002            .await?
2003            .with_transaction(move |txn| {
2004                txn.prepare_cached(
2005                    r#"UPDATE dependent_send_queue_events
2006                       SET content = ?
2007                       WHERE own_transaction_id = ?
2008                       AND room_id = ?"#,
2009                )?
2010                .execute((content, own_txn_id, room_id))
2011            })
2012            .await?;
2013
2014        if num_updated > 1 {
2015            return Err(Error::InconsistentUpdate);
2016        }
2017
2018        Ok(num_updated == 1)
2019    }
2020
2021    async fn mark_dependent_queued_requests_as_ready(
2022        &self,
2023        room_id: &RoomId,
2024        parent_txn_id: &TransactionId,
2025        parent_key: SentRequestKey,
2026    ) -> Result<usize> {
2027        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2028        let parent_key = self.serialize_value(&parent_key)?;
2029
2030        // See comment in `save_send_queue_event`.
2031        let parent_txn_id = parent_txn_id.to_string();
2032
2033        self.acquire()
2034            .await?
2035            .with_transaction(move |txn| {
2036                Ok(txn.prepare_cached(
2037                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2038                )?
2039                .execute((parent_key, parent_txn_id, room_id))?)
2040            })
2041            .await
2042    }
2043
2044    async fn remove_dependent_queued_request(
2045        &self,
2046        room_id: &RoomId,
2047        txn_id: &ChildTransactionId,
2048    ) -> Result<bool> {
2049        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2050
2051        // See comment in `save_send_queue_event`.
2052        let txn_id = txn_id.to_string();
2053
2054        let num_deleted = self
2055            .acquire()
2056            .await?
2057            .with_transaction(move |txn| {
2058                txn.prepare_cached(
2059                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2060                )?
2061                .execute((txn_id, room_id))
2062            })
2063            .await?;
2064
2065        Ok(num_deleted > 0)
2066    }
2067
2068    async fn load_dependent_queued_requests(
2069        &self,
2070        room_id: &RoomId,
2071    ) -> Result<Vec<DependentQueuedRequest>> {
2072        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2073
2074        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2075        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2076            .acquire()
2077            .await?
2078            .prepare(
2079                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2080                |mut stmt| {
2081                    stmt.query((room_id,))?
2082                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2083                        .collect()
2084                },
2085            )
2086            .await?;
2087
2088        let mut dependent_events = Vec::with_capacity(res.len());
2089        for entry in res {
2090            let created_at = entry
2091                .4
2092                .and_then(UInt::new)
2093                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2094            dependent_events.push(DependentQueuedRequest {
2095                own_transaction_id: entry.0.into(),
2096                parent_transaction_id: entry.1.into(),
2097                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2098                kind: self.deserialize_json(&entry.3)?,
2099                created_at,
2100            });
2101        }
2102
2103        Ok(dependent_events)
2104    }
2105
2106    async fn upsert_thread_subscription(
2107        &self,
2108        room_id: &RoomId,
2109        thread_id: &EventId,
2110        subscription: ThreadSubscription,
2111    ) -> Result<(), Self::Error> {
2112        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2113        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2114        let status = subscription.as_str();
2115
2116        self.acquire()
2117            .await?
2118            .with_transaction(move |txn| {
2119                txn.prepare_cached(
2120                    "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status)
2121                         VALUES (?, ?, ?)",
2122                )?
2123                .execute((room_id, thread_id, status))
2124            })
2125            .await?;
2126        Ok(())
2127    }
2128
2129    async fn load_thread_subscription(
2130        &self,
2131        room_id: &RoomId,
2132        thread_id: &EventId,
2133    ) -> Result<Option<ThreadSubscription>, Self::Error> {
2134        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2135        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2136
2137        Ok(self
2138            .acquire()
2139            .await?
2140            .query_row(
2141                "SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2142                (room_id, thread_id),
2143                |row| row.get::<_, String>(0),
2144            )
2145            .await
2146            .optional()?
2147            .map(|data| {
2148                ThreadSubscription::from_value(&data).ok_or_else(|| Error::InvalidData {
2149                    details: format!("Invalid thread status: {data}"),
2150                })
2151            })
2152            .transpose()?)
2153    }
2154
2155    async fn remove_thread_subscription(
2156        &self,
2157        room_id: &RoomId,
2158        thread_id: &EventId,
2159    ) -> Result<(), Self::Error> {
2160        let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2161        let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2162
2163        self.acquire()
2164            .await?
2165            .execute(
2166                "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2167                (room_id, thread_id),
2168            )
2169            .await?;
2170
2171        Ok(())
2172    }
2173}
2174
2175#[derive(Debug, Clone, Serialize, Deserialize)]
2176struct ReceiptData {
2177    receipt: Receipt,
2178    event_id: OwnedEventId,
2179    user_id: OwnedUserId,
2180}
2181
2182#[cfg(test)]
2183mod tests {
2184    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2185
2186    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2187    use once_cell::sync::Lazy;
2188    use tempfile::{tempdir, TempDir};
2189
2190    use super::SqliteStateStore;
2191
2192    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2193    static NUM: AtomicU32 = AtomicU32::new(0);
2194
2195    async fn get_store() -> Result<impl StateStore, StoreError> {
2196        let name = NUM.fetch_add(1, SeqCst).to_string();
2197        let tmpdir_path = TMP_DIR.path().join(name);
2198
2199        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2200
2201        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2202    }
2203
2204    statestore_integration_tests!();
2205}
2206
2207#[cfg(test)]
2208mod encrypted_tests {
2209    use std::{
2210        path::PathBuf,
2211        sync::atomic::{AtomicU32, Ordering::SeqCst},
2212    };
2213
2214    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2215    use matrix_sdk_test::async_test;
2216    use once_cell::sync::Lazy;
2217    use tempfile::{tempdir, TempDir};
2218
2219    use super::SqliteStateStore;
2220    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2221
2222    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2223    static NUM: AtomicU32 = AtomicU32::new(0);
2224
2225    fn new_state_store_workspace() -> PathBuf {
2226        let name = NUM.fetch_add(1, SeqCst).to_string();
2227        TMP_DIR.path().join(name)
2228    }
2229
2230    async fn get_store() -> Result<impl StateStore, StoreError> {
2231        let tmpdir_path = new_state_store_workspace();
2232
2233        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2234
2235        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2236            .await
2237            .unwrap())
2238    }
2239
2240    #[async_test]
2241    async fn test_pool_size() {
2242        let tmpdir_path = new_state_store_workspace();
2243        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2244
2245        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2246
2247        assert_eq!(store.pool.status().max_size, 42);
2248    }
2249
2250    #[async_test]
2251    async fn test_cache_size() {
2252        let tmpdir_path = new_state_store_workspace();
2253        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2254
2255        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2256
2257        let conn = store.pool.get().await.unwrap();
2258        let cache_size =
2259            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2260
2261        // The value passed to `SqliteStoreConfig` is in bytes. Check it is
2262        // converted to kibibytes. Also, it must be a negative value because it
2263        // _is_ the size in kibibytes, not in page size.
2264        assert_eq!(cache_size, -(1500 / 1024));
2265    }
2266
2267    #[async_test]
2268    async fn test_journal_size_limit() {
2269        let tmpdir_path = new_state_store_workspace();
2270        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2271
2272        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2273
2274        let conn = store.pool.get().await.unwrap();
2275        let journal_size_limit = conn
2276            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2277            .await
2278            .unwrap();
2279
2280        // The value passed to `SqliteStoreConfig` is in bytes. It stays in
2281        // bytes in SQLite.
2282        assert_eq!(journal_size_limit, 1500);
2283    }
2284
2285    statestore_integration_tests!();
2286}
2287
2288#[cfg(test)]
2289mod migration_tests {
2290    use std::{
2291        path::{Path, PathBuf},
2292        sync::{
2293            atomic::{AtomicU32, Ordering::SeqCst},
2294            Arc,
2295        },
2296    };
2297
2298    use as_variant::as_variant;
2299    use deadpool_sqlite::Runtime;
2300    use matrix_sdk_base::{
2301        media::{MediaFormat, MediaRequestParameters},
2302        store::{
2303            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2304            SerializableEventContent,
2305        },
2306        sync::UnreadNotificationsCount,
2307        RoomState, StateStore,
2308    };
2309    use matrix_sdk_test::async_test;
2310    use once_cell::sync::Lazy;
2311    use ruma::{
2312        events::{
2313            room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2314            StateEventType,
2315        },
2316        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2317        RoomId, TransactionId, UserId,
2318    };
2319    use rusqlite::Transaction;
2320    use serde::{Deserialize, Serialize};
2321    use serde_json::json;
2322    use tempfile::{tempdir, TempDir};
2323    use tokio::fs;
2324
2325    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2326    use crate::{
2327        error::{Error, Result},
2328        utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2329        OpenStoreError,
2330    };
2331
2332    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2333    static NUM: AtomicU32 = AtomicU32::new(0);
2334    const SECRET: &str = "secret";
2335
2336    fn new_path() -> PathBuf {
2337        let name = NUM.fetch_add(1, SeqCst).to_string();
2338        TMP_DIR.path().join(name)
2339    }
2340
2341    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2342        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2343
2344        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2345        // use default pool config
2346
2347        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2348        let conn = pool.get().await?;
2349
2350        init(&conn).await?;
2351
2352        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2353        let this = SqliteStateStore { store_cipher, pool };
2354        this.run_migrations(&conn, 1, Some(version)).await?;
2355
2356        Ok(this)
2357    }
2358
2359    fn room_info_v1_json(
2360        room_id: &RoomId,
2361        state: RoomState,
2362        name: Option<&str>,
2363        creator: Option<&UserId>,
2364    ) -> serde_json::Value {
2365        // Test with name set or not.
2366        let name_content = match name {
2367            Some(name) => json!({ "name": name }),
2368            None => json!({ "name": null }),
2369        };
2370        // Test with creator set or not.
2371        let create_content = match creator {
2372            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2373            None => RoomCreateEventContent::new_v11(),
2374        };
2375
2376        json!({
2377            "room_id": room_id,
2378            "room_type": state,
2379            "notification_counts": UnreadNotificationsCount::default(),
2380            "summary": {
2381                "heroes": [],
2382                "joined_member_count": 0,
2383                "invited_member_count": 0,
2384            },
2385            "members_synced": false,
2386            "base_info": {
2387                "dm_targets": [],
2388                "max_power_level": 100,
2389                "name": {
2390                    "Original": {
2391                        "content": name_content,
2392                    },
2393                },
2394                "create": {
2395                    "Original": {
2396                        "content": create_content,
2397                    }
2398                }
2399            },
2400        })
2401    }
2402
2403    #[async_test]
2404    pub async fn test_migrating_v1_to_v2() {
2405        let path = new_path();
2406        // Create and populate db.
2407        {
2408            let db = create_fake_db(&path, 1).await.unwrap();
2409            let conn = db.pool.get().await.unwrap();
2410
2411            let this = db.clone();
2412            conn.with_transaction(move |txn| {
2413                for i in 0..5 {
2414                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2415                    let (state, stripped) =
2416                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2417                    let info = room_info_v1_json(&room_id, state, None, None);
2418
2419                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2420                    let data = this.serialize_json(&info)?;
2421
2422                    txn.prepare_cached(
2423                        "INSERT INTO room_info (room_id, stripped, data)
2424                         VALUES (?, ?, ?)",
2425                    )?
2426                    .execute((room_id, stripped, data))?;
2427                }
2428
2429                Result::<_, Error>::Ok(())
2430            })
2431            .await
2432            .unwrap();
2433        }
2434
2435        // This transparently migrates to the latest version.
2436        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2437
2438        // Check all room infos are there.
2439        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2440    }
2441
2442    // Add a room in version 2 format of the state store.
2443    fn add_room_v2(
2444        this: &SqliteStateStore,
2445        txn: &Transaction<'_>,
2446        room_id: &RoomId,
2447        name: Option<&str>,
2448        create_creator: Option<&UserId>,
2449        create_sender: Option<&UserId>,
2450    ) -> Result<(), Error> {
2451        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2452
2453        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2454        let encoded_state =
2455            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2456        let data = this.serialize_json(&room_info_json)?;
2457
2458        txn.prepare_cached(
2459            "INSERT INTO room_info (room_id, state, data)
2460             VALUES (?, ?, ?)",
2461        )?
2462        .execute((encoded_room_id, encoded_state, data))?;
2463
2464        // Test with or without `m.room.create` event in the room state.
2465        let Some(create_sender) = create_sender else {
2466            return Ok(());
2467        };
2468
2469        let create_content = match create_creator {
2470            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2471            None => RoomCreateEventContent::new_v11(),
2472        };
2473
2474        let event_id = EventId::new(server_name!("dummy.local"));
2475        let create_event = json!({
2476            "content": create_content,
2477            "event_id": event_id,
2478            "sender": create_sender.to_owned(),
2479            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2480            "state_key": "",
2481            "type": "m.room.create",
2482            "unsigned": {},
2483        });
2484
2485        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2486        let encoded_event_type =
2487            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2488        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2489        let stripped = false;
2490        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2491        let data = this.serialize_json(&create_event)?;
2492
2493        txn.prepare_cached(
2494            "INSERT
2495             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2496             VALUES (?, ?, ?, ?, ?, ?)",
2497        )?
2498        .execute((
2499            encoded_room_id,
2500            encoded_event_type,
2501            encoded_state_key,
2502            stripped,
2503            encoded_event_id,
2504            data,
2505        ))?;
2506
2507        Ok(())
2508    }
2509
2510    #[async_test]
2511    pub async fn test_migrating_v2_to_v3() {
2512        let path = new_path();
2513
2514        // Room A: with name, creator and sender.
2515        let room_a_id = room_id!("!room_a:dummy.local");
2516        let room_a_name = "Room A";
2517        let room_a_creator = user_id!("@creator:dummy.local");
2518        // Use a different sender to check that sender is used over creator in
2519        // migration.
2520        let room_a_create_sender = user_id!("@sender:dummy.local");
2521
2522        // Room B: without name, creator and sender.
2523        let room_b_id = room_id!("!room_b:dummy.local");
2524
2525        // Room C: only with sender.
2526        let room_c_id = room_id!("!room_c:dummy.local");
2527        let room_c_create_sender = user_id!("@creator:dummy.local");
2528
2529        // Create and populate db.
2530        {
2531            let db = create_fake_db(&path, 2).await.unwrap();
2532            let conn = db.pool.get().await.unwrap();
2533
2534            let this = db.clone();
2535            conn.with_transaction(move |txn| {
2536                add_room_v2(
2537                    &this,
2538                    txn,
2539                    room_a_id,
2540                    Some(room_a_name),
2541                    Some(room_a_creator),
2542                    Some(room_a_create_sender),
2543                )?;
2544                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2545                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2546
2547                Result::<_, Error>::Ok(())
2548            })
2549            .await
2550            .unwrap();
2551        }
2552
2553        // This transparently migrates to the latest version.
2554        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2555
2556        // Check all room infos are there.
2557        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2558        assert_eq!(room_infos.len(), 3);
2559
2560        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2561        assert_eq!(room_a.name(), Some(room_a_name));
2562        assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2563
2564        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2565        assert_eq!(room_b.name(), None);
2566        assert_eq!(room_b.creators(), None);
2567
2568        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2569        assert_eq!(room_c.name(), None);
2570        assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2571    }
2572
2573    #[async_test]
2574    pub async fn test_migrating_v7_to_v9() {
2575        let path = new_path();
2576
2577        let room_id = room_id!("!room_a:dummy.local");
2578        let wedged_event_transaction_id = TransactionId::new();
2579        let local_event_transaction_id = TransactionId::new();
2580
2581        // Create and populate db.
2582        {
2583            let db = create_fake_db(&path, 7).await.unwrap();
2584            let conn = db.pool.get().await.unwrap();
2585
2586            let wedge_tx = wedged_event_transaction_id.clone();
2587            let local_tx = local_event_transaction_id.clone();
2588
2589            conn.with_transaction(move |txn| {
2590                add_dependent_send_queue_event_v7(
2591                    &db,
2592                    txn,
2593                    room_id,
2594                    &local_tx,
2595                    ChildTransactionId::new(),
2596                    DependentQueuedRequestKind::RedactEvent,
2597                )?;
2598                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2599                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2600                Result::<_, Error>::Ok(())
2601            })
2602            .await
2603            .unwrap();
2604        }
2605
2606        // This transparently migrates to the latest version, which clears up all
2607        // requests and dependent requests.
2608        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2609
2610        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2611        assert!(requests.is_empty());
2612
2613        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2614        assert!(dependent_requests.is_empty());
2615    }
2616
2617    fn add_send_queue_event_v7(
2618        this: &SqliteStateStore,
2619        txn: &Transaction<'_>,
2620        transaction_id: &TransactionId,
2621        room_id: &RoomId,
2622        is_wedged: bool,
2623    ) -> Result<(), Error> {
2624        let content =
2625            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2626
2627        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2628        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2629
2630        let content = this.serialize_json(&content)?;
2631
2632        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2633            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2634
2635        Ok(())
2636    }
2637
2638    fn add_dependent_send_queue_event_v7(
2639        this: &SqliteStateStore,
2640        txn: &Transaction<'_>,
2641        room_id: &RoomId,
2642        parent_txn_id: &TransactionId,
2643        own_txn_id: ChildTransactionId,
2644        content: DependentQueuedRequestKind,
2645    ) -> Result<(), Error> {
2646        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2647
2648        let parent_txn_id = parent_txn_id.to_string();
2649        let own_txn_id = own_txn_id.to_string();
2650        let content = this.serialize_json(&content)?;
2651
2652        txn.prepare_cached(
2653            "INSERT INTO dependent_send_queue_events
2654                         (room_id, parent_transaction_id, own_transaction_id, content)
2655                       VALUES (?, ?, ?, ?)",
2656        )?
2657        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2658
2659        Ok(())
2660    }
2661
2662    #[derive(Clone, Debug, Serialize, Deserialize)]
2663    pub enum LegacyDependentQueuedRequestKind {
2664        UploadFileWithThumbnail {
2665            content_type: String,
2666            cache_key: MediaRequestParameters,
2667            related_to: OwnedTransactionId,
2668        },
2669    }
2670
2671    #[async_test]
2672    pub async fn test_dependent_queued_request_variant_renaming() {
2673        let path = new_path();
2674        let db = create_fake_db(&path, 7).await.unwrap();
2675
2676        let cache_key = MediaRequestParameters {
2677            format: MediaFormat::File,
2678            source: MediaSource::Plain("https://server.local/foobar".into()),
2679        };
2680        let related_to = TransactionId::new();
2681        let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2682            content_type: "image/png".to_owned(),
2683            cache_key,
2684            related_to: related_to.clone(),
2685        };
2686
2687        let data = db
2688            .serialize_json(&request)
2689            .expect("should be able to serialize legacy dependent request");
2690        let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2691            "should be able to deserialize dependent request from legacy dependent request",
2692        );
2693
2694        as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2695            assert_eq!(de_related_to, related_to);
2696        });
2697    }
2698}