1use std::{
16 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17 sync::Arc,
18};
19
20use anyhow::anyhow;
21use async_trait::async_trait;
22use gloo_utils::format::JsValueSerdeExt;
23use growable_bloom_filter::GrowableBloom;
24use indexed_db_futures::prelude::*;
25use matrix_sdk_base::{
26 deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
27 store::{
28 ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
29 QueuedRequest, QueuedRequestKind, RoomLoadSettings, SentRequestKey,
30 SerializableEventContent, ServerCapabilities, StateChanges, StateStore, StoreError,
31 },
32 MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
33};
34use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
35use ruma::{
36 canonical_json::{redact, RedactedBecause},
37 events::{
38 presence::PresenceEvent,
39 receipt::{Receipt, ReceiptThread, ReceiptType},
40 room::member::{
41 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
42 },
43 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
44 GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
45 },
46 serde::Raw,
47 CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
48 OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
49};
50use serde::{de::DeserializeOwned, Deserialize, Serialize};
51use tracing::{debug, warn};
52use wasm_bindgen::JsValue;
53use web_sys::IdbKeyRange;
54
55mod migrations;
56
57pub use self::migrations::MigrationConflictStrategy;
58use self::migrations::{upgrade_inner_db, upgrade_meta_db};
59use crate::safe_encode::SafeEncode;
60
/// Errors produced by the IndexedDB-backed state store.
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
    /// A JSON (de)serialization error, forwarded transparently.
    #[error(transparent)]
    Json(#[from] serde_json::Error),
    /// An error reported by the store cipher, forwarded transparently.
    #[error(transparent)]
    Encryption(#[from] EncryptionError),
    /// A browser `DomException`, captured as its name, message and numeric code.
    #[error("DomException {name} ({code}): {message}")]
    DomException { name: String, message: String, code: u16 },
    /// A generic `StoreError`, forwarded transparently.
    #[error(transparent)]
    StoreError(#[from] StoreError),
    /// The database `name` cannot be migrated from `old_version` to
    /// `new_version` without deleting data.
    #[error("Can't migrate {name} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")]
    MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
74
75impl From<web_sys::DomException> for IndexeddbStateStoreError {
76 fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
77 IndexeddbStateStoreError::DomException {
78 name: frm.name(),
79 message: frm.message(),
80 code: frm.code(),
81 }
82 }
83}
84
85impl From<IndexeddbStateStoreError> for StoreError {
86 fn from(e: IndexeddbStateStoreError) -> Self {
87 match e {
88 IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
89 IndexeddbStateStoreError::StoreError(e) => e,
90 IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
91 _ => StoreError::backend(e),
92 }
93 }
94}
95
/// Names of the databases, object stores and well-known keys used by the
/// IndexedDB state store.
mod keys {
    // Suffix appended to the store name to form the meta database name
    // (`"{name}::matrix-sdk-state"`, see `IndexeddbStateStoreBuilder::build`).
    pub const INTERNAL_STATE: &str = "matrix-sdk-state";
    // Object store of the meta database queried by `has_backups` and
    // `latest_backup`.
    pub const BACKUPS_META: &str = "backups";

    pub const ACCOUNT_DATA: &str = "account_data";

    pub const PROFILES: &str = "profiles";
    pub const DISPLAY_NAMES: &str = "display_names";
    pub const USER_IDS: &str = "user_ids";

    pub const ROOM_STATE: &str = "room_state";
    pub const ROOM_INFOS: &str = "room_infos";
    pub const PRESENCE: &str = "presence";
    pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
    pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
    pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";

    pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
    pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";

    pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
    pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";

    pub const CUSTOM: &str = "custom";
    pub const KV: &str = "kv";

    /// Every object store name of the main database. `INTERNAL_STATE`,
    /// `BACKUPS_META` and `STORE_KEY` are intentionally not part of this list.
    pub const ALL_STORES: &[&str] = &[
        ACCOUNT_DATA,
        PROFILES,
        DISPLAY_NAMES,
        USER_IDS,
        ROOM_STATE,
        ROOM_INFOS,
        PRESENCE,
        ROOM_ACCOUNT_DATA,
        STRIPPED_ROOM_STATE,
        STRIPPED_USER_IDS,
        ROOM_USER_RECEIPTS,
        ROOM_EVENT_RECEIPTS,
        ROOM_SEND_QUEUE,
        DEPENDENT_SEND_QUEUE,
        CUSTOM,
        KV,
    ];

    // NOTE(review): not referenced in this part of the file; presumably the
    // key under which the store cipher is persisted — confirm in `migrations`.
    pub const STORE_KEY: &str = "store_key";
}
148
149pub use keys::ALL_STORES;
150use matrix_sdk_base::store::QueueWedgeError;
151
152fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
154 Ok(match store_cipher {
155 Some(cipher) => {
156 let data = serde_json::to_vec(event)?;
157 JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
158 }
159 None => JsValue::from_serde(event)?,
160 })
161}
162
163fn deserialize_value<T: DeserializeOwned>(
165 store_cipher: Option<&StoreCipher>,
166 event: &JsValue,
167) -> Result<T> {
168 match store_cipher {
169 Some(cipher) => {
170 use zeroize::Zeroize;
171 let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
172 let ret = serde_json::from_slice(&plaintext);
173 plaintext.zeroize();
174 Ok(ret?)
175 }
176 None => Ok(event.into_serde()?),
177 }
178}
179
180fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
181where
182 T: SafeEncode,
183{
184 match store_cipher {
185 Some(cipher) => key.as_secure_string(table_name, cipher),
186 None => key.as_encoded_string(),
187 }
188 .into()
189}
190
191fn encode_to_range<T>(
192 store_cipher: Option<&StoreCipher>,
193 table_name: &str,
194 key: T,
195) -> Result<IdbKeyRange>
196where
197 T: SafeEncode,
198{
199 match store_cipher {
200 Some(cipher) => key.encode_to_range_secure(table_name, cipher),
201 None => key.encode_to_range(),
202 }
203 .map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
204}
205
/// Builder for [`IndexeddbStateStore`].
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
    // Database name; `build` falls back to "state" when unset.
    name: Option<String>,
    // Optional passphrase handed to the meta database upgrade.
    passphrase: Option<String>,
    // Strategy handed to the main database upgrade.
    migration_conflict_strategy: MigrationConflictStrategy,
}
213
214impl IndexeddbStateStoreBuilder {
215 fn new() -> Self {
216 Self {
217 name: None,
218 passphrase: None,
219 migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
220 }
221 }
222
223 pub fn name(mut self, value: String) -> Self {
225 self.name = Some(value);
226 self
227 }
228
229 pub fn passphrase(mut self, value: String) -> Self {
233 self.passphrase = Some(value);
234 self
235 }
236
237 pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
241 self.migration_conflict_strategy = value;
242 self
243 }
244
245 pub async fn build(self) -> Result<IndexeddbStateStore> {
246 let migration_strategy = self.migration_conflict_strategy.clone();
247 let name = self.name.unwrap_or_else(|| "state".to_owned());
248
249 let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
250
251 let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
252 let inner =
253 upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
254
255 Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
256 }
257}
258
/// State store backed by two IndexedDB databases: `inner` for the data and
/// `meta` for bookkeeping such as backups.
pub struct IndexeddbStateStore {
    // Name of the main database; the only field shown by the `Debug` impl.
    name: String,
    // Main database holding the object stores listed in `keys::ALL_STORES`.
    pub(crate) inner: IdbDatabase,
    // Meta database; holds the `keys::BACKUPS_META` object store.
    pub(crate) meta: IdbDatabase,
    // When present, values are encrypted and keys are securely encoded.
    pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
265
266#[cfg(not(tarpaulin_include))]
267impl std::fmt::Debug for IndexeddbStateStore {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
270 }
271}
272
/// `Result` alias whose error type defaults to [`IndexeddbStateStoreError`].
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
274
275impl IndexeddbStateStore {
276 pub fn builder() -> IndexeddbStateStoreBuilder {
278 IndexeddbStateStoreBuilder::new()
279 }
280
281 pub fn version(&self) -> u32 {
283 self.inner.version() as u32
284 }
285
286 pub fn meta_version(&self) -> u32 {
288 self.meta.version() as u32
289 }
290
291 pub async fn has_backups(&self) -> Result<bool> {
293 Ok(self
294 .meta
295 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
296 .object_store(keys::BACKUPS_META)?
297 .count()?
298 .await?
299 > 0)
300 }
301
302 pub async fn latest_backup(&self) -> Result<Option<String>> {
304 Ok(self
305 .meta
306 .transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
307 .object_store(keys::BACKUPS_META)?
308 .open_cursor_with_direction(IdbCursorDirection::Prev)?
309 .await?
310 .and_then(|c| c.value().as_string()))
311 }
312
313 fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
315 serialize_value(self.store_cipher.as_deref(), event)
316 }
317
318 fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
320 deserialize_value(self.store_cipher.as_deref(), event)
321 }
322
323 fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
324 where
325 T: SafeEncode,
326 {
327 encode_key(self.store_cipher.as_deref(), table_name, key)
328 }
329
330 fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
331 where
332 T: SafeEncode,
333 {
334 encode_to_range(self.store_cipher.as_deref(), table_name, key)
335 }
336
337 pub async fn get_user_ids_inner(
340 &self,
341 room_id: &RoomId,
342 memberships: RoomMemberships,
343 stripped: bool,
344 ) -> Result<Vec<OwnedUserId>> {
345 let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
346
347 let tx =
348 self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
349 let store = tx.object_store(store_name)?;
350 let range = self.encode_to_range(store_name, room_id)?;
351
352 let user_ids = if memberships.is_empty() {
353 store
355 .get_all_with_key(&range)?
356 .await?
357 .iter()
358 .filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
359 .collect::<Vec<_>>()
360 } else {
361 let mut user_ids = Vec::new();
362 let cursor = store.open_cursor_with_range(&range)?.await?;
363
364 if let Some(cursor) = cursor {
365 loop {
366 let value = cursor.value();
367 let member = self.deserialize_value::<RoomMember>(&value)?;
368
369 if memberships.matches(&member.membership) {
370 user_ids.push(member.user_id);
371 }
372
373 if !cursor.continue_cursor()?.await? {
374 break;
375 }
376 }
377 }
378
379 user_ids
380 };
381
382 Ok(user_ids)
383 }
384
385 async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
386 self.inner
387 .transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
388 .object_store(keys::CUSTOM)?
389 .get(jskey)?
390 .await?
391 .map(|f| self.deserialize_value(&f))
392 .transpose()
393 }
394
395 fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
396 match key {
400 StateStoreDataKey::SyncToken => {
401 self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
402 }
403 StateStoreDataKey::ServerCapabilities => self.encode_key(
404 StateStoreDataKey::SERVER_CAPABILITIES,
405 StateStoreDataKey::SERVER_CAPABILITIES,
406 ),
407 StateStoreDataKey::Filter(filter_name) => {
408 self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
409 }
410 StateStoreDataKey::UserAvatarUrl(user_id) => {
411 self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
412 }
413 StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
414 self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
415 }
416 StateStoreDataKey::UtdHookManagerData => {
417 self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
418 }
419 StateStoreDataKey::ComposerDraft(room_id) => {
420 self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
421 }
422 StateStoreDataKey::SeenKnockRequests(room_id) => {
423 self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
424 }
425 }
426 }
427}
428
/// Serialized form of a queued request; converted to a `QueuedRequest` with
/// `into_queued_request`.
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
    // Room identifier stored alongside the request.
    pub room_id: OwnedRoomId,

    // Request kind. When absent, `event` is used to build an `Event` kind.
    kind: Option<QueuedRequestKind>,
    transaction_id: OwnedTransactionId,

    // Stored error; used unless `is_wedged` is `Some(true)`.
    pub error: Option<QueueWedgeError>,

    // Missing priority is treated as 0.
    priority: Option<usize>,

    // Defaults to the current time when missing from the serialized data.
    #[serde(default = "created_now")]
    created_at: MilliSecondsSinceUnixEpoch,

    // When `Some(true)`, the request is surfaced with a generic API error
    // instead of `error`.
    is_wedged: Option<bool>,

    // Fallback content used only when `kind` is absent.
    event: Option<SerializableEventContent>,
}
455
/// Serde default for `PersistedQueuedRequest::created_at`: the current time.
fn created_now() -> MilliSecondsSinceUnixEpoch {
    MilliSecondsSinceUnixEpoch::now()
}
459
460impl PersistedQueuedRequest {
461 fn into_queued_request(self) -> Option<QueuedRequest> {
462 let kind =
463 self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
464
465 let error = match self.is_wedged {
466 Some(true) => {
467 Some(QueueWedgeError::GenericApiError {
469 msg: "local echo failed to send in a previous session".into(),
470 })
471 }
472 _ => self.error,
473 };
474
475 let priority = self.priority.unwrap_or(0);
477
478 Some(QueuedRequest {
479 kind,
480 transaction_id: self.transaction_id,
481 error,
482 priority,
483 created_at: self.created_at,
484 })
485 }
486}
487
// wasm32 variant: the method bodies passed to the macro become the
// `StateStore` trait implementation for `IndexeddbStateStore`, with
// `IndexeddbStateStoreError` as the associated error type.
#[cfg(target_arch = "wasm32")]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        #[async_trait(?Send)]
        impl StateStore for IndexeddbStateStore {
            type Error = IndexeddbStateStoreError;

            $($body)*
        }
    };
}
507
// Non-wasm32 variant: the same method bodies are emitted as a plain inherent
// `impl IndexeddbStateStore` block instead of a trait implementation.
// NOTE(review): presumably so the code still compiles off-wasm — confirm.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! impl_state_store {
    ({ $($body:tt)* }) => {
        impl IndexeddbStateStore {
            $($body)*
        }
    };
}
516
517impl_state_store!({
518 async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
519 let encoded_key = self.encode_kv_data_key(key);
520
521 let value = self
522 .inner
523 .transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
524 .object_store(keys::KV)?
525 .get(&encoded_key)?
526 .await?;
527
528 let value = match key {
529 StateStoreDataKey::SyncToken => value
530 .map(|f| self.deserialize_value::<String>(&f))
531 .transpose()?
532 .map(StateStoreDataValue::SyncToken),
533 StateStoreDataKey::ServerCapabilities => value
534 .map(|f| self.deserialize_value::<ServerCapabilities>(&f))
535 .transpose()?
536 .map(StateStoreDataValue::ServerCapabilities),
537 StateStoreDataKey::Filter(_) => value
538 .map(|f| self.deserialize_value::<String>(&f))
539 .transpose()?
540 .map(StateStoreDataValue::Filter),
541 StateStoreDataKey::UserAvatarUrl(_) => value
542 .map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
543 .transpose()?
544 .map(StateStoreDataValue::UserAvatarUrl),
545 StateStoreDataKey::RecentlyVisitedRooms(_) => value
546 .map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
547 .transpose()?
548 .map(StateStoreDataValue::RecentlyVisitedRooms),
549 StateStoreDataKey::UtdHookManagerData => value
550 .map(|f| self.deserialize_value::<GrowableBloom>(&f))
551 .transpose()?
552 .map(StateStoreDataValue::UtdHookManagerData),
553 StateStoreDataKey::ComposerDraft(_) => value
554 .map(|f| self.deserialize_value::<ComposerDraft>(&f))
555 .transpose()?
556 .map(StateStoreDataValue::ComposerDraft),
557 StateStoreDataKey::SeenKnockRequests(_) => value
558 .map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
559 .transpose()?
560 .map(StateStoreDataValue::SeenKnockRequests),
561 };
562
563 Ok(value)
564 }
565
566 async fn set_kv_data(
567 &self,
568 key: StateStoreDataKey<'_>,
569 value: StateStoreDataValue,
570 ) -> Result<()> {
571 let encoded_key = self.encode_kv_data_key(key);
572
573 let serialized_value = match key {
574 StateStoreDataKey::SyncToken => self
575 .serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
576 StateStoreDataKey::ServerCapabilities => self.serialize_value(
577 &value
578 .into_server_capabilities()
579 .expect("Session data not containing server capabilities"),
580 ),
581 StateStoreDataKey::Filter(_) => {
582 self.serialize_value(&value.into_filter().expect("Session data not a filter"))
583 }
584 StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
585 &value.into_user_avatar_url().expect("Session data not an user avatar url"),
586 ),
587 StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
588 &value
589 .into_recently_visited_rooms()
590 .expect("Session data not a recently visited room list"),
591 ),
592 StateStoreDataKey::UtdHookManagerData => self.serialize_value(
593 &value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
594 ),
595 StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
596 &value.into_composer_draft().expect("Session data not a composer draft"),
597 ),
598 StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
599 &value
600 .into_seen_knock_requests()
601 .expect("Session data is not a set of seen knock request ids"),
602 ),
603 };
604
605 let tx =
606 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
607
608 let obj = tx.object_store(keys::KV)?;
609
610 obj.put_key_val(&encoded_key, &serialized_value?)?;
611
612 tx.await.into_result()?;
613
614 Ok(())
615 }
616
617 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
618 let encoded_key = self.encode_kv_data_key(key);
619
620 let tx =
621 self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
622 let obj = tx.object_store(keys::KV)?;
623
624 obj.delete(&encoded_key)?;
625
626 tx.await.into_result()?;
627
628 Ok(())
629 }
630
631 async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
632 let mut stores: HashSet<&'static str> = [
633 (changes.sync_token.is_some(), keys::KV),
634 (!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
635 (!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
636 (!changes.presence.is_empty(), keys::PRESENCE),
637 (
638 !changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
639 keys::PROFILES,
640 ),
641 (!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
642 (!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
643 ]
644 .iter()
645 .filter_map(|(id, key)| if *id { Some(*key) } else { None })
646 .collect();
647
648 if !changes.state.is_empty() {
649 stores.extend([
650 keys::ROOM_STATE,
651 keys::USER_IDS,
652 keys::STRIPPED_USER_IDS,
653 keys::STRIPPED_ROOM_STATE,
654 keys::PROFILES,
655 ]);
656 }
657
658 if !changes.redactions.is_empty() {
659 stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
660 }
661
662 if !changes.room_infos.is_empty() {
663 stores.insert(keys::ROOM_INFOS);
664 }
665
666 if !changes.stripped_state.is_empty() {
667 stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
668 }
669
670 if !changes.receipts.is_empty() {
671 stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
672 }
673
674 if stores.is_empty() {
675 return Ok(());
677 }
678
679 let stores: Vec<&'static str> = stores.into_iter().collect();
680 let tx =
681 self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
682
683 if let Some(s) = &changes.sync_token {
684 tx.object_store(keys::KV)?.put_key_val(
685 &self.encode_kv_data_key(StateStoreDataKey::SyncToken),
686 &self.serialize_value(s)?,
687 )?;
688 }
689
690 if !changes.ambiguity_maps.is_empty() {
691 let store = tx.object_store(keys::DISPLAY_NAMES)?;
692 for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
693 for (display_name, map) in ambiguity_maps {
694 let key = self.encode_key(
695 keys::DISPLAY_NAMES,
696 (
697 room_id,
698 display_name
699 .as_normalized_str()
700 .unwrap_or_else(|| display_name.as_raw_str()),
701 ),
702 );
703
704 store.put_key_val(&key, &self.serialize_value(&map)?)?;
705 }
706 }
707 }
708
709 if !changes.account_data.is_empty() {
710 let store = tx.object_store(keys::ACCOUNT_DATA)?;
711 for (event_type, event) in &changes.account_data {
712 store.put_key_val(
713 &self.encode_key(keys::ACCOUNT_DATA, event_type),
714 &self.serialize_value(&event)?,
715 )?;
716 }
717 }
718
719 if !changes.room_account_data.is_empty() {
720 let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
721 for (room, events) in &changes.room_account_data {
722 for (event_type, event) in events {
723 let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
724 store.put_key_val(&key, &self.serialize_value(&event)?)?;
725 }
726 }
727 }
728
729 if !changes.state.is_empty() {
730 let state = tx.object_store(keys::ROOM_STATE)?;
731 let profiles = tx.object_store(keys::PROFILES)?;
732 let user_ids = tx.object_store(keys::USER_IDS)?;
733 let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
734 let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
735
736 for (room, user_ids) in &changes.profiles_to_delete {
737 for user_id in user_ids {
738 let key = self.encode_key(keys::PROFILES, (room, user_id));
739 profiles.delete(&key)?;
740 }
741 }
742
743 for (room, event_types) in &changes.state {
744 let profile_changes = changes.profiles.get(room);
745
746 for (event_type, events) in event_types {
747 for (state_key, raw_event) in events {
748 let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
749 state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
750 stripped_state.delete(&key)?;
751
752 if *event_type == StateEventType::RoomMember {
753 let event = match raw_event.deserialize_as::<SyncRoomMemberEvent>() {
754 Ok(ev) => ev,
755 Err(e) => {
756 let event_id: Option<String> =
757 raw_event.get_field("event_id").ok().flatten();
758 debug!(event_id, "Failed to deserialize member event: {e}");
759 continue;
760 }
761 };
762
763 let key = (room, state_key);
764
765 stripped_user_ids
766 .delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
767
768 user_ids.put_key_val_owned(
769 self.encode_key(keys::USER_IDS, key),
770 &self.serialize_value(&RoomMember::from(&event))?,
771 )?;
772
773 if let Some(profile) =
774 profile_changes.and_then(|p| p.get(event.state_key()))
775 {
776 profiles.put_key_val_owned(
777 self.encode_key(keys::PROFILES, key),
778 &self.serialize_value(&profile)?,
779 )?;
780 }
781 }
782 }
783 }
784 }
785 }
786
787 if !changes.room_infos.is_empty() {
788 let room_infos = tx.object_store(keys::ROOM_INFOS)?;
789 for (room_id, room_info) in &changes.room_infos {
790 room_infos.put_key_val(
791 &self.encode_key(keys::ROOM_INFOS, room_id),
792 &self.serialize_value(&room_info)?,
793 )?;
794 }
795 }
796
797 if !changes.presence.is_empty() {
798 let store = tx.object_store(keys::PRESENCE)?;
799 for (sender, event) in &changes.presence {
800 store.put_key_val(
801 &self.encode_key(keys::PRESENCE, sender),
802 &self.serialize_value(&event)?,
803 )?;
804 }
805 }
806
807 if !changes.stripped_state.is_empty() {
808 let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
809 let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
810
811 for (room, event_types) in &changes.stripped_state {
812 for (event_type, events) in event_types {
813 for (state_key, raw_event) in events {
814 let key = self
815 .encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
816 store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
817
818 if *event_type == StateEventType::RoomMember {
819 let event = match raw_event.deserialize_as::<StrippedRoomMemberEvent>()
820 {
821 Ok(ev) => ev,
822 Err(e) => {
823 let event_id: Option<String> =
824 raw_event.get_field("event_id").ok().flatten();
825 debug!(
826 event_id,
827 "Failed to deserialize stripped member event: {e}"
828 );
829 continue;
830 }
831 };
832
833 let key = (room, state_key);
834
835 user_ids.put_key_val_owned(
836 self.encode_key(keys::STRIPPED_USER_IDS, key),
837 &self.serialize_value(&RoomMember::from(&event))?,
838 )?;
839 }
840 }
841 }
842 }
843 }
844
845 if !changes.receipts.is_empty() {
846 let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
847 let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
848
849 for (room, content) in &changes.receipts {
850 for (event_id, receipts) in &content.0 {
851 for (receipt_type, receipts) in receipts {
852 for (user_id, receipt) in receipts {
853 let key = match receipt.thread.as_str() {
854 Some(thread_id) => self.encode_key(
855 keys::ROOM_USER_RECEIPTS,
856 (room, receipt_type, thread_id, user_id),
857 ),
858 None => self.encode_key(
859 keys::ROOM_USER_RECEIPTS,
860 (room, receipt_type, user_id),
861 ),
862 };
863
864 if let Some((old_event, _)) =
865 room_user_receipts.get(&key)?.await?.and_then(|f| {
866 self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
867 })
868 {
869 let key = match receipt.thread.as_str() {
870 Some(thread_id) => self.encode_key(
871 keys::ROOM_EVENT_RECEIPTS,
872 (room, receipt_type, thread_id, old_event, user_id),
873 ),
874 None => self.encode_key(
875 keys::ROOM_EVENT_RECEIPTS,
876 (room, receipt_type, old_event, user_id),
877 ),
878 };
879 room_event_receipts.delete(&key)?;
880 }
881
882 room_user_receipts
883 .put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
884
885 let key = match receipt.thread.as_str() {
887 Some(thread_id) => self.encode_key(
888 keys::ROOM_EVENT_RECEIPTS,
889 (room, receipt_type, thread_id, event_id, user_id),
890 ),
891 None => self.encode_key(
892 keys::ROOM_EVENT_RECEIPTS,
893 (room, receipt_type, event_id, user_id),
894 ),
895 };
896 room_event_receipts
897 .put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
898 }
899 }
900 }
901 }
902 }
903
904 if !changes.redactions.is_empty() {
905 let state = tx.object_store(keys::ROOM_STATE)?;
906 let room_info = tx.object_store(keys::ROOM_INFOS)?;
907
908 for (room_id, redactions) in &changes.redactions {
909 let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
910 let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
911
912 let mut room_version = None;
913
914 while let Some(key) = cursor.key() {
915 let raw_evt =
916 self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
917 if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
918 if let Some(redaction) = redactions.get(&event_id) {
919 let version = {
920 if room_version.is_none() {
921 room_version.replace(room_info
922 .get(&self.encode_key(keys::ROOM_INFOS, room_id))?
923 .await?
924 .and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
925 .and_then(|info| info.room_version().cloned())
926 .unwrap_or_else(|| {
927 warn!(?room_id, "Unable to find the room version, assume version 9");
928 RoomVersionId::V9
929 })
930 );
931 }
932 room_version.as_ref().unwrap()
933 };
934
935 let redacted = redact(
936 raw_evt.deserialize_as::<CanonicalJsonObject>()?,
937 version,
938 Some(RedactedBecause::from_raw_event(redaction)?),
939 )
940 .map_err(StoreError::Redaction)?;
941 state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
942 }
943 }
944
945 cursor.advance(1)?.await?;
947 }
948 }
949 }
950
951 tx.await.into_result().map_err(|e| e.into())
952 }
953
954 async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
955 self.inner
956 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
957 .object_store(keys::PRESENCE)?
958 .get(&self.encode_key(keys::PRESENCE, user_id))?
959 .await?
960 .map(|f| self.deserialize_value(&f))
961 .transpose()
962 }
963
964 async fn get_presence_events(
965 &self,
966 user_ids: &[OwnedUserId],
967 ) -> Result<Vec<Raw<PresenceEvent>>> {
968 if user_ids.is_empty() {
969 return Ok(Vec::new());
970 }
971
972 let txn = self
973 .inner
974 .transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
975 let store = txn.object_store(keys::PRESENCE)?;
976
977 let mut events = Vec::with_capacity(user_ids.len());
978
979 for user_id in user_ids {
980 if let Some(event) = store
981 .get(&self.encode_key(keys::PRESENCE, user_id))?
982 .await?
983 .map(|f| self.deserialize_value(&f))
984 .transpose()?
985 {
986 events.push(event)
987 }
988 }
989
990 Ok(events)
991 }
992
993 async fn get_state_event(
994 &self,
995 room_id: &RoomId,
996 event_type: StateEventType,
997 state_key: &str,
998 ) -> Result<Option<RawAnySyncOrStrippedState>> {
999 Ok(self
1000 .get_state_events_for_keys(room_id, event_type, &[state_key])
1001 .await?
1002 .into_iter()
1003 .next())
1004 }
1005
1006 async fn get_state_events(
1007 &self,
1008 room_id: &RoomId,
1009 event_type: StateEventType,
1010 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1011 let stripped_range =
1012 self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
1013 let stripped_events = self
1014 .inner
1015 .transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
1016 .object_store(keys::STRIPPED_ROOM_STATE)?
1017 .get_all_with_key(&stripped_range)?
1018 .await?
1019 .iter()
1020 .filter_map(|f| {
1021 self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
1022 })
1023 .collect::<Vec<_>>();
1024
1025 if !stripped_events.is_empty() {
1026 return Ok(stripped_events);
1027 }
1028
1029 let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
1030 Ok(self
1031 .inner
1032 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
1033 .object_store(keys::ROOM_STATE)?
1034 .get_all_with_key(&range)?
1035 .await?
1036 .iter()
1037 .filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
1038 .collect::<Vec<_>>())
1039 }
1040
1041 async fn get_state_events_for_keys(
1042 &self,
1043 room_id: &RoomId,
1044 event_type: StateEventType,
1045 state_keys: &[&str],
1046 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1047 if state_keys.is_empty() {
1048 return Ok(Vec::new());
1049 }
1050
1051 let mut events = Vec::with_capacity(state_keys.len());
1052
1053 {
1054 let txn = self.inner.transaction_on_one_with_mode(
1055 keys::STRIPPED_ROOM_STATE,
1056 IdbTransactionMode::Readonly,
1057 )?;
1058 let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
1059
1060 for state_key in state_keys {
1061 if let Some(event) =
1062 store
1063 .get(&self.encode_key(
1064 keys::STRIPPED_ROOM_STATE,
1065 (room_id, &event_type, state_key),
1066 ))?
1067 .await?
1068 .map(|f| self.deserialize_value(&f))
1069 .transpose()?
1070 {
1071 events.push(RawAnySyncOrStrippedState::Stripped(event));
1072 }
1073 }
1074
1075 if !events.is_empty() {
1076 return Ok(events);
1077 }
1078 }
1079
1080 let txn = self
1081 .inner
1082 .transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
1083 let store = txn.object_store(keys::ROOM_STATE)?;
1084
1085 for state_key in state_keys {
1086 if let Some(event) = store
1087 .get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
1088 .await?
1089 .map(|f| self.deserialize_value(&f))
1090 .transpose()?
1091 {
1092 events.push(RawAnySyncOrStrippedState::Sync(event));
1093 }
1094 }
1095
1096 Ok(events)
1097 }
1098
1099 async fn get_profile(
1100 &self,
1101 room_id: &RoomId,
1102 user_id: &UserId,
1103 ) -> Result<Option<MinimalRoomMemberEvent>> {
1104 self.inner
1105 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
1106 .object_store(keys::PROFILES)?
1107 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1108 .await?
1109 .map(|f| self.deserialize_value(&f))
1110 .transpose()
1111 }
1112
1113 async fn get_profiles<'a>(
1114 &self,
1115 room_id: &RoomId,
1116 user_ids: &'a [OwnedUserId],
1117 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
1118 if user_ids.is_empty() {
1119 return Ok(BTreeMap::new());
1120 }
1121
1122 let txn = self
1123 .inner
1124 .transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
1125 let store = txn.object_store(keys::PROFILES)?;
1126
1127 let mut profiles = BTreeMap::new();
1128 for user_id in user_ids {
1129 if let Some(profile) = store
1130 .get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
1131 .await?
1132 .map(|f| self.deserialize_value(&f))
1133 .transpose()?
1134 {
1135 profiles.insert(user_id.as_ref(), profile);
1136 }
1137 }
1138
1139 Ok(profiles)
1140 }
1141
1142 async fn get_room_infos(&self, room_load_settings: &RoomLoadSettings) -> Result<Vec<RoomInfo>> {
1143 let transaction = self
1144 .inner
1145 .transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?;
1146
1147 let object_store = transaction.object_store(keys::ROOM_INFOS)?;
1148
1149 Ok(match room_load_settings {
1150 RoomLoadSettings::All => object_store
1151 .get_all()?
1152 .await?
1153 .iter()
1154 .map(|room_info| self.deserialize_value::<RoomInfo>(&room_info))
1155 .collect::<Result<_>>()?,
1156
1157 RoomLoadSettings::One(room_id) => {
1158 match object_store.get(&self.encode_key(keys::ROOM_INFOS, room_id))?.await? {
1159 Some(room_info) => vec![self.deserialize_value::<RoomInfo>(&room_info)?],
1160 None => vec![],
1161 }
1162 }
1163 })
1164 }
1165
1166 async fn get_users_with_display_name(
1167 &self,
1168 room_id: &RoomId,
1169 display_name: &DisplayName,
1170 ) -> Result<BTreeSet<OwnedUserId>> {
1171 self.inner
1172 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
1173 .object_store(keys::DISPLAY_NAMES)?
1174 .get(&self.encode_key(
1175 keys::DISPLAY_NAMES,
1176 (
1177 room_id,
1178 display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
1179 ),
1180 ))?
1181 .await?
1182 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1183 .unwrap_or_else(|| Ok(Default::default()))
1184 }
1185
1186 async fn get_users_with_display_names<'a>(
1187 &self,
1188 room_id: &RoomId,
1189 display_names: &'a [DisplayName],
1190 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
1191 let mut map = HashMap::new();
1192
1193 if display_names.is_empty() {
1194 return Ok(map);
1195 }
1196
1197 let txn = self
1198 .inner
1199 .transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
1200 let store = txn.object_store(keys::DISPLAY_NAMES)?;
1201
1202 for display_name in display_names {
1203 if let Some(user_ids) = store
1204 .get(
1205 &self.encode_key(
1206 keys::DISPLAY_NAMES,
1207 (
1208 room_id,
1209 display_name
1210 .as_normalized_str()
1211 .unwrap_or_else(|| display_name.as_raw_str()),
1212 ),
1213 ),
1214 )?
1215 .await?
1216 .map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
1217 .transpose()?
1218 {
1219 map.insert(display_name, user_ids);
1220 }
1221 }
1222
1223 Ok(map)
1224 }
1225
1226 async fn get_account_data_event(
1227 &self,
1228 event_type: GlobalAccountDataEventType,
1229 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
1230 self.inner
1231 .transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1232 .object_store(keys::ACCOUNT_DATA)?
1233 .get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
1234 .await?
1235 .map(|f| self.deserialize_value(&f))
1236 .transpose()
1237 }
1238
1239 async fn get_room_account_data_event(
1240 &self,
1241 room_id: &RoomId,
1242 event_type: RoomAccountDataEventType,
1243 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1244 self.inner
1245 .transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
1246 .object_store(keys::ROOM_ACCOUNT_DATA)?
1247 .get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
1248 .await?
1249 .map(|f| self.deserialize_value(&f))
1250 .transpose()
1251 }
1252
1253 async fn get_user_room_receipt_event(
1254 &self,
1255 room_id: &RoomId,
1256 receipt_type: ReceiptType,
1257 thread: ReceiptThread,
1258 user_id: &UserId,
1259 ) -> Result<Option<(OwnedEventId, Receipt)>> {
1260 let key = match thread.as_str() {
1261 Some(thread_id) => self
1262 .encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
1263 None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
1264 };
1265 self.inner
1266 .transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
1267 .object_store(keys::ROOM_USER_RECEIPTS)?
1268 .get(&key)?
1269 .await?
1270 .map(|f| self.deserialize_value(&f))
1271 .transpose()
1272 }
1273
1274 async fn get_event_room_receipt_events(
1275 &self,
1276 room_id: &RoomId,
1277 receipt_type: ReceiptType,
1278 thread: ReceiptThread,
1279 event_id: &EventId,
1280 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
1281 let range = match thread.as_str() {
1282 Some(thread_id) => self.encode_to_range(
1283 keys::ROOM_EVENT_RECEIPTS,
1284 (room_id, receipt_type, thread_id, event_id),
1285 ),
1286 None => {
1287 self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
1288 }
1289 }?;
1290 let tx = self.inner.transaction_on_one_with_mode(
1291 keys::ROOM_EVENT_RECEIPTS,
1292 IdbTransactionMode::Readonly,
1293 )?;
1294 let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
1295
1296 Ok(store
1297 .get_all_with_key(&range)?
1298 .await?
1299 .iter()
1300 .filter_map(|f| self.deserialize_value(&f).ok())
1301 .collect::<Vec<_>>())
1302 }
1303
1304 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1305 let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1306 self.get_custom_value_for_js(jskey).await
1307 }
1308
1309 async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
1310 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1311
1312 let prev = self.get_custom_value_for_js(&jskey).await?;
1313
1314 let tx =
1315 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1316
1317 tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
1318
1319 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1320 Ok(prev)
1321 }
1322
1323 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1324 let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
1325
1326 let prev = self.get_custom_value_for_js(&jskey).await?;
1327
1328 let tx =
1329 self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
1330
1331 tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
1332
1333 tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
1334 Ok(prev)
1335 }
1336
1337 async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
1338 let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
1340
1341 let prefixed_stores = [
1344 keys::PROFILES,
1345 keys::DISPLAY_NAMES,
1346 keys::USER_IDS,
1347 keys::ROOM_STATE,
1348 keys::ROOM_ACCOUNT_DATA,
1349 keys::ROOM_EVENT_RECEIPTS,
1350 keys::ROOM_USER_RECEIPTS,
1351 keys::STRIPPED_ROOM_STATE,
1352 keys::STRIPPED_USER_IDS,
1353 ];
1354
1355 let all_stores = {
1356 let mut v = Vec::new();
1357 v.extend(prefixed_stores);
1358 v.extend(direct_stores);
1359 v
1360 };
1361
1362 let tx = self
1363 .inner
1364 .transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
1365
1366 for store_name in direct_stores {
1367 tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
1368 }
1369
1370 for store_name in prefixed_stores {
1371 let store = tx.object_store(store_name)?;
1372 let range = self.encode_to_range(store_name, room_id)?;
1373 for key in store.get_all_keys_with_key(&range)?.await?.iter() {
1374 store.delete(&key)?;
1375 }
1376 }
1377
1378 tx.await.into_result().map_err(|e| e.into())
1379 }
1380
1381 async fn get_user_ids(
1382 &self,
1383 room_id: &RoomId,
1384 memberships: RoomMemberships,
1385 ) -> Result<Vec<OwnedUserId>> {
1386 let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
1387 if !ids.is_empty() {
1388 return Ok(ids);
1389 }
1390 self.get_user_ids_inner(room_id, memberships, false).await
1391 }
1392
1393 async fn save_send_queue_request(
1394 &self,
1395 room_id: &RoomId,
1396 transaction_id: OwnedTransactionId,
1397 created_at: MilliSecondsSinceUnixEpoch,
1398 kind: QueuedRequestKind,
1399 priority: usize,
1400 ) -> Result<()> {
1401 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1402
1403 let tx = self
1404 .inner
1405 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1406
1407 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1408
1409 let prev = obj.get(&encoded_key)?.await?;
1414
1415 let mut prev = prev.map_or_else(
1416 || Ok(Vec::new()),
1417 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1418 )?;
1419
1420 prev.push(PersistedQueuedRequest {
1422 room_id: room_id.to_owned(),
1423 kind: Some(kind),
1424 transaction_id,
1425 error: None,
1426 is_wedged: None,
1427 event: None,
1428 priority: Some(priority),
1429 created_at,
1430 });
1431
1432 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1434
1435 tx.await.into_result()?;
1436
1437 Ok(())
1438 }
1439
1440 async fn update_send_queue_request(
1441 &self,
1442 room_id: &RoomId,
1443 transaction_id: &TransactionId,
1444 kind: QueuedRequestKind,
1445 ) -> Result<bool> {
1446 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1447
1448 let tx = self
1449 .inner
1450 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1451
1452 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1453
1454 let prev = obj.get(&encoded_key)?.await?;
1459
1460 let mut prev = prev.map_or_else(
1461 || Ok(Vec::new()),
1462 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1463 )?;
1464
1465 if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
1467 entry.kind = Some(kind);
1468 entry.error = None;
1470 entry.is_wedged = None;
1472 entry.event = None;
1473
1474 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1476 tx.await.into_result()?;
1477
1478 Ok(true)
1479 } else {
1480 Ok(false)
1481 }
1482 }
1483
1484 async fn remove_send_queue_request(
1485 &self,
1486 room_id: &RoomId,
1487 transaction_id: &TransactionId,
1488 ) -> Result<bool> {
1489 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1490
1491 let tx = self.inner.transaction_on_multi_with_mode(
1492 &[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
1493 IdbTransactionMode::Readwrite,
1494 )?;
1495
1496 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1497
1498 if let Some(val) = obj.get(&encoded_key)?.await? {
1503 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1504 if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
1505 prev.remove(pos);
1506
1507 if prev.is_empty() {
1508 obj.delete(&encoded_key)?;
1509 } else {
1510 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1511 }
1512
1513 tx.await.into_result()?;
1514 return Ok(true);
1515 }
1516 }
1517
1518 Ok(false)
1519 }
1520
1521 async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
1522 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1523
1524 let prev = self
1527 .inner
1528 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
1529 .object_store(keys::ROOM_SEND_QUEUE)?
1530 .get(&encoded_key)?
1531 .await?;
1532
1533 let mut prev = prev.map_or_else(
1534 || Ok(Vec::new()),
1535 |val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
1536 )?;
1537
1538 prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
1540
1541 Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
1542 }
1543
1544 async fn update_send_queue_request_status(
1545 &self,
1546 room_id: &RoomId,
1547 transaction_id: &TransactionId,
1548 error: Option<QueueWedgeError>,
1549 ) -> Result<()> {
1550 let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
1551
1552 let tx = self
1553 .inner
1554 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1555
1556 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1557
1558 if let Some(val) = obj.get(&encoded_key)?.await? {
1559 let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
1560 if let Some(request) =
1561 prev.iter_mut().find(|item| item.transaction_id == transaction_id)
1562 {
1563 request.is_wedged = None;
1564 request.error = error;
1565 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1566 }
1567 }
1568
1569 tx.await.into_result()?;
1570
1571 Ok(())
1572 }
1573
1574 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
1575 let tx = self
1576 .inner
1577 .transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
1578
1579 let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
1580
1581 let all_entries = obj
1582 .get_all()?
1583 .await?
1584 .into_iter()
1585 .map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
1586 .collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
1587 .into_iter()
1588 .flat_map(|vec| vec.into_iter().map(|item| item.room_id))
1589 .collect::<BTreeSet<_>>();
1590
1591 Ok(all_entries.into_iter().collect())
1592 }
1593
1594 async fn save_dependent_queued_request(
1595 &self,
1596 room_id: &RoomId,
1597 parent_txn_id: &TransactionId,
1598 own_txn_id: ChildTransactionId,
1599 created_at: MilliSecondsSinceUnixEpoch,
1600 content: DependentQueuedRequestKind,
1601 ) -> Result<()> {
1602 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1603
1604 let tx = self.inner.transaction_on_one_with_mode(
1605 keys::DEPENDENT_SEND_QUEUE,
1606 IdbTransactionMode::Readwrite,
1607 )?;
1608
1609 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1610
1611 let prev = obj.get(&encoded_key)?.await?;
1614
1615 let mut prev = prev.map_or_else(
1616 || Ok(Vec::new()),
1617 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1618 )?;
1619
1620 prev.push(DependentQueuedRequest {
1622 kind: content,
1623 parent_transaction_id: parent_txn_id.to_owned(),
1624 own_transaction_id: own_txn_id,
1625 parent_key: None,
1626 created_at,
1627 });
1628
1629 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1631
1632 tx.await.into_result()?;
1633
1634 Ok(())
1635 }
1636
1637 async fn update_dependent_queued_request(
1638 &self,
1639 room_id: &RoomId,
1640 own_transaction_id: &ChildTransactionId,
1641 new_content: DependentQueuedRequestKind,
1642 ) -> Result<bool> {
1643 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1644
1645 let tx = self.inner.transaction_on_one_with_mode(
1646 keys::DEPENDENT_SEND_QUEUE,
1647 IdbTransactionMode::Readwrite,
1648 )?;
1649
1650 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1651
1652 let prev = obj.get(&encoded_key)?.await?;
1655
1656 let mut prev = prev.map_or_else(
1657 || Ok(Vec::new()),
1658 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1659 )?;
1660
1661 let mut found = false;
1663 for entry in prev.iter_mut() {
1664 if entry.own_transaction_id == *own_transaction_id {
1665 found = true;
1666 entry.kind = new_content;
1667 break;
1668 }
1669 }
1670
1671 if found {
1672 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1673 tx.await.into_result()?;
1674 }
1675
1676 Ok(found)
1677 }
1678
1679 async fn mark_dependent_queued_requests_as_ready(
1680 &self,
1681 room_id: &RoomId,
1682 parent_txn_id: &TransactionId,
1683 parent_key: SentRequestKey,
1684 ) -> Result<usize> {
1685 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1686
1687 let tx = self.inner.transaction_on_one_with_mode(
1688 keys::DEPENDENT_SEND_QUEUE,
1689 IdbTransactionMode::Readwrite,
1690 )?;
1691
1692 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1693
1694 let prev = obj.get(&encoded_key)?.await?;
1697
1698 let mut prev = prev.map_or_else(
1699 || Ok(Vec::new()),
1700 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1701 )?;
1702
1703 let mut num_updated = 0;
1705 for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
1706 entry.parent_key = Some(parent_key.clone());
1707 num_updated += 1;
1708 }
1709
1710 if num_updated > 0 {
1711 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1712 tx.await.into_result()?;
1713 }
1714
1715 Ok(num_updated)
1716 }
1717
1718 async fn remove_dependent_queued_request(
1719 &self,
1720 room_id: &RoomId,
1721 txn_id: &ChildTransactionId,
1722 ) -> Result<bool> {
1723 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1724
1725 let tx = self.inner.transaction_on_one_with_mode(
1726 keys::DEPENDENT_SEND_QUEUE,
1727 IdbTransactionMode::Readwrite,
1728 )?;
1729
1730 let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
1731
1732 if let Some(val) = obj.get(&encoded_key)?.await? {
1735 let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
1736 if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
1737 prev.remove(pos);
1738
1739 if prev.is_empty() {
1740 obj.delete(&encoded_key)?;
1741 } else {
1742 obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
1743 }
1744
1745 tx.await.into_result()?;
1746 return Ok(true);
1747 }
1748 }
1749
1750 Ok(false)
1751 }
1752
1753 async fn load_dependent_queued_requests(
1754 &self,
1755 room_id: &RoomId,
1756 ) -> Result<Vec<DependentQueuedRequest>> {
1757 let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
1758
1759 let prev = self
1761 .inner
1762 .transaction_on_one_with_mode(
1763 keys::DEPENDENT_SEND_QUEUE,
1764 IdbTransactionMode::Readwrite,
1765 )?
1766 .object_store(keys::DEPENDENT_SEND_QUEUE)?
1767 .get(&encoded_key)?
1768 .await?;
1769
1770 prev.map_or_else(
1771 || Ok(Vec::new()),
1772 |val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
1773 )
1774 }
1775});
1776
/// Minimal serializable view of a room member: who they are and their
/// membership state.
///
/// Built from either a sync or a stripped `m.room.member` event through the
/// `From` implementations below.
// NOTE(review): field names and order are part of the serialized form;
// presumably values of this type are persisted, so keep them stable.
#[derive(Debug, Serialize, Deserialize)]
struct RoomMember {
    /// The member's user id, taken from the event's state key.
    user_id: OwnedUserId,
    /// The membership state carried by the event.
    membership: MembershipState,
}
1783
1784impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
1785 fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
1786 Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
1787 }
1788}
1789
1790impl From<&StrippedRoomMemberEvent> for RoomMember {
1791 fn from(event: &StrippedRoomMemberEvent) -> Self {
1792 Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
1793 }
1794}
1795
#[cfg(test)]
mod migration_tests {
    use assert_matches2::assert_matches;
    use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
    use ruma::{
        events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
        TransactionId,
    };
    use serde::{Deserialize, Serialize};

