1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 str::FromStr as _,
7 sync::Arc,
8};
9
10use async_trait::async_trait;
11use matrix_sdk_base::{
12 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
13 RoomMemberships, RoomState, StateChanges, StateStore, StateStoreDataKey, StateStoreDataValue,
14 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
15 store::{
16 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
17 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
18 StoredThreadSubscription, ThreadSubscriptionStatus, migration_helpers::RoomInfoV1,
19 },
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
24 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
25 canonical_json::{RedactedBecause, redact},
26 events::{
27 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
28 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
29 presence::PresenceEvent,
30 receipt::{Receipt, ReceiptThread, ReceiptType},
31 room::{
32 create::RoomCreateEventContent,
33 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
34 },
35 },
36 serde::Raw,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::{
41 fs,
42 sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, instrument, warn};
45
46use crate::{
47 OpenStoreError, Secret, SqliteStoreConfig,
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt, repeat_vars,
53 },
54};
55
/// Names passed as the first argument to `encode_key` in this file.
///
/// Each value matches the name of the SQL table that the encoded key is used
/// with in the statements of this file.
mod keys {
    // Generic key/value storage (also used for presence and custom keys).
    pub const KV_BLOB: &str = "kv_blob";
    // Per-room data.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";
    // Send queue and its dependent requests.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
71
/// Name handed to `SqliteStoreConfig::build_pool_of_connections` when the
/// state store is opened.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";
74
/// Schema version that `SqliteStateStore::run_migrations` upgrades to when no
/// explicit target is given.
///
/// This must equal the version recorded by the last migration step of
/// `run_migrations`; with a lower value the newest steps are never executed.
/// Bump it together with every new migration step.
const DATABASE_VERSION: u8 = 15;
81
/// A state store backed by SQLite.
///
/// Cloning is cheap with respect to the cipher and the write connection, which
/// are both behind an [`Arc`].
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher returned by `get_cypher`; `None` when the store was opened
    /// without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// Pool from which `read()` takes its connections.
    pool: SqlitePool,

