1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13 RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15 store::{
16 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18 StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19 },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25 canonical_json::{RedactedBecause, redact},
26 events::{
27 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29 presence::PresenceEvent,
30 receipt::{Receipt, ReceiptThread, ReceiptType},
31 room::{
32 create::RoomCreateEventContent,
33 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34 },
35 },
36 serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41 fs,
42 sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47 OpenStoreError, Secret, SqliteStoreConfig,
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt, repeat_vars,
53 },
54};
55
56mod keys {
57 pub const KV_BLOB: &str = "kv_blob";
59 pub const ROOM_INFO: &str = "room_info";
60 pub const STATE_EVENT: &str = "state_event";
61 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
62 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
63 pub const MEMBER: &str = "member";
64 pub const PROFILE: &str = "profile";
65 pub const RECEIPT: &str = "receipt";
66 pub const DISPLAY_NAME: &str = "display_name";
67 pub const SEND_QUEUE: &str = "send_queue_events";
68 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
69 pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
70}
71
72pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
75#[derive(Clone)]
77pub struct SqliteStateStore {
78 store_cipher: Option<Arc<StoreCipher>>,
79
80 pool: SqlitePool,
82
83 write_connection: Arc<Mutex<SqliteAsyncConn>>,
88}
89
90#[cfg(not(tarpaulin_include))]
91impl fmt::Debug for SqliteStateStore {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
94 }
95}
96
97impl SqliteStateStore {
98 pub async fn open(
101 path: impl AsRef<Path>,
102 passphrase: Option<&str>,
103 ) -> Result<Self, OpenStoreError> {
104 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
105 }
106
107 pub async fn open_with_key(
110 path: impl AsRef<Path>,
111 key: Option<&[u8; 32]>,
112 ) -> Result<Self, OpenStoreError> {
113 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
114 }
115
116 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
118 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
119
120 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
121
122 let this = Self::open_with_pool(pool, config.secret).await?;
123 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
124
125 Ok(this)
126 }
127
128 pub async fn open_with_pool(
131 pool: SqlitePool,
132 secret: Option<Secret>,
133 ) -> Result<Self, OpenStoreError> {
134 let conn = pool.get().await?;
135
136 let mut version = conn.db_version().await?;
137
138 if version == 0 {
139 init(&conn).await?;
140 version = 1;
141 }
142
143 let store_cipher = match secret {
144 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
145 None => None,
146 };
147 let this = Self {
148 store_cipher,
149 pool,
150 write_connection: Arc::new(Mutex::new(conn)),
152 };
153 this.run_migrations(version, None).await?;
154
155 this.read().await?.wal_checkpoint().await;
156
157 Ok(this)
158 }
159
160 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
165 if to == Some(1) {
166 return Ok(());
167 }
168
169 let conn = self.write().await;
170
171 if from < 2 {
172 debug!("Upgrading database to version 2");
173 let this = self.clone();
174 conn.with_transaction(move |txn| {
175 txn.execute_batch(include_str!(
177 "../migrations/state_store/002_a_create_new_room_info.sql"
178 ))?;
179
180 for data in txn
182 .prepare("SELECT data FROM room_info")?
183 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
184 {
185 let data = data?;
186 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
187
188 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
189 let state = this
190 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
191 txn.prepare_cached(
192 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
193 VALUES (?, ?, ?)",
194 )?
195 .execute((room_id, state, data))?;
196 }
197
198 txn.execute_batch(include_str!(
200 "../migrations/state_store/002_b_replace_room_info.sql"
201 ))?;
202
203 txn.set_db_version(2)?;
204 Result::<_, Error>::Ok(())
205 })
206 .await?;
207 }
208
209 if to == Some(2) {
210 return Ok(());
211 }
212
213 if from < 3 {
215 debug!("Upgrading database to version 3");
216 let this = self.clone();
217 conn.with_transaction(move |txn| {
218 for data in txn
220 .prepare("SELECT data FROM room_info")?
221 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
222 {
223 let data = data?;
224 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
225
226 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
228 let event_type =
229 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
230 let create_res = txn
231 .prepare(
232 "SELECT stripped, data FROM state_event
233 WHERE room_id = ? AND event_type = ?",
234 )?
235 .query_row([room_id, event_type], |row| {
236 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
237 })
238 .optional()?;
239
240 let create = create_res.and_then(|(stripped, data)| {
241 let create = if stripped {
242 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
243 this.deserialize_json(&data).ok()?,
244 )
245 } else {
246 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
247 };
248 Some(create)
249 });
250
251 let migrated_room_info = room_info_v1.migrate(create.as_ref());
252
253 let data = this.serialize_json(&migrated_room_info)?;
254 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
255 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
256 .execute((data, room_id))?;
257 }
258
259 txn.set_db_version(3)?;
260 Result::<_, Error>::Ok(())
261 })
262 .await?;
263 }
264
265 if to == Some(3) {
266 return Ok(());
267 }
268
269 if from < 4 {
270 debug!("Upgrading database to version 4");
271 conn.with_transaction(move |txn| {
272 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274 txn.set_db_version(4)
275 })
276 .await?;
277 }
278
279 if to == Some(4) {
280 return Ok(());
281 }
282
283 if from < 5 {
284 debug!("Upgrading database to version 5");
285 conn.with_transaction(move |txn| {
286 txn.execute_batch(include_str!(
288 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
289 ))?;
290 txn.set_db_version(4)
291 })
292 .await?;
293 }
294
295 if to == Some(5) {
296 return Ok(());
297 }
298
299 if from < 6 {
300 debug!("Upgrading database to version 6");
301 conn.with_transaction(move |txn| {
302 txn.execute_batch(include_str!(
304 "../migrations/state_store/005_send_queue_dependent_events.sql"
305 ))?;
306 txn.set_db_version(6)
307 })
308 .await?;
309 }
310
311 if to == Some(6) {
312 return Ok(());
313 }
314
315 if from < 7 {
316 debug!("Upgrading database to version 7");
317 conn.with_transaction(move |txn| {
318 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
320 txn.set_db_version(7)
321 })
322 .await?;
323 }
324
325 if to == Some(7) {
326 return Ok(());
327 }
328
329 if from < 8 {
330 debug!("Upgrading database to version 8");
331 let error = QueueWedgeError::GenericApiError {
333 msg: "local echo failed to send in a previous session".into(),
334 };
335 let default_err = self.serialize_value(&error)?;
336
337 conn.with_transaction(move |txn| {
338 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
340
341 for wedged_entries in txn
344 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
345 .query_map((), |row| {
346 Ok(
347 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
348 )
349 })? {
350
351 let (room_id, transaction_id) = wedged_entries?;
352
353 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
354 .execute((default_err.clone(), room_id, transaction_id))?;
355 }
356
357
358 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
360
361 txn.set_db_version(8)
362 })
363 .await?;
364 }
365
366 if to == Some(8) {
367 return Ok(());
368 }
369
370 if from < 9 {
371 debug!("Upgrading database to version 9");
372 conn.with_transaction(move |txn| {
373 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
375 txn.set_db_version(9)
376 })
377 .await?;
378 }
379
380 if to == Some(9) {
381 return Ok(());
382 }
383
384 if from < 10 {
385 debug!("Upgrading database to version 10");
386 conn.with_transaction(move |txn| {
387 txn.execute_batch(include_str!(
389 "../migrations/state_store/009_send_queue_priority.sql"
390 ))?;
391 txn.set_db_version(10)
392 })
393 .await?;
394 }
395
396 if to == Some(10) {
397 return Ok(());
398 }
399
400 if from < 11 {
401 debug!("Upgrading database to version 11");
402 conn.with_transaction(move |txn| {
403 txn.execute_batch(include_str!(
405 "../migrations/state_store/010_send_queue_enqueue_time.sql"
406 ))?;
407 txn.set_db_version(11)
408 })
409 .await?;
410 }
411
412 if to == Some(11) {
413 return Ok(());
414 }
415
416 if from < 12 {
417 debug!("Upgrading database to version 12");
418 conn.vacuum().await?;
422 conn.set_kv("version", vec![12]).await?;
423 }
424
425 if to == Some(12) {
426 return Ok(());
427 }
428
429 if from < 13 {
430 debug!("Upgrading database to version 13");
431 conn.with_transaction(move |txn| {
432 txn.execute_batch(include_str!(
434 "../migrations/state_store/011_thread_subscriptions.sql"
435 ))?;
436 txn.set_db_version(13)
437 })
438 .await?;
439 }
440
441 if to == Some(13) {
442 return Ok(());
443 }
444
445 if from < 14 {
446 debug!("Upgrading database to version 14");
447 conn.with_transaction(move |txn| {
448 txn.execute_batch(include_str!(
450 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
451 ))?;
452 txn.set_db_version(14)
453 })
454 .await?;
455 }
456
457 if to == Some(14) {
458 return Ok(());
459 }
460
461 if from < 15 {
462 debug!("Upgrading database to version 15");
463 conn.with_transaction(move |txn| {
464 txn.execute_batch(include_str!(
466 "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
467 ))?;
468 txn.set_db_version(15)
469 })
470 .await?;
471 }
472
473 if to == Some(15) {
474 return Ok(());
475 }
476
477 Ok(())
478 }
479
480 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
481 let key_s = match key {
482 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
483 StateStoreDataKey::SupportedVersions => {
484 Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
485 }
486 StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
487 StateStoreDataKey::Filter(f) => {
488 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
489 }
490 StateStoreDataKey::UserAvatarUrl(u) => {
491 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
492 }
493 StateStoreDataKey::RecentlyVisitedRooms(b) => {
494 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
495 }
496 StateStoreDataKey::UtdHookManagerData => {
497 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
498 }
499 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
500 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
501 }
502 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
503 if let Some(thread_root) = thread_root {
504 Cow::Owned(format!(
505 "{}:{room_id}:{thread_root}",
506 StateStoreDataKey::COMPOSER_DRAFT
507 ))
508 } else {
509 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
510 }
511 }
512 StateStoreDataKey::SeenKnockRequests(room_id) => {
513 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
514 }
515 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
516 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
517 }
518 StateStoreDataKey::HomeserverCapabilities => {
519 Cow::Borrowed(StateStoreDataKey::HOMESERVER_CAPABILITIES)
520 }
521 };
522
523 self.encode_key(keys::KV_BLOB, &*key_s)
524 }
525
526 fn encode_presence_key(&self, user_id: &UserId) -> Key {
527 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
528 }
529
530 fn encode_custom_key(&self, key: &[u8]) -> Key {
531 let mut full_key = b"custom:".to_vec();
532 full_key.extend(key);
533 self.encode_key(keys::KV_BLOB, full_key)
534 }
535
536 #[instrument(skip_all)]
538 async fn read(&self) -> Result<SqliteAsyncConn> {
539 Ok(self.pool.get().await?)
540 }
541
542 #[instrument(skip_all)]
544 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
545 self.write_connection.clone().lock_owned().await
546 }
547
548 fn remove_maybe_stripped_room_data(
549 &self,
550 txn: &Transaction<'_>,
551 room_id: &RoomId,
552 stripped: bool,
553 ) -> rusqlite::Result<()> {
554 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
555 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
556
557 let member_room_id = self.encode_key(keys::MEMBER, room_id);
558 txn.remove_room_members(&member_room_id, Some(stripped))
559 }
560
561 pub async fn vacuum(&self) -> Result<()> {
562 self.write_connection.lock().await.vacuum().await
563 }
564
565 pub async fn get_db_size(&self) -> Result<Option<usize>> {
566 let read_conn = self.pool.get().await?;
567 Ok(Some(read_conn.get_db_size().await?))
568 }
569}
570
571impl EncryptableStore for SqliteStateStore {
572 fn get_cypher(&self) -> Option<&StoreCipher> {
573 self.store_cipher.as_deref()
574 }
575}
576
577async fn init(conn: &SqliteAsyncConn) -> Result<()> {
579 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
582 conn.with_transaction(|txn| {
583 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
584 txn.set_db_version(1)?;
585
586 Ok(())
587 })
588 .await
589}
590
591trait SqliteConnectionStateStoreExt {
592 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
593
594 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
595
596 fn set_room_account_data(
597 &self,
598 room_id: &[u8],
599 event_type: &[u8],
600 data: &[u8],
601 ) -> rusqlite::Result<()>;
602 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
603
604 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
605 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
606 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
607
608 fn set_state_event(
609 &self,
610 room_id: &[u8],
611 event_type: &[u8],
612 state_key: &[u8],
613 stripped: bool,
614 event_id: Option<&[u8]>,
615 data: &[u8],
616 ) -> rusqlite::Result<()>;
617 fn get_state_event_by_id(
618 &self,
619 room_id: &[u8],
620 event_id: &[u8],
621 ) -> rusqlite::Result<Option<Vec<u8>>>;
622 fn remove_room_state_events(
623 &self,
624 room_id: &[u8],
625 stripped: Option<bool>,
626 ) -> rusqlite::Result<()>;
627
628 fn set_member(
629 &self,
630 room_id: &[u8],
631 user_id: &[u8],
632 membership: &[u8],
633 stripped: bool,
634 data: &[u8],
635 ) -> rusqlite::Result<()>;
636 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
637
638 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
639 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
640 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
641
642 fn set_receipt(
643 &self,
644 room_id: &[u8],
645 user_id: &[u8],
646 receipt_type: &[u8],
647 thread_id: &[u8],
648 event_id: &[u8],
649 data: &[u8],
650 ) -> rusqlite::Result<()>;
651 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
652
653 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
654 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
655 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
656 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
657 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
658}
659
660impl SqliteConnectionStateStoreExt for rusqlite::Connection {
661 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
662 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
663 Ok(())
664 }
665
666 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
667 self.prepare_cached(
668 "INSERT OR REPLACE INTO global_account_data (event_type, data)
669 VALUES (?, ?)",
670 )?
671 .execute((event_type, data))?;
672 Ok(())
673 }
674
675 fn set_room_account_data(
676 &self,
677 room_id: &[u8],
678 event_type: &[u8],
679 data: &[u8],
680 ) -> rusqlite::Result<()> {
681 self.prepare_cached(
682 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
683 VALUES (?, ?, ?)",
684 )?
685 .execute((room_id, event_type, data))?;
686 Ok(())
687 }
688
689 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
690 self.prepare(
691 "DELETE FROM room_account_data
692 WHERE room_id = ?",
693 )?
694 .execute((room_id,))?;
695 Ok(())
696 }
697
698 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
699 self.prepare_cached(
700 "INSERT OR REPLACE INTO room_info (room_id, state, data)
701 VALUES (?, ?, ?)",
702 )?
703 .execute((room_id, state, data))?;
704 Ok(())
705 }
706
707 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
708 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
709 .optional()
710 }
711
712 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
714 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
715 Ok(())
716 }
717
718 fn set_state_event(
719 &self,
720 room_id: &[u8],
721 event_type: &[u8],
722 state_key: &[u8],
723 stripped: bool,
724 event_id: Option<&[u8]>,
725 data: &[u8],
726 ) -> rusqlite::Result<()> {
727 self.prepare_cached(
728 "INSERT OR REPLACE
729 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
730 VALUES (?, ?, ?, ?, ?, ?)",
731 )?
732 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
733 Ok(())
734 }
735
736 fn get_state_event_by_id(
737 &self,
738 room_id: &[u8],
739 event_id: &[u8],
740 ) -> rusqlite::Result<Option<Vec<u8>>> {
741 self.query_row(
742 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
743 (room_id, event_id),
744 |row| row.get(0),
745 )
746 .optional()
747 }
748
749 fn remove_room_state_events(
755 &self,
756 room_id: &[u8],
757 stripped: Option<bool>,
758 ) -> rusqlite::Result<()> {
759 if let Some(stripped) = stripped {
760 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
761 .execute((room_id, stripped))?;
762 } else {
763 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
764 .execute((room_id,))?;
765 }
766 Ok(())
767 }
768
769 fn set_member(
770 &self,
771 room_id: &[u8],
772 user_id: &[u8],
773 membership: &[u8],
774 stripped: bool,
775 data: &[u8],
776 ) -> rusqlite::Result<()> {
777 self.prepare_cached(
778 "INSERT OR REPLACE
779 INTO member (room_id, user_id, membership, stripped, data)
780 VALUES (?, ?, ?, ?, ?)",
781 )?
782 .execute((room_id, user_id, membership, stripped, data))?;
783 Ok(())
784 }
785
786 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
791 if let Some(stripped) = stripped {
792 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
793 .execute((room_id, stripped))?;
794 } else {
795 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
796 }
797 Ok(())
798 }
799
800 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
801 self.prepare_cached(
802 "INSERT OR REPLACE
803 INTO profile (room_id, user_id, data)
804 VALUES (?, ?, ?)",
805 )?
806 .execute((room_id, user_id, data))?;
807 Ok(())
808 }
809
810 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
811 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
812 Ok(())
813 }
814
815 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
816 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
817 .execute((room_id, user_id))?;
818 Ok(())
819 }
820
821 fn set_receipt(
822 &self,
823 room_id: &[u8],
824 user_id: &[u8],
825 receipt_type: &[u8],
826 thread: &[u8],
827 event_id: &[u8],
828 data: &[u8],
829 ) -> rusqlite::Result<()> {
830 self.prepare_cached(
831 "INSERT OR REPLACE
832 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
833 VALUES (?, ?, ?, ?, ?, ?)",
834 )?
835 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
836 Ok(())
837 }
838
839 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
840 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
841 Ok(())
842 }
843
844 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
845 self.prepare_cached(
846 "INSERT OR REPLACE
847 INTO display_name (room_id, name, data)
848 VALUES (?, ?, ?)",
849 )?
850 .execute((room_id, name, data))?;
851 Ok(())
852 }
853
854 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
855 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
856 .execute((room_id, name))?;
857 Ok(())
858 }
859
860 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
861 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
862 Ok(())
863 }
864
865 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
866 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
867 Ok(())
868 }
869
870 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
871 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
872 .execute((room_id,))?;
873 Ok(())
874 }
875}
876
877#[async_trait]
878trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
879 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
880 Ok(self
881 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
882 .await
883 .optional()?)
884 }
885
886 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
887 let keys_length = keys.len();
888
889 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
890 let sql_params = repeat_vars(keys.len());
891 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
892
893 let params = rusqlite::params_from_iter(keys);
894
895 Ok(txn
896 .prepare(&sql)?
897 .query(params)?
898 .mapped(|row| row.get(0))
899 .collect::<Result<_, _>>()?)
900 })
901 .await
902 }
903
904 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
905
906 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
907 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
908 Ok(())
909 }
910
911 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
912 Ok(match room_id {
913 None => {
914 self.prepare("SELECT data FROM room_info", move |mut stmt| {
915 stmt.query_map((), |row| row.get(0))?.collect()
916 })
917 .await?
918 }
919
920 Some(room_id) => {
921 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
922 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
923 })
924 .await?
925 }
926 })
927 }
928
929 async fn get_maybe_stripped_state_events_for_keys(
930 &self,
931 room_id: Key,
932 event_type: Key,
933 state_keys: Vec<Key>,
934 ) -> Result<Vec<(bool, Vec<u8>)>> {
935 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
936 let sql_params = repeat_vars(state_keys.len());
937 let sql = format!(
938 "SELECT stripped, data FROM state_event
939 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
940 );
941
942 let params = rusqlite::params_from_iter(
943 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
944 );
945
946 Ok(txn
947 .prepare(&sql)?
948 .query(params)?
949 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
950 .collect::<Result<_, _>>()?)
951 })
952 .await
953 }
954
955 async fn get_maybe_stripped_state_events(
956 &self,
957 room_id: Key,
958 event_type: Key,
959 ) -> Result<Vec<(bool, Vec<u8>)>> {
960 Ok(self
961 .prepare(
962 "SELECT stripped, data FROM state_event
963 WHERE room_id = ? AND event_type = ?",
964 |mut stmt| {
965 stmt.query((room_id, event_type))?
966 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
967 .collect()
968 },
969 )
970 .await?)
971 }
972
973 async fn get_profiles(
974 &self,
975 room_id: Key,
976 user_ids: Vec<Key>,
977 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
978 let user_ids_length = user_ids.len();
979
980 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
981 let sql_params = repeat_vars(user_ids.len());
982 let sql = format!(
983 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
984 );
985
986 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
987
988 Ok(txn
989 .prepare(&sql)?
990 .query(params)?
991 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
992 .collect::<Result<_, _>>()?)
993 })
994 .await
995 }
996
997 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
998 let res = if memberships.is_empty() {
999 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
1000 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
1001 })
1002 .await?
1003 } else {
1004 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
1005 let sql_params = repeat_vars(memberships.len());
1006 let sql = format!(
1007 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
1008 );
1009
1010 let params =
1011 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
1012
1013 Ok(txn
1014 .prepare(&sql)?
1015 .query(params)?
1016 .mapped(|row| row.get(0))
1017 .collect::<Result<_, _>>()?)
1018 })
1019 .await?
1020 };
1021
1022 Ok(res)
1023 }
1024
1025 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
1026 Ok(self
1027 .query_row(
1028 "SELECT data FROM global_account_data WHERE event_type = ?",
1029 (event_type,),
1030 |row| row.get(0),
1031 )
1032 .await
1033 .optional()?)
1034 }
1035
1036 async fn get_room_account_data(
1037 &self,
1038 room_id: Key,
1039 event_type: Key,
1040 ) -> Result<Option<Vec<u8>>> {
1041 Ok(self
1042 .query_row(
1043 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
1044 (room_id, event_type),
1045 |row| row.get(0),
1046 )
1047 .await
1048 .optional()?)
1049 }
1050
1051 async fn get_display_names(
1052 &self,
1053 room_id: Key,
1054 names: Vec<Key>,
1055 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1056 let names_length = names.len();
1057
1058 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
1059 let sql_params = repeat_vars(names.len());
1060 let sql = format!(
1061 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
1062 );
1063
1064 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
1065
1066 Ok(txn
1067 .prepare(&sql)?
1068 .query(params)?
1069 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
1070 .collect::<Result<_, _>>()?)
1071 })
1072 .await
1073 }
1074
1075 async fn get_user_receipt(
1076 &self,
1077 room_id: Key,
1078 receipt_type: Key,
1079 thread: Key,
1080 user_id: Key,
1081 ) -> Result<Option<Vec<u8>>> {
1082 Ok(self
1083 .query_row(
1084 "SELECT data FROM receipt
1085 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
1086 (room_id, receipt_type, thread, user_id),
1087 |row| row.get(0),
1088 )
1089 .await
1090 .optional()?)
1091 }
1092
1093 async fn get_event_receipts(
1094 &self,
1095 room_id: Key,
1096 receipt_type: Key,
1097 thread: Key,
1098 event_id: Key,
1099 ) -> Result<Vec<Vec<u8>>> {
1100 Ok(self
1101 .prepare(
1102 "SELECT data FROM receipt
1103 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
1104 |mut stmt| {
1105 stmt.query((room_id, receipt_type, thread, event_id))?
1106 .mapped(|row| row.get(0))
1107 .collect()
1108 },
1109 )
1110 .await?)
1111 }
1112}
1113
1114#[async_trait]
1115impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1116 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1117 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1118 }
1119}
1120
1121#[async_trait]
1122impl StateStore for SqliteStateStore {
1123 type Error = Error;
1124
1125 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1126 self.read()
1127 .await?
1128 .get_kv_blob(self.encode_state_store_data_key(key))
1129 .await?
1130 .map(|data| {
1131 Ok(match key {
1132 StateStoreDataKey::SyncToken => {
1133 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1134 }
1135 StateStoreDataKey::SupportedVersions => {
1136 StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1137 }
1138 StateStoreDataKey::WellKnown => {
1139 StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1140 }
1141 StateStoreDataKey::Filter(_) => {
1142 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1143 }
1144 StateStoreDataKey::UserAvatarUrl(_) => {
1145 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1146 }
1147 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1148 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1149 }
1150 StateStoreDataKey::UtdHookManagerData => {
1151 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1152 }
1153 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1154 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1155 }
1156 StateStoreDataKey::ComposerDraft(_, _) => {
1157 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1158 }
1159 StateStoreDataKey::SeenKnockRequests(_) => {
1160 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1161 }
1162 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1163 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1164 self.deserialize_value(&data)?,
1165 )
1166 }
1167 StateStoreDataKey::HomeserverCapabilities => {
1168 StateStoreDataValue::HomeserverCapabilities(self.deserialize_value(&data)?)
1169 }
1170 })
1171 })
1172 .transpose()
1173 }
1174
1175 async fn set_kv_data(
1176 &self,
1177 key: StateStoreDataKey<'_>,
1178 value: StateStoreDataValue,
1179 ) -> Result<()> {
1180 let serialized_value = match key {
1181 StateStoreDataKey::SyncToken => self.serialize_value(
1182 &value.into_sync_token().expect("Session data not a sync token"),
1183 )?,
1184 StateStoreDataKey::SupportedVersions => self.serialize_value(
1185 &value
1186 .into_supported_versions()
1187 .expect("Session data not containing supported versions"),
1188 )?,
1189 StateStoreDataKey::WellKnown => self.serialize_value(
1190 &value.into_well_known().expect("Session data not containing well-known"),
1191 )?,
1192 StateStoreDataKey::Filter(_) => {
1193 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1194 }
1195 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1196 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1197 )?,
1198 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1199 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1200 )?,
1201 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1202 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1203 )?,
1204 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1205 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1206 }
1207 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1208 &value.into_composer_draft().expect("Session data not a composer draft"),
1209 )?,
1210 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1211 &value
1212 .into_seen_knock_requests()
1213 .expect("Session data is not a set of seen knock request ids"),
1214 )?,
1215 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1216 &value
1217 .into_thread_subscriptions_catchup_tokens()
1218 .expect("Session data is not a list of thread subscription catchup tokens"),
1219 )?,
1220 StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
1221 &value
1222 .into_homeserver_capabilities()
1223 .expect("Session data is not the homeserver capabilities"),
1224 )?,
1225 };
1226
1227 self.write()
1228 .await
1229 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1230 .await
1231 }
1232
1233 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1234 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1235 }
1236
1237 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1238 let changes = changes.to_owned();
1239 let this = self.clone();
1240 self.write()
1241 .await
1242 .with_transaction(move |txn| {
1243 let StateChanges {
1244 sync_token,
1245 account_data,
1246 presence,
1247 profiles,
1248 profiles_to_delete,
1249 state,
1250 room_account_data,
1251 room_infos,
1252 receipts,
1253 redactions,
1254 stripped_state,
1255 ambiguity_maps,
1256 } = changes;
1257
1258 if let Some(sync_token) = sync_token {
1259 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1260 let value = this.serialize_value(&sync_token)?;
1261 txn.set_kv_blob(&key, &value)?;
1262 }
1263
1264 for (event_type, event) in account_data {
1265 let event_type =
1266 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1267 let data = this.serialize_json(&event)?;
1268 txn.set_global_account_data(&event_type, &data)?;
1269 }
1270
1271 for (room_id, events) in room_account_data {
1272 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1273 for (event_type, event) in events {
1274 let event_type =
1275 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1276 let data = this.serialize_json(&event)?;
1277 txn.set_room_account_data(&room_id, &event_type, &data)?;
1278 }
1279 }
1280
1281 for (user_id, event) in presence {
1282 let key = this.encode_presence_key(&user_id);
1283 let value = this.serialize_json(&event)?;
1284 txn.set_kv_blob(&key, &value)?;
1285 }
1286
1287 for (room_id, room_info) in room_infos {
1288 let stripped = room_info.state() == RoomState::Invited;
1289 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1291
1292 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1293 let state = this
1294 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1295 let data = this.serialize_json(&room_info)?;
1296 txn.set_room_info(&room_id, &state, &data)?;
1297 }
1298
1299 for (room_id, user_ids) in profiles_to_delete {
1300 let room_id = this.encode_key(keys::PROFILE, room_id);
1301 for user_id in user_ids {
1302 let user_id = this.encode_key(keys::PROFILE, user_id);
1303 txn.remove_room_profile(&room_id, &user_id)?;
1304 }
1305 }
1306
1307 for (room_id, state_event_types) in state {
1308 let profiles = profiles.get(&room_id);
1309 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1310
1311 for (event_type, state_events) in state_event_types {
1312 let encoded_event_type =
1313 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1314
1315 for (state_key, raw_state_event) in state_events {
1316 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1317 let data = this.serialize_json(&raw_state_event)?;
1318
1319 let event_id: Option<String> =
1320 raw_state_event.get_field("event_id").ok().flatten();
1321 let encoded_event_id =
1322 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1323
1324 txn.set_state_event(
1325 &encoded_room_id,
1326 &encoded_event_type,
1327 &encoded_state_key,
1328 false,
1329 encoded_event_id.as_deref(),
1330 &data,
1331 )?;
1332
1333 if event_type == StateEventType::RoomMember {
1334 let member_event = match raw_state_event
1335 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1336 {
1337 Ok(ev) => ev,
1338 Err(e) => {
1339 debug!(event_id, "Failed to deserialize member event: {e}");
1340 continue;
1341 }
1342 };
1343
1344 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1345 let user_id = this.encode_key(keys::MEMBER, &state_key);
1346 let membership = this
1347 .encode_key(keys::MEMBER, member_event.membership().as_str());
1348 let data = this.serialize_value(&state_key)?;
1349
1350 txn.set_member(
1351 &encoded_room_id,
1352 &user_id,
1353 &membership,
1354 false,
1355 &data,
1356 )?;
1357
1358 if let Some(profile) =
1359 profiles.and_then(|p| p.get(member_event.state_key()))
1360 {
1361 let room_id = this.encode_key(keys::PROFILE, &room_id);
1362 let user_id = this.encode_key(keys::PROFILE, &state_key);
1363 let data = this.serialize_json(&profile)?;
1364 txn.set_profile(&room_id, &user_id, &data)?;
1365 }
1366 }
1367 }
1368 }
1369 }
1370
1371 for (room_id, stripped_state_event_types) in stripped_state {
1372 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1373
1374 for (event_type, stripped_state_events) in stripped_state_event_types {
1375 let encoded_event_type =
1376 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1377
1378 for (state_key, raw_stripped_state_event) in stripped_state_events {
1379 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1380 let data = this.serialize_json(&raw_stripped_state_event)?;
1381 txn.set_state_event(
1382 &encoded_room_id,
1383 &encoded_event_type,
1384 &encoded_state_key,
1385 true,
1386 None,
1387 &data,
1388 )?;
1389
1390 if event_type == StateEventType::RoomMember {
1391 let member_event = match raw_stripped_state_event
1392 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1393 ) {
1394 Ok(ev) => ev,
1395 Err(e) => {
1396 debug!("Failed to deserialize stripped member event: {e}");
1397 continue;
1398 }
1399 };
1400
1401 let room_id = this.encode_key(keys::MEMBER, &room_id);
1402 let user_id = this.encode_key(keys::MEMBER, &state_key);
1403 let membership = this.encode_key(
1404 keys::MEMBER,
1405 member_event.content.membership.as_str(),
1406 );
1407 let data = this.serialize_value(&state_key)?;
1408
1409 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1410 }
1411 }
1412 }
1413 }
1414
1415 for (room_id, receipt_event) in receipts {
1416 let room_id = this.encode_key(keys::RECEIPT, room_id);
1417
1418 for (event_id, receipt_types) in receipt_event {
1419 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1420
1421 for (receipt_type, receipt_users) in receipt_types {
1422 let receipt_type =
1423 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1424
1425 for (user_id, receipt) in receipt_users {
1426 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1427 let thread = this.encode_key(
1430 keys::RECEIPT,
1431 rmp_serde::to_vec_named(&receipt.thread)?,
1432 );
1433 let data = this.serialize_json(&ReceiptData {
1434 receipt,
1435 event_id: event_id.clone(),
1436 user_id,
1437 })?;
1438
1439 txn.set_receipt(
1440 &room_id,
1441 &encoded_user_id,
1442 &receipt_type,
1443 &thread,
1444 &encoded_event_id,
1445 &data,
1446 )?;
1447 }
1448 }
1449 }
1450 }
1451
1452 for (room_id, redactions) in redactions {
1453 let make_redaction_rules = || {
1454 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1455 txn.get_room_info(&encoded_room_id)
1456 .ok()
1457 .flatten()
1458 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1459 .map(|info| info.room_version_rules_or_default())
1460 .unwrap_or_else(|| {
1461 warn!(
1462 ?room_id,
1463 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1464 );
1465 ROOM_VERSION_RULES_FALLBACK
1466 }).redaction
1467 };
1468
1469 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1470 let mut redaction_rules = None;
1471
1472 for (event_id, redaction) in redactions {
1473 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1474
1475 if let Some(Ok(raw_event)) = txn
1476 .get_state_event_by_id(&encoded_room_id, &event_id)?
1477 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1478 {
1479 let event = raw_event.deserialize()?;
1480 let redacted = redact(
1481 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1482 redaction_rules.get_or_insert_with(make_redaction_rules),
1483 Some(RedactedBecause::from_raw_event(&redaction)?),
1484 )
1485 .map_err(Error::Redaction)?;
1486 let data = this.serialize_json(&redacted)?;
1487
1488 let event_type =
1489 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1490 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1491
1492 txn.set_state_event(
1493 &encoded_room_id,
1494 &event_type,
1495 &state_key,
1496 false,
1497 Some(&event_id),
1498 &data,
1499 )?;
1500 }
1501 }
1502 }
1503
1504 for (room_id, display_names) in ambiguity_maps {
1505 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1506
1507 for (name, user_ids) in display_names {
1508 let encoded_name = this.encode_key(
1509 keys::DISPLAY_NAME,
1510 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1511 );
1512 let data = this.serialize_json(&user_ids)?;
1513
1514 if user_ids.is_empty() {
1515 txn.remove_display_name(&room_id, &encoded_name)?;
1516
1517 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1532 txn.remove_display_name(&room_id, &raw_name)?;
1533 } else {
1534 txn.set_display_name(&room_id, &encoded_name, &data)?;
1536 }
1537 }
1538 }
1539
1540 Ok::<_, Error>(())
1541 })
1542 .await?;
1543
1544 Ok(())
1545 }
1546
1547 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1548 self.read()
1549 .await?
1550 .get_kv_blob(self.encode_presence_key(user_id))
1551 .await?
1552 .map(|data| self.deserialize_json(&data))
1553 .transpose()
1554 }
1555
1556 async fn get_presence_events(
1557 &self,
1558 user_ids: &[OwnedUserId],
1559 ) -> Result<Vec<Raw<PresenceEvent>>> {
1560 if user_ids.is_empty() {
1561 return Ok(Vec::new());
1562 }
1563
1564 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1565 self.read()
1566 .await?
1567 .get_kv_blobs(user_ids)
1568 .await?
1569 .into_iter()
1570 .map(|data| self.deserialize_json(&data))
1571 .collect()
1572 }
1573
1574 async fn get_state_event(
1575 &self,
1576 room_id: &RoomId,
1577 event_type: StateEventType,
1578 state_key: &str,
1579 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1580 Ok(self
1581 .get_state_events_for_keys(room_id, event_type, &[state_key])
1582 .await?
1583 .into_iter()
1584 .next())
1585 }
1586
1587 async fn get_state_events(
1588 &self,
1589 room_id: &RoomId,
1590 event_type: StateEventType,
1591 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1592 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1593 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1594 self.read()
1595 .await?
1596 .get_maybe_stripped_state_events(room_id, event_type)
1597 .await?
1598 .into_iter()
1599 .map(|(stripped, data)| {
1600 let ev = if stripped {
1601 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1602 } else {
1603 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1604 };
1605
1606 Ok(ev)
1607 })
1608 .collect()
1609 }
1610
1611 async fn get_state_events_for_keys(
1612 &self,
1613 room_id: &RoomId,
1614 event_type: StateEventType,
1615 state_keys: &[&str],
1616 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1617 if state_keys.is_empty() {
1618 return Ok(Vec::new());
1619 }
1620
1621 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1622 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1623 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1624 self.read()
1625 .await?
1626 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1627 .await?
1628 .into_iter()
1629 .map(|(stripped, data)| {
1630 let ev = if stripped {
1631 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1632 } else {
1633 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1634 };
1635
1636 Ok(ev)
1637 })
1638 .collect()
1639 }
1640
1641 async fn get_profile(
1642 &self,
1643 room_id: &RoomId,
1644 user_id: &UserId,
1645 ) -> Result<Option<MinimalRoomMemberEvent>> {
1646 let room_id = self.encode_key(keys::PROFILE, room_id);
1647 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1648
1649 self.read()
1650 .await?
1651 .get_profiles(room_id, user_ids)
1652 .await?
1653 .into_iter()
1654 .next()
1655 .map(|(_, data)| self.deserialize_json(&data))
1656 .transpose()
1657 }
1658
1659 async fn get_profiles<'a>(
1660 &self,
1661 room_id: &RoomId,
1662 user_ids: &'a [OwnedUserId],
1663 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1664 if user_ids.is_empty() {
1665 return Ok(BTreeMap::new());
1666 }
1667
1668 let room_id = self.encode_key(keys::PROFILE, room_id);
1669 let mut user_ids_map = user_ids
1670 .iter()
1671 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1672 .collect::<BTreeMap<_, _>>();
1673 let user_ids = user_ids_map.keys().cloned().collect();
1674
1675 self.read()
1676 .await?
1677 .get_profiles(room_id, user_ids)
1678 .await?
1679 .into_iter()
1680 .map(|(user_id, data)| {
1681 Ok((
1682 user_ids_map
1683 .remove(user_id.as_slice())
1684 .expect("returned user IDs were requested"),
1685 self.deserialize_json(&data)?,
1686 ))
1687 })
1688 .collect()
1689 }
1690
1691 async fn get_user_ids(
1692 &self,
1693 room_id: &RoomId,
1694 membership: RoomMemberships,
1695 ) -> Result<Vec<OwnedUserId>> {
1696 let room_id = self.encode_key(keys::MEMBER, room_id);
1697 let memberships = membership
1698 .as_vec()
1699 .into_iter()
1700 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1701 .collect();
1702 self.read()
1703 .await?
1704 .get_user_ids(room_id, memberships)
1705 .await?
1706 .iter()
1707 .map(|data| self.deserialize_value(data))
1708 .collect()
1709 }
1710
1711 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1712 self.read()
1713 .await?
1714 .get_room_infos(match room_load_settings {
1715 RoomLoadSettings::All => None,
1716 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1717 })
1718 .await?
1719 .into_iter()
1720 .map(|data| self.deserialize_json(&data))
1721 .collect()
1722 }
1723
1724 async fn get_users_with_display_name(
1725 &self,
1726 room_id: &RoomId,
1727 display_name: &DisplayName,
1728 ) -> Result<BTreeSet<OwnedUserId>> {
1729 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1730 let names = vec![self.encode_key(
1731 keys::DISPLAY_NAME,
1732 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1733 )];
1734
1735 Ok(self
1736 .read()
1737 .await?
1738 .get_display_names(room_id, names)
1739 .await?
1740 .into_iter()
1741 .next()
1742 .map(|(_, data)| self.deserialize_json(&data))
1743 .transpose()?
1744 .unwrap_or_default())
1745 }
1746
1747 async fn get_users_with_display_names<'a>(
1748 &self,
1749 room_id: &RoomId,
1750 display_names: &'a [DisplayName],
1751 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1752 let mut result = HashMap::new();
1753
1754 if display_names.is_empty() {
1755 return Ok(result);
1756 }
1757
1758 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1759 let mut names_map = display_names
1760 .iter()
1761 .flat_map(|display_name| {
1762 let raw =
1772 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1773 let normalized = display_name.as_normalized_str().map(|normalized| {
1774 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1775 });
1776
1777 iter::once(raw).chain(normalized)
1778 })
1779 .collect::<BTreeMap<_, _>>();
1780 let names = names_map.keys().cloned().collect();
1781
1782 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1783 {
1784 let display_name =
1785 names_map.remove(name.as_slice()).expect("returned display names were requested");
1786 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1787
1788 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1789 }
1790
1791 Ok(result)
1792 }
1793
1794 async fn get_account_data_event(
1795 &self,
1796 event_type: GlobalAccountDataEventType,
1797 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1798 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1799 self.read()
1800 .await?
1801 .get_global_account_data(event_type)
1802 .await?
1803 .map(|value| self.deserialize_json(&value))
1804 .transpose()
1805 }
1806
1807 async fn get_room_account_data_event(
1808 &self,
1809 room_id: &RoomId,
1810 event_type: RoomAccountDataEventType,
1811 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1812 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1813 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1814 self.read()
1815 .await?
1816 .get_room_account_data(room_id, event_type)
1817 .await?
1818 .map(|value| self.deserialize_json(&value))
1819 .transpose()
1820 }
1821
1822 async fn get_user_room_receipt_event(
1823 &self,
1824 room_id: &RoomId,
1825 receipt_type: ReceiptType,
1826 thread: ReceiptThread,
1827 user_id: &UserId,
1828 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1829 let room_id = self.encode_key(keys::RECEIPT, room_id);
1830 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1831 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1834 let user_id = self.encode_key(keys::RECEIPT, user_id);
1835
1836 self.read()
1837 .await?
1838 .get_user_receipt(room_id, receipt_type, thread, user_id)
1839 .await?
1840 .map(|value| {
1841 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1842 })
1843 .transpose()
1844 }
1845
1846 async fn get_event_room_receipt_events(
1847 &self,
1848 room_id: &RoomId,
1849 receipt_type: ReceiptType,
1850 thread: ReceiptThread,
1851 event_id: &EventId,
1852 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1853 let room_id = self.encode_key(keys::RECEIPT, room_id);
1854 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1855 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1858 let event_id = self.encode_key(keys::RECEIPT, event_id);
1859
1860 self.read()
1861 .await?
1862 .get_event_receipts(room_id, receipt_type, thread, event_id)
1863 .await?
1864 .iter()
1865 .map(|value| {
1866 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1867 })
1868 .collect()
1869 }
1870
1871 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1872 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1873 }
1874
1875 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1876 let conn = self.write().await;
1877 let key = self.encode_custom_key(key);
1878 conn.set_kv_blob(key, value).await?;
1879 Ok(())
1880 }
1881
1882 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1883 let conn = self.write().await;
1884 let key = self.encode_custom_key(key);
1885 let previous = conn.get_kv_blob(key.clone()).await?;
1886 conn.set_kv_blob(key, value).await?;
1887 Ok(previous)
1888 }
1889
1890 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1891 let conn = self.write().await;
1892 let key = self.encode_custom_key(key);
1893 let previous = conn.get_kv_blob(key.clone()).await?;
1894 if previous.is_some() {
1895 conn.delete_kv_blob(key).await?;
1896 }
1897 Ok(previous)
1898 }
1899
1900 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1901 let this = self.clone();
1902 let room_id = room_id.to_owned();
1903
1904 let conn = self.write().await;
1905
1906 conn.with_transaction(move |txn| -> Result<()> {
1907 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1908 txn.remove_room_info(&room_info_room_id)?;
1909
1910 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1911 txn.remove_room_state_events(&state_event_room_id, None)?;
1912
1913 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1914 txn.remove_room_members(&member_room_id, None)?;
1915
1916 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1917 txn.remove_room_profiles(&profile_room_id)?;
1918
1919 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1920 txn.remove_room_account_data(&room_account_data_room_id)?;
1921
1922 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1923 txn.remove_room_receipts(&receipt_room_id)?;
1924
1925 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1926 txn.remove_room_display_names(&display_name_room_id)?;
1927
1928 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1929 txn.remove_room_send_queue(&send_queue_room_id)?;
1930
1931 let dependent_send_queue_room_id =
1932 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1933 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1934
1935 let thread_subscriptions_room_id =
1936 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1937 txn.execute(
1938 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1939 (thread_subscriptions_room_id,),
1940 )?;
1941
1942 Ok(())
1943 })
1944 .await?;
1945
1946 conn.vacuum().await
1947 }
1948
1949 async fn save_send_queue_request(
1950 &self,
1951 room_id: &RoomId,
1952 transaction_id: OwnedTransactionId,
1953 created_at: MilliSecondsSinceUnixEpoch,
1954 content: QueuedRequestKind,
1955 priority: usize,
1956 ) -> Result<(), Self::Error> {
1957 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1958 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1959
1960 let content = self.serialize_json(&content)?;
1961 let created_at_ts: u64 = created_at.0.into();
1967 self.write()
1968 .await
1969 .with_transaction(move |txn| {
1970 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1971 Ok(())
1972 })
1973 .await
1974 }
1975
1976 async fn update_send_queue_request(
1977 &self,
1978 room_id: &RoomId,
1979 transaction_id: &TransactionId,
1980 content: QueuedRequestKind,
1981 ) -> Result<bool, Self::Error> {
1982 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1983
1984 let content = self.serialize_json(&content)?;
1985 let transaction_id = transaction_id.to_string();
1988
1989 let num_updated = self.write()
1990 .await
1991 .with_transaction(move |txn| {
1992 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1993 })
1994 .await?;
1995
1996 Ok(num_updated > 0)
1997 }
1998
1999 async fn remove_send_queue_request(
2000 &self,
2001 room_id: &RoomId,
2002 transaction_id: &TransactionId,
2003 ) -> Result<bool, Self::Error> {
2004 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2005
2006 let transaction_id = transaction_id.to_string();
2008
2009 let num_deleted = self
2010 .write()
2011 .await
2012 .with_transaction(move |txn| {
2013 txn.prepare_cached(
2014 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
2015 )?
2016 .execute((room_id, &transaction_id))
2017 })
2018 .await?;
2019
2020 Ok(num_deleted > 0)
2021 }
2022
2023 async fn load_send_queue_requests(
2024 &self,
2025 room_id: &RoomId,
2026 ) -> Result<Vec<QueuedRequest>, Self::Error> {
2027 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2028
2029 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
2033 .read()
2034 .await?
2035 .prepare(
2036 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
2037 |mut stmt| {
2038 stmt.query((room_id,))?
2039 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2040 .collect()
2041 },
2042 )
2043 .await?;
2044
2045 let mut requests = Vec::with_capacity(res.len());
2046
2047 for entry in res {
2048 let created_at = entry
2049 .4
2050 .and_then(UInt::new)
2051 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2052
2053 requests.push(QueuedRequest {
2054 transaction_id: entry.0.into(),
2055 kind: self.deserialize_json(&entry.1)?,
2056 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
2057 priority: entry.3,
2058 created_at,
2059 });
2060 }
2061
2062 Ok(requests)
2063 }
2064
2065 async fn update_send_queue_request_status(
2066 &self,
2067 room_id: &RoomId,
2068 transaction_id: &TransactionId,
2069 error: Option<QueueWedgeError>,
2070 ) -> Result<(), Self::Error> {
2071 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2072
2073 let transaction_id = transaction_id.to_string();
2075
2076 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2078
2079 self.write()
2080 .await
2081 .with_transaction(move |txn| {
2082 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2083 Ok(())
2084 })
2085 .await
2086 }
2087
2088 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2089 let res: Vec<Vec<u8>> = self
2094 .read()
2095 .await?
2096 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2097 stmt.query(())?.mapped(|row| row.get(0)).collect()
2098 })
2099 .await?;
2100
2101 Ok(res
2104 .into_iter()
2105 .map(|entry| self.deserialize_value(&entry))
2106 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2107 .into_iter()
2108 .collect())
2109 }
2110
2111 async fn save_dependent_queued_request(
2112 &self,
2113 room_id: &RoomId,
2114 parent_txn_id: &TransactionId,
2115 own_txn_id: ChildTransactionId,
2116 created_at: MilliSecondsSinceUnixEpoch,
2117 content: DependentQueuedRequestKind,
2118 ) -> Result<()> {
2119 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2120 let content = self.serialize_json(&content)?;
2121
2122 let parent_txn_id = parent_txn_id.to_string();
2124 let own_txn_id = own_txn_id.to_string();
2125
2126 let created_at_ts: u64 = created_at.0.into();
2127 self.write()
2128 .await
2129 .with_transaction(move |txn| {
2130 txn.prepare_cached(
2131 r#"INSERT INTO dependent_send_queue_events
2132 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
2133 VALUES (?, ?, ?, ?, ?)"#,
2134 )?
2135 .execute((
2136 room_id,
2137 parent_txn_id,
2138 own_txn_id,
2139 content,
2140 created_at_ts,
2141 ))?;
2142 Ok(())
2143 })
2144 .await
2145 }
2146
2147 async fn update_dependent_queued_request(
2148 &self,
2149 room_id: &RoomId,
2150 own_transaction_id: &ChildTransactionId,
2151 new_content: DependentQueuedRequestKind,
2152 ) -> Result<bool> {
2153 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2154 let content = self.serialize_json(&new_content)?;
2155
2156 let own_txn_id = own_transaction_id.to_string();
2158
2159 let num_updated = self
2160 .write()
2161 .await
2162 .with_transaction(move |txn| {
2163 txn.prepare_cached(
2164 r#"UPDATE dependent_send_queue_events
2165 SET content = ?
2166 WHERE own_transaction_id = ?
2167 AND room_id = ?"#,
2168 )?
2169 .execute((content, own_txn_id, room_id))
2170 })
2171 .await?;
2172
2173 if num_updated > 1 {
2174 return Err(Error::InconsistentUpdate);
2175 }
2176
2177 Ok(num_updated == 1)
2178 }
2179
2180 async fn mark_dependent_queued_requests_as_ready(
2181 &self,
2182 room_id: &RoomId,
2183 parent_txn_id: &TransactionId,
2184 parent_key: SentRequestKey,
2185 ) -> Result<usize> {
2186 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2187 let parent_key = self.serialize_json(&parent_key)?;
2188
2189 let parent_txn_id = parent_txn_id.to_string();
2191
2192 self.write()
2193 .await
2194 .with_transaction(move |txn| {
2195 Ok(txn.prepare_cached(
2196 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2197 )?
2198 .execute((parent_key, parent_txn_id, room_id))?)
2199 })
2200 .await
2201 }
2202
2203 async fn remove_dependent_queued_request(
2204 &self,
2205 room_id: &RoomId,
2206 txn_id: &ChildTransactionId,
2207 ) -> Result<bool> {
2208 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2209
2210 let txn_id = txn_id.to_string();
2212
2213 let num_deleted = self
2214 .write()
2215 .await
2216 .with_transaction(move |txn| {
2217 txn.prepare_cached(
2218 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2219 )?
2220 .execute((txn_id, room_id))
2221 })
2222 .await?;
2223
2224 Ok(num_deleted > 0)
2225 }
2226
2227 async fn load_dependent_queued_requests(
2228 &self,
2229 room_id: &RoomId,
2230 ) -> Result<Vec<DependentQueuedRequest>> {
2231 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2232
2233 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2235 .read()
2236 .await?
2237 .prepare(
2238 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2239 |mut stmt| {
2240 stmt.query((room_id,))?
2241 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2242 .collect()
2243 },
2244 )
2245 .await?;
2246
2247 let mut dependent_events = Vec::with_capacity(res.len());
2248
2249 for entry in res {
2250 let created_at = entry
2251 .4
2252 .and_then(UInt::new)
2253 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2254
2255 dependent_events.push(DependentQueuedRequest {
2256 own_transaction_id: entry.0.into(),
2257 parent_transaction_id: entry.1.into(),
2258 parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2259 kind: self.deserialize_json(&entry.3)?,
2260 created_at,
2261 });
2262 }
2263
2264 Ok(dependent_events)
2265 }
2266
2267 async fn upsert_thread_subscriptions(
2268 &self,
2269 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
2270 ) -> Result<(), Self::Error> {
2271 let values: Vec<_> = updates
2272 .into_iter()
2273 .map(|(room_id, thread_id, subscription)| {
2274 (
2275 self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
2276 self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
2277 subscription.status.as_str(),
2278 subscription.bump_stamp,
2279 )
2280 })
2281 .collect();
2282
2283 self.write()
2284 .await
2285 .with_transaction(move |txn| {
2286 let mut txn = txn.prepare_cached(
2287 "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
2288 VALUES (?, ?, ?, ?)
2289 ON CONFLICT (room_id, event_id) DO UPDATE
2290 SET
2291 status =
2292 CASE
2293 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
2294 WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
2295 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
2296 ELSE thread_subscriptions.status
2297 END,
2298 bump_stamp =
2299 CASE
2300 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
2301 WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
2302 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
2303 ELSE thread_subscriptions.bump_stamp
2304 END",
2305 )?;
2306
2307 for value in values {
2308 txn.execute(value)?;
2309 }
2310
2311 Result::<_, Error>::Ok(())
2312 })
2313 .await?;
2314
2315 Ok(())
2316 }
2317
2318 async fn load_thread_subscription(
2319 &self,
2320 room_id: &RoomId,
2321 thread_id: &EventId,
2322 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2323 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2324 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2325
2326 Ok(self
2327 .read()
2328 .await?
2329 .query_row(
2330 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2331 (room_id, thread_id),
2332 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2333 )
2334 .await
2335 .optional()?
2336 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2337 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2338 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2339 })?;
2340 Ok(StoredThreadSubscription { status, bump_stamp })
2341 })
2342 .transpose()?)
2343 }
2344
2345 async fn remove_thread_subscription(
2346 &self,
2347 room_id: &RoomId,
2348 thread_id: &EventId,
2349 ) -> Result<(), Self::Error> {
2350 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2351 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2352
2353 self.write()
2354 .await
2355 .execute(
2356 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2357 (room_id, thread_id),
2358 )
2359 .await?;
2360
2361 Ok(())
2362 }
2363
2364 async fn optimize(&self) -> Result<(), Self::Error> {
2365 Ok(self.vacuum().await?)
2366 }
2367
2368 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2369 self.get_db_size().await
2370 }
2371}
2372
2373#[derive(Debug, Clone, Serialize, Deserialize)]
2374struct ReceiptData {
2375 receipt: Receipt,
2376 event_id: OwnedEventId,
2377 user_id: OwnedUserId,
2378}
2379
2380#[cfg(test)]
2381mod tests {
2382 use std::sync::{
2383 LazyLock,
2384 atomic::{AtomicU32, Ordering::SeqCst},
2385 };
2386
2387 use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
2388 use tempfile::{TempDir, tempdir};
2389
2390 use super::SqliteStateStore;
2391
2392 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2393 static NUM: AtomicU32 = AtomicU32::new(0);
2394
2395 async fn get_store() -> Result<impl StateStore, StoreError> {
2396 let name = NUM.fetch_add(1, SeqCst).to_string();
2397 let tmpdir_path = TMP_DIR.path().join(name);
2398
2399 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2400
2401 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2402 }
2403
2404 statestore_integration_tests!();
2405}
2406
2407#[cfg(test)]
2408mod encrypted_tests {
2409 use std::{
2410 path::PathBuf,
2411 sync::{
2412 LazyLock,
2413 atomic::{AtomicU32, Ordering::SeqCst},
2414 },
2415 };
2416
2417 use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
2418 use matrix_sdk_test::async_test;
2419 use tempfile::{TempDir, tempdir};
2420
2421 use super::SqliteStateStore;
2422 use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};
2423
2424 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2425 static NUM: AtomicU32 = AtomicU32::new(0);
2426
2427 fn new_state_store_workspace() -> PathBuf {
2428 let name = NUM.fetch_add(1, SeqCst).to_string();
2429 TMP_DIR.path().join(name)
2430 }
2431
2432 async fn get_store() -> Result<impl StateStore, StoreError> {
2433 let tmpdir_path = new_state_store_workspace();
2434
2435 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2436
2437 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2438 .await
2439 .unwrap())
2440 }
2441
2442 #[async_test]
2443 async fn test_pool_size() {
2444 let tmpdir_path = new_state_store_workspace();
2445 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2446
2447 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2448
2449 assert_eq!(store.pool.status().max_size, 42);
2450 }
2451
2452 #[async_test]
2453 async fn test_cache_size() {
2454 let tmpdir_path = new_state_store_workspace();
2455 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2456
2457 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2458
2459 let conn = store.pool.get().await.unwrap();
2460 let cache_size =
2461 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2462
2463 assert_eq!(cache_size, -(1500 / 1024));
2467 }
2468
2469 #[async_test]
2470 async fn test_journal_size_limit() {
2471 let tmpdir_path = new_state_store_workspace();
2472 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2473
2474 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2475
2476 let conn = store.pool.get().await.unwrap();
2477 let journal_size_limit = conn
2478 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2479 .await
2480 .unwrap();
2481
2482 assert_eq!(journal_size_limit, 1500);
2485 }
2486
2487 statestore_integration_tests!();
2488}
2489
2490#[cfg(test)]
2491mod migration_tests {
2492 use std::{
2493 path::{Path, PathBuf},
2494 sync::{
2495 Arc, LazyLock,
2496 atomic::{AtomicU32, Ordering::SeqCst},
2497 },
2498 };
2499
2500 use as_variant::as_variant;
2501 use matrix_sdk_base::{
2502 RoomState, StateStore,
2503 media::{MediaFormat, MediaRequestParameters},
2504 store::{
2505 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2506 SerializableEventContent,
2507 },
2508 sync::UnreadNotificationsCount,
2509 };
2510 use matrix_sdk_test::async_test;
2511 use ruma::{
2512 EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
2513 events::{
2514 StateEventType,
2515 room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
2516 },
2517 room_id, server_name, user_id,
2518 };
2519 use rusqlite::Transaction;
2520 use serde::{Deserialize, Serialize};
2521 use serde_json::json;
2522 use tempfile::{TempDir, tempdir};
2523 use tokio::{fs, sync::Mutex};
2524 use zeroize::Zeroizing;
2525
2526 use super::{DATABASE_NAME, SqliteStateStore, init, keys};
2527 use crate::{
2528 OpenStoreError, Secret, SqliteStoreConfig,
2529 error::{Error, Result},
2530 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2531 };
2532
2533 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2534 static NUM: AtomicU32 = AtomicU32::new(0);
2535 const SECRET: &str = "secret";
2536
2537 fn new_path() -> PathBuf {
2538 let name = NUM.fetch_add(1, SeqCst).to_string();
2539 TMP_DIR.path().join(name)
2540 }
2541
2542 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2543 let config = SqliteStoreConfig::new(path);
2544
2545 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2546
2547 let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2548 let conn = pool.get().await?;
2549
2550 init(&conn).await?;
2551
2552 let store_cipher = Some(Arc::new(
2553 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2554 .await
2555 .unwrap(),
2556 ));
2557 let this = SqliteStateStore {
2558 store_cipher,
2559 pool,
2560 write_connection: Arc::new(Mutex::new(conn)),
2562 };
2563 this.run_migrations(1, Some(version)).await?;
2564
2565 Ok(this)
2566 }
2567
2568 fn room_info_v1_json(
2569 room_id: &RoomId,
2570 state: RoomState,
2571 name: Option<&str>,
2572 creator: Option<&UserId>,
2573 ) -> serde_json::Value {
2574 let name_content = match name {
2576 Some(name) => json!({ "name": name }),
2577 None => json!({ "name": null }),
2578 };
2579 let create_content = match creator {
2581 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2582 None => RoomCreateEventContent::new_v11(),
2583 };
2584
2585 json!({
2586 "room_id": room_id,
2587 "room_type": state,
2588 "notification_counts": UnreadNotificationsCount::default(),
2589 "summary": {
2590 "heroes": [],
2591 "joined_member_count": 0,
2592 "invited_member_count": 0,
2593 },
2594 "members_synced": false,
2595 "base_info": {
2596 "dm_targets": [],
2597 "max_power_level": 100,
2598 "name": {
2599 "Original": {
2600 "content": name_content,
2601 },
2602 },
2603 "create": {
2604 "Original": {
2605 "content": create_content,
2606 }
2607 }
2608 },
2609 })
2610 }
2611
2612 #[async_test]
2613 pub async fn test_migrating_v1_to_v2() {
2614 let path = new_path();
2615 {
2617 let db = create_fake_db(&path, 1).await.unwrap();
2618 let conn = db.pool.get().await.unwrap();
2619
2620 let this = db.clone();
2621 conn.with_transaction(move |txn| {
2622 for i in 0..5 {
2623 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2624 let (state, stripped) =
2625 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2626 let info = room_info_v1_json(&room_id, state, None, None);
2627
2628 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2629 let data = this.serialize_json(&info)?;
2630
2631 txn.prepare_cached(
2632 "INSERT INTO room_info (room_id, stripped, data)
2633 VALUES (?, ?, ?)",
2634 )?
2635 .execute((room_id, stripped, data))?;
2636 }
2637
2638 Result::<_, Error>::Ok(())
2639 })
2640 .await
2641 .unwrap();
2642 }
2643
2644 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2646
2647 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2649 }
2650
2651 fn add_room_v2(
2653 this: &SqliteStateStore,
2654 txn: &Transaction<'_>,
2655 room_id: &RoomId,
2656 name: Option<&str>,
2657 create_creator: Option<&UserId>,
2658 create_sender: Option<&UserId>,
2659 ) -> Result<(), Error> {
2660 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2661
2662 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2663 let encoded_state =
2664 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2665 let data = this.serialize_json(&room_info_json)?;
2666
2667 txn.prepare_cached(
2668 "INSERT INTO room_info (room_id, state, data)
2669 VALUES (?, ?, ?)",
2670 )?
2671 .execute((encoded_room_id, encoded_state, data))?;
2672
2673 let Some(create_sender) = create_sender else {
2675 return Ok(());
2676 };
2677
2678 let create_content = match create_creator {
2679 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2680 None => RoomCreateEventContent::new_v11(),
2681 };
2682
2683 let event_id = EventId::new_v1(server_name!("dummy.local"));
2684 let create_event = json!({
2685 "content": create_content,
2686 "event_id": event_id,
2687 "sender": create_sender.to_owned(),
2688 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2689 "state_key": "",
2690 "type": "m.room.create",
2691 "unsigned": {},
2692 });
2693
2694 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2695 let encoded_event_type =
2696 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2697 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2698 let stripped = false;
2699 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2700 let data = this.serialize_json(&create_event)?;
2701
2702 txn.prepare_cached(
2703 "INSERT
2704 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2705 VALUES (?, ?, ?, ?, ?, ?)",
2706 )?
2707 .execute((
2708 encoded_room_id,
2709 encoded_event_type,
2710 encoded_state_key,
2711 stripped,
2712 encoded_event_id,
2713 data,
2714 ))?;
2715
2716 Ok(())
2717 }
2718
2719 #[async_test]
2720 pub async fn test_migrating_v2_to_v3() {
2721 let path = new_path();
2722
2723 let room_a_id = room_id!("!room_a:dummy.local");
2725 let room_a_name = "Room A";
2726 let room_a_creator = user_id!("@creator:dummy.local");
2727 let room_a_create_sender = user_id!("@sender:dummy.local");
2730
2731 let room_b_id = room_id!("!room_b:dummy.local");
2733
2734 let room_c_id = room_id!("!room_c:dummy.local");
2736 let room_c_create_sender = user_id!("@creator:dummy.local");
2737
2738 {
2740 let db = create_fake_db(&path, 2).await.unwrap();
2741 let conn = db.pool.get().await.unwrap();
2742
2743 let this = db.clone();
2744 conn.with_transaction(move |txn| {
2745 add_room_v2(
2746 &this,
2747 txn,
2748 room_a_id,
2749 Some(room_a_name),
2750 Some(room_a_creator),
2751 Some(room_a_create_sender),
2752 )?;
2753 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2754 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2755
2756 Result::<_, Error>::Ok(())
2757 })
2758 .await
2759 .unwrap();
2760 }
2761
2762 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2764
2765 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2767 assert_eq!(room_infos.len(), 3);
2768
2769 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2770 assert_eq!(room_a.name(), Some(room_a_name));
2771 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2772
2773 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2774 assert_eq!(room_b.name(), None);
2775 assert_eq!(room_b.creators(), None);
2776
2777 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2778 assert_eq!(room_c.name(), None);
2779 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2780 }
2781
2782 #[async_test]
2783 pub async fn test_migrating_v7_to_v9() {
2784 let path = new_path();
2785
2786 let room_id = room_id!("!room_a:dummy.local");
2787 let wedged_event_transaction_id = TransactionId::new();
2788 let local_event_transaction_id = TransactionId::new();
2789
2790 {
2792 let db = create_fake_db(&path, 7).await.unwrap();
2793 let conn = db.pool.get().await.unwrap();
2794
2795 let wedge_tx = wedged_event_transaction_id.clone();
2796 let local_tx = local_event_transaction_id.clone();
2797
2798 conn.with_transaction(move |txn| {
2799 add_dependent_send_queue_event_v7(
2800 &db,
2801 txn,
2802 room_id,
2803 &local_tx,
2804 ChildTransactionId::new(),
2805 DependentQueuedRequestKind::RedactEvent,
2806 )?;
2807 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2808 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2809 Result::<_, Error>::Ok(())
2810 })
2811 .await
2812 .unwrap();
2813 }
2814
2815 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2818
2819 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2820 assert!(requests.is_empty());
2821
2822 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2823 assert!(dependent_requests.is_empty());
2824 }
2825
2826 fn add_send_queue_event_v7(
2827 this: &SqliteStateStore,
2828 txn: &Transaction<'_>,
2829 transaction_id: &TransactionId,
2830 room_id: &RoomId,
2831 is_wedged: bool,
2832 ) -> Result<(), Error> {
2833 let content =
2834 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2835
2836 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2837 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2838
2839 let content = this.serialize_json(&content)?;
2840
2841 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2842 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2843
2844 Ok(())
2845 }
2846
2847 fn add_dependent_send_queue_event_v7(
2848 this: &SqliteStateStore,
2849 txn: &Transaction<'_>,
2850 room_id: &RoomId,
2851 parent_txn_id: &TransactionId,
2852 own_txn_id: ChildTransactionId,
2853 content: DependentQueuedRequestKind,
2854 ) -> Result<(), Error> {
2855 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2856
2857 let parent_txn_id = parent_txn_id.to_string();
2858 let own_txn_id = own_txn_id.to_string();
2859 let content = this.serialize_json(&content)?;
2860
2861 txn.prepare_cached(
2862 "INSERT INTO dependent_send_queue_events
2863 (room_id, parent_transaction_id, own_transaction_id, content)
2864 VALUES (?, ?, ?, ?)",
2865 )?
2866 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2867
2868 Ok(())
2869 }
2870
2871 #[derive(Clone, Debug, Serialize, Deserialize)]
2872 pub enum LegacyDependentQueuedRequestKind {
2873 UploadFileWithThumbnail {
2874 content_type: String,
2875 cache_key: MediaRequestParameters,
2876 related_to: OwnedTransactionId,
2877 },
2878 }
2879
2880 #[async_test]
2881 pub async fn test_dependent_queued_request_variant_renaming() {
2882 let path = new_path();
2883 let db = create_fake_db(&path, 7).await.unwrap();
2884
2885 let cache_key = MediaRequestParameters {
2886 format: MediaFormat::File,
2887 source: MediaSource::Plain("https://server.local/foobar".into()),
2888 };
2889 let related_to = TransactionId::new();
2890 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2891 content_type: "image/png".to_owned(),
2892 cache_key,
2893 related_to: related_to.clone(),
2894 };
2895
2896 let data = db
2897 .serialize_json(&request)
2898 .expect("should be able to serialize legacy dependent request");
2899 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2900 "should be able to deserialize dependent request from legacy dependent request",
2901 );
2902
2903 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2904 assert_eq!(de_related_to, related_to);
2905 });
2906 }
2907}