Skip to main content

matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    cmp::Reverse,
17    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
18    str::FromStr as _,
19    sync::Arc,
20};
21
22use async_trait::async_trait;
23use gloo_utils::format::JsValueSerdeExt;
24use growable_bloom_filter::GrowableBloom;
25use indexed_db_futures::{
26    KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
27    transaction::TransactionMode,
28};
29use matrix_sdk_base::{
30    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
31    RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
32    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
33    store::{
34        ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
35        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
36        SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
37        SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
38        compare_thread_subscription_bump_stamps,
39    },
40};
41use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
42use ruma::{
43    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
44    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
45    api::client::discovery::get_capabilities::v3::Capabilities,
46    canonical_json::{RedactedBecause, redact},
47    events::{
48        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
49        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
50        presence::PresenceEvent,
51        receipt::{Receipt, ReceiptThread, ReceiptType},
52        room::member::{
53            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
54        },
55    },
56    serde::Raw,
57};
58use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
59use tracing::{debug, warn};
60use wasm_bindgen::JsValue;
61
62mod migrations;
63
64pub use self::migrations::MigrationConflictStrategy;
65use self::migrations::{upgrade_inner_db, upgrade_meta_db};
66use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
67
68#[derive(Debug, thiserror::Error)]
69pub enum IndexeddbStateStoreError {
70    #[error(transparent)]
71    Json(#[from] serde_json::Error),
72    #[error(transparent)]
73    Encryption(#[from] EncryptionError),
74    #[error("DomException {name} ({code}): {message}")]
75    DomException { name: String, message: String, code: u16 },
76    #[error(transparent)]
77    StoreError(#[from] StoreError),
78    #[error(
79        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
80         See MigrationConflictStrategy for ways to configure."
81    )]
82    MigrationConflict { name: String, old_version: u32, new_version: u32 },
83}
84
85impl From<GenericError> for IndexeddbStateStoreError {
86    fn from(value: GenericError) -> Self {
87        Self::StoreError(value.into())
88    }
89}
90
91impl From<web_sys::DomException> for IndexeddbStateStoreError {
92    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
93        IndexeddbStateStoreError::DomException {
94            name: frm.name(),
95            message: frm.message(),
96            code: frm.code(),
97        }
98    }
99}
100
101impl From<IndexeddbStateStoreError> for StoreError {
102    fn from(e: IndexeddbStateStoreError) -> Self {
103        match e {
104            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
105            IndexeddbStateStoreError::StoreError(e) => e,
106            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
107            _ => StoreError::backend(e),
108        }
109    }
110}
111
112impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
113    fn from(value: indexed_db_futures::error::DomException) -> Self {
114        web_sys::DomException::from(value).into()
115    }
116}
117
118impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
119    fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
120        Self::Json(serde_json::Error::custom(value.to_string()))
121    }
122}
123
124impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
125    fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
126        IndexeddbStateStoreError::StoreError(StoreError::backend(value))
127    }
128}
129
130impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
131    fn from(value: indexed_db_futures::error::JSError) -> Self {
132        GenericError::from(value.to_string()).into()
133    }
134}
135
136impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
137    fn from(value: indexed_db_futures::error::Error) -> Self {
138        use indexed_db_futures::error::Error;
139        match value {
140            Error::DomException(e) => e.into(),
141            Error::Serialisation(e) => e.into(),
142            Error::MissingData(e) => e.into(),
143            Error::Unknown(e) => e.into(),
144        }
145    }
146}
147
148impl From<OpenDbError> for IndexeddbStateStoreError {
149    fn from(value: OpenDbError) -> Self {
150        match value {
151            OpenDbError::Base(error) => error.into(),
152            _ => GenericError::from(value.to_string()).into(),
153        }
154    }
155}
156
157mod keys {
158    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
159    pub const BACKUPS_META: &str = "backups";
160
161    pub const ACCOUNT_DATA: &str = "account_data";
162
163    pub const PROFILES: &str = "profiles";
164    pub const DISPLAY_NAMES: &str = "display_names";
165    pub const USER_IDS: &str = "user_ids";
166
167    pub const ROOM_STATE: &str = "room_state";
168    pub const ROOM_INFOS: &str = "room_infos";
169    pub const PRESENCE: &str = "presence";
170    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
171    /// Table used to save send queue events.
172    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
173    /// Table used to save dependent send queue events.
174    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
175    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
176
177    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
178    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
179
180    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
181    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
182
183    pub const CUSTOM: &str = "custom";
184    pub const KV: &str = "kv";
185
186    /// All names of the current state stores for convenience.
187    pub const ALL_STORES: &[&str] = &[
188        ACCOUNT_DATA,
189        PROFILES,
190        DISPLAY_NAMES,
191        USER_IDS,
192        ROOM_STATE,
193        ROOM_INFOS,
194        PRESENCE,
195        ROOM_ACCOUNT_DATA,
196        STRIPPED_ROOM_STATE,
197        STRIPPED_USER_IDS,
198        ROOM_USER_RECEIPTS,
199        ROOM_EVENT_RECEIPTS,
200        ROOM_SEND_QUEUE,
201        THREAD_SUBSCRIPTIONS,
202        DEPENDENT_SEND_QUEUE,
203        CUSTOM,
204        KV,
205    ];
206
207    // static keys
208
209    pub const STORE_KEY: &str = "store_key";
210}
211
212pub use keys::ALL_STORES;
213use matrix_sdk_base::store::QueueWedgeError;
214
215/// Encrypt (if needs be) then JSON-serialize a value.
216fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
217    Ok(match store_cipher {
218        Some(cipher) => {
219            let data = serde_json::to_vec(event)?;
220            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
221        }
222        None => JsValue::from_serde(event)?,
223    })
224}
225
226/// Deserialize a JSON value and then decrypt it (if needs be).
227fn deserialize_value<T: DeserializeOwned>(
228    store_cipher: Option<&StoreCipher>,
229    event: &JsValue,
230) -> Result<T> {
231    match store_cipher {
232        Some(cipher) => {
233            use zeroize::Zeroize;
234            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
235            let ret = serde_json::from_slice(&plaintext);
236            plaintext.zeroize();
237            Ok(ret?)
238        }
239        None => Ok(event.into_serde()?),
240    }
241}
242
243fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
244where
245    T: SafeEncode,
246{
247    match store_cipher {
248        Some(cipher) => key.as_secure_string(table_name, cipher),
249        None => key.as_encoded_string(),
250    }
251    .into()
252}
253
254fn encode_to_range<T>(
255    store_cipher: Option<&StoreCipher>,
256    table_name: &str,
257    key: T,
258) -> KeyRange<JsValue>
259where
260    T: SafeEncode,
261{
262    match store_cipher {
263        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
264        None => key.encode_to_range(),
265    }
266}
267
268/// Builder for [`IndexeddbStateStore`].
269#[derive(Debug)]
270pub struct IndexeddbStateStoreBuilder {
271    name: Option<String>,
272    passphrase: Option<String>,
273    migration_conflict_strategy: MigrationConflictStrategy,
274}
275
276impl IndexeddbStateStoreBuilder {
277    fn new() -> Self {
278        Self {
279            name: None,
280            passphrase: None,
281            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
282        }
283    }
284
285    /// Set the name for the indexeddb store to use, `state` is none given.
286    pub fn name(mut self, value: String) -> Self {
287        self.name = Some(value);
288        self
289    }
290
291    /// Set the password the indexeddb should be encrypted with.
292    ///
293    /// If not given, the DB is not encrypted.
294    pub fn passphrase(mut self, value: String) -> Self {
295        self.passphrase = Some(value);
296        self
297    }
298
299    /// The strategy to use when a merge conflict is found.
300    ///
301    /// See [`MigrationConflictStrategy`] for details.
302    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
303        self.migration_conflict_strategy = value;
304        self
305    }
306
307    pub async fn build(self) -> Result<IndexeddbStateStore> {
308        let migration_strategy = self.migration_conflict_strategy.clone();
309        let name = self.name.unwrap_or_else(|| "state".to_owned());
310
311        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
312
313        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
314        let inner =
315            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
316
317        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
318    }
319}
320
321pub struct IndexeddbStateStore {
322    name: String,
323    pub(crate) inner: Database,
324    pub(crate) meta: Database,
325    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
326}
327
328#[cfg(not(tarpaulin_include))]
329impl std::fmt::Debug for IndexeddbStateStore {
330    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
332    }
333}
334
335type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
336
337impl IndexeddbStateStore {
338    /// Generate a IndexeddbStateStoreBuilder with default parameters
339    pub fn builder() -> IndexeddbStateStoreBuilder {
340        IndexeddbStateStoreBuilder::new()
341    }
342
343    /// The version of the database containing the data.
344    pub fn version(&self) -> u32 {
345        self.inner.version() as u32
346    }
347
348    /// The version of the database containing the metadata.
349    pub fn meta_version(&self) -> u32 {
350        self.meta.version() as u32
351    }
352
353    /// Whether this database has any migration backups
354    pub async fn has_backups(&self) -> Result<bool> {
355        Ok(self
356            .meta
357            .transaction(keys::BACKUPS_META)
358            .with_mode(TransactionMode::Readonly)
359            .build()?
360            .object_store(keys::BACKUPS_META)?
361            .count()
362            .await?
363            > 0)
364    }
365
366    /// What's the database name of the latest backup<
367    pub async fn latest_backup(&self) -> Result<Option<String>> {
368        if let Some(mut cursor) = self
369            .meta
370            .transaction(keys::BACKUPS_META)
371            .with_mode(TransactionMode::Readonly)
372            .build()?
373            .object_store(keys::BACKUPS_META)?
374            .open_cursor()
375            .with_direction(CursorDirection::Prev)
376            .await?
377            && let Some(record) = cursor.next_record::<JsValue>().await?
378        {
379            Ok(record.as_string())
380        } else {
381            Ok(None)
382        }
383    }
384
385    /// Encrypt (if needs be) then JSON-serialize a value.
386    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
387        serialize_value(self.store_cipher.as_deref(), event)
388    }
389
390    /// Deserialize a JSON value and then decrypt it (if needs be).
391    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
392        deserialize_value(self.store_cipher.as_deref(), event)
393    }
394
395    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
396    where
397        T: SafeEncode,
398    {
399        encode_key(self.store_cipher.as_deref(), table_name, key)
400    }
401
402    fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
403    where
404        T: SafeEncode,
405    {
406        encode_to_range(self.store_cipher.as_deref(), table_name, key)
407    }
408
409    /// Get user IDs for the given room with the given memberships and stripped
410    /// state.
411    pub async fn get_user_ids_inner(
412        &self,
413        room_id: &RoomId,
414        memberships: RoomMemberships,
415        stripped: bool,
416    ) -> Result<Vec<OwnedUserId>> {
417        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
418
419        let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
420        let store = tx.object_store(store_name)?;
421        let range = self.encode_to_range(store_name, room_id);
422
423        let user_ids = if memberships.is_empty() {
424            // It should be faster to just get all user IDs in this case.
425            store
426                .get_all()
427                .with_query(&range)
428                .await?
429                .filter_map(Result::ok)
430                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
431                .collect::<Vec<_>>()
432        } else {
433            let mut user_ids = Vec::new();
434            let cursor = store.open_cursor().with_query(&range).await?;
435
436            if let Some(mut cursor) = cursor {
437                while let Some(value) = cursor.next_record().await? {
438                    let member = self.deserialize_value::<RoomMember>(&value)?;
439
440                    if memberships.matches(&member.membership) {
441                        user_ids.push(member.user_id);
442                    }
443                }
444            }
445
446            user_ids
447        };
448
449        Ok(user_ids)
450    }
451
452    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
453        self.inner
454            .transaction(keys::CUSTOM)
455            .with_mode(TransactionMode::Readonly)
456            .build()?
457            .object_store(keys::CUSTOM)?
458            .get(jskey)
459            .await?
460            .map(|f| self.deserialize_value(&f))
461            .transpose()
462    }
463
464    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
465        // Use the key (prefix) for the table name as well, to keep encoded
466        // keys compatible for the sync token and filters, which were in
467        // separate tables initially.
468        match key {
469            StateStoreDataKey::SyncToken => {
470                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
471            }
472            StateStoreDataKey::SupportedVersions => self.encode_key(
473                StateStoreDataKey::SUPPORTED_VERSIONS,
474                StateStoreDataKey::SUPPORTED_VERSIONS,
475            ),
476            StateStoreDataKey::WellKnown => {
477                self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
478            }
479            StateStoreDataKey::Filter(filter_name) => {
480                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
481            }
482            StateStoreDataKey::UserAvatarUrl(user_id) => {
483                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
484            }
485            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
486                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
487            }
488            StateStoreDataKey::UtdHookManagerData => {
489                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
490            }
491            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
492                self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
493            }
494            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
495                if let Some(thread_root) = thread_root {
496                    self.encode_key(
497                        keys::KV,
498                        (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
499                    )
500                } else {
501                    self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
502                }
503            }
504            StateStoreDataKey::SeenKnockRequests(room_id) => {
505                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
506            }
507            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
508                self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
509            }
510            StateStoreDataKey::HomeserverCapabilities => {
511                self.encode_key(keys::KV, StateStoreDataKey::HOMESERVER_CAPABILITIES)
512            }
513        }
514    }
515}
516
517/// A superset of [`QueuedRequest`] that also contains the room id, since we
518/// want to return them.
519#[derive(Serialize, Deserialize)]
520struct PersistedQueuedRequest {
521    /// In which room is this event going to be sent.
522    pub room_id: OwnedRoomId,
523
524    // All these fields are the same as in [`QueuedRequest`].
525    /// Kind. Optional because it might be missing from previous formats.
526    kind: Option<QueuedRequestKind>,
527    transaction_id: OwnedTransactionId,
528
529    pub error: Option<QueueWedgeError>,
530
531    priority: Option<usize>,
532
533    /// The time the original message was first attempted to be sent at.
534    #[serde(default = "created_now")]
535    created_at: MilliSecondsSinceUnixEpoch,
536
537    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
538    /// Deprecated (from old format), now replaced with error field.
539    is_wedged: Option<bool>,
540
541    event: Option<SerializableEventContent>,
542}
543
544fn created_now() -> MilliSecondsSinceUnixEpoch {
545    MilliSecondsSinceUnixEpoch::now()
546}
547
548impl PersistedQueuedRequest {
549    fn into_queued_request(self) -> Option<QueuedRequest> {
550        let kind =
551            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
552
553        let error = match self.is_wedged {
554            Some(true) => {
555                // Migrate to a generic error.
556                Some(QueueWedgeError::GenericApiError {
557                    msg: "local echo failed to send in a previous session".into(),
558                })
559            }
560            _ => self.error,
561        };
562
563        // By default, events without a priority have a priority of 0.
564        let priority = self.priority.unwrap_or(0);
565
566        Some(QueuedRequest {
567            kind,
568            transaction_id: self.transaction_id,
569            error,
570            priority,
571            created_at: self.created_at,
572        })
573    }
574}
575
576#[derive(Serialize, Deserialize, PartialEq)]
577struct PersistedThreadSubscription {
578    status: String,
579    bump_stamp: Option<u64>,
580}
581
582impl From<StoredThreadSubscription> for PersistedThreadSubscription {
583    fn from(value: StoredThreadSubscription) -> Self {
584        Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
585    }
586}
587
588// Small hack to have the following macro invocation act as the appropriate
589// trait impl block on wasm, but still be compiled on non-wasm as a regular
590// impl block otherwise.
591//
592// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
593// this hack allows us to still have most of rust-analyzer's IDE functionality
594// within the impl block without having to set it up to check things against
595// the wasm target (which would disable many other parts of the codebase).
596#[cfg(target_family = "wasm")]
597macro_rules! impl_state_store {
598    ({ $($body:tt)* }) => {
599        #[async_trait(?Send)]
600        impl StateStore for IndexeddbStateStore {
601            type Error = IndexeddbStateStoreError;
602
603            $($body)*
604        }
605    };
606}
607
608#[cfg(not(target_family = "wasm"))]
609macro_rules! impl_state_store {
610    ({ $($body:tt)* }) => {
611        impl IndexeddbStateStore {
612            $($body)*
613        }
614    };
615}
616
617impl_state_store!({
618    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
619        let encoded_key = self.encode_kv_data_key(key);
620
621        let value = self
622            .inner
623            .transaction(keys::KV)
624            .with_mode(TransactionMode::Readonly)
625            .build()?
626            .object_store(keys::KV)?
627            .get(&encoded_key)
628            .await?;
629
630        let value = match key {
631            StateStoreDataKey::SyncToken => value
632                .map(|f| self.deserialize_value::<String>(&f))
633                .transpose()?
634                .map(StateStoreDataValue::SyncToken),
635            StateStoreDataKey::SupportedVersions => value
636                .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
637                .transpose()?
638                .map(StateStoreDataValue::SupportedVersions),
639            StateStoreDataKey::WellKnown => value
640                .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
641                .transpose()?
642                .map(StateStoreDataValue::WellKnown),
643            StateStoreDataKey::Filter(_) => value
644                .map(|f| self.deserialize_value::<String>(&f))
645                .transpose()?
646                .map(StateStoreDataValue::Filter),
647            StateStoreDataKey::UserAvatarUrl(_) => value
648                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
649                .transpose()?
650                .map(StateStoreDataValue::UserAvatarUrl),
651            StateStoreDataKey::RecentlyVisitedRooms(_) => value
652                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
653                .transpose()?
654                .map(StateStoreDataValue::RecentlyVisitedRooms),
655            StateStoreDataKey::UtdHookManagerData => value
656                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
657                .transpose()?
658                .map(StateStoreDataValue::UtdHookManagerData),
659            StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
660                .map(|f| self.deserialize_value::<bool>(&f))
661                .transpose()?
662                .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
663            StateStoreDataKey::ComposerDraft(_, _) => value
664                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
665                .transpose()?
666                .map(StateStoreDataValue::ComposerDraft),
667            StateStoreDataKey::SeenKnockRequests(_) => value
668                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
669                .transpose()?
670                .map(StateStoreDataValue::SeenKnockRequests),
671            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
672                .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
673                .transpose()?
674                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
675            StateStoreDataKey::HomeserverCapabilities => value
676                .map(|f| self.deserialize_value::<Capabilities>(&f))
677                .transpose()?
678                .map(StateStoreDataValue::HomeserverCapabilities),
679        };
680
681        Ok(value)
682    }
683
684    async fn set_kv_data(
685        &self,
686        key: StateStoreDataKey<'_>,
687        value: StateStoreDataValue,
688    ) -> Result<()> {
689        let encoded_key = self.encode_kv_data_key(key);
690
691        let serialized_value = match key {
692            StateStoreDataKey::SyncToken => self
693                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
694            StateStoreDataKey::SupportedVersions => self.serialize_value(
695                &value
696                    .into_supported_versions()
697                    .expect("Session data not containing supported versions"),
698            ),
699            StateStoreDataKey::WellKnown => self.serialize_value(
700                &value.into_well_known().expect("Session data not containing well-known"),
701            ),
702            StateStoreDataKey::Filter(_) => {
703                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
704            }
705            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
706                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
707            ),
708            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
709                &value
710                    .into_recently_visited_rooms()
711                    .expect("Session data not a recently visited room list"),
712            ),
713            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
714                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
715            ),
716            StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
717            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
718                &value.into_composer_draft().expect("Session data not a composer draft"),
719            ),
720            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
721                &value
722                    .into_seen_knock_requests()
723                    .expect("Session data is not a set of seen knock request ids"),
724            ),
725            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
726                &value
727                    .into_thread_subscriptions_catchup_tokens()
728                    .expect("Session data is not a list of thread subscription catchup tokens"),
729            ),
730            StateStoreDataKey::HomeserverCapabilities => self.serialize_value(
731                &value
732                    .into_homeserver_capabilities()
733                    .expect("Session data is not a homeserver capabilities"),
734            ),
735        };
736
737        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
738
739        let obj = tx.object_store(keys::KV)?;
740
741        obj.put(&serialized_value?).with_key(encoded_key).build()?;
742
743        tx.commit().await?;
744
745        Ok(())
746    }
747
748    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
749        let encoded_key = self.encode_kv_data_key(key);
750
751        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
752        let obj = tx.object_store(keys::KV)?;
753
754        obj.delete(&encoded_key).build()?;
755
756        tx.commit().await?;
757
758        Ok(())
759    }
760
761    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
762        let mut stores: HashSet<&'static str> = [
763            (changes.sync_token.is_some(), keys::KV),
764            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
765            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
766            (!changes.presence.is_empty(), keys::PRESENCE),
767            (
768                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
769                keys::PROFILES,
770            ),
771            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
772            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
773        ]
774        .iter()
775        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
776        .collect();
777
778        if !changes.state.is_empty() {
779            stores.extend([
780                keys::ROOM_STATE,
781                keys::USER_IDS,
782                keys::STRIPPED_USER_IDS,
783                keys::STRIPPED_ROOM_STATE,
784                keys::PROFILES,
785            ]);
786        }
787
788        if !changes.redactions.is_empty() {
789            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
790        }
791
792        if !changes.room_infos.is_empty() {
793            stores.insert(keys::ROOM_INFOS);
794        }
795
796        if !changes.stripped_state.is_empty() {
797            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
798        }
799
800        if !changes.receipts.is_empty() {
801            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
802        }
803
804        if stores.is_empty() {
805            // nothing to do, quit early
806            return Ok(());
807        }
808
809        let stores: Vec<&'static str> = stores.into_iter().collect();
810        let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
811
812        if let Some(s) = &changes.sync_token {
813            tx.object_store(keys::KV)?
814                .put(&self.serialize_value(s)?)
815                .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
816                .build()?;
817        }
818
819        if !changes.ambiguity_maps.is_empty() {
820            let store = tx.object_store(keys::DISPLAY_NAMES)?;
821            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
822                for (display_name, map) in ambiguity_maps {
823                    let key = self.encode_key(
824                        keys::DISPLAY_NAMES,
825                        (
826                            room_id,
827                            display_name
828                                .as_normalized_str()
829                                .unwrap_or_else(|| display_name.as_raw_str()),
830                        ),
831                    );
832
833                    store.put(&self.serialize_value(&map)?).with_key(key).build()?;
834                }
835            }
836        }
837
838        if !changes.account_data.is_empty() {
839            let store = tx.object_store(keys::ACCOUNT_DATA)?;
840            for (event_type, event) in &changes.account_data {
841                store
842                    .put(&self.serialize_value(&event)?)
843                    .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
844                    .build()?;
845            }
846        }
847
848        if !changes.room_account_data.is_empty() {
849            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
850            for (room, events) in &changes.room_account_data {
851                for (event_type, event) in events {
852                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
853                    store.put(&self.serialize_value(&event)?).with_key(key).build()?;
854                }
855            }
856        }
857
858        if !changes.state.is_empty() {
859            let state = tx.object_store(keys::ROOM_STATE)?;
860            let profiles = tx.object_store(keys::PROFILES)?;
861            let user_ids = tx.object_store(keys::USER_IDS)?;
862            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
863            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
864
865            for (room, user_ids) in &changes.profiles_to_delete {
866                for user_id in user_ids {
867                    let key = self.encode_key(keys::PROFILES, (room, user_id));
868                    profiles.delete(&key).build()?;
869                }
870            }
871
872            for (room, event_types) in &changes.state {
873                let profile_changes = changes.profiles.get(room);
874
875                for (event_type, events) in event_types {
876                    for (state_key, raw_event) in events {
877                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
878                        state
879                            .put(&self.serialize_value(&raw_event)?)
880                            .with_key(key.clone())
881                            .build()?;
882                        stripped_state.delete(&key).build()?;
883
884                        if *event_type == StateEventType::RoomMember {
885                            let event =
886                                match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
887                                    Ok(ev) => ev,
888                                    Err(e) => {
889                                        let event_id: Option<String> =
890                                            raw_event.get_field("event_id").ok().flatten();
891                                        debug!(event_id, "Failed to deserialize member event: {e}");
892                                        continue;
893                                    }
894                                };
895
896                            let key = (room, state_key);
897
898                            stripped_user_ids
899                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
900                                .build()?;
901
902                            user_ids
903                                .put(&self.serialize_value(&RoomMember::from(&event))?)
904                                .with_key(self.encode_key(keys::USER_IDS, key))
905                                .build()?;
906
907                            if let Some(profile) =
908                                profile_changes.and_then(|p| p.get(event.state_key()))
909                            {
910                                profiles
911                                    .put(&self.serialize_value(&profile)?)
912                                    .with_key(self.encode_key(keys::PROFILES, key))
913                                    .build()?;
914                            }
915                        }
916                    }
917                }
918            }
919        }
920
921        if !changes.room_infos.is_empty() {
922            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
923            for (room_id, room_info) in &changes.room_infos {
924                room_infos
925                    .put(&self.serialize_value(&room_info)?)
926                    .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
927                    .build()?;
928            }
929        }
930
931        if !changes.presence.is_empty() {
932            let store = tx.object_store(keys::PRESENCE)?;
933            for (sender, event) in &changes.presence {
934                store
935                    .put(&self.serialize_value(&event)?)
936                    .with_key(self.encode_key(keys::PRESENCE, sender))
937                    .build()?;
938            }
939        }
940
941        if !changes.stripped_state.is_empty() {
942            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
943            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
944
945            for (room, event_types) in &changes.stripped_state {
946                for (event_type, events) in event_types {
947                    for (state_key, raw_event) in events {
948                        let key = self
949                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
950                        store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
951
952                        if *event_type == StateEventType::RoomMember {
953                            let event = match raw_event
954                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
955                            {
956                                Ok(ev) => ev,
957                                Err(e) => {
958                                    let event_id: Option<String> =
959                                        raw_event.get_field("event_id").ok().flatten();
960                                    debug!(
961                                        event_id,
962                                        "Failed to deserialize stripped member event: {e}"
963                                    );
964                                    continue;
965                                }
966                            };
967
968                            let key = (room, state_key);
969
970                            user_ids
971                                .put(&self.serialize_value(&RoomMember::from(&event))?)
972                                .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
973                                .build()?;
974                        }
975                    }
976                }
977            }
978        }
979
980        if !changes.receipts.is_empty() {
981            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
982            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
983
984            for (room, content) in &changes.receipts {
985                for (event_id, receipts) in &content.0 {
986                    for (receipt_type, receipts) in receipts {
987                        for (user_id, receipt) in receipts {
988                            let key = match receipt.thread.as_str() {
989                                Some(thread_id) => self.encode_key(
990                                    keys::ROOM_USER_RECEIPTS,
991                                    (room, receipt_type, thread_id, user_id),
992                                ),
993                                None => self.encode_key(
994                                    keys::ROOM_USER_RECEIPTS,
995                                    (room, receipt_type, user_id),
996                                ),
997                            };
998
999                            if let Some((old_event, _)) =
1000                                room_user_receipts.get(&key).await?.and_then(|f| {
1001                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
1002                                })
1003                            {
1004                                let key = match receipt.thread.as_str() {
1005                                    Some(thread_id) => self.encode_key(
1006                                        keys::ROOM_EVENT_RECEIPTS,
1007                                        (room, receipt_type, thread_id, old_event, user_id),
1008                                    ),
1009                                    None => self.encode_key(
1010                                        keys::ROOM_EVENT_RECEIPTS,
1011                                        (room, receipt_type, old_event, user_id),
1012                                    ),
1013                                };
1014                                room_event_receipts.delete(&key).build()?;
1015                            }
1016
1017                            room_user_receipts
1018                                .put(&self.serialize_value(&(event_id, receipt))?)
1019                                .with_key(key)
1020                                .build()?;
1021
1022                            // Add the receipt to the room event receipts
1023                            let key = match receipt.thread.as_str() {
1024                                Some(thread_id) => self.encode_key(
1025                                    keys::ROOM_EVENT_RECEIPTS,
1026                                    (room, receipt_type, thread_id, event_id, user_id),
1027                                ),
1028                                None => self.encode_key(
1029                                    keys::ROOM_EVENT_RECEIPTS,
1030                                    (room, receipt_type, event_id, user_id),
1031                                ),
1032                            };
1033                            room_event_receipts
1034                                .put(&self.serialize_value(&(user_id, receipt))?)
1035                                .with_key(key)
1036                                .build()?;
1037                        }
1038                    }
1039                }
1040            }
1041        }
1042
1043        if !changes.redactions.is_empty() {
1044            let state = tx.object_store(keys::ROOM_STATE)?;
1045            let room_info = tx.object_store(keys::ROOM_INFOS)?;
1046
1047            for (room_id, redactions) in &changes.redactions {
1048                let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1049                let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1050                    continue;
1051                };
1052
1053                let mut redaction_rules = None;
1054
1055                while let Some(value) = cursor.next_record().await? {
1056                    let Some(key) = cursor.key::<JsValue>()? else {
1057                        break;
1058                    };
1059
1060                    let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1061                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1062                        && let Some(redaction) = redactions.get(&event_id)
1063                    {
1064                        let redaction_rules = match &redaction_rules {
1065                            Some(r) => r,
1066                            None => {
1067                                let value = room_info
1068                                    .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1069                                    .await?
1070                                    .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1071                                    .map(|info| info.room_version_rules_or_default())
1072                                    .unwrap_or_else(|| {
1073                                        warn!(
1074                                            ?room_id,
1075                                            "Unable to get the room version rules, \
1076                                             defaulting to rules for room version \
1077                                             {ROOM_VERSION_FALLBACK}"
1078                                        );
1079                                        ROOM_VERSION_RULES_FALLBACK
1080                                    })
1081                                    .redaction;
1082                                redaction_rules.get_or_insert(value)
1083                            }
1084                        };
1085
1086                        let redacted = redact(
1087                            raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1088                            redaction_rules,
1089                            Some(RedactedBecause::from_raw_event(redaction)?),
1090                        )
1091                        .map_err(StoreError::Redaction)?;
1092                        state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1093                    }
1094                }
1095            }
1096        }
1097
1098        tx.commit().await.map_err(|e| e.into())
1099    }
1100
1101    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1102        self.inner
1103            .transaction(keys::PRESENCE)
1104            .with_mode(TransactionMode::Readonly)
1105            .build()?
1106            .object_store(keys::PRESENCE)?
1107            .get(&self.encode_key(keys::PRESENCE, user_id))
1108            .await?
1109            .map(|f| self.deserialize_value(&f))
1110            .transpose()
1111    }
1112
1113    async fn get_presence_events(
1114        &self,
1115        user_ids: &[OwnedUserId],
1116    ) -> Result<Vec<Raw<PresenceEvent>>> {
1117        if user_ids.is_empty() {
1118            return Ok(Vec::new());
1119        }
1120
1121        let txn =
1122            self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1123        let store = txn.object_store(keys::PRESENCE)?;
1124
1125        let mut events = Vec::with_capacity(user_ids.len());
1126
1127        for user_id in user_ids {
1128            if let Some(event) = store
1129                .get(&self.encode_key(keys::PRESENCE, user_id))
1130                .await?
1131                .map(|f| self.deserialize_value(&f))
1132                .transpose()?
1133            {
1134                events.push(event)
1135            }
1136        }
1137
1138        Ok(events)
1139    }
1140
1141    async fn get_state_event(
1142        &self,
1143        room_id: &RoomId,
1144        event_type: StateEventType,
1145        state_key: &str,
1146    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1147        Ok(self
1148            .get_state_events_for_keys(room_id, event_type, &[state_key])
1149            .await?
1150            .into_iter()
1151            .next())
1152    }
1153
1154    async fn get_state_events(
1155        &self,
1156        room_id: &RoomId,
1157        event_type: StateEventType,
1158    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1159        let stripped_range =
1160            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1161        let stripped_events = self
1162            .inner
1163            .transaction(keys::STRIPPED_ROOM_STATE)
1164            .with_mode(TransactionMode::Readonly)
1165            .build()?
1166            .object_store(keys::STRIPPED_ROOM_STATE)?
1167            .get_all()
1168            .with_query(&stripped_range)
1169            .await?
1170            .filter_map(Result::ok)
1171            .filter_map(|f| {
1172                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1173            })
1174            .collect::<Vec<_>>();
1175
1176        if !stripped_events.is_empty() {
1177            return Ok(stripped_events);
1178        }
1179
1180        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1181        Ok(self
1182            .inner
1183            .transaction(keys::ROOM_STATE)
1184            .with_mode(TransactionMode::Readonly)
1185            .build()?
1186            .object_store(keys::ROOM_STATE)?
1187            .get_all()
1188            .with_query(&range)
1189            .await?
1190            .filter_map(Result::ok)
1191            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1192            .collect::<Vec<_>>())
1193    }
1194
1195    async fn get_state_events_for_keys(
1196        &self,
1197        room_id: &RoomId,
1198        event_type: StateEventType,
1199        state_keys: &[&str],
1200    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1201        if state_keys.is_empty() {
1202            return Ok(Vec::new());
1203        }
1204
1205        let mut events = Vec::with_capacity(state_keys.len());
1206
1207        {
1208            let txn = self
1209                .inner
1210                .transaction(keys::STRIPPED_ROOM_STATE)
1211                .with_mode(TransactionMode::Readonly)
1212                .build()?;
1213            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1214
1215            for state_key in state_keys {
1216                if let Some(event) =
1217                    store
1218                        .get(&self.encode_key(
1219                            keys::STRIPPED_ROOM_STATE,
1220                            (room_id, &event_type, state_key),
1221                        ))
1222                        .await?
1223                        .map(|f| self.deserialize_value(&f))
1224                        .transpose()?
1225                {
1226                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1227                }
1228            }
1229
1230            if !events.is_empty() {
1231                return Ok(events);
1232            }
1233        }
1234
1235        let txn = self
1236            .inner
1237            .transaction(keys::ROOM_STATE)
1238            .with_mode(TransactionMode::Readonly)
1239            .build()?;
1240        let store = txn.object_store(keys::ROOM_STATE)?;
1241
1242        for state_key in state_keys {
1243            if let Some(event) = store
1244                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1245                .await?
1246                .map(|f| self.deserialize_value(&f))
1247                .transpose()?
1248            {
1249                events.push(RawAnySyncOrStrippedState::Sync(event));
1250            }
1251        }
1252
1253        Ok(events)
1254    }
1255
1256    async fn get_profile(
1257        &self,
1258        room_id: &RoomId,
1259        user_id: &UserId,
1260    ) -> Result<Option<MinimalRoomMemberEvent>> {
1261        self.inner
1262            .transaction(keys::PROFILES)
1263            .with_mode(TransactionMode::Readonly)
1264            .build()?
1265            .object_store(keys::PROFILES)?
1266            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1267            .await?
1268            .map(|f| self.deserialize_value(&f))
1269            .transpose()
1270    }
1271
1272    async fn get_profiles<'a>(
1273        &self,
1274        room_id: &RoomId,
1275        user_ids: &'a [OwnedUserId],
1276    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1277        if user_ids.is_empty() {
1278            return Ok(BTreeMap::new());
1279        }
1280
1281        let txn =
1282            self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1283        let store = txn.object_store(keys::PROFILES)?;
1284
1285        let mut profiles = BTreeMap::new();
1286        for user_id in user_ids {
1287            if let Some(profile) = store
1288                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1289                .await?
1290                .map(|f| self.deserialize_value(&f))
1291                .transpose()?
1292            {
1293                profiles.insert(user_id.as_ref(), profile);
1294            }
1295        }
1296
1297        Ok(profiles)
1298    }
1299
1300    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1301        let transaction = self
1302            .inner
1303            .transaction(keys::ROOM_INFOS)
1304            .with_mode(TransactionMode::Readonly)
1305            .build()?;
1306
1307        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1308
1309        Ok(match room_load_settings {
1310            RoomLoadSettings::All => object_store
1311                .get_all()
1312                .await?
1313                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1314                .collect::<Result<_>>()?,
1315
1316            RoomLoadSettings::One(room_id) => {
1317                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1318                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1319                    None => vec![],
1320                }
1321            }
1322        })
1323    }
1324
1325    async fn get_users_with_display_name(
1326        &self,
1327        room_id: &RoomId,
1328        display_name: &DisplayName,
1329    ) -> Result<BTreeSet<OwnedUserId>> {
1330        self.inner
1331            .transaction(keys::DISPLAY_NAMES)
1332            .with_mode(TransactionMode::Readonly)
1333            .build()?
1334            .object_store(keys::DISPLAY_NAMES)?
1335            .get(&self.encode_key(
1336                keys::DISPLAY_NAMES,
1337                (
1338                    room_id,
1339                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1340                ),
1341            ))
1342            .await?
1343            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1344            .unwrap_or_else(|| Ok(Default::default()))
1345    }
1346
1347    async fn get_users_with_display_names<'a>(
1348        &self,
1349        room_id: &RoomId,
1350        display_names: &'a [DisplayName],
1351    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1352        let mut map = HashMap::new();
1353
1354        if display_names.is_empty() {
1355            return Ok(map);
1356        }
1357
1358        let txn = self
1359            .inner
1360            .transaction(keys::DISPLAY_NAMES)
1361            .with_mode(TransactionMode::Readonly)
1362            .build()?;
1363        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1364
1365        for display_name in display_names {
1366            if let Some(user_ids) = store
1367                .get(
1368                    &self.encode_key(
1369                        keys::DISPLAY_NAMES,
1370                        (
1371                            room_id,
1372                            display_name
1373                                .as_normalized_str()
1374                                .unwrap_or_else(|| display_name.as_raw_str()),
1375                        ),
1376                    ),
1377                )
1378                .await?
1379                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1380                .transpose()?
1381            {
1382                map.insert(display_name, user_ids);
1383            }
1384        }
1385
1386        Ok(map)
1387    }
1388
1389    async fn get_account_data_event(
1390        &self,
1391        event_type: GlobalAccountDataEventType,
1392    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1393        self.inner
1394            .transaction(keys::ACCOUNT_DATA)
1395            .with_mode(TransactionMode::Readonly)
1396            .build()?
1397            .object_store(keys::ACCOUNT_DATA)?
1398            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1399            .await?
1400            .map(|f| self.deserialize_value(&f))
1401            .transpose()
1402    }
1403
1404    async fn get_room_account_data_event(
1405        &self,
1406        room_id: &RoomId,
1407        event_type: RoomAccountDataEventType,
1408    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1409        self.inner
1410            .transaction(keys::ROOM_ACCOUNT_DATA)
1411            .with_mode(TransactionMode::Readonly)
1412            .build()?
1413            .object_store(keys::ROOM_ACCOUNT_DATA)?
1414            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1415            .await?
1416            .map(|f| self.deserialize_value(&f))
1417            .transpose()
1418    }
1419
1420    async fn get_user_room_receipt_event(
1421        &self,
1422        room_id: &RoomId,
1423        receipt_type: ReceiptType,
1424        thread: ReceiptThread,
1425        user_id: &UserId,
1426    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1427        let key = match thread.as_str() {
1428            Some(thread_id) => self
1429                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1430            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1431        };
1432        self.inner
1433            .transaction(keys::ROOM_USER_RECEIPTS)
1434            .with_mode(TransactionMode::Readonly)
1435            .build()?
1436            .object_store(keys::ROOM_USER_RECEIPTS)?
1437            .get(&key)
1438            .await?
1439            .map(|f| self.deserialize_value(&f))
1440            .transpose()
1441    }
1442
1443    async fn get_event_room_receipt_events(
1444        &self,
1445        room_id: &RoomId,
1446        receipt_type: ReceiptType,
1447        thread: ReceiptThread,
1448        event_id: &EventId,
1449    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1450        let range = match thread.as_str() {
1451            Some(thread_id) => self.encode_to_range(
1452                keys::ROOM_EVENT_RECEIPTS,
1453                (room_id, receipt_type, thread_id, event_id),
1454            ),
1455            None => {
1456                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1457            }
1458        };
1459        let tx = self
1460            .inner
1461            .transaction(keys::ROOM_EVENT_RECEIPTS)
1462            .with_mode(TransactionMode::Readonly)
1463            .build()?;
1464        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1465
1466        Ok(store
1467            .get_all()
1468            .with_query(&range)
1469            .await?
1470            .filter_map(Result::ok)
1471            .filter_map(|f| self.deserialize_value(&f).ok())
1472            .collect::<Vec<_>>())
1473    }
1474
1475    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1476        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1477        self.get_custom_value_for_js(jskey).await
1478    }
1479
1480    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1481        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1482
1483        let prev = self.get_custom_value_for_js(&jskey).await?;
1484
1485        let tx =
1486            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1487
1488        tx.object_store(keys::CUSTOM)?
1489            .put(&self.serialize_value(&value)?)
1490            .with_key(jskey)
1491            .build()?;
1492
1493        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1494        Ok(prev)
1495    }
1496
1497    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1498        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1499
1500        let prev = self.get_custom_value_for_js(&jskey).await?;
1501
1502        let tx =
1503            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1504
1505        tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1506
1507        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1508        Ok(prev)
1509    }
1510
1511    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1512        // All the stores which use a RoomId as their key (and nothing additional).
1513        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1514
1515        // All the stores which use a RoomId as the first part of their key, but may
1516        // have some additional data in the key.
1517        let prefixed_stores = [
1518            keys::PROFILES,
1519            keys::DISPLAY_NAMES,
1520            keys::USER_IDS,
1521            keys::ROOM_STATE,
1522            keys::ROOM_ACCOUNT_DATA,
1523            keys::ROOM_EVENT_RECEIPTS,
1524            keys::ROOM_USER_RECEIPTS,
1525            keys::STRIPPED_ROOM_STATE,
1526            keys::STRIPPED_USER_IDS,
1527            keys::THREAD_SUBSCRIPTIONS,
1528        ];
1529
1530        let all_stores = {
1531            let mut v = Vec::new();
1532            v.extend(prefixed_stores);
1533            v.extend(direct_stores);
1534            v
1535        };
1536
1537        let tx =
1538            self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1539
1540        for store_name in direct_stores {
1541            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1542        }
1543
1544        for store_name in prefixed_stores {
1545            let store = tx.object_store(store_name)?;
1546            let range = self.encode_to_range(store_name, room_id);
1547            for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1548                store.delete(&key?).build()?;
1549            }
1550        }
1551
1552        tx.commit().await.map_err(|e| e.into())
1553    }
1554
1555    async fn get_user_ids(
1556        &self,
1557        room_id: &RoomId,
1558        memberships: RoomMemberships,
1559    ) -> Result<Vec<OwnedUserId>> {
1560        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1561        if !ids.is_empty() {
1562            return Ok(ids);
1563        }
1564        self.get_user_ids_inner(room_id, memberships, false).await
1565    }
1566
1567    async fn save_send_queue_request(
1568        &self,
1569        room_id: &RoomId,
1570        transaction_id: OwnedTransactionId,
1571        created_at: MilliSecondsSinceUnixEpoch,
1572        kind: QueuedRequestKind,
1573        priority: usize,
1574    ) -> Result<()> {
1575        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1576
1577        let tx = self
1578            .inner
1579            .transaction(keys::ROOM_SEND_QUEUE)
1580            .with_mode(TransactionMode::Readwrite)
1581            .build()?;
1582
1583        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1584
1585        // We store an encoded vector of the queued requests, with their transaction
1586        // ids.
1587
1588        // Reload the previous vector for this room, or create an empty one.
1589        let prev = obj.get(&encoded_key).await?;
1590
1591        let mut prev = prev.map_or_else(
1592            || Ok(Vec::new()),
1593            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1594        )?;
1595
1596        // Push the new request.
1597        prev.push(PersistedQueuedRequest {
1598            room_id: room_id.to_owned(),
1599            kind: Some(kind),
1600            transaction_id,
1601            error: None,
1602            is_wedged: None,
1603            event: None,
1604            priority: Some(priority),
1605            created_at,
1606        });
1607
1608        // Save the new vector into db.
1609        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1610
1611        tx.commit().await?;
1612
1613        Ok(())
1614    }
1615
1616    async fn update_send_queue_request(
1617        &self,
1618        room_id: &RoomId,
1619        transaction_id: &TransactionId,
1620        kind: QueuedRequestKind,
1621    ) -> Result<bool> {
1622        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1623
1624        let tx = self
1625            .inner
1626            .transaction(keys::ROOM_SEND_QUEUE)
1627            .with_mode(TransactionMode::Readwrite)
1628            .build()?;
1629
1630        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1631
1632        // We store an encoded vector of the queued requests, with their transaction
1633        // ids.
1634
1635        // Reload the previous vector for this room, or create an empty one.
1636        let prev = obj.get(&encoded_key).await?;
1637
1638        let mut prev = prev.map_or_else(
1639            || Ok(Vec::new()),
1640            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1641        )?;
1642
1643        // Modify the one request.
1644        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1645            entry.kind = Some(kind);
1646            // Reset the error state.
1647            entry.error = None;
1648            // Remove migrated fields.
1649            entry.is_wedged = None;
1650            entry.event = None;
1651
1652            // Save the new vector into db.
1653            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1654            tx.commit().await?;
1655
1656            Ok(true)
1657        } else {
1658            Ok(false)
1659        }
1660    }
1661
1662    async fn remove_send_queue_request(
1663        &self,
1664        room_id: &RoomId,
1665        transaction_id: &TransactionId,
1666    ) -> Result<bool> {
1667        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1668
1669        let tx = self
1670            .inner
1671            .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1672            .with_mode(TransactionMode::Readwrite)
1673            .build()?;
1674
1675        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1676
1677        // We store an encoded vector of the queued requests, with their transaction
1678        // ids.
1679
1680        // Reload the previous vector for this room.
1681        if let Some(val) = obj.get(&encoded_key).await? {
1682            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1683            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1684                prev.remove(pos);
1685
1686                if prev.is_empty() {
1687                    obj.delete(&encoded_key).build()?;
1688                } else {
1689                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1690                }
1691
1692                tx.commit().await?;
1693                return Ok(true);
1694            }
1695        }
1696
1697        Ok(false)
1698    }
1699
1700    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1701        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1702
1703        // We store an encoded vector of the queued requests, with their transaction
1704        // ids.
1705        let prev = self
1706            .inner
1707            .transaction(keys::ROOM_SEND_QUEUE)
1708            .with_mode(TransactionMode::Readwrite)
1709            .build()?
1710            .object_store(keys::ROOM_SEND_QUEUE)?
1711            .get(&encoded_key)
1712            .await?;
1713
1714        let mut prev = prev.map_or_else(
1715            || Ok(Vec::new()),
1716            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1717        )?;
1718
1719        // Inverted stable ordering on priority.
1720        prev.sort_by_key(|item| Reverse(item.priority.unwrap_or(0)));
1721
1722        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1723    }
1724
1725    async fn update_send_queue_request_status(
1726        &self,
1727        room_id: &RoomId,
1728        transaction_id: &TransactionId,
1729        error: Option<QueueWedgeError>,
1730    ) -> Result<()> {
1731        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1732
1733        let tx = self
1734            .inner
1735            .transaction(keys::ROOM_SEND_QUEUE)
1736            .with_mode(TransactionMode::Readwrite)
1737            .build()?;
1738
1739        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1740
1741        if let Some(val) = obj.get(&encoded_key).await? {
1742            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1743            if let Some(request) =
1744                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1745            {
1746                request.is_wedged = None;
1747                request.error = error;
1748                obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1749            }
1750        }
1751
1752        tx.commit().await?;
1753
1754        Ok(())
1755    }
1756
1757    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1758        let tx = self
1759            .inner
1760            .transaction(keys::ROOM_SEND_QUEUE)
1761            .with_mode(TransactionMode::Readwrite)
1762            .build()?;
1763
1764        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1765
1766        let all_entries = obj
1767            .get_all()
1768            .await?
1769            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1770            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1771            .into_iter()
1772            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1773            .collect::<BTreeSet<_>>();
1774
1775        Ok(all_entries.into_iter().collect())
1776    }
1777
1778    async fn save_dependent_queued_request(
1779        &self,
1780        room_id: &RoomId,
1781        parent_txn_id: &TransactionId,
1782        own_txn_id: ChildTransactionId,
1783        created_at: MilliSecondsSinceUnixEpoch,
1784        content: DependentQueuedRequestKind,
1785    ) -> Result<()> {
1786        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1787
1788        let tx = self
1789            .inner
1790            .transaction(keys::DEPENDENT_SEND_QUEUE)
1791            .with_mode(TransactionMode::Readwrite)
1792            .build()?;
1793
1794        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1795
1796        // We store an encoded vector of the dependent requests.
1797        // Reload the previous vector for this room, or create an empty one.
1798        let prev = obj.get(&encoded_key).await?;
1799
1800        let mut prev = prev.map_or_else(
1801            || Ok(Vec::new()),
1802            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1803        )?;
1804
1805        // Push the new request.
1806        prev.push(DependentQueuedRequest {
1807            kind: content,
1808            parent_transaction_id: parent_txn_id.to_owned(),
1809            own_transaction_id: own_txn_id,
1810            parent_key: None,
1811            created_at,
1812        });
1813
1814        // Save the new vector into db.
1815        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1816
1817        tx.commit().await?;
1818
1819        Ok(())
1820    }
1821
1822    async fn update_dependent_queued_request(
1823        &self,
1824        room_id: &RoomId,
1825        own_transaction_id: &ChildTransactionId,
1826        new_content: DependentQueuedRequestKind,
1827    ) -> Result<bool> {
1828        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1829
1830        let tx = self
1831            .inner
1832            .transaction(keys::DEPENDENT_SEND_QUEUE)
1833            .with_mode(TransactionMode::Readwrite)
1834            .build()?;
1835
1836        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1837
1838        // We store an encoded vector of the dependent requests.
1839        // Reload the previous vector for this room, or create an empty one.
1840        let prev = obj.get(&encoded_key).await?;
1841
1842        let mut prev = prev.map_or_else(
1843            || Ok(Vec::new()),
1844            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1845        )?;
1846
1847        // Modify the dependent request, if found.
1848        let mut found = false;
1849        for entry in prev.iter_mut() {
1850            if entry.own_transaction_id == *own_transaction_id {
1851                found = true;
1852                entry.kind = new_content;
1853                break;
1854            }
1855        }
1856
1857        if found {
1858            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1859            tx.commit().await?;
1860        }
1861
1862        Ok(found)
1863    }
1864
1865    async fn mark_dependent_queued_requests_as_ready(
1866        &self,
1867        room_id: &RoomId,
1868        parent_txn_id: &TransactionId,
1869        parent_key: SentRequestKey,
1870    ) -> Result<usize> {
1871        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1872
1873        let tx = self
1874            .inner
1875            .transaction(keys::DEPENDENT_SEND_QUEUE)
1876            .with_mode(TransactionMode::Readwrite)
1877            .build()?;
1878
1879        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1880
1881        // We store an encoded vector of the dependent requests.
1882        // Reload the previous vector for this room, or create an empty one.
1883        let prev = obj.get(&encoded_key).await?;
1884
1885        let mut prev = prev.map_or_else(
1886            || Ok(Vec::new()),
1887            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1888        )?;
1889
1890        // Modify all requests that match.
1891        let mut num_updated = 0;
1892        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1893            entry.parent_key = Some(parent_key.clone());
1894            num_updated += 1;
1895        }
1896
1897        if num_updated > 0 {
1898            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1899            tx.commit().await?;
1900        }
1901
1902        Ok(num_updated)
1903    }
1904
1905    async fn remove_dependent_queued_request(
1906        &self,
1907        room_id: &RoomId,
1908        txn_id: &ChildTransactionId,
1909    ) -> Result<bool> {
1910        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1911
1912        let tx = self
1913            .inner
1914            .transaction(keys::DEPENDENT_SEND_QUEUE)
1915            .with_mode(TransactionMode::Readwrite)
1916            .build()?;
1917
1918        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1919
1920        // We store an encoded vector of the dependent requests.
1921        // Reload the previous vector for this room.
1922        if let Some(val) = obj.get(&encoded_key).await? {
1923            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1924            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1925                prev.remove(pos);
1926
1927                if prev.is_empty() {
1928                    obj.delete(&encoded_key).build()?;
1929                } else {
1930                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1931                }
1932
1933                tx.commit().await?;
1934                return Ok(true);
1935            }
1936        }
1937
1938        Ok(false)
1939    }
1940
1941    async fn load_dependent_queued_requests(
1942        &self,
1943        room_id: &RoomId,
1944    ) -> Result<Vec<DependentQueuedRequest>> {
1945        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1946
1947        // We store an encoded vector of the dependent requests.
1948        let prev = self
1949            .inner
1950            .transaction(keys::DEPENDENT_SEND_QUEUE)
1951            .with_mode(TransactionMode::Readwrite)
1952            .build()?
1953            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1954            .get(&encoded_key)
1955            .await?;
1956
1957        prev.map_or_else(
1958            || Ok(Vec::new()),
1959            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1960        )
1961    }
1962
1963    async fn upsert_thread_subscriptions(
1964        &self,
1965        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1966    ) -> Result<()> {
1967        let tx = self
1968            .inner
1969            .transaction(keys::THREAD_SUBSCRIPTIONS)
1970            .with_mode(TransactionMode::Readwrite)
1971            .build()?;
1972        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1973
1974        for (room_id, thread_id, subscription) in updates {
1975            let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
1976            let mut new = PersistedThreadSubscription::from(subscription);
1977
1978            // See if there's a previous subscription.
1979            if let Some(previous_value) = obj.get(&encoded_key).await? {
1980                let previous: PersistedThreadSubscription =
1981                    self.deserialize_value(&previous_value)?;
1982
1983                // If the previous status is the same as the new one, don't do anything.
1984                if new == previous {
1985                    continue;
1986                }
1987                if !compare_thread_subscription_bump_stamps(
1988                    previous.bump_stamp,
1989                    &mut new.bump_stamp,
1990                ) {
1991                    continue;
1992                }
1993            }
1994
1995            let serialized_value = self.serialize_value(&new);
1996            obj.put(&serialized_value?).with_key(encoded_key).build()?;
1997        }
1998
1999        tx.commit().await?;
2000
2001        Ok(())
2002    }
2003
2004    async fn load_thread_subscription(
2005        &self,
2006        room: &RoomId,
2007        thread_id: &EventId,
2008    ) -> Result<Option<StoredThreadSubscription>> {
2009        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2010
2011        let js_value = self
2012            .inner
2013            .transaction(keys::THREAD_SUBSCRIPTIONS)
2014            .with_mode(TransactionMode::Readonly)
2015            .build()?
2016            .object_store(keys::THREAD_SUBSCRIPTIONS)?
2017            .get(&encoded_key)
2018            .await?;
2019
2020        let Some(js_value) = js_value else {
2021            // We didn't have a previous subscription for this thread.
2022            return Ok(None);
2023        };
2024
2025        let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2026
2027        let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2028            StoreError::InvalidData {
2029                details: format!(
2030                    "invalid thread status for room {room} and thread {thread_id}: {}",
2031                    sub.status
2032                ),
2033            }
2034        })?;
2035
2036        Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2037    }
2038
2039    async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2040        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2041
2042        let transaction = self
2043            .inner
2044            .transaction(keys::THREAD_SUBSCRIPTIONS)
2045            .with_mode(TransactionMode::Readwrite)
2046            .build()?;
2047        transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2048        transaction.commit().await?;
2049
2050        Ok(())
2051    }
2052
2053    #[allow(clippy::unused_async)]
2054    async fn optimize(&self) -> Result<()> {
2055        Ok(())
2056    }
2057
2058    #[allow(clippy::unused_async)]
2059    async fn get_size(&self) -> Result<Option<usize>> {
2060        Ok(None)
2061    }
2062});
2063
2064/// A room member.
2065#[derive(Debug, Serialize, Deserialize)]
2066struct RoomMember {
2067    user_id: OwnedUserId,
2068    membership: MembershipState,
2069}
2070
2071impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2072    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2073        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2074    }
2075}
2076
2077impl From<&StrippedRoomMemberEvent> for RoomMember {
2078    fn from(event: &StrippedRoomMemberEvent) -> Self {
2079        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2080    }
2081}
2082
2083#[cfg(test)]
2084mod migration_tests {
2085    use assert_matches2::assert_matches;
2086    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2087    use ruma::{
2088        OwnedRoomId, OwnedTransactionId, TransactionId,
2089        events::room::message::RoomMessageEventContent, room_id,
2090    };
2091    use serde::{Deserialize, Serialize};
2092
2093    use crate::state_store::PersistedQueuedRequest;
2094
2095    #[derive(Serialize, Deserialize)]
2096    struct OldPersistedQueuedRequest {
2097        room_id: OwnedRoomId,
2098        event: SerializableEventContent,
2099        transaction_id: OwnedTransactionId,
2100        is_wedged: bool,
2101    }
2102
2103    // We now persist an error when an event failed to send instead of just a
2104    // boolean. To support that, `PersistedQueueEvent` changed a bool to
2105    // Option<bool>, ensures that this work properly.
2106    #[test]
2107    fn test_migrating_persisted_queue_event_serialization() {
2108        let room_a_id = room_id!("!room_a:dummy.local");
2109        let transaction_id = TransactionId::new();
2110        let content =
2111            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2112                .unwrap();
2113
2114        let old_persisted_queue_event = OldPersistedQueuedRequest {
2115            room_id: room_a_id.to_owned(),
2116            event: content,
2117            transaction_id: transaction_id.clone(),
2118            is_wedged: true,
2119        };
2120
2121        let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2122
2123        // Load it with the new version.
2124        let new_persisted: PersistedQueuedRequest =
2125            serde_json::from_slice(&serialized_persisted).unwrap();
2126
2127        assert_eq!(new_persisted.is_wedged, Some(true));
2128        assert!(new_persisted.error.is_none());
2129
2130        assert!(new_persisted.event.is_some());
2131        assert!(new_persisted.kind.is_none());
2132
2133        let queued = new_persisted.into_queued_request().unwrap();
2134        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2135        assert_eq!(queued.transaction_id, transaction_id);
2136        assert!(queued.error.is_some());
2137    }
2138}
2139
2140#[cfg(all(test, target_family = "wasm"))]
2141mod tests {
2142    #[cfg(target_family = "wasm")]
2143    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2144
2145    use matrix_sdk_base::statestore_integration_tests;
2146    use uuid::Uuid;
2147
2148    use super::{IndexeddbStateStore, Result};
2149
2150    async fn get_store() -> Result<IndexeddbStateStore> {
2151        let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2152        Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2153    }
2154
2155    statestore_integration_tests!();
2156}
2157
2158#[cfg(all(test, target_family = "wasm"))]
2159mod encrypted_tests {
2160    #[cfg(target_family = "wasm")]
2161    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2162
2163    use matrix_sdk_base::statestore_integration_tests;
2164    use uuid::Uuid;
2165
2166    use super::{IndexeddbStateStore, Result};
2167
2168    async fn get_store() -> Result<IndexeddbStateStore> {
2169        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2170        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2171        Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2172    }
2173
2174    statestore_integration_tests!();
2175}