1use std::{
16 cmp::Reverse,
17 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18 str::FromStr as _,
19 sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26 KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27 transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31 RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33 store::{
34 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36 SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37 SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
38 compare_thread_subscription_bump_stamps,
39 },
40};
41use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
42use ruma::{
43 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
44 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
45 canonical_json::{RedactedBecause, redact},
46 events::{
47 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
48 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
49 presence::PresenceEvent,
50 receipt::{Receipt, ReceiptThread, ReceiptType},
51 room::member::{
52 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
53 },
54 },
55 serde::Raw,
56};
57use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
58use tracing::{debug, warn};
59use wasm_bindgen::JsValue;
60
61mod migrations;
62
63pub use self::migrations::MigrationConflictStrategy;
64use self::migrations::{upgrade_inner_db, upgrade_meta_db};
65use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
66
67#[derive(Debug, thiserror::Error)]
68pub enum IndexeddbStateStoreError {
69 #[error(transparent)]
70 Json(#[from] serde_json::Error),
71 #[error(transparent)]
72 Encryption(#[from] EncryptionError),
73 #[error("DomException {name} ({code}): {message}")]
74 DomException { name: String, message: String, code: u16 },
75 #[error(transparent)]
76 StoreError(#[from] StoreError),
77 #[error(
78 "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
79 See MigrationConflictStrategy for ways to configure."
80 )]
81 MigrationConflict { name: String, old_version: u32, new_version: u32 },
82}
83
84impl From<GenericError> for IndexeddbStateStoreError {
85 fn from(value: GenericError) -> Self {
86 Self::StoreError(value.into())
87 }
88}
89
90impl From<web_sys::DomException> for IndexeddbStateStoreError {
91 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
92 IndexeddbStateStoreError::DomException {
93 name: frm.name(),
94 message: frm.message(),
95 code: frm.code(),
96 }
97 }
98}
99
100impl From<IndexeddbStateStoreError> for StoreError {
101 fn from(e: IndexeddbStateStoreError) -> Self {
102 match e {
103 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
104 IndexeddbStateStoreError::StoreError(e) => e,
105 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
106 _ => StoreError::backend(e),
107 }
108 }
109}
110
111impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
112 fn from(value: indexed_db_futures::error::DomException) -> Self {
113 web_sys::DomException::from(value).into()
114 }
115}
116
117impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
118 fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
119 Self::Json(serde_json::Error::custom(value.to_string()))
120 }
121}
122
123impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
124 fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
125 IndexeddbStateStoreError::StoreError(StoreError::backend(value))
126 }
127}
128
129impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
130 fn from(value: indexed_db_futures::error::JSError) -> Self {
131 GenericError::from(value.to_string()).into()
132 }
133}
134
135impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
136 fn from(value: indexed_db_futures::error::Error) -> Self {
137 use indexed_db_futures::error::Error;
138 match value {
139 Error::DomException(e) => e.into(),
140 Error::Serialisation(e) => e.into(),
141 Error::MissingData(e) => e.into(),
142 Error::Unknown(e) => e.into(),
143 }
144 }
145}
146
147impl From<OpenDbError> for IndexeddbStateStoreError {
148 fn from(value: OpenDbError) -> Self {
149 match value {
150 OpenDbError::Base(error) => error.into(),
151 _ => GenericError::from(value.to_string()).into(),
152 }
153 }
154}
155
156mod keys {
157 pub const INTERNAL_STATE: &str = "matrix-sdk-state";
158 pub const BACKUPS_META: &str = "backups";
159
160 pub const ACCOUNT_DATA: &str = "account_data";
161
162 pub const PROFILES: &str = "profiles";
163 pub const DISPLAY_NAMES: &str = "display_names";
164 pub const USER_IDS: &str = "user_ids";
165
166 pub const ROOM_STATE: &str = "room_state";
167 pub const ROOM_INFOS: &str = "room_infos";
168 pub const PRESENCE: &str = "presence";
169 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
170 pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
172 pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
174 pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
175
176 pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
177 pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
178
179 pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
180 pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
181
182 pub const CUSTOM: &str = "custom";
183 pub const KV: &str = "kv";
184
185 pub const ALL_STORES: &[&str] = &[
187 ACCOUNT_DATA,
188 PROFILES,
189 DISPLAY_NAMES,
190 USER_IDS,
191 ROOM_STATE,
192 ROOM_INFOS,
193 PRESENCE,
194 ROOM_ACCOUNT_DATA,
195 STRIPPED_ROOM_STATE,
196 STRIPPED_USER_IDS,
197 ROOM_USER_RECEIPTS,
198 ROOM_EVENT_RECEIPTS,
199 ROOM_SEND_QUEUE,
200 THREAD_SUBSCRIPTIONS,
201 DEPENDENT_SEND_QUEUE,
202 CUSTOM,
203 KV,
204 ];
205
206 pub const STORE_KEY: &str = "store_key";
209}
210
211pub use keys::ALL_STORES;
212use matrix_sdk_base::store::QueueWedgeError;
213
214fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
216 Ok(match store_cipher {
217 Some(cipher) => {
218 let data = serde_json::to_vec(event)?;
219 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
220 }
221 None => JsValue::from_serde(event)?,
222 })
223}
224
225fn deserialize_value<T: DeserializeOwned>(
227 store_cipher: Option<&StoreCipher>,
228 event: &JsValue,
229) -> Result<T> {
230 match store_cipher {
231 Some(cipher) => {
232 use zeroize::Zeroize;
233 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
234 let ret = serde_json::from_slice(&plaintext);
235 plaintext.zeroize();
236 Ok(ret?)
237 }
238 None => Ok(event.into_serde()?),
239 }
240}
241
242fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
243where
244 T: SafeEncode,
245{
246 match store_cipher {
247 Some(cipher) => key.as_secure_string(table_name, cipher),
248 None => key.as_encoded_string(),
249 }
250 .into()
251}
252
253fn encode_to_range<T>(
254 store_cipher: Option<&StoreCipher>,
255 table_name: &str,
256 key: T,
257) -> KeyRange<JsValue>
258where
259 T: SafeEncode,
260{
261 match store_cipher {
262 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
263 None => key.encode_to_range(),
264 }
265}
266
267#[derive(Debug)]
269pub struct IndexeddbStateStoreBuilder {
270 name: Option<String>,
271 passphrase: Option<String>,
272 migration_conflict_strategy: MigrationConflictStrategy,
273}
274
275impl IndexeddbStateStoreBuilder {
276 fn new() -> Self {
277 Self {
278 name: None,
279 passphrase: None,
280 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
281 }
282 }
283
284 pub fn name(mut self, value: String) -> Self {
286 self.name = Some(value);
287 self
288 }
289
290 pub fn passphrase(mut self, value: String) -> Self {
294 self.passphrase = Some(value);
295 self
296 }
297
298 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
302 self.migration_conflict_strategy = value;
303 self
304 }
305
306 pub async fn build(self) -> Result<IndexeddbStateStore> {
307 let migration_strategy = self.migration_conflict_strategy.clone();
308 let name = self.name.unwrap_or_else(|| "state".to_owned());
309
310 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
311
312 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
313 let inner =
314 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
315
316 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
317 }
318}
319
320pub struct IndexeddbStateStore {
321 name: String,
322 pub(crate) inner: Database,
323 pub(crate) meta: Database,
324 pub(crate) store_cipher: Option<Arc<StoreCipher>>,
325}
326
327#[cfg(not(tarpaulin_include))]
328impl std::fmt::Debug for IndexeddbStateStore {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
331 }
332}
333
334type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
335
336impl IndexeddbStateStore {
337 pub fn builder() -> IndexeddbStateStoreBuilder {
339 IndexeddbStateStoreBuilder::new()
340 }
341
342 pub fn version(&self) -> u32 {
344 self.inner.version() as u32
345 }
346
347 pub fn meta_version(&self) -> u32 {
349 self.meta.version() as u32
350 }
351
352 pub async fn has_backups(&self) -> Result<bool> {
354 Ok(self
355 .meta
356 .transaction(keys::BACKUPS_META)
357 .with_mode(TransactionMode::Readonly)
358 .build()?
359 .object_store(keys::BACKUPS_META)?
360 .count()
361 .await?
362 > 0)
363 }
364
365 pub async fn latest_backup(&self) -> Result<Option<String>> {
367 if let Some(mut cursor) = self
368 .meta
369 .transaction(keys::BACKUPS_META)
370 .with_mode(TransactionMode::Readonly)
371 .build()?
372 .object_store(keys::BACKUPS_META)?
373 .open_cursor()
374 .with_direction(CursorDirection::Prev)
375 .await?
376 && let Some(record) = cursor.next_record::<JsValue>().await?
377 {
378 Ok(record.as_string())
379 } else {
380 Ok(None)
381 }
382 }
383
384 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
386 serialize_value(self.store_cipher.as_deref(), event)
387 }
388
389 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
391 deserialize_value(self.store_cipher.as_deref(), event)
392 }
393
394 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
395 where
396 T: SafeEncode,
397 {
398 encode_key(self.store_cipher.as_deref(), table_name, key)
399 }
400
401 fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
402 where
403 T: SafeEncode,
404 {
405 encode_to_range(self.store_cipher.as_deref(), table_name, key)
406 }
407
408 pub async fn get_user_ids_inner(
411 &self,
412 room_id: &RoomId,
413 memberships: RoomMemberships,
414 stripped: bool,
415 ) -> Result<Vec<OwnedUserId>> {
416 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
417
418 let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
419 let store = tx.object_store(store_name)?;
420 let range = self.encode_to_range(store_name, room_id);
421
422 let user_ids = if memberships.is_empty() {
423 store
425 .get_all()
426 .with_query(&range)
427 .await?
428 .filter_map(Result::ok)
429 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
430 .collect::<Vec<_>>()
431 } else {
432 let mut user_ids = Vec::new();
433 let cursor = store.open_cursor().with_query(&range).await?;
434
435 if let Some(mut cursor) = cursor {
436 while let Some(value) = cursor.next_record().await? {
437 let member = self.deserialize_value::<RoomMember>(&value)?;
438
439 if memberships.matches(&member.membership) {
440 user_ids.push(member.user_id);
441 }
442 }
443 }
444
445 user_ids
446 };
447
448 Ok(user_ids)
449 }
450
451 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
452 self.inner
453 .transaction(keys::CUSTOM)
454 .with_mode(TransactionMode::Readonly)
455 .build()?
456 .object_store(keys::CUSTOM)?
457 .get(jskey)
458 .await?
459 .map(|f| self.deserialize_value(&f))
460 .transpose()
461 }
462
463 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
464 match key {
468 StateStoreDataKey::SyncToken => {
469 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
470 }
471 StateStoreDataKey::SupportedVersions => self.encode_key(
472 StateStoreDataKey::SUPPORTED_VERSIONS,
473 StateStoreDataKey::SUPPORTED_VERSIONS,
474 ),
475 StateStoreDataKey::WellKnown => {
476 self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
477 }
478 StateStoreDataKey::Filter(filter_name) => {
479 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
480 }
481 StateStoreDataKey::UserAvatarUrl(user_id) => {
482 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
483 }
484 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
485 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
486 }
487 StateStoreDataKey::UtdHookManagerData => {
488 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
489 }
490 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
491 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
492 }
493 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
494 if let Some(thread_root) = thread_root {
495 self.encode_key(
496 keys::KV,
497 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
498 )
499 } else {
500 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
501 }
502 }
503 StateStoreDataKey::SeenKnockRequests(room_id) => {
504 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
505 }
506 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
507 self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
508 }
509 }
510 }
511}
512
513#[derive(Serialize, Deserialize)]
516struct PersistedQueuedRequest {
517 pub room_id: OwnedRoomId,
519
520 kind: Option<QueuedRequestKind>,
523 transaction_id: OwnedTransactionId,
524
525 pub error: Option<QueueWedgeError>,
526
527 priority: Option<usize>,
528
529 #[serde(default = "created_now")]
531 created_at: MilliSecondsSinceUnixEpoch,
532
533 is_wedged: Option<bool>,
536
537 event: Option<SerializableEventContent>,
538}
539
540fn created_now() -> MilliSecondsSinceUnixEpoch {
541 MilliSecondsSinceUnixEpoch::now()
542}
543
544impl PersistedQueuedRequest {
545 fn into_queued_request(self) -> Option<QueuedRequest> {
546 let kind =
547 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
548
549 let error = match self.is_wedged {
550 Some(true) => {
551 Some(QueueWedgeError::GenericApiError {
553 msg: "local echo failed to send in a previous session".into(),
554 })
555 }
556 _ => self.error,
557 };
558
559 let priority = self.priority.unwrap_or(0);
561
562 Some(QueuedRequest {
563 kind,
564 transaction_id: self.transaction_id,
565 error,
566 priority,
567 created_at: self.created_at,
568 })
569 }
570}
571
572#[derive(Serialize, Deserialize, PartialEq)]
573struct PersistedThreadSubscription {
574 status: String,
575 bump_stamp: Option<u64>,
576}
577
578impl From<StoredThreadSubscription> for PersistedThreadSubscription {
579 fn from(value: StoredThreadSubscription) -> Self {
580 Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
581 }
582}
583
584#[cfg(target_family = "wasm")]
593macro_rules! impl_state_store {
594 ({ $($body:tt)* }) => {
595 #[async_trait(?Send)]
596 impl StateStore for IndexeddbStateStore {
597 type Error = IndexeddbStateStoreError;
598
599 $($body)*
600 }
601 };
602}
603
604#[cfg(not(target_family = "wasm"))]
605macro_rules! impl_state_store {
606 ({ $($body:tt)* }) => {
607 impl IndexeddbStateStore {
608 $($body)*
609 }
610 };
611}
612
613impl_state_store!({
614 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
615 let encoded_key = self.encode_kv_data_key(key);
616
617 let value = self
618 .inner
619 .transaction(keys::KV)
620 .with_mode(TransactionMode::Readonly)
621 .build()?
622 .object_store(keys::KV)?
623 .get(&encoded_key)
624 .await?;
625
626 let value = match key {
627 StateStoreDataKey::SyncToken => value
628 .map(|f| self.deserialize_value::<String>(&f))
629 .transpose()?
630 .map(StateStoreDataValue::SyncToken),
631 StateStoreDataKey::SupportedVersions => value
632 .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
633 .transpose()?
634 .map(StateStoreDataValue::SupportedVersions),
635 StateStoreDataKey::WellKnown => value
636 .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
637 .transpose()?
638 .map(StateStoreDataValue::WellKnown),
639 StateStoreDataKey::Filter(_) => value
640 .map(|f| self.deserialize_value::<String>(&f))
641 .transpose()?
642 .map(StateStoreDataValue::Filter),
643 StateStoreDataKey::UserAvatarUrl(_) => value
644 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
645 .transpose()?
646 .map(StateStoreDataValue::UserAvatarUrl),
647 StateStoreDataKey::RecentlyVisitedRooms(_) => value
648 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
649 .transpose()?
650 .map(StateStoreDataValue::RecentlyVisitedRooms),
651 StateStoreDataKey::UtdHookManagerData => value
652 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
653 .transpose()?
654 .map(StateStoreDataValue::UtdHookManagerData),
655 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
656 .map(|f| self.deserialize_value::<bool>(&f))
657 .transpose()?
658 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
659 StateStoreDataKey::ComposerDraft(_, _) => value
660 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
661 .transpose()?
662 .map(StateStoreDataValue::ComposerDraft),
663 StateStoreDataKey::SeenKnockRequests(_) => value
664 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
665 .transpose()?
666 .map(StateStoreDataValue::SeenKnockRequests),
667 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
668 .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
669 .transpose()?
670 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
671 };
672
673 Ok(value)
674 }
675
676 async fn set_kv_data(
677 &self,
678 key: StateStoreDataKey<'_>,
679 value: StateStoreDataValue,
680 ) -> Result<()> {
681 let encoded_key = self.encode_kv_data_key(key);
682
683 let serialized_value = match key {
684 StateStoreDataKey::SyncToken => self
685 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
686 StateStoreDataKey::SupportedVersions => self.serialize_value(
687 &value
688 .into_supported_versions()
689 .expect("Session data not containing supported versions"),
690 ),
691 StateStoreDataKey::WellKnown => self.serialize_value(
692 &value.into_well_known().expect("Session data not containing well-known"),
693 ),
694 StateStoreDataKey::Filter(_) => {
695 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
696 }
697 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
698 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
699 ),
700 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
701 &value
702 .into_recently_visited_rooms()
703 .expect("Session data not a recently visited room list"),
704 ),
705 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
706 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
707 ),
708 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
709 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
710 &value.into_composer_draft().expect("Session data not a composer draft"),
711 ),
712 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
713 &value
714 .into_seen_knock_requests()
715 .expect("Session data is not a set of seen knock request ids"),
716 ),
717 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
718 &value
719 .into_thread_subscriptions_catchup_tokens()
720 .expect("Session data is not a list of thread subscription catchup tokens"),
721 ),
722 };
723
724 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
725
726 let obj = tx.object_store(keys::KV)?;
727
728 obj.put(&serialized_value?).with_key(encoded_key).build()?;
729
730 tx.commit().await?;
731
732 Ok(())
733 }
734
735 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
736 let encoded_key = self.encode_kv_data_key(key);
737
738 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
739 let obj = tx.object_store(keys::KV)?;
740
741 obj.delete(&encoded_key).build()?;
742
743 tx.commit().await?;
744
745 Ok(())
746 }
747
748 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
749 let mut stores: HashSet<&'static str> = [
750 (changes.sync_token.is_some(), keys::KV),
751 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
752 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
753 (!changes.presence.is_empty(), keys::PRESENCE),
754 (
755 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
756 keys::PROFILES,
757 ),
758 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
759 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
760 ]
761 .iter()
762 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
763 .collect();
764
765 if !changes.state.is_empty() {
766 stores.extend([
767 keys::ROOM_STATE,
768 keys::USER_IDS,
769 keys::STRIPPED_USER_IDS,
770 keys::STRIPPED_ROOM_STATE,
771 keys::PROFILES,
772 ]);
773 }
774
775 if !changes.redactions.is_empty() {
776 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
777 }
778
779 if !changes.room_infos.is_empty() {
780 stores.insert(keys::ROOM_INFOS);
781 }
782
783 if !changes.stripped_state.is_empty() {
784 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
785 }
786
787 if !changes.receipts.is_empty() {
788 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
789 }
790
791 if stores.is_empty() {
792 return Ok(());
794 }
795
796 let stores: Vec<&'static str> = stores.into_iter().collect();
797 let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
798
799 if let Some(s) = &changes.sync_token {
800 tx.object_store(keys::KV)?
801 .put(&self.serialize_value(s)?)
802 .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
803 .build()?;
804 }
805
806 if !changes.ambiguity_maps.is_empty() {
807 let store = tx.object_store(keys::DISPLAY_NAMES)?;
808 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
809 for (display_name, map) in ambiguity_maps {
810 let key = self.encode_key(
811 keys::DISPLAY_NAMES,
812 (
813 room_id,
814 display_name
815 .as_normalized_str()
816 .unwrap_or_else(|| display_name.as_raw_str()),
817 ),
818 );
819
820 store.put(&self.serialize_value(&map)?).with_key(key).build()?;
821 }
822 }
823 }
824
825 if !changes.account_data.is_empty() {
826 let store = tx.object_store(keys::ACCOUNT_DATA)?;
827 for (event_type, event) in &changes.account_data {
828 store
829 .put(&self.serialize_value(&event)?)
830 .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
831 .build()?;
832 }
833 }
834
835 if !changes.room_account_data.is_empty() {
836 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
837 for (room, events) in &changes.room_account_data {
838 for (event_type, event) in events {
839 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
840 store.put(&self.serialize_value(&event)?).with_key(key).build()?;
841 }
842 }
843 }
844
845 if !changes.state.is_empty() {
846 let state = tx.object_store(keys::ROOM_STATE)?;
847 let profiles = tx.object_store(keys::PROFILES)?;
848 let user_ids = tx.object_store(keys::USER_IDS)?;
849 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
850 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
851
852 for (room, user_ids) in &changes.profiles_to_delete {
853 for user_id in user_ids {
854 let key = self.encode_key(keys::PROFILES, (room, user_id));
855 profiles.delete(&key).build()?;
856 }
857 }
858
859 for (room, event_types) in &changes.state {
860 let profile_changes = changes.profiles.get(room);
861
862 for (event_type, events) in event_types {
863 for (state_key, raw_event) in events {
864 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
865 state
866 .put(&self.serialize_value(&raw_event)?)
867 .with_key(key.clone())
868 .build()?;
869 stripped_state.delete(&key).build()?;
870
871 if *event_type == StateEventType::RoomMember {
872 let event =
873 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
874 Ok(ev) => ev,
875 Err(e) => {
876 let event_id: Option<String> =
877 raw_event.get_field("event_id").ok().flatten();
878 debug!(event_id, "Failed to deserialize member event: {e}");
879 continue;
880 }
881 };
882
883 let key = (room, state_key);
884
885 stripped_user_ids
886 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
887 .build()?;
888
889 user_ids
890 .put(&self.serialize_value(&RoomMember::from(&event))?)
891 .with_key(self.encode_key(keys::USER_IDS, key))
892 .build()?;
893
894 if let Some(profile) =
895 profile_changes.and_then(|p| p.get(event.state_key()))
896 {
897 profiles
898 .put(&self.serialize_value(&profile)?)
899 .with_key(self.encode_key(keys::PROFILES, key))
900 .build()?;
901 }
902 }
903 }
904 }
905 }
906 }
907
908 if !changes.room_infos.is_empty() {
909 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
910 for (room_id, room_info) in &changes.room_infos {
911 room_infos
912 .put(&self.serialize_value(&room_info)?)
913 .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
914 .build()?;
915 }
916 }
917
918 if !changes.presence.is_empty() {
919 let store = tx.object_store(keys::PRESENCE)?;
920 for (sender, event) in &changes.presence {
921 store
922 .put(&self.serialize_value(&event)?)
923 .with_key(self.encode_key(keys::PRESENCE, sender))
924 .build()?;
925 }
926 }
927
928 if !changes.stripped_state.is_empty() {
929 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
930 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
931
932 for (room, event_types) in &changes.stripped_state {
933 for (event_type, events) in event_types {
934 for (state_key, raw_event) in events {
935 let key = self
936 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
937 store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
938
939 if *event_type == StateEventType::RoomMember {
940 let event = match raw_event
941 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
942 {
943 Ok(ev) => ev,
944 Err(e) => {
945 let event_id: Option<String> =
946 raw_event.get_field("event_id").ok().flatten();
947 debug!(
948 event_id,
949 "Failed to deserialize stripped member event: {e}"
950 );
951 continue;
952 }
953 };
954
955 let key = (room, state_key);
956
957 user_ids
958 .put(&self.serialize_value(&RoomMember::from(&event))?)
959 .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
960 .build()?;
961 }
962 }
963 }
964 }
965 }
966
967 if !changes.receipts.is_empty() {
968 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
969 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
970
971 for (room, content) in &changes.receipts {
972 for (event_id, receipts) in &content.0 {
973 for (receipt_type, receipts) in receipts {
974 for (user_id, receipt) in receipts {
975 let key = match receipt.thread.as_str() {
976 Some(thread_id) => self.encode_key(
977 keys::ROOM_USER_RECEIPTS,
978 (room, receipt_type, thread_id, user_id),
979 ),
980 None => self.encode_key(
981 keys::ROOM_USER_RECEIPTS,
982 (room, receipt_type, user_id),
983 ),
984 };
985
986 if let Some((old_event, _)) =
987 room_user_receipts.get(&key).await?.and_then(|f| {
988 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
989 })
990 {
991 let key = match receipt.thread.as_str() {
992 Some(thread_id) => self.encode_key(
993 keys::ROOM_EVENT_RECEIPTS,
994 (room, receipt_type, thread_id, old_event, user_id),
995 ),
996 None => self.encode_key(
997 keys::ROOM_EVENT_RECEIPTS,
998 (room, receipt_type, old_event, user_id),
999 ),
1000 };
1001 room_event_receipts.delete(&key).build()?;
1002 }
1003
1004 room_user_receipts
1005 .put(&self.serialize_value(&(event_id, receipt))?)
1006 .with_key(key)
1007 .build()?;
1008
1009 let key = match receipt.thread.as_str() {
1011 Some(thread_id) => self.encode_key(
1012 keys::ROOM_EVENT_RECEIPTS,
1013 (room, receipt_type, thread_id, event_id, user_id),
1014 ),
1015 None => self.encode_key(
1016 keys::ROOM_EVENT_RECEIPTS,
1017 (room, receipt_type, event_id, user_id),
1018 ),
1019 };
1020 room_event_receipts
1021 .put(&self.serialize_value(&(user_id, receipt))?)
1022 .with_key(key)
1023 .build()?;
1024 }
1025 }
1026 }
1027 }
1028 }
1029
1030 if !changes.redactions.is_empty() {
1031 let state = tx.object_store(keys::ROOM_STATE)?;
1032 let room_info = tx.object_store(keys::ROOM_INFOS)?;
1033
1034 for (room_id, redactions) in &changes.redactions {
1035 let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1036 let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1037 continue;
1038 };
1039
1040 let mut redaction_rules = None;
1041
1042 while let Some(value) = cursor.next_record().await? {
1043 let Some(key) = cursor.key::<JsValue>()? else {
1044 break;
1045 };
1046
1047 let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1048 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1049 && let Some(redaction) = redactions.get(&event_id)
1050 {
1051 let redaction_rules = match &redaction_rules {
1052 Some(r) => r,
1053 None => {
1054 let value = room_info
1055 .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1056 .await?
1057 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1058 .map(|info| info.room_version_rules_or_default())
1059 .unwrap_or_else(|| {
1060 warn!(
1061 ?room_id,
1062 "Unable to get the room version rules, \
1063 defaulting to rules for room version \
1064 {ROOM_VERSION_FALLBACK}"
1065 );
1066 ROOM_VERSION_RULES_FALLBACK
1067 })
1068 .redaction;
1069 redaction_rules.get_or_insert(value)
1070 }
1071 };
1072
1073 let redacted = redact(
1074 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1075 redaction_rules,
1076 Some(RedactedBecause::from_raw_event(redaction)?),
1077 )
1078 .map_err(StoreError::Redaction)?;
1079 state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1080 }
1081 }
1082 }
1083 }
1084
1085 tx.commit().await.map_err(|e| e.into())
1086 }
1087
1088 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1089 self.inner
1090 .transaction(keys::PRESENCE)
1091 .with_mode(TransactionMode::Readonly)
1092 .build()?
1093 .object_store(keys::PRESENCE)?
1094 .get(&self.encode_key(keys::PRESENCE, user_id))
1095 .await?
1096 .map(|f| self.deserialize_value(&f))
1097 .transpose()
1098 }
1099
1100 async fn get_presence_events(
1101 &self,
1102 user_ids: &[OwnedUserId],
1103 ) -> Result<Vec<Raw<PresenceEvent>>> {
1104 if user_ids.is_empty() {
1105 return Ok(Vec::new());
1106 }
1107
1108 let txn =
1109 self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1110 let store = txn.object_store(keys::PRESENCE)?;
1111
1112 let mut events = Vec::with_capacity(user_ids.len());
1113
1114 for user_id in user_ids {
1115 if let Some(event) = store
1116 .get(&self.encode_key(keys::PRESENCE, user_id))
1117 .await?
1118 .map(|f| self.deserialize_value(&f))
1119 .transpose()?
1120 {
1121 events.push(event)
1122 }
1123 }
1124
1125 Ok(events)
1126 }
1127
1128 async fn get_state_event(
1129 &self,
1130 room_id: &RoomId,
1131 event_type: StateEventType,
1132 state_key: &str,
1133 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1134 Ok(self
1135 .get_state_events_for_keys(room_id, event_type, &[state_key])
1136 .await?
1137 .into_iter()
1138 .next())
1139 }
1140
1141 async fn get_state_events(
1142 &self,
1143 room_id: &RoomId,
1144 event_type: StateEventType,
1145 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1146 let stripped_range =
1147 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1148 let stripped_events = self
1149 .inner
1150 .transaction(keys::STRIPPED_ROOM_STATE)
1151 .with_mode(TransactionMode::Readonly)
1152 .build()?
1153 .object_store(keys::STRIPPED_ROOM_STATE)?
1154 .get_all()
1155 .with_query(&stripped_range)
1156 .await?
1157 .filter_map(Result::ok)
1158 .filter_map(|f| {
1159 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1160 })
1161 .collect::<Vec<_>>();
1162
1163 if !stripped_events.is_empty() {
1164 return Ok(stripped_events);
1165 }
1166
1167 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1168 Ok(self
1169 .inner
1170 .transaction(keys::ROOM_STATE)
1171 .with_mode(TransactionMode::Readonly)
1172 .build()?
1173 .object_store(keys::ROOM_STATE)?
1174 .get_all()
1175 .with_query(&range)
1176 .await?
1177 .filter_map(Result::ok)
1178 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1179 .collect::<Vec<_>>())
1180 }
1181
1182 async fn get_state_events_for_keys(
1183 &self,
1184 room_id: &RoomId,
1185 event_type: StateEventType,
1186 state_keys: &[&str],
1187 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1188 if state_keys.is_empty() {
1189 return Ok(Vec::new());
1190 }
1191
1192 let mut events = Vec::with_capacity(state_keys.len());
1193
1194 {
1195 let txn = self
1196 .inner
1197 .transaction(keys::STRIPPED_ROOM_STATE)
1198 .with_mode(TransactionMode::Readonly)
1199 .build()?;
1200 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1201
1202 for state_key in state_keys {
1203 if let Some(event) =
1204 store
1205 .get(&self.encode_key(
1206 keys::STRIPPED_ROOM_STATE,
1207 (room_id, &event_type, state_key),
1208 ))
1209 .await?
1210 .map(|f| self.deserialize_value(&f))
1211 .transpose()?
1212 {
1213 events.push(RawAnySyncOrStrippedState::Stripped(event));
1214 }
1215 }
1216
1217 if !events.is_empty() {
1218 return Ok(events);
1219 }
1220 }
1221
1222 let txn = self
1223 .inner
1224 .transaction(keys::ROOM_STATE)
1225 .with_mode(TransactionMode::Readonly)
1226 .build()?;
1227 let store = txn.object_store(keys::ROOM_STATE)?;
1228
1229 for state_key in state_keys {
1230 if let Some(event) = store
1231 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1232 .await?
1233 .map(|f| self.deserialize_value(&f))
1234 .transpose()?
1235 {
1236 events.push(RawAnySyncOrStrippedState::Sync(event));
1237 }
1238 }
1239
1240 Ok(events)
1241 }
1242
1243 async fn get_profile(
1244 &self,
1245 room_id: &RoomId,
1246 user_id: &UserId,
1247 ) -> Result<Option<MinimalRoomMemberEvent>> {
1248 self.inner
1249 .transaction(keys::PROFILES)
1250 .with_mode(TransactionMode::Readonly)
1251 .build()?
1252 .object_store(keys::PROFILES)?
1253 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1254 .await?
1255 .map(|f| self.deserialize_value(&f))
1256 .transpose()
1257 }
1258
1259 async fn get_profiles<'a>(
1260 &self,
1261 room_id: &RoomId,
1262 user_ids: &'a [OwnedUserId],
1263 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1264 if user_ids.is_empty() {
1265 return Ok(BTreeMap::new());
1266 }
1267
1268 let txn =
1269 self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1270 let store = txn.object_store(keys::PROFILES)?;
1271
1272 let mut profiles = BTreeMap::new();
1273 for user_id in user_ids {
1274 if let Some(profile) = store
1275 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1276 .await?
1277 .map(|f| self.deserialize_value(&f))
1278 .transpose()?
1279 {
1280 profiles.insert(user_id.as_ref(), profile);
1281 }
1282 }
1283
1284 Ok(profiles)
1285 }
1286
1287 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1288 let transaction = self
1289 .inner
1290 .transaction(keys::ROOM_INFOS)
1291 .with_mode(TransactionMode::Readonly)
1292 .build()?;
1293
1294 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1295
1296 Ok(match room_load_settings {
1297 RoomLoadSettings::All => object_store
1298 .get_all()
1299 .await?
1300 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1301 .collect::<Result<_>>()?,
1302
1303 RoomLoadSettings::One(room_id) => {
1304 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1305 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1306 None => vec![],
1307 }
1308 }
1309 })
1310 }
1311
1312 async fn get_users_with_display_name(
1313 &self,
1314 room_id: &RoomId,
1315 display_name: &DisplayName,
1316 ) -> Result<BTreeSet<OwnedUserId>> {
1317 self.inner
1318 .transaction(keys::DISPLAY_NAMES)
1319 .with_mode(TransactionMode::Readonly)
1320 .build()?
1321 .object_store(keys::DISPLAY_NAMES)?
1322 .get(&self.encode_key(
1323 keys::DISPLAY_NAMES,
1324 (
1325 room_id,
1326 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1327 ),
1328 ))
1329 .await?
1330 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1331 .unwrap_or_else(|| Ok(Default::default()))
1332 }
1333
1334 async fn get_users_with_display_names<'a>(
1335 &self,
1336 room_id: &RoomId,
1337 display_names: &'a [DisplayName],
1338 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1339 let mut map = HashMap::new();
1340
1341 if display_names.is_empty() {
1342 return Ok(map);
1343 }
1344
1345 let txn = self
1346 .inner
1347 .transaction(keys::DISPLAY_NAMES)
1348 .with_mode(TransactionMode::Readonly)
1349 .build()?;
1350 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1351
1352 for display_name in display_names {
1353 if let Some(user_ids) = store
1354 .get(
1355 &self.encode_key(
1356 keys::DISPLAY_NAMES,
1357 (
1358 room_id,
1359 display_name
1360 .as_normalized_str()
1361 .unwrap_or_else(|| display_name.as_raw_str()),
1362 ),
1363 ),
1364 )
1365 .await?
1366 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1367 .transpose()?
1368 {
1369 map.insert(display_name, user_ids);
1370 }
1371 }
1372
1373 Ok(map)
1374 }
1375
1376 async fn get_account_data_event(
1377 &self,
1378 event_type: GlobalAccountDataEventType,
1379 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1380 self.inner
1381 .transaction(keys::ACCOUNT_DATA)
1382 .with_mode(TransactionMode::Readonly)
1383 .build()?
1384 .object_store(keys::ACCOUNT_DATA)?
1385 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1386 .await?
1387 .map(|f| self.deserialize_value(&f))
1388 .transpose()
1389 }
1390
1391 async fn get_room_account_data_event(
1392 &self,
1393 room_id: &RoomId,
1394 event_type: RoomAccountDataEventType,
1395 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1396 self.inner
1397 .transaction(keys::ROOM_ACCOUNT_DATA)
1398 .with_mode(TransactionMode::Readonly)
1399 .build()?
1400 .object_store(keys::ROOM_ACCOUNT_DATA)?
1401 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1402 .await?
1403 .map(|f| self.deserialize_value(&f))
1404 .transpose()
1405 }
1406
1407 async fn get_user_room_receipt_event(
1408 &self,
1409 room_id: &RoomId,
1410 receipt_type: ReceiptType,
1411 thread: ReceiptThread,
1412 user_id: &UserId,
1413 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1414 let key = match thread.as_str() {
1415 Some(thread_id) => self
1416 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1417 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1418 };
1419 self.inner
1420 .transaction(keys::ROOM_USER_RECEIPTS)
1421 .with_mode(TransactionMode::Readonly)
1422 .build()?
1423 .object_store(keys::ROOM_USER_RECEIPTS)?
1424 .get(&key)
1425 .await?
1426 .map(|f| self.deserialize_value(&f))
1427 .transpose()
1428 }
1429
1430 async fn get_event_room_receipt_events(
1431 &self,
1432 room_id: &RoomId,
1433 receipt_type: ReceiptType,
1434 thread: ReceiptThread,
1435 event_id: &EventId,
1436 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1437 let range = match thread.as_str() {
1438 Some(thread_id) => self.encode_to_range(
1439 keys::ROOM_EVENT_RECEIPTS,
1440 (room_id, receipt_type, thread_id, event_id),
1441 ),
1442 None => {
1443 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1444 }
1445 };
1446 let tx = self
1447 .inner
1448 .transaction(keys::ROOM_EVENT_RECEIPTS)
1449 .with_mode(TransactionMode::Readonly)
1450 .build()?;
1451 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1452
1453 Ok(store
1454 .get_all()
1455 .with_query(&range)
1456 .await?
1457 .filter_map(Result::ok)
1458 .filter_map(|f| self.deserialize_value(&f).ok())
1459 .collect::<Vec<_>>())
1460 }
1461
1462 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1463 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1464 self.get_custom_value_for_js(jskey).await
1465 }
1466
1467 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1468 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1469
1470 let prev = self.get_custom_value_for_js(&jskey).await?;
1471
1472 let tx =
1473 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1474
1475 tx.object_store(keys::CUSTOM)?
1476 .put(&self.serialize_value(&value)?)
1477 .with_key(jskey)
1478 .build()?;
1479
1480 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1481 Ok(prev)
1482 }
1483
1484 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1485 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1486
1487 let prev = self.get_custom_value_for_js(&jskey).await?;
1488
1489 let tx =
1490 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1491
1492 tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1493
1494 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1495 Ok(prev)
1496 }
1497
1498 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1499 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1501
1502 let prefixed_stores = [
1505 keys::PROFILES,
1506 keys::DISPLAY_NAMES,
1507 keys::USER_IDS,
1508 keys::ROOM_STATE,
1509 keys::ROOM_ACCOUNT_DATA,
1510 keys::ROOM_EVENT_RECEIPTS,
1511 keys::ROOM_USER_RECEIPTS,
1512 keys::STRIPPED_ROOM_STATE,
1513 keys::STRIPPED_USER_IDS,
1514 keys::THREAD_SUBSCRIPTIONS,
1515 ];
1516
1517 let all_stores = {
1518 let mut v = Vec::new();
1519 v.extend(prefixed_stores);
1520 v.extend(direct_stores);
1521 v
1522 };
1523
1524 let tx =
1525 self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1526
1527 for store_name in direct_stores {
1528 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1529 }
1530
1531 for store_name in prefixed_stores {
1532 let store = tx.object_store(store_name)?;
1533 let range = self.encode_to_range(store_name, room_id);
1534 for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1535 store.delete(&key?).build()?;
1536 }
1537 }
1538
1539 tx.commit().await.map_err(|e| e.into())
1540 }
1541
1542 async fn get_user_ids(
1543 &self,
1544 room_id: &RoomId,
1545 memberships: RoomMemberships,
1546 ) -> Result<Vec<OwnedUserId>> {
1547 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1548 if !ids.is_empty() {
1549 return Ok(ids);
1550 }
1551 self.get_user_ids_inner(room_id, memberships, false).await
1552 }
1553
1554 async fn save_send_queue_request(
1555 &self,
1556 room_id: &RoomId,
1557 transaction_id: OwnedTransactionId,
1558 created_at: MilliSecondsSinceUnixEpoch,
1559 kind: QueuedRequestKind,
1560 priority: usize,
1561 ) -> Result<()> {
1562 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1563
1564 let tx = self
1565 .inner
1566 .transaction(keys::ROOM_SEND_QUEUE)
1567 .with_mode(TransactionMode::Readwrite)
1568 .build()?;
1569
1570 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1571
1572 let prev = obj.get(&encoded_key).await?;
1577
1578 let mut prev = prev.map_or_else(
1579 || Ok(Vec::new()),
1580 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1581 )?;
1582
1583 prev.push(PersistedQueuedRequest {
1585 room_id: room_id.to_owned(),
1586 kind: Some(kind),
1587 transaction_id,
1588 error: None,
1589 is_wedged: None,
1590 event: None,
1591 priority: Some(priority),
1592 created_at,
1593 });
1594
1595 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1597
1598 tx.commit().await?;
1599
1600 Ok(())
1601 }
1602
1603 async fn update_send_queue_request(
1604 &self,
1605 room_id: &RoomId,
1606 transaction_id: &TransactionId,
1607 kind: QueuedRequestKind,
1608 ) -> Result<bool> {
1609 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1610
1611 let tx = self
1612 .inner
1613 .transaction(keys::ROOM_SEND_QUEUE)
1614 .with_mode(TransactionMode::Readwrite)
1615 .build()?;
1616
1617 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1618
1619 let prev = obj.get(&encoded_key).await?;
1624
1625 let mut prev = prev.map_or_else(
1626 || Ok(Vec::new()),
1627 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1628 )?;
1629
1630 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1632 entry.kind = Some(kind);
1633 entry.error = None;
1635 entry.is_wedged = None;
1637 entry.event = None;
1638
1639 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1641 tx.commit().await?;
1642
1643 Ok(true)
1644 } else {
1645 Ok(false)
1646 }
1647 }
1648
1649 async fn remove_send_queue_request(
1650 &self,
1651 room_id: &RoomId,
1652 transaction_id: &TransactionId,
1653 ) -> Result<bool> {
1654 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1655
1656 let tx = self
1657 .inner
1658 .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1659 .with_mode(TransactionMode::Readwrite)
1660 .build()?;
1661
1662 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1663
1664 if let Some(val) = obj.get(&encoded_key).await? {
1669 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1670 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1671 prev.remove(pos);
1672
1673 if prev.is_empty() {
1674 obj.delete(&encoded_key).build()?;
1675 } else {
1676 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1677 }
1678
1679 tx.commit().await?;
1680 return Ok(true);
1681 }
1682 }
1683
1684 Ok(false)
1685 }
1686
1687 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1688 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1689
1690 let prev = self
1693 .inner
1694 .transaction(keys::ROOM_SEND_QUEUE)
1695 .with_mode(TransactionMode::Readwrite)
1696 .build()?
1697 .object_store(keys::ROOM_SEND_QUEUE)?
1698 .get(&encoded_key)
1699 .await?;
1700
1701 let mut prev = prev.map_or_else(
1702 || Ok(Vec::new()),
1703 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1704 )?;
1705
1706 prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1708
1709 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1710 }
1711
1712 async fn update_send_queue_request_status(
1713 &self,
1714 room_id: &RoomId,
1715 transaction_id: &TransactionId,
1716 error: Option<QueueWedgeError>,
1717 ) -> Result<()> {
1718 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1719
1720 let tx = self
1721 .inner
1722 .transaction(keys::ROOM_SEND_QUEUE)
1723 .with_mode(TransactionMode::Readwrite)
1724 .build()?;
1725
1726 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1727
1728 if let Some(val) = obj.get(&encoded_key).await? {
1729 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1730 if let Some(request) =
1731 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1732 {
1733 request.is_wedged = None;
1734 request.error = error;
1735 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1736 }
1737 }
1738
1739 tx.commit().await?;
1740
1741 Ok(())
1742 }
1743
1744 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1745 let tx = self
1746 .inner
1747 .transaction(keys::ROOM_SEND_QUEUE)
1748 .with_mode(TransactionMode::Readwrite)
1749 .build()?;
1750
1751 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1752
1753 let all_entries = obj
1754 .get_all()
1755 .await?
1756 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1757 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1758 .into_iter()
1759 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1760 .collect::<BTreeSet<_>>();
1761
1762 Ok(all_entries.into_iter().collect())
1763 }
1764
1765 async fn save_dependent_queued_request(
1766 &self,
1767 room_id: &RoomId,
1768 parent_txn_id: &TransactionId,
1769 own_txn_id: ChildTransactionId,
1770 created_at: MilliSecondsSinceUnixEpoch,
1771 content: DependentQueuedRequestKind,
1772 ) -> Result<()> {
1773 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1774
1775 let tx = self
1776 .inner
1777 .transaction(keys::DEPENDENT_SEND_QUEUE)
1778 .with_mode(TransactionMode::Readwrite)
1779 .build()?;
1780
1781 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1782
1783 let prev = obj.get(&encoded_key).await?;
1786
1787 let mut prev = prev.map_or_else(
1788 || Ok(Vec::new()),
1789 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1790 )?;
1791
1792 prev.push(DependentQueuedRequest {
1794 kind: content,
1795 parent_transaction_id: parent_txn_id.to_owned(),
1796 own_transaction_id: own_txn_id,
1797 parent_key: None,
1798 created_at,
1799 });
1800
1801 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1803
1804 tx.commit().await?;
1805
1806 Ok(())
1807 }
1808
1809 async fn update_dependent_queued_request(
1810 &self,
1811 room_id: &RoomId,
1812 own_transaction_id: &ChildTransactionId,
1813 new_content: DependentQueuedRequestKind,
1814 ) -> Result<bool> {
1815 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1816
1817 let tx = self
1818 .inner
1819 .transaction(keys::DEPENDENT_SEND_QUEUE)
1820 .with_mode(TransactionMode::Readwrite)
1821 .build()?;
1822
1823 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1824
1825 let prev = obj.get(&encoded_key).await?;
1828
1829 let mut prev = prev.map_or_else(
1830 || Ok(Vec::new()),
1831 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1832 )?;
1833
1834 let mut found = false;
1836 for entry in prev.iter_mut() {
1837 if entry.own_transaction_id == *own_transaction_id {
1838 found = true;
1839 entry.kind = new_content;
1840 break;
1841 }
1842 }
1843
1844 if found {
1845 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1846 tx.commit().await?;
1847 }
1848
1849 Ok(found)
1850 }
1851
1852 async fn mark_dependent_queued_requests_as_ready(
1853 &self,
1854 room_id: &RoomId,
1855 parent_txn_id: &TransactionId,
1856 parent_key: SentRequestKey,
1857 ) -> Result<usize> {
1858 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1859
1860 let tx = self
1861 .inner
1862 .transaction(keys::DEPENDENT_SEND_QUEUE)
1863 .with_mode(TransactionMode::Readwrite)
1864 .build()?;
1865
1866 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1867
1868 let prev = obj.get(&encoded_key).await?;
1871
1872 let mut prev = prev.map_or_else(
1873 || Ok(Vec::new()),
1874 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1875 )?;
1876
1877 let mut num_updated = 0;
1879 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1880 entry.parent_key = Some(parent_key.clone());
1881 num_updated += 1;
1882 }
1883
1884 if num_updated > 0 {
1885 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1886 tx.commit().await?;
1887 }
1888
1889 Ok(num_updated)
1890 }
1891
1892 async fn remove_dependent_queued_request(
1893 &self,
1894 room_id: &RoomId,
1895 txn_id: &ChildTransactionId,
1896 ) -> Result<bool> {
1897 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1898
1899 let tx = self
1900 .inner
1901 .transaction(keys::DEPENDENT_SEND_QUEUE)
1902 .with_mode(TransactionMode::Readwrite)
1903 .build()?;
1904
1905 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1906
1907 if let Some(val) = obj.get(&encoded_key).await? {
1910 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1911 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1912 prev.remove(pos);
1913
1914 if prev.is_empty() {
1915 obj.delete(&encoded_key).build()?;
1916 } else {
1917 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1918 }
1919
1920 tx.commit().await?;
1921 return Ok(true);
1922 }
1923 }
1924
1925 Ok(false)
1926 }
1927
1928 async fn load_dependent_queued_requests(
1929 &self,
1930 room_id: &RoomId,
1931 ) -> Result<Vec<DependentQueuedRequest>> {
1932 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1933
1934 let prev = self
1936 .inner
1937 .transaction(keys::DEPENDENT_SEND_QUEUE)
1938 .with_mode(TransactionMode::Readwrite)
1939 .build()?
1940 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1941 .get(&encoded_key)
1942 .await?;
1943
1944 prev.map_or_else(
1945 || Ok(Vec::new()),
1946 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1947 )
1948 }
1949
1950 async fn upsert_thread_subscriptions(
1951 &self,
1952 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1953 ) -> Result<()> {
1954 let tx = self
1955 .inner
1956 .transaction(keys::THREAD_SUBSCRIPTIONS)
1957 .with_mode(TransactionMode::Readwrite)
1958 .build()?;
1959 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1960
1961 for (room_id, thread_id, subscription) in updates {
1962 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1963 let mut new = PersistedThreadSubscription::from(subscription);
1964
1965 if let Some(previous_value) = obj.get(&encoded_key).await? {
1967 let previous: PersistedThreadSubscription =
1968 self.deserialize_value(&previous_value)?;
1969
1970 if new == previous {
1972 continue;
1973 }
1974 if !compare_thread_subscription_bump_stamps(
1975 previous.bump_stamp,
1976 &mut new.bump_stamp,
1977 ) {
1978 continue;
1979 }
1980 }
1981
1982 let serialized_value = self.serialize_value(&new);
1983 obj.put(&serialized_value?).with_key(encoded_key).build()?;
1984 }
1985
1986 tx.commit().await?;
1987
1988 Ok(())
1989 }
1990
1991 async fn load_thread_subscription(
1992 &self,
1993 room: &RoomId,
1994 thread_id: &EventId,
1995 ) -> Result<Option<StoredThreadSubscription>> {
1996 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1997
1998 let js_value = self
1999 .inner
2000 .transaction(keys::THREAD_SUBSCRIPTIONS)
2001 .with_mode(TransactionMode::Readonly)
2002 .build()?
2003 .object_store(keys::THREAD_SUBSCRIPTIONS)?
2004 .get(&encoded_key)
2005 .await?;
2006
2007 let Some(js_value) = js_value else {
2008 return Ok(None);
2010 };
2011
2012 let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2013
2014 let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2015 StoreError::InvalidData {
2016 details: format!(
2017 "invalid thread status for room {room} and thread {thread_id}: {}",
2018 sub.status
2019 ),
2020 }
2021 })?;
2022
2023 Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2024 }
2025
2026 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2027 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2028
2029 let transaction = self
2030 .inner
2031 .transaction(keys::THREAD_SUBSCRIPTIONS)
2032 .with_mode(TransactionMode::Readwrite)
2033 .build()?;
2034 transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2035 transaction.commit().await?;
2036
2037 Ok(())
2038 }
2039
2040 #[allow(clippy::unused_async)]
2041 async fn optimize(&self) -> Result<()> {
2042 Ok(())
2043 }
2044
2045 #[allow(clippy::unused_async)]
2046 async fn get_size(&self) -> Result<Option<usize>> {
2047 Ok(None)
2048 }
2049});
2050
2051#[derive(Debug, Serialize, Deserialize)]
2053struct RoomMember {
2054 user_id: OwnedUserId,
2055 membership: MembershipState,
2056}
2057
2058impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2059 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2060 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2061 }
2062}
2063
2064impl From<&StrippedRoomMemberEvent> for RoomMember {
2065 fn from(event: &StrippedRoomMemberEvent) -> Self {
2066 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2067 }
2068}
2069
2070#[cfg(test)]
2071mod migration_tests {
2072 use assert_matches2::assert_matches;
2073 use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2074 use ruma::{
2075 OwnedRoomId, OwnedTransactionId, TransactionId,
2076 events::room::message::RoomMessageEventContent, room_id,
2077 };
2078 use serde::{Deserialize, Serialize};
2079
2080 use crate::state_store::PersistedQueuedRequest;
2081
2082 #[derive(Serialize, Deserialize)]
2083 struct OldPersistedQueuedRequest {
2084 room_id: OwnedRoomId,
2085 event: SerializableEventContent,
2086 transaction_id: OwnedTransactionId,
2087 is_wedged: bool,
2088 }
2089
2090 #[test]
2094 fn test_migrating_persisted_queue_event_serialization() {
2095 let room_a_id = room_id!("!room_a:dummy.local");
2096 let transaction_id = TransactionId::new();
2097 let content =
2098 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2099 .unwrap();
2100
2101 let old_persisted_queue_event = OldPersistedQueuedRequest {
2102 room_id: room_a_id.to_owned(),
2103 event: content,
2104 transaction_id: transaction_id.clone(),
2105 is_wedged: true,
2106 };
2107
2108 let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2109
2110 let new_persisted: PersistedQueuedRequest =
2112 serde_json::from_slice(&serialized_persisted).unwrap();
2113
2114 assert_eq!(new_persisted.is_wedged, Some(true));
2115 assert!(new_persisted.error.is_none());
2116
2117 assert!(new_persisted.event.is_some());
2118 assert!(new_persisted.kind.is_none());
2119
2120 let queued = new_persisted.into_queued_request().unwrap();
2121 assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2122 assert_eq!(queued.transaction_id, transaction_id);
2123 assert!(queued.error.is_some());
2124 }
2125}
2126
2127#[cfg(all(test, target_family = "wasm"))]
2128mod tests {
2129 #[cfg(target_family = "wasm")]
2130 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2131
2132 use matrix_sdk_base::statestore_integration_tests;
2133 use uuid::Uuid;
2134
2135 use super::{IndexeddbStateStore, Result};
2136
2137 async fn get_store() -> Result<IndexeddbStateStore> {
2138 let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2139 Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2140 }
2141
2142 statestore_integration_tests!();
2143}
2144
2145#[cfg(all(test, target_family = "wasm"))]
2146mod encrypted_tests {
2147 #[cfg(target_family = "wasm")]
2148 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2149
2150 use matrix_sdk_base::statestore_integration_tests;
2151 use uuid::Uuid;
2152
2153 use super::{IndexeddbStateStore, Result};
2154
2155 async fn get_store() -> Result<IndexeddbStateStore> {
2156 let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2157 let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2158 Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2159 }
2160
2161 statestore_integration_tests!();
2162}