1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 sync::Arc,
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::prelude::*;
25use matrix_sdk_base::{
26 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
27 store::{
28 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
29 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
30 SerializableEventContent, ServerInfo, StateChanges, StateStore, StoreError,
31 ThreadSubscription,
32 },
33 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
34 ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
35};
36use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
37use ruma::{
38 canonical_json::{redact, RedactedBecause},
39 events::{
40 presence::PresenceEvent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 room::member::{
43 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
44 },
45 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
46 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
47 },
48 serde::Raw,
49 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
50 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
51};
52use serde::{de::DeserializeOwned, Deserialize, Serialize};
53use tracing::{debug, warn};
54use wasm_bindgen::JsValue;
55use web_sys::IdbKeyRange;
56
57mod migrations;
58
59pub use self::migrations::MigrationConflictStrategy;
60use self::migrations::{upgrade_inner_db, upgrade_meta_db};
61use crate::safe_encode::SafeEncode;
62
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A JSON (de)serialization error, forwarded transparently.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error coming from the store cipher, forwarded transparently.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A `web_sys::DomException`, captured as its name, message and code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A generic `StoreError`, forwarded transparently.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// A database could not be migrated between the two given versions
    /// without deleting data.
    #[error(
        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
         See MigrationConflictStrategy for ways to configure."
    )]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
79
80impl From<web_sys::DomException> for IndexeddbStateStoreError {
81 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
82 IndexeddbStateStoreError::DomException {
83 name: frm.name(),
84 message: frm.message(),
85 code: frm.code(),
86 }
87 }
88}
89
90impl From<IndexeddbStateStoreError> for StoreError {
91 fn from(e: IndexeddbStateStoreError) -> Self {
92 match e {
93 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
94 IndexeddbStateStoreError::StoreError(e) => e,
95 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
96 _ => StoreError::backend(e),
97 }
98 }
99}
100
/// String constants naming the databases, object stores and keys used by the
/// IndexedDB state store.
mod keys {
    /// Suffix appended to the store name to build the name of the meta
    /// database (see `IndexeddbStateStoreBuilder::build`).
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    /// Object store of the meta database that records backups.
    pub const BACKUPS_META: &str = "backups";

    /// Object store for global account data events.
    pub const ACCOUNT_DATA: &str = "account_data";

    /// Object store for per-room member profiles.
    pub const PROFILES: &str = "profiles";
    /// Object store for the display name ambiguity maps.
    pub const DISPLAY_NAMES: &str = "display_names";
    /// Object store for the members of a room (from synced state).
    pub const USER_IDS: &str = "user_ids";

    /// Object store for synced room state events.
    pub const ROOM_STATE: &str = "room_state";
    /// Object store for `RoomInfo` values.
    pub const ROOM_INFOS: &str = "room_infos";
    /// Object store for presence events.
    pub const PRESENCE: &str = "presence";
    /// Object store for per-room account data events.
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    // NOTE(review): the three stores below are not accessed in the visible part
    // of the file; their purpose is inferred from their names only.
    /// Object store for the room send queue.
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    /// Object store for requests depending on send queue entries.
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
    /// Object store for thread subscriptions.
    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";

    /// Object store for stripped room state events.
    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    /// Object store for the members of a room (from stripped state).
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    /// Object store mapping (room, receipt type, [thread,] user) to the latest
    /// receipt of that user.
    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    /// Object store mapping (room, receipt type, [thread,] event, user) to a
    /// receipt.
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    /// Object store for custom values.
    pub const CUSTOM: &str = "custom";
    /// Object store for the key/value data addressed by `StateStoreDataKey`.
    pub const KV: &str = "kv";

