1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, BTreeSet, HashMap},
4 fmt, iter,
5 path::Path,
6 sync::Arc,
7};
8
9use async_trait::async_trait;
10use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
11use matrix_sdk_base::{
12 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState, SyncOrStrippedState},
13 store::{
14 migration_helpers::RoomInfoV1, ChildTransactionId, DependentQueuedRequest,
15 DependentQueuedRequestKind, QueueWedgeError, QueuedRequest, QueuedRequestKind,
16 SentRequestKey,
17 },
18 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, RoomState, StateChanges, StateStore,
19 StateStoreDataKey, StateStoreDataValue,
20};
21use matrix_sdk_store_encryption::StoreCipher;
22use ruma::{
23 canonical_json::{redact, RedactedBecause},
24 events::{
25 presence::PresenceEvent,
26 receipt::{Receipt, ReceiptThread, ReceiptType},
27 room::{
28 create::RoomCreateEventContent,
29 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
30 },
31 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
32 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
33 },
34 serde::Raw,
35 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
36 OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
37};
38use rusqlite::{OptionalExtension, Transaction};
39use serde::{de::DeserializeOwned, Deserialize, Serialize};
40use tokio::fs;
41use tracing::{debug, warn};
42
43use crate::{
44 error::{Error, Result},
45 utils::{
46 repeat_vars, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
47 SqliteKeyValueStoreConnExt,
48 },
49 OpenStoreError,
50};
51
52mod keys {
53 pub const KV_BLOB: &str = "kv_blob";
55 pub const ROOM_INFO: &str = "room_info";
56 pub const STATE_EVENT: &str = "state_event";
57 pub const GLOBAL_ACCOUNT_DATA: &str = "global_account_data";
58 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
59 pub const MEMBER: &str = "member";
60 pub const PROFILE: &str = "profile";
61 pub const RECEIPT: &str = "receipt";
62 pub const DISPLAY_NAME: &str = "display_name";
63 pub const SEND_QUEUE: &str = "send_queue_events";
64 pub const DEPENDENTS_SEND_QUEUE: &str = "dependent_send_queue_events";
65}
66
67const DATABASE_VERSION: u8 = 12;
73
74#[derive(Clone)]
76pub struct SqliteStateStore {
77 store_cipher: Option<Arc<StoreCipher>>,
78 pool: SqlitePool,
79}
80
81#[cfg(not(tarpaulin_include))]
82impl fmt::Debug for SqliteStateStore {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("SqliteStateStore").finish_non_exhaustive()
85 }
86}
87
88impl SqliteStateStore {
89 pub async fn open(
92 path: impl AsRef<Path>,
93 passphrase: Option<&str>,
94 ) -> Result<Self, OpenStoreError> {
95 let pool = create_pool(path.as_ref()).await?;
96
97 Self::open_with_pool(pool, passphrase).await
98 }
99
100 pub async fn open_with_pool(
103 pool: SqlitePool,
104 passphrase: Option<&str>,
105 ) -> Result<Self, OpenStoreError> {
106 let conn = pool.get().await?;
107 conn.set_journal_size_limit().await?;
108
109 let mut version = conn.db_version().await?;
110
111 if version == 0 {
112 init(&conn).await?;
113 version = 1;
114 }
115
116 let store_cipher = match passphrase {
117 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
118 None => None,
119 };
120 let this = Self { store_cipher, pool };
121 this.run_migrations(&conn, version, None).await?;
122 conn.optimize().await?;
123
124 Ok(this)
125 }
126
127 async fn run_migrations(&self, conn: &SqliteAsyncConn, from: u8, to: Option<u8>) -> Result<()> {
132 let to = to.unwrap_or(DATABASE_VERSION);
133
134 if from < to {
135 debug!(version = from, new_version = to, "Upgrading database");
136 } else {
137 return Ok(());
138 }
139
140 if from < 2 && to >= 2 {
141 let this = self.clone();
142 conn.with_transaction(move |txn| {
143 txn.execute_batch(include_str!(
145 "../migrations/state_store/002_a_create_new_room_info.sql"
146 ))?;
147
148 for data in txn
150 .prepare("SELECT data FROM room_info")?
151 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
152 {
153 let data = data?;
154 let room_info: RoomInfoV1 = this.deserialize_json(&data)?;
155
156 let room_id = this.encode_key(keys::ROOM_INFO, room_info.room_id());
157 let state = this
158 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
159 txn.prepare_cached(
160 "INSERT OR REPLACE INTO new_room_info (room_id, state, data)
161 VALUES (?, ?, ?)",
162 )?
163 .execute((room_id, state, data))?;
164 }
165
166 txn.execute_batch(include_str!(
168 "../migrations/state_store/002_b_replace_room_info.sql"
169 ))?;
170
171 txn.set_db_version(2)?;
172 Result::<_, Error>::Ok(())
173 })
174 .await?;
175 }
176
177 if from < 3 && to >= 3 {
179 let this = self.clone();
180 conn.with_transaction(move |txn| {
181 for data in txn
183 .prepare("SELECT data FROM room_info")?
184 .query_map((), |row| row.get::<_, Vec<u8>>(0))?
185 {
186 let data = data?;
187 let room_info_v1: RoomInfoV1 = this.deserialize_json(&data)?;
188
189 let room_id = this.encode_key(keys::STATE_EVENT, room_info_v1.room_id());
191 let event_type =
192 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
193 let create_res = txn
194 .prepare(
195 "SELECT stripped, data FROM state_event
196 WHERE room_id = ? AND event_type = ?",
197 )?
198 .query_row([room_id, event_type], |row| {
199 Ok((row.get::<_, bool>(0)?, row.get::<_, Vec<u8>>(1)?))
200 })
201 .optional()?;
202
203 let create = create_res.and_then(|(stripped, data)| {
204 let create = if stripped {
205 SyncOrStrippedState::<RoomCreateEventContent>::Stripped(
206 this.deserialize_json(&data).ok()?,
207 )
208 } else {
209 SyncOrStrippedState::Sync(this.deserialize_json(&data).ok()?)
210 };
211 Some(create)
212 });
213
214 let migrated_room_info = room_info_v1.migrate(create.as_ref());
215
216 let data = this.serialize_json(&migrated_room_info)?;
217 let room_id = this.encode_key(keys::ROOM_INFO, migrated_room_info.room_id());
218 txn.prepare_cached("UPDATE room_info SET data = ? WHERE room_id = ?")?
219 .execute((data, room_id))?;
220 }
221
222 txn.set_db_version(3)?;
223 Result::<_, Error>::Ok(())
224 })
225 .await?;
226 }
227
228 if from < 4 && to >= 4 {
229 conn.with_transaction(move |txn| {
230 txn.execute_batch(include_str!("../migrations/state_store/003_send_queue.sql"))?;
232 txn.set_db_version(4)
233 })
234 .await?;
235 }
236
237 if from < 5 && to >= 5 {
238 conn.with_transaction(move |txn| {
239 txn.execute_batch(include_str!(
241 "../migrations/state_store/004_send_queue_with_roomid_value.sql"
242 ))?;
243 txn.set_db_version(4)
244 })
245 .await?;
246 }
247
248 if from < 6 && to >= 6 {
249 conn.with_transaction(move |txn| {
250 txn.execute_batch(include_str!(
252 "../migrations/state_store/005_send_queue_dependent_events.sql"
253 ))?;
254 txn.set_db_version(6)
255 })
256 .await?;
257 }
258
259 if from < 7 && to >= 7 {
260 conn.with_transaction(move |txn| {
261 txn.execute_batch(include_str!("../migrations/state_store/006_drop_media.sql"))?;
263 txn.set_db_version(7)
264 })
265 .await?;
266 }
267
268 if from < 8 && to >= 8 {
269 let error = QueueWedgeError::GenericApiError {
271 msg: "local echo failed to send in a previous session".into(),
272 };
273 let default_err = self.serialize_value(&error)?;
274
275 conn.with_transaction(move |txn| {
276 txn.execute_batch(include_str!("../migrations/state_store/007_a_send_queue_wedge_reason.sql"))?;
278
279 for wedged_entries in txn
282 .prepare("SELECT room_id, transaction_id FROM send_queue_events WHERE wedged = 1")?
283 .query_map((), |row| {
284 Ok(
285 (row.get::<_, Vec<u8>>(0)?,row.get::<_, String>(1)?)
286 )
287 })? {
288
289 let (room_id, transaction_id) = wedged_entries?;
290
291 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?
292 .execute((default_err.clone(), room_id, transaction_id))?;
293 }
294
295
296 txn.execute_batch(include_str!("../migrations/state_store/007_b_send_queue_clean.sql"))?;
298
299 txn.set_db_version(8)
300 })
301 .await?;
302 }
303
304 if from < 9 && to >= 9 {
305 conn.with_transaction(move |txn| {
306 txn.execute_batch(include_str!("../migrations/state_store/008_send_queue.sql"))?;
308 txn.set_db_version(9)
309 })
310 .await?;
311 }
312
313 if from < 10 && to >= 10 {
314 conn.with_transaction(move |txn| {
315 txn.execute_batch(include_str!(
317 "../migrations/state_store/009_send_queue_priority.sql"
318 ))?;
319 txn.set_db_version(10)
320 })
321 .await?;
322 }
323
324 if from < 11 && to >= 11 {
325 conn.with_transaction(move |txn| {
326 txn.execute_batch(include_str!(
328 "../migrations/state_store/010_send_queue_enqueue_time.sql"
329 ))?;
330 txn.set_db_version(11)
331 })
332 .await?;
333 }
334
335 if from < 12 && to >= 12 {
336 conn.vacuum().await?;
340 conn.set_kv("version", vec![12]).await?;
341 }
342
343 Ok(())
344 }
345
346 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
347 if let Some(key) = &self.store_cipher {
348 let encrypted = key.encrypt_value_data(value)?;
349 Ok(rmp_serde::to_vec_named(&encrypted)?)
350 } else {
351 Ok(value)
352 }
353 }
354
355 fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
356 let serialized = rmp_serde::to_vec_named(value)?;
357 self.encode_value(serialized)
358 }
359
360 fn serialize_json(&self, value: &impl Serialize) -> Result<Vec<u8>> {
361 let serialized = serde_json::to_vec(value)?;
362 self.encode_value(serialized)
363 }
364
365 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
366 if let Some(key) = &self.store_cipher {
367 let encrypted = rmp_serde::from_slice(value)?;
368 let decrypted = key.decrypt_value_data(encrypted)?;
369 Ok(Cow::Owned(decrypted))
370 } else {
371 Ok(Cow::Borrowed(value))
372 }
373 }
374
375 fn deserialize_json<T: DeserializeOwned>(&self, data: &[u8]) -> Result<T> {
376 let decoded = self.decode_value(data)?;
377 Ok(serde_json::from_slice(&decoded)?)
378 }
379
380 fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
381 let decoded = self.decode_value(value)?;
382 Ok(rmp_serde::from_slice(&decoded)?)
383 }
384
385 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
386 let bytes = key.as_ref();
387 if let Some(store_cipher) = &self.store_cipher {
388 Key::Hashed(store_cipher.hash_key(table_name, bytes))
389 } else {
390 Key::Plain(bytes.to_owned())
391 }
392 }
393
394 fn encode_state_store_data_key(&self, key: StateStoreDataKey<'_>) -> Key {
395 let key_s = match key {
396 StateStoreDataKey::SyncToken => Cow::Borrowed(StateStoreDataKey::SYNC_TOKEN),
397 StateStoreDataKey::ServerCapabilities => {
398 Cow::Borrowed(StateStoreDataKey::SERVER_CAPABILITIES)
399 }
400 StateStoreDataKey::Filter(f) => {
401 Cow::Owned(format!("{}:{f}", StateStoreDataKey::FILTER))
402 }
403 StateStoreDataKey::UserAvatarUrl(u) => {
404 Cow::Owned(format!("{}:{u}", StateStoreDataKey::USER_AVATAR_URL))
405 }
406 StateStoreDataKey::RecentlyVisitedRooms(b) => {
407 Cow::Owned(format!("{}:{b}", StateStoreDataKey::RECENTLY_VISITED_ROOMS))
408 }
409 StateStoreDataKey::UtdHookManagerData => {
410 Cow::Borrowed(StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
411 }
412 StateStoreDataKey::ComposerDraft(room_id) => {
413 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::COMPOSER_DRAFT))
414 }
415 StateStoreDataKey::SeenKnockRequests(room_id) => {
416 Cow::Owned(format!("{}:{room_id}", StateStoreDataKey::SEEN_KNOCK_REQUESTS))
417 }
418 };
419
420 self.encode_key(keys::KV_BLOB, &*key_s)
421 }
422
423 fn encode_presence_key(&self, user_id: &UserId) -> Key {
424 self.encode_key(keys::KV_BLOB, format!("presence:{user_id}"))
425 }
426
427 fn encode_custom_key(&self, key: &[u8]) -> Key {
428 let mut full_key = b"custom:".to_vec();
429 full_key.extend(key);
430 self.encode_key(keys::KV_BLOB, full_key)
431 }
432
433 async fn acquire(&self) -> Result<SqliteAsyncConn> {
434 Ok(self.pool.get().await?)
435 }
436
437 fn remove_maybe_stripped_room_data(
438 &self,
439 txn: &Transaction<'_>,
440 room_id: &RoomId,
441 stripped: bool,
442 ) -> rusqlite::Result<()> {
443 let state_event_room_id = self.encode_key(keys::STATE_EVENT, room_id);
444 txn.remove_room_state_events(&state_event_room_id, Some(stripped))?;
445
446 let member_room_id = self.encode_key(keys::MEMBER, room_id);
447 txn.remove_room_members(&member_room_id, Some(stripped))
448 }
449}
450
451async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
452 fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
453 let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-state.sqlite3"));
454 Ok(cfg.create_pool(Runtime::Tokio1)?)
455}
456
457async fn init(conn: &SqliteAsyncConn) -> Result<()> {
459 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
462 conn.with_transaction(|txn| {
463 txn.execute_batch(include_str!("../migrations/state_store/001_init.sql"))?;
464 txn.set_db_version(1)?;
465
466 Ok(())
467 })
468 .await
469}
470
471trait SqliteConnectionStateStoreExt {
472 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()>;
473
474 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()>;
475
476 fn set_room_account_data(
477 &self,
478 room_id: &[u8],
479 event_type: &[u8],
480 data: &[u8],
481 ) -> rusqlite::Result<()>;
482 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()>;
483
484 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()>;
485 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>>;
486 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()>;
487
488 fn set_state_event(
489 &self,
490 room_id: &[u8],
491 event_type: &[u8],
492 state_key: &[u8],
493 stripped: bool,
494 event_id: Option<&[u8]>,
495 data: &[u8],
496 ) -> rusqlite::Result<()>;
497 fn get_state_event_by_id(
498 &self,
499 room_id: &[u8],
500 event_id: &[u8],
501 ) -> rusqlite::Result<Option<Vec<u8>>>;
502 fn remove_room_state_events(
503 &self,
504 room_id: &[u8],
505 stripped: Option<bool>,
506 ) -> rusqlite::Result<()>;
507
508 fn set_member(
509 &self,
510 room_id: &[u8],
511 user_id: &[u8],
512 membership: &[u8],
513 stripped: bool,
514 data: &[u8],
515 ) -> rusqlite::Result<()>;
516 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()>;
517
518 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()>;
519 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()>;
520 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()>;
521
522 fn set_receipt(
523 &self,
524 room_id: &[u8],
525 user_id: &[u8],
526 receipt_type: &[u8],
527 thread_id: &[u8],
528 event_id: &[u8],
529 data: &[u8],
530 ) -> rusqlite::Result<()>;
531 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()>;
532
533 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()>;
534 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()>;
535 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()>;
536 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
537 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()>;
538}
539
540impl SqliteConnectionStateStoreExt for rusqlite::Connection {
541 fn set_kv_blob(&self, key: &[u8], value: &[u8]) -> rusqlite::Result<()> {
542 self.execute("INSERT OR REPLACE INTO kv_blob VALUES (?, ?)", (key, value))?;
543 Ok(())
544 }
545
546 fn set_global_account_data(&self, event_type: &[u8], data: &[u8]) -> rusqlite::Result<()> {
547 self.prepare_cached(
548 "INSERT OR REPLACE INTO global_account_data (event_type, data)
549 VALUES (?, ?)",
550 )?
551 .execute((event_type, data))?;
552 Ok(())
553 }
554
555 fn set_room_account_data(
556 &self,
557 room_id: &[u8],
558 event_type: &[u8],
559 data: &[u8],
560 ) -> rusqlite::Result<()> {
561 self.prepare_cached(
562 "INSERT OR REPLACE INTO room_account_data (room_id, event_type, data)
563 VALUES (?, ?, ?)",
564 )?
565 .execute((room_id, event_type, data))?;
566 Ok(())
567 }
568
569 fn remove_room_account_data(&self, room_id: &[u8]) -> rusqlite::Result<()> {
570 self.prepare(
571 "DELETE FROM room_account_data
572 WHERE room_id = ?",
573 )?
574 .execute((room_id,))?;
575 Ok(())
576 }
577
578 fn set_room_info(&self, room_id: &[u8], state: &[u8], data: &[u8]) -> rusqlite::Result<()> {
579 self.prepare_cached(
580 "INSERT OR REPLACE INTO room_info (room_id, state, data)
581 VALUES (?, ?, ?)",
582 )?
583 .execute((room_id, state, data))?;
584 Ok(())
585 }
586
587 fn get_room_info(&self, room_id: &[u8]) -> rusqlite::Result<Option<Vec<u8>>> {
588 self.query_row("SELECT data FROM room_info WHERE room_id = ?", (room_id,), |row| row.get(0))
589 .optional()
590 }
591
592 fn remove_room_info(&self, room_id: &[u8]) -> rusqlite::Result<()> {
594 self.prepare_cached("DELETE FROM room_info WHERE room_id = ?")?.execute((room_id,))?;
595 Ok(())
596 }
597
598 fn set_state_event(
599 &self,
600 room_id: &[u8],
601 event_type: &[u8],
602 state_key: &[u8],
603 stripped: bool,
604 event_id: Option<&[u8]>,
605 data: &[u8],
606 ) -> rusqlite::Result<()> {
607 self.prepare_cached(
608 "INSERT OR REPLACE
609 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
610 VALUES (?, ?, ?, ?, ?, ?)",
611 )?
612 .execute((room_id, event_type, state_key, stripped, event_id, data))?;
613 Ok(())
614 }
615
616 fn get_state_event_by_id(
617 &self,
618 room_id: &[u8],
619 event_id: &[u8],
620 ) -> rusqlite::Result<Option<Vec<u8>>> {
621 self.query_row(
622 "SELECT data FROM state_event WHERE room_id = ? AND event_id = ?",
623 (room_id, event_id),
624 |row| row.get(0),
625 )
626 .optional()
627 }
628
629 fn remove_room_state_events(
635 &self,
636 room_id: &[u8],
637 stripped: Option<bool>,
638 ) -> rusqlite::Result<()> {
639 if let Some(stripped) = stripped {
640 self.prepare_cached("DELETE FROM state_event WHERE room_id = ? AND stripped = ?")?
641 .execute((room_id, stripped))?;
642 } else {
643 self.prepare_cached("DELETE FROM state_event WHERE room_id = ?")?
644 .execute((room_id,))?;
645 }
646 Ok(())
647 }
648
649 fn set_member(
650 &self,
651 room_id: &[u8],
652 user_id: &[u8],
653 membership: &[u8],
654 stripped: bool,
655 data: &[u8],
656 ) -> rusqlite::Result<()> {
657 self.prepare_cached(
658 "INSERT OR REPLACE
659 INTO member (room_id, user_id, membership, stripped, data)
660 VALUES (?, ?, ?, ?, ?)",
661 )?
662 .execute((room_id, user_id, membership, stripped, data))?;
663 Ok(())
664 }
665
666 fn remove_room_members(&self, room_id: &[u8], stripped: Option<bool>) -> rusqlite::Result<()> {
671 if let Some(stripped) = stripped {
672 self.prepare_cached("DELETE FROM member WHERE room_id = ? AND stripped = ?")?
673 .execute((room_id, stripped))?;
674 } else {
675 self.prepare_cached("DELETE FROM member WHERE room_id = ?")?.execute((room_id,))?;
676 }
677 Ok(())
678 }
679
680 fn set_profile(&self, room_id: &[u8], user_id: &[u8], data: &[u8]) -> rusqlite::Result<()> {
681 self.prepare_cached(
682 "INSERT OR REPLACE
683 INTO profile (room_id, user_id, data)
684 VALUES (?, ?, ?)",
685 )?
686 .execute((room_id, user_id, data))?;
687 Ok(())
688 }
689
690 fn remove_room_profiles(&self, room_id: &[u8]) -> rusqlite::Result<()> {
691 self.prepare("DELETE FROM profile WHERE room_id = ?")?.execute((room_id,))?;
692 Ok(())
693 }
694
695 fn remove_room_profile(&self, room_id: &[u8], user_id: &[u8]) -> rusqlite::Result<()> {
696 self.prepare("DELETE FROM profile WHERE room_id = ? AND user_id = ?")?
697 .execute((room_id, user_id))?;
698 Ok(())
699 }
700
701 fn set_receipt(
702 &self,
703 room_id: &[u8],
704 user_id: &[u8],
705 receipt_type: &[u8],
706 thread: &[u8],
707 event_id: &[u8],
708 data: &[u8],
709 ) -> rusqlite::Result<()> {
710 self.prepare_cached(
711 "INSERT OR REPLACE
712 INTO receipt (room_id, user_id, receipt_type, thread, event_id, data)
713 VALUES (?, ?, ?, ?, ?, ?)",
714 )?
715 .execute((room_id, user_id, receipt_type, thread, event_id, data))?;
716 Ok(())
717 }
718
719 fn remove_room_receipts(&self, room_id: &[u8]) -> rusqlite::Result<()> {
720 self.prepare("DELETE FROM receipt WHERE room_id = ?")?.execute((room_id,))?;
721 Ok(())
722 }
723
724 fn set_display_name(&self, room_id: &[u8], name: &[u8], data: &[u8]) -> rusqlite::Result<()> {
725 self.prepare_cached(
726 "INSERT OR REPLACE
727 INTO display_name (room_id, name, data)
728 VALUES (?, ?, ?)",
729 )?
730 .execute((room_id, name, data))?;
731 Ok(())
732 }
733
734 fn remove_display_name(&self, room_id: &[u8], name: &[u8]) -> rusqlite::Result<()> {
735 self.prepare("DELETE FROM display_name WHERE room_id = ? AND name = ?")?
736 .execute((room_id, name))?;
737 Ok(())
738 }
739
740 fn remove_room_display_names(&self, room_id: &[u8]) -> rusqlite::Result<()> {
741 self.prepare("DELETE FROM display_name WHERE room_id = ?")?.execute((room_id,))?;
742 Ok(())
743 }
744
745 fn remove_room_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
746 self.prepare("DELETE FROM send_queue_events WHERE room_id = ?")?.execute((room_id,))?;
747 Ok(())
748 }
749
750 fn remove_room_dependent_send_queue(&self, room_id: &[u8]) -> rusqlite::Result<()> {
751 self.prepare("DELETE FROM dependent_send_queue_events WHERE room_id = ?")?
752 .execute((room_id,))?;
753 Ok(())
754 }
755}
756
757#[async_trait]
758trait SqliteObjectStateStoreExt: SqliteAsyncConnExt {
759 async fn get_kv_blob(&self, key: Key) -> Result<Option<Vec<u8>>> {
760 Ok(self
761 .query_row("SELECT value FROM kv_blob WHERE key = ?", (key,), |row| row.get(0))
762 .await
763 .optional()?)
764 }
765
766 async fn get_kv_blobs(&self, keys: Vec<Key>) -> Result<Vec<Vec<u8>>> {
767 let keys_length = keys.len();
768
769 self.chunk_large_query_over(keys, Some(keys_length), |txn, keys| {
770 let sql_params = repeat_vars(keys.len());
771 let sql = format!("SELECT value FROM kv_blob WHERE key IN ({sql_params})");
772
773 let params = rusqlite::params_from_iter(keys);
774
775 Ok(txn
776 .prepare(&sql)?
777 .query(params)?
778 .mapped(|row| row.get(0))
779 .collect::<Result<_, _>>()?)
780 })
781 .await
782 }
783
784 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()>;
785
786 async fn delete_kv_blob(&self, key: Key) -> Result<()> {
787 self.execute("DELETE FROM kv_blob WHERE key = ?", (key,)).await?;
788 Ok(())
789 }
790
791 async fn get_room_infos(&self) -> Result<Vec<Vec<u8>>> {
792 Ok(self
793 .prepare("SELECT data FROM room_info", move |mut stmt| {
794 stmt.query_map((), |row| row.get(0))?.collect()
795 })
796 .await?)
797 }
798
799 async fn get_maybe_stripped_state_events_for_keys(
800 &self,
801 room_id: Key,
802 event_type: Key,
803 state_keys: Vec<Key>,
804 ) -> Result<Vec<(bool, Vec<u8>)>> {
805 self.chunk_large_query_over(state_keys, None, move |txn, state_keys: Vec<Key>| {
806 let sql_params = repeat_vars(state_keys.len());
807 let sql = format!(
808 "SELECT stripped, data FROM state_event
809 WHERE room_id = ? AND event_type = ? AND state_key IN ({sql_params})"
810 );
811
812 let params = rusqlite::params_from_iter(
813 [room_id.clone(), event_type.clone()].into_iter().chain(state_keys),
814 );
815
816 Ok(txn
817 .prepare(&sql)?
818 .query(params)?
819 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
820 .collect::<Result<_, _>>()?)
821 })
822 .await
823 }
824
825 async fn get_maybe_stripped_state_events(
826 &self,
827 room_id: Key,
828 event_type: Key,
829 ) -> Result<Vec<(bool, Vec<u8>)>> {
830 Ok(self
831 .prepare(
832 "SELECT stripped, data FROM state_event
833 WHERE room_id = ? AND event_type = ?",
834 |mut stmt| {
835 stmt.query((room_id, event_type))?
836 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
837 .collect()
838 },
839 )
840 .await?)
841 }
842
843 async fn get_profiles(
844 &self,
845 room_id: Key,
846 user_ids: Vec<Key>,
847 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
848 let user_ids_length = user_ids.len();
849
850 self.chunk_large_query_over(user_ids, Some(user_ids_length), move |txn, user_ids| {
851 let sql_params = repeat_vars(user_ids.len());
852 let sql = format!(
853 "SELECT user_id, data FROM profile WHERE room_id = ? AND user_id IN ({sql_params})"
854 );
855
856 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(user_ids));
857
858 Ok(txn
859 .prepare(&sql)?
860 .query(params)?
861 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
862 .collect::<Result<_, _>>()?)
863 })
864 .await
865 }
866
867 async fn get_user_ids(&self, room_id: Key, memberships: Vec<Key>) -> Result<Vec<Vec<u8>>> {
868 let res = if memberships.is_empty() {
869 self.prepare("SELECT data FROM member WHERE room_id = ?", |mut stmt| {
870 stmt.query((room_id,))?.mapped(|row| row.get(0)).collect()
871 })
872 .await?
873 } else {
874 self.chunk_large_query_over(memberships, None, move |txn, memberships| {
875 let sql_params = repeat_vars(memberships.len());
876 let sql = format!(
877 "SELECT data FROM member WHERE room_id = ? AND membership IN ({sql_params})"
878 );
879
880 let params =
881 rusqlite::params_from_iter(iter::once(room_id.clone()).chain(memberships));
882
883 Ok(txn
884 .prepare(&sql)?
885 .query(params)?
886 .mapped(|row| row.get(0))
887 .collect::<Result<_, _>>()?)
888 })
889 .await?
890 };
891
892 Ok(res)
893 }
894
895 async fn get_global_account_data(&self, event_type: Key) -> Result<Option<Vec<u8>>> {
896 Ok(self
897 .query_row(
898 "SELECT data FROM global_account_data WHERE event_type = ?",
899 (event_type,),
900 |row| row.get(0),
901 )
902 .await
903 .optional()?)
904 }
905
906 async fn get_room_account_data(
907 &self,
908 room_id: Key,
909 event_type: Key,
910 ) -> Result<Option<Vec<u8>>> {
911 Ok(self
912 .query_row(
913 "SELECT data FROM room_account_data WHERE room_id = ? AND event_type = ?",
914 (room_id, event_type),
915 |row| row.get(0),
916 )
917 .await
918 .optional()?)
919 }
920
921 async fn get_display_names(
922 &self,
923 room_id: Key,
924 names: Vec<Key>,
925 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
926 let names_length = names.len();
927
928 self.chunk_large_query_over(names, Some(names_length), move |txn, names| {
929 let sql_params = repeat_vars(names.len());
930 let sql = format!(
931 "SELECT name, data FROM display_name WHERE room_id = ? AND name IN ({sql_params})"
932 );
933
934 let params = rusqlite::params_from_iter(iter::once(room_id.clone()).chain(names));
935
936 Ok(txn
937 .prepare(&sql)?
938 .query(params)?
939 .mapped(|row| Ok((row.get(0)?, row.get(1)?)))
940 .collect::<Result<_, _>>()?)
941 })
942 .await
943 }
944
945 async fn get_user_receipt(
946 &self,
947 room_id: Key,
948 receipt_type: Key,
949 thread: Key,
950 user_id: Key,
951 ) -> Result<Option<Vec<u8>>> {
952 Ok(self
953 .query_row(
954 "SELECT data FROM receipt
955 WHERE room_id = ? AND receipt_type = ? AND thread = ? and user_id = ?",
956 (room_id, receipt_type, thread, user_id),
957 |row| row.get(0),
958 )
959 .await
960 .optional()?)
961 }
962
963 async fn get_event_receipts(
964 &self,
965 room_id: Key,
966 receipt_type: Key,
967 thread: Key,
968 event_id: Key,
969 ) -> Result<Vec<Vec<u8>>> {
970 Ok(self
971 .prepare(
972 "SELECT data FROM receipt
973 WHERE room_id = ? AND receipt_type = ? AND thread = ? and event_id = ?",
974 |mut stmt| {
975 stmt.query((room_id, receipt_type, thread, event_id))?
976 .mapped(|row| row.get(0))
977 .collect()
978 },
979 )
980 .await?)
981 }
982}
983
984#[async_trait]
985impl SqliteObjectStateStoreExt for SqliteAsyncConn {
986 async fn set_kv_blob(&self, key: Key, value: Vec<u8>) -> Result<()> {
987 Ok(self.interact(move |conn| conn.set_kv_blob(&key, &value)).await.unwrap()?)
988 }
989}
990
991#[async_trait]
992impl StateStore for SqliteStateStore {
993 type Error = Error;
994
995 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
996 self.acquire()
997 .await?
998 .get_kv_blob(self.encode_state_store_data_key(key))
999 .await?
1000 .map(|data| {
1001 Ok(match key {
1002 StateStoreDataKey::SyncToken => {
1003 StateStoreDataValue::SyncToken(self.deserialize_value(&data)?)
1004 }
1005 StateStoreDataKey::ServerCapabilities => {
1006 StateStoreDataValue::ServerCapabilities(self.deserialize_value(&data)?)
1007 }
1008 StateStoreDataKey::Filter(_) => {
1009 StateStoreDataValue::Filter(self.deserialize_value(&data)?)
1010 }
1011 StateStoreDataKey::UserAvatarUrl(_) => {
1012 StateStoreDataValue::UserAvatarUrl(self.deserialize_value(&data)?)
1013 }
1014 StateStoreDataKey::RecentlyVisitedRooms(_) => {
1015 StateStoreDataValue::RecentlyVisitedRooms(self.deserialize_value(&data)?)
1016 }
1017 StateStoreDataKey::UtdHookManagerData => {
1018 StateStoreDataValue::UtdHookManagerData(self.deserialize_value(&data)?)
1019 }
1020 StateStoreDataKey::ComposerDraft(_) => {
1021 StateStoreDataValue::ComposerDraft(self.deserialize_value(&data)?)
1022 }
1023 StateStoreDataKey::SeenKnockRequests(_) => {
1024 StateStoreDataValue::SeenKnockRequests(self.deserialize_value(&data)?)
1025 }
1026 })
1027 })
1028 .transpose()
1029 }
1030
1031 async fn set_kv_data(
1032 &self,
1033 key: StateStoreDataKey<'_>,
1034 value: StateStoreDataValue,
1035 ) -> Result<()> {
1036 let serialized_value = match key {
1037 StateStoreDataKey::SyncToken => self.serialize_value(
1038 &value.into_sync_token().expect("Session data not a sync token"),
1039 )?,
1040 StateStoreDataKey::ServerCapabilities => self.serialize_value(
1041 &value
1042 .into_server_capabilities()
1043 .expect("Session data not containing server capabilities"),
1044 )?,
1045 StateStoreDataKey::Filter(_) => {
1046 self.serialize_value(&value.into_filter().expect("Session data not a filter"))?
1047 }
1048 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
1049 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
1050 )?,
1051 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
1052 &value.into_recently_visited_rooms().expect("Session data not breadcrumbs"),
1053 )?,
1054 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
1055 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
1056 )?,
1057 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
1058 &value.into_composer_draft().expect("Session data not a composer draft"),
1059 )?,
1060 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
1061 &value
1062 .into_seen_knock_requests()
1063 .expect("Session data is not a set of seen knock request ids"),
1064 )?,
1065 };
1066
1067 self.acquire()
1068 .await?
1069 .set_kv_blob(self.encode_state_store_data_key(key), serialized_value)
1070 .await
1071 }
1072
1073 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
1074 self.acquire().await?.delete_kv_blob(self.encode_state_store_data_key(key)).await
1075 }
1076
1077 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
1078 let changes = changes.to_owned();
1079 let this = self.clone();
1080 self.acquire()
1081 .await?
1082 .with_transaction(move |txn| {
1083 let StateChanges {
1084 sync_token,
1085 account_data,
1086 presence,
1087 profiles,
1088 profiles_to_delete,
1089 state,
1090 room_account_data,
1091 room_infos,
1092 receipts,
1093 redactions,
1094 stripped_state,
1095 ambiguity_maps,
1096 } = changes;
1097
1098 if let Some(sync_token) = sync_token {
1099 let key = this.encode_state_store_data_key(StateStoreDataKey::SyncToken);
1100 let value = this.serialize_value(&sync_token)?;
1101 txn.set_kv_blob(&key, &value)?;
1102 }
1103
1104 for (event_type, event) in account_data {
1105 let event_type =
1106 this.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1107 let data = this.serialize_json(&event)?;
1108 txn.set_global_account_data(&event_type, &data)?;
1109 }
1110
1111 for (room_id, events) in room_account_data {
1112 let room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1113 for (event_type, event) in events {
1114 let event_type =
1115 this.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1116 let data = this.serialize_json(&event)?;
1117 txn.set_room_account_data(&room_id, &event_type, &data)?;
1118 }
1119 }
1120
1121 for (user_id, event) in presence {
1122 let key = this.encode_presence_key(&user_id);
1123 let value = this.serialize_json(&event)?;
1124 txn.set_kv_blob(&key, &value)?;
1125 }
1126
1127 for (room_id, room_info) in room_infos {
1128 let stripped = room_info.state() == RoomState::Invited;
1129 this.remove_maybe_stripped_room_data(txn, &room_id, !stripped)?;
1131
1132 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
1133 let state = this
1134 .encode_key(keys::ROOM_INFO, serde_json::to_string(&room_info.state())?);
1135 let data = this.serialize_json(&room_info)?;
1136 txn.set_room_info(&room_id, &state, &data)?;
1137 }
1138
1139 for (room_id, user_ids) in profiles_to_delete {
1140 let room_id = this.encode_key(keys::PROFILE, room_id);
1141 for user_id in user_ids {
1142 let user_id = this.encode_key(keys::PROFILE, user_id);
1143 txn.remove_room_profile(&room_id, &user_id)?;
1144 }
1145 }
1146
1147 for (room_id, state_event_types) in state {
1148 let profiles = profiles.get(&room_id);
1149 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1150
1151 for (event_type, state_events) in state_event_types {
1152 let encoded_event_type =
1153 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1154
1155 for (state_key, raw_state_event) in state_events {
1156 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1157 let data = this.serialize_json(&raw_state_event)?;
1158
1159 let event_id: Option<String> =
1160 raw_state_event.get_field("event_id").ok().flatten();
1161 let encoded_event_id =
1162 event_id.as_ref().map(|e| this.encode_key(keys::STATE_EVENT, e));
1163
1164 txn.set_state_event(
1165 &encoded_room_id,
1166 &encoded_event_type,
1167 &encoded_state_key,
1168 false,
1169 encoded_event_id.as_deref(),
1170 &data,
1171 )?;
1172
1173 if event_type == StateEventType::RoomMember {
1174 let member_event = match raw_state_event
1175 .deserialize_as::<SyncRoomMemberEvent>()
1176 {
1177 Ok(ev) => ev,
1178 Err(e) => {
1179 debug!(event_id, "Failed to deserialize member event: {e}");
1180 continue;
1181 }
1182 };
1183
1184 let encoded_room_id = this.encode_key(keys::MEMBER, &room_id);
1185 let user_id = this.encode_key(keys::MEMBER, &state_key);
1186 let membership = this
1187 .encode_key(keys::MEMBER, member_event.membership().as_str());
1188 let data = this.serialize_value(&state_key)?;
1189
1190 txn.set_member(
1191 &encoded_room_id,
1192 &user_id,
1193 &membership,
1194 false,
1195 &data,
1196 )?;
1197
1198 if let Some(profile) =
1199 profiles.and_then(|p| p.get(member_event.state_key()))
1200 {
1201 let room_id = this.encode_key(keys::PROFILE, &room_id);
1202 let user_id = this.encode_key(keys::PROFILE, &state_key);
1203 let data = this.serialize_json(&profile)?;
1204 txn.set_profile(&room_id, &user_id, &data)?;
1205 }
1206 }
1207 }
1208 }
1209 }
1210
1211 for (room_id, stripped_state_event_types) in stripped_state {
1212 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1213
1214 for (event_type, stripped_state_events) in stripped_state_event_types {
1215 let encoded_event_type =
1216 this.encode_key(keys::STATE_EVENT, event_type.to_string());
1217
1218 for (state_key, raw_stripped_state_event) in stripped_state_events {
1219 let encoded_state_key = this.encode_key(keys::STATE_EVENT, &state_key);
1220 let data = this.serialize_json(&raw_stripped_state_event)?;
1221 txn.set_state_event(
1222 &encoded_room_id,
1223 &encoded_event_type,
1224 &encoded_state_key,
1225 true,
1226 None,
1227 &data,
1228 )?;
1229
1230 if event_type == StateEventType::RoomMember {
1231 let member_event = match raw_stripped_state_event
1232 .deserialize_as::<StrippedRoomMemberEvent>(
1233 ) {
1234 Ok(ev) => ev,
1235 Err(e) => {
1236 debug!("Failed to deserialize stripped member event: {e}");
1237 continue;
1238 }
1239 };
1240
1241 let room_id = this.encode_key(keys::MEMBER, &room_id);
1242 let user_id = this.encode_key(keys::MEMBER, &state_key);
1243 let membership = this.encode_key(
1244 keys::MEMBER,
1245 member_event.content.membership.as_str(),
1246 );
1247 let data = this.serialize_value(&state_key)?;
1248
1249 txn.set_member(&room_id, &user_id, &membership, true, &data)?;
1250 }
1251 }
1252 }
1253 }
1254
1255 for (room_id, receipt_event) in receipts {
1256 let room_id = this.encode_key(keys::RECEIPT, room_id);
1257
1258 for (event_id, receipt_types) in receipt_event {
1259 let encoded_event_id = this.encode_key(keys::RECEIPT, &event_id);
1260
1261 for (receipt_type, receipt_users) in receipt_types {
1262 let receipt_type =
1263 this.encode_key(keys::RECEIPT, receipt_type.as_str());
1264
1265 for (user_id, receipt) in receipt_users {
1266 let encoded_user_id = this.encode_key(keys::RECEIPT, &user_id);
1267 let thread = this.encode_key(
1270 keys::RECEIPT,
1271 rmp_serde::to_vec_named(&receipt.thread)?,
1272 );
1273 let data = this.serialize_json(&ReceiptData {
1274 receipt,
1275 event_id: event_id.clone(),
1276 user_id,
1277 })?;
1278
1279 txn.set_receipt(
1280 &room_id,
1281 &encoded_user_id,
1282 &receipt_type,
1283 &thread,
1284 &encoded_event_id,
1285 &data,
1286 )?;
1287 }
1288 }
1289 }
1290 }
1291
1292 for (room_id, redactions) in redactions {
1293 let make_room_version = || {
1294 let encoded_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1295 txn.get_room_info(&encoded_room_id)
1296 .ok()
1297 .flatten()
1298 .and_then(|v| this.deserialize_json::<RoomInfo>(&v).ok())
1299 .and_then(|info| info.room_version().cloned())
1300 .unwrap_or_else(|| {
1301 warn!(
1302 ?room_id,
1303 "Unable to find the room version, assume version 9"
1304 );
1305 RoomVersionId::V9
1306 })
1307 };
1308
1309 let encoded_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1310 let mut room_version = None;
1311
1312 for (event_id, redaction) in redactions {
1313 let event_id = this.encode_key(keys::STATE_EVENT, event_id);
1314
1315 if let Some(Ok(raw_event)) = txn
1316 .get_state_event_by_id(&encoded_room_id, &event_id)?
1317 .map(|value| this.deserialize_json::<Raw<AnySyncStateEvent>>(&value))
1318 {
1319 let event = raw_event.deserialize()?;
1320 let redacted = redact(
1321 raw_event.deserialize_as::<CanonicalJsonObject>()?,
1322 room_version.get_or_insert_with(make_room_version),
1323 Some(RedactedBecause::from_raw_event(&redaction)?),
1324 )
1325 .map_err(Error::Redaction)?;
1326 let data = this.serialize_json(&redacted)?;
1327
1328 let event_type =
1329 this.encode_key(keys::STATE_EVENT, event.event_type().to_string());
1330 let state_key = this.encode_key(keys::STATE_EVENT, event.state_key());
1331
1332 txn.set_state_event(
1333 &encoded_room_id,
1334 &event_type,
1335 &state_key,
1336 false,
1337 Some(&event_id),
1338 &data,
1339 )?;
1340 }
1341 }
1342 }
1343
1344 for (room_id, display_names) in ambiguity_maps {
1345 let room_id = this.encode_key(keys::DISPLAY_NAME, room_id);
1346
1347 for (name, user_ids) in display_names {
1348 let encoded_name = this.encode_key(
1349 keys::DISPLAY_NAME,
1350 name.as_normalized_str().unwrap_or_else(|| name.as_raw_str()),
1351 );
1352 let data = this.serialize_json(&user_ids)?;
1353
1354 if user_ids.is_empty() {
1355 txn.remove_display_name(&room_id, &encoded_name)?;
1356
1357 let raw_name = this.encode_key(keys::DISPLAY_NAME, name.as_raw_str());
1372 txn.remove_display_name(&room_id, &raw_name)?;
1373 } else {
1374 txn.set_display_name(&room_id, &encoded_name, &data)?;
1376 }
1377 }
1378 }
1379
1380 Ok::<_, Error>(())
1381 })
1382 .await?;
1383
1384 Ok(())
1385 }
1386
1387 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1388 self.acquire()
1389 .await?
1390 .get_kv_blob(self.encode_presence_key(user_id))
1391 .await?
1392 .map(|data| self.deserialize_json(&data))
1393 .transpose()
1394 }
1395
1396 async fn get_presence_events(
1397 &self,
1398 user_ids: &[OwnedUserId],
1399 ) -> Result<Vec<Raw<PresenceEvent>>> {
1400 if user_ids.is_empty() {
1401 return Ok(Vec::new());
1402 }
1403
1404 let user_ids = user_ids.iter().map(|u| self.encode_presence_key(u)).collect();
1405 self.acquire()
1406 .await?
1407 .get_kv_blobs(user_ids)
1408 .await?
1409 .into_iter()
1410 .map(|data| self.deserialize_json(&data))
1411 .collect()
1412 }
1413
1414 async fn get_state_event(
1415 &self,
1416 room_id: &RoomId,
1417 event_type: StateEventType,
1418 state_key: &str,
1419 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1420 Ok(self
1421 .get_state_events_for_keys(room_id, event_type, &[state_key])
1422 .await?
1423 .into_iter()
1424 .next())
1425 }
1426
1427 async fn get_state_events(
1428 &self,
1429 room_id: &RoomId,
1430 event_type: StateEventType,
1431 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1432 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1433 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1434 self.acquire()
1435 .await?
1436 .get_maybe_stripped_state_events(room_id, event_type)
1437 .await?
1438 .into_iter()
1439 .map(|(stripped, data)| {
1440 let ev = if stripped {
1441 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1442 } else {
1443 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1444 };
1445
1446 Ok(ev)
1447 })
1448 .collect()
1449 }
1450
1451 async fn get_state_events_for_keys(
1452 &self,
1453 room_id: &RoomId,
1454 event_type: StateEventType,
1455 state_keys: &[&str],
1456 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1457 if state_keys.is_empty() {
1458 return Ok(Vec::new());
1459 }
1460
1461 let room_id = self.encode_key(keys::STATE_EVENT, room_id);
1462 let event_type = self.encode_key(keys::STATE_EVENT, event_type.to_string());
1463 let state_keys = state_keys.iter().map(|k| self.encode_key(keys::STATE_EVENT, k)).collect();
1464 self.acquire()
1465 .await?
1466 .get_maybe_stripped_state_events_for_keys(room_id, event_type, state_keys)
1467 .await?
1468 .into_iter()
1469 .map(|(stripped, data)| {
1470 let ev = if stripped {
1471 RawAnySyncOrStrippedState::Stripped(self.deserialize_json(&data)?)
1472 } else {
1473 RawAnySyncOrStrippedState::Sync(self.deserialize_json(&data)?)
1474 };
1475
1476 Ok(ev)
1477 })
1478 .collect()
1479 }
1480
1481 async fn get_profile(
1482 &self,
1483 room_id: &RoomId,
1484 user_id: &UserId,
1485 ) -> Result<Option<MinimalRoomMemberEvent>> {
1486 let room_id = self.encode_key(keys::PROFILE, room_id);
1487 let user_ids = vec![self.encode_key(keys::PROFILE, user_id)];
1488
1489 self.acquire()
1490 .await?
1491 .get_profiles(room_id, user_ids)
1492 .await?
1493 .into_iter()
1494 .next()
1495 .map(|(_, data)| self.deserialize_json(&data))
1496 .transpose()
1497 }
1498
1499 async fn get_profiles<'a>(
1500 &self,
1501 room_id: &RoomId,
1502 user_ids: &'a [OwnedUserId],
1503 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1504 if user_ids.is_empty() {
1505 return Ok(BTreeMap::new());
1506 }
1507
1508 let room_id = self.encode_key(keys::PROFILE, room_id);
1509 let mut user_ids_map = user_ids
1510 .iter()
1511 .map(|u| (self.encode_key(keys::PROFILE, u), u.as_ref()))
1512 .collect::<BTreeMap<_, _>>();
1513 let user_ids = user_ids_map.keys().cloned().collect();
1514
1515 self.acquire()
1516 .await?
1517 .get_profiles(room_id, user_ids)
1518 .await?
1519 .into_iter()
1520 .map(|(user_id, data)| {
1521 Ok((
1522 user_ids_map
1523 .remove(user_id.as_slice())
1524 .expect("returned user IDs were requested"),
1525 self.deserialize_json(&data)?,
1526 ))
1527 })
1528 .collect()
1529 }
1530
1531 async fn get_user_ids(
1532 &self,
1533 room_id: &RoomId,
1534 membership: RoomMemberships,
1535 ) -> Result<Vec<OwnedUserId>> {
1536 let room_id = self.encode_key(keys::MEMBER, room_id);
1537 let memberships = membership
1538 .as_vec()
1539 .into_iter()
1540 .map(|m| self.encode_key(keys::MEMBER, m.as_str()))
1541 .collect();
1542 self.acquire()
1543 .await?
1544 .get_user_ids(room_id, memberships)
1545 .await?
1546 .iter()
1547 .map(|data| self.deserialize_value(data))
1548 .collect()
1549 }
1550
1551 async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
1552 self.acquire()
1553 .await?
1554 .get_room_infos()
1555 .await?
1556 .into_iter()
1557 .map(|data| self.deserialize_json(&data))
1558 .collect()
1559 }
1560
1561 async fn get_users_with_display_name(
1562 &self,
1563 room_id: &RoomId,
1564 display_name: &DisplayName,
1565 ) -> Result<BTreeSet<OwnedUserId>> {
1566 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1567 let names = vec![self.encode_key(
1568 keys::DISPLAY_NAME,
1569 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1570 )];
1571
1572 Ok(self
1573 .acquire()
1574 .await?
1575 .get_display_names(room_id, names)
1576 .await?
1577 .into_iter()
1578 .next()
1579 .map(|(_, data)| self.deserialize_json(&data))
1580 .transpose()?
1581 .unwrap_or_default())
1582 }
1583
1584 async fn get_users_with_display_names<'a>(
1585 &self,
1586 room_id: &RoomId,
1587 display_names: &'a [DisplayName],
1588 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1589 let mut result = HashMap::new();
1590
1591 if display_names.is_empty() {
1592 return Ok(result);
1593 }
1594
1595 let room_id = self.encode_key(keys::DISPLAY_NAME, room_id);
1596 let mut names_map = display_names
1597 .iter()
1598 .flat_map(|display_name| {
1599 let raw =
1609 (self.encode_key(keys::DISPLAY_NAME, display_name.as_raw_str()), display_name);
1610 let normalized = display_name.as_normalized_str().map(|normalized| {
1611 (self.encode_key(keys::DISPLAY_NAME, normalized), display_name)
1612 });
1613
1614 iter::once(raw).chain(normalized.into_iter())
1615 })
1616 .collect::<BTreeMap<_, _>>();
1617 let names = names_map.keys().cloned().collect();
1618
1619 for (name, data) in
1620 self.acquire().await?.get_display_names(room_id, names).await?.into_iter()
1621 {
1622 let display_name =
1623 names_map.remove(name.as_slice()).expect("returned display names were requested");
1624 let user_ids: BTreeSet<_> = self.deserialize_json(&data)?;
1625
1626 result.entry(display_name).or_insert_with(BTreeSet::new).extend(user_ids);
1627 }
1628
1629 Ok(result)
1630 }
1631
1632 async fn get_account_data_event(
1633 &self,
1634 event_type: GlobalAccountDataEventType,
1635 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1636 let event_type = self.encode_key(keys::GLOBAL_ACCOUNT_DATA, event_type.to_string());
1637 self.acquire()
1638 .await?
1639 .get_global_account_data(event_type)
1640 .await?
1641 .map(|value| self.deserialize_json(&value))
1642 .transpose()
1643 }
1644
1645 async fn get_room_account_data_event(
1646 &self,
1647 room_id: &RoomId,
1648 event_type: RoomAccountDataEventType,
1649 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1650 let room_id = self.encode_key(keys::ROOM_ACCOUNT_DATA, room_id);
1651 let event_type = self.encode_key(keys::ROOM_ACCOUNT_DATA, event_type.to_string());
1652 self.acquire()
1653 .await?
1654 .get_room_account_data(room_id, event_type)
1655 .await?
1656 .map(|value| self.deserialize_json(&value))
1657 .transpose()
1658 }
1659
1660 async fn get_user_room_receipt_event(
1661 &self,
1662 room_id: &RoomId,
1663 receipt_type: ReceiptType,
1664 thread: ReceiptThread,
1665 user_id: &UserId,
1666 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1667 let room_id = self.encode_key(keys::RECEIPT, room_id);
1668 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1669 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1672 let user_id = self.encode_key(keys::RECEIPT, user_id);
1673
1674 self.acquire()
1675 .await?
1676 .get_user_receipt(room_id, receipt_type, thread, user_id)
1677 .await?
1678 .map(|value| {
1679 self.deserialize_json::<ReceiptData>(&value).map(|d| (d.event_id, d.receipt))
1680 })
1681 .transpose()
1682 }
1683
1684 async fn get_event_room_receipt_events(
1685 &self,
1686 room_id: &RoomId,
1687 receipt_type: ReceiptType,
1688 thread: ReceiptThread,
1689 event_id: &EventId,
1690 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1691 let room_id = self.encode_key(keys::RECEIPT, room_id);
1692 let receipt_type = self.encode_key(keys::RECEIPT, receipt_type.to_string());
1693 let thread = self.encode_key(keys::RECEIPT, rmp_serde::to_vec_named(&thread)?);
1696 let event_id = self.encode_key(keys::RECEIPT, event_id);
1697
1698 self.acquire()
1699 .await?
1700 .get_event_receipts(room_id, receipt_type, thread, event_id)
1701 .await?
1702 .iter()
1703 .map(|value| {
1704 self.deserialize_json::<ReceiptData>(value).map(|d| (d.user_id, d.receipt))
1705 })
1706 .collect()
1707 }
1708
1709 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1710 self.acquire().await?.get_kv_blob(self.encode_custom_key(key)).await
1711 }
1712
1713 async fn set_custom_value_no_read(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
1714 let conn = self.acquire().await?;
1715 let key = self.encode_custom_key(key);
1716 conn.set_kv_blob(key, value).await?;
1717 Ok(())
1718 }
1719
1720 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1721 let conn = self.acquire().await?;
1722 let key = self.encode_custom_key(key);
1723 let previous = conn.get_kv_blob(key.clone()).await?;
1724 conn.set_kv_blob(key, value).await?;
1725 Ok(previous)
1726 }
1727
1728 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1729 let conn = self.acquire().await?;
1730 let key = self.encode_custom_key(key);
1731 let previous = conn.get_kv_blob(key.clone()).await?;
1732 if previous.is_some() {
1733 conn.delete_kv_blob(key).await?;
1734 }
1735 Ok(previous)
1736 }
1737
1738 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1739 let this = self.clone();
1740 let room_id = room_id.to_owned();
1741
1742 let conn = self.acquire().await?;
1743
1744 conn.with_transaction(move |txn| -> Result<()> {
1745 let room_info_room_id = this.encode_key(keys::ROOM_INFO, &room_id);
1746 txn.remove_room_info(&room_info_room_id)?;
1747
1748 let state_event_room_id = this.encode_key(keys::STATE_EVENT, &room_id);
1749 txn.remove_room_state_events(&state_event_room_id, None)?;
1750
1751 let member_room_id = this.encode_key(keys::MEMBER, &room_id);
1752 txn.remove_room_members(&member_room_id, None)?;
1753
1754 let profile_room_id = this.encode_key(keys::PROFILE, &room_id);
1755 txn.remove_room_profiles(&profile_room_id)?;
1756
1757 let room_account_data_room_id = this.encode_key(keys::ROOM_ACCOUNT_DATA, &room_id);
1758 txn.remove_room_account_data(&room_account_data_room_id)?;
1759
1760 let receipt_room_id = this.encode_key(keys::RECEIPT, &room_id);
1761 txn.remove_room_receipts(&receipt_room_id)?;
1762
1763 let display_name_room_id = this.encode_key(keys::DISPLAY_NAME, &room_id);
1764 txn.remove_room_display_names(&display_name_room_id)?;
1765
1766 let send_queue_room_id = this.encode_key(keys::SEND_QUEUE, &room_id);
1767 txn.remove_room_send_queue(&send_queue_room_id)?;
1768
1769 let dependent_send_queue_room_id =
1770 this.encode_key(keys::DEPENDENTS_SEND_QUEUE, &room_id);
1771 txn.remove_room_dependent_send_queue(&dependent_send_queue_room_id)?;
1772
1773 Ok(())
1774 })
1775 .await?;
1776
1777 conn.vacuum().await
1778 }
1779
1780 async fn save_send_queue_request(
1781 &self,
1782 room_id: &RoomId,
1783 transaction_id: OwnedTransactionId,
1784 created_at: MilliSecondsSinceUnixEpoch,
1785 content: QueuedRequestKind,
1786 priority: usize,
1787 ) -> Result<(), Self::Error> {
1788 let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
1789 let room_id_value = self.serialize_value(&room_id.to_owned())?;
1790
1791 let content = self.serialize_json(&content)?;
1792 let created_at_ts: u64 = created_at.0.into();
1798 self.acquire()
1799 .await?
1800 .with_transaction(move |txn| {
1801 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
1802 Ok(())
1803 })
1804 .await
1805 }
1806
1807 async fn update_send_queue_request(
1808 &self,
1809 room_id: &RoomId,
1810 transaction_id: &TransactionId,
1811 content: QueuedRequestKind,
1812 ) -> Result<bool, Self::Error> {
1813 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1814
1815 let content = self.serialize_json(&content)?;
1816 let transaction_id = transaction_id.to_string();
1819
1820 let num_updated = self.acquire()
1821 .await?
1822 .with_transaction(move |txn| {
1823 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = NULL, content = ? WHERE room_id = ? AND transaction_id = ?")?.execute((content, room_id, transaction_id))
1824 })
1825 .await?;
1826
1827 Ok(num_updated > 0)
1828 }
1829
1830 async fn remove_send_queue_request(
1831 &self,
1832 room_id: &RoomId,
1833 transaction_id: &TransactionId,
1834 ) -> Result<bool, Self::Error> {
1835 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1836
1837 let transaction_id = transaction_id.to_string();
1839
1840 let num_deleted = self
1841 .acquire()
1842 .await?
1843 .with_transaction(move |txn| {
1844 txn.prepare_cached(
1845 "DELETE FROM send_queue_events WHERE room_id = ? AND transaction_id = ?",
1846 )?
1847 .execute((room_id, &transaction_id))
1848 })
1849 .await?;
1850
1851 Ok(num_deleted > 0)
1852 }
1853
1854 async fn load_send_queue_requests(
1855 &self,
1856 room_id: &RoomId,
1857 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1858 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1859
1860 let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, Option<u64>)> = self
1864 .acquire()
1865 .await?
1866 .prepare(
1867 "SELECT transaction_id, content, wedge_reason, priority, created_at FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
1868 |mut stmt| {
1869 stmt.query((room_id,))?
1870 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
1871 .collect()
1872 },
1873 )
1874 .await?;
1875
1876 let mut requests = Vec::with_capacity(res.len());
1877 for entry in res {
1878 let created_at = entry
1879 .4
1880 .and_then(UInt::new)
1881 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
1882 requests.push(QueuedRequest {
1883 transaction_id: entry.0.into(),
1884 kind: self.deserialize_json(&entry.1)?,
1885 error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
1886 priority: entry.3,
1887 created_at,
1888 });
1889 }
1890
1891 Ok(requests)
1892 }
1893
1894 async fn update_send_queue_request_status(
1895 &self,
1896 room_id: &RoomId,
1897 transaction_id: &TransactionId,
1898 error: Option<QueueWedgeError>,
1899 ) -> Result<(), Self::Error> {
1900 let room_id = self.encode_key(keys::SEND_QUEUE, room_id);
1901
1902 let transaction_id = transaction_id.to_string();
1904
1905 let error_value = error.map(|e| self.serialize_value(&e)).transpose()?;
1907
1908 self.acquire()
1909 .await?
1910 .with_transaction(move |txn| {
1911 txn.prepare_cached("UPDATE send_queue_events SET wedge_reason = ? WHERE room_id = ? AND transaction_id = ?")?.execute((error_value, room_id, transaction_id))?;
1912 Ok(())
1913 })
1914 .await
1915 }
1916
1917 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1918 let res: Vec<Vec<u8>> = self
1923 .acquire()
1924 .await?
1925 .prepare("SELECT room_id_val FROM send_queue_events", |mut stmt| {
1926 stmt.query(())?.mapped(|row| row.get(0)).collect()
1927 })
1928 .await?;
1929
1930 Ok(res
1933 .into_iter()
1934 .map(|entry| self.deserialize_value(&entry))
1935 .collect::<Result<BTreeSet<OwnedRoomId>, _>>()?
1936 .into_iter()
1937 .collect())
1938 }
1939
1940 async fn save_dependent_queued_request(
1941 &self,
1942 room_id: &RoomId,
1943 parent_txn_id: &TransactionId,
1944 own_txn_id: ChildTransactionId,
1945 created_at: MilliSecondsSinceUnixEpoch,
1946 content: DependentQueuedRequestKind,
1947 ) -> Result<()> {
1948 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1949 let content = self.serialize_json(&content)?;
1950
1951 let parent_txn_id = parent_txn_id.to_string();
1953 let own_txn_id = own_txn_id.to_string();
1954
1955 let created_at_ts: u64 = created_at.0.into();
1956 self.acquire()
1957 .await?
1958 .with_transaction(move |txn| {
1959 txn.prepare_cached(
1960 r#"INSERT INTO dependent_send_queue_events
1961 (room_id, parent_transaction_id, own_transaction_id, content, created_at)
1962 VALUES (?, ?, ?, ?, ?)"#,
1963 )?
1964 .execute((
1965 room_id,
1966 parent_txn_id,
1967 own_txn_id,
1968 content,
1969 created_at_ts,
1970 ))?;
1971 Ok(())
1972 })
1973 .await
1974 }
1975
1976 async fn update_dependent_queued_request(
1977 &self,
1978 room_id: &RoomId,
1979 own_transaction_id: &ChildTransactionId,
1980 new_content: DependentQueuedRequestKind,
1981 ) -> Result<bool> {
1982 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
1983 let content = self.serialize_json(&new_content)?;
1984
1985 let own_txn_id = own_transaction_id.to_string();
1987
1988 let num_updated = self
1989 .acquire()
1990 .await?
1991 .with_transaction(move |txn| {
1992 txn.prepare_cached(
1993 r#"UPDATE dependent_send_queue_events
1994 SET content = ?
1995 WHERE own_transaction_id = ?
1996 AND room_id = ?"#,
1997 )?
1998 .execute((content, own_txn_id, room_id))
1999 })
2000 .await?;
2001
2002 if num_updated > 1 {
2003 return Err(Error::InconsistentUpdate);
2004 }
2005
2006 Ok(num_updated == 1)
2007 }
2008
2009 async fn mark_dependent_queued_requests_as_ready(
2010 &self,
2011 room_id: &RoomId,
2012 parent_txn_id: &TransactionId,
2013 parent_key: SentRequestKey,
2014 ) -> Result<usize> {
2015 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2016 let parent_key = self.serialize_value(&parent_key)?;
2017
2018 let parent_txn_id = parent_txn_id.to_string();
2020
2021 self.acquire()
2022 .await?
2023 .with_transaction(move |txn| {
2024 Ok(txn.prepare_cached(
2025 "UPDATE dependent_send_queue_events SET parent_key = ? WHERE parent_transaction_id = ? and room_id = ?",
2026 )?
2027 .execute((parent_key, parent_txn_id, room_id))?)
2028 })
2029 .await
2030 }
2031
2032 async fn remove_dependent_queued_request(
2033 &self,
2034 room_id: &RoomId,
2035 txn_id: &ChildTransactionId,
2036 ) -> Result<bool> {
2037 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2038
2039 let txn_id = txn_id.to_string();
2041
2042 let num_deleted = self
2043 .acquire()
2044 .await?
2045 .with_transaction(move |txn| {
2046 txn.prepare_cached(
2047 "DELETE FROM dependent_send_queue_events WHERE own_transaction_id = ? AND room_id = ?",
2048 )?
2049 .execute((txn_id, room_id))
2050 })
2051 .await?;
2052
2053 Ok(num_deleted > 0)
2054 }
2055
2056 async fn load_dependent_queued_requests(
2057 &self,
2058 room_id: &RoomId,
2059 ) -> Result<Vec<DependentQueuedRequest>> {
2060 let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
2061
2062 let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, Option<u64>)> = self
2064 .acquire()
2065 .await?
2066 .prepare(
2067 "SELECT own_transaction_id, parent_transaction_id, parent_key, content, created_at FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
2068 |mut stmt| {
2069 stmt.query((room_id,))?
2070 .mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
2071 .collect()
2072 },
2073 )
2074 .await?;
2075
2076 let mut dependent_events = Vec::with_capacity(res.len());
2077 for entry in res {
2078 let created_at = entry
2079 .4
2080 .and_then(UInt::new)
2081 .map_or_else(MilliSecondsSinceUnixEpoch::now, MilliSecondsSinceUnixEpoch);
2082 dependent_events.push(DependentQueuedRequest {
2083 own_transaction_id: entry.0.into(),
2084 parent_transaction_id: entry.1.into(),
2085 parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
2086 kind: self.deserialize_json(&entry.3)?,
2087 created_at,
2088 });
2089 }
2090
2091 Ok(dependent_events)
2092 }
2093}
2094
2095#[derive(Debug, Clone, Serialize, Deserialize)]
2096struct ReceiptData {
2097 receipt: Receipt,
2098 event_id: OwnedEventId,
2099 user_id: OwnedUserId,
2100}
2101
2102#[cfg(test)]
2103mod tests {
2104 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2105
2106 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2107 use once_cell::sync::Lazy;
2108 use tempfile::{tempdir, TempDir};
2109
2110 use super::SqliteStateStore;
2111
2112 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2113 static NUM: AtomicU32 = AtomicU32::new(0);
2114
2115 async fn get_store() -> Result<impl StateStore, StoreError> {
2116 let name = NUM.fetch_add(1, SeqCst).to_string();
2117 let tmpdir_path = TMP_DIR.path().join(name);
2118
2119 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2120
2121 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
2122 }
2123
2124 statestore_integration_tests!();
2125}
2126
2127#[cfg(test)]
2128mod encrypted_tests {
2129 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2130
2131 use matrix_sdk_base::{statestore_integration_tests, StateStore, StoreError};
2132 use once_cell::sync::Lazy;
2133 use tempfile::{tempdir, TempDir};
2134
2135 use super::SqliteStateStore;
2136
2137 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2138 static NUM: AtomicU32 = AtomicU32::new(0);
2139
2140 async fn get_store() -> Result<impl StateStore, StoreError> {
2141 let name = NUM.fetch_add(1, SeqCst).to_string();
2142 let tmpdir_path = TMP_DIR.path().join(name);
2143
2144 tracing::info!("using store @ {}", tmpdir_path.to_str().unwrap());
2145
2146 Ok(SqliteStateStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
2147 .await
2148 .unwrap())
2149 }
2150
2151 statestore_integration_tests!();
2152}
2153
2154#[cfg(test)]
2155mod migration_tests {
2156 use std::{
2157 path::{Path, PathBuf},
2158 sync::{
2159 atomic::{AtomicU32, Ordering::SeqCst},
2160 Arc,
2161 },
2162 };
2163
2164 use matrix_sdk_base::{
2165 store::{ChildTransactionId, DependentQueuedRequestKind, SerializableEventContent},
2166 sync::UnreadNotificationsCount,
2167 RoomState, StateStore,
2168 };
2169 use matrix_sdk_test::async_test;
2170 use once_cell::sync::Lazy;
2171 use ruma::{
2172 events::{
2173 room::{create::RoomCreateEventContent, message::RoomMessageEventContent},
2174 StateEventType,
2175 },
2176 room_id, server_name, user_id, EventId, MilliSecondsSinceUnixEpoch, RoomId, TransactionId,
2177 UserId,
2178 };
2179 use rusqlite::Transaction;
2180 use serde_json::json;
2181 use tempfile::{tempdir, TempDir};
2182
2183 use super::{create_pool, init, keys, SqliteStateStore};
2184 use crate::{
2185 error::{Error, Result},
2186 utils::{SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt},
2187 };
2188
2189 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2190 static NUM: AtomicU32 = AtomicU32::new(0);
2191 const SECRET: &str = "secret";
2192
2193 fn new_path() -> PathBuf {
2194 let name = NUM.fetch_add(1, SeqCst).to_string();
2195 TMP_DIR.path().join(name)
2196 }
2197
2198 async fn create_fake_db(path: &Path, version: u8) -> Result<SqliteStateStore> {
2199 let pool = create_pool(path).await.unwrap();
2200 let conn = pool.get().await?;
2201
2202 init(&conn).await?;
2203
2204 let store_cipher = Some(Arc::new(conn.get_or_create_store_cipher(SECRET).await.unwrap()));
2205 let this = SqliteStateStore { store_cipher, pool };
2206 this.run_migrations(&conn, 1, Some(version)).await?;
2207
2208 Ok(this)
2209 }
2210
2211 fn room_info_v1_json(
2212 room_id: &RoomId,
2213 state: RoomState,
2214 name: Option<&str>,
2215 creator: Option<&UserId>,
2216 ) -> serde_json::Value {
2217 let name_content = match name {
2219 Some(name) => json!({ "name": name }),
2220 None => json!({ "name": null }),
2221 };
2222 let create_content = match creator {
2224 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2225 None => RoomCreateEventContent::new_v11(),
2226 };
2227
2228 json!({
2229 "room_id": room_id,
2230 "room_type": state,
2231 "notification_counts": UnreadNotificationsCount::default(),
2232 "summary": {
2233 "heroes": [],
2234 "joined_member_count": 0,
2235 "invited_member_count": 0,
2236 },
2237 "members_synced": false,
2238 "base_info": {
2239 "dm_targets": [],
2240 "max_power_level": 100,
2241 "name": {
2242 "Original": {
2243 "content": name_content,
2244 },
2245 },
2246 "create": {
2247 "Original": {
2248 "content": create_content,
2249 }
2250 }
2251 },
2252 })
2253 }
2254
2255 #[async_test]
2256 pub async fn test_migrating_v1_to_v2() {
2257 let path = new_path();
2258 {
2260 let db = create_fake_db(&path, 1).await.unwrap();
2261 let conn = db.pool.get().await.unwrap();
2262
2263 let this = db.clone();
2264 conn.with_transaction(move |txn| {
2265 for i in 0..5 {
2266 let room_id = RoomId::parse(format!("!room_{i}:localhost")).unwrap();
2267 let (state, stripped) =
2268 if i < 3 { (RoomState::Joined, false) } else { (RoomState::Invited, true) };
2269 let info = room_info_v1_json(&room_id, state, None, None);
2270
2271 let room_id = this.encode_key(keys::ROOM_INFO, room_id);
2272 let data = this.serialize_json(&info)?;
2273
2274 txn.prepare_cached(
2275 "INSERT INTO room_info (room_id, stripped, data)
2276 VALUES (?, ?, ?)",
2277 )?
2278 .execute((room_id, stripped, data))?;
2279 }
2280
2281 Result::<_, Error>::Ok(())
2282 })
2283 .await
2284 .unwrap();
2285 }
2286
2287 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2289
2290 assert_eq!(store.get_room_infos().await.unwrap().len(), 5);
2292 }
2293
2294 fn add_room_v2(
2296 this: &SqliteStateStore,
2297 txn: &Transaction<'_>,
2298 room_id: &RoomId,
2299 name: Option<&str>,
2300 create_creator: Option<&UserId>,
2301 create_sender: Option<&UserId>,
2302 ) -> Result<(), Error> {
2303 let room_info_json = room_info_v1_json(room_id, RoomState::Joined, name, create_creator);
2304
2305 let encoded_room_id = this.encode_key(keys::ROOM_INFO, room_id);
2306 let encoded_state =
2307 this.encode_key(keys::ROOM_INFO, serde_json::to_string(&RoomState::Joined)?);
2308 let data = this.serialize_json(&room_info_json)?;
2309
2310 txn.prepare_cached(
2311 "INSERT INTO room_info (room_id, state, data)
2312 VALUES (?, ?, ?)",
2313 )?
2314 .execute((encoded_room_id, encoded_state, data))?;
2315
2316 let Some(create_sender) = create_sender else {
2318 return Ok(());
2319 };
2320
2321 let create_content = match create_creator {
2322 Some(creator) => RoomCreateEventContent::new_v1(creator.to_owned()),
2323 None => RoomCreateEventContent::new_v11(),
2324 };
2325
2326 let event_id = EventId::new(server_name!("dummy.local"));
2327 let create_event = json!({
2328 "content": create_content,
2329 "event_id": event_id,
2330 "sender": create_sender.to_owned(),
2331 "origin_server_ts": MilliSecondsSinceUnixEpoch::now(),
2332 "state_key": "",
2333 "type": "m.room.create",
2334 "unsigned": {},
2335 });
2336
2337 let encoded_room_id = this.encode_key(keys::STATE_EVENT, room_id);
2338 let encoded_event_type =
2339 this.encode_key(keys::STATE_EVENT, StateEventType::RoomCreate.to_string());
2340 let encoded_state_key = this.encode_key(keys::STATE_EVENT, "");
2341 let stripped = false;
2342 let encoded_event_id = this.encode_key(keys::STATE_EVENT, event_id);
2343 let data = this.serialize_json(&create_event)?;
2344
2345 txn.prepare_cached(
2346 "INSERT
2347 INTO state_event (room_id, event_type, state_key, stripped, event_id, data)
2348 VALUES (?, ?, ?, ?, ?, ?)",
2349 )?
2350 .execute((
2351 encoded_room_id,
2352 encoded_event_type,
2353 encoded_state_key,
2354 stripped,
2355 encoded_event_id,
2356 data,
2357 ))?;
2358
2359 Ok(())
2360 }
2361
2362 #[async_test]
2363 pub async fn test_migrating_v2_to_v3() {
2364 let path = new_path();
2365
2366 let room_a_id = room_id!("!room_a:dummy.local");
2368 let room_a_name = "Room A";
2369 let room_a_creator = user_id!("@creator:dummy.local");
2370 let room_a_create_sender = user_id!("@sender:dummy.local");
2373
2374 let room_b_id = room_id!("!room_b:dummy.local");
2376
2377 let room_c_id = room_id!("!room_c:dummy.local");
2379 let room_c_create_sender = user_id!("@creator:dummy.local");
2380
2381 {
2383 let db = create_fake_db(&path, 2).await.unwrap();
2384 let conn = db.pool.get().await.unwrap();
2385
2386 let this = db.clone();
2387 conn.with_transaction(move |txn| {
2388 add_room_v2(
2389 &this,
2390 txn,
2391 room_a_id,
2392 Some(room_a_name),
2393 Some(room_a_creator),
2394 Some(room_a_create_sender),
2395 )?;
2396 add_room_v2(&this, txn, room_b_id, None, None, None)?;
2397 add_room_v2(&this, txn, room_c_id, None, None, Some(room_c_create_sender))?;
2398
2399 Result::<_, Error>::Ok(())
2400 })
2401 .await
2402 .unwrap();
2403 }
2404
2405 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2407
2408 let room_infos = store.get_room_infos().await.unwrap();
2410 assert_eq!(room_infos.len(), 3);
2411
2412 let room_a = room_infos.iter().find(|r| r.room_id() == room_a_id).unwrap();
2413 assert_eq!(room_a.name(), Some(room_a_name));
2414 assert_eq!(room_a.creator(), Some(room_a_create_sender));
2415
2416 let room_b = room_infos.iter().find(|r| r.room_id() == room_b_id).unwrap();
2417 assert_eq!(room_b.name(), None);
2418 assert_eq!(room_b.creator(), None);
2419
2420 let room_c = room_infos.iter().find(|r| r.room_id() == room_c_id).unwrap();
2421 assert_eq!(room_c.name(), None);
2422 assert_eq!(room_c.creator(), Some(room_c_create_sender));
2423 }
2424
2425 #[async_test]
2426 pub async fn test_migrating_v7_to_v9() {
2427 let path = new_path();
2428
2429 let room_id = room_id!("!room_a:dummy.local");
2430 let wedged_event_transaction_id = TransactionId::new();
2431 let local_event_transaction_id = TransactionId::new();
2432
2433 {
2435 let db = create_fake_db(&path, 7).await.unwrap();
2436 let conn = db.pool.get().await.unwrap();
2437
2438 let wedge_tx = wedged_event_transaction_id.clone();
2439 let local_tx = local_event_transaction_id.clone();
2440
2441 conn.with_transaction(move |txn| {
2442 add_dependent_send_queue_event_v7(
2443 &db,
2444 txn,
2445 room_id,
2446 &local_tx,
2447 ChildTransactionId::new(),
2448 DependentQueuedRequestKind::RedactEvent,
2449 )?;
2450 add_send_queue_event_v7(&db, txn, &wedge_tx, room_id, true)?;
2451 add_send_queue_event_v7(&db, txn, &local_tx, room_id, false)?;
2452 Result::<_, Error>::Ok(())
2453 })
2454 .await
2455 .unwrap();
2456 }
2457
2458 let store = SqliteStateStore::open(path, Some(SECRET)).await.unwrap();
2461
2462 let requests = store.load_send_queue_requests(room_id).await.unwrap();
2463 assert!(requests.is_empty());
2464
2465 let dependent_requests = store.load_dependent_queued_requests(room_id).await.unwrap();
2466 assert!(dependent_requests.is_empty());
2467 }
2468
2469 fn add_send_queue_event_v7(
2470 this: &SqliteStateStore,
2471 txn: &Transaction<'_>,
2472 transaction_id: &TransactionId,
2473 room_id: &RoomId,
2474 is_wedged: bool,
2475 ) -> Result<(), Error> {
2476 let content =
2477 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())?;
2478
2479 let room_id_key = this.encode_key(keys::SEND_QUEUE, room_id);
2480 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2481
2482 let content = this.serialize_json(&content)?;
2483
2484 txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
2485 .execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;
2486
2487 Ok(())
2488 }
2489
2490 fn add_dependent_send_queue_event_v7(
2491 this: &SqliteStateStore,
2492 txn: &Transaction<'_>,
2493 room_id: &RoomId,
2494 parent_txn_id: &TransactionId,
2495 own_txn_id: ChildTransactionId,
2496 content: DependentQueuedRequestKind,
2497 ) -> Result<(), Error> {
2498 let room_id_value = this.serialize_value(&room_id.to_owned())?;
2499
2500 let parent_txn_id = parent_txn_id.to_string();
2501 let own_txn_id = own_txn_id.to_string();
2502 let content = this.serialize_json(&content)?;
2503
2504 txn.prepare_cached(
2505 "INSERT INTO dependent_send_queue_events
2506 (room_id, parent_transaction_id, own_transaction_id, content)
2507 VALUES (?, ?, ?, ?)",
2508 )?
2509 .execute((room_id_value, parent_txn_id, own_txn_id, content))?;
2510
2511 Ok(())
2512 }
2513}