1use std::{
16 cmp::Reverse,
17 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18 str::FromStr as _,
19 sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26 KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27 transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31 RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33 store::{
34 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36 SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37 SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
38 compare_thread_subscription_bump_stamps,
39 },
40};
41use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
42use ruma::{
43 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
44 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
45 api::client::discovery::get_capabilities::v3::Capabilities,
46 canonical_json::{RedactedBecause, redact},
47 events::{
48 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
49 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
50 presence::PresenceEvent,
51 receipt::{Receipt, ReceiptThread, ReceiptType},
52 room::member::{
53 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
54 },
55 },
56 serde::Raw,
57};
58use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
59use tracing::{debug, warn};
60use wasm_bindgen::JsValue;
61
62mod migrations;
63
64pub use self::migrations::MigrationConflictStrategy;
65use self::migrations::{upgrade_inner_db, upgrade_meta_db};
66use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
67
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A JSON (de)serialization error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error reported by the store cipher.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A DOM exception, captured as its name, message and numeric code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A `StoreError` from `matrix_sdk_base`, passed through unchanged.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// The database `name` cannot be migrated from `old_version` to
    /// `new_version` without deleting data.
    #[error(
        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
         See MigrationConflictStrategy for ways to configure."
    )]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
84
85impl From<GenericError> for IndexeddbStateStoreError {
86 fn from(value: GenericError) -> Self {
87 Self::StoreError(value.into())
88 }
89}
90
91impl From<web_sys::DomException> for IndexeddbStateStoreError {
92 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
93 IndexeddbStateStoreError::DomException {
94 name: frm.name(),
95 message: frm.message(),
96 code: frm.code(),
97 }
98 }
99}
100
101impl From<IndexeddbStateStoreError> for StoreError {
102 fn from(e: IndexeddbStateStoreError) -> Self {
103 match e {
104 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
105 IndexeddbStateStoreError::StoreError(e) => e,
106 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
107 _ => StoreError::backend(e),
108 }
109 }
110}
111
112impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
113 fn from(value: indexed_db_futures::error::DomException) -> Self {
114 web_sys::DomException::from(value).into()
115 }
116}
117
118impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
119 fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
120 Self::Json(serde_json::Error::custom(value.to_string()))
121 }
122}
123
124impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
125 fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
126 IndexeddbStateStoreError::StoreError(StoreError::backend(value))
127 }
128}
129
130impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
131 fn from(value: indexed_db_futures::error::JSError) -> Self {
132 GenericError::from(value.to_string()).into()
133 }
134}
135
136impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
137 fn from(value: indexed_db_futures::error::Error) -> Self {
138 use indexed_db_futures::error::Error;
139 match value {
140 Error::DomException(e) => e.into(),
141 Error::Serialisation(e) => e.into(),
142 Error::MissingData(e) => e.into(),
143 Error::Unknown(e) => e.into(),
144 }
145 }
146}
147
148impl From<OpenDbError> for IndexeddbStateStoreError {
149 fn from(value: OpenDbError) -> Self {
150 match value {
151 OpenDbError::Base(error) => error.into(),
152 _ => GenericError::from(value.to_string()).into(),
153 }
154 }
155}
156
/// Names of the databases, object stores and static keys used by the state
/// store.
mod keys {
    /// Suffix appended to the store name to form the meta database name.
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    /// Object store of the meta database that is queried for backups.
    pub const BACKUPS_META: &str = "backups";

    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    // NOTE(review): presumably the store holding send queue requests; its
    // users are outside this part of the file.
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    // NOTE(review): presumably the store holding dependent send queue
    // requests; its users are outside this part of the file.
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// The object store names of the main database, gathered in one slice
    /// for convenience.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        THREAD_SUBSCRIPTIONS,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // Static keys.
    // NOTE(review): presumably the key under which the store cipher is
    // persisted; its users are outside this part of the file.
    pub const STORE_KEY: &str = "store_key";
}
211
212pub use keys::ALL_STORES;
213use matrix_sdk_base::store::QueueWedgeError;
214
215fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
217 Ok(match store_cipher {
218 Some(cipher) => {
219 let data = serde_json::to_vec(event)?;
220 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
221 }
222 None => JsValue::from_serde(event)?,
223 })
224}
225
226fn deserialize_value<T: DeserializeOwned>(
228 store_cipher: Option<&StoreCipher>,
229 event: &JsValue,
230) -> Result<T> {
231 match store_cipher {
232 Some(cipher) => {
233 use zeroize::Zeroize;
234 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
235 let ret = serde_json::from_slice(&plaintext);
236 plaintext.zeroize();
237 Ok(ret?)
238 }
239 None => Ok(event.into_serde()?),
240 }
241}
242
243fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
244where
245 T: SafeEncode,
246{
247 match store_cipher {
248 Some(cipher) => key.as_secure_string(table_name, cipher),
249 None => key.as_encoded_string(),
250 }
251 .into()
252}
253
254fn encode_to_range<T>(
255 store_cipher: Option<&StoreCipher>,
256 table_name: &str,
257 key: T,
258) -> KeyRange<JsValue>
259where
260 T: SafeEncode,
261{
262 match store_cipher {
263 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
264 None => key.encode_to_range(),
265 }
266}
267
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    /// Database name; `build` falls back to `"state"` when unset.
    name: Option<String>,
    /// Optional passphrase, handed to the meta database upgrade in `build`.
    passphrase: Option<String>,
    /// Strategy handed to the main database upgrade in `build`.
    migration_conflict_strategy: MigrationConflictStrategy,
}
275
276impl IndexeddbStateStoreBuilder {
277 fn new() -> Self {
278 Self {
279 name: None,
280 passphrase: None,
281 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
282 }
283 }
284
285 pub fn name(mut self, value: String) -> Self {
287 self.name = Some(value);
288 self
289 }
290
291 pub fn passphrase(mut self, value: String) -> Self {
295 self.passphrase = Some(value);
296 self
297 }
298
299 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
303 self.migration_conflict_strategy = value;
304 self
305 }
306
307 pub async fn build(self) -> Result<IndexeddbStateStore> {
308 let migration_strategy = self.migration_conflict_strategy.clone();
309 let name = self.name.unwrap_or_else(|| "state".to_owned());
310
311 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
312
313 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
314 let inner =
315 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
316
317 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
318 }
319}
320
/// A state store backed by two IndexedDB databases: the main one holding the
/// data and a meta one.
pub struct IndexeddbStateStore {
    /// Name the store was built with; the only field shown by `Debug`.
    name: String,
    /// The main database.
    pub(crate) inner: Database,
    /// The meta database (queried for backups, among other things).
    pub(crate) meta: Database,
    /// When set, values are encrypted and keys are encoded with the cipher.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
327
328#[cfg(not(tarpaulin_include))]
329impl std::fmt::Debug for IndexeddbStateStore {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
332 }
333}
334
/// Result alias whose error type defaults to [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
336
337impl IndexeddbStateStore {
338 pub fn builder() -> IndexeddbStateStoreBuilder {
340 IndexeddbStateStoreBuilder::new()
341 }
342
343 pub fn version(&self) -> u32 {
345 self.inner.version() as u32
346 }
347
348 pub fn meta_version(&self) -> u32 {
350 self.meta.version() as u32
351 }
352
353 pub async fn has_backups(&self) -> Result<bool> {
355 Ok(self
356 .meta
357 .transaction(keys::BACKUPS_META)
358 .with_mode(TransactionMode::Readonly)
359 .build()?
360 .object_store(keys::BACKUPS_META)?
361 .count()
362 .await?
363 > 0)
364 }
365
366 pub async fn latest_backup(&self) -> Result<Option<String>> {
368 if let Some(mut cursor) = self
369 .meta
370 .transaction(keys::BACKUPS_META)
371 .with_mode(TransactionMode::Readonly)
372 .build()?
373 .object_store(keys::BACKUPS_META)?
374 .open_cursor()
375 .with_direction(CursorDirection::Prev)
376 .await?
377 && let Some(record) = cursor.next_record::<JsValue>().await?
378 {
379 Ok(record.as_string())
380 } else {
381 Ok(None)
382 }
383 }
384
385 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
387 serialize_value(self.store_cipher.as_deref(), event)
388 }
389
390 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
392 deserialize_value(self.store_cipher.as_deref(), event)
393 }
394
395 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
396 where
397 T: SafeEncode,
398 {
399 encode_key(self.store_cipher.as_deref(), table_name, key)
400 }
401
402 fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
403 where
404 T: SafeEncode,
405 {
406 encode_to_range(self.store_cipher.as_deref(), table_name, key)
407 }
408
409 pub async fn get_user_ids_inner(
412 &self,
413 room_id: &RoomId,
414 memberships: RoomMemberships,
415 stripped: bool,
416 ) -> Result<Vec<OwnedUserId>> {
417 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
418
419 let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
420 let store = tx.object_store(store_name)?;
421 let range = self.encode_to_range(store_name, room_id);
422
423 let user_ids = if memberships.is_empty() {
424 store
426 .get_all()
427 .with_query(&range)
428 .await?
429 .filter_map(Result::ok)
430 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
431 .collect::<Vec<_>>()
432 } else {
433 let mut user_ids = Vec::new();
434 let cursor = store.open_cursor().with_query(&range).await?;
435
436 if let Some(mut cursor) = cursor {
437 while let Some(value) = cursor.next_record().await? {
438 let member = self.deserialize_value::<RoomMember>(&value)?;
439
440 if memberships.matches(&member.membership) {
441 user_ids.push(member.user_id);
442 }
443 }
444 }
445
446 user_ids
447 };
448
449 Ok(user_ids)
450 }
451
452 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
453 self.inner
454 .transaction(keys::CUSTOM)
455 .with_mode(TransactionMode::Readonly)
456 .build()?
457 .object_store(keys::CUSTOM)?
458 .get(jskey)
459 .await?
460 .map(|f| self.deserialize_value(&f))
461 .transpose()
462 }
463
464 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
465 match key {
469 StateStoreDataKey::SyncToken => {
470 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
471 }
472 StateStoreDataKey::SupportedVersions => self.encode_key(
473 StateStoreDataKey::SUPPORTED_VERSIONS,
474 StateStoreDataKey::SUPPORTED_VERSIONS,
475 ),
476 StateStoreDataKey::WellKnown => {
477 self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
478 }
479 StateStoreDataKey::Filter(filter_name) => {
480 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
481 }
482 StateStoreDataKey::UserAvatarUrl(user_id) => {
483 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
484 }
485 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
486 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
487 }
488 StateStoreDataKey::UtdHookManagerData => {
489 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
490 }
491 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
492 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
493 }
494 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
495 if let Some(thread_root) = thread_root {
496 self.encode_key(
497 keys::KV,
498 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
499 )
500 } else {
501 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
502 }
503 }
504 StateStoreDataKey::SeenKnockRequests(room_id) => {
505 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
506 }
507 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
508 self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
509 }
510 StateStoreDataKey::HomeserverCapabilities => {
511 self.encode_key(keys::KV, StateStoreDataKey::HOMESERVER_CAPABILITIES)
512 }
513 }
514 }
515}
516
/// Serialized form of a queued request.
///
/// Converted into a [`QueuedRequest`] with
/// `PersistedQueuedRequest::into_queued_request`.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    // NOTE(review): presumably the room the request belongs to; its users are
    // outside this part of the file.
    pub room_id: OwnedRoomId,

    /// Request kind. Optional: when absent, `event` is used to rebuild it.
    kind: Option<QueuedRequestKind>,
    transaction_id: OwnedTransactionId,

    /// Error carried over to the [`QueuedRequest`], unless `is_wedged` is
    /// `Some(true)`.
    pub error: Option<QueueWedgeError>,

    /// Priority; a missing value is read as 0.
    priority: Option<usize>,

    /// Creation timestamp; defaults to the current time when missing from
    /// the serialized data.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    /// When `Some(true)`, the request is reported with a generic API error
    /// instead of `error`.
    is_wedged: Option<bool>,

    /// Event content used to rebuild `kind` when `kind` is absent.
    event: Option<SerializableEventContent>,
}
543
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
547
548impl PersistedQueuedRequest {
549 fn into_queued_request(self) -> Option<QueuedRequest> {
550 let kind =
551 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
552
553 let error = match self.is_wedged {
554 Some(true) => {
555 Some(QueueWedgeError::GenericApiError {
557 msg: "local echo failed to send in a previous session".into(),
558 })
559 }
560 _ => self.error,
561 };
562
563 let priority = self.priority.unwrap_or(0);
565
566 Some(QueuedRequest {
567 kind,
568 transaction_id: self.transaction_id,
569 error,
570 priority,
571 created_at: self.created_at,
572 })
573 }
574}
575
/// Serialized form of a thread subscription.
#[derive(Serialize, Deserialize, PartialEq)]
struct PersistedThreadSubscription {
    /// The subscription status, stored as its string representation.
    status: String,
    /// The bump stamp, copied from the `StoredThreadSubscription`.
    bump_stamp: Option<u64>,
}
581
582impl From<StoredThreadSubscription> for PersistedThreadSubscription {
583 fn from(value: StoredThreadSubscription) -> Self {
584 Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
585 }
586}
587
// On wasm targets, wrap the given method bodies in the `StateStore` trait
// implementation for `IndexeddbStateStore`, with `IndexeddbStateStoreError`
// as the error type and a non-`Send` `async_trait`.
#[cfg(target_family = "wasm")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
607
// On non-wasm targets, emit the same method bodies as plain inherent methods
// of `IndexeddbStateStore` instead of a `StateStore` trait implementation.
#[cfg(not(target_family = "wasm"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
616
617impl_state_store!({
618 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
619 let encoded_key = self.encode_kv_data_key(key);
620
621 let value = self
622 .inner
623 .transaction(keys::KV)
624 .with_mode(TransactionMode::Readonly)
625 .build()?
626 .object_store(keys::KV)?
627 .get(&encoded_key)
628 .await?;
629
630 let value = match key {
631 StateStoreDataKey::SyncToken => value
632 .map(|f| self.deserialize_value::<String>(&f))
633 .transpose()?
634 .map(StateStoreDataValue::SyncToken),
635 StateStoreDataKey::SupportedVersions => value
636 .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
637 .transpose()?
638 .map(StateStoreDataValue::SupportedVersions),
639 StateStoreDataKey::WellKnown => value
640 .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
641 .transpose()?
642 .map(StateStoreDataValue::WellKnown),
643 StateStoreDataKey::Filter(_) => value
644 .map(|f| self.deserialize_value::<String>(&f))
645 .transpose()?
646 .map(StateStoreDataValue::Filter),
647 StateStoreDataKey::UserAvatarUrl(_) => value
648 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
649 .transpose()?
650 .map(StateStoreDataValue::UserAvatarUrl),
651 StateStoreDataKey::RecentlyVisitedRooms(_) => value
652 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
653 .transpose()?
654 .map(StateStoreDataValue::RecentlyVisitedRooms),
655 StateStoreDataKey::UtdHookManagerData => value
656 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
657 .transpose()?
658 .map(StateStoreDataValue::UtdHookManagerData),
659 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
660 .map(|f| self.deserialize_value::<bool>(&f))
661 .transpose()?
662 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
663 StateStoreDataKey::ComposerDraft(_, _) => value
664 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
665 .transpose()?
666 .map(StateStoreDataValue::ComposerDraft),
667 StateStoreDataKey::SeenKnockRequests(_) => value
668 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
669 .transpose()?
670 .map(StateStoreDataValue::SeenKnockRequests),
671 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
672 .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
673 .transpose()?
674 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
675 StateStoreDataKey::HomeserverCapabilities => value
676 .map(|f| self.deserialize_value::<Capabilities>(&f))
677 .transpose()?
678 .map(StateStoreDataValue::HomeserverCapabilities),
679 };
680
681 Ok(value)
682 }
683
684 async fn set_kv_data(
685 &self,
686 key: StateStoreDataKey<'_>,
687 value: StateStoreDataValue,
688 ) -> Result<()> {
689 let encoded_key = self.encode_kv_data_key(key);
690
691 let serialized_value = match key {
692 StateStoreDataKey::SyncToken => self
693 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
694 StateStoreDataKey::SupportedVersions => self.serialize_value(
695 &value
696 .into_supported_versions()
697 .expect("Session data not containing supported versions"),
698 ),
699 StateStoreDataKey::WellKnown => self.serialize_value(
700 &value.into_well_known().expect("Session data not containing well-known"),
701 ),
702 StateStoreDataKey::Filter(_) => {
703 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
704 }
705 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
706 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
707 ),
708 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
709 &value
710 .into_recently_visited_rooms()
711 .expect("Session data not a recently visited room list"),
712 ),
713 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
714 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
715 ),
716 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
717 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
718 &value.into_composer_draft().expect("Session data not a composer draft"),
719 ),
720 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
721 &value
722 .into_seen_knock_requests()
723 .expect("Session data is not a set of seen knock request ids"),
724 ),
725 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
726 &value
727 .into_thread_subscriptions_catchup_tokens()
728 .expect("Session data is not a list of thread subscription catchup tokens"),
729 ),
730 StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
731 &value
732 .into_homeserver_capabilities()
733 .expect("Session data is not a homeserver capabilities"),
734 ),
735 };
736
737 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
738
739 let obj = tx.object_store(keys::KV)?;
740
741 obj.put(&serialized_value?).with_key(encoded_key).build()?;
742
743 tx.commit().await?;
744
745 Ok(())
746 }
747
748 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
749 let encoded_key = self.encode_kv_data_key(key);
750
751 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
752 let obj = tx.object_store(keys::KV)?;
753
754 obj.delete(&encoded_key).build()?;
755
756 tx.commit().await?;
757
758 Ok(())
759 }
760
761 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
762 let mut stores: HashSet<&'static str> = [
763 (changes.sync_token.is_some(), keys::KV),
764 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
765 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
766 (!changes.presence.is_empty(), keys::PRESENCE),
767 (
768 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
769 keys::PROFILES,
770 ),
771 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
772 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
773 ]
774 .iter()
775 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
776 .collect();
777
778 if !changes.state.is_empty() {
779 stores.extend([
780 keys::ROOM_STATE,
781 keys::USER_IDS,
782 keys::STRIPPED_USER_IDS,
783 keys::STRIPPED_ROOM_STATE,
784 keys::PROFILES,
785 ]);
786 }
787
788 if !changes.redactions.is_empty() {
789 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
790 }
791
792 if !changes.room_infos.is_empty() {
793 stores.insert(keys::ROOM_INFOS);
794 }
795
796 if !changes.stripped_state.is_empty() {
797 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
798 }
799
800 if !changes.receipts.is_empty() {
801 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
802 }
803
804 if stores.is_empty() {
805 return Ok(());
807 }
808
809 let stores: Vec<&'static str> = stores.into_iter().collect();
810 let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
811
812 if let Some(s) = &changes.sync_token {
813 tx.object_store(keys::KV)?
814 .put(&self.serialize_value(s)?)
815 .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
816 .build()?;
817 }
818
819 if !changes.ambiguity_maps.is_empty() {
820 let store = tx.object_store(keys::DISPLAY_NAMES)?;
821 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
822 for (display_name, map) in ambiguity_maps {
823 let key = self.encode_key(
824 keys::DISPLAY_NAMES,
825 (
826 room_id,
827 display_name
828 .as_normalized_str()
829 .unwrap_or_else(|| display_name.as_raw_str()),
830 ),
831 );
832
833 store.put(&self.serialize_value(&map)?).with_key(key).build()?;
834 }
835 }
836 }
837
838 if !changes.account_data.is_empty() {
839 let store = tx.object_store(keys::ACCOUNT_DATA)?;
840 for (event_type, event) in &changes.account_data {
841 store
842 .put(&self.serialize_value(&event)?)
843 .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
844 .build()?;
845 }
846 }
847
848 if !changes.room_account_data.is_empty() {
849 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
850 for (room, events) in &changes.room_account_data {
851 for (event_type, event) in events {
852 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
853 store.put(&self.serialize_value(&event)?).with_key(key).build()?;
854 }
855 }
856 }
857
858 if !changes.state.is_empty() {
859 let state = tx.object_store(keys::ROOM_STATE)?;
860 let profiles = tx.object_store(keys::PROFILES)?;
861 let user_ids = tx.object_store(keys::USER_IDS)?;
862 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
863 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
864
865 for (room, user_ids) in &changes.profiles_to_delete {
866 for user_id in user_ids {
867 let key = self.encode_key(keys::PROFILES, (room, user_id));
868 profiles.delete(&key).build()?;
869 }
870 }
871
872 for (room, event_types) in &changes.state {
873 let profile_changes = changes.profiles.get(room);
874
875 for (event_type, events) in event_types {
876 for (state_key, raw_event) in events {
877 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
878 state
879 .put(&self.serialize_value(&raw_event)?)
880 .with_key(key.clone())
881 .build()?;
882 stripped_state.delete(&key).build()?;
883
884 if *event_type == StateEventType::RoomMember {
885 let event =
886 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
887 Ok(ev) => ev,
888 Err(e) => {
889 let event_id: Option<String> =
890 raw_event.get_field("event_id").ok().flatten();
891 debug!(event_id, "Failed to deserialize member event: {e}");
892 continue;
893 }
894 };
895
896 let key = (room, state_key);
897
898 stripped_user_ids
899 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
900 .build()?;
901
902 user_ids
903 .put(&self.serialize_value(&RoomMember::from(&event))?)
904 .with_key(self.encode_key(keys::USER_IDS, key))
905 .build()?;
906
907 if let Some(profile) =
908 profile_changes.and_then(|p| p.get(event.state_key()))
909 {
910 profiles
911 .put(&self.serialize_value(&profile)?)
912 .with_key(self.encode_key(keys::PROFILES, key))
913 .build()?;
914 }
915 }
916 }
917 }
918 }
919 }
920
921 if !changes.room_infos.is_empty() {
922 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
923 for (room_id, room_info) in &changes.room_infos {
924 room_infos
925 .put(&self.serialize_value(&room_info)?)
926 .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
927 .build()?;
928 }
929 }
930
931 if !changes.presence.is_empty() {
932 let store = tx.object_store(keys::PRESENCE)?;
933 for (sender, event) in &changes.presence {
934 store
935 .put(&self.serialize_value(&event)?)
936 .with_key(self.encode_key(keys::PRESENCE, sender))
937 .build()?;
938 }
939 }
940
941 if !changes.stripped_state.is_empty() {
942 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
943 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
944
945 for (room, event_types) in &changes.stripped_state {
946 for (event_type, events) in event_types {
947 for (state_key, raw_event) in events {
948 let key = self
949 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
950 store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
951
952 if *event_type == StateEventType::RoomMember {
953 let event = match raw_event
954 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
955 {
956 Ok(ev) => ev,
957 Err(e) => {
958 let event_id: Option<String> =
959 raw_event.get_field("event_id").ok().flatten();
960 debug!(
961 event_id,
962 "Failed to deserialize stripped member event: {e}"
963 );
964 continue;
965 }
966 };
967
968 let key = (room, state_key);
969
970 user_ids
971 .put(&self.serialize_value(&RoomMember::from(&event))?)
972 .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
973 .build()?;
974 }
975 }
976 }
977 }
978 }
979
980 if !changes.receipts.is_empty() {
981 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
982 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
983
984 for (room, content) in &changes.receipts {
985 for (event_id, receipts) in &content.0 {
986 for (receipt_type, receipts) in receipts {
987 for (user_id, receipt) in receipts {
988 let key = match receipt.thread.as_str() {
989 Some(thread_id) => self.encode_key(
990 keys::ROOM_USER_RECEIPTS,
991 (room, receipt_type, thread_id, user_id),
992 ),
993 None => self.encode_key(
994 keys::ROOM_USER_RECEIPTS,
995 (room, receipt_type, user_id),
996 ),
997 };
998
999 if let Some((old_event, _)) =
1000 room_user_receipts.get(&key).await?.and_then(|f| {
1001 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
1002 })
1003 {
1004 let key = match receipt.thread.as_str() {
1005 Some(thread_id) => self.encode_key(
1006 keys::ROOM_EVENT_RECEIPTS,
1007 (room, receipt_type, thread_id, old_event, user_id),
1008 ),
1009 None => self.encode_key(
1010 keys::ROOM_EVENT_RECEIPTS,
1011 (room, receipt_type, old_event, user_id),
1012 ),
1013 };
1014 room_event_receipts.delete(&key).build()?;
1015 }
1016
1017 room_user_receipts
1018 .put(&self.serialize_value(&(event_id, receipt))?)
1019 .with_key(key)
1020 .build()?;
1021
1022 let key = match receipt.thread.as_str() {
1024 Some(thread_id) => self.encode_key(
1025 keys::ROOM_EVENT_RECEIPTS,
1026 (room, receipt_type, thread_id, event_id, user_id),
1027 ),
1028 None => self.encode_key(
1029 keys::ROOM_EVENT_RECEIPTS,
1030 (room, receipt_type, event_id, user_id),
1031 ),
1032 };
1033 room_event_receipts
1034 .put(&self.serialize_value(&(user_id, receipt))?)
1035 .with_key(key)
1036 .build()?;
1037 }
1038 }
1039 }
1040 }
1041 }
1042
1043 if !changes.redactions.is_empty() {
1044 let state = tx.object_store(keys::ROOM_STATE)?;
1045 let room_info = tx.object_store(keys::ROOM_INFOS)?;
1046
1047 for (room_id, redactions) in &changes.redactions {
1048 let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1049 let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1050 continue;
1051 };
1052
1053 let mut redaction_rules = None;
1054
1055 while let Some(value) = cursor.next_record().await? {
1056 let Some(key) = cursor.key::<JsValue>()? else {
1057 break;
1058 };
1059
1060 let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1061 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1062 && let Some(redaction) = redactions.get(&event_id)
1063 {
1064 let redaction_rules = match &redaction_rules {
1065 Some(r) => r,
1066 None => {
1067 let value = room_info
1068 .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1069 .await?
1070 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1071 .map(|info| info.room_version_rules_or_default())
1072 .unwrap_or_else(|| {
1073 warn!(
1074 ?room_id,
1075 "Unable to get the room version rules, \
1076 defaulting to rules for room version \
1077 {ROOM_VERSION_FALLBACK}"
1078 );
1079 ROOM_VERSION_RULES_FALLBACK
1080 })
1081 .redaction;
1082 redaction_rules.get_or_insert(value)
1083 }
1084 };
1085
1086 let redacted = redact(
1087 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1088 redaction_rules,
1089 Some(RedactedBecause::from_raw_event(redaction)?),
1090 )
1091 .map_err(StoreError::Redaction)?;
1092 state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1093 }
1094 }
1095 }
1096 }
1097
1098 tx.commit().await.map_err(|e| e.into())
1099 }
1100
1101 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1102 self.inner
1103 .transaction(keys::PRESENCE)
1104 .with_mode(TransactionMode::Readonly)
1105 .build()?
1106 .object_store(keys::PRESENCE)?
1107 .get(&self.encode_key(keys::PRESENCE, user_id))
1108 .await?
1109 .map(|f| self.deserialize_value(&f))
1110 .transpose()
1111 }
1112
1113 async fn get_presence_events(
1114 &self,
1115 user_ids: &[OwnedUserId],
1116 ) -> Result<Vec<Raw<PresenceEvent>>> {
1117 if user_ids.is_empty() {
1118 return Ok(Vec::new());
1119 }
1120
1121 let txn =
1122 self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1123 let store = txn.object_store(keys::PRESENCE)?;
1124
1125 let mut events = Vec::with_capacity(user_ids.len());
1126
1127 for user_id in user_ids {
1128 if let Some(event) = store
1129 .get(&self.encode_key(keys::PRESENCE, user_id))
1130 .await?
1131 .map(|f| self.deserialize_value(&f))
1132 .transpose()?
1133 {
1134 events.push(event)
1135 }
1136 }
1137
1138 Ok(events)
1139 }
1140
1141 async fn get_state_event(
1142 &self,
1143 room_id: &RoomId,
1144 event_type: StateEventType,
1145 state_key: &str,
1146 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1147 Ok(self
1148 .get_state_events_for_keys(room_id, event_type, &[state_key])
1149 .await?
1150 .into_iter()
1151 .next())
1152 }
1153
1154 async fn get_state_events(
1155 &self,
1156 room_id: &RoomId,
1157 event_type: StateEventType,
1158 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1159 let stripped_range =
1160 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1161 let stripped_events = self
1162 .inner
1163 .transaction(keys::STRIPPED_ROOM_STATE)
1164 .with_mode(TransactionMode::Readonly)
1165 .build()?
1166 .object_store(keys::STRIPPED_ROOM_STATE)?
1167 .get_all()
1168 .with_query(&stripped_range)
1169 .await?
1170 .filter_map(Result::ok)
1171 .filter_map(|f| {
1172 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1173 })
1174 .collect::<Vec<_>>();
1175
1176 if !stripped_events.is_empty() {
1177 return Ok(stripped_events);
1178 }
1179
1180 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1181 Ok(self
1182 .inner
1183 .transaction(keys::ROOM_STATE)
1184 .with_mode(TransactionMode::Readonly)
1185 .build()?
1186 .object_store(keys::ROOM_STATE)?
1187 .get_all()
1188 .with_query(&range)
1189 .await?
1190 .filter_map(Result::ok)
1191 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1192 .collect::<Vec<_>>())
1193 }
1194
1195 async fn get_state_events_for_keys(
1196 &self,
1197 room_id: &RoomId,
1198 event_type: StateEventType,
1199 state_keys: &[&str],
1200 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1201 if state_keys.is_empty() {
1202 return Ok(Vec::new());
1203 }
1204
1205 let mut events = Vec::with_capacity(state_keys.len());
1206
1207 {
1208 let txn = self
1209 .inner
1210 .transaction(keys::STRIPPED_ROOM_STATE)
1211 .with_mode(TransactionMode::Readonly)
1212 .build()?;
1213 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1214
1215 for state_key in state_keys {
1216 if let Some(event) =
1217 store
1218 .get(&self.encode_key(
1219 keys::STRIPPED_ROOM_STATE,
1220 (room_id, &event_type, state_key),
1221 ))
1222 .await?
1223 .map(|f| self.deserialize_value(&f))
1224 .transpose()?
1225 {
1226 events.push(RawAnySyncOrStrippedState::Stripped(event));
1227 }
1228 }
1229
1230 if !events.is_empty() {
1231 return Ok(events);
1232 }
1233 }
1234
1235 let txn = self
1236 .inner
1237 .transaction(keys::ROOM_STATE)
1238 .with_mode(TransactionMode::Readonly)
1239 .build()?;
1240 let store = txn.object_store(keys::ROOM_STATE)?;
1241
1242 for state_key in state_keys {
1243 if let Some(event) = store
1244 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1245 .await?
1246 .map(|f| self.deserialize_value(&f))
1247 .transpose()?
1248 {
1249 events.push(RawAnySyncOrStrippedState::Sync(event));
1250 }
1251 }
1252
1253 Ok(events)
1254 }
1255
1256 async fn get_profile(
1257 &self,
1258 room_id: &RoomId,
1259 user_id: &UserId,
1260 ) -> Result<Option<MinimalRoomMemberEvent>> {
1261 self.inner
1262 .transaction(keys::PROFILES)
1263 .with_mode(TransactionMode::Readonly)
1264 .build()?
1265 .object_store(keys::PROFILES)?
1266 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1267 .await?
1268 .map(|f| self.deserialize_value(&f))
1269 .transpose()
1270 }
1271
1272 async fn get_profiles<'a>(
1273 &self,
1274 room_id: &RoomId,
1275 user_ids: &'a [OwnedUserId],
1276 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1277 if user_ids.is_empty() {
1278 return Ok(BTreeMap::new());
1279 }
1280
1281 let txn =
1282 self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1283 let store = txn.object_store(keys::PROFILES)?;
1284
1285 let mut profiles = BTreeMap::new();
1286 for user_id in user_ids {
1287 if let Some(profile) = store
1288 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1289 .await?
1290 .map(|f| self.deserialize_value(&f))
1291 .transpose()?
1292 {
1293 profiles.insert(user_id.as_ref(), profile);
1294 }
1295 }
1296
1297 Ok(profiles)
1298 }
1299
1300 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1301 let transaction = self
1302 .inner
1303 .transaction(keys::ROOM_INFOS)
1304 .with_mode(TransactionMode::Readonly)
1305 .build()?;
1306
1307 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1308
1309 Ok(match room_load_settings {
1310 RoomLoadSettings::All => object_store
1311 .get_all()
1312 .await?
1313 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1314 .collect::<Result<_>>()?,
1315
1316 RoomLoadSettings::One(room_id) => {
1317 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1318 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1319 None => vec![],
1320 }
1321 }
1322 })
1323 }
1324
1325 async fn get_users_with_display_name(
1326 &self,
1327 room_id: &RoomId,
1328 display_name: &DisplayName,
1329 ) -> Result<BTreeSet<OwnedUserId>> {
1330 self.inner
1331 .transaction(keys::DISPLAY_NAMES)
1332 .with_mode(TransactionMode::Readonly)
1333 .build()?
1334 .object_store(keys::DISPLAY_NAMES)?
1335 .get(&self.encode_key(
1336 keys::DISPLAY_NAMES,
1337 (
1338 room_id,
1339 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1340 ),
1341 ))
1342 .await?
1343 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1344 .unwrap_or_else(|| Ok(Default::default()))
1345 }
1346
1347 async fn get_users_with_display_names<'a>(
1348 &self,
1349 room_id: &RoomId,
1350 display_names: &'a [DisplayName],
1351 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1352 let mut map = HashMap::new();
1353
1354 if display_names.is_empty() {
1355 return Ok(map);
1356 }
1357
1358 let txn = self
1359 .inner
1360 .transaction(keys::DISPLAY_NAMES)
1361 .with_mode(TransactionMode::Readonly)
1362 .build()?;
1363 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1364
1365 for display_name in display_names {
1366 if let Some(user_ids) = store
1367 .get(
1368 &self.encode_key(
1369 keys::DISPLAY_NAMES,
1370 (
1371 room_id,
1372 display_name
1373 .as_normalized_str()
1374 .unwrap_or_else(|| display_name.as_raw_str()),
1375 ),
1376 ),
1377 )
1378 .await?
1379 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1380 .transpose()?
1381 {
1382 map.insert(display_name, user_ids);
1383 }
1384 }
1385
1386 Ok(map)
1387 }
1388
1389 async fn get_account_data_event(
1390 &self,
1391 event_type: GlobalAccountDataEventType,
1392 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1393 self.inner
1394 .transaction(keys::ACCOUNT_DATA)
1395 .with_mode(TransactionMode::Readonly)
1396 .build()?
1397 .object_store(keys::ACCOUNT_DATA)?
1398 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1399 .await?
1400 .map(|f| self.deserialize_value(&f))
1401 .transpose()
1402 }
1403
1404 async fn get_room_account_data_event(
1405 &self,
1406 room_id: &RoomId,
1407 event_type: RoomAccountDataEventType,
1408 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1409 self.inner
1410 .transaction(keys::ROOM_ACCOUNT_DATA)
1411 .with_mode(TransactionMode::Readonly)
1412 .build()?
1413 .object_store(keys::ROOM_ACCOUNT_DATA)?
1414 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1415 .await?
1416 .map(|f| self.deserialize_value(&f))
1417 .transpose()
1418 }
1419
1420 async fn get_user_room_receipt_event(
1421 &self,
1422 room_id: &RoomId,
1423 receipt_type: ReceiptType,
1424 thread: ReceiptThread,
1425 user_id: &UserId,
1426 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1427 let key = match thread.as_str() {
1428 Some(thread_id) => self
1429 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1430 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1431 };
1432 self.inner
1433 .transaction(keys::ROOM_USER_RECEIPTS)
1434 .with_mode(TransactionMode::Readonly)
1435 .build()?
1436 .object_store(keys::ROOM_USER_RECEIPTS)?
1437 .get(&key)
1438 .await?
1439 .map(|f| self.deserialize_value(&f))
1440 .transpose()
1441 }
1442
1443 async fn get_event_room_receipt_events(
1444 &self,
1445 room_id: &RoomId,
1446 receipt_type: ReceiptType,
1447 thread: ReceiptThread,
1448 event_id: &EventId,
1449 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1450 let range = match thread.as_str() {
1451 Some(thread_id) => self.encode_to_range(
1452 keys::ROOM_EVENT_RECEIPTS,
1453 (room_id, receipt_type, thread_id, event_id),
1454 ),
1455 None => {
1456 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1457 }
1458 };
1459 let tx = self
1460 .inner
1461 .transaction(keys::ROOM_EVENT_RECEIPTS)
1462 .with_mode(TransactionMode::Readonly)
1463 .build()?;
1464 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1465
1466 Ok(store
1467 .get_all()
1468 .with_query(&range)
1469 .await?
1470 .filter_map(Result::ok)
1471 .filter_map(|f| self.deserialize_value(&f).ok())
1472 .collect::<Vec<_>>())
1473 }
1474
1475 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1476 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1477 self.get_custom_value_for_js(jskey).await
1478 }
1479
1480 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1481 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1482
1483 let prev = self.get_custom_value_for_js(&jskey).await?;
1484
1485 let tx =
1486 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1487
1488 tx.object_store(keys::CUSTOM)?
1489 .put(&self.serialize_value(&value)?)
1490 .with_key(jskey)
1491 .build()?;
1492
1493 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1494 Ok(prev)
1495 }
1496
1497 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1498 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1499
1500 let prev = self.get_custom_value_for_js(&jskey).await?;
1501
1502 let tx =
1503 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1504
1505 tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1506
1507 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1508 Ok(prev)
1509 }
1510
1511 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1512 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1514
1515 let prefixed_stores = [
1518 keys::PROFILES,
1519 keys::DISPLAY_NAMES,
1520 keys::USER_IDS,
1521 keys::ROOM_STATE,
1522 keys::ROOM_ACCOUNT_DATA,
1523 keys::ROOM_EVENT_RECEIPTS,
1524 keys::ROOM_USER_RECEIPTS,
1525 keys::STRIPPED_ROOM_STATE,
1526 keys::STRIPPED_USER_IDS,
1527 keys::THREAD_SUBSCRIPTIONS,
1528 ];
1529
1530 let all_stores = {
1531 let mut v = Vec::new();
1532 v.extend(prefixed_stores);
1533 v.extend(direct_stores);
1534 v
1535 };
1536
1537 let tx =
1538 self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1539
1540 for store_name in direct_stores {
1541 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1542 }
1543
1544 for store_name in prefixed_stores {
1545 let store = tx.object_store(store_name)?;
1546 let range = self.encode_to_range(store_name, room_id);
1547 for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1548 store.delete(&key?).build()?;
1549 }
1550 }
1551
1552 tx.commit().await.map_err(|e| e.into())
1553 }
1554
1555 async fn get_user_ids(
1556 &self,
1557 room_id: &RoomId,
1558 memberships: RoomMemberships,
1559 ) -> Result<Vec<OwnedUserId>> {
1560 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1561 if !ids.is_empty() {
1562 return Ok(ids);
1563 }
1564 self.get_user_ids_inner(room_id, memberships, false).await
1565 }
1566
1567 async fn save_send_queue_request(
1568 &self,
1569 room_id: &RoomId,
1570 transaction_id: OwnedTransactionId,
1571 created_at: MilliSecondsSinceUnixEpoch,
1572 kind: QueuedRequestKind,
1573 priority: usize,
1574 ) -> Result<()> {
1575 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1576
1577 let tx = self
1578 .inner
1579 .transaction(keys::ROOM_SEND_QUEUE)
1580 .with_mode(TransactionMode::Readwrite)
1581 .build()?;
1582
1583 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1584
1585 let prev = obj.get(&encoded_key).await?;
1590
1591 let mut prev = prev.map_or_else(
1592 || Ok(Vec::new()),
1593 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1594 )?;
1595
1596 prev.push(PersistedQueuedRequest {
1598 room_id: room_id.to_owned(),
1599 kind: Some(kind),
1600 transaction_id,
1601 error: None,
1602 is_wedged: None,
1603 event: None,
1604 priority: Some(priority),
1605 created_at,
1606 });
1607
1608 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1610
1611 tx.commit().await?;
1612
1613 Ok(())
1614 }
1615
1616 async fn update_send_queue_request(
1617 &self,
1618 room_id: &RoomId,
1619 transaction_id: &TransactionId,
1620 kind: QueuedRequestKind,
1621 ) -> Result<bool> {
1622 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1623
1624 let tx = self
1625 .inner
1626 .transaction(keys::ROOM_SEND_QUEUE)
1627 .with_mode(TransactionMode::Readwrite)
1628 .build()?;
1629
1630 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1631
1632 let prev = obj.get(&encoded_key).await?;
1637
1638 let mut prev = prev.map_or_else(
1639 || Ok(Vec::new()),
1640 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1641 )?;
1642
1643 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1645 entry.kind = Some(kind);
1646 entry.error = None;
1648 entry.is_wedged = None;
1650 entry.event = None;
1651
1652 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1654 tx.commit().await?;
1655
1656 Ok(true)
1657 } else {
1658 Ok(false)
1659 }
1660 }
1661
1662 async fn remove_send_queue_request(
1663 &self,
1664 room_id: &RoomId,
1665 transaction_id: &TransactionId,
1666 ) -> Result<bool> {
1667 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1668
1669 let tx = self
1670 .inner
1671 .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1672 .with_mode(TransactionMode::Readwrite)
1673 .build()?;
1674
1675 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1676
1677 if let Some(val) = obj.get(&encoded_key).await? {
1682 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1683 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1684 prev.remove(pos);
1685
1686 if prev.is_empty() {
1687 obj.delete(&encoded_key).build()?;
1688 } else {
1689 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1690 }
1691
1692 tx.commit().await?;
1693 return Ok(true);
1694 }
1695 }
1696
1697 Ok(false)
1698 }
1699
1700 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1701 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1702
1703 let prev = self
1706 .inner
1707 .transaction(keys::ROOM_SEND_QUEUE)
1708 .with_mode(TransactionMode::Readwrite)
1709 .build()?
1710 .object_store(keys::ROOM_SEND_QUEUE)?
1711 .get(&encoded_key)
1712 .await?;
1713
1714 let mut prev = prev.map_or_else(
1715 || Ok(Vec::new()),
1716 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1717 )?;
1718
1719 prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1721
1722 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1723 }
1724
1725 async fn update_send_queue_request_status(
1726 &self,
1727 room_id: &RoomId,
1728 transaction_id: &TransactionId,
1729 error: Option<QueueWedgeError>,
1730 ) -> Result<()> {
1731 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1732
1733 let tx = self
1734 .inner
1735 .transaction(keys::ROOM_SEND_QUEUE)
1736 .with_mode(TransactionMode::Readwrite)
1737 .build()?;
1738
1739 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1740
1741 if let Some(val) = obj.get(&encoded_key).await? {
1742 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1743 if let Some(request) =
1744 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1745 {
1746 request.is_wedged = None;
1747 request.error = error;
1748 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1749 }
1750 }
1751
1752 tx.commit().await?;
1753
1754 Ok(())
1755 }
1756
1757 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1758 let tx = self
1759 .inner
1760 .transaction(keys::ROOM_SEND_QUEUE)
1761 .with_mode(TransactionMode::Readwrite)
1762 .build()?;
1763
1764 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1765
1766 let all_entries = obj
1767 .get_all()
1768 .await?
1769 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1770 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1771 .into_iter()
1772 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1773 .collect::<BTreeSet<_>>();
1774
1775 Ok(all_entries.into_iter().collect())
1776 }
1777
1778 async fn save_dependent_queued_request(
1779 &self,
1780 room_id: &RoomId,
1781 parent_txn_id: &TransactionId,
1782 own_txn_id: ChildTransactionId,
1783 created_at: MilliSecondsSinceUnixEpoch,
1784 content: DependentQueuedRequestKind,
1785 ) -> Result<()> {
1786 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1787
1788 let tx = self
1789 .inner
1790 .transaction(keys::DEPENDENT_SEND_QUEUE)
1791 .with_mode(TransactionMode::Readwrite)
1792 .build()?;
1793
1794 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1795
1796 let prev = obj.get(&encoded_key).await?;
1799
1800 let mut prev = prev.map_or_else(
1801 || Ok(Vec::new()),
1802 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1803 )?;
1804
1805 prev.push(DependentQueuedRequest {
1807 kind: content,
1808 parent_transaction_id: parent_txn_id.to_owned(),
1809 own_transaction_id: own_txn_id,
1810 parent_key: None,
1811 created_at,
1812 });
1813
1814 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1816
1817 tx.commit().await?;
1818
1819 Ok(())
1820 }
1821
1822 async fn update_dependent_queued_request(
1823 &self,
1824 room_id: &RoomId,
1825 own_transaction_id: &ChildTransactionId,
1826 new_content: DependentQueuedRequestKind,
1827 ) -> Result<bool> {
1828 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1829
1830 let tx = self
1831 .inner
1832 .transaction(keys::DEPENDENT_SEND_QUEUE)
1833 .with_mode(TransactionMode::Readwrite)
1834 .build()?;
1835
1836 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1837
1838 let prev = obj.get(&encoded_key).await?;
1841
1842 let mut prev = prev.map_or_else(
1843 || Ok(Vec::new()),
1844 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1845 )?;
1846
1847 let mut found = false;
1849 for entry in prev.iter_mut() {
1850 if entry.own_transaction_id == *own_transaction_id {
1851 found = true;
1852 entry.kind = new_content;
1853 break;
1854 }
1855 }
1856
1857 if found {
1858 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1859 tx.commit().await?;
1860 }
1861
1862 Ok(found)
1863 }
1864
1865 async fn mark_dependent_queued_requests_as_ready(
1866 &self,
1867 room_id: &RoomId,
1868 parent_txn_id: &TransactionId,
1869 parent_key: SentRequestKey,
1870 ) -> Result<usize> {
1871 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1872
1873 let tx = self
1874 .inner
1875 .transaction(keys::DEPENDENT_SEND_QUEUE)
1876 .with_mode(TransactionMode::Readwrite)
1877 .build()?;
1878
1879 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1880
1881 let prev = obj.get(&encoded_key).await?;
1884
1885 let mut prev = prev.map_or_else(
1886 || Ok(Vec::new()),
1887 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1888 )?;
1889
1890 let mut num_updated = 0;
1892 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1893 entry.parent_key = Some(parent_key.clone());
1894 num_updated += 1;
1895 }
1896
1897 if num_updated > 0 {
1898 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1899 tx.commit().await?;
1900 }
1901
1902 Ok(num_updated)
1903 }
1904
1905 async fn remove_dependent_queued_request(
1906 &self,
1907 room_id: &RoomId,
1908 txn_id: &ChildTransactionId,
1909 ) -> Result<bool> {
1910 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1911
1912 let tx = self
1913 .inner
1914 .transaction(keys::DEPENDENT_SEND_QUEUE)
1915 .with_mode(TransactionMode::Readwrite)
1916 .build()?;
1917
1918 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1919
1920 if let Some(val) = obj.get(&encoded_key).await? {
1923 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1924 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1925 prev.remove(pos);
1926
1927 if prev.is_empty() {
1928 obj.delete(&encoded_key).build()?;
1929 } else {
1930 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1931 }
1932
1933 tx.commit().await?;
1934 return Ok(true);
1935 }
1936 }
1937
1938 Ok(false)
1939 }
1940
1941 async fn load_dependent_queued_requests(
1942 &self,
1943 room_id: &RoomId,
1944 ) -> Result<Vec<DependentQueuedRequest>> {
1945 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1946
1947 let prev = self
1949 .inner
1950 .transaction(keys::DEPENDENT_SEND_QUEUE)
1951 .with_mode(TransactionMode::Readwrite)
1952 .build()?
1953 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1954 .get(&encoded_key)
1955 .await?;
1956
1957 prev.map_or_else(
1958 || Ok(Vec::new()),
1959 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1960 )
1961 }
1962
1963 async fn upsert_thread_subscriptions(
1964 &self,
1965 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1966 ) -> Result<()> {
1967 let tx = self
1968 .inner
1969 .transaction(keys::THREAD_SUBSCRIPTIONS)
1970 .with_mode(TransactionMode::Readwrite)
1971 .build()?;
1972 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1973
1974 for (room_id, thread_id, subscription) in updates {
1975 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1976 let mut new = PersistedThreadSubscription::from(subscription);
1977
1978 if let Some(previous_value) = obj.get(&encoded_key).await? {
1980 let previous: PersistedThreadSubscription =
1981 self.deserialize_value(&previous_value)?;
1982
1983 if new == previous {
1985 continue;
1986 }
1987 if !compare_thread_subscription_bump_stamps(
1988 previous.bump_stamp,
1989 &mut new.bump_stamp,
1990 ) {
1991 continue;
1992 }
1993 }
1994
1995 let serialized_value = self.serialize_value(&new);
1996 obj.put(&serialized_value?).with_key(encoded_key).build()?;
1997 }
1998
1999 tx.commit().await?;
2000
2001 Ok(())
2002 }
2003
2004 async fn load_thread_subscription(
2005 &self,
2006 room: &RoomId,
2007 thread_id: &EventId,
2008 ) -> Result<Option<StoredThreadSubscription>> {
2009 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2010
2011 let js_value = self
2012 .inner
2013 .transaction(keys::THREAD_SUBSCRIPTIONS)
2014 .with_mode(TransactionMode::Readonly)
2015 .build()?
2016 .object_store(keys::THREAD_SUBSCRIPTIONS)?
2017 .get(&encoded_key)
2018 .await?;
2019
2020 let Some(js_value) = js_value else {
2021 return Ok(None);
2023 };
2024
2025 let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2026
2027 let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2028 StoreError::InvalidData {
2029 details: format!(
2030 "invalid thread status for room {room} and thread {thread_id}: {}",
2031 sub.status
2032 ),
2033 }
2034 })?;
2035
2036 Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2037 }
2038
2039 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2040 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2041
2042 let transaction = self
2043 .inner
2044 .transaction(keys::THREAD_SUBSCRIPTIONS)
2045 .with_mode(TransactionMode::Readwrite)
2046 .build()?;
2047 transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2048 transaction.commit().await?;
2049
2050 Ok(())
2051 }
2052
2053 #[allow(clippy::unused_async)]
2054 async fn optimize(&self) -> Result<()> {
2055 Ok(())
2056 }
2057
2058 #[allow(clippy::unused_async)]
2059 async fn get_size(&self) -> Result<Option<usize>> {
2060 Ok(None)
2061 }
2062});
2063
2064#[derive(Debug, Serialize, Deserialize)]
2066struct RoomMember {
2067 user_id: OwnedUserId,
2068 membership: MembershipState,
2069}
2070
2071impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2072 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2073 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2074 }
2075}
2076
2077impl From<&StrippedRoomMemberEvent> for RoomMember {
2078 fn from(event: &StrippedRoomMemberEvent) -> Self {
2079 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2080 }
2081}
2082
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        OwnedRoomId, OwnedTransactionId, TransactionId,
        events::room::message::RoomMessageEventContent, room_id,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Stand-in for an older persisted layout: a mandatory `event` and a
    /// plain boolean `is_wedged`, with none of the newer fields.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// Data serialized with the old layout must deserialize into
    /// `PersistedQueuedRequest` and convert into a queued request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let txn_id = TransactionId::new();

        let legacy = OldPersistedQueuedRequest {
            room_id: room_id!("!room_a:dummy.local").to_owned(),
            event: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("Hello").into(),
            )
            .unwrap(),
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };

        let legacy_bytes = serde_json::to_vec(&legacy).unwrap();

        // Read the old bytes back through the current type.
        let migrated =
            serde_json::from_slice::<PersistedQueuedRequest>(&legacy_bytes).unwrap();

        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
2139
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Build a store without a passphrase, backed by a uniquely named
    /// database.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_suffix = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique_suffix.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
2157
#[cfg(all(test, target_family = "wasm"))]
mod encrypted_tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Build a store with a unique passphrase, backed by a uniquely named
    /// database.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());

        let store =
            IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}