    use crate::state_store::PersistedQueuedRequest;

    /// Legacy serialized shape of a queued request: a mandatory `event` and
    /// a plain boolean `is_wedged`, with no `kind` or `error` field.
    #[derive(Serialize, Deserialize)]
    struct OldPersistedQueuedRequest {
        room_id: OwnedRoomId,
        event: SerializableEventContent,
        transaction_id: OwnedTransactionId,
        is_wedged: bool,
    }

    /// Data serialized in the legacy shape must deserialize into the current
    /// `PersistedQueuedRequest` and convert into a usable queued request.
    #[test]
    fn test_migrating_persisted_queue_event_serialization() {
        let txn_id = TransactionId::new();

        let legacy = OldPersistedQueuedRequest {
            room_id: room_id!("!room_a:dummy.local").to_owned(),
            event: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("Hello").into(),
            )
            .unwrap(),
            transaction_id: txn_id.clone(),
            is_wedged: true,
        };

        // Round-trip through JSON: write the old shape, read the new one.
        let bytes = serde_json::to_vec(&legacy).unwrap();
        let migrated: PersistedQueuedRequest = serde_json::from_slice(&bytes).unwrap();

        // The legacy fields are carried over as-is...
        assert_eq!(migrated.is_wedged, Some(true));
        assert!(migrated.event.is_some());

