1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
15 DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
16 QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
17 ThreadSubscriptionStatus,
18 },
19 timer, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
20 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
21};
22use matrix_sdk_store_encryption::StoreCipher;
23use ruma::{
24 canonical_json::{redact, RedactedBecause},
25 events::{
26 presence::PresenceEvent,
27 receipt::{Receipt, ReceiptThread, ReceiptType},
28 room::{
29 create::RoomCreateEventContent,
30 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
31 },
32 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
33 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
34 },
35 serde::Raw,
36 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
37 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
38};
39use rusqlite::{OptionalExtension, Transaction};
40use serde::{Deserialize, Serialize};
41use tokio::{
42 fs,
43 sync::{Mutex, OwnedMutexGuard},
44};
45use tracing::{debug, instrument, trace, warn};
46
47use crate::{
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt,
53 },
54 OpenStoreError, Secret, SqliteStoreConfig,
55};
56
/// Namespaces used when encoding keys for the different kinds of stored data.
///
/// Each constant is passed as the first argument of `encode_key` elsewhere in
/// this file. Most values match the name of the SQL table that the encoded key
/// is used with.
mod keys {
    // Generic key/value entries (state store data keys, presence, custom keys).
    pub const KV_BLOB: &str = "kv_blob";
    // Per-room and per-account data.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    // Send queue and its dependent requests.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    // Thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
72
/// The file name handed to `SqliteStoreConfig::build_pool_of_connections` when
/// opening the state store database.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// The latest database version.
///
/// This is the default target of `run_migrations`; it must match the version
/// recorded by the last migration step in that function.
const DATABASE_VERSION: u8 = 14;
82
/// An SQLite-based state store.
///
/// Cloning is cheap with respect to the cipher and the write connection, which
/// are both shared behind an `Arc`.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher exposed through `EncryptableStore::get_cypher`; `None` when the
    /// store was opened without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections; `read` takes its connections from here.
    pool: SqlitePool,

    /// The connection handed out by `write`. It sits behind an async mutex, so
    /// only one task at a time can hold it.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102 }
103}
104
105impl SqliteStateStore {
106 pub async fn open(
109 path: impl AsRef<Path>,
110 passphrase: Option<&str>,
111 ) -> Result<Self, OpenStoreError> {
112 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113 }
114
115 pub async fn open_with_key(
118 path: impl AsRef<Path>,
119 key: Option<&[u8; 32]>,
120 ) -> Result<Self, OpenStoreError> {
121 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122 }
123
124 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
127
128 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
129
130 let this = Self::open_with_pool(pool, config.secret).await?;
131 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
132
133 Ok(this)
134 }
135
136 pub async fn open_with_pool(
139 pool: SqlitePool,
140 secret: Option<Secret>,
141 ) -> Result<Self, OpenStoreError> {
142 let conn = pool.get().await?;
143
144 let mut version = conn.db_version().await?;
145
146 if version == 0 {
147 init(&conn).await?;
148 version = 1;
149 }
150
151 let store_cipher = match secret {
152 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
153 None => None,
154 };
155 let this = Self {
156 store_cipher,
157 pool,
158 write_connection: Arc::new(Mutex::new(conn)),
160 };
161 this.run_migrations(version, None).await?;
162
163 Ok(this)
164 }
165
166 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
171 let to = to.unwrap_or(DATABASE_VERSION);
172
173 if from < to {
174 debug!(version = from, new_version = to, "Upgrading database");
175 } else {
176 return Ok(());
177 }
178
179 let conn = self.write().await;
180
181 if from < 2 && to >= 2 {
182 let this = self.clone();
183 conn.with_transaction(move |txn| {
184 txn.execute_batch(include_str!(
186 "../migrations/state_store/002_a_create_new_room_info.sql"
187 ))?;
188
189 for data in txn
191 .prepare("SELECT data FROM room_info")?
192 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
193 {
194 let data = data?;
195 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
196
197 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
198 let state = this
199 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
200 txn.prepare_cached(
201 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
202 VALUES (?, ?, ?)",
203 )?
204 .execute((room_id, state, data))?;
205 }
206
207 txn.execute_batch(include_str!(
209 "../migrations/state_store/002_b_replace_room_info.sql"
210 ))?;
211
212 txn.set_db_version(2)?;
213 Result::<_, Error>::Ok(())
214 })
215 .await?;
216 }
217
218 if from < 3 && to >= 3 {
220 let this = self.clone();
221 conn.with_transaction(move |txn| {
222 for data in txn
224 .prepare("SELECT data FROM room_info")?
225 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
226 {
227 let data = data?;
228 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
229
230 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
232 let event_type =
233 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
234 let create_res = txn
235 .prepare(
236 "SELECT stripped, data FROM state_event
237 WHERE room_id = ? AND event_type = ?",
238 )?
239 .query_row([room_id, event_type], |row| {
240 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
241 })
242 .optional()?;
243
244 let create = create_res.and_then(|(stripped, data)| {
245 let create = if stripped {
246 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
247 this.deserialize_json(&data).ok()?,
248 )
249 } else {
250 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
251 };
252 Some(create)
253 });
254
255 let migrated_room_info = room_info_v1.migrate(create.as_ref());
256
257 let data = this.serialize_json(&migrated_room_info)?;
258 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
259 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
260 .execute((data, room_id))?;
261 }
262
263 txn.set_db_version(3)?;
264 Result::<_, Error>::Ok(())
265 })
266 .await?;
267 }
268
269 if from < 4 && to >= 4 {
270 conn.with_transaction(move |txn| {
271 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
273 txn.set_db_version(4)
274 })
275 .await?;
276 }
277
278 if from < 5 && to >= 5 {
279 conn.with_transaction(move |txn| {
280 txn.execute_batch(include_str!(
282 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
283 ))?;
284 txn.set_db_version(4)
285 })
286 .await?;
287 }
288
289 if from < 6 && to >= 6 {
290 conn.with_transaction(move |txn| {
291 txn.execute_batch(include_str!(
293 "../migrations/state_store/005_send_queue_dependent_events.sql"
294 ))?;
295 txn.set_db_version(6)
296 })
297 .await?;
298 }
299
300 if from < 7 && to >= 7 {
301 conn.with_transaction(move |txn| {
302 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
304 txn.set_db_version(7)
305 })
306 .await?;
307 }
308
309 if from < 8 && to >= 8 {
310 let error = QueueWedgeError::GenericApiError {
312 msg: "local echo failed to send in a previous session".into(),
313 };
314 let default_err = self.serialize_value(&error)?;
315
316 conn.with_transaction(move |txn| {
317 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
319
320 for wedged_entries in txn
323 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
324 .query_map((), |row| {
325 Ok(
326 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
327 )
328 })? {
329
330 let (room_id, transaction_id) = wedged_entries?;
331
332 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
333 .execute((default_err.clone(), room_id, transaction_id))?;
334 }
335
336
337 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
339
340 txn.set_db_version(8)
341 })
342 .await?;
343 }
344
345 if from < 9 && to >= 9 {
346 conn.with_transaction(move |txn| {
347 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
349 txn.set_db_version(9)
350 })
351 .await?;
352 }
353
354 if from < 10 && to >= 10 {
355 conn.with_transaction(move |txn| {
356 txn.execute_batch(include_str!(
358 "../migrations/state_store/009_send_queue_priority.sql"
359 ))?;
360 txn.set_db_version(10)
361 })
362 .await?;
363 }
364
365 if from < 11 && to >= 11 {
366 conn.with_transaction(move |txn| {
367 txn.execute_batch(include_str!(
369 "../migrations/state_store/010_send_queue_enqueue_time.sql"
370 ))?;
371 txn.set_db_version(11)
372 })
373 .await?;
374 }
375
376 if from < 12 && to >= 12 {
377 conn.vacuum().await?;
381 conn.set_kv("version", vec![12]).await?;
382 }
383
384 if from < 13 && to >= 13 {
385 conn.with_transaction(move |txn| {
386 txn.execute_batch(include_str!(
388 "../migrations/state_store/011_thread_subscriptions.sql"
389 ))?;
390 txn.set_db_version(13)
391 })
392 .await?;
393 }
394
395 if from < 14 && to >= 14 {
396 conn.with_transaction(move |txn| {
397 txn.execute_batch(include_str!(
399 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
400 ))?;
401 txn.set_db_version(14)
402 })
403 .await?;
404 }
405
406 Ok(())
407 }
408
409 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410 let key_s = match key {
411 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
413 StateStoreDataKey::Filter(f) => {
414 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
415 }
416 StateStoreDataKey::UserAvatarUrl(u) => {
417 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
418 }
419 StateStoreDataKey::RecentlyVisitedRooms(b) => {
420 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
421 }
422 StateStoreDataKey::UtdHookManagerData => {
423 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
424 }
425 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
426 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
427 }
428 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
429 if let Some(thread_root) = thread_root {
430 Cow::Owned(format!(
431 "{}:{room_id}:{thread_root}",
432 StateStoreDataKey::COMPOSER_DRAFT
433 ))
434 } else {
435 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
436 }
437 }
438 StateStoreDataKey::SeenKnockRequests(room_id) => {
439 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
440 }
441 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
442 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
443 }
444 };
445
446 self.encode_key(keys::KV_BLOB, &*key_s)
447 }
448
449 fn encode_presence_key(&self, user_id: &UserId) -> Key {
450 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
451 }
452
453 fn encode_custom_key(&self, key: &[u8]) -> Key {
454 let mut full_key = b"custom:".to_vec();
455 full_key.extend(key);
456 self.encode_key(keys::KV_BLOB, full_key)
457 }
458
459 #[instrument(skip_all)]
461 async fn read(&self) -> Result<SqliteAsyncConn> {
462 trace!("Taking a `read` connection");
463 let _timer = timer!("connection");
464
465 Ok(self.pool.get().await?)
466 }
467
468 #[instrument(skip_all)]
470 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
471 trace!("Taking a `write` connection");
472 let _timer = timer!("connection");
473
474 self.write_connection.clone().lock_owned().await
475 }
476
477 fn remove_maybe_stripped_room_data(
478 &self,
479 txn: &Transaction<'_>,
480 room_id: &RoomId,
481 stripped: bool,
482 ) -> rusqlite::Result<()> {
483 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
484 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
485
486 let member_room_id = self.encode_key(keys::MEMBER, room_id);
487 txn.remove_room_members(&member_room_id, Some(stripped))
488 }
489}
490
491impl EncryptableStore for SqliteStateStore {
492 fn get_cypher(&self) -> Option<&StoreCipher> {
493 self.store_cipher.as_deref()
494 }
495}
496
497async fn init(conn: &SqliteAsyncConn) -> Result<()> {
499 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
502 conn.with_transaction(|txn| {
503 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
504 txn.set_db_version(1)?;
505
506 Ok(())
507 })
508 .await
509}
510
/// Synchronous helpers for the state store tables.
///
/// Every argument is an already-encoded key or an already-serialized value;
/// the methods only bind them to SQL statements. Implemented for
/// `rusqlite::Connection` in this file.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the `kv_blob` entry for `key`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data stored for `event_type`.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data stored for `event_type` in a room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete all the account data of a room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the info (and room state) stored for a room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Get the info stored for a room, if any.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete the info stored for a room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event row of a room.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Get the state event of a room with the given event id, if any.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete state events of a room.
    ///
    /// With `Some(flag)`, only rows whose `stripped` column equals `flag` are
    /// deleted; with `None`, every state event of the room is deleted.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a member row of a room.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete members of a room.
    ///
    /// With `Some(flag)`, only rows whose `stripped` column equals `flag` are
    /// deleted; with `None`, every member of the room is deleted.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of a user in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete all the profiles of a room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete the profile of a single user in a room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt row of a room.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete all the receipts of a room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete the data stored for a single display name in a room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Delete all the display name entries of a room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete all the send queue entries of a room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete all the dependent send queue entries of a room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
