1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 str::FromStr as _,
18 sync::Arc,
19};
20
21use anyhow::anyhow;
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::prelude::*;
26use matrix_sdk_base::{
27 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
28 store::{
29 compare_thread_subscription_bump_stamps, ChildTransactionId, ComposerDraft,
30 DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequest, QueuedRequestKind,
31 RoomLoadSettings, SentRequestKey, SerializableEventContent, ServerInfo, StateChanges,
32 StateStore, StoreError, StoredThreadSubscription, ThreadSubscriptionStatus,
33 },
34 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
35 ThreadSubscriptionCatchupToken, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
36};
37use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
38use ruma::{
39 canonical_json::{redact, RedactedBecause},
40 events::{
41 presence::PresenceEvent,
42 receipt::{Receipt, ReceiptThread, ReceiptType},
43 room::member::{
44 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
45 },
46 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
47 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
48 },
49 serde::Raw,
50 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
51 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
52};
53use serde::{de::DeserializeOwned, Deserialize, Serialize};
54use tracing::{debug, warn};
55use wasm_bindgen::JsValue;
56use web_sys::IdbKeyRange;
57
58mod migrations;
59
60pub use self::migrations::MigrationConflictStrategy;
61use self::migrations::{upgrade_inner_db, upgrade_meta_db};
62use crate::serializer::safe_encode::traits::SafeEncode;
63
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A `serde_json` (de)serialization failure.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// A failure reported by the store cipher while encrypting or decrypting.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A `DomException`, captured as its name, message and numeric code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A generic `StoreError` that is passed through unchanged.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// The database `name` could not be migrated from `old_version` to
    /// `new_version` without deleting data.
    #[error(
        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
         See MigrationConflictStrategy for ways to configure."
    )]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
80
81impl From<web_sys::DomException> for IndexeddbStateStoreError {
82 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
83 IndexeddbStateStoreError::DomException {
84 name: frm.name(),
85 message: frm.message(),
86 code: frm.code(),
87 }
88 }
89}
90
91impl From<IndexeddbStateStoreError> for StoreError {
92 fn from(e: IndexeddbStateStoreError) -> Self {
93 match e {
94 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
95 IndexeddbStateStoreError::StoreError(e) => e,
96 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
97 _ => StoreError::backend(e),
98 }
99 }
100}
101
/// String constants naming the databases, object stores and special keys used
/// by this state store.
mod keys {
    // Suffix appended to the store name to build the name of the meta database.
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    // Object store of the meta database that is queried for backups.
    pub const BACKUPS_META: &str = "backups";

    // Object stores of the inner (state) database.
    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";

