matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    str::FromStr as _,
18    sync::Arc,
19};
20
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::{
25    cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
26    transaction::TransactionMode, KeyRange,
27};
28use matrix_sdk_base::{
29    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
30    store::{
31        compare_thread_subscription_bump_stamps, ChildTransactionId, ComposerDraft,
32        DependentQueuedRequest, DependentQueuedRequestKind, QueuedRequest, QueuedRequestKind,
33        RoomLoadSettings, SentRequestKey, SerializableEventContent, ServerInfo, StateChanges,
34        StateStore, StoreError, StoredThreadSubscription, ThreadSubscriptionStatus,
35    },
36    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
37    ThreadSubscriptionCatchupToken, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK,
38};
39use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
40use ruma::{
41    canonical_json::{redact, RedactedBecause},
42    events::{
43        presence::PresenceEvent,
44        receipt::{Receipt, ReceiptThread, ReceiptType},
45        room::member::{
46            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
47        },
48        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
49        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
50    },
51    serde::Raw,
52    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
53    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
54};
55use serde::{de::DeserializeOwned, ser::Error, Deserialize, Serialize};
56use tracing::{debug, warn};
57use wasm_bindgen::JsValue;
58
59mod migrations;
60
61pub use self::migrations::MigrationConflictStrategy;
62use self::migrations::{upgrade_inner_db, upgrade_meta_db};
63use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
64
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A JSON (de)serialization error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error reported by the store encryption layer.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A DOM exception, flattened into its name, message and numeric code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// An error that is already expressed as a generic [`StoreError`].
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// The database called `name` cannot be migrated from `old_version` to
    /// `new_version` without deleting data.
    #[error(
        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
         See MigrationConflictStrategy for ways to configure."
    )]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
81
82impl From<GenericError> for IndexeddbStateStoreError {
83    fn from(value: GenericError) -> Self {
84        Self::StoreError(value.into())
85    }
86}
87
88impl From<web_sys::DomException> for IndexeddbStateStoreError {
89    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
90        IndexeddbStateStoreError::DomException {
91            name: frm.name(),
92            message: frm.message(),
93            code: frm.code(),
94        }
95    }
96}
97
98impl From<IndexeddbStateStoreError> for StoreError {
99    fn from(e: IndexeddbStateStoreError) -> Self {
100        match e {
101            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
102            IndexeddbStateStoreError::StoreError(e) => e,
103            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
104            _ => StoreError::backend(e),
105        }
106    }
107}
108
109impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
110    fn from(value: indexed_db_futures::error::DomException) -> Self {
111        web_sys::DomException::from(value).into()
112    }
113}
114
115impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
116    fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
117        Self::Json(serde_json::Error::custom(value.to_string()))
118    }
119}
120
121impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
122    fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
123        IndexeddbStateStoreError::StoreError(StoreError::backend(value))
124    }
125}
126
127impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
128    fn from(value: indexed_db_futures::error::JSError) -> Self {
129        GenericError::from(value.to_string()).into()
130    }
131}
132
133impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
134    fn from(value: indexed_db_futures::error::Error) -> Self {
135        use indexed_db_futures::error::Error;
136        match value {
137            Error::DomException(e) => e.into(),
138            Error::Serialisation(e) => e.into(),
139            Error::MissingData(e) => e.into(),
140            Error::Unknown(e) => e.into(),
141        }
142    }
143}
144
145impl From<OpenDbError> for IndexeddbStateStoreError {
146    fn from(value: OpenDbError) -> Self {
147        match value {
148            OpenDbError::Base(error) => error.into(),
149            _ => GenericError::from(value.to_string()).into(),
150        }
151    }
152}
153
/// Names of the databases, object stores and static keys used by the
/// IndexedDB state store.
mod keys {
    // Suffix appended to the store name to build the name of the meta
    // database (see `IndexeddbStateStoreBuilder::build`).
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    // Object store of the meta database that is queried for migration
    // backups (see `has_backups` / `latest_backup`).
    pub const BACKUPS_META: &str = "backups";

    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    /// Table used to save send queue events.
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    /// Table used to save dependent send queue events.
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// All names of the current state stores for convenience.
    ///
    /// `INTERNAL_STATE`, `BACKUPS_META` and `STORE_KEY` are not part of this
    /// list.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        THREAD_SUBSCRIPTIONS,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // static keys