    /// Every object store of the main database.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        THREAD_SUBSCRIPTIONS,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // NOTE(review): presumably the key under which the store cipher is
    // persisted in the meta database; it is not used in this part of the
    // file, so confirm against the migrations module.
    pub const STORE_KEY: &str = "store_key";
}
155
156pub use keys::ALL_STORES;
157use matrix_sdk_base::store::QueueWedgeError;
158
159fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
161 Ok(match store_cipher {
162 Some(cipher) => {
163 let data = serde_json::to_vec(event)?;
164 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
165 }
166 None => JsValue::from_serde(event)?,
167 })
168}
169
170fn deserialize_value<T: DeserializeOwned>(
172 store_cipher: Option<&StoreCipher>,
173 event: &JsValue,
174) -> Result<T> {
175 match store_cipher {
176 Some(cipher) => {
177 use zeroize::Zeroize;
178 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
179 let ret = serde_json::from_slice(&plaintext);
180 plaintext.zeroize();
181 Ok(ret?)
182 }
183 None => Ok(event.into_serde()?),
184 }
185}
186
187fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
188where
189 T: SafeEncode,
190{
191 match store_cipher {
192 Some(cipher) => key.as_secure_string(table_name, cipher),
193 None => key.as_encoded_string(),
194 }
195 .into()
196}
197
198fn encode_to_range<T>(
199 store_cipher: Option<&StoreCipher>,
200 table_name: &str,
201 key: T,
202) -> Result<IdbKeyRange>
203where
204 T: SafeEncode,
205{
206 match store_cipher {
207 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
208 None => key.encode_to_range(),
209 }
210 .map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
211}
212
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    /// Name of the database; `build` falls back to `"state"` when unset.
    name: Option<String>,
    /// Optional passphrase handed to the meta database upgrade, which
    /// returns the store cipher.
    passphrase: Option<String>,
    /// Strategy handed to the main database upgrade.
    migration_conflict_strategy: MigrationConflictStrategy,
}
220
221impl IndexeddbStateStoreBuilder {
222 fn new() -> Self {
223 Self {
224 name: None,
225 passphrase: None,
226 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
227 }
228 }
229
230 pub fn name(mut self, value: String) -> Self {
232 self.name = Some(value);
233 self
234 }
235
236 pub fn passphrase(mut self, value: String) -> Self {
240 self.passphrase = Some(value);
241 self
242 }
243
244 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
248 self.migration_conflict_strategy = value;
249 self
250 }
251
252 pub async fn build(self) -> Result<IndexeddbStateStore> {
253 let migration_strategy = self.migration_conflict_strategy.clone();
254 let name = self.name.unwrap_or_else(|| "state".to_owned());
255
256 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
257
258 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
259 let inner =
260 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
261
262 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
263 }
264}
265
/// State store backed by two IndexedDB databases: the main one (`inner`) and
/// a meta one (`meta`).
pub struct IndexeddbStateStore {
    /// Name the store was opened with.
    name: String,
    /// Main database, holding the object stores listed in `keys::ALL_STORES`.
    pub(crate) inner: IdbDatabase,
    /// Meta database, opened under `"{name}::matrix-sdk-state"`.
    pub(crate) meta: IdbDatabase,
    /// Cipher used to encrypt values and encode keys, if any.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
272
273#[cfg(not(tarpaulin_include))]
274impl std::fmt::Debug for IndexeddbStateStore {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
277 }
278}
279
/// Module-local `Result` alias whose error type defaults to
/// [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
281
282impl IndexeddbStateStore {
283 pub fn builder() -> IndexeddbStateStoreBuilder {
285 IndexeddbStateStoreBuilder::new()
286 }
287
288 pub fn version(&self) -> u32 {
290 self.inner.version() as u32
291 }
292
293 pub fn meta_version(&self) -> u32 {
295 self.meta.version() as u32
296 }
297
298 pub async fn has_backups(&self) -> Result<bool> {
300 Ok(self
301 .meta
302 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
303 .object_store(keys::BACKUPS_META)?
304 .count()?
305 .await?
306 > 0)
307 }
308
309 pub async fn latest_backup(&self) -> Result<Option<String>> {
311 Ok(self
312 .meta
313 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
314 .object_store(keys::BACKUPS_META)?
315 .open_cursor_with_direction(IdbCursorDirection::Prev)?
316 .await?
317 .and_then(|c| c.value().as_string()))
318 }
319
320 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
322 serialize_value(self.store_cipher.as_deref(), event)
323 }
324
325 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
327 deserialize_value(self.store_cipher.as_deref(), event)
328 }
329
330 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
331 where
332 T: SafeEncode,
333 {
334 encode_key(self.store_cipher.as_deref(), table_name, key)
335 }
336
337 fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
338 where
339 T: SafeEncode,
340 {
341 encode_to_range(self.store_cipher.as_deref(), table_name, key)
342 }
343
344 pub async fn get_user_ids_inner(
347 &self,
348 room_id: &RoomId,
349 memberships: RoomMemberships,
350 stripped: bool,
351 ) -> Result<Vec<OwnedUserId>> {
352 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
353
354 let tx =
355 self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
356 let store = tx.object_store(store_name)?;
357 let range = self.encode_to_range(store_name, room_id)?;
358
359 let user_ids = if memberships.is_empty() {
360 store
362 .get_all_with_key(&range)?
363 .await?
364 .iter()
365 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
366 .collect::<Vec<_>>()
367 } else {
368 let mut user_ids = Vec::new();
369 let cursor = store.open_cursor_with_range(&range)?.await?;
370
371 if let Some(cursor) = cursor {
372 loop {
373 let value = cursor.value();
374 let member = self.deserialize_value::<RoomMember>(&value)?;
375
376 if memberships.matches(&member.membership) {
377 user_ids.push(member.user_id);
378 }
379
380 if !cursor.continue_cursor()?.await? {
381 break;
382 }
383 }
384 }
385
386 user_ids
387 };
388
389 Ok(user_ids)
390 }
391
392 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
393 self.inner
394 .transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
395 .object_store(keys::CUSTOM)?
396 .get(jskey)?
397 .await?
398 .map(|f| self.deserialize_value(&f))
399 .transpose()
400 }
401
402 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
403 match key {
407 StateStoreDataKey::SyncToken => {
408 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
409 }
410 StateStoreDataKey::ServerInfo => {
411 self.encode_key(StateStoreDataKey::SERVER_INFO, StateStoreDataKey::SERVER_INFO)
412 }
413 StateStoreDataKey::Filter(filter_name) => {
414 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
415 }
416 StateStoreDataKey::UserAvatarUrl(user_id) => {
417 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
418 }
419 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
420 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
421 }
422 StateStoreDataKey::UtdHookManagerData => {
423 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
424 }
425 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
426 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
427 }
428 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
429 if let Some(thread_root) = thread_root {
430 self.encode_key(
431 keys::KV,
432 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
433 )
434 } else {
435 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
436 }
437 }
438 StateStoreDataKey::SeenKnockRequests(room_id) => {
439 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
440 }
441 }
442 }
443}
444
/// Serialized form of a queued request, including fields that are only kept
/// so that older entries can still be converted (see `into_queued_request`).
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    /// Room the request belongs to.
    pub room_id: OwnedRoomId,

    /// Kind of request; when absent, `event` is used to build an event request.
    kind: Option<QueuedRequestKind>,
    /// Transaction ID of the request.
    transaction_id: OwnedTransactionId,

    /// Error recorded for the request, if any.
    pub error: Option<QueueWedgeError>,

    /// Priority of the request; a missing value is read as 0.
    priority: Option<usize>,

    /// Creation time; entries without this field get the current time when
    /// deserialized.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    /// Legacy wedged flag; `Some(true)` is converted into a generic API error.
    is_wedged: Option<bool>,

    /// Legacy event content, used only when `kind` is absent.
    event: Option<SerializableEventContent>,
}
471
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
475
476impl PersistedQueuedRequest {
477 fn into_queued_request(self) -> Option<QueuedRequest> {
478 let kind =
479 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
480
481 let error = match self.is_wedged {
482 Some(true) => {
483 Some(QueueWedgeError::GenericApiError {
485 msg: "local echo failed to send in a previous session".into(),
486 })
487 }
488 _ => self.error,
489 };
490
491 let priority = self.priority.unwrap_or(0);
493
494 Some(QueuedRequest {
495 kind,
496 transaction_id: self.transaction_id,
497 error,
498 priority,
499 created_at: self.created_at,
500 })
501 }
502}
503
// On wasm targets the wrapped methods become the `StateStore` trait
// implementation for `IndexeddbStateStore`, using the non-`Send` flavour of
// `async_trait`.
#[cfg(target_family = "wasm")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
523
// On non-wasm targets the same methods are emitted as plain inherent methods
// of `IndexeddbStateStore` instead of a `StateStore` implementation.
#[cfg(not(target_family = "wasm"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
532
533impl_state_store!({
534 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
535 let encoded_key = self.encode_kv_data_key(key);
536
537 let value = self
538 .inner
539 .transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
540 .object_store(keys::KV)?
541 .get(&encoded_key)?
542 .await?;
543
544 let value = match key {
545 StateStoreDataKey::SyncToken => value
546 .map(|f| self.deserialize_value::<String>(&f))
547 .transpose()?
548 .map(StateStoreDataValue::SyncToken),
549 StateStoreDataKey::ServerInfo => value
550 .map(|f| self.deserialize_value::<ServerInfo>(&f))
551 .transpose()?
552 .map(StateStoreDataValue::ServerInfo),
553 StateStoreDataKey::Filter(_) => value
554 .map(|f| self.deserialize_value::<String>(&f))
555 .transpose()?
556 .map(StateStoreDataValue::Filter),
557 StateStoreDataKey::UserAvatarUrl(_) => value
558 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
559 .transpose()?
560 .map(StateStoreDataValue::UserAvatarUrl),
561 StateStoreDataKey::RecentlyVisitedRooms(_) => value
562 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
563 .transpose()?
564 .map(StateStoreDataValue::RecentlyVisitedRooms),
565 StateStoreDataKey::UtdHookManagerData => value
566 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
567 .transpose()?
568 .map(StateStoreDataValue::UtdHookManagerData),
569 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
570 .map(|f| self.deserialize_value::<bool>(&f))
571 .transpose()?
572 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
573 StateStoreDataKey::ComposerDraft(_, _) => value
574 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
575 .transpose()?
576 .map(StateStoreDataValue::ComposerDraft),
577 StateStoreDataKey::SeenKnockRequests(_) => value
578 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
579 .transpose()?
580 .map(StateStoreDataValue::SeenKnockRequests),
581 };
582
583 Ok(value)
584 }
585
586 async fn set_kv_data(
587 &self,
588 key: StateStoreDataKey<'_>,
589 value: StateStoreDataValue,
590 ) -> Result<()> {
591 let encoded_key = self.encode_kv_data_key(key);
592
593 let serialized_value = match key {
594 StateStoreDataKey::SyncToken => self
595 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
596 StateStoreDataKey::ServerInfo => self.serialize_value(
597 &value.into_server_info().expect("Session data not containing server info"),
598 ),
599 StateStoreDataKey::Filter(_) => {
600 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
601 }
602 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
603 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
604 ),
605 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
606 &value
607 .into_recently_visited_rooms()
608 .expect("Session data not a recently visited room list"),
609 ),
610 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
611 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
612 ),
613 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
614 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
615 &value.into_composer_draft().expect("Session data not a composer draft"),
616 ),
617 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
618 &value
619 .into_seen_knock_requests()
620 .expect("Session data is not a set of seen knock request ids"),
621 ),
622 };
623
624 let tx =
625 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
626
627 let obj = tx.object_store(keys::KV)?;
628
629 obj.put_key_val(&encoded_key, &serialized_value?)?;
630
631 tx.await.into_result()?;
632
633 Ok(())
634 }
635
636 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
637 let encoded_key = self.encode_kv_data_key(key);
638
639 let tx =
640 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
641 let obj = tx.object_store(keys::KV)?;
642
643 obj.delete(&encoded_key)?;
644
645 tx.await.into_result()?;
646
647 Ok(())
648 }
649
650 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
651 let mut stores: HashSet<&'static str> = [
652 (changes.sync_token.is_some(), keys::KV),
653 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
654 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
655 (!changes.presence.is_empty(), keys::PRESENCE),
656 (
657 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
658 keys::PROFILES,
659 ),
660 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
661 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
662 ]
663 .iter()
664 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
665 .collect();
666
667 if !changes.state.is_empty() {
668 stores.extend([
669 keys::ROOM_STATE,
670 keys::USER_IDS,
671 keys::STRIPPED_USER_IDS,
672 keys::STRIPPED_ROOM_STATE,
673 keys::PROFILES,
674 ]);
675 }
676
677 if !changes.redactions.is_empty() {
678 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
679 }
680
681 if !changes.room_infos.is_empty() {
682 stores.insert(keys::ROOM_INFOS);
683 }
684
685 if !changes.stripped_state.is_empty() {
686 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
687 }
688
689 if !changes.receipts.is_empty() {
690 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
691 }
692
693 if stores.is_empty() {
694 return Ok(());
696 }
697
698 let stores: Vec<&'static str> = stores.into_iter().collect();
699 let tx =
700 self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
701
702 if let Some(s) = &changes.sync_token {
703 tx.object_store(keys::KV)?.put_key_val(
704 &self.encode_kv_data_key(StateStoreDataKey::SyncToken),
705 &self.serialize_value(s)?,
706 )?;
707 }
708
709 if !changes.ambiguity_maps.is_empty() {
710 let store = tx.object_store(keys::DISPLAY_NAMES)?;
711 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
712 for (display_name, map) in ambiguity_maps {
713 let key = self.encode_key(
714 keys::DISPLAY_NAMES,
715 (
716 room_id,
717 display_name
718 .as_normalized_str()
719 .unwrap_or_else(|| display_name.as_raw_str()),
720 ),
721 );
722
723 store.put_key_val(&key, &self.serialize_value(&map)?)?;
724 }
725 }
726 }
727
728 if !changes.account_data.is_empty() {
729 let store = tx.object_store(keys::ACCOUNT_DATA)?;
730 for (event_type, event) in &changes.account_data {
731 store.put_key_val(
732 &self.encode_key(keys::ACCOUNT_DATA, event_type),
733 &self.serialize_value(&event)?,
734 )?;
735 }
736 }
737
738 if !changes.room_account_data.is_empty() {
739 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
740 for (room, events) in &changes.room_account_data {
741 for (event_type, event) in events {
742 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
743 store.put_key_val(&key, &self.serialize_value(&event)?)?;
744 }
745 }
746 }
747
748 if !changes.state.is_empty() {
749 let state = tx.object_store(keys::ROOM_STATE)?;
750 let profiles = tx.object_store(keys::PROFILES)?;
751 let user_ids = tx.object_store(keys::USER_IDS)?;
752 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
753 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
754
755 for (room, user_ids) in &changes.profiles_to_delete {
756 for user_id in user_ids {
757 let key = self.encode_key(keys::PROFILES, (room, user_id));
758 profiles.delete(&key)?;
759 }
760 }
761
762 for (room, event_types) in &changes.state {
763 let profile_changes = changes.profiles.get(room);
764
765 for (event_type, events) in event_types {
766 for (state_key, raw_event) in events {
767 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
768 state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
769 stripped_state.delete(&key)?;
770
771 if *event_type == StateEventType::RoomMember {
772 let event =
773 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
774 Ok(ev) => ev,
775 Err(e) => {
776 let event_id: Option<String> =
777 raw_event.get_field("event_id").ok().flatten();
778 debug!(event_id, "Failed to deserialize member event: {e}");
779 continue;
780 }
781 };
782
783 let key = (room, state_key);
784
785 stripped_user_ids
786 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
787
788 user_ids.put_key_val_owned(
789 self.encode_key(keys::USER_IDS, key),
790 &self.serialize_value(&RoomMember::from(&event))?,
791 )?;
792
793 if let Some(profile) =
794 profile_changes.and_then(|p| p.get(event.state_key()))
795 {
796 profiles.put_key_val_owned(
797 self.encode_key(keys::PROFILES, key),
798 &self.serialize_value(&profile)?,
799 )?;
800 }
801 }
802 }
803 }
804 }
805 }
806
807 if !changes.room_infos.is_empty() {
808 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
809 for (room_id, room_info) in &changes.room_infos {
810 room_infos.put_key_val(
811 &self.encode_key(keys::ROOM_INFOS, room_id),
812 &self.serialize_value(&room_info)?,
813 )?;
814 }
815 }
816
817 if !changes.presence.is_empty() {
818 let store = tx.object_store(keys::PRESENCE)?;
819 for (sender, event) in &changes.presence {
820 store.put_key_val(
821 &self.encode_key(keys::PRESENCE, sender),
822 &self.serialize_value(&event)?,
823 )?;
824 }
825 }
826
827 if !changes.stripped_state.is_empty() {
828 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
829 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
830
831 for (room, event_types) in &changes.stripped_state {
832 for (event_type, events) in event_types {
833 for (state_key, raw_event) in events {
834 let key = self
835 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
836 store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
837
838 if *event_type == StateEventType::RoomMember {
839 let event = match raw_event
840 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
841 {
842 Ok(ev) => ev,
843 Err(e) => {
844 let event_id: Option<String> =
845 raw_event.get_field("event_id").ok().flatten();
846 debug!(
847 event_id,
848 "Failed to deserialize stripped member event: {e}"
849 );
850 continue;
851 }
852 };
853
854 let key = (room, state_key);
855
856 user_ids.put_key_val_owned(
857 self.encode_key(keys::STRIPPED_USER_IDS, key),
858 &self.serialize_value(&RoomMember::from(&event))?,
859 )?;
860 }
861 }
862 }
863 }
864 }
865
866 if !changes.receipts.is_empty() {
867 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
868 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
869
870 for (room, content) in &changes.receipts {
871 for (event_id, receipts) in &content.0 {
872 for (receipt_type, receipts) in receipts {
873 for (user_id, receipt) in receipts {
874 let key = match receipt.thread.as_str() {
875 Some(thread_id) => self.encode_key(
876 keys::ROOM_USER_RECEIPTS,
877 (room, receipt_type, thread_id, user_id),
878 ),
879 None => self.encode_key(
880 keys::ROOM_USER_RECEIPTS,
881 (room, receipt_type, user_id),
882 ),
883 };
884
885 if let Some((old_event, _)) =
886 room_user_receipts.get(&key)?.await?.and_then(|f| {
887 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
888 })
889 {
890 let key = match receipt.thread.as_str() {
891 Some(thread_id) => self.encode_key(
892 keys::ROOM_EVENT_RECEIPTS,
893 (room, receipt_type, thread_id, old_event, user_id),
894 ),
895 None => self.encode_key(
896 keys::ROOM_EVENT_RECEIPTS,
897 (room, receipt_type, old_event, user_id),
898 ),
899 };
900 room_event_receipts.delete(&key)?;
901 }
902
903 room_user_receipts
904 .put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
905
906 let key = match receipt.thread.as_str() {
908 Some(thread_id) => self.encode_key(
909 keys::ROOM_EVENT_RECEIPTS,
910 (room, receipt_type, thread_id, event_id, user_id),
911 ),
912 None => self.encode_key(
913 keys::ROOM_EVENT_RECEIPTS,
914 (room, receipt_type, event_id, user_id),
915 ),
916 };
917 room_event_receipts
918 .put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
919 }
920 }
921 }
922 }
923 }
924
925 if !changes.redactions.is_empty() {
926 let state = tx.object_store(keys::ROOM_STATE)?;
927 let room_info = tx.object_store(keys::ROOM_INFOS)?;
928
929 for (room_id, redactions) in &changes.redactions {
930 let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
931 let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
932
933 let mut redaction_rules = None;
934
935 while let Some(key) = cursor.key() {
936 let raw_evt =
937 self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
938 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
939 if let Some(redaction) = redactions.get(&event_id) {
940 let redaction_rules = {
941 if redaction_rules.is_none() {
942 redaction_rules.replace(room_info
943 .get(&self.encode_key(keys::ROOM_INFOS, room_id))?
944 .await?
945 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
946 .map(|info| info.room_version_rules_or_default())
947 .unwrap_or_else(|| {
948 warn!(?room_id, "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}");
949 ROOM_VERSION_RULES_FALLBACK
950 }).redaction
951 );
952 }
953 redaction_rules.as_ref().unwrap()
954 };
955
956 let redacted = redact(
957 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
958 redaction_rules,
959 Some(RedactedBecause::from_raw_event(redaction)?),
960 )
961 .map_err(StoreError::Redaction)?;
962 state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
963 }
964 }
965
966 cursor.advance(1)?.await?;
968 }
969 }
970 }
971
972 tx.await.into_result().map_err(|e| e.into())
973 }
974
975 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
976 self.inner
977 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
978 .object_store(keys::PRESENCE)?
979 .get(&self.encode_key(keys::PRESENCE, user_id))?
980 .await?
981 .map(|f| self.deserialize_value(&f))
982 .transpose()
983 }
984
985 async fn get_presence_events(
986 &self,
987 user_ids: &[OwnedUserId],
988 ) -> Result<Vec<Raw<PresenceEvent>>> {
989 if user_ids.is_empty() {
990 return Ok(Vec::new());
991 }
992
993 let txn = self
994 .inner
995 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
996 let store = txn.object_store(keys::PRESENCE)?;
997
998 let mut events = Vec::with_capacity(user_ids.len());
999
1000 for user_id in user_ids {
1001 if let Some(event) = store
1002 .get(&self.encode_key(keys::PRESENCE, user_id))?
1003 .await?
1004 .map(|f| self.deserialize_value(&f))
1005 .transpose()?
1006 {
1007 events.push(event)
1008 }
1009 }
1010
1011 Ok(events)
1012 }
1013
1014 async fn get_state_event(
1015 &self,
1016 room_id: &RoomId,
1017 event_type: StateEventType,
1018 state_key: &str,
1019 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1020 Ok(self
1021 .get_state_events_for_keys(room_id, event_type, &[state_key])
1022 .await?
1023 .into_iter()
1024 .next())
1025 }
1026
1027 async fn get_state_events(
1028 &self,
1029 room_id: &RoomId,
1030 event_type: StateEventType,
1031 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1032 let stripped_range =
1033 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
1034 let stripped_events = self
1035 .inner
1036 .transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
1037 .object_store(keys::STRIPPED_ROOM_STATE)?
1038 .get_all_with_key(&stripped_range)?
1039 .await?
1040 .iter()
1041 .filter_map(|f| {
1042 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1043 })
1044 .collect::<Vec<_>>();
1045
1046 if !stripped_events.is_empty() {
1047 return Ok(stripped_events);
1048 }
1049
1050 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
1051 Ok(self
1052 .inner
1053 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
1054 .object_store(keys::ROOM_STATE)?
1055 .get_all_with_key(&range)?
1056 .await?
1057 .iter()
1058 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1059 .collect::<Vec<_>>())
1060 }
1061
1062 async fn get_state_events_for_keys(
1063 &self,
1064 room_id: &RoomId,
1065 event_type: StateEventType,
1066 state_keys: &[&str],
1067 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1068 if state_keys.is_empty() {
1069 return Ok(Vec::new());
1070 }
1071
1072 let mut events = Vec::with_capacity(state_keys.len());
1073
1074 {
1075 let txn = self.inner.transaction_on_one_with_mode(
1076 keys::STRIPPED_ROOM_STATE,
1077 IdbTransactionMode::Readonly,
1078 )?;
1079 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1080
1081 for state_key in state_keys {
1082 if let Some(event) =
1083 store
1084 .get(&self.encode_key(
1085 keys::STRIPPED_ROOM_STATE,
1086 (room_id, &event_type, state_key),
1087 ))?
1088 .await?
1089 .map(|f| self.deserialize_value(&f))
1090 .transpose()?
1091 {
1092 events.push(RawAnySyncOrStrippedState::Stripped(event));
1093 }
1094 }
1095
1096 if !events.is_empty() {
1097 return Ok(events);
1098 }
1099 }
1100
1101 let txn = self
1102 .inner
1103 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
1104 let store = txn.object_store(keys::ROOM_STATE)?;
1105
1106 for state_key in state_keys {
1107 if let Some(event) = store
1108 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
1109 .await?
1110 .map(|f| self.deserialize_value(&f))
1111 .transpose()?
1112 {
1113 events.push(RawAnySyncOrStrippedState::Sync(event));
1114 }
1115 }
1116
1117 Ok(events)
1118 }
1119
1120 async fn get_profile(
1121 &self,
1122 room_id: &RoomId,
1123 user_id: &UserId,
1124 ) -> Result<Option<MinimalRoomMemberEvent>> {
1125 self.inner
1126 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
1127 .object_store(keys::PROFILES)?
1128 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1129 .await?
1130 .map(|f| self.deserialize_value(&f))
1131 .transpose()
1132 }
1133
1134 async fn get_profiles<'a>(
1135 &self,
1136 room_id: &RoomId,
1137 user_ids: &'a [OwnedUserId],
1138 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1139 if user_ids.is_empty() {
1140 return Ok(BTreeMap::new());
1141 }
1142
1143 let txn = self
1144 .inner
1145 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
1146 let store = txn.object_store(keys::PROFILES)?;
1147
1148 let mut profiles = BTreeMap::new();
1149 for user_id in user_ids {
1150 if let Some(profile) = store
1151 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1152 .await?
1153 .map(|f| self.deserialize_value(&f))
1154 .transpose()?
1155 {
1156 profiles.insert(user_id.as_ref(), profile);
1157 }
1158 }
1159
1160 Ok(profiles)
1161 }
1162
1163 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1164 let transaction = self
1165 .inner
1166 .transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?;
1167
1168 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1169
1170 Ok(match room_load_settings {
1171 RoomLoadSettings::All => object_store
1172 .get_all()?
1173 .await?
1174 .iter()
1175 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info))
1176 .collect::<Result<_>>()?,
1177
1178 RoomLoadSettings::One(room_id) => {
1179 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id))?.await? {
1180 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1181 None => vec![],
1182 }
1183 }
1184 })
1185 }
1186
1187 async fn get_users_with_display_name(
1188 &self,
1189 room_id: &RoomId,
1190 display_name: &DisplayName,
1191 ) -> Result<BTreeSet<OwnedUserId>> {
1192 self.inner
1193 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
1194 .object_store(keys::DISPLAY_NAMES)?
1195 .get(&self.encode_key(
1196 keys::DISPLAY_NAMES,
1197 (
1198 room_id,
1199 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1200 ),
1201 ))?
1202 .await?
1203 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1204 .unwrap_or_else(|| Ok(Default::default()))
1205 }
1206
1207 async fn get_users_with_display_names<'a>(
1208 &self,
1209 room_id: &RoomId,
1210 display_names: &'a [DisplayName],
1211 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1212 let mut map = HashMap::new();
1213
1214 if display_names.is_empty() {
1215 return Ok(map);
1216 }
1217
1218 let txn = self
1219 .inner
1220 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
1221 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1222
1223 for display_name in display_names {
1224 if let Some(user_ids) = store
1225 .get(
1226 &self.encode_key(
1227 keys::DISPLAY_NAMES,
1228 (
1229 room_id,
1230 display_name
1231 .as_normalized_str()
1232 .unwrap_or_else(|| display_name.as_raw_str()),
1233 ),
1234 ),
1235 )?
1236 .await?
1237 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1238 .transpose()?
1239 {
1240 map.insert(display_name, user_ids);
1241 }
1242 }
1243
1244 Ok(map)
1245 }
1246
1247 async fn get_account_data_event(
1248 &self,
1249 event_type: GlobalAccountDataEventType,
1250 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1251 self.inner
1252 .transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1253 .object_store(keys::ACCOUNT_DATA)?
1254 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
1255 .await?
1256 .map(|f| self.deserialize_value(&f))
1257 .transpose()
1258 }
1259
1260 async fn get_room_account_data_event(
1261 &self,
1262 room_id: &RoomId,
1263 event_type: RoomAccountDataEventType,
1264 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1265 self.inner
1266 .transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1267 .object_store(keys::ROOM_ACCOUNT_DATA)?
1268 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
1269 .await?
1270 .map(|f| self.deserialize_value(&f))
1271 .transpose()
1272 }
1273
1274 async fn get_user_room_receipt_event(
1275 &self,
1276 room_id: &RoomId,
1277 receipt_type: ReceiptType,
1278 thread: ReceiptThread,
1279 user_id: &UserId,
1280 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1281 let key = match thread.as_str() {
1282 Some(thread_id) => self
1283 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1284 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1285 };
1286 self.inner
1287 .transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
1288 .object_store(keys::ROOM_USER_RECEIPTS)?
1289 .get(&key)?
1290 .await?
1291 .map(|f| self.deserialize_value(&f))
1292 .transpose()
1293 }
1294
1295 async fn get_event_room_receipt_events(
1296 &self,
1297 room_id: &RoomId,
1298 receipt_type: ReceiptType,
1299 thread: ReceiptThread,
1300 event_id: &EventId,
1301 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1302 let range = match thread.as_str() {
1303 Some(thread_id) => self.encode_to_range(
1304 keys::ROOM_EVENT_RECEIPTS,
1305 (room_id, receipt_type, thread_id, event_id),
1306 ),
1307 None => {
1308 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1309 }
1310 }?;
1311 let tx = self.inner.transaction_on_one_with_mode(
1312 keys::ROOM_EVENT_RECEIPTS,
1313 IdbTransactionMode::Readonly,
1314 )?;
1315 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1316
1317 Ok(store
1318 .get_all_with_key(&range)?
1319 .await?
1320 .iter()
1321 .filter_map(|f| self.deserialize_value(&f).ok())
1322 .collect::<Vec<_>>())
1323 }
1324
1325 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1326 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1327 self.get_custom_value_for_js(jskey).await
1328 }
1329
1330 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1331 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1332
1333 let prev = self.get_custom_value_for_js(&jskey).await?;
1334
1335 let tx =
1336 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1337
1338 tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
1339
1340 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1341 Ok(prev)
1342 }
1343
1344 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1345 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1346
1347 let prev = self.get_custom_value_for_js(&jskey).await?;
1348
1349 let tx =
1350 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1351
1352 tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
1353
1354 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1355 Ok(prev)
1356 }
1357
1358 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1359 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1361
1362 let prefixed_stores = [
1365 keys::PROFILES,
1366 keys::DISPLAY_NAMES,
1367 keys::USER_IDS,
1368 keys::ROOM_STATE,
1369 keys::ROOM_ACCOUNT_DATA,
1370 keys::ROOM_EVENT_RECEIPTS,
1371 keys::ROOM_USER_RECEIPTS,
1372 keys::STRIPPED_ROOM_STATE,
1373 keys::STRIPPED_USER_IDS,
1374 keys::THREAD_SUBSCRIPTIONS,
1375 ];
1376
1377 let all_stores = {
1378 let mut v = Vec::new();
1379 v.extend(prefixed_stores);
1380 v.extend(direct_stores);
1381 v
1382 };
1383
1384 let tx = self
1385 .inner
1386 .transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
1387
1388 for store_name in direct_stores {
1389 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
1390 }
1391
1392 for store_name in prefixed_stores {
1393 let store = tx.object_store(store_name)?;
1394 let range = self.encode_to_range(store_name, room_id)?;
1395 for key in store.get_all_keys_with_key(&range)?.await?.iter() {
1396 store.delete(&key)?;
1397 }
1398 }
1399
1400 tx.await.into_result().map_err(|e| e.into())
1401 }
1402
1403 async fn get_user_ids(
1404 &self,
1405 room_id: &RoomId,
1406 memberships: RoomMemberships,
1407 ) -> Result<Vec<OwnedUserId>> {
1408 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1409 if !ids.is_empty() {
1410 return Ok(ids);
1411 }
1412 self.get_user_ids_inner(room_id, memberships, false).await
1413 }
1414
1415 async fn save_send_queue_request(
1416 &self,
1417 room_id: &RoomId,
1418 transaction_id: OwnedTransactionId,
1419 created_at: MilliSecondsSinceUnixEpoch,
1420 kind: QueuedRequestKind,
1421 priority: usize,
1422 ) -> Result<()> {
1423 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1424
1425 let tx = self
1426 .inner
1427 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1428
1429 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1430
1431 let prev = obj.get(&encoded_key)?.await?;
1436
1437 let mut prev = prev.map_or_else(
1438 || Ok(Vec::new()),
1439 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1440 )?;
1441
1442 prev.push(PersistedQueuedRequest {
1444 room_id: room_id.to_owned(),
1445 kind: Some(kind),
1446 transaction_id,
1447 error: None,
1448 is_wedged: None,
1449 event: None,
1450 priority: Some(priority),
1451 created_at,
1452 });
1453
1454 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1456
1457 tx.await.into_result()?;
1458
1459 Ok(())
1460 }
1461
1462 async fn update_send_queue_request(
1463 &self,
1464 room_id: &RoomId,
1465 transaction_id: &TransactionId,
1466 kind: QueuedRequestKind,
1467 ) -> Result<bool> {
1468 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1469
1470 let tx = self
1471 .inner
1472 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1473
1474 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1475
1476 let prev = obj.get(&encoded_key)?.await?;
1481
1482 let mut prev = prev.map_or_else(
1483 || Ok(Vec::new()),
1484 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1485 )?;
1486
1487 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1489 entry.kind = Some(kind);
1490 entry.error = None;
1492 entry.is_wedged = None;
1494 entry.event = None;
1495
1496 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1498 tx.await.into_result()?;
1499
1500 Ok(true)
1501 } else {
1502 Ok(false)
1503 }
1504 }
1505
1506 async fn remove_send_queue_request(
1507 &self,
1508 room_id: &RoomId,
1509 transaction_id: &TransactionId,
1510 ) -> Result<bool> {
1511 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1512
1513 let tx = self.inner.transaction_on_multi_with_mode(
1514 &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
1515 IdbTransactionMode::Readwrite,
1516 )?;
1517
1518 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1519
1520 if let Some(val) = obj.get(&encoded_key)?.await? {
1525 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1526 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1527 prev.remove(pos);
1528
1529 if prev.is_empty() {
1530 obj.delete(&encoded_key)?;
1531 } else {
1532 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1533 }
1534
1535 tx.await.into_result()?;
1536 return Ok(true);
1537 }
1538 }
1539
1540 Ok(false)
1541 }
1542
1543 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1544 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1545
1546 let prev = self
1549 .inner
1550 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
1551 .object_store(keys::ROOM_SEND_QUEUE)?
1552 .get(&encoded_key)?
1553 .await?;
1554
1555 let mut prev = prev.map_or_else(
1556 || Ok(Vec::new()),
1557 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1558 )?;
1559
1560 prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1562
1563 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1564 }
1565
1566 async fn update_send_queue_request_status(
1567 &self,
1568 room_id: &RoomId,
1569 transaction_id: &TransactionId,
1570 error: Option<QueueWedgeError>,
1571 ) -> Result<()> {
1572 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1573
1574 let tx = self
1575 .inner
1576 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1577
1578 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1579
1580 if let Some(val) = obj.get(&encoded_key)?.await? {
1581 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1582 if let Some(request) =
1583 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1584 {
1585 request.is_wedged = None;
1586 request.error = error;
1587 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1588 }
1589 }
1590
1591 tx.await.into_result()?;
1592
1593 Ok(())
1594 }
1595
1596 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1597 let tx = self
1598 .inner
1599 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1600
1601 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1602
1603 let all_entries = obj
1604 .get_all()?
1605 .await?
1606 .into_iter()
1607 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
1608 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1609 .into_iter()
1610 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1611 .collect::<BTreeSet<_>>();
1612
1613 Ok(all_entries.into_iter().collect())
1614 }
1615
1616 async fn save_dependent_queued_request(
1617 &self,
1618 room_id: &RoomId,
1619 parent_txn_id: &TransactionId,
1620 own_txn_id: ChildTransactionId,
1621 created_at: MilliSecondsSinceUnixEpoch,
1622 content: DependentQueuedRequestKind,
1623 ) -> Result<()> {
1624 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1625
1626 let tx = self.inner.transaction_on_one_with_mode(
1627 keys::DEPENDENT_SEND_QUEUE,
1628 IdbTransactionMode::Readwrite,
1629 )?;
1630
1631 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1632
1633 let prev = obj.get(&encoded_key)?.await?;
1636
1637 let mut prev = prev.map_or_else(
1638 || Ok(Vec::new()),
1639 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1640 )?;
1641
1642 prev.push(DependentQueuedRequest {
1644 kind: content,
1645 parent_transaction_id: parent_txn_id.to_owned(),
1646 own_transaction_id: own_txn_id,
1647 parent_key: None,
1648 created_at,
1649 });
1650
1651 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1653
1654 tx.await.into_result()?;
1655
1656 Ok(())
1657 }
1658
1659 async fn update_dependent_queued_request(
1660 &self,
1661 room_id: &RoomId,
1662 own_transaction_id: &ChildTransactionId,
1663 new_content: DependentQueuedRequestKind,
1664 ) -> Result<bool> {
1665 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1666
1667 let tx = self.inner.transaction_on_one_with_mode(
1668 keys::DEPENDENT_SEND_QUEUE,
1669 IdbTransactionMode::Readwrite,
1670 )?;
1671
1672 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1673
1674 let prev = obj.get(&encoded_key)?.await?;
1677
1678 let mut prev = prev.map_or_else(
1679 || Ok(Vec::new()),
1680 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1681 )?;
1682
1683 let mut found = false;
1685 for entry in prev.iter_mut() {
1686 if entry.own_transaction_id == *own_transaction_id {
1687 found = true;
1688 entry.kind = new_content;
1689 break;
1690 }
1691 }
1692
1693 if found {
1694 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1695 tx.await.into_result()?;
1696 }
1697
1698 Ok(found)
1699 }
1700
1701 async fn mark_dependent_queued_requests_as_ready(
1702 &self,
1703 room_id: &RoomId,
1704 parent_txn_id: &TransactionId,
1705 parent_key: SentRequestKey,
1706 ) -> Result<usize> {
1707 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1708
1709 let tx = self.inner.transaction_on_one_with_mode(
1710 keys::DEPENDENT_SEND_QUEUE,
1711 IdbTransactionMode::Readwrite,
1712 )?;
1713
1714 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1715
1716 let prev = obj.get(&encoded_key)?.await?;
1719
1720 let mut prev = prev.map_or_else(
1721 || Ok(Vec::new()),
1722 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1723 )?;
1724
1725 let mut num_updated = 0;
1727 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1728 entry.parent_key = Some(parent_key.clone());
1729 num_updated += 1;
1730 }
1731
1732 if num_updated > 0 {
1733 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1734 tx.await.into_result()?;
1735 }
1736
1737 Ok(num_updated)
1738 }
1739
1740 async fn remove_dependent_queued_request(
1741 &self,
1742 room_id: &RoomId,
1743 txn_id: &ChildTransactionId,
1744 ) -> Result<bool> {
1745 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1746
1747 let tx = self.inner.transaction_on_one_with_mode(
1748 keys::DEPENDENT_SEND_QUEUE,
1749 IdbTransactionMode::Readwrite,
1750 )?;
1751
1752 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1753
1754 if let Some(val) = obj.get(&encoded_key)?.await? {
1757 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1758 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1759 prev.remove(pos);
1760
1761 if prev.is_empty() {
1762 obj.delete(&encoded_key)?;
1763 } else {
1764 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1765 }
1766
1767 tx.await.into_result()?;
1768 return Ok(true);
1769 }
1770 }
1771
1772 Ok(false)
1773 }
1774
1775 async fn load_dependent_queued_requests(
1776 &self,
1777 room_id: &RoomId,
1778 ) -> Result<Vec<DependentQueuedRequest>> {
1779 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1780
1781 let prev = self
1783 .inner
1784 .transaction_on_one_with_mode(
1785 keys::DEPENDENT_SEND_QUEUE,
1786 IdbTransactionMode::Readwrite,
1787 )?
1788 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1789 .get(&encoded_key)?
1790 .await?;
1791
1792 prev.map_or_else(
1793 || Ok(Vec::new()),
1794 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1795 )
1796 }
1797
1798 async fn upsert_thread_subscription(
1799 &self,
1800 room: &RoomId,
1801 thread_id: &EventId,
1802 subscription: ThreadSubscription,
1803 ) -> Result<()> {
1804 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1805
1806 let tx = self.inner.transaction_on_one_with_mode(
1807 keys::THREAD_SUBSCRIPTIONS,
1808 IdbTransactionMode::Readwrite,
1809 )?;
1810 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1811
1812 let serialized_value = self.serialize_value(&subscription.as_str().to_owned());
1813 obj.put_key_val(&encoded_key, &serialized_value?)?;
1814
1815 tx.await.into_result()?;
1816
1817 Ok(())
1818 }
1819
1820 async fn load_thread_subscription(
1821 &self,
1822 room: &RoomId,
1823 thread_id: &EventId,
1824 ) -> Result<Option<ThreadSubscription>> {
1825 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1826
1827 let js_value = self
1828 .inner
1829 .transaction_on_one_with_mode(keys::THREAD_SUBSCRIPTIONS, IdbTransactionMode::Readonly)?
1830 .object_store(keys::THREAD_SUBSCRIPTIONS)?
1831 .get(&encoded_key)?
1832 .await?;
1833
1834 let Some(js_value) = js_value else {
1835 return Ok(None);
1837 };
1838
1839 let status_string: String = self.deserialize_value(&js_value)?;
1840 let status = ThreadSubscription::from_value(&status_string).ok_or_else(|| {
1841 StoreError::InvalidData {
1842 details: format!(
1843 "invalid thread status for room {room} and thread {thread_id}: {status_string}"
1844 ),
1845 }
1846 })?;
1847
1848 Ok(Some(status))
1849 }
1850
1851 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
1852 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1853
1854 self.inner
1855 .transaction_on_one_with_mode(
1856 keys::THREAD_SUBSCRIPTIONS,
1857 IdbTransactionMode::Readwrite,
1858 )?
1859 .object_store(keys::THREAD_SUBSCRIPTIONS)?
1860 .delete(&encoded_key)?
1861 .await?;
1862
1863 Ok(())
1864 }
1865});
1866
1867#[derive(Debug, Serialize, Deserialize)]
1869struct RoomMember {
1870 user_id: OwnedUserId,
1871 membership: MembershipState,
1872}
1873
1874impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
1875 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
1876 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
1877 }
1878}
1879
1880impl From<&StrippedRoomMemberEvent> for RoomMember {
1881 fn from(event: &StrippedRoomMemberEvent) -> Self {
1882 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
1883 }
1884}
1885
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Stand-in for an earlier serialized form of a persisted queued request:
    /// it carries a mandatory `event` and a plain boolean `is_wedged`.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// Serializes a request in the old shape and checks that it can be read
    /// back as a `PersistedQueuedRequest` and converted into a queued request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let txn_id = TransactionId::new();

        let legacy = OldPersistedQueuedRequest {
            room_id: room_id!("!room_a:dummy.local").to_owned(),
            event: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("Hello").into(),
            )
            .unwrap(),
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };

        // Round-trip through JSON: write the old shape, read the current one.
        let bytes = serde_json::to_vec(&legacy).unwrap();
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&bytes).unwrap();

        // The old fields are carried over as they were.
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        // The content lives in `event`; `kind` is absent from the old shape.
        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        // Conversion yields an event request carrying an error.
        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
1942
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store without a passphrase, under a database name made
    /// unique by a fresh UUID.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_suffix = Uuid::new_v4();

        let store = IndexeddbStateStore::builder()
            .name(format!("test-state-plain-{}", unique_suffix.as_hyphenated()))
            .build()
            .await?;

        Ok(store)
    }

    statestore_integration_tests!();
}
1960
#[cfg(all(test, target_family = "wasm"))]
mod encrypted_tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store with a passphrase; both the database name and the
    /// passphrase are made unique by fresh UUIDs.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let name_suffix = Uuid::new_v4();
        let passphrase_suffix = Uuid::new_v4();

        let store = IndexeddbStateStore::builder()
            .name(format!("test-state-encrypted-{}", name_suffix.as_hyphenated()))
            .passphrase(format!("some_passphrase-{}", passphrase_suffix.as_hyphenated()))
            .build()
            .await?;

        Ok(store)
    }

    statestore_integration_tests!();
}