1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
12use matrix_sdk_base::{
13 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
14 store::{
15 compare_thread_subscription_bump_stamps, migration_helpers::RoomInfoV1, ChildTransactionId,
16 DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, QueuedRequest,
17 QueuedRequestKind, RoomLoadSettings, SentRequestKey, StoredThreadSubscription,
18 ThreadSubscriptionStatus,
19 },
20 timer, MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
21 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
22};
23use matrix_sdk_store_encryption::StoreCipher;
24use ruma::{
25 canonical_json::{redact, RedactedBecause},
26 events::{
27 presence::PresenceEvent,
28 receipt::{Receipt, ReceiptThread, ReceiptType},
29 room::{
30 create::RoomCreateEventContent,
31 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
32 },
33 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
34 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
35 },
36 serde::Raw,
37 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
38 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
39};
40use rusqlite::{OptionalExtension, Transaction};
41use serde::{Deserialize, Serialize};
42use tokio::{
43 fs,
44 sync::{Mutex, OwnedMutexGuard},
45};
46use tracing::{debug, instrument, trace, warn};
47
48use crate::{
49 error::{Error, Result},
50 utils::{
51 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt,
53 },
54 OpenStoreError, Secret, SqliteStoreConfig,
55};
56
57mod keys {
58 pub const KV_BLOB: &str = "kv_blob";
60 pub const ROOM_INFO: &str = "room_info";
61 pub const STATE_EVENT: &str = "state_event";
62 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
63 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
64 pub const MEMBER: &str = "member";
65 pub const PROFILE: &str = "profile";
66 pub const RECEIPT: &str = "receipt";
67 pub const DISPLAY_NAME: &str = "display_name";
68 pub const SEND_QUEUE: &str = "send_queue_events";
69 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
70 pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
71}
72
73pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
75
76const DATABASE_VERSION: u8 = 14;
82
83#[derive(Clone)]
85pub struct SqliteStateStore {
86 store_cipher: Option<Arc<StoreCipher>>,
87
88 pool: SqlitePool,
90
91 write_connection: Arc<Mutex<SqliteAsyncConn>>,
96}
97
98#[cfg(not(tarpaulin_include))]
99impl fmt::Debug for SqliteStateStore {
100 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
102 }
103}
104
105impl SqliteStateStore {
106 pub async fn open(
109 path: impl AsRef<Path>,
110 passphrase: Option<&str>,
111 ) -> Result<Self, OpenStoreError> {
112 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
113 }
114
115 pub async fn open_with_key(
118 path: impl AsRef<Path>,
119 key: Option<&[u8; 32]>,
120 ) -> Result<Self, OpenStoreError> {
121 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
122 }
123
124 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
126 let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;
127
128 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
129
130 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
131 config.pool = Some(pool_config);
132
133 let pool = config.create_pool(Runtime::Tokio1)?;
134
135 let this = Self::open_with_pool(pool, secret).await?;
136 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
137
138 Ok(this)
139 }
140
141 pub async fn open_with_pool(
144 pool: SqlitePool,
145 secret: Option<Secret>,
146 ) -> Result<Self, OpenStoreError> {
147 let conn = pool.get().await?;
148
149 let mut version = conn.db_version().await?;
150
151 if version == 0 {
152 init(&conn).await?;
153 version = 1;
154 }
155
156 let store_cipher = match secret {
157 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
158 None => None,
159 };
160 let this = Self {
161 store_cipher,
162 pool,
163 write_connection: Arc::new(Mutex::new(conn)),
165 };
166 this.run_migrations(version, None).await?;
167
168 Ok(this)
169 }
170
171 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
176 let to = to.unwrap_or(DATABASE_VERSION);
177
178 if from < to {
179 debug!(version = from, new_version = to, "Upgrading database");
180 } else {
181 return Ok(());
182 }
183
184 let conn = self.write().await;
185
186 if from < 2 && to >= 2 {
187 let this = self.clone();
188 conn.with_transaction(move |txn| {
189 txn.execute_batch(include_str!(
191 "../migrations/state_store/002_a_create_new_room_info.sql"
192 ))?;
193
194 for data in txn
196 .prepare("SELECT data FROM room_info")?
197 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
198 {
199 let data = data?;
200 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
201
202 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
203 let state = this
204 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
205 txn.prepare_cached(
206 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
207 VALUES (?, ?, ?)",
208 )?
209 .execute((room_id, state, data))?;
210 }
211
212 txn.execute_batch(include_str!(
214 "../migrations/state_store/002_b_replace_room_info.sql"
215 ))?;
216
217 txn.set_db_version(2)?;
218 Result::<_, Error>::Ok(())
219 })
220 .await?;
221 }
222
223 if from < 3 && to >= 3 {
225 let this = self.clone();
226 conn.with_transaction(move |txn| {
227 for data in txn
229 .prepare("SELECT data FROM room_info")?
230 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
231 {
232 let data = data?;
233 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
234
235 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
237 let event_type =
238 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
239 let create_res = txn
240 .prepare(
241 "SELECT stripped, data FROM state_event
242 WHERE room_id = ? AND event_type = ?",
243 )?
244 .query_row([room_id, event_type], |row| {
245 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
246 })
247 .optional()?;
248
249 let create = create_res.and_then(|(stripped, data)| {
250 let create = if stripped {
251 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
252 this.deserialize_json(&data).ok()?,
253 )
254 } else {
255 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
256 };
257 Some(create)
258 });
259
260 let migrated_room_info = room_info_v1.migrate(create.as_ref());
261
262 let data = this.serialize_json(&migrated_room_info)?;
263 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
264 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
265 .execute((data, room_id))?;
266 }
267
268 txn.set_db_version(3)?;
269 Result::<_, Error>::Ok(())
270 })
271 .await?;
272 }
273
274 if from < 4 && to >= 4 {
275 conn.with_transaction(move |txn| {
276 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
278 txn.set_db_version(4)
279 })
280 .await?;
281 }
282
283 if from < 5 && to >= 5 {
284 conn.with_transaction(move |txn| {
285 txn.execute_batch(include_str!(
287 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
288 ))?;
289 txn.set_db_version(4)
290 })
291 .await?;
292 }
293
294 if from < 6 && to >= 6 {
295 conn.with_transaction(move |txn| {
296 txn.execute_batch(include_str!(
298 "../migrations/state_store/005_send_queue_dependent_events.sql"
299 ))?;
300 txn.set_db_version(6)
301 })
302 .await?;
303 }
304
305 if from < 7 && to >= 7 {
306 conn.with_transaction(move |txn| {
307 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
309 txn.set_db_version(7)
310 })
311 .await?;
312 }
313
314 if from < 8 && to >= 8 {
315 let error = QueueWedgeError::GenericApiError {
317 msg: "local echo failed to send in a previous session".into(),
318 };
319 let default_err = self.serialize_value(&error)?;
320
321 conn.with_transaction(move |txn| {
322 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
324
325 for wedged_entries in txn
328 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
329 .query_map((), |row| {
330 Ok(
331 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
332 )
333 })? {
334
335 let (room_id, transaction_id) = wedged_entries?;
336
337 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
338 .execute((default_err.clone(), room_id, transaction_id))?;
339 }
340
341
342 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
344
345 txn.set_db_version(8)
346 })
347 .await?;
348 }
349
350 if from < 9 && to >= 9 {
351 conn.with_transaction(move |txn| {
352 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
354 txn.set_db_version(9)
355 })
356 .await?;
357 }
358
359 if from < 10 && to >= 10 {
360 conn.with_transaction(move |txn| {
361 txn.execute_batch(include_str!(
363 "../migrations/state_store/009_send_queue_priority.sql"
364 ))?;
365 txn.set_db_version(10)
366 })
367 .await?;
368 }
369
370 if from < 11 && to >= 11 {
371 conn.with_transaction(move |txn| {
372 txn.execute_batch(include_str!(
374 "../migrations/state_store/010_send_queue_enqueue_time.sql"
375 ))?;
376 txn.set_db_version(11)
377 })
378 .await?;
379 }
380
381 if from < 12 && to >= 12 {
382 conn.vacuum().await?;
386 conn.set_kv("version", vec![12]).await?;
387 }
388
389 if from < 13 && to >= 13 {
390 conn.with_transaction(move |txn| {
391 txn.execute_batch(include_str!(
393 "../migrations/state_store/011_thread_subscriptions.sql"
394 ))?;
395 txn.set_db_version(13)
396 })
397 .await?;
398 }
399
400 if from < 14 && to >= 14 {
401 conn.with_transaction(move |txn| {
402 txn.execute_batch(include_str!(
404 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
405 ))?;
406 txn.set_db_version(14)
407 })
408 .await?;
409 }
410
411 Ok(())
412 }
413
414 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
415 let key_s = match key {
416 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
417 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
418 StateStoreDataKey::Filter(f) => {
419 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
420 }
421 StateStoreDataKey::UserAvatarUrl(u) => {
422 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
423 }
424 StateStoreDataKey::RecentlyVisitedRooms(b) => {
425 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
426 }
427 StateStoreDataKey::UtdHookManagerData => {
428 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
429 }
430 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
431 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
432 }
433 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
434 if let Some(thread_root) = thread_root {
435 Cow::Owned(format!(
436 "{}:{room_id}:{thread_root}",
437 StateStoreDataKey::COMPOSER_DRAFT
438 ))
439 } else {
440 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
441 }
442 }
443 StateStoreDataKey::SeenKnockRequests(room_id) => {
444 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
445 }
446 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
447 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
448 }
449 };
450
451 self.encode_key(keys::KV_BLOB, &*key_s)
452 }
453
454 fn encode_presence_key(&self, user_id: &UserId) -> Key {
455 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
456 }
457
458 fn encode_custom_key(&self, key: &[u8]) -> Key {
459 let mut full_key = b"custom:".to_vec();
460 full_key.extend(key);
461 self.encode_key(keys::KV_BLOB, full_key)
462 }
463
464 #[instrument(skip_all)]
466 async fn read(&self) -> Result<SqliteAsyncConn> {
467 trace!("Taking a `read` connection");
468 let _timer = timer!("connection");
469
470 Ok(self.pool.get().await?)
471 }
472
473 #[instrument(skip_all)]
475 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
476 trace!("Taking a `write` connection");
477 let _timer = timer!("connection");
478
479 self.write_connection.clone().lock_owned().await
480 }
481
482 fn remove_maybe_stripped_room_data(
483 &self,
484 txn: &Transaction<'_>,
485 room_id: &RoomId,
486 stripped: bool,
487 ) -> rusqlite::Result<()> {
488 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
489 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
490
491 let member_room_id = self.encode_key(keys::MEMBER, room_id);
492 txn.remove_room_members(&member_room_id, Some(stripped))
493 }
494}
495
496impl EncryptableStore for SqliteStateStore {
497 fn get_cypher(&self) -> Option<&StoreCipher> {
498 self.store_cipher.as_deref()
499 }
500}
501
502async fn init(conn: &SqliteAsyncConn) -> Result<()> {
504 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
507 conn.with_transaction(|txn| {
508 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
509 txn.set_db_version(1)?;
510
511 Ok(())
512 })
513 .await
514}
515
516trait SqliteConnectionStateStoreExt {
517 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
518
519 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
520
521 fn set_room_account_data(
522 &self,
523 room_id: &[u8],
524 event_type: &[u8],
525 data: &[u8],
526 ) -> rusqlite::Result<()>;
527 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
528
529 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
530 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
531 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
532
533 fn set_state_event(
534 &self,
535 room_id: &[u8],
536 event_type: &[u8],
537 state_key: &[u8],
538 stripped: bool,
539 event_id: Option<&[u8]>,
540 data: &[u8],
541 ) -> rusqlite::Result<()>;
542 fn get_state_event_by_id(
543 &self,
544 room_id: &[u8],
545 event_id: &[u8],
546 ) -> rusqlite::Result<Option<Vec<u8>>>;
547 fn remove_room_state_events(
548 &self,
549 room_id: &[u8],
550 stripped: Option<bool>,
551 ) -> rusqlite::Result<()>;
552
553 fn set_member(
554 &self,
555 room_id: &[u8],
556 user_id: &[u8],
557 membership: &[u8],
558 stripped: bool,
559 data: &[u8],
560 ) -> rusqlite::Result<()>;
561 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
562
563 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
564 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
565 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
566
567 fn set_receipt(
568 &self,
569 room_id: &[u8],
570 user_id: &[u8],
571 receipt_type: &[u8],
572 thread_id: &[u8],
573 event_id: &[u8],
574 data: &[u8],
575 ) -> rusqlite::Result<()>;
576 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
577
578 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
579 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
580 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
581 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
582 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
583}
584
585impl SqliteConnectionStateStoreExt for rusqlite::Connection {
586 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
587 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
588 Ok(())
589 }
590
591 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
592 self.prepare_cached(
593 "INSERT OR REPLACE INTO global_account_data (event_type, data)
594 VALUES (?, ?)",
595 )?
596 .execute((event_type, data))?;
597 Ok(())
598 }
599
600 fn set_room_account_data(
601 &self,
602 room_id: &[u8],
603 event_type: &[u8],
604 data: &[u8],
605 ) -> rusqlite::Result<()> {
606 self.prepare_cached(
607 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
608 VALUES (?, ?, ?)",
609 )?
610 .execute((room_id, event_type, data))?;
611 Ok(())
612 }
613
614 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
615 self.prepare(
616 "DELETE FROM room_account_data
617 WHERE room_id = ?",
618 )?
619 .execute((room_id,))?;
620 Ok(())
621 }
622
623 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
624 self.prepare_cached(
625 "INSERT OR REPLACE INTO room_info (room_id, state, data)
626 VALUES (?, ?, ?)",
627 )?
628 .execute((room_id, state, data))?;
629 Ok(())
630 }
631
632 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
633 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
634 .optional()
635 }
636
637 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
639 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
640 Ok(())
641 }
642
643 fn set_state_event(
644 &self,
645 room_id: &[u8],
646 event_type: &[u8],
647 state_key: &[u8],
648 stripped: bool,
649 event_id: Option<&[u8]>,
650 data: &[u8],
651 ) -> rusqlite::Result<()> {
652 self.prepare_cached(
653 "INSERT OR REPLACE
654 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
655 VALUES (?, ?, ?, ?, ?, ?)",
656 )?
657 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
658 Ok(())
659 }
660
661 fn get_state_event_by_id(
662 &self,
663 room_id: &[u8],
664 event_id: &[u8],
665 ) -> rusqlite::Result<Option<Vec<u8>>> {
666 self.query_row(
667 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
668 (room_id, event_id),
669 |row| row.get(0),
670 )
671 .optional()
672 }
673
674 fn remove_room_state_events(
680 &self,
681 room_id: &[u8],
682 stripped: Option<bool>,
683 ) -> rusqlite::Result<()> {
684 if let Some(stripped) = stripped {
685 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
686 .execute((room_id, stripped))?;
687 } else {
688 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
689 .execute((room_id,))?;
690 }
691 Ok(())
692 }
693
694 fn set_member(
695 &self,
696 room_id: &[u8],
697 user_id: &[u8],
698 membership: &[u8],
699 stripped: bool,
700 data: &[u8],
701 ) -> rusqlite::Result<()> {
702 self.prepare_cached(
703 "INSERT OR REPLACE
704 INTO member (room_id, user_id, membership, stripped, data)
705 VALUES (?, ?, ?, ?, ?)",
706 )?
707 .execute((room_id, user_id, membership, stripped, data))?;
708 Ok(())
709 }
710
711 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
716 if let Some(stripped) = stripped {
717 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
718 .execute((room_id, stripped))?;
719 } else {
720 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
721 }
722 Ok(())
723 }
724
725 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
726 self.prepare_cached(
727 "INSERT OR REPLACE
728 INTO profile (room_id, user_id, data)
729 VALUES (?, ?, ?)",
730 )?
731 .execute((room_id, user_id, data))?;
732 Ok(())
733 }
734
735 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
736 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
737 Ok(())
738 }
739
740 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
741 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
742 .execute((room_id, user_id))?;
743 Ok(())
744 }
745
746 fn set_receipt(
747 &self,
748 room_id: &[u8],
749 user_id: &[u8],
750 receipt_type: &[u8],
751 thread: &[u8],
752 event_id: &[u8],
753 data: &[u8],
754 ) -> rusqlite::Result<()> {
755 self.prepare_cached(
756 "INSERT OR REPLACE
757 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
758 VALUES (?, ?, ?, ?, ?, ?)",
759 )?
760 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
761 Ok(())
762 }
763
764 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
765 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
766 Ok(())
767 }
768
769 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
770 self.prepare_cached(
771 "INSERT OR REPLACE
772 INTO display_name (room_id, name, data)
773 VALUES (?, ?, ?)",
774 )?
775 .execute((room_id, name, data))?;
776 Ok(())
777 }
778
779 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
780 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
781 .execute((room_id, name))?;
782 Ok(())
783 }
784
785 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
786 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
787 Ok(())
788 }
789
790 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
791 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
792 Ok(())
793 }
794
795 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
796 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
797 .execute((room_id,))?;
798 Ok(())
799 }
800}
801
802#[async_trait]
803trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
804 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
805 Ok(self
806 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
807 .await
808 .optional()?)
809 }
810
811 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
812 let keys_length = keys.len();
813
814 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
815 let sql_params = repeat_vars(keys.len());
816 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
817
818 let params = rusqlite::params_from_iter(keys);
819
820 Ok(txn
821 .prepare(&sql)?
822 .query(params)?
823 .mapped(|row| row.get(0))
824 .collect::<Result<_, _>>()?)
825 })
826 .await
827 }
828
829 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
830
831 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
832 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
833 Ok(())
834 }
835
836 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
837 Ok(match room_id {
838 None => {
839 self.prepare("SELECT data FROM room_info", move |mut stmt| {
840 stmt.query_map((), |row| row.get(0))?.collect()
841 })
842 .await?
843 }
844
845 Some(room_id) => {
846 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
847 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
848 })
849 .await?
850 }
851 })
852 }
853
854 async fn get_maybe_stripped_state_events_for_keys(
855 &self,
856 room_id: Key,
857 event_type: Key,
858 state_keys: Vec<Key>,
859 ) -> Result<Vec<(bool, Vec<u8>)>> {
860 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
861 let sql_params = repeat_vars(state_keys.len());
862 let sql = format!(
863 "SELECT stripped, data FROM state_event
864 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
865 );
866
867 let params = rusqlite::params_from_iter(
868 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
869 );
870
871 Ok(txn
872 .prepare(&sql)?
873 .query(params)?
874 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
875 .collect::<Result<_, _>>()?)
876 })
877 .await
878 }
879
880 async fn get_maybe_stripped_state_events(
881 &self,
882 room_id: Key,
883 event_type: Key,
884 ) -> Result<Vec<(bool, Vec<u8>)>> {
885 Ok(self
886 .prepare(
887 "SELECT stripped, data FROM state_event
888 WHERE room_id = ? AND event_type = ?",
889 |mut stmt| {
890 stmt.query((room_id, event_type))?
891 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
892 .collect()
893 },
894 )
895 .await?)
896 }
897
898 async fn get_profiles(
899 &self,
900 room_id: Key,
901 user_ids: Vec<Key>,
902 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
903 let user_ids_length = user_ids.len();
904
905 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
906 let sql_params = repeat_vars(user_ids.len());
907 let sql = format!(
908 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
909 );
910
911 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
912
913 Ok(txn
914 .prepare(&sql)?
915 .query(params)?
916 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
917 .collect::<Result<_, _>>()?)
918 })
919 .await
920 }
921
922 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
923 let res = if memberships.is_empty() {
924 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
925 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
926 })
927 .await?
928 } else {
929 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
930 let sql_params = repeat_vars(memberships.len());
931 let sql = format!(
932 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
933 );
934
935 let params =
936 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
937
938 Ok(txn
939 .prepare(&sql)?
940 .query(params)?
941 .mapped(|row| row.get(0))
942 .collect::<Result<_, _>>()?)
943 })
944 .await?
945 };
946
947 Ok(res)
948 }
949
950 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
951 Ok(self
952 .query_row(
953 "SELECT data FROM global_account_data WHERE event_type = ?",
954 (event_type,),
955 |row| row.get(0),
956 )
957 .await
958 .optional()?)
959 }
960
961 async fn get_room_account_data(
962 &self,
963 room_id: Key,
964 event_type: Key,
965 ) -> Result<Option<Vec<u8>>> {
966 Ok(self
967 .query_row(
968 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
969 (room_id, event_type),
970 |row| row.get(0),
971 )
972 .await
973 .optional()?)
974 }
975
976 async fn get_display_names(
977 &self,
978 room_id: Key,
979 names: Vec<Key>,
980 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
981 let names_length = names.len();
982
983 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
984 let sql_params = repeat_vars(names.len());
985 let sql = format!(
986 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
987 );
988
989 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
990
991 Ok(txn
992 .prepare(&sql)?
993 .query(params)?
994 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
995 .collect::<Result<_, _>>()?)
996 })
997 .await
998 }
999
1000 async fn get_user_receipt(
1001 &self,
1002 room_id: Key,
1003 receipt_type: Key,
1004 thread: Key,
1005 user_id: Key,
1006 ) -> Result<Option<Vec<u8>>> {
1007 Ok(self
1008 .query_row(
1009 "SELECT data FROM receipt
1010 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1011 (room_id, receipt_type, thread, user_id),
1012 |row| row.get(0),
1013 )
1014 .await
1015 .optional()?)
1016 }
1017
1018 async fn get_event_receipts(
1019 &self,
1020 room_id: Key,
1021 receipt_type: Key,
1022 thread: Key,
1023 event_id: Key,
1024 ) -> Result<Vec<Vec<u8>>> {
1025 Ok(self
1026 .prepare(
1027 "SELECT data FROM receipt
1028 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1029 |mut stmt| {
1030 stmt.query((room_id, receipt_type, thread, event_id))?
1031 .mapped(|row| row.get(0))
1032 .collect()
1033 },
1034 )
1035 .await?)
1036 }
1037}
1038
1039#[async_trait]
1040impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1041 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1042 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1043 }
1044}
1045
1046#[async_trait]
1047impl StateStore for SqliteStateStore {
1048 type Error = Error;
1049
1050 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1051 self.read()
1052 .await?
1053 .get_kv_blob(self.encode_state_store_data_key(key))
1054 .await?
1055 .map(|data| {
1056 Ok(match key {
1057 StateStoreDataKey::SyncToken => {
1058 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1059 }
1060 StateStoreDataKey::ServerInfo => {
1061 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1062 }
1063 StateStoreDataKey::Filter(_) => {
1064 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1065 }
1066 StateStoreDataKey::UserAvatarUrl(_) => {
1067 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1068 }
1069 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1070 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1071 }
1072 StateStoreDataKey::UtdHookManagerData => {
1073 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1074 }
1075 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1076 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1077 }
1078 StateStoreDataKey::ComposerDraft(_, _) => {
1079 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1080 }
1081 StateStoreDataKey::SeenKnockRequests(_) => {
1082 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1083 }
1084 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1085 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1086 self.deserialize_value(&data)?,
1087 )
1088 }
1089 })
1090 })
1091 .transpose()
1092 }
1093
1094 async fn set_kv_data(
1095 &self,
1096 key: StateStoreDataKey<'_>,
1097 value: StateStoreDataValue,
1098 ) -> Result<()> {
1099 let serialized_value = match key {
1100 StateStoreDataKey::SyncToken => self.serialize_value(
1101 &value.into_sync_token().expect("Session data not a sync token"),
1102 )?,
1103 StateStoreDataKey::ServerInfo => self.serialize_value(
1104 &value.into_server_info().expect("Session data not containing server info"),
1105 )?,
1106 StateStoreDataKey::Filter(_) => {
1107 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1108 }
1109 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1110 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1111 )?,
1112 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1113 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1114 )?,
1115 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1116 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1117 )?,
1118 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1119 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1120 }
1121 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1122 &value.into_composer_draft().expect("Session data not a composer draft"),
1123 )?,
1124 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1125 &value
1126 .into_seen_knock_requests()
1127 .expect("Session data is not a set of seen knock request ids"),
1128 )?,
1129 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1130 &value
1131 .into_thread_subscriptions_catchup_tokens()
1132 .expect("Session data is not a list of thread subscription catchup tokens"),
1133 )?,
1134 };
1135
1136 self.write()
1137 .await
1138 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1139 .await
1140 }
1141
1142 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1143 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1144 }
1145
1146 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1147 let changes = changes.to_owned();
1148 let this = self.clone();
1149 self.write()
1150 .await
1151 .with_transaction(move |txn| {
1152 let StateChanges {
1153 sync_token,
1154 account_data,
1155 presence,
1156 profiles,
1157 profiles_to_delete,
1158 state,
1159 room_account_data,
1160 room_infos,
1161 receipts,
1162 redactions,
1163 stripped_state,
1164 ambiguity_maps,
1165 } = changes;
1166
1167 if let Some(sync_token) = sync_token {
1168 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1169 let value = this.serialize_value(&sync_token)?;
1170 txn.set_kv_blob(&key, &value)?;
1171 }
1172
1173 for (event_type, event) in account_data {
1174 let event_type =
1175 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1176 let data = this.serialize_json(&event)?;
1177 txn.set_global_account_data(&event_type, &data)?;
1178 }
1179
1180 for (room_id, events) in room_account_data {
1181 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1182 for (event_type, event) in events {
1183 let event_type =
1184 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1185 let data = this.serialize_json(&event)?;
1186 txn.set_room_account_data(&room_id, &event_type, &data)?;
1187 }
1188 }
1189
1190 for (user_id, event) in presence {
1191 let key = this.encode_presence_key(&user_id);
1192 let value = this.serialize_json(&event)?;
1193 txn.set_kv_blob(&key, &value)?;
1194 }
1195
1196 for (room_id, room_info) in room_infos {
1197 let stripped = room_info.state() == RoomState::Invited;
1198 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1200
1201 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1202 let state = this
1203 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1204 let data = this.serialize_json(&room_info)?;
1205 txn.set_room_info(&room_id, &state, &data)?;
1206 }
1207
1208 for (room_id, user_ids) in profiles_to_delete {
1209 let room_id = this.encode_key(keys::PROFILE, room_id);
1210 for user_id in user_ids {
1211 let user_id = this.encode_key(keys::PROFILE, user_id);
1212 txn.remove_room_profile(&room_id, &user_id)?;
1213 }
1214 }
1215
1216 for (room_id, state_event_types) in state {
1217 let profiles = profiles.get(&room_id);
1218 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1219
1220 for (event_type, state_events) in state_event_types {
1221 let encoded_event_type =
1222 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1223
1224 for (state_key, raw_state_event) in state_events {
1225 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1226 let data = this.serialize_json(&raw_state_event)?;
1227
1228 let event_id: Option<String> =
1229 raw_state_event.get_field("event_id").ok().flatten();
1230 let encoded_event_id =
1231 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1232
1233 txn.set_state_event(
1234 &encoded_room_id,
1235 &encoded_event_type,
1236 &encoded_state_key,
1237 false,
1238 encoded_event_id.as_deref(),
1239 &data,
1240 )?;
1241
1242 if event_type == StateEventType::RoomMember {
1243 let member_event = match raw_state_event
1244 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1245 {
1246 Ok(ev) => ev,
1247 Err(e) => {
1248 debug!(event_id, "Failed to deserialize member event: {e}");
1249 continue;
1250 }
1251 };
1252
1253 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1254 let user_id = this.encode_key(keys::MEMBER, &state_key);
1255 let membership = this
1256 .encode_key(keys::MEMBER, member_event.membership().as_str());
1257 let data = this.serialize_value(&state_key)?;
1258
1259 txn.set_member(
1260 &encoded_room_id,
1261 &user_id,
1262 &membership,
1263 false,
1264 &data,
1265 )?;
1266
1267 if let Some(profile) =
1268 profiles.and_then(|p| p.get(member_event.state_key()))
1269 {
1270 let room_id = this.encode_key(keys::PROFILE, &room_id);
1271 let user_id = this.encode_key(keys::PROFILE, &state_key);
1272 let data = this.serialize_json(&profile)?;
1273 txn.set_profile(&room_id, &user_id, &data)?;
1274 }
1275 }
1276 }
1277 }
1278 }
1279
1280 for (room_id, stripped_state_event_types) in stripped_state {
1281 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1282
1283 for (event_type, stripped_state_events) in stripped_state_event_types {
1284 let encoded_event_type =
1285 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1286
1287 for (state_key, raw_stripped_state_event) in stripped_state_events {
1288 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1289 let data = this.serialize_json(&raw_stripped_state_event)?;
1290 txn.set_state_event(
1291 &encoded_room_id,
1292 &encoded_event_type,
1293 &encoded_state_key,
1294 true,
1295 None,
1296 &data,
1297 )?;
1298
1299 if event_type == StateEventType::RoomMember {
1300 let member_event = match raw_stripped_state_event
1301 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1302 ) {
1303 Ok(ev) => ev,
1304 Err(e) => {
1305 debug!("Failed to deserialize stripped member event: {e}");
1306 continue;
1307 }
1308 };
1309
1310 let room_id = this.encode_key(keys::MEMBER, &room_id);
1311 let user_id = this.encode_key(keys::MEMBER, &state_key);
1312 let membership = this.encode_key(
1313 keys::MEMBER,
1314 member_event.content.membership.as_str(),
1315 );
1316 let data = this.serialize_value(&state_key)?;
1317
1318 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1319 }
1320 }
1321 }
1322 }
1323
1324 for (room_id, receipt_event) in receipts {
1325 let room_id = this.encode_key(keys::RECEIPT, room_id);
1326
1327 for (event_id, receipt_types) in receipt_event {
1328 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1329
1330 for (receipt_type, receipt_users) in receipt_types {
1331 let receipt_type =
1332 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1333
1334 for (user_id, receipt) in receipt_users {
1335 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1336 let thread = this.encode_key(
1339 keys::RECEIPT,
1340 rmp_serde::to_vec_named(&receipt.thread)?,
1341 );
1342 let data = this.serialize_json(&ReceiptData {
1343 receipt,
1344 event_id: event_id.clone(),
1345 user_id,
1346 })?;
1347
1348 txn.set_receipt(
1349 &room_id,
1350 &encoded_user_id,
1351 &receipt_type,
1352 &thread,
1353 &encoded_event_id,
1354 &data,
1355 )?;
1356 }
1357 }
1358 }
1359 }
1360
1361 for (room_id, redactions) in redactions {
1362 let make_redaction_rules = || {
1363 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1364 txn.get_room_info(&encoded_room_id)
1365 .ok()
1366 .flatten()
1367 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1368 .map(|info| info.room_version_rules_or_default())
1369 .unwrap_or_else(|| {
1370 warn!(
1371 ?room_id,
1372 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1373 );
1374 ROOM_VERSION_RULES_FALLBACK
1375 }).redaction
1376 };
1377
1378 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1379 let mut redaction_rules = None;
1380
1381 for (event_id, redaction) in redactions {
1382 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1383
1384 if let Some(Ok(raw_event)) = txn
1385 .get_state_event_by_id(&encoded_room_id, &event_id)?
1386 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1387 {
1388 let event = raw_event.deserialize()?;
1389 let redacted = redact(
1390 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1391 redaction_rules.get_or_insert_with(make_redaction_rules),
1392 Some(RedactedBecause::from_raw_event(&redaction)?),
1393 )
1394 .map_err(Error::Redaction)?;
1395 let data = this.serialize_json(&redacted)?;
1396
1397 let event_type =
1398 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1399 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1400
1401 txn.set_state_event(
1402 &encoded_room_id,
1403 &event_type,
1404 &state_key,
1405 false,
1406 Some(&event_id),
1407 &data,
1408 )?;
1409 }
1410 }
1411 }
1412
1413 for (room_id, display_names) in ambiguity_maps {
1414 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1415
1416 for (name, user_ids) in display_names {
1417 let encoded_name = this.encode_key(
1418 keys::DISPLAY_NAME,
1419 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1420 );
1421 let data = this.serialize_json(&user_ids)?;
1422
1423 if user_ids.is_empty() {
1424 txn.remove_display_name(&room_id, &encoded_name)?;
1425
1426 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1441 txn.remove_display_name(&room_id, &raw_name)?;
1442 } else {
1443 txn.set_display_name(&room_id, &encoded_name, &data)?;
1445 }
1446 }
1447 }
1448
1449 Ok::<_, Error>(())
1450 })
1451 .await?;
1452
1453 Ok(())
1454 }
1455
1456 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1457 self.read()
1458 .await?
1459 .get_kv_blob(self.encode_presence_key(user_id))
1460 .await?
1461 .map(|data| self.deserialize_json(&data))
1462 .transpose()
1463 }
1464
1465 async fn get_presence_events(
1466 &self,
1467 user_ids: &[OwnedUserId],
1468 ) -> Result<Vec<Raw<PresenceEvent>>> {
1469 if user_ids.is_empty() {
1470 return Ok(Vec::new());
1471 }
1472
1473 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1474 self.read()
1475 .await?
1476 .get_kv_blobs(user_ids)
1477 .await?
1478 .into_iter()
1479 .map(|data| self.deserialize_json(&data))
1480 .collect()
1481 }
1482
1483 async fn get_state_event(
1484 &self,
1485 room_id: &RoomId,
1486 event_type: StateEventType,
1487 state_key: &str,
1488 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1489 Ok(self
1490 .get_state_events_for_keys(room_id, event_type, &[state_key])
1491 .await?
1492 .into_iter()
1493 .next())
1494 }
1495
1496 async fn get_state_events(
1497 &self,
1498 room_id: &RoomId,
1499 event_type: StateEventType,
1500 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1501 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1502 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1503 self.read()
1504 .await?
1505 .get_maybe_stripped_state_events(room_id, event_type)
1506 .await?
1507 .into_iter()
1508 .map(|(stripped, data)| {
1509 let ev = if stripped {
1510 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1511 } else {
1512 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1513 };
1514
1515 Ok(ev)
1516 })
1517 .collect()
1518 }
1519
1520 async fn get_state_events_for_keys(
1521 &self,
1522 room_id: &RoomId,
1523 event_type: StateEventType,
1524 state_keys: &[&str],
1525 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1526 if state_keys.is_empty() {
1527 return Ok(Vec::new());
1528 }
1529
1530 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1531 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1532 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1533 self.read()
1534 .await?
1535 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1536 .await?
1537 .into_iter()
1538 .map(|(stripped, data)| {
1539 let ev = if stripped {
1540 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1541 } else {
1542 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1543 };
1544
1545 Ok(ev)
1546 })
1547 .collect()
1548 }
1549
1550 async fn get_profile(
1551 &self,
1552 room_id: &RoomId,
1553 user_id: &UserId,
1554 ) -> Result<Option<MinimalRoomMemberEvent>> {
1555 let room_id = self.encode_key(keys::PROFILE, room_id);
1556 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1557
1558 self.read()
1559 .await?
1560 .get_profiles(room_id, user_ids)
1561 .await?
1562 .into_iter()
1563 .next()
1564 .map(|(_, data)| self.deserialize_json(&data))
1565 .transpose()
1566 }
1567
1568 async fn get_profiles<'a>(
1569 &self,
1570 room_id: &RoomId,
1571 user_ids: &'a [OwnedUserId],
1572 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1573 if user_ids.is_empty() {
1574 return Ok(BTreeMap::new());
1575 }
1576
1577 let room_id = self.encode_key(keys::PROFILE, room_id);
1578 let mut user_ids_map = user_ids
1579 .iter()
1580 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1581 .collect::<BTreeMap<_, _>>();
1582 let user_ids = user_ids_map.keys().cloned().collect();
1583
1584 self.read()
1585 .await?
1586 .get_profiles(room_id, user_ids)
1587 .await?
1588 .into_iter()
1589 .map(|(user_id, data)| {
1590 Ok((
1591 user_ids_map
1592 .remove(user_id.as_slice())
1593 .expect("returned user IDs were requested"),
1594 self.deserialize_json(&data)?,
1595 ))
1596 })
1597 .collect()
1598 }
1599
1600 async fn get_user_ids(
1601 &self,
1602 room_id: &RoomId,
1603 membership: RoomMemberships,
1604 ) -> Result<Vec<OwnedUserId>> {
1605 let room_id = self.encode_key(keys::MEMBER, room_id);
1606 let memberships = membership
1607 .as_vec()
1608 .into_iter()
1609 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1610 .collect();
1611 self.read()
1612 .await?
1613 .get_user_ids(room_id, memberships)
1614 .await?
1615 .iter()
1616 .map(|data| self.deserialize_value(data))
1617 .collect()
1618 }
1619
1620 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1621 self.read()
1622 .await?
1623 .get_room_infos(match room_load_settings {
1624 RoomLoadSettings::All => None,
1625 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1626 })
1627 .await?
1628 .into_iter()
1629 .map(|data| self.deserialize_json(&data))
1630 .collect()
1631 }
1632
1633 async fn get_users_with_display_name(
1634 &self,
1635 room_id: &RoomId,
1636 display_name: &DisplayName,
1637 ) -> Result<BTreeSet<OwnedUserId>> {
1638 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1639 let names = vec![self.encode_key(
1640 keys::DISPLAY_NAME,
1641 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1642 )];
1643
1644 Ok(self
1645 .read()
1646 .await?
1647 .get_display_names(room_id, names)
1648 .await?
1649 .into_iter()
1650 .next()
1651 .map(|(_, data)| self.deserialize_json(&data))
1652 .transpose()?
1653 .unwrap_or_default())
1654 }
1655
1656 async fn get_users_with_display_names<'a>(
1657 &self,
1658 room_id: &RoomId,
1659 display_names: &'a [DisplayName],
1660 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1661 let mut result = HashMap::new();
1662
1663 if display_names.is_empty() {
1664 return Ok(result);
1665 }
1666
1667 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1668 let mut names_map = display_names
1669 .iter()
1670 .flat_map(|display_name| {
1671 let raw =
1681 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1682 let normalized = display_name.as_normalized_str().map(|normalized| {
1683 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1684 });
1685
1686 iter::once(raw).chain(normalized)
1687 })
1688 .collect::<BTreeMap<_, _>>();
1689 let names = names_map.keys().cloned().collect();
1690
1691 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1692 {
1693 let display_name =
1694 names_map.remove(name.as_slice()).expect("returned display names were requested");
1695 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1696
1697 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1698 }
1699
1700 Ok(result)
1701 }
1702
1703 async fn get_account_data_event(
1704 &self,
1705 event_type: GlobalAccountDataEventType,
1706 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1707 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1708 self.read()
1709 .await?
1710 .get_global_account_data(event_type)
1711 .await?
1712 .map(|value| self.deserialize_json(&value))
1713 .transpose()
1714 }
1715
1716 async fn get_room_account_data_event(
1717 &self,
1718 room_id: &RoomId,
1719 event_type: RoomAccountDataEventType,
1720 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1721 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1722 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1723 self.read()
1724 .await?
1725 .get_room_account_data(room_id, event_type)
1726 .await?
1727 .map(|value| self.deserialize_json(&value))
1728 .transpose()
1729 }
1730
1731 async fn get_user_room_receipt_event(
1732 &self,
1733 room_id: &RoomId,
1734 receipt_type: ReceiptType,
1735 thread: ReceiptThread,
1736 user_id: &UserId,
1737 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1738 let room_id = self.encode_key(keys::RECEIPT, room_id);
1739 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1740 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1743 let user_id = self.encode_key(keys::RECEIPT, user_id);
1744
1745 self.read()
1746 .await?
1747 .get_user_receipt(room_id, receipt_type, thread, user_id)
1748 .await?
1749 .map(|value| {
1750 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1751 })
1752 .transpose()
1753 }
1754
1755 async fn get_event_room_receipt_events(
1756 &self,
1757 room_id: &RoomId,
1758 receipt_type: ReceiptType,
1759 thread: ReceiptThread,
1760 event_id: &EventId,
1761 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1762 let room_id = self.encode_key(keys::RECEIPT, room_id);
1763 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1764 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1767 let event_id = self.encode_key(keys::RECEIPT, event_id);
1768
1769 self.read()
1770 .await?
1771 .get_event_receipts(room_id, receipt_type, thread, event_id)
1772 .await?
1773 .iter()
1774 .map(|value| {
1775 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1776 })
1777 .collect()
1778 }
1779
1780 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1781 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1782 }
1783
1784 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1785 let conn = self.write().await;
1786 let key = self.encode_custom_key(key);
1787 conn.set_kv_blob(key, value).await?;
1788 Ok(())
1789 }
1790
1791 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1792 let conn = self.write().await;
1793 let key = self.encode_custom_key(key);
1794 let previous = conn.get_kv_blob(key.clone()).await?;
1795 conn.set_kv_blob(key, value).await?;
1796 Ok(previous)
1797 }
1798
1799 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1800 let conn = self.write().await;
1801 let key = self.encode_custom_key(key);
1802 let previous = conn.get_kv_blob(key.clone()).await?;
1803 if previous.is_some() {
1804 conn.delete_kv_blob(key).await?;
1805 }
1806 Ok(previous)
1807 }
1808
1809 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1810 let this = self.clone();
1811 let room_id = room_id.to_owned();
1812
1813 let conn = self.write().await;
1814
1815 conn.with_transaction(move |txn| -> Result<()> {
1816 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1817 txn.remove_room_info(&room_info_room_id)?;
1818
1819 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1820 txn.remove_room_state_events(&state_event_room_id, None)?;
1821
1822 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1823 txn.remove_room_members(&member_room_id, None)?;
1824
1825 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1826 txn.remove_room_profiles(&profile_room_id)?;
1827
1828 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1829 txn.remove_room_account_data(&room_account_data_room_id)?;
1830
1831 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1832 txn.remove_room_receipts(&receipt_room_id)?;
1833
1834 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1835 txn.remove_room_display_names(&display_name_room_id)?;
1836
1837 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1838 txn.remove_room_send_queue(&send_queue_room_id)?;
1839
1840 let dependent_send_queue_room_id =
1841 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1842 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1843
1844 let thread_subscriptions_room_id =
1845 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1846 txn.execute(
1847 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1848 (thread_subscriptions_room_id,),
1849 )?;
1850
1851 Ok(())
1852 })
1853 .await?;
1854
1855 conn.vacuum().await
1856 }
1857
1858 async fn save_send_queue_request(
1859 &self,
1860 room_id: &RoomId,
1861 transaction_id: OwnedTransactionId,
1862 created_at: MilliSecondsSinceUnixEpoch,
1863 content: QueuedRequestKind,
1864 priority: usize,
1865 ) -> Result<(), Self::Error> {
1866 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1867 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1868
1869 let content = self.serialize_json(&content)?;
1870 let created_at_ts: u64 = created_at.0.into();
1876 self.write()
1877 .await
1878 .with_transaction(move |txn| {
1879 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1880 Ok(())
1881 })
1882 .await
1883 }
1884
1885 async fn update_send_queue_request(
1886 &self,
1887 room_id: &RoomId,
1888 transaction_id: &TransactionId,
1889 content: QueuedRequestKind,
1890 ) -> Result<bool, Self::Error> {
1891 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1892
1893 let content = self.serialize_json(&content)?;
1894 let transaction_id = transaction_id.to_string();
1897
1898 let num_updated = self.write()
1899 .await
1900 .with_transaction(move |txn| {
1901 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1902 })
1903 .await?;
1904
1905 Ok(num_updated > 0)
1906 }
1907
1908 async fn remove_send_queue_request(
1909 &self,
1910 room_id: &RoomId,
1911 transaction_id: &TransactionId,
1912 ) -> Result<bool, Self::Error> {
1913 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1914
1915 let transaction_id = transaction_id.to_string();
1917
1918 let num_deleted = self
1919 .write()
1920 .await
1921 .with_transaction(move |txn| {
1922 txn.prepare_cached(
1923 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1924 )?
1925 .execute((room_id, &transaction_id))
1926 })
1927 .await?;
1928
1929 Ok(num_deleted > 0)
1930 }
1931
1932 async fn load_send_queue_requests(
1933 &self,
1934 room_id: &RoomId,
1935 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1936 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1937
1938 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1942 .read()
1943 .await?
1944 .prepare(
1945 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1946 |mut stmt| {
1947 stmt.query((room_id,))?
1948 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1949 .collect()
1950 },
1951 )
1952 .await?;
1953
1954 let mut requests = Vec::with_capacity(res.len());
1955
1956 for entry in res {
1957 let created_at = entry
1958 .4
1959 .and_then(UInt::new)
1960 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1961
1962 requests.push(QueuedRequest {
1963 transaction_id: entry.0.into(),
1964 kind: self.deserialize_json(&entry.1)?,
1965 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1966 priority: entry.3,
1967 created_at,
1968 });
1969 }
1970
1971 Ok(requests)
1972 }
1973
1974 async fn update_send_queue_request_status(
1975 &self,
1976 room_id: &RoomId,
1977 transaction_id: &TransactionId,
1978 error: Option<QueueWedgeError>,
1979 ) -> Result<(), Self::Error> {
1980 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1981
1982 let transaction_id = transaction_id.to_string();
1984
1985 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1987
1988 self.write()
1989 .await
1990 .with_transaction(move |txn| {
1991 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1992 Ok(())
1993 })
1994 .await
1995 }
1996
1997 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1998 let res: Vec<Vec<u8>> = self
2003 .read()
2004 .await?
2005 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2006 stmt.query(())?.mapped(|row| row.get(0)).collect()
2007 })
2008 .await?;
2009
2010 Ok(res
2013 .into_iter()
2014 .map(|entry| self.deserialize_value(&entry))
2015 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2016 .into_iter()
2017 .collect())
2018 }
2019
2020 async fn save_dependent_queued_request(
2021 &self,
2022 room_id: &RoomId,
2023 parent_txn_id: &TransactionId,
2024 own_txn_id: ChildTransactionId,
2025 created_at: MilliSecondsSinceUnixEpoch,
2026 content: DependentQueuedRequestKind,
2027 ) -> Result<()> {
2028 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2029 let content = self.serialize_json(&content)?;
2030
2031 let parent_txn_id = parent_txn_id.to_string();
2033 let own_txn_id = own_txn_id.to_string();
2034
2035 let created_at_ts: u64 = created_at.0.into();
2036 self.write()
2037 .await
2038 .with_transaction(move |txn| {
2039 txn.prepare_cached(
2040 r#"INSERT INTO dependent_send_queue_events
2041 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2042 VALUES (?, ?, ?, ?, ?)"#,
2043 )?
2044 .execute((
2045 room_id,
2046 parent_txn_id,
2047 own_txn_id,
2048 content,
2049 created_at_ts,
2050 ))?;
2051 Ok(())
2052 })
2053 .await
2054 }
2055
2056 async fn update_dependent_queued_request(
2057 &self,
2058 room_id: &RoomId,
2059 own_transaction_id: &ChildTransactionId,
2060 new_content: DependentQueuedRequestKind,
2061 ) -> Result<bool> {
2062 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2063 let content = self.serialize_json(&new_content)?;
2064
2065 let own_txn_id = own_transaction_id.to_string();
2067
2068 let num_updated = self
2069 .write()
2070 .await
2071 .with_transaction(move |txn| {
2072 txn.prepare_cached(
2073 r#"UPDATE dependent_send_queue_events
2074 SET content = ?
2075 WHERE own_transaction_id = ?
2076 AND room_id = ?"#,
2077 )?
2078 .execute((content, own_txn_id, room_id))
2079 })
2080 .await?;
2081
2082 if num_updated > 1 {
2083 return Err(Error::InconsistentUpdate);
2084 }
2085
2086 Ok(num_updated == 1)
2087 }
2088
2089 async fn mark_dependent_queued_requests_as_ready(
2090 &self,
2091 room_id: &RoomId,
2092 parent_txn_id: &TransactionId,
2093 parent_key: SentRequestKey,
2094 ) -> Result<usize> {
2095 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2096 let parent_key = self.serialize_value(&parent_key)?;
2097
2098 let parent_txn_id = parent_txn_id.to_string();
2100
2101 self.write()
2102 .await
2103 .with_transaction(move |txn| {
2104 Ok(txn.prepare_cached(
2105 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2106 )?
2107 .execute((parent_key, parent_txn_id, room_id))?)
2108 })
2109 .await
2110 }
2111
2112 async fn remove_dependent_queued_request(
2113 &self,
2114 room_id: &RoomId,
2115 txn_id: &ChildTransactionId,
2116 ) -> Result<bool> {
2117 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2118
2119 let txn_id = txn_id.to_string();
2121
2122 let num_deleted = self
2123 .write()
2124 .await
2125 .with_transaction(move |txn| {
2126 txn.prepare_cached(
2127 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2128 )?
2129 .execute((txn_id, room_id))
2130 })
2131 .await?;
2132
2133 Ok(num_deleted > 0)
2134 }
2135
2136 async fn load_dependent_queued_requests(
2137 &self,
2138 room_id: &RoomId,
2139 ) -> Result<Vec<DependentQueuedRequest>> {
2140 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2141
2142 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2144 .read()
2145 .await?
2146 .prepare(
2147 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2148 |mut stmt| {
2149 stmt.query((room_id,))?
2150 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2151 .collect()
2152 },
2153 )
2154 .await?;
2155
2156 let mut dependent_events = Vec::with_capacity(res.len());
2157
2158 for entry in res {
2159 let created_at = entry
2160 .4
2161 .and_then(UInt::new)
2162 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2163
2164 dependent_events.push(DependentQueuedRequest {
2165 own_transaction_id: entry.0.into(),
2166 parent_transaction_id: entry.1.into(),
2167 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2168 kind: self.deserialize_json(&entry.3)?,
2169 created_at,
2170 });
2171 }
2172
2173 Ok(dependent_events)
2174 }
2175
2176 async fn upsert_thread_subscription(
2177 &self,
2178 room_id: &RoomId,
2179 thread_id: &EventId,
2180 mut new: StoredThreadSubscription,
2181 ) -> Result<(), Self::Error> {
2182 if let Some(previous) = self.load_thread_subscription(room_id, thread_id).await? {
2183 if previous == new {
2184 trace!("not saving thread subscription because the subscription is the same");
2186 return Ok(());
2187 }
2188
2189 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
2190 trace!("not saving thread subscription because we have a newer bump stamp");
2191 return Ok(());
2192 }
2193 }
2194
2195 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2196 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2197 let status = new.status.as_str();
2198
2199 self.write()
2200 .await
2201 .with_transaction(move |txn| {
2202 txn.prepare_cached(
2204 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2205 VALUES (?, ?, ?, ?)",
2206 )?
2207 .execute((room_id, thread_id, status, new.bump_stamp))
2208 })
2209 .await?;
2210 Ok(())
2211 }
2212
2213 async fn load_thread_subscription(
2214 &self,
2215 room_id: &RoomId,
2216 thread_id: &EventId,
2217 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2218 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2219 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2220
2221 Ok(self
2222 .read()
2223 .await?
2224 .query_row(
2225 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2226 (room_id, thread_id),
2227 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2228 )
2229 .await
2230 .optional()?
2231 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2232 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2233 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2234 })?;
2235 Ok(StoredThreadSubscription { status, bump_stamp })
2236 })
2237 .transpose()?)
2238 }
2239
2240 async fn remove_thread_subscription(
2241 &self,
2242 room_id: &RoomId,
2243 thread_id: &EventId,
2244 ) -> Result<(), Self::Error> {
2245 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2246 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2247
2248 self.write()
2249 .await
2250 .execute(
2251 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2252 (room_id, thread_id),
2253 )
2254 .await?;
2255
2256 Ok(())
2257 }
2258}
2259
2260#[derive(Debug, Clone, Serialize, Deserialize)]
2261struct ReceiptData {
2262 receipt: Receipt,
2263 event_id: OwnedEventId,
2264 user_id: OwnedUserId,
2265}
2266
2267#[cfg(test)]
2268mod tests {
2269 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2270
2271 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2272 use once_cell::sync::Lazy;
2273 use tempfile::{tempdir, TempDir};
2274
2275 use super::SqliteStateStore;
2276
2277 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2278 static NUM: AtomicU32 = AtomicU32::new(0);
2279
2280 async fn get_store() -> Result<impl StateStore, StoreError> {
2281 let name = NUM.fetch_add(1, SeqCst).to_string();
2282 let tmpdir_path = TMP_DIR.path().join(name);
2283
2284 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2285
2286 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2287 }
2288
2289 statestore_integration_tests!();
2290}
2291
2292#[cfg(test)]
2293mod encrypted_tests {
2294 use std::{
2295 path::PathBuf,
2296 sync::atomic::{AtomicU32, Ordering::SeqCst},
2297 };
2298
2299 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2300 use matrix_sdk_test::async_test;
2301 use once_cell::sync::Lazy;
2302 use tempfile::{tempdir, TempDir};
2303
2304 use super::SqliteStateStore;
2305 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2306
2307 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2308 static NUM: AtomicU32 = AtomicU32::new(0);
2309
2310 fn new_state_store_workspace() -> PathBuf {
2311 let name = NUM.fetch_add(1, SeqCst).to_string();
2312 TMP_DIR.path().join(name)
2313 }
2314
2315 async fn get_store() -> Result<impl StateStore, StoreError> {
2316 let tmpdir_path = new_state_store_workspace();
2317
2318 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2319
2320 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2321 .await
2322 .unwrap())
2323 }
2324
2325 #[async_test]
2326 async fn test_pool_size() {
2327 let tmpdir_path = new_state_store_workspace();
2328 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2329
2330 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2331
2332 assert_eq!(store.pool.status().max_size, 42);
2333 }
2334
2335 #[async_test]
2336 async fn test_cache_size() {
2337 let tmpdir_path = new_state_store_workspace();
2338 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2339
2340 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2341
2342 let conn = store.pool.get().await.unwrap();
2343 let cache_size =
2344 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2345
2346 assert_eq!(cache_size, -(1500 / 1024));
2350 }
2351
2352 #[async_test]
2353 async fn test_journal_size_limit() {
2354 let tmpdir_path = new_state_store_workspace();
2355 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2356
2357 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2358
2359 let conn = store.pool.get().await.unwrap();
2360 let journal_size_limit = conn
2361 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2362 .await
2363 .unwrap();
2364
2365 assert_eq!(journal_size_limit, 1500);
2368 }
2369
2370 statestore_integration_tests!();
2371}
2372
2373#[cfg(test)]
2374mod migration_tests {
2375 use std::{
2376 path::{Path, PathBuf},
2377 sync::{
2378 atomic::{AtomicU32, Ordering::SeqCst},
2379 Arc,
2380 },
2381 };
2382
2383 use as_variant::as_variant;
2384 use deadpool_sqlite::Runtime;
2385 use matrix_sdk_base::{
2386 media::{MediaFormat, MediaRequestParameters},
2387 store::{
2388 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2389 SerializableEventContent,
2390 },
2391 sync::UnreadNotificationsCount,
2392 RoomState, StateStore,
2393 };
2394 use matrix_sdk_test::async_test;
2395 use once_cell::sync::Lazy;
2396 use ruma::{
2397 events::{
2398 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2399 StateEventType,
2400 },
2401 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2402 RoomId, TransactionId, UserId,
2403 };
2404 use rusqlite::Transaction;
2405 use serde::{Deserialize, Serialize};
2406 use serde_json::json;
2407 use tempfile::{tempdir, TempDir};
2408 use tokio::{fs, sync::Mutex};
2409 use zeroize::Zeroizing;
2410
2411 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2412 use crate::{
2413 error::{Error, Result},
2414 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2415 OpenStoreError, Secret,
2416 };
2417
2418 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2419 static NUM: AtomicU32 = AtomicU32::new(0);
2420 const SECRET: &str = "secret";
2421
2422 fn new_path() -> PathBuf {
2423 let name = NUM.fetch_add(1, SeqCst).to_string();
2424 TMP_DIR.path().join(name)
2425 }
2426
2427 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2428 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2429
2430 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2431 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2434 let conn = pool.get().await?;
2435
2436 init(&conn).await?;
2437
2438 let store_cipher = Some(Arc::new(
2439 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2440 .await
2441 .unwrap(),
2442 ));
2443 let this = SqliteStateStore {
2444 store_cipher,
2445 pool,
2446 write_connection: Arc::new(Mutex::new(conn)),
2448 };
2449 this.run_migrations(1, Some(version)).await?;
2450
2451 Ok(this)
2452 }
2453
2454 fn room_info_v1_json(
2455 room_id: &RoomId,
2456 state: RoomState,
2457 name: Option<&str>,
2458 creator: Option<&UserId>,
2459 ) -> serde_json::Value {
2460 let name_content = match name {
2462 Some(name) => json!({ "name": name }),
2463 None => json!({ "name": null }),
2464 };
2465 let create_content = match creator {
2467 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2468 None => RoomCreateEventContent::new_v11(),
2469 };
2470
2471 json!({
2472 "room_id": room_id,
2473 "room_type": state,
2474 "notification_counts": UnreadNotificationsCount::default(),
2475 "summary": {
2476 "heroes": [],
2477 "joined_member_count": 0,
2478 "invited_member_count": 0,
2479 },
2480 "members_synced": false,
2481 "base_info": {
2482 "dm_targets": [],
2483 "max_power_level": 100,
2484 "name": {
2485 "Original": {
2486 "content": name_content,
2487 },
2488 },
2489 "create": {
2490 "Original": {
2491 "content": create_content,
2492 }
2493 }
2494 },
2495 })
2496 }
2497
2498 #[async_test]
2499 pub async fn test_migrating_v1_to_v2() {
2500 let path = new_path();
2501 {
2503 let db = create_fake_db(&path, 1).await.unwrap();
2504 let conn = db.pool.get().await.unwrap();
2505
2506 let this = db.clone();
2507 conn.with_transaction(move |txn| {
2508 for i in 0..5 {
2509 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2510 let (state, stripped) =
2511 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2512 let info = room_info_v1_json(&room_id, state, None, None);
2513
2514 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2515 let data = this.serialize_json(&info)?;
2516
2517 txn.prepare_cached(
2518 "INSERT INTO room_info (room_id, stripped, data)
2519 VALUES (?, ?, ?)",
2520 )?
2521 .execute((room_id, stripped, data))?;
2522 }
2523
2524 Result::<_, Error>::Ok(())
2525 })
2526 .await
2527 .unwrap();
2528 }
2529
2530 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2532
2533 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2535 }
2536
2537 fn add_room_v2(
2539 this: &SqliteStateStore,
2540 txn: &Transaction<'_>,
2541 room_id: &RoomId,
2542 name: Option<&str>,
2543 create_creator: Option<&UserId>,
2544 create_sender: Option<&UserId>,
2545 ) -> Result<(), Error> {
2546 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2547
2548 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2549 let encoded_state =
2550 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2551 let data = this.serialize_json(&room_info_json)?;
2552
2553 txn.prepare_cached(
2554 "INSERT INTO room_info (room_id, state, data)
2555 VALUES (?, ?, ?)",
2556 )?
2557 .execute((encoded_room_id, encoded_state, data))?;
2558
2559 let Some(create_sender) = create_sender else {
2561 return Ok(());
2562 };
2563
2564 let create_content = match create_creator {
2565 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2566 None => RoomCreateEventContent::new_v11(),
2567 };
2568
2569 let event_id = EventId::new(server_name!("dummy.local"));
2570 let create_event = json!({
2571 "content": create_content,
2572 "event_id": event_id,
2573 "sender": create_sender.to_owned(),
2574 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2575 "state_key": "",
2576 "type": "m.room.create",
2577 "unsigned": {},
2578 });
2579
2580 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2581 let encoded_event_type =
2582 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2583 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2584 let stripped = false;
2585 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2586 let data = this.serialize_json(&create_event)?;
2587
2588 txn.prepare_cached(
2589 "INSERT
2590 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2591 VALUES (?, ?, ?, ?, ?, ?)",
2592 )?
2593 .execute((
2594 encoded_room_id,
2595 encoded_event_type,
2596 encoded_state_key,
2597 stripped,
2598 encoded_event_id,
2599 data,
2600 ))?;
2601
2602 Ok(())
2603 }
2604
2605 #[async_test]
2606 pub async fn test_migrating_v2_to_v3() {
2607 let path = new_path();
2608
2609 let room_a_id = room_id!("!room_a:dummy.local");
2611 let room_a_name = "Room A";
2612 let room_a_creator = user_id!("@creator:dummy.local");
2613 let room_a_create_sender = user_id!("@sender:dummy.local");
2616
2617 let room_b_id = room_id!("!room_b:dummy.local");
2619
2620 let room_c_id = room_id!("!room_c:dummy.local");
2622 let room_c_create_sender = user_id!("@creator:dummy.local");
2623
2624 {
2626 let db = create_fake_db(&path, 2).await.unwrap();
2627 let conn = db.pool.get().await.unwrap();
2628
2629 let this = db.clone();
2630 conn.with_transaction(move |txn| {
2631 add_room_v2(
2632 &this,
2633 txn,
2634 room_a_id,
2635 Some(room_a_name),
2636 Some(room_a_creator),
2637 Some(room_a_create_sender),
2638 )?;
2639 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2640 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2641
2642 Result::<_, Error>::Ok(())
2643 })
2644 .await
2645 .unwrap();
2646 }
2647
2648 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2650
2651 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2653 assert_eq!(room_infos.len(), 3);
2654
2655 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2656 assert_eq!(room_a.name(), Some(room_a_name));
2657 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2658
2659 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2660 assert_eq!(room_b.name(), None);
2661 assert_eq!(room_b.creators(), None);
2662
2663 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2664 assert_eq!(room_c.name(), None);
2665 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2666 }
2667
2668 #[async_test]
2669 pub async fn test_migrating_v7_to_v9() {
2670 let path = new_path();
2671
2672 let room_id = room_id!("!room_a:dummy.local");
2673 let wedged_event_transaction_id = TransactionId::new();
2674 let local_event_transaction_id = TransactionId::new();
2675
2676 {
2678 let db = create_fake_db(&path, 7).await.unwrap();
2679 let conn = db.pool.get().await.unwrap();
2680
2681 let wedge_tx = wedged_event_transaction_id.clone();
2682 let local_tx = local_event_transaction_id.clone();
2683
2684 conn.with_transaction(move |txn| {
2685 add_dependent_send_queue_event_v7(
2686 &db,
2687 txn,
2688 room_id,
2689 &local_tx,
2690 ChildTransactionId::new(),
2691 DependentQueuedRequestKind::RedactEvent,
2692 )?;
2693 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2694 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2695 Result::<_, Error>::Ok(())
2696 })
2697 .await
2698 .unwrap();
2699 }
2700
2701 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2704
2705 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2706 assert!(requests.is_empty());
2707
2708 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2709 assert!(dependent_requests.is_empty());
2710 }
2711
2712 fn add_send_queue_event_v7(
2713 this: &SqliteStateStore,
2714 txn: &Transaction<'_>,
2715 transaction_id: &TransactionId,
2716 room_id: &RoomId,
2717 is_wedged: bool,
2718 ) -> Result<(), Error> {
2719 let content =
2720 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2721
2722 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2723 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2724
2725 let content = this.serialize_json(&content)?;
2726
2727 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2728 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2729
2730 Ok(())
2731 }
2732
2733 fn add_dependent_send_queue_event_v7(
2734 this: &SqliteStateStore,
2735 txn: &Transaction<'_>,
2736 room_id: &RoomId,
2737 parent_txn_id: &TransactionId,
2738 own_txn_id: ChildTransactionId,
2739 content: DependentQueuedRequestKind,
2740 ) -> Result<(), Error> {
2741 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2742
2743 let parent_txn_id = parent_txn_id.to_string();
2744 let own_txn_id = own_txn_id.to_string();
2745 let content = this.serialize_json(&content)?;
2746
2747 txn.prepare_cached(
2748 "INSERT INTO dependent_send_queue_events
2749 (room_id, parent_transaction_id, own_transaction_id, content)
2750 VALUES (?, ?, ?, ?)",
2751 )?
2752 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2753
2754 Ok(())
2755 }
2756
2757 #[derive(Clone, Debug, Serialize, Deserialize)]
2758 pub enum LegacyDependentQueuedRequestKind {
2759 UploadFileWithThumbnail {
2760 content_type: String,
2761 cache_key: MediaRequestParameters,
2762 related_to: OwnedTransactionId,
2763 },
2764 }
2765
2766 #[async_test]
2767 pub async fn test_dependent_queued_request_variant_renaming() {
2768 let path = new_path();
2769 let db = create_fake_db(&path, 7).await.unwrap();
2770
2771 let cache_key = MediaRequestParameters {
2772 format: MediaFormat::File,
2773 source: MediaSource::Plain("https://server.local/foobar".into()),
2774 };
2775 let related_to = TransactionId::new();
2776 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2777 content_type: "image/png".to_owned(),
2778 cache_key,
2779 related_to: related_to.clone(),
2780 };
2781
2782 let data = db
2783 .serialize_json(&request)
2784 .expect("should be able to serialize legacy dependent request");
2785 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2786 "should be able to deserialize dependent request from legacy dependent request",
2787 );
2788
2789 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2790 assert_eq!(de_related_to, related_to);
2791 });
2792 }
2793}