1use std::{
16 cmp::Reverse,
17 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18 str::FromStr as _,
19 sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26 KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27 transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31 RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33 store::{
34 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36 SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37 SupportedVersionsResponse, ThreadSubscriptionStatus, WellKnownResponse,
38 compare_thread_subscription_bump_stamps,
39 },
40 ttl::TtlValue,
41};
42use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
43use ruma::{
44 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
45 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
46 api::client::discovery::get_capabilities::v3::Capabilities,
47 canonical_json::{RedactedBecause, redact},
48 events::{
49 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
50 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
51 presence::PresenceEvent,
52 receipt::{Receipt, ReceiptThread, ReceiptType},
53 room::member::{
54 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
55 },
56 },
57 serde::Raw,
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
60use tracing::{debug, warn};
61use wasm_bindgen::JsValue;
62
63mod migrations;
64
65pub use self::migrations::MigrationConflictStrategy;
66use self::migrations::{upgrade_inner_db, upgrade_meta_db};
67use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
68
69#[derive(Debug, thiserror::Error)]
70pub enum IndexeddbStateStoreError {
71 #[error(transparent)]
72 Json(#[from] serde_json::Error),
73 #[error(transparent)]
74 Encryption(#[from] EncryptionError),
75 #[error("DomException {name} ({code}): {message}")]
76 DomException { name: String, message: String, code: u16 },
77 #[error(transparent)]
78 StoreError(#[from] StoreError),
79 #[error(
80 "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
81 See MigrationConflictStrategy for ways to configure."
82 )]
83 MigrationConflict { name: String, old_version: u32, new_version: u32 },
84}
85
86impl From<GenericError> for IndexeddbStateStoreError {
87 fn from(value: GenericError) -> Self {
88 Self::StoreError(value.into())
89 }
90}
91
92impl From<web_sys::DomException> for IndexeddbStateStoreError {
93 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
94 IndexeddbStateStoreError::DomException {
95 name: frm.name(),
96 message: frm.message(),
97 code: frm.code(),
98 }
99 }
100}
101
102impl From<IndexeddbStateStoreError> for StoreError {
103 fn from(e: IndexeddbStateStoreError) -> Self {
104 match e {
105 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
106 IndexeddbStateStoreError::StoreError(e) => e,
107 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
108 _ => StoreError::backend(e),
109 }
110 }
111}
112
113impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
114 fn from(value: indexed_db_futures::error::DomException) -> Self {
115 web_sys::DomException::from(value).into()
116 }
117}
118
119impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
120 fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
121 Self::Json(serde_json::Error::custom(value.to_string()))
122 }
123}
124
125impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
126 fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
127 IndexeddbStateStoreError::StoreError(StoreError::backend(value))
128 }
129}
130
131impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
132 fn from(value: indexed_db_futures::error::JSError) -> Self {
133 GenericError::from(value.to_string()).into()
134 }
135}
136
137impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
138 fn from(value: indexed_db_futures::error::Error) -> Self {
139 use indexed_db_futures::error::Error;
140 match value {
141 Error::DomException(e) => e.into(),
142 Error::Serialisation(e) => e.into(),
143 Error::MissingData(e) => e.into(),
144 Error::Unknown(e) => e.into(),
145 }
146 }
147}
148
149impl From<OpenDbError> for IndexeddbStateStoreError {
150 fn from(value: OpenDbError) -> Self {
151 match value {
152 OpenDbError::Base(error) => error.into(),
153 _ => GenericError::from(value.to_string()).into(),
154 }
155 }
156}
157
158mod keys {
159 pub const INTERNAL_STATE: &str = "matrix-sdk-state";
160 pub const BACKUPS_META: &str = "backups";
161
162 pub const ACCOUNT_DATA: &str = "account_data";
163
164 pub const PROFILES: &str = "profiles";
165 pub const DISPLAY_NAMES: &str = "display_names";
166 pub const USER_IDS: &str = "user_ids";
167
168 pub const ROOM_STATE: &str = "room_state";
169 pub const ROOM_INFOS: &str = "room_infos";
170 pub const PRESENCE: &str = "presence";
171 pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
172 pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
174 pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
176 pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
177
178 pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
179 pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
180
181 pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
182 pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
183
184 pub const CUSTOM: &str = "custom";
185 pub const KV: &str = "kv";
186
187 pub const ALL_STORES: &[&str] = &[
189 ACCOUNT_DATA,
190 PROFILES,
191 DISPLAY_NAMES,
192 USER_IDS,
193 ROOM_STATE,
194 ROOM_INFOS,
195 PRESENCE,
196 ROOM_ACCOUNT_DATA,
197 STRIPPED_ROOM_STATE,
198 STRIPPED_USER_IDS,
199 ROOM_USER_RECEIPTS,
200 ROOM_EVENT_RECEIPTS,
201 ROOM_SEND_QUEUE,
202 THREAD_SUBSCRIPTIONS,
203 DEPENDENT_SEND_QUEUE,
204 CUSTOM,
205 KV,
206 ];
207
208 pub const STORE_KEY: &str = "store_key";
211}
212
213pub use keys::ALL_STORES;
214use matrix_sdk_base::store::QueueWedgeError;
215
216fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
218 Ok(match store_cipher {
219 Some(cipher) => {
220 let data = serde_json::to_vec(event)?;
221 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
222 }
223 None => JsValue::from_serde(event)?,
224 })
225}
226
227fn deserialize_value<T: DeserializeOwned>(
229 store_cipher: Option<&StoreCipher>,
230 event: &JsValue,
231) -> Result<T> {
232 match store_cipher {
233 Some(cipher) => {
234 use zeroize::Zeroize;
235 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
236 let ret = serde_json::from_slice(&plaintext);
237 plaintext.zeroize();
238 Ok(ret?)
239 }
240 None => Ok(event.into_serde()?),
241 }
242}
243
244fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
245where
246 T: SafeEncode,
247{
248 match store_cipher {
249 Some(cipher) => key.as_secure_string(table_name, cipher),
250 None => key.as_encoded_string(),
251 }
252 .into()
253}
254
255fn encode_to_range<T>(
256 store_cipher: Option<&StoreCipher>,
257 table_name: &str,
258 key: T,
259) -> KeyRange<JsValue>
260where
261 T: SafeEncode,
262{
263 match store_cipher {
264 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
265 None => key.encode_to_range(),
266 }
267}
268
269#[derive(Debug)]
271pub struct IndexeddbStateStoreBuilder {
272 name: Option<String>,
273 passphrase: Option<String>,
274 migration_conflict_strategy: MigrationConflictStrategy,
275}
276
277impl IndexeddbStateStoreBuilder {
278 fn new() -> Self {
279 Self {
280 name: None,
281 passphrase: None,
282 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
283 }
284 }
285
286 pub fn name(mut self, value: String) -> Self {
288 self.name = Some(value);
289 self
290 }
291
292 pub fn passphrase(mut self, value: String) -> Self {
296 self.passphrase = Some(value);
297 self
298 }
299
300 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
304 self.migration_conflict_strategy = value;
305 self
306 }
307
308 pub async fn build(self) -> Result<IndexeddbStateStore> {
309 let migration_strategy = self.migration_conflict_strategy.clone();
310 let name = self.name.unwrap_or_else(|| "state".to_owned());
311
312 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
313
314 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
315 let inner =
316 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
317
318 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
319 }
320}
321
322pub struct IndexeddbStateStore {
323 name: String,
324 pub(crate) inner: Database,
325 pub(crate) meta: Database,
326 pub(crate) store_cipher: Option<Arc<StoreCipher>>,
327}
328
329#[cfg(not(tarpaulin_include))]
330impl std::fmt::Debug for IndexeddbStateStore {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
333 }
334}
335
336type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
337
338impl IndexeddbStateStore {
339 pub fn builder() -> IndexeddbStateStoreBuilder {
341 IndexeddbStateStoreBuilder::new()
342 }
343
344 pub fn version(&self) -> u32 {
346 self.inner.version() as u32
347 }
348
349 pub fn meta_version(&self) -> u32 {
351 self.meta.version() as u32
352 }
353
354 pub async fn has_backups(&self) -> Result<bool> {
356 Ok(self
357 .meta
358 .transaction(keys::BACKUPS_META)
359 .with_mode(TransactionMode::Readonly)
360 .build()?
361 .object_store(keys::BACKUPS_META)?
362 .count()
363 .await?
364 > 0)
365 }
366
367 pub async fn latest_backup(&self) -> Result<Option<String>> {
369 if let Some(mut cursor) = self
370 .meta
371 .transaction(keys::BACKUPS_META)
372 .with_mode(TransactionMode::Readonly)
373 .build()?
374 .object_store(keys::BACKUPS_META)?
375 .open_cursor()
376 .with_direction(CursorDirection::Prev)
377 .await?
378 && let Some(record) = cursor.next_record::<JsValue>().await?
379 {
380 Ok(record.as_string())
381 } else {
382 Ok(None)
383 }
384 }
385
386 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
388 serialize_value(self.store_cipher.as_deref(), event)
389 }
390
391 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
393 deserialize_value(self.store_cipher.as_deref(), event)
394 }
395
396 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
397 where
398 T: SafeEncode,
399 {
400 encode_key(self.store_cipher.as_deref(), table_name, key)
401 }
402
403 fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
404 where
405 T: SafeEncode,
406 {
407 encode_to_range(self.store_cipher.as_deref(), table_name, key)
408 }
409
410 pub async fn get_user_ids_inner(
413 &self,
414 room_id: &RoomId,
415 memberships: RoomMemberships,
416 stripped: bool,
417 ) -> Result<Vec<OwnedUserId>> {
418 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
419
420 let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
421 let store = tx.object_store(store_name)?;
422 let range = self.encode_to_range(store_name, room_id);
423
424 let user_ids = if memberships.is_empty() {
425 store
427 .get_all()
428 .with_query(&range)
429 .await?
430 .filter_map(Result::ok)
431 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
432 .collect::<Vec<_>>()
433 } else {
434 let mut user_ids = Vec::new();
435 let cursor = store.open_cursor().with_query(&range).await?;
436
437 if let Some(mut cursor) = cursor {
438 while let Some(value) = cursor.next_record().await? {
439 let member = self.deserialize_value::<RoomMember>(&value)?;
440
441 if memberships.matches(&member.membership) {
442 user_ids.push(member.user_id);
443 }
444 }
445 }
446
447 user_ids
448 };
449
450 Ok(user_ids)
451 }
452
453 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
454 self.inner
455 .transaction(keys::CUSTOM)
456 .with_mode(TransactionMode::Readonly)
457 .build()?
458 .object_store(keys::CUSTOM)?
459 .get(jskey)
460 .await?
461 .map(|f| self.deserialize_value(&f))
462 .transpose()
463 }
464
465 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
466 match key {
470 StateStoreDataKey::SyncToken => {
471 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
472 }
473 StateStoreDataKey::SupportedVersions => self.encode_key(
474 StateStoreDataKey::SUPPORTED_VERSIONS,
475 StateStoreDataKey::SUPPORTED_VERSIONS,
476 ),
477 StateStoreDataKey::WellKnown => {
478 self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
479 }
480 StateStoreDataKey::Filter(filter_name) => {
481 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
482 }
483 StateStoreDataKey::UserAvatarUrl(user_id) => {
484 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
485 }
486 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
487 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
488 }
489 StateStoreDataKey::UtdHookManagerData => {
490 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
491 }
492 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
493 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
494 }
495 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
496 if let Some(thread_root) = thread_root {
497 self.encode_key(
498 keys::KV,
499 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
500 )
501 } else {
502 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
503 }
504 }
505 StateStoreDataKey::SeenKnockRequests(room_id) => {
506 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
507 }
508 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
509 self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
510 }
511 StateStoreDataKey::HomeserverCapabilities => {
512 self.encode_key(keys::KV, StateStoreDataKey::HOMESERVER_CAPABILITIES)
513 }
514 }
515 }
516}
517
518#[derive(Serialize, Deserialize)]
521struct PersistedQueuedRequest {
522 pub room_id: OwnedRoomId,
524
525 kind: Option<QueuedRequestKind>,
528 transaction_id: OwnedTransactionId,
529
530 pub error: Option<QueueWedgeError>,
531
532 priority: Option<usize>,
533
534 #[serde(default = "created_now")]
536 created_at: MilliSecondsSinceUnixEpoch,
537
538 is_wedged: Option<bool>,
541
542 event: Option<SerializableEventContent>,
543}
544
545fn created_now() -> MilliSecondsSinceUnixEpoch {
546 MilliSecondsSinceUnixEpoch::now()
547}
548
549impl PersistedQueuedRequest {
550 fn into_queued_request(self) -> Option<QueuedRequest> {
551 let kind =
552 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
553
554 let error = match self.is_wedged {
555 Some(true) => {
556 Some(QueueWedgeError::GenericApiError {
558 msg: "local echo failed to send in a previous session".into(),
559 })
560 }
561 _ => self.error,
562 };
563
564 let priority = self.priority.unwrap_or(0);
566
567 Some(QueuedRequest {
568 kind,
569 transaction_id: self.transaction_id,
570 error,
571 priority,
572 created_at: self.created_at,
573 })
574 }
575}
576
577#[derive(Serialize, Deserialize, PartialEq)]
578struct PersistedThreadSubscription {
579 status: String,
580 bump_stamp: Option<u64>,
581}
582
583impl From<StoredThreadSubscription> for PersistedThreadSubscription {
584 fn from(value: StoredThreadSubscription) -> Self {
585 Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
586 }
587}
588
589#[cfg(target_family = "wasm")]
598macro_rules! impl_state_store {
599 ({ $($body:tt)* }) => {
600 #[async_trait(?Send)]
601 impl StateStore for IndexeddbStateStore {
602 type Error = IndexeddbStateStoreError;
603
604 $($body)*
605 }
606 };
607}
608
609#[cfg(not(target_family = "wasm"))]
610macro_rules! impl_state_store {
611 ({ $($body:tt)* }) => {
612 impl IndexeddbStateStore {
613 $($body)*
614 }
615 };
616}
617
618impl_state_store!({
619 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
620 let encoded_key = self.encode_kv_data_key(key);
621
622 let value = self
623 .inner
624 .transaction(keys::KV)
625 .with_mode(TransactionMode::Readonly)
626 .build()?
627 .object_store(keys::KV)?
628 .get(&encoded_key)
629 .await?;
630
631 let value = match key {
632 StateStoreDataKey::SyncToken => value
633 .map(|f| self.deserialize_value::<String>(&f))
634 .transpose()?
635 .map(StateStoreDataValue::SyncToken),
636 StateStoreDataKey::SupportedVersions => value
637 .map(|f| self.deserialize_value::<TtlValue<SupportedVersionsResponse>>(&f))
638 .transpose()?
639 .map(StateStoreDataValue::SupportedVersions),
640 StateStoreDataKey::WellKnown => value
641 .map(|f| self.deserialize_value::<TtlValue<Option<WellKnownResponse>>>(&f))
642 .transpose()?
643 .map(StateStoreDataValue::WellKnown),
644 StateStoreDataKey::Filter(_) => value
645 .map(|f| self.deserialize_value::<String>(&f))
646 .transpose()?
647 .map(StateStoreDataValue::Filter),
648 StateStoreDataKey::UserAvatarUrl(_) => value
649 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
650 .transpose()?
651 .map(StateStoreDataValue::UserAvatarUrl),
652 StateStoreDataKey::RecentlyVisitedRooms(_) => value
653 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
654 .transpose()?
655 .map(StateStoreDataValue::RecentlyVisitedRooms),
656 StateStoreDataKey::UtdHookManagerData => value
657 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
658 .transpose()?
659 .map(StateStoreDataValue::UtdHookManagerData),
660 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
661 .map(|f| self.deserialize_value::<bool>(&f))
662 .transpose()?
663 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
664 StateStoreDataKey::ComposerDraft(_, _) => value
665 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
666 .transpose()?
667 .map(StateStoreDataValue::ComposerDraft),
668 StateStoreDataKey::SeenKnockRequests(_) => value
669 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
670 .transpose()?
671 .map(StateStoreDataValue::SeenKnockRequests),
672 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
673 .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
674 .transpose()?
675 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
676 StateStoreDataKey::HomeserverCapabilities => value
677 .map(|f| self.deserialize_value::<TtlValue<Capabilities>>(&f))
678 .transpose()?
679 .map(StateStoreDataValue::HomeserverCapabilities),
680 };
681
682 Ok(value)
683 }
684
685 async fn set_kv_data(
686 &self,
687 key: StateStoreDataKey<'_>,
688 value: StateStoreDataValue,
689 ) -> Result<()> {
690 let encoded_key = self.encode_kv_data_key(key);
691
692 let serialized_value = match key {
693 StateStoreDataKey::SyncToken => self
694 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
695 StateStoreDataKey::SupportedVersions => self.serialize_value(
696 &value
697 .into_supported_versions()
698 .expect("Session data not containing supported versions"),
699 ),
700 StateStoreDataKey::WellKnown => self.serialize_value(
701 &value.into_well_known().expect("Session data not containing well-known"),
702 ),
703 StateStoreDataKey::Filter(_) => {
704 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
705 }
706 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
707 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
708 ),
709 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
710 &value
711 .into_recently_visited_rooms()
712 .expect("Session data not a recently visited room list"),
713 ),
714 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
715 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
716 ),
717 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
718 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
719 &value.into_composer_draft().expect("Session data not a composer draft"),
720 ),
721 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
722 &value
723 .into_seen_knock_requests()
724 .expect("Session data is not a set of seen knock request ids"),
725 ),
726 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
727 &value
728 .into_thread_subscriptions_catchup_tokens()
729 .expect("Session data is not a list of thread subscription catchup tokens"),
730 ),
731 StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
732 &value
733 .into_homeserver_capabilities()
734 .expect("Session data is not a homeserver capabilities"),
735 ),
736 };
737
738 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
739
740 let obj = tx.object_store(keys::KV)?;
741
742 obj.put(&serialized_value?).with_key(encoded_key).build()?;
743
744 tx.commit().await?;
745
746 Ok(())
747 }
748
749 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
750 let encoded_key = self.encode_kv_data_key(key);
751
752 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
753 let obj = tx.object_store(keys::KV)?;
754
755 obj.delete(&encoded_key).build()?;
756
757 tx.commit().await?;
758
759 Ok(())
760 }
761
762 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
763 let mut stores: HashSet<&'static str> = [
764 (changes.sync_token.is_some(), keys::KV),
765 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
766 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
767 (!changes.presence.is_empty(), keys::PRESENCE),
768 (
769 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
770 keys::PROFILES,
771 ),
772 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
773 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
774 ]
775 .iter()
776 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
777 .collect();
778
779 if !changes.state.is_empty() {
780 stores.extend([
781 keys::ROOM_STATE,
782 keys::USER_IDS,
783 keys::STRIPPED_USER_IDS,
784 keys::STRIPPED_ROOM_STATE,
785 keys::PROFILES,
786 ]);
787 }
788
789 if !changes.redactions.is_empty() {
790 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
791 }
792
793 if !changes.room_infos.is_empty() {
794 stores.insert(keys::ROOM_INFOS);
795 }
796
797 if !changes.stripped_state.is_empty() {
798 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
799 }
800
801 if !changes.receipts.is_empty() {
802 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
803 }
804
805 if stores.is_empty() {
806 return Ok(());
808 }
809
810 let stores: Vec<&'static str> = stores.into_iter().collect();
811 let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
812
813 if let Some(s) = &changes.sync_token {
814 tx.object_store(keys::KV)?
815 .put(&self.serialize_value(s)?)
816 .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
817 .build()?;
818 }
819
820 if !changes.ambiguity_maps.is_empty() {
821 let store = tx.object_store(keys::DISPLAY_NAMES)?;
822 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
823 for (display_name, map) in ambiguity_maps {
824 let key = self.encode_key(
825 keys::DISPLAY_NAMES,
826 (
827 room_id,
828 display_name
829 .as_normalized_str()
830 .unwrap_or_else(|| display_name.as_raw_str()),
831 ),
832 );
833
834 store.put(&self.serialize_value(&map)?).with_key(key).build()?;
835 }
836 }
837 }
838
839 if !changes.account_data.is_empty() {
840 let store = tx.object_store(keys::ACCOUNT_DATA)?;
841 for (event_type, event) in &changes.account_data {
842 store
843 .put(&self.serialize_value(&event)?)
844 .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
845 .build()?;
846 }
847 }
848
849 if !changes.room_account_data.is_empty() {
850 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
851 for (room, events) in &changes.room_account_data {
852 for (event_type, event) in events {
853 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
854 store.put(&self.serialize_value(&event)?).with_key(key).build()?;
855 }
856 }
857 }
858
859 if !changes.state.is_empty() {
860 let state = tx.object_store(keys::ROOM_STATE)?;
861 let profiles = tx.object_store(keys::PROFILES)?;
862 let user_ids = tx.object_store(keys::USER_IDS)?;
863 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
864 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
865
866 for (room, user_ids) in &changes.profiles_to_delete {
867 for user_id in user_ids {
868 let key = self.encode_key(keys::PROFILES, (room, user_id));
869 profiles.delete(&key).build()?;
870 }
871 }
872
873 for (room, event_types) in &changes.state {
874 let profile_changes = changes.profiles.get(room);
875
876 for (event_type, events) in event_types {
877 for (state_key, raw_event) in events {
878 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
879 state
880 .put(&self.serialize_value(&raw_event)?)
881 .with_key(key.clone())
882 .build()?;
883 stripped_state.delete(&key).build()?;
884
885 if *event_type == StateEventType::RoomMember {
886 let event =
887 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
888 Ok(ev) => ev,
889 Err(e) => {
890 let event_id: Option<String> =
891 raw_event.get_field("event_id").ok().flatten();
892 debug!(event_id, "Failed to deserialize member event: {e}");
893 continue;
894 }
895 };
896
897 let key = (room, state_key);
898
899 stripped_user_ids
900 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
901 .build()?;
902
903 user_ids
904 .put(&self.serialize_value(&RoomMember::from(&event))?)
905 .with_key(self.encode_key(keys::USER_IDS, key))
906 .build()?;
907
908 if let Some(profile) =
909 profile_changes.and_then(|p| p.get(event.state_key()))
910 {
911 profiles
912 .put(&self.serialize_value(&profile)?)
913 .with_key(self.encode_key(keys::PROFILES, key))
914 .build()?;
915 }
916 }
917 }
918 }
919 }
920 }
921
922 if !changes.room_infos.is_empty() {
923 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
924 for (room_id, room_info) in &changes.room_infos {
925 room_infos
926 .put(&self.serialize_value(&room_info)?)
927 .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
928 .build()?;
929 }
930 }
931
932 if !changes.presence.is_empty() {
933 let store = tx.object_store(keys::PRESENCE)?;
934 for (sender, event) in &changes.presence {
935 store
936 .put(&self.serialize_value(&event)?)
937 .with_key(self.encode_key(keys::PRESENCE, sender))
938 .build()?;
939 }
940 }
941
942 if !changes.stripped_state.is_empty() {
943 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
944 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
945
946 for (room, event_types) in &changes.stripped_state {
947 for (event_type, events) in event_types {
948 for (state_key, raw_event) in events {
949 let key = self
950 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
951 store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
952
953 if *event_type == StateEventType::RoomMember {
954 let event = match raw_event
955 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
956 {
957 Ok(ev) => ev,
958 Err(e) => {
959 let event_id: Option<String> =
960 raw_event.get_field("event_id").ok().flatten();
961 debug!(
962 event_id,
963 "Failed to deserialize stripped member event: {e}"
964 );
965 continue;
966 }
967 };
968
969 let key = (room, state_key);
970
971 user_ids
972 .put(&self.serialize_value(&RoomMember::from(&event))?)
973 .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
974 .build()?;
975 }
976 }
977 }
978 }
979 }
980
981 if !changes.receipts.is_empty() {
982 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
983 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
984
985 for (room, content) in &changes.receipts {
986 for (event_id, receipts) in &content.0 {
987 for (receipt_type, receipts) in receipts {
988 for (user_id, receipt) in receipts {
989 let key = match receipt.thread.as_str() {
990 Some(thread_id) => self.encode_key(
991 keys::ROOM_USER_RECEIPTS,
992 (room, receipt_type, thread_id, user_id),
993 ),
994 None => self.encode_key(
995 keys::ROOM_USER_RECEIPTS,
996 (room, receipt_type, user_id),
997 ),
998 };
999
1000 if let Some((old_event, _)) =
1001 room_user_receipts.get(&key).await?.and_then(|f| {
1002 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
1003 })
1004 {
1005 let key = match receipt.thread.as_str() {
1006 Some(thread_id) => self.encode_key(
1007 keys::ROOM_EVENT_RECEIPTS,
1008 (room, receipt_type, thread_id, old_event, user_id),
1009 ),
1010 None => self.encode_key(
1011 keys::ROOM_EVENT_RECEIPTS,
1012 (room, receipt_type, old_event, user_id),
1013 ),
1014 };
1015 room_event_receipts.delete(&key).build()?;
1016 }
1017
1018 room_user_receipts
1019 .put(&self.serialize_value(&(event_id, receipt))?)
1020 .with_key(key)
1021 .build()?;
1022
1023 let key = match receipt.thread.as_str() {
1025 Some(thread_id) => self.encode_key(
1026 keys::ROOM_EVENT_RECEIPTS,
1027 (room, receipt_type, thread_id, event_id, user_id),
1028 ),
1029 None => self.encode_key(
1030 keys::ROOM_EVENT_RECEIPTS,
1031 (room, receipt_type, event_id, user_id),
1032 ),
1033 };
1034 room_event_receipts
1035 .put(&self.serialize_value(&(user_id, receipt))?)
1036 .with_key(key)
1037 .build()?;
1038 }
1039 }
1040 }
1041 }
1042 }
1043
1044 if !changes.redactions.is_empty() {
1045 let state = tx.object_store(keys::ROOM_STATE)?;
1046 let room_info = tx.object_store(keys::ROOM_INFOS)?;
1047
1048 for (room_id, redactions) in &changes.redactions {
1049 let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1050 let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1051 continue;
1052 };
1053
1054 let mut redaction_rules = None;
1055
1056 while let Some(value) = cursor.next_record().await? {
1057 let Some(key) = cursor.key::<JsValue>()? else {
1058 break;
1059 };
1060
1061 let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1062 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1063 && let Some(redaction) = redactions.get(&event_id)
1064 {
1065 let redaction_rules = match &redaction_rules {
1066 Some(r) => r,
1067 None => {
1068 let value = room_info
1069 .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1070 .await?
1071 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1072 .map(|info| info.room_version_rules_or_default())
1073 .unwrap_or_else(|| {
1074 warn!(
1075 ?room_id,
1076 "Unable to get the room version rules, \
1077 defaulting to rules for room version \
1078 {ROOM_VERSION_FALLBACK}"
1079 );
1080 ROOM_VERSION_RULES_FALLBACK
1081 })
1082 .redaction;
1083 redaction_rules.get_or_insert(value)
1084 }
1085 };
1086
1087 let redacted = redact(
1088 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1089 redaction_rules,
1090 Some(RedactedBecause::from_raw_event(redaction)?),
1091 )
1092 .map_err(StoreError::Redaction)?;
1093 state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1094 }
1095 }
1096 }
1097 }
1098
1099 tx.commit().await.map_err(|e| e.into())
1100 }
1101
1102 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1103 self.inner
1104 .transaction(keys::PRESENCE)
1105 .with_mode(TransactionMode::Readonly)
1106 .build()?
1107 .object_store(keys::PRESENCE)?
1108 .get(&self.encode_key(keys::PRESENCE, user_id))
1109 .await?
1110 .map(|f| self.deserialize_value(&f))
1111 .transpose()
1112 }
1113
1114 async fn get_presence_events(
1115 &self,
1116 user_ids: &[OwnedUserId],
1117 ) -> Result<Vec<Raw<PresenceEvent>>> {
1118 if user_ids.is_empty() {
1119 return Ok(Vec::new());
1120 }
1121
1122 let txn =
1123 self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1124 let store = txn.object_store(keys::PRESENCE)?;
1125
1126 let mut events = Vec::with_capacity(user_ids.len());
1127
1128 for user_id in user_ids {
1129 if let Some(event) = store
1130 .get(&self.encode_key(keys::PRESENCE, user_id))
1131 .await?
1132 .map(|f| self.deserialize_value(&f))
1133 .transpose()?
1134 {
1135 events.push(event)
1136 }
1137 }
1138
1139 Ok(events)
1140 }
1141
1142 async fn get_state_event(
1143 &self,
1144 room_id: &RoomId,
1145 event_type: StateEventType,
1146 state_key: &str,
1147 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1148 Ok(self
1149 .get_state_events_for_keys(room_id, event_type, &[state_key])
1150 .await?
1151 .into_iter()
1152 .next())
1153 }
1154
1155 async fn get_state_events(
1156 &self,
1157 room_id: &RoomId,
1158 event_type: StateEventType,
1159 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1160 let stripped_range =
1161 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1162 let stripped_events = self
1163 .inner
1164 .transaction(keys::STRIPPED_ROOM_STATE)
1165 .with_mode(TransactionMode::Readonly)
1166 .build()?
1167 .object_store(keys::STRIPPED_ROOM_STATE)?
1168 .get_all()
1169 .with_query(&stripped_range)
1170 .await?
1171 .filter_map(Result::ok)
1172 .filter_map(|f| {
1173 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1174 })
1175 .collect::<Vec<_>>();
1176
1177 if !stripped_events.is_empty() {
1178 return Ok(stripped_events);
1179 }
1180
1181 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1182 Ok(self
1183 .inner
1184 .transaction(keys::ROOM_STATE)
1185 .with_mode(TransactionMode::Readonly)
1186 .build()?
1187 .object_store(keys::ROOM_STATE)?
1188 .get_all()
1189 .with_query(&range)
1190 .await?
1191 .filter_map(Result::ok)
1192 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1193 .collect::<Vec<_>>())
1194 }
1195
1196 async fn get_state_events_for_keys(
1197 &self,
1198 room_id: &RoomId,
1199 event_type: StateEventType,
1200 state_keys: &[&str],
1201 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1202 if state_keys.is_empty() {
1203 return Ok(Vec::new());
1204 }
1205
1206 let mut events = Vec::with_capacity(state_keys.len());
1207
1208 {
1209 let txn = self
1210 .inner
1211 .transaction(keys::STRIPPED_ROOM_STATE)
1212 .with_mode(TransactionMode::Readonly)
1213 .build()?;
1214 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1215
1216 for state_key in state_keys {
1217 if let Some(event) =
1218 store
1219 .get(&self.encode_key(
1220 keys::STRIPPED_ROOM_STATE,
1221 (room_id, &event_type, state_key),
1222 ))
1223 .await?
1224 .map(|f| self.deserialize_value(&f))
1225 .transpose()?
1226 {
1227 events.push(RawAnySyncOrStrippedState::Stripped(event));
1228 }
1229 }
1230
1231 if !events.is_empty() {
1232 return Ok(events);
1233 }
1234 }
1235
1236 let txn = self
1237 .inner
1238 .transaction(keys::ROOM_STATE)
1239 .with_mode(TransactionMode::Readonly)
1240 .build()?;
1241 let store = txn.object_store(keys::ROOM_STATE)?;
1242
1243 for state_key in state_keys {
1244 if let Some(event) = store
1245 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1246 .await?
1247 .map(|f| self.deserialize_value(&f))
1248 .transpose()?
1249 {
1250 events.push(RawAnySyncOrStrippedState::Sync(event));
1251 }
1252 }
1253
1254 Ok(events)
1255 }
1256
1257 async fn get_profile(
1258 &self,
1259 room_id: &RoomId,
1260 user_id: &UserId,
1261 ) -> Result<Option<MinimalRoomMemberEvent>> {
1262 self.inner
1263 .transaction(keys::PROFILES)
1264 .with_mode(TransactionMode::Readonly)
1265 .build()?
1266 .object_store(keys::PROFILES)?
1267 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1268 .await?
1269 .map(|f| self.deserialize_value(&f))
1270 .transpose()
1271 }
1272
1273 async fn get_profiles<'a>(
1274 &self,
1275 room_id: &RoomId,
1276 user_ids: &'a [OwnedUserId],
1277 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1278 if user_ids.is_empty() {
1279 return Ok(BTreeMap::new());
1280 }
1281
1282 let txn =
1283 self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1284 let store = txn.object_store(keys::PROFILES)?;
1285
1286 let mut profiles = BTreeMap::new();
1287 for user_id in user_ids {
1288 if let Some(profile) = store
1289 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1290 .await?
1291 .map(|f| self.deserialize_value(&f))
1292 .transpose()?
1293 {
1294 profiles.insert(user_id.as_ref(), profile);
1295 }
1296 }
1297
1298 Ok(profiles)
1299 }
1300
1301 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1302 let transaction = self
1303 .inner
1304 .transaction(keys::ROOM_INFOS)
1305 .with_mode(TransactionMode::Readonly)
1306 .build()?;
1307
1308 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1309
1310 Ok(match room_load_settings {
1311 RoomLoadSettings::All => object_store
1312 .get_all()
1313 .await?
1314 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1315 .collect::<Result<_>>()?,
1316
1317 RoomLoadSettings::One(room_id) => {
1318 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1319 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1320 None => vec![],
1321 }
1322 }
1323 })
1324 }
1325
1326 async fn get_users_with_display_name(
1327 &self,
1328 room_id: &RoomId,
1329 display_name: &DisplayName,
1330 ) -> Result<BTreeSet<OwnedUserId>> {
1331 self.inner
1332 .transaction(keys::DISPLAY_NAMES)
1333 .with_mode(TransactionMode::Readonly)
1334 .build()?
1335 .object_store(keys::DISPLAY_NAMES)?
1336 .get(&self.encode_key(
1337 keys::DISPLAY_NAMES,
1338 (
1339 room_id,
1340 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1341 ),
1342 ))
1343 .await?
1344 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1345 .unwrap_or_else(|| Ok(Default::default()))
1346 }
1347
1348 async fn get_users_with_display_names<'a>(
1349 &self,
1350 room_id: &RoomId,
1351 display_names: &'a [DisplayName],
1352 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1353 let mut map = HashMap::new();
1354
1355 if display_names.is_empty() {
1356 return Ok(map);
1357 }
1358
1359 let txn = self
1360 .inner
1361 .transaction(keys::DISPLAY_NAMES)
1362 .with_mode(TransactionMode::Readonly)
1363 .build()?;
1364 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1365
1366 for display_name in display_names {
1367 if let Some(user_ids) = store
1368 .get(
1369 &self.encode_key(
1370 keys::DISPLAY_NAMES,
1371 (
1372 room_id,
1373 display_name
1374 .as_normalized_str()
1375 .unwrap_or_else(|| display_name.as_raw_str()),
1376 ),
1377 ),
1378 )
1379 .await?
1380 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1381 .transpose()?
1382 {
1383 map.insert(display_name, user_ids);
1384 }
1385 }
1386
1387 Ok(map)
1388 }
1389
1390 async fn get_account_data_event(
1391 &self,
1392 event_type: GlobalAccountDataEventType,
1393 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1394 self.inner
1395 .transaction(keys::ACCOUNT_DATA)
1396 .with_mode(TransactionMode::Readonly)
1397 .build()?
1398 .object_store(keys::ACCOUNT_DATA)?
1399 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1400 .await?
1401 .map(|f| self.deserialize_value(&f))
1402 .transpose()
1403 }
1404
1405 async fn get_room_account_data_event(
1406 &self,
1407 room_id: &RoomId,
1408 event_type: RoomAccountDataEventType,
1409 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1410 self.inner
1411 .transaction(keys::ROOM_ACCOUNT_DATA)
1412 .with_mode(TransactionMode::Readonly)
1413 .build()?
1414 .object_store(keys::ROOM_ACCOUNT_DATA)?
1415 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1416 .await?
1417 .map(|f| self.deserialize_value(&f))
1418 .transpose()
1419 }
1420
1421 async fn get_user_room_receipt_event(
1422 &self,
1423 room_id: &RoomId,
1424 receipt_type: ReceiptType,
1425 thread: ReceiptThread,
1426 user_id: &UserId,
1427 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1428 let key = match thread.as_str() {
1429 Some(thread_id) => self
1430 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1431 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1432 };
1433 self.inner
1434 .transaction(keys::ROOM_USER_RECEIPTS)
1435 .with_mode(TransactionMode::Readonly)
1436 .build()?
1437 .object_store(keys::ROOM_USER_RECEIPTS)?
1438 .get(&key)
1439 .await?
1440 .map(|f| self.deserialize_value(&f))
1441 .transpose()
1442 }
1443
1444 async fn get_event_room_receipt_events(
1445 &self,
1446 room_id: &RoomId,
1447 receipt_type: ReceiptType,
1448 thread: ReceiptThread,
1449 event_id: &EventId,
1450 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1451 let range = match thread.as_str() {
1452 Some(thread_id) => self.encode_to_range(
1453 keys::ROOM_EVENT_RECEIPTS,
1454 (room_id, receipt_type, thread_id, event_id),
1455 ),
1456 None => {
1457 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1458 }
1459 };
1460 let tx = self
1461 .inner
1462 .transaction(keys::ROOM_EVENT_RECEIPTS)
1463 .with_mode(TransactionMode::Readonly)
1464 .build()?;
1465 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1466
1467 Ok(store
1468 .get_all()
1469 .with_query(&range)
1470 .await?
1471 .filter_map(Result::ok)
1472 .filter_map(|f| self.deserialize_value(&f).ok())
1473 .collect::<Vec<_>>())
1474 }
1475
1476 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1477 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1478 self.get_custom_value_for_js(jskey).await
1479 }
1480
1481 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1482 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1483
1484 let prev = self.get_custom_value_for_js(&jskey).await?;
1485
1486 let tx =
1487 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1488
1489 tx.object_store(keys::CUSTOM)?
1490 .put(&self.serialize_value(&value)?)
1491 .with_key(jskey)
1492 .build()?;
1493
1494 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1495 Ok(prev)
1496 }
1497
1498 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1499 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1500
1501 let prev = self.get_custom_value_for_js(&jskey).await?;
1502
1503 let tx =
1504 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1505
1506 tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1507
1508 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1509 Ok(prev)
1510 }
1511
1512 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1513 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1515
1516 let prefixed_stores = [
1519 keys::PROFILES,
1520 keys::DISPLAY_NAMES,
1521 keys::USER_IDS,
1522 keys::ROOM_STATE,
1523 keys::ROOM_ACCOUNT_DATA,
1524 keys::ROOM_EVENT_RECEIPTS,
1525 keys::ROOM_USER_RECEIPTS,
1526 keys::STRIPPED_ROOM_STATE,
1527 keys::STRIPPED_USER_IDS,
1528 keys::THREAD_SUBSCRIPTIONS,
1529 ];
1530
1531 let all_stores = {
1532 let mut v = Vec::new();
1533 v.extend(prefixed_stores);
1534 v.extend(direct_stores);
1535 v
1536 };
1537
1538 let tx =
1539 self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1540
1541 for store_name in direct_stores {
1542 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1543 }
1544
1545 for store_name in prefixed_stores {
1546 let store = tx.object_store(store_name)?;
1547 let range = self.encode_to_range(store_name, room_id);
1548 for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1549 store.delete(&key?).build()?;
1550 }
1551 }
1552
1553 tx.commit().await.map_err(|e| e.into())
1554 }
1555
1556 async fn get_user_ids(
1557 &self,
1558 room_id: &RoomId,
1559 memberships: RoomMemberships,
1560 ) -> Result<Vec<OwnedUserId>> {
1561 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1562 if !ids.is_empty() {
1563 return Ok(ids);
1564 }
1565 self.get_user_ids_inner(room_id, memberships, false).await
1566 }
1567
1568 async fn save_send_queue_request(
1569 &self,
1570 room_id: &RoomId,
1571 transaction_id: OwnedTransactionId,
1572 created_at: MilliSecondsSinceUnixEpoch,
1573 kind: QueuedRequestKind,
1574 priority: usize,
1575 ) -> Result<()> {
1576 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1577
1578 let tx = self
1579 .inner
1580 .transaction(keys::ROOM_SEND_QUEUE)
1581 .with_mode(TransactionMode::Readwrite)
1582 .build()?;
1583
1584 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1585
1586 let prev = obj.get(&encoded_key).await?;
1591
1592 let mut prev = prev.map_or_else(
1593 || Ok(Vec::new()),
1594 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1595 )?;
1596
1597 prev.push(PersistedQueuedRequest {
1599 room_id: room_id.to_owned(),
1600 kind: Some(kind),
1601 transaction_id,
1602 error: None,
1603 is_wedged: None,
1604 event: None,
1605 priority: Some(priority),
1606 created_at,
1607 });
1608
1609 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1611
1612 tx.commit().await?;
1613
1614 Ok(())
1615 }
1616
1617 async fn update_send_queue_request(
1618 &self,
1619 room_id: &RoomId,
1620 transaction_id: &TransactionId,
1621 kind: QueuedRequestKind,
1622 ) -> Result<bool> {
1623 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1624
1625 let tx = self
1626 .inner
1627 .transaction(keys::ROOM_SEND_QUEUE)
1628 .with_mode(TransactionMode::Readwrite)
1629 .build()?;
1630
1631 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1632
1633 let prev = obj.get(&encoded_key).await?;
1638
1639 let mut prev = prev.map_or_else(
1640 || Ok(Vec::new()),
1641 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1642 )?;
1643
1644 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1646 entry.kind = Some(kind);
1647 entry.error = None;
1649 entry.is_wedged = None;
1651 entry.event = None;
1652
1653 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1655 tx.commit().await?;
1656
1657 Ok(true)
1658 } else {
1659 Ok(false)
1660 }
1661 }
1662
1663 async fn remove_send_queue_request(
1664 &self,
1665 room_id: &RoomId,
1666 transaction_id: &TransactionId,
1667 ) -> Result<bool> {
1668 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1669
1670 let tx = self
1671 .inner
1672 .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1673 .with_mode(TransactionMode::Readwrite)
1674 .build()?;
1675
1676 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1677
1678 if let Some(val) = obj.get(&encoded_key).await? {
1683 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1684 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1685 prev.remove(pos);
1686
1687 if prev.is_empty() {
1688 obj.delete(&encoded_key).build()?;
1689 } else {
1690 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1691 }
1692
1693 tx.commit().await?;
1694 return Ok(true);
1695 }
1696 }
1697
1698 Ok(false)
1699 }
1700
1701 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1702 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1703
1704 let prev = self
1707 .inner
1708 .transaction(keys::ROOM_SEND_QUEUE)
1709 .with_mode(TransactionMode::Readwrite)
1710 .build()?
1711 .object_store(keys::ROOM_SEND_QUEUE)?
1712 .get(&encoded_key)
1713 .await?;
1714
1715 let mut prev = prev.map_or_else(
1716 || Ok(Vec::new()),
1717 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1718 )?;
1719
1720 prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1722
1723 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1724 }
1725
1726 async fn update_send_queue_request_status(
1727 &self,
1728 room_id: &RoomId,
1729 transaction_id: &TransactionId,
1730 error: Option<QueueWedgeError>,
1731 ) -> Result<()> {
1732 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1733
1734 let tx = self
1735 .inner
1736 .transaction(keys::ROOM_SEND_QUEUE)
1737 .with_mode(TransactionMode::Readwrite)
1738 .build()?;
1739
1740 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1741
1742 if let Some(val) = obj.get(&encoded_key).await? {
1743 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1744 if let Some(request) =
1745 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1746 {
1747 request.is_wedged = None;
1748 request.error = error;
1749 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1750 }
1751 }
1752
1753 tx.commit().await?;
1754
1755 Ok(())
1756 }
1757
1758 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1759 let tx = self
1760 .inner
1761 .transaction(keys::ROOM_SEND_QUEUE)
1762 .with_mode(TransactionMode::Readwrite)
1763 .build()?;
1764
1765 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1766
1767 let all_entries = obj
1768 .get_all()
1769 .await?
1770 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1771 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1772 .into_iter()
1773 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1774 .collect::<BTreeSet<_>>();
1775
1776 Ok(all_entries.into_iter().collect())
1777 }
1778
1779 async fn save_dependent_queued_request(
1780 &self,
1781 room_id: &RoomId,
1782 parent_txn_id: &TransactionId,
1783 own_txn_id: ChildTransactionId,
1784 created_at: MilliSecondsSinceUnixEpoch,
1785 content: DependentQueuedRequestKind,
1786 ) -> Result<()> {
1787 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1788
1789 let tx = self
1790 .inner
1791 .transaction(keys::DEPENDENT_SEND_QUEUE)
1792 .with_mode(TransactionMode::Readwrite)
1793 .build()?;
1794
1795 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1796
1797 let prev = obj.get(&encoded_key).await?;
1800
1801 let mut prev = prev.map_or_else(
1802 || Ok(Vec::new()),
1803 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1804 )?;
1805
1806 prev.push(DependentQueuedRequest {
1808 kind: content,
1809 parent_transaction_id: parent_txn_id.to_owned(),
1810 own_transaction_id: own_txn_id,
1811 parent_key: None,
1812 created_at,
1813 });
1814
1815 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1817
1818 tx.commit().await?;
1819
1820 Ok(())
1821 }
1822
1823 async fn update_dependent_queued_request(
1824 &self,
1825 room_id: &RoomId,
1826 own_transaction_id: &ChildTransactionId,
1827 new_content: DependentQueuedRequestKind,
1828 ) -> Result<bool> {
1829 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1830
1831 let tx = self
1832 .inner
1833 .transaction(keys::DEPENDENT_SEND_QUEUE)
1834 .with_mode(TransactionMode::Readwrite)
1835 .build()?;
1836
1837 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1838
1839 let prev = obj.get(&encoded_key).await?;
1842
1843 let mut prev = prev.map_or_else(
1844 || Ok(Vec::new()),
1845 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1846 )?;
1847
1848 let mut found = false;
1850 for entry in prev.iter_mut() {
1851 if entry.own_transaction_id == *own_transaction_id {
1852 found = true;
1853 entry.kind = new_content;
1854 break;
1855 }
1856 }
1857
1858 if found {
1859 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1860 tx.commit().await?;
1861 }
1862
1863 Ok(found)
1864 }
1865
1866 async fn mark_dependent_queued_requests_as_ready(
1867 &self,
1868 room_id: &RoomId,
1869 parent_txn_id: &TransactionId,
1870 parent_key: SentRequestKey,
1871 ) -> Result<usize> {
1872 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1873
1874 let tx = self
1875 .inner
1876 .transaction(keys::DEPENDENT_SEND_QUEUE)
1877 .with_mode(TransactionMode::Readwrite)
1878 .build()?;
1879
1880 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1881
1882 let prev = obj.get(&encoded_key).await?;
1885
1886 let mut prev = prev.map_or_else(
1887 || Ok(Vec::new()),
1888 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1889 )?;
1890
1891 let mut num_updated = 0;
1893 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1894 entry.parent_key = Some(parent_key.clone());
1895 num_updated += 1;
1896 }
1897
1898 if num_updated > 0 {
1899 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1900 tx.commit().await?;
1901 }
1902
1903 Ok(num_updated)
1904 }
1905
1906 async fn remove_dependent_queued_request(
1907 &self,
1908 room_id: &RoomId,
1909 txn_id: &ChildTransactionId,
1910 ) -> Result<bool> {
1911 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1912
1913 let tx = self
1914 .inner
1915 .transaction(keys::DEPENDENT_SEND_QUEUE)
1916 .with_mode(TransactionMode::Readwrite)
1917 .build()?;
1918
1919 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1920
1921 if let Some(val) = obj.get(&encoded_key).await? {
1924 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1925 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1926 prev.remove(pos);
1927
1928 if prev.is_empty() {
1929 obj.delete(&encoded_key).build()?;
1930 } else {
1931 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1932 }
1933
1934 tx.commit().await?;
1935 return Ok(true);
1936 }
1937 }
1938
1939 Ok(false)
1940 }
1941
1942 async fn load_dependent_queued_requests(
1943 &self,
1944 room_id: &RoomId,
1945 ) -> Result<Vec<DependentQueuedRequest>> {
1946 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1947
1948 let prev = self
1950 .inner
1951 .transaction(keys::DEPENDENT_SEND_QUEUE)
1952 .with_mode(TransactionMode::Readwrite)
1953 .build()?
1954 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1955 .get(&encoded_key)
1956 .await?;
1957
1958 prev.map_or_else(
1959 || Ok(Vec::new()),
1960 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1961 )
1962 }
1963
1964 async fn upsert_thread_subscriptions(
1965 &self,
1966 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1967 ) -> Result<()> {
1968 let tx = self
1969 .inner
1970 .transaction(keys::THREAD_SUBSCRIPTIONS)
1971 .with_mode(TransactionMode::Readwrite)
1972 .build()?;
1973 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1974
1975 for (room_id, thread_id, subscription) in updates {
1976 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1977 let mut new = PersistedThreadSubscription::from(subscription);
1978
1979 if let Some(previous_value) = obj.get(&encoded_key).await? {
1981 let previous: PersistedThreadSubscription =
1982 self.deserialize_value(&previous_value)?;
1983
1984 if new == previous {
1986 continue;
1987 }
1988 if !compare_thread_subscription_bump_stamps(
1989 previous.bump_stamp,
1990 &mut new.bump_stamp,
1991 ) {
1992 continue;
1993 }
1994 }
1995
1996 let serialized_value = self.serialize_value(&new);
1997 obj.put(&serialized_value?).with_key(encoded_key).build()?;
1998 }
1999
2000 tx.commit().await?;
2001
2002 Ok(())
2003 }
2004
2005 async fn load_thread_subscription(
2006 &self,
2007 room: &RoomId,
2008 thread_id: &EventId,
2009 ) -> Result<Option<StoredThreadSubscription>> {
2010 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2011
2012 let js_value = self
2013 .inner
2014 .transaction(keys::THREAD_SUBSCRIPTIONS)
2015 .with_mode(TransactionMode::Readonly)
2016 .build()?
2017 .object_store(keys::THREAD_SUBSCRIPTIONS)?
2018 .get(&encoded_key)
2019 .await?;
2020
2021 let Some(js_value) = js_value else {
2022 return Ok(None);
2024 };
2025
2026 let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2027
2028 let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2029 StoreError::InvalidData {
2030 details: format!(
2031 "invalid thread status for room {room} and thread {thread_id}: {}",
2032 sub.status
2033 ),
2034 }
2035 })?;
2036
2037 Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2038 }
2039
2040 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2041 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2042
2043 let transaction = self
2044 .inner
2045 .transaction(keys::THREAD_SUBSCRIPTIONS)
2046 .with_mode(TransactionMode::Readwrite)
2047 .build()?;
2048 transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2049 transaction.commit().await?;
2050
2051 Ok(())
2052 }
2053
2054 #[allow(clippy::unused_async)]
2055 async fn optimize(&self) -> Result<()> {
2056 Ok(())
2057 }
2058
2059 #[allow(clippy::unused_async)]
2060 async fn get_size(&self) -> Result<Option<usize>> {
2061 Ok(None)
2062 }
2063});
2064
2065#[derive(Debug, Serialize, Deserialize)]
2067struct RoomMember {
2068 user_id: OwnedUserId,
2069 membership: MembershipState,
2070}
2071
2072impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2073 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2074 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2075 }
2076}
2077
2078impl From<&StrippedRoomMemberEvent> for RoomMember {
2079 fn from(event: &StrippedRoomMemberEvent) -> Self {
2080 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2081 }
2082}
2083
2084#[cfg(test)]
2085mod migration_tests {
2086 use assert_matches2::assert_matches;
2087 use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2088 use ruma::{
2089 OwnedRoomId, OwnedTransactionId, TransactionId,
2090 events::room::message::RoomMessageEventContent, room_id,
2091 };
2092 use serde::{Deserialize, Serialize};
2093
2094 use crate::state_store::PersistedQueuedRequest;
2095
2096 #[derive(Serialize, Deserialize)]
2097 struct OldPersistedQueuedRequest {
2098 room_id: OwnedRoomId,
2099 event: SerializableEventContent,
2100 transaction_id: OwnedTransactionId,
2101 is_wedged: bool,
2102 }
2103
2104 #[test]
2108 fn test_migrating_persisted_queue_event_serialization() {
2109 let room_a_id = room_id!("!room_a:dummy.local");
2110 let transaction_id = TransactionId::new();
2111 let content =
2112 SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2113 .unwrap();
2114
2115 let old_persisted_queue_event = OldPersistedQueuedRequest {
2116 room_id: room_a_id.to_owned(),
2117 event: content,
2118 transaction_id: transaction_id.clone(),
2119 is_wedged: true,
2120 };
2121
2122 let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2123
2124 let new_persisted: PersistedQueuedRequest =
2126 serde_json::from_slice(&serialized_persisted).unwrap();
2127
2128 assert_eq!(new_persisted.is_wedged, Some(true));
2129 assert!(new_persisted.error.is_none());
2130
2131 assert!(new_persisted.event.is_some());
2132 assert!(new_persisted.kind.is_none());
2133
2134 let queued = new_persisted.into_queued_request().unwrap();
2135 assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2136 assert_eq!(queued.transaction_id, transaction_id);
2137 assert!(queued.error.is_some());
2138 }
2139}
2140
2141#[cfg(all(test, target_family = "wasm"))]
2142mod tests {
2143 #[cfg(target_family = "wasm")]
2144 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2145
2146 use matrix_sdk_base::statestore_integration_tests;
2147 use uuid::Uuid;
2148
2149 use super::{IndexeddbStateStore, Result};
2150
2151 async fn get_store() -> Result<IndexeddbStateStore> {
2152 let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2153 Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2154 }
2155
2156 statestore_integration_tests!();
2157}
2158
2159#[cfg(all(test, target_family = "wasm"))]
2160mod encrypted_tests {
2161 #[cfg(target_family = "wasm")]
2162 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2163
2164 use matrix_sdk_base::statestore_integration_tests;
2165 use uuid::Uuid;
2166
2167 use super::{IndexeddbStateStore, Result};
2168
2169 async fn get_store() -> Result<IndexeddbStateStore> {
2170 let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2171 let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2172 Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2173 }
2174
2175 statestore_integration_tests!();
2176}