        // ...while the fields unknown to the old shape stay unset.
        assert!(migrated.error.is_none());
        assert!(migrated.kind.is_none());

        // Converting yields an event request that is flagged with an error.
        let queued = migrated.into_queued_request().unwrap();
        assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
        assert_eq!(queued.transaction_id, txn_id);
        assert!(queued.error.is_some());
    }
}
1852
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
    // The module is already restricted to wasm32 by its own `cfg`.
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store without a passphrase under a freshly generated,
    /// unique database name.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let unique_suffix = Uuid::new_v4();
        let db_name = format!("test-state-plain-{}", unique_suffix.as_hyphenated());

        let store = IndexeddbStateStore::builder().name(db_name).build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}
1870
#[cfg(all(test, target_arch = "wasm32"))]
mod encrypted_tests {
    // The module is already restricted to wasm32 by its own `cfg`.
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    use matrix_sdk_base::statestore_integration_tests;
    use uuid::Uuid;

    use super::{IndexeddbStateStore, Result};

    /// Builds a store with a passphrase, using a freshly generated, unique
    /// database name and a freshly generated passphrase.
    async fn get_store() -> Result<IndexeddbStateStore> {
        let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
        let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());

        let builder = IndexeddbStateStore::builder().name(db_name).passphrase(passphrase);
        let store = builder.build().await?;
        Ok(store)
    }

    statestore_integration_tests!();
}