matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        RoomLoadSettings, SentRequestKey,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError, SqliteStoreConfig,
50};
51
52mod keys {
53    // Tables
54    pub const KV_BLOB: &str = "kv_blob";
55    pub const ROOM_INFO: &str = "room_info";
56    pub const STATE_EVENT: &str = "state_event";
57    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59    pub const MEMBER: &str = "member";
60    pub const PROFILE: &str = "profile";
61    pub const RECEIPT: &str = "receipt";
62    pub const DISPLAY_NAME: &str = "display_name";
63    pub const SEND_QUEUE: &str = "send_queue_events";
64    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
68
69/// Identifier of the latest database version.
70///
71/// This is used to figure whether the sqlite database requires a migration.
72/// Every new SQL migration should imply a bump of this number, and changes in
73/// the [`SqliteStateStore::run_migrations`] function..
74const DATABASE_VERSION: u8 = 12;
75
76/// A sqlite based cryptostore.
77#[derive(Clone)]
78pub struct SqliteStateStore {
79    store_cipher: Option<Arc<StoreCipher>>,
80    pool: SqlitePool,
81}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteStateStore {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
87    }
88}
89
90impl SqliteStateStore {
91    /// Open the sqlite-based state store at the given path using the given
92    /// passphrase to encrypt private data.
93    pub async fn open(
94        path: impl AsRef<Path>,
95        passphrase: Option<&str>,
96    ) -> Result<Self, OpenStoreError> {
97        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
98    }
99
100    /// Open the sqlite-based state store with the config open config.
101    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
102        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
103
104        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
105
106        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
107        config.pool = Some(pool_config);
108
109        let pool = config.create_pool(Runtime::Tokio1)?;
110
111        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
112        this.pool.get().await?.apply_runtime_config(runtime_config).await?;
113
114        Ok(this)
115    }
116
117    /// Create a sqlite-based state store using the given sqlite database pool.
118    /// The given passphrase will be used to encrypt private data.
119    async fn open_with_pool(
120        pool: SqlitePool,
121        passphrase: Option<&str>,
122    ) -> Result<Self, OpenStoreError> {
123        let conn = pool.get().await?;
124
125        let mut version = conn.db_version().await?;
126
127        if version == 0 {
128            init(&conn).await?;
129            version = 1;
130        }
131
132        let store_cipher = match passphrase {
133            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
134            None => None,
135        };
136        let this = Self { store_cipher, pool };
137        this.run_migrations(&conn, version, None).await?;
138
139        Ok(this)
140    }
141
142    /// Run database migrations from the given `from` version to the given `to`
143    /// version
144    ///
145    /// If `to` is `None`, the current database version will be used.
146    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
147        let to = to.unwrap_or(DATABASE_VERSION);
148
149        if from < to {
150            debug!(version = from, new_version = to, "Upgrading database");
151        } else {
152            return Ok(());
153        }
154
155        if from < 2 && to >= 2 {
156            let this = self.clone();
157            conn.with_transaction(move |txn| {
158                // Create new table.
159                txn.execute_batch(include_str!(
160                    "../migrations/state_store/002_a_create_new_room_info.sql"
161                ))?;
162
163                // Migrate data to new table.
164                for data in txn
165                    .prepare("SELECT data FROM room_info")?
166                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
167                {
168                    let data = data?;
169                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
170
171                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
172                    let state = this
173                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
174                    txn.prepare_cached(
175                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
176                         VALUES (?, ?, ?)",
177                    )?
178                    .execute((room_id, state, data))?;
179                }
180
181                // Replace old table.
182                txn.execute_batch(include_str!(
183                    "../migrations/state_store/002_b_replace_room_info.sql"
184                ))?;
185
186                txn.set_db_version(2)?;
187                Result::<_, Error>::Ok(())
188            })
189            .await?;
190        }
191
192        // Migration to v3: RoomInfo format has changed.
193        if from < 3 && to >= 3 {
194            let this = self.clone();
195            conn.with_transaction(move |txn| {
196                // Migrate data .
197                for data in txn
198                    .prepare("SELECT data FROM room_info")?
199                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
200                {
201                    let data = data?;
202                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
203
204                    // Get the `m.room.create` event from the room state.
205                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
206                    let event_type =
207                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
208                    let create_res = txn
209                        .prepare(
210                            "SELECT stripped, data FROM state_event
211                             WHERE room_id = ? AND event_type = ?",
212                        )?
213                        .query_row([room_id, event_type], |row| {
214                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
215                        })
216                        .optional()?;
217
218                    let create = create_res.and_then(|(stripped, data)| {
219                        let create = if stripped {
220                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
221                                this.deserialize_json(&data).ok()?,
222                            )
223                        } else {
224                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
225                        };
226                        Some(create)
227                    });
228
229                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
230
231                    let data = this.serialize_json(&migrated_room_info)?;
232                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
233                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
234                        .execute((data, room_id))?;
235                }
236
237                txn.set_db_version(3)?;
238                Result::<_, Error>::Ok(())
239            })
240            .await?;
241        }
242
243        if from < 4 && to >= 4 {
244            conn.with_transaction(move |txn| {
245                // Create new table.
246                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
247                txn.set_db_version(4)
248            })
249            .await?;
250        }
251
252        if from < 5 && to >= 5 {
253            conn.with_transaction(move |txn| {
254                // Create new table.
255                txn.execute_batch(include_str!(
256                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
257                ))?;
258                txn.set_db_version(4)
259            })
260            .await?;
261        }
262
263        if from < 6 && to >= 6 {
264            conn.with_transaction(move |txn| {
265                // Create new table.
266                txn.execute_batch(include_str!(
267                    "../migrations/state_store/005_send_queue_dependent_events.sql"
268                ))?;
269                txn.set_db_version(6)
270            })
271            .await?;
272        }
273
274        if from < 7 && to >= 7 {
275            conn.with_transaction(move |txn| {
276                // Drop media table.
277                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
278                txn.set_db_version(7)
279            })
280            .await?;
281        }
282
283        if from < 8 && to >= 8 {
284            // Replace all existing wedged events with a generic error.
285            let error = QueueWedgeError::GenericApiError {
286                msg: "local echo failed to send in a previous session".into(),
287            };
288            let default_err = self.serialize_value(&error)?;
289
290            conn.with_transaction(move |txn| {
291                // Update send queue table to persist the wedge reason if any.
292                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
293
294                // Migrate the data, add a generic error for currently wedged events
295
296                for wedged_entries in txn
297                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
298                    .query_map((), |row| {
299                        Ok(
300                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
301                        )
302                    })? {
303
304                    let (room_id, transaction_id) = wedged_entries?;
305
306                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
307                        .execute((default_err.clone(), room_id, transaction_id))?;
308                }
309
310
311                // Clean up the table now that data is migrated
312                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
313
314                txn.set_db_version(8)
315            })
316                .await?;
317        }
318
319        if from < 9 && to >= 9 {
320            conn.with_transaction(move |txn| {
321                // Run the migration.
322                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
323                txn.set_db_version(9)
324            })
325            .await?;
326        }
327
328        if from < 10 && to >= 10 {
329            conn.with_transaction(move |txn| {
330                // Run the migration.
331                txn.execute_batch(include_str!(
332                    "../migrations/state_store/009_send_queue_priority.sql"
333                ))?;
334                txn.set_db_version(10)
335            })
336            .await?;
337        }
338
339        if from < 11 && to >= 11 {
340            conn.with_transaction(move |txn| {
341                // Run the migration.
342                txn.execute_batch(include_str!(
343                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
344                ))?;
345                txn.set_db_version(11)
346            })
347            .await?;
348        }
349
350        if from < 12 && to >= 12 {
351            // Defragment the DB and optimize its size on the filesystem.
352            // This should have been run in the migration for version 7, to reduce the size
353            // of the DB as we removed the media cache.
354            conn.vacuum().await?;
355            conn.set_kv("version", vec![12]).await?;
356        }
357
358        Ok(())
359    }
360
361    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
362        if let Some(key) = &self.store_cipher {
363            let encrypted = key.encrypt_value_data(value)?;
364            Ok(rmp_serde::to_vec_named(&encrypted)?)
365        } else {
366            Ok(value)
367        }
368    }
369
370    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
371        let serialized = rmp_serde::to_vec_named(value)?;
372        self.encode_value(serialized)
373    }
374
375    fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
376        let serialized = serde_json::to_vec(value)?;
377        self.encode_value(serialized)
378    }
379
380    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
381        if let Some(key) = &self.store_cipher {
382            let encrypted = rmp_serde::from_slice(value)?;
383            let decrypted = key.decrypt_value_data(encrypted)?;
384            Ok(Cow::Owned(decrypted))
385        } else {
386            Ok(Cow::Borrowed(value))
387        }
388    }
389
390    fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
391        let decoded = self.decode_value(data)?;
392        Ok(serde_json::from_slice(&decoded)?)
393    }
394
395    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
396        let decoded = self.decode_value(value)?;
397        Ok(rmp_serde::from_slice(&decoded)?)
398    }
399
400    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
401        let bytes = key.as_ref();
402        if let Some(store_cipher) = &self.store_cipher {
403            Key::Hashed(store_cipher.hash_key(table_name, bytes))
404        } else {
405            Key::Plain(bytes.to_owned())
406        }
407    }
408
409    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410        let key_s = match key {
411            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412            StateStoreDataKey::ServerCapabilities => {
413                Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
414            }
415            StateStoreDataKey::Filter(f) => {
416                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
417            }
418            StateStoreDataKey::UserAvatarUrl(u) => {
419                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
420            }
421            StateStoreDataKey::RecentlyVisitedRooms(b) => {
422                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
423            }
424            StateStoreDataKey::UtdHookManagerData => {
425                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
426            }
427            StateStoreDataKey::ComposerDraft(room_id) => {
428                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
429            }
430            StateStoreDataKey::SeenKnockRequests(room_id) => {
431                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
432            }
433        };
434
435        self.encode_key(keys::KV_BLOB, &*key_s)
436    }
437
438    fn encode_presence_key(&self, user_id: &UserId) -> Key {
439        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
440    }
441
442    fn encode_custom_key(&self, key: &[u8]) -> Key {
443        let mut full_key = b"custom:".to_vec();
444        full_key.extend(key);
445        self.encode_key(keys::KV_BLOB, full_key)
446    }
447
448    async fn acquire(&self) -> Result<SqliteAsyncConn> {
449        Ok(self.pool.get().await?)
450    }
451
452    fn remove_maybe_stripped_room_data(
453        &self,
454        txn: &Transaction<'_>,
455        room_id: &RoomId,
456        stripped: bool,
457    ) -> rusqlite::Result<()> {
458        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
459        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
460
461        let member_room_id = self.encode_key(keys::MEMBER, room_id);
462        txn.remove_room_members(&member_room_id, Some(stripped))
463    }
464}
465
466/// Initialize the database.
467async fn init(conn: &SqliteAsyncConn) -> Result<()> {
468    // First turn on WAL mode, this can't be done in the transaction, it fails with
469    // the error message: "cannot change into wal mode from within a transaction".
470    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
471    conn.with_transaction(|txn| {
472        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
473        txn.set_db_version(1)?;
474
475        Ok(())
476    })
477    .await
478}
479
480trait SqliteConnectionStateStoreExt {
481    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
482
483    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
484
485    fn set_room_account_data(
486        &self,
487        room_id: &[u8],
488        event_type: &[u8],
489        data: &[u8],
490    ) -> rusqlite::Result<()>;
491    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
492
493    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
494    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
495    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
496
497    fn set_state_event(
498        &self,
499        room_id: &[u8],
500        event_type: &[u8],
501        state_key: &[u8],
502        stripped: bool,
503        event_id: Option<&[u8]>,
504        data: &[u8],
505    ) -> rusqlite::Result<()>;
506    fn get_state_event_by_id(
507        &self,
508        room_id: &[u8],
509        event_id: &[u8],
510    ) -> rusqlite::Result<Option<Vec<u8>>>;
511    fn remove_room_state_events(
512        &self,
513        room_id: &[u8],
514        stripped: Option<bool>,
515    ) -> rusqlite::Result<()>;
516
517    fn set_member(
518        &self,
519        room_id: &[u8],
520        user_id: &[u8],
521        membership: &[u8],
522        stripped: bool,
523        data: &[u8],
524    ) -> rusqlite::Result<()>;
525    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
526
527    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
528    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
529    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
530
531    fn set_receipt(
532        &self,
533        room_id: &[u8],
534        user_id: &[u8],
535        receipt_type: &[u8],
536        thread_id: &[u8],
537        event_id: &[u8],
538        data: &[u8],
539    ) -> rusqlite::Result<()>;
540    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
541
542    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
543    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
544    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
545    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
546    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
547}
548
549impl SqliteConnectionStateStoreExt for rusqlite::Connection {
550    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
551        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
552        Ok(())
553    }
554
555    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
556        self.prepare_cached(
557            "INSERT OR REPLACE INTO global_account_data (event_type, data)
558             VALUES (?, ?)",
559        )?
560        .execute((event_type, data))?;
561        Ok(())
562    }
563
564    fn set_room_account_data(
565        &self,
566        room_id: &[u8],
567        event_type: &[u8],
568        data: &[u8],
569    ) -> rusqlite::Result<()> {
570        self.prepare_cached(
571            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
572             VALUES (?, ?, ?)",
573        )?
574        .execute((room_id, event_type, data))?;
575        Ok(())
576    }
577
578    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579        self.prepare(
580            "DELETE FROM room_account_data
581             WHERE room_id = ?",
582        )?
583        .execute((room_id,))?;
584        Ok(())
585    }
586
587    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
588        self.prepare_cached(
589            "INSERT OR REPLACE INTO room_info (room_id, state, data)
590             VALUES (?, ?, ?)",
591        )?
592        .execute((room_id, state, data))?;
593        Ok(())
594    }
595
596    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
597        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
598            .optional()
599    }
600
601    /// Remove the room info for the given room.
602    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
603        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
604        Ok(())
605    }
606
607    fn set_state_event(
608        &self,
609        room_id: &[u8],
610        event_type: &[u8],
611        state_key: &[u8],
612        stripped: bool,
613        event_id: Option<&[u8]>,
614        data: &[u8],
615    ) -> rusqlite::Result<()> {
616        self.prepare_cached(
617            "INSERT OR REPLACE
618             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
619             VALUES (?, ?, ?, ?, ?, ?)",
620        )?
621        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
622        Ok(())
623    }
624
625    fn get_state_event_by_id(
626        &self,
627        room_id: &[u8],
628        event_id: &[u8],
629    ) -> rusqlite::Result<Option<Vec<u8>>> {
630        self.query_row(
631            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
632            (room_id, event_id),
633            |row| row.get(0),
634        )
635        .optional()
636    }
637
638    /// Remove state events for the given room.
639    ///
640    /// If `stripped` is `Some()`, only removes state events for the given
641    /// stripped state. Otherwise, state events are removed regardless of the
642    /// stripped state.
643    fn remove_room_state_events(
644        &self,
645        room_id: &[u8],
646        stripped: Option<bool>,
647    ) -> rusqlite::Result<()> {
648        if let Some(stripped) = stripped {
649            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
650                .execute((room_id, stripped))?;
651        } else {
652            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
653                .execute((room_id,))?;
654        }
655        Ok(())
656    }
657
658    fn set_member(
659        &self,
660        room_id: &[u8],
661        user_id: &[u8],
662        membership: &[u8],
663        stripped: bool,
664        data: &[u8],
665    ) -> rusqlite::Result<()> {
666        self.prepare_cached(
667            "INSERT OR REPLACE
668             INTO member (room_id, user_id, membership, stripped, data)
669             VALUES (?, ?, ?, ?, ?)",
670        )?
671        .execute((room_id, user_id, membership, stripped, data))?;
672        Ok(())
673    }
674
675    /// Remove members for the given room.
676    ///
677    /// If `stripped` is `Some()`, only removes members for the given stripped
678    /// state. Otherwise, members are removed regardless of the stripped state.
679    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
680        if let Some(stripped) = stripped {
681            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
682                .execute((room_id, stripped))?;
683        } else {
684            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
685        }
686        Ok(())
687    }
688
689    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
690        self.prepare_cached(
691            "INSERT OR REPLACE
692             INTO profile (room_id, user_id, data)
693             VALUES (?, ?, ?)",
694        )?
695        .execute((room_id, user_id, data))?;
696        Ok(())
697    }
698
699    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
700        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
701        Ok(())
702    }
703
704    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
705        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
706            .execute((room_id, user_id))?;
707        Ok(())
708    }
709
710    fn set_receipt(
711        &self,
712        room_id: &[u8],
713        user_id: &[u8],
714        receipt_type: &[u8],
715        thread: &[u8],
716        event_id: &[u8],
717        data: &[u8],
718    ) -> rusqlite::Result<()> {
719        self.prepare_cached(
720            "INSERT OR REPLACE
721             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
722             VALUES (?, ?, ?, ?, ?, ?)",
723        )?
724        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
725        Ok(())
726    }
727
728    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
730        Ok(())
731    }
732
733    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
734        self.prepare_cached(
735            "INSERT OR REPLACE
736             INTO display_name (room_id, name, data)
737             VALUES (?, ?, ?)",
738        )?
739        .execute((room_id, name, data))?;
740        Ok(())
741    }
742
743    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
744        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
745            .execute((room_id, name))?;
746        Ok(())
747    }
748
749    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
751        Ok(())
752    }
753
754    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
756        Ok(())
757    }
758
759    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
761            .execute((room_id,))?;
762        Ok(())
763    }
764}
765
766#[async_trait]
767trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
768    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
769        Ok(self
770            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
771            .await
772            .optional()?)
773    }
774
775    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
776        let keys_length = keys.len();
777
778        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
779            let sql_params = repeat_vars(keys.len());
780            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
781
782            let params = rusqlite::params_from_iter(keys);
783
784            Ok(txn
785                .prepare(&sql)?
786                .query(params)?
787                .mapped(|row| row.get(0))
788                .collect::<Result<_, _>>()?)
789        })
790        .await
791    }
792
793    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
794
795    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
796        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
797        Ok(())
798    }
799
800    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
801        Ok(match room_id {
802            None => {
803                self.prepare("SELECT data FROM room_info", move |mut stmt| {
804                    stmt.query_map((), |row| row.get(0))?.collect()
805                })
806                .await?
807            }
808
809            Some(room_id) => {
810                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
811                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
812                })
813                .await?
814            }
815        })
816    }
817
818    async fn get_maybe_stripped_state_events_for_keys(
819        &self,
820        room_id: Key,
821        event_type: Key,
822        state_keys: Vec<Key>,
823    ) -> Result<Vec<(bool, Vec<u8>)>> {
824        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
825            let sql_params = repeat_vars(state_keys.len());
826            let sql = format!(
827                "SELECT stripped, data FROM state_event
828                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
829            );
830
831            let params = rusqlite::params_from_iter(
832                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
833            );
834
835            Ok(txn
836                .prepare(&sql)?
837                .query(params)?
838                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
839                .collect::<Result<_, _>>()?)
840        })
841        .await
842    }
843
844    async fn get_maybe_stripped_state_events(
845        &self,
846        room_id: Key,
847        event_type: Key,
848    ) -> Result<Vec<(bool, Vec<u8>)>> {
849        Ok(self
850            .prepare(
851                "SELECT stripped, data FROM state_event
852                 WHERE room_id = ? AND event_type = ?",
853                |mut stmt| {
854                    stmt.query((room_id, event_type))?
855                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
856                        .collect()
857                },
858            )
859            .await?)
860    }
861
862    async fn get_profiles(
863        &self,
864        room_id: Key,
865        user_ids: Vec<Key>,
866    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
867        let user_ids_length = user_ids.len();
868
869        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
870            let sql_params = repeat_vars(user_ids.len());
871            let sql = format!(
872                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
873            );
874
875            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
876
877            Ok(txn
878                .prepare(&sql)?
879                .query(params)?
880                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
881                .collect::<Result<_, _>>()?)
882        })
883        .await
884    }
885
886    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
887        let res = if memberships.is_empty() {
888            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
889                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
890            })
891            .await?
892        } else {
893            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
894                let sql_params = repeat_vars(memberships.len());
895                let sql = format!(
896                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
897                );
898
899                let params =
900                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
901
902                Ok(txn
903                    .prepare(&sql)?
904                    .query(params)?
905                    .mapped(|row| row.get(0))
906                    .collect::<Result<_, _>>()?)
907            })
908            .await?
909        };
910
911        Ok(res)
912    }
913
914    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
915        Ok(self
916            .query_row(
917                "SELECT data FROM global_account_data WHERE event_type = ?",
918                (event_type,),
919                |row| row.get(0),
920            )
921            .await
922            .optional()?)
923    }
924
925    async fn get_room_account_data(
926        &self,
927        room_id: Key,
928        event_type: Key,
929    ) -> Result<Option<Vec<u8>>> {
930        Ok(self
931            .query_row(
932                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
933                (room_id, event_type),
934                |row| row.get(0),
935            )
936            .await
937            .optional()?)
938    }
939
940    async fn get_display_names(
941        &self,
942        room_id: Key,
943        names: Vec<Key>,
944    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
945        let names_length = names.len();
946
947        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
948            let sql_params = repeat_vars(names.len());
949            let sql = format!(
950                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
951            );
952
953            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
954
955            Ok(txn
956                .prepare(&sql)?
957                .query(params)?
958                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
959                .collect::<Result<_, _>>()?)
960        })
961        .await
962    }
963
964    async fn get_user_receipt(
965        &self,
966        room_id: Key,
967        receipt_type: Key,
968        thread: Key,
969        user_id: Key,
970    ) -> Result<Option<Vec<u8>>> {
971        Ok(self
972            .query_row(
973                "SELECT data FROM receipt
974                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
975                (room_id, receipt_type, thread, user_id),
976                |row| row.get(0),
977            )
978            .await
979            .optional()?)
980    }
981
982    async fn get_event_receipts(
983        &self,
984        room_id: Key,
985        receipt_type: Key,
986        thread: Key,
987        event_id: Key,
988    ) -> Result<Vec<Vec<u8>>> {
989        Ok(self
990            .prepare(
991                "SELECT data FROM receipt
992                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
993                |mut stmt| {
994                    stmt.query((room_id, receipt_type, thread, event_id))?
995                        .mapped(|row| row.get(0))
996                        .collect()
997                },
998            )
999            .await?)
1000    }
1001}
1002
1003#[async_trait]
1004impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1005    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1006        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1007    }
1008}
1009
1010#[async_trait]
1011impl StateStore for SqliteStateStore {
1012    type Error = Error;
1013
1014    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1015        self.acquire()
1016            .await?
1017            .get_kv_blob(self.encode_state_store_data_key(key))
1018            .await?
1019            .map(|data| {
1020                Ok(match key {
1021                    StateStoreDataKey::SyncToken => {
1022                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1023                    }
1024                    StateStoreDataKey::ServerCapabilities => {
1025                        StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1026                    }
1027                    StateStoreDataKey::Filter(_) => {
1028                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1029                    }
1030                    StateStoreDataKey::UserAvatarUrl(_) => {
1031                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1032                    }
1033                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1034                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1035                    }
1036                    StateStoreDataKey::UtdHookManagerData => {
1037                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1038                    }
1039                    StateStoreDataKey::ComposerDraft(_) => {
1040                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1041                    }
1042                    StateStoreDataKey::SeenKnockRequests(_) => {
1043                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1044                    }
1045                })
1046            })
1047            .transpose()
1048    }
1049
1050    async fn set_kv_data(
1051        &self,
1052        key: StateStoreDataKey<'_>,
1053        value: StateStoreDataValue,
1054    ) -> Result<()> {
1055        let serialized_value = match key {
1056            StateStoreDataKey::SyncToken => self.serialize_value(
1057                &value.into_sync_token().expect("Session data not a sync token"),
1058            )?,
1059            StateStoreDataKey::ServerCapabilities => self.serialize_value(
1060                &value
1061                    .into_server_capabilities()
1062                    .expect("Session data not containing server capabilities"),
1063            )?,
1064            StateStoreDataKey::Filter(_) => {
1065                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1066            }
1067            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1068                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1069            )?,
1070            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1071                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1072            )?,
1073            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1074                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1075            )?,
1076            StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1077                &value.into_composer_draft().expect("Session data not a composer draft"),
1078            )?,
1079            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1080                &value
1081                    .into_seen_knock_requests()
1082                    .expect("Session data is not a set of seen knock request ids"),
1083            )?,
1084        };
1085
1086        self.acquire()
1087            .await?
1088            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1089            .await
1090    }
1091
1092    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1093        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1094    }
1095
1096    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1097        let changes = changes.to_owned();
1098        let this = self.clone();
1099        self.acquire()
1100            .await?
1101            .with_transaction(move |txn| {
1102                let StateChanges {
1103                    sync_token,
1104                    account_data,
1105                    presence,
1106                    profiles,
1107                    profiles_to_delete,
1108                    state,
1109                    room_account_data,
1110                    room_infos,
1111                    receipts,
1112                    redactions,
1113                    stripped_state,
1114                    ambiguity_maps,
1115                } = changes;
1116
1117                if let Some(sync_token) = sync_token {
1118                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1119                    let value = this.serialize_value(&sync_token)?;
1120                    txn.set_kv_blob(&key, &value)?;
1121                }
1122
1123                for (event_type, event) in account_data {
1124                    let event_type =
1125                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1126                    let data = this.serialize_json(&event)?;
1127                    txn.set_global_account_data(&event_type, &data)?;
1128                }
1129
1130                for (room_id, events) in room_account_data {
1131                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1132                    for (event_type, event) in events {
1133                        let event_type =
1134                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1135                        let data = this.serialize_json(&event)?;
1136                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1137                    }
1138                }
1139
1140                for (user_id, event) in presence {
1141                    let key = this.encode_presence_key(&user_id);
1142                    let value = this.serialize_json(&event)?;
1143                    txn.set_kv_blob(&key, &value)?;
1144                }
1145
1146                for (room_id, room_info) in room_infos {
1147                    let stripped = room_info.state() == RoomState::Invited;
1148                    // Remove non-stripped data for stripped rooms and vice-versa.
1149                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1150
1151                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1152                    let state = this
1153                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1154                    let data = this.serialize_json(&room_info)?;
1155                    txn.set_room_info(&room_id, &state, &data)?;
1156                }
1157
1158                for (room_id, user_ids) in profiles_to_delete {
1159                    let room_id = this.encode_key(keys::PROFILE, room_id);
1160                    for user_id in user_ids {
1161                        let user_id = this.encode_key(keys::PROFILE, user_id);
1162                        txn.remove_room_profile(&room_id, &user_id)?;
1163                    }
1164                }
1165
1166                for (room_id, state_event_types) in state {
1167                    let profiles = profiles.get(&room_id);
1168                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1169
1170                    for (event_type, state_events) in state_event_types {
1171                        let encoded_event_type =
1172                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1173
1174                        for (state_key, raw_state_event) in state_events {
1175                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1176                            let data = this.serialize_json(&raw_state_event)?;
1177
1178                            let event_id: Option<String> =
1179                                raw_state_event.get_field("event_id").ok().flatten();
1180                            let encoded_event_id =
1181                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1182
1183                            txn.set_state_event(
1184                                &encoded_room_id,
1185                                &encoded_event_type,
1186                                &encoded_state_key,
1187                                false,
1188                                encoded_event_id.as_deref(),
1189                                &data,
1190                            )?;
1191
1192                            if event_type == StateEventType::RoomMember {
1193                                let member_event = match raw_state_event
1194                                    .deserialize_as::<SyncRoomMemberEvent>()
1195                                {
1196                                    Ok(ev) => ev,
1197                                    Err(e) => {
1198                                        debug!(event_id, "Failed to deserialize member event: {e}");
1199                                        continue;
1200                                    }
1201                                };
1202
1203                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1204                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1205                                let membership = this
1206                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1207                                let data = this.serialize_value(&state_key)?;
1208
1209                                txn.set_member(
1210                                    &encoded_room_id,
1211                                    &user_id,
1212                                    &membership,
1213                                    false,
1214                                    &data,
1215                                )?;
1216
1217                                if let Some(profile) =
1218                                    profiles.and_then(|p| p.get(member_event.state_key()))
1219                                {
1220                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1221                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1222                                    let data = this.serialize_json(&profile)?;
1223                                    txn.set_profile(&room_id, &user_id, &data)?;
1224                                }
1225                            }
1226                        }
1227                    }
1228                }
1229
1230                for (room_id, stripped_state_event_types) in stripped_state {
1231                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1232
1233                    for (event_type, stripped_state_events) in stripped_state_event_types {
1234                        let encoded_event_type =
1235                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1236
1237                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1238                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1239                            let data = this.serialize_json(&raw_stripped_state_event)?;
1240                            txn.set_state_event(
1241                                &encoded_room_id,
1242                                &encoded_event_type,
1243                                &encoded_state_key,
1244                                true,
1245                                None,
1246                                &data,
1247                            )?;
1248
1249                            if event_type == StateEventType::RoomMember {
1250                                let member_event = match raw_stripped_state_event
1251                                    .deserialize_as::<StrippedRoomMemberEvent>(
1252                                ) {
1253                                    Ok(ev) => ev,
1254                                    Err(e) => {
1255                                        debug!("Failed to deserialize stripped member event: {e}");
1256                                        continue;
1257                                    }
1258                                };
1259
1260                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1261                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1262                                let membership = this.encode_key(
1263                                    keys::MEMBER,
1264                                    member_event.content.membership.as_str(),
1265                                );
1266                                let data = this.serialize_value(&state_key)?;
1267
1268                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1269                            }
1270                        }
1271                    }
1272                }
1273
1274                for (room_id, receipt_event) in receipts {
1275                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1276
1277                    for (event_id, receipt_types) in receipt_event {
1278                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1279
1280                        for (receipt_type, receipt_users) in receipt_types {
1281                            let receipt_type =
1282                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1283
1284                            for (user_id, receipt) in receipt_users {
1285                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1286                                // We cannot have a NULL primary key so we rely on serialization
1287                                // instead of the string representation.
1288                                let thread = this.encode_key(
1289                                    keys::RECEIPT,
1290                                    rmp_serde::to_vec_named(&receipt.thread)?,
1291                                );
1292                                let data = this.serialize_json(&ReceiptData {
1293                                    receipt,
1294                                    event_id: event_id.clone(),
1295                                    user_id,
1296                                })?;
1297
1298                                txn.set_receipt(
1299                                    &room_id,
1300                                    &encoded_user_id,
1301                                    &receipt_type,
1302                                    &thread,
1303                                    &encoded_event_id,
1304                                    &data,
1305                                )?;
1306                            }
1307                        }
1308                    }
1309                }
1310
1311                for (room_id, redactions) in redactions {
1312                    let make_room_version = || {
1313                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1314                        txn.get_room_info(&encoded_room_id)
1315                            .ok()
1316                            .flatten()
1317                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1318                            .and_then(|info| info.room_version().cloned())
1319                            .unwrap_or_else(|| {
1320                                warn!(
1321                                    ?room_id,
1322                                    "Unable to find the room version, assume version 9"
1323                                );
1324                                RoomVersionId::V9
1325                            })
1326                    };
1327
1328                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1329                    let mut room_version = None;
1330
1331                    for (event_id, redaction) in redactions {
1332                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1333
1334                        if let Some(Ok(raw_event)) = txn
1335                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1336                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1337                        {
1338                            let event = raw_event.deserialize()?;
1339                            let redacted = redact(
1340                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1341                                room_version.get_or_insert_with(make_room_version),
1342                                Some(RedactedBecause::from_raw_event(&redaction)?),
1343                            )
1344                            .map_err(Error::Redaction)?;
1345                            let data = this.serialize_json(&redacted)?;
1346
1347                            let event_type =
1348                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1349                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1350
1351                            txn.set_state_event(
1352                                &encoded_room_id,
1353                                &event_type,
1354                                &state_key,
1355                                false,
1356                                Some(&event_id),
1357                                &data,
1358                            )?;
1359                        }
1360                    }
1361                }
1362
1363                for (room_id, display_names) in ambiguity_maps {
1364                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1365
1366                    for (name, user_ids) in display_names {
1367                        let encoded_name = this.encode_key(
1368                            keys::DISPLAY_NAME,
1369                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1370                        );
1371                        let data = this.serialize_json(&user_ids)?;
1372
1373                        if user_ids.is_empty() {
1374                            txn.remove_display_name(&room_id, &encoded_name)?;
1375
1376                            // We can't do a migration to merge the previously distinct buckets of
1377                            // user IDs since the display names themselves are hashed before they
1378                            // are persisted in the store. So the store will always retain two
1379                            // buckets: one for raw display names and one for normalised ones.
1380                            //
1381                            // We therefore do the next best thing, which is a sort of a soft
1382                            // migration: we fetch both the raw and normalised buckets, then merge
1383                            // the user IDs contained in them into a separate, temporary merged
1384                            // bucket. The SDK then operates on the merged buckets exclusively. See
1385                            // the comment in `get_users_with_display_names` for details.
1386                            //
1387                            // If the merged bucket is empty, that must mean that both the raw and
1388                            // normalised buckets were also empty, so we can remove both from the
1389                            // store.
1390                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1391                            txn.remove_display_name(&room_id, &raw_name)?;
1392                        } else {
1393                            // We only create new buckets with the normalized display name.
1394                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1395                        }
1396                    }
1397                }
1398
1399                Ok::<_, Error>(())
1400            })
1401            .await?;
1402
1403        Ok(())
1404    }
1405
1406    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1407        self.acquire()
1408            .await?
1409            .get_kv_blob(self.encode_presence_key(user_id))
1410            .await?
1411            .map(|data| self.deserialize_json(&data))
1412            .transpose()
1413    }
1414
1415    async fn get_presence_events(
1416        &self,
1417        user_ids: &[OwnedUserId],
1418    ) -> Result<Vec<Raw<PresenceEvent>>> {
1419        if user_ids.is_empty() {
1420            return Ok(Vec::new());
1421        }
1422
1423        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1424        self.acquire()
1425            .await?
1426            .get_kv_blobs(user_ids)
1427            .await?
1428            .into_iter()
1429            .map(|data| self.deserialize_json(&data))
1430            .collect()
1431    }
1432
1433    async fn get_state_event(
1434        &self,
1435        room_id: &RoomId,
1436        event_type: StateEventType,
1437        state_key: &str,
1438    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1439        Ok(self
1440            .get_state_events_for_keys(room_id, event_type, &[state_key])
1441            .await?
1442            .into_iter()
1443            .next())
1444    }
1445
1446    async fn get_state_events(
1447        &self,
1448        room_id: &RoomId,
1449        event_type: StateEventType,
1450    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1451        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1452        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1453        self.acquire()
1454            .await?
1455            .get_maybe_stripped_state_events(room_id, event_type)
1456            .await?
1457            .into_iter()
1458            .map(|(stripped, data)| {
1459                let ev = if stripped {
1460                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1461                } else {
1462                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1463                };
1464
1465                Ok(ev)
1466            })
1467            .collect()
1468    }
1469
1470    async fn get_state_events_for_keys(
1471        &self,
1472        room_id: &RoomId,
1473        event_type: StateEventType,
1474        state_keys: &[&str],
1475    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1476        if state_keys.is_empty() {
1477            return Ok(Vec::new());
1478        }
1479
1480        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1481        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1482        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1483        self.acquire()
1484            .await?
1485            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1486            .await?
1487            .into_iter()
1488            .map(|(stripped, data)| {
1489                let ev = if stripped {
1490                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1491                } else {
1492                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1493                };
1494
1495                Ok(ev)
1496            })
1497            .collect()
1498    }
1499
1500    async fn get_profile(
1501        &self,
1502        room_id: &RoomId,
1503        user_id: &UserId,
1504    ) -> Result<Option<MinimalRoomMemberEvent>> {
1505        let room_id = self.encode_key(keys::PROFILE, room_id);
1506        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1507
1508        self.acquire()
1509            .await?
1510            .get_profiles(room_id, user_ids)
1511            .await?
1512            .into_iter()
1513            .next()
1514            .map(|(_, data)| self.deserialize_json(&data))
1515            .transpose()
1516    }
1517
1518    async fn get_profiles<'a>(
1519        &self,
1520        room_id: &RoomId,
1521        user_ids: &'a [OwnedUserId],
1522    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1523        if user_ids.is_empty() {
1524            return Ok(BTreeMap::new());
1525        }
1526
1527        let room_id = self.encode_key(keys::PROFILE, room_id);
1528        let mut user_ids_map = user_ids
1529            .iter()
1530            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1531            .collect::<BTreeMap<_, _>>();
1532        let user_ids = user_ids_map.keys().cloned().collect();
1533
1534        self.acquire()
1535            .await?
1536            .get_profiles(room_id, user_ids)
1537            .await?
1538            .into_iter()
1539            .map(|(user_id, data)| {
1540                Ok((
1541                    user_ids_map
1542                        .remove(user_id.as_slice())
1543                        .expect("returned user IDs were requested"),
1544                    self.deserialize_json(&data)?,
1545                ))
1546            })
1547            .collect()
1548    }
1549
1550    async fn get_user_ids(
1551        &self,
1552        room_id: &RoomId,
1553        membership: RoomMemberships,
1554    ) -> Result<Vec<OwnedUserId>> {
1555        let room_id = self.encode_key(keys::MEMBER, room_id);
1556        let memberships = membership
1557            .as_vec()
1558            .into_iter()
1559            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1560            .collect();
1561        self.acquire()
1562            .await?
1563            .get_user_ids(room_id, memberships)
1564            .await?
1565            .iter()
1566            .map(|data| self.deserialize_value(data))
1567            .collect()
1568    }
1569
1570    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1571        self.acquire()
1572            .await?
1573            .get_room_infos(match room_load_settings {
1574                RoomLoadSettings::All => None,
1575                RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1576            })
1577            .await?
1578            .into_iter()
1579            .map(|data| self.deserialize_json(&data))
1580            .collect()
1581    }
1582
1583    async fn get_users_with_display_name(
1584        &self,
1585        room_id: &RoomId,
1586        display_name: &DisplayName,
1587    ) -> Result<BTreeSet<OwnedUserId>> {
1588        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1589        let names = vec![self.encode_key(
1590            keys::DISPLAY_NAME,
1591            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1592        )];
1593
1594        Ok(self
1595            .acquire()
1596            .await?
1597            .get_display_names(room_id, names)
1598            .await?
1599            .into_iter()
1600            .next()
1601            .map(|(_, data)| self.deserialize_json(&data))
1602            .transpose()?
1603            .unwrap_or_default())
1604    }
1605
1606    async fn get_users_with_display_names<'a>(
1607        &self,
1608        room_id: &RoomId,
1609        display_names: &'a [DisplayName],
1610    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1611        let mut result = HashMap::new();
1612
1613        if display_names.is_empty() {
1614            return Ok(result);
1615        }
1616
1617        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1618        let mut names_map = display_names
1619            .iter()
1620            .flat_map(|display_name| {
1621                // We encode the display name as the `raw_str()` and the normalized string.
1622                //
1623                // This is for compatibility reasons since:
1624                //  1. Previously "Alice" and "alice" were considered to be distinct display
1625                //     names, while we now consider them to be the same so we need to merge the
1626                //     previously distinct buckets of user IDs.
1627                //  2. We can't do a migration to merge the previously distinct buckets of user
1628                //     IDs since the display names itself are hashed before they are persisted
1629                //     in the store.
1630                let raw =
1631                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1632                let normalized = display_name.as_normalized_str().map(|normalized| {
1633                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1634                });
1635
1636                iter::once(raw).chain(normalized.into_iter())
1637            })
1638            .collect::<BTreeMap<_, _>>();
1639        let names = names_map.keys().cloned().collect();
1640
1641        for (name, data) in
1642            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1643        {
1644            let display_name =
1645                names_map.remove(name.as_slice()).expect("returned display names were requested");
1646            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1647
1648            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1649        }
1650
1651        Ok(result)
1652    }
1653
1654    async fn get_account_data_event(
1655        &self,
1656        event_type: GlobalAccountDataEventType,
1657    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1658        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1659        self.acquire()
1660            .await?
1661            .get_global_account_data(event_type)
1662            .await?
1663            .map(|value| self.deserialize_json(&value))
1664            .transpose()
1665    }
1666
1667    async fn get_room_account_data_event(
1668        &self,
1669        room_id: &RoomId,
1670        event_type: RoomAccountDataEventType,
1671    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1672        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1673        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1674        self.acquire()
1675            .await?
1676            .get_room_account_data(room_id, event_type)
1677            .await?
1678            .map(|value| self.deserialize_json(&value))
1679            .transpose()
1680    }
1681
1682    async fn get_user_room_receipt_event(
1683        &self,
1684        room_id: &RoomId,
1685        receipt_type: ReceiptType,
1686        thread: ReceiptThread,
1687        user_id: &UserId,
1688    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1689        let room_id = self.encode_key(keys::RECEIPT, room_id);
1690        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1691        // We cannot have a NULL primary key so we rely on serialization instead of the
1692        // string representation.
1693        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1694        let user_id = self.encode_key(keys::RECEIPT, user_id);
1695
1696        self.acquire()
1697            .await?
1698            .get_user_receipt(room_id, receipt_type, thread, user_id)
1699            .await?
1700            .map(|value| {
1701                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1702            })
1703            .transpose()
1704    }
1705
1706    async fn get_event_room_receipt_events(
1707        &self,
1708        room_id: &RoomId,
1709        receipt_type: ReceiptType,
1710        thread: ReceiptThread,
1711        event_id: &EventId,
1712    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1713        let room_id = self.encode_key(keys::RECEIPT, room_id);
1714        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1715        // We cannot have a NULL primary key so we rely on serialization instead of the
1716        // string representation.
1717        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1718        let event_id = self.encode_key(keys::RECEIPT, event_id);
1719
1720        self.acquire()
1721            .await?
1722            .get_event_receipts(room_id, receipt_type, thread, event_id)
1723            .await?
1724            .iter()
1725            .map(|value| {
1726                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1727            })
1728            .collect()
1729    }
1730
1731    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1732        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1733    }
1734
1735    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1736        let conn = self.acquire().await?;
1737        let key = self.encode_custom_key(key);
1738        conn.set_kv_blob(key, value).await?;
1739        Ok(())
1740    }
1741
1742    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1743        let conn = self.acquire().await?;
1744        let key = self.encode_custom_key(key);
1745        let previous = conn.get_kv_blob(key.clone()).await?;
1746        conn.set_kv_blob(key, value).await?;
1747        Ok(previous)
1748    }
1749
1750    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1751        let conn = self.acquire().await?;
1752        let key = self.encode_custom_key(key);
1753        let previous = conn.get_kv_blob(key.clone()).await?;
1754        if previous.is_some() {
1755            conn.delete_kv_blob(key).await?;
1756        }
1757        Ok(previous)
1758    }
1759
1760    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1761        let this = self.clone();
1762        let room_id = room_id.to_owned();
1763
1764        let conn = self.acquire().await?;
1765
1766        conn.with_transaction(move |txn| -> Result<()> {
1767            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1768            txn.remove_room_info(&room_info_room_id)?;
1769
1770            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1771            txn.remove_room_state_events(&state_event_room_id, None)?;
1772
1773            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1774            txn.remove_room_members(&member_room_id, None)?;
1775
1776            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1777            txn.remove_room_profiles(&profile_room_id)?;
1778
1779            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1780            txn.remove_room_account_data(&room_account_data_room_id)?;
1781
1782            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1783            txn.remove_room_receipts(&receipt_room_id)?;
1784
1785            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1786            txn.remove_room_display_names(&display_name_room_id)?;
1787
1788            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1789            txn.remove_room_send_queue(&send_queue_room_id)?;
1790
1791            let dependent_send_queue_room_id =
1792                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1793            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1794
1795            Ok(())
1796        })
1797        .await?;
1798
1799        conn.vacuum().await
1800    }
1801
1802    async fn save_send_queue_request(
1803        &self,
1804        room_id: &RoomId,
1805        transaction_id: OwnedTransactionId,
1806        created_at: MilliSecondsSinceUnixEpoch,
1807        content: QueuedRequestKind,
1808        priority: usize,
1809    ) -> Result<(), Self::Error> {
1810        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1811        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1812
1813        let content = self.serialize_json(&content)?;
1814        // The transaction id is used both as a key (in remove/update) and a value (as
1815        // it's useful for the callers), so we keep it as is, and neither hash
1816        // it (with encode_key) or encrypt it (through serialize_value). After
1817        // all, it carries no personal information, so this is considered fine.
1818
1819        let created_at_ts: u64 = created_at.0.into();
1820        self.acquire()
1821            .await?
1822            .with_transaction(move |txn| {
1823                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1824                Ok(())
1825            })
1826            .await
1827    }
1828
1829    async fn update_send_queue_request(
1830        &self,
1831        room_id: &RoomId,
1832        transaction_id: &TransactionId,
1833        content: QueuedRequestKind,
1834    ) -> Result<bool, Self::Error> {
1835        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1836
1837        let content = self.serialize_json(&content)?;
1838        // See comment in [`Self::save_send_queue_event`] to understand why the
1839        // transaction id is neither encrypted or hashed.
1840        let transaction_id = transaction_id.to_string();
1841
1842        let num_updated = self.acquire()
1843            .await?
1844            .with_transaction(move |txn| {
1845                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1846            })
1847            .await?;
1848
1849        Ok(num_updated > 0)
1850    }
1851
1852    async fn remove_send_queue_request(
1853        &self,
1854        room_id: &RoomId,
1855        transaction_id: &TransactionId,
1856    ) -> Result<bool, Self::Error> {
1857        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1858
1859        // See comment in `save_send_queue_event`.
1860        let transaction_id = transaction_id.to_string();
1861
1862        let num_deleted = self
1863            .acquire()
1864            .await?
1865            .with_transaction(move |txn| {
1866                txn.prepare_cached(
1867                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1868                )?
1869                .execute((room_id, &transaction_id))
1870            })
1871            .await?;
1872
1873        Ok(num_deleted > 0)
1874    }
1875
1876    async fn load_send_queue_requests(
1877        &self,
1878        room_id: &RoomId,
1879    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1880        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1881
1882        // Note: ROWID is always present and is an auto-incremented integer counter. We
1883        // want to maintain the insertion order, so we can sort using it.
1884        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1885        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1886            .acquire()
1887            .await?
1888            .prepare(
1889                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1890                |mut stmt| {
1891                    stmt.query((room_id,))?
1892                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1893                        .collect()
1894                },
1895            )
1896            .await?;
1897
1898        let mut requests = Vec::with_capacity(res.len());
1899        for entry in res {
1900            let created_at = entry
1901                .4
1902                .and_then(UInt::new)
1903                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1904            requests.push(QueuedRequest {
1905                transaction_id: entry.0.into(),
1906                kind: self.deserialize_json(&entry.1)?,
1907                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1908                priority: entry.3,
1909                created_at,
1910            });
1911        }
1912
1913        Ok(requests)
1914    }
1915
1916    async fn update_send_queue_request_status(
1917        &self,
1918        room_id: &RoomId,
1919        transaction_id: &TransactionId,
1920        error: Option<QueueWedgeError>,
1921    ) -> Result<(), Self::Error> {
1922        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1923
1924        // See comment in `save_send_queue_event`.
1925        let transaction_id = transaction_id.to_string();
1926
1927        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1928        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1929
1930        self.acquire()
1931            .await?
1932            .with_transaction(move |txn| {
1933                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1934                Ok(())
1935            })
1936            .await
1937    }
1938
1939    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1940        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1941        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1942        // != encrypted(X), since we use a nonce in the encryption process.
1943
1944        let res: Vec<Vec<u8>> = self
1945            .acquire()
1946            .await?
1947            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1948                stmt.query(())?.mapped(|row| row.get(0)).collect()
1949            })
1950            .await?;
1951
1952        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1953        // then rejigger that into a vector.
1954        Ok(res
1955            .into_iter()
1956            .map(|entry| self.deserialize_value(&entry))
1957            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1958            .into_iter()
1959            .collect())
1960    }
1961
1962    async fn save_dependent_queued_request(
1963        &self,
1964        room_id: &RoomId,
1965        parent_txn_id: &TransactionId,
1966        own_txn_id: ChildTransactionId,
1967        created_at: MilliSecondsSinceUnixEpoch,
1968        content: DependentQueuedRequestKind,
1969    ) -> Result<()> {
1970        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1971        let content = self.serialize_json(&content)?;
1972
1973        // See comment in `save_send_queue_event`.
1974        let parent_txn_id = parent_txn_id.to_string();
1975        let own_txn_id = own_txn_id.to_string();
1976
1977        let created_at_ts: u64 = created_at.0.into();
1978        self.acquire()
1979            .await?
1980            .with_transaction(move |txn| {
1981                txn.prepare_cached(
1982                    r#"INSERT INTO dependent_send_queue_events
1983                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1984                       VALUES (?, ?, ?, ?, ?)"#,
1985                )?
1986                .execute((
1987                    room_id,
1988                    parent_txn_id,
1989                    own_txn_id,
1990                    content,
1991                    created_at_ts,
1992                ))?;
1993                Ok(())
1994            })
1995            .await
1996    }
1997
1998    async fn update_dependent_queued_request(
1999        &self,
2000        room_id: &RoomId,
2001        own_transaction_id: &ChildTransactionId,
2002        new_content: DependentQueuedRequestKind,
2003    ) -> Result<bool> {
2004        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2005        let content = self.serialize_json(&new_content)?;
2006
2007        // See comment in `save_send_queue_event`.
2008        let own_txn_id = own_transaction_id.to_string();
2009
2010        let num_updated = self
2011            .acquire()
2012            .await?
2013            .with_transaction(move |txn| {
2014                txn.prepare_cached(
2015                    r#"UPDATE dependent_send_queue_events
2016                       SET content = ?
2017                       WHERE own_transaction_id = ?
2018                       AND room_id = ?"#,
2019                )?
2020                .execute((content, own_txn_id, room_id))
2021            })
2022            .await?;
2023
2024        if num_updated > 1 {
2025            return Err(Error::InconsistentUpdate);
2026        }
2027
2028        Ok(num_updated == 1)
2029    }
2030
2031    async fn mark_dependent_queued_requests_as_ready(
2032        &self,
2033        room_id: &RoomId,
2034        parent_txn_id: &TransactionId,
2035        parent_key: SentRequestKey,
2036    ) -> Result<usize> {
2037        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2038        let parent_key = self.serialize_value(&parent_key)?;
2039
2040        // See comment in `save_send_queue_event`.
2041        let parent_txn_id = parent_txn_id.to_string();
2042
2043        self.acquire()
2044            .await?
2045            .with_transaction(move |txn| {
2046                Ok(txn.prepare_cached(
2047                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2048                )?
2049                .execute((parent_key, parent_txn_id, room_id))?)
2050            })
2051            .await
2052    }
2053
2054    async fn remove_dependent_queued_request(
2055        &self,
2056        room_id: &RoomId,
2057        txn_id: &ChildTransactionId,
2058    ) -> Result<bool> {
2059        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2060
2061        // See comment in `save_send_queue_event`.
2062        let txn_id = txn_id.to_string();
2063
2064        let num_deleted = self
2065            .acquire()
2066            .await?
2067            .with_transaction(move |txn| {
2068                txn.prepare_cached(
2069                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2070                )?
2071                .execute((txn_id, room_id))
2072            })
2073            .await?;
2074
2075        Ok(num_deleted > 0)
2076    }
2077
2078    async fn load_dependent_queued_requests(
2079        &self,
2080        room_id: &RoomId,
2081    ) -> Result<Vec<DependentQueuedRequest>> {
2082        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2083
2084        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2085        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2086            .acquire()
2087            .await?
2088            .prepare(
2089                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2090                |mut stmt| {
2091                    stmt.query((room_id,))?
2092                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2093                        .collect()
2094                },
2095            )
2096            .await?;
2097
2098        let mut dependent_events = Vec::with_capacity(res.len());
2099        for entry in res {
2100            let created_at = entry
2101                .4
2102                .and_then(UInt::new)
2103                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2104            dependent_events.push(DependentQueuedRequest {
2105                own_transaction_id: entry.0.into(),
2106                parent_transaction_id: entry.1.into(),
2107                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2108                kind: self.deserialize_json(&entry.3)?,
2109                created_at,
2110            });
2111        }
2112
2113        Ok(dependent_events)
2114    }
2115}
2116
2117#[derive(Debug, Clone, Serialize, Deserialize)]
2118struct ReceiptData {
2119    receipt: Receipt,
2120    event_id: OwnedEventId,
2121    user_id: OwnedUserId,
2122}
2123
2124#[cfg(test)]
2125mod tests {
2126    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2127
2128    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2129    use once_cell::sync::Lazy;
2130    use tempfile::{tempdir, TempDir};
2131
2132    use super::SqliteStateStore;
2133
2134    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2135    static NUM: AtomicU32 = AtomicU32::new(0);
2136
2137    async fn get_store() -> Result<impl StateStore, StoreError> {
2138        let name = NUM.fetch_add(1, SeqCst).to_string();
2139        let tmpdir_path = TMP_DIR.path().join(name);
2140
2141        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2142
2143        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2144    }
2145
2146    statestore_integration_tests!();
2147}
2148
2149#[cfg(test)]
2150mod encrypted_tests {
2151    use std::{
2152        path::PathBuf,
2153        sync::atomic::{AtomicU32, Ordering::SeqCst},
2154    };
2155
2156    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2157    use matrix_sdk_test::async_test;
2158    use once_cell::sync::Lazy;
2159    use tempfile::{tempdir, TempDir};
2160
2161    use super::SqliteStateStore;
2162    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2163
2164    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2165    static NUM: AtomicU32 = AtomicU32::new(0);
2166
2167    fn new_state_store_workspace() -> PathBuf {
2168        let name = NUM.fetch_add(1, SeqCst).to_string();
2169        TMP_DIR.path().join(name)
2170    }
2171
2172    async fn get_store() -> Result<impl StateStore, StoreError> {
2173        let tmpdir_path = new_state_store_workspace();
2174
2175        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2176
2177        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2178            .await
2179            .unwrap())
2180    }
2181
2182    #[async_test]
2183    async fn test_pool_size() {
2184        let tmpdir_path = new_state_store_workspace();
2185        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2186
2187        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2188
2189        assert_eq!(store.pool.status().max_size, 42);
2190    }
2191
2192    #[async_test]
2193    async fn test_cache_size() {
2194        let tmpdir_path = new_state_store_workspace();
2195        let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2196
2197        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2198
2199        let conn = store.pool.get().await.unwrap();
2200        let cache_size =
2201            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2202
2203        // The value passed to  `SqliteStoreConfig` is in bytes. Check it is converted
2204        // to kibibytes. Also, it must be a negative value because it _is_ the size in
2205        // kibibytes, not in page size.
2206        assert_eq!(cache_size, -(1500 / 1024));
2207    }
2208
2209    #[async_test]
2210    async fn test_journal_size_limit() {
2211        let tmpdir_path = new_state_store_workspace();
2212        let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2213
2214        let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2215
2216        let conn = store.pool.get().await.unwrap();
2217        let journal_size_limit = conn
2218            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2219            .await
2220            .unwrap();
2221
2222        // The value passed to  `SqliteStoreConfig` is in bytes. It stays in bytes in
2223        // SQLite.
2224        assert_eq!(journal_size_limit, 1500);
2225    }
2226
2227    statestore_integration_tests!();
2228}
2229
2230#[cfg(test)]
2231mod migration_tests {
2232    use std::{
2233        path::{Path, PathBuf},
2234        sync::{
2235            atomic::{AtomicU32, Ordering::SeqCst},
2236            Arc,
2237        },
2238    };
2239
2240    use deadpool_sqlite::Runtime;
2241    use matrix_sdk_base::{
2242        store::{
2243            ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2244            SerializableEventContent,
2245        },
2246        sync::UnreadNotificationsCount,
2247        RoomState, StateStore,
2248    };
2249    use matrix_sdk_test::async_test;
2250    use once_cell::sync::Lazy;
2251    use ruma::{
2252        events::{
2253            room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2254            StateEventType,
2255        },
2256        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2257        UserId,
2258    };
2259    use rusqlite::Transaction;
2260    use serde_json::json;
2261    use tempfile::{tempdir, TempDir};
2262    use tokio::fs;
2263
2264    use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2265    use crate::{
2266        error::{Error, Result},
2267        utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2268        OpenStoreError,
2269    };
2270
2271    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2272    static NUM: AtomicU32 = AtomicU32::new(0);
2273    const SECRET: &str = "secret";
2274
2275    fn new_path() -> PathBuf {
2276        let name = NUM.fetch_add(1, SeqCst).to_string();
2277        TMP_DIR.path().join(name)
2278    }
2279
2280    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2281        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2282
2283        let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2284        // use default pool config
2285
2286        let pool = config.create_pool(Runtime::Tokio1).unwrap();
2287        let conn = pool.get().await?;
2288
2289        init(&conn).await?;
2290
2291        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2292        let this = SqliteStateStore { store_cipher, pool };
2293        this.run_migrations(&conn, 1, Some(version)).await?;
2294
2295        Ok(this)
2296    }
2297
2298    fn room_info_v1_json(
2299        room_id: &RoomId,
2300        state: RoomState,
2301        name: Option<&str>,
2302        creator: Option<&UserId>,
2303    ) -> serde_json::Value {
2304        // Test with name set or not.
2305        let name_content = match name {
2306            Some(name) => json!({ "name": name }),
2307            None => json!({ "name": null }),
2308        };
2309        // Test with creator set or not.
2310        let create_content = match creator {
2311            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2312            None => RoomCreateEventContent::new_v11(),
2313        };
2314
2315        json!({
2316            "room_id": room_id,
2317            "room_type": state,
2318            "notification_counts": UnreadNotificationsCount::default(),
2319            "summary": {
2320                "heroes": [],
2321                "joined_member_count": 0,
2322                "invited_member_count": 0,
2323            },
2324            "members_synced": false,
2325            "base_info": {
2326                "dm_targets": [],
2327                "max_power_level": 100,
2328                "name": {
2329                    "Original": {
2330                        "content": name_content,
2331                    },
2332                },
2333                "create": {
2334                    "Original": {
2335                        "content": create_content,
2336                    }
2337                }
2338            },
2339        })
2340    }
2341
2342    #[async_test]
2343    pub async fn test_migrating_v1_to_v2() {
2344        let path = new_path();
2345        // Create and populate db.
2346        {
2347            let db = create_fake_db(&path, 1).await.unwrap();
2348            let conn = db.pool.get().await.unwrap();
2349
2350            let this = db.clone();
2351            conn.with_transaction(move |txn| {
2352                for i in 0..5 {
2353                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2354                    let (state, stripped) =
2355                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2356                    let info = room_info_v1_json(&room_id, state, None, None);
2357
2358                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2359                    let data = this.serialize_json(&info)?;
2360
2361                    txn.prepare_cached(
2362                        "INSERT INTO room_info (room_id, stripped, data)
2363                         VALUES (?, ?, ?)",
2364                    )?
2365                    .execute((room_id, stripped, data))?;
2366                }
2367
2368                Result::<_, Error>::Ok(())
2369            })
2370            .await
2371            .unwrap();
2372        }
2373
2374        // This transparently migrates to the latest version.
2375        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2376
2377        // Check all room infos are there.
2378        assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2379    }
2380
2381    // Add a room in version 2 format of the state store.
2382    fn add_room_v2(
2383        this: &SqliteStateStore,
2384        txn: &Transaction<'_>,
2385        room_id: &RoomId,
2386        name: Option<&str>,
2387        create_creator: Option<&UserId>,
2388        create_sender: Option<&UserId>,
2389    ) -> Result<(), Error> {
2390        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2391
2392        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2393        let encoded_state =
2394            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2395        let data = this.serialize_json(&room_info_json)?;
2396
2397        txn.prepare_cached(
2398            "INSERT INTO room_info (room_id, state, data)
2399             VALUES (?, ?, ?)",
2400        )?
2401        .execute((encoded_room_id, encoded_state, data))?;
2402
2403        // Test with or without `m.room.create` event in the room state.
2404        let Some(create_sender) = create_sender else {
2405            return Ok(());
2406        };
2407
2408        let create_content = match create_creator {
2409            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2410            None => RoomCreateEventContent::new_v11(),
2411        };
2412
2413        let event_id = EventId::new(server_name!("dummy.local"));
2414        let create_event = json!({
2415            "content": create_content,
2416            "event_id": event_id,
2417            "sender": create_sender.to_owned(),
2418            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2419            "state_key": "",
2420            "type": "m.room.create",
2421            "unsigned": {},
2422        });
2423
2424        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2425        let encoded_event_type =
2426            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2427        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2428        let stripped = false;
2429        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2430        let data = this.serialize_json(&create_event)?;
2431
2432        txn.prepare_cached(
2433            "INSERT
2434             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2435             VALUES (?, ?, ?, ?, ?, ?)",
2436        )?
2437        .execute((
2438            encoded_room_id,
2439            encoded_event_type,
2440            encoded_state_key,
2441            stripped,
2442            encoded_event_id,
2443            data,
2444        ))?;
2445
2446        Ok(())
2447    }
2448
2449    #[async_test]
2450    pub async fn test_migrating_v2_to_v3() {
2451        let path = new_path();
2452
2453        // Room A: with name, creator and sender.
2454        let room_a_id = room_id!("!room_a:dummy.local");
2455        let room_a_name = "Room A";
2456        let room_a_creator = user_id!("@creator:dummy.local");
2457        // Use a different sender to check that sender is used over creator in
2458        // migration.
2459        let room_a_create_sender = user_id!("@sender:dummy.local");
2460
2461        // Room B: without name, creator and sender.
2462        let room_b_id = room_id!("!room_b:dummy.local");
2463
2464        // Room C: only with sender.
2465        let room_c_id = room_id!("!room_c:dummy.local");
2466        let room_c_create_sender = user_id!("@creator:dummy.local");
2467
2468        // Create and populate db.
2469        {
2470            let db = create_fake_db(&path, 2).await.unwrap();
2471            let conn = db.pool.get().await.unwrap();
2472
2473            let this = db.clone();
2474            conn.with_transaction(move |txn| {
2475                add_room_v2(
2476                    &this,
2477                    txn,
2478                    room_a_id,
2479                    Some(room_a_name),
2480                    Some(room_a_creator),
2481                    Some(room_a_create_sender),
2482                )?;
2483                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2484                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2485
2486                Result::<_, Error>::Ok(())
2487            })
2488            .await
2489            .unwrap();
2490        }
2491
2492        // This transparently migrates to the latest version.
2493        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2494
2495        // Check all room infos are there.
2496        let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2497        assert_eq!(room_infos.len(), 3);
2498
2499        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2500        assert_eq!(room_a.name(), Some(room_a_name));
2501        assert_eq!(room_a.creator(), Some(room_a_create_sender));
2502
2503        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2504        assert_eq!(room_b.name(), None);
2505        assert_eq!(room_b.creator(), None);
2506
2507        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2508        assert_eq!(room_c.name(), None);
2509        assert_eq!(room_c.creator(), Some(room_c_create_sender));
2510    }
2511
2512    #[async_test]
2513    pub async fn test_migrating_v7_to_v9() {
2514        let path = new_path();
2515
2516        let room_id = room_id!("!room_a:dummy.local");
2517        let wedged_event_transaction_id = TransactionId::new();
2518        let local_event_transaction_id = TransactionId::new();
2519
2520        // Create and populate db.
2521        {
2522            let db = create_fake_db(&path, 7).await.unwrap();
2523            let conn = db.pool.get().await.unwrap();
2524
2525            let wedge_tx = wedged_event_transaction_id.clone();
2526            let local_tx = local_event_transaction_id.clone();
2527
2528            conn.with_transaction(move |txn| {
2529                add_dependent_send_queue_event_v7(
2530                    &db,
2531                    txn,
2532                    room_id,
2533                    &local_tx,
2534                    ChildTransactionId::new(),
2535                    DependentQueuedRequestKind::RedactEvent,
2536                )?;
2537                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2538                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2539                Result::<_, Error>::Ok(())
2540            })
2541            .await
2542            .unwrap();
2543        }
2544
2545        // This transparently migrates to the latest version, which clears up all
2546        // requests and dependent requests.
2547        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2548
2549        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2550        assert!(requests.is_empty());
2551
2552        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2553        assert!(dependent_requests.is_empty());
2554    }
2555
2556    fn add_send_queue_event_v7(
2557        this: &SqliteStateStore,
2558        txn: &Transaction<'_>,
2559        transaction_id: &TransactionId,
2560        room_id: &RoomId,
2561        is_wedged: bool,
2562    ) -> Result<(), Error> {
2563        let content =
2564            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2565
2566        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2567        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2568
2569        let content = this.serialize_json(&content)?;
2570
2571        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2572            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2573
2574        Ok(())
2575    }
2576
2577    fn add_dependent_send_queue_event_v7(
2578        this: &SqliteStateStore,
2579        txn: &Transaction<'_>,
2580        room_id: &RoomId,
2581        parent_txn_id: &TransactionId,
2582        own_txn_id: ChildTransactionId,
2583        content: DependentQueuedRequestKind,
2584    ) -> Result<(), Error> {
2585        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2586
2587        let parent_txn_id = parent_txn_id.to_string();
2588        let own_txn_id = own_txn_id.to_string();
2589        let content = this.serialize_json(&content)?;
2590
2591        txn.prepare_cached(
2592            "INSERT INTO dependent_send_queue_events
2593                         (room_id, parent_transaction_id, own_transaction_id, content)
2594                       VALUES (?, ?, ?, ?)",
2595        )?
2596        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2597
2598        Ok(())
2599    }
2600}