579
580impl SqliteConnectionStateStoreExt for rusqlite::Connection {
581 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
582 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
583 Ok(())
584 }
585
586 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
587 self.prepare_cached(
588 "INSERT OR REPLACE INTO global_account_data (event_type, data)
589 VALUES (?, ?)",
590 )?
591 .execute((event_type, data))?;
592 Ok(())
593 }
594
595 fn set_room_account_data(
596 &self,
597 room_id: &[u8],
598 event_type: &[u8],
599 data: &[u8],
600 ) -> rusqlite::Result<()> {
601 self.prepare_cached(
602 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
603 VALUES (?, ?, ?)",
604 )?
605 .execute((room_id, event_type, data))?;
606 Ok(())
607 }
608
609 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
610 self.prepare(
611 "DELETE FROM room_account_data
612 WHERE room_id = ?",
613 )?
614 .execute((room_id,))?;
615 Ok(())
616 }
617
618 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
619 self.prepare_cached(
620 "INSERT OR REPLACE INTO room_info (room_id, state, data)
621 VALUES (?, ?, ?)",
622 )?
623 .execute((room_id, state, data))?;
624 Ok(())
625 }
626
627 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
628 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
629 .optional()
630 }
631
632 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
634 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
635 Ok(())
636 }
637
638 fn set_state_event(
639 &self,
640 room_id: &[u8],
641 event_type: &[u8],
642 state_key: &[u8],
643 stripped: bool,
644 event_id: Option<&[u8]>,
645 data: &[u8],
646 ) -> rusqlite::Result<()> {
647 self.prepare_cached(
648 "INSERT OR REPLACE
649 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
650 VALUES (?, ?, ?, ?, ?, ?)",
651 )?
652 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
653 Ok(())
654 }
655
656 fn get_state_event_by_id(
657 &self,
658 room_id: &[u8],
659 event_id: &[u8],
660 ) -> rusqlite::Result<Option<Vec<u8>>> {
661 self.query_row(
662 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
663 (room_id, event_id),
664 |row| row.get(0),
665 )
666 .optional()
667 }
668
669 fn remove_room_state_events(
675 &self,
676 room_id: &[u8],
677 stripped: Option<bool>,
678 ) -> rusqlite::Result<()> {
679 if let Some(stripped) = stripped {
680 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
681 .execute((room_id, stripped))?;
682 } else {
683 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
684 .execute((room_id,))?;
685 }
686 Ok(())
687 }
688
689 fn set_member(
690 &self,
691 room_id: &[u8],
692 user_id: &[u8],
693 membership: &[u8],
694 stripped: bool,
695 data: &[u8],
696 ) -> rusqlite::Result<()> {
697 self.prepare_cached(
698 "INSERT OR REPLACE
699 INTO member (room_id, user_id, membership, stripped, data)
700 VALUES (?, ?, ?, ?, ?)",
701 )?
702 .execute((room_id, user_id, membership, stripped, data))?;
703 Ok(())
704 }
705
706 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
711 if let Some(stripped) = stripped {
712 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
713 .execute((room_id, stripped))?;
714 } else {
715 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
716 }
717 Ok(())
718 }
719
720 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
721 self.prepare_cached(
722 "INSERT OR REPLACE
723 INTO profile (room_id, user_id, data)
724 VALUES (?, ?, ?)",
725 )?
726 .execute((room_id, user_id, data))?;
727 Ok(())
728 }
729
730 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
731 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
732 Ok(())
733 }
734
735 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
736 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
737 .execute((room_id, user_id))?;
738 Ok(())
739 }
740
741 fn set_receipt(
742 &self,
743 room_id: &[u8],
744 user_id: &[u8],
745 receipt_type: &[u8],
746 thread: &[u8],
747 event_id: &[u8],
748 data: &[u8],
749 ) -> rusqlite::Result<()> {
750 self.prepare_cached(
751 "INSERT OR REPLACE
752 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
753 VALUES (?, ?, ?, ?, ?, ?)",
754 )?
755 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
756 Ok(())
757 }
758
759 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
761 Ok(())
762 }
763
764 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
765 self.prepare_cached(
766 "INSERT OR REPLACE
767 INTO display_name (room_id, name, data)
768 VALUES (?, ?, ?)",
769 )?
770 .execute((room_id, name, data))?;
771 Ok(())
772 }
773
774 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
775 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
776 .execute((room_id, name))?;
777 Ok(())
778 }
779
780 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
781 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
782 Ok(())
783 }
784
785 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
786 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
787 Ok(())
788 }
789
790 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
791 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
792 .execute((room_id,))?;
793 Ok(())
794 }
795}
796
797#[async_trait]
798trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
799 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
800 Ok(self
801 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
802 .await
803 .optional()?)
804 }
805
806 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
807 let keys_length = keys.len();
808
809 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
810 let sql_params = repeat_vars(keys.len());
811 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
812
813 let params = rusqlite::params_from_iter(keys);
814
815 Ok(txn
816 .prepare(&sql)?
817 .query(params)?
818 .mapped(|row| row.get(0))
819 .collect::<Result<_, _>>()?)
820 })
821 .await
822 }
823
824 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
825
826 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
827 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
828 Ok(())
829 }
830
831 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
832 Ok(match room_id {
833 None => {
834 self.prepare("SELECT data FROM room_info", move |mut stmt| {
835 stmt.query_map((), |row| row.get(0))?.collect()
836 })
837 .await?
838 }
839
840 Some(room_id) => {
841 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
842 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
843 })
844 .await?
845 }
846 })
847 }
848
849 async fn get_maybe_stripped_state_events_for_keys(
850 &self,
851 room_id: Key,
852 event_type: Key,
853 state_keys: Vec<Key>,
854 ) -> Result<Vec<(bool, Vec<u8>)>> {
855 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
856 let sql_params = repeat_vars(state_keys.len());
857 let sql = format!(
858 "SELECT stripped, data FROM state_event
859 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
860 );
861
862 let params = rusqlite::params_from_iter(
863 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
864 );
865
866 Ok(txn
867 .prepare(&sql)?
868 .query(params)?
869 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
870 .collect::<Result<_, _>>()?)
871 })
872 .await
873 }
874
875 async fn get_maybe_stripped_state_events(
876 &self,
877 room_id: Key,
878 event_type: Key,
879 ) -> Result<Vec<(bool, Vec<u8>)>> {
880 Ok(self
881 .prepare(
882 "SELECT stripped, data FROM state_event
883 WHERE room_id = ? AND event_type = ?",
884 |mut stmt| {
885 stmt.query((room_id, event_type))?
886 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
887 .collect()
888 },
889 )
890 .await?)
891 }
892
893 async fn get_profiles(
894 &self,
895 room_id: Key,
896 user_ids: Vec<Key>,
897 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
898 let user_ids_length = user_ids.len();
899
900 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
901 let sql_params = repeat_vars(user_ids.len());
902 let sql = format!(
903 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
904 );
905
906 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
907
908 Ok(txn
909 .prepare(&sql)?
910 .query(params)?
911 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
912 .collect::<Result<_, _>>()?)
913 })
914 .await
915 }
916
917 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
918 let res = if memberships.is_empty() {
919 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
920 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
921 })
922 .await?
923 } else {
924 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
925 let sql_params = repeat_vars(memberships.len());
926 let sql = format!(
927 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
928 );
929
930 let params =
931 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
932
933 Ok(txn
934 .prepare(&sql)?
935 .query(params)?
936 .mapped(|row| row.get(0))
937 .collect::<Result<_, _>>()?)
938 })
939 .await?
940 };
941
942 Ok(res)
943 }
944
945 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
946 Ok(self
947 .query_row(
948 "SELECT data FROM global_account_data WHERE event_type = ?",
949 (event_type,),
950 |row| row.get(0),
951 )
952 .await
953 .optional()?)
954 }
955
956 async fn get_room_account_data(
957 &self,
958 room_id: Key,
959 event_type: Key,
960 ) -> Result<Option<Vec<u8>>> {
961 Ok(self
962 .query_row(
963 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
964 (room_id, event_type),
965 |row| row.get(0),
966 )
967 .await
968 .optional()?)
969 }
970
971 async fn get_display_names(
972 &self,
973 room_id: Key,
974 names: Vec<Key>,
975 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
976 let names_length = names.len();
977
978 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
979 let sql_params = repeat_vars(names.len());
980 let sql = format!(
981 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
982 );
983
984 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
985
986 Ok(txn
987 .prepare(&sql)?
988 .query(params)?
989 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
990 .collect::<Result<_, _>>()?)
991 })
992 .await
993 }
994
995 async fn get_user_receipt(
996 &self,
997 room_id: Key,
998 receipt_type: Key,
999 thread: Key,
1000 user_id: Key,
1001 ) -> Result<Option<Vec<u8>>> {
1002 Ok(self
1003 .query_row(
1004 "SELECT data FROM receipt
1005 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1006 (room_id, receipt_type, thread, user_id),
1007 |row| row.get(0),
1008 )
1009 .await
1010 .optional()?)
1011 }
1012
1013 async fn get_event_receipts(
1014 &self,
1015 room_id: Key,
1016 receipt_type: Key,
1017 thread: Key,
1018 event_id: Key,
1019 ) -> Result<Vec<Vec<u8>>> {
1020 Ok(self
1021 .prepare(
1022 "SELECT data FROM receipt
1023 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1024 |mut stmt| {
1025 stmt.query((room_id, receipt_type, thread, event_id))?
1026 .mapped(|row| row.get(0))
1027 .collect()
1028 },
1029 )
1030 .await?)
1031 }
1032}
1033
1034#[async_trait]
1035impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1036 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1037 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1038 }
1039}
1040
1041#[async_trait]
1042impl StateStore for SqliteStateStore {
1043 type Error = Error;
1044
1045 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1046 self.read()
1047 .await?
1048 .get_kv_blob(self.encode_state_store_data_key(key))
1049 .await?
1050 .map(|data| {
1051 Ok(match key {
1052 StateStoreDataKey::SyncToken => {
1053 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1054 }
1055 StateStoreDataKey::ServerInfo => {
1056 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1057 }
1058 StateStoreDataKey::Filter(_) => {
1059 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1060 }
1061 StateStoreDataKey::UserAvatarUrl(_) => {
1062 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1063 }
1064 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1065 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1066 }
1067 StateStoreDataKey::UtdHookManagerData => {
1068 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1069 }
1070 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1071 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1072 }
1073 StateStoreDataKey::ComposerDraft(_, _) => {
1074 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1075 }
1076 StateStoreDataKey::SeenKnockRequests(_) => {
1077 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1078 }
1079 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1080 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1081 self.deserialize_value(&data)?,
1082 )
1083 }
1084 })
1085 })
1086 .transpose()
1087 }
1088
1089 async fn set_kv_data(
1090 &self,
1091 key: StateStoreDataKey<'_>,
1092 value: StateStoreDataValue,
1093 ) -> Result<()> {
1094 let serialized_value = match key {
1095 StateStoreDataKey::SyncToken => self.serialize_value(
1096 &value.into_sync_token().expect("Session data not a sync token"),
1097 )?,
1098 StateStoreDataKey::ServerInfo => self.serialize_value(
1099 &value.into_server_info().expect("Session data not containing server info"),
1100 )?,
1101 StateStoreDataKey::Filter(_) => {
1102 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1103 }
1104 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1105 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1106 )?,
1107 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1108 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1109 )?,
1110 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1111 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1112 )?,
1113 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1114 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1115 }
1116 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1117 &value.into_composer_draft().expect("Session data not a composer draft"),
1118 )?,
1119 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1120 &value
1121 .into_seen_knock_requests()
1122 .expect("Session data is not a set of seen knock request ids"),
1123 )?,
1124 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1125 &value
1126 .into_thread_subscriptions_catchup_tokens()
1127 .expect("Session data is not a list of thread subscription catchup tokens"),
1128 )?,
1129 };
1130
1131 self.write()
1132 .await
1133 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1134 .await
1135 }
1136
1137 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1138 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1139 }
1140
1141 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1142 let changes = changes.to_owned();
1143 let this = self.clone();
1144 self.write()
1145 .await
1146 .with_transaction(move |txn| {
1147 let StateChanges {
1148 sync_token,
1149 account_data,
1150 presence,
1151 profiles,
1152 profiles_to_delete,
1153 state,
1154 room_account_data,
1155 room_infos,
1156 receipts,
1157 redactions,
1158 stripped_state,
1159 ambiguity_maps,
1160 } = changes;
1161
1162 if let Some(sync_token) = sync_token {
1163 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1164 let value = this.serialize_value(&sync_token)?;
1165 txn.set_kv_blob(&key, &value)?;
1166 }
1167
1168 for (event_type, event) in account_data {
1169 let event_type =
1170 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1171 let data = this.serialize_json(&event)?;
1172 txn.set_global_account_data(&event_type, &data)?;
1173 }
1174
1175 for (room_id, events) in room_account_data {
1176 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1177 for (event_type, event) in events {
1178 let event_type =
1179 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1180 let data = this.serialize_json(&event)?;
1181 txn.set_room_account_data(&room_id, &event_type, &data)?;
1182 }
1183 }
1184
1185 for (user_id, event) in presence {
1186 let key = this.encode_presence_key(&user_id);
1187 let value = this.serialize_json(&event)?;
1188 txn.set_kv_blob(&key, &value)?;
1189 }
1190
1191 for (room_id, room_info) in room_infos {
1192 let stripped = room_info.state() == RoomState::Invited;
1193 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1195
1196 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1197 let state = this
1198 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1199 let data = this.serialize_json(&room_info)?;
1200 txn.set_room_info(&room_id, &state, &data)?;
1201 }
1202
1203 for (room_id, user_ids) in profiles_to_delete {
1204 let room_id = this.encode_key(keys::PROFILE, room_id);
1205 for user_id in user_ids {
1206 let user_id = this.encode_key(keys::PROFILE, user_id);
1207 txn.remove_room_profile(&room_id, &user_id)?;
1208 }
1209 }
1210
1211 for (room_id, state_event_types) in state {
1212 let profiles = profiles.get(&room_id);
1213 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1214
1215 for (event_type, state_events) in state_event_types {
1216 let encoded_event_type =
1217 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1218
1219 for (state_key, raw_state_event) in state_events {
1220 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1221 let data = this.serialize_json(&raw_state_event)?;
1222
1223 let event_id: Option<String> =
1224 raw_state_event.get_field("event_id").ok().flatten();
1225 let encoded_event_id =
1226 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1227
1228 txn.set_state_event(
1229 &encoded_room_id,
1230 &encoded_event_type,
1231 &encoded_state_key,
1232 false,
1233 encoded_event_id.as_deref(),
1234 &data,
1235 )?;
1236
1237 if event_type == StateEventType::RoomMember {
1238 let member_event = match raw_state_event
1239 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1240 {
1241 Ok(ev) => ev,
1242 Err(e) => {
1243 debug!(event_id, "Failed to deserialize member event: {e}");
1244 continue;
1245 }
1246 };
1247
1248 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1249 let user_id = this.encode_key(keys::MEMBER, &state_key);
1250 let membership = this
1251 .encode_key(keys::MEMBER, member_event.membership().as_str());
1252 let data = this.serialize_value(&state_key)?;
1253
1254 txn.set_member(
1255 &encoded_room_id,
1256 &user_id,
1257 &membership,
1258 false,
1259 &data,
1260 )?;
1261
1262 if let Some(profile) =
1263 profiles.and_then(|p| p.get(member_event.state_key()))
1264 {
1265 let room_id = this.encode_key(keys::PROFILE, &room_id);
1266 let user_id = this.encode_key(keys::PROFILE, &state_key);
1267 let data = this.serialize_json(&profile)?;
1268 txn.set_profile(&room_id, &user_id, &data)?;
1269 }
1270 }
1271 }
1272 }
1273 }
1274
1275 for (room_id, stripped_state_event_types) in stripped_state {
1276 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1277
1278 for (event_type, stripped_state_events) in stripped_state_event_types {
1279 let encoded_event_type =
1280 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1281
1282 for (state_key, raw_stripped_state_event) in stripped_state_events {
1283 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1284 let data = this.serialize_json(&raw_stripped_state_event)?;
1285 txn.set_state_event(
1286 &encoded_room_id,
1287 &encoded_event_type,
1288 &encoded_state_key,
1289 true,
1290 None,
1291 &data,
1292 )?;
1293
1294 if event_type == StateEventType::RoomMember {
1295 let member_event = match raw_stripped_state_event
1296 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1297 ) {
1298 Ok(ev) => ev,
1299 Err(e) => {
1300 debug!("Failed to deserialize stripped member event: {e}");
1301 continue;
1302 }
1303 };
1304
1305 let room_id = this.encode_key(keys::MEMBER, &room_id);
1306 let user_id = this.encode_key(keys::MEMBER, &state_key);
1307 let membership = this.encode_key(
1308 keys::MEMBER,
1309 member_event.content.membership.as_str(),
1310 );
1311 let data = this.serialize_value(&state_key)?;
1312
1313 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1314 }
1315 }
1316 }
1317 }
1318
1319 for (room_id, receipt_event) in receipts {
1320 let room_id = this.encode_key(keys::RECEIPT, room_id);
1321
1322 for (event_id, receipt_types) in receipt_event {
1323 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1324
1325 for (receipt_type, receipt_users) in receipt_types {
1326 let receipt_type =
1327 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1328
1329 for (user_id, receipt) in receipt_users {
1330 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1331 let thread = this.encode_key(
1334 keys::RECEIPT,
1335 rmp_serde::to_vec_named(&receipt.thread)?,
1336 );
1337 let data = this.serialize_json(&ReceiptData {
1338 receipt,
1339 event_id: event_id.clone(),
1340 user_id,
1341 })?;
1342
1343 txn.set_receipt(
1344 &room_id,
1345 &encoded_user_id,
1346 &receipt_type,
1347 &thread,
1348 &encoded_event_id,
1349 &data,
1350 )?;
1351 }
1352 }
1353 }
1354 }
1355
1356 for (room_id, redactions) in redactions {
1357 let make_redaction_rules = || {
1358 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1359 txn.get_room_info(&encoded_room_id)
1360 .ok()
1361 .flatten()
1362 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1363 .map(|info| info.room_version_rules_or_default())
1364 .unwrap_or_else(|| {
1365 warn!(
1366 ?room_id,
1367 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1368 );
1369 ROOM_VERSION_RULES_FALLBACK
1370 }).redaction
1371 };
1372
1373 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1374 let mut redaction_rules = None;
1375
1376 for (event_id, redaction) in redactions {
1377 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1378
1379 if let Some(Ok(raw_event)) = txn
1380 .get_state_event_by_id(&encoded_room_id, &event_id)?
1381 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1382 {
1383 let event = raw_event.deserialize()?;
1384 let redacted = redact(
1385 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1386 redaction_rules.get_or_insert_with(make_redaction_rules),
1387 Some(RedactedBecause::from_raw_event(&redaction)?),
1388 )
1389 .map_err(Error::Redaction)?;
1390 let data = this.serialize_json(&redacted)?;
1391
1392 let event_type =
1393 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1394 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1395
1396 txn.set_state_event(
1397 &encoded_room_id,
1398 &event_type,
1399 &state_key,
1400 false,
1401 Some(&event_id),
1402 &data,
1403 )?;
1404 }
1405 }
1406 }
1407
1408 for (room_id, display_names) in ambiguity_maps {
1409 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1410
1411 for (name, user_ids) in display_names {
1412 let encoded_name = this.encode_key(
1413 keys::DISPLAY_NAME,
1414 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1415 );
1416 let data = this.serialize_json(&user_ids)?;
1417
1418 if user_ids.is_empty() {
1419 txn.remove_display_name(&room_id, &encoded_name)?;
1420
1421 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1436 txn.remove_display_name(&room_id, &raw_name)?;
1437 } else {
1438 txn.set_display_name(&room_id, &encoded_name, &data)?;
1440 }
1441 }
1442 }
1443
1444 Ok::<_, Error>(())
1445 })
1446 .await?;
1447
1448 Ok(())
1449 }
1450
1451 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1452 self.read()
1453 .await?
1454 .get_kv_blob(self.encode_presence_key(user_id))
1455 .await?
1456 .map(|data| self.deserialize_json(&data))
1457 .transpose()
1458 }
1459
1460 async fn get_presence_events(
1461 &self,
1462 user_ids: &[OwnedUserId],
1463 ) -> Result<Vec<Raw<PresenceEvent>>> {
1464 if user_ids.is_empty() {
1465 return Ok(Vec::new());
1466 }
1467
1468 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1469 self.read()
1470 .await?
1471 .get_kv_blobs(user_ids)
1472 .await?
1473 .into_iter()
1474 .map(|data| self.deserialize_json(&data))
1475 .collect()
1476 }
1477
1478 async fn get_state_event(
1479 &self,
1480 room_id: &RoomId,
1481 event_type: StateEventType,
1482 state_key: &str,
1483 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1484 Ok(self
1485 .get_state_events_for_keys(room_id, event_type, &[state_key])
1486 .await?
1487 .into_iter()
1488 .next())
1489 }
1490
1491 async fn get_state_events(
1492 &self,
1493 room_id: &RoomId,
1494 event_type: StateEventType,
1495 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1496 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1497 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1498 self.read()
1499 .await?
1500 .get_maybe_stripped_state_events(room_id, event_type)
1501 .await?
1502 .into_iter()
1503 .map(|(stripped, data)| {
1504 let ev = if stripped {
1505 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1506 } else {
1507 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1508 };
1509
1510 Ok(ev)
1511 })
1512 .collect()
1513 }
1514
1515 async fn get_state_events_for_keys(
1516 &self,
1517 room_id: &RoomId,
1518 event_type: StateEventType,
1519 state_keys: &[&str],
1520 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1521 if state_keys.is_empty() {
1522 return Ok(Vec::new());
1523 }
1524
1525 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1526 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1527 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1528 self.read()
1529 .await?
1530 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1531 .await?
1532 .into_iter()
1533 .map(|(stripped, data)| {
1534 let ev = if stripped {
1535 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1536 } else {
1537 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1538 };
1539
1540 Ok(ev)
1541 })
1542 .collect()
1543 }
1544
1545 async fn get_profile(
1546 &self,
1547 room_id: &RoomId,
1548 user_id: &UserId,
1549 ) -> Result<Option<MinimalRoomMemberEvent>> {
1550 let room_id = self.encode_key(keys::PROFILE, room_id);
1551 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1552
1553 self.read()
1554 .await?
1555 .get_profiles(room_id, user_ids)
1556 .await?
1557 .into_iter()
1558 .next()
1559 .map(|(_, data)| self.deserialize_json(&data))
1560 .transpose()
1561 }
1562
1563 async fn get_profiles<'a>(
1564 &self,
1565 room_id: &RoomId,
1566 user_ids: &'a [OwnedUserId],
1567 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1568 if user_ids.is_empty() {
1569 return Ok(BTreeMap::new());
1570 }
1571
1572 let room_id = self.encode_key(keys::PROFILE, room_id);
1573 let mut user_ids_map = user_ids
1574 .iter()
1575 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1576 .collect::<BTreeMap<_, _>>();
1577 let user_ids = user_ids_map.keys().cloned().collect();
1578
1579 self.read()
1580 .await?
1581 .get_profiles(room_id, user_ids)
1582 .await?
1583 .into_iter()
1584 .map(|(user_id, data)| {
1585 Ok((
1586 user_ids_map
1587 .remove(user_id.as_slice())
1588 .expect("returned user IDs were requested"),
1589 self.deserialize_json(&data)?,
1590 ))
1591 })
1592 .collect()
1593 }
1594
1595 async fn get_user_ids(
1596 &self,
1597 room_id: &RoomId,
1598 membership: RoomMemberships,
1599 ) -> Result<Vec<OwnedUserId>> {
1600 let room_id = self.encode_key(keys::MEMBER, room_id);
1601 let memberships = membership
1602 .as_vec()
1603 .into_iter()
1604 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1605 .collect();
1606 self.read()
1607 .await?
1608 .get_user_ids(room_id, memberships)
1609 .await?
1610 .iter()
1611 .map(|data| self.deserialize_value(data))
1612 .collect()
1613 }
1614
1615 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1616 self.read()
1617 .await?
1618 .get_room_infos(match room_load_settings {
1619 RoomLoadSettings::All => None,
1620 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1621 })
1622 .await?
1623 .into_iter()
1624 .map(|data| self.deserialize_json(&data))
1625 .collect()
1626 }
1627
1628 async fn get_users_with_display_name(
1629 &self,
1630 room_id: &RoomId,
1631 display_name: &DisplayName,
1632 ) -> Result<BTreeSet<OwnedUserId>> {
1633 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1634 let names = vec![self.encode_key(
1635 keys::DISPLAY_NAME,
1636 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1637 )];
1638
1639 Ok(self
1640 .read()
1641 .await?
1642 .get_display_names(room_id, names)
1643 .await?
1644 .into_iter()
1645 .next()
1646 .map(|(_, data)| self.deserialize_json(&data))
1647 .transpose()?
1648 .unwrap_or_default())
1649 }
1650
1651 async fn get_users_with_display_names<'a>(
1652 &self,
1653 room_id: &RoomId,
1654 display_names: &'a [DisplayName],
1655 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1656 let mut result = HashMap::new();
1657
1658 if display_names.is_empty() {
1659 return Ok(result);
1660 }
1661
1662 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1663 let mut names_map = display_names
1664 .iter()
1665 .flat_map(|display_name| {
1666 let raw =
1676 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1677 let normalized = display_name.as_normalized_str().map(|normalized| {
1678 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1679 });
1680
1681 iter::once(raw).chain(normalized)
1682 })
1683 .collect::<BTreeMap<_, _>>();
1684 let names = names_map.keys().cloned().collect();
1685
1686 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1687 {
1688 let display_name =
1689 names_map.remove(name.as_slice()).expect("returned display names were requested");
1690 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1691
1692 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1693 }
1694
1695 Ok(result)
1696 }
1697
1698 async fn get_account_data_event(
1699 &self,
1700 event_type: GlobalAccountDataEventType,
1701 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1702 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1703 self.read()
1704 .await?
1705 .get_global_account_data(event_type)
1706 .await?
1707 .map(|value| self.deserialize_json(&value))
1708 .transpose()
1709 }
1710
1711 async fn get_room_account_data_event(
1712 &self,
1713 room_id: &RoomId,
1714 event_type: RoomAccountDataEventType,
1715 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1716 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1717 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1718 self.read()
1719 .await?
1720 .get_room_account_data(room_id, event_type)
1721 .await?
1722 .map(|value| self.deserialize_json(&value))
1723 .transpose()
1724 }
1725
1726 async fn get_user_room_receipt_event(
1727 &self,
1728 room_id: &RoomId,
1729 receipt_type: ReceiptType,
1730 thread: ReceiptThread,
1731 user_id: &UserId,
1732 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1733 let room_id = self.encode_key(keys::RECEIPT, room_id);
1734 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1735 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1738 let user_id = self.encode_key(keys::RECEIPT, user_id);
1739
1740 self.read()
1741 .await?
1742 .get_user_receipt(room_id, receipt_type, thread, user_id)
1743 .await?
1744 .map(|value| {
1745 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1746 })
1747 .transpose()
1748 }
1749
1750 async fn get_event_room_receipt_events(
1751 &self,
1752 room_id: &RoomId,
1753 receipt_type: ReceiptType,
1754 thread: ReceiptThread,
1755 event_id: &EventId,
1756 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1757 let room_id = self.encode_key(keys::RECEIPT, room_id);
1758 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1759 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1762 let event_id = self.encode_key(keys::RECEIPT, event_id);
1763
1764 self.read()
1765 .await?
1766 .get_event_receipts(room_id, receipt_type, thread, event_id)
1767 .await?
1768 .iter()
1769 .map(|value| {
1770 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1771 })
1772 .collect()
1773 }
1774
1775 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1776 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1777 }
1778
1779 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1780 let conn = self.write().await;
1781 let key = self.encode_custom_key(key);
1782 conn.set_kv_blob(key, value).await?;
1783 Ok(())
1784 }
1785
1786 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1787 let conn = self.write().await;
1788 let key = self.encode_custom_key(key);
1789 let previous = conn.get_kv_blob(key.clone()).await?;
1790 conn.set_kv_blob(key, value).await?;
1791 Ok(previous)
1792 }
1793
1794 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1795 let conn = self.write().await;
1796 let key = self.encode_custom_key(key);
1797 let previous = conn.get_kv_blob(key.clone()).await?;
1798 if previous.is_some() {
1799 conn.delete_kv_blob(key).await?;
1800 }
1801 Ok(previous)
1802 }
1803
1804 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1805 let this = self.clone();
1806 let room_id = room_id.to_owned();
1807
1808 let conn = self.write().await;
1809
1810 conn.with_transaction(move |txn| -> Result<()> {
1811 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1812 txn.remove_room_info(&room_info_room_id)?;
1813
1814 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1815 txn.remove_room_state_events(&state_event_room_id, None)?;
1816
1817 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1818 txn.remove_room_members(&member_room_id, None)?;
1819
1820 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1821 txn.remove_room_profiles(&profile_room_id)?;
1822
1823 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1824 txn.remove_room_account_data(&room_account_data_room_id)?;
1825
1826 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1827 txn.remove_room_receipts(&receipt_room_id)?;
1828
1829 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1830 txn.remove_room_display_names(&display_name_room_id)?;
1831
1832 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1833 txn.remove_room_send_queue(&send_queue_room_id)?;
1834
1835 let dependent_send_queue_room_id =
1836 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1837 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1838
1839 let thread_subscriptions_room_id =
1840 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1841 txn.execute(
1842 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1843 (thread_subscriptions_room_id,),
1844 )?;
1845
1846 Ok(())
1847 })
1848 .await?;
1849
1850 conn.vacuum().await
1851 }
1852
1853 async fn save_send_queue_request(
1854 &self,
1855 room_id: &RoomId,
1856 transaction_id: OwnedTransactionId,
1857 created_at: MilliSecondsSinceUnixEpoch,
1858 content: QueuedRequestKind,
1859 priority: usize,
1860 ) -> Result<(), Self::Error> {
1861 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1862 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1863
1864 let content = self.serialize_json(&content)?;
1865 let created_at_ts: u64 = created_at.0.into();
1871 self.write()
1872 .await
1873 .with_transaction(move |txn| {
1874 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1875 Ok(())
1876 })
1877 .await
1878 }
1879
1880 async fn update_send_queue_request(
1881 &self,
1882 room_id: &RoomId,
1883 transaction_id: &TransactionId,
1884 content: QueuedRequestKind,
1885 ) -> Result<bool, Self::Error> {
1886 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1887
1888 let content = self.serialize_json(&content)?;
1889 let transaction_id = transaction_id.to_string();
1892
1893 let num_updated = self.write()
1894 .await
1895 .with_transaction(move |txn| {
1896 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1897 })
1898 .await?;
1899
1900 Ok(num_updated > 0)
1901 }
1902
1903 async fn remove_send_queue_request(
1904 &self,
1905 room_id: &RoomId,
1906 transaction_id: &TransactionId,
1907 ) -> Result<bool, Self::Error> {
1908 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1909
1910 let transaction_id = transaction_id.to_string();
1912
1913 let num_deleted = self
1914 .write()
1915 .await
1916 .with_transaction(move |txn| {
1917 txn.prepare_cached(
1918 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1919 )?
1920 .execute((room_id, &transaction_id))
1921 })
1922 .await?;
1923
1924 Ok(num_deleted > 0)
1925 }
1926
1927 async fn load_send_queue_requests(
1928 &self,
1929 room_id: &RoomId,
1930 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1931 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1932
1933 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1937 .read()
1938 .await?
1939 .prepare(
1940 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1941 |mut stmt| {
1942 stmt.query((room_id,))?
1943 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1944 .collect()
1945 },
1946 )
1947 .await?;
1948
1949 let mut requests = Vec::with_capacity(res.len());
1950
1951 for entry in res {
1952 let created_at = entry
1953 .4
1954 .and_then(UInt::new)
1955 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1956
1957 requests.push(QueuedRequest {
1958 transaction_id: entry.0.into(),
1959 kind: self.deserialize_json(&entry.1)?,
1960 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1961 priority: entry.3,
1962 created_at,
1963 });
1964 }
1965
1966 Ok(requests)
1967 }
1968
1969 async fn update_send_queue_request_status(
1970 &self,
1971 room_id: &RoomId,
1972 transaction_id: &TransactionId,
1973 error: Option<QueueWedgeError>,
1974 ) -> Result<(), Self::Error> {
1975 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1976
1977 let transaction_id = transaction_id.to_string();
1979
1980 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1982
1983 self.write()
1984 .await
1985 .with_transaction(move |txn| {
1986 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1987 Ok(())
1988 })
1989 .await
1990 }
1991
1992 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1993 let res: Vec<Vec<u8>> = self
1998 .read()
1999 .await?
2000 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2001 stmt.query(())?.mapped(|row| row.get(0)).collect()
2002 })
2003 .await?;
2004
2005 Ok(res
2008 .into_iter()
2009 .map(|entry| self.deserialize_value(&entry))
2010 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2011 .into_iter()
2012 .collect())
2013 }
2014
2015 async fn save_dependent_queued_request(
2016 &self,
2017 room_id: &RoomId,
2018 parent_txn_id: &TransactionId,
2019 own_txn_id: ChildTransactionId,
2020 created_at: MilliSecondsSinceUnixEpoch,
2021 content: DependentQueuedRequestKind,
2022 ) -> Result<()> {
2023 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2024 let content = self.serialize_json(&content)?;
2025
2026 let parent_txn_id = parent_txn_id.to_string();
2028 let own_txn_id = own_txn_id.to_string();
2029
2030 let created_at_ts: u64 = created_at.0.into();
2031 self.write()
2032 .await
2033 .with_transaction(move |txn| {
2034 txn.prepare_cached(
2035 r#"INSERT INTO dependent_send_queue_events
2036 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2037 VALUES (?, ?, ?, ?, ?)"#,
2038 )?
2039 .execute((
2040 room_id,
2041 parent_txn_id,
2042 own_txn_id,
2043 content,
2044 created_at_ts,
2045 ))?;
2046 Ok(())
2047 })
2048 .await
2049 }
2050
2051 async fn update_dependent_queued_request(
2052 &self,
2053 room_id: &RoomId,
2054 own_transaction_id: &ChildTransactionId,
2055 new_content: DependentQueuedRequestKind,
2056 ) -> Result<bool> {
2057 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2058 let content = self.serialize_json(&new_content)?;
2059
2060 let own_txn_id = own_transaction_id.to_string();
2062
2063 let num_updated = self
2064 .write()
2065 .await
2066 .with_transaction(move |txn| {
2067 txn.prepare_cached(
2068 r#"UPDATE dependent_send_queue_events
2069 SET content = ?
2070 WHERE own_transaction_id = ?
2071 AND room_id = ?"#,
2072 )?
2073 .execute((content, own_txn_id, room_id))
2074 })
2075 .await?;
2076
2077 if num_updated > 1 {
2078 return Err(Error::InconsistentUpdate);
2079 }
2080
2081 Ok(num_updated == 1)
2082 }
2083
2084 async fn mark_dependent_queued_requests_as_ready(
2085 &self,
2086 room_id: &RoomId,
2087 parent_txn_id: &TransactionId,
2088 parent_key: SentRequestKey,
2089 ) -> Result<usize> {
2090 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2091 let parent_key = self.serialize_value(&parent_key)?;
2092
2093 let parent_txn_id = parent_txn_id.to_string();
2095
2096 self.write()
2097 .await
2098 .with_transaction(move |txn| {
2099 Ok(txn.prepare_cached(
2100 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2101 )?
2102 .execute((parent_key, parent_txn_id, room_id))?)
2103 })
2104 .await
2105 }
2106
2107 async fn remove_dependent_queued_request(
2108 &self,
2109 room_id: &RoomId,
2110 txn_id: &ChildTransactionId,
2111 ) -> Result<bool> {
2112 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2113
2114 let txn_id = txn_id.to_string();
2116
2117 let num_deleted = self
2118 .write()
2119 .await
2120 .with_transaction(move |txn| {
2121 txn.prepare_cached(
2122 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2123 )?
2124 .execute((txn_id, room_id))
2125 })
2126 .await?;
2127
2128 Ok(num_deleted > 0)
2129 }
2130
2131 async fn load_dependent_queued_requests(
2132 &self,
2133 room_id: &RoomId,
2134 ) -> Result<Vec<DependentQueuedRequest>> {
2135 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2136
2137 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2139 .read()
2140 .await?
2141 .prepare(
2142 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2143 |mut stmt| {
2144 stmt.query((room_id,))?
2145 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2146 .collect()
2147 },
2148 )
2149 .await?;
2150
2151 let mut dependent_events = Vec::with_capacity(res.len());
2152
2153 for entry in res {
2154 let created_at = entry
2155 .4
2156 .and_then(UInt::new)
2157 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2158
2159 dependent_events.push(DependentQueuedRequest {
2160 own_transaction_id: entry.0.into(),
2161 parent_transaction_id: entry.1.into(),
2162 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2163 kind: self.deserialize_json(&entry.3)?,
2164 created_at,
2165 });
2166 }
2167
2168 Ok(dependent_events)
2169 }
2170
2171 async fn upsert_thread_subscription(
2172 &self,
2173 room_id: &RoomId,
2174 thread_id: &EventId,
2175 mut new: StoredThreadSubscription,
2176 ) -> Result<(), Self::Error> {
2177 if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2178 if previous == new {
2179 trace!("not saving thread subscription because the subscription is the same");
2181 return Ok(());
2182 }
2183
2184 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2185 trace!("not saving thread subscription because we have a newer bump stamp");
2186 return Ok(());
2187 }
2188 }
2189
2190 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2191 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2192 let status = new.status.as_str();
2193
2194 self.write()
2195 .await
2196 .with_transaction(move |txn| {
2197 txn.prepare_cached(
2199 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2200 VALUES (?, ?, ?, ?)",
2201 )?
2202 .execute((room_id, thread_id, status, new.bump_stamp))
2203 })
2204 .await?;
2205 Ok(())
2206 }
2207
2208 async fn load_thread_subscription(
2209 &self,
2210 room_id: &RoomId,
2211 thread_id: &EventId,
2212 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2213 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2214 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2215
2216 Ok(self
2217 .read()
2218 .await?
2219 .query_row(
2220 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2221 (room_id, thread_id),
2222 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2223 )
2224 .await
2225 .optional()?
2226 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2227 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2228 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2229 })?;
2230 Ok(StoredThreadSubscription { status, bump_stamp })
2231 })
2232 .transpose()?)
2233 }
2234
2235 async fn remove_thread_subscription(
2236 &self,
2237 room_id: &RoomId,
2238 thread_id: &EventId,
2239 ) -> Result<(), Self::Error> {
2240 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2241 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2242
2243 self.write()
2244 .await
2245 .execute(
2246 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2247 (room_id, thread_id),
2248 )
2249 .await?;
2250
2251 Ok(())
2252 }
2253}
2254
2255#[derive(Debug, Clone, Serialize, Deserialize)]
2256struct ReceiptData {
2257 receipt: Receipt,
2258 event_id: OwnedEventId,
2259 user_id: OwnedUserId,
2260}
2261
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory under which every store of this module is created.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving each store its own sub-directory of [`TMP_DIR`].
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store without passphrase in a fresh sub-directory of
    /// [`TMP_DIR`].
    ///
    /// Panics if the path is not valid UTF-8 or if the store cannot be opened.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store = SqliteStateStore::open(store_path, None).await.unwrap();
        Ok(store)
    }

    // Shared state store test suite; presumably it obtains its stores through
    // `get_store` above.
    statestore_integration_tests!();
}
2286
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;
    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};

    /// Temporary directory under which every store of this module is created.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving each store its own sub-directory of [`TMP_DIR`].
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a path under [`TMP_DIR`] that has not been handed out before.
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a passphrase-protected store in a fresh workspace.
    ///
    /// Panics if the path is not valid UTF-8 or if the store cannot be opened.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let store_path = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store =
            SqliteStateStore::open(store_path, Some("default_test_password")).await.unwrap();
        Ok(store)
    }

    /// The configured maximum pool size must be reported by the pool.
    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    /// The configured cache size must be visible through `PRAGMA cache_size`.
    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let connection = store.pool.get().await.unwrap();
        let reported = connection
            .query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0))
            .await
            .unwrap();

        // `1500 / 1024` is an integer division, so the expected value is `-1`.
        // SQLite reports a negative `cache_size` for sizes expressed in
        // kibibytes, which suggests the configured value is taken as bytes.
        assert_eq!(reported, -(1500 / 1024));
    }

    /// The configured journal size limit must be visible through
    /// `PRAGMA journal_size_limit`.
    #[async_test]
    async fn test_journal_size_limit() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let connection = store.pool.get().await.unwrap();
        let reported = connection
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        // Unlike the cache size, the value is expected back unchanged.
        assert_eq!(reported, 1500);
    }

    // Shared state store test suite; presumably it obtains its stores through
    // `get_store` above.
    statestore_integration_tests!();
}
2367
2368#[cfg(test)]
2369mod migration_tests {
2370 use std::{
2371 path::{Path, PathBuf},
2372 sync::{
2373 atomic::{AtomicU32, Ordering::SeqCst},
2374 Arc,
2375 },
2376 };
2377
2378 use as_variant::as_variant;
2379 use matrix_sdk_base::{
2380 media::{MediaFormat, MediaRequestParameters},
2381 store::{
2382 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2383 SerializableEventContent,
2384 },
2385 sync::UnreadNotificationsCount,
2386 RoomState, StateStore,
2387 };
2388 use matrix_sdk_test::async_test;
2389 use once_cell::sync::Lazy;
2390 use ruma::{
2391 events::{
2392 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2393 StateEventType,
2394 },
2395 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2396 RoomId, TransactionId, UserId,
2397 };
2398 use rusqlite::Transaction;
2399 use serde::{Deserialize, Serialize};
2400 use serde_json::json;
2401 use tempfile::{tempdir, TempDir};
2402 use tokio::{fs, sync::Mutex};
2403 use zeroize::Zeroizing;
2404
2405 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2406 use crate::{
2407 error::{Error, Result},
2408 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2409 OpenStoreError, Secret, SqliteStoreConfig,
2410 };
2411
2412 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2413 static NUM: AtomicU32 = AtomicU32::new(0);
2414 const SECRET: &str = "secret";
2415
2416 fn new_path() -> PathBuf {
2417 let name = NUM.fetch_add(1, SeqCst).to_string();
2418 TMP_DIR.path().join(name)
2419 }
2420
2421 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2422 let config = SqliteStoreConfig::new(path);
2423
2424 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2425
2426 let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2427 let conn = pool.get().await?;
2428
2429 init(&conn).await?;
2430
2431 let store_cipher = Some(Arc::new(
2432 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2433 .await
2434 .unwrap(),
2435 ));
2436 let this = SqliteStateStore {
2437 store_cipher,
2438 pool,
2439 write_connection: Arc::new(Mutex::new(conn)),
2441 };
2442 this.run_migrations(1, Some(version)).await?;
2443
2444 Ok(this)
2445 }
2446
2447 fn room_info_v1_json(
2448 room_id: &RoomId,
2449 state: RoomState,
2450 name: Option<&str>,
2451 creator: Option<&UserId>,
2452 ) -> serde_json::Value {
2453 let name_content = match name {
2455 Some(name) => json!({ "name": name }),
2456 None => json!({ "name": null }),
2457 };
2458 let create_content = match creator {
2460 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2461 None => RoomCreateEventContent::new_v11(),
2462 };
2463
2464 json!({
2465 "room_id": room_id,
2466 "room_type": state,
2467 "notification_counts": UnreadNotificationsCount::default(),
2468 "summary": {
2469 "heroes": [],
2470 "joined_member_count": 0,
2471 "invited_member_count": 0,
2472 },
2473 "members_synced": false,
2474 "base_info": {
2475 "dm_targets": [],
2476 "max_power_level": 100,
2477 "name": {
2478 "Original": {
2479 "content": name_content,
2480 },
2481 },
2482 "create": {
2483 "Original": {
2484 "content": create_content,
2485 }
2486 }
2487 },
2488 })
2489 }
2490
2491 #[async_test]
2492 pub async fn test_migrating_v1_to_v2() {
2493 let path = new_path();
2494 {
2496 let db = create_fake_db(&path, 1).await.unwrap();
2497 let conn = db.pool.get().await.unwrap();
2498
2499 let this = db.clone();
2500 conn.with_transaction(move |txn| {
2501 for i in 0..5 {
2502 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2503 let (state, stripped) =
2504 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2505 let info = room_info_v1_json(&room_id, state, None, None);
2506
2507 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2508 let data = this.serialize_json(&info)?;
2509
2510 txn.prepare_cached(
2511 "INSERT INTO room_info (room_id, stripped, data)
2512 VALUES (?, ?, ?)",
2513 )?
2514 .execute((room_id, stripped, data))?;
2515 }
2516
2517 Result::<_, Error>::Ok(())
2518 })
2519 .await
2520 .unwrap();
2521 }
2522
2523 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2525
2526 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2528 }
2529
2530 fn add_room_v2(
2532 this: &SqliteStateStore,
2533 txn: &Transaction<'_>,
2534 room_id: &RoomId,
2535 name: Option<&str>,
2536 create_creator: Option<&UserId>,
2537 create_sender: Option<&UserId>,
2538 ) -> Result<(), Error> {
2539 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2540
2541 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2542 let encoded_state =
2543 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2544 let data = this.serialize_json(&room_info_json)?;
2545
2546 txn.prepare_cached(
2547 "INSERT INTO room_info (room_id, state, data)
2548 VALUES (?, ?, ?)",
2549 )?
2550 .execute((encoded_room_id, encoded_state, data))?;
2551
2552 let Some(create_sender) = create_sender else {
2554 return Ok(());
2555 };
2556
2557 let create_content = match create_creator {
2558 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2559 None => RoomCreateEventContent::new_v11(),
2560 };
2561
2562 let event_id = EventId::new(server_name!("dummy.local"));
2563 let create_event = json!({
2564 "content": create_content,
2565 "event_id": event_id,
2566 "sender": create_sender.to_owned(),
2567 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2568 "state_key": "",
2569 "type": "m.room.create",
2570 "unsigned": {},
2571 });
2572
2573 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2574 let encoded_event_type =
2575 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2576 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2577 let stripped = false;
2578 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2579 let data = this.serialize_json(&create_event)?;
2580
2581 txn.prepare_cached(
2582 "INSERT
2583 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2584 VALUES (?, ?, ?, ?, ?, ?)",
2585 )?
2586 .execute((
2587 encoded_room_id,
2588 encoded_event_type,
2589 encoded_state_key,
2590 stripped,
2591 encoded_event_id,
2592 data,
2593 ))?;
2594
2595 Ok(())
2596 }
2597
2598 #[async_test]
2599 pub async fn test_migrating_v2_to_v3() {
2600 let path = new_path();
2601
2602 let room_a_id = room_id!("!room_a:dummy.local");
2604 let room_a_name = "Room A";
2605 let room_a_creator = user_id!("@creator:dummy.local");
2606 let room_a_create_sender = user_id!("@sender:dummy.local");
2609
2610 let room_b_id = room_id!("!room_b:dummy.local");
2612
2613 let room_c_id = room_id!("!room_c:dummy.local");
2615 let room_c_create_sender = user_id!("@creator:dummy.local");
2616
2617 {
2619 let db = create_fake_db(&path, 2).await.unwrap();
2620 let conn = db.pool.get().await.unwrap();
2621
2622 let this = db.clone();
2623 conn.with_transaction(move |txn| {
2624 add_room_v2(
2625 &this,
2626 txn,
2627 room_a_id,
2628 Some(room_a_name),
2629 Some(room_a_creator),
2630 Some(room_a_create_sender),
2631 )?;
2632 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2633 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2634
2635 Result::<_, Error>::Ok(())
2636 })
2637 .await
2638 .unwrap();
2639 }
2640
2641 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2643
2644 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2646 assert_eq!(room_infos.len(), 3);
2647
2648 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2649 assert_eq!(room_a.name(), Some(room_a_name));
2650 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2651
2652 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2653 assert_eq!(room_b.name(), None);
2654 assert_eq!(room_b.creators(), None);
2655
2656 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2657 assert_eq!(room_c.name(), None);
2658 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2659 }
2660
2661 #[async_test]
2662 pub async fn test_migrating_v7_to_v9() {
2663 let path = new_path();
2664
2665 let room_id = room_id!("!room_a:dummy.local");
2666 let wedged_event_transaction_id = TransactionId::new();
2667 let local_event_transaction_id = TransactionId::new();
2668
2669 {
2671 let db = create_fake_db(&path, 7).await.unwrap();
2672 let conn = db.pool.get().await.unwrap();
2673
2674 let wedge_tx = wedged_event_transaction_id.clone();
2675 let local_tx = local_event_transaction_id.clone();
2676
2677 conn.with_transaction(move |txn| {
2678 add_dependent_send_queue_event_v7(
2679 &db,
2680 txn,
2681 room_id,
2682 &local_tx,
2683 ChildTransactionId::new(),
2684 DependentQueuedRequestKind::RedactEvent,
2685 )?;
2686 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2687 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2688 Result::<_, Error>::Ok(())
2689 })
2690 .await
2691 .unwrap();
2692 }
2693
2694 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2697
2698 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2699 assert!(requests.is_empty());
2700
2701 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2702 assert!(dependent_requests.is_empty());
2703 }
2704
2705 fn add_send_queue_event_v7(
2706 this: &SqliteStateStore,
2707 txn: &Transaction<'_>,
2708 transaction_id: &TransactionId,
2709 room_id: &RoomId,
2710 is_wedged: bool,
2711 ) -> Result<(), Error> {
2712 let content =
2713 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2714
2715 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2716 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2717
2718 let content = this.serialize_json(&content)?;
2719
2720 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2721 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2722
2723 Ok(())
2724 }
2725
2726 fn add_dependent_send_queue_event_v7(
2727 this: &SqliteStateStore,
2728 txn: &Transaction<'_>,
2729 room_id: &RoomId,
2730 parent_txn_id: &TransactionId,
2731 own_txn_id: ChildTransactionId,
2732 content: DependentQueuedRequestKind,
2733 ) -> Result<(), Error> {
2734 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2735
2736 let parent_txn_id = parent_txn_id.to_string();
2737 let own_txn_id = own_txn_id.to_string();
2738 let content = this.serialize_json(&content)?;
2739
2740 txn.prepare_cached(
2741 "INSERT INTO dependent_send_queue_events
2742 (room_id, parent_transaction_id, own_transaction_id, content)
2743 VALUES (?, ?, ?, ?)",
2744 )?
2745 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2746
2747 Ok(())
2748 }
2749
2750 #[derive(Clone, Debug, Serialize, Deserialize)]
2751 pub enum LegacyDependentQueuedRequestKind {
2752 UploadFileWithThumbnail {
2753 content_type: String,
2754 cache_key: MediaRequestParameters,
2755 related_to: OwnedTransactionId,
2756 },
2757 }
2758
2759 #[async_test]
2760 pub async fn test_dependent_queued_request_variant_renaming() {
2761 let path = new_path();
2762 let db = create_fake_db(&path, 7).await.unwrap();
2763
2764 let cache_key = MediaRequestParameters {
2765 format: MediaFormat::File,
2766 source: MediaSource::Plain("https://server.local/foobar".into()),
2767 };
2768 let related_to = TransactionId::new();
2769 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2770 content_type: "image/png".to_owned(),
2771 cache_key,
2772 related_to: related_to.clone(),
2773 };
2774
2775 let data = db
2776 .serialize_json(&request)
2777 .expect("should be able to serialize legacy dependent request");
2778 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2779 "should be able to deserialize dependent request from legacy dependent request",
2780 );
2781
2782 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2783 assert_eq!(de_related_to, related_to);
2784 });
2785 }
2786}