1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 RoomLoadSettings, SentRequestKey, ThreadSubscription,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError, SqliteStoreConfig,
50};
51
/// Namespace identifiers passed as the first argument to `encode_key`.
///
/// Every constant matches the name of a table that the SQL statements in this
/// file operate on (`thread_subscriptions` is presumably created by migration
/// `011_thread_subscriptions.sql` — confirm against that script).
mod keys {
    /// Key/value blobs: sync token, presence, custom values, etc.
    pub const KV_BLOB: &str = "kv_blob";
    /// Serialized room infos.
    pub const ROOM_INFO: &str = "room_info";
    /// Sync and stripped state events.
    pub const STATE_EVENT: &str = "state_event";
    /// Global account data events.
    pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
    /// Per-room account data events.
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    /// Room member events.
    pub const MEMBER: &str = "member";
    /// Per-room user profiles.
    pub const PROFILE: &str = "profile";
    /// Read receipts.
    pub const RECEIPT: &str = "receipt";
    /// Per-room display name data.
    pub const DISPLAY_NAME: &str = "display_name";
    /// Send queue entries.
    pub const SEND_QUEUE: &str = "send_queue_events";
    /// Send queue entries that depend on another queued request.
    pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
    /// Thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "thread_subscriptions";
}
67
/// File name of the state store database; it is joined to the directory
/// given in the store configuration when the store is opened.
pub const DATABASE_NAME: &str = "matrix-sdk-state.sqlite3";

