1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 RoomLoadSettings, SentRequestKey,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
/// Table names used to namespace keys.
///
/// Where their use is visible in this file, these constants are passed as the
/// `table_name` argument of `SqliteStateStore::encode_key`.
mod keys {
    // Generic key/value blob storage.
    pub const KV_BLOB: &str = "kv_blob";

    // Room-scoped tables.
    pub const ROOM_INFO: &str = "room_info";
    pub const STATE_EVENT: &str = "state_event";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const MEMBER: &str = "member";
    pub const PROFILE: &str = "profile";
    pub const RECEIPT: &str = "receipt";
    pub const DISPLAY_NAME: &str = "display_name";

    // Account-wide data.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";

    // Send queue tables.
    pub const SEND_QUEUE: &str = "send_queue_events";
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
}
66
/// File name of the SQLite database; it is joined onto the configured store
/// path when the connection pool is created.
const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// Latest database version known to this code. `run_migrations` upgrades to
/// this version when it is not given an explicit target.
const DATABASE_VERSION: u8 = 12;
75
/// A state store backed by a SQLite database.
///
/// Cloning is cheap in the sense that the cipher is shared through an `Arc`;
/// the pool is cloned as-is.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// Cipher used to encrypt values and hash keys. `None` when the store was
    /// opened without a passphrase, in which case data is stored as-is.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool from which database connections are acquired.
    pool: SqlitePool,
}
82
83#[cfg(not(tarpaulin_include))]
84impl fmt::Debug for SqliteStateStore {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
87 }
88}
89
90impl SqliteStateStore {
91 pub async fn open(
94 path: impl AsRef<Path>,
95 passphrase: Option<&str>,
96 ) -> Result<Self, OpenStoreError> {
97 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
98 }
99
100 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
102 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
103
104 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
105
106 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
107 config.pool = Some(pool_config);
108
109 let pool = config.create_pool(Runtime::Tokio1)?;
110
111 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
112 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
113
114 Ok(this)
115 }
116
117 async fn open_with_pool(
120 pool: SqlitePool,
121 passphrase: Option<&str>,
122 ) -> Result<Self, OpenStoreError> {
123 let conn = pool.get().await?;
124
125 let mut version = conn.db_version().await?;
126
127 if version == 0 {
128 init(&conn).await?;
129 version = 1;
130 }
131
132 let store_cipher = match passphrase {
133 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
134 None => None,
135 };
136 let this = Self { store_cipher, pool };
137 this.run_migrations(&conn, version, None).await?;
138
139 Ok(this)
140 }
141
142 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
147 let to = to.unwrap_or(DATABASE_VERSION);
148
149 if from < to {
150 debug!(version = from, new_version = to, "Upgrading database");
151 } else {
152 return Ok(());
153 }
154
155 if from < 2 && to >= 2 {
156 let this = self.clone();
157 conn.with_transaction(move |txn| {
158 txn.execute_batch(include_str!(
160 "../migrations/state_store/002_a_create_new_room_info.sql"
161 ))?;
162
163 for data in txn
165 .prepare("SELECT data FROM room_info")?
166 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
167 {
168 let data = data?;
169 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
170
171 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
172 let state = this
173 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
174 txn.prepare_cached(
175 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
176 VALUES (?, ?, ?)",
177 )?
178 .execute((room_id, state, data))?;
179 }
180
181 txn.execute_batch(include_str!(
183 "../migrations/state_store/002_b_replace_room_info.sql"
184 ))?;
185
186 txn.set_db_version(2)?;
187 Result::<_, Error>::Ok(())
188 })
189 .await?;
190 }
191
192 if from < 3 && to >= 3 {
194 let this = self.clone();
195 conn.with_transaction(move |txn| {
196 for data in txn
198 .prepare("SELECT data FROM room_info")?
199 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
200 {
201 let data = data?;
202 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
203
204 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
206 let event_type =
207 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
208 let create_res = txn
209 .prepare(
210 "SELECT stripped, data FROM state_event
211 WHERE room_id = ? AND event_type = ?",
212 )?
213 .query_row([room_id, event_type], |row| {
214 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
215 })
216 .optional()?;
217
218 let create = create_res.and_then(|(stripped, data)| {
219 let create = if stripped {
220 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
221 this.deserialize_json(&data).ok()?,
222 )
223 } else {
224 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
225 };
226 Some(create)
227 });
228
229 let migrated_room_info = room_info_v1.migrate(create.as_ref());
230
231 let data = this.serialize_json(&migrated_room_info)?;
232 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
233 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
234 .execute((data, room_id))?;
235 }
236
237 txn.set_db_version(3)?;
238 Result::<_, Error>::Ok(())
239 })
240 .await?;
241 }
242
243 if from < 4 && to >= 4 {
244 conn.with_transaction(move |txn| {
245 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
247 txn.set_db_version(4)
248 })
249 .await?;
250 }
251
252 if from < 5 && to >= 5 {
253 conn.with_transaction(move |txn| {
254 txn.execute_batch(include_str!(
256 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
257 ))?;
258 txn.set_db_version(4)
259 })
260 .await?;
261 }
262
263 if from < 6 && to >= 6 {
264 conn.with_transaction(move |txn| {
265 txn.execute_batch(include_str!(
267 "../migrations/state_store/005_send_queue_dependent_events.sql"
268 ))?;
269 txn.set_db_version(6)
270 })
271 .await?;
272 }
273
274 if from < 7 && to >= 7 {
275 conn.with_transaction(move |txn| {
276 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
278 txn.set_db_version(7)
279 })
280 .await?;
281 }
282
283 if from < 8 && to >= 8 {
284 let error = QueueWedgeError::GenericApiError {
286 msg: "local echo failed to send in a previous session".into(),
287 };
288 let default_err = self.serialize_value(&error)?;
289
290 conn.with_transaction(move |txn| {
291 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
293
294 for wedged_entries in txn
297 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
298 .query_map((), |row| {
299 Ok(
300 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
301 )
302 })? {
303
304 let (room_id, transaction_id) = wedged_entries?;
305
306 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
307 .execute((default_err.clone(), room_id, transaction_id))?;
308 }
309
310
311 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
313
314 txn.set_db_version(8)
315 })
316 .await?;
317 }
318
319 if from < 9 && to >= 9 {
320 conn.with_transaction(move |txn| {
321 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
323 txn.set_db_version(9)
324 })
325 .await?;
326 }
327
328 if from < 10 && to >= 10 {
329 conn.with_transaction(move |txn| {
330 txn.execute_batch(include_str!(
332 "../migrations/state_store/009_send_queue_priority.sql"
333 ))?;
334 txn.set_db_version(10)
335 })
336 .await?;
337 }
338
339 if from < 11 && to >= 11 {
340 conn.with_transaction(move |txn| {
341 txn.execute_batch(include_str!(
343 "../migrations/state_store/010_send_queue_enqueue_time.sql"
344 ))?;
345 txn.set_db_version(11)
346 })
347 .await?;
348 }
349
350 if from < 12 && to >= 12 {
351 conn.vacuum().await?;
355 conn.set_kv("version", vec![12]).await?;
356 }
357
358 Ok(())
359 }
360
361 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
362 if let Some(key) = &self.store_cipher {
363 let encrypted = key.encrypt_value_data(value)?;
364 Ok(rmp_serde::to_vec_named(&encrypted)?)
365 } else {
366 Ok(value)
367 }
368 }
369
370 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
371 let serialized = rmp_serde::to_vec_named(value)?;
372 self.encode_value(serialized)
373 }
374
375 fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
376 let serialized = serde_json::to_vec(value)?;
377 self.encode_value(serialized)
378 }
379
380 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
381 if let Some(key) = &self.store_cipher {
382 let encrypted = rmp_serde::from_slice(value)?;
383 let decrypted = key.decrypt_value_data(encrypted)?;
384 Ok(Cow::Owned(decrypted))
385 } else {
386 Ok(Cow::Borrowed(value))
387 }
388 }
389
390 fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
391 let decoded = self.decode_value(data)?;
392 Ok(serde_json::from_slice(&decoded)?)
393 }
394
395 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
396 let decoded = self.decode_value(value)?;
397 Ok(rmp_serde::from_slice(&decoded)?)
398 }
399
400 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
401 let bytes = key.as_ref();
402 if let Some(store_cipher) = &self.store_cipher {
403 Key::Hashed(store_cipher.hash_key(table_name, bytes))
404 } else {
405 Key::Plain(bytes.to_owned())
406 }
407 }
408
409 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
410 let key_s = match key {
411 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
412 StateStoreDataKey::ServerCapabilities => {
413 Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
414 }
415 StateStoreDataKey::Filter(f) => {
416 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
417 }
418 StateStoreDataKey::UserAvatarUrl(u) => {
419 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
420 }
421 StateStoreDataKey::RecentlyVisitedRooms(b) => {
422 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
423 }
424 StateStoreDataKey::UtdHookManagerData => {
425 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
426 }
427 StateStoreDataKey::ComposerDraft(room_id) => {
428 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
429 }
430 StateStoreDataKey::SeenKnockRequests(room_id) => {
431 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
432 }
433 };
434
435 self.encode_key(keys::KV_BLOB, &*key_s)
436 }
437
438 fn encode_presence_key(&self, user_id: &UserId) -> Key {
439 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
440 }
441
442 fn encode_custom_key(&self, key: &[u8]) -> Key {
443 let mut full_key = b"custom:".to_vec();
444 full_key.extend(key);
445 self.encode_key(keys::KV_BLOB, full_key)
446 }
447
448 async fn acquire(&self) -> Result<SqliteAsyncConn> {
449 Ok(self.pool.get().await?)
450 }
451
452 fn remove_maybe_stripped_room_data(
453 &self,
454 txn: &Transaction<'_>,
455 room_id: &RoomId,
456 stripped: bool,
457 ) -> rusqlite::Result<()> {
458 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
459 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
460
461 let member_room_id = self.encode_key(keys::MEMBER, room_id);
462 txn.remove_room_members(&member_room_id, Some(stripped))
463 }
464}
465
466async fn init(conn: &SqliteAsyncConn) -> Result<()> {
468 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
471 conn.with_transaction(|txn| {
472 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
473 txn.set_db_version(1)?;
474
475 Ok(())
476 })
477 .await
478}
479
/// Synchronous state-store helpers on a database connection.
///
/// Every argument is taken as raw bytes; callers in this file pass keys that
/// were already encoded and values that were already serialized.
trait SqliteConnectionStateStoreExt {
    /// Inserts or replaces the `kv_blob` entry for `key`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the global account data stored for `event_type`.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the account data of a room for `event_type`.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes all account data stored for a room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the info (and state column) of a room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Returns the stored info of a room, or `None` if there is no row.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Deletes the stored info of a room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces a state event row.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Returns the data of the state event with the given event id in a room,
    /// or `None` if there is no such row.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Deletes the state events of a room. With `Some(flag)`, only rows whose
    /// `stripped` column equals `flag` are deleted; with `None`, all of them.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Inserts or replaces a member row.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes the members of a room. With `Some(flag)`, only rows whose
    /// `stripped` column equals `flag` are deleted; with `None`, all of them.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Inserts or replaces the profile of a user in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Deletes every profile stored for a room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes the profile of a single user in a room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces a receipt row. `thread_id` is written to the
    /// `thread` column.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Deletes every receipt stored for a room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Inserts or replaces the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Deletes the data stored for a single display name in a room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Deletes every display name entry stored for a room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes every send queue entry stored for a room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Deletes every dependent send queue entry stored for a room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
548
549impl SqliteConnectionStateStoreExt for rusqlite::Connection {
550 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
551 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
552 Ok(())
553 }
554
555 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
556 self.prepare_cached(
557 "INSERT OR REPLACE INTO global_account_data (event_type, data)
558 VALUES (?, ?)",
559 )?
560 .execute((event_type, data))?;
561 Ok(())
562 }
563
564 fn set_room_account_data(
565 &self,
566 room_id: &[u8],
567 event_type: &[u8],
568 data: &[u8],
569 ) -> rusqlite::Result<()> {
570 self.prepare_cached(
571 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
572 VALUES (?, ?, ?)",
573 )?
574 .execute((room_id, event_type, data))?;
575 Ok(())
576 }
577
578 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
579 self.prepare(
580 "DELETE FROM room_account_data
581 WHERE room_id = ?",
582 )?
583 .execute((room_id,))?;
584 Ok(())
585 }
586
587 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
588 self.prepare_cached(
589 "INSERT OR REPLACE INTO room_info (room_id, state, data)
590 VALUES (?, ?, ?)",
591 )?
592 .execute((room_id, state, data))?;
593 Ok(())
594 }
595
596 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
597 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
598 .optional()
599 }
600
601 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
603 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
604 Ok(())
605 }
606
607 fn set_state_event(
608 &self,
609 room_id: &[u8],
610 event_type: &[u8],
611 state_key: &[u8],
612 stripped: bool,
613 event_id: Option<&[u8]>,
614 data: &[u8],
615 ) -> rusqlite::Result<()> {
616 self.prepare_cached(
617 "INSERT OR REPLACE
618 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
619 VALUES (?, ?, ?, ?, ?, ?)",
620 )?
621 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
622 Ok(())
623 }
624
625 fn get_state_event_by_id(
626 &self,
627 room_id: &[u8],
628 event_id: &[u8],
629 ) -> rusqlite::Result<Option<Vec<u8>>> {
630 self.query_row(
631 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
632 (room_id, event_id),
633 |row| row.get(0),
634 )
635 .optional()
636 }
637
638 fn remove_room_state_events(
644 &self,
645 room_id: &[u8],
646 stripped: Option<bool>,
647 ) -> rusqlite::Result<()> {
648 if let Some(stripped) = stripped {
649 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
650 .execute((room_id, stripped))?;
651 } else {
652 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
653 .execute((room_id,))?;
654 }
655 Ok(())
656 }
657
658 fn set_member(
659 &self,
660 room_id: &[u8],
661 user_id: &[u8],
662 membership: &[u8],
663 stripped: bool,
664 data: &[u8],
665 ) -> rusqlite::Result<()> {
666 self.prepare_cached(
667 "INSERT OR REPLACE
668 INTO member (room_id, user_id, membership, stripped, data)
669 VALUES (?, ?, ?, ?, ?)",
670 )?
671 .execute((room_id, user_id, membership, stripped, data))?;
672 Ok(())
673 }
674
675 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
680 if let Some(stripped) = stripped {
681 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
682 .execute((room_id, stripped))?;
683 } else {
684 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
685 }
686 Ok(())
687 }
688
689 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
690 self.prepare_cached(
691 "INSERT OR REPLACE
692 INTO profile (room_id, user_id, data)
693 VALUES (?, ?, ?)",
694 )?
695 .execute((room_id, user_id, data))?;
696 Ok(())
697 }
698
699 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
700 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
701 Ok(())
702 }
703
704 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
705 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
706 .execute((room_id, user_id))?;
707 Ok(())
708 }
709
710 fn set_receipt(
711 &self,
712 room_id: &[u8],
713 user_id: &[u8],
714 receipt_type: &[u8],
715 thread: &[u8],
716 event_id: &[u8],
717 data: &[u8],
718 ) -> rusqlite::Result<()> {
719 self.prepare_cached(
720 "INSERT OR REPLACE
721 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
722 VALUES (?, ?, ?, ?, ?, ?)",
723 )?
724 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
725 Ok(())
726 }
727
728 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
730 Ok(())
731 }
732
733 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
734 self.prepare_cached(
735 "INSERT OR REPLACE
736 INTO display_name (room_id, name, data)
737 VALUES (?, ?, ?)",
738 )?
739 .execute((room_id, name, data))?;
740 Ok(())
741 }
742
743 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
744 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
745 .execute((room_id, name))?;
746 Ok(())
747 }
748
749 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
750 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
751 Ok(())
752 }
753
754 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
755 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
756 Ok(())
757 }
758
759 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
760 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
761 .execute((room_id,))?;
762 Ok(())
763 }
764}
765
766#[async_trait]
767trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
768 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
769 Ok(self
770 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
771 .await
772 .optional()?)
773 }
774
775 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
776 let keys_length = keys.len();
777
778 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
779 let sql_params = repeat_vars(keys.len());
780 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
781
782 let params = rusqlite::params_from_iter(keys);
783
784 Ok(txn
785 .prepare(&sql)?
786 .query(params)?
787 .mapped(|row| row.get(0))
788 .collect::<Result<_, _>>()?)
789 })
790 .await
791 }
792
793 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
794
795 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
796 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
797 Ok(())
798 }
799
800 async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
801 Ok(match room_id {
802 None => {
803 self.prepare("SELECT data FROM room_info", move |mut stmt| {
804 stmt.query_map((), |row| row.get(0))?.collect()
805 })
806 .await?
807 }
808
809 Some(room_id) => {
810 self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
811 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
812 })
813 .await?
814 }
815 })
816 }
817
818 async fn get_maybe_stripped_state_events_for_keys(
819 &self,
820 room_id: Key,
821 event_type: Key,
822 state_keys: Vec<Key>,
823 ) -> Result<Vec<(bool, Vec<u8>)>> {
824 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
825 let sql_params = repeat_vars(state_keys.len());
826 let sql = format!(
827 "SELECT stripped, data FROM state_event
828 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
829 );
830
831 let params = rusqlite::params_from_iter(
832 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
833 );
834
835 Ok(txn
836 .prepare(&sql)?
837 .query(params)?
838 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
839 .collect::<Result<_, _>>()?)
840 })
841 .await
842 }
843
844 async fn get_maybe_stripped_state_events(
845 &self,
846 room_id: Key,
847 event_type: Key,
848 ) -> Result<Vec<(bool, Vec<u8>)>> {
849 Ok(self
850 .prepare(
851 "SELECT stripped, data FROM state_event
852 WHERE room_id = ? AND event_type = ?",
853 |mut stmt| {
854 stmt.query((room_id, event_type))?
855 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
856 .collect()
857 },
858 )
859 .await?)
860 }
861
862 async fn get_profiles(
863 &self,
864 room_id: Key,
865 user_ids: Vec<Key>,
866 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
867 let user_ids_length = user_ids.len();
868
869 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
870 let sql_params = repeat_vars(user_ids.len());
871 let sql = format!(
872 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
873 );
874
875 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
876
877 Ok(txn
878 .prepare(&sql)?
879 .query(params)?
880 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
881 .collect::<Result<_, _>>()?)
882 })
883 .await
884 }
885
886 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
887 let res = if memberships.is_empty() {
888 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
889 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
890 })
891 .await?
892 } else {
893 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
894 let sql_params = repeat_vars(memberships.len());
895 let sql = format!(
896 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
897 );
898
899 let params =
900 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
901
902 Ok(txn
903 .prepare(&sql)?
904 .query(params)?
905 .mapped(|row| row.get(0))
906 .collect::<Result<_, _>>()?)
907 })
908 .await?
909 };
910
911 Ok(res)
912 }
913
914 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
915 Ok(self
916 .query_row(
917 "SELECT data FROM global_account_data WHERE event_type = ?",
918 (event_type,),
919 |row| row.get(0),
920 )
921 .await
922 .optional()?)
923 }
924
925 async fn get_room_account_data(
926 &self,
927 room_id: Key,
928 event_type: Key,
929 ) -> Result<Option<Vec<u8>>> {
930 Ok(self
931 .query_row(
932 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
933 (room_id, event_type),
934 |row| row.get(0),
935 )
936 .await
937 .optional()?)
938 }
939
940 async fn get_display_names(
941 &self,
942 room_id: Key,
943 names: Vec<Key>,
944 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
945 let names_length = names.len();
946
947 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
948 let sql_params = repeat_vars(names.len());
949 let sql = format!(
950 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
951 );
952
953 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
954
955 Ok(txn
956 .prepare(&sql)?
957 .query(params)?
958 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
959 .collect::<Result<_, _>>()?)
960 })
961 .await
962 }
963
964 async fn get_user_receipt(
965 &self,
966 room_id: Key,
967 receipt_type: Key,
968 thread: Key,
969 user_id: Key,
970 ) -> Result<Option<Vec<u8>>> {
971 Ok(self
972 .query_row(
973 "SELECT data FROM receipt
974 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
975 (room_id, receipt_type, thread, user_id),
976 |row| row.get(0),
977 )
978 .await
979 .optional()?)
980 }
981
982 async fn get_event_receipts(
983 &self,
984 room_id: Key,
985 receipt_type: Key,
986 thread: Key,
987 event_id: Key,
988 ) -> Result<Vec<Vec<u8>>> {
989 Ok(self
990 .prepare(
991 "SELECT data FROM receipt
992 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
993 |mut stmt| {
994 stmt.query((room_id, receipt_type, thread, event_id))?
995 .mapped(|row| row.get(0))
996 .collect()
997 },
998 )
999 .await?)
1000 }
1001}
1002
1003#[async_trait]
1004impl SqliteObjectStateStoreExt for SqliteAsyncConn {
1005 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
1006 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
1007 }
1008}
1009
1010#[async_trait]
1011impl StateStore for SqliteStateStore {
1012 type Error = Error;
1013
1014 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
1015 self.acquire()
1016 .await?
1017 .get_kv_blob(self.encode_state_store_data_key(key))
1018 .await?
1019 .map(|data| {
1020 Ok(match key {
1021 StateStoreDataKey::SyncToken => {
1022 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1023 }
1024 StateStoreDataKey::ServerCapabilities => {
1025 StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1026 }
1027 StateStoreDataKey::Filter(_) => {
1028 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1029 }
1030 StateStoreDataKey::UserAvatarUrl(_) => {
1031 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1032 }
1033 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1034 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1035 }
1036 StateStoreDataKey::UtdHookManagerData => {
1037 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1038 }
1039 StateStoreDataKey::ComposerDraft(_) => {
1040 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1041 }
1042 StateStoreDataKey::SeenKnockRequests(_) => {
1043 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1044 }
1045 })
1046 })
1047 .transpose()
1048 }
1049
1050 async fn set_kv_data(
1051 &self,
1052 key: StateStoreDataKey<'_>,
1053 value: StateStoreDataValue,
1054 ) -> Result<()> {
1055 let serialized_value = match key {
1056 StateStoreDataKey::SyncToken => self.serialize_value(
1057 &value.into_sync_token().expect("Session data not a sync token"),
1058 )?,
1059 StateStoreDataKey::ServerCapabilities => self.serialize_value(
1060 &value
1061 .into_server_capabilities()
1062 .expect("Session data not containing server capabilities"),
1063 )?,
1064 StateStoreDataKey::Filter(_) => {
1065 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1066 }
1067 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1068 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1069 )?,
1070 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1071 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1072 )?,
1073 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1074 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1075 )?,
1076 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1077 &value.into_composer_draft().expect("Session data not a composer draft"),
1078 )?,
1079 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1080 &value
1081 .into_seen_knock_requests()
1082 .expect("Session data is not a set of seen knock request ids"),
1083 )?,
1084 };
1085
1086 self.acquire()
1087 .await?
1088 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1089 .await
1090 }
1091
1092 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1093 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1094 }
1095
1096 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1097 let changes = changes.to_owned();
1098 let this = self.clone();
1099 self.acquire()
1100 .await?
1101 .with_transaction(move |txn| {
1102 let StateChanges {
1103 sync_token,
1104 account_data,
1105 presence,
1106 profiles,
1107 profiles_to_delete,
1108 state,
1109 room_account_data,
1110 room_infos,
1111 receipts,
1112 redactions,
1113 stripped_state,
1114 ambiguity_maps,
1115 } = changes;
1116
1117 if let Some(sync_token) = sync_token {
1118 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1119 let value = this.serialize_value(&sync_token)?;
1120 txn.set_kv_blob(&key, &value)?;
1121 }
1122
1123 for (event_type, event) in account_data {
1124 let event_type =
1125 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1126 let data = this.serialize_json(&event)?;
1127 txn.set_global_account_data(&event_type, &data)?;
1128 }
1129
1130 for (room_id, events) in room_account_data {
1131 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1132 for (event_type, event) in events {
1133 let event_type =
1134 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1135 let data = this.serialize_json(&event)?;
1136 txn.set_room_account_data(&room_id, &event_type, &data)?;
1137 }
1138 }
1139
1140 for (user_id, event) in presence {
1141 let key = this.encode_presence_key(&user_id);
1142 let value = this.serialize_json(&event)?;
1143 txn.set_kv_blob(&key, &value)?;
1144 }
1145
1146 for (room_id, room_info) in room_infos {
1147 let stripped = room_info.state() == RoomState::Invited;
1148 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1150
1151 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1152 let state = this
1153 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1154 let data = this.serialize_json(&room_info)?;
1155 txn.set_room_info(&room_id, &state, &data)?;
1156 }
1157
1158 for (room_id, user_ids) in profiles_to_delete {
1159 let room_id = this.encode_key(keys::PROFILE, room_id);
1160 for user_id in user_ids {
1161 let user_id = this.encode_key(keys::PROFILE, user_id);
1162 txn.remove_room_profile(&room_id, &user_id)?;
1163 }
1164 }
1165
1166 for (room_id, state_event_types) in state {
1167 let profiles = profiles.get(&room_id);
1168 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1169
1170 for (event_type, state_events) in state_event_types {
1171 let encoded_event_type =
1172 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1173
1174 for (state_key, raw_state_event) in state_events {
1175 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1176 let data = this.serialize_json(&raw_state_event)?;
1177
1178 let event_id: Option<String> =
1179 raw_state_event.get_field("event_id").ok().flatten();
1180 let encoded_event_id =
1181 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1182
1183 txn.set_state_event(
1184 &encoded_room_id,
1185 &encoded_event_type,
1186 &encoded_state_key,
1187 false,
1188 encoded_event_id.as_deref(),
1189 &data,
1190 )?;
1191
1192 if event_type == StateEventType::RoomMember {
1193 let member_event = match raw_state_event
1194 .deserialize_as::<SyncRoomMemberEvent>()
1195 {
1196 Ok(ev) => ev,
1197 Err(e) => {
1198 debug!(event_id, "Failed to deserialize member event: {e}");
1199 continue;
1200 }
1201 };
1202
1203 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1204 let user_id = this.encode_key(keys::MEMBER, &state_key);
1205 let membership = this
1206 .encode_key(keys::MEMBER, member_event.membership().as_str());
1207 let data = this.serialize_value(&state_key)?;
1208
1209 txn.set_member(
1210 &encoded_room_id,
1211 &user_id,
1212 &membership,
1213 false,
1214 &data,
1215 )?;
1216
1217 if let Some(profile) =
1218 profiles.and_then(|p| p.get(member_event.state_key()))
1219 {
1220 let room_id = this.encode_key(keys::PROFILE, &room_id);
1221 let user_id = this.encode_key(keys::PROFILE, &state_key);
1222 let data = this.serialize_json(&profile)?;
1223 txn.set_profile(&room_id, &user_id, &data)?;
1224 }
1225 }
1226 }
1227 }
1228 }
1229
1230 for (room_id, stripped_state_event_types) in stripped_state {
1231 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1232
1233 for (event_type, stripped_state_events) in stripped_state_event_types {
1234 let encoded_event_type =
1235 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1236
1237 for (state_key, raw_stripped_state_event) in stripped_state_events {
1238 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1239 let data = this.serialize_json(&raw_stripped_state_event)?;
1240 txn.set_state_event(
1241 &encoded_room_id,
1242 &encoded_event_type,
1243 &encoded_state_key,
1244 true,
1245 None,
1246 &data,
1247 )?;
1248
1249 if event_type == StateEventType::RoomMember {
1250 let member_event = match raw_stripped_state_event
1251 .deserialize_as::<StrippedRoomMemberEvent>(
1252 ) {
1253 Ok(ev) => ev,
1254 Err(e) => {
1255 debug!("Failed to deserialize stripped member event: {e}");
1256 continue;
1257 }
1258 };
1259
1260 let room_id = this.encode_key(keys::MEMBER, &room_id);
1261 let user_id = this.encode_key(keys::MEMBER, &state_key);
1262 let membership = this.encode_key(
1263 keys::MEMBER,
1264 member_event.content.membership.as_str(),
1265 );
1266 let data = this.serialize_value(&state_key)?;
1267
1268 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1269 }
1270 }
1271 }
1272 }
1273
1274 for (room_id, receipt_event) in receipts {
1275 let room_id = this.encode_key(keys::RECEIPT, room_id);
1276
1277 for (event_id, receipt_types) in receipt_event {
1278 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1279
1280 for (receipt_type, receipt_users) in receipt_types {
1281 let receipt_type =
1282 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1283
1284 for (user_id, receipt) in receipt_users {
1285 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1286 let thread = this.encode_key(
1289 keys::RECEIPT,
1290 rmp_serde::to_vec_named(&receipt.thread)?,
1291 );
1292 let data = this.serialize_json(&ReceiptData {
1293 receipt,
1294 event_id: event_id.clone(),
1295 user_id,
1296 })?;
1297
1298 txn.set_receipt(
1299 &room_id,
1300 &encoded_user_id,
1301 &receipt_type,
1302 &thread,
1303 &encoded_event_id,
1304 &data,
1305 )?;
1306 }
1307 }
1308 }
1309 }
1310
1311 for (room_id, redactions) in redactions {
1312 let make_room_version = || {
1313 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1314 txn.get_room_info(&encoded_room_id)
1315 .ok()
1316 .flatten()
1317 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1318 .and_then(|info| info.room_version().cloned())
1319 .unwrap_or_else(|| {
1320 warn!(
1321 ?room_id,
1322 "Unable to find the room version, assume version 9"
1323 );
1324 RoomVersionId::V9
1325 })
1326 };
1327
1328 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1329 let mut room_version = None;
1330
1331 for (event_id, redaction) in redactions {
1332 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1333
1334 if let Some(Ok(raw_event)) = txn
1335 .get_state_event_by_id(&encoded_room_id, &event_id)?
1336 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1337 {
1338 let event = raw_event.deserialize()?;
1339 let redacted = redact(
1340 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1341 room_version.get_or_insert_with(make_room_version),
1342 Some(RedactedBecause::from_raw_event(&redaction)?),
1343 )
1344 .map_err(Error::Redaction)?;
1345 let data = this.serialize_json(&redacted)?;
1346
1347 let event_type =
1348 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1349 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1350
1351 txn.set_state_event(
1352 &encoded_room_id,
1353 &event_type,
1354 &state_key,
1355 false,
1356 Some(&event_id),
1357 &data,
1358 )?;
1359 }
1360 }
1361 }
1362
1363 for (room_id, display_names) in ambiguity_maps {
1364 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1365
1366 for (name, user_ids) in display_names {
1367 let encoded_name = this.encode_key(
1368 keys::DISPLAY_NAME,
1369 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1370 );
1371 let data = this.serialize_json(&user_ids)?;
1372
1373 if user_ids.is_empty() {
1374 txn.remove_display_name(&room_id, &encoded_name)?;
1375
1376 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1391 txn.remove_display_name(&room_id, &raw_name)?;
1392 } else {
1393 txn.set_display_name(&room_id, &encoded_name, &data)?;
1395 }
1396 }
1397 }
1398
1399 Ok::<_, Error>(())
1400 })
1401 .await?;
1402
1403 Ok(())
1404 }
1405
1406 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1407 self.acquire()
1408 .await?
1409 .get_kv_blob(self.encode_presence_key(user_id))
1410 .await?
1411 .map(|data| self.deserialize_json(&data))
1412 .transpose()
1413 }
1414
1415 async fn get_presence_events(
1416 &self,
1417 user_ids: &[OwnedUserId],
1418 ) -> Result<Vec<Raw<PresenceEvent>>> {
1419 if user_ids.is_empty() {
1420 return Ok(Vec::new());
1421 }
1422
1423 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1424 self.acquire()
1425 .await?
1426 .get_kv_blobs(user_ids)
1427 .await?
1428 .into_iter()
1429 .map(|data| self.deserialize_json(&data))
1430 .collect()
1431 }
1432
1433 async fn get_state_event(
1434 &self,
1435 room_id: &RoomId,
1436 event_type: StateEventType,
1437 state_key: &str,
1438 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1439 Ok(self
1440 .get_state_events_for_keys(room_id, event_type, &[state_key])
1441 .await?
1442 .into_iter()
1443 .next())
1444 }
1445
1446 async fn get_state_events(
1447 &self,
1448 room_id: &RoomId,
1449 event_type: StateEventType,
1450 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1451 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1452 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1453 self.acquire()
1454 .await?
1455 .get_maybe_stripped_state_events(room_id, event_type)
1456 .await?
1457 .into_iter()
1458 .map(|(stripped, data)| {
1459 let ev = if stripped {
1460 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1461 } else {
1462 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1463 };
1464
1465 Ok(ev)
1466 })
1467 .collect()
1468 }
1469
1470 async fn get_state_events_for_keys(
1471 &self,
1472 room_id: &RoomId,
1473 event_type: StateEventType,
1474 state_keys: &[&str],
1475 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1476 if state_keys.is_empty() {
1477 return Ok(Vec::new());
1478 }
1479
1480 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1481 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1482 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1483 self.acquire()
1484 .await?
1485 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1486 .await?
1487 .into_iter()
1488 .map(|(stripped, data)| {
1489 let ev = if stripped {
1490 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1491 } else {
1492 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1493 };
1494
1495 Ok(ev)
1496 })
1497 .collect()
1498 }
1499
1500 async fn get_profile(
1501 &self,
1502 room_id: &RoomId,
1503 user_id: &UserId,
1504 ) -> Result<Option<MinimalRoomMemberEvent>> {
1505 let room_id = self.encode_key(keys::PROFILE, room_id);
1506 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1507
1508 self.acquire()
1509 .await?
1510 .get_profiles(room_id, user_ids)
1511 .await?
1512 .into_iter()
1513 .next()
1514 .map(|(_, data)| self.deserialize_json(&data))
1515 .transpose()
1516 }
1517
1518 async fn get_profiles<'a>(
1519 &self,
1520 room_id: &RoomId,
1521 user_ids: &'a [OwnedUserId],
1522 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1523 if user_ids.is_empty() {
1524 return Ok(BTreeMap::new());
1525 }
1526
1527 let room_id = self.encode_key(keys::PROFILE, room_id);
1528 let mut user_ids_map = user_ids
1529 .iter()
1530 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1531 .collect::<BTreeMap<_, _>>();
1532 let user_ids = user_ids_map.keys().cloned().collect();
1533
1534 self.acquire()
1535 .await?
1536 .get_profiles(room_id, user_ids)
1537 .await?
1538 .into_iter()
1539 .map(|(user_id, data)| {
1540 Ok((
1541 user_ids_map
1542 .remove(user_id.as_slice())
1543 .expect("returned user IDs were requested"),
1544 self.deserialize_json(&data)?,
1545 ))
1546 })
1547 .collect()
1548 }
1549
1550 async fn get_user_ids(
1551 &self,
1552 room_id: &RoomId,
1553 membership: RoomMemberships,
1554 ) -> Result<Vec<OwnedUserId>> {
1555 let room_id = self.encode_key(keys::MEMBER, room_id);
1556 let memberships = membership
1557 .as_vec()
1558 .into_iter()
1559 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1560 .collect();
1561 self.acquire()
1562 .await?
1563 .get_user_ids(room_id, memberships)
1564 .await?
1565 .iter()
1566 .map(|data| self.deserialize_value(data))
1567 .collect()
1568 }
1569
1570 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1571 self.acquire()
1572 .await?
1573 .get_room_infos(match room_load_settings {
1574 RoomLoadSettings::All => None,
1575 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1576 })
1577 .await?
1578 .into_iter()
1579 .map(|data| self.deserialize_json(&data))
1580 .collect()
1581 }
1582
1583 async fn get_users_with_display_name(
1584 &self,
1585 room_id: &RoomId,
1586 display_name: &DisplayName,
1587 ) -> Result<BTreeSet<OwnedUserId>> {
1588 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1589 let names = vec![self.encode_key(
1590 keys::DISPLAY_NAME,
1591 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1592 )];
1593
1594 Ok(self
1595 .acquire()
1596 .await?
1597 .get_display_names(room_id, names)
1598 .await?
1599 .into_iter()
1600 .next()
1601 .map(|(_, data)| self.deserialize_json(&data))
1602 .transpose()?
1603 .unwrap_or_default())
1604 }
1605
1606 async fn get_users_with_display_names<'a>(
1607 &self,
1608 room_id: &RoomId,
1609 display_names: &'a [DisplayName],
1610 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1611 let mut result = HashMap::new();
1612
1613 if display_names.is_empty() {
1614 return Ok(result);
1615 }
1616
1617 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1618 let mut names_map = display_names
1619 .iter()
1620 .flat_map(|display_name| {
1621 let raw =
1631 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1632 let normalized = display_name.as_normalized_str().map(|normalized| {
1633 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1634 });
1635
1636 iter::once(raw).chain(normalized.into_iter())
1637 })
1638 .collect::<BTreeMap<_, _>>();
1639 let names = names_map.keys().cloned().collect();
1640
1641 for (name, data) in
1642 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1643 {
1644 let display_name =
1645 names_map.remove(name.as_slice()).expect("returned display names were requested");
1646 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1647
1648 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1649 }
1650
1651 Ok(result)
1652 }
1653
1654 async fn get_account_data_event(
1655 &self,
1656 event_type: GlobalAccountDataEventType,
1657 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1658 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1659 self.acquire()
1660 .await?
1661 .get_global_account_data(event_type)
1662 .await?
1663 .map(|value| self.deserialize_json(&value))
1664 .transpose()
1665 }
1666
1667 async fn get_room_account_data_event(
1668 &self,
1669 room_id: &RoomId,
1670 event_type: RoomAccountDataEventType,
1671 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1672 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1673 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1674 self.acquire()
1675 .await?
1676 .get_room_account_data(room_id, event_type)
1677 .await?
1678 .map(|value| self.deserialize_json(&value))
1679 .transpose()
1680 }
1681
1682 async fn get_user_room_receipt_event(
1683 &self,
1684 room_id: &RoomId,
1685 receipt_type: ReceiptType,
1686 thread: ReceiptThread,
1687 user_id: &UserId,
1688 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1689 let room_id = self.encode_key(keys::RECEIPT, room_id);
1690 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1691 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1694 let user_id = self.encode_key(keys::RECEIPT, user_id);
1695
1696 self.acquire()
1697 .await?
1698 .get_user_receipt(room_id, receipt_type, thread, user_id)
1699 .await?
1700 .map(|value| {
1701 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1702 })
1703 .transpose()
1704 }
1705
1706 async fn get_event_room_receipt_events(
1707 &self,
1708 room_id: &RoomId,
1709 receipt_type: ReceiptType,
1710 thread: ReceiptThread,
1711 event_id: &EventId,
1712 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1713 let room_id = self.encode_key(keys::RECEIPT, room_id);
1714 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1715 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1718 let event_id = self.encode_key(keys::RECEIPT, event_id);
1719
1720 self.acquire()
1721 .await?
1722 .get_event_receipts(room_id, receipt_type, thread, event_id)
1723 .await?
1724 .iter()
1725 .map(|value| {
1726 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1727 })
1728 .collect()
1729 }
1730
1731 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1732 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1733 }
1734
1735 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1736 let conn = self.acquire().await?;
1737 let key = self.encode_custom_key(key);
1738 conn.set_kv_blob(key, value).await?;
1739 Ok(())
1740 }
1741
1742 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1743 let conn = self.acquire().await?;
1744 let key = self.encode_custom_key(key);
1745 let previous = conn.get_kv_blob(key.clone()).await?;
1746 conn.set_kv_blob(key, value).await?;
1747 Ok(previous)
1748 }
1749
1750 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1751 let conn = self.acquire().await?;
1752 let key = self.encode_custom_key(key);
1753 let previous = conn.get_kv_blob(key.clone()).await?;
1754 if previous.is_some() {
1755 conn.delete_kv_blob(key).await?;
1756 }
1757 Ok(previous)
1758 }
1759
1760 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1761 let this = self.clone();
1762 let room_id = room_id.to_owned();
1763
1764 let conn = self.acquire().await?;
1765
1766 conn.with_transaction(move |txn| -> Result<()> {
1767 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1768 txn.remove_room_info(&room_info_room_id)?;
1769
1770 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1771 txn.remove_room_state_events(&state_event_room_id, None)?;
1772
1773 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1774 txn.remove_room_members(&member_room_id, None)?;
1775
1776 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1777 txn.remove_room_profiles(&profile_room_id)?;
1778
1779 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1780 txn.remove_room_account_data(&room_account_data_room_id)?;
1781
1782 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1783 txn.remove_room_receipts(&receipt_room_id)?;
1784
1785 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1786 txn.remove_room_display_names(&display_name_room_id)?;
1787
1788 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1789 txn.remove_room_send_queue(&send_queue_room_id)?;
1790
1791 let dependent_send_queue_room_id =
1792 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1793 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1794
1795 Ok(())
1796 })
1797 .await?;
1798
1799 conn.vacuum().await
1800 }
1801
1802 async fn save_send_queue_request(
1803 &self,
1804 room_id: &RoomId,
1805 transaction_id: OwnedTransactionId,
1806 created_at: MilliSecondsSinceUnixEpoch,
1807 content: QueuedRequestKind,
1808 priority: usize,
1809 ) -> Result<(), Self::Error> {
1810 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1811 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1812
1813 let content = self.serialize_json(&content)?;
1814 let created_at_ts: u64 = created_at.0.into();
1820 self.acquire()
1821 .await?
1822 .with_transaction(move |txn| {
1823 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1824 Ok(())
1825 })
1826 .await
1827 }
1828
1829 async fn update_send_queue_request(
1830 &self,
1831 room_id: &RoomId,
1832 transaction_id: &TransactionId,
1833 content: QueuedRequestKind,
1834 ) -> Result<bool, Self::Error> {
1835 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1836
1837 let content = self.serialize_json(&content)?;
1838 let transaction_id = transaction_id.to_string();
1841
1842 let num_updated = self.acquire()
1843 .await?
1844 .with_transaction(move |txn| {
1845 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1846 })
1847 .await?;
1848
1849 Ok(num_updated > 0)
1850 }
1851
1852 async fn remove_send_queue_request(
1853 &self,
1854 room_id: &RoomId,
1855 transaction_id: &TransactionId,
1856 ) -> Result<bool, Self::Error> {
1857 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1858
1859 let transaction_id = transaction_id.to_string();
1861
1862 let num_deleted = self
1863 .acquire()
1864 .await?
1865 .with_transaction(move |txn| {
1866 txn.prepare_cached(
1867 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1868 )?
1869 .execute((room_id, &transaction_id))
1870 })
1871 .await?;
1872
1873 Ok(num_deleted > 0)
1874 }
1875
1876 async fn load_send_queue_requests(
1877 &self,
1878 room_id: &RoomId,
1879 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1880 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1881
1882 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1886 .acquire()
1887 .await?
1888 .prepare(
1889 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1890 |mut stmt| {
1891 stmt.query((room_id,))?
1892 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1893 .collect()
1894 },
1895 )
1896 .await?;
1897
1898 let mut requests = Vec::with_capacity(res.len());
1899 for entry in res {
1900 let created_at = entry
1901 .4
1902 .and_then(UInt::new)
1903 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1904 requests.push(QueuedRequest {
1905 transaction_id: entry.0.into(),
1906 kind: self.deserialize_json(&entry.1)?,
1907 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1908 priority: entry.3,
1909 created_at,
1910 });
1911 }
1912
1913 Ok(requests)
1914 }
1915
1916 async fn update_send_queue_request_status(
1917 &self,
1918 room_id: &RoomId,
1919 transaction_id: &TransactionId,
1920 error: Option<QueueWedgeError>,
1921 ) -> Result<(), Self::Error> {
1922 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1923
1924 let transaction_id = transaction_id.to_string();
1926
1927 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1929
1930 self.acquire()
1931 .await?
1932 .with_transaction(move |txn| {
1933 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1934 Ok(())
1935 })
1936 .await
1937 }
1938
1939 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1940 let res: Vec<Vec<u8>> = self
1945 .acquire()
1946 .await?
1947 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1948 stmt.query(())?.mapped(|row| row.get(0)).collect()
1949 })
1950 .await?;
1951
1952 Ok(res
1955 .into_iter()
1956 .map(|entry| self.deserialize_value(&entry))
1957 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1958 .into_iter()
1959 .collect())
1960 }
1961
1962 async fn save_dependent_queued_request(
1963 &self,
1964 room_id: &RoomId,
1965 parent_txn_id: &TransactionId,
1966 own_txn_id: ChildTransactionId,
1967 created_at: MilliSecondsSinceUnixEpoch,
1968 content: DependentQueuedRequestKind,
1969 ) -> Result<()> {
1970 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1971 let content = self.serialize_json(&content)?;
1972
1973 let parent_txn_id = parent_txn_id.to_string();
1975 let own_txn_id = own_txn_id.to_string();
1976
1977 let created_at_ts: u64 = created_at.0.into();
1978 self.acquire()
1979 .await?
1980 .with_transaction(move |txn| {
1981 txn.prepare_cached(
1982 r#"INSERT INTO dependent_send_queue_events
1983 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1984 VALUES (?, ?, ?, ?, ?)"#,
1985 )?
1986 .execute((
1987 room_id,
1988 parent_txn_id,
1989 own_txn_id,
1990 content,
1991 created_at_ts,
1992 ))?;
1993 Ok(())
1994 })
1995 .await
1996 }
1997
1998 async fn update_dependent_queued_request(
1999 &self,
2000 room_id: &RoomId,
2001 own_transaction_id: &ChildTransactionId,
2002 new_content: DependentQueuedRequestKind,
2003 ) -> Result<bool> {
2004 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2005 let content = self.serialize_json(&new_content)?;
2006
2007 let own_txn_id = own_transaction_id.to_string();
2009
2010 let num_updated = self
2011 .acquire()
2012 .await?
2013 .with_transaction(move |txn| {
2014 txn.prepare_cached(
2015 r#"UPDATE dependent_send_queue_events
2016 SET content = ?
2017 WHERE own_transaction_id = ?
2018 AND room_id = ?"#,
2019 )?
2020 .execute((content, own_txn_id, room_id))
2021 })
2022 .await?;
2023
2024 if num_updated > 1 {
2025 return Err(Error::InconsistentUpdate);
2026 }
2027
2028 Ok(num_updated == 1)
2029 }
2030
2031 async fn mark_dependent_queued_requests_as_ready(
2032 &self,
2033 room_id: &RoomId,
2034 parent_txn_id: &TransactionId,
2035 parent_key: SentRequestKey,
2036 ) -> Result<usize> {
2037 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2038 let parent_key = self.serialize_value(&parent_key)?;
2039
2040 let parent_txn_id = parent_txn_id.to_string();
2042
2043 self.acquire()
2044 .await?
2045 .with_transaction(move |txn| {
2046 Ok(txn.prepare_cached(
2047 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2048 )?
2049 .execute((parent_key, parent_txn_id, room_id))?)
2050 })
2051 .await
2052 }
2053
2054 async fn remove_dependent_queued_request(
2055 &self,
2056 room_id: &RoomId,
2057 txn_id: &ChildTransactionId,
2058 ) -> Result<bool> {
2059 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2060
2061 let txn_id = txn_id.to_string();
2063
2064 let num_deleted = self
2065 .acquire()
2066 .await?
2067 .with_transaction(move |txn| {
2068 txn.prepare_cached(
2069 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2070 )?
2071 .execute((txn_id, room_id))
2072 })
2073 .await?;
2074
2075 Ok(num_deleted > 0)
2076 }
2077
2078 async fn load_dependent_queued_requests(
2079 &self,
2080 room_id: &RoomId,
2081 ) -> Result<Vec<DependentQueuedRequest>> {
2082 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2083
2084 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2086 .acquire()
2087 .await?
2088 .prepare(
2089 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2090 |mut stmt| {
2091 stmt.query((room_id,))?
2092 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2093 .collect()
2094 },
2095 )
2096 .await?;
2097
2098 let mut dependent_events = Vec::with_capacity(res.len());
2099 for entry in res {
2100 let created_at = entry
2101 .4
2102 .and_then(UInt::new)
2103 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2104 dependent_events.push(DependentQueuedRequest {
2105 own_transaction_id: entry.0.into(),
2106 parent_transaction_id: entry.1.into(),
2107 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2108 kind: self.deserialize_json(&entry.3)?,
2109 created_at,
2110 });
2111 }
2112
2113 Ok(dependent_events)
2114 }
2115}
2116
2117#[derive(Debug, Clone, Serialize, Deserialize)]
2118struct ReceiptData {
2119 receipt: Receipt,
2120 event_id: OwnedEventId,
2121 user_id: OwnedUserId,
2122}
2123
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by every store opened in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter that gives each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a store in a fresh sub-directory, passing `None` as the second
    /// argument of `SqliteStateStore::open`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_path = TMP_DIR.path().join(store_index.to_string());
        let store_path_str = store_path.to_str().unwrap();

        tracing::info!("using store @ {}", store_path_str);

        let store = SqliteStateStore::open(store_path_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2148
2149#[cfg(test)]
2150mod encrypted_tests {
2151 use std::{
2152 path::PathBuf,
2153 sync::atomic::{AtomicU32, Ordering::SeqCst},
2154 };
2155
2156 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2157 use matrix_sdk_test::async_test;
2158 use once_cell::sync::Lazy;
2159 use tempfile::{tempdir, TempDir};
2160
2161 use super::SqliteStateStore;
2162 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2163
2164 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2165 static NUM: AtomicU32 = AtomicU32::new(0);
2166
2167 fn new_state_store_workspace() -> PathBuf {
2168 let name = NUM.fetch_add(1, SeqCst).to_string();
2169 TMP_DIR.path().join(name)
2170 }
2171
2172 async fn get_store() -> Result<impl StateStore, StoreError> {
2173 let tmpdir_path = new_state_store_workspace();
2174
2175 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2176
2177 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2178 .await
2179 .unwrap())
2180 }
2181
2182 #[async_test]
2183 async fn test_pool_size() {
2184 let tmpdir_path = new_state_store_workspace();
2185 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2186
2187 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2188
2189 assert_eq!(store.pool.status().max_size, 42);
2190 }
2191
2192 #[async_test]
2193 async fn test_cache_size() {
2194 let tmpdir_path = new_state_store_workspace();
2195 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2196
2197 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2198
2199 let conn = store.pool.get().await.unwrap();
2200 let cache_size =
2201 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2202
2203 assert_eq!(cache_size, -(1500 / 1024));
2207 }
2208
2209 #[async_test]
2210 async fn test_journal_size_limit() {
2211 let tmpdir_path = new_state_store_workspace();
2212 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2213
2214 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2215
2216 let conn = store.pool.get().await.unwrap();
2217 let journal_size_limit = conn
2218 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2219 .await
2220 .unwrap();
2221
2222 assert_eq!(journal_size_limit, 1500);
2225 }
2226
// Expands the `statestore_integration_tests` macro imported from
// `matrix_sdk_base` into this module.
// NOTE(review): presumably the generated tests obtain their store through the
// local `get_store` function — confirm against the macro definition.
statestore_integration_tests!();
2228}
2229
2230#[cfg(test)]
2231mod migration_tests {
2232 use std::{
2233 path::{Path, PathBuf},
2234 sync::{
2235 atomic::{AtomicU32, Ordering::SeqCst},
2236 Arc,
2237 },
2238 };
2239
2240 use deadpool_sqlite::Runtime;
2241 use matrix_sdk_base::{
2242 store::{
2243 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2244 SerializableEventContent,
2245 },
2246 sync::UnreadNotificationsCount,
2247 RoomState, StateStore,
2248 };
2249 use matrix_sdk_test::async_test;
2250 use once_cell::sync::Lazy;
2251 use ruma::{
2252 events::{
2253 room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2254 StateEventType,
2255 },
2256 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2257 UserId,
2258 };
2259 use rusqlite::Transaction;
2260 use serde_json::json;
2261 use tempfile::{tempdir, TempDir};
2262 use tokio::fs;
2263
2264 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2265 use crate::{
2266 error::{Error, Result},
2267 utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2268 OpenStoreError,
2269 };
2270
/// Temporary directory used by the migration tests, created lazily on first
/// access.
static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
/// Monotonic counter; `new_path` turns each value into the name of a sub-path
/// of `TMP_DIR`.
static NUM: AtomicU32 = AtomicU32::new(0);
/// Passphrase used both to create the store cipher of the fake databases and
/// to re-open them through `SqliteStateStore::open`.
const SECRET: &str = "secret";
2274
2275 fn new_path() -> PathBuf {
2276 let name = NUM.fetch_add(1, SeqCst).to_string();
2277 TMP_DIR.path().join(name)
2278 }
2279
2280 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2281 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2282
2283 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2284 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2287 let conn = pool.get().await?;
2288
2289 init(&conn).await?;
2290
2291 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2292 let this = SqliteStateStore { store_cipher, pool };
2293 this.run_migrations(&conn, 1, Some(version)).await?;
2294
2295 Ok(this)
2296 }
2297
2298 fn room_info_v1_json(
2299 room_id: &RoomId,
2300 state: RoomState,
2301 name: Option<&str>,
2302 creator: Option<&UserId>,
2303 ) -> serde_json::Value {
2304 let name_content = match name {
2306 Some(name) => json!({ "name": name }),
2307 None => json!({ "name": null }),
2308 };
2309 let create_content = match creator {
2311 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2312 None => RoomCreateEventContent::new_v11(),
2313 };
2314
2315 json!({
2316 "room_id": room_id,
2317 "room_type": state,
2318 "notification_counts": UnreadNotificationsCount::default(),
2319 "summary": {
2320 "heroes": [],
2321 "joined_member_count": 0,
2322 "invited_member_count": 0,
2323 },
2324 "members_synced": false,
2325 "base_info": {
2326 "dm_targets": [],
2327 "max_power_level": 100,
2328 "name": {
2329 "Original": {
2330 "content": name_content,
2331 },
2332 },
2333 "create": {
2334 "Original": {
2335 "content": create_content,
2336 }
2337 }
2338 },
2339 })
2340 }
2341
2342 #[async_test]
2343 pub async fn test_migrating_v1_to_v2() {
2344 let path = new_path();
2345 {
2347 let db = create_fake_db(&path, 1).await.unwrap();
2348 let conn = db.pool.get().await.unwrap();
2349
2350 let this = db.clone();
2351 conn.with_transaction(move |txn| {
2352 for i in 0..5 {
2353 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2354 let (state, stripped) =
2355 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2356 let info = room_info_v1_json(&room_id, state, None, None);
2357
2358 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2359 let data = this.serialize_json(&info)?;
2360
2361 txn.prepare_cached(
2362 "INSERT INTO room_info (room_id, stripped, data)
2363 VALUES (?, ?, ?)",
2364 )?
2365 .execute((room_id, stripped, data))?;
2366 }
2367
2368 Result::<_, Error>::Ok(())
2369 })
2370 .await
2371 .unwrap();
2372 }
2373
2374 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2376
2377 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2379 }
2380
2381 fn add_room_v2(
2383 this: &SqliteStateStore,
2384 txn: &Transaction<'_>,
2385 room_id: &RoomId,
2386 name: Option<&str>,
2387 create_creator: Option<&UserId>,
2388 create_sender: Option<&UserId>,
2389 ) -> Result<(), Error> {
2390 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2391
2392 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2393 let encoded_state =
2394 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2395 let data = this.serialize_json(&room_info_json)?;
2396
2397 txn.prepare_cached(
2398 "INSERT INTO room_info (room_id, state, data)
2399 VALUES (?, ?, ?)",
2400 )?
2401 .execute((encoded_room_id, encoded_state, data))?;
2402
2403 let Some(create_sender) = create_sender else {
2405 return Ok(());
2406 };
2407
2408 let create_content = match create_creator {
2409 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2410 None => RoomCreateEventContent::new_v11(),
2411 };
2412
2413 let event_id = EventId::new(server_name!("dummy.local"));
2414 let create_event = json!({
2415 "content": create_content,
2416 "event_id": event_id,
2417 "sender": create_sender.to_owned(),
2418 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2419 "state_key": "",
2420 "type": "m.room.create",
2421 "unsigned": {},
2422 });
2423
2424 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2425 let encoded_event_type =
2426 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2427 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2428 let stripped = false;
2429 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2430 let data = this.serialize_json(&create_event)?;
2431
2432 txn.prepare_cached(
2433 "INSERT
2434 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2435 VALUES (?, ?, ?, ?, ?, ?)",
2436 )?
2437 .execute((
2438 encoded_room_id,
2439 encoded_event_type,
2440 encoded_state_key,
2441 stripped,
2442 encoded_event_id,
2443 data,
2444 ))?;
2445
2446 Ok(())
2447 }
2448
2449 #[async_test]
2450 pub async fn test_migrating_v2_to_v3() {
2451 let path = new_path();
2452
2453 let room_a_id = room_id!("!room_a:dummy.local");
2455 let room_a_name = "Room A";
2456 let room_a_creator = user_id!("@creator:dummy.local");
2457 let room_a_create_sender = user_id!("@sender:dummy.local");
2460
2461 let room_b_id = room_id!("!room_b:dummy.local");
2463
2464 let room_c_id = room_id!("!room_c:dummy.local");
2466 let room_c_create_sender = user_id!("@creator:dummy.local");
2467
2468 {
2470 let db = create_fake_db(&path, 2).await.unwrap();
2471 let conn = db.pool.get().await.unwrap();
2472
2473 let this = db.clone();
2474 conn.with_transaction(move |txn| {
2475 add_room_v2(
2476 &this,
2477 txn,
2478 room_a_id,
2479 Some(room_a_name),
2480 Some(room_a_creator),
2481 Some(room_a_create_sender),
2482 )?;
2483 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2484 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2485
2486 Result::<_, Error>::Ok(())
2487 })
2488 .await
2489 .unwrap();
2490 }
2491
2492 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2494
2495 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2497 assert_eq!(room_infos.len(), 3);
2498
2499 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2500 assert_eq!(room_a.name(), Some(room_a_name));
2501 assert_eq!(room_a.creator(), Some(room_a_create_sender));
2502
2503 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2504 assert_eq!(room_b.name(), None);
2505 assert_eq!(room_b.creator(), None);
2506
2507 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2508 assert_eq!(room_c.name(), None);
2509 assert_eq!(room_c.creator(), Some(room_c_create_sender));
2510 }
2511
2512 #[async_test]
2513 pub async fn test_migrating_v7_to_v9() {
2514 let path = new_path();
2515
2516 let room_id = room_id!("!room_a:dummy.local");
2517 let wedged_event_transaction_id = TransactionId::new();
2518 let local_event_transaction_id = TransactionId::new();
2519
2520 {
2522 let db = create_fake_db(&path, 7).await.unwrap();
2523 let conn = db.pool.get().await.unwrap();
2524
2525 let wedge_tx = wedged_event_transaction_id.clone();
2526 let local_tx = local_event_transaction_id.clone();
2527
2528 conn.with_transaction(move |txn| {
2529 add_dependent_send_queue_event_v7(
2530 &db,
2531 txn,
2532 room_id,
2533 &local_tx,
2534 ChildTransactionId::new(),
2535 DependentQueuedRequestKind::RedactEvent,
2536 )?;
2537 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2538 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2539 Result::<_, Error>::Ok(())
2540 })
2541 .await
2542 .unwrap();
2543 }
2544
2545 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2548
2549 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2550 assert!(requests.is_empty());
2551
2552 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2553 assert!(dependent_requests.is_empty());
2554 }
2555
2556 fn add_send_queue_event_v7(
2557 this: &SqliteStateStore,
2558 txn: &Transaction<'_>,
2559 transaction_id: &TransactionId,
2560 room_id: &RoomId,
2561 is_wedged: bool,
2562 ) -> Result<(), Error> {
2563 let content =
2564 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2565
2566 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2567 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2568
2569 let content = this.serialize_json(&content)?;
2570
2571 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2572 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2573
2574 Ok(())
2575 }
2576
2577 fn add_dependent_send_queue_event_v7(
2578 this: &SqliteStateStore,
2579 txn: &Transaction<'_>,
2580 room_id: &RoomId,
2581 parent_txn_id: &TransactionId,
2582 own_txn_id: ChildTransactionId,
2583 content: DependentQueuedRequestKind,
2584 ) -> Result<(), Error> {
2585 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2586
2587 let parent_txn_id = parent_txn_id.to_string();
2588 let own_txn_id = own_txn_id.to_string();
2589 let content = this.serialize_json(&content)?;
2590
2591 txn.prepare_cached(
2592 "INSERT INTO dependent_send_queue_events
2593 (room_id, parent_transaction_id, own_transaction_id, content)
2594 VALUES (?, ?, ?, ?)",
2595 )?
2596 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2597
2598 Ok(())
2599 }
2600}