1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 str::FromStr as _,
18 sync::Arc,
19};
20
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::{
25 KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
26 transaction::TransactionMode,
27};
28use matrix_sdk_base::{
29 MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
30 RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
31 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
32 store::{
33 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
34 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
35 SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
36 SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
37 compare_thread_subscription_bump_stamps,
38 },
39};
40use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
41use ruma::{
42 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
43 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
44 canonical_json::{RedactedBecause, redact},
45 events::{
46 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
47 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
48 presence::PresenceEvent,
49 receipt::{Receipt, ReceiptThread, ReceiptType},
50 room::member::{
51 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
52 },
53 },
54 serde::Raw,
55};
56use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
57use tracing::{debug, warn};
58use wasm_bindgen::JsValue;
59
60mod migrations;
61
62pub use self::migrations::MigrationConflictStrategy;
63use self::migrations::{upgrade_inner_db, upgrade_meta_db};
64use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
65
66#[derive(Debug, thiserror::Error)]
67pub enum IndexeddbStateStoreError {
68 #[error(transparent)]
69 Json(#[from] serde_json::Error),
70 #[error(transparent)]
71 Encryption(#[from] EncryptionError),
72 #[error("DomException {name} ({code}): {message}")]
73 DomException { name: String, message: String, code: u16 },
74 #[error(transparent)]
75 StoreError(#[from] StoreError),
76 #[error(
77 "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
78 See MigrationConflictStrategy for ways to configure."
79 )]
80 MigrationConflict { name: String, old_version: u32, new_version: u32 },
81}
82
83impl From<GenericError> for IndexeddbStateStoreError {
84 fn from(value: GenericError) -> Self {
85 Self::StoreError(value.into())
86 }
87}
88
89impl From<web_sys::DomException> for IndexeddbStateStoreError {
90 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
91 IndexeddbStateStoreError::DomException {
92 name: frm.name(),
93 message: frm.message(),
94 code: frm.code(),
95 }
96 }
97}
98
99impl From<IndexeddbStateStoreError> for StoreError {
100 fn from(e: IndexeddbStateStoreError) -> Self {
101 match e {
102 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
103 IndexeddbStateStoreError::StoreError(e) => e,
104 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
105 _ => StoreError::backend(e),
106 }
107 }
108}
109
110impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
111 fn from(value: indexed_db_futures::error::DomException) -> Self {
112 web_sys::DomException::from(value).into()
113 }
114}
115
116impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
117 fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
118 Self::Json(serde_json::Error::custom(value.to_string()))
119 }
120}
121
122impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
123 fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
124 IndexeddbStateStoreError::StoreError(StoreError::backend(value))
125 }
126}
127
128impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
129 fn from(value: indexed_db_futures::error::JSError) -> Self {
130 GenericError::from(value.to_string()).into()
131 }
132}
133
134impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
135 fn from(value: indexed_db_futures::error::Error) -> Self {
136 use indexed_db_futures::error::Error;
137 match value {
138 Error::DomException(e) => e.into(),
139 Error::Serialisation(e) => e.into(),
140 Error::MissingData(e) => e.into(),
141 Error::Unknown(e) => e.into(),
142 }
143 }
144}
145
146impl From<OpenDbError> for IndexeddbStateStoreError {
147 fn from(value: OpenDbError) -> Self {
148 match value {
149 OpenDbError::Base(error) => error.into(),
150 _ => GenericError::from(value.to_string()).into(),
151 }
152 }
153}
154
/// Names of the databases, object stores and well-known keys used by the
/// state store.
mod keys {
    // Names used with the meta database.
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    pub const BACKUPS_META: &str = "backups";

