matrix_sdk_sqlite/
state_store.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, HashMap},
4    fmt, iter,
5    path::Path,
6    sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13    store::{
14        migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15        DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16        SentRequestKey,
17    },
18    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19    StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23    canonical_json::{redact, RedactedBecause},
24    events::{
25        presence::PresenceEvent,
26        receipt::{Receipt, ReceiptThread, ReceiptType},
27        room::{
28            create::RoomCreateEventContent,
29            member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30        },
31        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33    },
34    serde::Raw,
35    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36    OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44    error::{Error, Result},
45    utils::{
46        repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47        SqliteKeyValueStoreConnExt,
48    },
49    OpenStoreError,
50};
51
52mod keys {
53    // Tables
54    pub const KV_BLOB: &str = "kv_blob";
55    pub const ROOM_INFO: &str = "room_info";
56    pub const STATE_EVENT: &str = "state_event";
57    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59    pub const MEMBER: &str = "member";
60    pub const PROFILE: &str = "profile";
61    pub const RECEIPT: &str = "receipt";
62    pub const DISPLAY_NAME: &str = "display_name";
63    pub const SEND_QUEUE: &str = "send_queue_events";
64    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67/// Identifier of the latest database version.
68///
69/// This is used to figure whether the sqlite database requires a migration.
70/// Every new SQL migration should imply a bump of this number, and changes in
71/// the [`SqliteStateStore::run_migrations`] function..
72const DATABASE_VERSION: u8 = 12;
73
74/// A sqlite based cryptostore.
75#[derive(Clone)]
76pub struct SqliteStateStore {
77    store_cipher: Option<Arc<StoreCipher>>,
78    pool: SqlitePool,
79}
80
81#[cfg(not(tarpaulin_include))]
82impl fmt::Debug for SqliteStateStore {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("SqliteStateStore").finish_non_exhaustive()
85    }
86}
87
88impl SqliteStateStore {
89    /// Open the sqlite-based state store at the given path using the given
90    /// passphrase to encrypt private data.
91    pub async fn open(
92        path: impl AsRef<Path>,
93        passphrase: Option<&str>,
94    ) -> Result<Self, OpenStoreError> {
95        let pool = create_pool(path.as_ref()).await?;
96
97        Self::open_with_pool(pool, passphrase).await
98    }
99
100    /// Create a sqlite-based state store using the given sqlite database pool.
101    /// The given passphrase will be used to encrypt private data.
102    pub async fn open_with_pool(
103        pool: SqlitePool,
104        passphrase: Option<&str>,
105    ) -> Result<Self, OpenStoreError> {
106        let conn = pool.get().await?;
107        conn.set_journal_size_limit().await?;
108
109        let mut version = conn.db_version().await?;
110
111        if version == 0 {
112            init(&conn).await?;
113            version = 1;
114        }
115
116        let store_cipher = match passphrase {
117            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
118            None => None,
119        };
120        let this = Self { store_cipher, pool };
121        this.run_migrations(&conn, version, None).await?;
122        conn.optimize().await?;
123
124        Ok(this)
125    }
126
127    /// Run database migrations from the given `from` version to the given `to`
128    /// version
129    ///
130    /// If `to` is `None`, the current database version will be used.
131    async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
132        let to = to.unwrap_or(DATABASE_VERSION);
133
134        if from < to {
135            debug!(version = from, new_version = to, "Upgrading database");
136        } else {
137            return Ok(());
138        }
139
140        if from < 2 && to >= 2 {
141            let this = self.clone();
142            conn.with_transaction(move |txn| {
143                // Create new table.
144                txn.execute_batch(include_str!(
145                    "../migrations/state_store/002_a_create_new_room_info.sql"
146                ))?;
147
148                // Migrate data to new table.
149                for data in txn
150                    .prepare("SELECT data FROM room_info")?
151                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
152                {
153                    let data = data?;
154                    let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
155
156                    let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
157                    let state = this
158                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
159                    txn.prepare_cached(
160                        "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
161                         VALUES (?, ?, ?)",
162                    )?
163                    .execute((room_id, state, data))?;
164                }
165
166                // Replace old table.
167                txn.execute_batch(include_str!(
168                    "../migrations/state_store/002_b_replace_room_info.sql"
169                ))?;
170
171                txn.set_db_version(2)?;
172                Result::<_, Error>::Ok(())
173            })
174            .await?;
175        }
176
177        // Migration to v3: RoomInfo format has changed.
178        if from < 3 && to >= 3 {
179            let this = self.clone();
180            conn.with_transaction(move |txn| {
181                // Migrate data .
182                for data in txn
183                    .prepare("SELECT data FROM room_info")?
184                    .query_map((), |row| row.get::<_, Vec<u8>>(0))?
185                {
186                    let data = data?;
187                    let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
188
189                    // Get the `m.room.create` event from the room state.
190                    let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
191                    let event_type =
192                        this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
193                    let create_res = txn
194                        .prepare(
195                            "SELECT stripped, data FROM state_event
196                             WHERE room_id = ? AND event_type = ?",
197                        )?
198                        .query_row([room_id, event_type], |row| {
199                            Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
200                        })
201                        .optional()?;
202
203                    let create = create_res.and_then(|(stripped, data)| {
204                        let create = if stripped {
205                            SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
206                                this.deserialize_json(&data).ok()?,
207                            )
208                        } else {
209                            SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
210                        };
211                        Some(create)
212                    });
213
214                    let migrated_room_info = room_info_v1.migrate(create.as_ref());
215
216                    let data = this.serialize_json(&migrated_room_info)?;
217                    let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
218                    txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
219                        .execute((data, room_id))?;
220                }
221
222                txn.set_db_version(3)?;
223                Result::<_, Error>::Ok(())
224            })
225            .await?;
226        }
227
228        if from < 4 && to >= 4 {
229            conn.with_transaction(move |txn| {
230                // Create new table.
231                txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
232                txn.set_db_version(4)
233            })
234            .await?;
235        }
236
237        if from < 5 && to >= 5 {
238            conn.with_transaction(move |txn| {
239                // Create new table.
240                txn.execute_batch(include_str!(
241                    "../migrations/state_store/004_send_queue_with_roomid_value.sql"
242                ))?;
243                txn.set_db_version(4)
244            })
245            .await?;
246        }
247
248        if from < 6 && to >= 6 {
249            conn.with_transaction(move |txn| {
250                // Create new table.
251                txn.execute_batch(include_str!(
252                    "../migrations/state_store/005_send_queue_dependent_events.sql"
253                ))?;
254                txn.set_db_version(6)
255            })
256            .await?;
257        }
258
259        if from < 7 && to >= 7 {
260            conn.with_transaction(move |txn| {
261                // Drop media table.
262                txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
263                txn.set_db_version(7)
264            })
265            .await?;
266        }
267
268        if from < 8 && to >= 8 {
269            // Replace all existing wedged events with a generic error.
270            let error = QueueWedgeError::GenericApiError {
271                msg: "local echo failed to send in a previous session".into(),
272            };
273            let default_err = self.serialize_value(&error)?;
274
275            conn.with_transaction(move |txn| {
276                // Update send queue table to persist the wedge reason if any.
277                txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
278
279                // Migrate the data, add a generic error for currently wedged events
280
281                for wedged_entries in txn
282                    .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
283                    .query_map((), |row| {
284                        Ok(
285                            (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
286                        )
287                    })? {
288
289                    let (room_id, transaction_id) = wedged_entries?;
290
291                    txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
292                        .execute((default_err.clone(), room_id, transaction_id))?;
293                }
294
295
296                // Clean up the table now that data is migrated
297                txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
298
299                txn.set_db_version(8)
300            })
301                .await?;
302        }
303
304        if from < 9 && to >= 9 {
305            conn.with_transaction(move |txn| {
306                // Run the migration.
307                txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
308                txn.set_db_version(9)
309            })
310            .await?;
311        }
312
313        if from < 10 && to >= 10 {
314            conn.with_transaction(move |txn| {
315                // Run the migration.
316                txn.execute_batch(include_str!(
317                    "../migrations/state_store/009_send_queue_priority.sql"
318                ))?;
319                txn.set_db_version(10)
320            })
321            .await?;
322        }
323
324        if from < 11 && to >= 11 {
325            conn.with_transaction(move |txn| {
326                // Run the migration.
327                txn.execute_batch(include_str!(
328                    "../migrations/state_store/010_send_queue_enqueue_time.sql"
329                ))?;
330                txn.set_db_version(11)
331            })
332            .await?;
333        }
334
335        if from < 12 && to >= 12 {
336            // Defragment the DB and optimize its size on the filesystem.
337            // This should have been run in the migration for version 7, to reduce the size
338            // of the DB as we removed the media cache.
339            conn.vacuum().await?;
340            conn.set_kv("version", vec![12]).await?;
341        }
342
343        Ok(())
344    }
345
346    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
347        if let Some(key) = &self.store_cipher {
348            let encrypted = key.encrypt_value_data(value)?;
349            Ok(rmp_serde::to_vec_named(&encrypted)?)
350        } else {
351            Ok(value)
352        }
353    }
354
355    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
356        let serialized = rmp_serde::to_vec_named(value)?;
357        self.encode_value(serialized)
358    }
359
360    fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
361        let serialized = serde_json::to_vec(value)?;
362        self.encode_value(serialized)
363    }
364
365    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
366        if let Some(key) = &self.store_cipher {
367            let encrypted = rmp_serde::from_slice(value)?;
368            let decrypted = key.decrypt_value_data(encrypted)?;
369            Ok(Cow::Owned(decrypted))
370        } else {
371            Ok(Cow::Borrowed(value))
372        }
373    }
374
375    fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
376        let decoded = self.decode_value(data)?;
377        Ok(serde_json::from_slice(&decoded)?)
378    }
379
380    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
381        let decoded = self.decode_value(value)?;
382        Ok(rmp_serde::from_slice(&decoded)?)
383    }
384
385    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
386        let bytes = key.as_ref();
387        if let Some(store_cipher) = &self.store_cipher {
388            Key::Hashed(store_cipher.hash_key(table_name, bytes))
389        } else {
390            Key::Plain(bytes.to_owned())
391        }
392    }
393
394    fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
395        let key_s = match key {
396            StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
397            StateStoreDataKey::ServerCapabilities => {
398                Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
399            }
400            StateStoreDataKey::Filter(f) => {
401                Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
402            }
403            StateStoreDataKey::UserAvatarUrl(u) => {
404                Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
405            }
406            StateStoreDataKey::RecentlyVisitedRooms(b) => {
407                Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
408            }
409            StateStoreDataKey::UtdHookManagerData => {
410                Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
411            }
412            StateStoreDataKey::ComposerDraft(room_id) => {
413                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
414            }
415            StateStoreDataKey::SeenKnockRequests(room_id) => {
416                Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
417            }
418        };
419
420        self.encode_key(keys::KV_BLOB, &*key_s)
421    }
422
423    fn encode_presence_key(&self, user_id: &UserId) -> Key {
424        self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
425    }
426
427    fn encode_custom_key(&self, key: &[u8]) -> Key {
428        let mut full_key = b"custom:".to_vec();
429        full_key.extend(key);
430        self.encode_key(keys::KV_BLOB, full_key)
431    }
432
433    async fn acquire(&self) -> Result<SqliteAsyncConn> {
434        Ok(self.pool.get().await?)
435    }
436
437    fn remove_maybe_stripped_room_data(
438        &self,
439        txn: &Transaction<'_>,
440        room_id: &RoomId,
441        stripped: bool,
442    ) -> rusqlite::Result<()> {
443        let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
444        txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
445
446        let member_room_id = self.encode_key(keys::MEMBER, room_id);
447        txn.remove_room_members(&member_room_id, Some(stripped))
448    }
449}
450
451async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
452    fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
453    let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-state.sqlite3"));
454    Ok(cfg.create_pool(Runtime::Tokio1)?)
455}
456
457/// Initialize the database.
458async fn init(conn: &SqliteAsyncConn) -> Result<()> {
459    // First turn on WAL mode, this can't be done in the transaction, it fails with
460    // the error message: "cannot change into wal mode from within a transaction".
461    conn.execute_batch("PRAGMA journal_mode = wal;").await?;
462    conn.with_transaction(|txn| {
463        txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
464        txn.set_db_version(1)?;
465
466        Ok(())
467    })
468    .await
469}
470
471trait SqliteConnectionStateStoreExt {
472    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
473
474    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
475
476    fn set_room_account_data(
477        &self,
478        room_id: &[u8],
479        event_type: &[u8],
480        data: &[u8],
481    ) -> rusqlite::Result<()>;
482    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
483
484    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
485    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
486    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
487
488    fn set_state_event(
489        &self,
490        room_id: &[u8],
491        event_type: &[u8],
492        state_key: &[u8],
493        stripped: bool,
494        event_id: Option<&[u8]>,
495        data: &[u8],
496    ) -> rusqlite::Result<()>;
497    fn get_state_event_by_id(
498        &self,
499        room_id: &[u8],
500        event_id: &[u8],
501    ) -> rusqlite::Result<Option<Vec<u8>>>;
502    fn remove_room_state_events(
503        &self,
504        room_id: &[u8],
505        stripped: Option<bool>,
506    ) -> rusqlite::Result<()>;
507
508    fn set_member(
509        &self,
510        room_id: &[u8],
511        user_id: &[u8],
512        membership: &[u8],
513        stripped: bool,
514        data: &[u8],
515    ) -> rusqlite::Result<()>;
516    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
517
518    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
519    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
520    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
521
522    fn set_receipt(
523        &self,
524        room_id: &[u8],
525        user_id: &[u8],
526        receipt_type: &[u8],
527        thread_id: &[u8],
528        event_id: &[u8],
529        data: &[u8],
530    ) -> rusqlite::Result<()>;
531    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
532
533    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
534    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
535    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
536    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
537    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
538}
539
540impl SqliteConnectionStateStoreExt for rusqlite::Connection {
541    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
542        self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
543        Ok(())
544    }
545
546    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
547        self.prepare_cached(
548            "INSERT OR REPLACE INTO global_account_data (event_type, data)
549             VALUES (?, ?)",
550        )?
551        .execute((event_type, data))?;
552        Ok(())
553    }
554
555    fn set_room_account_data(
556        &self,
557        room_id: &[u8],
558        event_type: &[u8],
559        data: &[u8],
560    ) -> rusqlite::Result<()> {
561        self.prepare_cached(
562            "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
563             VALUES (?, ?, ?)",
564        )?
565        .execute((room_id, event_type, data))?;
566        Ok(())
567    }
568
569    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
570        self.prepare(
571            "DELETE FROM room_account_data
572             WHERE room_id = ?",
573        )?
574        .execute((room_id,))?;
575        Ok(())
576    }
577
578    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
579        self.prepare_cached(
580            "INSERT OR REPLACE INTO room_info (room_id, state, data)
581             VALUES (?, ?, ?)",
582        )?
583        .execute((room_id, state, data))?;
584        Ok(())
585    }
586
587    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
588        self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
589            .optional()
590    }
591
592    /// Remove the room info for the given room.
593    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
594        self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
595        Ok(())
596    }
597
598    fn set_state_event(
599        &self,
600        room_id: &[u8],
601        event_type: &[u8],
602        state_key: &[u8],
603        stripped: bool,
604        event_id: Option<&[u8]>,
605        data: &[u8],
606    ) -> rusqlite::Result<()> {
607        self.prepare_cached(
608            "INSERT OR REPLACE
609             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
610             VALUES (?, ?, ?, ?, ?, ?)",
611        )?
612        .execute((room_id, event_type, state_key, stripped, event_id, data))?;
613        Ok(())
614    }
615
616    fn get_state_event_by_id(
617        &self,
618        room_id: &[u8],
619        event_id: &[u8],
620    ) -> rusqlite::Result<Option<Vec<u8>>> {
621        self.query_row(
622            "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
623            (room_id, event_id),
624            |row| row.get(0),
625        )
626        .optional()
627    }
628
629    /// Remove state events for the given room.
630    ///
631    /// If `stripped` is `Some()`, only removes state events for the given
632    /// stripped state. Otherwise, state events are removed regardless of the
633    /// stripped state.
634    fn remove_room_state_events(
635        &self,
636        room_id: &[u8],
637        stripped: Option<bool>,
638    ) -> rusqlite::Result<()> {
639        if let Some(stripped) = stripped {
640            self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
641                .execute((room_id, stripped))?;
642        } else {
643            self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
644                .execute((room_id,))?;
645        }
646        Ok(())
647    }
648
649    fn set_member(
650        &self,
651        room_id: &[u8],
652        user_id: &[u8],
653        membership: &[u8],
654        stripped: bool,
655        data: &[u8],
656    ) -> rusqlite::Result<()> {
657        self.prepare_cached(
658            "INSERT OR REPLACE
659             INTO member (room_id, user_id, membership, stripped, data)
660             VALUES (?, ?, ?, ?, ?)",
661        )?
662        .execute((room_id, user_id, membership, stripped, data))?;
663        Ok(())
664    }
665
666    /// Remove members for the given room.
667    ///
668    /// If `stripped` is `Some()`, only removes members for the given stripped
669    /// state. Otherwise, members are removed regardless of the stripped state.
670    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
671        if let Some(stripped) = stripped {
672            self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
673                .execute((room_id, stripped))?;
674        } else {
675            self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
676        }
677        Ok(())
678    }
679
680    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
681        self.prepare_cached(
682            "INSERT OR REPLACE
683             INTO profile (room_id, user_id, data)
684             VALUES (?, ?, ?)",
685        )?
686        .execute((room_id, user_id, data))?;
687        Ok(())
688    }
689
690    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
691        self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
692        Ok(())
693    }
694
695    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
696        self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
697            .execute((room_id, user_id))?;
698        Ok(())
699    }
700
701    fn set_receipt(
702        &self,
703        room_id: &[u8],
704        user_id: &[u8],
705        receipt_type: &[u8],
706        thread: &[u8],
707        event_id: &[u8],
708        data: &[u8],
709    ) -> rusqlite::Result<()> {
710        self.prepare_cached(
711            "INSERT OR REPLACE
712             INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
713             VALUES (?, ?, ?, ?, ?, ?)",
714        )?
715        .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
716        Ok(())
717    }
718
719    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
720        self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
721        Ok(())
722    }
723
724    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
725        self.prepare_cached(
726            "INSERT OR REPLACE
727             INTO display_name (room_id, name, data)
728             VALUES (?, ?, ?)",
729        )?
730        .execute((room_id, name, data))?;
731        Ok(())
732    }
733
734    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
735        self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
736            .execute((room_id, name))?;
737        Ok(())
738    }
739
740    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
741        self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
742        Ok(())
743    }
744
745    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
746        self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
747        Ok(())
748    }
749
750    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
751        self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
752            .execute((room_id,))?;
753        Ok(())
754    }
755}
756
757#[async_trait]
758trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
759    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
760        Ok(self
761            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
762            .await
763            .optional()?)
764    }
765
766    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
767        let keys_length = keys.len();
768
769        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
770            let sql_params = repeat_vars(keys.len());
771            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
772
773            let params = rusqlite::params_from_iter(keys);
774
775            Ok(txn
776                .prepare(&sql)?
777                .query(params)?
778                .mapped(|row| row.get(0))
779                .collect::<Result<_, _>>()?)
780        })
781        .await
782    }
783
784    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
785
786    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
787        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
788        Ok(())
789    }
790
791    async fn get_room_infos(&self) -> Result<Vec<Vec<u8>>> {
792        Ok(self
793            .prepare("SELECT data FROM room_info", move |mut stmt| {
794                stmt.query_map((), |row| row.get(0))?.collect()
795            })
796            .await?)
797    }
798
799    async fn get_maybe_stripped_state_events_for_keys(
800        &self,
801        room_id: Key,
802        event_type: Key,
803        state_keys: Vec<Key>,
804    ) -> Result<Vec<(bool, Vec<u8>)>> {
805        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
806            let sql_params = repeat_vars(state_keys.len());
807            let sql = format!(
808                "SELECT stripped, data FROM state_event
809                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
810            );
811
812            let params = rusqlite::params_from_iter(
813                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
814            );
815
816            Ok(txn
817                .prepare(&sql)?
818                .query(params)?
819                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
820                .collect::<Result<_, _>>()?)
821        })
822        .await
823    }
824
825    async fn get_maybe_stripped_state_events(
826        &self,
827        room_id: Key,
828        event_type: Key,
829    ) -> Result<Vec<(bool, Vec<u8>)>> {
830        Ok(self
831            .prepare(
832                "SELECT stripped, data FROM state_event
833                 WHERE room_id = ? AND event_type = ?",
834                |mut stmt| {
835                    stmt.query((room_id, event_type))?
836                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
837                        .collect()
838                },
839            )
840            .await?)
841    }
842
843    async fn get_profiles(
844        &self,
845        room_id: Key,
846        user_ids: Vec<Key>,
847    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
848        let user_ids_length = user_ids.len();
849
850        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
851            let sql_params = repeat_vars(user_ids.len());
852            let sql = format!(
853                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
854            );
855
856            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
857
858            Ok(txn
859                .prepare(&sql)?
860                .query(params)?
861                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
862                .collect::<Result<_, _>>()?)
863        })
864        .await
865    }
866
867    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
868        let res = if memberships.is_empty() {
869            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
870                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
871            })
872            .await?
873        } else {
874            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
875                let sql_params = repeat_vars(memberships.len());
876                let sql = format!(
877                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
878                );
879
880                let params =
881                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
882
883                Ok(txn
884                    .prepare(&sql)?
885                    .query(params)?
886                    .mapped(|row| row.get(0))
887                    .collect::<Result<_, _>>()?)
888            })
889            .await?
890        };
891
892        Ok(res)
893    }
894
895    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
896        Ok(self
897            .query_row(
898                "SELECT data FROM global_account_data WHERE event_type = ?",
899                (event_type,),
900                |row| row.get(0),
901            )
902            .await
903            .optional()?)
904    }
905
906    async fn get_room_account_data(
907        &self,
908        room_id: Key,
909        event_type: Key,
910    ) -> Result<Option<Vec<u8>>> {
911        Ok(self
912            .query_row(
913                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
914                (room_id, event_type),
915                |row| row.get(0),
916            )
917            .await
918            .optional()?)
919    }
920
921    async fn get_display_names(
922        &self,
923        room_id: Key,
924        names: Vec<Key>,
925    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
926        let names_length = names.len();
927
928        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
929            let sql_params = repeat_vars(names.len());
930            let sql = format!(
931                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
932            );
933
934            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
935
936            Ok(txn
937                .prepare(&sql)?
938                .query(params)?
939                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
940                .collect::<Result<_, _>>()?)
941        })
942        .await
943    }
944
945    async fn get_user_receipt(
946        &self,
947        room_id: Key,
948        receipt_type: Key,
949        thread: Key,
950        user_id: Key,
951    ) -> Result<Option<Vec<u8>>> {
952        Ok(self
953            .query_row(
954                "SELECT data FROM receipt
955                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
956                (room_id, receipt_type, thread, user_id),
957                |row| row.get(0),
958            )
959            .await
960            .optional()?)
961    }
962
963    async fn get_event_receipts(
964        &self,
965        room_id: Key,
966        receipt_type: Key,
967        thread: Key,
968        event_id: Key,
969    ) -> Result<Vec<Vec<u8>>> {
970        Ok(self
971            .prepare(
972                "SELECT data FROM receipt
973                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
974                |mut stmt| {
975                    stmt.query((room_id, receipt_type, thread, event_id))?
976                        .mapped(|row| row.get(0))
977                        .collect()
978                },
979            )
980            .await?)
981    }
982}
983
984#[async_trait]
985impl SqliteObjectStateStoreExt for SqliteAsyncConn {
986    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
987        Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
988    }
989}
990
991#[async_trait]
992impl StateStore for SqliteStateStore {
993    type Error = Error;
994
995    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
996        self.acquire()
997            .await?
998            .get_kv_blob(self.encode_state_store_data_key(key))
999            .await?
1000            .map(|data| {
1001                Ok(match key {
1002                    StateStoreDataKey::SyncToken => {
1003                        StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1004                    }
1005                    StateStoreDataKey::ServerCapabilities => {
1006                        StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1007                    }
1008                    StateStoreDataKey::Filter(_) => {
1009                        StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1010                    }
1011                    StateStoreDataKey::UserAvatarUrl(_) => {
1012                        StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1013                    }
1014                    StateStoreDataKey::RecentlyVisitedRooms(_) => {
1015                        StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1016                    }
1017                    StateStoreDataKey::UtdHookManagerData => {
1018                        StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1019                    }
1020                    StateStoreDataKey::ComposerDraft(_) => {
1021                        StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1022                    }
1023                    StateStoreDataKey::SeenKnockRequests(_) => {
1024                        StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1025                    }
1026                })
1027            })
1028            .transpose()
1029    }
1030
1031    async fn set_kv_data(
1032        &self,
1033        key: StateStoreDataKey<'_>,
1034        value: StateStoreDataValue,
1035    ) -> Result<()> {
1036        let serialized_value = match key {
1037            StateStoreDataKey::SyncToken => self.serialize_value(
1038                &value.into_sync_token().expect("Session data not a sync token"),
1039            )?,
1040            StateStoreDataKey::ServerCapabilities => self.serialize_value(
1041                &value
1042                    .into_server_capabilities()
1043                    .expect("Session data not containing server capabilities"),
1044            )?,
1045            StateStoreDataKey::Filter(_) => {
1046                self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1047            }
1048            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1049                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1050            )?,
1051            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1052                &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1053            )?,
1054            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1055                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1056            )?,
1057            StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1058                &value.into_composer_draft().expect("Session data not a composer draft"),
1059            )?,
1060            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1061                &value
1062                    .into_seen_knock_requests()
1063                    .expect("Session data is not a set of seen knock request ids"),
1064            )?,
1065        };
1066
1067        self.acquire()
1068            .await?
1069            .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1070            .await
1071    }
1072
1073    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1074        self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1075    }
1076
1077    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1078        let changes = changes.to_owned();
1079        let this = self.clone();
1080        self.acquire()
1081            .await?
1082            .with_transaction(move |txn| {
1083                let StateChanges {
1084                    sync_token,
1085                    account_data,
1086                    presence,
1087                    profiles,
1088                    profiles_to_delete,
1089                    state,
1090                    room_account_data,
1091                    room_infos,
1092                    receipts,
1093                    redactions,
1094                    stripped_state,
1095                    ambiguity_maps,
1096                } = changes;
1097
1098                if let Some(sync_token) = sync_token {
1099                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1100                    let value = this.serialize_value(&sync_token)?;
1101                    txn.set_kv_blob(&key, &value)?;
1102                }
1103
1104                for (event_type, event) in account_data {
1105                    let event_type =
1106                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1107                    let data = this.serialize_json(&event)?;
1108                    txn.set_global_account_data(&event_type, &data)?;
1109                }
1110
1111                for (room_id, events) in room_account_data {
1112                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1113                    for (event_type, event) in events {
1114                        let event_type =
1115                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1116                        let data = this.serialize_json(&event)?;
1117                        txn.set_room_account_data(&room_id, &event_type, &data)?;
1118                    }
1119                }
1120
1121                for (user_id, event) in presence {
1122                    let key = this.encode_presence_key(&user_id);
1123                    let value = this.serialize_json(&event)?;
1124                    txn.set_kv_blob(&key, &value)?;
1125                }
1126
1127                for (room_id, room_info) in room_infos {
1128                    let stripped = room_info.state() == RoomState::Invited;
1129                    // Remove non-stripped data for stripped rooms and vice-versa.
1130                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1131
1132                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1133                    let state = this
1134                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1135                    let data = this.serialize_json(&room_info)?;
1136                    txn.set_room_info(&room_id, &state, &data)?;
1137                }
1138
1139                for (room_id, user_ids) in profiles_to_delete {
1140                    let room_id = this.encode_key(keys::PROFILE, room_id);
1141                    for user_id in user_ids {
1142                        let user_id = this.encode_key(keys::PROFILE, user_id);
1143                        txn.remove_room_profile(&room_id, &user_id)?;
1144                    }
1145                }
1146
1147                for (room_id, state_event_types) in state {
1148                    let profiles = profiles.get(&room_id);
1149                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1150
1151                    for (event_type, state_events) in state_event_types {
1152                        let encoded_event_type =
1153                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1154
1155                        for (state_key, raw_state_event) in state_events {
1156                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1157                            let data = this.serialize_json(&raw_state_event)?;
1158
1159                            let event_id: Option<String> =
1160                                raw_state_event.get_field("event_id").ok().flatten();
1161                            let encoded_event_id =
1162                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1163
1164                            txn.set_state_event(
1165                                &encoded_room_id,
1166                                &encoded_event_type,
1167                                &encoded_state_key,
1168                                false,
1169                                encoded_event_id.as_deref(),
1170                                &data,
1171                            )?;
1172
1173                            if event_type == StateEventType::RoomMember {
1174                                let member_event = match raw_state_event
1175                                    .deserialize_as::<SyncRoomMemberEvent>()
1176                                {
1177                                    Ok(ev) => ev,
1178                                    Err(e) => {
1179                                        debug!(event_id, "Failed to deserialize member event: {e}");
1180                                        continue;
1181                                    }
1182                                };
1183
1184                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1185                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1186                                let membership = this
1187                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
1188                                let data = this.serialize_value(&state_key)?;
1189
1190                                txn.set_member(
1191                                    &encoded_room_id,
1192                                    &user_id,
1193                                    &membership,
1194                                    false,
1195                                    &data,
1196                                )?;
1197
1198                                if let Some(profile) =
1199                                    profiles.and_then(|p| p.get(member_event.state_key()))
1200                                {
1201                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
1202                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
1203                                    let data = this.serialize_json(&profile)?;
1204                                    txn.set_profile(&room_id, &user_id, &data)?;
1205                                }
1206                            }
1207                        }
1208                    }
1209                }
1210
1211                for (room_id, stripped_state_event_types) in stripped_state {
1212                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1213
1214                    for (event_type, stripped_state_events) in stripped_state_event_types {
1215                        let encoded_event_type =
1216                            this.encode_key(keys::STATE_EVENT, event_type.to_string());
1217
1218                        for (state_key, raw_stripped_state_event) in stripped_state_events {
1219                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1220                            let data = this.serialize_json(&raw_stripped_state_event)?;
1221                            txn.set_state_event(
1222                                &encoded_room_id,
1223                                &encoded_event_type,
1224                                &encoded_state_key,
1225                                true,
1226                                None,
1227                                &data,
1228                            )?;
1229
1230                            if event_type == StateEventType::RoomMember {
1231                                let member_event = match raw_stripped_state_event
1232                                    .deserialize_as::<StrippedRoomMemberEvent>(
1233                                ) {
1234                                    Ok(ev) => ev,
1235                                    Err(e) => {
1236                                        debug!("Failed to deserialize stripped member event: {e}");
1237                                        continue;
1238                                    }
1239                                };
1240
1241                                let room_id = this.encode_key(keys::MEMBER, &room_id);
1242                                let user_id = this.encode_key(keys::MEMBER, &state_key);
1243                                let membership = this.encode_key(
1244                                    keys::MEMBER,
1245                                    member_event.content.membership.as_str(),
1246                                );
1247                                let data = this.serialize_value(&state_key)?;
1248
1249                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1250                            }
1251                        }
1252                    }
1253                }
1254
1255                for (room_id, receipt_event) in receipts {
1256                    let room_id = this.encode_key(keys::RECEIPT, room_id);
1257
1258                    for (event_id, receipt_types) in receipt_event {
1259                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1260
1261                        for (receipt_type, receipt_users) in receipt_types {
1262                            let receipt_type =
1263                                this.encode_key(keys::RECEIPT, receipt_type.as_str());
1264
1265                            for (user_id, receipt) in receipt_users {
1266                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1267                                // We cannot have a NULL primary key so we rely on serialization
1268                                // instead of the string representation.
1269                                let thread = this.encode_key(
1270                                    keys::RECEIPT,
1271                                    rmp_serde::to_vec_named(&receipt.thread)?,
1272                                );
1273                                let data = this.serialize_json(&ReceiptData {
1274                                    receipt,
1275                                    event_id: event_id.clone(),
1276                                    user_id,
1277                                })?;
1278
1279                                txn.set_receipt(
1280                                    &room_id,
1281                                    &encoded_user_id,
1282                                    &receipt_type,
1283                                    &thread,
1284                                    &encoded_event_id,
1285                                    &data,
1286                                )?;
1287                            }
1288                        }
1289                    }
1290                }
1291
1292                for (room_id, redactions) in redactions {
1293                    let make_room_version = || {
1294                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1295                        txn.get_room_info(&encoded_room_id)
1296                            .ok()
1297                            .flatten()
1298                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1299                            .and_then(|info| info.room_version().cloned())
1300                            .unwrap_or_else(|| {
1301                                warn!(
1302                                    ?room_id,
1303                                    "Unable to find the room version, assume version 9"
1304                                );
1305                                RoomVersionId::V9
1306                            })
1307                    };
1308
1309                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1310                    let mut room_version = None;
1311
1312                    for (event_id, redaction) in redactions {
1313                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1314
1315                        if let Some(Ok(raw_event)) = txn
1316                            .get_state_event_by_id(&encoded_room_id, &event_id)?
1317                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1318                        {
1319                            let event = raw_event.deserialize()?;
1320                            let redacted = redact(
1321                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
1322                                room_version.get_or_insert_with(make_room_version),
1323                                Some(RedactedBecause::from_raw_event(&redaction)?),
1324                            )
1325                            .map_err(Error::Redaction)?;
1326                            let data = this.serialize_json(&redacted)?;
1327
1328                            let event_type =
1329                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1330                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1331
1332                            txn.set_state_event(
1333                                &encoded_room_id,
1334                                &event_type,
1335                                &state_key,
1336                                false,
1337                                Some(&event_id),
1338                                &data,
1339                            )?;
1340                        }
1341                    }
1342                }
1343
1344                for (room_id, display_names) in ambiguity_maps {
1345                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1346
1347                    for (name, user_ids) in display_names {
1348                        let encoded_name = this.encode_key(
1349                            keys::DISPLAY_NAME,
1350                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1351                        );
1352                        let data = this.serialize_json(&user_ids)?;
1353
1354                        if user_ids.is_empty() {
1355                            txn.remove_display_name(&room_id, &encoded_name)?;
1356
1357                            // We can't do a migration to merge the previously distinct buckets of
1358                            // user IDs since the display names themselves are hashed before they
1359                            // are persisted in the store. So the store will always retain two
1360                            // buckets: one for raw display names and one for normalised ones.
1361                            //
1362                            // We therefore do the next best thing, which is a sort of a soft
1363                            // migration: we fetch both the raw and normalised buckets, then merge
1364                            // the user IDs contained in them into a separate, temporary merged
1365                            // bucket. The SDK then operates on the merged buckets exclusively. See
1366                            // the comment in `get_users_with_display_names` for details.
1367                            //
1368                            // If the merged bucket is empty, that must mean that both the raw and
1369                            // normalised buckets were also empty, so we can remove both from the
1370                            // store.
1371                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1372                            txn.remove_display_name(&room_id, &raw_name)?;
1373                        } else {
1374                            // We only create new buckets with the normalized display name.
1375                            txn.set_display_name(&room_id, &encoded_name, &data)?;
1376                        }
1377                    }
1378                }
1379
1380                Ok::<_, Error>(())
1381            })
1382            .await?;
1383
1384        Ok(())
1385    }
1386
1387    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1388        self.acquire()
1389            .await?
1390            .get_kv_blob(self.encode_presence_key(user_id))
1391            .await?
1392            .map(|data| self.deserialize_json(&data))
1393            .transpose()
1394    }
1395
1396    async fn get_presence_events(
1397        &self,
1398        user_ids: &[OwnedUserId],
1399    ) -> Result<Vec<Raw<PresenceEvent>>> {
1400        if user_ids.is_empty() {
1401            return Ok(Vec::new());
1402        }
1403
1404        let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1405        self.acquire()
1406            .await?
1407            .get_kv_blobs(user_ids)
1408            .await?
1409            .into_iter()
1410            .map(|data| self.deserialize_json(&data))
1411            .collect()
1412    }
1413
1414    async fn get_state_event(
1415        &self,
1416        room_id: &RoomId,
1417        event_type: StateEventType,
1418        state_key: &str,
1419    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1420        Ok(self
1421            .get_state_events_for_keys(room_id, event_type, &[state_key])
1422            .await?
1423            .into_iter()
1424            .next())
1425    }
1426
1427    async fn get_state_events(
1428        &self,
1429        room_id: &RoomId,
1430        event_type: StateEventType,
1431    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1432        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1433        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1434        self.acquire()
1435            .await?
1436            .get_maybe_stripped_state_events(room_id, event_type)
1437            .await?
1438            .into_iter()
1439            .map(|(stripped, data)| {
1440                let ev = if stripped {
1441                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1442                } else {
1443                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1444                };
1445
1446                Ok(ev)
1447            })
1448            .collect()
1449    }
1450
1451    async fn get_state_events_for_keys(
1452        &self,
1453        room_id: &RoomId,
1454        event_type: StateEventType,
1455        state_keys: &[&str],
1456    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1457        if state_keys.is_empty() {
1458            return Ok(Vec::new());
1459        }
1460
1461        let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1462        let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1463        let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1464        self.acquire()
1465            .await?
1466            .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1467            .await?
1468            .into_iter()
1469            .map(|(stripped, data)| {
1470                let ev = if stripped {
1471                    RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1472                } else {
1473                    RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1474                };
1475
1476                Ok(ev)
1477            })
1478            .collect()
1479    }
1480
1481    async fn get_profile(
1482        &self,
1483        room_id: &RoomId,
1484        user_id: &UserId,
1485    ) -> Result<Option<MinimalRoomMemberEvent>> {
1486        let room_id = self.encode_key(keys::PROFILE, room_id);
1487        let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1488
1489        self.acquire()
1490            .await?
1491            .get_profiles(room_id, user_ids)
1492            .await?
1493            .into_iter()
1494            .next()
1495            .map(|(_, data)| self.deserialize_json(&data))
1496            .transpose()
1497    }
1498
1499    async fn get_profiles<'a>(
1500        &self,
1501        room_id: &RoomId,
1502        user_ids: &'a [OwnedUserId],
1503    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1504        if user_ids.is_empty() {
1505            return Ok(BTreeMap::new());
1506        }
1507
1508        let room_id = self.encode_key(keys::PROFILE, room_id);
1509        let mut user_ids_map = user_ids
1510            .iter()
1511            .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1512            .collect::<BTreeMap<_, _>>();
1513        let user_ids = user_ids_map.keys().cloned().collect();
1514
1515        self.acquire()
1516            .await?
1517            .get_profiles(room_id, user_ids)
1518            .await?
1519            .into_iter()
1520            .map(|(user_id, data)| {
1521                Ok((
1522                    user_ids_map
1523                        .remove(user_id.as_slice())
1524                        .expect("returned user IDs were requested"),
1525                    self.deserialize_json(&data)?,
1526                ))
1527            })
1528            .collect()
1529    }
1530
1531    async fn get_user_ids(
1532        &self,
1533        room_id: &RoomId,
1534        membership: RoomMemberships,
1535    ) -> Result<Vec<OwnedUserId>> {
1536        let room_id = self.encode_key(keys::MEMBER, room_id);
1537        let memberships = membership
1538            .as_vec()
1539            .into_iter()
1540            .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1541            .collect();
1542        self.acquire()
1543            .await?
1544            .get_user_ids(room_id, memberships)
1545            .await?
1546            .iter()
1547            .map(|data| self.deserialize_value(data))
1548            .collect()
1549    }
1550
1551    async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
1552        self.acquire()
1553            .await?
1554            .get_room_infos()
1555            .await?
1556            .into_iter()
1557            .map(|data| self.deserialize_json(&data))
1558            .collect()
1559    }
1560
1561    async fn get_users_with_display_name(
1562        &self,
1563        room_id: &RoomId,
1564        display_name: &DisplayName,
1565    ) -> Result<BTreeSet<OwnedUserId>> {
1566        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1567        let names = vec![self.encode_key(
1568            keys::DISPLAY_NAME,
1569            display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1570        )];
1571
1572        Ok(self
1573            .acquire()
1574            .await?
1575            .get_display_names(room_id, names)
1576            .await?
1577            .into_iter()
1578            .next()
1579            .map(|(_, data)| self.deserialize_json(&data))
1580            .transpose()?
1581            .unwrap_or_default())
1582    }
1583
1584    async fn get_users_with_display_names<'a>(
1585        &self,
1586        room_id: &RoomId,
1587        display_names: &'a [DisplayName],
1588    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1589        let mut result = HashMap::new();
1590
1591        if display_names.is_empty() {
1592            return Ok(result);
1593        }
1594
1595        let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1596        let mut names_map = display_names
1597            .iter()
1598            .flat_map(|display_name| {
1599                // We encode the display name as the `raw_str()` and the normalized string.
1600                //
1601                // This is for compatibility reasons since:
1602                //  1. Previously "Alice" and "alice" were considered to be distinct display
1603                //     names, while we now consider them to be the same so we need to merge the
1604                //     previously distinct buckets of user IDs.
1605                //  2. We can't do a migration to merge the previously distinct buckets of user
1606                //     IDs since the display names itself are hashed before they are persisted
1607                //     in the store.
1608                let raw =
1609                    (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1610                let normalized = display_name.as_normalized_str().map(|normalized| {
1611                    (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1612                });
1613
1614                iter::once(raw).chain(normalized.into_iter())
1615            })
1616            .collect::<BTreeMap<_, _>>();
1617        let names = names_map.keys().cloned().collect();
1618
1619        for (name, data) in
1620            self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1621        {
1622            let display_name =
1623                names_map.remove(name.as_slice()).expect("returned display names were requested");
1624            let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1625
1626            result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1627        }
1628
1629        Ok(result)
1630    }
1631
1632    async fn get_account_data_event(
1633        &self,
1634        event_type: GlobalAccountDataEventType,
1635    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1636        let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1637        self.acquire()
1638            .await?
1639            .get_global_account_data(event_type)
1640            .await?
1641            .map(|value| self.deserialize_json(&value))
1642            .transpose()
1643    }
1644
1645    async fn get_room_account_data_event(
1646        &self,
1647        room_id: &RoomId,
1648        event_type: RoomAccountDataEventType,
1649    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1650        let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1651        let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1652        self.acquire()
1653            .await?
1654            .get_room_account_data(room_id, event_type)
1655            .await?
1656            .map(|value| self.deserialize_json(&value))
1657            .transpose()
1658    }
1659
1660    async fn get_user_room_receipt_event(
1661        &self,
1662        room_id: &RoomId,
1663        receipt_type: ReceiptType,
1664        thread: ReceiptThread,
1665        user_id: &UserId,
1666    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1667        let room_id = self.encode_key(keys::RECEIPT, room_id);
1668        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1669        // We cannot have a NULL primary key so we rely on serialization instead of the
1670        // string representation.
1671        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1672        let user_id = self.encode_key(keys::RECEIPT, user_id);
1673
1674        self.acquire()
1675            .await?
1676            .get_user_receipt(room_id, receipt_type, thread, user_id)
1677            .await?
1678            .map(|value| {
1679                self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1680            })
1681            .transpose()
1682    }
1683
1684    async fn get_event_room_receipt_events(
1685        &self,
1686        room_id: &RoomId,
1687        receipt_type: ReceiptType,
1688        thread: ReceiptThread,
1689        event_id: &EventId,
1690    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1691        let room_id = self.encode_key(keys::RECEIPT, room_id);
1692        let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1693        // We cannot have a NULL primary key so we rely on serialization instead of the
1694        // string representation.
1695        let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1696        let event_id = self.encode_key(keys::RECEIPT, event_id);
1697
1698        self.acquire()
1699            .await?
1700            .get_event_receipts(room_id, receipt_type, thread, event_id)
1701            .await?
1702            .iter()
1703            .map(|value| {
1704                self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1705            })
1706            .collect()
1707    }
1708
1709    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1710        self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1711    }
1712
1713    async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1714        let conn = self.acquire().await?;
1715        let key = self.encode_custom_key(key);
1716        conn.set_kv_blob(key, value).await?;
1717        Ok(())
1718    }
1719
1720    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1721        let conn = self.acquire().await?;
1722        let key = self.encode_custom_key(key);
1723        let previous = conn.get_kv_blob(key.clone()).await?;
1724        conn.set_kv_blob(key, value).await?;
1725        Ok(previous)
1726    }
1727
1728    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1729        let conn = self.acquire().await?;
1730        let key = self.encode_custom_key(key);
1731        let previous = conn.get_kv_blob(key.clone()).await?;
1732        if previous.is_some() {
1733            conn.delete_kv_blob(key).await?;
1734        }
1735        Ok(previous)
1736    }
1737
1738    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1739        let this = self.clone();
1740        let room_id = room_id.to_owned();
1741
1742        let conn = self.acquire().await?;
1743
1744        conn.with_transaction(move |txn| -> Result<()> {
1745            let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1746            txn.remove_room_info(&room_info_room_id)?;
1747
1748            let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1749            txn.remove_room_state_events(&state_event_room_id, None)?;
1750
1751            let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1752            txn.remove_room_members(&member_room_id, None)?;
1753
1754            let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1755            txn.remove_room_profiles(&profile_room_id)?;
1756
1757            let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1758            txn.remove_room_account_data(&room_account_data_room_id)?;
1759
1760            let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1761            txn.remove_room_receipts(&receipt_room_id)?;
1762
1763            let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1764            txn.remove_room_display_names(&display_name_room_id)?;
1765
1766            let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1767            txn.remove_room_send_queue(&send_queue_room_id)?;
1768
1769            let dependent_send_queue_room_id =
1770                this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1771            txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1772
1773            Ok(())
1774        })
1775        .await?;
1776
1777        conn.vacuum().await
1778    }
1779
1780    async fn save_send_queue_request(
1781        &self,
1782        room_id: &RoomId,
1783        transaction_id: OwnedTransactionId,
1784        created_at: MilliSecondsSinceUnixEpoch,
1785        content: QueuedRequestKind,
1786        priority: usize,
1787    ) -> Result<(), Self::Error> {
1788        let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1789        let room_id_value = self.serialize_value(&room_id.to_owned())?;
1790
1791        let content = self.serialize_json(&content)?;
1792        // The transaction id is used both as a key (in remove/update) and a value (as
1793        // it's useful for the callers), so we keep it as is, and neither hash
1794        // it (with encode_key) or encrypt it (through serialize_value). After
1795        // all, it carries no personal information, so this is considered fine.
1796
1797        let created_at_ts: u64 = created_at.0.into();
1798        self.acquire()
1799            .await?
1800            .with_transaction(move |txn| {
1801                txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1802                Ok(())
1803            })
1804            .await
1805    }
1806
1807    async fn update_send_queue_request(
1808        &self,
1809        room_id: &RoomId,
1810        transaction_id: &TransactionId,
1811        content: QueuedRequestKind,
1812    ) -> Result<bool, Self::Error> {
1813        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1814
1815        let content = self.serialize_json(&content)?;
1816        // See comment in [`Self::save_send_queue_event`] to understand why the
1817        // transaction id is neither encrypted or hashed.
1818        let transaction_id = transaction_id.to_string();
1819
1820        let num_updated = self.acquire()
1821            .await?
1822            .with_transaction(move |txn| {
1823                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1824            })
1825            .await?;
1826
1827        Ok(num_updated > 0)
1828    }
1829
1830    async fn remove_send_queue_request(
1831        &self,
1832        room_id: &RoomId,
1833        transaction_id: &TransactionId,
1834    ) -> Result<bool, Self::Error> {
1835        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1836
1837        // See comment in `save_send_queue_event`.
1838        let transaction_id = transaction_id.to_string();
1839
1840        let num_deleted = self
1841            .acquire()
1842            .await?
1843            .with_transaction(move |txn| {
1844                txn.prepare_cached(
1845                    "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1846                )?
1847                .execute((room_id, &transaction_id))
1848            })
1849            .await?;
1850
1851        Ok(num_deleted > 0)
1852    }
1853
1854    async fn load_send_queue_requests(
1855        &self,
1856        room_id: &RoomId,
1857    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1858        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1859
1860        // Note: ROWID is always present and is an auto-incremented integer counter. We
1861        // want to maintain the insertion order, so we can sort using it.
1862        // Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
1863        let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1864            .acquire()
1865            .await?
1866            .prepare(
1867                "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1868                |mut stmt| {
1869                    stmt.query((room_id,))?
1870                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1871                        .collect()
1872                },
1873            )
1874            .await?;
1875
1876        let mut requests = Vec::with_capacity(res.len());
1877        for entry in res {
1878            let created_at = entry
1879                .4
1880                .and_then(UInt::new)
1881                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1882            requests.push(QueuedRequest {
1883                transaction_id: entry.0.into(),
1884                kind: self.deserialize_json(&entry.1)?,
1885                error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1886                priority: entry.3,
1887                created_at,
1888            });
1889        }
1890
1891        Ok(requests)
1892    }
1893
1894    async fn update_send_queue_request_status(
1895        &self,
1896        room_id: &RoomId,
1897        transaction_id: &TransactionId,
1898        error: Option<QueueWedgeError>,
1899    ) -> Result<(), Self::Error> {
1900        let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1901
1902        // See comment in `save_send_queue_event`.
1903        let transaction_id = transaction_id.to_string();
1904
1905        // Serialize the error to json bytes (encrypted if option is enabled) if set.
1906        let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1907
1908        self.acquire()
1909            .await?
1910            .with_transaction(move |txn| {
1911                txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1912                Ok(())
1913            })
1914            .await
1915    }
1916
1917    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1918        // If the values were not encrypted, we could use `SELECT DISTINCT` here, but we
1919        // have to manually do the deduplication: indeed, for all X, encrypt(X)
1920        // != encrypted(X), since we use a nonce in the encryption process.
1921
1922        let res: Vec<Vec<u8>> = self
1923            .acquire()
1924            .await?
1925            .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1926                stmt.query(())?.mapped(|row| row.get(0)).collect()
1927            })
1928            .await?;
1929
1930        // So we collect the results into a `BTreeSet` to perform the deduplication, and
1931        // then rejigger that into a vector.
1932        Ok(res
1933            .into_iter()
1934            .map(|entry| self.deserialize_value(&entry))
1935            .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1936            .into_iter()
1937            .collect())
1938    }
1939
1940    async fn save_dependent_queued_request(
1941        &self,
1942        room_id: &RoomId,
1943        parent_txn_id: &TransactionId,
1944        own_txn_id: ChildTransactionId,
1945        created_at: MilliSecondsSinceUnixEpoch,
1946        content: DependentQueuedRequestKind,
1947    ) -> Result<()> {
1948        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1949        let content = self.serialize_json(&content)?;
1950
1951        // See comment in `save_send_queue_event`.
1952        let parent_txn_id = parent_txn_id.to_string();
1953        let own_txn_id = own_txn_id.to_string();
1954
1955        let created_at_ts: u64 = created_at.0.into();
1956        self.acquire()
1957            .await?
1958            .with_transaction(move |txn| {
1959                txn.prepare_cached(
1960                    r#"INSERT INTO dependent_send_queue_events
1961                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1962                       VALUES (?, ?, ?, ?, ?)"#,
1963                )?
1964                .execute((
1965                    room_id,
1966                    parent_txn_id,
1967                    own_txn_id,
1968                    content,
1969                    created_at_ts,
1970                ))?;
1971                Ok(())
1972            })
1973            .await
1974    }
1975
1976    async fn update_dependent_queued_request(
1977        &self,
1978        room_id: &RoomId,
1979        own_transaction_id: &ChildTransactionId,
1980        new_content: DependentQueuedRequestKind,
1981    ) -> Result<bool> {
1982        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1983        let content = self.serialize_json(&new_content)?;
1984
1985        // See comment in `save_send_queue_event`.
1986        let own_txn_id = own_transaction_id.to_string();
1987
1988        let num_updated = self
1989            .acquire()
1990            .await?
1991            .with_transaction(move |txn| {
1992                txn.prepare_cached(
1993                    r#"UPDATE dependent_send_queue_events
1994                       SET content = ?
1995                       WHERE own_transaction_id = ?
1996                       AND room_id = ?"#,
1997                )?
1998                .execute((content, own_txn_id, room_id))
1999            })
2000            .await?;
2001
2002        if num_updated > 1 {
2003            return Err(Error::InconsistentUpdate);
2004        }
2005
2006        Ok(num_updated == 1)
2007    }
2008
2009    async fn mark_dependent_queued_requests_as_ready(
2010        &self,
2011        room_id: &RoomId,
2012        parent_txn_id: &TransactionId,
2013        parent_key: SentRequestKey,
2014    ) -> Result<usize> {
2015        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2016        let parent_key = self.serialize_value(&parent_key)?;
2017
2018        // See comment in `save_send_queue_event`.
2019        let parent_txn_id = parent_txn_id.to_string();
2020
2021        self.acquire()
2022            .await?
2023            .with_transaction(move |txn| {
2024                Ok(txn.prepare_cached(
2025                    "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2026                )?
2027                .execute((parent_key, parent_txn_id, room_id))?)
2028            })
2029            .await
2030    }
2031
2032    async fn remove_dependent_queued_request(
2033        &self,
2034        room_id: &RoomId,
2035        txn_id: &ChildTransactionId,
2036    ) -> Result<bool> {
2037        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2038
2039        // See comment in `save_send_queue_event`.
2040        let txn_id = txn_id.to_string();
2041
2042        let num_deleted = self
2043            .acquire()
2044            .await?
2045            .with_transaction(move |txn| {
2046                txn.prepare_cached(
2047                    "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2048                )?
2049                .execute((txn_id, room_id))
2050            })
2051            .await?;
2052
2053        Ok(num_deleted > 0)
2054    }
2055
2056    async fn load_dependent_queued_requests(
2057        &self,
2058        room_id: &RoomId,
2059    ) -> Result<Vec<DependentQueuedRequest>> {
2060        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2061
2062        // Note: transaction_id is not encoded, see why in `save_send_queue_event`.
2063        let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2064            .acquire()
2065            .await?
2066            .prepare(
2067                "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2068                |mut stmt| {
2069                    stmt.query((room_id,))?
2070                        .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2071                        .collect()
2072                },
2073            )
2074            .await?;
2075
2076        let mut dependent_events = Vec::with_capacity(res.len());
2077        for entry in res {
2078            let created_at = entry
2079                .4
2080                .and_then(UInt::new)
2081                .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2082            dependent_events.push(DependentQueuedRequest {
2083                own_transaction_id: entry.0.into(),
2084                parent_transaction_id: entry.1.into(),
2085                parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2086                kind: self.deserialize_json(&entry.3)?,
2087                created_at,
2088            });
2089        }
2090
2091        Ok(dependent_events)
2092    }
2093}
2094
2095#[derive(Debug, Clone, Serialize, Deserialize)]
2096struct ReceiptData {
2097    receipt: Receipt,
2098    event_id: OwnedEventId,
2099    user_id: OwnedUserId,
2100}
2101
2102#[cfg(test)]
2103mod tests {
2104    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2105
2106    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2107    use once_cell::sync::Lazy;
2108    use tempfile::{tempdir, TempDir};
2109
2110    use super::SqliteStateStore;
2111
2112    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2113    static NUM: AtomicU32 = AtomicU32::new(0);
2114
2115    async fn get_store() -> Result<impl StateStore, StoreError> {
2116        let name = NUM.fetch_add(1, SeqCst).to_string();
2117        let tmpdir_path = TMP_DIR.path().join(name);
2118
2119        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2120
2121        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2122    }
2123
2124    statestore_integration_tests!();
2125}
2126
2127#[cfg(test)]
2128mod encrypted_tests {
2129    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2130
2131    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2132    use once_cell::sync::Lazy;
2133    use tempfile::{tempdir, TempDir};
2134
2135    use super::SqliteStateStore;
2136
2137    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2138    static NUM: AtomicU32 = AtomicU32::new(0);
2139
2140    async fn get_store() -> Result<impl StateStore, StoreError> {
2141        let name = NUM.fetch_add(1, SeqCst).to_string();
2142        let tmpdir_path = TMP_DIR.path().join(name);
2143
2144        tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2145
2146        Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2147            .await
2148            .unwrap())
2149    }
2150
2151    statestore_integration_tests!();
2152}
2153
2154#[cfg(test)]
2155mod migration_tests {
2156    use std::{
2157        path::{Path, PathBuf},
2158        sync::{
2159            atomic::{AtomicU32, Ordering::SeqCst},
2160            Arc,
2161        },
2162    };
2163
2164    use matrix_sdk_base::{
2165        store::{ChildTransactionId, DependentQueuedRequestKind, SerializableEventContent},
2166        sync::UnreadNotificationsCount,
2167        RoomState, StateStore,
2168    };
2169    use matrix_sdk_test::async_test;
2170    use once_cell::sync::Lazy;
2171    use ruma::{
2172        events::{
2173            room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2174            StateEventType,
2175        },
2176        room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2177        UserId,
2178    };
2179    use rusqlite::Transaction;
2180    use serde_json::json;
2181    use tempfile::{tempdir, TempDir};
2182
2183    use super::{create_pool, init, keys, SqliteStateStore};
2184    use crate::{
2185        error::{Error, Result},
2186        utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2187    };
2188
2189    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2190    static NUM: AtomicU32 = AtomicU32::new(0);
2191    const SECRET: &str = "secret";
2192
2193    fn new_path() -> PathBuf {
2194        let name = NUM.fetch_add(1, SeqCst).to_string();
2195        TMP_DIR.path().join(name)
2196    }
2197
2198    async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2199        let pool = create_pool(path).await.unwrap();
2200        let conn = pool.get().await?;
2201
2202        init(&conn).await?;
2203
2204        let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2205        let this = SqliteStateStore { store_cipher, pool };
2206        this.run_migrations(&conn, 1, Some(version)).await?;
2207
2208        Ok(this)
2209    }
2210
2211    fn room_info_v1_json(
2212        room_id: &RoomId,
2213        state: RoomState,
2214        name: Option<&str>,
2215        creator: Option<&UserId>,
2216    ) -> serde_json::Value {
2217        // Test with name set or not.
2218        let name_content = match name {
2219            Some(name) => json!({ "name": name }),
2220            None => json!({ "name": null }),
2221        };
2222        // Test with creator set or not.
2223        let create_content = match creator {
2224            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2225            None => RoomCreateEventContent::new_v11(),
2226        };
2227
2228        json!({
2229            "room_id": room_id,
2230            "room_type": state,
2231            "notification_counts": UnreadNotificationsCount::default(),
2232            "summary": {
2233                "heroes": [],
2234                "joined_member_count": 0,
2235                "invited_member_count": 0,
2236            },
2237            "members_synced": false,
2238            "base_info": {
2239                "dm_targets": [],
2240                "max_power_level": 100,
2241                "name": {
2242                    "Original": {
2243                        "content": name_content,
2244                    },
2245                },
2246                "create": {
2247                    "Original": {
2248                        "content": create_content,
2249                    }
2250                }
2251            },
2252        })
2253    }
2254
2255    #[async_test]
2256    pub async fn test_migrating_v1_to_v2() {
2257        let path = new_path();
2258        // Create and populate db.
2259        {
2260            let db = create_fake_db(&path, 1).await.unwrap();
2261            let conn = db.pool.get().await.unwrap();
2262
2263            let this = db.clone();
2264            conn.with_transaction(move |txn| {
2265                for i in 0..5 {
2266                    let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2267                    let (state, stripped) =
2268                        if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2269                    let info = room_info_v1_json(&room_id, state, None, None);
2270
2271                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2272                    let data = this.serialize_json(&info)?;
2273
2274                    txn.prepare_cached(
2275                        "INSERT INTO room_info (room_id, stripped, data)
2276                         VALUES (?, ?, ?)",
2277                    )?
2278                    .execute((room_id, stripped, data))?;
2279                }
2280
2281                Result::<_, Error>::Ok(())
2282            })
2283            .await
2284            .unwrap();
2285        }
2286
2287        // This transparently migrates to the latest version.
2288        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2289
2290        // Check all room infos are there.
2291        assert_eq!(store.get_room_infos().await.unwrap().len(), 5);
2292    }
2293
2294    // Add a room in version 2 format of the state store.
2295    fn add_room_v2(
2296        this: &SqliteStateStore,
2297        txn: &Transaction<'_>,
2298        room_id: &RoomId,
2299        name: Option<&str>,
2300        create_creator: Option<&UserId>,
2301        create_sender: Option<&UserId>,
2302    ) -> Result<(), Error> {
2303        let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2304
2305        let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2306        let encoded_state =
2307            this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2308        let data = this.serialize_json(&room_info_json)?;
2309
2310        txn.prepare_cached(
2311            "INSERT INTO room_info (room_id, state, data)
2312             VALUES (?, ?, ?)",
2313        )?
2314        .execute((encoded_room_id, encoded_state, data))?;
2315
2316        // Test with or without `m.room.create` event in the room state.
2317        let Some(create_sender) = create_sender else {
2318            return Ok(());
2319        };
2320
2321        let create_content = match create_creator {
2322            Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2323            None => RoomCreateEventContent::new_v11(),
2324        };
2325
2326        let event_id = EventId::new(server_name!("dummy.local"));
2327        let create_event = json!({
2328            "content": create_content,
2329            "event_id": event_id,
2330            "sender": create_sender.to_owned(),
2331            "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2332            "state_key": "",
2333            "type": "m.room.create",
2334            "unsigned": {},
2335        });
2336
2337        let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2338        let encoded_event_type =
2339            this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2340        let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2341        let stripped = false;
2342        let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2343        let data = this.serialize_json(&create_event)?;
2344
2345        txn.prepare_cached(
2346            "INSERT
2347             INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2348             VALUES (?, ?, ?, ?, ?, ?)",
2349        )?
2350        .execute((
2351            encoded_room_id,
2352            encoded_event_type,
2353            encoded_state_key,
2354            stripped,
2355            encoded_event_id,
2356            data,
2357        ))?;
2358
2359        Ok(())
2360    }
2361
2362    #[async_test]
2363    pub async fn test_migrating_v2_to_v3() {
2364        let path = new_path();
2365
2366        // Room A: with name, creator and sender.
2367        let room_a_id = room_id!("!room_a:dummy.local");
2368        let room_a_name = "Room A";
2369        let room_a_creator = user_id!("@creator:dummy.local");
2370        // Use a different sender to check that sender is used over creator in
2371        // migration.
2372        let room_a_create_sender = user_id!("@sender:dummy.local");
2373
2374        // Room B: without name, creator and sender.
2375        let room_b_id = room_id!("!room_b:dummy.local");
2376
2377        // Room C: only with sender.
2378        let room_c_id = room_id!("!room_c:dummy.local");
2379        let room_c_create_sender = user_id!("@creator:dummy.local");
2380
2381        // Create and populate db.
2382        {
2383            let db = create_fake_db(&path, 2).await.unwrap();
2384            let conn = db.pool.get().await.unwrap();
2385
2386            let this = db.clone();
2387            conn.with_transaction(move |txn| {
2388                add_room_v2(
2389                    &this,
2390                    txn,
2391                    room_a_id,
2392                    Some(room_a_name),
2393                    Some(room_a_creator),
2394                    Some(room_a_create_sender),
2395                )?;
2396                add_room_v2(&this, txn, room_b_id, None, None, None)?;
2397                add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2398
2399                Result::<_, Error>::Ok(())
2400            })
2401            .await
2402            .unwrap();
2403        }
2404
2405        // This transparently migrates to the latest version.
2406        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2407
2408        // Check all room infos are there.
2409        let room_infos = store.get_room_infos().await.unwrap();
2410        assert_eq!(room_infos.len(), 3);
2411
2412        let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2413        assert_eq!(room_a.name(), Some(room_a_name));
2414        assert_eq!(room_a.creator(), Some(room_a_create_sender));
2415
2416        let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2417        assert_eq!(room_b.name(), None);
2418        assert_eq!(room_b.creator(), None);
2419
2420        let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2421        assert_eq!(room_c.name(), None);
2422        assert_eq!(room_c.creator(), Some(room_c_create_sender));
2423    }
2424
2425    #[async_test]
2426    pub async fn test_migrating_v7_to_v9() {
2427        let path = new_path();
2428
2429        let room_id = room_id!("!room_a:dummy.local");
2430        let wedged_event_transaction_id = TransactionId::new();
2431        let local_event_transaction_id = TransactionId::new();
2432
2433        // Create and populate db.
2434        {
2435            let db = create_fake_db(&path, 7).await.unwrap();
2436            let conn = db.pool.get().await.unwrap();
2437
2438            let wedge_tx = wedged_event_transaction_id.clone();
2439            let local_tx = local_event_transaction_id.clone();
2440
2441            conn.with_transaction(move |txn| {
2442                add_dependent_send_queue_event_v7(
2443                    &db,
2444                    txn,
2445                    room_id,
2446                    &local_tx,
2447                    ChildTransactionId::new(),
2448                    DependentQueuedRequestKind::RedactEvent,
2449                )?;
2450                add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2451                add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2452                Result::<_, Error>::Ok(())
2453            })
2454            .await
2455            .unwrap();
2456        }
2457
2458        // This transparently migrates to the latest version, which clears up all
2459        // requests and dependent requests.
2460        let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2461
2462        let requests = store.load_send_queue_requests(room_id).await.unwrap();
2463        assert!(requests.is_empty());
2464
2465        let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2466        assert!(dependent_requests.is_empty());
2467    }
2468
2469    fn add_send_queue_event_v7(
2470        this: &SqliteStateStore,
2471        txn: &Transaction<'_>,
2472        transaction_id: &TransactionId,
2473        room_id: &RoomId,
2474        is_wedged: bool,
2475    ) -> Result<(), Error> {
2476        let content =
2477            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2478
2479        let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2480        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2481
2482        let content = this.serialize_json(&content)?;
2483
2484        txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2485            .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2486
2487        Ok(())
2488    }
2489
2490    fn add_dependent_send_queue_event_v7(
2491        this: &SqliteStateStore,
2492        txn: &Transaction<'_>,
2493        room_id: &RoomId,
2494        parent_txn_id: &TransactionId,
2495        own_txn_id: ChildTransactionId,
2496        content: DependentQueuedRequestKind,
2497    ) -> Result<(), Error> {
2498        let room_id_value = this.serialize_value(&room_id.to_owned())?;
2499
2500        let parent_txn_id = parent_txn_id.to_string();
2501        let own_txn_id = own_txn_id.to_string();
2502        let content = this.serialize_json(&content)?;
2503
2504        txn.prepare_cached(
2505            "INSERT INTO dependent_send_queue_events
2506                         (room_id, parent_transaction_id, own_transaction_id, content)
2507                       VALUES (?, ?, ?, ?)",
2508        )?
2509        .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2510
2511        Ok(())
2512    }
2513}