    pub const STORE_KEY: &str = "store_key";
}
208
209pub use keys::ALL_STORES;
210use matrix_sdk_base::store::QueueWedgeError;
211
212/// Encrypt (if needs be) then JSON-serialize a value.
213fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
214    Ok(match store_cipher {
215        Some(cipher) => {
216            let data = serde_json::to_vec(event)?;
217            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
218        }
219        None => JsValue::from_serde(event)?,
220    })
221}
222
223/// Deserialize a JSON value and then decrypt it (if needs be).
224fn deserialize_value<T: DeserializeOwned>(
225    store_cipher: Option<&StoreCipher>,
226    event: &JsValue,
227) -> Result<T> {
228    match store_cipher {
229        Some(cipher) => {
230            use zeroize::Zeroize;
231            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
232            let ret = serde_json::from_slice(&plaintext);
233            plaintext.zeroize();
234            Ok(ret?)
235        }
236        None => Ok(event.into_serde()?),
237    }
238}
239
240fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
241where
242    T: SafeEncode,
243{
244    match store_cipher {
245        Some(cipher) => key.as_secure_string(table_name, cipher),
246        None => key.as_encoded_string(),
247    }
248    .into()
249}
250
251fn encode_to_range<T>(
252    store_cipher: Option<&StoreCipher>,
253    table_name: &str,
254    key: T,
255) -> KeyRange<JsValue>
256where
257    T: SafeEncode,
258{
259    match store_cipher {
260        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
261        None => key.encode_to_range(),
262    }
263}
264
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    /// Name of the store; `build` falls back to `"state"` when unset.
    name: Option<String>,
    /// Optional passphrase, handed to the meta database upgrade in `build`.
    passphrase: Option<String>,
    /// Strategy handed to the inner database upgrade in `build`.
    migration_conflict_strategy: MigrationConflictStrategy,
}
272
273impl IndexeddbStateStoreBuilder {
274    fn new() -> Self {
275        Self {
276            name: None,
277            passphrase: None,
278            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
279        }
280    }
281
282    /// Set the name for the indexeddb store to use, `state` is none given.
283    pub fn name(mut self, value: String) -> Self {
284        self.name = Some(value);
285        self
286    }
287
288    /// Set the password the indexeddb should be encrypted with.
289    ///
290    /// If not given, the DB is not encrypted.
291    pub fn passphrase(mut self, value: String) -> Self {
292        self.passphrase = Some(value);
293        self
294    }
295
296    /// The strategy to use when a merge conflict is found.
297    ///
298    /// See [`MigrationConflictStrategy`] for details.
299    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
300        self.migration_conflict_strategy = value;
301        self
302    }
303
304    pub async fn build(self) -> Result<IndexeddbStateStore> {
305        let migration_strategy = self.migration_conflict_strategy.clone();
306        let name = self.name.unwrap_or_else(|| "state".to_owned());
307
308        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
309
310        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
311        let inner =
312            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
313
314        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
315    }
316}
317
/// A state store backed by two IndexedDB databases: one holding the data and
/// one holding metadata.
pub struct IndexeddbStateStore {
    /// Name the store was built with; the only field shown by `Debug`.
    name: String,
    /// The database containing the data.
    pub(crate) inner: Database,
    /// The database containing the metadata (e.g. migration backups).
    pub(crate) meta: Database,
    /// Cipher used to encrypt values and encode keys; with `None`, values and
    /// keys are stored without encryption.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
324
325#[cfg(not(tarpaulin_include))]
326impl std::fmt::Debug for IndexeddbStateStore {
327    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
328        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
329    }
330}
331
/// Module-local `Result` alias whose error type defaults to
/// [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
333
334impl IndexeddbStateStore {
    /// Generate an [`IndexeddbStateStoreBuilder`] with default parameters.
    ///
    /// Finish the configuration with [`IndexeddbStateStoreBuilder::build`].
    pub fn builder() -> IndexeddbStateStoreBuilder {
        IndexeddbStateStoreBuilder::new()
    }
339
340    /// The version of the database containing the data.
341    pub fn version(&self) -> u32 {
342        self.inner.version() as u32
343    }
344
345    /// The version of the database containing the metadata.
346    pub fn meta_version(&self) -> u32 {
347        self.meta.version() as u32
348    }
349
350    /// Whether this database has any migration backups
351    pub async fn has_backups(&self) -> Result<bool> {
352        Ok(self
353            .meta
354            .transaction(keys::BACKUPS_META)
355            .with_mode(TransactionMode::Readonly)
356            .build()?
357            .object_store(keys::BACKUPS_META)?
358            .count()
359            .await?
360            > 0)
361    }
362
363    /// What's the database name of the latest backup<
364    pub async fn latest_backup(&self) -> Result<Option<String>> {
365        if let Some(mut cursor) = self
366            .meta
367            .transaction(keys::BACKUPS_META)
368            .with_mode(TransactionMode::Readonly)
369            .build()?
370            .object_store(keys::BACKUPS_META)?
371            .open_cursor()
372            .with_direction(CursorDirection::Prev)
373            .await?
374        {
375            if let Some(record) = cursor.next_record::<JsValue>().await? {
376                return Ok(record.as_string());
377            }
378        }
379        Ok(None)
380    }
381
382    /// Encrypt (if needs be) then JSON-serialize a value.
383    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
384        serialize_value(self.store_cipher.as_deref(), event)
385    }
386
387    /// Deserialize a JSON value and then decrypt it (if needs be).
388    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
389        deserialize_value(self.store_cipher.as_deref(), event)
390    }
391
392    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
393    where
394        T: SafeEncode,
395    {
396        encode_key(self.store_cipher.as_deref(), table_name, key)
397    }
398
399    fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
400    where
401        T: SafeEncode,
402    {
403        encode_to_range(self.store_cipher.as_deref(), table_name, key)
404    }
405
406    /// Get user IDs for the given room with the given memberships and stripped
407    /// state.
408    pub async fn get_user_ids_inner(
409        &self,
410        room_id: &RoomId,
411        memberships: RoomMemberships,
412        stripped: bool,
413    ) -> Result<Vec<OwnedUserId>> {
414        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
415
416        let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
417        let store = tx.object_store(store_name)?;
418        let range = self.encode_to_range(store_name, room_id);
419
420        let user_ids = if memberships.is_empty() {
421            // It should be faster to just get all user IDs in this case.
422            store
423                .get_all()
424                .with_query(&range)
425                .await?
426                .filter_map(Result::ok)
427                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
428                .collect::<Vec<_>>()
429        } else {
430            let mut user_ids = Vec::new();
431            let cursor = store.open_cursor().with_query(&range).await?;
432
433            if let Some(mut cursor) = cursor {
434                while let Some(value) = cursor.next_record().await? {
435                    let member = self.deserialize_value::<RoomMember>(&value)?;
436
437                    if memberships.matches(&member.membership) {
438                        user_ids.push(member.user_id);
439                    }
440                }
441            }
442
443            user_ids
444        };
445
446        Ok(user_ids)
447    }
448
449    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
450        self.inner
451            .transaction(keys::CUSTOM)
452            .with_mode(TransactionMode::Readonly)
453            .build()?
454            .object_store(keys::CUSTOM)?
455            .get(jskey)
456            .await?
457            .map(|f| self.deserialize_value(&f))
458            .transpose()
459    }
460
461    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
462        // Use the key (prefix) for the table name as well, to keep encoded
463        // keys compatible for the sync token and filters, which were in
464        // separate tables initially.
465        match key {
466            StateStoreDataKey::SyncToken => {
467                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
468            }
469            StateStoreDataKey::ServerInfo => {
470                self.encode_key(StateStoreDataKey::SERVER_INFO, StateStoreDataKey::SERVER_INFO)
471            }
472            StateStoreDataKey::Filter(filter_name) => {
473                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
474            }
475            StateStoreDataKey::UserAvatarUrl(user_id) => {
476                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
477            }
478            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
479                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
480            }
481            StateStoreDataKey::UtdHookManagerData => {
482                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
483            }
484            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
485                self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
486            }
487            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
488                if let Some(thread_root) = thread_root {
489                    self.encode_key(
490                        keys::KV,
491                        (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
492                    )
493                } else {
494                    self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
495                }
496            }
497            StateStoreDataKey::SeenKnockRequests(room_id) => {
498                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
499            }
500            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
501                self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
502            }
503        }
504    }
505}
506
/// A superset of [`QueuedRequest`] that also contains the room id, since we
/// want to return them.
///
/// Several fields are optional so that records written in previous formats
/// can still be read; see `into_queued_request` for how they are reconciled.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    /// In which room is this event going to be sent.
    pub room_id: OwnedRoomId,

    // All these fields are the same as in [`QueuedRequest`].
    /// Kind. Optional because it might be missing from previous formats.
    kind: Option<QueuedRequestKind>,
    /// Transaction id of the request.
    transaction_id: OwnedTransactionId,

    /// The error attached to the request, if any.
    pub error: Option<QueueWedgeError>,

    /// Priority of the request; a missing priority is read back as 0.
    priority: Option<usize>,

    /// The time the original message was first attempted to be sent at.
    ///
    /// Filled in with `created_now` when the field is missing from the stored
    /// record.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
    /// Deprecated (from old format), now replaced with error field.
    is_wedged: Option<bool>,

    /// Event content from the old format; only used to rebuild the kind when
    /// `kind` is missing.
    event: Option<SerializableEventContent>,
}
533
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
537
538impl PersistedQueuedRequest {
539    fn into_queued_request(self) -> Option<QueuedRequest> {
540        let kind =
541            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
542
543        let error = match self.is_wedged {
544            Some(true) => {
545                // Migrate to a generic error.
546                Some(QueueWedgeError::GenericApiError {
547                    msg: "local echo failed to send in a previous session".into(),
548                })
549            }
550            _ => self.error,
551        };
552
553        // By default, events without a priority have a priority of 0.
554        let priority = self.priority.unwrap_or(0);
555
556        Some(QueuedRequest {
557            kind,
558            transaction_id: self.transaction_id,
559            error,
560            priority,
561            created_at: self.created_at,
562        })
563    }
564}
565
/// Serialized form of a thread subscription.
#[derive(Serialize, Deserialize, PartialEq)]
struct PersistedThreadSubscription {
    /// String form of the subscription status, as produced by
    /// `status.as_str()` in the `From<StoredThreadSubscription>` impl.
    status: String,
    /// Bump stamp copied from the stored subscription, if any.
    bump_stamp: Option<u64>,
}
571
572impl From<StoredThreadSubscription> for PersistedThreadSubscription {
573    fn from(value: StoredThreadSubscription) -> Self {
574        Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
575    }
576}
577
578// Small hack to have the following macro invocation act as the appropriate
579// trait impl block on wasm, but still be compiled on non-wasm as a regular
580// impl block otherwise.
581//
582// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
583// this hack allows us to still have most of rust-analyzer's IDE functionality
584// within the impl block without having to set it up to check things against
585// the wasm target (which would disable many other parts of the codebase).
// wasm variant: wraps the given body in the `StateStore` trait impl for
// `IndexeddbStateStore`, with `IndexeddbStateStoreError` as the error type.
// The impl uses `async_trait(?Send)`.
#[cfg(target_family = "wasm")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
597
// Non-wasm variant: emits the same body as a plain inherent impl block on
// `IndexeddbStateStore`, without the trait and without the associated error
// type.
#[cfg(not(target_family = "wasm"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
606
607impl_state_store!({
608    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
609        let encoded_key = self.encode_kv_data_key(key);
610
611        let value = self
612            .inner
613            .transaction(keys::KV)
614            .with_mode(TransactionMode::Readonly)
615            .build()?
616            .object_store(keys::KV)?
617            .get(&encoded_key)
618            .await?;
619
620        let value = match key {
621            StateStoreDataKey::SyncToken => value
622                .map(|f| self.deserialize_value::<String>(&f))
623                .transpose()?
624                .map(StateStoreDataValue::SyncToken),
625            StateStoreDataKey::ServerInfo => value
626                .map(|f| self.deserialize_value::<ServerInfo>(&f))
627                .transpose()?
628                .map(StateStoreDataValue::ServerInfo),
629            StateStoreDataKey::Filter(_) => value
630                .map(|f| self.deserialize_value::<String>(&f))
631                .transpose()?
632                .map(StateStoreDataValue::Filter),
633            StateStoreDataKey::UserAvatarUrl(_) => value
634                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
635                .transpose()?
636                .map(StateStoreDataValue::UserAvatarUrl),
637            StateStoreDataKey::RecentlyVisitedRooms(_) => value
638                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
639                .transpose()?
640                .map(StateStoreDataValue::RecentlyVisitedRooms),
641            StateStoreDataKey::UtdHookManagerData => value
642                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
643                .transpose()?
644                .map(StateStoreDataValue::UtdHookManagerData),
645            StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
646                .map(|f| self.deserialize_value::<bool>(&f))
647                .transpose()?
648                .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
649            StateStoreDataKey::ComposerDraft(_, _) => value
650                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
651                .transpose()?
652                .map(StateStoreDataValue::ComposerDraft),
653            StateStoreDataKey::SeenKnockRequests(_) => value
654                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
655                .transpose()?
656                .map(StateStoreDataValue::SeenKnockRequests),
657            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
658                .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
659                .transpose()?
660                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
661        };
662
663        Ok(value)
664    }
665
666    async fn set_kv_data(
667        &self,
668        key: StateStoreDataKey<'_>,
669        value: StateStoreDataValue,
670    ) -> Result<()> {
671        let encoded_key = self.encode_kv_data_key(key);
672
673        let serialized_value = match key {
674            StateStoreDataKey::SyncToken => self
675                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
676            StateStoreDataKey::ServerInfo => self.serialize_value(
677                &value.into_server_info().expect("Session data not containing server info"),
678            ),
679            StateStoreDataKey::Filter(_) => {
680                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
681            }
682            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
683                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
684            ),
685            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
686                &value
687                    .into_recently_visited_rooms()
688                    .expect("Session data not a recently visited room list"),
689            ),
690            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
691                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
692            ),
693            StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
694            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
695                &value.into_composer_draft().expect("Session data not a composer draft"),
696            ),
697            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
698                &value
699                    .into_seen_knock_requests()
700                    .expect("Session data is not a set of seen knock request ids"),
701            ),
702            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
703                &value
704                    .into_thread_subscriptions_catchup_tokens()
705                    .expect("Session data is not a list of thread subscription catchup tokens"),
706            ),
707        };
708
709        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
710
711        let obj = tx.object_store(keys::KV)?;
712
713        obj.put(&serialized_value?).with_key(encoded_key).build()?;
714
715        tx.commit().await?;
716
717        Ok(())
718    }
719
720    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
721        let encoded_key = self.encode_kv_data_key(key);
722
723        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
724        let obj = tx.object_store(keys::KV)?;
725
726        obj.delete(&encoded_key).build()?;
727
728        tx.commit().await?;
729
730        Ok(())
731    }
732
733    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
734        let mut stores: HashSet<&'static str> = [
735            (changes.sync_token.is_some(), keys::KV),
736            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
737            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
738            (!changes.presence.is_empty(), keys::PRESENCE),
739            (
740                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
741                keys::PROFILES,
742            ),
743            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
744            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
745        ]
746        .iter()
747        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
748        .collect();
749
750        if !changes.state.is_empty() {
751            stores.extend([
752                keys::ROOM_STATE,
753                keys::USER_IDS,
754                keys::STRIPPED_USER_IDS,
755                keys::STRIPPED_ROOM_STATE,
756                keys::PROFILES,
757            ]);
758        }
759
760        if !changes.redactions.is_empty() {
761            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
762        }
763
764        if !changes.room_infos.is_empty() {
765            stores.insert(keys::ROOM_INFOS);
766        }
767
768        if !changes.stripped_state.is_empty() {
769            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
770        }
771
772        if !changes.receipts.is_empty() {
773            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
774        }
775
776        if stores.is_empty() {
777            // nothing to do, quit early
778            return Ok(());
779        }
780
781        let stores: Vec<&'static str> = stores.into_iter().collect();
782        let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
783
784        if let Some(s) = &changes.sync_token {
785            tx.object_store(keys::KV)?
786                .put(&self.serialize_value(s)?)
787                .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
788                .build()?;
789        }
790
791        if !changes.ambiguity_maps.is_empty() {
792            let store = tx.object_store(keys::DISPLAY_NAMES)?;
793            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
794                for (display_name, map) in ambiguity_maps {
795                    let key = self.encode_key(
796                        keys::DISPLAY_NAMES,
797                        (
798                            room_id,
799                            display_name
800                                .as_normalized_str()
801                                .unwrap_or_else(|| display_name.as_raw_str()),
802                        ),
803                    );
804
805                    store.put(&self.serialize_value(&map)?).with_key(key).build()?;
806                }
807            }
808        }
809
810        if !changes.account_data.is_empty() {
811            let store = tx.object_store(keys::ACCOUNT_DATA)?;
812            for (event_type, event) in &changes.account_data {
813                store
814                    .put(&self.serialize_value(&event)?)
815                    .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
816                    .build()?;
817            }
818        }
819
820        if !changes.room_account_data.is_empty() {
821            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
822            for (room, events) in &changes.room_account_data {
823                for (event_type, event) in events {
824                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
825                    store.put(&self.serialize_value(&event)?).with_key(key).build()?;
826                }
827            }
828        }
829
830        if !changes.state.is_empty() {
831            let state = tx.object_store(keys::ROOM_STATE)?;
832            let profiles = tx.object_store(keys::PROFILES)?;
833            let user_ids = tx.object_store(keys::USER_IDS)?;
834            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
835            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
836
837            for (room, user_ids) in &changes.profiles_to_delete {
838                for user_id in user_ids {
839                    let key = self.encode_key(keys::PROFILES, (room, user_id));
840                    profiles.delete(&key).build()?;
841                }
842            }
843
844            for (room, event_types) in &changes.state {
845                let profile_changes = changes.profiles.get(room);
846
847                for (event_type, events) in event_types {
848                    for (state_key, raw_event) in events {
849                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
850                        state
851                            .put(&self.serialize_value(&raw_event)?)
852                            .with_key(key.clone())
853                            .build()?;
854                        stripped_state.delete(&key).build()?;
855
856                        if *event_type == StateEventType::RoomMember {
857                            let event =
858                                match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
859                                    Ok(ev) => ev,
860                                    Err(e) => {
861                                        let event_id: Option<String> =
862                                            raw_event.get_field("event_id").ok().flatten();
863                                        debug!(event_id, "Failed to deserialize member event: {e}");
864                                        continue;
865                                    }
866                                };
867
868                            let key = (room, state_key);
869
870                            stripped_user_ids
871                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
872                                .build()?;
873
874                            user_ids
875                                .put(&self.serialize_value(&RoomMember::from(&event))?)
876                                .with_key(self.encode_key(keys::USER_IDS, key))
877                                .build()?;
878
879                            if let Some(profile) =
880                                profile_changes.and_then(|p| p.get(event.state_key()))
881                            {
882                                profiles
883                                    .put(&self.serialize_value(&profile)?)
884                                    .with_key(self.encode_key(keys::PROFILES, key))
885                                    .build()?;
886                            }
887                        }
888                    }
889                }
890            }
891        }
892
893        if !changes.room_infos.is_empty() {
894            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
895            for (room_id, room_info) in &changes.room_infos {
896                room_infos
897                    .put(&self.serialize_value(&room_info)?)
898                    .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
899                    .build()?;
900            }
901        }
902
903        if !changes.presence.is_empty() {
904            let store = tx.object_store(keys::PRESENCE)?;
905            for (sender, event) in &changes.presence {
906                store
907                    .put(&self.serialize_value(&event)?)
908                    .with_key(self.encode_key(keys::PRESENCE, sender))
909                    .build()?;
910            }
911        }
912
913        if !changes.stripped_state.is_empty() {
914            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
915            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
916
917            for (room, event_types) in &changes.stripped_state {
918                for (event_type, events) in event_types {
919                    for (state_key, raw_event) in events {
920                        let key = self
921                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
922                        store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
923
924                        if *event_type == StateEventType::RoomMember {
925                            let event = match raw_event
926                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
927                            {
928                                Ok(ev) => ev,
929                                Err(e) => {
930                                    let event_id: Option<String> =
931                                        raw_event.get_field("event_id").ok().flatten();
932                                    debug!(
933                                        event_id,
934                                        "Failed to deserialize stripped member event: {e}"
935                                    );
936                                    continue;
937                                }
938                            };
939
940                            let key = (room, state_key);
941
942                            user_ids
943                                .put(&self.serialize_value(&RoomMember::from(&event))?)
944                                .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
945                                .build()?;
946                        }
947                    }
948                }
949            }
950        }
951
952        if !changes.receipts.is_empty() {
953            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
954            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
955
956            for (room, content) in &changes.receipts {
957                for (event_id, receipts) in &content.0 {
958                    for (receipt_type, receipts) in receipts {
959                        for (user_id, receipt) in receipts {
960                            let key = match receipt.thread.as_str() {
961                                Some(thread_id) => self.encode_key(
962                                    keys::ROOM_USER_RECEIPTS,
963                                    (room, receipt_type, thread_id, user_id),
964                                ),
965                                None => self.encode_key(
966                                    keys::ROOM_USER_RECEIPTS,
967                                    (room, receipt_type, user_id),
968                                ),
969                            };
970
971                            if let Some((old_event, _)) =
972                                room_user_receipts.get(&key).await?.and_then(|f| {
973                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
974                                })
975                            {
976                                let key = match receipt.thread.as_str() {
977                                    Some(thread_id) => self.encode_key(
978                                        keys::ROOM_EVENT_RECEIPTS,
979                                        (room, receipt_type, thread_id, old_event, user_id),
980                                    ),
981                                    None => self.encode_key(
982                                        keys::ROOM_EVENT_RECEIPTS,
983                                        (room, receipt_type, old_event, user_id),
984                                    ),
985                                };
986                                room_event_receipts.delete(&key).build()?;
987                            }
988
989                            room_user_receipts
990                                .put(&self.serialize_value(&(event_id, receipt))?)
991                                .with_key(key)
992                                .build()?;
993
994                            // Add the receipt to the room event receipts
995                            let key = match receipt.thread.as_str() {
996                                Some(thread_id) => self.encode_key(
997                                    keys::ROOM_EVENT_RECEIPTS,
998                                    (room, receipt_type, thread_id, event_id, user_id),
999                                ),
1000                                None => self.encode_key(
1001                                    keys::ROOM_EVENT_RECEIPTS,
1002                                    (room, receipt_type, event_id, user_id),
1003                                ),
1004                            };
1005                            room_event_receipts
1006                                .put(&self.serialize_value(&(user_id, receipt))?)
1007                                .with_key(key)
1008                                .build()?;
1009                        }
1010                    }
1011                }
1012            }
1013        }
1014
1015        if !changes.redactions.is_empty() {
1016            let state = tx.object_store(keys::ROOM_STATE)?;
1017            let room_info = tx.object_store(keys::ROOM_INFOS)?;
1018
1019            for (room_id, redactions) in &changes.redactions {
1020                let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1021                let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1022                    continue;
1023                };
1024
1025                let mut redaction_rules = None;
1026
1027                while let Some(value) = cursor.next_record().await? {
1028                    let Some(key) = cursor.key::<JsValue>()? else {
1029                        break;
1030                    };
1031
1032                    let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1033                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
1034                        if let Some(redaction) = redactions.get(&event_id) {
1035                            let redaction_rules = {
1036                                if redaction_rules.is_none() {
1037                                    redaction_rules.replace(room_info
1038                                        .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1039                                        .await?
1040                                        .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1041                                        .map(|info| info.room_version_rules_or_default())
1042                                        .unwrap_or_else(|| {
1043                                            warn!(?room_id, "Unable to get the room version rules, defaulting to rules for room version {ROOM_VERSION_FALLBACK}");
1044                                            ROOM_VERSION_RULES_FALLBACK
1045                                        }).redaction
1046                                    );
1047                                }
1048                                redaction_rules.as_ref().unwrap()
1049                            };
1050
1051                            let redacted = redact(
1052                                raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1053                                redaction_rules,
1054                                Some(RedactedBecause::from_raw_event(redaction)?),
1055                            )
1056                            .map_err(StoreError::Redaction)?;
1057                            state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1058                        }
1059                    }
1060                }
1061            }
1062        }
1063
1064        tx.commit().await.map_err(|e| e.into())
1065    }
1066
1067    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1068        self.inner
1069            .transaction(keys::PRESENCE)
1070            .with_mode(TransactionMode::Readonly)
1071            .build()?
1072            .object_store(keys::PRESENCE)?
1073            .get(&self.encode_key(keys::PRESENCE, user_id))
1074            .await?
1075            .map(|f| self.deserialize_value(&f))
1076            .transpose()
1077    }
1078
1079    async fn get_presence_events(
1080        &self,
1081        user_ids: &[OwnedUserId],
1082    ) -> Result<Vec<Raw<PresenceEvent>>> {
1083        if user_ids.is_empty() {
1084            return Ok(Vec::new());
1085        }
1086
1087        let txn =
1088            self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1089        let store = txn.object_store(keys::PRESENCE)?;
1090
1091        let mut events = Vec::with_capacity(user_ids.len());
1092
1093        for user_id in user_ids {
1094            if let Some(event) = store
1095                .get(&self.encode_key(keys::PRESENCE, user_id))
1096                .await?
1097                .map(|f| self.deserialize_value(&f))
1098                .transpose()?
1099            {
1100                events.push(event)
1101            }
1102        }
1103
1104        Ok(events)
1105    }
1106
1107    async fn get_state_event(
1108        &self,
1109        room_id: &RoomId,
1110        event_type: StateEventType,
1111        state_key: &str,
1112    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1113        Ok(self
1114            .get_state_events_for_keys(room_id, event_type, &[state_key])
1115            .await?
1116            .into_iter()
1117            .next())
1118    }
1119
1120    async fn get_state_events(
1121        &self,
1122        room_id: &RoomId,
1123        event_type: StateEventType,
1124    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1125        let stripped_range =
1126            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1127        let stripped_events = self
1128            .inner
1129            .transaction(keys::STRIPPED_ROOM_STATE)
1130            .with_mode(TransactionMode::Readonly)
1131            .build()?
1132            .object_store(keys::STRIPPED_ROOM_STATE)?
1133            .get_all()
1134            .with_query(&stripped_range)
1135            .await?
1136            .filter_map(Result::ok)
1137            .filter_map(|f| {
1138                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1139            })
1140            .collect::<Vec<_>>();
1141
1142        if !stripped_events.is_empty() {
1143            return Ok(stripped_events);
1144        }
1145
1146        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1147        Ok(self
1148            .inner
1149            .transaction(keys::ROOM_STATE)
1150            .with_mode(TransactionMode::Readonly)
1151            .build()?
1152            .object_store(keys::ROOM_STATE)?
1153            .get_all()
1154            .with_query(&range)
1155            .await?
1156            .filter_map(Result::ok)
1157            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1158            .collect::<Vec<_>>())
1159    }
1160
1161    async fn get_state_events_for_keys(
1162        &self,
1163        room_id: &RoomId,
1164        event_type: StateEventType,
1165        state_keys: &[&str],
1166    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1167        if state_keys.is_empty() {
1168            return Ok(Vec::new());
1169        }
1170
1171        let mut events = Vec::with_capacity(state_keys.len());
1172
1173        {
1174            let txn = self
1175                .inner
1176                .transaction(keys::STRIPPED_ROOM_STATE)
1177                .with_mode(TransactionMode::Readonly)
1178                .build()?;
1179            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1180
1181            for state_key in state_keys {
1182                if let Some(event) =
1183                    store
1184                        .get(&self.encode_key(
1185                            keys::STRIPPED_ROOM_STATE,
1186                            (room_id, &event_type, state_key),
1187                        ))
1188                        .await?
1189                        .map(|f| self.deserialize_value(&f))
1190                        .transpose()?
1191                {
1192                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1193                }
1194            }
1195
1196            if !events.is_empty() {
1197                return Ok(events);
1198            }
1199        }
1200
1201        let txn = self
1202            .inner
1203            .transaction(keys::ROOM_STATE)
1204            .with_mode(TransactionMode::Readonly)
1205            .build()?;
1206        let store = txn.object_store(keys::ROOM_STATE)?;
1207
1208        for state_key in state_keys {
1209            if let Some(event) = store
1210                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1211                .await?
1212                .map(|f| self.deserialize_value(&f))
1213                .transpose()?
1214            {
1215                events.push(RawAnySyncOrStrippedState::Sync(event));
1216            }
1217        }
1218
1219        Ok(events)
1220    }
1221
1222    async fn get_profile(
1223        &self,
1224        room_id: &RoomId,
1225        user_id: &UserId,
1226    ) -> Result<Option<MinimalRoomMemberEvent>> {
1227        self.inner
1228            .transaction(keys::PROFILES)
1229            .with_mode(TransactionMode::Readonly)
1230            .build()?
1231            .object_store(keys::PROFILES)?
1232            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1233            .await?
1234            .map(|f| self.deserialize_value(&f))
1235            .transpose()
1236    }
1237
1238    async fn get_profiles<'a>(
1239        &self,
1240        room_id: &RoomId,
1241        user_ids: &'a [OwnedUserId],
1242    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1243        if user_ids.is_empty() {
1244            return Ok(BTreeMap::new());
1245        }
1246
1247        let txn =
1248            self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1249        let store = txn.object_store(keys::PROFILES)?;
1250
1251        let mut profiles = BTreeMap::new();
1252        for user_id in user_ids {
1253            if let Some(profile) = store
1254                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1255                .await?
1256                .map(|f| self.deserialize_value(&f))
1257                .transpose()?
1258            {
1259                profiles.insert(user_id.as_ref(), profile);
1260            }
1261        }
1262
1263        Ok(profiles)
1264    }
1265
1266    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1267        let transaction = self
1268            .inner
1269            .transaction(keys::ROOM_INFOS)
1270            .with_mode(TransactionMode::Readonly)
1271            .build()?;
1272
1273        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1274
1275        Ok(match room_load_settings {
1276            RoomLoadSettings::All => object_store
1277                .get_all()
1278                .await?
1279                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1280                .collect::<Result<_>>()?,
1281
1282            RoomLoadSettings::One(room_id) => {
1283                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1284                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1285                    None => vec![],
1286                }
1287            }
1288        })
1289    }
1290
1291    async fn get_users_with_display_name(
1292        &self,
1293        room_id: &RoomId,
1294        display_name: &DisplayName,
1295    ) -> Result<BTreeSet<OwnedUserId>> {
1296        self.inner
1297            .transaction(keys::DISPLAY_NAMES)
1298            .with_mode(TransactionMode::Readonly)
1299            .build()?
1300            .object_store(keys::DISPLAY_NAMES)?
1301            .get(&self.encode_key(
1302                keys::DISPLAY_NAMES,
1303                (
1304                    room_id,
1305                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1306                ),
1307            ))
1308            .await?
1309            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1310            .unwrap_or_else(|| Ok(Default::default()))
1311    }
1312
1313    async fn get_users_with_display_names<'a>(
1314        &self,
1315        room_id: &RoomId,
1316        display_names: &'a [DisplayName],
1317    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1318        let mut map = HashMap::new();
1319
1320        if display_names.is_empty() {
1321            return Ok(map);
1322        }
1323
1324        let txn = self
1325            .inner
1326            .transaction(keys::DISPLAY_NAMES)
1327            .with_mode(TransactionMode::Readonly)
1328            .build()?;
1329        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1330
1331        for display_name in display_names {
1332            if let Some(user_ids) = store
1333                .get(
1334                    &self.encode_key(
1335                        keys::DISPLAY_NAMES,
1336                        (
1337                            room_id,
1338                            display_name
1339                                .as_normalized_str()
1340                                .unwrap_or_else(|| display_name.as_raw_str()),
1341                        ),
1342                    ),
1343                )
1344                .await?
1345                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1346                .transpose()?
1347            {
1348                map.insert(display_name, user_ids);
1349            }
1350        }
1351
1352        Ok(map)
1353    }
1354
1355    async fn get_account_data_event(
1356        &self,
1357        event_type: GlobalAccountDataEventType,
1358    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1359        self.inner
1360            .transaction(keys::ACCOUNT_DATA)
1361            .with_mode(TransactionMode::Readonly)
1362            .build()?
1363            .object_store(keys::ACCOUNT_DATA)?
1364            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1365            .await?
1366            .map(|f| self.deserialize_value(&f))
1367            .transpose()
1368    }
1369
1370    async fn get_room_account_data_event(
1371        &self,
1372        room_id: &RoomId,
1373        event_type: RoomAccountDataEventType,
1374    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1375        self.inner
1376            .transaction(keys::ROOM_ACCOUNT_DATA)
1377            .with_mode(TransactionMode::Readonly)
1378            .build()?
1379            .object_store(keys::ROOM_ACCOUNT_DATA)?
1380            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1381            .await?
1382            .map(|f| self.deserialize_value(&f))
1383            .transpose()
1384    }
1385
1386    async fn get_user_room_receipt_event(
1387        &self,
1388        room_id: &RoomId,
1389        receipt_type: ReceiptType,
1390        thread: ReceiptThread,
1391        user_id: &UserId,
1392    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1393        let key = match thread.as_str() {
1394            Some(thread_id) => self
1395                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1396            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1397        };
1398        self.inner
1399            .transaction(keys::ROOM_USER_RECEIPTS)
1400            .with_mode(TransactionMode::Readonly)
1401            .build()?
1402            .object_store(keys::ROOM_USER_RECEIPTS)?
1403            .get(&key)
1404            .await?
1405            .map(|f| self.deserialize_value(&f))
1406            .transpose()
1407    }
1408
1409    async fn get_event_room_receipt_events(
1410        &self,
1411        room_id: &RoomId,
1412        receipt_type: ReceiptType,
1413        thread: ReceiptThread,
1414        event_id: &EventId,
1415    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1416        let range = match thread.as_str() {
1417            Some(thread_id) => self.encode_to_range(
1418                keys::ROOM_EVENT_RECEIPTS,
1419                (room_id, receipt_type, thread_id, event_id),
1420            ),
1421            None => {
1422                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1423            }
1424        };
1425        let tx = self
1426            .inner
1427            .transaction(keys::ROOM_EVENT_RECEIPTS)
1428            .with_mode(TransactionMode::Readonly)
1429            .build()?;
1430        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1431
1432        Ok(store
1433            .get_all()
1434            .with_query(&range)
1435            .await?
1436            .filter_map(Result::ok)
1437            .filter_map(|f| self.deserialize_value(&f).ok())
1438            .collect::<Vec<_>>())
1439    }
1440
1441    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1442        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1443        self.get_custom_value_for_js(jskey).await
1444    }
1445
1446    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1447        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1448
1449        let prev = self.get_custom_value_for_js(&jskey).await?;
1450
1451        let tx =
1452            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1453
1454        tx.object_store(keys::CUSTOM)?
1455            .put(&self.serialize_value(&value)?)
1456            .with_key(jskey)
1457            .build()?;
1458
1459        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1460        Ok(prev)
1461    }
1462
1463    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1464        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1465
1466        let prev = self.get_custom_value_for_js(&jskey).await?;
1467
1468        let tx =
1469            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1470
1471        tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1472
1473        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1474        Ok(prev)
1475    }
1476
1477    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1478        // All the stores which use a RoomId as their key (and nothing additional).
1479        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1480
1481        // All the stores which use a RoomId as the first part of their key, but may
1482        // have some additional data in the key.
1483        let prefixed_stores = [
1484            keys::PROFILES,
1485            keys::DISPLAY_NAMES,
1486            keys::USER_IDS,
1487            keys::ROOM_STATE,
1488            keys::ROOM_ACCOUNT_DATA,
1489            keys::ROOM_EVENT_RECEIPTS,
1490            keys::ROOM_USER_RECEIPTS,
1491            keys::STRIPPED_ROOM_STATE,
1492            keys::STRIPPED_USER_IDS,
1493            keys::THREAD_SUBSCRIPTIONS,
1494        ];
1495
1496        let all_stores = {
1497            let mut v = Vec::new();
1498            v.extend(prefixed_stores);
1499            v.extend(direct_stores);
1500            v
1501        };
1502
1503        let tx =
1504            self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1505
1506        for store_name in direct_stores {
1507            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1508        }
1509
1510        for store_name in prefixed_stores {
1511            let store = tx.object_store(store_name)?;
1512            let range = self.encode_to_range(store_name, room_id);
1513            for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1514                store.delete(&key?).build()?;
1515            }
1516        }
1517
1518        tx.commit().await.map_err(|e| e.into())
1519    }
1520
1521    async fn get_user_ids(
1522        &self,
1523        room_id: &RoomId,
1524        memberships: RoomMemberships,
1525    ) -> Result<Vec<OwnedUserId>> {
1526        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1527        if !ids.is_empty() {
1528            return Ok(ids);
1529        }
1530        self.get_user_ids_inner(room_id, memberships, false).await
1531    }
1532
1533    async fn save_send_queue_request(
1534        &self,
1535        room_id: &RoomId,
1536        transaction_id: OwnedTransactionId,
1537        created_at: MilliSecondsSinceUnixEpoch,
1538        kind: QueuedRequestKind,
1539        priority: usize,
1540    ) -> Result<()> {
1541        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1542
1543        let tx = self
1544            .inner
1545            .transaction(keys::ROOM_SEND_QUEUE)
1546            .with_mode(TransactionMode::Readwrite)
1547            .build()?;
1548
1549        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1550
1551        // We store an encoded vector of the queued requests, with their transaction
1552        // ids.
1553
1554        // Reload the previous vector for this room, or create an empty one.
1555        let prev = obj.get(&encoded_key).await?;
1556
1557        let mut prev = prev.map_or_else(
1558            || Ok(Vec::new()),
1559            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1560        )?;
1561
1562        // Push the new request.
1563        prev.push(PersistedQueuedRequest {
1564            room_id: room_id.to_owned(),
1565            kind: Some(kind),
1566            transaction_id,
1567            error: None,
1568            is_wedged: None,
1569            event: None,
1570            priority: Some(priority),
1571            created_at,
1572        });
1573
1574        // Save the new vector into db.
1575        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1576
1577        tx.commit().await?;
1578
1579        Ok(())
1580    }
1581
1582    async fn update_send_queue_request(
1583        &self,
1584        room_id: &RoomId,
1585        transaction_id: &TransactionId,
1586        kind: QueuedRequestKind,
1587    ) -> Result<bool> {
1588        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1589
1590        let tx = self
1591            .inner
1592            .transaction(keys::ROOM_SEND_QUEUE)
1593            .with_mode(TransactionMode::Readwrite)
1594            .build()?;
1595
1596        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1597
1598        // We store an encoded vector of the queued requests, with their transaction
1599        // ids.
1600
1601        // Reload the previous vector for this room, or create an empty one.
1602        let prev = obj.get(&encoded_key).await?;
1603
1604        let mut prev = prev.map_or_else(
1605            || Ok(Vec::new()),
1606            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1607        )?;
1608
1609        // Modify the one request.
1610        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1611            entry.kind = Some(kind);
1612            // Reset the error state.
1613            entry.error = None;
1614            // Remove migrated fields.
1615            entry.is_wedged = None;
1616            entry.event = None;
1617
1618            // Save the new vector into db.
1619            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1620            tx.commit().await?;
1621
1622            Ok(true)
1623        } else {
1624            Ok(false)
1625        }
1626    }
1627
1628    async fn remove_send_queue_request(
1629        &self,
1630        room_id: &RoomId,
1631        transaction_id: &TransactionId,
1632    ) -> Result<bool> {
1633        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1634
1635        let tx = self
1636            .inner
1637            .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1638            .with_mode(TransactionMode::Readwrite)
1639            .build()?;
1640
1641        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1642
1643        // We store an encoded vector of the queued requests, with their transaction
1644        // ids.
1645
1646        // Reload the previous vector for this room.
1647        if let Some(val) = obj.get(&encoded_key).await? {
1648            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1649            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1650                prev.remove(pos);
1651
1652                if prev.is_empty() {
1653                    obj.delete(&encoded_key).build()?;
1654                } else {
1655                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1656                }
1657
1658                tx.commit().await?;
1659                return Ok(true);
1660            }
1661        }
1662
1663        Ok(false)
1664    }
1665
1666    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1667        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1668
1669        // We store an encoded vector of the queued requests, with their transaction
1670        // ids.
1671        let prev = self
1672            .inner
1673            .transaction(keys::ROOM_SEND_QUEUE)
1674            .with_mode(TransactionMode::Readwrite)
1675            .build()?
1676            .object_store(keys::ROOM_SEND_QUEUE)?
1677            .get(&encoded_key)
1678            .await?;
1679
1680        let mut prev = prev.map_or_else(
1681            || Ok(Vec::new()),
1682            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1683        )?;
1684
1685        // Inverted stable ordering on priority.
1686        prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1687
1688        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1689    }
1690
1691    async fn update_send_queue_request_status(
1692        &self,
1693        room_id: &RoomId,
1694        transaction_id: &TransactionId,
1695        error: Option<QueueWedgeError>,
1696    ) -> Result<()> {
1697        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1698
1699        let tx = self
1700            .inner
1701            .transaction(keys::ROOM_SEND_QUEUE)
1702            .with_mode(TransactionMode::Readwrite)
1703            .build()?;
1704
1705        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1706
1707        if let Some(val) = obj.get(&encoded_key).await? {
1708            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1709            if let Some(request) =
1710                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1711            {
1712                request.is_wedged = None;
1713                request.error = error;
1714                obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1715            }
1716        }
1717
1718        tx.commit().await?;
1719
1720        Ok(())
1721    }
1722
1723    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1724        let tx = self
1725            .inner
1726            .transaction(keys::ROOM_SEND_QUEUE)
1727            .with_mode(TransactionMode::Readwrite)
1728            .build()?;
1729
1730        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1731
1732        let all_entries = obj
1733            .get_all()
1734            .await?
1735            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1736            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1737            .into_iter()
1738            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1739            .collect::<BTreeSet<_>>();
1740
1741        Ok(all_entries.into_iter().collect())
1742    }
1743
1744    async fn save_dependent_queued_request(
1745        &self,
1746        room_id: &RoomId,
1747        parent_txn_id: &TransactionId,
1748        own_txn_id: ChildTransactionId,
1749        created_at: MilliSecondsSinceUnixEpoch,
1750        content: DependentQueuedRequestKind,
1751    ) -> Result<()> {
1752        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1753
1754        let tx = self
1755            .inner
1756            .transaction(keys::DEPENDENT_SEND_QUEUE)
1757            .with_mode(TransactionMode::Readwrite)
1758            .build()?;
1759
1760        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1761
1762        // We store an encoded vector of the dependent requests.
1763        // Reload the previous vector for this room, or create an empty one.
1764        let prev = obj.get(&encoded_key).await?;
1765
1766        let mut prev = prev.map_or_else(
1767            || Ok(Vec::new()),
1768            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1769        )?;
1770
1771        // Push the new request.
1772        prev.push(DependentQueuedRequest {
1773            kind: content,
1774            parent_transaction_id: parent_txn_id.to_owned(),
1775            own_transaction_id: own_txn_id,
1776            parent_key: None,
1777            created_at,
1778        });
1779
1780        // Save the new vector into db.
1781        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1782
1783        tx.commit().await?;
1784
1785        Ok(())
1786    }
1787
1788    async fn update_dependent_queued_request(
1789        &self,
1790        room_id: &RoomId,
1791        own_transaction_id: &ChildTransactionId,
1792        new_content: DependentQueuedRequestKind,
1793    ) -> Result<bool> {
1794        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1795
1796        let tx = self
1797            .inner
1798            .transaction(keys::DEPENDENT_SEND_QUEUE)
1799            .with_mode(TransactionMode::Readwrite)
1800            .build()?;
1801
1802        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1803
1804        // We store an encoded vector of the dependent requests.
1805        // Reload the previous vector for this room, or create an empty one.
1806        let prev = obj.get(&encoded_key).await?;
1807
1808        let mut prev = prev.map_or_else(
1809            || Ok(Vec::new()),
1810            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1811        )?;
1812
1813        // Modify the dependent request, if found.
1814        let mut found = false;
1815        for entry in prev.iter_mut() {
1816            if entry.own_transaction_id == *own_transaction_id {
1817                found = true;
1818                entry.kind = new_content;
1819                break;
1820            }
1821        }
1822
1823        if found {
1824            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1825            tx.commit().await?;
1826        }
1827
1828        Ok(found)
1829    }
1830
1831    async fn mark_dependent_queued_requests_as_ready(
1832        &self,
1833        room_id: &RoomId,
1834        parent_txn_id: &TransactionId,
1835        parent_key: SentRequestKey,
1836    ) -> Result<usize> {
1837        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1838
1839        let tx = self
1840            .inner
1841            .transaction(keys::DEPENDENT_SEND_QUEUE)
1842            .with_mode(TransactionMode::Readwrite)
1843            .build()?;
1844
1845        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1846
1847        // We store an encoded vector of the dependent requests.
1848        // Reload the previous vector for this room, or create an empty one.
1849        let prev = obj.get(&encoded_key).await?;
1850
1851        let mut prev = prev.map_or_else(
1852            || Ok(Vec::new()),
1853            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1854        )?;
1855
1856        // Modify all requests that match.
1857        let mut num_updated = 0;
1858        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1859            entry.parent_key = Some(parent_key.clone());
1860            num_updated += 1;
1861        }
1862
1863        if num_updated > 0 {
1864            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1865            tx.commit().await?;
1866        }
1867
1868        Ok(num_updated)
1869    }
1870
1871    async fn remove_dependent_queued_request(
1872        &self,
1873        room_id: &RoomId,
1874        txn_id: &ChildTransactionId,
1875    ) -> Result<bool> {
1876        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1877
1878        let tx = self
1879            .inner
1880            .transaction(keys::DEPENDENT_SEND_QUEUE)
1881            .with_mode(TransactionMode::Readwrite)
1882            .build()?;
1883
1884        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1885
1886        // We store an encoded vector of the dependent requests.
1887        // Reload the previous vector for this room.
1888        if let Some(val) = obj.get(&encoded_key).await? {
1889            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1890            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1891                prev.remove(pos);
1892
1893                if prev.is_empty() {
1894                    obj.delete(&encoded_key).build()?;
1895                } else {
1896                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1897                }
1898
1899                tx.commit().await?;
1900                return Ok(true);
1901            }
1902        }
1903
1904        Ok(false)
1905    }
1906
1907    async fn load_dependent_queued_requests(
1908        &self,
1909        room_id: &RoomId,
1910    ) -> Result<Vec<DependentQueuedRequest>> {
1911        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1912
1913        // We store an encoded vector of the dependent requests.
1914        let prev = self
1915            .inner
1916            .transaction(keys::DEPENDENT_SEND_QUEUE)
1917            .with_mode(TransactionMode::Readwrite)
1918            .build()?
1919            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1920            .get(&encoded_key)
1921            .await?;
1922
1923        prev.map_or_else(
1924            || Ok(Vec::new()),
1925            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1926        )
1927    }
1928
1929    async fn upsert_thread_subscription(
1930        &self,
1931        room: &RoomId,
1932        thread_id: &EventId,
1933        subscription: StoredThreadSubscription,
1934    ) -> Result<()> {
1935        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1936
1937        let tx = self
1938            .inner
1939            .transaction(keys::THREAD_SUBSCRIPTIONS)
1940            .with_mode(TransactionMode::Readwrite)
1941            .build()?;
1942        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1943
1944        let mut new = PersistedThreadSubscription::from(subscription);
1945
1946        // See if there's a previous subscription.
1947        if let Some(previous_value) = obj.get(&encoded_key).await? {
1948            let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
1949
1950            // If the previous status is the same as the new one, don't do anything.
1951            if new == previous {
1952                return Ok(());
1953            }
1954            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
1955                return Ok(());
1956            }
1957        }
1958
1959        let serialized_value = self.serialize_value(&new);
1960        obj.put(&serialized_value?).with_key(encoded_key).build()?;
1961
1962        tx.commit().await?;
1963
1964        Ok(())
1965    }
1966
1967    async fn load_thread_subscription(
1968        &self,
1969        room: &RoomId,
1970        thread_id: &EventId,
1971    ) -> Result<Option<StoredThreadSubscription>> {
1972        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1973
1974        let js_value = self
1975            .inner
1976            .transaction(keys::THREAD_SUBSCRIPTIONS)
1977            .with_mode(TransactionMode::Readonly)
1978            .build()?
1979            .object_store(keys::THREAD_SUBSCRIPTIONS)?
1980            .get(&encoded_key)
1981            .await?;
1982
1983        let Some(js_value) = js_value else {
1984            // We didn't have a previous subscription for this thread.
1985            return Ok(None);
1986        };
1987
1988        let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
1989
1990        let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
1991            StoreError::InvalidData {
1992                details: format!(
1993                    "invalid thread status for room {room} and thread {thread_id}: {}",
1994                    sub.status
1995                ),
1996            }
1997        })?;
1998
1999        Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2000    }
2001
2002    async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2003        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2004
2005        let transaction = self
2006            .inner
2007            .transaction(keys::THREAD_SUBSCRIPTIONS)
2008            .with_mode(TransactionMode::Readwrite)
2009            .build()?;
2010        transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2011        transaction.commit().await?;
2012
2013        Ok(())
2014    }
2015});
2016
2017/// A room member.
2018#[derive(Debug, Serialize, Deserialize)]
2019struct RoomMember {
2020    user_id: OwnedUserId,
2021    membership: MembershipState,
2022}
2023
2024impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2025    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2026        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2027    }
2028}
2029
2030impl From<&StrippedRoomMemberEvent> for RoomMember {
2031    fn from(event: &StrippedRoomMemberEvent) -> Self {
2032        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2033    }
2034}
2035
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// The legacy persisted shape of a queued request: a mandatory `event`
    /// and a plain boolean `is_wedged`.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    // A failed send is now persisted as an error instead of a plain boolean.
    // To support that, the `is_wedged` field of `PersistedQueuedRequest` went
    // from `bool` to `Option<bool>`. This test ensures that data written in
    // the old format still loads and converts properly.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let transaction_id = TransactionId::new();
        let message = RoomMessageEventContent::text_plain("Hello");

        let legacy_request = OldPersistedQueuedRequest {
            room_id: room_id!("!room_a:dummy.local").to_owned(),
            event: SerializableEventContent::new(&message.into()).unwrap(),
            transaction_id: transaction_id.clone(),
            is_wedged: true,
        };

        // Serialize with the old shape…
        let legacy_bytes = serde_json::to_vec(&legacy_request).unwrap();

        // …and load it back with the new one.
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&legacy_bytes).unwrap();

        // The legacy fields are populated, the new ones are not.
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        // Converting yields an event request that carries an error.
        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, transaction_id);
        assert!(queued.error.is_some());
    }
}
2092
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store without a passphrase, on a database whose name embeds a
    /// fresh UUID.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_suffix = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique_suffix.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    // NOTE(review): presumably expands to the shared state-store test suite
    // and obtains its stores through `get_store` — confirm against the macro.
    statestore_integration_tests!();
}
2110
#[cfg(all(test, target_family = "wasm"))]
mod encrypted_tests {
    #[cfg(target_family = "wasm")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store with a passphrase, on a database whose name embeds a
    /// fresh UUID; the passphrase embeds another fresh UUID.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());

        let builder = IndexeddbStateStore::builder().name(db_name).passphrase(passphrase);
        let store = builder.build().await?;
        Ok(store)
    }

    // NOTE(review): presumably expands to the shared state-store test suite
    // and obtains its stores through `get_store` — confirm against the macro.
    statestore_integration_tests!();
}
2126
2127    statestore_integration_tests!();
2128}