Skip to main content

matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    cmp::Reverse,
17    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18    str::FromStr as _,
19    sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26    KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27    transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31    RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33    store::{
34        ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36        SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37        SupportedVersionsResponse, ThreadSubscriptionStatus, WellKnownResponse,
38        compare_thread_subscription_bump_stamps,
39    },
40    ttl::TtlValue,
41};
42use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
43use ruma::{
44    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
45    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
46    api::client::discovery::get_capabilities::v3::Capabilities,
47    canonical_json::{RedactedBecause, redact},
48    events::{
49        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
50        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
51        presence::PresenceEvent,
52        receipt::{Receipt, ReceiptThread, ReceiptType},
53        room::member::{
54            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
55        },
56    },
57    serde::Raw,
58};
59use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
60use tracing::{debug, warn};
61use wasm_bindgen::JsValue;
62
63mod migrations;
64
65pub use self::migrations::MigrationConflictStrategy;
66use self::migrations::{upgrade_inner_db, upgrade_meta_db};
67use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
68
69#[derive(Debug, thiserror::Error)]
70pub enum IndexeddbStateStoreError {
71    #[error(transparent)]
72    Json(#[from] serde_json::Error),
73    #[error(transparent)]
74    Encryption(#[from] EncryptionError),
75    #[error("DomException {name} ({code}): {message}")]
76    DomException { name: String, message: String, code: u16 },
77    #[error(transparent)]
78    StoreError(#[from] StoreError),
79    #[error(
80        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
81         See MigrationConflictStrategy for ways to configure."
82    )]
83    MigrationConflict { name: String, old_version: u32, new_version: u32 },
84}
85
86impl From<GenericError> for IndexeddbStateStoreError {
87    fn from(value: GenericError) -> Self {
88        Self::StoreError(value.into())
89    }
90}
91
92impl From<web_sys::DomException> for IndexeddbStateStoreError {
93    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
94        IndexeddbStateStoreError::DomException {
95            name: frm.name(),
96            message: frm.message(),
97            code: frm.code(),
98        }
99    }
100}
101
102impl From<IndexeddbStateStoreError> for StoreError {
103    fn from(e: IndexeddbStateStoreError) -> Self {
104        match e {
105            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
106            IndexeddbStateStoreError::StoreError(e) => e,
107            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
108            _ => StoreError::backend(e),
109        }
110    }
111}
112
113impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
114    fn from(value: indexed_db_futures::error::DomException) -> Self {
115        web_sys::DomException::from(value).into()
116    }
117}
118
119impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
120    fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
121        Self::Json(serde_json::Error::custom(value.to_string()))
122    }
123}
124
125impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
126    fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
127        IndexeddbStateStoreError::StoreError(StoreError::backend(value))
128    }
129}
130
131impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
132    fn from(value: indexed_db_futures::error::JSError) -> Self {
133        GenericError::from(value.to_string()).into()
134    }
135}
136
137impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
138    fn from(value: indexed_db_futures::error::Error) -> Self {
139        use indexed_db_futures::error::Error;
140        match value {
141            Error::DomException(e) => e.into(),
142            Error::Serialisation(e) => e.into(),
143            Error::MissingData(e) => e.into(),
144            Error::Unknown(e) => e.into(),
145        }
146    }
147}
148
149impl From<OpenDbError> for IndexeddbStateStoreError {
150    fn from(value: OpenDbError) -> Self {
151        match value {
152            OpenDbError::Base(error) => error.into(),
153            _ => GenericError::from(value.to_string()).into(),
154        }
155    }
156}
157
158mod keys {
159    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
160    pub const BACKUPS_META: &str = "backups";
161
162    pub const ACCOUNT_DATA: &str = "account_data";
163
164    pub const PROFILES: &str = "profiles";
165    pub const DISPLAY_NAMES: &str = "display_names";
166    pub const USER_IDS: &str = "user_ids";
167
168    pub const ROOM_STATE: &str = "room_state";
169    pub const ROOM_INFOS: &str = "room_infos";
170    pub const PRESENCE: &str = "presence";
171    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
172    /// Table used to save send queue events.
173    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
174    /// Table used to save dependent send queue events.
175    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
176    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
177
178    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
179    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
180
181    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
182    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
183
184    pub const CUSTOM: &str = "custom";
185    pub const KV: &str = "kv";
186
187    /// All names of the current state stores for convenience.
188    pub const ALL_STORES: &[&str] = &[
189        ACCOUNT_DATA,
190        PROFILES,
191        DISPLAY_NAMES,
192        USER_IDS,
193        ROOM_STATE,
194        ROOM_INFOS,
195        PRESENCE,
196        ROOM_ACCOUNT_DATA,
197        STRIPPED_ROOM_STATE,
198        STRIPPED_USER_IDS,
199        ROOM_USER_RECEIPTS,
200        ROOM_EVENT_RECEIPTS,
201        ROOM_SEND_QUEUE,
202        THREAD_SUBSCRIPTIONS,
203        DEPENDENT_SEND_QUEUE,
204        CUSTOM,
205        KV,
206    ];
207
208    // static keys
209
210    pub const STORE_KEY: &str = "store_key";
211}
212
213pub use keys::ALL_STORES;
214use matrix_sdk_base::store::QueueWedgeError;
215
216/// Encrypt (if needs be) then JSON-serialize a value.
217fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
218    Ok(match store_cipher {
219        Some(cipher) => {
220            let data = serde_json::to_vec(event)?;
221            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
222        }
223        None => JsValue::from_serde(event)?,
224    })
225}
226
227/// Deserialize a JSON value and then decrypt it (if needs be).
228fn deserialize_value<T: DeserializeOwned>(
229    store_cipher: Option<&StoreCipher>,
230    event: &JsValue,
231) -> Result<T> {
232    match store_cipher {
233        Some(cipher) => {
234            use zeroize::Zeroize;
235            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
236            let ret = serde_json::from_slice(&plaintext);
237            plaintext.zeroize();
238            Ok(ret?)
239        }
240        None => Ok(event.into_serde()?),
241    }
242}
243
244fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
245where
246    T: SafeEncode,
247{
248    match store_cipher {
249        Some(cipher) => key.as_secure_string(table_name, cipher),
250        None => key.as_encoded_string(),
251    }
252    .into()
253}
254
255fn encode_to_range<T>(
256    store_cipher: Option<&StoreCipher>,
257    table_name: &str,
258    key: T,
259) -> KeyRange<JsValue>
260where
261    T: SafeEncode,
262{
263    match store_cipher {
264        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
265        None => key.encode_to_range(),
266    }
267}
268
269/// Builder for [`IndexeddbStateStore`].
270#[derive(Debug)]
271pub struct IndexeddbStateStoreBuilder {
272    name: Option<String>,
273    passphrase: Option<String>,
274    migration_conflict_strategy: MigrationConflictStrategy,
275}
276
277impl IndexeddbStateStoreBuilder {
278    fn new() -> Self {
279        Self {
280            name: None,
281            passphrase: None,
282            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
283        }
284    }
285
286    /// Set the name for the indexeddb store to use, `state` is none given.
287    pub fn name(mut self, value: String) -> Self {
288        self.name = Some(value);
289        self
290    }
291
292    /// Set the password the indexeddb should be encrypted with.
293    ///
294    /// If not given, the DB is not encrypted.
295    pub fn passphrase(mut self, value: String) -> Self {
296        self.passphrase = Some(value);
297        self
298    }
299
300    /// The strategy to use when a merge conflict is found.
301    ///
302    /// See [`MigrationConflictStrategy`] for details.
303    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
304        self.migration_conflict_strategy = value;
305        self
306    }
307
308    pub async fn build(self) -> Result<IndexeddbStateStore> {
309        let migration_strategy = self.migration_conflict_strategy.clone();
310        let name = self.name.unwrap_or_else(|| "state".to_owned());
311
312        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
313
314        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
315        let inner =
316            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
317
318        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
319    }
320}
321
322pub struct IndexeddbStateStore {
323    name: String,
324    pub(crate) inner: Database,
325    pub(crate) meta: Database,
326    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
327}
328
329#[cfg(not(tarpaulin_include))]
330impl std::fmt::Debug for IndexeddbStateStore {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
333    }
334}
335
336type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
337
338impl IndexeddbStateStore {
339    /// Generate a IndexeddbStateStoreBuilder with default parameters
340    pub fn builder() -> IndexeddbStateStoreBuilder {
341        IndexeddbStateStoreBuilder::new()
342    }
343
344    /// The version of the database containing the data.
345    pub fn version(&self) -> u32 {
346        self.inner.version() as u32
347    }
348
349    /// The version of the database containing the metadata.
350    pub fn meta_version(&self) -> u32 {
351        self.meta.version() as u32
352    }
353
354    /// Whether this database has any migration backups
355    pub async fn has_backups(&self) -> Result<bool> {
356        Ok(self
357            .meta
358            .transaction(keys::BACKUPS_META)
359            .with_mode(TransactionMode::Readonly)
360            .build()?
361            .object_store(keys::BACKUPS_META)?
362            .count()
363            .await?
364            > 0)
365    }
366
367    /// What's the database name of the latest backup<
368    pub async fn latest_backup(&self) -> Result<Option<String>> {
369        if let Some(mut cursor) = self
370            .meta
371            .transaction(keys::BACKUPS_META)
372            .with_mode(TransactionMode::Readonly)
373            .build()?
374            .object_store(keys::BACKUPS_META)?
375            .open_cursor()
376            .with_direction(CursorDirection::Prev)
377            .await?
378            && let Some(record) = cursor.next_record::<JsValue>().await?
379        {
380            Ok(record.as_string())
381        } else {
382            Ok(None)
383        }
384    }
385
386    /// Encrypt (if needs be) then JSON-serialize a value.
387    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
388        serialize_value(self.store_cipher.as_deref(), event)
389    }
390
391    /// Deserialize a JSON value and then decrypt it (if needs be).
392    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
393        deserialize_value(self.store_cipher.as_deref(), event)
394    }
395
396    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
397    where
398        T: SafeEncode,
399    {
400        encode_key(self.store_cipher.as_deref(), table_name, key)
401    }
402
403    fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
404    where
405        T: SafeEncode,
406    {
407        encode_to_range(self.store_cipher.as_deref(), table_name, key)
408    }
409
410    /// Get user IDs for the given room with the given memberships and stripped
411    /// state.
412    pub async fn get_user_ids_inner(
413        &self,
414        room_id: &RoomId,
415        memberships: RoomMemberships,
416        stripped: bool,
417    ) -> Result<Vec<OwnedUserId>> {
418        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
419
420        let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
421        let store = tx.object_store(store_name)?;
422        let range = self.encode_to_range(store_name, room_id);
423
424        let user_ids = if memberships.is_empty() {
425            // It should be faster to just get all user IDs in this case.
426            store
427                .get_all()
428                .with_query(&range)
429                .await?
430                .filter_map(Result::ok)
431                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
432                .collect::<Vec<_>>()
433        } else {
434            let mut user_ids = Vec::new();
435            let cursor = store.open_cursor().with_query(&range).await?;
436
437            if let Some(mut cursor) = cursor {
438                while let Some(value) = cursor.next_record().await? {
439                    let member = self.deserialize_value::<RoomMember>(&value)?;
440
441                    if memberships.matches(&member.membership) {
442                        user_ids.push(member.user_id);
443                    }
444                }
445            }
446
447            user_ids
448        };
449
450        Ok(user_ids)
451    }
452
453    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
454        self.inner
455            .transaction(keys::CUSTOM)
456            .with_mode(TransactionMode::Readonly)
457            .build()?
458            .object_store(keys::CUSTOM)?
459            .get(jskey)
460            .await?
461            .map(|f| self.deserialize_value(&f))
462            .transpose()
463    }
464
465    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
466        // Use the key (prefix) for the table name as well, to keep encoded
467        // keys compatible for the sync token and filters, which were in
468        // separate tables initially.
469        match key {
470            StateStoreDataKey::SyncToken => {
471                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
472            }
473            StateStoreDataKey::SupportedVersions => self.encode_key(
474                StateStoreDataKey::SUPPORTED_VERSIONS,
475                StateStoreDataKey::SUPPORTED_VERSIONS,
476            ),
477            StateStoreDataKey::WellKnown => {
478                self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
479            }
480            StateStoreDataKey::Filter(filter_name) => {
481                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
482            }
483            StateStoreDataKey::UserAvatarUrl(user_id) => {
484                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
485            }
486            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
487                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
488            }
489            StateStoreDataKey::UtdHookManagerData => {
490                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
491            }
492            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
493                self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
494            }
495            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
496                if let Some(thread_root) = thread_root {
497                    self.encode_key(
498                        keys::KV,
499                        (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
500                    )
501                } else {
502                    self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
503                }
504            }
505            StateStoreDataKey::SeenKnockRequests(room_id) => {
506                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
507            }
508            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
509                self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
510            }
511            StateStoreDataKey::HomeserverCapabilities => {
512                self.encode_key(keys::KV, StateStoreDataKey::HOMESERVER_CAPABILITIES)
513            }
514        }
515    }
516}
517
518/// A superset of [`QueuedRequest`] that also contains the room id, since we
519/// want to return them.
520#[derive(Serialize, Deserialize)]
521struct PersistedQueuedRequest {
522    /// In which room is this event going to be sent.
523    pub room_id: OwnedRoomId,
524
525    // All these fields are the same as in [`QueuedRequest`].
526    /// Kind. Optional because it might be missing from previous formats.
527    kind: Option<QueuedRequestKind>,
528    transaction_id: OwnedTransactionId,
529
530    pub error: Option<QueueWedgeError>,
531
532    priority: Option<usize>,
533
534    /// The time the original message was first attempted to be sent at.
535    #[serde(default = "created_now")]
536    created_at: MilliSecondsSinceUnixEpoch,
537
538    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
539    /// Deprecated (from old format), now replaced with error field.
540    is_wedged: Option<bool>,
541
542    event: Option<SerializableEventContent>,
543}
544
545fn created_now() -> MilliSecondsSinceUnixEpoch {
546    MilliSecondsSinceUnixEpoch::now()
547}
548
549impl PersistedQueuedRequest {
550    fn into_queued_request(self) -> Option<QueuedRequest> {
551        let kind =
552            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
553
554        let error = match self.is_wedged {
555            Some(true) => {
556                // Migrate to a generic error.
557                Some(QueueWedgeError::GenericApiError {
558                    msg: "local echo failed to send in a previous session".into(),
559                })
560            }
561            _ => self.error,
562        };
563
564        // By default, events without a priority have a priority of 0.
565        let priority = self.priority.unwrap_or(0);
566
567        Some(QueuedRequest {
568            kind,
569            transaction_id: self.transaction_id,
570            error,
571            priority,
572            created_at: self.created_at,
573        })
574    }
575}
576
577#[derive(Serialize, Deserialize, PartialEq)]
578struct PersistedThreadSubscription {
579    status: String,
580    bump_stamp: Option<u64>,
581}
582
583impl From<StoredThreadSubscription> for PersistedThreadSubscription {
584    fn from(value: StoredThreadSubscription) -> Self {
585        Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
586    }
587}
588
589// Small hack to have the following macro invocation act as the appropriate
590// trait impl block on wasm, but still be compiled on non-wasm as a regular
591// impl block otherwise.
592//
593// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
594// this hack allows us to still have most of rust-analyzer's IDE functionality
595// within the impl block without having to set it up to check things against
596// the wasm target (which would disable many other parts of the codebase).
597#[cfg(target_family = "wasm")]
598macro_rules! impl_state_store {
599    ({ $($body:tt)* }) => {
600        #[async_trait(?Send)]
601        impl StateStore for IndexeddbStateStore {
602            type Error = IndexeddbStateStoreError;
603
604            $($body)*
605        }
606    };
607}
608
609#[cfg(not(target_family = "wasm"))]
610macro_rules! impl_state_store {
611    ({ $($body:tt)* }) => {
612        impl IndexeddbStateStore {
613            $($body)*
614        }
615    };
616}
617
618impl_state_store!({
619    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
620        let encoded_key = self.encode_kv_data_key(key);
621
622        let value = self
623            .inner
624            .transaction(keys::KV)
625            .with_mode(TransactionMode::Readonly)
626            .build()?
627            .object_store(keys::KV)?
628            .get(&encoded_key)
629            .await?;
630
631        let value = match key {
632            StateStoreDataKey::SyncToken => value
633                .map(|f| self.deserialize_value::<String>(&f))
634                .transpose()?
635                .map(StateStoreDataValue::SyncToken),
636            StateStoreDataKey::SupportedVersions => value
637                .map(|f| self.deserialize_value::<TtlValue<SupportedVersionsResponse>>(&f))
638                .transpose()?
639                .map(StateStoreDataValue::SupportedVersions),
640            StateStoreDataKey::WellKnown => value
641                .map(|f| self.deserialize_value::<TtlValue<Option<WellKnownResponse>>>(&f))
642                .transpose()?
643                .map(StateStoreDataValue::WellKnown),
644            StateStoreDataKey::Filter(_) => value
645                .map(|f| self.deserialize_value::<String>(&f))
646                .transpose()?
647                .map(StateStoreDataValue::Filter),
648            StateStoreDataKey::UserAvatarUrl(_) => value
649                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
650                .transpose()?
651                .map(StateStoreDataValue::UserAvatarUrl),
652            StateStoreDataKey::RecentlyVisitedRooms(_) => value
653                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
654                .transpose()?
655                .map(StateStoreDataValue::RecentlyVisitedRooms),
656            StateStoreDataKey::UtdHookManagerData => value
657                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
658                .transpose()?
659                .map(StateStoreDataValue::UtdHookManagerData),
660            StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
661                .map(|f| self.deserialize_value::<bool>(&f))
662                .transpose()?
663                .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
664            StateStoreDataKey::ComposerDraft(_, _) => value
665                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
666                .transpose()?
667                .map(StateStoreDataValue::ComposerDraft),
668            StateStoreDataKey::SeenKnockRequests(_) => value
669                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
670                .transpose()?
671                .map(StateStoreDataValue::SeenKnockRequests),
672            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
673                .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
674                .transpose()?
675                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
676            StateStoreDataKey::HomeserverCapabilities => value
677                .map(|f| self.deserialize_value::<TtlValue<Capabilities>>(&f))
678                .transpose()?
679                .map(StateStoreDataValue::HomeserverCapabilities),
680        };
681
682        Ok(value)
683    }
684
685    async fn set_kv_data(
686        &self,
687        key: StateStoreDataKey<'_>,
688        value: StateStoreDataValue,
689    ) -> Result<()> {
690        let encoded_key = self.encode_kv_data_key(key);
691
692        let serialized_value = match key {
693            StateStoreDataKey::SyncToken => self
694                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
695            StateStoreDataKey::SupportedVersions => self.serialize_value(
696                &value
697                    .into_supported_versions()
698                    .expect("Session data not containing supported versions"),
699            ),
700            StateStoreDataKey::WellKnown => self.serialize_value(
701                &value.into_well_known().expect("Session data not containing well-known"),
702            ),
703            StateStoreDataKey::Filter(_) => {
704                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
705            }
706            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
707                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
708            ),
709            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
710                &value
711                    .into_recently_visited_rooms()
712                    .expect("Session data not a recently visited room list"),
713            ),
714            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
715                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
716            ),
717            StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
718            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
719                &value.into_composer_draft().expect("Session data not a composer draft"),
720            ),
721            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
722                &value
723                    .into_seen_knock_requests()
724                    .expect("Session data is not a set of seen knock request ids"),
725            ),
726            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
727                &value
728                    .into_thread_subscriptions_catchup_tokens()
729                    .expect("Session data is not a list of thread subscription catchup tokens"),
730            ),
731            StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
732                &value
733                    .into_homeserver_capabilities()
734                    .expect("Session data is not a homeserver capabilities"),
735            ),
736        };
737
738        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
739
740        let obj = tx.object_store(keys::KV)?;
741
742        obj.put(&serialized_value?).with_key(encoded_key).build()?;
743
744        tx.commit().await?;
745
746        Ok(())
747    }
748
749    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
750        let encoded_key = self.encode_kv_data_key(key);
751
752        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
753        let obj = tx.object_store(keys::KV)?;
754
755        obj.delete(&encoded_key).build()?;
756
757        tx.commit().await?;
758
759        Ok(())
760    }
761
762    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
763        let mut stores: HashSet<&'static str> = [
764            (changes.sync_token.is_some(), keys::KV),
765            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
766            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
767            (!changes.presence.is_empty(), keys::PRESENCE),
768            (
769                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
770                keys::PROFILES,
771            ),
772            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
773            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
774        ]
775        .iter()
776        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
777        .collect();
778
779        if !changes.state.is_empty() {
780            stores.extend([
781                keys::ROOM_STATE,
782                keys::USER_IDS,
783                keys::STRIPPED_USER_IDS,
784                keys::STRIPPED_ROOM_STATE,
785                keys::PROFILES,
786            ]);
787        }
788
789        if !changes.redactions.is_empty() {
790            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
791        }
792
793        if !changes.room_infos.is_empty() {
794            stores.insert(keys::ROOM_INFOS);
795        }
796
797        if !changes.stripped_state.is_empty() {
798            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
799        }
800
801        if !changes.receipts.is_empty() {
802            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
803        }
804
805        if stores.is_empty() {
806            // nothing to do, quit early
807            return Ok(());
808        }
809
810        let stores: Vec<&'static str> = stores.into_iter().collect();
811        let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
812
813        if let Some(s) = &changes.sync_token {
814            tx.object_store(keys::KV)?
815                .put(&self.serialize_value(s)?)
816                .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
817                .build()?;
818        }
819
820        if !changes.ambiguity_maps.is_empty() {
821            let store = tx.object_store(keys::DISPLAY_NAMES)?;
822            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
823                for (display_name, map) in ambiguity_maps {
824                    let key = self.encode_key(
825                        keys::DISPLAY_NAMES,
826                        (
827                            room_id,
828                            display_name
829                                .as_normalized_str()
830                                .unwrap_or_else(|| display_name.as_raw_str()),
831                        ),
832                    );
833
834                    store.put(&self.serialize_value(&map)?).with_key(key).build()?;
835                }
836            }
837        }
838
839        if !changes.account_data.is_empty() {
840            let store = tx.object_store(keys::ACCOUNT_DATA)?;
841            for (event_type, event) in &changes.account_data {
842                store
843                    .put(&self.serialize_value(&event)?)
844                    .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
845                    .build()?;
846            }
847        }
848
849        if !changes.room_account_data.is_empty() {
850            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
851            for (room, events) in &changes.room_account_data {
852                for (event_type, event) in events {
853                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
854                    store.put(&self.serialize_value(&event)?).with_key(key).build()?;
855                }
856            }
857        }
858
859        if !changes.state.is_empty() {
860            let state = tx.object_store(keys::ROOM_STATE)?;
861            let profiles = tx.object_store(keys::PROFILES)?;
862            let user_ids = tx.object_store(keys::USER_IDS)?;
863            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
864            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
865
866            for (room, user_ids) in &changes.profiles_to_delete {
867                for user_id in user_ids {
868                    let key = self.encode_key(keys::PROFILES, (room, user_id));
869                    profiles.delete(&key).build()?;
870                }
871            }
872
873            for (room, event_types) in &changes.state {
874                let profile_changes = changes.profiles.get(room);
875
876                for (event_type, events) in event_types {
877                    for (state_key, raw_event) in events {
878                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
879                        state
880                            .put(&self.serialize_value(&raw_event)?)
881                            .with_key(key.clone())
882                            .build()?;
883                        stripped_state.delete(&key).build()?;
884
885                        if *event_type == StateEventType::RoomMember {
886                            let event =
887                                match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
888                                    Ok(ev) => ev,
889                                    Err(e) => {
890                                        let event_id: Option<String> =
891                                            raw_event.get_field("event_id").ok().flatten();
892                                        debug!(event_id, "Failed to deserialize member event: {e}");
893                                        continue;
894                                    }
895                                };
896
897                            let key = (room, state_key);
898
899                            stripped_user_ids
900                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
901                                .build()?;
902
903                            user_ids
904                                .put(&self.serialize_value(&RoomMember::from(&event))?)
905                                .with_key(self.encode_key(keys::USER_IDS, key))
906                                .build()?;
907
908                            if let Some(profile) =
909                                profile_changes.and_then(|p| p.get(event.state_key()))
910                            {
911                                profiles
912                                    .put(&self.serialize_value(&profile)?)
913                                    .with_key(self.encode_key(keys::PROFILES, key))
914                                    .build()?;
915                            }
916                        }
917                    }
918                }
919            }
920        }
921
922        if !changes.room_infos.is_empty() {
923            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
924            for (room_id, room_info) in &changes.room_infos {
925                room_infos
926                    .put(&self.serialize_value(&room_info)?)
927                    .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
928                    .build()?;
929            }
930        }
931
932        if !changes.presence.is_empty() {
933            let store = tx.object_store(keys::PRESENCE)?;
934            for (sender, event) in &changes.presence {
935                store
936                    .put(&self.serialize_value(&event)?)
937                    .with_key(self.encode_key(keys::PRESENCE, sender))
938                    .build()?;
939            }
940        }
941
942        if !changes.stripped_state.is_empty() {
943            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
944            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
945
946            for (room, event_types) in &changes.stripped_state {
947                for (event_type, events) in event_types {
948                    for (state_key, raw_event) in events {
949                        let key = self
950                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
951                        store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
952
953                        if *event_type == StateEventType::RoomMember {
954                            let event = match raw_event
955                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
956                            {
957                                Ok(ev) => ev,
958                                Err(e) => {
959                                    let event_id: Option<String> =
960                                        raw_event.get_field("event_id").ok().flatten();
961                                    debug!(
962                                        event_id,
963                                        "Failed to deserialize stripped member event: {e}"
964                                    );
965                                    continue;
966                                }
967                            };
968
969                            let key = (room, state_key);
970
971                            user_ids
972                                .put(&self.serialize_value(&RoomMember::from(&event))?)
973                                .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
974                                .build()?;
975                        }
976                    }
977                }
978            }
979        }
980
981        if !changes.receipts.is_empty() {
982            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
983            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
984
985            for (room, content) in &changes.receipts {
986                for (event_id, receipts) in &content.0 {
987                    for (receipt_type, receipts) in receipts {
988                        for (user_id, receipt) in receipts {
989                            let key = match receipt.thread.as_str() {
990                                Some(thread_id) => self.encode_key(
991                                    keys::ROOM_USER_RECEIPTS,
992                                    (room, receipt_type, thread_id, user_id),
993                                ),
994                                None => self.encode_key(
995                                    keys::ROOM_USER_RECEIPTS,
996                                    (room, receipt_type, user_id),
997                                ),
998                            };
999
1000                            if let Some((old_event, _)) =
1001                                room_user_receipts.get(&key).await?.and_then(|f| {
1002                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
1003                                })
1004                            {
1005                                let key = match receipt.thread.as_str() {
1006                                    Some(thread_id) => self.encode_key(
1007                                        keys::ROOM_EVENT_RECEIPTS,
1008                                        (room, receipt_type, thread_id, old_event, user_id),
1009                                    ),
1010                                    None => self.encode_key(
1011                                        keys::ROOM_EVENT_RECEIPTS,
1012                                        (room, receipt_type, old_event, user_id),
1013                                    ),
1014                                };
1015                                room_event_receipts.delete(&key).build()?;
1016                            }
1017
1018                            room_user_receipts
1019                                .put(&self.serialize_value(&(event_id, receipt))?)
1020                                .with_key(key)
1021                                .build()?;
1022
1023                            // Add the receipt to the room event receipts
1024                            let key = match receipt.thread.as_str() {
1025                                Some(thread_id) => self.encode_key(
1026                                    keys::ROOM_EVENT_RECEIPTS,
1027                                    (room, receipt_type, thread_id, event_id, user_id),
1028                                ),
1029                                None => self.encode_key(
1030                                    keys::ROOM_EVENT_RECEIPTS,
1031                                    (room, receipt_type, event_id, user_id),
1032                                ),
1033                            };
1034                            room_event_receipts
1035                                .put(&self.serialize_value(&(user_id, receipt))?)
1036                                .with_key(key)
1037                                .build()?;
1038                        }
1039                    }
1040                }
1041            }
1042        }
1043
1044        if !changes.redactions.is_empty() {
1045            let state = tx.object_store(keys::ROOM_STATE)?;
1046            let room_info = tx.object_store(keys::ROOM_INFOS)?;
1047
1048            for (room_id, redactions) in &changes.redactions {
1049                let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1050                let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1051                    continue;
1052                };
1053
1054                let mut redaction_rules = None;
1055
1056                while let Some(value) = cursor.next_record().await? {
1057                    let Some(key) = cursor.key::<JsValue>()? else {
1058                        break;
1059                    };
1060
1061                    let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1062                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1063                        && let Some(redaction) = redactions.get(&event_id)
1064                    {
1065                        let redaction_rules = match &redaction_rules {
1066                            Some(r) => r,
1067                            None => {
1068                                let value = room_info
1069                                    .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1070                                    .await?
1071                                    .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1072                                    .map(|info| info.room_version_rules_or_default())
1073                                    .unwrap_or_else(|| {
1074                                        warn!(
1075                                            ?room_id,
1076                                            "Unable to get the room version rules, \
1077                                             defaulting to rules for room version \
1078                                             {ROOM_VERSION_FALLBACK}"
1079                                        );
1080                                        ROOM_VERSION_RULES_FALLBACK
1081                                    })
1082                                    .redaction;
1083                                redaction_rules.get_or_insert(value)
1084                            }
1085                        };
1086
1087                        let redacted = redact(
1088                            raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1089                            redaction_rules,
1090                            Some(RedactedBecause::from_raw_event(redaction)?),
1091                        )
1092                        .map_err(StoreError::Redaction)?;
1093                        state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1094                    }
1095                }
1096            }
1097        }
1098
1099        tx.commit().await.map_err(|e| e.into())
1100    }
1101
1102    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1103        self.inner
1104            .transaction(keys::PRESENCE)
1105            .with_mode(TransactionMode::Readonly)
1106            .build()?
1107            .object_store(keys::PRESENCE)?
1108            .get(&self.encode_key(keys::PRESENCE, user_id))
1109            .await?
1110            .map(|f| self.deserialize_value(&f))
1111            .transpose()
1112    }
1113
1114    async fn get_presence_events(
1115        &self,
1116        user_ids: &[OwnedUserId],
1117    ) -> Result<Vec<Raw<PresenceEvent>>> {
1118        if user_ids.is_empty() {
1119            return Ok(Vec::new());
1120        }
1121
1122        let txn =
1123            self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1124        let store = txn.object_store(keys::PRESENCE)?;
1125
1126        let mut events = Vec::with_capacity(user_ids.len());
1127
1128        for user_id in user_ids {
1129            if let Some(event) = store
1130                .get(&self.encode_key(keys::PRESENCE, user_id))
1131                .await?
1132                .map(|f| self.deserialize_value(&f))
1133                .transpose()?
1134            {
1135                events.push(event)
1136            }
1137        }
1138
1139        Ok(events)
1140    }
1141
1142    async fn get_state_event(
1143        &self,
1144        room_id: &RoomId,
1145        event_type: StateEventType,
1146        state_key: &str,
1147    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1148        Ok(self
1149            .get_state_events_for_keys(room_id, event_type, &[state_key])
1150            .await?
1151            .into_iter()
1152            .next())
1153    }
1154
1155    async fn get_state_events(
1156        &self,
1157        room_id: &RoomId,
1158        event_type: StateEventType,
1159    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1160        let stripped_range =
1161            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1162        let stripped_events = self
1163            .inner
1164            .transaction(keys::STRIPPED_ROOM_STATE)
1165            .with_mode(TransactionMode::Readonly)
1166            .build()?
1167            .object_store(keys::STRIPPED_ROOM_STATE)?
1168            .get_all()
1169            .with_query(&stripped_range)
1170            .await?
1171            .filter_map(Result::ok)
1172            .filter_map(|f| {
1173                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1174            })
1175            .collect::<Vec<_>>();
1176
1177        if !stripped_events.is_empty() {
1178            return Ok(stripped_events);
1179        }
1180
1181        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1182        Ok(self
1183            .inner
1184            .transaction(keys::ROOM_STATE)
1185            .with_mode(TransactionMode::Readonly)
1186            .build()?
1187            .object_store(keys::ROOM_STATE)?
1188            .get_all()
1189            .with_query(&range)
1190            .await?
1191            .filter_map(Result::ok)
1192            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1193            .collect::<Vec<_>>())
1194    }
1195
1196    async fn get_state_events_for_keys(
1197        &self,
1198        room_id: &RoomId,
1199        event_type: StateEventType,
1200        state_keys: &[&str],
1201    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1202        if state_keys.is_empty() {
1203            return Ok(Vec::new());
1204        }
1205
1206        let mut events = Vec::with_capacity(state_keys.len());
1207
1208        {
1209            let txn = self
1210                .inner
1211                .transaction(keys::STRIPPED_ROOM_STATE)
1212                .with_mode(TransactionMode::Readonly)
1213                .build()?;
1214            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1215
1216            for state_key in state_keys {
1217                if let Some(event) =
1218                    store
1219                        .get(&self.encode_key(
1220                            keys::STRIPPED_ROOM_STATE,
1221                            (room_id, &event_type, state_key),
1222                        ))
1223                        .await?
1224                        .map(|f| self.deserialize_value(&f))
1225                        .transpose()?
1226                {
1227                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1228                }
1229            }
1230
1231            if !events.is_empty() {
1232                return Ok(events);
1233            }
1234        }
1235
1236        let txn = self
1237            .inner
1238            .transaction(keys::ROOM_STATE)
1239            .with_mode(TransactionMode::Readonly)
1240            .build()?;
1241        let store = txn.object_store(keys::ROOM_STATE)?;
1242
1243        for state_key in state_keys {
1244            if let Some(event) = store
1245                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1246                .await?
1247                .map(|f| self.deserialize_value(&f))
1248                .transpose()?
1249            {
1250                events.push(RawAnySyncOrStrippedState::Sync(event));
1251            }
1252        }
1253
1254        Ok(events)
1255    }
1256
1257    async fn get_profile(
1258        &self,
1259        room_id: &RoomId,
1260        user_id: &UserId,
1261    ) -> Result<Option<MinimalRoomMemberEvent>> {
1262        self.inner
1263            .transaction(keys::PROFILES)
1264            .with_mode(TransactionMode::Readonly)
1265            .build()?
1266            .object_store(keys::PROFILES)?
1267            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1268            .await?
1269            .map(|f| self.deserialize_value(&f))
1270            .transpose()
1271    }
1272
1273    async fn get_profiles<'a>(
1274        &self,
1275        room_id: &RoomId,
1276        user_ids: &'a [OwnedUserId],
1277    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1278        if user_ids.is_empty() {
1279            return Ok(BTreeMap::new());
1280        }
1281
1282        let txn =
1283            self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1284        let store = txn.object_store(keys::PROFILES)?;
1285
1286        let mut profiles = BTreeMap::new();
1287        for user_id in user_ids {
1288            if let Some(profile) = store
1289                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1290                .await?
1291                .map(|f| self.deserialize_value(&f))
1292                .transpose()?
1293            {
1294                profiles.insert(user_id.as_ref(), profile);
1295            }
1296        }
1297
1298        Ok(profiles)
1299    }
1300
1301    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1302        let transaction = self
1303            .inner
1304            .transaction(keys::ROOM_INFOS)
1305            .with_mode(TransactionMode::Readonly)
1306            .build()?;
1307
1308        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1309
1310        Ok(match room_load_settings {
1311            RoomLoadSettings::All => object_store
1312                .get_all()
1313                .await?
1314                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1315                .collect::<Result<_>>()?,
1316
1317            RoomLoadSettings::One(room_id) => {
1318                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1319                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1320                    None => vec![],
1321                }
1322            }
1323        })
1324    }
1325
1326    async fn get_users_with_display_name(
1327        &self,
1328        room_id: &RoomId,
1329        display_name: &DisplayName,
1330    ) -> Result<BTreeSet<OwnedUserId>> {
1331        self.inner
1332            .transaction(keys::DISPLAY_NAMES)
1333            .with_mode(TransactionMode::Readonly)
1334            .build()?
1335            .object_store(keys::DISPLAY_NAMES)?
1336            .get(&self.encode_key(
1337                keys::DISPLAY_NAMES,
1338                (
1339                    room_id,
1340                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1341                ),
1342            ))
1343            .await?
1344            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1345            .unwrap_or_else(|| Ok(Default::default()))
1346    }
1347
1348    async fn get_users_with_display_names<'a>(
1349        &self,
1350        room_id: &RoomId,
1351        display_names: &'a [DisplayName],
1352    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1353        let mut map = HashMap::new();
1354
1355        if display_names.is_empty() {
1356            return Ok(map);
1357        }
1358
1359        let txn = self
1360            .inner
1361            .transaction(keys::DISPLAY_NAMES)
1362            .with_mode(TransactionMode::Readonly)
1363            .build()?;
1364        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1365
1366        for display_name in display_names {
1367            if let Some(user_ids) = store
1368                .get(
1369                    &self.encode_key(
1370                        keys::DISPLAY_NAMES,
1371                        (
1372                            room_id,
1373                            display_name
1374                                .as_normalized_str()
1375                                .unwrap_or_else(|| display_name.as_raw_str()),
1376                        ),
1377                    ),
1378                )
1379                .await?
1380                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1381                .transpose()?
1382            {
1383                map.insert(display_name, user_ids);
1384            }
1385        }
1386
1387        Ok(map)
1388    }
1389
1390    async fn get_account_data_event(
1391        &self,
1392        event_type: GlobalAccountDataEventType,
1393    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1394        self.inner
1395            .transaction(keys::ACCOUNT_DATA)
1396            .with_mode(TransactionMode::Readonly)
1397            .build()?
1398            .object_store(keys::ACCOUNT_DATA)?
1399            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1400            .await?
1401            .map(|f| self.deserialize_value(&f))
1402            .transpose()
1403    }
1404
1405    async fn get_room_account_data_event(
1406        &self,
1407        room_id: &RoomId,
1408        event_type: RoomAccountDataEventType,
1409    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1410        self.inner
1411            .transaction(keys::ROOM_ACCOUNT_DATA)
1412            .with_mode(TransactionMode::Readonly)
1413            .build()?
1414            .object_store(keys::ROOM_ACCOUNT_DATA)?
1415            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1416            .await?
1417            .map(|f| self.deserialize_value(&f))
1418            .transpose()
1419    }
1420
1421    async fn get_user_room_receipt_event(
1422        &self,
1423        room_id: &RoomId,
1424        receipt_type: ReceiptType,
1425        thread: ReceiptThread,
1426        user_id: &UserId,
1427    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1428        let key = match thread.as_str() {
1429            Some(thread_id) => self
1430                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1431            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1432        };
1433        self.inner
1434            .transaction(keys::ROOM_USER_RECEIPTS)
1435            .with_mode(TransactionMode::Readonly)
1436            .build()?
1437            .object_store(keys::ROOM_USER_RECEIPTS)?
1438            .get(&key)
1439            .await?
1440            .map(|f| self.deserialize_value(&f))
1441            .transpose()
1442    }
1443
1444    async fn get_event_room_receipt_events(
1445        &self,
1446        room_id: &RoomId,
1447        receipt_type: ReceiptType,
1448        thread: ReceiptThread,
1449        event_id: &EventId,
1450    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1451        let range = match thread.as_str() {
1452            Some(thread_id) => self.encode_to_range(
1453                keys::ROOM_EVENT_RECEIPTS,
1454                (room_id, receipt_type, thread_id, event_id),
1455            ),
1456            None => {
1457                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1458            }
1459        };
1460        let tx = self
1461            .inner
1462            .transaction(keys::ROOM_EVENT_RECEIPTS)
1463            .with_mode(TransactionMode::Readonly)
1464            .build()?;
1465        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1466
1467        Ok(store
1468            .get_all()
1469            .with_query(&range)
1470            .await?
1471            .filter_map(Result::ok)
1472            .filter_map(|f| self.deserialize_value(&f).ok())
1473            .collect::<Vec<_>>())
1474    }
1475
1476    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1477        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1478        self.get_custom_value_for_js(jskey).await
1479    }
1480
1481    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1482        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1483
1484        let prev = self.get_custom_value_for_js(&jskey).await?;
1485
1486        let tx =
1487            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1488
1489        tx.object_store(keys::CUSTOM)?
1490            .put(&self.serialize_value(&value)?)
1491            .with_key(jskey)
1492            .build()?;
1493
1494        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1495        Ok(prev)
1496    }
1497
1498    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1499        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1500
1501        let prev = self.get_custom_value_for_js(&jskey).await?;
1502
1503        let tx =
1504            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1505
1506        tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1507
1508        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1509        Ok(prev)
1510    }
1511
1512    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1513        // All the stores which use a RoomId as their key (and nothing additional).
1514        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1515
1516        // All the stores which use a RoomId as the first part of their key, but may
1517        // have some additional data in the key.
1518        let prefixed_stores = [
1519            keys::PROFILES,
1520            keys::DISPLAY_NAMES,
1521            keys::USER_IDS,
1522            keys::ROOM_STATE,
1523            keys::ROOM_ACCOUNT_DATA,
1524            keys::ROOM_EVENT_RECEIPTS,
1525            keys::ROOM_USER_RECEIPTS,
1526            keys::STRIPPED_ROOM_STATE,
1527            keys::STRIPPED_USER_IDS,
1528            keys::THREAD_SUBSCRIPTIONS,
1529        ];
1530
1531        let all_stores = {
1532            let mut v = Vec::new();
1533            v.extend(prefixed_stores);
1534            v.extend(direct_stores);
1535            v
1536        };
1537
1538        let tx =
1539            self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1540
1541        for store_name in direct_stores {
1542            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1543        }
1544
1545        for store_name in prefixed_stores {
1546            let store = tx.object_store(store_name)?;
1547            let range = self.encode_to_range(store_name, room_id);
1548            for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1549                store.delete(&key?).build()?;
1550            }
1551        }
1552
1553        tx.commit().await.map_err(|e| e.into())
1554    }
1555
1556    async fn get_user_ids(
1557        &self,
1558        room_id: &RoomId,
1559        memberships: RoomMemberships,
1560    ) -> Result<Vec<OwnedUserId>> {
1561        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1562        if !ids.is_empty() {
1563            return Ok(ids);
1564        }
1565        self.get_user_ids_inner(room_id, memberships, false).await
1566    }
1567
1568    async fn save_send_queue_request(
1569        &self,
1570        room_id: &RoomId,
1571        transaction_id: OwnedTransactionId,
1572        created_at: MilliSecondsSinceUnixEpoch,
1573        kind: QueuedRequestKind,
1574        priority: usize,
1575    ) -> Result<()> {
1576        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1577
1578        let tx = self
1579            .inner
1580            .transaction(keys::ROOM_SEND_QUEUE)
1581            .with_mode(TransactionMode::Readwrite)
1582            .build()?;
1583
1584        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1585
1586        // We store an encoded vector of the queued requests, with their transaction
1587        // ids.
1588
1589        // Reload the previous vector for this room, or create an empty one.
1590        let prev = obj.get(&encoded_key).await?;
1591
1592        let mut prev = prev.map_or_else(
1593            || Ok(Vec::new()),
1594            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1595        )?;
1596
1597        // Push the new request.
1598        prev.push(PersistedQueuedRequest {
1599            room_id: room_id.to_owned(),
1600            kind: Some(kind),
1601            transaction_id,
1602            error: None,
1603            is_wedged: None,
1604            event: None,
1605            priority: Some(priority),
1606            created_at,
1607        });
1608
1609        // Save the new vector into db.
1610        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1611
1612        tx.commit().await?;
1613
1614        Ok(())
1615    }
1616
1617    async fn update_send_queue_request(
1618        &self,
1619        room_id: &RoomId,
1620        transaction_id: &TransactionId,
1621        kind: QueuedRequestKind,
1622    ) -> Result<bool> {
1623        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1624
1625        let tx = self
1626            .inner
1627            .transaction(keys::ROOM_SEND_QUEUE)
1628            .with_mode(TransactionMode::Readwrite)
1629            .build()?;
1630
1631        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1632
1633        // We store an encoded vector of the queued requests, with their transaction
1634        // ids.
1635
1636        // Reload the previous vector for this room, or create an empty one.
1637        let prev = obj.get(&encoded_key).await?;
1638
1639        let mut prev = prev.map_or_else(
1640            || Ok(Vec::new()),
1641            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1642        )?;
1643
1644        // Modify the one request.
1645        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1646            entry.kind = Some(kind);
1647            // Reset the error state.
1648            entry.error = None;
1649            // Remove migrated fields.
1650            entry.is_wedged = None;
1651            entry.event = None;
1652
1653            // Save the new vector into db.
1654            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1655            tx.commit().await?;
1656
1657            Ok(true)
1658        } else {
1659            Ok(false)
1660        }
1661    }
1662
1663    async fn remove_send_queue_request(
1664        &self,
1665        room_id: &RoomId,
1666        transaction_id: &TransactionId,
1667    ) -> Result<bool> {
1668        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1669
1670        let tx = self
1671            .inner
1672            .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1673            .with_mode(TransactionMode::Readwrite)
1674            .build()?;
1675
1676        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1677
1678        // We store an encoded vector of the queued requests, with their transaction
1679        // ids.
1680
1681        // Reload the previous vector for this room.
1682        if let Some(val) = obj.get(&encoded_key).await? {
1683            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1684            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1685                prev.remove(pos);
1686
1687                if prev.is_empty() {
1688                    obj.delete(&encoded_key).build()?;
1689                } else {
1690                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1691                }
1692
1693                tx.commit().await?;
1694                return Ok(true);
1695            }
1696        }
1697
1698        Ok(false)
1699    }
1700
1701    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1702        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1703
1704        // We store an encoded vector of the queued requests, with their transaction
1705        // ids.
1706        let prev = self
1707            .inner
1708            .transaction(keys::ROOM_SEND_QUEUE)
1709            .with_mode(TransactionMode::Readwrite)
1710            .build()?
1711            .object_store(keys::ROOM_SEND_QUEUE)?
1712            .get(&encoded_key)
1713            .await?;
1714
1715        let mut prev = prev.map_or_else(
1716            || Ok(Vec::new()),
1717            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1718        )?;
1719
1720        // Inverted stable ordering on priority.
1721        prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1722
1723        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1724    }
1725
1726    async fn update_send_queue_request_status(
1727        &self,
1728        room_id: &RoomId,
1729        transaction_id: &TransactionId,
1730        error: Option<QueueWedgeError>,
1731    ) -> Result<()> {
1732        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1733
1734        let tx = self
1735            .inner
1736            .transaction(keys::ROOM_SEND_QUEUE)
1737            .with_mode(TransactionMode::Readwrite)
1738            .build()?;
1739
1740        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1741
1742        if let Some(val) = obj.get(&encoded_key).await? {
1743            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1744            if let Some(request) =
1745                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1746            {
1747                request.is_wedged = None;
1748                request.error = error;
1749                obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1750            }
1751        }
1752
1753        tx.commit().await?;
1754
1755        Ok(())
1756    }
1757
1758    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1759        let tx = self
1760            .inner
1761            .transaction(keys::ROOM_SEND_QUEUE)
1762            .with_mode(TransactionMode::Readwrite)
1763            .build()?;
1764
1765        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1766
1767        let all_entries = obj
1768            .get_all()
1769            .await?
1770            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1771            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1772            .into_iter()
1773            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1774            .collect::<BTreeSet<_>>();
1775
1776        Ok(all_entries.into_iter().collect())
1777    }
1778
1779    async fn save_dependent_queued_request(
1780        &self,
1781        room_id: &RoomId,
1782        parent_txn_id: &TransactionId,
1783        own_txn_id: ChildTransactionId,
1784        created_at: MilliSecondsSinceUnixEpoch,
1785        content: DependentQueuedRequestKind,
1786    ) -> Result<()> {
1787        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1788
1789        let tx = self
1790            .inner
1791            .transaction(keys::DEPENDENT_SEND_QUEUE)
1792            .with_mode(TransactionMode::Readwrite)
1793            .build()?;
1794
1795        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1796
1797        // We store an encoded vector of the dependent requests.
1798        // Reload the previous vector for this room, or create an empty one.
1799        let prev = obj.get(&encoded_key).await?;
1800
1801        let mut prev = prev.map_or_else(
1802            || Ok(Vec::new()),
1803            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1804        )?;
1805
1806        // Push the new request.
1807        prev.push(DependentQueuedRequest {
1808            kind: content,
1809            parent_transaction_id: parent_txn_id.to_owned(),
1810            own_transaction_id: own_txn_id,
1811            parent_key: None,
1812            created_at,
1813        });
1814
1815        // Save the new vector into db.
1816        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1817
1818        tx.commit().await?;
1819
1820        Ok(())
1821    }
1822
1823    async fn update_dependent_queued_request(
1824        &self,
1825        room_id: &RoomId,
1826        own_transaction_id: &ChildTransactionId,
1827        new_content: DependentQueuedRequestKind,
1828    ) -> Result<bool> {
1829        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1830
1831        let tx = self
1832            .inner
1833            .transaction(keys::DEPENDENT_SEND_QUEUE)
1834            .with_mode(TransactionMode::Readwrite)
1835            .build()?;
1836
1837        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1838
1839        // We store an encoded vector of the dependent requests.
1840        // Reload the previous vector for this room, or create an empty one.
1841        let prev = obj.get(&encoded_key).await?;
1842
1843        let mut prev = prev.map_or_else(
1844            || Ok(Vec::new()),
1845            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1846        )?;
1847
1848        // Modify the dependent request, if found.
1849        let mut found = false;
1850        for entry in prev.iter_mut() {
1851            if entry.own_transaction_id == *own_transaction_id {
1852                found = true;
1853                entry.kind = new_content;
1854                break;
1855            }
1856        }
1857
1858        if found {
1859            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1860            tx.commit().await?;
1861        }
1862
1863        Ok(found)
1864    }
1865
1866    async fn mark_dependent_queued_requests_as_ready(
1867        &self,
1868        room_id: &RoomId,
1869        parent_txn_id: &TransactionId,
1870        parent_key: SentRequestKey,
1871    ) -> Result<usize> {
1872        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1873
1874        let tx = self
1875            .inner
1876            .transaction(keys::DEPENDENT_SEND_QUEUE)
1877            .with_mode(TransactionMode::Readwrite)
1878            .build()?;
1879
1880        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1881
1882        // We store an encoded vector of the dependent requests.
1883        // Reload the previous vector for this room, or create an empty one.
1884        let prev = obj.get(&encoded_key).await?;
1885
1886        let mut prev = prev.map_or_else(
1887            || Ok(Vec::new()),
1888            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1889        )?;
1890
1891        // Modify all requests that match.
1892        let mut num_updated = 0;
1893        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1894            entry.parent_key = Some(parent_key.clone());
1895            num_updated += 1;
1896        }
1897
1898        if num_updated > 0 {
1899            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1900            tx.commit().await?;
1901        }
1902
1903        Ok(num_updated)
1904    }
1905
1906    async fn remove_dependent_queued_request(
1907        &self,
1908        room_id: &RoomId,
1909        txn_id: &ChildTransactionId,
1910    ) -> Result<bool> {
1911        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1912
1913        let tx = self
1914            .inner
1915            .transaction(keys::DEPENDENT_SEND_QUEUE)
1916            .with_mode(TransactionMode::Readwrite)
1917            .build()?;
1918
1919        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1920
1921        // We store an encoded vector of the dependent requests.
1922        // Reload the previous vector for this room.
1923        if let Some(val) = obj.get(&encoded_key).await? {
1924            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1925            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1926                prev.remove(pos);
1927
1928                if prev.is_empty() {
1929                    obj.delete(&encoded_key).build()?;
1930                } else {
1931                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1932                }
1933
1934                tx.commit().await?;
1935                return Ok(true);
1936            }
1937        }
1938
1939        Ok(false)
1940    }
1941
1942    async fn load_dependent_queued_requests(
1943        &self,
1944        room_id: &RoomId,
1945    ) -> Result<Vec<DependentQueuedRequest>> {
1946        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1947
1948        // We store an encoded vector of the dependent requests.
1949        let prev = self
1950            .inner
1951            .transaction(keys::DEPENDENT_SEND_QUEUE)
1952            .with_mode(TransactionMode::Readwrite)
1953            .build()?
1954            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1955            .get(&encoded_key)
1956            .await?;
1957
1958        prev.map_or_else(
1959            || Ok(Vec::new()),
1960            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1961        )
1962    }
1963
1964    async fn upsert_thread_subscriptions(
1965        &self,
1966        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1967    ) -> Result<()> {
1968        let tx = self
1969            .inner
1970            .transaction(keys::THREAD_SUBSCRIPTIONS)
1971            .with_mode(TransactionMode::Readwrite)
1972            .build()?;
1973        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1974
1975        for (room_id, thread_id, subscription) in updates {
1976            let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1977            let mut new = PersistedThreadSubscription::from(subscription);
1978
1979            // See if there's a previous subscription.
1980            if let Some(previous_value) = obj.get(&encoded_key).await? {
1981                let previous: PersistedThreadSubscription =
1982                    self.deserialize_value(&previous_value)?;
1983
1984                // If the previous status is the same as the new one, don't do anything.
1985                if new == previous {
1986                    continue;
1987                }
1988                if !compare_thread_subscription_bump_stamps(
1989                    previous.bump_stamp,
1990                    &mut new.bump_stamp,
1991                ) {
1992                    continue;
1993                }
1994            }
1995
1996            let serialized_value = self.serialize_value(&new);
1997            obj.put(&serialized_value?).with_key(encoded_key).build()?;
1998        }
1999
2000        tx.commit().await?;
2001
2002        Ok(())
2003    }
2004
2005    async fn load_thread_subscription(
2006        &self,
2007        room: &RoomId,
2008        thread_id: &EventId,
2009    ) -> Result<Option<StoredThreadSubscription>> {
2010        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2011
2012        let js_value = self
2013            .inner
2014            .transaction(keys::THREAD_SUBSCRIPTIONS)
2015            .with_mode(TransactionMode::Readonly)
2016            .build()?
2017            .object_store(keys::THREAD_SUBSCRIPTIONS)?
2018            .get(&encoded_key)
2019            .await?;
2020
2021        let Some(js_value) = js_value else {
2022            // We didn't have a previous subscription for this thread.
2023            return Ok(None);
2024        };
2025
2026        let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2027
2028        let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2029            StoreError::InvalidData {
2030                details: format!(
2031                    "invalid thread status for room {room} and thread {thread_id}: {}",
2032                    sub.status
2033                ),
2034            }
2035        })?;
2036
2037        Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2038    }
2039
2040    async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2041        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2042
2043        let transaction = self
2044            .inner
2045            .transaction(keys::THREAD_SUBSCRIPTIONS)
2046            .with_mode(TransactionMode::Readwrite)
2047            .build()?;
2048        transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2049        transaction.commit().await?;
2050
2051        Ok(())
2052    }
2053
2054    #[allow(clippy::unused_async)]
2055    async fn optimize(&self) -> Result<()> {
2056        Ok(())
2057    }
2058
2059    #[allow(clippy::unused_async)]
2060    async fn get_size(&self) -> Result<Option<usize>> {
2061        Ok(None)
2062    }
2063});
2064
2065/// A room member.
2066#[derive(Debug, Serialize, Deserialize)]
2067struct RoomMember {
2068    user_id: OwnedUserId,
2069    membership: MembershipState,
2070}
2071
2072impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2073    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2074        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2075    }
2076}
2077
2078impl From<&StrippedRoomMemberEvent> for RoomMember {
2079    fn from(event: &StrippedRoomMemberEvent) -> Self {
2080        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2081    }
2082}
2083
2084#[cfg(test)]
2085mod migration_tests {
2086    use assert_matches2::assert_matches;
2087    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2088    use ruma::{
2089        OwnedRoomId, OwnedTransactionId, TransactionId,
2090        events::room::message::RoomMessageEventContent, room_id,
2091    };
2092    use serde::{Deserialize, Serialize};
2093
2094    use crate::state_store::PersistedQueuedRequest;
2095
2096    #[derive(Serialize, Deserialize)]
2097    struct OldPersistedQueuedRequest {
2098        room_id: OwnedRoomId,
2099        event: SerializableEventContent,
2100        transaction_id: OwnedTransactionId,
2101        is_wedged: bool,
2102    }
2103
2104    // We now persist an error when an event failed to send instead of just a
2105    // boolean. To support that, `PersistedQueueEvent` changed a bool to
2106    // Option<bool>, ensures that this work properly.
2107    #[test]
2108    fn test_migrating_persisted_queue_event_serialization() {
2109        let room_a_id = room_id!("!room_a:dummy.local");
2110        let transaction_id = TransactionId::new();
2111        let content =
2112            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2113                .unwrap();
2114
2115        let old_persisted_queue_event = OldPersistedQueuedRequest {
2116            room_id: room_a_id.to_owned(),
2117            event: content,
2118            transaction_id: transaction_id.clone(),
2119            is_wedged: true,
2120        };
2121
2122        let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2123
2124        // Load it with the new version.
2125        let new_persisted: PersistedQueuedRequest =
2126            serde_json::from_slice(&serialized_persisted).unwrap();
2127
2128        assert_eq!(new_persisted.is_wedged, Some(true));
2129        assert!(new_persisted.error.is_none());
2130
2131        assert!(new_persisted.event.is_some());
2132        assert!(new_persisted.kind.is_none());
2133
2134        let queued = new_persisted.into_queued_request().unwrap();
2135        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2136        assert_eq!(queued.transaction_id, transaction_id);
2137        assert!(queued.error.is_some());
2138    }
2139}
2140
2141#[cfg(all(test, target_family = "wasm"))]
2142mod tests {
2143    #[cfg(target_family = "wasm")]
2144    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2145
2146    use matrix_sdk_base::statestore_integration_tests;
2147    use uuid::Uuid;
2148
2149    use super::{IndexeddbStateStore, Result};
2150
2151    async fn get_store() -> Result<IndexeddbStateStore> {
2152        let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2153        Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2154    }
2155
2156    statestore_integration_tests!();
2157}
2158
2159#[cfg(all(test, target_family = "wasm"))]
2160mod encrypted_tests {
2161    #[cfg(target_family = "wasm")]
2162    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2163
2164    use matrix_sdk_base::statestore_integration_tests;
2165    use uuid::Uuid;
2166
2167    use super::{IndexeddbStateStore, Result};
2168
2169    async fn get_store() -> Result<IndexeddbStateStore> {
2170        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2171        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2172        Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2173    }
2174
2175    statestore_integration_tests!();
2176}