1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 sync::Arc,
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::prelude::*;
25use matrix_sdk_base::{
26 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
27 store::{
28 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
29 QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent,
30 ServerCapabilities, StateChanges, StateStore, StoreError,
31 },
32 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
33};
34use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
35use ruma::{
36 canonical_json::{redact, RedactedBecause},
37 events::{
38 presence::PresenceEvent,
39 receipt::{Receipt, ReceiptThread, ReceiptType},
40 room::member::{
41 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
42 },
43 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
44 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
45 },
46 serde::Raw,
47 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
48 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
49};
50use serde::{de::DeserializeOwned, Deserialize, Serialize};
51use tracing::{debug, warn};
52use wasm_bindgen::JsValue;
53use web_sys::IdbKeyRange;
54
55mod migrations;
56
57pub use self::migrations::MigrationConflictStrategy;
58use self::migrations::{upgrade_inner_db, upgrade_meta_db};
59use crate::safe_encode::SafeEncode;
60
/// Errors produced by the IndexedDB state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A value could not be serialized to, or deserialized from, JSON.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// The store cipher reported an error.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A DOM exception, captured as its name, message and numeric code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A generic state store error, forwarded unchanged.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// A database could not be migrated between the given versions without
    /// deleting data.
    #[error("Can't migrate {name} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
