Skip to main content

matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    cmp::Reverse,
17    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18    str::FromStr as _,
19    sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26    KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27    transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31    RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33    store::{
34        ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36        SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37        SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
38        compare_thread_subscription_bump_stamps,
39    },
40};
41use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
42use ruma::{
43    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
44    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
45    canonical_json::{RedactedBecause, redact},
46    events::{
47        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
48        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
49        presence::PresenceEvent,
50        receipt::{Receipt, ReceiptThread, ReceiptType},
51        room::member::{
52            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
53        },
54    },
55    serde::Raw,
56};
57use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
58use tracing::{debug, warn};
59use wasm_bindgen::JsValue;
60
61mod migrations;
62
63pub use self::migrations::MigrationConflictStrategy;
64use self::migrations::{upgrade_inner_db, upgrade_meta_db};
65use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
66
67#[derive(Debug, thiserror::Error)]
68pub enum IndexeddbStateStoreError {
69    #[error(transparent)]
70    Json(#[from] serde_json::Error),
71    #[error(transparent)]
72    Encryption(#[from] EncryptionError),
73    #[error("DomException {name} ({code}): {message}")]
74    DomException { name: String, message: String, code: u16 },
75    #[error(transparent)]
76    StoreError(#[from] StoreError),
77    #[error(
78        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
79         See MigrationConflictStrategy for ways to configure."
80    )]
81    MigrationConflict { name: String, old_version: u32, new_version: u32 },
82}
83
84impl From<GenericError> for IndexeddbStateStoreError {
85    fn from(value: GenericError) -> Self {
86        Self::StoreError(value.into())
87    }
88}
89
90impl From<web_sys::DomException> for IndexeddbStateStoreError {
91    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
92        IndexeddbStateStoreError::DomException {
93            name: frm.name(),
94            message: frm.message(),
95            code: frm.code(),
96        }
97    }
98}
99
100impl From<IndexeddbStateStoreError> for StoreError {
101    fn from(e: IndexeddbStateStoreError) -> Self {
102        match e {
103            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
104            IndexeddbStateStoreError::StoreError(e) => e,
105            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
106            _ => StoreError::backend(e),
107        }
108    }
109}
110
111impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
112    fn from(value: indexed_db_futures::error::DomException) -> Self {
113        web_sys::DomException::from(value).into()
114    }
115}
116
117impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
118    fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
119        Self::Json(serde_json::Error::custom(value.to_string()))
120    }
121}
122
123impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
124    fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
125        IndexeddbStateStoreError::StoreError(StoreError::backend(value))
126    }
127}
128
129impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
130    fn from(value: indexed_db_futures::error::JSError) -> Self {
131        GenericError::from(value.to_string()).into()
132    }
133}
134
135impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
136    fn from(value: indexed_db_futures::error::Error) -> Self {
137        use indexed_db_futures::error::Error;
138        match value {
139            Error::DomException(e) => e.into(),
140            Error::Serialisation(e) => e.into(),
141            Error::MissingData(e) => e.into(),
142            Error::Unknown(e) => e.into(),
143        }
144    }
145}
146
147impl From<OpenDbError> for IndexeddbStateStoreError {
148    fn from(value: OpenDbError) -> Self {
149        match value {
150            OpenDbError::Base(error) => error.into(),
151            _ => GenericError::from(value.to_string()).into(),
152        }
153    }
154}
155
156mod keys {
157    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
158    pub const BACKUPS_META: &str = "backups";
159
160    pub const ACCOUNT_DATA: &str = "account_data";
161
162    pub const PROFILES: &str = "profiles";
163    pub const DISPLAY_NAMES: &str = "display_names";
164    pub const USER_IDS: &str = "user_ids";
165
166    pub const ROOM_STATE: &str = "room_state";
167    pub const ROOM_INFOS: &str = "room_infos";
168    pub const PRESENCE: &str = "presence";
169    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
170    /// Table used to save send queue events.
171    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
172    /// Table used to save dependent send queue events.
173    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
174    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
175
176    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
177    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
178
179    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
180    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
181
182    pub const CUSTOM: &str = "custom";
183    pub const KV: &str = "kv";
184
185    /// All names of the current state stores for convenience.
186    pub const ALL_STORES: &[&str] = &[
187        ACCOUNT_DATA,
188        PROFILES,
189        DISPLAY_NAMES,
190        USER_IDS,
191        ROOM_STATE,
192        ROOM_INFOS,
193        PRESENCE,
194        ROOM_ACCOUNT_DATA,
195        STRIPPED_ROOM_STATE,
196        STRIPPED_USER_IDS,
197        ROOM_USER_RECEIPTS,
198        ROOM_EVENT_RECEIPTS,
199        ROOM_SEND_QUEUE,
200        THREAD_SUBSCRIPTIONS,
201        DEPENDENT_SEND_QUEUE,
202        CUSTOM,
203        KV,
204    ];
205
206    // static keys
207
208    pub const STORE_KEY: &str = "store_key";
209}
210
211pub use keys::ALL_STORES;
212use matrix_sdk_base::store::QueueWedgeError;
213
214/// Encrypt (if needs be) then JSON-serialize a value.
215fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
216    Ok(match store_cipher {
217        Some(cipher) => {
218            let data = serde_json::to_vec(event)?;
219            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
220        }
221        None => JsValue::from_serde(event)?,
222    })
223}
224
225/// Deserialize a JSON value and then decrypt it (if needs be).
226fn deserialize_value<T: DeserializeOwned>(
227    store_cipher: Option<&StoreCipher>,
228    event: &JsValue,
229) -> Result<T> {
230    match store_cipher {
231        Some(cipher) => {
232            use zeroize::Zeroize;
233            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
234            let ret = serde_json::from_slice(&plaintext);
235            plaintext.zeroize();
236            Ok(ret?)
237        }
238        None => Ok(event.into_serde()?),
239    }
240}
241
242fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
243where
244    T: SafeEncode,
245{
246    match store_cipher {
247        Some(cipher) => key.as_secure_string(table_name, cipher),
248        None => key.as_encoded_string(),
249    }
250    .into()
251}
252
253fn encode_to_range<T>(
254    store_cipher: Option<&StoreCipher>,
255    table_name: &str,
256    key: T,
257) -> KeyRange<JsValue>
258where
259    T: SafeEncode,
260{
261    match store_cipher {
262        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
263        None => key.encode_to_range(),
264    }
265}
266
267/// Builder for [`IndexeddbStateStore`].
268#[derive(Debug)]
269pub struct IndexeddbStateStoreBuilder {
270    name: Option<String>,
271    passphrase: Option<String>,
272    migration_conflict_strategy: MigrationConflictStrategy,
273}
274
275impl IndexeddbStateStoreBuilder {
276    fn new() -> Self {
277        Self {
278            name: None,
279            passphrase: None,
280            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
281        }
282    }
283
284    /// Set the name for the indexeddb store to use, `state` is none given.
285    pub fn name(mut self, value: String) -> Self {
286        self.name = Some(value);
287        self
288    }
289
290    /// Set the password the indexeddb should be encrypted with.
291    ///
292    /// If not given, the DB is not encrypted.
293    pub fn passphrase(mut self, value: String) -> Self {
294        self.passphrase = Some(value);
295        self
296    }
297
298    /// The strategy to use when a merge conflict is found.
299    ///
300    /// See [`MigrationConflictStrategy`] for details.
301    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
302        self.migration_conflict_strategy = value;
303        self
304    }
305
306    pub async fn build(self) -> Result<IndexeddbStateStore> {
307        let migration_strategy = self.migration_conflict_strategy.clone();
308        let name = self.name.unwrap_or_else(|| "state".to_owned());
309
310        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
311
312        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
313        let inner =
314            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
315
316        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
317    }
318}
319
320pub struct IndexeddbStateStore {
321    name: String,
322    pub(crate) inner: Database,
323    pub(crate) meta: Database,
324    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
325}
326
327#[cfg(not(tarpaulin_include))]
328impl std::fmt::Debug for IndexeddbStateStore {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
331    }
332}
333
334type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
335
336impl IndexeddbStateStore {
337    /// Generate a IndexeddbStateStoreBuilder with default parameters
338    pub fn builder() -> IndexeddbStateStoreBuilder {
339        IndexeddbStateStoreBuilder::new()
340    }
341
342    /// The version of the database containing the data.
343    pub fn version(&self) -> u32 {
344        self.inner.version() as u32
345    }
346
347    /// The version of the database containing the metadata.
348    pub fn meta_version(&self) -> u32 {
349        self.meta.version() as u32
350    }
351
352    /// Whether this database has any migration backups
353    pub async fn has_backups(&self) -> Result<bool> {
354        Ok(self
355            .meta
356            .transaction(keys::BACKUPS_META)
357            .with_mode(TransactionMode::Readonly)
358            .build()?
359            .object_store(keys::BACKUPS_META)?
360            .count()
361            .await?
362            > 0)
363    }
364
365    /// What's the database name of the latest backup<
366    pub async fn latest_backup(&self) -> Result<Option<String>> {
367        if let Some(mut cursor) = self
368            .meta
369            .transaction(keys::BACKUPS_META)
370            .with_mode(TransactionMode::Readonly)
371            .build()?
372            .object_store(keys::BACKUPS_META)?
373            .open_cursor()
374            .with_direction(CursorDirection::Prev)
375            .await?
376            && let Some(record) = cursor.next_record::<JsValue>().await?
377        {
378            Ok(record.as_string())
379        } else {
380            Ok(None)
381        }
382    }
383
384    /// Encrypt (if needs be) then JSON-serialize a value.
385    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
386        serialize_value(self.store_cipher.as_deref(), event)
387    }
388
389    /// Deserialize a JSON value and then decrypt it (if needs be).
390    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
391        deserialize_value(self.store_cipher.as_deref(), event)
392    }
393
394    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
395    where
396        T: SafeEncode,
397    {
398        encode_key(self.store_cipher.as_deref(), table_name, key)
399    }
400
401    fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
402    where
403        T: SafeEncode,
404    {
405        encode_to_range(self.store_cipher.as_deref(), table_name, key)
406    }
407
408    /// Get user IDs for the given room with the given memberships and stripped
409    /// state.
410    pub async fn get_user_ids_inner(
411        &self,
412        room_id: &RoomId,
413        memberships: RoomMemberships,
414        stripped: bool,
415    ) -> Result<Vec<OwnedUserId>> {
416        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
417
418        let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
419        let store = tx.object_store(store_name)?;
420        let range = self.encode_to_range(store_name, room_id);
421
422        let user_ids = if memberships.is_empty() {
423            // It should be faster to just get all user IDs in this case.
424            store
425                .get_all()
426                .with_query(&range)
427                .await?
428                .filter_map(Result::ok)
429                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
430                .collect::<Vec<_>>()
431        } else {
432            let mut user_ids = Vec::new();
433            let cursor = store.open_cursor().with_query(&range).await?;
434
435            if let Some(mut cursor) = cursor {
436                while let Some(value) = cursor.next_record().await? {
437                    let member = self.deserialize_value::<RoomMember>(&value)?;
438
439                    if memberships.matches(&member.membership) {
440                        user_ids.push(member.user_id);
441                    }
442                }
443            }
444
445            user_ids
446        };
447
448        Ok(user_ids)
449    }
450
451    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
452        self.inner
453            .transaction(keys::CUSTOM)
454            .with_mode(TransactionMode::Readonly)
455            .build()?
456            .object_store(keys::CUSTOM)?
457            .get(jskey)
458            .await?
459            .map(|f| self.deserialize_value(&f))
460            .transpose()
461    }
462
463    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
464        // Use the key (prefix) for the table name as well, to keep encoded
465        // keys compatible for the sync token and filters, which were in
466        // separate tables initially.
467        match key {
468            StateStoreDataKey::SyncToken => {
469                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
470            }
471            StateStoreDataKey::SupportedVersions => self.encode_key(
472                StateStoreDataKey::SUPPORTED_VERSIONS,
473                StateStoreDataKey::SUPPORTED_VERSIONS,
474            ),
475            StateStoreDataKey::WellKnown => {
476                self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
477            }
478            StateStoreDataKey::Filter(filter_name) => {
479                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
480            }
481            StateStoreDataKey::UserAvatarUrl(user_id) => {
482                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
483            }
484            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
485                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
486            }
487            StateStoreDataKey::UtdHookManagerData => {
488                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
489            }
490            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
491                self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
492            }
493            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
494                if let Some(thread_root) = thread_root {
495                    self.encode_key(
496                        keys::KV,
497                        (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
498                    )
499                } else {
500                    self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
501                }
502            }
503            StateStoreDataKey::SeenKnockRequests(room_id) => {
504                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
505            }
506            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
507                self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
508            }
509        }
510    }
511}
512
513/// A superset of [`QueuedRequest`] that also contains the room id, since we
514/// want to return them.
515#[derive(Serialize, Deserialize)]
516struct PersistedQueuedRequest {
517    /// In which room is this event going to be sent.
518    pub room_id: OwnedRoomId,
519
520    // All these fields are the same as in [`QueuedRequest`].
521    /// Kind. Optional because it might be missing from previous formats.
522    kind: Option<QueuedRequestKind>,
523    transaction_id: OwnedTransactionId,
524
525    pub error: Option<QueueWedgeError>,
526
527    priority: Option<usize>,
528
529    /// The time the original message was first attempted to be sent at.
530    #[serde(default = "created_now")]
531    created_at: MilliSecondsSinceUnixEpoch,
532
533    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
534    /// Deprecated (from old format), now replaced with error field.
535    is_wedged: Option<bool>,
536
537    event: Option<SerializableEventContent>,
538}
539
540fn created_now() -> MilliSecondsSinceUnixEpoch {
541    MilliSecondsSinceUnixEpoch::now()
542}
543
544impl PersistedQueuedRequest {
545    fn into_queued_request(self) -> Option<QueuedRequest> {
546        let kind =
547            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
548
549        let error = match self.is_wedged {
550            Some(true) => {
551                // Migrate to a generic error.
552                Some(QueueWedgeError::GenericApiError {
553                    msg: "local echo failed to send in a previous session".into(),
554                })
555            }
556            _ => self.error,
557        };
558
559        // By default, events without a priority have a priority of 0.
560        let priority = self.priority.unwrap_or(0);
561
562        Some(QueuedRequest {
563            kind,
564            transaction_id: self.transaction_id,
565            error,
566            priority,
567            created_at: self.created_at,
568        })
569    }
570}
571
572#[derive(Serialize, Deserialize, PartialEq)]
573struct PersistedThreadSubscription {
574    status: String,
575    bump_stamp: Option<u64>,
576}
577
578impl From<StoredThreadSubscription> for PersistedThreadSubscription {
579    fn from(value: StoredThreadSubscription) -> Self {
580        Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
581    }
582}
583
584// Small hack to have the following macro invocation act as the appropriate
585// trait impl block on wasm, but still be compiled on non-wasm as a regular
586// impl block otherwise.
587//
588// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
589// this hack allows us to still have most of rust-analyzer's IDE functionality
590// within the impl block without having to set it up to check things against
591// the wasm target (which would disable many other parts of the codebase).
592#[cfg(target_family = "wasm")]
593macro_rules! impl_state_store {
594    ({ $($body:tt)* }) => {
595        #[async_trait(?Send)]
596        impl StateStore for IndexeddbStateStore {
597            type Error = IndexeddbStateStoreError;
598
599            $($body)*
600        }
601    };
602}
603
604#[cfg(not(target_family = "wasm"))]
605macro_rules! impl_state_store {
606    ({ $($body:tt)* }) => {
607        impl IndexeddbStateStore {
608            $($body)*
609        }
610    };
611}
612
613impl_state_store!({
614    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
615        let encoded_key = self.encode_kv_data_key(key);
616
617        let value = self
618            .inner
619            .transaction(keys::KV)
620            .with_mode(TransactionMode::Readonly)
621            .build()?
622            .object_store(keys::KV)?
623            .get(&encoded_key)
624            .await?;
625
626        let value = match key {
627            StateStoreDataKey::SyncToken => value
628                .map(|f| self.deserialize_value::<String>(&f))
629                .transpose()?
630                .map(StateStoreDataValue::SyncToken),
631            StateStoreDataKey::SupportedVersions => value
632                .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
633                .transpose()?
634                .map(StateStoreDataValue::SupportedVersions),
635            StateStoreDataKey::WellKnown => value
636                .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
637                .transpose()?
638                .map(StateStoreDataValue::WellKnown),
639            StateStoreDataKey::Filter(_) => value
640                .map(|f| self.deserialize_value::<String>(&f))
641                .transpose()?
642                .map(StateStoreDataValue::Filter),
643            StateStoreDataKey::UserAvatarUrl(_) => value
644                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
645                .transpose()?
646                .map(StateStoreDataValue::UserAvatarUrl),
647            StateStoreDataKey::RecentlyVisitedRooms(_) => value
648                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
649                .transpose()?
650                .map(StateStoreDataValue::RecentlyVisitedRooms),
651            StateStoreDataKey::UtdHookManagerData => value
652                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
653                .transpose()?
654                .map(StateStoreDataValue::UtdHookManagerData),
655            StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
656                .map(|f| self.deserialize_value::<bool>(&f))
657                .transpose()?
658                .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
659            StateStoreDataKey::ComposerDraft(_, _) => value
660                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
661                .transpose()?
662                .map(StateStoreDataValue::ComposerDraft),
663            StateStoreDataKey::SeenKnockRequests(_) => value
664                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
665                .transpose()?
666                .map(StateStoreDataValue::SeenKnockRequests),
667            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
668                .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
669                .transpose()?
670                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
671        };
672
673        Ok(value)
674    }
675
676    async fn set_kv_data(
677        &self,
678        key: StateStoreDataKey<'_>,
679        value: StateStoreDataValue,
680    ) -> Result<()> {
681        let encoded_key = self.encode_kv_data_key(key);
682
683        let serialized_value = match key {
684            StateStoreDataKey::SyncToken => self
685                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
686            StateStoreDataKey::SupportedVersions => self.serialize_value(
687                &value
688                    .into_supported_versions()
689                    .expect("Session data not containing supported versions"),
690            ),
691            StateStoreDataKey::WellKnown => self.serialize_value(
692                &value.into_well_known().expect("Session data not containing well-known"),
693            ),
694            StateStoreDataKey::Filter(_) => {
695                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
696            }
697            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
698                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
699            ),
700            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
701                &value
702                    .into_recently_visited_rooms()
703                    .expect("Session data not a recently visited room list"),
704            ),
705            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
706                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
707            ),
708            StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
709            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
710                &value.into_composer_draft().expect("Session data not a composer draft"),
711            ),
712            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
713                &value
714                    .into_seen_knock_requests()
715                    .expect("Session data is not a set of seen knock request ids"),
716            ),
717            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
718                &value
719                    .into_thread_subscriptions_catchup_tokens()
720                    .expect("Session data is not a list of thread subscription catchup tokens"),
721            ),
722        };
723
724        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
725
726        let obj = tx.object_store(keys::KV)?;
727
728        obj.put(&serialized_value?).with_key(encoded_key).build()?;
729
730        tx.commit().await?;
731
732        Ok(())
733    }
734
735    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
736        let encoded_key = self.encode_kv_data_key(key);
737
738        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
739        let obj = tx.object_store(keys::KV)?;
740
741        obj.delete(&encoded_key).build()?;
742
743        tx.commit().await?;
744
745        Ok(())
746    }
747
748    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
749        let mut stores: HashSet<&'static str> = [
750            (changes.sync_token.is_some(), keys::KV),
751            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
752            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
753            (!changes.presence.is_empty(), keys::PRESENCE),
754            (
755                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
756                keys::PROFILES,
757            ),
758            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
759            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
760        ]
761        .iter()
762        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
763        .collect();
764
765        if !changes.state.is_empty() {
766            stores.extend([
767                keys::ROOM_STATE,
768                keys::USER_IDS,
769                keys::STRIPPED_USER_IDS,
770                keys::STRIPPED_ROOM_STATE,
771                keys::PROFILES,
772            ]);
773        }
774
775        if !changes.redactions.is_empty() {
776            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
777        }
778
779        if !changes.room_infos.is_empty() {
780            stores.insert(keys::ROOM_INFOS);
781        }
782
783        if !changes.stripped_state.is_empty() {
784            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
785        }
786
787        if !changes.receipts.is_empty() {
788            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
789        }
790
791        if stores.is_empty() {
792            // nothing to do, quit early
793            return Ok(());
794        }
795
796        let stores: Vec<&'static str> = stores.into_iter().collect();
797        let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
798
799        if let Some(s) = &changes.sync_token {
800            tx.object_store(keys::KV)?
801                .put(&self.serialize_value(s)?)
802                .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
803                .build()?;
804        }
805
806        if !changes.ambiguity_maps.is_empty() {
807            let store = tx.object_store(keys::DISPLAY_NAMES)?;
808            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
809                for (display_name, map) in ambiguity_maps {
810                    let key = self.encode_key(
811                        keys::DISPLAY_NAMES,
812                        (
813                            room_id,
814                            display_name
815                                .as_normalized_str()
816                                .unwrap_or_else(|| display_name.as_raw_str()),
817                        ),
818                    );
819
820                    store.put(&self.serialize_value(&map)?).with_key(key).build()?;
821                }
822            }
823        }
824
825        if !changes.account_data.is_empty() {
826            let store = tx.object_store(keys::ACCOUNT_DATA)?;
827            for (event_type, event) in &changes.account_data {
828                store
829                    .put(&self.serialize_value(&event)?)
830                    .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
831                    .build()?;
832            }
833        }
834
835        if !changes.room_account_data.is_empty() {
836            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
837            for (room, events) in &changes.room_account_data {
838                for (event_type, event) in events {
839                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
840                    store.put(&self.serialize_value(&event)?).with_key(key).build()?;
841                }
842            }
843        }
844
845        if !changes.state.is_empty() {
846            let state = tx.object_store(keys::ROOM_STATE)?;
847            let profiles = tx.object_store(keys::PROFILES)?;
848            let user_ids = tx.object_store(keys::USER_IDS)?;
849            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
850            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
851
852            for (room, user_ids) in &changes.profiles_to_delete {
853                for user_id in user_ids {
854                    let key = self.encode_key(keys::PROFILES, (room, user_id));
855                    profiles.delete(&key).build()?;
856                }
857            }
858
859            for (room, event_types) in &changes.state {
860                let profile_changes = changes.profiles.get(room);
861
862                for (event_type, events) in event_types {
863                    for (state_key, raw_event) in events {
864                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
865                        state
866                            .put(&self.serialize_value(&raw_event)?)
867                            .with_key(key.clone())
868                            .build()?;
869                        stripped_state.delete(&key).build()?;
870
871                        if *event_type == StateEventType::RoomMember {
872                            let event =
873                                match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
874                                    Ok(ev) => ev,
875                                    Err(e) => {
876                                        let event_id: Option<String> =
877                                            raw_event.get_field("event_id").ok().flatten();
878                                        debug!(event_id, "Failed to deserialize member event: {e}");
879                                        continue;
880                                    }
881                                };
882
883                            let key = (room, state_key);
884
885                            stripped_user_ids
886                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
887                                .build()?;
888
889                            user_ids
890                                .put(&self.serialize_value(&RoomMember::from(&event))?)
891                                .with_key(self.encode_key(keys::USER_IDS, key))
892                                .build()?;
893
894                            if let Some(profile) =
895                                profile_changes.and_then(|p| p.get(event.state_key()))
896                            {
897                                profiles
898                                    .put(&self.serialize_value(&profile)?)
899                                    .with_key(self.encode_key(keys::PROFILES, key))
900                                    .build()?;
901                            }
902                        }
903                    }
904                }
905            }
906        }
907
908        if !changes.room_infos.is_empty() {
909            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
910            for (room_id, room_info) in &changes.room_infos {
911                room_infos
912                    .put(&self.serialize_value(&room_info)?)
913                    .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
914                    .build()?;
915            }
916        }
917
918        if !changes.presence.is_empty() {
919            let store = tx.object_store(keys::PRESENCE)?;
920            for (sender, event) in &changes.presence {
921                store
922                    .put(&self.serialize_value(&event)?)
923                    .with_key(self.encode_key(keys::PRESENCE, sender))
924                    .build()?;
925            }
926        }
927
928        if !changes.stripped_state.is_empty() {
929            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
930            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
931
932            for (room, event_types) in &changes.stripped_state {
933                for (event_type, events) in event_types {
934                    for (state_key, raw_event) in events {
935                        let key = self
936                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
937                        store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
938
939                        if *event_type == StateEventType::RoomMember {
940                            let event = match raw_event
941                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
942                            {
943                                Ok(ev) => ev,
944                                Err(e) => {
945                                    let event_id: Option<String> =
946                                        raw_event.get_field("event_id").ok().flatten();
947                                    debug!(
948                                        event_id,
949                                        "Failed to deserialize stripped member event: {e}"
950                                    );
951                                    continue;
952                                }
953                            };
954
955                            let key = (room, state_key);
956
957                            user_ids
958                                .put(&self.serialize_value(&RoomMember::from(&event))?)
959                                .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
960                                .build()?;
961                        }
962                    }
963                }
964            }
965        }
966
967        if !changes.receipts.is_empty() {
968            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
969            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
970
971            for (room, content) in &changes.receipts {
972                for (event_id, receipts) in &content.0 {
973                    for (receipt_type, receipts) in receipts {
974                        for (user_id, receipt) in receipts {
975                            let key = match receipt.thread.as_str() {
976                                Some(thread_id) => self.encode_key(
977                                    keys::ROOM_USER_RECEIPTS,
978                                    (room, receipt_type, thread_id, user_id),
979                                ),
980                                None => self.encode_key(
981                                    keys::ROOM_USER_RECEIPTS,
982                                    (room, receipt_type, user_id),
983                                ),
984                            };
985
986                            if let Some((old_event, _)) =
987                                room_user_receipts.get(&key).await?.and_then(|f| {
988                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
989                                })
990                            {
991                                let key = match receipt.thread.as_str() {
992                                    Some(thread_id) => self.encode_key(
993                                        keys::ROOM_EVENT_RECEIPTS,
994                                        (room, receipt_type, thread_id, old_event, user_id),
995                                    ),
996                                    None => self.encode_key(
997                                        keys::ROOM_EVENT_RECEIPTS,
998                                        (room, receipt_type, old_event, user_id),
999                                    ),
1000                                };
1001                                room_event_receipts.delete(&key).build()?;
1002                            }
1003
1004                            room_user_receipts
1005                                .put(&self.serialize_value(&(event_id, receipt))?)
1006                                .with_key(key)
1007                                .build()?;
1008
1009                            // Add the receipt to the room event receipts
1010                            let key = match receipt.thread.as_str() {
1011                                Some(thread_id) => self.encode_key(
1012                                    keys::ROOM_EVENT_RECEIPTS,
1013                                    (room, receipt_type, thread_id, event_id, user_id),
1014                                ),
1015                                None => self.encode_key(
1016                                    keys::ROOM_EVENT_RECEIPTS,
1017                                    (room, receipt_type, event_id, user_id),
1018                                ),
1019                            };
1020                            room_event_receipts
1021                                .put(&self.serialize_value(&(user_id, receipt))?)
1022                                .with_key(key)
1023                                .build()?;
1024                        }
1025                    }
1026                }
1027            }
1028        }
1029
1030        if !changes.redactions.is_empty() {
1031            let state = tx.object_store(keys::ROOM_STATE)?;
1032            let room_info = tx.object_store(keys::ROOM_INFOS)?;
1033
1034            for (room_id, redactions) in &changes.redactions {
1035                let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1036                let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1037                    continue;
1038                };
1039
1040                let mut redaction_rules = None;
1041
1042                while let Some(value) = cursor.next_record().await? {
1043                    let Some(key) = cursor.key::<JsValue>()? else {
1044                        break;
1045                    };
1046
1047                    let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1048                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1049                        && let Some(redaction) = redactions.get(&event_id)
1050                    {
1051                        let redaction_rules = match &redaction_rules {
1052                            Some(r) => r,
1053                            None => {
1054                                let value = room_info
1055                                    .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1056                                    .await?
1057                                    .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1058                                    .map(|info| info.room_version_rules_or_default())
1059                                    .unwrap_or_else(|| {
1060                                        warn!(
1061                                            ?room_id,
1062                                            "Unable to get the room version rules, \
1063                                             defaulting to rules for room version \
1064                                             {ROOM_VERSION_FALLBACK}"
1065                                        );
1066                                        ROOM_VERSION_RULES_FALLBACK
1067                                    })
1068                                    .redaction;
1069                                redaction_rules.get_or_insert(value)
1070                            }
1071                        };
1072
1073                        let redacted = redact(
1074                            raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1075                            redaction_rules,
1076                            Some(RedactedBecause::from_raw_event(redaction)?),
1077                        )
1078                        .map_err(StoreError::Redaction)?;
1079                        state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1080                    }
1081                }
1082            }
1083        }
1084
1085        tx.commit().await.map_err(|e| e.into())
1086    }
1087
1088    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1089        self.inner
1090            .transaction(keys::PRESENCE)
1091            .with_mode(TransactionMode::Readonly)
1092            .build()?
1093            .object_store(keys::PRESENCE)?
1094            .get(&self.encode_key(keys::PRESENCE, user_id))
1095            .await?
1096            .map(|f| self.deserialize_value(&f))
1097            .transpose()
1098    }
1099
1100    async fn get_presence_events(
1101        &self,
1102        user_ids: &[OwnedUserId],
1103    ) -> Result<Vec<Raw<PresenceEvent>>> {
1104        if user_ids.is_empty() {
1105            return Ok(Vec::new());
1106        }
1107
1108        let txn =
1109            self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1110        let store = txn.object_store(keys::PRESENCE)?;
1111
1112        let mut events = Vec::with_capacity(user_ids.len());
1113
1114        for user_id in user_ids {
1115            if let Some(event) = store
1116                .get(&self.encode_key(keys::PRESENCE, user_id))
1117                .await?
1118                .map(|f| self.deserialize_value(&f))
1119                .transpose()?
1120            {
1121                events.push(event)
1122            }
1123        }
1124
1125        Ok(events)
1126    }
1127
1128    async fn get_state_event(
1129        &self,
1130        room_id: &RoomId,
1131        event_type: StateEventType,
1132        state_key: &str,
1133    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1134        Ok(self
1135            .get_state_events_for_keys(room_id, event_type, &[state_key])
1136            .await?
1137            .into_iter()
1138            .next())
1139    }
1140
1141    async fn get_state_events(
1142        &self,
1143        room_id: &RoomId,
1144        event_type: StateEventType,
1145    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1146        let stripped_range =
1147            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1148        let stripped_events = self
1149            .inner
1150            .transaction(keys::STRIPPED_ROOM_STATE)
1151            .with_mode(TransactionMode::Readonly)
1152            .build()?
1153            .object_store(keys::STRIPPED_ROOM_STATE)?
1154            .get_all()
1155            .with_query(&stripped_range)
1156            .await?
1157            .filter_map(Result::ok)
1158            .filter_map(|f| {
1159                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1160            })
1161            .collect::<Vec<_>>();
1162
1163        if !stripped_events.is_empty() {
1164            return Ok(stripped_events);
1165        }
1166
1167        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1168        Ok(self
1169            .inner
1170            .transaction(keys::ROOM_STATE)
1171            .with_mode(TransactionMode::Readonly)
1172            .build()?
1173            .object_store(keys::ROOM_STATE)?
1174            .get_all()
1175            .with_query(&range)
1176            .await?
1177            .filter_map(Result::ok)
1178            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1179            .collect::<Vec<_>>())
1180    }
1181
1182    async fn get_state_events_for_keys(
1183        &self,
1184        room_id: &RoomId,
1185        event_type: StateEventType,
1186        state_keys: &[&str],
1187    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1188        if state_keys.is_empty() {
1189            return Ok(Vec::new());
1190        }
1191
1192        let mut events = Vec::with_capacity(state_keys.len());
1193
1194        {
1195            let txn = self
1196                .inner
1197                .transaction(keys::STRIPPED_ROOM_STATE)
1198                .with_mode(TransactionMode::Readonly)
1199                .build()?;
1200            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1201
1202            for state_key in state_keys {
1203                if let Some(event) =
1204                    store
1205                        .get(&self.encode_key(
1206                            keys::STRIPPED_ROOM_STATE,
1207                            (room_id, &event_type, state_key),
1208                        ))
1209                        .await?
1210                        .map(|f| self.deserialize_value(&f))
1211                        .transpose()?
1212                {
1213                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1214                }
1215            }
1216
1217            if !events.is_empty() {
1218                return Ok(events);
1219            }
1220        }
1221
1222        let txn = self
1223            .inner
1224            .transaction(keys::ROOM_STATE)
1225            .with_mode(TransactionMode::Readonly)
1226            .build()?;
1227        let store = txn.object_store(keys::ROOM_STATE)?;
1228
1229        for state_key in state_keys {
1230            if let Some(event) = store
1231                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1232                .await?
1233                .map(|f| self.deserialize_value(&f))
1234                .transpose()?
1235            {
1236                events.push(RawAnySyncOrStrippedState::Sync(event));
1237            }
1238        }
1239
1240        Ok(events)
1241    }
1242
1243    async fn get_profile(
1244        &self,
1245        room_id: &RoomId,
1246        user_id: &UserId,
1247    ) -> Result<Option<MinimalRoomMemberEvent>> {
1248        self.inner
1249            .transaction(keys::PROFILES)
1250            .with_mode(TransactionMode::Readonly)
1251            .build()?
1252            .object_store(keys::PROFILES)?
1253            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1254            .await?
1255            .map(|f| self.deserialize_value(&f))
1256            .transpose()
1257    }
1258
1259    async fn get_profiles<'a>(
1260        &self,
1261        room_id: &RoomId,
1262        user_ids: &'a [OwnedUserId],
1263    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1264        if user_ids.is_empty() {
1265            return Ok(BTreeMap::new());
1266        }
1267
1268        let txn =
1269            self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1270        let store = txn.object_store(keys::PROFILES)?;
1271
1272        let mut profiles = BTreeMap::new();
1273        for user_id in user_ids {
1274            if let Some(profile) = store
1275                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1276                .await?
1277                .map(|f| self.deserialize_value(&f))
1278                .transpose()?
1279            {
1280                profiles.insert(user_id.as_ref(), profile);
1281            }
1282        }
1283
1284        Ok(profiles)
1285    }
1286
1287    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1288        let transaction = self
1289            .inner
1290            .transaction(keys::ROOM_INFOS)
1291            .with_mode(TransactionMode::Readonly)
1292            .build()?;
1293
1294        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1295
1296        Ok(match room_load_settings {
1297            RoomLoadSettings::All => object_store
1298                .get_all()
1299                .await?
1300                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1301                .collect::<Result<_>>()?,
1302
1303            RoomLoadSettings::One(room_id) => {
1304                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1305                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1306                    None => vec![],
1307                }
1308            }
1309        })
1310    }
1311
1312    async fn get_users_with_display_name(
1313        &self,
1314        room_id: &RoomId,
1315        display_name: &DisplayName,
1316    ) -> Result<BTreeSet<OwnedUserId>> {
1317        self.inner
1318            .transaction(keys::DISPLAY_NAMES)
1319            .with_mode(TransactionMode::Readonly)
1320            .build()?
1321            .object_store(keys::DISPLAY_NAMES)?
1322            .get(&self.encode_key(
1323                keys::DISPLAY_NAMES,
1324                (
1325                    room_id,
1326                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1327                ),
1328            ))
1329            .await?
1330            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1331            .unwrap_or_else(|| Ok(Default::default()))
1332    }
1333
1334    async fn get_users_with_display_names<'a>(
1335        &self,
1336        room_id: &RoomId,
1337        display_names: &'a [DisplayName],
1338    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1339        let mut map = HashMap::new();
1340
1341        if display_names.is_empty() {
1342            return Ok(map);
1343        }
1344
1345        let txn = self
1346            .inner
1347            .transaction(keys::DISPLAY_NAMES)
1348            .with_mode(TransactionMode::Readonly)
1349            .build()?;
1350        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1351
1352        for display_name in display_names {
1353            if let Some(user_ids) = store
1354                .get(
1355                    &self.encode_key(
1356                        keys::DISPLAY_NAMES,
1357                        (
1358                            room_id,
1359                            display_name
1360                                .as_normalized_str()
1361                                .unwrap_or_else(|| display_name.as_raw_str()),
1362                        ),
1363                    ),
1364                )
1365                .await?
1366                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1367                .transpose()?
1368            {
1369                map.insert(display_name, user_ids);
1370            }
1371        }
1372
1373        Ok(map)
1374    }
1375
1376    async fn get_account_data_event(
1377        &self,
1378        event_type: GlobalAccountDataEventType,
1379    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1380        self.inner
1381            .transaction(keys::ACCOUNT_DATA)
1382            .with_mode(TransactionMode::Readonly)
1383            .build()?
1384            .object_store(keys::ACCOUNT_DATA)?
1385            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1386            .await?
1387            .map(|f| self.deserialize_value(&f))
1388            .transpose()
1389    }
1390
1391    async fn get_room_account_data_event(
1392        &self,
1393        room_id: &RoomId,
1394        event_type: RoomAccountDataEventType,
1395    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1396        self.inner
1397            .transaction(keys::ROOM_ACCOUNT_DATA)
1398            .with_mode(TransactionMode::Readonly)
1399            .build()?
1400            .object_store(keys::ROOM_ACCOUNT_DATA)?
1401            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1402            .await?
1403            .map(|f| self.deserialize_value(&f))
1404            .transpose()
1405    }
1406
1407    async fn get_user_room_receipt_event(
1408        &self,
1409        room_id: &RoomId,
1410        receipt_type: ReceiptType,
1411        thread: ReceiptThread,
1412        user_id: &UserId,
1413    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1414        let key = match thread.as_str() {
1415            Some(thread_id) => self
1416                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1417            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1418        };
1419        self.inner
1420            .transaction(keys::ROOM_USER_RECEIPTS)
1421            .with_mode(TransactionMode::Readonly)
1422            .build()?
1423            .object_store(keys::ROOM_USER_RECEIPTS)?
1424            .get(&key)
1425            .await?
1426            .map(|f| self.deserialize_value(&f))
1427            .transpose()
1428    }
1429
1430    async fn get_event_room_receipt_events(
1431        &self,
1432        room_id: &RoomId,
1433        receipt_type: ReceiptType,
1434        thread: ReceiptThread,
1435        event_id: &EventId,
1436    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1437        let range = match thread.as_str() {
1438            Some(thread_id) => self.encode_to_range(
1439                keys::ROOM_EVENT_RECEIPTS,
1440                (room_id, receipt_type, thread_id, event_id),
1441            ),
1442            None => {
1443                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1444            }
1445        };
1446        let tx = self
1447            .inner
1448            .transaction(keys::ROOM_EVENT_RECEIPTS)
1449            .with_mode(TransactionMode::Readonly)
1450            .build()?;
1451        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1452
1453        Ok(store
1454            .get_all()
1455            .with_query(&range)
1456            .await?
1457            .filter_map(Result::ok)
1458            .filter_map(|f| self.deserialize_value(&f).ok())
1459            .collect::<Vec<_>>())
1460    }
1461
1462    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1463        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1464        self.get_custom_value_for_js(jskey).await
1465    }
1466
1467    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1468        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1469
1470        let prev = self.get_custom_value_for_js(&jskey).await?;
1471
1472        let tx =
1473            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1474
1475        tx.object_store(keys::CUSTOM)?
1476            .put(&self.serialize_value(&value)?)
1477            .with_key(jskey)
1478            .build()?;
1479
1480        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1481        Ok(prev)
1482    }
1483
1484    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1485        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1486
1487        let prev = self.get_custom_value_for_js(&jskey).await?;
1488
1489        let tx =
1490            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1491
1492        tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1493
1494        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1495        Ok(prev)
1496    }
1497
1498    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1499        // All the stores which use a RoomId as their key (and nothing additional).
1500        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1501
1502        // All the stores which use a RoomId as the first part of their key, but may
1503        // have some additional data in the key.
1504        let prefixed_stores = [
1505            keys::PROFILES,
1506            keys::DISPLAY_NAMES,
1507            keys::USER_IDS,
1508            keys::ROOM_STATE,
1509            keys::ROOM_ACCOUNT_DATA,
1510            keys::ROOM_EVENT_RECEIPTS,
1511            keys::ROOM_USER_RECEIPTS,
1512            keys::STRIPPED_ROOM_STATE,
1513            keys::STRIPPED_USER_IDS,
1514            keys::THREAD_SUBSCRIPTIONS,
1515        ];
1516
1517        let all_stores = {
1518            let mut v = Vec::new();
1519            v.extend(prefixed_stores);
1520            v.extend(direct_stores);
1521            v
1522        };
1523
1524        let tx =
1525            self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1526
1527        for store_name in direct_stores {
1528            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1529        }
1530
1531        for store_name in prefixed_stores {
1532            let store = tx.object_store(store_name)?;
1533            let range = self.encode_to_range(store_name, room_id);
1534            for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1535                store.delete(&key?).build()?;
1536            }
1537        }
1538
1539        tx.commit().await.map_err(|e| e.into())
1540    }
1541
1542    async fn get_user_ids(
1543        &self,
1544        room_id: &RoomId,
1545        memberships: RoomMemberships,
1546    ) -> Result<Vec<OwnedUserId>> {
1547        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1548        if !ids.is_empty() {
1549            return Ok(ids);
1550        }
1551        self.get_user_ids_inner(room_id, memberships, false).await
1552    }
1553
1554    async fn save_send_queue_request(
1555        &self,
1556        room_id: &RoomId,
1557        transaction_id: OwnedTransactionId,
1558        created_at: MilliSecondsSinceUnixEpoch,
1559        kind: QueuedRequestKind,
1560        priority: usize,
1561    ) -> Result<()> {
1562        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1563
1564        let tx = self
1565            .inner
1566            .transaction(keys::ROOM_SEND_QUEUE)
1567            .with_mode(TransactionMode::Readwrite)
1568            .build()?;
1569
1570        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1571
1572        // We store an encoded vector of the queued requests, with their transaction
1573        // ids.
1574
1575        // Reload the previous vector for this room, or create an empty one.
1576        let prev = obj.get(&encoded_key).await?;
1577
1578        let mut prev = prev.map_or_else(
1579            || Ok(Vec::new()),
1580            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1581        )?;
1582
1583        // Push the new request.
1584        prev.push(PersistedQueuedRequest {
1585            room_id: room_id.to_owned(),
1586            kind: Some(kind),
1587            transaction_id,
1588            error: None,
1589            is_wedged: None,
1590            event: None,
1591            priority: Some(priority),
1592            created_at,
1593        });
1594
1595        // Save the new vector into db.
1596        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1597
1598        tx.commit().await?;
1599
1600        Ok(())
1601    }
1602
1603    async fn update_send_queue_request(
1604        &self,
1605        room_id: &RoomId,
1606        transaction_id: &TransactionId,
1607        kind: QueuedRequestKind,
1608    ) -> Result<bool> {
1609        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1610
1611        let tx = self
1612            .inner
1613            .transaction(keys::ROOM_SEND_QUEUE)
1614            .with_mode(TransactionMode::Readwrite)
1615            .build()?;
1616
1617        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1618
1619        // We store an encoded vector of the queued requests, with their transaction
1620        // ids.
1621
1622        // Reload the previous vector for this room, or create an empty one.
1623        let prev = obj.get(&encoded_key).await?;
1624
1625        let mut prev = prev.map_or_else(
1626            || Ok(Vec::new()),
1627            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1628        )?;
1629
1630        // Modify the one request.
1631        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1632            entry.kind = Some(kind);
1633            // Reset the error state.
1634            entry.error = None;
1635            // Remove migrated fields.
1636            entry.is_wedged = None;
1637            entry.event = None;
1638
1639            // Save the new vector into db.
1640            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1641            tx.commit().await?;
1642
1643            Ok(true)
1644        } else {
1645            Ok(false)
1646        }
1647    }
1648
1649    async fn remove_send_queue_request(
1650        &self,
1651        room_id: &RoomId,
1652        transaction_id: &TransactionId,
1653    ) -> Result<bool> {
1654        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1655
1656        let tx = self
1657            .inner
1658            .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1659            .with_mode(TransactionMode::Readwrite)
1660            .build()?;
1661
1662        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1663
1664        // We store an encoded vector of the queued requests, with their transaction
1665        // ids.
1666
1667        // Reload the previous vector for this room.
1668        if let Some(val) = obj.get(&encoded_key).await? {
1669            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1670            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1671                prev.remove(pos);
1672
1673                if prev.is_empty() {
1674                    obj.delete(&encoded_key).build()?;
1675                } else {
1676                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1677                }
1678
1679                tx.commit().await?;
1680                return Ok(true);
1681            }
1682        }
1683
1684        Ok(false)
1685    }
1686
1687    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1688        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1689
1690        // We store an encoded vector of the queued requests, with their transaction
1691        // ids.
1692        let prev = self
1693            .inner
1694            .transaction(keys::ROOM_SEND_QUEUE)
1695            .with_mode(TransactionMode::Readwrite)
1696            .build()?
1697            .object_store(keys::ROOM_SEND_QUEUE)?
1698            .get(&encoded_key)
1699            .await?;
1700
1701        let mut prev = prev.map_or_else(
1702            || Ok(Vec::new()),
1703            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1704        )?;
1705
1706        // Inverted stable ordering on priority.
1707        prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1708
1709        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1710    }
1711
1712    async fn update_send_queue_request_status(
1713        &self,
1714        room_id: &RoomId,
1715        transaction_id: &TransactionId,
1716        error: Option<QueueWedgeError>,
1717    ) -> Result<()> {
1718        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1719
1720        let tx = self
1721            .inner
1722            .transaction(keys::ROOM_SEND_QUEUE)
1723            .with_mode(TransactionMode::Readwrite)
1724            .build()?;
1725
1726        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1727
1728        if let Some(val) = obj.get(&encoded_key).await? {
1729            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1730            if let Some(request) =
1731                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1732            {
1733                request.is_wedged = None;
1734                request.error = error;
1735                obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1736            }
1737        }
1738
1739        tx.commit().await?;
1740
1741        Ok(())
1742    }
1743
1744    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1745        let tx = self
1746            .inner
1747            .transaction(keys::ROOM_SEND_QUEUE)
1748            .with_mode(TransactionMode::Readwrite)
1749            .build()?;
1750
1751        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1752
1753        let all_entries = obj
1754            .get_all()
1755            .await?
1756            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1757            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1758            .into_iter()
1759            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1760            .collect::<BTreeSet<_>>();
1761
1762        Ok(all_entries.into_iter().collect())
1763    }
1764
1765    async fn save_dependent_queued_request(
1766        &self,
1767        room_id: &RoomId,
1768        parent_txn_id: &TransactionId,
1769        own_txn_id: ChildTransactionId,
1770        created_at: MilliSecondsSinceUnixEpoch,
1771        content: DependentQueuedRequestKind,
1772    ) -> Result<()> {
1773        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1774
1775        let tx = self
1776            .inner
1777            .transaction(keys::DEPENDENT_SEND_QUEUE)
1778            .with_mode(TransactionMode::Readwrite)
1779            .build()?;
1780
1781        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1782
1783        // We store an encoded vector of the dependent requests.
1784        // Reload the previous vector for this room, or create an empty one.
1785        let prev = obj.get(&encoded_key).await?;
1786
1787        let mut prev = prev.map_or_else(
1788            || Ok(Vec::new()),
1789            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1790        )?;
1791
1792        // Push the new request.
1793        prev.push(DependentQueuedRequest {
1794            kind: content,
1795            parent_transaction_id: parent_txn_id.to_owned(),
1796            own_transaction_id: own_txn_id,
1797            parent_key: None,
1798            created_at,
1799        });
1800
1801        // Save the new vector into db.
1802        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1803
1804        tx.commit().await?;
1805
1806        Ok(())
1807    }
1808
1809    async fn update_dependent_queued_request(
1810        &self,
1811        room_id: &RoomId,
1812        own_transaction_id: &ChildTransactionId,
1813        new_content: DependentQueuedRequestKind,
1814    ) -> Result<bool> {
1815        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1816
1817        let tx = self
1818            .inner
1819            .transaction(keys::DEPENDENT_SEND_QUEUE)
1820            .with_mode(TransactionMode::Readwrite)
1821            .build()?;
1822
1823        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1824
1825        // We store an encoded vector of the dependent requests.
1826        // Reload the previous vector for this room, or create an empty one.
1827        let prev = obj.get(&encoded_key).await?;
1828
1829        let mut prev = prev.map_or_else(
1830            || Ok(Vec::new()),
1831            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1832        )?;
1833
1834        // Modify the dependent request, if found.
1835        let mut found = false;
1836        for entry in prev.iter_mut() {
1837            if entry.own_transaction_id == *own_transaction_id {
1838                found = true;
1839                entry.kind = new_content;
1840                break;
1841            }
1842        }
1843
1844        if found {
1845            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1846            tx.commit().await?;
1847        }
1848
1849        Ok(found)
1850    }
1851
1852    async fn mark_dependent_queued_requests_as_ready(
1853        &self,
1854        room_id: &RoomId,
1855        parent_txn_id: &TransactionId,
1856        parent_key: SentRequestKey,
1857    ) -> Result<usize> {
1858        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1859
1860        let tx = self
1861            .inner
1862            .transaction(keys::DEPENDENT_SEND_QUEUE)
1863            .with_mode(TransactionMode::Readwrite)
1864            .build()?;
1865
1866        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1867
1868        // We store an encoded vector of the dependent requests.
1869        // Reload the previous vector for this room, or create an empty one.
1870        let prev = obj.get(&encoded_key).await?;
1871
1872        let mut prev = prev.map_or_else(
1873            || Ok(Vec::new()),
1874            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1875        )?;
1876
1877        // Modify all requests that match.
1878        let mut num_updated = 0;
1879        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1880            entry.parent_key = Some(parent_key.clone());
1881            num_updated += 1;
1882        }
1883
1884        if num_updated > 0 {
1885            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1886            tx.commit().await?;
1887        }
1888
1889        Ok(num_updated)
1890    }
1891
1892    async fn remove_dependent_queued_request(
1893        &self,
1894        room_id: &RoomId,
1895        txn_id: &ChildTransactionId,
1896    ) -> Result<bool> {
1897        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1898
1899        let tx = self
1900            .inner
1901            .transaction(keys::DEPENDENT_SEND_QUEUE)
1902            .with_mode(TransactionMode::Readwrite)
1903            .build()?;
1904
1905        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1906
1907        // We store an encoded vector of the dependent requests.
1908        // Reload the previous vector for this room.
1909        if let Some(val) = obj.get(&encoded_key).await? {
1910            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1911            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1912                prev.remove(pos);
1913
1914                if prev.is_empty() {
1915                    obj.delete(&encoded_key).build()?;
1916                } else {
1917                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1918                }
1919
1920                tx.commit().await?;
1921                return Ok(true);
1922            }
1923        }
1924
1925        Ok(false)
1926    }
1927
1928    async fn load_dependent_queued_requests(
1929        &self,
1930        room_id: &RoomId,
1931    ) -> Result<Vec<DependentQueuedRequest>> {
1932        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1933
1934        // We store an encoded vector of the dependent requests.
1935        let prev = self
1936            .inner
1937            .transaction(keys::DEPENDENT_SEND_QUEUE)
1938            .with_mode(TransactionMode::Readwrite)
1939            .build()?
1940            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1941            .get(&encoded_key)
1942            .await?;
1943
1944        prev.map_or_else(
1945            || Ok(Vec::new()),
1946            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1947        )
1948    }
1949
1950    async fn upsert_thread_subscriptions(
1951        &self,
1952        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1953    ) -> Result<()> {
1954        let tx = self
1955            .inner
1956            .transaction(keys::THREAD_SUBSCRIPTIONS)
1957            .with_mode(TransactionMode::Readwrite)
1958            .build()?;
1959        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1960
1961        for (room_id, thread_id, subscription) in updates {
1962            let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1963            let mut new = PersistedThreadSubscription::from(subscription);
1964
1965            // See if there's a previous subscription.
1966            if let Some(previous_value) = obj.get(&encoded_key).await? {
1967                let previous: PersistedThreadSubscription =
1968                    self.deserialize_value(&previous_value)?;
1969
1970                // If the previous status is the same as the new one, don't do anything.
1971                if new == previous {
1972                    continue;
1973                }
1974                if !compare_thread_subscription_bump_stamps(
1975                    previous.bump_stamp,
1976                    &mut new.bump_stamp,
1977                ) {
1978                    continue;
1979                }
1980            }
1981
1982            let serialized_value = self.serialize_value(&new);
1983            obj.put(&serialized_value?).with_key(encoded_key).build()?;
1984        }
1985
1986        tx.commit().await?;
1987
1988        Ok(())
1989    }
1990
1991    async fn load_thread_subscription(
1992        &self,
1993        room: &RoomId,
1994        thread_id: &EventId,
1995    ) -> Result<Option<StoredThreadSubscription>> {
1996        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1997
1998        let js_value = self
1999            .inner
2000            .transaction(keys::THREAD_SUBSCRIPTIONS)
2001            .with_mode(TransactionMode::Readonly)
2002            .build()?
2003            .object_store(keys::THREAD_SUBSCRIPTIONS)?
2004            .get(&encoded_key)
2005            .await?;
2006
2007        let Some(js_value) = js_value else {
2008            // We didn't have a previous subscription for this thread.
2009            return Ok(None);
2010        };
2011
2012        let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2013
2014        let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2015            StoreError::InvalidData {
2016                details: format!(
2017                    "invalid thread status for room {room} and thread {thread_id}: {}",
2018                    sub.status
2019                ),
2020            }
2021        })?;
2022
2023        Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2024    }
2025
2026    async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2027        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2028
2029        let transaction = self
2030            .inner
2031            .transaction(keys::THREAD_SUBSCRIPTIONS)
2032            .with_mode(TransactionMode::Readwrite)
2033            .build()?;
2034        transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2035        transaction.commit().await?;
2036
2037        Ok(())
2038    }
2039
2040    #[allow(clippy::unused_async)]
2041    async fn optimize(&self) -> Result<()> {
2042        Ok(())
2043    }
2044
2045    #[allow(clippy::unused_async)]
2046    async fn get_size(&self) -> Result<Option<usize>> {
2047        Ok(None)
2048    }
2049});
2050
2051/// A room member.
2052#[derive(Debug, Serialize, Deserialize)]
2053struct RoomMember {
2054    user_id: OwnedUserId,
2055    membership: MembershipState,
2056}
2057
2058impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2059    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2060        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2061    }
2062}
2063
2064impl From<&StrippedRoomMemberEvent> for RoomMember {
2065    fn from(event: &StrippedRoomMemberEvent) -> Self {
2066        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2067    }
2068}
2069
2070#[cfg(test)]
2071mod migration_tests {
2072    use assert_matches2::assert_matches;
2073    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2074    use ruma::{
2075        OwnedRoomId, OwnedTransactionId, TransactionId,
2076        events::room::message::RoomMessageEventContent, room_id,
2077    };
2078    use serde::{Deserialize, Serialize};
2079
2080    use crate::state_store::PersistedQueuedRequest;
2081
2082    #[derive(Serialize, Deserialize)]
2083    struct OldPersistedQueuedRequest {
2084        room_id: OwnedRoomId,
2085        event: SerializableEventContent,
2086        transaction_id: OwnedTransactionId,
2087        is_wedged: bool,
2088    }
2089
2090    // We now persist an error when an event failed to send instead of just a
2091    // boolean. To support that, `PersistedQueueEvent` changed a bool to
2092    // Option<bool>, ensures that this work properly.
2093    #[test]
2094    fn test_migrating_persisted_queue_event_serialization() {
2095        let room_a_id = room_id!("!room_a:dummy.local");
2096        let transaction_id = TransactionId::new();
2097        let content =
2098            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2099                .unwrap();
2100
2101        let old_persisted_queue_event = OldPersistedQueuedRequest {
2102            room_id: room_a_id.to_owned(),
2103            event: content,
2104            transaction_id: transaction_id.clone(),
2105            is_wedged: true,
2106        };
2107
2108        let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2109
2110        // Load it with the new version.
2111        let new_persisted: PersistedQueuedRequest =
2112            serde_json::from_slice(&serialized_persisted).unwrap();
2113
2114        assert_eq!(new_persisted.is_wedged, Some(true));
2115        assert!(new_persisted.error.is_none());
2116
2117        assert!(new_persisted.event.is_some());
2118        assert!(new_persisted.kind.is_none());
2119
2120        let queued = new_persisted.into_queued_request().unwrap();
2121        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2122        assert_eq!(queued.transaction_id, transaction_id);
2123        assert!(queued.error.is_some());
2124    }
2125}
2126
2127#[cfg(all(test, target_family = "wasm"))]
2128mod tests {
2129    #[cfg(target_family = "wasm")]
2130    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2131
2132    use matrix_sdk_base::statestore_integration_tests;
2133    use uuid::Uuid;
2134
2135    use super::{IndexeddbStateStore, Result};
2136
2137    async fn get_store() -> Result<IndexeddbStateStore> {
2138        let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2139        Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2140    }
2141
2142    statestore_integration_tests!();
2143}
2144
2145#[cfg(all(test, target_family = "wasm"))]
2146mod encrypted_tests {
2147    #[cfg(target_family = "wasm")]
2148    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2149
2150    use matrix_sdk_base::statestore_integration_tests;
2151    use uuid::Uuid;
2152
2153    use super::{IndexeddbStateStore, Result};
2154
2155    async fn get_store() -> Result<IndexeddbStateStore> {
2156        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2157        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2158        Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2159    }
2160
2161    statestore_integration_tests!();
2162}