    /// The single connection handed out (behind a mutex) by `write()`.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
96
97#[cfg(not(tarpaulin_include))]
98impl fmt::Debug for SqliteStateStore {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
101 }
102}
103
104impl SqliteStateStore {
105 pub async fn open(
108 path: impl AsRef<Path>,
109 passphrase: Option<&str>,
110 ) -> Result<Self, OpenStoreError> {
111 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
112 }
113
114 pub async fn open_with_key(
117 path: impl AsRef<Path>,
118 key: Option<&[u8; 32]>,
119 ) -> Result<Self, OpenStoreError> {
120 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
121 }
122
123 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
125 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
126
127 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
128
129 let this = Self::open_with_pool(pool, config.secret).await?;
130 this.pool.get().await?.apply_runtime_config(config.runtime_config).await?;
131
132 Ok(this)
133 }
134
135 pub async fn open_with_pool(
138 pool: SqlitePool,
139 secret: Option<Secret>,
140 ) -> Result<Self, OpenStoreError> {
141 let conn = pool.get().await?;
142
143 let mut version = conn.db_version().await?;
144
145 if version == 0 {
146 init(&conn).await?;
147 version = 1;
148 }
149
150 let store_cipher = match secret {
151 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
152 None => None,
153 };
154 let this = Self {
155 store_cipher,
156 pool,
157 write_connection: Arc::new(Mutex::new(conn)),
159 };
160 this.run_migrations(version, None).await?;
161
162 this.read().await?.wal_checkpoint().await;
163
164 Ok(this)
165 }
166
167 async fn run_migrations(&self, from: u8, to: Option<u8>) -> Result<()> {
172 let to = to.unwrap_or(DATABASE_VERSION);
173
174 if from < to {
175 debug!(version = from, new_version = to, "Upgrading database");
176 } else {
177 return Ok(());
178 }
179
180 let conn = self.write().await;
181
182 if from < 2 && to >= 2 {
183 let this = self.clone();
184 conn.with_transaction(move |txn| {
185 txn.execute_batch(include_str!(
187 "../migrations/state_store/002_a_create_new_room_info.sql"
188 ))?;
189
190 for data in txn
192 .prepare("SELECT data FROM room_info")?
193 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
194 {
195 let data = data?;
196 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
197
198 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
199 let state = this
200 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
201 txn.prepare_cached(
202 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
203 VALUES (?, ?, ?)",
204 )?
205 .execute((room_id, state, data))?;
206 }
207
208 txn.execute_batch(include_str!(
210 "../migrations/state_store/002_b_replace_room_info.sql"
211 ))?;
212
213 txn.set_db_version(2)?;
214 Result::<_, Error>::Ok(())
215 })
216 .await?;
217 }
218
219 if from < 3 && to >= 3 {
221 let this = self.clone();
222 conn.with_transaction(move |txn| {
223 for data in txn
225 .prepare("SELECT data FROM room_info")?
226 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
227 {
228 let data = data?;
229 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
230
231 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
233 let event_type =
234 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
235 let create_res = txn
236 .prepare(
237 "SELECT stripped, data FROM state_event
238 WHERE room_id = ? AND event_type = ?",
239 )?
240 .query_row([room_id, event_type], |row| {
241 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
242 })
243 .optional()?;
244
245 let create = create_res.and_then(|(stripped, data)| {
246 let create = if stripped {
247 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
248 this.deserialize_json(&data).ok()?,
249 )
250 } else {
251 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
252 };
253 Some(create)
254 });
255
256 let migrated_room_info = room_info_v1.migrate(create.as_ref());
257
258 let data = this.serialize_json(&migrated_room_info)?;
259 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
260 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
261 .execute((data, room_id))?;
262 }
263
264 txn.set_db_version(3)?;
265 Result::<_, Error>::Ok(())
266 })
267 .await?;
268 }
269
270 if from < 4 && to >= 4 {
271 conn.with_transaction(move |txn| {
272 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
274 txn.set_db_version(4)
275 })
276 .await?;
277 }
278
279 if from < 5 && to >= 5 {
280 conn.with_transaction(move |txn| {
281 txn.execute_batch(include_str!(
283 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
284 ))?;
285 txn.set_db_version(4)
286 })
287 .await?;
288 }
289
290 if from < 6 && to >= 6 {
291 conn.with_transaction(move |txn| {
292 txn.execute_batch(include_str!(
294 "../migrations/state_store/005_send_queue_dependent_events.sql"
295 ))?;
296 txn.set_db_version(6)
297 })
298 .await?;
299 }
300
301 if from < 7 && to >= 7 {
302 conn.with_transaction(move |txn| {
303 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
305 txn.set_db_version(7)
306 })
307 .await?;
308 }
309
310 if from < 8 && to >= 8 {
311 let error = QueueWedgeError::GenericApiError {
313 msg: "local echo failed to send in a previous session".into(),
314 };
315 let default_err = self.serialize_value(&error)?;
316
317 conn.with_transaction(move |txn| {
318 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
320
321 for wedged_entries in txn
324 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
325 .query_map((), |row| {
326 Ok(
327 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
328 )
329 })? {
330
331 let (room_id, transaction_id) = wedged_entries?;
332
333 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
334 .execute((default_err.clone(), room_id, transaction_id))?;
335 }
336
337
338 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
340
341 txn.set_db_version(8)
342 })
343 .await?;
344 }
345
346 if from < 9 && to >= 9 {
347 conn.with_transaction(move |txn| {
348 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
350 txn.set_db_version(9)
351 })
352 .await?;
353 }
354
355 if from < 10 && to >= 10 {
356 conn.with_transaction(move |txn| {
357 txn.execute_batch(include_str!(
359 "../migrations/state_store/009_send_queue_priority.sql"
360 ))?;
361 txn.set_db_version(10)
362 })
363 .await?;
364 }
365
366 if from < 11 && to >= 11 {
367 conn.with_transaction(move |txn| {
368 txn.execute_batch(include_str!(
370 "../migrations/state_store/010_send_queue_enqueue_time.sql"
371 ))?;
372 txn.set_db_version(11)
373 })
374 .await?;
375 }
376
377 if from < 12 && to >= 12 {
378 conn.vacuum().await?;
382 conn.set_kv("version", vec![12]).await?;
383 }
384
385 if from < 13 && to >= 13 {
386 conn.with_transaction(move |txn| {
387 txn.execute_batch(include_str!(
389 "../migrations/state_store/011_thread_subscriptions.sql"
390 ))?;
391 txn.set_db_version(13)
392 })
393 .await?;
394 }
395
396 if from < 14 && to >= 14 {
397 conn.with_transaction(move |txn| {
398 txn.execute_batch(include_str!(
400 "../migrations/state_store/012_thread_subscriptions_bumpstamp.sql"
401 ))?;
402 txn.set_db_version(14)
403 })
404 .await?;
405 }
406
407 if from < 15 && to >= 15 {
408 conn.with_transaction(move |txn| {
409 txn.execute_batch(include_str!(
411 "../migrations/state_store/013_send_queue_new_parent_key_format.sql"
412 ))?;
413 txn.set_db_version(15)
414 })
415 .await?;
416 }
417
418 Ok(())
419 }
420
421 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
422 let key_s = match key {
423 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
424 StateStoreDataKey::SupportedVersions => {
425 Cow::Borrowed(StateStoreDataKey::SUPPORTED_VERSIONS)
426 }
427 StateStoreDataKey::WellKnown => Cow::Borrowed(StateStoreDataKey::WELL_KNOWN),
428 StateStoreDataKey::Filter(f) => {
429 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
430 }
431 StateStoreDataKey::UserAvatarUrl(u) => {
432 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
433 }
434 StateStoreDataKey::RecentlyVisitedRooms(b) => {
435 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
436 }
437 StateStoreDataKey::UtdHookManagerData => {
438 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
439 }
440 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
441 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
442 }
443 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
444 if let Some(thread_root) = thread_root {
445 Cow::Owned(format!(
446 "{}:{room_id}:{thread_root}",
447 StateStoreDataKey::COMPOSER_DRAFT
448 ))
449 } else {
450 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
451 }
452 }
453 StateStoreDataKey::SeenKnockRequests(room_id) => {
454 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
455 }
456 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
457 Cow::Borrowed(StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
458 }
459 };
460
461 self.encode_key(keys::KV_BLOB, &*key_s)
462 }
463
464 fn encode_presence_key(&self, user_id: &UserId) -> Key {
465 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
466 }
467
468 fn encode_custom_key(&self, key: &[u8]) -> Key {
469 let mut full_key = b"custom:".to_vec();
470 full_key.extend(key);
471 self.encode_key(keys::KV_BLOB, full_key)
472 }
473
474 #[instrument(skip_all)]
476 async fn read(&self) -> Result<SqliteAsyncConn> {
477 Ok(self.pool.get().await?)
478 }
479
480 #[instrument(skip_all)]
482 async fn write(&self) -> OwnedMutexGuard<SqliteAsyncConn> {
483 self.write_connection.clone().lock_owned().await
484 }
485
486 fn remove_maybe_stripped_room_data(
487 &self,
488 txn: &Transaction<'_>,
489 room_id: &RoomId,
490 stripped: bool,
491 ) -> rusqlite::Result<()> {
492 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
493 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
494
495 let member_room_id = self.encode_key(keys::MEMBER, room_id);
496 txn.remove_room_members(&member_room_id, Some(stripped))
497 }
498
499 pub async fn vacuum(&self) -> Result<()> {
500 self.write_connection.lock().await.vacuum().await
501 }
502
503 pub async fn get_db_size(&self) -> Result<Option<usize>> {
504 let read_conn = self.pool.get().await?;
505 Ok(Some(read_conn.get_db_size().await?))
506 }
507}
508
509impl EncryptableStore for SqliteStateStore {
510 fn get_cypher(&self) -> Option<&StoreCipher> {
511 self.store_cipher.as_deref()
512 }
513}
514
515async fn init(conn: &SqliteAsyncConn) -> Result<()> {
517 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
520 conn.with_transaction(|txn| {
521 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
522 txn.set_db_version(1)?;
523
524 Ok(())
525 })
526 .await
527}
528
/// Synchronous write (and a few read) helpers for the state store tables.
///
/// All keys and values are passed already encoded/serialized as byte slices.
trait SqliteConnectionStateStoreExt {
    /// Inserts or replaces the `kv_blob` row for `key`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the global account data for `event_type`.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the account data of `event_type` in a room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes all account data of a room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the info of a room together with its state.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Returns the stored info of a room, or `None` when there is no row.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Deletes the info of a room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces a state event.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Returns the state event with the given event id in a room, or `None`
    /// when there is no such row.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Deletes the state events of a room; with `Some(flag)` only those whose
    /// `stripped` column equals `flag`.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Inserts or replaces a room member.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes the members of a room; with `Some(flag)` only those whose
    /// `stripped` column equals `flag`.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Inserts or replaces the profile of a user in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Deletes all profiles of a room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes the profile of one user in a room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces a receipt.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes all receipts of a room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Deletes the data stored for one display name in a room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Deletes all display-name data of a room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes all send queue entries of a room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes all dependent send queue entries of a room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
597
598impl SqliteConnectionStateStoreExt for rusqlite::Connection {
599 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
600 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
601 Ok(())
602 }
603
604 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
605 self.prepare_cached(
606 "INSERT OR REPLACE INTO global_account_data (event_type, data)
607 VALUES (?, ?)",
608 )?
609 .execute((event_type, data))?;
610 Ok(())
611 }
612
613 fn set_room_account_data(
614 &self,
615 room_id: &[u8],
616 event_type: &[u8],
617 data: &[u8],
618 ) -> rusqlite::Result<()> {
619 self.prepare_cached(
620 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
621 VALUES (?, ?, ?)",
622 )?
623 .execute((room_id, event_type, data))?;
624 Ok(())
625 }
626
627 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
628 self.prepare(
629 "DELETE FROM room_account_data
630 WHERE room_id = ?",
631 )?
632 .execute((room_id,))?;
633 Ok(())
634 }
635
636 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
637 self.prepare_cached(
638 "INSERT OR REPLACE INTO room_info (room_id, state, data)
639 VALUES (?, ?, ?)",
640 )?
641 .execute((room_id, state, data))?;
642 Ok(())
643 }
644
645 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
646 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
647 .optional()
648 }
649
650 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
652 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
653 Ok(())
654 }
655
656 fn set_state_event(
657 &self,
658 room_id: &[u8],
659 event_type: &[u8],
660 state_key: &[u8],
661 stripped: bool,
662 event_id: Option<&[u8]>,
663 data: &[u8],
664 ) -> rusqlite::Result<()> {
665 self.prepare_cached(
666 "INSERT OR REPLACE
667 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
668 VALUES (?, ?, ?, ?, ?, ?)",
669 )?
670 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
671 Ok(())
672 }
673
674 fn get_state_event_by_id(
675 &self,
676 room_id: &[u8],
677 event_id: &[u8],
678 ) -> rusqlite::Result<Option<Vec<u8>>> {
679 self.query_row(
680 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
681 (room_id, event_id),
682 |row| row.get(0),
683 )
684 .optional()
685 }
686
687 fn remove_room_state_events(
693 &self,
694 room_id: &[u8],
695 stripped: Option<bool>,
696 ) -> rusqlite::Result<()> {
697 if let Some(stripped) = stripped {
698 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
699 .execute((room_id, stripped))?;
700 } else {
701 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
702 .execute((room_id,))?;
703 }
704 Ok(())
705 }
706
707 fn set_member(
708 &self,
709 room_id: &[u8],
710 user_id: &[u8],
711 membership: &[u8],
712 stripped: bool,
713 data: &[u8],
714 ) -> rusqlite::Result<()> {
715 self.prepare_cached(
716 "INSERT OR REPLACE
717 INTO member (room_id, user_id, membership, stripped, data)
718 VALUES (?, ?, ?, ?, ?)",
719 )?
720 .execute((room_id, user_id, membership, stripped, data))?;
721 Ok(())
722 }
723
724 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
729 if let Some(stripped) = stripped {
730 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
731 .execute((room_id, stripped))?;
732 } else {
733 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
734 }
735 Ok(())
736 }
737
738 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
739 self.prepare_cached(
740 "INSERT OR REPLACE
741 INTO profile (room_id, user_id, data)
742 VALUES (?, ?, ?)",
743 )?
744 .execute((room_id, user_id, data))?;
745 Ok(())
746 }
747
748 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
749 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
750 Ok(())
751 }
752
753 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
754 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
755 .execute((room_id, user_id))?;
756 Ok(())
757 }
758
759 fn set_receipt(
760 &self,
761 room_id: &[u8],
762 user_id: &[u8],
763 receipt_type: &[u8],
764 thread: &[u8],
765 event_id: &[u8],
766 data: &[u8],
767 ) -> rusqlite::Result<()> {
768 self.prepare_cached(
769 "INSERT OR REPLACE
770 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
771 VALUES (?, ?, ?, ?, ?, ?)",
772 )?
773 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
774 Ok(())
775 }
776
777 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
778 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
779 Ok(())
780 }
781
782 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
783 self.prepare_cached(
784 "INSERT OR REPLACE
785 INTO display_name (room_id, name, data)
786 VALUES (?, ?, ?)",
787 )?
788 .execute((room_id, name, data))?;
789 Ok(())
790 }
791
792 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
793 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
794 .execute((room_id, name))?;
795 Ok(())
796 }
797
798 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
799 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
800 Ok(())
801 }
802
803 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
804 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
805 Ok(())
806 }
807
808 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
809 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
810 .execute((room_id,))?;
811 Ok(())
812 }
813}
814
/// Asynchronous read helpers (plus the `kv_blob` write/delete) for the state
/// store tables.
///
/// All keys are passed already encoded as [`Key`]s and all results are
/// returned as the raw stored bytes.
#[async_trait]
trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
    /// Returns the `kv_blob` value stored under `key`, or `None` when there is
    /// no row.
    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
            .await
            .optional()?)
    }

    /// Returns the `kv_blob` values stored under any of `keys`.
    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
        let keys_length = keys.len();

        // NOTE(review): `chunk_large_query_over` presumably splits `keys` into
        // batches and concatenates the results, with the second argument as a
        // size hint for the result — confirm against its definition.
        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
            let sql_params = repeat_vars(keys.len());
            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");

            let params = rusqlite::params_from_iter(keys);

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| row.get(0))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Stores `value` under `key` in `kv_blob`.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;

    /// Deletes the `kv_blob` row stored under `key`.
    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
        Ok(())
    }

    /// Returns the stored room infos: all of them when `room_id` is `None`,
    /// otherwise only the rows of that room.
    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
        Ok(match room_id {
            None => {
                self.prepare("SELECT data FROM room_info", move |mut stmt| {
                    stmt.query_map((), |row| row.get(0))?.collect()
                })
                .await?
            }

            Some(room_id) => {
                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
                })
                .await?
            }
        })
    }

    /// Returns `(stripped, data)` for the state events of `event_type` in a
    /// room whose state key is one of `state_keys`.
    async fn get_maybe_stripped_state_events_for_keys(
        &self,
        room_id: Key,
        event_type: Key,
        state_keys: Vec<Key>,
    ) -> Result<Vec<(bool, Vec<u8>)>> {
        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
            let sql_params = repeat_vars(state_keys.len());
            let sql = format!(
                "SELECT stripped, data FROM state_event
                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
            );

            // The closure takes `room_id` and `event_type` by clone, so they
            // stay available if it is called more than once.
            let params = rusqlite::params_from_iter(
                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
            );

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Returns `(stripped, data)` for every state event of `event_type` in a
    /// room.
    async fn get_maybe_stripped_state_events(
        &self,
        room_id: Key,
        event_type: Key,
    ) -> Result<Vec<(bool, Vec<u8>)>> {
        Ok(self
            .prepare(
                "SELECT stripped, data FROM state_event
                 WHERE room_id = ? AND event_type = ?",
                |mut stmt| {
                    stmt.query((room_id, event_type))?
                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                        .collect()
                },
            )
            .await?)
    }

    /// Returns `(user_id, data)` for the profiles of the given users in a
    /// room.
    async fn get_profiles(
        &self,
        room_id: Key,
        user_ids: Vec<Key>,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let user_ids_length = user_ids.len();

        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
            let sql_params = repeat_vars(user_ids.len());
            let sql = format!(
                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
            );

            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Returns the `data` column of the members of a room.
    ///
    /// An empty `memberships` list selects every member; otherwise only
    /// members whose membership is one of the given values are returned.
    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
        let res = if memberships.is_empty() {
            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
            })
            .await?
        } else {
            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
                let sql_params = repeat_vars(memberships.len());
                let sql = format!(
                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
                );

                let params =
                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));

                Ok(txn
                    .prepare(&sql)?
                    .query(params)?
                    .mapped(|row| row.get(0))
                    .collect::<Result<_, _>>()?)
            })
            .await?
        };

        Ok(res)
    }

    /// Returns the global account data of `event_type`, or `None` when there
    /// is no row.
    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM global_account_data WHERE event_type = ?",
                (event_type,),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Returns the account data of `event_type` in a room, or `None` when
    /// there is no row.
    async fn get_room_account_data(
        &self,
        room_id: Key,
        event_type: Key,
    ) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
                (room_id, event_type),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Returns `(name, data)` for the given display names in a room.
    async fn get_display_names(
        &self,
        room_id: Key,
        names: Vec<Key>,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let names_length = names.len();

        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
            let sql_params = repeat_vars(names.len());
            let sql = format!(
                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
            );

            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Returns the receipt of the given type and thread that a user has in a
    /// room, or `None` when there is no row.
    async fn get_user_receipt(
        &self,
        room_id: Key,
        receipt_type: Key,
        thread: Key,
        user_id: Key,
    ) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM receipt
                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
                (room_id, receipt_type, thread, user_id),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Returns every receipt of the given type and thread that points at
    /// `event_id` in a room.
    async fn get_event_receipts(
        &self,
        room_id: Key,
        receipt_type: Key,
        thread: Key,
        event_id: Key,
    ) -> Result<Vec<Vec<u8>>> {
        Ok(self
            .prepare(
                "SELECT data FROM receipt
                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
                |mut stmt| {
                    stmt.query((room_id, receipt_type, thread, event_id))?
                        .mapped(|row| row.get(0))
                        .collect()
                },
            )
            .await?)
    }
}
1051
1052#[async_trait]
1053impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1054 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1055 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1056 }
1057}
1058
1059#[async_trait]
1060impl StateStore for SqliteStateStore {
1061 type Error = Error;
1062
1063 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1064 self.read()
1065 .await?
1066 .get_kv_blob(self.encode_state_store_data_key(key))
1067 .await?
1068 .map(|data| {
1069 Ok(match key {
1070 StateStoreDataKey::SyncToken => {
1071 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1072 }
1073 StateStoreDataKey::SupportedVersions => {
1074 StateStoreDataValue::SupportedVersions(self.deserialize_value(&data)?)
1075 }
1076 StateStoreDataKey::WellKnown => {
1077 StateStoreDataValue::WellKnown(self.deserialize_value(&data)?)
1078 }
1079 StateStoreDataKey::Filter(_) => {
1080 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1081 }
1082 StateStoreDataKey::UserAvatarUrl(_) => {
1083 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1084 }
1085 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1086 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1087 }
1088 StateStoreDataKey::UtdHookManagerData => {
1089 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1090 }
1091 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1092 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1093 }
1094 StateStoreDataKey::ComposerDraft(_, _) => {
1095 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1096 }
1097 StateStoreDataKey::SeenKnockRequests(_) => {
1098 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1099 }
1100 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
1101 StateStoreDataValue::ThreadSubscriptionsCatchupTokens(
1102 self.deserialize_value(&data)?,
1103 )
1104 }
1105 })
1106 })
1107 .transpose()
1108 }
1109
1110 async fn set_kv_data(
1111 &self,
1112 key: StateStoreDataKey<'_>,
1113 value: StateStoreDataValue,
1114 ) -> Result<()> {
1115 let serialized_value = match key {
1116 StateStoreDataKey::SyncToken => self.serialize_value(
1117 &value.into_sync_token().expect("Session data not a sync token"),
1118 )?,
1119 StateStoreDataKey::SupportedVersions => self.serialize_value(
1120 &value
1121 .into_supported_versions()
1122 .expect("Session data not containing supported versions"),
1123 )?,
1124 StateStoreDataKey::WellKnown => self.serialize_value(
1125 &value.into_well_known().expect("Session data not containing well-known"),
1126 )?,
1127 StateStoreDataKey::Filter(_) => {
1128 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1129 }
1130 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1131 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1132 )?,
1133 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1134 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1135 )?,
1136 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1137 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1138 )?,
1139 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1140 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1141 }
1142 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1143 &value.into_composer_draft().expect("Session data not a composer draft"),
1144 )?,
1145 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1146 &value
1147 .into_seen_knock_requests()
1148 .expect("Session data is not a set of seen knock request ids"),
1149 )?,
1150 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
1151 &value
1152 .into_thread_subscriptions_catchup_tokens()
1153 .expect("Session data is not a list of thread subscription catchup tokens"),
1154 )?,
1155 };
1156
1157 self.write()
1158 .await
1159 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1160 .await
1161 }
1162
1163 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1164 self.write().await.delete_kv_blob(self.encode_state_store_data_key(key)).await
1165 }
1166
    /// Persists every part of `changes` inside a single write transaction.
    ///
    /// The parts are written in this order: sync token, global account data,
    /// room account data, presence, room infos, profile deletions, state
    /// events (plus derived member and profile rows), stripped state events
    /// (plus derived member rows), receipts, redactions and finally the
    /// display-name ambiguity maps.
    ///
    /// # Errors
    ///
    /// Returns the first serialization, redaction or database error that is
    /// encountered; the `?` operators make the transaction closure return early.
    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
        // Owned copies are needed because the transaction closure is `move`.
        let changes = changes.to_owned();
        let this = self.clone();
        self.write()
            .await
            .with_transaction(move |txn| {
                let StateChanges {
                    sync_token,
                    account_data,
                    presence,
                    profiles,
                    profiles_to_delete,
                    state,
                    room_account_data,
                    room_infos,
                    receipts,
                    redactions,
                    stripped_state,
                    ambiguity_maps,
                } = changes;

                // The sync token lives in the generic key/value blob table.
                if let Some(sync_token) = sync_token {
                    let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
                    let value = this.serialize_value(&sync_token)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                for (event_type, event) in account_data {
                    let event_type =
                        this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
                    let data = this.serialize_json(&event)?;
                    txn.set_global_account_data(&event_type, &data)?;
                }

                for (room_id, events) in room_account_data {
                    let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
                    for (event_type, event) in events {
                        let event_type =
                            this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
                        let data = this.serialize_json(&event)?;
                        txn.set_room_account_data(&room_id, &event_type, &data)?;
                    }
                }

                // Presence events are stored as key/value blobs as well.
                for (user_id, event) in presence {
                    let key = this.encode_presence_key(&user_id);
                    let value = this.serialize_json(&event)?;
                    txn.set_kv_blob(&key, &value)?;
                }

                for (room_id, room_info) in room_infos {
                    // A room in the `Invited` state is treated as "stripped".
                    let stripped = room_info.state() == RoomState::Invited;
                    // NOTE(review): presumably this clears the data of the opposite
                    // kind (stripped vs. non-stripped) for the room; confirm against
                    // `remove_maybe_stripped_room_data`.
                    this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;

                    let room_id = this.encode_key(keys::ROOM_INFO, room_id);
                    let state = this
                        .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
                    let data = this.serialize_json(&room_info)?;
                    txn.set_room_info(&room_id, &state, &data)?;
                }

                for (room_id, user_ids) in profiles_to_delete {
                    let room_id = this.encode_key(keys::PROFILE, room_id);
                    for user_id in user_ids {
                        let user_id = this.encode_key(keys::PROFILE, user_id);
                        txn.remove_room_profile(&room_id, &user_id)?;
                    }
                }

                for (room_id, state_event_types) in state {
                    let profiles = profiles.get(&room_id);
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, state_events) in state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_state_event) in state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_state_event)?;

                            // The event ID is optional: a missing or unreadable
                            // `event_id` field simply yields `None`.
                            let event_id: Option<String> =
                                raw_state_event.get_field("event_id").ok().flatten();
                            let encoded_event_id =
                                event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));

                            // `false`: this is a non-stripped state event.
                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                false,
                                encoded_event_id.as_deref(),
                                &data,
                            )?;

                            if event_type == StateEventType::RoomMember {
                                // The state event itself is already stored above; a
                                // member event that fails to deserialize only skips
                                // the member/profile bookkeeping below.
                                let member_event = match raw_state_event
                                    .deserialize_as_unchecked::<SyncRoomMemberEvent>()
                                {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        debug!(event_id, "Failed to deserialize member event: {e}");
                                        continue;
                                    }
                                };

                                let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this
                                    .encode_key(keys::MEMBER, member_event.membership().as_str());
                                // The stored payload is the serialized state key.
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(
                                    &encoded_room_id,
                                    &user_id,
                                    &membership,
                                    false,
                                    &data,
                                )?;

                                // Only store a profile when the changes carry one for
                                // this member.
                                if let Some(profile) =
                                    profiles.and_then(|p| p.get(member_event.state_key()))
                                {
                                    let room_id = this.encode_key(keys::PROFILE, &room_id);
                                    let user_id = this.encode_key(keys::PROFILE, &state_key);
                                    let data = this.serialize_json(&profile)?;
                                    txn.set_profile(&room_id, &user_id, &data)?;
                                }
                            }
                        }
                    }
                }

                for (room_id, stripped_state_event_types) in stripped_state {
                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);

                    for (event_type, stripped_state_events) in stripped_state_event_types {
                        let encoded_event_type =
                            this.encode_key(keys::STATE_EVENT, event_type.to_string());

                        for (state_key, raw_stripped_state_event) in stripped_state_events {
                            let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
                            let data = this.serialize_json(&raw_stripped_state_event)?;
                            // `true`: stripped state event, stored without an event ID.
                            txn.set_state_event(
                                &encoded_room_id,
                                &encoded_event_type,
                                &encoded_state_key,
                                true,
                                None,
                                &data,
                            )?;

                            if event_type == StateEventType::RoomMember {
                                // As for non-stripped state, a deserialization failure
                                // only skips the member row.
                                let member_event = match raw_stripped_state_event
                                    .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
                                ) {
                                    Ok(ev) => ev,
                                    Err(e) => {
                                        debug!("Failed to deserialize stripped member event: {e}");
                                        continue;
                                    }
                                };

                                let room_id = this.encode_key(keys::MEMBER, &room_id);
                                let user_id = this.encode_key(keys::MEMBER, &state_key);
                                let membership = this.encode_key(
                                    keys::MEMBER,
                                    member_event.content.membership.as_str(),
                                );
                                let data = this.serialize_value(&state_key)?;

                                txn.set_member(&room_id, &user_id, &membership, true, &data)?;
                            }
                        }
                    }
                }

                for (room_id, receipt_event) in receipts {
                    let room_id = this.encode_key(keys::RECEIPT, room_id);

                    for (event_id, receipt_types) in receipt_event {
                        let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);

                        for (receipt_type, receipt_users) in receipt_types {
                            let receipt_type =
                                this.encode_key(keys::RECEIPT, receipt_type.as_str());

                            for (user_id, receipt) in receipt_users {
                                let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
                                // The thread key is built from the MessagePack
                                // serialization of the receipt's thread.
                                let thread = this.encode_key(
                                    keys::RECEIPT,
                                    rmp_serde::to_vec_named(&receipt.thread)?,
                                );
                                // The stored payload keeps the unencoded event and user
                                // IDs next to the receipt.
                                let data = this.serialize_json(&ReceiptData {
                                    receipt,
                                    event_id: event_id.clone(),
                                    user_id,
                                })?;

                                txn.set_receipt(
                                    &room_id,
                                    &encoded_user_id,
                                    &receipt_type,
                                    &thread,
                                    &encoded_event_id,
                                    &data,
                                )?;
                            }
                        }
                    }
                }

                for (room_id, redactions) in redactions {
                    // Computes the redaction rules for the room from its stored room
                    // info. Any failure to read or deserialize the room info falls back
                    // to `ROOM_VERSION_RULES_FALLBACK` with a warning.
                    let make_redaction_rules = || {
                        let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
                        txn.get_room_info(&encoded_room_id)
                            .ok()
                            .flatten()
                            .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
                            .map(|info| info.room_version_rules_or_default())
                            .unwrap_or_else(|| {
                                warn!(
                                    ?room_id,
                                    "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
                                );
                                ROOM_VERSION_RULES_FALLBACK
                            }).redaction
                    };

                    let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
                    // Filled lazily, at most once per room, by the first redaction that
                    // actually matches a stored state event.
                    let mut redaction_rules = None;

                    for (event_id, redaction) in redactions {
                        let event_id = this.encode_key(keys::STATE_EVENT, event_id);

                        // Only state events found by ID whose stored JSON deserializes
                        // are redacted; everything else is skipped silently.
                        if let Some(Ok(raw_event)) = txn
                            .get_state_event_by_id(&encoded_room_id, &event_id)?
                            .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
                        {
                            let event = raw_event.deserialize()?;
                            let redacted = redact(
                                raw_event.deserialize_as::<CanonicalJsonObject>()?,
                                redaction_rules.get_or_insert_with(make_redaction_rules),
                                Some(RedactedBecause::from_raw_event(&redaction)?),
                            )
                            .map_err(Error::Redaction)?;
                            let data = this.serialize_json(&redacted)?;

                            let event_type =
                                this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
                            let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());

                            // Write the redacted JSON back under the same event type,
                            // state key and event ID.
                            txn.set_state_event(
                                &encoded_room_id,
                                &event_type,
                                &state_key,
                                false,
                                Some(&event_id),
                                &data,
                            )?;
                        }
                    }
                }

                for (room_id, display_names) in ambiguity_maps {
                    let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);

                    for (name, user_ids) in display_names {
                        // Entries are keyed by the normalized display name, falling
                        // back to the raw one when no normalized form is available.
                        let encoded_name = this.encode_key(
                            keys::DISPLAY_NAME,
                            name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
                        );
                        let data = this.serialize_json(&user_ids)?;

                        if user_ids.is_empty() {
                            txn.remove_display_name(&room_id, &encoded_name)?;

                            // Also remove any entry stored under the raw display name.
                            let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
                            txn.remove_display_name(&room_id, &raw_name)?;
                        } else {
                            // New or updated entries are only written under
                            // `encoded_name`.
                            txn.set_display_name(&room_id, &encoded_name, &data)?;
                        }
                    }
                }

                Ok::<_, Error>(())
            })
            .await?;

        Ok(())
    }