    // Stripped counterparts of `ROOM_STATE` and `USER_IDS`.
    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    // Receipts are indexed twice: once per user and once per event.
    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// Names of all the object stores listed above for the inner database.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        THREAD_SUBSCRIPTIONS,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // NOTE(review): not referenced in this part of the file; presumably the
    // key under which the store cipher is persisted — confirm in `migrations`.
    pub const STORE_KEY: &str = "store_key";
}
156
157pub use keys::ALL_STORES;
158use matrix_sdk_base::store::QueueWedgeError;
159
160fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
162 Ok(match store_cipher {
163 Some(cipher) => {
164 let data = serde_json::to_vec(event)?;
165 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
166 }
167 None => JsValue::from_serde(event)?,
168 })
169}
170
171fn deserialize_value<T: DeserializeOwned>(
173 store_cipher: Option<&StoreCipher>,
174 event: &JsValue,
175) -> Result<T> {
176 match store_cipher {
177 Some(cipher) => {
178 use zeroize::Zeroize;
179 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
180 let ret = serde_json::from_slice(&plaintext);
181 plaintext.zeroize();
182 Ok(ret?)
183 }
184 None => Ok(event.into_serde()?),
185 }
186}
187
188fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
189where
190 T: SafeEncode,
191{
192 match store_cipher {
193 Some(cipher) => key.as_secure_string(table_name, cipher),
194 None => key.as_encoded_string(),
195 }
196 .into()
197}
198
199fn encode_to_range<T>(
200 store_cipher: Option<&StoreCipher>,
201 table_name: &str,
202 key: T,
203) -> Result<IdbKeyRange>
204where
205 T: SafeEncode,
206{
207 match store_cipher {
208 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
209 None => key.encode_to_range(),
210 }
211 .map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
212}
213
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    // Database name; `build` falls back to "state" when unset.
    name: Option<String>,
    // Optional passphrase handed to the meta database upgrade.
    passphrase: Option<String>,
    // Strategy handed to the inner database upgrade.
    migration_conflict_strategy: MigrationConflictStrategy,
}
221
222impl IndexeddbStateStoreBuilder {
223 fn new() -> Self {
224 Self {
225 name: None,
226 passphrase: None,
227 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
228 }
229 }
230
231 pub fn name(mut self, value: String) -> Self {
233 self.name = Some(value);
234 self
235 }
236
237 pub fn passphrase(mut self, value: String) -> Self {
241 self.passphrase = Some(value);
242 self
243 }
244
245 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
249 self.migration_conflict_strategy = value;
250 self
251 }
252
253 pub async fn build(self) -> Result<IndexeddbStateStore> {
254 let migration_strategy = self.migration_conflict_strategy.clone();
255 let name = self.name.unwrap_or_else(|| "state".to_owned());
256
257 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
258
259 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
260 let inner =
261 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
262
263 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
264 }
265}
266
/// A state store backed by two IndexedDB databases.
pub struct IndexeddbStateStore {
    // Name of the store, as resolved by the builder.
    name: String,
    // Database holding the object stores listed in `keys::ALL_STORES`.
    pub(crate) inner: IdbDatabase,
    // Meta database; among others it holds the `keys::BACKUPS_META` store.
    pub(crate) meta: IdbDatabase,
    // Cipher used to encrypt values and encode keys, when one is configured.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
273
274#[cfg(not(tarpaulin_include))]
275impl std::fmt::Debug for IndexeddbStateStore {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
278 }
279}
280
/// Module-local `Result` alias whose error type defaults to
/// [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
282
283impl IndexeddbStateStore {
284 pub fn builder() -> IndexeddbStateStoreBuilder {
286 IndexeddbStateStoreBuilder::new()
287 }
288
289 pub fn version(&self) -> u32 {
291 self.inner.version() as u32
292 }
293
294 pub fn meta_version(&self) -> u32 {
296 self.meta.version() as u32
297 }
298
299 pub async fn has_backups(&self) -> Result<bool> {
301 Ok(self
302 .meta
303 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
304 .object_store(keys::BACKUPS_META)?
305 .count()?
306 .await?
307 > 0)
308 }
309
310 pub async fn latest_backup(&self) -> Result<Option<String>> {
312 Ok(self
313 .meta
314 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
315 .object_store(keys::BACKUPS_META)?
316 .open_cursor_with_direction(IdbCursorDirection::Prev)?
317 .await?
318 .and_then(|c| c.value().as_string()))
319 }
320
321 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
323 serialize_value(self.store_cipher.as_deref(), event)
324 }
325
326 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
328 deserialize_value(self.store_cipher.as_deref(), event)
329 }
330
331 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
332 where
333 T: SafeEncode,
334 {
335 encode_key(self.store_cipher.as_deref(), table_name, key)
336 }
337
338 fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
339 where
340 T: SafeEncode,
341 {
342 encode_to_range(self.store_cipher.as_deref(), table_name, key)
343 }
344
345 pub async fn get_user_ids_inner(
348 &self,
349 room_id: &RoomId,
350 memberships: RoomMemberships,
351 stripped: bool,
352 ) -> Result<Vec<OwnedUserId>> {
353 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
354
355 let tx =
356 self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
357 let store = tx.object_store(store_name)?;
358 let range = self.encode_to_range(store_name, room_id)?;
359
360 let user_ids = if memberships.is_empty() {
361 store
363 .get_all_with_key(&range)?
364 .await?
365 .iter()
366 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
367 .collect::<Vec<_>>()
368 } else {
369 let mut user_ids = Vec::new();
370 let cursor = store.open_cursor_with_range(&range)?.await?;
371
372 if let Some(cursor) = cursor {
373 loop {
374 let value = cursor.value();
375 let member = self.deserialize_value::<RoomMember>(&value)?;
376
377 if memberships.matches(&member.membership) {
378 user_ids.push(member.user_id);
379 }
380
381 if !cursor.continue_cursor()?.await? {
382 break;
383 }
384 }
385 }
386
387 user_ids
388 };
389
390 Ok(user_ids)
391 }
392
393 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
394 self.inner
395 .transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
396 .object_store(keys::CUSTOM)?
397 .get(jskey)?
398 .await?
399 .map(|f| self.deserialize_value(&f))
400 .transpose()
401 }
402
403 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
404 match key {
408 StateStoreDataKey::SyncToken => {
409 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
410 }
411 StateStoreDataKey::ServerInfo => {
412 self.encode_key(StateStoreDataKey::SERVER_INFO, StateStoreDataKey::SERVER_INFO)
413 }
414 StateStoreDataKey::Filter(filter_name) => {
415 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
416 }
417 StateStoreDataKey::UserAvatarUrl(user_id) => {
418 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
419 }
420 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
421 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
422 }
423 StateStoreDataKey::UtdHookManagerData => {
424 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
425 }
426 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
427 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
428 }
429 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
430 if let Some(thread_root) = thread_root {
431 self.encode_key(
432 keys::KV,
433 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
434 )
435 } else {
436 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
437 }
438 }
439 StateStoreDataKey::SeenKnockRequests(room_id) => {
440 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
441 }
442 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
443 self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
444 }
445 }
446 }
447}
448
/// Serialized form of a queued request.
///
/// Several fields are optional; `into_queued_request` supplies fallbacks for
/// them when converting into a `QueuedRequest`.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    // NOTE(review): presumably the room the request is queued for — it is not
    // read by the conversion in this part of the file.
    pub room_id: OwnedRoomId,

    // Kind of request. When absent, `event` is used to build an `Event` kind.
    kind: Option<QueuedRequestKind>,
    transaction_id: OwnedTransactionId,

    // Error attached to the request; overridden when `is_wedged` is
    // `Some(true)`.
    pub error: Option<QueueWedgeError>,

    // Treated as 0 when absent.
    priority: Option<usize>,

    // Defaults to the time of deserialization when the field is missing.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    // When `Some(true)`, a generic wedge error is reported in place of
    // `error`.
    is_wedged: Option<bool>,

    // Event content used as a fallback when `kind` is absent.
    event: Option<SerializableEventContent>,
}
475
476fn created_now() -> MilliSecondsSinceUnixEpoch {
477 MilliSecondsSinceUnixEpoch::now()
478}
479
480impl PersistedQueuedRequest {
481 fn into_queued_request(self) -> Option<QueuedRequest> {
482 let kind =
483 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
484
485 let error = match self.is_wedged {
486 Some(true) => {
487 Some(QueueWedgeError::GenericApiError {
489 msg: "local echo failed to send in a previous session".into(),
490 })
491 }
492 _ => self.error,
493 };
494
495 let priority = self.priority.unwrap_or(0);
497
498 Some(QueuedRequest {
499 kind,
500 transaction_id: self.transaction_id,
501 error,
502 priority,
503 created_at: self.created_at,
504 })
505 }
506}
507
/// Serialized form of a thread subscription.
#[derive(Serialize, Deserialize, PartialEq)]
struct PersistedThreadSubscription {
    // String form of the subscription status.
    status: String,
    // Optional bump stamp copied from the stored subscription.
    bump_stamp: Option<u64>,
}
513
514impl From<StoredThreadSubscription> for PersistedThreadSubscription {
515 fn from(value: StoredThreadSubscription) -> Self {
516 Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
517 }
518}
519
// On wasm targets, the method bodies passed to this macro become the
// `StateStore` trait implementation for `IndexeddbStateStore`, with
// `IndexeddbStateStoreError` as the associated error type. `?Send` is used
// for the async trait methods.
#[cfg(target_family = "wasm")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
539
// On non-wasm targets, the same method bodies are emitted as plain inherent
// methods of `IndexeddbStateStore` instead of a `StateStore` trait
// implementation.
#[cfg(not(target_family = "wasm"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
548
549impl_state_store!({
550 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
551 let encoded_key = self.encode_kv_data_key(key);
552
553 let value = self
554 .inner
555 .transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
556 .object_store(keys::KV)?
557 .get(&encoded_key)?
558 .await?;
559
560 let value = match key {
561 StateStoreDataKey::SyncToken => value
562 .map(|f| self.deserialize_value::<String>(&f))
563 .transpose()?
564 .map(StateStoreDataValue::SyncToken),
565 StateStoreDataKey::ServerInfo => value
566 .map(|f| self.deserialize_value::<ServerInfo>(&f))
567 .transpose()?
568 .map(StateStoreDataValue::ServerInfo),
569 StateStoreDataKey::Filter(_) => value
570 .map(|f| self.deserialize_value::<String>(&f))
571 .transpose()?
572 .map(StateStoreDataValue::Filter),
573 StateStoreDataKey::UserAvatarUrl(_) => value
574 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
575 .transpose()?
576 .map(StateStoreDataValue::UserAvatarUrl),
577 StateStoreDataKey::RecentlyVisitedRooms(_) => value
578 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
579 .transpose()?
580 .map(StateStoreDataValue::RecentlyVisitedRooms),
581 StateStoreDataKey::UtdHookManagerData => value
582 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
583 .transpose()?
584 .map(StateStoreDataValue::UtdHookManagerData),
585 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
586 .map(|f| self.deserialize_value::<bool>(&f))
587 .transpose()?
588 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
589 StateStoreDataKey::ComposerDraft(_, _) => value
590 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
591 .transpose()?
592 .map(StateStoreDataValue::ComposerDraft),
593 StateStoreDataKey::SeenKnockRequests(_) => value
594 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
595 .transpose()?
596 .map(StateStoreDataValue::SeenKnockRequests),
597 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
598 .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
599 .transpose()?
600 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
601 };
602
603 Ok(value)
604 }
605
606 async fn set_kv_data(
607 &self,
608 key: StateStoreDataKey<'_>,
609 value: StateStoreDataValue,
610 ) -> Result<()> {
611 let encoded_key = self.encode_kv_data_key(key);
612
613 let serialized_value = match key {
614 StateStoreDataKey::SyncToken => self
615 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
616 StateStoreDataKey::ServerInfo => self.serialize_value(
617 &value.into_server_info().expect("Session data not containing server info"),
618 ),
619 StateStoreDataKey::Filter(_) => {
620 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
621 }
622 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
623 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
624 ),
625 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
626 &value
627 .into_recently_visited_rooms()
628 .expect("Session data not a recently visited room list"),
629 ),
630 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
631 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
632 ),
633 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
634 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
635 &value.into_composer_draft().expect("Session data not a composer draft"),
636 ),
637 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
638 &value
639 .into_seen_knock_requests()
640 .expect("Session data is not a set of seen knock request ids"),
641 ),
642 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
643 &value
644 .into_thread_subscriptions_catchup_tokens()
645 .expect("Session data is not a list of thread subscription catchup tokens"),
646 ),
647 };
648
649 let tx =
650 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
651
652 let obj = tx.object_store(keys::KV)?;
653
654 obj.put_key_val(&encoded_key, &serialized_value?)?;
655
656 tx.await.into_result()?;
657
658 Ok(())
659 }
660
661 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
662 let encoded_key = self.encode_kv_data_key(key);
663
664 let tx =
665 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
666 let obj = tx.object_store(keys::KV)?;
667
668 obj.delete(&encoded_key)?;
669
670 tx.await.into_result()?;
671
672 Ok(())
673 }
674
675 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
676 let mut stores: HashSet<&'static str> = [
677 (changes.sync_token.is_some(), keys::KV),
678 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
679 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
680 (!changes.presence.is_empty(), keys::PRESENCE),
681 (
682 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
683 keys::PROFILES,
684 ),
685 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
686 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
687 ]
688 .iter()
689 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
690 .collect();
691
692 if !changes.state.is_empty() {
693 stores.extend([
694 keys::ROOM_STATE,
695 keys::USER_IDS,
696 keys::STRIPPED_USER_IDS,
697 keys::STRIPPED_ROOM_STATE,
698 keys::PROFILES,
699 ]);
700 }
701
702 if !changes.redactions.is_empty() {
703 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
704 }
705
706 if !changes.room_infos.is_empty() {
707 stores.insert(keys::ROOM_INFOS);
708 }
709
710 if !changes.stripped_state.is_empty() {
711 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
712 }
713
714 if !changes.receipts.is_empty() {
715 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
716 }
717
718 if stores.is_empty() {
719 return Ok(());
721 }
722
723 let stores: Vec<&'static str> = stores.into_iter().collect();
724 let tx =
725 self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
726
727 if let Some(s) = &changes.sync_token {
728 tx.object_store(keys::KV)?.put_key_val(
729 &self.encode_kv_data_key(StateStoreDataKey::SyncToken),
730 &self.serialize_value(s)?,
731 )?;
732 }
733
734 if !changes.ambiguity_maps.is_empty() {
735 let store = tx.object_store(keys::DISPLAY_NAMES)?;
736 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
737 for (display_name, map) in ambiguity_maps {
738 let key = self.encode_key(
739 keys::DISPLAY_NAMES,
740 (
741 room_id,
742 display_name
743 .as_normalized_str()
744 .unwrap_or_else(|| display_name.as_raw_str()),
745 ),
746 );
747
748 store.put_key_val(&key, &self.serialize_value(&map)?)?;
749 }
750 }
751 }
752
753 if !changes.account_data.is_empty() {
754 let store = tx.object_store(keys::ACCOUNT_DATA)?;
755 for (event_type, event) in &changes.account_data {
756 store.put_key_val(
757 &self.encode_key(keys::ACCOUNT_DATA, event_type),
758 &self.serialize_value(&event)?,
759 )?;
760 }
761 }
762
763 if !changes.room_account_data.is_empty() {
764 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
765 for (room, events) in &changes.room_account_data {
766 for (event_type, event) in events {
767 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
768 store.put_key_val(&key, &self.serialize_value(&event)?)?;
769 }
770 }
771 }
772
773 if !changes.state.is_empty() {
774 let state = tx.object_store(keys::ROOM_STATE)?;
775 let profiles = tx.object_store(keys::PROFILES)?;
776 let user_ids = tx.object_store(keys::USER_IDS)?;
777 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
778 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
779
780 for (room, user_ids) in &changes.profiles_to_delete {
781 for user_id in user_ids {
782 let key = self.encode_key(keys::PROFILES, (room, user_id));
783 profiles.delete(&key)?;
784 }
785 }
786
787 for (room, event_types) in &changes.state {
788 let profile_changes = changes.profiles.get(room);
789
790 for (event_type, events) in event_types {
791 for (state_key, raw_event) in events {
792 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
793 state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
794 stripped_state.delete(&key)?;
795
796 if *event_type == StateEventType::RoomMember {
797 let event =
798 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
799 Ok(ev) => ev,
800 Err(e) => {
801 let event_id: Option<String> =
802 raw_event.get_field("event_id").ok().flatten();
803 debug!(event_id, "Failed to deserialize member event: {e}");
804 continue;
805 }
806 };
807
808 let key = (room, state_key);
809
810 stripped_user_ids
811 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
812
813 user_ids.put_key_val_owned(
814 self.encode_key(keys::USER_IDS, key),
815 &self.serialize_value(&RoomMember::from(&event))?,
816 )?;
817
818 if let Some(profile) =
819 profile_changes.and_then(|p| p.get(event.state_key()))
820 {
821 profiles.put_key_val_owned(
822 self.encode_key(keys::PROFILES, key),
823 &self.serialize_value(&profile)?,
824 )?;
825 }
826 }
827 }
828 }
829 }
830 }
831
832 if !changes.room_infos.is_empty() {
833 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
834 for (room_id, room_info) in &changes.room_infos {
835 room_infos.put_key_val(
836 &self.encode_key(keys::ROOM_INFOS, room_id),
837 &self.serialize_value(&room_info)?,
838 )?;
839 }
840 }
841
842 if !changes.presence.is_empty() {
843 let store = tx.object_store(keys::PRESENCE)?;
844 for (sender, event) in &changes.presence {
845 store.put_key_val(
846 &self.encode_key(keys::PRESENCE, sender),
847 &self.serialize_value(&event)?,
848 )?;
849 }
850 }
851
852 if !changes.stripped_state.is_empty() {
853 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
854 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
855
856 for (room, event_types) in &changes.stripped_state {
857 for (event_type, events) in event_types {
858 for (state_key, raw_event) in events {
859 let key = self
860 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
861 store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
862
863 if *event_type == StateEventType::RoomMember {
864 let event = match raw_event
865 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
866 {
867 Ok(ev) => ev,
868 Err(e) => {
869 let event_id: Option<String> =
870 raw_event.get_field("event_id").ok().flatten();
871 debug!(
872 event_id,
873 "Failed to deserialize stripped member event: {e}"
874 );
875 continue;
876 }
877 };
878
879 let key = (room, state_key);
880
881 user_ids.put_key_val_owned(
882 self.encode_key(keys::STRIPPED_USER_IDS, key),
883 &self.serialize_value(&RoomMember::from(&event))?,
884 )?;
885 }
886 }
887 }
888 }
889 }
890
891 if !changes.receipts.is_empty() {
892 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
893 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
894
895 for (room, content) in &changes.receipts {
896 for (event_id, receipts) in &content.0 {
897 for (receipt_type, receipts) in receipts {
898 for (user_id, receipt) in receipts {
899 let key = match receipt.thread.as_str() {
900 Some(thread_id) => self.encode_key(
901 keys::ROOM_USER_RECEIPTS,
902 (room, receipt_type, thread_id, user_id),
903 ),
904 None => self.encode_key(
905 keys::ROOM_USER_RECEIPTS,
906 (room, receipt_type, user_id),
907 ),
908 };
909
910 if let Some((old_event, _)) =
911 room_user_receipts.get(&key)?.await?.and_then(|f| {
912 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
913 })
914 {
915 let key = match receipt.thread.as_str() {
916 Some(thread_id) => self.encode_key(
917 keys::ROOM_EVENT_RECEIPTS,
918 (room, receipt_type, thread_id, old_event, user_id),
919 ),
920 None => self.encode_key(
921 keys::ROOM_EVENT_RECEIPTS,
922 (room, receipt_type, old_event, user_id),
923 ),
924 };
925 room_event_receipts.delete(&key)?;
926 }
927
928 room_user_receipts
929 .put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
930
931 let key = match receipt.thread.as_str() {
933 Some(thread_id) => self.encode_key(
934 keys::ROOM_EVENT_RECEIPTS,
935 (room, receipt_type, thread_id, event_id, user_id),
936 ),
937 None => self.encode_key(
938 keys::ROOM_EVENT_RECEIPTS,
939 (room, receipt_type, event_id, user_id),
940 ),
941 };
942 room_event_receipts
943 .put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
944 }
945 }
946 }
947 }
948 }
949
950 if !changes.redactions.is_empty() {
951 let state = tx.object_store(keys::ROOM_STATE)?;
952 let room_info = tx.object_store(keys::ROOM_INFOS)?;
953
954 for (room_id, redactions) in &changes.redactions {
955 let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
956 let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
957
958 let mut redaction_rules = None;
959
960 while let Some(key) = cursor.key() {
961 let raw_evt =
962 self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
963 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
964 if let Some(redaction) = redactions.get(&event_id) {
965 let redaction_rules = {
966 if redaction_rules.is_none() {
967 redaction_rules.replace(room_info
968 .get(&self.encode_key(keys::ROOM_INFOS, room_id))?
969 .await?
970 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
971 .map(|info| info.room_version_rules_or_default())
972 .unwrap_or_else(|| {
973 warn!(?room_id, "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}");
974 ROOM_VERSION_RULES_FALLBACK
975 }).redaction
976 );
977 }
978 redaction_rules.as_ref().unwrap()
979 };
980
981 let redacted = redact(
982 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
983 redaction_rules,
984 Some(RedactedBecause::from_raw_event(redaction)?),
985 )
986 .map_err(StoreError::Redaction)?;
987 state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
988 }
989 }
990
991 cursor.advance(1)?.await?;
993 }
994 }
995 }
996
997 tx.await.into_result().map_err(|e| e.into())
998 }
999
1000 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1001 self.inner
1002 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
1003 .object_store(keys::PRESENCE)?
1004 .get(&self.encode_key(keys::PRESENCE, user_id))?
1005 .await?
1006 .map(|f| self.deserialize_value(&f))
1007 .transpose()
1008 }
1009
1010 async fn get_presence_events(
1011 &self,
1012 user_ids: &[OwnedUserId],
1013 ) -> Result<Vec<Raw<PresenceEvent>>> {
1014 if user_ids.is_empty() {
1015 return Ok(Vec::new());
1016 }
1017
1018 let txn = self
1019 .inner
1020 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
1021 let store = txn.object_store(keys::PRESENCE)?;
1022
1023 let mut events = Vec::with_capacity(user_ids.len());
1024
1025 for user_id in user_ids {
1026 if let Some(event) = store
1027 .get(&self.encode_key(keys::PRESENCE, user_id))?
1028 .await?
1029 .map(|f| self.deserialize_value(&f))
1030 .transpose()?
1031 {
1032 events.push(event)
1033 }
1034 }
1035
1036 Ok(events)
1037 }
1038
1039 async fn get_state_event(
1040 &self,
1041 room_id: &RoomId,
1042 event_type: StateEventType,
1043 state_key: &str,
1044 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1045 Ok(self
1046 .get_state_events_for_keys(room_id, event_type, &[state_key])
1047 .await?
1048 .into_iter()
1049 .next())
1050 }
1051
1052 async fn get_state_events(
1053 &self,
1054 room_id: &RoomId,
1055 event_type: StateEventType,
1056 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1057 let stripped_range =
1058 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
1059 let stripped_events = self
1060 .inner
1061 .transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
1062 .object_store(keys::STRIPPED_ROOM_STATE)?
1063 .get_all_with_key(&stripped_range)?
1064 .await?
1065 .iter()
1066 .filter_map(|f| {
1067 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1068 })
1069 .collect::<Vec<_>>();
1070
1071 if !stripped_events.is_empty() {
1072 return Ok(stripped_events);
1073 }
1074
1075 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
1076 Ok(self
1077 .inner
1078 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
1079 .object_store(keys::ROOM_STATE)?
1080 .get_all_with_key(&range)?
1081 .await?
1082 .iter()
1083 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1084 .collect::<Vec<_>>())
1085 }
1086
1087 async fn get_state_events_for_keys(
1088 &self,
1089 room_id: &RoomId,
1090 event_type: StateEventType,
1091 state_keys: &[&str],
1092 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1093 if state_keys.is_empty() {
1094 return Ok(Vec::new());
1095 }
1096
1097 let mut events = Vec::with_capacity(state_keys.len());
1098
1099 {
1100 let txn = self.inner.transaction_on_one_with_mode(
1101 keys::STRIPPED_ROOM_STATE,
1102 IdbTransactionMode::Readonly,
1103 )?;
1104 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1105
1106 for state_key in state_keys {
1107 if let Some(event) =
1108 store
1109 .get(&self.encode_key(
1110 keys::STRIPPED_ROOM_STATE,
1111 (room_id, &event_type, state_key),
1112 ))?
1113 .await?
1114 .map(|f| self.deserialize_value(&f))
1115 .transpose()?
1116 {
1117 events.push(RawAnySyncOrStrippedState::Stripped(event));
1118 }
1119 }
1120
1121 if !events.is_empty() {
1122 return Ok(events);
1123 }
1124 }
1125
1126 let txn = self
1127 .inner
1128 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
1129 let store = txn.object_store(keys::ROOM_STATE)?;
1130
1131 for state_key in state_keys {
1132 if let Some(event) = store
1133 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
1134 .await?
1135 .map(|f| self.deserialize_value(&f))
1136 .transpose()?
1137 {
1138 events.push(RawAnySyncOrStrippedState::Sync(event));
1139 }
1140 }
1141
1142 Ok(events)
1143 }
1144
1145 async fn get_profile(
1146 &self,
1147 room_id: &RoomId,
1148 user_id: &UserId,
1149 ) -> Result<Option<MinimalRoomMemberEvent>> {
1150 self.inner
1151 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
1152 .object_store(keys::PROFILES)?
1153 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1154 .await?
1155 .map(|f| self.deserialize_value(&f))
1156 .transpose()
1157 }
1158
1159 async fn get_profiles<'a>(
1160 &self,
1161 room_id: &RoomId,
1162 user_ids: &'a [OwnedUserId],
1163 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1164 if user_ids.is_empty() {
1165 return Ok(BTreeMap::new());
1166 }
1167
1168 let txn = self
1169 .inner
1170 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
1171 let store = txn.object_store(keys::PROFILES)?;
1172
1173 let mut profiles = BTreeMap::new();
1174 for user_id in user_ids {
1175 if let Some(profile) = store
1176 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1177 .await?
1178 .map(|f| self.deserialize_value(&f))
1179 .transpose()?
1180 {
1181 profiles.insert(user_id.as_ref(), profile);
1182 }
1183 }
1184
1185 Ok(profiles)
1186 }
1187
1188 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1189 let transaction = self
1190 .inner
1191 .transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?;
1192
1193 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1194
1195 Ok(match room_load_settings {
1196 RoomLoadSettings::All => object_store
1197 .get_all()?
1198 .await?
1199 .iter()
1200 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info))
1201 .collect::<Result<_>>()?,
1202
1203 RoomLoadSettings::One(room_id) => {
1204 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id))?.await? {
1205 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1206 None => vec![],
1207 }
1208 }
1209 })
1210 }
1211
1212 async fn get_users_with_display_name(
1213 &self,
1214 room_id: &RoomId,
1215 display_name: &DisplayName,
1216 ) -> Result<BTreeSet<OwnedUserId>> {
1217 self.inner
1218 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
1219 .object_store(keys::DISPLAY_NAMES)?
1220 .get(&self.encode_key(
1221 keys::DISPLAY_NAMES,
1222 (
1223 room_id,
1224 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1225 ),
1226 ))?
1227 .await?
1228 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1229 .unwrap_or_else(|| Ok(Default::default()))
1230 }
1231
1232 async fn get_users_with_display_names<'a>(
1233 &self,
1234 room_id: &RoomId,
1235 display_names: &'a [DisplayName],
1236 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1237 let mut map = HashMap::new();
1238
1239 if display_names.is_empty() {
1240 return Ok(map);
1241 }
1242
1243 let txn = self
1244 .inner
1245 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
1246 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1247
1248 for display_name in display_names {
1249 if let Some(user_ids) = store
1250 .get(
1251 &self.encode_key(
1252 keys::DISPLAY_NAMES,
1253 (
1254 room_id,
1255 display_name
1256 .as_normalized_str()
1257 .unwrap_or_else(|| display_name.as_raw_str()),
1258 ),
1259 ),
1260 )?
1261 .await?
1262 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1263 .transpose()?
1264 {
1265 map.insert(display_name, user_ids);
1266 }
1267 }
1268
1269 Ok(map)
1270 }
1271
1272 async fn get_account_data_event(
1273 &self,
1274 event_type: GlobalAccountDataEventType,
1275 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1276 self.inner
1277 .transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1278 .object_store(keys::ACCOUNT_DATA)?
1279 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
1280 .await?
1281 .map(|f| self.deserialize_value(&f))
1282 .transpose()
1283 }
1284
1285 async fn get_room_account_data_event(
1286 &self,
1287 room_id: &RoomId,
1288 event_type: RoomAccountDataEventType,
1289 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1290 self.inner
1291 .transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1292 .object_store(keys::ROOM_ACCOUNT_DATA)?
1293 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
1294 .await?
1295 .map(|f| self.deserialize_value(&f))
1296 .transpose()
1297 }
1298
1299 async fn get_user_room_receipt_event(
1300 &self,
1301 room_id: &RoomId,
1302 receipt_type: ReceiptType,
1303 thread: ReceiptThread,
1304 user_id: &UserId,
1305 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1306 let key = match thread.as_str() {
1307 Some(thread_id) => self
1308 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1309 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1310 };
1311 self.inner
1312 .transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
1313 .object_store(keys::ROOM_USER_RECEIPTS)?
1314 .get(&key)?
1315 .await?
1316 .map(|f| self.deserialize_value(&f))
1317 .transpose()
1318 }
1319
1320 async fn get_event_room_receipt_events(
1321 &self,
1322 room_id: &RoomId,
1323 receipt_type: ReceiptType,
1324 thread: ReceiptThread,
1325 event_id: &EventId,
1326 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1327 let range = match thread.as_str() {
1328 Some(thread_id) => self.encode_to_range(
1329 keys::ROOM_EVENT_RECEIPTS,
1330 (room_id, receipt_type, thread_id, event_id),
1331 ),
1332 None => {
1333 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1334 }
1335 }?;
1336 let tx = self.inner.transaction_on_one_with_mode(
1337 keys::ROOM_EVENT_RECEIPTS,
1338 IdbTransactionMode::Readonly,
1339 )?;
1340 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1341
1342 Ok(store
1343 .get_all_with_key(&range)?
1344 .await?
1345 .iter()
1346 .filter_map(|f| self.deserialize_value(&f).ok())
1347 .collect::<Vec<_>>())
1348 }
1349
1350 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1351 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1352 self.get_custom_value_for_js(jskey).await
1353 }
1354
1355 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1356 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1357
1358 let prev = self.get_custom_value_for_js(&jskey).await?;
1359
1360 let tx =
1361 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1362
1363 tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
1364
1365 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1366 Ok(prev)
1367 }
1368
1369 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1370 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1371
1372 let prev = self.get_custom_value_for_js(&jskey).await?;
1373
1374 let tx =
1375 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1376
1377 tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
1378
1379 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1380 Ok(prev)
1381 }
1382
1383 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1384 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1386
1387 let prefixed_stores = [
1390 keys::PROFILES,
1391 keys::DISPLAY_NAMES,
1392 keys::USER_IDS,
1393 keys::ROOM_STATE,
1394 keys::ROOM_ACCOUNT_DATA,
1395 keys::ROOM_EVENT_RECEIPTS,
1396 keys::ROOM_USER_RECEIPTS,
1397 keys::STRIPPED_ROOM_STATE,
1398 keys::STRIPPED_USER_IDS,
1399 keys::THREAD_SUBSCRIPTIONS,
1400 ];
1401
1402 let all_stores = {
1403 let mut v = Vec::new();
1404 v.extend(prefixed_stores);
1405 v.extend(direct_stores);
1406 v
1407 };
1408
1409 let tx = self
1410 .inner
1411 .transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
1412
1413 for store_name in direct_stores {
1414 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
1415 }
1416
1417 for store_name in prefixed_stores {
1418 let store = tx.object_store(store_name)?;
1419 let range = self.encode_to_range(store_name, room_id)?;
1420 for key in store.get_all_keys_with_key(&range)?.await?.iter() {
1421 store.delete(&key)?;
1422 }
1423 }
1424
1425 tx.await.into_result().map_err(|e| e.into())
1426 }
1427
1428 async fn get_user_ids(
1429 &self,
1430 room_id: &RoomId,
1431 memberships: RoomMemberships,
1432 ) -> Result<Vec<OwnedUserId>> {
1433 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1434 if !ids.is_empty() {
1435 return Ok(ids);
1436 }
1437 self.get_user_ids_inner(room_id, memberships, false).await
1438 }
1439
1440 async fn save_send_queue_request(
1441 &self,
1442 room_id: &RoomId,
1443 transaction_id: OwnedTransactionId,
1444 created_at: MilliSecondsSinceUnixEpoch,
1445 kind: QueuedRequestKind,
1446 priority: usize,
1447 ) -> Result<()> {
1448 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1449
1450 let tx = self
1451 .inner
1452 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1453
1454 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1455
1456 let prev = obj.get(&encoded_key)?.await?;
1461
1462 let mut prev = prev.map_or_else(
1463 || Ok(Vec::new()),
1464 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1465 )?;
1466
1467 prev.push(PersistedQueuedRequest {
1469 room_id: room_id.to_owned(),
1470 kind: Some(kind),
1471 transaction_id,
1472 error: None,
1473 is_wedged: None,
1474 event: None,
1475 priority: Some(priority),
1476 created_at,
1477 });
1478
1479 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1481
1482 tx.await.into_result()?;
1483
1484 Ok(())
1485 }
1486
1487 async fn update_send_queue_request(
1488 &self,
1489 room_id: &RoomId,
1490 transaction_id: &TransactionId,
1491 kind: QueuedRequestKind,
1492 ) -> Result<bool> {
1493 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1494
1495 let tx = self
1496 .inner
1497 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1498
1499 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1500
1501 let prev = obj.get(&encoded_key)?.await?;
1506
1507 let mut prev = prev.map_or_else(
1508 || Ok(Vec::new()),
1509 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1510 )?;
1511
1512 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1514 entry.kind = Some(kind);
1515 entry.error = None;
1517 entry.is_wedged = None;
1519 entry.event = None;
1520
1521 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1523 tx.await.into_result()?;
1524
1525 Ok(true)
1526 } else {
1527 Ok(false)
1528 }
1529 }
1530
1531 async fn remove_send_queue_request(
1532 &self,
1533 room_id: &RoomId,
1534 transaction_id: &TransactionId,
1535 ) -> Result<bool> {
1536 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1537
1538 let tx = self.inner.transaction_on_multi_with_mode(
1539 &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
1540 IdbTransactionMode::Readwrite,
1541 )?;
1542
1543 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1544
1545 if let Some(val) = obj.get(&encoded_key)?.await? {
1550 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1551 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1552 prev.remove(pos);
1553
1554 if prev.is_empty() {
1555 obj.delete(&encoded_key)?;
1556 } else {
1557 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1558 }
1559
1560 tx.await.into_result()?;
1561 return Ok(true);
1562 }
1563 }
1564
1565 Ok(false)
1566 }
1567
1568 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1569 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1570
1571 let prev = self
1574 .inner
1575 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
1576 .object_store(keys::ROOM_SEND_QUEUE)?
1577 .get(&encoded_key)?
1578 .await?;
1579
1580 let mut prev = prev.map_or_else(
1581 || Ok(Vec::new()),
1582 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1583 )?;
1584
1585 prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1587
1588 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1589 }
1590
1591 async fn update_send_queue_request_status(
1592 &self,
1593 room_id: &RoomId,
1594 transaction_id: &TransactionId,
1595 error: Option<QueueWedgeError>,
1596 ) -> Result<()> {
1597 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1598
1599 let tx = self
1600 .inner
1601 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1602
1603 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1604
1605 if let Some(val) = obj.get(&encoded_key)?.await? {
1606 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1607 if let Some(request) =
1608 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1609 {
1610 request.is_wedged = None;
1611 request.error = error;
1612 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1613 }
1614 }
1615
1616 tx.await.into_result()?;
1617
1618 Ok(())
1619 }
1620
1621 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1622 let tx = self
1623 .inner
1624 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1625
1626 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1627
1628 let all_entries = obj
1629 .get_all()?
1630 .await?
1631 .into_iter()
1632 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
1633 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1634 .into_iter()
1635 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1636 .collect::<BTreeSet<_>>();
1637
1638 Ok(all_entries.into_iter().collect())
1639 }
1640
1641 async fn save_dependent_queued_request(
1642 &self,
1643 room_id: &RoomId,
1644 parent_txn_id: &TransactionId,
1645 own_txn_id: ChildTransactionId,
1646 created_at: MilliSecondsSinceUnixEpoch,
1647 content: DependentQueuedRequestKind,
1648 ) -> Result<()> {
1649 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1650
1651 let tx = self.inner.transaction_on_one_with_mode(
1652 keys::DEPENDENT_SEND_QUEUE,
1653 IdbTransactionMode::Readwrite,
1654 )?;
1655
1656 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1657
1658 let prev = obj.get(&encoded_key)?.await?;
1661
1662 let mut prev = prev.map_or_else(
1663 || Ok(Vec::new()),
1664 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1665 )?;
1666
1667 prev.push(DependentQueuedRequest {
1669 kind: content,
1670 parent_transaction_id: parent_txn_id.to_owned(),
1671 own_transaction_id: own_txn_id,
1672 parent_key: None,
1673 created_at,
1674 });
1675
1676 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1678
1679 tx.await.into_result()?;
1680
1681 Ok(())
1682 }
1683
1684 async fn update_dependent_queued_request(
1685 &self,
1686 room_id: &RoomId,
1687 own_transaction_id: &ChildTransactionId,
1688 new_content: DependentQueuedRequestKind,
1689 ) -> Result<bool> {
1690 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1691
1692 let tx = self.inner.transaction_on_one_with_mode(
1693 keys::DEPENDENT_SEND_QUEUE,
1694 IdbTransactionMode::Readwrite,
1695 )?;
1696
1697 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1698
1699 let prev = obj.get(&encoded_key)?.await?;
1702
1703 let mut prev = prev.map_or_else(
1704 || Ok(Vec::new()),
1705 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1706 )?;
1707
1708 let mut found = false;
1710 for entry in prev.iter_mut() {
1711 if entry.own_transaction_id == *own_transaction_id {
1712 found = true;
1713 entry.kind = new_content;
1714 break;
1715 }
1716 }
1717
1718 if found {
1719 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1720 tx.await.into_result()?;
1721 }
1722
1723 Ok(found)
1724 }
1725
1726 async fn mark_dependent_queued_requests_as_ready(
1727 &self,
1728 room_id: &RoomId,
1729 parent_txn_id: &TransactionId,
1730 parent_key: SentRequestKey,
1731 ) -> Result<usize> {
1732 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1733
1734 let tx = self.inner.transaction_on_one_with_mode(
1735 keys::DEPENDENT_SEND_QUEUE,
1736 IdbTransactionMode::Readwrite,
1737 )?;
1738
1739 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1740
1741 let prev = obj.get(&encoded_key)?.await?;
1744
1745 let mut prev = prev.map_or_else(
1746 || Ok(Vec::new()),
1747 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1748 )?;
1749
1750 let mut num_updated = 0;
1752 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1753 entry.parent_key = Some(parent_key.clone());
1754 num_updated += 1;
1755 }
1756
1757 if num_updated > 0 {
1758 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1759 tx.await.into_result()?;
1760 }
1761
1762 Ok(num_updated)
1763 }
1764
1765 async fn remove_dependent_queued_request(
1766 &self,
1767 room_id: &RoomId,
1768 txn_id: &ChildTransactionId,
1769 ) -> Result<bool> {
1770 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1771
1772 let tx = self.inner.transaction_on_one_with_mode(
1773 keys::DEPENDENT_SEND_QUEUE,
1774 IdbTransactionMode::Readwrite,
1775 )?;
1776
1777 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1778
1779 if let Some(val) = obj.get(&encoded_key)?.await? {
1782 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1783 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1784 prev.remove(pos);
1785
1786 if prev.is_empty() {
1787 obj.delete(&encoded_key)?;
1788 } else {
1789 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1790 }
1791
1792 tx.await.into_result()?;
1793 return Ok(true);
1794 }
1795 }
1796
1797 Ok(false)
1798 }
1799
1800 async fn load_dependent_queued_requests(
1801 &self,
1802 room_id: &RoomId,
1803 ) -> Result<Vec<DependentQueuedRequest>> {
1804 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1805
1806 let prev = self
1808 .inner
1809 .transaction_on_one_with_mode(
1810 keys::DEPENDENT_SEND_QUEUE,
1811 IdbTransactionMode::Readwrite,
1812 )?
1813 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1814 .get(&encoded_key)?
1815 .await?;
1816
1817 prev.map_or_else(
1818 || Ok(Vec::new()),
1819 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1820 )
1821 }
1822
1823 async fn upsert_thread_subscription(
1824 &self,
1825 room: &RoomId,
1826 thread_id: &EventId,
1827 subscription: StoredThreadSubscription,
1828 ) -> Result<()> {
1829 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1830
1831 let tx = self.inner.transaction_on_one_with_mode(
1832 keys::THREAD_SUBSCRIPTIONS,
1833 IdbTransactionMode::Readwrite,
1834 )?;
1835 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1836
1837 let mut new = PersistedThreadSubscription::from(subscription);
1838
1839 if let Some(previous_value) = obj.get(&encoded_key)?.await? {
1841 let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
1842
1843 if new == previous {
1845 return Ok(());
1846 }
1847 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
1848 return Ok(());
1849 }
1850 }
1851
1852 let serialized_value = self.serialize_value(&new);
1853 obj.put_key_val(&encoded_key, &serialized_value?)?;
1854
1855 tx.await.into_result()?;
1856
1857 Ok(())
1858 }
1859
1860 async fn load_thread_subscription(
1861 &self,
1862 room: &RoomId,
1863 thread_id: &EventId,
1864 ) -> Result<Option<StoredThreadSubscription>> {
1865 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1866
1867 let js_value = self
1868 .inner
1869 .transaction_on_one_with_mode(keys::THREAD_SUBSCRIPTIONS, IdbTransactionMode::Readonly)?
1870 .object_store(keys::THREAD_SUBSCRIPTIONS)?
1871 .get(&encoded_key)?
1872 .await?;
1873
1874 let Some(js_value) = js_value else {
1875 return Ok(None);
1877 };
1878
1879 let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
1880
1881 let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
1882 StoreError::InvalidData {
1883 details: format!(
1884 "invalid thread status for room {room} and thread {thread_id}: {}",
1885 sub.status
1886 ),
1887 }
1888 })?;
1889
1890 Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
1891 }
1892
1893 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
1894 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1895
1896 self.inner
1897 .transaction_on_one_with_mode(
1898 keys::THREAD_SUBSCRIPTIONS,
1899 IdbTransactionMode::Readwrite,
1900 )?
1901 .object_store(keys::THREAD_SUBSCRIPTIONS)?
1902 .delete(&encoded_key)?
1903 .await?;
1904
1905 Ok(())
1906 }
1907});
1908
/// Serializable record pairing a user ID with a membership state.
///
/// It can be built from a sync room-member state event
/// (`SyncStateEvent<RoomMemberEventContent>`) or from a
/// `StrippedRoomMemberEvent` through the `From` implementations in this file.
#[derive(Debug, Serialize, Deserialize)]
struct RoomMember {
    /// The user this record is about (taken from the event's state key).
    user_id: OwnedUserId,
    /// The membership state carried by the event.
    membership: MembershipState,
}
1915
1916impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
1917 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
1918 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
1919 }
1920}
1921
1922impl From<&StrippedRoomMemberEvent> for RoomMember {
1923 fn from(event: &StrippedRoomMemberEvent) -> Self {
1924 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
1925 }
1926}
1927
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Legacy layout of a persisted queue entry, used as the input of the
    /// deserialization test below: the content lives in `event` and the
    /// wedged state is a plain boolean.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// Data serialized with the legacy layout must deserialize into the
    /// current `PersistedQueuedRequest` and convert into a queued request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let room = room_id!("!room_a:dummy.local");
        let txn_id = TransactionId::new();
        let message = RoomMessageEventContent::text_plain("Hello");
        let event_content = SerializableEventContent::new(&message.into()).unwrap();

        // Serialize an entry using the legacy layout.
        let legacy_entry = OldPersistedQueuedRequest {
            room_id: room.to_owned(),
            event: event_content,
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };
        let legacy_bytes = serde_json::to_vec(&legacy_entry).unwrap();

        // Read it back using the current layout.
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&legacy_bytes).unwrap();

        // The legacy fields are carried over as-is...
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        // ...and the content is still in `event`, not in `kind`.
        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        // Converting yields an event request carrying an error.
        let queued_request = migrated.into_queued_request().unwrap();
        assert_matches!(queued_request.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued_request.transaction_id, txn_id);
        assert!(queued_request.error.is_some());
    }
}
1984
/// Runs the shared state-store integration tests against a store built
/// without a passphrase (wasm targets only).
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store with a freshly generated, unique database name.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_suffix = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique_suffix.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
2002
/// Runs the shared state-store integration tests against a store built with
/// a passphrase (wasm targets only).
#[cfg(all(test, target_family = "wasm"))]
mod encrypted_tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store with a freshly generated, unique database name and
    /// passphrase.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_suffix = Uuid::new_v4();
        let db_name = format!("test-state-encrypted-{}", db_suffix.as_hyphenated());

        let passphrase_suffix = Uuid::new_v4();
        let passphrase = format!("some_passphrase-{}", passphrase_suffix.as_hyphenated());

        let store =
            IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}