matrix_sdk_indexeddb/state_store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    sync::Arc,
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::prelude::*;
25use matrix_sdk_base::{
26    deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
27    store::{
28        ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
29        QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
30        SerializableEventContent, ServerCapabilities, StateChanges, StateStore, StoreError,
31    },
32    MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
33};
34use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
35use ruma::{
36    canonical_json::{redact, RedactedBecause},
37    events::{
38        presence::PresenceEvent,
39        receipt::{Receipt, ReceiptThread, ReceiptType},
40        room::member::{
41            MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
42        },
43        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
44        GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
45    },
46    serde::Raw,
47    CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
48    OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
49};
50use serde::{de::DeserializeOwned, Deserialize, Serialize};
51use tracing::{debug, warn};
52use wasm_bindgen::JsValue;
53use web_sys::IdbKeyRange;
54
55mod migrations;
56
57pub use self::migrations::MigrationConflictStrategy;
58use self::migrations::{upgrade_inner_db, upgrade_meta_db};
59use crate::safe_encode::SafeEncode;
60
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A JSON (de)serialization error from `serde_json`.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error reported by the store encryption layer.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A `web_sys::DomException`, captured as its name, message and numeric
    /// code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A generic [`StoreError`], passed through unchanged.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// The database `name` cannot be migrated from `old_version` to
    /// `new_version` without deleting data.
    #[error("Can't migrate {name} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
74
75impl From<web_sys::DomException> for IndexeddbStateStoreError {
76    fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
77        IndexeddbStateStoreError::DomException {
78            name: frm.name(),
79            message: frm.message(),
80            code: frm.code(),
81        }
82    }
83}
84
85impl From<IndexeddbStateStoreError> for StoreError {
86    fn from(e: IndexeddbStateStoreError) -> Self {
87        match e {
88            IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
89            IndexeddbStateStoreError::StoreError(e) => e,
90            IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
91            _ => StoreError::backend(e),
92        }
93    }
94}
95
/// Names of the databases, object stores and static keys used by this store.
mod keys {
    /// Suffix appended to the store name (as `{name}::{INTERNAL_STATE}`) to
    /// form the name of the metadata database.
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    /// Object store of the metadata database that is queried for migration
    /// backups.
    pub const BACKUPS_META: &str = "backups";

    // Object store names of the data database.

    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    /// Table used to save send queue events.
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    /// Table used to save dependent send queue events.
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    /// Object store read by the custom-value lookup.
    pub const CUSTOM: &str = "custom";
    /// Object store holding the key/value data addressed by
    /// `StateStoreDataKey`.
    pub const KV: &str = "kv";

    /// All names of the current state stores for convenience.
    ///
    /// Does not include `INTERNAL_STATE`, `BACKUPS_META` or `STORE_KEY`.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // static keys