/// Version that `SqliteStateStore::run_migrations` upgrades the database to
/// when no explicit target version is requested.
///
/// Bump this whenever a new migration step is added.
const DATABASE_VERSION: u8 = 13;
77
/// A state store backed by a SQLite database.
///
/// Holds the connection pool and, when the store was opened with a
/// passphrase, the cipher obtained from the database.
#[derive(Clone)]
pub struct SqliteStateStore {
    /// `Some` only when a passphrase was supplied at open time; exposed
    /// through `EncryptableStore::get_cypher`.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool from which every operation acquires its connection.
    pool: SqlitePool,
}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for SqliteStateStore {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
89 }
90}
91
92impl SqliteStateStore {
93 pub async fn open(
96 path: impl AsRef<Path>,
97 passphrase: Option<&str>,
98 ) -> Result<Self, OpenStoreError> {
99 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
100 }
101
102 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
104 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
105
106 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
107
108 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
109 config.pool = Some(pool_config);
110
111 let pool = config.create_pool(Runtime::Tokio1)?;
112
113 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
114 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
115
116 Ok(this)
117 }
118
119 pub async fn open_with_pool(
122 pool: SqlitePool,
123 passphrase: Option<&str>,
124 ) -> Result<Self, OpenStoreError> {
125 let conn = pool.get().await?;
126
127 let mut version = conn.db_version().await?;
128
129 if version == 0 {
130 init(&conn).await?;
131 version = 1;
132 }
133
134 let store_cipher = match passphrase {
135 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
136 None => None,
137 };
138 let this = Self { store_cipher, pool };
139 this.run_migrations(&conn, version, None).await?;
140
141 Ok(this)
142 }
143
144 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
149 let to = to.unwrap_or(DATABASE_VERSION);
150
151 if from < to {
152 debug!(version = from, new_version = to, "Upgrading database");
153 } else {
154 return Ok(());
155 }
156
157 if from < 2 && to >= 2 {
158 let this = self.clone();
159 conn.with_transaction(move |txn| {
160 txn.execute_batch(include_str!(
162 "../migrations/state_store/002_a_create_new_room_info.sql"
163 ))?;
164
165 for data in txn
167 .prepare("SELECT data FROM room_info")?
168 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
169 {
170 let data = data?;
171 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
172
173 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
174 let state = this
175 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
176 txn.prepare_cached(
177 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
178 VALUES (?, ?, ?)",
179 )?
180 .execute((room_id, state, data))?;
181 }
182
183 txn.execute_batch(include_str!(
185 "../migrations/state_store/002_b_replace_room_info.sql"
186 ))?;
187
188 txn.set_db_version(2)?;
189 Result::<_, Error>::Ok(())
190 })
191 .await?;
192 }
193
194 if from < 3 && to >= 3 {
196 let this = self.clone();
197 conn.with_transaction(move |txn| {
198 for data in txn
200 .prepare("SELECT data FROM room_info")?
201 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
202 {
203 let data = data?;
204 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
205
206 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
208 let event_type =
209 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
210 let create_res = txn
211 .prepare(
212 "SELECT stripped, data FROM state_event
213 WHERE room_id = ? AND event_type = ?",
214 )?
215 .query_row([room_id, event_type], |row| {
216 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
217 })
218 .optional()?;
219
220 let create = create_res.and_then(|(stripped, data)| {
221 let create = if stripped {
222 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
223 this.deserialize_json(&data).ok()?,
224 )
225 } else {
226 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
227 };
228 Some(create)
229 });
230
231 let migrated_room_info = room_info_v1.migrate(create.as_ref());
232
233 let data = this.serialize_json(&migrated_room_info)?;
234 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
235 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
236 .execute((data, room_id))?;
237 }
238
239 txn.set_db_version(3)?;
240 Result::<_, Error>::Ok(())
241 })
242 .await?;
243 }
244
245 if from < 4 && to >= 4 {
246 conn.with_transaction(move |txn| {
247 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
249 txn.set_db_version(4)
250 })
251 .await?;
252 }
253
254 if from < 5 && to >= 5 {
255 conn.with_transaction(move |txn| {
256 txn.execute_batch(include_str!(
258 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
259 ))?;
260 txn.set_db_version(4)
261 })
262 .await?;
263 }
264
265 if from < 6 && to >= 6 {
266 conn.with_transaction(move |txn| {
267 txn.execute_batch(include_str!(
269 "../migrations/state_store/005_send_queue_dependent_events.sql"
270 ))?;
271 txn.set_db_version(6)
272 })
273 .await?;
274 }
275
276 if from < 7 && to >= 7 {
277 conn.with_transaction(move |txn| {
278 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
280 txn.set_db_version(7)
281 })
282 .await?;
283 }
284
285 if from < 8 && to >= 8 {
286 let error = QueueWedgeError::GenericApiError {
288 msg: "local echo failed to send in a previous session".into(),
289 };
290 let default_err = self.serialize_value(&error)?;
291
292 conn.with_transaction(move |txn| {
293 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
295
296 for wedged_entries in txn
299 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
300 .query_map((), |row| {
301 Ok(
302 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
303 )
304 })? {
305
306 let (room_id, transaction_id) = wedged_entries?;
307
308 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
309 .execute((default_err.clone(), room_id, transaction_id))?;
310 }
311
312
313 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
315
316 txn.set_db_version(8)
317 })
318 .await?;
319 }
320
321 if from < 9 && to >= 9 {
322 conn.with_transaction(move |txn| {
323 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
325 txn.set_db_version(9)
326 })
327 .await?;
328 }
329
330 if from < 10 && to >= 10 {
331 conn.with_transaction(move |txn| {
332 txn.execute_batch(include_str!(
334 "../migrations/state_store/009_send_queue_priority.sql"
335 ))?;
336 txn.set_db_version(10)
337 })
338 .await?;
339 }
340
341 if from < 11 && to >= 11 {
342 conn.with_transaction(move |txn| {
343 txn.execute_batch(include_str!(
345 "../migrations/state_store/010_send_queue_enqueue_time.sql"
346 ))?;
347 txn.set_db_version(11)
348 })
349 .await?;
350 }
351
352 if from < 12 && to >= 12 {
353 conn.vacuum().await?;
357 conn.set_kv("version", vec![12]).await?;
358 }
359
360 if from < 13 && to >= 13 {
361 conn.with_transaction(move |txn| {
362 txn.execute_batch(include_str!(
364 "../migrations/state_store/011_thread_subscriptions.sql"
365 ))?;
366 txn.set_db_version(13)
367 })
368 .await?;
369 }
370
371 Ok(())
372 }
373
374 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
375 let key_s = match key {
376 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
377 StateStoreDataKey::ServerInfo => Cow::Borrowed(StateStoreDataKey::SERVER_INFO),
378 StateStoreDataKey::Filter(f) => {
379 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
380 }
381 StateStoreDataKey::UserAvatarUrl(u) => {
382 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
383 }
384 StateStoreDataKey::RecentlyVisitedRooms(b) => {
385 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
386 }
387 StateStoreDataKey::UtdHookManagerData => {
388 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
389 }
390 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
391 Cow::Borrowed(StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
392 }
393 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
394 if let Some(thread_root) = thread_root {
395 Cow::Owned(format!(
396 "{}:{room_id}:{thread_root}",
397 StateStoreDataKey::COMPOSER_DRAFT
398 ))
399 } else {
400 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
401 }
402 }
403 StateStoreDataKey::SeenKnockRequests(room_id) => {
404 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
405 }
406 };
407
408 self.encode_key(keys::KV_BLOB, &*key_s)
409 }
410
411 fn encode_presence_key(&self, user_id: &UserId) -> Key {
412 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
413 }
414
415 fn encode_custom_key(&self, key: &[u8]) -> Key {
416 let mut full_key = b"custom:".to_vec();
417 full_key.extend(key);
418 self.encode_key(keys::KV_BLOB, full_key)
419 }
420
421 async fn acquire(&self) -> Result<SqliteAsyncConn> {
422 Ok(self.pool.get().await?)
423 }
424
425 fn remove_maybe_stripped_room_data(
426 &self,
427 txn: &Transaction<'_>,
428 room_id: &RoomId,
429 stripped: bool,
430 ) -> rusqlite::Result<()> {
431 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
432 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
433
434 let member_room_id = self.encode_key(keys::MEMBER, room_id);
435 txn.remove_room_members(&member_room_id, Some(stripped))
436 }
437}
438
439impl EncryptableStore for SqliteStateStore {
440 fn get_cypher(&self) -> Option<&StoreCipher> {
441 self.store_cipher.as_deref()
442 }
443}
444
445async fn init(conn: &SqliteAsyncConn) -> Result<()> {
447 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
450 conn.with_transaction(|txn| {
451 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
452 txn.set_db_version(1)?;
453
454 Ok(())
455 })
456 .await
457}
458
/// Synchronous state-store write helpers (plus a few lookups), implemented
/// in this file for `rusqlite::Connection`.
///
/// All keys are expected to be already encoded, and all data already
/// serialized, by the caller.
trait SqliteConnectionStateStoreExt {
    /// Insert or replace the `kv_blob` entry for `key`.
    fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the global account data event of type `event_type`.
    fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the account data event of type `event_type` in the
    /// given room.
    fn set_room_account_data(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete all account data of the given room.
    fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the room info (and its `state` column) of the room.
    fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Fetch the serialized room info of the room, if there is one.
    fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete the room info of the room.
    fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a state event row.
    fn set_state_event(
        &self,
        room_id: &[u8],
        event_type: &[u8],
        state_key: &[u8],
        stripped: bool,
        event_id: Option<&[u8]>,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Fetch the serialized state event with the given event id in the room,
    /// if there is one.
    fn get_state_event_by_id(
        &self,
        room_id: &[u8],
        event_id: &[u8],
    ) -> rusqlite::Result<Option<Vec<u8>>>;
    /// Delete state events of the room.
    ///
    /// With `Some(flag)` only rows whose `stripped` column equals `flag` are
    /// deleted; with `None` every state event of the room is deleted.
    fn remove_room_state_events(
        &self,
        room_id: &[u8],
        stripped: Option<bool>,
    ) -> rusqlite::Result<()>;

    /// Insert or replace a member row.
    fn set_member(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        membership: &[u8],
        stripped: bool,
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete members of the room.
    ///
    /// With `Some(flag)` only rows whose `stripped` column equals `flag` are
    /// deleted; with `None` every member of the room is deleted.
    fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;

    /// Insert or replace the profile of a user in a room.
    fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete every profile of the room.
    fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete the profile of a single user in the room.
    fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace a receipt row.
    fn set_receipt(
        &self,
        room_id: &[u8],
        user_id: &[u8],
        receipt_type: &[u8],
        thread_id: &[u8],
        event_id: &[u8],
        data: &[u8],
    ) -> rusqlite::Result<()>;
    /// Delete every receipt of the room.
    fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;

    /// Insert or replace the data stored for a display name in a room.
    fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
    /// Delete the data stored for a single display name in the room.
    fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
    /// Delete every display name entry of the room.
    fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete every send queue entry of the room.
    fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
    /// Delete every dependent send queue entry of the room.
    fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
}
527
528impl SqliteConnectionStateStoreExt for rusqlite::Connection {
529 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
530 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
531 Ok(())
532 }
533
534 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
535 self.prepare_cached(
536 "INSERT OR REPLACE INTO global_account_data (event_type, data)
537 VALUES (?, ?)",
538 )?
539 .execute((event_type, data))?;
540 Ok(())
541 }
542
543 fn set_room_account_data(
544 &self,
545 room_id: &[u8],
546 event_type: &[u8],
547 data: &[u8],
548 ) -> rusqlite::Result<()> {
549 self.prepare_cached(
550 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
551 VALUES (?, ?, ?)",
552 )?
553 .execute((room_id, event_type, data))?;
554 Ok(())
555 }
556
557 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
558 self.prepare(
559 "DELETE FROM room_account_data
560 WHERE room_id = ?",
561 )?
562 .execute((room_id,))?;
563 Ok(())
564 }
565
566 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
567 self.prepare_cached(
568 "INSERT OR REPLACE INTO room_info (room_id, state, data)
569 VALUES (?, ?, ?)",
570 )?
571 .execute((room_id, state, data))?;
572 Ok(())
573 }
574
575 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
576 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
577 .optional()
578 }
579
580 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
582 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
583 Ok(())
584 }
585
586 fn set_state_event(
587 &self,
588 room_id: &[u8],
589 event_type: &[u8],
590 state_key: &[u8],
591 stripped: bool,
592 event_id: Option<&[u8]>,
593 data: &[u8],
594 ) -> rusqlite::Result<()> {
595 self.prepare_cached(
596 "INSERT OR REPLACE
597 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
598 VALUES (?, ?, ?, ?, ?, ?)",
599 )?
600 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
601 Ok(())
602 }
603
604 fn get_state_event_by_id(
605 &self,
606 room_id: &[u8],
607 event_id: &[u8],
608 ) -> rusqlite::Result<Option<Vec<u8>>> {
609 self.query_row(
610 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
611 (room_id, event_id),
612 |row| row.get(0),
613 )
614 .optional()
615 }
616
617 fn remove_room_state_events(
623 &self,
624 room_id: &[u8],
625 stripped: Option<bool>,
626 ) -> rusqlite::Result<()> {
627 if let Some(stripped) = stripped {
628 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
629 .execute((room_id, stripped))?;
630 } else {
631 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
632 .execute((room_id,))?;
633 }
634 Ok(())
635 }
636
637 fn set_member(
638 &self,
639 room_id: &[u8],
640 user_id: &[u8],
641 membership: &[u8],
642 stripped: bool,
643 data: &[u8],
644 ) -> rusqlite::Result<()> {
645 self.prepare_cached(
646 "INSERT OR REPLACE
647 INTO member (room_id, user_id, membership, stripped, data)
648 VALUES (?, ?, ?, ?, ?)",
649 )?
650 .execute((room_id, user_id, membership, stripped, data))?;
651 Ok(())
652 }
653
654 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
659 if let Some(stripped) = stripped {
660 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
661 .execute((room_id, stripped))?;
662 } else {
663 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
664 }
665 Ok(())
666 }
667
668 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
669 self.prepare_cached(
670 "INSERT OR REPLACE
671 INTO profile (room_id, user_id, data)
672 VALUES (?, ?, ?)",
673 )?
674 .execute((room_id, user_id, data))?;
675 Ok(())
676 }
677
678 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
679 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
680 Ok(())
681 }
682
683 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
684 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
685 .execute((room_id, user_id))?;
686 Ok(())
687 }
688
689 fn set_receipt(
690 &self,
691 room_id: &[u8],
692 user_id: &[u8],
693 receipt_type: &[u8],
694 thread: &[u8],
695 event_id: &[u8],
696 data: &[u8],
697 ) -> rusqlite::Result<()> {
698 self.prepare_cached(
699 "INSERT OR REPLACE
700 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
701 VALUES (?, ?, ?, ?, ?, ?)",
702 )?
703 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
704 Ok(())
705 }
706
707 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
708 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
709 Ok(())
710 }
711
712 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
713 self.prepare_cached(
714 "INSERT OR REPLACE
715 INTO display_name (room_id, name, data)
716 VALUES (?, ?, ?)",
717 )?
718 .execute((room_id, name, data))?;
719 Ok(())
720 }
721
722 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
723 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
724 .execute((room_id, name))?;
725 Ok(())
726 }
727
728 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
729 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
730 Ok(())
731 }
732
733 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
734 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
735 Ok(())
736 }
737
738 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
739 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
740 .execute((room_id,))?;
741 Ok(())
742 }
743}
744
/// Asynchronous state-store queries on top of [`SqliteAsyncConnExt`].
///
/// Every method takes already-encoded keys and returns the raw stored bytes;
/// decoding is left to the caller. All methods except `set_kv_blob` have a
/// default implementation.
#[async_trait]
trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
    /// Fetch the `kv_blob` value stored under `key`, or `None` if there is
    /// no such row.
    async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
            .await
            .optional()?)
    }

    /// Fetch the `kv_blob` values stored under any of `keys`.
    ///
    /// The query goes through `chunk_large_query_over`, which presumably
    /// splits the key list so the SQL parameter limit is not exceeded; the
    /// number of keys is passed as its second argument (presumably a size
    /// hint for the result — confirm against the helper).
    async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
        let keys_length = keys.len();

        self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
            // One `?` placeholder per key of this chunk.
            let sql_params = repeat_vars(keys.len());
            let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");

            let params = rusqlite::params_from_iter(keys);

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| row.get(0))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Insert or replace the `kv_blob` value stored under `key`.
    async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;

    /// Delete the `kv_blob` row stored under `key`.
    async fn delete_kv_blob(&self, key: Key) -> Result<()> {
        self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
        Ok(())
    }

    /// Fetch serialized room infos: all of them when `room_id` is `None`,
    /// otherwise only the rows matching `room_id`.
    async fn get_room_infos(&self, room_id: Option<Key>) -> Result<Vec<Vec<u8>>> {
        Ok(match room_id {
            None => {
                self.prepare("SELECT data FROM room_info", move |mut stmt| {
                    stmt.query_map((), |row| row.get(0))?.collect()
                })
                .await?
            }

            Some(room_id) => {
                self.prepare("SELECT data FROM room_info WHERE room_id = ?", move |mut stmt| {
                    stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
                })
                .await?
            }
        })
    }

    /// Fetch the state events of type `event_type` in the room whose state
    /// key is one of `state_keys`.
    ///
    /// Each result is a `(stripped, data)` pair.
    async fn get_maybe_stripped_state_events_for_keys(
        &self,
        room_id: Key,
        event_type: Key,
        state_keys: Vec<Key>,
    ) -> Result<Vec<(bool, Vec<u8>)>> {
        self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
            let sql_params = repeat_vars(state_keys.len());
            let sql = format!(
                "SELECT stripped, data FROM state_event
                 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
            );

            // The room id and event type are cloned because the closure may
            // be invoked more than once (presumably once per chunk).
            let params = rusqlite::params_from_iter(
                [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
            );

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Fetch every state event of type `event_type` in the room, as
    /// `(stripped, data)` pairs.
    async fn get_maybe_stripped_state_events(
        &self,
        room_id: Key,
        event_type: Key,
    ) -> Result<Vec<(bool, Vec<u8>)>> {
        Ok(self
            .prepare(
                "SELECT stripped, data FROM state_event
                 WHERE room_id = ? AND event_type = ?",
                |mut stmt| {
                    stmt.query((room_id, event_type))?
                        .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                        .collect()
                },
            )
            .await?)
    }

    /// Fetch the profiles of the given users in the room, as
    /// `(user_id, data)` pairs.
    async fn get_profiles(
        &self,
        room_id: Key,
        user_ids: Vec<Key>,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let user_ids_length = user_ids.len();

        self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
            let sql_params = repeat_vars(user_ids.len());
            let sql = format!(
                "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
            );

            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Fetch the `data` column of the room's member rows.
    ///
    /// An empty `memberships` list means no filtering: every member row of
    /// the room is returned. Otherwise only rows whose membership is in the
    /// list are returned.
    async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
        let res = if memberships.is_empty() {
            self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
                stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
            })
            .await?
        } else {
            self.chunk_large_query_over(memberships, None, move |txn, memberships| {
                let sql_params = repeat_vars(memberships.len());
                let sql = format!(
                    "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
                );

                let params =
                    rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));

                Ok(txn
                    .prepare(&sql)?
                    .query(params)?
                    .mapped(|row| row.get(0))
                    .collect::<Result<_, _>>()?)
            })
            .await?
        };

        Ok(res)
    }

    /// Fetch the global account data event of type `event_type`, if any.
    async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM global_account_data WHERE event_type = ?",
                (event_type,),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Fetch the account data event of type `event_type` in the room, if
    /// any.
    async fn get_room_account_data(
        &self,
        room_id: Key,
        event_type: Key,
    ) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
                (room_id, event_type),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Fetch the data stored for the given display names in the room, as
    /// `(name, data)` pairs.
    async fn get_display_names(
        &self,
        room_id: Key,
        names: Vec<Key>,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let names_length = names.len();

        self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
            let sql_params = repeat_vars(names.len());
            let sql = format!(
                "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
            );

            let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));

            Ok(txn
                .prepare(&sql)?
                .query(params)?
                .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
                .collect::<Result<_, _>>()?)
        })
        .await
    }

    /// Fetch the receipt of the given type and thread that `user_id` has in
    /// the room, if any.
    async fn get_user_receipt(
        &self,
        room_id: Key,
        receipt_type: Key,
        thread: Key,
        user_id: Key,
    ) -> Result<Option<Vec<u8>>> {
        Ok(self
            .query_row(
                "SELECT data FROM receipt
                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
                (room_id, receipt_type, thread, user_id),
                |row| row.get(0),
            )
            .await
            .optional()?)
    }

    /// Fetch every receipt of the given type and thread that targets
    /// `event_id` in the room.
    async fn get_event_receipts(
        &self,
        room_id: Key,
        receipt_type: Key,
        thread: Key,
        event_id: Key,
    ) -> Result<Vec<Vec<u8>>> {
        Ok(self
            .prepare(
                "SELECT data FROM receipt
                 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
                |mut stmt| {
                    stmt.query((room_id, receipt_type, thread, event_id))?
                        .mapped(|row| row.get(0))
                        .collect()
                },
            )
            .await?)
    }
}
981
982#[async_trait]
983impl SqliteObjectStateStoreExt for SqliteAsyncConn {
984 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
985 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
986 }
987}
988
989#[async_trait]
990impl StateStore for SqliteStateStore {
991 type Error = Error;
992
993 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
994 self.acquire()
995 .await?
996 .get_kv_blob(self.encode_state_store_data_key(key))
997 .await?
998 .map(|data| {
999 Ok(match key {
1000 StateStoreDataKey::SyncToken => {
1001 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1002 }
1003 StateStoreDataKey::ServerInfo => {
1004 StateStoreDataValue::ServerInfo(self.deserialize_value(&data)?)
1005 }
1006 StateStoreDataKey::Filter(_) => {
1007 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1008 }
1009 StateStoreDataKey::UserAvatarUrl(_) => {
1010 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1011 }
1012 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1013 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1014 }
1015 StateStoreDataKey::UtdHookManagerData => {
1016 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1017 }
1018 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1019 StateStoreDataValue::OneTimeKeyAlreadyUploaded
1020 }
1021 StateStoreDataKey::ComposerDraft(_, _) => {
1022 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1023 }
1024 StateStoreDataKey::SeenKnockRequests(_) => {
1025 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1026 }
1027 })
1028 })
1029 .transpose()
1030 }
1031
1032 async fn set_kv_data(
1033 &self,
1034 key: StateStoreDataKey<'_>,
1035 value: StateStoreDataValue,
1036 ) -> Result<()> {
1037 let serialized_value = match key {
1038 StateStoreDataKey::SyncToken => self.serialize_value(
1039 &value.into_sync_token().expect("Session data not a sync token"),
1040 )?,
1041 StateStoreDataKey::ServerInfo => self.serialize_value(
1042 &value.into_server_info().expect("Session data not containing server info"),
1043 )?,
1044 StateStoreDataKey::Filter(_) => {
1045 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1046 }
1047 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1048 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1049 )?,
1050 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1051 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1052 )?,
1053 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1054 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1055 )?,
1056 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
1057 self.serialize_value(&true).expect("We should be able to serialize a boolean")
1058 }
1059 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
1060 &value.into_composer_draft().expect("Session data not a composer draft"),
1061 )?,
1062 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1063 &value
1064 .into_seen_knock_requests()
1065 .expect("Session data is not a set of seen knock request ids"),
1066 )?,
1067 };
1068
1069 self.acquire()
1070 .await?
1071 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1072 .await
1073 }
1074
1075 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1076 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1077 }
1078
1079 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1080 let changes = changes.to_owned();
1081 let this = self.clone();
1082 self.acquire()
1083 .await?
1084 .with_transaction(move |txn| {
1085 let StateChanges {
1086 sync_token,
1087 account_data,
1088 presence,
1089 profiles,
1090 profiles_to_delete,
1091 state,
1092 room_account_data,
1093 room_infos,
1094 receipts,
1095 redactions,
1096 stripped_state,
1097 ambiguity_maps,
1098 } = changes;
1099
1100 if let Some(sync_token) = sync_token {
1101 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1102 let value = this.serialize_value(&sync_token)?;
1103 txn.set_kv_blob(&key, &value)?;
1104 }
1105
1106 for (event_type, event) in account_data {
1107 let event_type =
1108 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1109 let data = this.serialize_json(&event)?;
1110 txn.set_global_account_data(&event_type, &data)?;
1111 }
1112
1113 for (room_id, events) in room_account_data {
1114 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1115 for (event_type, event) in events {
1116 let event_type =
1117 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1118 let data = this.serialize_json(&event)?;
1119 txn.set_room_account_data(&room_id, &event_type, &data)?;
1120 }
1121 }
1122
1123 for (user_id, event) in presence {
1124 let key = this.encode_presence_key(&user_id);
1125 let value = this.serialize_json(&event)?;
1126 txn.set_kv_blob(&key, &value)?;
1127 }
1128
1129 for (room_id, room_info) in room_infos {
1130 let stripped = room_info.state() == RoomState::Invited;
1131 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1133
1134 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1135 let state = this
1136 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1137 let data = this.serialize_json(&room_info)?;
1138 txn.set_room_info(&room_id, &state, &data)?;
1139 }
1140
1141 for (room_id, user_ids) in profiles_to_delete {
1142 let room_id = this.encode_key(keys::PROFILE, room_id);
1143 for user_id in user_ids {
1144 let user_id = this.encode_key(keys::PROFILE, user_id);
1145 txn.remove_room_profile(&room_id, &user_id)?;
1146 }
1147 }
1148
1149 for (room_id, state_event_types) in state {
1150 let profiles = profiles.get(&room_id);
1151 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1152
1153 for (event_type, state_events) in state_event_types {
1154 let encoded_event_type =
1155 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1156
1157 for (state_key, raw_state_event) in state_events {
1158 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1159 let data = this.serialize_json(&raw_state_event)?;
1160
1161 let event_id: Option<String> =
1162 raw_state_event.get_field("event_id").ok().flatten();
1163 let encoded_event_id =
1164 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1165
1166 txn.set_state_event(
1167 &encoded_room_id,
1168 &encoded_event_type,
1169 &encoded_state_key,
1170 false,
1171 encoded_event_id.as_deref(),
1172 &data,
1173 )?;
1174
1175 if event_type == StateEventType::RoomMember {
1176 let member_event = match raw_state_event
1177 .deserialize_as_unchecked::<SyncRoomMemberEvent>()
1178 {
1179 Ok(ev) => ev,
1180 Err(e) => {
1181 debug!(event_id, "Failed to deserialize member event: {e}");
1182 continue;
1183 }
1184 };
1185
1186 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1187 let user_id = this.encode_key(keys::MEMBER, &state_key);
1188 let membership = this
1189 .encode_key(keys::MEMBER, member_event.membership().as_str());
1190 let data = this.serialize_value(&state_key)?;
1191
1192 txn.set_member(
1193 &encoded_room_id,
1194 &user_id,
1195 &membership,
1196 false,
1197 &data,
1198 )?;
1199
1200 if let Some(profile) =
1201 profiles.and_then(|p| p.get(member_event.state_key()))
1202 {
1203 let room_id = this.encode_key(keys::PROFILE, &room_id);
1204 let user_id = this.encode_key(keys::PROFILE, &state_key);
1205 let data = this.serialize_json(&profile)?;
1206 txn.set_profile(&room_id, &user_id, &data)?;
1207 }
1208 }
1209 }
1210 }
1211 }
1212
1213 for (room_id, stripped_state_event_types) in stripped_state {
1214 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1215
1216 for (event_type, stripped_state_events) in stripped_state_event_types {
1217 let encoded_event_type =
1218 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1219
1220 for (state_key, raw_stripped_state_event) in stripped_state_events {
1221 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1222 let data = this.serialize_json(&raw_stripped_state_event)?;
1223 txn.set_state_event(
1224 &encoded_room_id,
1225 &encoded_event_type,
1226 &encoded_state_key,
1227 true,
1228 None,
1229 &data,
1230 )?;
1231
1232 if event_type == StateEventType::RoomMember {
1233 let member_event = match raw_stripped_state_event
1234 .deserialize_as_unchecked::<StrippedRoomMemberEvent>(
1235 ) {
1236 Ok(ev) => ev,
1237 Err(e) => {
1238 debug!("Failed to deserialize stripped member event: {e}");
1239 continue;
1240 }
1241 };
1242
1243 let room_id = this.encode_key(keys::MEMBER, &room_id);
1244 let user_id = this.encode_key(keys::MEMBER, &state_key);
1245 let membership = this.encode_key(
1246 keys::MEMBER,
1247 member_event.content.membership.as_str(),
1248 );
1249 let data = this.serialize_value(&state_key)?;
1250
1251 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1252 }
1253 }
1254 }
1255 }
1256
1257 for (room_id, receipt_event) in receipts {
1258 let room_id = this.encode_key(keys::RECEIPT, room_id);
1259
1260 for (event_id, receipt_types) in receipt_event {
1261 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1262
1263 for (receipt_type, receipt_users) in receipt_types {
1264 let receipt_type =
1265 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1266
1267 for (user_id, receipt) in receipt_users {
1268 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1269 let thread = this.encode_key(
1272 keys::RECEIPT,
1273 rmp_serde::to_vec_named(&receipt.thread)?,
1274 );
1275 let data = this.serialize_json(&ReceiptData {
1276 receipt,
1277 event_id: event_id.clone(),
1278 user_id,
1279 })?;
1280
1281 txn.set_receipt(
1282 &room_id,
1283 &encoded_user_id,
1284 &receipt_type,
1285 &thread,
1286 &encoded_event_id,
1287 &data,
1288 )?;
1289 }
1290 }
1291 }
1292 }
1293
1294 for (room_id, redactions) in redactions {
1295 let make_redaction_rules = || {
1296 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1297 txn.get_room_info(&encoded_room_id)
1298 .ok()
1299 .flatten()
1300 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1301 .map(|info| info.room_version_rules_or_default())
1302 .unwrap_or_else(|| {
1303 warn!(
1304 ?room_id,
1305 "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}"
1306 );
1307 ROOM_VERSION_RULES_FALLBACK
1308 }).redaction
1309 };
1310
1311 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1312 let mut redaction_rules = None;
1313
1314 for (event_id, redaction) in redactions {
1315 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1316
1317 if let Some(Ok(raw_event)) = txn
1318 .get_state_event_by_id(&encoded_room_id, &event_id)?
1319 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1320 {
1321 let event = raw_event.deserialize()?;
1322 let redacted = redact(
1323 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1324 redaction_rules.get_or_insert_with(make_redaction_rules),
1325 Some(RedactedBecause::from_raw_event(&redaction)?),
1326 )
1327 .map_err(Error::Redaction)?;
1328 let data = this.serialize_json(&redacted)?;
1329
1330 let event_type =
1331 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1332 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1333
1334 txn.set_state_event(
1335 &encoded_room_id,
1336 &event_type,
1337 &state_key,
1338 false,
1339 Some(&event_id),
1340 &data,
1341 )?;
1342 }
1343 }
1344 }
1345
1346 for (room_id, display_names) in ambiguity_maps {
1347 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1348
1349 for (name, user_ids) in display_names {
1350 let encoded_name = this.encode_key(
1351 keys::DISPLAY_NAME,
1352 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1353 );
1354 let data = this.serialize_json(&user_ids)?;
1355
1356 if user_ids.is_empty() {
1357 txn.remove_display_name(&room_id, &encoded_name)?;
1358
1359 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1374 txn.remove_display_name(&room_id, &raw_name)?;
1375 } else {
1376 txn.set_display_name(&room_id, &encoded_name, &data)?;
1378 }
1379 }
1380 }
1381
1382 Ok::<_, Error>(())
1383 })
1384 .await?;
1385
1386 Ok(())
1387 }
1388
1389 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1390 self.acquire()
1391 .await?
1392 .get_kv_blob(self.encode_presence_key(user_id))
1393 .await?
1394 .map(|data| self.deserialize_json(&data))
1395 .transpose()
1396 }
1397
1398 async fn get_presence_events(
1399 &self,
1400 user_ids: &[OwnedUserId],
1401 ) -> Result<Vec<Raw<PresenceEvent>>> {
1402 if user_ids.is_empty() {
1403 return Ok(Vec::new());
1404 }
1405
1406 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1407 self.acquire()
1408 .await?
1409 .get_kv_blobs(user_ids)
1410 .await?
1411 .into_iter()
1412 .map(|data| self.deserialize_json(&data))
1413 .collect()
1414 }
1415
1416 async fn get_state_event(
1417 &self,
1418 room_id: &RoomId,
1419 event_type: StateEventType,
1420 state_key: &str,
1421 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1422 Ok(self
1423 .get_state_events_for_keys(room_id, event_type, &[state_key])
1424 .await?
1425 .into_iter()
1426 .next())
1427 }
1428
1429 async fn get_state_events(
1430 &self,
1431 room_id: &RoomId,
1432 event_type: StateEventType,
1433 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1434 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1435 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1436 self.acquire()
1437 .await?
1438 .get_maybe_stripped_state_events(room_id, event_type)
1439 .await?
1440 .into_iter()
1441 .map(|(stripped, data)| {
1442 let ev = if stripped {
1443 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1444 } else {
1445 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1446 };
1447
1448 Ok(ev)
1449 })
1450 .collect()
1451 }
1452
1453 async fn get_state_events_for_keys(
1454 &self,
1455 room_id: &RoomId,
1456 event_type: StateEventType,
1457 state_keys: &[&str],
1458 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1459 if state_keys.is_empty() {
1460 return Ok(Vec::new());
1461 }
1462
1463 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1464 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1465 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1466 self.acquire()
1467 .await?
1468 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1469 .await?
1470 .into_iter()
1471 .map(|(stripped, data)| {
1472 let ev = if stripped {
1473 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1474 } else {
1475 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1476 };
1477
1478 Ok(ev)
1479 })
1480 .collect()
1481 }
1482
1483 async fn get_profile(
1484 &self,
1485 room_id: &RoomId,
1486 user_id: &UserId,
1487 ) -> Result<Option<MinimalRoomMemberEvent>> {
1488 let room_id = self.encode_key(keys::PROFILE, room_id);
1489 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1490
1491 self.acquire()
1492 .await?
1493 .get_profiles(room_id, user_ids)
1494 .await?
1495 .into_iter()
1496 .next()
1497 .map(|(_, data)| self.deserialize_json(&data))
1498 .transpose()
1499 }
1500
1501 async fn get_profiles<'a>(
1502 &self,
1503 room_id: &RoomId,
1504 user_ids: &'a [OwnedUserId],
1505 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1506 if user_ids.is_empty() {
1507 return Ok(BTreeMap::new());
1508 }
1509
1510 let room_id = self.encode_key(keys::PROFILE, room_id);
1511 let mut user_ids_map = user_ids
1512 .iter()
1513 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1514 .collect::<BTreeMap<_, _>>();
1515 let user_ids = user_ids_map.keys().cloned().collect();
1516
1517 self.acquire()
1518 .await?
1519 .get_profiles(room_id, user_ids)
1520 .await?
1521 .into_iter()
1522 .map(|(user_id, data)| {
1523 Ok((
1524 user_ids_map
1525 .remove(user_id.as_slice())
1526 .expect("returned user IDs were requested"),
1527 self.deserialize_json(&data)?,
1528 ))
1529 })
1530 .collect()
1531 }
1532
1533 async fn get_user_ids(
1534 &self,
1535 room_id: &RoomId,
1536 membership: RoomMemberships,
1537 ) -> Result<Vec<OwnedUserId>> {
1538 let room_id = self.encode_key(keys::MEMBER, room_id);
1539 let memberships = membership
1540 .as_vec()
1541 .into_iter()
1542 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1543 .collect();
1544 self.acquire()
1545 .await?
1546 .get_user_ids(room_id, memberships)
1547 .await?
1548 .iter()
1549 .map(|data| self.deserialize_value(data))
1550 .collect()
1551 }
1552
1553 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1554 self.acquire()
1555 .await?
1556 .get_room_infos(match room_load_settings {
1557 RoomLoadSettings::All => None,
1558 RoomLoadSettings::One(room_id) => Some(self.encode_key(keys::ROOM_INFO, room_id)),
1559 })
1560 .await?
1561 .into_iter()
1562 .map(|data| self.deserialize_json(&data))
1563 .collect()
1564 }
1565
1566 async fn get_users_with_display_name(
1567 &self,
1568 room_id: &RoomId,
1569 display_name: &DisplayName,
1570 ) -> Result<BTreeSet<OwnedUserId>> {
1571 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1572 let names = vec![self.encode_key(
1573 keys::DISPLAY_NAME,
1574 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1575 )];
1576
1577 Ok(self
1578 .acquire()
1579 .await?
1580 .get_display_names(room_id, names)
1581 .await?
1582 .into_iter()
1583 .next()
1584 .map(|(_, data)| self.deserialize_json(&data))
1585 .transpose()?
1586 .unwrap_or_default())
1587 }
1588
1589 async fn get_users_with_display_names<'a>(
1590 &self,
1591 room_id: &RoomId,
1592 display_names: &'a [DisplayName],
1593 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1594 let mut result = HashMap::new();
1595
1596 if display_names.is_empty() {
1597 return Ok(result);
1598 }
1599
1600 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1601 let mut names_map = display_names
1602 .iter()
1603 .flat_map(|display_name| {
1604 let raw =
1614 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1615 let normalized = display_name.as_normalized_str().map(|normalized| {
1616 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1617 });
1618
1619 iter::once(raw).chain(normalized.into_iter())
1620 })
1621 .collect::<BTreeMap<_, _>>();
1622 let names = names_map.keys().cloned().collect();
1623
1624 for (name, data) in
1625 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1626 {
1627 let display_name =
1628 names_map.remove(name.as_slice()).expect("returned display names were requested");
1629 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1630
1631 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1632 }
1633
1634 Ok(result)
1635 }
1636
1637 async fn get_account_data_event(
1638 &self,
1639 event_type: GlobalAccountDataEventType,
1640 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1641 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1642 self.acquire()
1643 .await?
1644 .get_global_account_data(event_type)
1645 .await?
1646 .map(|value| self.deserialize_json(&value))
1647 .transpose()
1648 }
1649
1650 async fn get_room_account_data_event(
1651 &self,
1652 room_id: &RoomId,
1653 event_type: RoomAccountDataEventType,
1654 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1655 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1656 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1657 self.acquire()
1658 .await?
1659 .get_room_account_data(room_id, event_type)
1660 .await?
1661 .map(|value| self.deserialize_json(&value))
1662 .transpose()
1663 }
1664
1665 async fn get_user_room_receipt_event(
1666 &self,
1667 room_id: &RoomId,
1668 receipt_type: ReceiptType,
1669 thread: ReceiptThread,
1670 user_id: &UserId,
1671 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1672 let room_id = self.encode_key(keys::RECEIPT, room_id);
1673 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1674 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1677 let user_id = self.encode_key(keys::RECEIPT, user_id);
1678
1679 self.acquire()
1680 .await?
1681 .get_user_receipt(room_id, receipt_type, thread, user_id)
1682 .await?
1683 .map(|value| {
1684 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1685 })
1686 .transpose()
1687 }
1688
1689 async fn get_event_room_receipt_events(
1690 &self,
1691 room_id: &RoomId,
1692 receipt_type: ReceiptType,
1693 thread: ReceiptThread,
1694 event_id: &EventId,
1695 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1696 let room_id = self.encode_key(keys::RECEIPT, room_id);
1697 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1698 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1701 let event_id = self.encode_key(keys::RECEIPT, event_id);
1702
1703 self.acquire()
1704 .await?
1705 .get_event_receipts(room_id, receipt_type, thread, event_id)
1706 .await?
1707 .iter()
1708 .map(|value| {
1709 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1710 })
1711 .collect()
1712 }
1713
1714 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1715 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1716 }
1717
1718 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1719 let conn = self.acquire().await?;
1720 let key = self.encode_custom_key(key);
1721 conn.set_kv_blob(key, value).await?;
1722 Ok(())
1723 }
1724
1725 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1726 let conn = self.acquire().await?;
1727 let key = self.encode_custom_key(key);
1728 let previous = conn.get_kv_blob(key.clone()).await?;
1729 conn.set_kv_blob(key, value).await?;
1730 Ok(previous)
1731 }
1732
1733 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1734 let conn = self.acquire().await?;
1735 let key = self.encode_custom_key(key);
1736 let previous = conn.get_kv_blob(key.clone()).await?;
1737 if previous.is_some() {
1738 conn.delete_kv_blob(key).await?;
1739 }
1740 Ok(previous)
1741 }
1742
1743 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1744 let this = self.clone();
1745 let room_id = room_id.to_owned();
1746
1747 let conn = self.acquire().await?;
1748
1749 conn.with_transaction(move |txn| -> Result<()> {
1750 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1751 txn.remove_room_info(&room_info_room_id)?;
1752
1753 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1754 txn.remove_room_state_events(&state_event_room_id, None)?;
1755
1756 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1757 txn.remove_room_members(&member_room_id, None)?;
1758
1759 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1760 txn.remove_room_profiles(&profile_room_id)?;
1761
1762 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1763 txn.remove_room_account_data(&room_account_data_room_id)?;
1764
1765 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1766 txn.remove_room_receipts(&receipt_room_id)?;
1767
1768 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1769 txn.remove_room_display_names(&display_name_room_id)?;
1770
1771 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1772 txn.remove_room_send_queue(&send_queue_room_id)?;
1773
1774 let dependent_send_queue_room_id =
1775 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1776 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1777
1778 let thread_subscriptions_room_id =
1779 this.encode_key(keys::THREAD_SUBSCRIPTIONS, &room_id);
1780 txn.execute(
1781 "DELETE FROM thread_subscriptions WHERE room_id = ?",
1782 (thread_subscriptions_room_id,),
1783 )?;
1784
1785 Ok(())
1786 })
1787 .await?;
1788
1789 conn.vacuum().await
1790 }
1791
1792 async fn save_send_queue_request(
1793 &self,
1794 room_id: &RoomId,
1795 transaction_id: OwnedTransactionId,
1796 created_at: MilliSecondsSinceUnixEpoch,
1797 content: QueuedRequestKind,
1798 priority: usize,
1799 ) -> Result<(), Self::Error> {
1800 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1801 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1802
1803 let content = self.serialize_json(&content)?;
1804 let created_at_ts: u64 = created_at.0.into();
1810 self.acquire()
1811 .await?
1812 .with_transaction(move |txn| {
1813 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1814 Ok(())
1815 })
1816 .await
1817 }
1818
1819 async fn update_send_queue_request(
1820 &self,
1821 room_id: &RoomId,
1822 transaction_id: &TransactionId,
1823 content: QueuedRequestKind,
1824 ) -> Result<bool, Self::Error> {
1825 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1826
1827 let content = self.serialize_json(&content)?;
1828 let transaction_id = transaction_id.to_string();
1831
1832 let num_updated = self.acquire()
1833 .await?
1834 .with_transaction(move |txn| {
1835 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1836 })
1837 .await?;
1838
1839 Ok(num_updated > 0)
1840 }
1841
1842 async fn remove_send_queue_request(
1843 &self,
1844 room_id: &RoomId,
1845 transaction_id: &TransactionId,
1846 ) -> Result<bool, Self::Error> {
1847 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1848
1849 let transaction_id = transaction_id.to_string();
1851
1852 let num_deleted = self
1853 .acquire()
1854 .await?
1855 .with_transaction(move |txn| {
1856 txn.prepare_cached(
1857 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1858 )?
1859 .execute((room_id, &transaction_id))
1860 })
1861 .await?;
1862
1863 Ok(num_deleted > 0)
1864 }
1865
1866 async fn load_send_queue_requests(
1867 &self,
1868 room_id: &RoomId,
1869 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1870 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1871
1872 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1876 .acquire()
1877 .await?
1878 .prepare(
1879 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1880 |mut stmt| {
1881 stmt.query((room_id,))?
1882 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1883 .collect()
1884 },
1885 )
1886 .await?;
1887
1888 let mut requests = Vec::with_capacity(res.len());
1889 for entry in res {
1890 let created_at = entry
1891 .4
1892 .and_then(UInt::new)
1893 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1894 requests.push(QueuedRequest {
1895 transaction_id: entry.0.into(),
1896 kind: self.deserialize_json(&entry.1)?,
1897 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1898 priority: entry.3,
1899 created_at,
1900 });
1901 }
1902
1903 Ok(requests)
1904 }
1905
1906 async fn update_send_queue_request_status(
1907 &self,
1908 room_id: &RoomId,
1909 transaction_id: &TransactionId,
1910 error: Option<QueueWedgeError>,
1911 ) -> Result<(), Self::Error> {
1912 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1913
1914 let transaction_id = transaction_id.to_string();
1916
1917 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1919
1920 self.acquire()
1921 .await?
1922 .with_transaction(move |txn| {
1923 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1924 Ok(())
1925 })
1926 .await
1927 }
1928
1929 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1930 let res: Vec<Vec<u8>> = self
1935 .acquire()
1936 .await?
1937 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1938 stmt.query(())?.mapped(|row| row.get(0)).collect()
1939 })
1940 .await?;
1941
1942 Ok(res
1945 .into_iter()
1946 .map(|entry| self.deserialize_value(&entry))
1947 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1948 .into_iter()
1949 .collect())
1950 }
1951
1952 async fn save_dependent_queued_request(
1953 &self,
1954 room_id: &RoomId,
1955 parent_txn_id: &TransactionId,
1956 own_txn_id: ChildTransactionId,
1957 created_at: MilliSecondsSinceUnixEpoch,
1958 content: DependentQueuedRequestKind,
1959 ) -> Result<()> {
1960 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1961 let content = self.serialize_json(&content)?;
1962
1963 let parent_txn_id = parent_txn_id.to_string();
1965 let own_txn_id = own_txn_id.to_string();
1966
1967 let created_at_ts: u64 = created_at.0.into();
1968 self.acquire()
1969 .await?
1970 .with_transaction(move |txn| {
1971 txn.prepare_cached(
1972 r#"INSERT INTO dependent_send_queue_events
1973 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1974 VALUES (?, ?, ?, ?, ?)"#,
1975 )?
1976 .execute((
1977 room_id,
1978 parent_txn_id,
1979 own_txn_id,
1980 content,
1981 created_at_ts,
1982 ))?;
1983 Ok(())
1984 })
1985 .await
1986 }
1987
1988 async fn update_dependent_queued_request(
1989 &self,
1990 room_id: &RoomId,
1991 own_transaction_id: &ChildTransactionId,
1992 new_content: DependentQueuedRequestKind,
1993 ) -> Result<bool> {
1994 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1995 let content = self.serialize_json(&new_content)?;
1996
1997 let own_txn_id = own_transaction_id.to_string();
1999
2000 let num_updated = self
2001 .acquire()
2002 .await?
2003 .with_transaction(move |txn| {
2004 txn.prepare_cached(
2005 r#"UPDATE dependent_send_queue_events
2006 SET content = ?
2007 WHERE own_transaction_id = ?
2008 AND room_id = ?"#,
2009 )?
2010 .execute((content, own_txn_id, room_id))
2011 })
2012 .await?;
2013
2014 if num_updated > 1 {
2015 return Err(Error::InconsistentUpdate);
2016 }
2017
2018 Ok(num_updated == 1)
2019 }
2020
2021 async fn mark_dependent_queued_requests_as_ready(
2022 &self,
2023 room_id: &RoomId,
2024 parent_txn_id: &TransactionId,
2025 parent_key: SentRequestKey,
2026 ) -> Result<usize> {
2027 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2028 let parent_key = self.serialize_value(&parent_key)?;
2029
2030 let parent_txn_id = parent_txn_id.to_string();
2032
2033 self.acquire()
2034 .await?
2035 .with_transaction(move |txn| {
2036 Ok(txn.prepare_cached(
2037 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2038 )?
2039 .execute((parent_key, parent_txn_id, room_id))?)
2040 })
2041 .await
2042 }
2043
2044 async fn remove_dependent_queued_request(
2045 &self,
2046 room_id: &RoomId,
2047 txn_id: &ChildTransactionId,
2048 ) -> Result<bool> {
2049 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2050
2051 let txn_id = txn_id.to_string();
2053
2054 let num_deleted = self
2055 .acquire()
2056 .await?
2057 .with_transaction(move |txn| {
2058 txn.prepare_cached(
2059 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2060 )?
2061 .execute((txn_id, room_id))
2062 })
2063 .await?;
2064
2065 Ok(num_deleted > 0)
2066 }
2067
2068 async fn load_dependent_queued_requests(
2069 &self,
2070 room_id: &RoomId,
2071 ) -> Result<Vec<DependentQueuedRequest>> {
2072 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2073
2074 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2076 .acquire()
2077 .await?
2078 .prepare(
2079 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2080 |mut stmt| {
2081 stmt.query((room_id,))?
2082 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2083 .collect()
2084 },
2085 )
2086 .await?;
2087
2088 let mut dependent_events = Vec::with_capacity(res.len());
2089 for entry in res {
2090 let created_at = entry
2091 .4
2092 .and_then(UInt::new)
2093 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2094 dependent_events.push(DependentQueuedRequest {
2095 own_transaction_id: entry.0.into(),
2096 parent_transaction_id: entry.1.into(),
2097 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2098 kind: self.deserialize_json(&entry.3)?,
2099 created_at,
2100 });
2101 }
2102
2103 Ok(dependent_events)
2104 }
2105
2106 async fn upsert_thread_subscription(
2107 &self,
2108 room_id: &RoomId,
2109 thread_id: &EventId,
2110 subscription: ThreadSubscription,
2111 ) -> Result<(), Self::Error> {
2112 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2113 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2114 let status = subscription.as_str();
2115
2116 self.acquire()
2117 .await?
2118 .with_transaction(move |txn| {
2119 txn.prepare_cached(
2120 "INSERT OR REPLACE INTO thread_subscriptions (room_id, event_id, status)
2121 VALUES (?, ?, ?)",
2122 )?
2123 .execute((room_id, thread_id, status))
2124 })
2125 .await?;
2126 Ok(())
2127 }
2128
2129 async fn load_thread_subscription(
2130 &self,
2131 room_id: &RoomId,
2132 thread_id: &EventId,
2133 ) -> Result<Option<ThreadSubscription>, Self::Error> {
2134 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2135 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2136
2137 Ok(self
2138 .acquire()
2139 .await?
2140 .query_row(
2141 "SELECT status FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2142 (room_id, thread_id),
2143 |row| row.get::<_, String>(0),
2144 )
2145 .await
2146 .optional()?
2147 .map(|data| {
2148 ThreadSubscription::from_value(&data).ok_or_else(|| Error::InvalidData {
2149 details: format!("Invalid thread status: {data}"),
2150 })
2151 })
2152 .transpose()?)
2153 }
2154
2155 async fn remove_thread_subscription(
2156 &self,
2157 room_id: &RoomId,
2158 thread_id: &EventId,
2159 ) -> Result<(), Self::Error> {
2160 let room_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id);
2161 let thread_id = self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id);
2162
2163 self.acquire()
2164 .await?
2165 .execute(
2166 "DELETE FROM thread_subscriptions WHERE room_id = ? AND event_id = ?",
2167 (room_id, thread_id),
2168 )
2169 .await?;
2170
2171 Ok(())
2172 }
2173}
2174
2175#[derive(Debug, Clone, Serialize, Deserialize)]
2176struct ReceiptData {
2177 receipt: Receipt,
2178 event_id: OwnedEventId,
2179 user_id: OwnedUserId,
2180}
2181
/// Runs the shared state store integration tests against a store opened
/// without a passphrase.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteStateStore;

    /// Temporary directory shared by all tests of this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Counter giving every store its own sub-directory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh store in a new sub-directory of `TMP_DIR`.
    async fn get_store() -> Result<impl StateStore, StoreError> {
        let store_index = NUM.fetch_add(1, SeqCst);
        let store_path = TMP_DIR.path().join(store_index.to_string());
        let store_path_str = store_path.to_str().unwrap();

        tracing::info!("using store @ {}", store_path_str);

        let store = SqliteStateStore::open(store_path_str, None).await.unwrap();
        Ok(store)
    }

    statestore_integration_tests!();
}
2206
2207#[cfg(test)]
2208mod encrypted_tests {
2209 use std::{
2210 path::PathBuf,
2211 sync::atomic::{AtomicU32, Ordering::SeqCst},
2212 };
2213
2214 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2215 use matrix_sdk_test::async_test;
2216 use once_cell::sync::Lazy;
2217 use tempfile::{tempdir, TempDir};
2218
2219 use super::SqliteStateStore;
2220 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
2221
2222 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2223 static NUM: AtomicU32 = AtomicU32::new(0);
2224
2225 fn new_state_store_workspace() -> PathBuf {
2226 let name = NUM.fetch_add(1, SeqCst).to_string();
2227 TMP_DIR.path().join(name)
2228 }
2229
2230 async fn get_store() -> Result<impl StateStore, StoreError> {
2231 let tmpdir_path = new_state_store_workspace();
2232
2233 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2234
2235 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2236 .await
2237 .unwrap())
2238 }
2239
2240 #[async_test]
2241 async fn test_pool_size() {
2242 let tmpdir_path = new_state_store_workspace();
2243 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
2244
2245 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2246
2247 assert_eq!(store.pool.status().max_size, 42);
2248 }
2249
2250 #[async_test]
2251 async fn test_cache_size() {
2252 let tmpdir_path = new_state_store_workspace();
2253 let store_open_config = SqliteStoreConfig::new(tmpdir_path).cache_size(1500);
2254
2255 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2256
2257 let conn = store.pool.get().await.unwrap();
2258 let cache_size =
2259 conn.query_row("PRAGMA cache_size", (), |row| row.get::<_, i32>(0)).await.unwrap();
2260
2261 assert_eq!(cache_size, -(1500 / 1024));
2265 }
2266
2267 #[async_test]
2268 async fn test_journal_size_limit() {
2269 let tmpdir_path = new_state_store_workspace();
2270 let store_open_config = SqliteStoreConfig::new(tmpdir_path).journal_size_limit(1500);
2271
2272 let store = SqliteStateStore::open_with_config(store_open_config).await.unwrap();
2273
2274 let conn = store.pool.get().await.unwrap();
2275 let journal_size_limit = conn
2276 .query_row("PRAGMA journal_size_limit", (), |row| row.get::<_, u32>(0))
2277 .await
2278 .unwrap();
2279
2280 assert_eq!(journal_size_limit, 1500);
2283 }
2284
2285 statestore_integration_tests!();
2286}
2287
2288#[cfg(test)]
2289mod migration_tests {
2290 use std::{
2291 path::{Path, PathBuf},
2292 sync::{
2293 atomic::{AtomicU32, Ordering::SeqCst},
2294 Arc,
2295 },
2296 };
2297
2298 use as_variant::as_variant;
2299 use deadpool_sqlite::Runtime;
2300 use matrix_sdk_base::{
2301 media::{MediaFormat, MediaRequestParameters},
2302 store::{
2303 ChildTransactionId, DependentQueuedRequestKind, RoomLoadSettings,
2304 SerializableEventContent,
2305 },
2306 sync::UnreadNotificationsCount,
2307 RoomState, StateStore,
2308 };
2309 use matrix_sdk_test::async_test;
2310 use once_cell::sync::Lazy;
2311 use ruma::{
2312 events::{
2313 room::{create::RoomCreateEventContent, message::RoomMessageEventContent, MediaSource},
2314 StateEventType,
2315 },
2316 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
2317 RoomId, TransactionId, UserId,
2318 };
2319 use rusqlite::Transaction;
2320 use serde::{Deserialize, Serialize};
2321 use serde_json::json;
2322 use tempfile::{tempdir, TempDir};
2323 use tokio::fs;
2324
2325 use super::{init, keys, SqliteStateStore, DATABASE_NAME};
2326 use crate::{
2327 error::{Error, Result},
2328 utils::{EncryptableStore as _, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2329 OpenStoreError,
2330 };
2331
2332 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2333 static NUM: AtomicU32 = AtomicU32::new(0);
2334 const SECRET: &str = "secret";
2335
2336 fn new_path() -> PathBuf {
2337 let name = NUM.fetch_add(1, SeqCst).to_string();
2338 TMP_DIR.path().join(name)
2339 }
2340
2341 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2342 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir).unwrap();
2343
2344 let config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
2345 let pool = config.create_pool(Runtime::Tokio1).unwrap();
2348 let conn = pool.get().await?;
2349
2350 init(&conn).await?;
2351
2352 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2353 let this = SqliteStateStore { store_cipher, pool };
2354 this.run_migrations(&conn, 1, Some(version)).await?;
2355
2356 Ok(this)
2357 }
2358
2359 fn room_info_v1_json(
2360 room_id: &RoomId,
2361 state: RoomState,
2362 name: Option<&str>,
2363 creator: Option<&UserId>,
2364 ) -> serde_json::Value {
2365 let name_content = match name {
2367 Some(name) => json!({ "name": name }),
2368 None => json!({ "name": null }),
2369 };
2370 let create_content = match creator {
2372 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2373 None => RoomCreateEventContent::new_v11(),
2374 };
2375
2376 json!({
2377 "room_id": room_id,
2378 "room_type": state,
2379 "notification_counts": UnreadNotificationsCount::default(),
2380 "summary": {
2381 "heroes": [],
2382 "joined_member_count": 0,
2383 "invited_member_count": 0,
2384 },
2385 "members_synced": false,
2386 "base_info": {
2387 "dm_targets": [],
2388 "max_power_level": 100,
2389 "name": {
2390 "Original": {
2391 "content": name_content,
2392 },
2393 },
2394 "create": {
2395 "Original": {
2396 "content": create_content,
2397 }
2398 }
2399 },
2400 })
2401 }
2402
2403 #[async_test]
2404 pub async fn test_migrating_v1_to_v2() {
2405 let path = new_path();
2406 {
2408 let db = create_fake_db(&path, 1).await.unwrap();
2409 let conn = db.pool.get().await.unwrap();
2410
2411 let this = db.clone();
2412 conn.with_transaction(move |txn| {
2413 for i in 0..5 {
2414 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2415 let (state, stripped) =
2416 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2417 let info = room_info_v1_json(&room_id, state, None, None);
2418
2419 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2420 let data = this.serialize_json(&info)?;
2421
2422 txn.prepare_cached(
2423 "INSERT INTO room_info (room_id, stripped, data)
2424 VALUES (?, ?, ?)",
2425 )?
2426 .execute((room_id, stripped, data))?;
2427 }
2428
2429 Result::<_, Error>::Ok(())
2430 })
2431 .await
2432 .unwrap();
2433 }
2434
2435 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2437
2438 assert_eq!(store.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 5);
2440 }
2441
2442 fn add_room_v2(
2444 this: &SqliteStateStore,
2445 txn: &Transaction<'_>,
2446 room_id: &RoomId,
2447 name: Option<&str>,
2448 create_creator: Option<&UserId>,
2449 create_sender: Option<&UserId>,
2450 ) -> Result<(), Error> {
2451 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2452
2453 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2454 let encoded_state =
2455 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2456 let data = this.serialize_json(&room_info_json)?;
2457
2458 txn.prepare_cached(
2459 "INSERT INTO room_info (room_id, state, data)
2460 VALUES (?, ?, ?)",
2461 )?
2462 .execute((encoded_room_id, encoded_state, data))?;
2463
2464 let Some(create_sender) = create_sender else {
2466 return Ok(());
2467 };
2468
2469 let create_content = match create_creator {
2470 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2471 None => RoomCreateEventContent::new_v11(),
2472 };
2473
2474 let event_id = EventId::new(server_name!("dummy.local"));
2475 let create_event = json!({
2476 "content": create_content,
2477 "event_id": event_id,
2478 "sender": create_sender.to_owned(),
2479 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2480 "state_key": "",
2481 "type": "m.room.create",
2482 "unsigned": {},
2483 });
2484
2485 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2486 let encoded_event_type =
2487 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2488 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2489 let stripped = false;
2490 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2491 let data = this.serialize_json(&create_event)?;
2492
2493 txn.prepare_cached(
2494 "INSERT
2495 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2496 VALUES (?, ?, ?, ?, ?, ?)",
2497 )?
2498 .execute((
2499 encoded_room_id,
2500 encoded_event_type,
2501 encoded_state_key,
2502 stripped,
2503 encoded_event_id,
2504 data,
2505 ))?;
2506
2507 Ok(())
2508 }
2509
2510 #[async_test]
2511 pub async fn test_migrating_v2_to_v3() {
2512 let path = new_path();
2513
2514 let room_a_id = room_id!("!room_a:dummy.local");
2516 let room_a_name = "Room A";
2517 let room_a_creator = user_id!("@creator:dummy.local");
2518 let room_a_create_sender = user_id!("@sender:dummy.local");
2521
2522 let room_b_id = room_id!("!room_b:dummy.local");
2524
2525 let room_c_id = room_id!("!room_c:dummy.local");
2527 let room_c_create_sender = user_id!("@creator:dummy.local");
2528
2529 {
2531 let db = create_fake_db(&path, 2).await.unwrap();
2532 let conn = db.pool.get().await.unwrap();
2533
2534 let this = db.clone();
2535 conn.with_transaction(move |txn| {
2536 add_room_v2(
2537 &this,
2538 txn,
2539 room_a_id,
2540 Some(room_a_name),
2541 Some(room_a_creator),
2542 Some(room_a_create_sender),
2543 )?;
2544 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2545 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2546
2547 Result::<_, Error>::Ok(())
2548 })
2549 .await
2550 .unwrap();
2551 }
2552
2553 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2555
2556 let room_infos = store.get_room_infos(&RoomLoadSettings::default()).await.unwrap();
2558 assert_eq!(room_infos.len(), 3);
2559
2560 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2561 assert_eq!(room_a.name(), Some(room_a_name));
2562 assert_eq!(room_a.creators(), Some(vec![room_a_create_sender.to_owned()]));
2563
2564 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2565 assert_eq!(room_b.name(), None);
2566 assert_eq!(room_b.creators(), None);
2567
2568 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2569 assert_eq!(room_c.name(), None);
2570 assert_eq!(room_c.creators(), Some(vec![room_c_create_sender.to_owned()]));
2571 }
2572
2573 #[async_test]
2574 pub async fn test_migrating_v7_to_v9() {
2575 let path = new_path();
2576
2577 let room_id = room_id!("!room_a:dummy.local");
2578 let wedged_event_transaction_id = TransactionId::new();
2579 let local_event_transaction_id = TransactionId::new();
2580
2581 {
2583 let db = create_fake_db(&path, 7).await.unwrap();
2584 let conn = db.pool.get().await.unwrap();
2585
2586 let wedge_tx = wedged_event_transaction_id.clone();
2587 let local_tx = local_event_transaction_id.clone();
2588
2589 conn.with_transaction(move |txn| {
2590 add_dependent_send_queue_event_v7(
2591 &db,
2592 txn,
2593 room_id,
2594 &local_tx,
2595 ChildTransactionId::new(),
2596 DependentQueuedRequestKind::RedactEvent,
2597 )?;
2598 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2599 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2600 Result::<_, Error>::Ok(())
2601 })
2602 .await
2603 .unwrap();
2604 }
2605
2606 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2609
2610 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2611 assert!(requests.is_empty());
2612
2613 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2614 assert!(dependent_requests.is_empty());
2615 }
2616
2617 fn add_send_queue_event_v7(
2618 this: &SqliteStateStore,
2619 txn: &Transaction<'_>,
2620 transaction_id: &TransactionId,
2621 room_id: &RoomId,
2622 is_wedged: bool,
2623 ) -> Result<(), Error> {
2624 let content =
2625 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2626
2627 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2628 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2629
2630 let content = this.serialize_json(&content)?;
2631
2632 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2633 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2634
2635 Ok(())
2636 }
2637
2638 fn add_dependent_send_queue_event_v7(
2639 this: &SqliteStateStore,
2640 txn: &Transaction<'_>,
2641 room_id: &RoomId,
2642 parent_txn_id: &TransactionId,
2643 own_txn_id: ChildTransactionId,
2644 content: DependentQueuedRequestKind,
2645 ) -> Result<(), Error> {
2646 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2647
2648 let parent_txn_id = parent_txn_id.to_string();
2649 let own_txn_id = own_txn_id.to_string();
2650 let content = this.serialize_json(&content)?;
2651
2652 txn.prepare_cached(
2653 "INSERT INTO dependent_send_queue_events
2654 (room_id, parent_transaction_id, own_transaction_id, content)
2655 VALUES (?, ?, ?, ?)",
2656 )?
2657 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2658
2659 Ok(())
2660 }
2661
2662 #[derive(Clone, Debug, Serialize, Deserialize)]
2663 pub enum LegacyDependentQueuedRequestKind {
2664 UploadFileWithThumbnail {
2665 content_type: String,
2666 cache_key: MediaRequestParameters,
2667 related_to: OwnedTransactionId,
2668 },
2669 }
2670
2671 #[async_test]
2672 pub async fn test_dependent_queued_request_variant_renaming() {
2673 let path = new_path();
2674 let db = create_fake_db(&path, 7).await.unwrap();
2675
2676 let cache_key = MediaRequestParameters {
2677 format: MediaFormat::File,
2678 source: MediaSource::Plain("https://server.local/foobar".into()),
2679 };
2680 let related_to = TransactionId::new();
2681 let request = LegacyDependentQueuedRequestKind::UploadFileWithThumbnail {
2682 content_type: "image/png".to_owned(),
2683 cache_key,
2684 related_to: related_to.clone(),
2685 };
2686
2687 let data = db
2688 .serialize_json(&request)
2689 .expect("should be able to serialize legacy dependent request");
2690 let deserialized: DependentQueuedRequestKind = db.deserialize_json(&data).expect(
2691 "should be able to deserialize dependent request from legacy dependent request",
2692 );
2693
2694 as_variant!(deserialized, DependentQueuedRequestKind::UploadFileOrThumbnail { related_to: de_related_to, .. } => {
2695 assert_eq!(de_related_to, related_to);
2696 });
2697 }
2698}