1476
1477 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1478 self.read()
1479 .await?
1480 .get_kv_blob(self.encode_presence_key(user_id))
1481 .await?
1482 .map(|data| self.deserialize_json(&data))
1483 .transpose()
1484 }
1485
1486 async fn get_presence_events(
1487 &self,
1488 user_ids: &[OwnedUserId],
1489 ) -> Result<Vec<Raw<PresenceEvent>>> {
1490 if user_ids.is_empty() {
1491 return Ok(Vec::new());
1492 }
1493
1494 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1495 self.read()
1496 .await?
1497 .get_kv_blobs(user_ids)
1498 .await?
1499 .into_iter()
1500 .map(|data| self.deserialize_json(&data))
1501 .collect()
1502 }
1503
1504 async fn get_state_event(
1505 &self,
1506 room_id: &RoomId,
1507 event_type: StateEventType,
1508 state_key: &str,
1509 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1510 Ok(self
1511 .get_state_events_for_keys(room_id, event_type, &[state_key])
1512 .await?
1513 .into_iter()
1514 .next())
1515 }
1516
1517 async fn get_state_events(
1518 &self,
1519 room_id: &RoomId,
1520 event_type: StateEventType,
1521 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1522 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1523 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1524 self.read()
1525 .await?
1526 .get_maybe_stripped_state_events(room_id, event_type)
1527 .await?
1528 .into_iter()
1529 .map(|(stripped, data)| {
1530 let ev = if stripped {
1531 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1532 } else {
1533 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1534 };
1535
1536 Ok(ev)
1537 })
1538 .collect()
1539 }
1540
1541 async fn get_state_events_for_keys(
1542 &self,
1543 room_id: &RoomId,
1544 event_type: StateEventType,
1545 state_keys: &[&str],
1546 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1547 if state_keys.is_empty() {
1548 return Ok(Vec::new());
1549 }
1550
1551 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1552 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1553 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1554 self.read()
1555 .await?
1556 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1557 .await?
1558 .into_iter()
1559 .map(|(stripped, data)| {
1560 let ev = if stripped {
1561 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1562 } else {
1563 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1564 };
1565
1566 Ok(ev)
1567 })
1568 .collect()
1569 }
1570
1571 async fn get_profile(
1572 &self,
1573 room_id: &RoomId,
1574 user_id: &UserId,
1575 ) -> Result<Option<MinimalRoomMemberEvent>> {
1576 let room_id = self.encode_key(keys::PROFILE, room_id);
1577 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1578
1579 self.read()
1580 .await?
1581 .get_profiles(room_id, user_ids)
1582 .await?
1583 .into_iter()
1584 .next()
1585 .map(|(_, data)| self.deserialize_json(&data))
1586 .transpose()
1587 }
1588
1589 async fn get_profiles<'a>(
1590 &self,
1591 room_id: &RoomId,
1592 user_ids: &'a [OwnedUserId],
1593 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1594 if user_ids.is_empty() {
1595 return Ok(BTreeMap::new());
1596 }
1597
1598 let room_id = self.encode_key(keys::PROFILE, room_id);
1599 let mut user_ids_map = user_ids
1600 .iter()
1601 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1602 .collect::<BTreeMap<_, _>>();
1603 let user_ids = user_ids_map.keys().cloned().collect();
1604
1605 self.read()
1606 .await?
1607 .get_profiles(room_id, user_ids)
1608 .await?
1609 .into_iter()
1610 .map(|(user_id, data)| {
1611 Ok((
1612 user_ids_map
1613 .remove(user_id.as_slice())
1614 .expect("returned user IDs were requested"),
1615 self.deserialize_json(&data)?,
1616 ))
1617 })
1618 .collect()
1619 }
1620
1621 async fn get_user_ids(
1622 &self,
1623 room_id: &RoomId,
1624 membership: RoomMemberships,
1625 ) -> Result<Vec<OwnedUserId>> {
1626 let room_id = self.encode_key(keys::MEMBER, room_id);
1627 let memberships = membership
1628 .as_vec()
1629 .into_iter()
1630 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1631 .collect();
1632 self.read()
1633 .await?
1634 .get_user_ids(room_id, memberships)
1635 .await?
1636 .iter()
1637 .map(|data| self.deserialize_value(data))
1638 .collect()
1639 }
1640
1641 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1642 self.read()
1643 .await?
1644 .get_room_infos(match room_load_settings {
1645 RoomLoadSettings::All => None,
1646 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1647 })
1648 .await?
1649 .into_iter()
1650 .map(|data| self.deserialize_json(&data))
1651 .collect()
1652 }
1653
1654 async fn get_users_with_display_name(
1655 &self,
1656 room_id: &RoomId,
1657 display_name: &DisplayName,
1658 ) -> Result<BTreeSet<OwnedUserId>> {
1659 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1660 let names = vec![self.encode_key(
1661 keys::DISPLAY_NAME,
1662 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1663 )];
1664
1665 Ok(self
1666 .read()
1667 .await?
1668 .get_display_names(room_id, names)
1669 .await?
1670 .into_iter()
1671 .next()
1672 .map(|(_, data)| self.deserialize_json(&data))
1673 .transpose()?
1674 .unwrap_or_default())
1675 }
1676
1677 async fn get_users_with_display_names<'a>(
1678 &self,
1679 room_id: &RoomId,
1680 display_names: &'a [DisplayName],
1681 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1682 let mut result = HashMap::new();
1683
1684 if display_names.is_empty() {
1685 return Ok(result);
1686 }
1687
1688 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1689 let mut names_map = display_names
1690 .iter()
1691 .flat_map(|display_name| {
1692 let raw =
1702 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1703 let normalized = display_name.as_normalized_str().map(|normalized| {
1704 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1705 });
1706
1707 iter::once(raw).chain(normalized)
1708 })
1709 .collect::<BTreeMap<_, _>>();
1710 let names = names_map.keys().cloned().collect();
1711
1712 for (name, data) in self.read().await?.get_display_names(room_id, names).await?.into_iter()
1713 {
1714 let display_name =
1715 names_map.remove(name.as_slice()).expect("returned display names were requested");
1716 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1717
1718 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1719 }
1720
1721 Ok(result)
1722 }
1723
1724 async fn get_account_data_event(
1725 &self,
1726 event_type: GlobalAccountDataEventType,
1727 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1728 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1729 self.read()
1730 .await?
1731 .get_global_account_data(event_type)
1732 .await?
1733 .map(|value| self.deserialize_json(&value))
1734 .transpose()
1735 }
1736
1737 async fn get_room_account_data_event(
1738 &self,
1739 room_id: &RoomId,
1740 event_type: RoomAccountDataEventType,
1741 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1742 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1743 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1744 self.read()
1745 .await?
1746 .get_room_account_data(room_id, event_type)
1747 .await?
1748 .map(|value| self.deserialize_json(&value))
1749 .transpose()
1750 }
1751
1752 async fn get_user_room_receipt_event(
1753 &self,
1754 room_id: &RoomId,
1755 receipt_type: ReceiptType,
1756 thread: ReceiptThread,
1757 user_id: &UserId,
1758 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1759 let room_id = self.encode_key(keys::RECEIPT, room_id);
1760 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1761 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1764 let user_id = self.encode_key(keys::RECEIPT, user_id);
1765
1766 self.read()
1767 .await?
1768 .get_user_receipt(room_id, receipt_type, thread, user_id)
1769 .await?
1770 .map(|value| {
1771 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1772 })
1773 .transpose()
1774 }
1775
1776 async fn get_event_room_receipt_events(
1777 &self,
1778 room_id: &RoomId,
1779 receipt_type: ReceiptType,
1780 thread: ReceiptThread,
1781 event_id: &EventId,
1782 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1783 let room_id = self.encode_key(keys::RECEIPT, room_id);
1784 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1785 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1788 let event_id = self.encode_key(keys::RECEIPT, event_id);
1789
1790 self.read()
1791 .await?
1792 .get_event_receipts(room_id, receipt_type, thread, event_id)
1793 .await?
1794 .iter()
1795 .map(|value| {
1796 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1797 })
1798 .collect()
1799 }
1800
1801 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1802 self.read().await?.get_kv_blob(self.encode_custom_key(key)).await
1803 }
1804
1805 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1806 let conn = self.write().await;
1807 let key = self.encode_custom_key(key);
1808 conn.set_kv_blob(key, value).await?;
1809 Ok(())
1810 }
1811
1812 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1813 let conn = self.write().await;
1814 let key = self.encode_custom_key(key);
1815 let previous = conn.get_kv_blob(key.clone()).await?;
1816 conn.set_kv_blob(key, value).await?;
1817 Ok(previous)
1818 }
1819
1820 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1821 let conn = self.write().await;
1822 let key = self.encode_custom_key(key);
1823 let previous = conn.get_kv_blob(key.clone()).await?;
1824 if previous.is_some() {
1825 conn.delete_kv_blob(key).await?;
1826 }
1827 Ok(previous)
1828 }
1829
1830 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1831 let this = self.clone();
1832 let room_id = room_id.to_owned();
1833
1834 let conn = self.write().await;
1835
1836 conn.with_transaction(move |txn| -> Result<()> {
1837 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1838 txn.remove_room_info(&room_info_room_id)?;
1839
1840 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1841 txn.remove_room_state_events(&state_event_room_id, None)?;
1842
1843 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1844 txn.remove_room_members(&member_room_id, None)?;
1845
1846 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1847 txn.remove_room_profiles(&profile_room_id)?;
1848
1849 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1850 txn.remove_room_account_data(&room_account_data_room_id)?;
1851
1852 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1853 txn.remove_room_receipts(&receipt_room_id)?;
1854
1855 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1856 txn.remove_room_display_names(&display_name_room_id)?;
1857
1858 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1859 txn.remove_room_send_queue(&send_queue_room_id)?;
1860
1861 let dependent_send_queue_room_id =
1862 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1863 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1864
1865 let thread_subscriptions_room_id =
1866 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1867 txn.execute(
1868 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1869 (thread_subscriptions_room_id,),
1870 )?;
1871
1872 Ok(())
1873 })
1874 .await?;
1875
1876 conn.vacuum().await
1877 }
1878
1879 async fn save_send_queue_request(
1880 &self,
1881 room_id: &RoomId,
1882 transaction_id: OwnedTransactionId,
1883 created_at: MilliSecondsSinceUnixEpoch,
1884 content: QueuedRequestKind,
1885 priority: usize,
1886 ) -> Result<(), Self::Error> {
1887 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1888 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1889
1890 let content = self.serialize_json(&content)?;
1891 let created_at_ts: u64 = created_at.0.into();
1897 self.write()
1898 .await
1899 .with_transaction(move |txn| {
1900 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1901 Ok(())
1902 })
1903 .await
1904 }
1905
1906 async fn update_send_queue_request(
1907 &self,
1908 room_id: &RoomId,
1909 transaction_id: &TransactionId,
1910 content: QueuedRequestKind,
1911 ) -> Result<bool, Self::Error> {
1912 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1913
1914 let content = self.serialize_json(&content)?;
1915 let transaction_id = transaction_id.to_string();
1918
1919 let num_updated = self.write()
1920 .await
1921 .with_transaction(move |txn| {
1922 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1923 })
1924 .await?;
1925
1926 Ok(num_updated > 0)
1927 }
1928
1929 async fn remove_send_queue_request(
1930 &self,
1931 room_id: &RoomId,
1932 transaction_id: &TransactionId,
1933 ) -> Result<bool, Self::Error> {
1934 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1935
1936 let transaction_id = transaction_id.to_string();
1938
1939 let num_deleted = self
1940 .write()
1941 .await
1942 .with_transaction(move |txn| {
1943 txn.prepare_cached(
1944 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1945 )?
1946 .execute((room_id, &transaction_id))
1947 })
1948 .await?;
1949
1950 Ok(num_deleted > 0)
1951 }
1952
1953 async fn load_send_queue_requests(
1954 &self,
1955 room_id: &RoomId,
1956 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1957 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1958
1959 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1963 .read()
1964 .await?
1965 .prepare(
1966 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1967 |mut stmt| {
1968 stmt.query((room_id,))?
1969 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1970 .collect()
1971 },
1972 )
1973 .await?;
1974
1975 let mut requests = Vec::with_capacity(res.len());
1976
1977 for entry in res {
1978 let created_at = entry
1979 .4
1980 .and_then(UInt::new)
1981 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1982
1983 requests.push(QueuedRequest {
1984 transaction_id: entry.0.into(),
1985 kind: self.deserialize_json(&entry.1)?,
1986 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1987 priority: entry.3,
1988 created_at,
1989 });
1990 }
1991
1992 Ok(requests)
1993 }
1994
1995 async fn update_send_queue_request_status(
1996 &self,
1997 room_id: &RoomId,
1998 transaction_id: &TransactionId,
1999 error: Option<QueueWedgeError>,
2000 ) -> Result<(), Self::Error> {
2001 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
2002
2003 let transaction_id = transaction_id.to_string();
2005
2006 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
2008
2009 self.write()
2010 .await
2011 .with_transaction(move |txn| {
2012 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
2013 Ok(())
2014 })
2015 .await
2016 }
2017
2018 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
2019 let res: Vec<Vec<u8>> = self
2024 .read()
2025 .await?
2026 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
2027 stmt.query(())?.mapped(|row| row.get(0)).collect()
2028 })
2029 .await?;
2030
2031 Ok(res
2034 .into_iter()
2035 .map(|entry| self.deserialize_value(&entry))
2036 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
2037 .into_iter()
2038 .collect())
2039 }
2040
    /// Inserts a new dependent request, linked to the parent request
    /// identified by `parent_txn_id`, into the room's dependent send queue.
    ///
    /// The `parent_key` column is not part of the insert, so it keeps the
    /// column's default value.
    async fn save_dependent_queued_request(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        own_txn_id: ChildTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        content: DependentQueuedRequestKind,
    ) -> Result<()> {
        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
        let content = self.serialize_json(&content)?;

        // Both transaction IDs are stored as their plain string form.
        let parent_txn_id = parent_txn_id.to_string();
        let own_txn_id = own_txn_id.to_string();

        let created_at_ts: u64 = created_at.0.into();
        self.write()
            .await
            .with_transaction(move |txn| {
                txn.prepare_cached(
                    r#"INSERT INTO dependent_send_queue_events
                         (room_id, parent_transaction_id, own_transaction_id, content, created_at)
                       VALUES (?, ?, ?, ?, ?)"#,
                )?
                .execute((
                    room_id,
                    parent_txn_id,
                    own_txn_id,
                    content,
                    created_at_ts,
                ))?;
                Ok(())
            })
            .await
    }