    // NOTE(review): presumably the key under which the store cipher is
    // persisted; it is not referenced in this part of the file — confirm.
    pub const STORE_KEY: &str = "store_key";
}
148
149pub use keys::ALL_STORES;
150use matrix_sdk_base::store::QueueWedgeError;
151
152/// Encrypt (if needs be) then JSON-serialize a value.
153fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
154    Ok(match store_cipher {
155        Some(cipher) => {
156            let data = serde_json::to_vec(event)?;
157            JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
158        }
159        None => JsValue::from_serde(event)?,
160    })
161}
162
163/// Deserialize a JSON value and then decrypt it (if needs be).
164fn deserialize_value<T: DeserializeOwned>(
165    store_cipher: Option<&StoreCipher>,
166    event: &JsValue,
167) -> Result<T> {
168    match store_cipher {
169        Some(cipher) => {
170            use zeroize::Zeroize;
171            let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
172            let ret = serde_json::from_slice(&plaintext);
173            plaintext.zeroize();
174            Ok(ret?)
175        }
176        None => Ok(event.into_serde()?),
177    }
178}
179
180fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
181where
182    T: SafeEncode,
183{
184    match store_cipher {
185        Some(cipher) => key.as_secure_string(table_name, cipher),
186        None => key.as_encoded_string(),
187    }
188    .into()
189}
190
191fn encode_to_range<T>(
192    store_cipher: Option<&StoreCipher>,
193    table_name: &str,
194    key: T,
195) -> Result<IdbKeyRange>
196where
197    T: SafeEncode,
198{
199    match store_cipher {
200        Some(cipher) => key.encode_to_range_secure(table_name, cipher),
201        None => key.encode_to_range(),
202    }
203    .map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
204}
205
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    /// Name of the store; `build` falls back to `"state"` when unset.
    name: Option<String>,
    /// Passphrase handed to the metadata database setup; if not given, the
    /// DB is not encrypted.
    passphrase: Option<String>,
    /// Strategy handed to the data database upgrade.
    migration_conflict_strategy: MigrationConflictStrategy,
}
213
214impl IndexeddbStateStoreBuilder {
215    fn new() -> Self {
216        Self {
217            name: None,
218            passphrase: None,
219            migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
220        }
221    }
222
223    /// Set the name for the indexeddb store to use, `state` is none given.
224    pub fn name(mut self, value: String) -> Self {
225        self.name = Some(value);
226        self
227    }
228
229    /// Set the password the indexeddb should be encrypted with.
230    ///
231    /// If not given, the DB is not encrypted.
232    pub fn passphrase(mut self, value: String) -> Self {
233        self.passphrase = Some(value);
234        self
235    }
236
237    /// The strategy to use when a merge conflict is found.
238    ///
239    /// See [`MigrationConflictStrategy`] for details.
240    pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
241        self.migration_conflict_strategy = value;
242        self
243    }
244
245    pub async fn build(self) -> Result<IndexeddbStateStore> {
246        let migration_strategy = self.migration_conflict_strategy.clone();
247        let name = self.name.unwrap_or_else(|| "state".to_owned());
248
249        let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
250
251        let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
252        let inner =
253            upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
254
255        Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
256    }
257}
258
/// A state store backed by IndexedDB databases.
///
/// Instances are created through [`IndexeddbStateStore::builder`].
pub struct IndexeddbStateStore {
    /// Name the store was built with.
    name: String,
    /// The database containing the data.
    pub(crate) inner: IdbDatabase,
    /// The database containing the metadata (e.g. migration backups).
    pub(crate) meta: IdbDatabase,
    /// Cipher used to encrypt values and encode keys; when `None`, values and
    /// keys are stored without encryption.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
265
266#[cfg(not(tarpaulin_include))]
267impl std::fmt::Debug for IndexeddbStateStore {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
270    }
271}
272
/// Module-local `Result` alias whose error type defaults to
/// [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
274
275impl IndexeddbStateStore {
    /// Generate an [`IndexeddbStateStoreBuilder`] with default parameters.
    pub fn builder() -> IndexeddbStateStoreBuilder {
        IndexeddbStateStoreBuilder::new()
    }
280
    /// The version of the database containing the data.
    pub fn version(&self) -> u32 {
        // NOTE(review): the handle's version is narrowed with an `as` cast,
        // which never fails; presumably the handle reports a float — confirm.
        self.inner.version() as u32
    }
285
    /// The version of the database containing the metadata.
    pub fn meta_version(&self) -> u32 {
        // NOTE(review): the handle's version is narrowed with an `as` cast,
        // which never fails; presumably the handle reports a float — confirm.
        self.meta.version() as u32
    }
290
291    /// Whether this database has any migration backups
292    pub async fn has_backups(&self) -> Result<bool> {
293        Ok(self
294            .meta
295            .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
296            .object_store(keys::BACKUPS_META)?
297            .count()?
298            .await?
299            > 0)
300    }
301
302    /// What's the database name of the latest backup<
303    pub async fn latest_backup(&self) -> Result<Option<String>> {
304        Ok(self
305            .meta
306            .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
307            .object_store(keys::BACKUPS_META)?
308            .open_cursor_with_direction(IdbCursorDirection::Prev)?
309            .await?
310            .and_then(|c| c.value().as_string()))
311    }
312
313    /// Encrypt (if needs be) then JSON-serialize a value.
314    fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
315        serialize_value(self.store_cipher.as_deref(), event)
316    }
317
318    /// Deserialize a JSON value and then decrypt it (if needs be).
319    fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
320        deserialize_value(self.store_cipher.as_deref(), event)
321    }
322
323    fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
324    where
325        T: SafeEncode,
326    {
327        encode_key(self.store_cipher.as_deref(), table_name, key)
328    }
329
330    fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
331    where
332        T: SafeEncode,
333    {
334        encode_to_range(self.store_cipher.as_deref(), table_name, key)
335    }
336
337    /// Get user IDs for the given room with the given memberships and stripped
338    /// state.
339    pub async fn get_user_ids_inner(
340        &self,
341        room_id: &RoomId,
342        memberships: RoomMemberships,
343        stripped: bool,
344    ) -> Result<Vec<OwnedUserId>> {
345        let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
346
347        let tx =
348            self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
349        let store = tx.object_store(store_name)?;
350        let range = self.encode_to_range(store_name, room_id)?;
351
352        let user_ids = if memberships.is_empty() {
353            // It should be faster to just get all user IDs in this case.
354            store
355                .get_all_with_key(&range)?
356                .await?
357                .iter()
358                .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
359                .collect::<Vec<_>>()
360        } else {
361            let mut user_ids = Vec::new();
362            let cursor = store.open_cursor_with_range(&range)?.await?;
363
364            if let Some(cursor) = cursor {
365                loop {
366                    let value = cursor.value();
367                    let member = self.deserialize_value::<RoomMember>(&value)?;
368
369                    if memberships.matches(&member.membership) {
370                        user_ids.push(member.user_id);
371                    }
372
373                    if !cursor.continue_cursor()?.await? {
374                        break;
375                    }
376                }
377            }
378
379            user_ids
380        };
381
382        Ok(user_ids)
383    }
384
385    async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
386        self.inner
387            .transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
388            .object_store(keys::CUSTOM)?
389            .get(jskey)?
390            .await?
391            .map(|f| self.deserialize_value(&f))
392            .transpose()
393    }
394
395    fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
396        // Use the key (prefix) for the table name as well, to keep encoded
397        // keys compatible for the sync token and filters, which were in
398        // separate tables initially.
399        match key {
400            StateStoreDataKey::SyncToken => {
401                self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
402            }
403            StateStoreDataKey::ServerCapabilities => self.encode_key(
404                StateStoreDataKey::SERVER_CAPABILITIES,
405                StateStoreDataKey::SERVER_CAPABILITIES,
406            ),
407            StateStoreDataKey::Filter(filter_name) => {
408                self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
409            }
410            StateStoreDataKey::UserAvatarUrl(user_id) => {
411                self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
412            }
413            StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
414                self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
415            }
416            StateStoreDataKey::UtdHookManagerData => {
417                self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
418            }
419            StateStoreDataKey::ComposerDraft(room_id) => {
420                self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
421            }
422            StateStoreDataKey::SeenKnockRequests(room_id) => {
423                self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
424            }
425        }
426    }
427}
428
/// A superset of [`QueuedRequest`] that also contains the room id, since we
/// want to return them.
///
/// This is the serialized form, so it also carries fields from older formats;
/// `into_queued_request` folds those into the current representation.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    /// In which room is this event going to be sent.
    pub room_id: OwnedRoomId,

    // All these fields are the same as in [`QueuedRequest`].
    /// Kind. Optional because it might be missing from previous formats.
    kind: Option<QueuedRequestKind>,
    /// Transaction id, copied as-is into the resulting [`QueuedRequest`].
    transaction_id: OwnedTransactionId,

    /// Error recorded for this request, if any.
    pub error: Option<QueueWedgeError>,

    /// Priority of the request; a missing value is treated as 0 on conversion.
    priority: Option<usize>,

    /// The time the original message was first attempted to be sent at.
    ///
    /// Falls back to the current time when absent from the serialized data.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    // Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
    /// Deprecated (from old format), now replaced with error field.
    is_wedged: Option<bool>,

    /// Deprecated (from old format): event content, turned into a
    /// `QueuedRequestKind::Event` on conversion when `kind` is missing.
    event: Option<SerializableEventContent>,
}
455
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
459
460impl PersistedQueuedRequest {
461    fn into_queued_request(self) -> Option<QueuedRequest> {
462        let kind =
463            self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
464
465        let error = match self.is_wedged {
466            Some(true) => {
467                // Migrate to a generic error.
468                Some(QueueWedgeError::GenericApiError {
469                    msg: "local echo failed to send in a previous session".into(),
470                })
471            }
472            _ => self.error,
473        };
474
475        // By default, events without a priority have a priority of 0.
476        let priority = self.priority.unwrap_or(0);
477
478        Some(QueuedRequest {
479            kind,
480            transaction_id: self.transaction_id,
481            error,
482            priority,
483            created_at: self.created_at,
484        })
485    }
486}
487
// Small hack to have the following macro invocation act as the appropriate
// trait impl block on wasm, but still be compiled on non-wasm as a regular
// impl block otherwise.
//
// The trait impl doesn't compile on non-wasm due to unfulfilled trait bounds,
// this hack allows us to still have most of rust-analyzer's IDE functionality
// within the impl block without having to set it up to check things against
// the wasm target (which would disable many other parts of the codebase).
//
// wasm32 variant: the body becomes `impl StateStore for IndexeddbStateStore`.
#[cfg(target_arch = "wasm32")]
macro_rules! impl_state_store {
    // Takes one braced group of tokens and splices it unchanged into the impl.
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            // The associated error type is supplied here, so the body only
            // has to contain the methods.
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
507
// Non-wasm variant of `impl_state_store!`: the same braced body is emitted as
// a plain inherent `impl IndexeddbStateStore` block instead of a `StateStore`
// trait impl.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! impl_state_store {
    // Takes one braced group of tokens and splices it unchanged into the impl.
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
516
517impl_state_store!({
518    async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
519        let encoded_key = self.encode_kv_data_key(key);
520
521        let value = self
522            .inner
523            .transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
524            .object_store(keys::KV)?
525            .get(&encoded_key)?
526            .await?;
527
528        let value = match key {
529            StateStoreDataKey::SyncToken => value
530                .map(|f| self.deserialize_value::<String>(&f))
531                .transpose()?
532                .map(StateStoreDataValue::SyncToken),
533            StateStoreDataKey::ServerCapabilities => value
534                .map(|f| self.deserialize_value::<ServerCapabilities>(&f))
535                .transpose()?
536                .map(StateStoreDataValue::ServerCapabilities),
537            StateStoreDataKey::Filter(_) => value
538                .map(|f| self.deserialize_value::<String>(&f))
539                .transpose()?
540                .map(StateStoreDataValue::Filter),
541            StateStoreDataKey::UserAvatarUrl(_) => value
542                .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
543                .transpose()?
544                .map(StateStoreDataValue::UserAvatarUrl),
545            StateStoreDataKey::RecentlyVisitedRooms(_) => value
546                .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
547                .transpose()?
548                .map(StateStoreDataValue::RecentlyVisitedRooms),
549            StateStoreDataKey::UtdHookManagerData => value
550                .map(|f| self.deserialize_value::<GrowableBloom>(&f))
551                .transpose()?
552                .map(StateStoreDataValue::UtdHookManagerData),
553            StateStoreDataKey::ComposerDraft(_) => value
554                .map(|f| self.deserialize_value::<ComposerDraft>(&f))
555                .transpose()?
556                .map(StateStoreDataValue::ComposerDraft),
557            StateStoreDataKey::SeenKnockRequests(_) => value
558                .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
559                .transpose()?
560                .map(StateStoreDataValue::SeenKnockRequests),
561        };
562
563        Ok(value)
564    }
565
566    async fn set_kv_data(
567        &self,
568        key: StateStoreDataKey<'_>,
569        value: StateStoreDataValue,
570    ) -> Result<()> {
571        let encoded_key = self.encode_kv_data_key(key);
572
573        let serialized_value = match key {
574            StateStoreDataKey::SyncToken => self
575                .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
576            StateStoreDataKey::ServerCapabilities => self.serialize_value(
577                &value
578                    .into_server_capabilities()
579                    .expect("Session data not containing server capabilities"),
580            ),
581            StateStoreDataKey::Filter(_) => {
582                self.serialize_value(&value.into_filter().expect("Session data not a filter"))
583            }
584            StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
585                &value.into_user_avatar_url().expect("Session data not an user avatar url"),
586            ),
587            StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
588                &value
589                    .into_recently_visited_rooms()
590                    .expect("Session data not a recently visited room list"),
591            ),
592            StateStoreDataKey::UtdHookManagerData => self.serialize_value(
593                &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
594            ),
595            StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
596                &value.into_composer_draft().expect("Session data not a composer draft"),
597            ),
598            StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
599                &value
600                    .into_seen_knock_requests()
601                    .expect("Session data is not a set of seen knock request ids"),
602            ),
603        };
604
605        let tx =
606            self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
607
608        let obj = tx.object_store(keys::KV)?;
609
610        obj.put_key_val(&encoded_key, &serialized_value?)?;
611
612        tx.await.into_result()?;
613
614        Ok(())
615    }
616
617    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
618        let encoded_key = self.encode_kv_data_key(key);
619
620        let tx =
621            self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
622        let obj = tx.object_store(keys::KV)?;
623
624        obj.delete(&encoded_key)?;
625
626        tx.await.into_result()?;
627
628        Ok(())
629    }
630
631    async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
632        let mut stores: HashSet<&'static str> = [
633            (changes.sync_token.is_some(), keys::KV),
634            (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
635            (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
636            (!changes.presence.is_empty(), keys::PRESENCE),
637            (
638                !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
639                keys::PROFILES,
640            ),
641            (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
642            (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
643        ]
644        .iter()
645        .filter_map(|(id, key)| if *id { Some(*key) } else { None })
646        .collect();
647
648        if !changes.state.is_empty() {
649            stores.extend([
650                keys::ROOM_STATE,
651                keys::USER_IDS,
652                keys::STRIPPED_USER_IDS,
653                keys::STRIPPED_ROOM_STATE,
654                keys::PROFILES,
655            ]);
656        }
657
658        if !changes.redactions.is_empty() {
659            stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
660        }
661
662        if !changes.room_infos.is_empty() {
663            stores.insert(keys::ROOM_INFOS);
664        }
665
666        if !changes.stripped_state.is_empty() {
667            stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
668        }
669
670        if !changes.receipts.is_empty() {
671            stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
672        }
673
674        if stores.is_empty() {
675            // nothing to do, quit early
676            return Ok(());
677        }
678
679        let stores: Vec<&'static str> = stores.into_iter().collect();
680        let tx =
681            self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
682
683        if let Some(s) = &changes.sync_token {
684            tx.object_store(keys::KV)?.put_key_val(
685                &self.encode_kv_data_key(StateStoreDataKey::SyncToken),
686                &self.serialize_value(s)?,
687            )?;
688        }
689
690        if !changes.ambiguity_maps.is_empty() {
691            let store = tx.object_store(keys::DISPLAY_NAMES)?;
692            for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
693                for (display_name, map) in ambiguity_maps {
694                    let key = self.encode_key(
695                        keys::DISPLAY_NAMES,
696                        (
697                            room_id,
698                            display_name
699                                .as_normalized_str()
700                                .unwrap_or_else(|| display_name.as_raw_str()),
701                        ),
702                    );
703
704                    store.put_key_val(&key, &self.serialize_value(&map)?)?;
705                }
706            }
707        }
708
709        if !changes.account_data.is_empty() {
710            let store = tx.object_store(keys::ACCOUNT_DATA)?;
711            for (event_type, event) in &changes.account_data {
712                store.put_key_val(
713                    &self.encode_key(keys::ACCOUNT_DATA, event_type),
714                    &self.serialize_value(&event)?,
715                )?;
716            }
717        }
718
719        if !changes.room_account_data.is_empty() {
720            let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
721            for (room, events) in &changes.room_account_data {
722                for (event_type, event) in events {
723                    let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
724                    store.put_key_val(&key, &self.serialize_value(&event)?)?;
725                }
726            }
727        }
728
729        if !changes.state.is_empty() {
730            let state = tx.object_store(keys::ROOM_STATE)?;
731            let profiles = tx.object_store(keys::PROFILES)?;
732            let user_ids = tx.object_store(keys::USER_IDS)?;
733            let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
734            let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
735
736            for (room, user_ids) in &changes.profiles_to_delete {
737                for user_id in user_ids {
738                    let key = self.encode_key(keys::PROFILES, (room, user_id));
739                    profiles.delete(&key)?;
740                }
741            }
742
743            for (room, event_types) in &changes.state {
744                let profile_changes = changes.profiles.get(room);
745
746                for (event_type, events) in event_types {
747                    for (state_key, raw_event) in events {
748                        let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
749                        state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
750                        stripped_state.delete(&key)?;
751
752                        if *event_type == StateEventType::RoomMember {
753                            let event = match raw_event.deserialize_as::<SyncRoomMemberEvent>() {
754                                Ok(ev) => ev,
755                                Err(e) => {
756                                    let event_id: Option<String> =
757                                        raw_event.get_field("event_id").ok().flatten();
758                                    debug!(event_id, "Failed to deserialize member event: {e}");
759                                    continue;
760                                }
761                            };
762
763                            let key = (room, state_key);
764
765                            stripped_user_ids
766                                .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
767
768                            user_ids.put_key_val_owned(
769                                self.encode_key(keys::USER_IDS, key),
770                                &self.serialize_value(&RoomMember::from(&event))?,
771                            )?;
772
773                            if let Some(profile) =
774                                profile_changes.and_then(|p| p.get(event.state_key()))
775                            {
776                                profiles.put_key_val_owned(
777                                    self.encode_key(keys::PROFILES, key),
778                                    &self.serialize_value(&profile)?,
779                                )?;
780                            }
781                        }
782                    }
783                }
784            }
785        }
786
787        if !changes.room_infos.is_empty() {
788            let room_infos = tx.object_store(keys::ROOM_INFOS)?;
789            for (room_id, room_info) in &changes.room_infos {
790                room_infos.put_key_val(
791                    &self.encode_key(keys::ROOM_INFOS, room_id),
792                    &self.serialize_value(&room_info)?,
793                )?;
794            }
795        }
796
797        if !changes.presence.is_empty() {
798            let store = tx.object_store(keys::PRESENCE)?;
799            for (sender, event) in &changes.presence {
800                store.put_key_val(
801                    &self.encode_key(keys::PRESENCE, sender),
802                    &self.serialize_value(&event)?,
803                )?;
804            }
805        }
806
807        if !changes.stripped_state.is_empty() {
808            let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
809            let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
810
811            for (room, event_types) in &changes.stripped_state {
812                for (event_type, events) in event_types {
813                    for (state_key, raw_event) in events {
814                        let key = self
815                            .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
816                        store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
817
818                        if *event_type == StateEventType::RoomMember {
819                            let event = match raw_event.deserialize_as::<StrippedRoomMemberEvent>()
820                            {
821                                Ok(ev) => ev,
822                                Err(e) => {
823                                    let event_id: Option<String> =
824                                        raw_event.get_field("event_id").ok().flatten();
825                                    debug!(
826                                        event_id,
827                                        "Failed to deserialize stripped member event: {e}"
828                                    );
829                                    continue;
830                                }
831                            };
832
833                            let key = (room, state_key);
834
835                            user_ids.put_key_val_owned(
836                                self.encode_key(keys::STRIPPED_USER_IDS, key),
837                                &self.serialize_value(&RoomMember::from(&event))?,
838                            )?;
839                        }
840                    }
841                }
842            }
843        }
844
845        if !changes.receipts.is_empty() {
846            let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
847            let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
848
849            for (room, content) in &changes.receipts {
850                for (event_id, receipts) in &content.0 {
851                    for (receipt_type, receipts) in receipts {
852                        for (user_id, receipt) in receipts {
853                            let key = match receipt.thread.as_str() {
854                                Some(thread_id) => self.encode_key(
855                                    keys::ROOM_USER_RECEIPTS,
856                                    (room, receipt_type, thread_id, user_id),
857                                ),
858                                None => self.encode_key(
859                                    keys::ROOM_USER_RECEIPTS,
860                                    (room, receipt_type, user_id),
861                                ),
862                            };
863
864                            if let Some((old_event, _)) =
865                                room_user_receipts.get(&key)?.await?.and_then(|f| {
866                                    self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
867                                })
868                            {
869                                let key = match receipt.thread.as_str() {
870                                    Some(thread_id) => self.encode_key(
871                                        keys::ROOM_EVENT_RECEIPTS,
872                                        (room, receipt_type, thread_id, old_event, user_id),
873                                    ),
874                                    None => self.encode_key(
875                                        keys::ROOM_EVENT_RECEIPTS,
876                                        (room, receipt_type, old_event, user_id),
877                                    ),
878                                };
879                                room_event_receipts.delete(&key)?;
880                            }
881
882                            room_user_receipts
883                                .put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
884
885                            // Add the receipt to the room event receipts
886                            let key = match receipt.thread.as_str() {
887                                Some(thread_id) => self.encode_key(
888                                    keys::ROOM_EVENT_RECEIPTS,
889                                    (room, receipt_type, thread_id, event_id, user_id),
890                                ),
891                                None => self.encode_key(
892                                    keys::ROOM_EVENT_RECEIPTS,
893                                    (room, receipt_type, event_id, user_id),
894                                ),
895                            };
896                            room_event_receipts
897                                .put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
898                        }
899                    }
900                }
901            }
902        }
903
904        if !changes.redactions.is_empty() {
905            let state = tx.object_store(keys::ROOM_STATE)?;
906            let room_info = tx.object_store(keys::ROOM_INFOS)?;
907
908            for (room_id, redactions) in &changes.redactions {
909                let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
910                let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
911
912                let mut room_version = None;
913
914                while let Some(key) = cursor.key() {
915                    let raw_evt =
916                        self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
917                    if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
918                        if let Some(redaction) = redactions.get(&event_id) {
919                            let version = {
920                                if room_version.is_none() {
921                                    room_version.replace(room_info
922                                        .get(&self.encode_key(keys::ROOM_INFOS, room_id))?
923                                        .await?
924                                        .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
925                                        .and_then(|info| info.room_version().cloned())
926                                        .unwrap_or_else(|| {
927                                            warn!(?room_id, "Unable to find the room version, assume version 9");
928                                            RoomVersionId::V9
929                                        })
930                                    );
931                                }
932                                room_version.as_ref().unwrap()
933                            };
934
935                            let redacted = redact(
936                                raw_evt.deserialize_as::<CanonicalJsonObject>()?,
937                                version,
938                                Some(RedactedBecause::from_raw_event(redaction)?),
939                            )
940                            .map_err(StoreError::Redaction)?;
941                            state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
942                        }
943                    }
944
945                    // move forward.
946                    cursor.advance(1)?.await?;
947                }
948            }
949        }
950
951        tx.await.into_result().map_err(|e| e.into())
952    }
953
954    async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
955        self.inner
956            .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
957            .object_store(keys::PRESENCE)?
958            .get(&self.encode_key(keys::PRESENCE, user_id))?
959            .await?
960            .map(|f| self.deserialize_value(&f))
961            .transpose()
962    }
963
964    async fn get_presence_events(
965        &self,
966        user_ids: &[OwnedUserId],
967    ) -> Result<Vec<Raw<PresenceEvent>>> {
968        if user_ids.is_empty() {
969            return Ok(Vec::new());
970        }
971
972        let txn = self
973            .inner
974            .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
975        let store = txn.object_store(keys::PRESENCE)?;
976
977        let mut events = Vec::with_capacity(user_ids.len());
978
979        for user_id in user_ids {
980            if let Some(event) = store
981                .get(&self.encode_key(keys::PRESENCE, user_id))?
982                .await?
983                .map(|f| self.deserialize_value(&f))
984                .transpose()?
985            {
986                events.push(event)
987            }
988        }
989
990        Ok(events)
991    }
992
993    async fn get_state_event(
994        &self,
995        room_id: &RoomId,
996        event_type: StateEventType,
997        state_key: &str,
998    ) -> Result<Option<RawAnySyncOrStrippedState>> {
999        Ok(self
1000            .get_state_events_for_keys(room_id, event_type, &[state_key])
1001            .await?
1002            .into_iter()
1003            .next())
1004    }
1005
1006    async fn get_state_events(
1007        &self,
1008        room_id: &RoomId,
1009        event_type: StateEventType,
1010    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1011        let stripped_range =
1012            self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
1013        let stripped_events = self
1014            .inner
1015            .transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
1016            .object_store(keys::STRIPPED_ROOM_STATE)?
1017            .get_all_with_key(&stripped_range)?
1018            .await?
1019            .iter()
1020            .filter_map(|f| {
1021                self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1022            })
1023            .collect::<Vec<_>>();
1024
1025        if !stripped_events.is_empty() {
1026            return Ok(stripped_events);
1027        }
1028
1029        let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
1030        Ok(self
1031            .inner
1032            .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
1033            .object_store(keys::ROOM_STATE)?
1034            .get_all_with_key(&range)?
1035            .await?
1036            .iter()
1037            .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1038            .collect::<Vec<_>>())
1039    }
1040
1041    async fn get_state_events_for_keys(
1042        &self,
1043        room_id: &RoomId,
1044        event_type: StateEventType,
1045        state_keys: &[&str],
1046    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1047        if state_keys.is_empty() {
1048            return Ok(Vec::new());
1049        }
1050
1051        let mut events = Vec::with_capacity(state_keys.len());
1052
1053        {
1054            let txn = self.inner.transaction_on_one_with_mode(
1055                keys::STRIPPED_ROOM_STATE,
1056                IdbTransactionMode::Readonly,
1057            )?;
1058            let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1059
1060            for state_key in state_keys {
1061                if let Some(event) =
1062                    store
1063                        .get(&self.encode_key(
1064                            keys::STRIPPED_ROOM_STATE,
1065                            (room_id, &event_type, state_key),
1066                        ))?
1067                        .await?
1068                        .map(|f| self.deserialize_value(&f))
1069                        .transpose()?
1070                {
1071                    events.push(RawAnySyncOrStrippedState::Stripped(event));
1072                }
1073            }
1074
1075            if !events.is_empty() {
1076                return Ok(events);
1077            }
1078        }
1079
1080        let txn = self
1081            .inner
1082            .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
1083        let store = txn.object_store(keys::ROOM_STATE)?;
1084
1085        for state_key in state_keys {
1086            if let Some(event) = store
1087                .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
1088                .await?
1089                .map(|f| self.deserialize_value(&f))
1090                .transpose()?
1091            {
1092                events.push(RawAnySyncOrStrippedState::Sync(event));
1093            }
1094        }
1095
1096        Ok(events)
1097    }
1098
1099    async fn get_profile(
1100        &self,
1101        room_id: &RoomId,
1102        user_id: &UserId,
1103    ) -> Result<Option<MinimalRoomMemberEvent>> {
1104        self.inner
1105            .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
1106            .object_store(keys::PROFILES)?
1107            .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1108            .await?
1109            .map(|f| self.deserialize_value(&f))
1110            .transpose()
1111    }
1112
1113    async fn get_profiles<'a>(
1114        &self,
1115        room_id: &RoomId,
1116        user_ids: &'a [OwnedUserId],
1117    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1118        if user_ids.is_empty() {
1119            return Ok(BTreeMap::new());
1120        }
1121
1122        let txn = self
1123            .inner
1124            .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
1125        let store = txn.object_store(keys::PROFILES)?;
1126
1127        let mut profiles = BTreeMap::new();
1128        for user_id in user_ids {
1129            if let Some(profile) = store
1130                .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1131                .await?
1132                .map(|f| self.deserialize_value(&f))
1133                .transpose()?
1134            {
1135                profiles.insert(user_id.as_ref(), profile);
1136            }
1137        }
1138
1139        Ok(profiles)
1140    }
1141
1142    async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1143        let transaction = self
1144            .inner
1145            .transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?;
1146
1147        let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1148
1149        Ok(match room_load_settings {
1150            RoomLoadSettings::All => object_store
1151                .get_all()?
1152                .await?
1153                .iter()
1154                .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info))
1155                .collect::<Result<_>>()?,
1156
1157            RoomLoadSettings::One(room_id) => {
1158                match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id))?.await? {
1159                    Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1160                    None => vec![],
1161                }
1162            }
1163        })
1164    }
1165
1166    async fn get_users_with_display_name(
1167        &self,
1168        room_id: &RoomId,
1169        display_name: &DisplayName,
1170    ) -> Result<BTreeSet<OwnedUserId>> {
1171        self.inner
1172            .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
1173            .object_store(keys::DISPLAY_NAMES)?
1174            .get(&self.encode_key(
1175                keys::DISPLAY_NAMES,
1176                (
1177                    room_id,
1178                    display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1179                ),
1180            ))?
1181            .await?
1182            .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1183            .unwrap_or_else(|| Ok(Default::default()))
1184    }
1185
1186    async fn get_users_with_display_names<'a>(
1187        &self,
1188        room_id: &RoomId,
1189        display_names: &'a [DisplayName],
1190    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1191        let mut map = HashMap::new();
1192
1193        if display_names.is_empty() {
1194            return Ok(map);
1195        }
1196
1197        let txn = self
1198            .inner
1199            .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
1200        let store = txn.object_store(keys::DISPLAY_NAMES)?;
1201
1202        for display_name in display_names {
1203            if let Some(user_ids) = store
1204                .get(
1205                    &self.encode_key(
1206                        keys::DISPLAY_NAMES,
1207                        (
1208                            room_id,
1209                            display_name
1210                                .as_normalized_str()
1211                                .unwrap_or_else(|| display_name.as_raw_str()),
1212                        ),
1213                    ),
1214                )?
1215                .await?
1216                .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1217                .transpose()?
1218            {
1219                map.insert(display_name, user_ids);
1220            }
1221        }
1222
1223        Ok(map)
1224    }
1225
1226    async fn get_account_data_event(
1227        &self,
1228        event_type: GlobalAccountDataEventType,
1229    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1230        self.inner
1231            .transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1232            .object_store(keys::ACCOUNT_DATA)?
1233            .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
1234            .await?
1235            .map(|f| self.deserialize_value(&f))
1236            .transpose()
1237    }
1238
1239    async fn get_room_account_data_event(
1240        &self,
1241        room_id: &RoomId,
1242        event_type: RoomAccountDataEventType,
1243    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1244        self.inner
1245            .transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1246            .object_store(keys::ROOM_ACCOUNT_DATA)?
1247            .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
1248            .await?
1249            .map(|f| self.deserialize_value(&f))
1250            .transpose()
1251    }
1252
1253    async fn get_user_room_receipt_event(
1254        &self,
1255        room_id: &RoomId,
1256        receipt_type: ReceiptType,
1257        thread: ReceiptThread,
1258        user_id: &UserId,
1259    ) -> Result<Option<(OwnedEventId, Receipt)>> {
1260        let key = match thread.as_str() {
1261            Some(thread_id) => self
1262                .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1263            None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1264        };
1265        self.inner
1266            .transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
1267            .object_store(keys::ROOM_USER_RECEIPTS)?
1268            .get(&key)?
1269            .await?
1270            .map(|f| self.deserialize_value(&f))
1271            .transpose()
1272    }
1273
1274    async fn get_event_room_receipt_events(
1275        &self,
1276        room_id: &RoomId,
1277        receipt_type: ReceiptType,
1278        thread: ReceiptThread,
1279        event_id: &EventId,
1280    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1281        let range = match thread.as_str() {
1282            Some(thread_id) => self.encode_to_range(
1283                keys::ROOM_EVENT_RECEIPTS,
1284                (room_id, receipt_type, thread_id, event_id),
1285            ),
1286            None => {
1287                self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1288            }
1289        }?;
1290        let tx = self.inner.transaction_on_one_with_mode(
1291            keys::ROOM_EVENT_RECEIPTS,
1292            IdbTransactionMode::Readonly,
1293        )?;
1294        let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1295
1296        Ok(store
1297            .get_all_with_key(&range)?
1298            .await?
1299            .iter()
1300            .filter_map(|f| self.deserialize_value(&f).ok())
1301            .collect::<Vec<_>>())
1302    }
1303
1304    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1305        let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1306        self.get_custom_value_for_js(jskey).await
1307    }
1308
1309    async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1310        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1311
1312        let prev = self.get_custom_value_for_js(&jskey).await?;
1313
1314        let tx =
1315            self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1316
1317        tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
1318
1319        tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1320        Ok(prev)
1321    }
1322
1323    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1324        let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1325
1326        let prev = self.get_custom_value_for_js(&jskey).await?;
1327
1328        let tx =
1329            self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1330
1331        tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
1332
1333        tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1334        Ok(prev)
1335    }
1336
1337    async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1338        // All the stores which use a RoomId as their key (and nothing additional).
1339        let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1340
1341        // All the stores which use a RoomId as the first part of their key, but may
1342        // have some additional data in the key.
1343        let prefixed_stores = [
1344            keys::PROFILES,
1345            keys::DISPLAY_NAMES,
1346            keys::USER_IDS,
1347            keys::ROOM_STATE,
1348            keys::ROOM_ACCOUNT_DATA,
1349            keys::ROOM_EVENT_RECEIPTS,
1350            keys::ROOM_USER_RECEIPTS,
1351            keys::STRIPPED_ROOM_STATE,
1352            keys::STRIPPED_USER_IDS,
1353        ];
1354
1355        let all_stores = {
1356            let mut v = Vec::new();
1357            v.extend(prefixed_stores);
1358            v.extend(direct_stores);
1359            v
1360        };
1361
1362        let tx = self
1363            .inner
1364            .transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
1365
1366        for store_name in direct_stores {
1367            tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
1368        }
1369
1370        for store_name in prefixed_stores {
1371            let store = tx.object_store(store_name)?;
1372            let range = self.encode_to_range(store_name, room_id)?;
1373            for key in store.get_all_keys_with_key(&range)?.await?.iter() {
1374                store.delete(&key)?;
1375            }
1376        }
1377
1378        tx.await.into_result().map_err(|e| e.into())
1379    }
1380
1381    async fn get_user_ids(
1382        &self,
1383        room_id: &RoomId,
1384        memberships: RoomMemberships,
1385    ) -> Result<Vec<OwnedUserId>> {
1386        let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1387        if !ids.is_empty() {
1388            return Ok(ids);
1389        }
1390        self.get_user_ids_inner(room_id, memberships, false).await
1391    }
1392
1393    async fn save_send_queue_request(
1394        &self,
1395        room_id: &RoomId,
1396        transaction_id: OwnedTransactionId,
1397        created_at: MilliSecondsSinceUnixEpoch,
1398        kind: QueuedRequestKind,
1399        priority: usize,
1400    ) -> Result<()> {
1401        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1402
1403        let tx = self
1404            .inner
1405            .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1406
1407        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1408
1409        // We store an encoded vector of the queued requests, with their transaction
1410        // ids.
1411
1412        // Reload the previous vector for this room, or create an empty one.
1413        let prev = obj.get(&encoded_key)?.await?;
1414
1415        let mut prev = prev.map_or_else(
1416            || Ok(Vec::new()),
1417            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1418        )?;
1419
1420        // Push the new request.
1421        prev.push(PersistedQueuedRequest {
1422            room_id: room_id.to_owned(),
1423            kind: Some(kind),
1424            transaction_id,
1425            error: None,
1426            is_wedged: None,
1427            event: None,
1428            priority: Some(priority),
1429            created_at,
1430        });
1431
1432        // Save the new vector into db.
1433        obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1434
1435        tx.await.into_result()?;
1436
1437        Ok(())
1438    }
1439
1440    async fn update_send_queue_request(
1441        &self,
1442        room_id: &RoomId,
1443        transaction_id: &TransactionId,
1444        kind: QueuedRequestKind,
1445    ) -> Result<bool> {
1446        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1447
1448        let tx = self
1449            .inner
1450            .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1451
1452        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1453
1454        // We store an encoded vector of the queued requests, with their transaction
1455        // ids.
1456
1457        // Reload the previous vector for this room, or create an empty one.
1458        let prev = obj.get(&encoded_key)?.await?;
1459
1460        let mut prev = prev.map_or_else(
1461            || Ok(Vec::new()),
1462            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1463        )?;
1464
1465        // Modify the one request.
1466        if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1467            entry.kind = Some(kind);
1468            // Reset the error state.
1469            entry.error = None;
1470            // Remove migrated fields.
1471            entry.is_wedged = None;
1472            entry.event = None;
1473
1474            // Save the new vector into db.
1475            obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1476            tx.await.into_result()?;
1477
1478            Ok(true)
1479        } else {
1480            Ok(false)
1481        }
1482    }
1483
1484    async fn remove_send_queue_request(
1485        &self,
1486        room_id: &RoomId,
1487        transaction_id: &TransactionId,
1488    ) -> Result<bool> {
1489        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1490
1491        let tx = self.inner.transaction_on_multi_with_mode(
1492            &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
1493            IdbTransactionMode::Readwrite,
1494        )?;
1495
1496        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1497
1498        // We store an encoded vector of the queued requests, with their transaction
1499        // ids.
1500
1501        // Reload the previous vector for this room.
1502        if let Some(val) = obj.get(&encoded_key)?.await? {
1503            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1504            if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1505                prev.remove(pos);
1506
1507                if prev.is_empty() {
1508                    obj.delete(&encoded_key)?;
1509                } else {
1510                    obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1511                }
1512
1513                tx.await.into_result()?;
1514                return Ok(true);
1515            }
1516        }
1517
1518        Ok(false)
1519    }
1520
1521    async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1522        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1523
1524        // We store an encoded vector of the queued requests, with their transaction
1525        // ids.
1526        let prev = self
1527            .inner
1528            .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
1529            .object_store(keys::ROOM_SEND_QUEUE)?
1530            .get(&encoded_key)?
1531            .await?;
1532
1533        let mut prev = prev.map_or_else(
1534            || Ok(Vec::new()),
1535            |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1536        )?;
1537
1538        // Inverted stable ordering on priority.
1539        prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1540
1541        Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1542    }
1543
1544    async fn update_send_queue_request_status(
1545        &self,
1546        room_id: &RoomId,
1547        transaction_id: &TransactionId,
1548        error: Option<QueueWedgeError>,
1549    ) -> Result<()> {
1550        let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1551
1552        let tx = self
1553            .inner
1554            .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1555
1556        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1557
1558        if let Some(val) = obj.get(&encoded_key)?.await? {
1559            let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1560            if let Some(request) =
1561                prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1562            {
1563                request.is_wedged = None;
1564                request.error = error;
1565                obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1566            }
1567        }
1568
1569        tx.await.into_result()?;
1570
1571        Ok(())
1572    }
1573
1574    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1575        let tx = self
1576            .inner
1577            .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1578
1579        let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1580
1581        let all_entries = obj
1582            .get_all()?
1583            .await?
1584            .into_iter()
1585            .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
1586            .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1587            .into_iter()
1588            .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1589            .collect::<BTreeSet<_>>();
1590
1591        Ok(all_entries.into_iter().collect())
1592    }
1593
1594    async fn save_dependent_queued_request(
1595        &self,
1596        room_id: &RoomId,
1597        parent_txn_id: &TransactionId,
1598        own_txn_id: ChildTransactionId,
1599        created_at: MilliSecondsSinceUnixEpoch,
1600        content: DependentQueuedRequestKind,
1601    ) -> Result<()> {
1602        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1603
1604        let tx = self.inner.transaction_on_one_with_mode(
1605            keys::DEPENDENT_SEND_QUEUE,
1606            IdbTransactionMode::Readwrite,
1607        )?;
1608
1609        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1610
1611        // We store an encoded vector of the dependent requests.
1612        // Reload the previous vector for this room, or create an empty one.
1613        let prev = obj.get(&encoded_key)?.await?;
1614
1615        let mut prev = prev.map_or_else(
1616            || Ok(Vec::new()),
1617            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1618        )?;
1619
1620        // Push the new request.
1621        prev.push(DependentQueuedRequest {
1622            kind: content,
1623            parent_transaction_id: parent_txn_id.to_owned(),
1624            own_transaction_id: own_txn_id,
1625            parent_key: None,
1626            created_at,
1627        });
1628
1629        // Save the new vector into db.
1630        obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1631
1632        tx.await.into_result()?;
1633
1634        Ok(())
1635    }
1636
1637    async fn update_dependent_queued_request(
1638        &self,
1639        room_id: &RoomId,
1640        own_transaction_id: &ChildTransactionId,
1641        new_content: DependentQueuedRequestKind,
1642    ) -> Result<bool> {
1643        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1644
1645        let tx = self.inner.transaction_on_one_with_mode(
1646            keys::DEPENDENT_SEND_QUEUE,
1647            IdbTransactionMode::Readwrite,
1648        )?;
1649
1650        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1651
1652        // We store an encoded vector of the dependent requests.
1653        // Reload the previous vector for this room, or create an empty one.
1654        let prev = obj.get(&encoded_key)?.await?;
1655
1656        let mut prev = prev.map_or_else(
1657            || Ok(Vec::new()),
1658            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1659        )?;
1660
1661        // Modify the dependent request, if found.
1662        let mut found = false;
1663        for entry in prev.iter_mut() {
1664            if entry.own_transaction_id == *own_transaction_id {
1665                found = true;
1666                entry.kind = new_content;
1667                break;
1668            }
1669        }
1670
1671        if found {
1672            obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1673            tx.await.into_result()?;
1674        }
1675
1676        Ok(found)
1677    }
1678
1679    async fn mark_dependent_queued_requests_as_ready(
1680        &self,
1681        room_id: &RoomId,
1682        parent_txn_id: &TransactionId,
1683        parent_key: SentRequestKey,
1684    ) -> Result<usize> {
1685        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1686
1687        let tx = self.inner.transaction_on_one_with_mode(
1688            keys::DEPENDENT_SEND_QUEUE,
1689            IdbTransactionMode::Readwrite,
1690        )?;
1691
1692        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1693
1694        // We store an encoded vector of the dependent requests.
1695        // Reload the previous vector for this room, or create an empty one.
1696        let prev = obj.get(&encoded_key)?.await?;
1697
1698        let mut prev = prev.map_or_else(
1699            || Ok(Vec::new()),
1700            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1701        )?;
1702
1703        // Modify all requests that match.
1704        let mut num_updated = 0;
1705        for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1706            entry.parent_key = Some(parent_key.clone());
1707            num_updated += 1;
1708        }
1709
1710        if num_updated > 0 {
1711            obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1712            tx.await.into_result()?;
1713        }
1714
1715        Ok(num_updated)
1716    }
1717
1718    async fn remove_dependent_queued_request(
1719        &self,
1720        room_id: &RoomId,
1721        txn_id: &ChildTransactionId,
1722    ) -> Result<bool> {
1723        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1724
1725        let tx = self.inner.transaction_on_one_with_mode(
1726            keys::DEPENDENT_SEND_QUEUE,
1727            IdbTransactionMode::Readwrite,
1728        )?;
1729
1730        let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1731
1732        // We store an encoded vector of the dependent requests.
1733        // Reload the previous vector for this room.
1734        if let Some(val) = obj.get(&encoded_key)?.await? {
1735            let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1736            if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1737                prev.remove(pos);
1738
1739                if prev.is_empty() {
1740                    obj.delete(&encoded_key)?;
1741                } else {
1742                    obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1743                }
1744
1745                tx.await.into_result()?;
1746                return Ok(true);
1747            }
1748        }
1749
1750        Ok(false)
1751    }
1752
1753    async fn load_dependent_queued_requests(
1754        &self,
1755        room_id: &RoomId,
1756    ) -> Result<Vec<DependentQueuedRequest>> {
1757        let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1758
1759        // We store an encoded vector of the dependent requests.
1760        let prev = self
1761            .inner
1762            .transaction_on_one_with_mode(
1763                keys::DEPENDENT_SEND_QUEUE,
1764                IdbTransactionMode::Readwrite,
1765            )?
1766            .object_store(keys::DEPENDENT_SEND_QUEUE)?
1767            .get(&encoded_key)?
1768            .await?;
1769
1770        prev.map_or_else(
1771            || Ok(Vec::new()),
1772            |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1773        )
1774    }
1775});
1776
/// A room member.
///
/// Minimal, serializable pairing of a user with their membership state; it
/// can be built from a sync or a stripped `m.room.member` event through the
/// `From` implementations below.
#[derive(Debug, Serialize, Deserialize)]
struct RoomMember {
    /// The ID of the user, taken from the state key of the member event.
    user_id: OwnedUserId,
    /// The membership state carried by the member event.
    membership: MembershipState,
}
1783
1784impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
1785    fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
1786        Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
1787    }
1788}
1789
1790impl From<&StrippedRoomMemberEvent> for RoomMember {
1791    fn from(event: &StrippedRoomMemberEvent) -> Self {
1792        Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
1793    }
1794}
1795
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Shape of a persisted queued request before the wedged flag became
    /// optional: `is_wedged` is a plain `bool` and there is no error field.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    // A failed send is now persisted as an error instead of just a boolean.
    // To support that, `PersistedQueuedRequest` changed its `bool` into an
    // `Option<bool>`; this test ensures that data serialized with the old
    // layout still loads properly.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let room_id = room_id!("!room_a:dummy.local");
        let txn_id = TransactionId::new();
        let message =
            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
                .unwrap();

        // Serialize a request using the old layout.
        let legacy = OldPersistedQueuedRequest {
            room_id: room_id.to_owned(),
            event: message,
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };
        let legacy_bytes = serde_json::to_vec(&legacy).unwrap();

        // Load it with the new version.
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&legacy_bytes).unwrap();

        // The old flag is kept as-is, and no error was persisted alongside it.
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        // The content is found in the `event` field, not in `kind`.
        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        // Once converted, the request is an event request that carries an
        // error.
        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
1852
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a state store without a passphrase, backed by a database with a
    /// fresh random name.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let suffix = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", suffix.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
1870
#[cfg(all(test, target_arch = "wasm32"))]
mod encrypted_tests {
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a state store configured with a random passphrase, backed by a
    /// database with a fresh random name.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_suffix = Uuid::new_v4();
        let db_name = format!("test-state-encrypted-{}", db_suffix.as_hyphenated());

        let passphrase_suffix = Uuid::new_v4();
        let passphrase = format!("some_passphrase-{}", passphrase_suffix.as_hyphenated());

        let store =
            IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}