1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13 RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15 store::{
16 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18 StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19 },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25 canonical_json::{RedactedBecause, redact},
26 events::{
27 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29 presence::PresenceEvent,
30 receipt::{Receipt, ReceiptThread, ReceiptType},
31 room::{
32 create::RoomCreateEventContent,
33 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34 },
35 },
36 serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41 fs,
42 sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47 OpenStoreError, Secret, SqliteStoreConfig,
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt, repeat_vars,
53 },
54};
55
56mod keys {
57 pub const KV_BLOB: &str = "kv_blob";
59 pub const ROOM_INFO: &str = "room_info";
60 pub const STATE_EVENT: &str = "state_event";
61 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
62 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
63 pub const MEMBER: &str = "member";
64 pub const PROFILE: &str = "profile";
65 pub const RECEIPT: &str = "receipt";
66 pub const DISPLAY_NAME: &str = "display_name";
67 pub const SEND_QUEUE: &str = "send_queue_events";
68 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
69 pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
70}
71
72pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
75#[derive(Clone)]
77pub struct SqliteStateStore {
78 store_cipher: Option<Arc<StoreCipher>>,
79
80 pool: SqlitePool,
82
83 write_connection: Arc<Mutex<SqliteAsyncConn>>,
88}
89
90#[cfg(not(tarpaulin_include))]
91impl fmt::Debug for SqliteStateStore {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
94 }
95}
96
97impl SqliteStateStore {
98 pub async fn open(
101 path: impl AsRef<Path>,
102 passphrase: Option<&str>,
103 ) -> Result<Self, OpenStoreError> {
104 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
105 }
106
107 pub async fn open_with_key(
110 path: impl AsRef<Path>,
111 key: Option<&[u8; 32]>,
112 ) -> Result<Self, OpenStoreError> {
113 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
114 }
115
116 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
118 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
119
120 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
121
122 let this = Self::open_with_pool(pool, config.secret).await?;
123 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
124
125 Ok(this)
126 }
127
128 pub async fn open_with_pool(
131 pool: SqlitePool,
132 secret: Option<Secret>,
133 ) -> Result<Self, OpenStoreError> {
134 let conn = pool.get().await?;
135
136 let mut version = conn.db_version().await?;
137
138 if version == 0 {
139 init(&conn).await?;
140 version = 1;
141 }
142
143 let store_cipher = match secret {
144 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
145 None => None,
146 };
147 let this = Self {
148 store_cipher,
149 pool,
150 write_connection: Arc::new(Mutex::new(conn)),
152 };
153 this.run_migrations(version, None).await?;
154
155 this.read().await?.wal_checkpoint().await;
156
157 Ok(this)
158 }
159
160 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
165 if to == Some(1) {
166 return Ok(());
167 }
168
169 let conn = self.write().await;
170
171 if from < 2 {
172 debug!("Upgrading database to version 2");
173 let this = self.clone();
174 conn.with_transaction(move |txn| {
175 txn.execute_batch(include_str!(
177 "../migrations/state_store/002_a_create_new_room_info.sql"
178 ))?;
179
180 for data in txn
182 .prepare("SELECT data FROM room_info")?
183 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
184 {
185 let data = data?;
186 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
187
188 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
189 let state = this
190 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
191 txn.prepare_cached(
192 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
193 VALUES (?, ?, ?)",
194 )?
195 .execute((room_id, state, data))?;
196 }
197
198 txn.execute_batch(include_str!(
200 "../migrations/state_store/002_b_replace_room_info.sql"
201 ))?;
202
203 txn.set_db_version(2)?;
204 Result::<_, Error>::Ok(())
205 })
206 .await?;
207 }
208
209 if to == Some(2) {
210 return Ok(());
211 }
212
213 if from < 3 {
215 debug!("Upgrading database to version 3");
216 let this = self.clone();
217 conn.with_transaction(move |txn| {
218 for data in txn
220 .prepare("SELECT data FROM room_info")?
221 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
222 {
223 let data = data?;
224 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
225
226 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
228 let event_type =
229 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
230 let create_res = txn
231 .prepare(
232 "SELECT stripped, data FROM state_event
233 WHERE room_id = ? AND event_type = ?",
234 )?
235 .query_row([room_id, event_type], |row| {
236 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
237 })
238 .optional()?;
239
240 let create = create_res.and_then(|(stripped, data)| {
241 let create = if stripped {
242 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
243 this.deserialize_json(&data).ok()?,
244 )
245 } else {
246 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
247 };
248 Some(create)
249 });
250
251 let migrated_room_info = room_info_v1.migrate(create.as_ref());
252
253 let data = this.serialize_json(&migrated_room_info)?;
254 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
255 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
256 .execute((data, room_id))?;
257 }
258
259 txn.set_db_version(3)?;
260 Result::<_, Error>::Ok(())
261 })
262 .await?;
263 }
264
265 if to == Some(3) {
266 return Ok(());
267 }
268
269 if from < 4 {
270 debug!("Upgrading database to version 4");
271 conn.with_transaction(move |txn| {
272 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274 txn.set_db_version(4)
275 })
276 .await?;
277 }
278
279 if to == Some(4) {
280 return Ok(());
281 }
282
283 if from < 5 {
284 debug!("Upgrading database to version 5");
285 conn.with_transaction(move |txn| {
286 txn.execute_batch(include_str!(
288 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
289 ))?;
290 txn.set_db_version(4)
291 })
292 .await?;
293 }
294
295 if to == Some(5) {
296 return Ok(());
297 }
298
299 if from < 6 {
300 debug!("Upgrading database to version 6");
301 conn.with_transaction(move |txn| {
302 txn.execute_batch(include_str!(
304 "../migrations/state_store/005_send_queue_dependent_events.sql"
305 ))?;
306 txn.set_db_version(6)
307 })
308 .await?;
309 }
310
311 if to == Some(6) {
312 return Ok(());
313 }
314
315 if from < 7 {
316 debug!("Upgrading database to version 7");
317 conn.with_transaction(move |txn| {
318 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
320 txn.set_db_version(7)
321 })
322 .await?;
323 }
324
325 if to == Some(7) {
326 return Ok(());
327 }
328
329 if from < 8 {
330 debug!("Upgrading database to version 8");
331 let error = QueueWedgeError::GenericApiError {
333 msg: "local echo failed to send in a previous session".into(),
334 };
335 let default_err = self.serialize_value(&error)?;
336
337 conn.with_transaction(move |txn| {
338 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
340
341 for wedged_entries in txn
344 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
345 .query_map((), |row| {
346 Ok(
347 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
348 )
349 })? {
350
351 let (room_id, transaction_id) = wedged_entries?;
352
353 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
354 .execute((default_err.clone(), room_id, transaction_id))?;
355 }
356
357
358 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
360
361 txn.set_db_version(8)
362 })
363 .await?;
364 }
365
366 if to == Some(8) {
367 return Ok(());
368 }
369
370 if from < 9 {
371 debug!("Upgrading database to version 9");
372 conn.with_transaction(move |txn| {
373 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
375 txn.set_db_version(9)
376 })
377 .await?;
378 }
379
380 if to == Some(9) {
381 return Ok(());
382 }
383
384 if from < 10 {
385 debug!("Upgrading database to version 10");
386 conn.with_transaction(move |txn| {
387 txn.execute_batch(include_str!(
389 "../migrations/state_store/009_send_queue_priority.sql"
390 ))?;
391 txn.set_db_version(10)
392 })
393 .await?;
394 }
395
396 if to == Some(10) {
397 return Ok(());
398 }
399
400 if from < 11 {
401 debug!("Upgrading database to version 11");
402 conn.with_transaction(move |txn| {
403 txn.execute_batch(include_str!(
405 "../migrations/state_store/010_send_queue_enqueue_time.sql"
406 ))?;
407 txn.set_db_version(11)
408 })
409 .await?;
410 }
411
412 if to == Some(11) {
413 return Ok(());
414 }
415
416 if from < 12 {
417 debug!("Upgrading database to version 12");
418 conn.vacuum().await?;
422 conn.set_kv("version", vec![12]).await?;
423 }
424
425 if to == Some(12) {
426 return Ok(());
427 }
428
429 if from < 13 {
430 debug!("Upgrading database to version 13");
431 conn.with_transaction(move |txn| {
432 txn.execute_batch(include_str!(
434 "../migrations/state_store/011_thread_subscriptions.sql"
435 ))?;
436 txn.set_db_version(13)
437 })
438 .await?;
439 }
440
441 if to == Some(13) {
442 return Ok(());
443 }
444
445 if from < 14 {
446 debug!("Upgrading database to version 14");
447 conn.with_transaction(move |txn| {
448 txn.execute_batch(include_str!(
450 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
451 ))?;
452 txn.set_db_version(14)
453 })
454 .await?;
455 }
456
457 if to == Some(14) {
458 return Ok(());
459 }
460
461 if from < 15 {
462 debug!("Upgrading database to version 15");
463 conn.with_transaction(move |txn| {
464 txn.execute_batch(include_str!(
466 "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
467 ))?;
468 txn.set_db_version(15)
469 })
470 .await?;
471 }
472
473 if to == Some(15) {
474 return Ok(());
475 }
476
477 Ok(())
478 }
479
480 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
481 let key_s = match key {
482 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
483 StateStoreDataKey::SupportedVersions => {
484 Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
485 }
486 StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
487 StateStoreDataKey::Filter(f) => {
488 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
489 }
490 StateStoreDataKey::UserAvatarUrl(u) => {
491 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
492 }
493 StateStoreDataKey::RecentlyVisitedRooms(b) => {
494 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
495 }
496 StateStoreDataKey::UtdHookManagerData => {
497 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
498 }
499 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
500 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
501 }
502 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
503 if let Some(thread_root) = thread_root {
504 Cow::Owned(format!(
505 "{}:{room_id}:{thread_root}",
506 StateStoreDataKey::COMPOSER_DRAFT
507 ))
508 } else {
509 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
510 }
511 }
512 StateStoreDataKey::SeenKnockRequests(room_id) => {
513 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
514 }
515 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
516 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
517 }
518 };
519
520 self.encode_key(keys::KV_BLOB, &*key_s)
521 }
522
523 fn encode_presence_key(&self, user_id: &UserId) -> Key {
524 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
525 }
526
527 fn encode_custom_key(&self, key: &[u8]) -> Key {
528 let mut full_key = b"custom:".to_vec();
529 full_key.extend(key);
530 self.encode_key(keys::KV_BLOB, full_key)
531 }
532
533 #[instrument(skip_all)]
535 async fn read(&self) -> Result<SqliteAsyncConn> {
536 Ok(self.pool.get().await?)
537 }
538
539 #[instrument(skip_all)]
541 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
542 self.write_connection.clone().lock_owned().await
543 }
544
545 fn remove_maybe_stripped_room_data(
546 &self,
547 txn: &Transaction<'_>,
548 room_id: &RoomId,
549 stripped: bool,
550 ) -> rusqlite::Result<()> {
551 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
552 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
553
554 let member_room_id = self.encode_key(keys::MEMBER, room_id);
555 txn.remove_room_members(&member_room_id, Some(stripped))
556 }
557
558 pub async fn vacuum(&self) -> Result<()> {
559 self.write_connection.lock().await.vacuum().await
560 }
561
562 pub async fn get_db_size(&self) -> Result<Option<usize>> {
563 let read_conn = self.pool.get().await?;
564 Ok(Some(read_conn.get_db_size().await?))
565 }
566}
567
568impl EncryptableStore for SqliteStateStore {
569 fn get_cypher(&self) -> Option<&StoreCipher> {
570 self.store_cipher.as_deref()
571 }
572}
573
574async fn init(conn: &SqliteAsyncConn) -> Result<()> {
576 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
579 conn.with_transaction(|txn| {
580 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
581 txn.set_db_version(1)?;
582
583 Ok(())
584 })
585 .await
586}
587
588trait SqliteConnectionStateStoreExt {
589 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
590
591 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
592
593 fn set_room_account_data(
594 &self,
595 room_id: &[u8],
596 event_type: &[u8],
597 data: &[u8],
598 ) -> rusqlite::Result<()>;
599 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
600
601 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
602 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
603 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
604
605 fn set_state_event(
606 &self,
607 room_id: &[u8],
608 event_type: &[u8],
609 state_key: &[u8],
610 stripped: bool,
611 event_id: Option<&[u8]>,
612 data: &[u8],
613 ) -> rusqlite::Result<()>;
614 fn get_state_event_by_id(
615 &self,
616 room_id: &[u8],
617 event_id: &[u8],
618 ) -> rusqlite::Result<Option<Vec<u8>>>;
619 fn remove_room_state_events(
620 &self,
621 room_id: &[u8],
622 stripped: Option<bool>,
623 ) -> rusqlite::Result<()>;
624
625 fn set_member(
626 &self,
627 room_id: &[u8],
628 user_id: &[u8],
629 membership: &[u8],
630 stripped: bool,
631 data: &[u8],
632 ) -> rusqlite::Result<()>;
633 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
634
635 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
636 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
637 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
638
639 fn set_receipt(
640 &self,
641 room_id: &[u8],
642 user_id: &[u8],
643 receipt_type: &[u8],
644 thread_id: &[u8],
645 event_id: &[u8],
646 data: &[u8],
647 ) -> rusqlite::Result<()>;
648 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
649
650 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
651 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
652 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
653 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
654 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
655}
656
657impl SqliteConnectionStateStoreExt for rusqlite::Connection {
658 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
659 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
660 Ok(())
661 }
662
663 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
664 self.prepare_cached(
665 "INSERT OR REPLACE INTO global_account_data (event_type, data)
666 VALUES (?, ?)",
667 )?
668 .execute((event_type, data))?;
669 Ok(())
670 }
671
672 fn set_room_account_data(
673 &self,
674 room_id: &[u8],
675 event_type: &[u8],
676 data: &[u8],
677 ) -> rusqlite::Result<()> {
678 self.prepare_cached(
679 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
680 VALUES (?, ?, ?)",
681 )?
682 .execute((room_id, event_type, data))?;
683 Ok(())
684 }
685
686 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
687 self.prepare(
688 "DELETE FROM room_account_data
689 WHERE room_id = ?",
690 )?
691 .execute((room_id,))?;
692 Ok(())
693 }
694
695 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
696 self.prepare_cached(
697 "INSERT OR REPLACE INTO room_info (room_id, state, data)
698 VALUES (?, ?, ?)",
699 )?
700 .execute((room_id, state, data))?;
701 Ok(())
702 }
703
704 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
705 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
706 .optional()
707 }
708
709 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
711 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
712 Ok(())
713 }
714
715 fn set_state_event(
716 &self,
717 room_id: &[u8],
718 event_type: &[u8],
719 state_key: &[u8],
720 stripped: bool,
721 event_id: Option<&[u8]>,
722 data: &[u8],
723 ) -> rusqlite::Result<()> {
724 self.prepare_cached(
725 "INSERT OR REPLACE
726 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
727 VALUES (?, ?, ?, ?, ?, ?)",
728 )?
729 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
730 Ok(())
731 }
732
733 fn get_state_event_by_id(
734 &self,
735 room_id: &[u8],
736 event_id: &[u8],
737 ) -> rusqlite::Result<Option<Vec<u8>>> {
738 self.query_row(
739 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
740 (room_id, event_id),
741 |row| row.get(0),
742 )
743 .optional()
744 }
745
746 fn remove_room_state_events(
752 &self,
753 room_id: &[u8],
754 stripped: Option<bool>,
755 ) -> rusqlite::Result<()> {
756 if let Some(stripped) = stripped {
757 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
758 .execute((room_id, stripped))?;
759 } else {
760 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
761 .execute((room_id,))?;
762 }
763 Ok(())
764 }
765
766 fn set_member(
767 &self,
768 room_id: &[u8],
769 user_id: &[u8],
770 membership: &[u8],
771 stripped: bool,
772 data: &[u8],
773 ) -> rusqlite::Result<()> {
774 self.prepare_cached(
775 "INSERT OR REPLACE
776 INTO member (room_id, user_id, membership, stripped, data)
777 VALUES (?, ?, ?, ?, ?)",
778 )?
779 .execute((room_id, user_id, membership, stripped, data))?;
780 Ok(())
781 }
782
783 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
788 if let Some(stripped) = stripped {
789 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
790 .execute((room_id, stripped))?;
791 } else {
792 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
793 }
794 Ok(())
795 }
796
797 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
798 self.prepare_cached(
799 "INSERT OR REPLACE
800 INTO profile (room_id, user_id, data)
801 VALUES (?, ?, ?)",
802 )?
803 .execute((room_id, user_id, data))?;
804 Ok(())
805 }
806
807 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
808 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
809 Ok(())
810 }
811
812 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
813 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
814 .execute((room_id, user_id))?;
815 Ok(())
816 }
817
818 fn set_receipt(
819 &self,
820 room_id: &[u8],
821 user_id: &[u8],
822 receipt_type: &[u8],
823 thread: &[u8],
824 event_id: &[u8],
825 data: &[u8],
826 ) -> rusqlite::Result<()> {
827 self.prepare_cached(
828 "INSERT OR REPLACE
829 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
830 VALUES (?, ?, ?, ?, ?, ?)",
831 )?
832 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
833 Ok(())
834 }
835
836 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
837 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
838 Ok(())
839 }
840
841 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
842 self.prepare_cached(
843 "INSERT OR REPLACE
844 INTO display_name (room_id, name, data)
845 VALUES (?, ?, ?)",
846 )?
847 .execute((room_id, name, data))?;
848 Ok(())
849 }
850
851 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
852 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
853 .execute((room_id, name))?;
854 Ok(())
855 }
856
857 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
858 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
859 Ok(())
860 }
861
862 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
863 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
864 Ok(())
865 }
866
867 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
868 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
869 .execute((room_id,))?;
870 Ok(())
871 }
872}
873
874#[async_trait]
875trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
876 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
877 Ok(self
878 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
879 .await
880 .optional()?)
881 }
882
883 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
884 let keys_length = keys.len();
885
886 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
887 let sql_params = repeat_vars(keys.len());
888 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
889
890 let params = rusqlite::params_from_iter(keys);
891
892 Ok(txn
893 .prepare(&sql)?
894 .query(params)?
895 .mapped(|row| row.get(0))
896 .collect::<Result<_, _>>()?)
897 })
898 .await
899 }
900
901 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
902
903 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
904 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
905 Ok(())
906 }
907
908 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
909 Ok(match room_id {
910 None => {
911 self.prepare("SELECT data FROM room_info", move |mut stmt| {
912 stmt.query_map((), |row| row.get(0))?.collect()
913 })
914 .await?
915 }
916
917 Some(room_id) => {
918 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
919 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
920 })
921 .await?
922 }
923 })
924 }
925
926 async fn get_maybe_stripped_state_events_for_keys(
927 &self,
928 room_id: Key,
929 event_type: Key,
930 state_keys: Vec<Key>,
931 ) -> Result<Vec<(bool, Vec<u8>)>> {
932 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
933 let sql_params = repeat_vars(state_keys.len());
934 let sql = format!(
935 "SELECT stripped, data FROM state_event
936 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
937 );
938
939 let params = rusqlite::params_from_iter(
940 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
941 );
942
943 Ok(txn
944 .prepare(&sql)?
945 .query(params)?
946 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
947 .collect::<Result<_, _>>()?)
948 })
949 .await
950 }
951
952 async fn get_maybe_stripped_state_events(
953 &self,
954 room_id: Key,
955 event_type: Key,
956 ) -> Result<Vec<(bool, Vec<u8>)>> {
957 Ok(self
958 .prepare(
959 "SELECT stripped, data FROM state_event
960 WHERE room_id = ? AND event_type = ?",
961 |mut stmt| {
962 stmt.query((room_id, event_type))?
963 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
964 .collect()
965 },
966 )
967 .await?)
968 }
969
970 async fn get_profiles(
971 &self,
972 room_id: Key,
973 user_ids: Vec<Key>,
974 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
975 let user_ids_length = user_ids.len();
976
977 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
978 let sql_params = repeat_vars(user_ids.len());
979 let sql = format!(
980 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
981 );
982
983 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
984
985 Ok(txn
986 .prepare(&sql)?
987 .query(params)?
988 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
989 .collect::<Result<_, _>>()?)
990 })
991 .await
992 }
993
994 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
995 let res = if memberships.is_empty() {
996 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
997 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
998 })
999 .await?
1000 } else {
1001 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
1002 let sql_params = repeat_vars(memberships.len());
1003 let sql = format!(
1004 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
1005 );
1006
1007 let params =
1008 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
1009
1010 Ok(txn
1011 .prepare(&sql)?
1012 .query(params)?
1013 .mapped(|row| row.get(0))
1014 .collect::<Result<_, _>>()?)
1015 })
1016 .await?
1017 };
1018
1019 Ok(res)
1020 }
1021
1022 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
1023 Ok(self
1024 .query_row(
1025 "SELECT data FROM global_account_data WHERE event_type = ?",
1026 (event_type,),
1027 |row| row.get(0),
1028 )
1029 .await
1030 .optional()?)
1031 }
1032
1033 async fn get_room_account_data(
1034 &self,
1035 room_id: Key,
1036 event_type: Key,
1037 ) -> Result<Option<Vec<u8>>> {
1038 Ok(self
1039 .query_row(
1040 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
1041 (room_id, event_type),
1042 |row| row.get(0),
1043 )
1044 .await
1045 .optional()?)
1046 }
1047
1048 async fn get_display_names(
1049 &self,
1050 room_id: Key,
1051 names: Vec<Key>,
1052 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1053 let names_length = names.len();
1054
1055 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
1056 let sql_params = repeat_vars(names.len());
1057 let sql = format!(
1058 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
1059 );
1060
1061 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1062
1063 Ok(txn
1064 .prepare(&sql)?
1065 .query(params)?
1066 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1067 .collect::<Result<_, _>>()?)
1068 })
1069 .await
1070 }
1071
1072 async fn get_user_receipt(
1073 &self,
1074 room_id: Key,
1075 receipt_type: Key,
1076 thread: Key,
1077 user_id: Key,
1078 ) -> Result<Option<Vec<u8>>> {
1079 Ok(self
1080 .query_row(
1081 "SELECT data FROM receipt
1082 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1083 (room_id, receipt_type, thread, user_id),
1084 |row| row.get(0),
1085 )
1086 .await
1087 .optional()?)
1088 }
1089
1090 async fn get_event_receipts(
1091 &self,
1092 room_id: Key,
1093 receipt_type: Key,
1094 thread: Key,
1095 event_id: Key,
1096 ) -> Result<Vec<Vec<u8>>> {
1097 Ok(self
1098 .prepare(
1099 "SELECT data FROM receipt
1100 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1101 |mut stmt| {
1102 stmt.query((room_id, receipt_type, thread, event_id))?
1103 .mapped(|row| row.get(0))
1104 .collect()
1105 },
1106 )
1107 .await?)
1108 }
1109}
1110
1111#[async_trait]
1112impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1113 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1114 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1115 }
1116}
1117
1118#[async_trait]
1119impl StateStore for SqliteStateStore {
1120 type Error = Error;
1121
1122 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1123 self.read()
1124 .await?
1125 .get_kv_blob(self.encode_state_store_data_key(key))
1126 .await?
1127 .map(|data| {
1128 Ok(match key {
1129 StateStoreDataKey::SyncToken => {
1130 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1131 }
1132 StateStoreDataKey::SupportedVersions => {
1133 StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1134 }
1135 StateStoreDataKey::WellKnown => {
1136 StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1137 }
1138 StateStoreDataKey::Filter(_) => {
1139 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1140 }
1141 StateStoreDataKey::UserAvatarUrl(_) => {
1142 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1143 }
1144 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1145 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1146 }
1147 StateStoreDataKey::UtdHookManagerData => {
1148 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1149 }
1150 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1151 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1152 }
1153 StateStoreDataKey::ComposerDraft(_, _) => {
1154 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1155 }
1156 StateStoreDataKey::SeenKnockRequests(_) => {
1157 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1158 }
1159 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1160 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1161 self.deserialize_value(&data)?,
1162 )
1163 }
1164 })
1165 })
1166 .transpose()
1167 }
1168
1169 async fn set_kv_data(
1170 &self,
1171 key: StateStoreDataKey<'_>,
1172 value: StateStoreDataValue,
1173 ) -> Result<()> {
1174 let serialized_value = match key {
1175 StateStoreDataKey::SyncToken => self.serialize_value(
1176 &value.into_sync_token().expect("Session data not a sync token"),
1177 )?,
1178 StateStoreDataKey::SupportedVersions => self.serialize_value(
1179 &value
1180 .into_supported_versions()
1181 .expect("Session data not containing supported versions"),
1182 )?,
1183 StateStoreDataKey::WellKnown => self.serialize_value(
1184 &value.into_well_known().expect("Session data not containing well-known"),
1185 )?,
1186 StateStoreDataKey::Filter(_) => {
1187 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1188 }
1189 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1190 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1191 )?,
1192 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1193 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1194 )?,
1195 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1196 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1197 )?,
1198 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1199 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1200 }
1201 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1202 &value.into_composer_draft().expect("Session data not a composer draft"),
1203 )?,
1204 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1205 &value
1206 .into_seen_knock_requests()
1207 .expect("Session data is not a set of seen knock request ids"),
1208 )?,
1209 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1210 &value
1211 .into_thread_subscriptions_catchup_tokens()
1212 .expect("Session data is not a list of thread subscription catchup tokens"),
1213 )?,
1214 };
1215
1216 self.write()
1217 .await
1218 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1219 .await
1220 }
1221
1222 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1223 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1224 }
1225
1226 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1227 let changes = changes.to_owned();
1228 let this = self.clone();
1229 self.write()
1230 .await
1231 .with_transaction(move |txn| {
1232 let StateChanges {
1233 sync_token,
1234 account_data,
1235 presence,
1236 profiles,
1237 profiles_to_delete,
1238 state,
1239 room_account_data,
1240 room_infos,
1241 receipts,
1242 redactions,
1243 stripped_state,
1244 ambiguity_maps,
1245 } = changes;
1246
1247 if let Some(sync_token) = sync_token {
1248 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1249 let value = this.serialize_value(&sync_token)?;
1250 txn.set_kv_blob(&key, &value)?;
1251 }
1252
1253 for (event_type, event) in account_data {
1254 let event_type =
1255 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1256 let data = this.serialize_json(&event)?;
1257 txn.set_global_account_data(&event_type, &data)?;
1258 }
1259
1260 for (room_id, events) in room_account_data {
1261 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1262 for (event_type, event) in events {
1263 let event_type =
1264 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1265 let data = this.serialize_json(&event)?;
1266 txn.set_room_account_data(&room_id, &event_type, &data)?;
1267 }
1268 }
1269
1270 for (user_id, event) in presence {
1271 let key = this.encode_presence_key(&user_id);
1272 let value = this.serialize_json(&event)?;
1273 txn.set_kv_blob(&key, &value)?;
1274 }
1275
1276 for (room_id, room_info) in room_infos {
1277 let stripped = room_info.state() == RoomState::Invited;
1278 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1280
1281 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1282 let state = this
1283 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1284 let data = this.serialize_json(&room_info)?;
1285 txn.set_room_info(&room_id, &state, &data)?;
1286 }
1287
1288 for (room_id, user_ids) in profiles_to_delete {
1289 let room_id = this.encode_key(keys::PROFILE, room_id);
1290 for user_id in user_ids {
1291 let user_id = this.encode_key(keys::PROFILE, user_id);
1292 txn.remove_room_profile(&room_id, &user_id)?;
1293 }
1294 }
1295
1296 for (room_id, state_event_types) in state {
1297 let profiles = profiles.get(&room_id);
1298 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1299
1300 for (event_type, state_events) in state_event_types {
1301 let encoded_event_type =
1302 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1303
1304 for (state_key, raw_state_event) in state_events {
1305 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1306 let data = this.serialize_json(&raw_state_event)?;
1307
1308 let event_id: Option<String> =
1309 raw_state_event.get_field("event_id").ok().flatten();
1310 let encoded_event_id =
1311 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1312
1313 txn.set_state_event(
1314 &encoded_room_id,
1315 &encoded_event_type,
1316 &encoded_state_key,
1317 false,
1318 encoded_event_id.as_deref(),
1319 &data,
1320 )?;
1321
1322 if event_type == StateEventType::RoomMember {
1323 let member_event = match raw_state_event
1324 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1325 {
1326 Ok(ev) => ev,
1327 Err(e) => {
1328 debug!(event_id, "Failed to deserialize member event: {e}");
1329 continue;
1330 }
1331 };
1332
1333 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1334 let user_id = this.encode_key(keys::MEMBER, &state_key);
1335 let membership = this
1336 .encode_key(keys::MEMBER, member_event.membership().as_str());
1337 let data = this.serialize_value(&state_key)?;
1338
1339 txn.set_member(
1340 &encoded_room_id,
1341 &user_id,
1342 &membership,
1343 false,
1344 &data,
1345 )?;
1346
1347 if let Some(profile) =
1348 profiles.and_then(|p| p.get(member_event.state_key()))
1349 {
1350 let room_id = this.encode_key(keys::PROFILE, &room_id);
1351 let user_id = this.encode_key(keys::PROFILE, &state_key);
1352 let data = this.serialize_json(&profile)?;
1353 txn.set_profile(&room_id, &user_id, &data)?;
1354 }
1355 }
1356 }
1357 }
1358 }
1359
1360 for (room_id, stripped_state_event_types) in stripped_state {
1361 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1362
1363 for (event_type, stripped_state_events) in stripped_state_event_types {
1364 let encoded_event_type =
1365 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1366
1367 for (state_key, raw_stripped_state_event) in stripped_state_events {
1368 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1369 let data = this.serialize_json(&raw_stripped_state_event)?;
1370 txn.set_state_event(
1371 &encoded_room_id,
1372 &encoded_event_type,
1373 &encoded_state_key,
1374 true,
1375 None,
1376 &data,
1377 )?;
1378
1379 if event_type == StateEventType::RoomMember {
1380 let member_event = match raw_stripped_state_event
1381 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1382 ) {
1383 Ok(ev) => ev,
1384 Err(e) => {
1385 debug!("Failed to deserialize stripped member event: {e}");
1386 continue;
1387 }
1388 };
1389
1390 let room_id = this.encode_key(keys::MEMBER, &room_id);
1391 let user_id = this.encode_key(keys::MEMBER, &state_key);
1392 let membership = this.encode_key(
1393 keys::MEMBER,
1394 member_event.content.membership.as_str(),
1395 );
1396 let data = this.serialize_value(&state_key)?;
1397
1398 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1399 }
1400 }
1401 }
1402 }
1403
1404 for (room_id, receipt_event) in receipts {
1405 let room_id = this.encode_key(keys::RECEIPT, room_id);
1406
1407 for (event_id, receipt_types) in receipt_event {
1408 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1409
1410 for (receipt_type, receipt_users) in receipt_types {
1411 let receipt_type =
1412 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1413
1414 for (user_id, receipt) in receipt_users {
1415 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1416 let thread = this.encode_key(
1419 keys::RECEIPT,
1420 rmp_serde::to_vec_named(&receipt.thread)?,
1421 );
1422 let data = this.serialize_json(&ReceiptData {
1423 receipt,
1424 event_id: event_id.clone(),
1425 user_id,
1426 })?;
1427
1428 txn.set_receipt(
1429 &room_id,
1430 &encoded_user_id,
1431 &receipt_type,
1432 &thread,
1433 &encoded_event_id,
1434 &data,
1435 )?;
1436 }
1437 }
1438 }
1439 }
1440
1441 for (room_id, redactions) in redactions {
1442 let make_redaction_rules = || {
1443 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1444 txn.get_room_info(&encoded_room_id)
1445 .ok()
1446 .flatten()
1447 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1448 .map(|info| info.room_version_rules_or_default())
1449 .unwrap_or_else(|| {
1450 warn!(
1451 ?room_id,
1452 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1453 );
1454 ROOM_VERSION_RULES_FALLBACK
1455 }).redaction
1456 };
1457
1458 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1459 let mut redaction_rules = None;
1460
1461 for (event_id, redaction) in redactions {
1462 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1463
1464 if let Some(Ok(raw_event)) = txn
1465 .get_state_event_by_id(&encoded_room_id, &event_id)?
1466 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1467 {
1468 let event = raw_event.deserialize()?;
1469 let redacted = redact(
1470 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1471 redaction_rules.get_or_insert_with(make_redaction_rules),
1472 Some(RedactedBecause::from_raw_event(&redaction)?),
1473 )
1474 .map_err(Error::Redaction)?;
1475 let data = this.serialize_json(&redacted)?;
1476
1477 let event_type =
1478 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1479 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1480
1481 txn.set_state_event(
1482 &encoded_room_id,
1483 &event_type,
1484 &state_key,
1485 false,
1486 Some(&event_id),
1487 &data,
1488 )?;
1489 }
1490 }
1491 }
1492
1493 for (room_id, display_names) in ambiguity_maps {
1494 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1495
1496 for (name, user_ids) in display_names {
1497 let encoded_name = this.encode_key(
1498 keys::DISPLAY_NAME,
1499 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1500 );
1501 let data = this.serialize_json(&user_ids)?;
1502
1503 if user_ids.is_empty() {
1504 txn.remove_display_name(&room_id, &encoded_name)?;
1505
1506 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1521 txn.remove_display_name(&room_id, &raw_name)?;
1522 } else {
1523 txn.set_display_name(&room_id, &encoded_name, &data)?;
1525 }
1526 }
1527 }
1528
1529 Ok::<_, Error>(())
1530 })
1531 .await?;
1532
1533 Ok(())
1534 }
1535
1536 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1537 self.read()
1538 .await?
1539 .get_kv_blob(self.encode_presence_key(user_id))
1540 .await?
1541 .map(|data| self.deserialize_json(&data))
1542 .transpose()
1543 }
1544
1545 async fn get_presence_events(
1546 &self,
1547 user_ids: &[OwnedUserId],
1548 ) -> Result<Vec<Raw<PresenceEvent>>> {
1549 if user_ids.is_empty() {
1550 return Ok(Vec::new());
1551 }
1552
1553 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1554 self.read()
1555 .await?
1556 .get_kv_blobs(user_ids)
1557 .await?
1558 .into_iter()
1559 .map(|data| self.deserialize_json(&data))
1560 .collect()
1561 }
1562
1563 async fn get_state_event(
1564 &self,
1565 room_id: &RoomId,
1566 event_type: StateEventType,
1567 state_key: &str,
1568 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1569 Ok(self
1570 .get_state_events_for_keys(room_id, event_type, &[state_key])
1571 .await?
1572 .into_iter()
1573 .next())
1574 }
1575
1576 async fn get_state_events(
1577 &self,
1578 room_id: &RoomId,
1579 event_type: StateEventType,
1580 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1581 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1582 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1583 self.read()
1584 .await?
1585 .get_maybe_stripped_state_events(room_id, event_type)
1586 .await?
1587 .into_iter()
1588 .map(|(stripped, data)| {
1589 let ev = if stripped {
1590 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1591 } else {
1592 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1593 };
1594
1595 Ok(ev)
1596 })
1597 .collect()
1598 }
1599
1600 async fn get_state_events_for_keys(
1601 &self,
1602 room_id: &RoomId,
1603 event_type: StateEventType,
1604 state_keys: &[&str],
1605 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1606 if state_keys.is_empty() {
1607 return Ok(Vec::new());
1608 }
1609
1610 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1611 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1612 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1613 self.read()
1614 .await?
1615 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1616 .await?
1617 .into_iter()
1618 .map(|(stripped, data)| {
1619 let ev = if stripped {
1620 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1621 } else {
1622 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1623 };
1624
1625 Ok(ev)
1626 })
1627 .collect()
1628 }
1629
1630 async fn get_profile(
1631 &self,
1632 room_id: &RoomId,
1633 user_id: &UserId,
1634 ) -> Result<Option<MinimalRoomMemberEvent>> {
1635 let room_id = self.encode_key(keys::PROFILE, room_id);
1636 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1637
1638 self.read()
1639 .await?
1640 .get_profiles(room_id, user_ids)
1641 .await?
1642 .into_iter()
1643 .next()
1644 .map(|(_, data)| self.deserialize_json(&data))
1645 .transpose()
1646 }
1647
1648 async fn get_profiles<'a>(
1649 &self,
1650 room_id: &RoomId,
1651 user_ids: &'a [OwnedUserId],
1652 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1653 if user_ids.is_empty() {
1654 return Ok(BTreeMap::new());
1655 }
1656
1657 let room_id = self.encode_key(keys::PROFILE, room_id);
1658 let mut user_ids_map = user_ids
1659 .iter()
1660 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1661 .collect::<BTreeMap<_, _>>();
1662 let user_ids = user_ids_map.keys().cloned().collect();
1663
1664 self.read()
1665 .await?
1666 .get_profiles(room_id, user_ids)
1667 .await?
1668 .into_iter()
1669 .map(|(user_id, data)| {
1670 Ok((
1671 user_ids_map
1672 .remove(user_id.as_slice())
1673 .expect("returned user IDs were requested"),
1674 self.deserialize_json(&data)?,
1675 ))
1676 })
1677 .collect()
1678 }
1679
1680 async fn get_user_ids(
1681 &self,
1682 room_id: &RoomId,
1683 membership: RoomMemberships,
1684 ) -> Result<Vec<OwnedUserId>> {
1685 let room_id = self.encode_key(keys::MEMBER, room_id);
1686 let memberships = membership
1687 .as_vec()
1688 .into_iter()
1689 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1690 .collect();
1691 self.read()
1692 .await?
1693 .get_user_ids(room_id, memberships)
1694 .await?
1695 .iter()
1696 .map(|data| self.deserialize_value(data))
1697 .collect()
1698 }
1699
1700 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1701 self.read()
1702 .await?
1703 .get_room_infos(match room_load_settings {
1704 RoomLoadSettings::All => None,
1705 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1706 })
1707 .await?
1708 .into_iter()
1709 .map(|data| self.deserialize_json(&data))
1710 .collect()
1711 }
1712
1713 async fn get_users_with_display_name(
1714 &self,
1715 room_id: &RoomId,
1716 display_name: &DisplayName,
1717 ) -> Result<BTreeSet<OwnedUserId>> {
1718 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1719 let names = vec![self.encode_key(
1720 keys::DISPLAY_NAME,
1721 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1722 )];
1723
1724 Ok(self
1725 .read()
1726 .await?
1727 .get_display_names(room_id, names)
1728 .await?
1729 .into_iter()
1730 .next()
1731 .map(|(_, data)| self.deserialize_json(&data))
1732 .transpose()?
1733 .unwrap_or_default())
1734 }
1735
1736 async fn get_users_with_display_names<'a>(
1737 &self,
1738 room_id: &RoomId,
1739 display_names: &'a [DisplayName],
1740 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1741 let mut result = HashMap::new();
1742
1743 if display_names.is_empty() {
1744 return Ok(result);
1745 }
1746
1747 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1748 let mut names_map = display_names
1749 .iter()
1750 .flat_map(|display_name| {
1751 let raw =
1761 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1762 let normalized = display_name.as_normalized_str().map(|normalized| {
1763 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1764 });
1765
1766 iter::once(raw).chain(normalized)
1767 })
1768 .collect::<BTreeMap<_, _>>();
1769 let names = names_map.keys().cloned().collect();
1770
1771 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1772 {
1773 let display_name =
1774 names_map.remove(name.as_slice()).expect("returned display names were requested");
1775 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1776
1777 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1778 }
1779
1780 Ok(result)
1781 }
1782
1783 async fn get_account_data_event(
1784 &self,
1785 event_type: GlobalAccountDataEventType,
1786 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1787 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1788 self.read()
1789 .await?
1790 .get_global_account_data(event_type)
1791 .await?
1792 .map(|value| self.deserialize_json(&value))
1793 .transpose()
1794 }
1795
1796 async fn get_room_account_data_event(
1797 &self,
1798 room_id: &RoomId,
1799 event_type: RoomAccountDataEventType,
1800 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1801 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1802 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1803 self.read()
1804 .await?
1805 .get_room_account_data(room_id, event_type)
1806 .await?
1807 .map(|value| self.deserialize_json(&value))
1808 .transpose()
1809 }
1810
1811 async fn get_user_room_receipt_event(
1812 &self,
1813 room_id: &RoomId,
1814 receipt_type: ReceiptType,
1815 thread: ReceiptThread,
1816 user_id: &UserId,
1817 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1818 let room_id = self.encode_key(keys::RECEIPT, room_id);
1819 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1820 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1823 let user_id = self.encode_key(keys::RECEIPT, user_id);
1824
1825 self.read()
1826 .await?
1827 .get_user_receipt(room_id, receipt_type, thread, user_id)
1828 .await?
1829 .map(|value| {
1830 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1831 })
1832 .transpose()
1833 }
1834
1835 async fn get_event_room_receipt_events(
1836 &self,
1837 room_id: &RoomId,
1838 receipt_type: ReceiptType,
1839 thread: ReceiptThread,
1840 event_id: &EventId,
1841 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1842 let room_id = self.encode_key(keys::RECEIPT, room_id);
1843 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1844 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1847 let event_id = self.encode_key(keys::RECEIPT, event_id);
1848
1849 self.read()
1850 .await?
1851 .get_event_receipts(room_id, receipt_type, thread, event_id)
1852 .await?
1853 .iter()
1854 .map(|value| {
1855 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1856 })
1857 .collect()
1858 }
1859
1860 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1861 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1862 }
1863
1864 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1865 let conn = self.write().await;
1866 let key = self.encode_custom_key(key);
1867 conn.set_kv_blob(key, value).await?;
1868 Ok(())
1869 }
1870
1871 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1872 let conn = self.write().await;
1873 let key = self.encode_custom_key(key);
1874 let previous = conn.get_kv_blob(key.clone()).await?;
1875 conn.set_kv_blob(key, value).await?;
1876 Ok(previous)
1877 }
1878
1879 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1880 let conn = self.write().await;
1881 let key = self.encode_custom_key(key);
1882 let previous = conn.get_kv_blob(key.clone()).await?;
1883 if previous.is_some() {
1884 conn.delete_kv_blob(key).await?;
1885 }
1886 Ok(previous)
1887 }
1888
1889 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1890 let this = self.clone();
1891 let room_id = room_id.to_owned();
1892
1893 let conn = self.write().await;
1894
1895 conn.with_transaction(move |txn| -> Result<()> {
1896 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1897 txn.remove_room_info(&room_info_room_id)?;
1898
1899 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1900 txn.remove_room_state_events(&state_event_room_id, None)?;
1901
1902 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1903 txn.remove_room_members(&member_room_id, None)?;
1904
1905 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1906 txn.remove_room_profiles(&profile_room_id)?;
1907
1908 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1909 txn.remove_room_account_data(&room_account_data_room_id)?;
1910
1911 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1912 txn.remove_room_receipts(&receipt_room_id)?;
1913
1914 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1915 txn.remove_room_display_names(&display_name_room_id)?;
1916
1917 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1918 txn.remove_room_send_queue(&send_queue_room_id)?;
1919
1920 let dependent_send_queue_room_id =
1921 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1922 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1923
1924 let thread_subscriptions_room_id =
1925 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1926 txn.execute(
1927 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1928 (thread_subscriptions_room_id,),
1929 )?;
1930
1931 Ok(())
1932 })
1933 .await?;
1934
1935 conn.vacuum().await
1936 }
1937
1938 async fn save_send_queue_request(
1939 &self,
1940 room_id: &RoomId,
1941 transaction_id: OwnedTransactionId,
1942 created_at: MilliSecondsSinceUnixEpoch,
1943 content: QueuedRequestKind,
1944 priority: usize,
1945 ) -> Result<(), Self::Error> {
1946 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1947 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1948
1949 let content = self.serialize_json(&content)?;
1950 let created_at_ts: u64 = created_at.0.into();
1956 self.write()
1957 .await
1958 .with_transaction(move |txn| {
1959 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1960 Ok(())
1961 })
1962 .await
1963 }
1964
1965 async fn update_send_queue_request(
1966 &self,
1967 room_id: &RoomId,
1968 transaction_id: &TransactionId,
1969 content: QueuedRequestKind,
1970 ) -> Result<bool, Self::Error> {
1971 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1972
1973 let content = self.serialize_json(&content)?;
1974 let transaction_id = transaction_id.to_string();
1977
1978 let num_updated = self.write()
1979 .await
1980 .with_transaction(move |txn| {
1981 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1982 })
1983 .await?;
1984
1985 Ok(num_updated > 0)
1986 }
1987
1988 async fn remove_send_queue_request(
1989 &self,
1990 room_id: &RoomId,
1991 transaction_id: &TransactionId,
1992 ) -> Result<bool, Self::Error> {
1993 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1994
1995 let transaction_id = transaction_id.to_string();
1997
1998 let num_deleted = self
1999 .write()
2000 .await
2001 .with_transaction(move |txn| {
2002 txn.prepare_cached(
2003 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
2004 )?
2005 .execute((room_id, &transaction_id))
2006 })
2007 .await?;
2008
2009 Ok(num_deleted > 0)
2010 }
2011
2012 async fn load_send_queue_requests(
2013 &self,
2014 room_id: &RoomId,
2015 ) -> Result<Vec<QueuedRequest>, Self::Error> {
2016 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2017
2018 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
2022 .read()
2023 .await?
2024 .prepare(
2025 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
2026 |mut stmt| {
2027 stmt.query((room_id,))?
2028 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2029 .collect()
2030 },
2031 )
2032 .await?;
2033
2034 let mut requests = Vec::with_capacity(res.len());
2035
2036 for entry in res {
2037 let created_at = entry
2038 .4
2039 .and_then(UInt::new)
2040 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2041
2042 requests.push(QueuedRequest {
2043 transaction_id: entry.0.into(),
2044 kind: self.deserialize_json(&entry.1)?,
2045 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
2046 priority: entry.3,
2047 created_at,
2048 });
2049 }
2050
2051 Ok(requests)
2052 }
2053
2054 async fn update_send_queue_request_status(
2055 &self,
2056 room_id: &RoomId,
2057 transaction_id: &TransactionId,
2058 error: Option<QueueWedgeError>,
2059 ) -> Result<(), Self::Error> {
2060 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2061
2062 let transaction_id = transaction_id.to_string();
2064
2065 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2067
2068 self.write()
2069 .await
2070 .with_transaction(move |txn| {
2071 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2072 Ok(())
2073 })
2074 .await
2075 }
2076
2077 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2078 let res: Vec<Vec<u8>> = self
2083 .read()
2084 .await?
2085 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2086 stmt.query(())?.mapped(|row| row.get(0)).collect()
2087 })
2088 .await?;
2089
2090 Ok(res
2093 .into_iter()
2094 .map(|entry| self.deserialize_value(&entry))
2095 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2096 .into_iter()
2097 .collect())
2098 }
2099
2100 async fn save_dependent_queued_request(
2101 &self,
2102 room_id: &RoomId,
2103 parent_txn_id: &TransactionId,
2104 own_txn_id: ChildTransactionId,
2105 created_at: MilliSecondsSinceUnixEpoch,
2106 content: DependentQueuedRequestKind,
2107 ) -> Result<()> {
2108 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2109 let content = self.serialize_json(&content)?;
2110
2111 let parent_txn_id = parent_txn_id.to_string();
2113 let own_txn_id = own_txn_id.to_string();
2114
2115 let created_at_ts: u64 = created_at.0.into();
2116 self.write()
2117 .await
2118 .with_transaction(move |txn| {
2119 txn.prepare_cached(
2120 r#"INSERT INTO dependent_send_queue_events
2121 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2122 VALUES (?, ?, ?, ?, ?)"#,
2123 )?
2124 .execute((
2125 room_id,
2126 parent_txn_id,
2127 own_txn_id,
2128 content,
2129 created_at_ts,
2130 ))?;
2131 Ok(())
2132 })
2133 .await
2134 }
2135
2136 async fn update_dependent_queued_request(
2137 &self,
2138 room_id: &RoomId,
2139 own_transaction_id: &ChildTransactionId,
2140 new_content: DependentQueuedRequestKind,
2141 ) -> Result<bool> {
2142 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2143 let content = self.serialize_json(&new_content)?;
2144
2145 let own_txn_id = own_transaction_id.to_string();
2147
2148 let num_updated = self
2149 .write()
2150 .await
2151 .with_transaction(move |txn| {
2152 txn.prepare_cached(
2153 r#"UPDATE dependent_send_queue_events
2154 SET content = ?
2155 WHERE own_transaction_id = ?
2156 AND room_id = ?"#,
2157 )?
2158 .execute((content, own_txn_id, room_id))
2159 })
2160 .await?;
2161
2162 if num_updated > 1 {
2163 return Err(Error::InconsistentUpdate);
2164 }
2165
2166 Ok(num_updated == 1)
2167 }
2168
2169 async fn mark_dependent_queued_requests_as_ready(
2170 &self,
2171 room_id: &RoomId,
2172 parent_txn_id: &TransactionId,
2173 parent_key: SentRequestKey,
2174 ) -> Result<usize> {
2175 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2176 let parent_key = self.serialize_json(&parent_key)?;
2177
2178 let parent_txn_id = parent_txn_id.to_string();
2180
2181 self.write()
2182 .await
2183 .with_transaction(move |txn| {
2184 Ok(txn.prepare_cached(
2185 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2186 )?
2187 .execute((parent_key, parent_txn_id, room_id))?)
2188 })
2189 .await
2190 }
2191
2192 async fn remove_dependent_queued_request(
2193 &self,
2194 room_id: &RoomId,
2195 txn_id: &ChildTransactionId,
2196 ) -> Result<bool> {
2197 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2198
2199 let txn_id = txn_id.to_string();
2201
2202 let num_deleted = self
2203 .write()
2204 .await
2205 .with_transaction(move |txn| {
2206 txn.prepare_cached(
2207 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2208 )?
2209 .execute((txn_id, room_id))
2210 })
2211 .await?;
2212
2213 Ok(num_deleted > 0)
2214 }
2215
2216 async fn load_dependent_queued_requests(
2217 &self,
2218 room_id: &RoomId,
2219 ) -> Result<Vec<DependentQueuedRequest>> {
2220 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2221
2222 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2224 .read()
2225 .await?
2226 .prepare(
2227 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2228 |mut stmt| {
2229 stmt.query((room_id,))?
2230 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2231 .collect()
2232 },
2233 )
2234 .await?;
2235
2236 let mut dependent_events = Vec::with_capacity(res.len());
2237
2238 for entry in res {
2239 let created_at = entry
2240 .4
2241 .and_then(UInt::new)
2242 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2243
2244 dependent_events.push(DependentQueuedRequest {
2245 own_transaction_id: entry.0.into(),
2246 parent_transaction_id: entry.1.into(),
2247 parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2248 kind: self.deserialize_json(&entry.3)?,
2249 created_at,
2250 });
2251 }
2252
2253 Ok(dependent_events)
2254 }
2255
2256 async fn upsert_thread_subscriptions(
2257 &self,
2258 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
2259 ) -> Result<(), Self::Error> {
2260 let values: Vec<_> = updates
2261 .into_iter()
2262 .map(|(room_id, thread_id, subscription)| {
2263 (
2264 self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
2265 self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
2266 subscription.status.as_str(),
2267 subscription.bump_stamp,
2268 )
2269 })
2270 .collect();
2271
2272 self.write()
2273 .await
2274 .with_transaction(move |txn| {
2275 let mut txn = txn.prepare_cached(
2276 "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2277 VALUES (?, ?, ?, ?)
2278 ON CONFLICT (room_id, event_id) DO UPDATE
2279 SET
2280 status =
2281 CASE
2282 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
2283 WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
2284 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
2285 ELSE thread_subscriptions.status
2286 END,
2287 bump_stamp =
2288 CASE
2289 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
2290 WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
2291 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
2292 ELSE thread_subscriptions.bump_stamp
2293 END",
2294 )?;
2295
2296 for value in values {
2297 txn.execute(value)?;
2298 }
2299
2300 Result::<_, Error>::Ok(())
2301 })
2302 .await?;
2303
2304 Ok(())
2305 }
2306
2307 async fn load_thread_subscription(
2308 &self,
2309 room_id: &RoomId,
2310 thread_id: &EventId,
2311 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2312 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2313 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2314
2315 Ok(self
2316 .read()
2317 .await?
2318 .query_row(
2319 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2320 (room_id, thread_id),
2321 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2322 )
2323 .await
2324 .optional()?
2325 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2326 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2327 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2328 })?;
2329 Ok(StoredThreadSubscription { status, bump_stamp })
2330 })
2331 .transpose()?)
2332 }
2333
2334 async fn remove_thread_subscription(
2335 &self,
2336 room_id: &RoomId,
2337 thread_id: &EventId,
2338 ) -> Result<(), Self::Error> {
2339 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2340 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2341
2342 self.write()
2343 .await
2344 .execute(
2345 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2346 (room_id, thread_id),
2347 )
2348 .await?;
2349
2350 Ok(())
2351 }
2352
2353 async fn optimize(&self) -> Result<(), Self::Error> {
2354 Ok(self.vacuum().await?)
2355 }
2356
2357 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2358 self.get_db_size().await
2359 }
2360}
2361
2362#[derive(Debug, Clone, Serialize, Deserialize)]
2363struct ReceiptData {
2364 receipt: Receipt,
2365 event_id: OwnedEventId,
2366 user_id: OwnedUserId,
2367}
2368
2369#[cfg(test)]
2370mod tests {
2371 use std::sync::{
2372 LazyLock,
2373 atomic::{AtomicU32, Ordering::SeqCst},
2374 };
2375
2376 use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
2377 use tempfile::{TempDir, tempdir};
2378
2379 use super::SqliteStateStore;
2380
2381 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2382 static NUM: AtomicU32 = AtomicU32::new(0);
2383
2384 async fn get_store() -> Result<impl StateStore, StoreError> {
2385 let name = NUM.fetch_add(1, SeqCst).to_string();
2386 let tmpdir_path = TMP_DIR.path().join(name);
2387
2388 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2389
2390 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2391 }
2392
2393 statestore_integration_tests!();
2394}
2395
2396#[cfg(test)]
2397mod encrypted_tests {
2398 use std::{
2399 path::PathBuf,
2400 sync::{
2401 LazyLock,
2402 atomic::{AtomicU32, Ordering::SeqCst},
2403 },
2404 };
2405
2406 use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
2407 use matrix_sdk_test::async_test;
2408 use tempfile::{TempDir, tempdir};
2409
2410 use super::SqliteStateStore;
2411 use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};
2412
2413 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2414 static NUM: AtomicU32 = AtomicU32::new(0);
2415
2416 fn new_state_store_workspace() -> PathBuf {
2417 let name = NUM.fetch_add(1, SeqCst).to_string();
2418 TMP_DIR.path().join(name)
2419 }
2420
2421 async fn get_store() -> Result<impl StateStore, StoreError> {
2422 let tmpdir_path = new_state_store_workspace();
2423
2424 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2425
2426 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2427 .await
2428 .unwrap())
2429 }
2430
2431 #[async_test]
2432 async fn test_pool_size() {
2433 let tmpdir_path = new_state_store_workspace();
2434 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2435
2436 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2437
2438 assert_eq!(store.pool.status().max_size, 42);
2439 }
2440
2441 #[async_test]
2442 async fn test_cache_size() {
2443 let tmpdir_path = new_state_store_workspace();
2444 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2445
2446 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2447
2448 let conn = store.pool.get().await.unwrap();
2449 let cache_size =
2450 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2451
2452 assert_eq!(cache_size, -(1500 / 1024));
2456 }
2457
2458 #[async_test]
2459 async fn test_journal_size_limit() {
2460 let tmpdir_path = new_state_store_workspace();
2461 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2462
2463 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2464
2465 let conn = store.pool.get().await.unwrap();
2466 let journal_size_limit = conn
2467 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2468 .await
2469 .unwrap();
2470
2471 assert_eq!(journal_size_limit, 1500);
2474 }
2475
2476 statestore_integration_tests!();
2477}
2478
2479#[cfg(test)]
2480mod migration_tests {
2481 use std::{
2482 path::{Path, PathBuf},
2483 sync::{
2484 Arc, LazyLock,
2485 atomic::{AtomicU32, Ordering::SeqCst},
2486 },
2487 };
2488
2489 use as_variant::as_variant;
2490 use matrix_sdk_base::{
2491 RoomState, StateStore,
2492 media::{MediaFormat, MediaRequestParameters},
2493 store::{
2494 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2495 SerializableEventContent,
2496 },
2497 sync::UnreadNotificationsCount,
2498 };
2499 use matrix_sdk_test::async_test;
2500 use ruma::{
2501 EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
2502 events::{
2503 StateEventType,
2504 room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
2505 },
2506 room_id, server_name, user_id,
2507 };
2508 use rusqlite::Transaction;
2509 use serde::{Deserialize, Serialize};
2510 use serde_json::json;
2511 use tempfile::{TempDir, tempdir};
2512 use tokio::{fs, sync::Mutex};
2513 use zeroize::Zeroizing;
2514
2515 use super::{DATABASE_NAME, SqliteStateStore, init, keys};
2516 use crate::{
2517 OpenStoreError, Secret, SqliteStoreConfig,
2518 error::{Error, Result},
2519 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2520 };
2521
2522 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2523 static NUM: AtomicU32 = AtomicU32::new(0);
2524 const SECRET: &str = "secret";
2525
2526 fn new_path() -> PathBuf {
2527 let name = NUM.fetch_add(1, SeqCst).to_string();
2528 TMP_DIR.path().join(name)
2529 }
2530
2531 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2532 let config = SqliteStoreConfig::new(path);
2533
2534 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2535
2536 let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2537 let conn = pool.get().await?;
2538
2539 init(&conn).await?;
2540
2541 let store_cipher = Some(Arc::new(
2542 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2543 .await
2544 .unwrap(),
2545 ));
2546 let this = SqliteStateStore {
2547 store_cipher,
2548 pool,
2549 write_connection: Arc::new(Mutex::new(conn)),
2551 };
2552 this.run_migrations(1, Some(version)).await?;
2553
2554 Ok(this)
2555 }
2556
2557 fn room_info_v1_json(
2558 room_id: &RoomId,
2559 state: RoomState,
2560 name: Option<&str>,
2561 creator: Option<&UserId>,
2562 ) -> serde_json::Value {
2563 let name_content = match name {
2565 Some(name) => json!({ "name": name }),
2566 None => json!({ "name": null }),
2567 };
2568 let create_content = match creator {
2570 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2571 None => RoomCreateEventContent::new_v11(),
2572 };
2573
2574 json!({
2575 "room_id": room_id,
2576 "room_type": state,
2577 "notification_counts": UnreadNotificationsCount::default(),
2578 "summary": {
2579 "heroes": [],
2580 "joined_member_count": 0,
2581 "invited_member_count": 0,
2582 },
2583 "members_synced": false,
2584 "base_info": {
2585 "dm_targets": [],
2586 "max_power_level": 100,
2587 "name": {
2588 "Original": {
2589 "content": name_content,
2590 },
2591 },
2592 "create": {
2593 "Original": {
2594 "content": create_content,
2595 }
2596 }
2597 },
2598 })
2599 }
2600
2601 #[async_test]
2602 pub async fn test_migrating_v1_to_v2() {
2603 let path = new_path();
2604 {
2606 let db = create_fake_db(&path, 1).await.unwrap();
2607 let conn = db.pool.get().await.unwrap();
2608
2609 let this = db.clone();
2610 conn.with_transaction(move |txn| {
2611 for i in 0..5 {
2612 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2613 let (state, stripped) =
2614 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2615 let info = room_info_v1_json(&room_id, state, None, None);
2616
2617 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2618 let data = this.serialize_json(&info)?;
2619
2620 txn.prepare_cached(
2621 "INSERT INTO room_info (room_id, stripped, data)
2622 VALUES (?, ?, ?)",
2623 )?
2624 .execute((room_id, stripped, data))?;
2625 }
2626
2627 Result::<_, Error>::Ok(())
2628 })
2629 .await
2630 .unwrap();
2631 }
2632
2633 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2635
2636 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2638 }
2639
2640 fn add_room_v2(
2642 this: &SqliteStateStore,
2643 txn: &Transaction<'_>,
2644 room_id: &RoomId,
2645 name: Option<&str>,
2646 create_creator: Option<&UserId>,
2647 create_sender: Option<&UserId>,
2648 ) -> Result<(), Error> {
2649 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2650
2651 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2652 let encoded_state =
2653 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2654 let data = this.serialize_json(&room_info_json)?;
2655
2656 txn.prepare_cached(
2657 "INSERT INTO room_info (room_id, state, data)
2658 VALUES (?, ?, ?)",
2659 )?
2660 .execute((encoded_room_id, encoded_state, data))?;
2661
2662 let Some(create_sender) = create_sender else {
2664 return Ok(());
2665 };
2666
2667 let create_content = match create_creator {
2668 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2669 None => RoomCreateEventContent::new_v11(),
2670 };
2671
2672 let event_id = EventId::new_v1(server_name!("dummy.local"));
2673 let create_event = json!({
2674 "content": create_content,
2675 "event_id": event_id,
2676 "sender": create_sender.to_owned(),
2677 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2678 "state_key": "",
2679 "type": "m.room.create",
2680 "unsigned": {},
2681 });
2682
2683 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2684 let encoded_event_type =
2685 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2686 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2687 let stripped = false;
2688 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2689 let data = this.serialize_json(&create_event)?;
2690
2691 txn.prepare_cached(
2692 "INSERT
2693 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2694 VALUES (?, ?, ?, ?, ?, ?)",
2695 )?
2696 .execute((
2697 encoded_room_id,
2698 encoded_event_type,
2699 encoded_state_key,
2700 stripped,
2701 encoded_event_id,
2702 data,
2703 ))?;
2704
2705 Ok(())
2706 }
2707
2708 #[async_test]
2709 pub async fn test_migrating_v2_to_v3() {
2710 let path = new_path();
2711
2712 let room_a_id = room_id!("!room_a:dummy.local");
2714 let room_a_name = "Room A";
2715 let room_a_creator = user_id!("@creator:dummy.local");
2716 let room_a_create_sender = user_id!("@sender:dummy.local");
2719
2720 let room_b_id = room_id!("!room_b:dummy.local");
2722
2723 let room_c_id = room_id!("!room_c:dummy.local");
2725 let room_c_create_sender = user_id!("@creator:dummy.local");
2726
2727 {
2729 let db = create_fake_db(&path, 2).await.unwrap();
2730 let conn = db.pool.get().await.unwrap();
2731
2732 let this = db.clone();
2733 conn.with_transaction(move |txn| {
2734 add_room_v2(
2735 &this,
2736 txn,
2737 room_a_id,
2738 Some(room_a_name),
2739 Some(room_a_creator),
2740 Some(room_a_create_sender),
2741 )?;
2742 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2743 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2744
2745 Result::<_, Error>::Ok(())
2746 })
2747 .await
2748 .unwrap();
2749 }
2750
2751 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2753
2754 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2756 assert_eq!(room_infos.len(), 3);
2757
2758 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2759 assert_eq!(room_a.name(), Some(room_a_name));
2760 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2761
2762 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2763 assert_eq!(room_b.name(), None);
2764 assert_eq!(room_b.creators(), None);
2765
2766 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2767 assert_eq!(room_c.name(), None);
2768 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2769 }
2770
2771 #[async_test]
2772 pub async fn test_migrating_v7_to_v9() {
2773 let path = new_path();
2774
2775 let room_id = room_id!("!room_a:dummy.local");
2776 let wedged_event_transaction_id = TransactionId::new();
2777 let local_event_transaction_id = TransactionId::new();
2778
2779 {
2781 let db = create_fake_db(&path, 7).await.unwrap();
2782 let conn = db.pool.get().await.unwrap();
2783
2784 let wedge_tx = wedged_event_transaction_id.clone();
2785 let local_tx = local_event_transaction_id.clone();
2786
2787 conn.with_transaction(move |txn| {
2788 add_dependent_send_queue_event_v7(
2789 &db,
2790 txn,
2791 room_id,
2792 &local_tx,
2793 ChildTransactionId::new(),
2794 DependentQueuedRequestKind::RedactEvent,
2795 )?;
2796 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2797 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2798 Result::<_, Error>::Ok(())
2799 })
2800 .await
2801 .unwrap();
2802 }
2803
2804 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2807
2808 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2809 assert!(requests.is_empty());
2810
2811 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2812 assert!(dependent_requests.is_empty());
2813 }
2814
2815 fn add_send_queue_event_v7(
2816 this: &SqliteStateStore,
2817 txn: &Transaction<'_>,
2818 transaction_id: &TransactionId,
2819 room_id: &RoomId,
2820 is_wedged: bool,
2821 ) -> Result<(), Error> {
2822 let content =
2823 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2824
2825 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2826 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2827
2828 let content = this.serialize_json(&content)?;
2829
2830 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2831 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2832
2833 Ok(())
2834 }
2835
2836 fn add_dependent_send_queue_event_v7(
2837 this: &SqliteStateStore,
2838 txn: &Transaction<'_>,
2839 room_id: &RoomId,
2840 parent_txn_id: &TransactionId,
2841 own_txn_id: ChildTransactionId,
2842 content: DependentQueuedRequestKind,
2843 ) -> Result<(), Error> {
2844 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2845
2846 let parent_txn_id = parent_txn_id.to_string();
2847 let own_txn_id = own_txn_id.to_string();
2848 let content = this.serialize_json(&content)?;
2849
2850 txn.prepare_cached(
2851 "INSERT INTO dependent_send_queue_events
2852 (room_id, parent_transaction_id, own_transaction_id, content)
2853 VALUES (?, ?, ?, ?)",
2854 )?
2855 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2856
2857 Ok(())
2858 }
2859
2860 #[derive(Clone, Debug, Serialize, Deserialize)]
2861 pub enum LegacyDependentQueuedRequestKind {
2862 UploadFileWithThumbnail {
2863 content_type: String,
2864 cache_key: MediaRequestParameters,
2865 related_to: OwnedTransactionId,
2866 },
2867 }
2868
2869 #[async_test]
2870 pub async fn test_dependent_queued_request_variant_renaming() {
2871 let path = new_path();
2872 let db = create_fake_db(&path, 7).await.unwrap();
2873
2874 let cache_key = MediaRequestParameters {
2875 format: MediaFormat::File,
2876 source: MediaSource::Plain("https://server.local/foobar".into()),
2877 };
2878 let related_to = TransactionId::new();
2879 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2880 content_type: "image/png".to_owned(),
2881 cache_key,
2882 related_to: related_to.clone(),
2883 };
2884
2885 let data = db
2886 .serialize_json(&request)
2887 .expect("should be able to serialize legacy dependent request");
2888 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2889 "should be able to deserialize dependent request from legacy dependent request",
2890 );
2891
2892 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2893 assert_eq!(de_related_to, related_to);
2894 });
2895 }
2896}