matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    str::FromStr as _,
18    sync::Arc,
19};
20
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::{
25    KeyRange, cursor::CursorDirection, database::Database, error::OpenDbError, prelude::*,
26    transaction::TransactionMode,
27};
28use matrix_sdk_base::{
29    MinimalRoomMemberEvent, ROOM_VERSION_FALLBACK, ROOM_VERSION_RULES_FALLBACK, RoomInfo,
30    RoomMemberships, StateStoreDataKey, StateStoreDataValue, ThreadSubscriptionCatchupToken,
31    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
32    store::{
33        ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
34        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
35        SerializableEventContent, StateChanges, StateStore, StoreError, StoredThreadSubscription,
36        SupportedVersionsResponse, ThreadSubscriptionStatus, TtlStoreValue, WellKnownResponse,
37        compare_thread_subscription_bump_stamps,
38    },
39};
40use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
41use ruma::{
42    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
43    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
44    canonical_json::{RedactedBecause, redact},
45    events::{
46        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
47        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
48        presence::PresenceEvent,
49        receipt::{Receipt, ReceiptThread, ReceiptType},
50        room::member::{
51            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
52        },
53    },
54    serde::Raw,
55};
56use serde::{Deserialize, Serialize, de::DeserializeOwned, ser::Error};
57use tracing::{debug, warn};
58use wasm_bindgen::JsValue;
59
60mod migrations;
61
62pub use self::migrations::MigrationConflictStrategy;
63use self::migrations::{upgrade_inner_db, upgrade_meta_db};
64use crate::{error::GenericError, serializer::safe_encode::traits::SafeEncode};
65
66#[derive(Debug, thiserror::Error)]
67pub enum IndexeddbStateStoreError {
68    #[error(transparent)]
69    Json(#[from] serde_json::Error),
70    #[error(transparent)]
71    Encryption(#[from] EncryptionError),
72    #[error("DomException {name} ({code}): {message}")]
73    DomException { name: String, message: String, code: u16 },
74    #[error(transparent)]
75    StoreError(#[from] StoreError),
76    #[error(
77        "Can't migrate {name} from {old_version} to {new_version} without deleting data. \
78         See MigrationConflictStrategy for ways to configure."
79    )]
80    MigrationConflict { name: String, old_version: u32, new_version: u32 },
81}
82
83impl From<GenericError> for IndexeddbStateStoreError {
84    fn from(value: GenericError) -> Self {
85        Self::StoreError(value.into())
86    }
87}
88
89impl From<web_sys::DomException> for IndexeddbStateStoreError {
90    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
91        IndexeddbStateStoreError::DomException {
92            name: frm.name(),
93            message: frm.message(),
94            code: frm.code(),
95        }
96    }
97}
98
99impl From<IndexeddbStateStoreError> for StoreError {
100    fn from(e: IndexeddbStateStoreError) -> Self {
101        match e {
102            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
103            IndexeddbStateStoreError::StoreError(e) => e,
104            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
105            _ => StoreError::backend(e),
106        }
107    }
108}
109
110impl From<indexed_db_futures::error::DomException> for IndexeddbStateStoreError {
111    fn from(value: indexed_db_futures::error::DomException) -> Self {
112        web_sys::DomException::from(value).into()
113    }
114}
115
116impl From<indexed_db_futures::error::SerialisationError> for IndexeddbStateStoreError {
117    fn from(value: indexed_db_futures::error::SerialisationError) -> Self {
118        Self::Json(serde_json::Error::custom(value.to_string()))
119    }
120}
121
122impl From<indexed_db_futures::error::UnexpectedDataError> for IndexeddbStateStoreError {
123    fn from(value: indexed_db_futures::error::UnexpectedDataError) -> Self {
124        IndexeddbStateStoreError::StoreError(StoreError::backend(value))
125    }
126}
127
128impl From<indexed_db_futures::error::JSError> for IndexeddbStateStoreError {
129    fn from(value: indexed_db_futures::error::JSError) -> Self {
130        GenericError::from(value.to_string()).into()
131    }
132}
133
134impl From<indexed_db_futures::error::Error> for IndexeddbStateStoreError {
135    fn from(value: indexed_db_futures::error::Error) -> Self {
136        use indexed_db_futures::error::Error;
137        match value {
138            Error::DomException(e) => e.into(),
139            Error::Serialisation(e) => e.into(),
140            Error::MissingData(e) => e.into(),
141            Error::Unknown(e) => e.into(),
142        }
143    }
144}
145
146impl From<OpenDbError> for IndexeddbStateStoreError {
147    fn from(value: OpenDbError) -> Self {
148        match value {
149            OpenDbError::Base(error) => error.into(),
150            _ => GenericError::from(value.to_string()).into(),
151        }
152    }
153}
154
155mod keys {
156    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
157    pub const BACKUPS_META: &str = "backups";
158
159    pub const ACCOUNT_DATA: &str = "account_data";
160
161    pub const PROFILES: &str = "profiles";
162    pub const DISPLAY_NAMES: &str = "display_names";
163    pub const USER_IDS: &str = "user_ids";
164
165    pub const ROOM_STATE: &str = "room_state";
166    pub const ROOM_INFOS: &str = "room_infos";
167    pub const PRESENCE: &str = "presence";
168    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
169    /// Table used to save send queue events.
170    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
171    /// Table used to save dependent send queue events.
172    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
173    pub const THREAD_SUBSCRIPTIONS: &str = "room_thread_subscriptions";
174
175    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
176    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
177
178    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
179    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
180
181    pub const CUSTOM: &str = "custom";
182    pub const KV: &str = "kv";
183
184    /// All names of the current state stores for convenience.
185    pub const ALL_STORES: &[&str] = &[
186        ACCOUNT_DATA,
187        PROFILES,
188        DISPLAY_NAMES,
189        USER_IDS,
190        ROOM_STATE,
191        ROOM_INFOS,
192        PRESENCE,
193        ROOM_ACCOUNT_DATA,
194        STRIPPED_ROOM_STATE,
195        STRIPPED_USER_IDS,
196        ROOM_USER_RECEIPTS,
197        ROOM_EVENT_RECEIPTS,
198        ROOM_SEND_QUEUE,
199        THREAD_SUBSCRIPTIONS,
200        DEPENDENT_SEND_QUEUE,
201        CUSTOM,
202        KV,
203    ];
204
205    // static keys
206
207    pub const STORE_KEY: &str = "store_key";
208}
209
210pub use keys::ALL_STORES;
211use matrix_sdk_base::store::QueueWedgeError;
212
213/// Encrypt (if needs be) then JSON-serialize a value.
214fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
215    Ok(match store_cipher {
216        Some(cipher) => {
217            let data = serde_json::to_vec(event)?;
218            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
219        }
220        None => JsValue::from_serde(event)?,
221    })
222}
223
224/// Deserialize a JSON value and then decrypt it (if needs be).
225fn deserialize_value<T: DeserializeOwned>(
226    store_cipher: Option<&StoreCipher>,
227    event: &JsValue,
228) -> Result<T> {
229    match store_cipher {
230        Some(cipher) => {
231            use zeroize::Zeroize;
232            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
233            let ret = serde_json::from_slice(&plaintext);
234            plaintext.zeroize();
235            Ok(ret?)
236        }
237        None => Ok(event.into_serde()?),
238    }
239}
240
241fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
242where
243    T: SafeEncode,
244{
245    match store_cipher {
246        Some(cipher) => key.as_secure_string(table_name, cipher),
247        None => key.as_encoded_string(),
248    }
249    .into()
250}
251
252fn encode_to_range<T>(
253    store_cipher: Option<&StoreCipher>,
254    table_name: &str,
255    key: T,
256) -> KeyRange<JsValue>
257where
258    T: SafeEncode,
259{
260    match store_cipher {
261        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
262        None => key.encode_to_range(),
263    }
264}
265
266/// Builder for [`IndexeddbStateStore`].
267#[derive(Debug)]
268pub struct IndexeddbStateStoreBuilder {
269    name: Option<String>,
270    passphrase: Option<String>,
271    migration_conflict_strategy: MigrationConflictStrategy,
272}
273
274impl IndexeddbStateStoreBuilder {
275    fn new() -> Self {
276        Self {
277            name: None,
278            passphrase: None,
279            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
280        }
281    }
282
283    /// Set the name for the indexeddb store to use, `state` is none given.
284    pub fn name(mut self, value: String) -> Self {
285        self.name = Some(value);
286        self
287    }
288
289    /// Set the password the indexeddb should be encrypted with.
290    ///
291    /// If not given, the DB is not encrypted.
292    pub fn passphrase(mut self, value: String) -> Self {
293        self.passphrase = Some(value);
294        self
295    }
296
297    /// The strategy to use when a merge conflict is found.
298    ///
299    /// See [`MigrationConflictStrategy`] for details.
300    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
301        self.migration_conflict_strategy = value;
302        self
303    }
304
305    pub async fn build(self) -> Result<IndexeddbStateStore> {
306        let migration_strategy = self.migration_conflict_strategy.clone();
307        let name = self.name.unwrap_or_else(|| "state".to_owned());
308
309        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
310
311        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
312        let inner =
313            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
314
315        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
316    }
317}
318
319pub struct IndexeddbStateStore {
320    name: String,
321    pub(crate) inner: Database,
322    pub(crate) meta: Database,
323    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
324}
325
326#[cfg(not(tarpaulin_include))]
327impl std::fmt::Debug for IndexeddbStateStore {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
330    }
331}
332
333type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
334
335impl IndexeddbStateStore {
336    /// Generate a IndexeddbStateStoreBuilder with default parameters
337    pub fn builder() -> IndexeddbStateStoreBuilder {
338        IndexeddbStateStoreBuilder::new()
339    }
340
341    /// The version of the database containing the data.
342    pub fn version(&self) -> u32 {
343        self.inner.version() as u32
344    }
345
346    /// The version of the database containing the metadata.
347    pub fn meta_version(&self) -> u32 {
348        self.meta.version() as u32
349    }
350
351    /// Whether this database has any migration backups
352    pub async fn has_backups(&self) -> Result<bool> {
353        Ok(self
354            .meta
355            .transaction(keys::BACKUPS_META)
356            .with_mode(TransactionMode::Readonly)
357            .build()?
358            .object_store(keys::BACKUPS_META)?
359            .count()
360            .await?
361            > 0)
362    }
363
364    /// What's the database name of the latest backup<
365    pub async fn latest_backup(&self) -> Result<Option<String>> {
366        if let Some(mut cursor) = self
367            .meta
368            .transaction(keys::BACKUPS_META)
369            .with_mode(TransactionMode::Readonly)
370            .build()?
371            .object_store(keys::BACKUPS_META)?
372            .open_cursor()
373            .with_direction(CursorDirection::Prev)
374            .await?
375            && let Some(record) = cursor.next_record::<JsValue>().await?
376        {
377            Ok(record.as_string())
378        } else {
379            Ok(None)
380        }
381    }
382
383    /// Encrypt (if needs be) then JSON-serialize a value.
384    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
385        serialize_value(self.store_cipher.as_deref(), event)
386    }
387
388    /// Deserialize a JSON value and then decrypt it (if needs be).
389    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
390        deserialize_value(self.store_cipher.as_deref(), event)
391    }
392
393    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
394    where
395        T: SafeEncode,
396    {
397        encode_key(self.store_cipher.as_deref(), table_name, key)
398    }
399
400    fn encode_to_range<T>(&self, table_name: &str, key: T) -> KeyRange<JsValue>
401    where
402        T: SafeEncode,
403    {
404        encode_to_range(self.store_cipher.as_deref(), table_name, key)
405    }
406
407    /// Get user IDs for the given room with the given memberships and stripped
408    /// state.
409    pub async fn get_user_ids_inner(
410        &self,
411        room_id: &RoomId,
412        memberships: RoomMemberships,
413        stripped: bool,
414    ) -> Result<Vec<OwnedUserId>> {
415        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
416
417        let tx = self.inner.transaction(store_name).with_mode(TransactionMode::Readonly).build()?;
418        let store = tx.object_store(store_name)?;
419        let range = self.encode_to_range(store_name, room_id);
420
421        let user_ids = if memberships.is_empty() {
422            // It should be faster to just get all user IDs in this case.
423            store
424                .get_all()
425                .with_query(&range)
426                .await?
427                .filter_map(Result::ok)
428                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
429                .collect::<Vec<_>>()
430        } else {
431            let mut user_ids = Vec::new();
432            let cursor = store.open_cursor().with_query(&range).await?;
433
434            if let Some(mut cursor) = cursor {
435                while let Some(value) = cursor.next_record().await? {
436                    let member = self.deserialize_value::<RoomMember>(&value)?;
437
438                    if memberships.matches(&member.membership) {
439                        user_ids.push(member.user_id);
440                    }
441                }
442            }
443
444            user_ids
445        };
446
447        Ok(user_ids)
448    }
449
450    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
451        self.inner
452            .transaction(keys::CUSTOM)
453            .with_mode(TransactionMode::Readonly)
454            .build()?
455            .object_store(keys::CUSTOM)?
456            .get(jskey)
457            .await?
458            .map(|f| self.deserialize_value(&f))
459            .transpose()
460    }
461
462    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
463        // Use the key (prefix) for the table name as well, to keep encoded
464        // keys compatible for the sync token and filters, which were in
465        // separate tables initially.
466        match key {
467            StateStoreDataKey::SyncToken => {
468                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
469            }
470            StateStoreDataKey::SupportedVersions => self.encode_key(
471                StateStoreDataKey::SUPPORTED_VERSIONS,
472                StateStoreDataKey::SUPPORTED_VERSIONS,
473            ),
474            StateStoreDataKey::WellKnown => {
475                self.encode_key(keys::KV, StateStoreDataKey::WELL_KNOWN)
476            }
477            StateStoreDataKey::Filter(filter_name) => {
478                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
479            }
480            StateStoreDataKey::UserAvatarUrl(user_id) => {
481                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
482            }
483            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
484                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
485            }
486            StateStoreDataKey::UtdHookManagerData => {
487                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
488            }
489            StateStoreDataKey::OneTimeKeyAlreadyUploaded => {
490                self.encode_key(keys::KV, StateStoreDataKey::ONE_TIME_KEY_ALREADY_UPLOADED)
491            }
492            StateStoreDataKey::ComposerDraft(room_id, thread_root) => {
493                if let Some(thread_root) = thread_root {
494                    self.encode_key(
495                        keys::KV,
496                        (StateStoreDataKey::COMPOSER_DRAFT, (room_id, thread_root)),
497                    )
498                } else {
499                    self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
500                }
501            }
502            StateStoreDataKey::SeenKnockRequests(room_id) => {
503                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
504            }
505            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => {
506                self.encode_key(keys::KV, StateStoreDataKey::THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS)
507            }
508        }
509    }
510}
511
512/// A superset of [`QueuedRequest`] that also contains the room id, since we
513/// want to return them.
514#[derive(Serialize, Deserialize)]
515struct PersistedQueuedRequest {
516    /// In which room is this event going to be sent.
517    pub room_id: OwnedRoomId,
518
519    // All these fields are the same as in [`QueuedRequest`].
520    /// Kind. Optional because it might be missing from previous formats.
521    kind: Option<QueuedRequestKind>,
522    transaction_id: OwnedTransactionId,
523
524    pub error: Option<QueueWedgeError>,
525
526    priority: Option<usize>,
527
528    /// The time the original message was first attempted to be sent at.
529    #[serde(default = "created_now")]
530    created_at: MilliSecondsSinceUnixEpoch,
531
532    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
533    /// Deprecated (from old format), now replaced with error field.
534    is_wedged: Option<bool>,
535
536    event: Option<SerializableEventContent>,
537}
538
539fn created_now() -> MilliSecondsSinceUnixEpoch {
540    MilliSecondsSinceUnixEpoch::now()
541}
542
543impl PersistedQueuedRequest {
544    fn into_queued_request(self) -> Option<QueuedRequest> {
545        let kind =
546            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
547
548        let error = match self.is_wedged {
549            Some(true) => {
550                // Migrate to a generic error.
551                Some(QueueWedgeError::GenericApiError {
552                    msg: "local echo failed to send in a previous session".into(),
553                })
554            }
555            _ => self.error,
556        };
557
558        // By default, events without a priority have a priority of 0.
559        let priority = self.priority.unwrap_or(0);
560
561        Some(QueuedRequest {
562            kind,
563            transaction_id: self.transaction_id,
564            error,
565            priority,
566            created_at: self.created_at,
567        })
568    }
569}
570
571#[derive(Serialize, Deserialize, PartialEq)]
572struct PersistedThreadSubscription {
573    status: String,
574    bump_stamp: Option<u64>,
575}
576
577impl From<StoredThreadSubscription> for PersistedThreadSubscription {
578    fn from(value: StoredThreadSubscription) -> Self {
579        Self { status: value.status.as_str().to_owned(), bump_stamp: value.bump_stamp }
580    }
581}
582
583// Small hack to have the following macro invocation act as the appropriate
584// trait impl block on wasm, but still be compiled on non-wasm as a regular
585// impl block otherwise.
586//
587// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
588// this hack allows us to still have most of rust-analyzer's IDE functionality
589// within the impl block without having to set it up to check things against
590// the wasm target (which would disable many other parts of the codebase).
591#[cfg(target_family = "wasm")]
592macro_rules! impl_state_store {
593    ({ $($body:tt)* }) => {
594        #[async_trait(?Send)]
595        impl StateStore for IndexeddbStateStore {
596            type Error = IndexeddbStateStoreError;
597
598            $($body)*
599        }
600    };
601}
602
603#[cfg(not(target_family = "wasm"))]
604macro_rules! impl_state_store {
605    ({ $($body:tt)* }) => {
606        impl IndexeddbStateStore {
607            $($body)*
608        }
609    };
610}
611
612impl_state_store!({
613    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
614        let encoded_key = self.encode_kv_data_key(key);
615
616        let value = self
617            .inner
618            .transaction(keys::KV)
619            .with_mode(TransactionMode::Readonly)
620            .build()?
621            .object_store(keys::KV)?
622            .get(&encoded_key)
623            .await?;
624
625        let value = match key {
626            StateStoreDataKey::SyncToken => value
627                .map(|f| self.deserialize_value::<String>(&f))
628                .transpose()?
629                .map(StateStoreDataValue::SyncToken),
630            StateStoreDataKey::SupportedVersions => value
631                .map(|f| self.deserialize_value::<TtlStoreValue<SupportedVersionsResponse>>(&f))
632                .transpose()?
633                .map(StateStoreDataValue::SupportedVersions),
634            StateStoreDataKey::WellKnown => value
635                .map(|f| self.deserialize_value::<TtlStoreValue<Option<WellKnownResponse>>>(&f))
636                .transpose()?
637                .map(StateStoreDataValue::WellKnown),
638            StateStoreDataKey::Filter(_) => value
639                .map(|f| self.deserialize_value::<String>(&f))
640                .transpose()?
641                .map(StateStoreDataValue::Filter),
642            StateStoreDataKey::UserAvatarUrl(_) => value
643                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
644                .transpose()?
645                .map(StateStoreDataValue::UserAvatarUrl),
646            StateStoreDataKey::RecentlyVisitedRooms(_) => value
647                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
648                .transpose()?
649                .map(StateStoreDataValue::RecentlyVisitedRooms),
650            StateStoreDataKey::UtdHookManagerData => value
651                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
652                .transpose()?
653                .map(StateStoreDataValue::UtdHookManagerData),
654            StateStoreDataKey::OneTimeKeyAlreadyUploaded => value
655                .map(|f| self.deserialize_value::<bool>(&f))
656                .transpose()?
657                .map(|_| StateStoreDataValue::OneTimeKeyAlreadyUploaded),
658            StateStoreDataKey::ComposerDraft(_, _) => value
659                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
660                .transpose()?
661                .map(StateStoreDataValue::ComposerDraft),
662            StateStoreDataKey::SeenKnockRequests(_) => value
663                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
664                .transpose()?
665                .map(StateStoreDataValue::SeenKnockRequests),
666            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => value
667                .map(|f| self.deserialize_value::<Vec<ThreadSubscriptionCatchupToken>>(&f))
668                .transpose()?
669                .map(StateStoreDataValue::ThreadSubscriptionsCatchupTokens),
670        };
671
672        Ok(value)
673    }
674
675    async fn set_kv_data(
676        &self,
677        key: StateStoreDataKey<'_>,
678        value: StateStoreDataValue,
679    ) -> Result<()> {
680        let encoded_key = self.encode_kv_data_key(key);
681
682        let serialized_value = match key {
683            StateStoreDataKey::SyncToken => self
684                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
685            StateStoreDataKey::SupportedVersions => self.serialize_value(
686                &value
687                    .into_supported_versions()
688                    .expect("Session data not containing supported versions"),
689            ),
690            StateStoreDataKey::WellKnown => self.serialize_value(
691                &value.into_well_known().expect("Session data not containing well-known"),
692            ),
693            StateStoreDataKey::Filter(_) => {
694                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
695            }
696            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
697                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
698            ),
699            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
700                &value
701                    .into_recently_visited_rooms()
702                    .expect("Session data not a recently visited room list"),
703            ),
704            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
705                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
706            ),
707            StateStoreDataKey::OneTimeKeyAlreadyUploaded => self.serialize_value(&true),
708            StateStoreDataKey::ComposerDraft(_, _) => self.serialize_value(
709                &value.into_composer_draft().expect("Session data not a composer draft"),
710            ),
711            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
712                &value
713                    .into_seen_knock_requests()
714                    .expect("Session data is not a set of seen knock request ids"),
715            ),
716            StateStoreDataKey::ThreadSubscriptionsCatchupTokens => self.serialize_value(
717                &value
718                    .into_thread_subscriptions_catchup_tokens()
719                    .expect("Session data is not a list of thread subscription catchup tokens"),
720            ),
721        };
722
723        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
724
725        let obj = tx.object_store(keys::KV)?;
726
727        obj.put(&serialized_value?).with_key(encoded_key).build()?;
728
729        tx.commit().await?;
730
731        Ok(())
732    }
733
734    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
735        let encoded_key = self.encode_kv_data_key(key);
736
737        let tx = self.inner.transaction(keys::KV).with_mode(TransactionMode::Readwrite).build()?;
738        let obj = tx.object_store(keys::KV)?;
739
740        obj.delete(&encoded_key).build()?;
741
742        tx.commit().await?;
743
744        Ok(())
745    }
746
747    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
748        let mut stores: HashSet<&'static str> = [
749            (changes.sync_token.is_some(), keys::KV),
750            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
751            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
752            (!changes.presence.is_empty(), keys::PRESENCE),
753            (
754                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
755                keys::PROFILES,
756            ),
757            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
758            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
759        ]
760        .iter()
761        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
762        .collect();
763
764        if !changes.state.is_empty() {
765            stores.extend([
766                keys::ROOM_STATE,
767                keys::USER_IDS,
768                keys::STRIPPED_USER_IDS,
769                keys::STRIPPED_ROOM_STATE,
770                keys::PROFILES,
771            ]);
772        }
773
774        if !changes.redactions.is_empty() {
775            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
776        }
777
778        if !changes.room_infos.is_empty() {
779            stores.insert(keys::ROOM_INFOS);
780        }
781
782        if !changes.stripped_state.is_empty() {
783            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
784        }
785
786        if !changes.receipts.is_empty() {
787            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
788        }
789
790        if stores.is_empty() {
791            // nothing to do, quit early
792            return Ok(());
793        }
794
795        let stores: Vec<&'static str> = stores.into_iter().collect();
796        let tx = self.inner.transaction(stores).with_mode(TransactionMode::Readwrite).build()?;
797
798        if let Some(s) = &changes.sync_token {
799            tx.object_store(keys::KV)?
800                .put(&self.serialize_value(s)?)
801                .with_key(self.encode_kv_data_key(StateStoreDataKey::SyncToken))
802                .build()?;
803        }
804
805        if !changes.ambiguity_maps.is_empty() {
806            let store = tx.object_store(keys::DISPLAY_NAMES)?;
807            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
808                for (display_name, map) in ambiguity_maps {
809                    let key = self.encode_key(
810                        keys::DISPLAY_NAMES,
811                        (
812                            room_id,
813                            display_name
814                                .as_normalized_str()
815                                .unwrap_or_else(|| display_name.as_raw_str()),
816                        ),
817                    );
818
819                    store.put(&self.serialize_value(&map)?).with_key(key).build()?;
820                }
821            }
822        }
823
824        if !changes.account_data.is_empty() {
825            let store = tx.object_store(keys::ACCOUNT_DATA)?;
826            for (event_type, event) in &changes.account_data {
827                store
828                    .put(&self.serialize_value(&event)?)
829                    .with_key(self.encode_key(keys::ACCOUNT_DATA, event_type))
830                    .build()?;
831            }
832        }
833
834        if !changes.room_account_data.is_empty() {
835            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
836            for (room, events) in &changes.room_account_data {
837                for (event_type, event) in events {
838                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
839                    store.put(&self.serialize_value(&event)?).with_key(key).build()?;
840                }
841            }
842        }
843
844        if !changes.state.is_empty() {
845            let state = tx.object_store(keys::ROOM_STATE)?;
846            let profiles = tx.object_store(keys::PROFILES)?;
847            let user_ids = tx.object_store(keys::USER_IDS)?;
848            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
849            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
850
851            for (room, user_ids) in &changes.profiles_to_delete {
852                for user_id in user_ids {
853                    let key = self.encode_key(keys::PROFILES, (room, user_id));
854                    profiles.delete(&key).build()?;
855                }
856            }
857
858            for (room, event_types) in &changes.state {
859                let profile_changes = changes.profiles.get(room);
860
861                for (event_type, events) in event_types {
862                    for (state_key, raw_event) in events {
863                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
864                        state
865                            .put(&self.serialize_value(&raw_event)?)
866                            .with_key(key.clone())
867                            .build()?;
868                        stripped_state.delete(&key).build()?;
869
870                        if *event_type == StateEventType::RoomMember {
871                            let event =
872                                match raw_event.deserialize_as_unchecked::<SyncRoomMemberEvent>() {
873                                    Ok(ev) => ev,
874                                    Err(e) => {
875                                        let event_id: Option<String> =
876                                            raw_event.get_field("event_id").ok().flatten();
877                                        debug!(event_id, "Failed to deserialize member event: {e}");
878                                        continue;
879                                    }
880                                };
881
882                            let key = (room, state_key);
883
884                            stripped_user_ids
885                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))
886                                .build()?;
887
888                            user_ids
889                                .put(&self.serialize_value(&RoomMember::from(&event))?)
890                                .with_key(self.encode_key(keys::USER_IDS, key))
891                                .build()?;
892
893                            if let Some(profile) =
894                                profile_changes.and_then(|p| p.get(event.state_key()))
895                            {
896                                profiles
897                                    .put(&self.serialize_value(&profile)?)
898                                    .with_key(self.encode_key(keys::PROFILES, key))
899                                    .build()?;
900                            }
901                        }
902                    }
903                }
904            }
905        }
906
907        if !changes.room_infos.is_empty() {
908            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
909            for (room_id, room_info) in &changes.room_infos {
910                room_infos
911                    .put(&self.serialize_value(&room_info)?)
912                    .with_key(self.encode_key(keys::ROOM_INFOS, room_id))
913                    .build()?;
914            }
915        }
916
917        if !changes.presence.is_empty() {
918            let store = tx.object_store(keys::PRESENCE)?;
919            for (sender, event) in &changes.presence {
920                store
921                    .put(&self.serialize_value(&event)?)
922                    .with_key(self.encode_key(keys::PRESENCE, sender))
923                    .build()?;
924            }
925        }
926
927        if !changes.stripped_state.is_empty() {
928            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
929            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
930
931            for (room, event_types) in &changes.stripped_state {
932                for (event_type, events) in event_types {
933                    for (state_key, raw_event) in events {
934                        let key = self
935                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
936                        store.put(&self.serialize_value(&raw_event)?).with_key(key).build()?;
937
938                        if *event_type == StateEventType::RoomMember {
939                            let event = match raw_event
940                                .deserialize_as_unchecked::<StrippedRoomMemberEvent>()
941                            {
942                                Ok(ev) => ev,
943                                Err(e) => {
944                                    let event_id: Option<String> =
945                                        raw_event.get_field("event_id").ok().flatten();
946                                    debug!(
947                                        event_id,
948                                        "Failed to deserialize stripped member event: {e}"
949                                    );
950                                    continue;
951                                }
952                            };
953
954                            let key = (room, state_key);
955
956                            user_ids
957                                .put(&self.serialize_value(&RoomMember::from(&event))?)
958                                .with_key(self.encode_key(keys::STRIPPED_USER_IDS, key))
959                                .build()?;
960                        }
961                    }
962                }
963            }
964        }
965
966        if !changes.receipts.is_empty() {
967            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
968            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
969
970            for (room, content) in &changes.receipts {
971                for (event_id, receipts) in &content.0 {
972                    for (receipt_type, receipts) in receipts {
973                        for (user_id, receipt) in receipts {
974                            let key = match receipt.thread.as_str() {
975                                Some(thread_id) => self.encode_key(
976                                    keys::ROOM_USER_RECEIPTS,
977                                    (room, receipt_type, thread_id, user_id),
978                                ),
979                                None => self.encode_key(
980                                    keys::ROOM_USER_RECEIPTS,
981                                    (room, receipt_type, user_id),
982                                ),
983                            };
984
985                            if let Some((old_event, _)) =
986                                room_user_receipts.get(&key).await?.and_then(|f| {
987                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
988                                })
989                            {
990                                let key = match receipt.thread.as_str() {
991                                    Some(thread_id) => self.encode_key(
992                                        keys::ROOM_EVENT_RECEIPTS,
993                                        (room, receipt_type, thread_id, old_event, user_id),
994                                    ),
995                                    None => self.encode_key(
996                                        keys::ROOM_EVENT_RECEIPTS,
997                                        (room, receipt_type, old_event, user_id),
998                                    ),
999                                };
1000                                room_event_receipts.delete(&key).build()?;
1001                            }
1002
1003                            room_user_receipts
1004                                .put(&self.serialize_value(&(event_id, receipt))?)
1005                                .with_key(key)
1006                                .build()?;
1007
1008                            // Add the receipt to the room event receipts
1009                            let key = match receipt.thread.as_str() {
1010                                Some(thread_id) => self.encode_key(
1011                                    keys::ROOM_EVENT_RECEIPTS,
1012                                    (room, receipt_type, thread_id, event_id, user_id),
1013                                ),
1014                                None => self.encode_key(
1015                                    keys::ROOM_EVENT_RECEIPTS,
1016                                    (room, receipt_type, event_id, user_id),
1017                                ),
1018                            };
1019                            room_event_receipts
1020                                .put(&self.serialize_value(&(user_id, receipt))?)
1021                                .with_key(key)
1022                                .build()?;
1023                        }
1024                    }
1025                }
1026            }
1027        }
1028
1029        if !changes.redactions.is_empty() {
1030            let state = tx.object_store(keys::ROOM_STATE)?;
1031            let room_info = tx.object_store(keys::ROOM_INFOS)?;
1032
1033            for (room_id, redactions) in &changes.redactions {
1034                let range = self.encode_to_range(keys::ROOM_STATE, room_id);
1035                let Some(mut cursor) = state.open_cursor().with_query(&range).await? else {
1036                    continue;
1037                };
1038
1039                let mut redaction_rules = None;
1040
1041                while let Some(value) = cursor.next_record().await? {
1042                    let Some(key) = cursor.key::<JsValue>()? else {
1043                        break;
1044                    };
1045
1046                    let raw_evt = self.deserialize_value::<Raw<AnySyncStateEvent>>(&value)?;
1047                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id")
1048                        && let Some(redaction) = redactions.get(&event_id)
1049                    {
1050                        let redaction_rules = match &redaction_rules {
1051                            Some(r) => r,
1052                            None => {
1053                                let value = room_info
1054                                    .get(&self.encode_key(keys::ROOM_INFOS, room_id))
1055                                    .await?
1056                                    .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1057                                    .map(|info| info.room_version_rules_or_default())
1058                                    .unwrap_or_else(|| {
1059                                        warn!(
1060                                            ?room_id,
1061                                            "Unable to get the room version rules, \
1062                                             defaulting to rules for room version \
1063                                             {ROOM_VERSION_FALLBACK}"
1064                                        );
1065                                        ROOM_VERSION_RULES_FALLBACK
1066                                    })
1067                                    .redaction;
1068                                redaction_rules.get_or_insert(value)
1069                            }
1070                        };
1071
1072                        let redacted = redact(
1073                            raw_evt.deserialize_as::<CanonicalJsonObject>()?,
1074                            redaction_rules,
1075                            Some(RedactedBecause::from_raw_event(redaction)?),
1076                        )
1077                        .map_err(StoreError::Redaction)?;
1078                        state.put(&self.serialize_value(&redacted)?).with_key(key).build()?;
1079                    }
1080                }
1081            }
1082        }
1083
1084        tx.commit().await.map_err(|e| e.into())
1085    }
1086
1087    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
1088        self.inner
1089            .transaction(keys::PRESENCE)
1090            .with_mode(TransactionMode::Readonly)
1091            .build()?
1092            .object_store(keys::PRESENCE)?
1093            .get(&self.encode_key(keys::PRESENCE, user_id))
1094            .await?
1095            .map(|f| self.deserialize_value(&f))
1096            .transpose()
1097    }
1098
1099    async fn get_presence_events(
1100        &self,
1101        user_ids: &[OwnedUserId],
1102    ) -> Result<Vec<Raw<PresenceEvent>>> {
1103        if user_ids.is_empty() {
1104            return Ok(Vec::new());
1105        }
1106
1107        let txn =
1108            self.inner.transaction(keys::PRESENCE).with_mode(TransactionMode::Readonly).build()?;
1109        let store = txn.object_store(keys::PRESENCE)?;
1110
1111        let mut events = Vec::with_capacity(user_ids.len());
1112
1113        for user_id in user_ids {
1114            if let Some(event) = store
1115                .get(&self.encode_key(keys::PRESENCE, user_id))
1116                .await?
1117                .map(|f| self.deserialize_value(&f))
1118                .transpose()?
1119            {
1120                events.push(event)
1121            }
1122        }
1123
1124        Ok(events)
1125    }
1126
1127    async fn get_state_event(
1128        &self,
1129        room_id: &RoomId,
1130        event_type: StateEventType,
1131        state_key: &str,
1132    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1133        Ok(self
1134            .get_state_events_for_keys(room_id, event_type, &[state_key])
1135            .await?
1136            .into_iter()
1137            .next())
1138    }
1139
1140    async fn get_state_events(
1141        &self,
1142        room_id: &RoomId,
1143        event_type: StateEventType,
1144    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1145        let stripped_range =
1146            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type));
1147        let stripped_events = self
1148            .inner
1149            .transaction(keys::STRIPPED_ROOM_STATE)
1150            .with_mode(TransactionMode::Readonly)
1151            .build()?
1152            .object_store(keys::STRIPPED_ROOM_STATE)?
1153            .get_all()
1154            .with_query(&stripped_range)
1155            .await?
1156            .filter_map(Result::ok)
1157            .filter_map(|f| {
1158                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1159            })
1160            .collect::<Vec<_>>();
1161
1162        if !stripped_events.is_empty() {
1163            return Ok(stripped_events);
1164        }
1165
1166        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type));
1167        Ok(self
1168            .inner
1169            .transaction(keys::ROOM_STATE)
1170            .with_mode(TransactionMode::Readonly)
1171            .build()?
1172            .object_store(keys::ROOM_STATE)?
1173            .get_all()
1174            .with_query(&range)
1175            .await?
1176            .filter_map(Result::ok)
1177            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1178            .collect::<Vec<_>>())
1179    }
1180
1181    async fn get_state_events_for_keys(
1182        &self,
1183        room_id: &RoomId,
1184        event_type: StateEventType,
1185        state_keys: &[&str],
1186    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1187        if state_keys.is_empty() {
1188            return Ok(Vec::new());
1189        }
1190
1191        let mut events = Vec::with_capacity(state_keys.len());
1192
1193        {
1194            let txn = self
1195                .inner
1196                .transaction(keys::STRIPPED_ROOM_STATE)
1197                .with_mode(TransactionMode::Readonly)
1198                .build()?;
1199            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1200
1201            for state_key in state_keys {
1202                if let Some(event) =
1203                    store
1204                        .get(&self.encode_key(
1205                            keys::STRIPPED_ROOM_STATE,
1206                            (room_id, &event_type, state_key),
1207                        ))
1208                        .await?
1209                        .map(|f| self.deserialize_value(&f))
1210                        .transpose()?
1211                {
1212                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1213                }
1214            }
1215
1216            if !events.is_empty() {
1217                return Ok(events);
1218            }
1219        }
1220
1221        let txn = self
1222            .inner
1223            .transaction(keys::ROOM_STATE)
1224            .with_mode(TransactionMode::Readonly)
1225            .build()?;
1226        let store = txn.object_store(keys::ROOM_STATE)?;
1227
1228        for state_key in state_keys {
1229            if let Some(event) = store
1230                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))
1231                .await?
1232                .map(|f| self.deserialize_value(&f))
1233                .transpose()?
1234            {
1235                events.push(RawAnySyncOrStrippedState::Sync(event));
1236            }
1237        }
1238
1239        Ok(events)
1240    }
1241
1242    async fn get_profile(
1243        &self,
1244        room_id: &RoomId,
1245        user_id: &UserId,
1246    ) -> Result<Option<MinimalRoomMemberEvent>> {
1247        self.inner
1248            .transaction(keys::PROFILES)
1249            .with_mode(TransactionMode::Readonly)
1250            .build()?
1251            .object_store(keys::PROFILES)?
1252            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1253            .await?
1254            .map(|f| self.deserialize_value(&f))
1255            .transpose()
1256    }
1257
1258    async fn get_profiles<'a>(
1259        &self,
1260        room_id: &RoomId,
1261        user_ids: &'a [OwnedUserId],
1262    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1263        if user_ids.is_empty() {
1264            return Ok(BTreeMap::new());
1265        }
1266
1267        let txn =
1268            self.inner.transaction(keys::PROFILES).with_mode(TransactionMode::Readonly).build()?;
1269        let store = txn.object_store(keys::PROFILES)?;
1270
1271        let mut profiles = BTreeMap::new();
1272        for user_id in user_ids {
1273            if let Some(profile) = store
1274                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))
1275                .await?
1276                .map(|f| self.deserialize_value(&f))
1277                .transpose()?
1278            {
1279                profiles.insert(user_id.as_ref(), profile);
1280            }
1281        }
1282
1283        Ok(profiles)
1284    }
1285
1286    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1287        let transaction = self
1288            .inner
1289            .transaction(keys::ROOM_INFOS)
1290            .with_mode(TransactionMode::Readonly)
1291            .build()?;
1292
1293        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1294
1295        Ok(match room_load_settings {
1296            RoomLoadSettings::All => object_store
1297                .get_all()
1298                .await?
1299                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info?))
1300                .collect::<Result<_>>()?,
1301
1302            RoomLoadSettings::One(room_id) => {
1303                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id)).await? {
1304                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1305                    None => vec![],
1306                }
1307            }
1308        })
1309    }
1310
1311    async fn get_users_with_display_name(
1312        &self,
1313        room_id: &RoomId,
1314        display_name: &DisplayName,
1315    ) -> Result<BTreeSet<OwnedUserId>> {
1316        self.inner
1317            .transaction(keys::DISPLAY_NAMES)
1318            .with_mode(TransactionMode::Readonly)
1319            .build()?
1320            .object_store(keys::DISPLAY_NAMES)?
1321            .get(&self.encode_key(
1322                keys::DISPLAY_NAMES,
1323                (
1324                    room_id,
1325                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1326                ),
1327            ))
1328            .await?
1329            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1330            .unwrap_or_else(|| Ok(Default::default()))
1331    }
1332
1333    async fn get_users_with_display_names<'a>(
1334        &self,
1335        room_id: &RoomId,
1336        display_names: &'a [DisplayName],
1337    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1338        let mut map = HashMap::new();
1339
1340        if display_names.is_empty() {
1341            return Ok(map);
1342        }
1343
1344        let txn = self
1345            .inner
1346            .transaction(keys::DISPLAY_NAMES)
1347            .with_mode(TransactionMode::Readonly)
1348            .build()?;
1349        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1350
1351        for display_name in display_names {
1352            if let Some(user_ids) = store
1353                .get(
1354                    &self.encode_key(
1355                        keys::DISPLAY_NAMES,
1356                        (
1357                            room_id,
1358                            display_name
1359                                .as_normalized_str()
1360                                .unwrap_or_else(|| display_name.as_raw_str()),
1361                        ),
1362                    ),
1363                )
1364                .await?
1365                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1366                .transpose()?
1367            {
1368                map.insert(display_name, user_ids);
1369            }
1370        }
1371
1372        Ok(map)
1373    }
1374
1375    async fn get_account_data_event(
1376        &self,
1377        event_type: GlobalAccountDataEventType,
1378    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1379        self.inner
1380            .transaction(keys::ACCOUNT_DATA)
1381            .with_mode(TransactionMode::Readonly)
1382            .build()?
1383            .object_store(keys::ACCOUNT_DATA)?
1384            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))
1385            .await?
1386            .map(|f| self.deserialize_value(&f))
1387            .transpose()
1388    }
1389
1390    async fn get_room_account_data_event(
1391        &self,
1392        room_id: &RoomId,
1393        event_type: RoomAccountDataEventType,
1394    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1395        self.inner
1396            .transaction(keys::ROOM_ACCOUNT_DATA)
1397            .with_mode(TransactionMode::Readonly)
1398            .build()?
1399            .object_store(keys::ROOM_ACCOUNT_DATA)?
1400            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))
1401            .await?
1402            .map(|f| self.deserialize_value(&f))
1403            .transpose()
1404    }
1405
1406    async fn get_user_room_receipt_event(
1407        &self,
1408        room_id: &RoomId,
1409        receipt_type: ReceiptType,
1410        thread: ReceiptThread,
1411        user_id: &UserId,
1412    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1413        let key = match thread.as_str() {
1414            Some(thread_id) => self
1415                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1416            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1417        };
1418        self.inner
1419            .transaction(keys::ROOM_USER_RECEIPTS)
1420            .with_mode(TransactionMode::Readonly)
1421            .build()?
1422            .object_store(keys::ROOM_USER_RECEIPTS)?
1423            .get(&key)
1424            .await?
1425            .map(|f| self.deserialize_value(&f))
1426            .transpose()
1427    }
1428
1429    async fn get_event_room_receipt_events(
1430        &self,
1431        room_id: &RoomId,
1432        receipt_type: ReceiptType,
1433        thread: ReceiptThread,
1434        event_id: &EventId,
1435    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1436        let range = match thread.as_str() {
1437            Some(thread_id) => self.encode_to_range(
1438                keys::ROOM_EVENT_RECEIPTS,
1439                (room_id, receipt_type, thread_id, event_id),
1440            ),
1441            None => {
1442                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1443            }
1444        };
1445        let tx = self
1446            .inner
1447            .transaction(keys::ROOM_EVENT_RECEIPTS)
1448            .with_mode(TransactionMode::Readonly)
1449            .build()?;
1450        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1451
1452        Ok(store
1453            .get_all()
1454            .with_query(&range)
1455            .await?
1456            .filter_map(Result::ok)
1457            .filter_map(|f| self.deserialize_value(&f).ok())
1458            .collect::<Vec<_>>())
1459    }
1460
1461    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1462        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1463        self.get_custom_value_for_js(jskey).await
1464    }
1465
1466    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1467        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1468
1469        let prev = self.get_custom_value_for_js(&jskey).await?;
1470
1471        let tx =
1472            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1473
1474        tx.object_store(keys::CUSTOM)?
1475            .put(&self.serialize_value(&value)?)
1476            .with_key(jskey)
1477            .build()?;
1478
1479        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1480        Ok(prev)
1481    }
1482
1483    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1484        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1485
1486        let prev = self.get_custom_value_for_js(&jskey).await?;
1487
1488        let tx =
1489            self.inner.transaction(keys::CUSTOM).with_mode(TransactionMode::Readwrite).build()?;
1490
1491        tx.object_store(keys::CUSTOM)?.delete(&jskey).build()?;
1492
1493        tx.commit().await.map_err(IndexeddbStateStoreError::from)?;
1494        Ok(prev)
1495    }
1496
1497    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1498        // All the stores which use a RoomId as their key (and nothing additional).
1499        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1500
1501        // All the stores which use a RoomId as the first part of their key, but may
1502        // have some additional data in the key.
1503        let prefixed_stores = [
1504            keys::PROFILES,
1505            keys::DISPLAY_NAMES,
1506            keys::USER_IDS,
1507            keys::ROOM_STATE,
1508            keys::ROOM_ACCOUNT_DATA,
1509            keys::ROOM_EVENT_RECEIPTS,
1510            keys::ROOM_USER_RECEIPTS,
1511            keys::STRIPPED_ROOM_STATE,
1512            keys::STRIPPED_USER_IDS,
1513            keys::THREAD_SUBSCRIPTIONS,
1514        ];
1515
1516        let all_stores = {
1517            let mut v = Vec::new();
1518            v.extend(prefixed_stores);
1519            v.extend(direct_stores);
1520            v
1521        };
1522
1523        let tx =
1524            self.inner.transaction(all_stores).with_mode(TransactionMode::Readwrite).build()?;
1525
1526        for store_name in direct_stores {
1527            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id)).build()?;
1528        }
1529
1530        for store_name in prefixed_stores {
1531            let store = tx.object_store(store_name)?;
1532            let range = self.encode_to_range(store_name, room_id);
1533            for key in store.get_all_keys::<JsValue>().with_query(&range).await? {
1534                store.delete(&key?).build()?;
1535            }
1536        }
1537
1538        tx.commit().await.map_err(|e| e.into())
1539    }
1540
1541    async fn get_user_ids(
1542        &self,
1543        room_id: &RoomId,
1544        memberships: RoomMemberships,
1545    ) -> Result<Vec<OwnedUserId>> {
1546        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1547        if !ids.is_empty() {
1548            return Ok(ids);
1549        }
1550        self.get_user_ids_inner(room_id, memberships, false).await
1551    }
1552
1553    async fn save_send_queue_request(
1554        &self,
1555        room_id: &RoomId,
1556        transaction_id: OwnedTransactionId,
1557        created_at: MilliSecondsSinceUnixEpoch,
1558        kind: QueuedRequestKind,
1559        priority: usize,
1560    ) -> Result<()> {
1561        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1562
1563        let tx = self
1564            .inner
1565            .transaction(keys::ROOM_SEND_QUEUE)
1566            .with_mode(TransactionMode::Readwrite)
1567            .build()?;
1568
1569        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1570
1571        // We store an encoded vector of the queued requests, with their transaction
1572        // ids.
1573
1574        // Reload the previous vector for this room, or create an empty one.
1575        let prev = obj.get(&encoded_key).await?;
1576
1577        let mut prev = prev.map_or_else(
1578            || Ok(Vec::new()),
1579            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1580        )?;
1581
1582        // Push the new request.
1583        prev.push(PersistedQueuedRequest {
1584            room_id: room_id.to_owned(),
1585            kind: Some(kind),
1586            transaction_id,
1587            error: None,
1588            is_wedged: None,
1589            event: None,
1590            priority: Some(priority),
1591            created_at,
1592        });
1593
1594        // Save the new vector into db.
1595        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1596
1597        tx.commit().await?;
1598
1599        Ok(())
1600    }
1601
1602    async fn update_send_queue_request(
1603        &self,
1604        room_id: &RoomId,
1605        transaction_id: &TransactionId,
1606        kind: QueuedRequestKind,
1607    ) -> Result<bool> {
1608        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1609
1610        let tx = self
1611            .inner
1612            .transaction(keys::ROOM_SEND_QUEUE)
1613            .with_mode(TransactionMode::Readwrite)
1614            .build()?;
1615
1616        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1617
1618        // We store an encoded vector of the queued requests, with their transaction
1619        // ids.
1620
1621        // Reload the previous vector for this room, or create an empty one.
1622        let prev = obj.get(&encoded_key).await?;
1623
1624        let mut prev = prev.map_or_else(
1625            || Ok(Vec::new()),
1626            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1627        )?;
1628
1629        // Modify the one request.
1630        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1631            entry.kind = Some(kind);
1632            // Reset the error state.
1633            entry.error = None;
1634            // Remove migrated fields.
1635            entry.is_wedged = None;
1636            entry.event = None;
1637
1638            // Save the new vector into db.
1639            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1640            tx.commit().await?;
1641
1642            Ok(true)
1643        } else {
1644            Ok(false)
1645        }
1646    }
1647
1648    async fn remove_send_queue_request(
1649        &self,
1650        room_id: &RoomId,
1651        transaction_id: &TransactionId,
1652    ) -> Result<bool> {
1653        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1654
1655        let tx = self
1656            .inner
1657            .transaction([keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE])
1658            .with_mode(TransactionMode::Readwrite)
1659            .build()?;
1660
1661        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1662
1663        // We store an encoded vector of the queued requests, with their transaction
1664        // ids.
1665
1666        // Reload the previous vector for this room.
1667        if let Some(val) = obj.get(&encoded_key).await? {
1668            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1669            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1670                prev.remove(pos);
1671
1672                if prev.is_empty() {
1673                    obj.delete(&encoded_key).build()?;
1674                } else {
1675                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1676                }
1677
1678                tx.commit().await?;
1679                return Ok(true);
1680            }
1681        }
1682
1683        Ok(false)
1684    }
1685
1686    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1687        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1688
1689        // We store an encoded vector of the queued requests, with their transaction
1690        // ids.
1691        let prev = self
1692            .inner
1693            .transaction(keys::ROOM_SEND_QUEUE)
1694            .with_mode(TransactionMode::Readwrite)
1695            .build()?
1696            .object_store(keys::ROOM_SEND_QUEUE)?
1697            .get(&encoded_key)
1698            .await?;
1699
1700        let mut prev = prev.map_or_else(
1701            || Ok(Vec::new()),
1702            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1703        )?;
1704
1705        // Inverted stable ordering on priority.
1706        prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1707
1708        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1709    }
1710
1711    async fn update_send_queue_request_status(
1712        &self,
1713        room_id: &RoomId,
1714        transaction_id: &TransactionId,
1715        error: Option<QueueWedgeError>,
1716    ) -> Result<()> {
1717        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1718
1719        let tx = self
1720            .inner
1721            .transaction(keys::ROOM_SEND_QUEUE)
1722            .with_mode(TransactionMode::Readwrite)
1723            .build()?;
1724
1725        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1726
1727        if let Some(val) = obj.get(&encoded_key).await? {
1728            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1729            if let Some(request) =
1730                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1731            {
1732                request.is_wedged = None;
1733                request.error = error;
1734                obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1735            }
1736        }
1737
1738        tx.commit().await?;
1739
1740        Ok(())
1741    }
1742
1743    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1744        let tx = self
1745            .inner
1746            .transaction(keys::ROOM_SEND_QUEUE)
1747            .with_mode(TransactionMode::Readwrite)
1748            .build()?;
1749
1750        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1751
1752        let all_entries = obj
1753            .get_all()
1754            .await?
1755            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item?))
1756            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1757            .into_iter()
1758            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1759            .collect::<BTreeSet<_>>();
1760
1761        Ok(all_entries.into_iter().collect())
1762    }
1763
1764    async fn save_dependent_queued_request(
1765        &self,
1766        room_id: &RoomId,
1767        parent_txn_id: &TransactionId,
1768        own_txn_id: ChildTransactionId,
1769        created_at: MilliSecondsSinceUnixEpoch,
1770        content: DependentQueuedRequestKind,
1771    ) -> Result<()> {
1772        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1773
1774        let tx = self
1775            .inner
1776            .transaction(keys::DEPENDENT_SEND_QUEUE)
1777            .with_mode(TransactionMode::Readwrite)
1778            .build()?;
1779
1780        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1781
1782        // We store an encoded vector of the dependent requests.
1783        // Reload the previous vector for this room, or create an empty one.
1784        let prev = obj.get(&encoded_key).await?;
1785
1786        let mut prev = prev.map_or_else(
1787            || Ok(Vec::new()),
1788            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1789        )?;
1790
1791        // Push the new request.
1792        prev.push(DependentQueuedRequest {
1793            kind: content,
1794            parent_transaction_id: parent_txn_id.to_owned(),
1795            own_transaction_id: own_txn_id,
1796            parent_key: None,
1797            created_at,
1798        });
1799
1800        // Save the new vector into db.
1801        obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1802
1803        tx.commit().await?;
1804
1805        Ok(())
1806    }
1807
1808    async fn update_dependent_queued_request(
1809        &self,
1810        room_id: &RoomId,
1811        own_transaction_id: &ChildTransactionId,
1812        new_content: DependentQueuedRequestKind,
1813    ) -> Result<bool> {
1814        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1815
1816        let tx = self
1817            .inner
1818            .transaction(keys::DEPENDENT_SEND_QUEUE)
1819            .with_mode(TransactionMode::Readwrite)
1820            .build()?;
1821
1822        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1823
1824        // We store an encoded vector of the dependent requests.
1825        // Reload the previous vector for this room, or create an empty one.
1826        let prev = obj.get(&encoded_key).await?;
1827
1828        let mut prev = prev.map_or_else(
1829            || Ok(Vec::new()),
1830            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1831        )?;
1832
1833        // Modify the dependent request, if found.
1834        let mut found = false;
1835        for entry in prev.iter_mut() {
1836            if entry.own_transaction_id == *own_transaction_id {
1837                found = true;
1838                entry.kind = new_content;
1839                break;
1840            }
1841        }
1842
1843        if found {
1844            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1845            tx.commit().await?;
1846        }
1847
1848        Ok(found)
1849    }
1850
1851    async fn mark_dependent_queued_requests_as_ready(
1852        &self,
1853        room_id: &RoomId,
1854        parent_txn_id: &TransactionId,
1855        parent_key: SentRequestKey,
1856    ) -> Result<usize> {
1857        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1858
1859        let tx = self
1860            .inner
1861            .transaction(keys::DEPENDENT_SEND_QUEUE)
1862            .with_mode(TransactionMode::Readwrite)
1863            .build()?;
1864
1865        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1866
1867        // We store an encoded vector of the dependent requests.
1868        // Reload the previous vector for this room, or create an empty one.
1869        let prev = obj.get(&encoded_key).await?;
1870
1871        let mut prev = prev.map_or_else(
1872            || Ok(Vec::new()),
1873            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1874        )?;
1875
1876        // Modify all requests that match.
1877        let mut num_updated = 0;
1878        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1879            entry.parent_key = Some(parent_key.clone());
1880            num_updated += 1;
1881        }
1882
1883        if num_updated > 0 {
1884            obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1885            tx.commit().await?;
1886        }
1887
1888        Ok(num_updated)
1889    }
1890
1891    async fn remove_dependent_queued_request(
1892        &self,
1893        room_id: &RoomId,
1894        txn_id: &ChildTransactionId,
1895    ) -> Result<bool> {
1896        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1897
1898        let tx = self
1899            .inner
1900            .transaction(keys::DEPENDENT_SEND_QUEUE)
1901            .with_mode(TransactionMode::Readwrite)
1902            .build()?;
1903
1904        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1905
1906        // We store an encoded vector of the dependent requests.
1907        // Reload the previous vector for this room.
1908        if let Some(val) = obj.get(&encoded_key).await? {
1909            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1910            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1911                prev.remove(pos);
1912
1913                if prev.is_empty() {
1914                    obj.delete(&encoded_key).build()?;
1915                } else {
1916                    obj.put(&self.serialize_value(&prev)?).with_key(encoded_key).build()?;
1917                }
1918
1919                tx.commit().await?;
1920                return Ok(true);
1921            }
1922        }
1923
1924        Ok(false)
1925    }
1926
1927    async fn load_dependent_queued_requests(
1928        &self,
1929        room_id: &RoomId,
1930    ) -> Result<Vec<DependentQueuedRequest>> {
1931        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1932
1933        // We store an encoded vector of the dependent requests.
1934        let prev = self
1935            .inner
1936            .transaction(keys::DEPENDENT_SEND_QUEUE)
1937            .with_mode(TransactionMode::Readwrite)
1938            .build()?
1939            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1940            .get(&encoded_key)
1941            .await?;
1942
1943        prev.map_or_else(
1944            || Ok(Vec::new()),
1945            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1946        )
1947    }
1948
1949    async fn upsert_thread_subscription(
1950        &self,
1951        room: &RoomId,
1952        thread_id: &EventId,
1953        subscription: StoredThreadSubscription,
1954    ) -> Result<()> {
1955        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
1956
1957        let tx = self
1958            .inner
1959            .transaction(keys::THREAD_SUBSCRIPTIONS)
1960            .with_mode(TransactionMode::Readwrite)
1961            .build()?;
1962        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1963
1964        let mut new = PersistedThreadSubscription::from(subscription);
1965
1966        // See if there's a previous subscription.
1967        if let Some(previous_value) = obj.get(&encoded_key).await? {
1968            let previous: PersistedThreadSubscription = self.deserialize_value(&previous_value)?;
1969
1970            // If the previous status is the same as the new one, don't do anything.
1971            if new == previous {
1972                return Ok(());
1973            }
1974            if !compare_thread_subscription_bump_stamps(previous.bump_stamp, &mut new.bump_stamp) {
1975                return Ok(());
1976            }
1977        }
1978
1979        let serialized_value = self.serialize_value(&new);
1980        obj.put(&serialized_value?).with_key(encoded_key).build()?;
1981
1982        tx.commit().await?;
1983
1984        Ok(())
1985    }
1986
1987    async fn upsert_thread_subscriptions(
1988        &self,
1989        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1990    ) -> Result<()> {
1991        let tx = self
1992            .inner
1993            .transaction(keys::THREAD_SUBSCRIPTIONS)
1994            .with_mode(TransactionMode::Readwrite)
1995            .build()?;
1996        let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;
1997
1998        for (room_id, thread_id, subscription) in updates {
1999            let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
2000            let mut new = PersistedThreadSubscription::from(subscription);
2001
2002            // See if there's a previous subscription.
2003            if let Some(previous_value) = obj.get(&encoded_key).await? {
2004                let previous: PersistedThreadSubscription =
2005                    self.deserialize_value(&previous_value)?;
2006
2007                // If the previous status is the same as the new one, don't do anything.
2008                if new == previous {
2009                    continue;
2010                }
2011                if !compare_thread_subscription_bump_stamps(
2012                    previous.bump_stamp,
2013                    &mut new.bump_stamp,
2014                ) {
2015                    continue;
2016                }
2017            }
2018
2019            let serialized_value = self.serialize_value(&new);
2020            obj.put(&serialized_value?).with_key(encoded_key).build()?;
2021        }
2022
2023        tx.commit().await?;
2024
2025        Ok(())
2026    }
2027
2028    async fn load_thread_subscription(
2029        &self,
2030        room: &RoomId,
2031        thread_id: &EventId,
2032    ) -> Result<Option<StoredThreadSubscription>> {
2033        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2034
2035        let js_value = self
2036            .inner
2037            .transaction(keys::THREAD_SUBSCRIPTIONS)
2038            .with_mode(TransactionMode::Readonly)
2039            .build()?
2040            .object_store(keys::THREAD_SUBSCRIPTIONS)?
2041            .get(&encoded_key)
2042            .await?;
2043
2044        let Some(js_value) = js_value else {
2045            // We didn't have a previous subscription for this thread.
2046            return Ok(None);
2047        };
2048
2049        let sub: PersistedThreadSubscription = self.deserialize_value(&js_value)?;
2050
2051        let status = ThreadSubscriptionStatus::from_str(&sub.status).map_err(|_| {
2052            StoreError::InvalidData {
2053                details: format!(
2054                    "invalid thread status for room {room} and thread {thread_id}: {}",
2055                    sub.status
2056                ),
2057            }
2058        })?;
2059
2060        Ok(Some(StoredThreadSubscription { status, bump_stamp: sub.bump_stamp }))
2061    }
2062
2063    async fn remove_thread_subscription(&self, room: &RoomId, thread_id: &EventId) -> Result<()> {
2064        let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room, thread_id));
2065
2066        let transaction = self
2067            .inner
2068            .transaction(keys::THREAD_SUBSCRIPTIONS)
2069            .with_mode(TransactionMode::Readwrite)
2070            .build()?;
2071        transaction.object_store(keys::THREAD_SUBSCRIPTIONS)?.delete(&encoded_key).await?;
2072        transaction.commit().await?;
2073
2074        Ok(())
2075    }
2076
2077    #[allow(clippy::unused_async)]
2078    async fn optimize(&self) -> Result<()> {
2079        Ok(())
2080    }
2081
2082    #[allow(clippy::unused_async)]
2083    async fn get_size(&self) -> Result<Option<usize>> {
2084        Ok(None)
2085    }
2086});
2087
2088/// A room member.
2089#[derive(Debug, Serialize, Deserialize)]
2090struct RoomMember {
2091    user_id: OwnedUserId,
2092    membership: MembershipState,
2093}
2094
2095impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
2096    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
2097        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
2098    }
2099}
2100
2101impl From<&StrippedRoomMemberEvent> for RoomMember {
2102    fn from(event: &StrippedRoomMemberEvent) -> Self {
2103        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
2104    }
2105}
2106
2107#[cfg(test)]
2108mod migration_tests {
2109    use assert_matches2::assert_matches;
2110    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
2111    use ruma::{
2112        OwnedRoomId, OwnedTransactionId, TransactionId,
2113        events::room::message::RoomMessageEventContent, room_id,
2114    };
2115    use serde::{Deserialize, Serialize};
2116
2117    use crate::state_store::PersistedQueuedRequest;
2118
2119    #[derive(Serialize, Deserialize)]
2120    struct OldPersistedQueuedRequest {
2121        room_id: OwnedRoomId,
2122        event: SerializableEventContent,
2123        transaction_id: OwnedTransactionId,
2124        is_wedged: bool,
2125    }
2126
2127    // We now persist an error when an event failed to send instead of just a
2128    // boolean. To support that, `PersistedQueueEvent` changed a bool to
2129    // Option<bool>, ensures that this work properly.
2130    #[test]
2131    fn test_migrating_persisted_queue_event_serialization() {
2132        let room_a_id = room_id!("!room_a:dummy.local");
2133        let transaction_id = TransactionId::new();
2134        let content =
2135            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
2136                .unwrap();
2137
2138        let old_persisted_queue_event = OldPersistedQueuedRequest {
2139            room_id: room_a_id.to_owned(),
2140            event: content,
2141            transaction_id: transaction_id.clone(),
2142            is_wedged: true,
2143        };
2144
2145        let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
2146
2147        // Load it with the new version.
2148        let new_persisted: PersistedQueuedRequest =
2149            serde_json::from_slice(&serialized_persisted).unwrap();
2150
2151        assert_eq!(new_persisted.is_wedged, Some(true));
2152        assert!(new_persisted.error.is_none());
2153
2154        assert!(new_persisted.event.is_some());
2155        assert!(new_persisted.kind.is_none());
2156
2157        let queued = new_persisted.into_queued_request().unwrap();
2158        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
2159        assert_eq!(queued.transaction_id, transaction_id);
2160        assert!(queued.error.is_some());
2161    }
2162}
2163
2164#[cfg(all(test, target_family = "wasm"))]
2165mod tests {
2166    #[cfg(target_family = "wasm")]
2167    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2168
2169    use matrix_sdk_base::statestore_integration_tests;
2170    use uuid::Uuid;
2171
2172    use super::{IndexeddbStateStore, Result};
2173
2174    async fn get_store() -> Result<IndexeddbStateStore> {
2175        let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
2176        Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
2177    }
2178
2179    statestore_integration_tests!();
2180}
2181
2182#[cfg(all(test, target_family = "wasm"))]
2183mod encrypted_tests {
2184    #[cfg(target_family = "wasm")]
2185    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
2186
2187    use matrix_sdk_base::statestore_integration_tests;
2188    use uuid::Uuid;
2189
2190    use super::{IndexeddbStateStore, Result};
2191
2192    async fn get_store() -> Result<IndexeddbStateStore> {
2193        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
2194        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
2195        Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
2196    }
2197
2198    statestore_integration_tests!();
2199}