2076
    /// Replaces the content of the dependent request identified by
    /// `own_transaction_id` in the room.
    ///
    /// Returns `true` if exactly one row was updated and `false` if none was.
    ///
    /// # Errors
    ///
    /// Returns `Error::InconsistentUpdate` if more than one row matched. The
    /// check runs after the transaction closure has returned.
    async fn update_dependent_queued_request(
        &self,
        room_id: &RoomId,
        own_transaction_id: &ChildTransactionId,
        new_content: DependentQueuedRequestKind,
    ) -> Result<bool> {
        let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
        let content = self.serialize_json(&new_content)?;

        // The transaction ID is matched as its plain string form.
        let own_txn_id = own_transaction_id.to_string();

        let num_updated = self
            .write()
            .await
            .with_transaction(move |txn| {
                txn.prepare_cached(
                    r#"UPDATE dependent_send_queue_events
                         SET content = ?
                       WHERE own_transaction_id = ?
                       AND room_id = ?"#,
                )?
                .execute((content, own_txn_id, room_id))
            })
            .await?;

        // At most one row is expected to match an own transaction ID.
        if num_updated > 1 {
            return Err(Error::InconsistentUpdate);
        }

        Ok(num_updated == 1)
    }
2109
2110 async fn mark_dependent_queued_requests_as_ready(
2111 &self,
2112 room_id: &RoomId,
2113 parent_txn_id: &TransactionId,
2114 parent_key: SentRequestKey,
2115 ) -> Result<usize> {
2116 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2117 let parent_key = self.serialize_json(&parent_key)?;
2118
2119 let parent_txn_id = parent_txn_id.to_string();
2121
2122 self.write()
2123 .await
2124 .with_transaction(move |txn| {
2125 Ok(txn.prepare_cached(
2126 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2127 )?
2128 .execute((parent_key, parent_txn_id, room_id))?)
2129 })
2130 .await
2131 }
2132
2133 async fn remove_dependent_queued_request(
2134 &self,
2135 room_id: &RoomId,
2136 txn_id: &ChildTransactionId,
2137 ) -> Result<bool> {
2138 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2139
2140 let txn_id = txn_id.to_string();
2142
2143 let num_deleted = self
2144 .write()
2145 .await
2146 .with_transaction(move |txn| {
2147 txn.prepare_cached(
2148 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2149 )?
2150 .execute((txn_id, room_id))
2151 })
2152 .await?;
2153
2154 Ok(num_deleted > 0)
2155 }
2156
2157 async fn load_dependent_queued_requests(
2158 &self,
2159 room_id: &RoomId,
2160 ) -> Result<Vec<DependentQueuedRequest>> {
2161 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2162
2163 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2165 .read()
2166 .await?
2167 .prepare(
2168 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2169 |mut stmt| {
2170 stmt.query((room_id,))?
2171 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2172 .collect()
2173 },
2174 )
2175 .await?;
2176
2177 let mut dependent_events = Vec::with_capacity(res.len());
2178
2179 for entry in res {
2180 let created_at = entry
2181 .4
2182 .and_then(UInt::new)
2183 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2184
2185 dependent_events.push(DependentQueuedRequest {
2186 own_transaction_id: entry.0.into(),
2187 parent_transaction_id: entry.1.into(),
2188 parent_key: entry.2.map(|json| self.deserialize_json(&json)).transpose()?,
2189 kind: self.deserialize_json(&entry.3)?,
2190 created_at,
2191 });
2192 }
2193
2194 Ok(dependent_events)
2195 }
2196
    /// Inserts or updates a batch of thread subscriptions in one transaction.
    ///
    /// A new `(room_id, event_id)` pair is inserted as is. For an existing
    /// pair the update is decided by comparing bump stamps:
    ///
    /// - `status` takes the new value when the stored bump stamp is NULL, when
    ///   the new bump stamp is NULL, or when the stored bump stamp is strictly
    ///   smaller than the new one; otherwise the stored status is kept.
    /// - `bump_stamp` takes the new value when the stored one is NULL or
    ///   strictly smaller than the new one; a NULL new bump stamp never
    ///   overwrites a stored one.
    async fn upsert_thread_subscriptions(
        &self,
        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
    ) -> Result<(), Self::Error> {
        // Build the SQL parameters up front so the `move` closure below does
        // not need access to `self`.
        let values: Vec<_> = updates
            .into_iter()
            .map(|(room_id, thread_id, subscription)| {
                (
                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id),
                    self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id),
                    subscription.status.as_str(),
                    subscription.bump_stamp,
                )
            })
            .collect();

        self.write()
            .await
            .with_transaction(move |txn| {
                // Shadows the transaction with the prepared statement, which is
                // reused for every update.
                let mut txn = txn.prepare_cached(
                    "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp)
                         VALUES (?, ?, ?, ?)
                     ON CONFLICT (room_id, event_id) DO UPDATE
                     SET
                         status =
                             CASE
                                 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status
                                 WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status
                                 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status
                                 ELSE thread_subscriptions.status
                             END,
                         bump_stamp =
                             CASE
                                 WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp
                                 WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp
                                 WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp
                                 ELSE thread_subscriptions.bump_stamp
                             END",
                )?;

                for value in values {
                    txn.execute(value)?;
                }

                Result::<_, Error>::Ok(())
            })
            .await?;

        Ok(())
    }