    // Object stores of the main database.
    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// Every object store name listed above, i.e. all constants of this
    /// module except `INTERNAL_STATE`, `BACKUPS_META` and `STORE_KEY`.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        THREAD_SUBSCRIPTIONS,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // Name under which the store key is kept — presumably in the meta
    // database; the usage is not visible in this module.
    pub const STORE_KEY: &str = "store_key";
}
209
210pub use keys::ALL_STORES;
211use matrix_sdk_base::store::QueueWedgeError;
212
213fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
215 Ok(match store_cipher {
216 Some(cipher) => {
217 let data = serde_json::to_vec(event)?;
218 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
219 }
220 None => JsValue::from_serde(event)?,
221 })
222}
223
224fn deserialize_value<T: DeserializeOwned>(
226 store_cipher: Option<&StoreCipher>,
227 event: &JsValue,
228) -> Result<T> {
229 match store_cipher {
230 Some(cipher) => {
231 use zeroize::Zeroize;
232 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
233 let ret = serde_json::from_slice(&plaintext);
234 plaintext.zeroize();
235 Ok(ret?)
236 }
237 None => Ok(event.into_serde()?),
238 }
239}
240
241fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
242where
243 T: SafeEncode,
244{
245 match store_cipher {
246 Some(cipher) => key.as_secure_string(table_name, cipher),
247 None => key.as_encoded_string(),
248 }
249 .into()
250}
251
252fn encode_to_range<T>(
253 store_cipher: Option<&StoreCipher>,
254 table_name: &str,
255 key: T,
256) -> KeyRange<JsValue>
257where
258 T: SafeEncode,
259{
260 match store_cipher {
261 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
262 None => key.encode_to_range(),
263 }
264}
265
266#[derive(Debug)]
268pub struct IndexeddbStateStoreBuilder {
269 name: Option<String>,
270 passphrase: Option<String>,
271 migration_conflict_strategy: MigrationConflictStrategy,
272}
273
274impl IndexeddbStateStoreBuilder {
275 fn new() -> Self {
276 Self {
277 name: None,
278 passphrase: None,
279 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
280 }
281 }
282
283 pub fn name(mut self, value: String) -> Self {
285 self.name = Some(value);
286 self
287 }
288
289 pub fn passphrase(mut self, value: String) -> Self {
293 self.passphrase = Some(value);
294 self
295 }
296
297 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
301 self.migration_conflict_strategy = value;
302 self
303 }
304
305 pub async fn build(self) -> Result<IndexeddbStateStore> {
306 let migration_strategy = self.migration_conflict_strategy.clone();
307 let name = self.name.unwrap_or_else(|| "state".to_owned());
308
309 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
310
311 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
312 let inner =
313 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
314
315 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
316 }
317}
318
319pub struct IndexeddbStateStore {
320 name: String,
321 pub(crate) inner: Database,
322 pub(crate) meta: Database,
323 pub(crate) store_cipher: Option<Arc<StoreCipher>>,
324}
325
326#[cfg(not(tarpaulin_include))]
327impl std::fmt::Debug for IndexeddbStateStore {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
330 }
331}
332
333type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
334
335impl IndexeddbStateStore {
336 pub fn builder() -> IndexeddbStateStoreBuilder {
338 IndexeddbStateStoreBuilder::new()
339 }
340
341 pub fn version(&self) -> u32 {
343 self.inner.version() as u32
344 }
345
346 pub fn meta_version(&self) -> u32 {
348 self.meta.version() as u32
349 }
350
351 pub async fn has_backups(&self) -> Result<bool> {
353 Ok(self
354 .meta
355 .transaction(keys::BACKUPS_META)
356 .with_mode(TransactionMode::Readonly)
357 .build()?
358 .object_store(keys::BACKUPS_META)?
359 .count()
360 .await?
361 > 0)
362 }
363
364 pub async fn latest_backup(&self) -> Result<Option<String>> {
366 if let Some(mut cursor) = self
367 .meta
368 .transaction(keys::BACKUPS_META)
369 .with_mode(TransactionMode::Readonly)
370 .build()?
371 .object_store(keys::BACKUPS_META)?
372 .open_cursor()
373 .with_direction(CursorDirection::Prev)
374 .await?
375 && let Some(record) = cursor.next_record::<JsValue>().await?
376 {
377 Ok(record.as_string())
378 } else {
379 Ok(None)
380 }
381 }
382
383 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
385 serialize_value(self.store_cipher.as_deref(), event)
386 }
387
388 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
390 deserialize_value(self.store_cipher.as_deref(), event)
391 }
392
393 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
394 where
395 T: SafeEncode,
396 {
397 encode_key(self.store_cipher.as_deref(), table_name, key)
398 }
399
400 fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
401 where
402 T: SafeEncode,
403 {
404 encode_to_range(self.store_cipher.as_deref(), table_name, key)
405 }
406
407 pub async fn get_user_ids_inner(
410 &self,
411 room_id: &RoomId,
412 memberships: RoomMemberships,
413 stripped: bool,
414 ) -> Result<Vec<OwnedUserId>> {
415 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
416
417 let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
418 let store = tx.object_store(store_name)?;
419 let range = self.encode_to_range(store_name, room_id);
420
421 let user_ids = if memberships.is_empty() {
422 store
424 .get_all()
425 .with_query(&range)
426 .await?
427 .filter_map(Result::ok)
428 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
429 .collect::<Vec<_>>()
430 } else {
431 let mut user_ids = Vec::new();
432 let cursor = store.open_cursor().with_query(&range).await?;
433
434 if let Some(mut cursor) = cursor {
435 while let Some(value) = cursor.next_record().await? {
436 let member = self.deserialize_value::<RoomMember>(&value)?;
437
438 if memberships.matches(&member.membership) {
439 user_ids.push(member.user_id);
440 }
441 }
442 }
443
444 user_ids
445 };
446
447 Ok(user_ids)
448 }
449
450 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
451 self.inner
452 .transaction(keys::CUSTOM)
453 .with_mode(TransactionMode::Readonly)
454 .build()?
455 .object_store(keys::CUSTOM)?
456 .get(jskey)
457 .await?
458 .map(|f| self.deserialize_value(&f))
459 .transpose()
460 }
461
462 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
463 match key {
467 StateStoreDataKey::SyncToken => {
468 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
469 }
470 StateStoreDataKey::SupportedVersions => self.encode_key(
471 StateStoreDataKey::SUPPORTED_VERSIONS,
472 StateStoreDataKey::SUPPORTED_VERSIONS,
473 ),
474 StateStoreDataKey::WellKnown => {
475 self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
476 }
477 StateStoreDataKey::Filter(filter_name) => {
478 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
479 }
480 StateStoreDataKey::UserAvatarUrl(user_id) => {
481 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
482 }
483 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
484 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
485 }
486 StateStoreDataKey::UtdHookManagerData => {
487 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
488 }
489 StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
490 self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
491 }
492 StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
493 if let Some(thread_root) = thread_root {
494 self.encode_key(
495 keys::KV,
496 (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
497 )
498 } else {
499 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
500 }
501 }
502 StateStoreDataKey::SeenKnockRequests(room_id) => {
503 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
504 }
505 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
506 self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
507 }
508 }
509 }
510}
511
512#[derive(Serialize, Deserialize)]
515struct PersistedQueuedRequest {
516 pub room_id: OwnedRoomId,
518
519 kind: Option<QueuedRequestKind>,
522 transaction_id: OwnedTransactionId,
523
524 pub error: Option<QueueWedgeError>,
525
526 priority: Option<usize>,
527
528 #[serde(default = "created_now")]
530 created_at: MilliSecondsSinceUnixEpoch,
531
532 is_wedged: Option<bool>,
535
536 event: Option<SerializableEventContent>,
537}
538
539fn created_now() -> MilliSecondsSinceUnixEpoch {
540 MilliSecondsSinceUnixEpoch::now()
541}
542
543impl PersistedQueuedRequest {
544 fn into_queued_request(self) -> Option<QueuedRequest> {
545 let kind =
546 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
547
548 let error = match self.is_wedged {
549 Some(true) => {
550 Some(QueueWedgeError::GenericApiError {
552 msg: "local echo failed to send in a previous session".into(),
553 })
554 }
555 _ => self.error,
556 };
557
558 let priority = self.priority.unwrap_or(0);
560
561 Some(QueuedRequest {
562 kind,
563 transaction_id: self.transaction_id,
564 error,
565 priority,
566 created_at: self.created_at,
567 })
568 }
569}
570
571#[derive(Serialize, Deserialize, PartialEq)]
572struct PersistedThreadSubscription {
573 status: String,
574 bump_stamp: Option<u64>,
575}
576
577impl From<StoredThreadSubscription> for PersistedThreadSubscription {
578 fn from(value: StoredThreadSubscription) -> Self {
579 Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
580 }
581}
582
// On wasm targets, the given items become the body of the `StateStore` trait
// implementation for `IndexeddbStateStore`, with `?Send` async methods.
#[cfg(target_family = "wasm")]
macro_rules! impl_state_store {
    ({ $($items:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($items)*
        }
    };
}
602
// On non-wasm targets, the given items are emitted as inherent methods of
// `IndexeddbStateStore` instead of a `StateStore` trait implementation.
#[cfg(not(target_family = "wasm"))]
macro_rules! impl_state_store {
    ({ $($items:tt)* }) => {
        impl IndexeddbStateStore {
            $($items)*
        }
    };
}
611
612impl_state_store!({
613 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
614 let encoded_key = self.encode_kv_data_key(key);
615
616 let value = self
617 .inner
618 .transaction(keys::KV)
619 .with_mode(TransactionMode::Readonly)
620 .build()?
621 .object_store(keys::KV)?
622 .get(&encoded_key)
623 .await?;
624
625 let value = match key {
626 StateStoreDataKey::SyncToken => value
627 .map(|f| self.deserialize_value::<String>(&f))
628 .transpose()?
629 .map(StateStoreDataValue::SyncToken),
630 StateStoreDataKey::SupportedVersions => value
631 .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
632 .transpose()?
633 .map(StateStoreDataValue::SupportedVersions),
634 StateStoreDataKey::WellKnown => value
635 .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
636 .transpose()?
637 .map(StateStoreDataValue::WellKnown),
638 StateStoreDataKey::Filter(_) => value
639 .map(|f| self.deserialize_value::<String>(&f))
640 .transpose()?
641 .map(StateStoreDataValue::Filter),
642 StateStoreDataKey::UserAvatarUrl(_) => value
643 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
644 .transpose()?
645 .map(StateStoreDataValue::UserAvatarUrl),
646 StateStoreDataKey::RecentlyVisitedRooms(_) => value
647 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
648 .transpose()?
649 .map(StateStoreDataValue::RecentlyVisitedRooms),
650 StateStoreDataKey::UtdHookManagerData => value
651 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
652 .transpose()?
653 .map(StateStoreDataValue::UtdHookManagerData),
654 StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
655 .map(|f| self.deserialize_value::<bool>(&f))
656 .transpose()?
657 .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
658 StateStoreDataKey::ComposerDraft(_, _) => value
659 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
660 .transpose()?
661 .map(StateStoreDataValue::ComposerDraft),
662 StateStoreDataKey::SeenKnockRequests(_) => value
663 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
664 .transpose()?
665 .map(StateStoreDataValue::SeenKnockRequests),
666 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
667 .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
668 .transpose()?
669 .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
670 };
671
672 Ok(value)
673 }
674
675 async fn set_kv_data(
676 &self,
677 key: StateStoreDataKey<'_>,
678 value: StateStoreDataValue,
679 ) -> Result<()> {
680 let encoded_key = self.encode_kv_data_key(key);
681
682 let serialized_value = match key {
683 StateStoreDataKey::SyncToken => self
684 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
685 StateStoreDataKey::SupportedVersions => self.serialize_value(
686 &value
687 .into_supported_versions()
688 .expect("Session data not containing supported versions"),
689 ),
690 StateStoreDataKey::WellKnown => self.serialize_value(
691 &value.into_well_known().expect("Session data not containing well-known"),
692 ),
693 StateStoreDataKey::Filter(_) => {
694 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
695 }
696 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
697 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
698 ),
699 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
700 &value
701 .into_recently_visited_rooms()
702 .expect("Session data not a recently visited room list"),
703 ),
704 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
705 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
706 ),
707 StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
708 StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
709 &value.into_composer_draft().expect("Session data not a composer draft"),
710 ),
711 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
712 &value
713 .into_seen_knock_requests()
714 .expect("Session data is not a set of seen knock request ids"),
715 ),
716 StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
717 &value
718 .into_thread_subscriptions_catchup_tokens()
719 .expect("Session data is not a list of thread subscription catchup tokens"),
720 ),
721 };
722
723 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
724
725 let obj = tx.object_store(keys::KV)?;
726
727 obj.put(&serialized_value?).with_key(encoded_key).build()?;
728
729 tx.commit().await?;
730
731 Ok(())
732 }
733
734 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
735 let encoded_key = self.encode_kv_data_key(key);
736
737 let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
738 let obj = tx.object_store(keys::KV)?;
739
740 obj.delete(&encoded_key).build()?;
741
742 tx.commit().await?;
743
744 Ok(())
745 }
746
747 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
748 let mut stores: HashSet<&'static str> = [
749 (changes.sync_token.is_some(), keys::KV),
750 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
751 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
752 (!changes.presence.is_empty(), keys::PRESENCE),
753 (
754 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
755 keys::PROFILES,
756 ),
757 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
758 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
759 ]
760 .iter()
761 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
762 .collect();
763
764 if !changes.state.is_empty() {
765 stores.extend([
766 keys::ROOM_STATE,
767 keys::USER_IDS,
768 keys::STRIPPED_USER_IDS,
769 keys::STRIPPED_ROOM_STATE,
770 keys::PROFILES,
771 ]);
772 }
773
774 if !changes.redactions.is_empty() {
775 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
776 }
777
778 if !changes.room_infos.is_empty() {
779 stores.insert(keys::ROOM_INFOS);
780 }
781
782 if !changes.stripped_state.is_empty() {
783 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
784 }
785
786 if !changes.receipts.is_empty() {
787 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
788 }
789
790 if stores.is_empty() {
791 return Ok(());
793 }
794
795 let stores: Vec<&'static str> = stores.into_iter().collect();
796 let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
797
798 if let Some(s) = &changes.sync_token {
799 tx.object_store(keys::KV)?
800 .put(&self.serialize_value(s)?)
801 .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
802 .build()?;
803 }
804
805 if !changes.ambiguity_maps.is_empty() {
806 let store = tx.object_store(keys::DISPLAY_NAMES)?;
807 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
808 for (display_name, map) in ambiguity_maps {
809 let key = self.encode_key(
810 keys::DISPLAY_NAMES,
811 (
812 room_id,
813 display_name
814 .as_normalized_str()
815 .unwrap_or_else(|| display_name.as_raw_str()),
816 ),
817 );
818
819 store.put(&self.serialize_value(&map)?).with_key(key).build()?;
820 }
821 }
822 }
823
824 if !changes.account_data.is_empty() {
825 let store = tx.object_store(keys::ACCOUNT_DATA)?;
826 for (event_type, event) in &changes.account_data {
827 store
828 .put(&self.serialize_value(&event)?)
829 .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
830 .build()?;
831 }
832 }
833
834 if !changes.room_account_data.is_empty() {
835 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
836 for (room, events) in &changes.room_account_data {
837 for (event_type, event) in events {
838 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
839 store.put(&self.serialize_value(&event)?).with_key(key).build()?;
840 }
841 }
842 }
843
844 if !changes.state.is_empty() {
845 let state = tx.object_store(keys::ROOM_STATE)?;
846 let profiles = tx.object_store(keys::PROFILES)?;
847 let user_ids = tx.object_store(keys::USER_IDS)?;
848 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
849 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
850
851 for (room, user_ids) in &changes.profiles_to_delete {
852 for user_id in user_ids {
853 let key = self.encode_key(keys::PROFILES, (room, user_id));
854 profiles.delete(&key).build()?;
855 }
856 }
857
858 for (room, event_types) in &changes.state {
859 let profile_changes = changes.profiles.get(room);
860
861 for (event_type, events) in event_types {
862 for (state_key, raw_event) in events {
863 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
864 state
865 .put(&self.serialize_value(&raw_event)?)
866 .with_key(key.clone())
867 .build()?;
868 stripped_state.delete(&key).build()?;
869
870 if *event_type == StateEventType::RoomMember {
871 let event =
872 match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
873 Ok(ev) => ev,
874 Err(e) => {
875 let event_id: Option<String> =
876 raw_event.get_field("event_id").ok().flatten();
877 debug!(event_id, "Failed to deserialize member event: {e}");
878 continue;
879 }
880 };
881
882 let key = (room, state_key);
883
884 stripped_user_ids
885 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
886 .build()?;
887
888 user_ids
889 .put(&self.serialize_value(&RoomMember::from(&event))?)
890 .with_key(self.encode_key(keys::USER_IDS, key))
891 .build()?;
892
893 if let Some(profile) =
894 profile_changes.and_then(|p| p.get(event.state_key()))
895 {
896 profiles
897 .put(&self.serialize_value(&profile)?)
898 .with_key(self.encode_key(keys::PROFILES, key))
899 .build()?;
900 }
901 }
902 }
903 }
904 }
905 }
906
907 if !changes.room_infos.is_empty() {
908 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
909 for (room_id, room_info) in &changes.room_infos {
910 room_infos
911 .put(&self.serialize_value(&room_info)?)
912 .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
913 .build()?;
914 }
915 }
916
917 if !changes.presence.is_empty() {
918 let store = tx.object_store(keys::PRESENCE)?;
919 for (sender, event) in &changes.presence {
920 store
921 .put(&self.serialize_value(&event)?)
922 .with_key(self.encode_key(keys::PRESENCE, sender))
923 .build()?;
924 }
925 }
926
927 if !changes.stripped_state.is_empty() {
928 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
929 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
930
931 for (room, event_types) in &changes.stripped_state {
932 for (event_type, events) in event_types {
933 for (state_key, raw_event) in events {
934 let key = self
935 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
936 store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
937
938 if *event_type == StateEventType::RoomMember {
939 let event = match raw_event
940 .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
941 {
942 Ok(ev) => ev,
943 Err(e) => {
944 let event_id: Option<String> =
945 raw_event.get_field("event_id").ok().flatten();
946 debug!(
947 event_id,
948 "Failed to deserialize stripped member event: {e}"
949 );
950 continue;
951 }
952 };
953
954 let key = (room, state_key);
955
956 user_ids
957 .put(&self.serialize_value(&RoomMember::from(&event))?)
958 .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
959 .build()?;
960 }
961 }
962 }
963 }
964 }
965
966 if !changes.receipts.is_empty() {
967 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
968 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
969
970 for (room, content) in &changes.receipts {
971 for (event_id, receipts) in &content.0 {
972 for (receipt_type, receipts) in receipts {
973 for (user_id, receipt) in receipts {
974 let key = match receipt.thread.as_str() {
975 Some(thread_id) => self.encode_key(
976 keys::ROOM_USER_RECEIPTS,
977 (room, receipt_type, thread_id, user_id),
978 ),
979 None => self.encode_key(
980 keys::ROOM_USER_RECEIPTS,
981 (room, receipt_type, user_id),
982 ),
983 };
984
985 if let Some((old_event, _)) =
986 room_user_receipts.get(&key).await?.and_then(|f| {
987 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
988 })
989 {
990 let key = match receipt.thread.as_str() {
991 Some(thread_id) => self.encode_key(
992 keys::ROOM_EVENT_RECEIPTS,
993 (room, receipt_type, thread_id, old_event, user_id),
994 ),
995 None => self.encode_key(
996 keys::ROOM_EVENT_RECEIPTS,
997 (room, receipt_type, old_event, user_id),
998 ),
999 };
1000 room_event_receipts.delete(&key).build()?;
1001 }
1002
1003 room_user_receipts
1004 .put(&self.serialize_value(&(event_id, receipt))?)
1005 .with_key(key)
1006 .build()?;
1007
1008 let key = match receipt.thread.as_str() {
1010 Some(thread_id) => self.encode_key(
1011 keys::ROOM_EVENT_RECEIPTS,
1012 (room, receipt_type, thread_id, event_id, user_id),
1013 ),
1014 None => self.encode_key(
1015 keys::ROOM_EVENT_RECEIPTS,
1016 (room, receipt_type, event_id, user_id),
1017 ),
1018 };
1019 room_event_receipts
1020 .put(&self.serialize_value(&(user_id, receipt))?)
1021 .with_key(key)
1022 .build()?;
1023 }
1024 }
1025 }
1026 }
1027 }
1028
1029 if !changes.redactions.is_empty() {
1030 let state = tx.object_store(keys::ROOM_STATE)?;
1031 let room_info = tx.object_store(keys::ROOM_INFOS)?;
1032
1033 for (room_id, redactions) in &changes.redactions {
1034 let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1035 let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1036 continue;
1037 };
1038
1039 let mut redaction_rules = None;
1040
1041 while let Some(value) = cursor.next_record().await? {
1042 let Some(key) = cursor.key::<JsValue>()? else {
1043 break;
1044 };
1045
1046 let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1047 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1048 && let Some(redaction) = redactions.get(&event_id)
1049 {
1050 let redaction_rules = match &redaction_rules {
1051 Some(r) => r,
1052 None => {
1053 let value = room_info
1054 .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1055 .await?
1056 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1057 .map(|info| info.room_version_rules_or_default())
1058 .unwrap_or_else(|| {
1059 warn!(
1060 ?room_id,
1061 "Unable to get the room version rules, \
1062 defaulting to rules for room version \
1063 {ROOM_VERSION_FALLBACK}"
1064 );
1065 ROOM_VERSION_RULES_FALLBACK
1066 })
1067 .redaction;
1068 redaction_rules.get_or_insert(value)
1069 }
1070 };
1071
1072 let redacted = redact(
1073 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1074 redaction_rules,
1075 Some(RedactedBecause::from_raw_event(redaction)?),
1076 )
1077 .map_err(StoreError::Redaction)?;
1078 state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1079 }
1080 }
1081 }
1082 }
1083
1084 tx.commit().await.map_err(|e| e.into())
1085 }
1086
1087 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1088 self.inner
1089 .transaction(keys::PRESENCE)
1090 .with_mode(TransactionMode::Readonly)
1091 .build()?
1092 .object_store(keys::PRESENCE)?
1093 .get(&self.encode_key(keys::PRESENCE, user_id))
1094 .await?
1095 .map(|f| self.deserialize_value(&f))
1096 .transpose()
1097 }
1098
1099 async fn get_presence_events(
1100 &self,
1101 user_ids: &[OwnedUserId],
1102 ) -> Result<Vec<Raw<PresenceEvent>>> {
1103 if user_ids.is_empty() {
1104 return Ok(Vec::new());
1105 }
1106
1107 let txn =
1108 self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1109 let store = txn.object_store(keys::PRESENCE)?;
1110
1111 let mut events = Vec::with_capacity(user_ids.len());
1112
1113 for user_id in user_ids {
1114 if let Some(event) = store
1115 .get(&self.encode_key(keys::PRESENCE, user_id))
1116 .await?
1117 .map(|f| self.deserialize_value(&f))
1118 .transpose()?
1119 {
1120 events.push(event)
1121 }
1122 }
1123
1124 Ok(events)
1125 }
1126
1127 async fn get_state_event(
1128 &self,
1129 room_id: &RoomId,
1130 event_type: StateEventType,
1131 state_key: &str,
1132 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1133 Ok(self
1134 .get_state_events_for_keys(room_id, event_type, &[state_key])
1135 .await?
1136 .into_iter()
1137 .next())
1138 }
1139
1140 async fn get_state_events(
1141 &self,
1142 room_id: &RoomId,
1143 event_type: StateEventType,
1144 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1145 let stripped_range =
1146 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1147 let stripped_events = self
1148 .inner
1149 .transaction(keys::STRIPPED_ROOM_STATE)
1150 .with_mode(TransactionMode::Readonly)
1151 .build()?
1152 .object_store(keys::STRIPPED_ROOM_STATE)?
1153 .get_all()
1154 .with_query(&stripped_range)
1155 .await?
1156 .filter_map(Result::ok)
1157 .filter_map(|f| {
1158 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1159 })
1160 .collect::<Vec<_>>();
1161
1162 if !stripped_events.is_empty() {
1163 return Ok(stripped_events);
1164 }
1165
1166 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1167 Ok(self
1168 .inner
1169 .transaction(keys::ROOM_STATE)
1170 .with_mode(TransactionMode::Readonly)
1171 .build()?
1172 .object_store(keys::ROOM_STATE)?
1173 .get_all()
1174 .with_query(&range)
1175 .await?
1176 .filter_map(Result::ok)
1177 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1178 .collect::<Vec<_>>())
1179 }
1180
1181 async fn get_state_events_for_keys(
1182 &self,
1183 room_id: &RoomId,
1184 event_type: StateEventType,
1185 state_keys: &[&str],
1186 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1187 if state_keys.is_empty() {
1188 return Ok(Vec::new());
1189 }
1190
1191 let mut events = Vec::with_capacity(state_keys.len());
1192
1193 {
1194 let txn = self
1195 .inner
1196 .transaction(keys::STRIPPED_ROOM_STATE)
1197 .with_mode(TransactionMode::Readonly)
1198 .build()?;
1199 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1200
1201 for state_key in state_keys {
1202 if let Some(event) =
1203 store
1204 .get(&self.encode_key(
1205 keys::STRIPPED_ROOM_STATE,
1206 (room_id, &event_type, state_key),
1207 ))
1208 .await?
1209 .map(|f| self.deserialize_value(&f))
1210 .transpose()?
1211 {
1212 events.push(RawAnySyncOrStrippedState::Stripped(event));
1213 }
1214 }
1215
1216 if !events.is_empty() {
1217 return Ok(events);
1218 }
1219 }
1220
1221 let txn = self
1222 .inner
1223 .transaction(keys::ROOM_STATE)
1224 .with_mode(TransactionMode::Readonly)
1225 .build()?;
1226 let store = txn.object_store(keys::ROOM_STATE)?;
1227
1228 for state_key in state_keys {
1229 if let Some(event) = store
1230 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1231 .await?
1232 .map(|f| self.deserialize_value(&f))
1233 .transpose()?
1234 {
1235 events.push(RawAnySyncOrStrippedState::Sync(event));
1236 }
1237 }
1238
1239 Ok(events)
1240 }
1241
1242 async fn get_profile(
1243 &self,
1244 room_id: &RoomId,
1245 user_id: &UserId,
1246 ) -> Result<Option<MinimalRoomMemberEvent>> {
1247 self.inner
1248 .transaction(keys::PROFILES)
1249 .with_mode(TransactionMode::Readonly)
1250 .build()?
1251 .object_store(keys::PROFILES)?
1252 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1253 .await?
1254 .map(|f| self.deserialize_value(&f))
1255 .transpose()
1256 }
1257
1258 async fn get_profiles<'a>(
1259 &self,
1260 room_id: &RoomId,
1261 user_ids: &'a [OwnedUserId],
1262 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1263 if user_ids.is_empty() {
1264 return Ok(BTreeMap::new());
1265 }
1266
1267 let txn =
1268 self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1269 let store = txn.object_store(keys::PROFILES)?;
1270
1271 let mut profiles = BTreeMap::new();
1272 for user_id in user_ids {
1273 if let Some(profile) = store
1274 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1275 .await?
1276 .map(|f| self.deserialize_value(&f))
1277 .transpose()?
1278 {
1279 profiles.insert(user_id.as_ref(), profile);
1280 }
1281 }
1282
1283 Ok(profiles)
1284 }
1285
1286 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1287 let transaction = self
1288 .inner
1289 .transaction(keys::ROOM_INFOS)
1290 .with_mode(TransactionMode::Readonly)
1291 .build()?;
1292
1293 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1294
1295 Ok(match room_load_settings {
1296 RoomLoadSettings::All => object_store
1297 .get_all()
1298 .await?
1299 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1300 .collect::<Result<_>>()?,
1301
1302 RoomLoadSettings::One(room_id) => {
1303 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1304 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1305 None => vec![],
1306 }
1307 }
1308 })
1309 }
1310
1311 async fn get_users_with_display_name(
1312 &self,
1313 room_id: &RoomId,
1314 display_name: &DisplayName,
1315 ) -> Result<BTreeSet<OwnedUserId>> {
1316 self.inner
1317 .transaction(keys::DISPLAY_NAMES)
1318 .with_mode(TransactionMode::Readonly)
1319 .build()?
1320 .object_store(keys::DISPLAY_NAMES)?
1321 .get(&self.encode_key(
1322 keys::DISPLAY_NAMES,
1323 (
1324 room_id,
1325 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1326 ),
1327 ))
1328 .await?
1329 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1330 .unwrap_or_else(|| Ok(Default::default()))
1331 }
1332
1333 async fn get_users_with_display_names<'a>(
1334 &self,
1335 room_id: &RoomId,
1336 display_names: &'a [DisplayName],
1337 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1338 let mut map = HashMap::new();
1339
1340 if display_names.is_empty() {
1341 return Ok(map);
1342 }
1343
1344 let txn = self
1345 .inner
1346 .transaction(keys::DISPLAY_NAMES)
1347 .with_mode(TransactionMode::Readonly)
1348 .build()?;
1349 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1350
1351 for display_name in display_names {
1352 if let Some(user_ids) = store
1353 .get(
1354 &self.encode_key(
1355 keys::DISPLAY_NAMES,
1356 (
1357 room_id,
1358 display_name
1359 .as_normalized_str()
1360 .unwrap_or_else(|| display_name.as_raw_str()),
1361 ),
1362 ),
1363 )
1364 .await?
1365 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1366 .transpose()?
1367 {
1368 map.insert(display_name, user_ids);
1369 }
1370 }
1371
1372 Ok(map)
1373 }
1374
1375 async fn get_account_data_event(
1376 &self,
1377 event_type: GlobalAccountDataEventType,
1378 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1379 self.inner
1380 .transaction(keys::ACCOUNT_DATA)
1381 .with_mode(TransactionMode::Readonly)
1382 .build()?
1383 .object_store(keys::ACCOUNT_DATA)?
1384 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1385 .await?
1386 .map(|f| self.deserialize_value(&f))
1387 .transpose()
1388 }
1389
1390 async fn get_room_account_data_event(
1391 &self,
1392 room_id: &RoomId,
1393 event_type: RoomAccountDataEventType,
1394 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1395 self.inner
1396 .transaction(keys::ROOM_ACCOUNT_DATA)
1397 .with_mode(TransactionMode::Readonly)
1398 .build()?
1399 .object_store(keys::ROOM_ACCOUNT_DATA)?
1400 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1401 .await?
1402 .map(|f| self.deserialize_value(&f))
1403 .transpose()
1404 }
1405
1406 async fn get_user_room_receipt_event(
1407 &self,
1408 room_id: &RoomId,
1409 receipt_type: ReceiptType,
1410 thread: ReceiptThread,
1411 user_id: &UserId,
1412 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1413 let key = match thread.as_str() {
1414 Some(thread_id) => self
1415 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1416 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1417 };
1418 self.inner
1419 .transaction(keys::ROOM_USER_RECEIPTS)
1420 .with_mode(TransactionMode::Readonly)
1421 .build()?
1422 .object_store(keys::ROOM_USER_RECEIPTS)?
1423 .get(&key)
1424 .await?
1425 .map(|f| self.deserialize_value(&f))
1426 .transpose()
1427 }
1428
1429 async fn get_event_room_receipt_events(
1430 &self,
1431 room_id: &RoomId,
1432 receipt_type: ReceiptType,
1433 thread: ReceiptThread,
1434 event_id: &EventId,
1435 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1436 let range = match thread.as_str() {
1437 Some(thread_id) => self.encode_to_range(
1438 keys::ROOM_EVENT_RECEIPTS,
1439 (room_id, receipt_type, thread_id, event_id),
1440 ),
1441 None => {
1442 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1443 }
1444 };
1445 let tx = self
1446 .inner
1447 .transaction(keys::ROOM_EVENT_RECEIPTS)
1448 .with_mode(TransactionMode::Readonly)
1449 .build()?;
1450 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1451
1452 Ok(store
1453 .get_all()
1454 .with_query(&range)
1455 .await?
1456 .filter_map(Result::ok)
1457 .filter_map(|f| self.deserialize_value(&f).ok())
1458 .collect::<Vec<_>>())
1459 }
1460
1461 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1462 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1463 self.get_custom_value_for_js(jskey).await
1464 }
1465
1466 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1467 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1468
1469 let prev = self.get_custom_value_for_js(&jskey).await?;
1470
1471 let tx =
1472 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1473
1474 tx.object_store(keys::CUSTOM)?
1475 .put(&self.serialize_value(&value)?)
1476 .with_key(jskey)
1477 .build()?;
1478
1479 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1480 Ok(prev)
1481 }
1482
1483 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1484 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1485
1486 let prev = self.get_custom_value_for_js(&jskey).await?;
1487
1488 let tx =
1489 self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1490
1491 tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1492
1493 tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1494 Ok(prev)
1495 }
1496
1497 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1498 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1500
1501 let prefixed_stores = [
1504 keys::PROFILES,
1505 keys::DISPLAY_NAMES,
1506 keys::USER_IDS,
1507 keys::ROOM_STATE,
1508 keys::ROOM_ACCOUNT_DATA,
1509 keys::ROOM_EVENT_RECEIPTS,
1510 keys::ROOM_USER_RECEIPTS,
1511 keys::STRIPPED_ROOM_STATE,
1512 keys::STRIPPED_USER_IDS,
1513 keys::THREAD_SUBSCRIPTIONS,
1514 ];
1515
1516 let all_stores = {
1517 let mut v = Vec::new();
1518 v.extend(prefixed_stores);
1519 v.extend(direct_stores);
1520 v
1521 };
1522
1523 let tx =
1524 self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1525
1526 for store_name in direct_stores {
1527 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1528 }
1529
1530 for store_name in prefixed_stores {
1531 let store = tx.object_store(store_name)?;
1532 let range = self.encode_to_range(store_name, room_id);
1533 for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1534 store.delete(&key?).build()?;
1535 }
1536 }
1537
1538 tx.commit().await.map_err(|e| e.into())
1539 }
1540
1541 async fn get_user_ids(
1542 &self,
1543 room_id: &RoomId,
1544 memberships: RoomMemberships,
1545 ) -> Result<Vec<OwnedUserId>> {
1546 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1547 if !ids.is_empty() {
1548 return Ok(ids);
1549 }
1550 self.get_user_ids_inner(room_id, memberships, false).await
1551 }
1552
1553 async fn save_send_queue_request(
1554 &self,
1555 room_id: &RoomId,
1556 transaction_id: OwnedTransactionId,
1557 created_at: MilliSecondsSinceUnixEpoch,
1558 kind: QueuedRequestKind,
1559 priority: usize,
1560 ) -> Result<()> {
1561 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1562
1563 let tx = self
1564 .inner
1565 .transaction(keys::ROOM_SEND_QUEUE)
1566 .with_mode(TransactionMode::Readwrite)
1567 .build()?;
1568
1569 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1570
1571 let prev = obj.get(&encoded_key).await?;
1576
1577 let mut prev = prev.map_or_else(
1578 || Ok(Vec::new()),
1579 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1580 )?;
1581
1582 prev.push(PersistedQueuedRequest {
1584 room_id: room_id.to_owned(),
1585 kind: Some(kind),
1586 transaction_id,
1587 error: None,
1588 is_wedged: None,
1589 event: None,
1590 priority: Some(priority),
1591 created_at,
1592 });
1593
1594 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1596
1597 tx.commit().await?;
1598
1599 Ok(())
1600 }
1601
1602 async fn update_send_queue_request(
1603 &self,
1604 room_id: &RoomId,
1605 transaction_id: &TransactionId,
1606 kind: QueuedRequestKind,
1607 ) -> Result<bool> {
1608 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1609
1610 let tx = self
1611 .inner
1612 .transaction(keys::ROOM_SEND_QUEUE)
1613 .with_mode(TransactionMode::Readwrite)
1614 .build()?;
1615
1616 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1617
1618 let prev = obj.get(&encoded_key).await?;
1623
1624 let mut prev = prev.map_or_else(
1625 || Ok(Vec::new()),
1626 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1627 )?;
1628
1629 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1631 entry.kind = Some(kind);
1632 entry.error = None;
1634 entry.is_wedged = None;
1636 entry.event = None;
1637
1638 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1640 tx.commit().await?;
1641
1642 Ok(true)
1643 } else {
1644 Ok(false)
1645 }
1646 }
1647
1648 async fn remove_send_queue_request(
1649 &self,
1650 room_id: &RoomId,
1651 transaction_id: &TransactionId,
1652 ) -> Result<bool> {
1653 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1654
1655 let tx = self
1656 .inner
1657 .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1658 .with_mode(TransactionMode::Readwrite)
1659 .build()?;
1660
1661 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1662
1663 if let Some(val) = obj.get(&encoded_key).await? {
1668 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1669 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1670 prev.remove(pos);
1671
1672 if prev.is_empty() {
1673 obj.delete(&encoded_key).build()?;
1674 } else {
1675 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1676 }
1677
1678 tx.commit().await?;
1679 return Ok(true);
1680 }
1681 }
1682
1683 Ok(false)
1684 }
1685
1686 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1687 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1688
1689 let prev = self
1692 .inner
1693 .transaction(keys::ROOM_SEND_QUEUE)
1694 .with_mode(TransactionMode::Readwrite)
1695 .build()?
1696 .object_store(keys::ROOM_SEND_QUEUE)?
1697 .get(&encoded_key)
1698 .await?;
1699
1700 let mut prev = prev.map_or_else(
1701 || Ok(Vec::new()),
1702 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1703 )?;
1704
1705 prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1707
1708 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1709 }
1710
1711 async fn update_send_queue_request_status(
1712 &self,
1713 room_id: &RoomId,
1714 transaction_id: &TransactionId,
1715 error: Option<QueueWedgeError>,
1716 ) -> Result<()> {
1717 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1718
1719 let tx = self
1720 .inner
1721 .transaction(keys::ROOM_SEND_QUEUE)
1722 .with_mode(TransactionMode::Readwrite)
1723 .build()?;
1724
1725 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1726
1727 if let Some(val) = obj.get(&encoded_key).await? {
1728 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1729 if let Some(request) =
1730 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1731 {
1732 request.is_wedged = None;
1733 request.error = error;
1734 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1735 }
1736 }
1737
1738 tx.commit().await?;
1739
1740 Ok(())
1741 }
1742
1743 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1744 let tx = self
1745 .inner
1746 .transaction(keys::ROOM_SEND_QUEUE)
1747 .with_mode(TransactionMode::Readwrite)
1748 .build()?;
1749
1750 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1751
1752 let all_entries = obj
1753 .get_all()
1754 .await?
1755 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1756 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1757 .into_iter()
1758 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1759 .collect::<BTreeSet<_>>();
1760
1761 Ok(all_entries.into_iter().collect())
1762 }
1763
1764 async fn save_dependent_queued_request(
1765 &self,
1766 room_id: &RoomId,
1767 parent_txn_id: &TransactionId,
1768 own_txn_id: ChildTransactionId,
1769 created_at: MilliSecondsSinceUnixEpoch,
1770 content: DependentQueuedRequestKind,
1771 ) -> Result<()> {
1772 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1773
1774 let tx = self
1775 .inner
1776 .transaction(keys::DEPENDENT_SEND_QUEUE)
1777 .with_mode(TransactionMode::Readwrite)
1778 .build()?;
1779
1780 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1781
1782 let prev = obj.get(&encoded_key).await?;
1785
1786 let mut prev = prev.map_or_else(
1787 || Ok(Vec::new()),
1788 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1789 )?;
1790
1791 prev.push(DependentQueuedRequest {
1793 kind: content,
1794 parent_transaction_id: parent_txn_id.to_owned(),
1795 own_transaction_id: own_txn_id,
1796 parent_key: None,
1797 created_at,
1798 });
1799
1800 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1802
1803 tx.commit().await?;
1804
1805 Ok(())
1806 }
1807
1808 async fn update_dependent_queued_request(
1809 &self,
1810 room_id: &RoomId,
1811 own_transaction_id: &ChildTransactionId,
1812 new_content: DependentQueuedRequestKind,
1813 ) -> Result<bool> {
1814 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1815
1816 let tx = self
1817 .inner
1818 .transaction(keys::DEPENDENT_SEND_QUEUE)
1819 .with_mode(TransactionMode::Readwrite)
1820 .build()?;
1821
1822 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1823
1824 let prev = obj.get(&encoded_key).await?;
1827
1828 let mut prev = prev.map_or_else(
1829 || Ok(Vec::new()),
1830 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1831 )?;
1832
1833 let mut found = false;
1835 for entry in prev.iter_mut() {
1836 if entry.own_transaction_id == *own_transaction_id {
1837 found = true;
1838 entry.kind = new_content;
1839 break;
1840 }
1841 }
1842
1843 if found {
1844 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1845 tx.commit().await?;
1846 }
1847
1848 Ok(found)
1849 }
1850
1851 async fn mark_dependent_queued_requests_as_ready(
1852 &self,
1853 room_id: &RoomId,
1854 parent_txn_id: &TransactionId,
1855 parent_key: SentRequestKey,
1856 ) -> Result<usize> {
1857 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1858
1859 let tx = self
1860 .inner
1861 .transaction(keys::DEPENDENT_SEND_QUEUE)
1862 .with_mode(TransactionMode::Readwrite)
1863 .build()?;
1864
1865 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1866
1867 let prev = obj.get(&encoded_key).await?;
1870
1871 let mut prev = prev.map_or_else(
1872 || Ok(Vec::new()),
1873 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1874 )?;
1875
1876 let mut num_updated = 0;
1878 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1879 entry.parent_key = Some(parent_key.clone());
1880 num_updated += 1;
1881 }
1882
1883 if num_updated > 0 {
1884 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1885 tx.commit().await?;
1886 }
1887
1888 Ok(num_updated)
1889 }
1890
1891 async fn remove_dependent_queued_request(
1892 &self,
1893 room_id: &RoomId,
1894 txn_id: &ChildTransactionId,
1895 ) -> Result<bool> {
1896 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1897
1898 let tx = self
1899 .inner
1900 .transaction(keys::DEPENDENT_SEND_QUEUE)
1901 .with_mode(TransactionMode::Readwrite)
1902 .build()?;
1903
1904 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1905
1906 if let Some(val) = obj.get(&encoded_key).await? {
1909 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1910 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1911 prev.remove(pos);
1912
1913 if prev.is_empty() {
1914 obj.delete(&encoded_key).build()?;
1915 } else {
1916 obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1917 }
1918
1919 tx.commit().await?;
1920 return Ok(true);
1921 }
1922 }
1923
1924 Ok(false)
1925 }
1926
1927 async fn load_dependent_queued_requests(
1928 &self,
1929 room_id: &RoomId,
1930 ) -> Result<Vec<DependentQueuedRequest>> {
1931 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1932
1933 let prev = self
1935 .inner
1936 .transaction(keys::DEPENDENT_SEND_QUEUE)
1937 .with_mode(TransactionMode::Readwrite)
1938 .build()?
1939 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1940 .get(&encoded_key)
1941 .await?;
1942
1943 prev.map_or_else(
1944 || Ok(Vec::new()),
1945 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1946 )
1947 }
1948
1949 async fn upsert_thread_subscription(
1950 &self,
1951 room: &RoomId,
1952 thread_id: &EventId,
1953 subscription: StoredThreadSubscription,
1954 ) -> Result<()> {
1955 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1956
1957 let tx = self
1958 .inner
1959 .transaction(keys::THREAD_SUBSCRIPTIONS)
1960 .with_mode(TransactionMode::Readwrite)
1961 .build()?;
1962 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1963
1964 let mut new = PersistedThreadSubscription::from(subscription);
1965
1966 if let Some(previous_value) = obj.get(&encoded_key).await? {
1968 let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
1969
1970 if new == previous {
1972 return Ok(());
1973 }
1974 if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
1975 return Ok(());
1976 }
1977 }
1978
1979 let serialized_value = self.serialize_value(&new);
1980 obj.put(&serialized_value?).with_key(encoded_key).build()?;
1981
1982 tx.commit().await?;
1983
1984 Ok(())
1985 }
1986
1987 async fn upsert_thread_subscriptions(
1988 &self,
1989 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1990 ) -> Result<()> {
1991 let tx = self
1992 .inner
1993 .transaction(keys::THREAD_SUBSCRIPTIONS)
1994 .with_mode(TransactionMode::Readwrite)
1995 .build()?;
1996 let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1997
1998 for (room_id, thread_id, subscription) in updates {
1999 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
2000 let mut new = PersistedThreadSubscription::from(subscription);
2001
2002 if let Some(previous_value) = obj.get(&encoded_key).await? {
2004 let previous: PersistedThreadSubscription =
2005 self.deserialize_value(&previous_value)?;
2006
2007 if new == previous {
2009 continue;
2010 }
2011 if !compare_thread_subscription_bump_stamps(
2012 previous.bump_stamp,
2013 &mut new.bump_stamp,
2014 ) {
2015 continue;
2016 }
2017 }
2018
2019 let serialized_value = self.serialize_value(&new);
2020 obj.put(&serialized_value?).with_key(encoded_key).build()?;
2021 }
2022
2023 tx.commit().await?;
2024
2025 Ok(())
2026 }
2027
2028 async fn load_thread_subscription(
2029 &self,
2030 room: &RoomId,
2031 thread_id: &EventId,
2032 ) -> Result<Option<StoredThreadSubscription>> {
2033 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2034
2035 let js_value = self
2036 .inner
2037 .transaction(keys::THREAD_SUBSCRIPTIONS)
2038 .with_mode(TransactionMode::Readonly)
2039 .build()?
2040 .object_store(keys::THREAD_SUBSCRIPTIONS)?
2041 .get(&encoded_key)
2042 .await?;
2043
2044 let Some(js_value) = js_value else {
2045 return Ok(None);
2047 };
2048
2049 let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2050
2051 let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2052 StoreError::InvalidData {
2053 details: format!(
2054 "invalid thread status for room {room} and thread {thread_id}: {}",
2055 sub.status
2056 ),
2057 }
2058 })?;
2059
2060 Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2061 }
2062
2063 async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2064 let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2065
2066 let transaction = self
2067 .inner
2068 .transaction(keys::THREAD_SUBSCRIPTIONS)
2069 .with_mode(TransactionMode::Readwrite)
2070 .build()?;
2071 transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2072 transaction.commit().await?;
2073
2074 Ok(())
2075 }
2076
/// No-op for this store: performs no work and always returns `Ok(())`.
// NOTE(review): `async` is presumably required by the trait signature even
// though nothing is awaited, hence the lint allowance — confirm.
#[allow(clippy::unused_async)]
async fn optimize(&self) -> Result<()> {
    Ok(())
}
2081
/// Size reporting is not provided by this store: always returns `Ok(None)`.
// NOTE(review): `async` is presumably required by the trait signature even
// though nothing is awaited, hence the lint allowance — confirm.
#[allow(clippy::unused_async)]
async fn get_size(&self) -> Result<Option<usize>> {
    Ok(None)
}
2086});
2087
/// Serializable pair of a user id and that user's membership state.
///
/// Built from sync or stripped `m.room.member` events through the `From`
/// implementations in this file.
#[derive(Debug, Serialize, Deserialize)]
struct RoomMember {
    /// Id of the user; taken from the member event's state key.
    user_id: OwnedUserId,
    /// Membership state carried by the member event.
    membership: MembershipState,
}
2094
2095impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2096 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2097 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2098 }
2099}
2100
2101impl From<&StrippedRoomMemberEvent> for RoomMember {
2102 fn from(event: &StrippedRoomMemberEvent) -> Self {
2103 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2104 }
2105}
2106
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        OwnedRoomId, OwnedTransactionId, TransactionId,
        events::room::message::RoomMessageEventContent, room_id,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Shape of a persisted queued request as written by an older format:
    /// a mandatory `event` and a plain boolean `is_wedged`.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// Data serialized in the old shape must deserialize into the current
    /// `PersistedQueuedRequest` and convert into a queued request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let room = room_id!("!room_a:dummy.local");
        let txn_id = TransactionId::new();
        let event_content =
            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
                .unwrap();

        let legacy = OldPersistedQueuedRequest {
            room_id: room.to_owned(),
            event: event_content,
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };

        // Round-trip through JSON: write the old shape, read the new one.
        let bytes = serde_json::to_vec(&legacy).unwrap();
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&bytes).unwrap();

        // The legacy fields are preserved as-is…
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        // …and the fields that did not exist yet stay unset.
        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
2163
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store without a passphrase on a freshly named database, so
    /// every test gets its own database.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
2181
#[cfg(all(test, target_family = "wasm"))]
mod encrypted_tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a passphrase-protected store on a freshly named database, with
    /// a fresh random passphrase, so every test gets its own database.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_suffix = Uuid::new_v4();
        let db_name = format!("test-state-encrypted-{}", db_suffix.as_hyphenated());

        let passphrase_suffix = Uuid::new_v4();
        let passphrase = format!("some_passphrase-{}", passphrase_suffix.as_hyphenated());

        let store =
            IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}