74
75impl From<web_sys::DomException> for IndexeddbStateStoreError {
76 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
77 IndexeddbStateStoreError::DomException {
78 name: frm.name(),
79 message: frm.message(),
80 code: frm.code(),
81 }
82 }
83}
84
85impl From<IndexeddbStateStoreError> for StoreError {
86 fn from(e: IndexeddbStateStoreError) -> Self {
87 match e {
88 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
89 IndexeddbStateStoreError::StoreError(e) => e,
90 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
91 _ => StoreError::backend(e),
92 }
93 }
94}
95
/// Names of the databases, object stores and static keys used by the store.
mod keys {
    /// Suffix appended to the store name to build the name of the meta
    /// database.
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    /// Object store of the meta database that records backups.
    pub const BACKUPS_META: &str = "backups";

    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    // NOTE(review): presumably the table holding send queue requests — not
    // used in the visible part of this file.
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    // NOTE(review): presumably the table holding dependent send queue
    // requests — not used in the visible part of this file.
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// The object store names listed above, gathered in one slice
    /// (`INTERNAL_STATE` and `BACKUPS_META` are not part of it).
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // Static key rather than a store name.
    // NOTE(review): presumably the key under which the store cipher is
    // persisted — confirm against the migrations module.
    pub const STORE_KEY: &str = "store_key";
}
148
149pub use keys::ALL_STORES;
150use matrix_sdk_base::store::QueueWedgeError;
151
152fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
154 Ok(match store_cipher {
155 Some(cipher) => {
156 let data = serde_json::to_vec(event)?;
157 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
158 }
159 None => JsValue::from_serde(event)?,
160 })
161}
162
163fn deserialize_value<T: DeserializeOwned>(
165 store_cipher: Option<&StoreCipher>,
166 event: &JsValue,
167) -> Result<T> {
168 match store_cipher {
169 Some(cipher) => {
170 use zeroize::Zeroize;
171 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
172 let ret = serde_json::from_slice(&plaintext);
173 plaintext.zeroize();
174 Ok(ret?)
175 }
176 None => Ok(event.into_serde()?),
177 }
178}
179
180fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
181where
182 T: SafeEncode,
183{
184 match store_cipher {
185 Some(cipher) => key.as_secure_string(table_name, cipher),
186 None => key.as_encoded_string(),
187 }
188 .into()
189}
190
191fn encode_to_range<T>(
192 store_cipher: Option<&StoreCipher>,
193 table_name: &str,
194 key: T,
195) -> Result<IdbKeyRange>
196where
197 T: SafeEncode,
198{
199 match store_cipher {
200 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
201 None => key.encode_to_range(),
202 }
203 .map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
204}
205
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    /// Name of the database; `build` falls back to `"state"` when unset.
    name: Option<String>,
    /// Optional passphrase handed to the meta database upgrade.
    passphrase: Option<String>,
    /// Strategy handed to the inner database upgrade.
    migration_conflict_strategy: MigrationConflictStrategy,
}
213
214impl IndexeddbStateStoreBuilder {
215 fn new() -> Self {
216 Self {
217 name: None,
218 passphrase: None,
219 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
220 }
221 }
222
223 pub fn name(mut self, value: String) -> Self {
225 self.name = Some(value);
226 self
227 }
228
229 pub fn passphrase(mut self, value: String) -> Self {
233 self.passphrase = Some(value);
234 self
235 }
236
237 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
241 self.migration_conflict_strategy = value;
242 self
243 }
244
245 pub async fn build(self) -> Result<IndexeddbStateStore> {
246 let migration_strategy = self.migration_conflict_strategy.clone();
247 let name = self.name.unwrap_or_else(|| "state".to_owned());
248
249 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
250
251 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
252 let inner =
253 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
254
255 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
256 }
257}
258
/// A state store backed by IndexedDB databases.
pub struct IndexeddbStateStore {
    /// Name of the inner database.
    name: String,
    /// Database holding the state data.
    pub(crate) inner: IdbDatabase,
    /// Meta database (holds, among others, the backups object store).
    pub(crate) meta: IdbDatabase,
    /// Cipher used to encrypt values and encode keys, if any.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
265
266#[cfg(not(tarpaulin_include))]
267impl std::fmt::Debug for IndexeddbStateStore {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
270 }
271}
272
/// Module-local `Result` alias whose error type defaults to
/// [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
274
275impl IndexeddbStateStore {
276 pub fn builder() -> IndexeddbStateStoreBuilder {
278 IndexeddbStateStoreBuilder::new()
279 }
280
281 pub fn version(&self) -> u32 {
283 self.inner.version() as u32
284 }
285
286 pub fn meta_version(&self) -> u32 {
288 self.meta.version() as u32
289 }
290
291 pub async fn has_backups(&self) -> Result<bool> {
293 Ok(self
294 .meta
295 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
296 .object_store(keys::BACKUPS_META)?
297 .count()?
298 .await?
299 > 0)
300 }
301
302 pub async fn latest_backup(&self) -> Result<Option<String>> {
304 Ok(self
305 .meta
306 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
307 .object_store(keys::BACKUPS_META)?
308 .open_cursor_with_direction(IdbCursorDirection::Prev)?
309 .await?
310 .and_then(|c| c.value().as_string()))
311 }
312
313 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
315 serialize_value(self.store_cipher.as_deref(), event)
316 }
317
318 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
320 deserialize_value(self.store_cipher.as_deref(), event)
321 }
322
323 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
324 where
325 T: SafeEncode,
326 {
327 encode_key(self.store_cipher.as_deref(), table_name, key)
328 }
329
330 fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
331 where
332 T: SafeEncode,
333 {
334 encode_to_range(self.store_cipher.as_deref(), table_name, key)
335 }
336
337 pub async fn get_user_ids_inner(
340 &self,
341 room_id: &RoomId,
342 memberships: RoomMemberships,
343 stripped: bool,
344 ) -> Result<Vec<OwnedUserId>> {
345 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
346
347 let tx =
348 self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
349 let store = tx.object_store(store_name)?;
350 let range = self.encode_to_range(store_name, room_id)?;
351
352 let user_ids = if memberships.is_empty() {
353 store
355 .get_all_with_key(&range)?
356 .await?
357 .iter()
358 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
359 .collect::<Vec<_>>()
360 } else {
361 let mut user_ids = Vec::new();
362 let cursor = store.open_cursor_with_range(&range)?.await?;
363
364 if let Some(cursor) = cursor {
365 loop {
366 let value = cursor.value();
367 let member = self.deserialize_value::<RoomMember>(&value)?;
368
369 if memberships.matches(&member.membership) {
370 user_ids.push(member.user_id);
371 }
372
373 if !cursor.continue_cursor()?.await? {
374 break;
375 }
376 }
377 }
378
379 user_ids
380 };
381
382 Ok(user_ids)
383 }
384
385 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
386 self.inner
387 .transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
388 .object_store(keys::CUSTOM)?
389 .get(jskey)?
390 .await?
391 .map(|f| self.deserialize_value(&f))
392 .transpose()
393 }
394
395 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
396 match key {
400 StateStoreDataKey::SyncToken => {
401 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
402 }
403 StateStoreDataKey::ServerCapabilities => self.encode_key(
404 StateStoreDataKey::SERVER_CAPABILITIES,
405 StateStoreDataKey::SERVER_CAPABILITIES,
406 ),
407 StateStoreDataKey::Filter(filter_name) => {
408 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
409 }
410 StateStoreDataKey::UserAvatarUrl(user_id) => {
411 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
412 }
413 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
414 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
415 }
416 StateStoreDataKey::UtdHookManagerData => {
417 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
418 }
419 StateStoreDataKey::ComposerDraft(room_id) => {
420 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
421 }
422 StateStoreDataKey::SeenKnockRequests(room_id) => {
423 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
424 }
425 }
426 }
427}
428
/// Serialized form of a queued request, convertible into a `QueuedRequest`
/// via `into_queued_request`.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    /// The room this request belongs to.
    pub room_id: OwnedRoomId,

    /// Kind of request. When absent, the conversion falls back to the
    /// `event` field.
    kind: Option<QueuedRequestKind>,
    transaction_id: OwnedTransactionId,

    /// Error recorded for the request, if any.
    pub error: Option<QueueWedgeError>,

    /// Priority of the request; treated as 0 when absent.
    priority: Option<usize>,

    /// Creation time; filled with the current time when missing from the
    /// serialized data.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    /// When `Some(true)`, the conversion replaces `error` with a generic
    /// error.
    // NOTE(review): presumably a field kept from an older format.
    is_wedged: Option<bool>,

    /// Event content, only consulted when `kind` is absent.
    // NOTE(review): presumably a field kept from an older format.
    event: Option<SerializableEventContent>,
}
455
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
459
460impl PersistedQueuedRequest {
461 fn into_queued_request(self) -> Option<QueuedRequest> {
462 let kind =
463 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
464
465 let error = match self.is_wedged {
466 Some(true) => {
467 Some(QueueWedgeError::GenericApiError {
469 msg: "local echo failed to send in a previous session".into(),
470 })
471 }
472 _ => self.error,
473 };
474
475 let priority = self.priority.unwrap_or(0);
477
478 Some(QueuedRequest {
479 kind,
480 transaction_id: self.transaction_id,
481 error,
482 priority,
483 created_at: self.created_at,
484 })
485 }
486}
487
// On wasm32, the braced body handed to this macro is expanded as the
// `StateStore` trait implementation for `IndexeddbStateStore`, with
// `IndexeddbStateStoreError` as the error type.
#[cfg(target_arch = "wasm32")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
507
// On targets other than wasm32, the braced body handed to this macro is
// expanded as a plain inherent `impl IndexeddbStateStore` block.
// NOTE(review): presumably so the methods still compile (e.g. for tooling)
// where the trait implementation cannot — confirm.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
516
517impl_state_store!({
518 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
519 let encoded_key = self.encode_kv_data_key(key);
520
521 let value = self
522 .inner
523 .transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
524 .object_store(keys::KV)?
525 .get(&encoded_key)?
526 .await?;
527
528 let value = match key {
529 StateStoreDataKey::SyncToken => value
530 .map(|f| self.deserialize_value::<String>(&f))
531 .transpose()?
532 .map(StateStoreDataValue::SyncToken),
533 StateStoreDataKey::ServerCapabilities => value
534 .map(|f| self.deserialize_value::<ServerCapabilities>(&f))
535 .transpose()?
536 .map(StateStoreDataValue::ServerCapabilities),
537 StateStoreDataKey::Filter(_) => value
538 .map(|f| self.deserialize_value::<String>(&f))
539 .transpose()?
540 .map(StateStoreDataValue::Filter),
541 StateStoreDataKey::UserAvatarUrl(_) => value
542 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
543 .transpose()?
544 .map(StateStoreDataValue::UserAvatarUrl),
545 StateStoreDataKey::RecentlyVisitedRooms(_) => value
546 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
547 .transpose()?
548 .map(StateStoreDataValue::RecentlyVisitedRooms),
549 StateStoreDataKey::UtdHookManagerData => value
550 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
551 .transpose()?
552 .map(StateStoreDataValue::UtdHookManagerData),
553 StateStoreDataKey::ComposerDraft(_) => value
554 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
555 .transpose()?
556 .map(StateStoreDataValue::ComposerDraft),
557 StateStoreDataKey::SeenKnockRequests(_) => value
558 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
559 .transpose()?
560 .map(StateStoreDataValue::SeenKnockRequests),
561 };
562
563 Ok(value)
564 }
565
566 async fn set_kv_data(
567 &self,
568 key: StateStoreDataKey<'_>,
569 value: StateStoreDataValue,
570 ) -> Result<()> {
571 let encoded_key = self.encode_kv_data_key(key);
572
573 let serialized_value = match key {
574 StateStoreDataKey::SyncToken => self
575 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
576 StateStoreDataKey::ServerCapabilities => self.serialize_value(
577 &value
578 .into_server_capabilities()
579 .expect("Session data not containing server capabilities"),
580 ),
581 StateStoreDataKey::Filter(_) => {
582 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
583 }
584 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
585 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
586 ),
587 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
588 &value
589 .into_recently_visited_rooms()
590 .expect("Session data not a recently visited room list"),
591 ),
592 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
593 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
594 ),
595 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
596 &value.into_composer_draft().expect("Session data not a composer draft"),
597 ),
598 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
599 &value
600 .into_seen_knock_requests()
601 .expect("Session data is not a set of seen knock request ids"),
602 ),
603 };
604
605 let tx =
606 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
607
608 let obj = tx.object_store(keys::KV)?;
609
610 obj.put_key_val(&encoded_key, &serialized_value?)?;
611
612 tx.await.into_result()?;
613
614 Ok(())
615 }
616
617 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
618 let encoded_key = self.encode_kv_data_key(key);
619
620 let tx =
621 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
622 let obj = tx.object_store(keys::KV)?;
623
624 obj.delete(&encoded_key)?;
625
626 tx.await.into_result()?;
627
628 Ok(())
629 }
630
631 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
632 let mut stores: HashSet<&'static str> = [
633 (changes.sync_token.is_some(), keys::KV),
634 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
635 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
636 (!changes.presence.is_empty(), keys::PRESENCE),
637 (
638 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
639 keys::PROFILES,
640 ),
641 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
642 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
643 ]
644 .iter()
645 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
646 .collect();
647
648 if !changes.state.is_empty() {
649 stores.extend([
650 keys::ROOM_STATE,
651 keys::USER_IDS,
652 keys::STRIPPED_USER_IDS,
653 keys::STRIPPED_ROOM_STATE,
654 keys::PROFILES,
655 ]);
656 }
657
658 if !changes.redactions.is_empty() {
659 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
660 }
661
662 if !changes.room_infos.is_empty() {
663 stores.insert(keys::ROOM_INFOS);
664 }
665
666 if !changes.stripped_state.is_empty() {
667 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
668 }
669
670 if !changes.receipts.is_empty() {
671 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
672 }
673
674 if stores.is_empty() {
675 return Ok(());
677 }
678
679 let stores: Vec<&'static str> = stores.into_iter().collect();
680 let tx =
681 self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
682
683 if let Some(s) = &changes.sync_token {
684 tx.object_store(keys::KV)?.put_key_val(
685 &self.encode_kv_data_key(StateStoreDataKey::SyncToken),
686 &self.serialize_value(s)?,
687 )?;
688 }
689
690 if !changes.ambiguity_maps.is_empty() {
691 let store = tx.object_store(keys::DISPLAY_NAMES)?;
692 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
693 for (display_name, map) in ambiguity_maps {
694 let key = self.encode_key(
695 keys::DISPLAY_NAMES,
696 (
697 room_id,
698 display_name
699 .as_normalized_str()
700 .unwrap_or_else(|| display_name.as_raw_str()),
701 ),
702 );
703
704 store.put_key_val(&key, &self.serialize_value(&map)?)?;
705 }
706 }
707 }
708
709 if !changes.account_data.is_empty() {
710 let store = tx.object_store(keys::ACCOUNT_DATA)?;
711 for (event_type, event) in &changes.account_data {
712 store.put_key_val(
713 &self.encode_key(keys::ACCOUNT_DATA, event_type),
714 &self.serialize_value(&event)?,
715 )?;
716 }
717 }
718
719 if !changes.room_account_data.is_empty() {
720 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
721 for (room, events) in &changes.room_account_data {
722 for (event_type, event) in events {
723 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
724 store.put_key_val(&key, &self.serialize_value(&event)?)?;
725 }
726 }
727 }
728
729 if !changes.state.is_empty() {
730 let state = tx.object_store(keys::ROOM_STATE)?;
731 let profiles = tx.object_store(keys::PROFILES)?;
732 let user_ids = tx.object_store(keys::USER_IDS)?;
733 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
734 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
735
736 for (room, user_ids) in &changes.profiles_to_delete {
737 for user_id in user_ids {
738 let key = self.encode_key(keys::PROFILES, (room, user_id));
739 profiles.delete(&key)?;
740 }
741 }
742
743 for (room, event_types) in &changes.state {
744 let profile_changes = changes.profiles.get(room);
745
746 for (event_type, events) in event_types {
747 for (state_key, raw_event) in events {
748 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
749 state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
750 stripped_state.delete(&key)?;
751
752 if *event_type == StateEventType::RoomMember {
753 let event = match raw_event.deserialize_as::<SyncRoomMemberEvent>() {
754 Ok(ev) => ev,
755 Err(e) => {
756 let event_id: Option<String> =
757 raw_event.get_field("event_id").ok().flatten();
758 debug!(event_id, "Failed to deserialize member event: {e}");
759 continue;
760 }
761 };
762
763 let key = (room, state_key);
764
765 stripped_user_ids
766 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
767
768 user_ids.put_key_val_owned(
769 self.encode_key(keys::USER_IDS, key),
770 &self.serialize_value(&RoomMember::from(&event))?,
771 )?;
772
773 if let Some(profile) =
774 profile_changes.and_then(|p| p.get(event.state_key()))
775 {
776 profiles.put_key_val_owned(
777 self.encode_key(keys::PROFILES, key),
778 &self.serialize_value(&profile)?,
779 )?;
780 }
781 }
782 }
783 }
784 }
785 }
786
787 if !changes.room_infos.is_empty() {
788 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
789 for (room_id, room_info) in &changes.room_infos {
790 room_infos.put_key_val(
791 &self.encode_key(keys::ROOM_INFOS, room_id),
792 &self.serialize_value(&room_info)?,
793 )?;
794 }
795 }
796
797 if !changes.presence.is_empty() {
798 let store = tx.object_store(keys::PRESENCE)?;
799 for (sender, event) in &changes.presence {
800 store.put_key_val(
801 &self.encode_key(keys::PRESENCE, sender),
802 &self.serialize_value(&event)?,
803 )?;
804 }
805 }
806
807 if !changes.stripped_state.is_empty() {
808 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
809 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
810
811 for (room, event_types) in &changes.stripped_state {
812 for (event_type, events) in event_types {
813 for (state_key, raw_event) in events {
814 let key = self
815 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
816 store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
817
818 if *event_type == StateEventType::RoomMember {
819 let event = match raw_event.deserialize_as::<StrippedRoomMemberEvent>()
820 {
821 Ok(ev) => ev,
822 Err(e) => {
823 let event_id: Option<String> =
824 raw_event.get_field("event_id").ok().flatten();
825 debug!(
826 event_id,
827 "Failed to deserialize stripped member event: {e}"
828 );
829 continue;
830 }
831 };
832
833 let key = (room, state_key);
834
835 user_ids.put_key_val_owned(
836 self.encode_key(keys::STRIPPED_USER_IDS, key),
837 &self.serialize_value(&RoomMember::from(&event))?,
838 )?;
839 }
840 }
841 }
842 }
843 }
844
845 if !changes.receipts.is_empty() {
846 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
847 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
848
849 for (room, content) in &changes.receipts {
850 for (event_id, receipts) in &content.0 {
851 for (receipt_type, receipts) in receipts {
852 for (user_id, receipt) in receipts {
853 let key = match receipt.thread.as_str() {
854 Some(thread_id) => self.encode_key(
855 keys::ROOM_USER_RECEIPTS,
856 (room, receipt_type, thread_id, user_id),
857 ),
858 None => self.encode_key(
859 keys::ROOM_USER_RECEIPTS,
860 (room, receipt_type, user_id),
861 ),
862 };
863
864 if let Some((old_event, _)) =
865 room_user_receipts.get(&key)?.await?.and_then(|f| {
866 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
867 })
868 {
869 let key = match receipt.thread.as_str() {
870 Some(thread_id) => self.encode_key(
871 keys::ROOM_EVENT_RECEIPTS,
872 (room, receipt_type, thread_id, old_event, user_id),
873 ),
874 None => self.encode_key(
875 keys::ROOM_EVENT_RECEIPTS,
876 (room, receipt_type, old_event, user_id),
877 ),
878 };
879 room_event_receipts.delete(&key)?;
880 }
881
882 room_user_receipts
883 .put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
884
885 let key = match receipt.thread.as_str() {
887 Some(thread_id) => self.encode_key(
888 keys::ROOM_EVENT_RECEIPTS,
889 (room, receipt_type, thread_id, event_id, user_id),
890 ),
891 None => self.encode_key(
892 keys::ROOM_EVENT_RECEIPTS,
893 (room, receipt_type, event_id, user_id),
894 ),
895 };
896 room_event_receipts
897 .put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
898 }
899 }
900 }
901 }
902 }
903
904 if !changes.redactions.is_empty() {
905 let state = tx.object_store(keys::ROOM_STATE)?;
906 let room_info = tx.object_store(keys::ROOM_INFOS)?;
907
908 for (room_id, redactions) in &changes.redactions {
909 let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
910 let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
911
912 let mut room_version = None;
913
914 while let Some(key) = cursor.key() {
915 let raw_evt =
916 self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
917 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
918 if let Some(redaction) = redactions.get(&event_id) {
919 let version = {
920 if room_version.is_none() {
921 room_version.replace(room_info
922 .get(&self.encode_key(keys::ROOM_INFOS, room_id))?
923 .await?
924 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
925 .and_then(|info| info.room_version().cloned())
926 .unwrap_or_else(|| {
927 warn!(?room_id, "Unable to find the room version, assume version 9");
928 RoomVersionId::V9
929 })
930 );
931 }
932 room_version.as_ref().unwrap()
933 };
934
935 let redacted = redact(
936 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
937 version,
938 Some(RedactedBecause::from_raw_event(redaction)?),
939 )
940 .map_err(StoreError::Redaction)?;
941 state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
942 }
943 }
944
945 cursor.advance(1)?.await?;
947 }
948 }
949 }
950
951 tx.await.into_result().map_err(|e| e.into())
952 }
953
954 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
955 self.inner
956 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
957 .object_store(keys::PRESENCE)?
958 .get(&self.encode_key(keys::PRESENCE, user_id))?
959 .await?
960 .map(|f| self.deserialize_value(&f))
961 .transpose()
962 }
963
964 async fn get_presence_events(
965 &self,
966 user_ids: &[OwnedUserId],
967 ) -> Result<Vec<Raw<PresenceEvent>>> {
968 if user_ids.is_empty() {
969 return Ok(Vec::new());
970 }
971
972 let txn = self
973 .inner
974 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
975 let store = txn.object_store(keys::PRESENCE)?;
976
977 let mut events = Vec::with_capacity(user_ids.len());
978
979 for user_id in user_ids {
980 if let Some(event) = store
981 .get(&self.encode_key(keys::PRESENCE, user_id))?
982 .await?
983 .map(|f| self.deserialize_value(&f))
984 .transpose()?
985 {
986 events.push(event)
987 }
988 }
989
990 Ok(events)
991 }
992
993 async fn get_state_event(
994 &self,
995 room_id: &RoomId,
996 event_type: StateEventType,
997 state_key: &str,
998 ) -> Result<Option<RawAnySyncOrStrippedState>> {
999 Ok(self
1000 .get_state_events_for_keys(room_id, event_type, &[state_key])
1001 .await?
1002 .into_iter()
1003 .next())
1004 }
1005
1006 async fn get_state_events(
1007 &self,
1008 room_id: &RoomId,
1009 event_type: StateEventType,
1010 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1011 let stripped_range =
1012 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
1013 let stripped_events = self
1014 .inner
1015 .transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
1016 .object_store(keys::STRIPPED_ROOM_STATE)?
1017 .get_all_with_key(&stripped_range)?
1018 .await?
1019 .iter()
1020 .filter_map(|f| {
1021 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1022 })
1023 .collect::<Vec<_>>();
1024
1025 if !stripped_events.is_empty() {
1026 return Ok(stripped_events);
1027 }
1028
1029 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
1030 Ok(self
1031 .inner
1032 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
1033 .object_store(keys::ROOM_STATE)?
1034 .get_all_with_key(&range)?
1035 .await?
1036 .iter()
1037 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1038 .collect::<Vec<_>>())
1039 }
1040
1041 async fn get_state_events_for_keys(
1042 &self,
1043 room_id: &RoomId,
1044 event_type: StateEventType,
1045 state_keys: &[&str],
1046 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1047 if state_keys.is_empty() {
1048 return Ok(Vec::new());
1049 }
1050
1051 let mut events = Vec::with_capacity(state_keys.len());
1052
1053 {
1054 let txn = self.inner.transaction_on_one_with_mode(
1055 keys::STRIPPED_ROOM_STATE,
1056 IdbTransactionMode::Readonly,
1057 )?;
1058 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1059
1060 for state_key in state_keys {
1061 if let Some(event) =
1062 store
1063 .get(&self.encode_key(
1064 keys::STRIPPED_ROOM_STATE,
1065 (room_id, &event_type, state_key),
1066 ))?
1067 .await?
1068 .map(|f| self.deserialize_value(&f))
1069 .transpose()?
1070 {
1071 events.push(RawAnySyncOrStrippedState::Stripped(event));
1072 }
1073 }
1074
1075 if !events.is_empty() {
1076 return Ok(events);
1077 }
1078 }
1079
1080 let txn = self
1081 .inner
1082 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
1083 let store = txn.object_store(keys::ROOM_STATE)?;
1084
1085 for state_key in state_keys {
1086 if let Some(event) = store
1087 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
1088 .await?
1089 .map(|f| self.deserialize_value(&f))
1090 .transpose()?
1091 {
1092 events.push(RawAnySyncOrStrippedState::Sync(event));
1093 }
1094 }
1095
1096 Ok(events)
1097 }
1098
1099 async fn get_profile(
1100 &self,
1101 room_id: &RoomId,
1102 user_id: &UserId,
1103 ) -> Result<Option<MinimalRoomMemberEvent>> {
1104 self.inner
1105 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
1106 .object_store(keys::PROFILES)?
1107 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1108 .await?
1109 .map(|f| self.deserialize_value(&f))
1110 .transpose()
1111 }
1112
1113 async fn get_profiles<'a>(
1114 &self,
1115 room_id: &RoomId,
1116 user_ids: &'a [OwnedUserId],
1117 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1118 if user_ids.is_empty() {
1119 return Ok(BTreeMap::new());
1120 }
1121
1122 let txn = self
1123 .inner
1124 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
1125 let store = txn.object_store(keys::PROFILES)?;
1126
1127 let mut profiles = BTreeMap::new();
1128 for user_id in user_ids {
1129 if let Some(profile) = store
1130 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1131 .await?
1132 .map(|f| self.deserialize_value(&f))
1133 .transpose()?
1134 {
1135 profiles.insert(user_id.as_ref(), profile);
1136 }
1137 }
1138
1139 Ok(profiles)
1140 }
1141
1142 async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
1143 let entries: Vec<_> = self
1144 .inner
1145 .transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?
1146 .object_store(keys::ROOM_INFOS)?
1147 .get_all()?
1148 .await?
1149 .iter()
1150 .filter_map(|f| self.deserialize_value::<RoomInfo>(&f).ok())
1151 .collect();
1152
1153 Ok(entries)
1154 }
1155
1156 async fn get_users_with_display_name(
1157 &self,
1158 room_id: &RoomId,
1159 display_name: &DisplayName,
1160 ) -> Result<BTreeSet<OwnedUserId>> {
1161 self.inner
1162 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
1163 .object_store(keys::DISPLAY_NAMES)?
1164 .get(&self.encode_key(
1165 keys::DISPLAY_NAMES,
1166 (
1167 room_id,
1168 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1169 ),
1170 ))?
1171 .await?
1172 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1173 .unwrap_or_else(|| Ok(Default::default()))
1174 }
1175
1176 async fn get_users_with_display_names<'a>(
1177 &self,
1178 room_id: &RoomId,
1179 display_names: &'a [DisplayName],
1180 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1181 let mut map = HashMap::new();
1182
1183 if display_names.is_empty() {
1184 return Ok(map);
1185 }
1186
1187 let txn = self
1188 .inner
1189 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
1190 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1191
1192 for display_name in display_names {
1193 if let Some(user_ids) = store
1194 .get(
1195 &self.encode_key(
1196 keys::DISPLAY_NAMES,
1197 (
1198 room_id,
1199 display_name
1200 .as_normalized_str()
1201 .unwrap_or_else(|| display_name.as_raw_str()),
1202 ),
1203 ),
1204 )?
1205 .await?
1206 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1207 .transpose()?
1208 {
1209 map.insert(display_name, user_ids);
1210 }
1211 }
1212
1213 Ok(map)
1214 }
1215
1216 async fn get_account_data_event(
1217 &self,
1218 event_type: GlobalAccountDataEventType,
1219 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1220 self.inner
1221 .transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1222 .object_store(keys::ACCOUNT_DATA)?
1223 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
1224 .await?
1225 .map(|f| self.deserialize_value(&f))
1226 .transpose()
1227 }
1228
1229 async fn get_room_account_data_event(
1230 &self,
1231 room_id: &RoomId,
1232 event_type: RoomAccountDataEventType,
1233 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1234 self.inner
1235 .transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1236 .object_store(keys::ROOM_ACCOUNT_DATA)?
1237 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
1238 .await?
1239 .map(|f| self.deserialize_value(&f))
1240 .transpose()
1241 }
1242
1243 async fn get_user_room_receipt_event(
1244 &self,
1245 room_id: &RoomId,
1246 receipt_type: ReceiptType,
1247 thread: ReceiptThread,
1248 user_id: &UserId,
1249 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1250 let key = match thread.as_str() {
1251 Some(thread_id) => self
1252 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1253 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1254 };
1255 self.inner
1256 .transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
1257 .object_store(keys::ROOM_USER_RECEIPTS)?
1258 .get(&key)?
1259 .await?
1260 .map(|f| self.deserialize_value(&f))
1261 .transpose()
1262 }
1263
1264 async fn get_event_room_receipt_events(
1265 &self,
1266 room_id: &RoomId,
1267 receipt_type: ReceiptType,
1268 thread: ReceiptThread,
1269 event_id: &EventId,
1270 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1271 let range = match thread.as_str() {
1272 Some(thread_id) => self.encode_to_range(
1273 keys::ROOM_EVENT_RECEIPTS,
1274 (room_id, receipt_type, thread_id, event_id),
1275 ),
1276 None => {
1277 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1278 }
1279 }?;
1280 let tx = self.inner.transaction_on_one_with_mode(
1281 keys::ROOM_EVENT_RECEIPTS,
1282 IdbTransactionMode::Readonly,
1283 )?;
1284 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1285
1286 Ok(store
1287 .get_all_with_key(&range)?
1288 .await?
1289 .iter()
1290 .filter_map(|f| self.deserialize_value(&f).ok())
1291 .collect::<Vec<_>>())
1292 }
1293
1294 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1295 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1296 self.get_custom_value_for_js(jskey).await
1297 }
1298
1299 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1300 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1301
1302 let prev = self.get_custom_value_for_js(&jskey).await?;
1303
1304 let tx =
1305 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1306
1307 tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
1308
1309 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1310 Ok(prev)
1311 }
1312
1313 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1314 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1315
1316 let prev = self.get_custom_value_for_js(&jskey).await?;
1317
1318 let tx =
1319 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1320
1321 tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
1322
1323 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1324 Ok(prev)
1325 }
1326
1327 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1328 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1330
1331 let prefixed_stores = [
1334 keys::PROFILES,
1335 keys::DISPLAY_NAMES,
1336 keys::USER_IDS,
1337 keys::ROOM_STATE,
1338 keys::ROOM_ACCOUNT_DATA,
1339 keys::ROOM_EVENT_RECEIPTS,
1340 keys::ROOM_USER_RECEIPTS,
1341 keys::STRIPPED_ROOM_STATE,
1342 keys::STRIPPED_USER_IDS,
1343 ];
1344
1345 let all_stores = {
1346 let mut v = Vec::new();
1347 v.extend(prefixed_stores);
1348 v.extend(direct_stores);
1349 v
1350 };
1351
1352 let tx = self
1353 .inner
1354 .transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
1355
1356 for store_name in direct_stores {
1357 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
1358 }
1359
1360 for store_name in prefixed_stores {
1361 let store = tx.object_store(store_name)?;
1362 let range = self.encode_to_range(store_name, room_id)?;
1363 for key in store.get_all_keys_with_key(&range)?.await?.iter() {
1364 store.delete(&key)?;
1365 }
1366 }
1367
1368 tx.await.into_result().map_err(|e| e.into())
1369 }
1370
1371 async fn get_user_ids(
1372 &self,
1373 room_id: &RoomId,
1374 memberships: RoomMemberships,
1375 ) -> Result<Vec<OwnedUserId>> {
1376 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1377 if !ids.is_empty() {
1378 return Ok(ids);
1379 }
1380 self.get_user_ids_inner(room_id, memberships, false).await
1381 }
1382
1383 async fn save_send_queue_request(
1384 &self,
1385 room_id: &RoomId,
1386 transaction_id: OwnedTransactionId,
1387 created_at: MilliSecondsSinceUnixEpoch,
1388 kind: QueuedRequestKind,
1389 priority: usize,
1390 ) -> Result<()> {
1391 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1392
1393 let tx = self
1394 .inner
1395 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1396
1397 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1398
1399 let prev = obj.get(&encoded_key)?.await?;
1404
1405 let mut prev = prev.map_or_else(
1406 || Ok(Vec::new()),
1407 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1408 )?;
1409
1410 prev.push(PersistedQueuedRequest {
1412 room_id: room_id.to_owned(),
1413 kind: Some(kind),
1414 transaction_id,
1415 error: None,
1416 is_wedged: None,
1417 event: None,
1418 priority: Some(priority),
1419 created_at,
1420 });
1421
1422 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1424
1425 tx.await.into_result()?;
1426
1427 Ok(())
1428 }
1429
1430 async fn update_send_queue_request(
1431 &self,
1432 room_id: &RoomId,
1433 transaction_id: &TransactionId,
1434 kind: QueuedRequestKind,
1435 ) -> Result<bool> {
1436 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1437
1438 let tx = self
1439 .inner
1440 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1441
1442 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1443
1444 let prev = obj.get(&encoded_key)?.await?;
1449
1450 let mut prev = prev.map_or_else(
1451 || Ok(Vec::new()),
1452 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1453 )?;
1454
1455 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1457 entry.kind = Some(kind);
1458 entry.error = None;
1460 entry.is_wedged = None;
1462 entry.event = None;
1463
1464 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1466 tx.await.into_result()?;
1467
1468 Ok(true)
1469 } else {
1470 Ok(false)
1471 }
1472 }
1473
1474 async fn remove_send_queue_request(
1475 &self,
1476 room_id: &RoomId,
1477 transaction_id: &TransactionId,
1478 ) -> Result<bool> {
1479 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1480
1481 let tx = self.inner.transaction_on_multi_with_mode(
1482 &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
1483 IdbTransactionMode::Readwrite,
1484 )?;
1485
1486 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1487
1488 if let Some(val) = obj.get(&encoded_key)?.await? {
1493 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1494 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1495 prev.remove(pos);
1496
1497 if prev.is_empty() {
1498 obj.delete(&encoded_key)?;
1499 } else {
1500 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1501 }
1502
1503 tx.await.into_result()?;
1504 return Ok(true);
1505 }
1506 }
1507
1508 Ok(false)
1509 }
1510
1511 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1512 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1513
1514 let prev = self
1517 .inner
1518 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
1519 .object_store(keys::ROOM_SEND_QUEUE)?
1520 .get(&encoded_key)?
1521 .await?;
1522
1523 let mut prev = prev.map_or_else(
1524 || Ok(Vec::new()),
1525 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1526 )?;
1527
1528 prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1530
1531 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1532 }
1533
1534 async fn update_send_queue_request_status(
1535 &self,
1536 room_id: &RoomId,
1537 transaction_id: &TransactionId,
1538 error: Option<QueueWedgeError>,
1539 ) -> Result<()> {
1540 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1541
1542 let tx = self
1543 .inner
1544 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1545
1546 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1547
1548 if let Some(val) = obj.get(&encoded_key)?.await? {
1549 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1550 if let Some(request) =
1551 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1552 {
1553 request.is_wedged = None;
1554 request.error = error;
1555 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1556 }
1557 }
1558
1559 tx.await.into_result()?;
1560
1561 Ok(())
1562 }
1563
1564 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1565 let tx = self
1566 .inner
1567 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1568
1569 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1570
1571 let all_entries = obj
1572 .get_all()?
1573 .await?
1574 .into_iter()
1575 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
1576 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1577 .into_iter()
1578 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1579 .collect::<BTreeSet<_>>();
1580
1581 Ok(all_entries.into_iter().collect())
1582 }
1583
1584 async fn save_dependent_queued_request(
1585 &self,
1586 room_id: &RoomId,
1587 parent_txn_id: &TransactionId,
1588 own_txn_id: ChildTransactionId,
1589 created_at: MilliSecondsSinceUnixEpoch,
1590 content: DependentQueuedRequestKind,
1591 ) -> Result<()> {
1592 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1593
1594 let tx = self.inner.transaction_on_one_with_mode(
1595 keys::DEPENDENT_SEND_QUEUE,
1596 IdbTransactionMode::Readwrite,
1597 )?;
1598
1599 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1600
1601 let prev = obj.get(&encoded_key)?.await?;
1604
1605 let mut prev = prev.map_or_else(
1606 || Ok(Vec::new()),
1607 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1608 )?;
1609
1610 prev.push(DependentQueuedRequest {
1612 kind: content,
1613 parent_transaction_id: parent_txn_id.to_owned(),
1614 own_transaction_id: own_txn_id,
1615 parent_key: None,
1616 created_at,
1617 });
1618
1619 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1621
1622 tx.await.into_result()?;
1623
1624 Ok(())
1625 }
1626
1627 async fn update_dependent_queued_request(
1628 &self,
1629 room_id: &RoomId,
1630 own_transaction_id: &ChildTransactionId,
1631 new_content: DependentQueuedRequestKind,
1632 ) -> Result<bool> {
1633 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1634
1635 let tx = self.inner.transaction_on_one_with_mode(
1636 keys::DEPENDENT_SEND_QUEUE,
1637 IdbTransactionMode::Readwrite,
1638 )?;
1639
1640 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1641
1642 let prev = obj.get(&encoded_key)?.await?;
1645
1646 let mut prev = prev.map_or_else(
1647 || Ok(Vec::new()),
1648 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1649 )?;
1650
1651 let mut found = false;
1653 for entry in prev.iter_mut() {
1654 if entry.own_transaction_id == *own_transaction_id {
1655 found = true;
1656 entry.kind = new_content;
1657 break;
1658 }
1659 }
1660
1661 if found {
1662 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1663 tx.await.into_result()?;
1664 }
1665
1666 Ok(found)
1667 }
1668
1669 async fn mark_dependent_queued_requests_as_ready(
1670 &self,
1671 room_id: &RoomId,
1672 parent_txn_id: &TransactionId,
1673 parent_key: SentRequestKey,
1674 ) -> Result<usize> {
1675 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1676
1677 let tx = self.inner.transaction_on_one_with_mode(
1678 keys::DEPENDENT_SEND_QUEUE,
1679 IdbTransactionMode::Readwrite,
1680 )?;
1681
1682 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1683
1684 let prev = obj.get(&encoded_key)?.await?;
1687
1688 let mut prev = prev.map_or_else(
1689 || Ok(Vec::new()),
1690 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1691 )?;
1692
1693 let mut num_updated = 0;
1695 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1696 entry.parent_key = Some(parent_key.clone());
1697 num_updated += 1;
1698 }
1699
1700 if num_updated > 0 {
1701 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1702 tx.await.into_result()?;
1703 }
1704
1705 Ok(num_updated)
1706 }
1707
1708 async fn remove_dependent_queued_request(
1709 &self,
1710 room_id: &RoomId,
1711 txn_id: &ChildTransactionId,
1712 ) -> Result<bool> {
1713 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1714
1715 let tx = self.inner.transaction_on_one_with_mode(
1716 keys::DEPENDENT_SEND_QUEUE,
1717 IdbTransactionMode::Readwrite,
1718 )?;
1719
1720 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1721
1722 if let Some(val) = obj.get(&encoded_key)?.await? {
1725 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1726 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1727 prev.remove(pos);
1728
1729 if prev.is_empty() {
1730 obj.delete(&encoded_key)?;
1731 } else {
1732 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1733 }
1734
1735 tx.await.into_result()?;
1736 return Ok(true);
1737 }
1738 }
1739
1740 Ok(false)
1741 }
1742
1743 async fn load_dependent_queued_requests(
1744 &self,
1745 room_id: &RoomId,
1746 ) -> Result<Vec<DependentQueuedRequest>> {
1747 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1748
1749 let prev = self
1751 .inner
1752 .transaction_on_one_with_mode(
1753 keys::DEPENDENT_SEND_QUEUE,
1754 IdbTransactionMode::Readwrite,
1755 )?
1756 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1757 .get(&encoded_key)?
1758 .await?;
1759
1760 prev.map_or_else(
1761 || Ok(Vec::new()),
1762 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1763 )
1764 }
1765});
1766
1767#[derive(Debug, Serialize, Deserialize)]
1769struct RoomMember {
1770 user_id: OwnedUserId,
1771 membership: MembershipState,
1772}
1773
1774impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
1775 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
1776 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
1777 }
1778}
1779
1780impl From<&StrippedRoomMemberEvent> for RoomMember {
1781 fn from(event: &StrippedRoomMemberEvent) -> Self {
1782 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
1783 }
1784}
1785
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Legacy shape of a persisted queue entry: a mandatory `event` and a
    /// plain boolean `is_wedged`, used to produce old-format JSON.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// JSON written in the legacy shape must still deserialize into
    /// `PersistedQueuedRequest` and convert into a queued event request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let room_id = room_id!("!room_a:dummy.local");
        let txn_id = TransactionId::new();
        let event =
            SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
                .unwrap();

        let legacy = OldPersistedQueuedRequest {
            room_id: room_id.to_owned(),
            event,
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };
        let legacy_json = serde_json::to_vec(&legacy).unwrap();

        // Read the legacy bytes back through the current type.
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&legacy_json).unwrap();

        // The legacy flag survives, and no structured error exists yet.
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.error.is_none());

        // The payload still sits in the legacy `event` field.
        assert!(migrated.event.is_some());
        assert!(migrated.kind.is_none());

        // Converting yields an event request that carries an error.
        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
1842
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    // The module is already gated on `target_arch = "wasm32"`, so no extra
    // `cfg` is needed on this invocation.
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Build a store without a passphrase, under a database name made
    /// unique with a random UUID.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_id = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique_id.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
1860
#[cfg(all(test, target_arch = "wasm32"))]
mod encrypted_tests {
    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    // The module is already gated on `target_arch = "wasm32"`, so no extra
    // `cfg` is needed on this invocation.
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Build a store configured with a passphrase; both the database name
    /// and the passphrase are made unique with random UUIDs.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_id = Uuid::new_v4();
        let db_name = format!("test-state-encrypted-{}", db_id.as_hyphenated());

        let passphrase_id = Uuid::new_v4();
        let passphrase = format!("some_passphrase-{}", passphrase_id.as_hyphenated());

        let store =
            IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}