2247
2248 async fn load_thread_subscription(
2249 &self,
2250 room_id: &RoomId,
2251 thread_id: &EventId,
2252 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
2253 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2254 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2255
2256 Ok(self
2257 .read()
2258 .await?
2259 .query_row(
2260 "SELECT status, bump_stamp FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2261 (room_id, thread_id),
2262 |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<u64>>(1)?))
2263 )
2264 .await
2265 .optional()?
2266 .map(|(status, bump_stamp)| -> Result<_, Self::Error> {
2267 let status = ThreadSubscriptionStatus::from_str(&status).map_err(|_| {
2268 Error::InvalidData { details: format!("Invalid thread status: {status}") }
2269 })?;
2270 Ok(StoredThreadSubscription { status, bump_stamp })
2271 })
2272 .transpose()?)
2273 }
2274
2275 async fn remove_thread_subscription(
2276 &self,
2277 room_id: &RoomId,
2278 thread_id: &EventId,
2279 ) -> Result<(), Self::Error> {
2280 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2281 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2282
2283 self.write()
2284 .await
2285 .execute(
2286 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2287 (room_id, thread_id),
2288 )
2289 .await?;
2290
2291 Ok(())
2292 }
2293
2294 async fn optimize(&self) -> Result<(), Self::Error> {
2295 Ok(self.vacuum().await?)
2296 }
2297
2298 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
2299 self.get_db_size().await
2300 }
2301}
2302
/// Serializable record bundling a [`Receipt`] with an event ID and a user ID.
// NOTE(review): the code that reads and writes this type is outside this
// section; presumably it is the stored form of a read receipt — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReceiptData {
    /// The receipt payload.
    receipt: Receipt,
    /// Event ID stored alongside the receipt.
    event_id: OwnedEventId,
    /// User ID stored alongside the receipt.
    user_id: OwnedUserId,
}
2309
/// Runs the shared state-store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use once_cell::sync::Lazy;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving each store its own sub-directory name.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store, without passphrase, in a new sub-directory of
    /// [`TMP_DIR`]. Called by the tests generated below.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_dir = TMP_DIR.path().join(NUM.fetch_add(1, SeqCst).to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using store @ {}", store_path);

        let store = SqliteStateStore::open(store_path, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2334
/// Runs the shared state-store integration tests against a store opened with
/// a passphrase, plus a few checks of `SqliteStoreConfig` options.
#[cfg(test)]
mod encrypted_tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{StateStore, StoreError, statestore_integration_tests};
    use matrix_sdk_test::async_test;
    use once_cell::sync::Lazy;
    use tempfile::{TempDir, tempdir};

    use super::SqliteStateStore;
    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving each store its own sub-directory name.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Returns a path under [`TMP_DIR`] not handed out by a previous call.
    fn new_state_store_workspace() -> PathBuf {
        let index = NUM.fetch_add(1, SeqCst);
        TMP_DIR.path().join(index.to_string())
    }

    /// Opens a fresh passphrase-protected store in a new workspace. Called
    /// by the tests generated at the bottom of this module.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let workspace = new_state_store_workspace();
        let workspace_str = workspace.to_str().unwrap();

        tracing::info!("using store @ {}", workspace_str);

        let store = SqliteStateStore::open(workspace_str, Some("default_test_password"))
            .await
            .unwrap();
        Ok(store)
    }

    /// The pool must report the maximum size requested in the configuration.
    #[async_test]
    async fn test_pool_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).pool_max_size(42);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    /// The configured cache size must be visible through `PRAGMA cache_size`.
    #[async_test]
    async fn test_cache_size() {
        let config = SqliteStoreConfig::new(new_state_store_workspace()).cache_size(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let cache_size =
            conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();

        // SQLite reports a negative `cache_size` when the limit is expressed
        // in KiB rather than pages, hence the sign and the division by 1024
        // (presumably the configured value is in bytes — confirm).
        assert_eq!(cache_size, -(1500 / 1024));
    }

    /// The configured journal size limit must be visible through
    /// `PRAGMA journal_size_limit`.
    #[async_test]
    async fn test_journal_size_limit() {
        let config =
            SqliteStoreConfig::new(new_state_store_workspace()).journal_size_limit(1500);
        let store = SqliteStateStore::open_with_config(config).await.unwrap();

        let conn = store.pool.get().await.unwrap();
        let journal_size_limit = conn
            .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
            .await
            .unwrap();

        assert_eq!(journal_size_limit, 1500);
    }

    statestore_integration_tests!();
}
2415
2416#[cfg(test)]
2417mod migration_tests {
2418 use std::{
2419 path::{Path, PathBuf},
2420 sync::{
2421 Arc,
2422 atomic::{AtomicU32, Ordering::SeqCst},
2423 },
2424 };
2425
2426 use as_variant::as_variant;
2427 use matrix_sdk_base::{
2428 RoomState, StateStore,
2429 media::{MediaFormat, MediaRequestParameters},
2430 store::{
2431 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2432 SerializableEventContent,
2433 },
2434 sync::UnreadNotificationsCount,
2435 };
2436 use matrix_sdk_test::async_test;
2437 use once_cell::sync::Lazy;
2438 use ruma::{
2439 EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, TransactionId, UserId,
2440 events::{
2441 StateEventType,
2442 room::{MediaSource, create::RoomCreateEventContent, message::RoomMessageEventContent},
2443 },
2444 room_id, server_name, user_id,
2445 };
2446 use rusqlite::Transaction;
2447 use serde::{Deserialize, Serialize};
2448 use serde_json::json;
2449 use tempfile::{TempDir, tempdir};
2450 use tokio::{fs, sync::Mutex};
2451 use zeroize::Zeroizing;
2452
2453 use super::{DATABASE_NAME, SqliteStateStore, init, keys};
2454 use crate::{
2455 OpenStoreError, Secret, SqliteStoreConfig,
2456 error::{Error, Result},
2457 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2458 };
2459
/// Temporary directory under which `new_path` hands out store locations.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
/// Counter incremented by `new_path` to name each location.
static NUM: AtomicU32 = AtomicU32::new(0);
/// Passphrase used both when building a fake database and when reopening it.
const SECRET: &str = "secret";
2463
2464 fn new_path() -> PathBuf {
2465 let name = NUM.fetch_add(1, SeqCst).to_string();
2466 TMP_DIR.path().join(name)
2467 }
2468
2469 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2470 let config = SqliteStoreConfig::new(path);
2471
2472 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir).unwrap();
2473
2474 let pool = config.build_pool_of_connections(DATABASE_NAME).unwrap();
2475 let conn = pool.get().await?;
2476
2477 init(&conn).await?;
2478
2479 let store_cipher = Some(Arc::new(
2480 conn.get_or_create_store_cipher(Secret::PassPhrase(Zeroizing::new(SECRET.to_owned())))
2481 .await
2482 .unwrap(),
2483 ));
2484 let this = SqliteStateStore {
2485 store_cipher,
2486 pool,
2487 write_connection: Arc::new(Mutex::new(conn)),
2489 };
2490 this.run_migrations(1, Some(version)).await?;
2491
2492 Ok(this)
2493 }
2494
2495 fn room_info_v1_json(
2496 room_id: &RoomId,
2497 state: RoomState,
2498 name: Option<&str>,
2499 creator: Option<&UserId>,
2500 ) -> serde_json::Value {
2501 let name_content = match name {
2503 Some(name) => json!({ "name": name }),
2504 None => json!({ "name": null }),
2505 };
2506 let create_content = match creator {
2508 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2509 None => RoomCreateEventContent::new_v11(),
2510 };
2511
2512 json!({
2513 "room_id": room_id,
2514 "room_type": state,
2515 "notification_counts": UnreadNotificationsCount::default(),
2516 "summary": {
2517 "heroes": [],
2518 "joined_member_count": 0,
2519 "invited_member_count": 0,
2520 },
2521 "members_synced": false,
2522 "base_info": {
2523 "dm_targets": [],
2524 "max_power_level": 100,
2525 "name": {
2526 "Original": {
2527 "content": name_content,
2528 },
2529 },
2530 "create": {
2531 "Original": {
2532 "content": create_content,
2533 }
2534 }
2535 },
2536 })
2537 }
2538
2539 #[async_test]
2540 pub async fn test_migrating_v1_to_v2() {
2541 let path = new_path();
2542 {
2544 let db = create_fake_db(&path, 1).await.unwrap();
2545 let conn = db.pool.get().await.unwrap();
2546
2547 let this = db.clone();
2548 conn.with_transaction(move |txn| {
2549 for i in 0..5 {
2550 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2551 let (state, stripped) =
2552 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2553 let info = room_info_v1_json(&room_id, state, None, None);
2554
2555 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2556 let data = this.serialize_json(&info)?;
2557
2558 txn.prepare_cached(
2559 "INSERT INTO room_info (room_id, stripped, data)
2560 VALUES (?, ?, ?)",
2561 )?
2562 .execute((room_id, stripped, data))?;
2563 }
2564
2565 Result::<_, Error>::Ok(())
2566 })
2567 .await
2568 .unwrap();
2569 }
2570
2571 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2573
2574 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2576 }
2577
2578 fn add_room_v2(
2580 this: &SqliteStateStore,
2581 txn: &Transaction<'_>,
2582 room_id: &RoomId,
2583 name: Option<&str>,
2584 create_creator: Option<&UserId>,
2585 create_sender: Option<&UserId>,
2586 ) -> Result<(), Error> {
2587 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2588
2589 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2590 let encoded_state =
2591 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2592 let data = this.serialize_json(&room_info_json)?;
2593
2594 txn.prepare_cached(
2595 "INSERT INTO room_info (room_id, state, data)
2596 VALUES (?, ?, ?)",
2597 )?
2598 .execute((encoded_room_id, encoded_state, data))?;
2599
2600 let Some(create_sender) = create_sender else {
2602 return Ok(());
2603 };
2604
2605 let create_content = match create_creator {
2606 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2607 None => RoomCreateEventContent::new_v11(),
2608 };
2609
2610 let event_id = EventId::new(server_name!("dummy.local"));
2611 let create_event = json!({
2612 "content": create_content,
2613 "event_id": event_id,
2614 "sender": create_sender.to_owned(),
2615 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2616 "state_key": "",
2617 "type": "m.room.create",
2618 "unsigned": {},
2619 });
2620
2621 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2622 let encoded_event_type =
2623 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2624 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2625 let stripped = false;
2626 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2627 let data = this.serialize_json(&create_event)?;
2628
2629 txn.prepare_cached(
2630 "INSERT
2631 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2632 VALUES (?, ?, ?, ?, ?, ?)",
2633 )?
2634 .execute((
2635 encoded_room_id,
2636 encoded_event_type,
2637 encoded_state_key,
2638 stripped,
2639 encoded_event_id,
2640 data,
2641 ))?;
2642
2643 Ok(())
2644 }
2645
2646 #[async_test]
2647 pub async fn test_migrating_v2_to_v3() {
2648 let path = new_path();
2649
2650 let room_a_id = room_id!("!room_a:dummy.local");
2652 let room_a_name = "Room A";
2653 let room_a_creator = user_id!("@creator:dummy.local");
2654 let room_a_create_sender = user_id!("@sender:dummy.local");
2657
2658 let room_b_id = room_id!("!room_b:dummy.local");
2660
2661 let room_c_id = room_id!("!room_c:dummy.local");
2663 let room_c_create_sender = user_id!("@creator:dummy.local");
2664
2665 {
2667 let db = create_fake_db(&path, 2).await.unwrap();
2668 let conn = db.pool.get().await.unwrap();
2669
2670 let this = db.clone();
2671 conn.with_transaction(move |txn| {
2672 add_room_v2(
2673 &this,
2674 txn,
2675 room_a_id,
2676 Some(room_a_name),
2677 Some(room_a_creator),
2678 Some(room_a_create_sender),
2679 )?;
2680 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2681 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2682
2683 Result::<_, Error>::Ok(())
2684 })
2685 .await
2686 .unwrap();
2687 }
2688
2689 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2691
2692 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2694 assert_eq!(room_infos.len(), 3);
2695
2696 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2697 assert_eq!(room_a.name(), Some(room_a_name));
2698 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2699
2700 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2701 assert_eq!(room_b.name(), None);
2702 assert_eq!(room_b.creators(), None);
2703
2704 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2705 assert_eq!(room_c.name(), None);
2706 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2707 }
2708
2709 #[async_test]
2710 pub async fn test_migrating_v7_to_v9() {
2711 let path = new_path();
2712
2713 let room_id = room_id!("!room_a:dummy.local");
2714 let wedged_event_transaction_id = TransactionId::new();
2715 let local_event_transaction_id = TransactionId::new();
2716
2717 {
2719 let db = create_fake_db(&path, 7).await.unwrap();
2720 let conn = db.pool.get().await.unwrap();
2721
2722 let wedge_tx = wedged_event_transaction_id.clone();
2723 let local_tx = local_event_transaction_id.clone();
2724
2725 conn.with_transaction(move |txn| {
2726 add_dependent_send_queue_event_v7(
2727 &db,
2728 txn,
2729 room_id,
2730 &local_tx,
2731 ChildTransactionId::new(),
2732 DependentQueuedRequestKind::RedactEvent,
2733 )?;
2734 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2735 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2736 Result::<_, Error>::Ok(())
2737 })
2738 .await
2739 .unwrap();
2740 }
2741
2742 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2745
2746 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2747 assert!(requests.is_empty());
2748
2749 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2750 assert!(dependent_requests.is_empty());
2751 }
2752
2753 fn add_send_queue_event_v7(
2754 this: &SqliteStateStore,
2755 txn: &Transaction<'_>,
2756 transaction_id: &TransactionId,
2757 room_id: &RoomId,
2758 is_wedged: bool,
2759 ) -> Result<(), Error> {
2760 let content =
2761 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2762
2763 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2764 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2765
2766 let content = this.serialize_json(&content)?;
2767
2768 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2769 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2770
2771 Ok(())
2772 }
2773
2774 fn add_dependent_send_queue_event_v7(
2775 this: &SqliteStateStore,
2776 txn: &Transaction<'_>,
2777 room_id: &RoomId,
2778 parent_txn_id: &TransactionId,
2779 own_txn_id: ChildTransactionId,
2780 content: DependentQueuedRequestKind,
2781 ) -> Result<(), Error> {
2782 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2783
2784 let parent_txn_id = parent_txn_id.to_string();
2785 let own_txn_id = own_txn_id.to_string();
2786 let content = this.serialize_json(&content)?;
2787
2788 txn.prepare_cached(
2789 "INSERT INTO dependent_send_queue_events
2790 (room_id, parent_transaction_id, own_transaction_id, content)
2791 VALUES (?, ?, ?, ?)",
2792 )?
2793 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2794
2795 Ok(())
2796 }
2797
/// Test-only enum whose single variant is named `UploadFileWithThumbnail`.
///
/// `test_dependent_queued_request_variant_renaming` serializes a value of
/// this type and deserializes the result as a `DependentQueuedRequestKind`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LegacyDependentQueuedRequestKind {
    UploadFileWithThumbnail {
        /// Content type of the upload, as a string.
        content_type: String,
        /// Media request parameters serialized under the `cache_key` field.
        cache_key: MediaRequestParameters,
        /// Transaction id serialized under the `related_to` field.
        related_to: OwnedTransactionId,
    },
}
2806
2807 #[async_test]
2808 pub async fn test_dependent_queued_request_variant_renaming() {
2809 let path = new_path();
2810 let db = create_fake_db(&path, 7).await.unwrap();
2811
2812 let cache_key = MediaRequestParameters {
2813 format: MediaFormat::File,
2814 source: MediaSource::Plain("https://server.local/foobar".into()),
2815 };
2816 let related_to = TransactionId::new();
2817 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2818 content_type: "image/png".to_owned(),
2819 cache_key,
2820 related_to: related_to.clone(),
2821 };
2822
2823 let data = db
2824 .serialize_json(&request)
2825 .expect("should be able to serialize legacy dependent request");
2826 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2827 "should be able to deserialize dependent request from legacy dependent request",
2828 );
2829
2830 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2831 assert_eq!(de_related_to, related_to);
2832 });
2833 }
2834}