use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
};
use anyhow::anyhow;
use async_trait::async_trait;
use gloo_utils::format::JsValueSerdeExt;
use growable_bloom_filter::GrowableBloom;
use indexed_db_futures::prelude::*;
use matrix_sdk_base::{
deserialized_responses::{DisplayName, RawAnySyncOrStrippedState},
store::{
ChildTransactionId, ComposerDraft, DependentQueuedRequest, DependentQueuedRequestKind,
QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent,
ServerCapabilities, StateChanges, StateStore, StoreError,
},
MinimalRoomMemberEvent, RoomInfo, RoomMemberships, StateStoreDataKey, StateStoreDataValue,
};
use matrix_sdk_store_encryption::{Error as EncryptionError, StoreCipher};
use ruma::{
canonical_json::{redact, RedactedBecause},
events::{
presence::PresenceEvent,
receipt::{Receipt, ReceiptThread, ReceiptType},
room::member::{
MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
},
serde::Raw,
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::{debug, warn};
use wasm_bindgen::JsValue;
use web_sys::IdbKeyRange;
mod migrations;
pub use self::migrations::MigrationConflictStrategy;
use self::migrations::{upgrade_inner_db, upgrade_meta_db};
use crate::safe_encode::SafeEncode;
#[derive(Debug, thiserror::Error)]
pub enum IndexeddbStateStoreError {
#[error(transparent)]
Json(#[from] serde_json::Error),
#[error(transparent)]
Encryption(#[from] EncryptionError),
#[error("DomException {name} ({code}): {message}")]
DomException { name: String, message: String, code: u16 },
#[error(transparent)]
StoreError(#[from] StoreError),
#[error("Can't migrate {name} from {old_version} to {new_version} without deleting data. See MigrationConflictStrategy for ways to configure.")]
MigrationConflict { name: String, old_version: u32, new_version: u32 },
}
impl From<web_sys::DomException> for IndexeddbStateStoreError {
fn from(frm: web_sys::DomException) -> IndexeddbStateStoreError {
IndexeddbStateStoreError::DomException {
name: frm.name(),
message: frm.message(),
code: frm.code(),
}
}
}
impl From<IndexeddbStateStoreError> for StoreError {
fn from(e: IndexeddbStateStoreError) -> Self {
match e {
IndexeddbStateStoreError::Json(e) => StoreError::Json(e),
IndexeddbStateStoreError::StoreError(e) => e,
IndexeddbStateStoreError::Encryption(e) => StoreError::Encryption(e),
_ => StoreError::backend(e),
}
}
}
mod keys {
pub const INTERNAL_STATE: &str = "matrix-sdk-state";
pub const BACKUPS_META: &str = "backups";
pub const ACCOUNT_DATA: &str = "account_data";
pub const PROFILES: &str = "profiles";
pub const DISPLAY_NAMES: &str = "display_names";
pub const USER_IDS: &str = "user_ids";
pub const ROOM_STATE: &str = "room_state";
pub const ROOM_INFOS: &str = "room_infos";
pub const PRESENCE: &str = "presence";
pub const ROOM_ACCOUNT_DATA: &str = "room_account_data";
pub const ROOM_SEND_QUEUE: &str = "room_send_queue";
pub const DEPENDENT_SEND_QUEUE: &str = "room_dependent_send_queue";
pub const STRIPPED_ROOM_STATE: &str = "stripped_room_state";
pub const STRIPPED_USER_IDS: &str = "stripped_user_ids";
pub const ROOM_USER_RECEIPTS: &str = "room_user_receipts";
pub const ROOM_EVENT_RECEIPTS: &str = "room_event_receipts";
pub const CUSTOM: &str = "custom";
pub const KV: &str = "kv";
pub const ALL_STORES: &[&str] = &[
ACCOUNT_DATA,
PROFILES,
DISPLAY_NAMES,
USER_IDS,
ROOM_STATE,
ROOM_INFOS,
PRESENCE,
ROOM_ACCOUNT_DATA,
STRIPPED_ROOM_STATE,
STRIPPED_USER_IDS,
ROOM_USER_RECEIPTS,
ROOM_EVENT_RECEIPTS,
ROOM_SEND_QUEUE,
DEPENDENT_SEND_QUEUE,
CUSTOM,
KV,
];
pub const STORE_KEY: &str = "store_key";
}
pub use keys::ALL_STORES;
use matrix_sdk_base::store::QueueWedgeError;
fn serialize_value(store_cipher: Option<&StoreCipher>, event: &impl Serialize) -> Result<JsValue> {
Ok(match store_cipher {
Some(cipher) => {
let data = serde_json::to_vec(event)?;
JsValue::from_serde(&cipher.encrypt_value_data(data)?)?
}
None => JsValue::from_serde(event)?,
})
}
fn deserialize_value<T: DeserializeOwned>(
store_cipher: Option<&StoreCipher>,
event: &JsValue,
) -> Result<T> {
match store_cipher {
Some(cipher) => {
use zeroize::Zeroize;
let mut plaintext = cipher.decrypt_value_data(event.into_serde()?)?;
let ret = serde_json::from_slice(&plaintext);
plaintext.zeroize();
Ok(ret?)
}
None => Ok(event.into_serde()?),
}
}
fn encode_key<T>(store_cipher: Option<&StoreCipher>, table_name: &str, key: T) -> JsValue
where
T: SafeEncode,
{
match store_cipher {
Some(cipher) => key.as_secure_string(table_name, cipher),
None => key.as_encoded_string(),
}
.into()
}
fn encode_to_range<T>(
store_cipher: Option<&StoreCipher>,
table_name: &str,
key: T,
) -> Result<IdbKeyRange>
where
T: SafeEncode,
{
match store_cipher {
Some(cipher) => key.encode_to_range_secure(table_name, cipher),
None => key.encode_to_range(),
}
.map_err(|e| IndexeddbStateStoreError::StoreError(StoreError::Backend(anyhow!(e).into())))
}
#[derive(Debug)]
pub struct IndexeddbStateStoreBuilder {
name: Option<String>,
passphrase: Option<String>,
migration_conflict_strategy: MigrationConflictStrategy,
}
impl IndexeddbStateStoreBuilder {
fn new() -> Self {
Self {
name: None,
passphrase: None,
migration_conflict_strategy: MigrationConflictStrategy::BackupAndDrop,
}
}
pub fn name(mut self, value: String) -> Self {
self.name = Some(value);
self
}
pub fn passphrase(mut self, value: String) -> Self {
self.passphrase = Some(value);
self
}
pub fn migration_conflict_strategy(mut self, value: MigrationConflictStrategy) -> Self {
self.migration_conflict_strategy = value;
self
}
pub async fn build(self) -> Result<IndexeddbStateStore> {
let migration_strategy = self.migration_conflict_strategy.clone();
let name = self.name.unwrap_or_else(|| "state".to_owned());
let meta_name = format!("{name}::{}", keys::INTERNAL_STATE);
let (meta, store_cipher) = upgrade_meta_db(&meta_name, self.passphrase.as_deref()).await?;
let inner =
upgrade_inner_db(&name, store_cipher.as_deref(), migration_strategy, &meta).await?;
Ok(IndexeddbStateStore { name, inner, meta, store_cipher })
}
}
pub struct IndexeddbStateStore {
name: String,
pub(crate) inner: IdbDatabase,
pub(crate) meta: IdbDatabase,
pub(crate) store_cipher: Option<Arc<StoreCipher>>,
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for IndexeddbStateStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IndexeddbStateStore").field("name", &self.name).finish()
}
}
type Result<A, E = IndexeddbStateStoreError> = std::result::Result<A, E>;
impl IndexeddbStateStore {
pub fn builder() -> IndexeddbStateStoreBuilder {
IndexeddbStateStoreBuilder::new()
}
pub fn version(&self) -> u32 {
self.inner.version() as u32
}
pub fn meta_version(&self) -> u32 {
self.meta.version() as u32
}
pub async fn has_backups(&self) -> Result<bool> {
Ok(self
.meta
.transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
.object_store(keys::BACKUPS_META)?
.count()?
.await?
> 0)
}
pub async fn latest_backup(&self) -> Result<Option<String>> {
Ok(self
.meta
.transaction_on_one_with_mode(keys::BACKUPS_META, IdbTransactionMode::Readonly)?
.object_store(keys::BACKUPS_META)?
.open_cursor_with_direction(IdbCursorDirection::Prev)?
.await?
.and_then(|c| c.value().as_string()))
}
fn serialize_value(&self, event: &impl Serialize) -> Result<JsValue> {
serialize_value(self.store_cipher.as_deref(), event)
}
fn deserialize_value<T: DeserializeOwned>(&self, event: &JsValue) -> Result<T> {
deserialize_value(self.store_cipher.as_deref(), event)
}
fn encode_key<T>(&self, table_name: &str, key: T) -> JsValue
where
T: SafeEncode,
{
encode_key(self.store_cipher.as_deref(), table_name, key)
}
fn encode_to_range<T>(&self, table_name: &str, key: T) -> Result<IdbKeyRange>
where
T: SafeEncode,
{
encode_to_range(self.store_cipher.as_deref(), table_name, key)
}
pub async fn get_user_ids_inner(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
stripped: bool,
) -> Result<Vec<OwnedUserId>> {
let store_name = if stripped { keys::STRIPPED_USER_IDS } else { keys::USER_IDS };
let tx =
self.inner.transaction_on_one_with_mode(store_name, IdbTransactionMode::Readonly)?;
let store = tx.object_store(store_name)?;
let range = self.encode_to_range(store_name, room_id)?;
let user_ids = if memberships.is_empty() {
store
.get_all_with_key(&range)?
.await?
.iter()
.filter_map(|f| self.deserialize_value::<RoomMember>(&f).ok().map(|m| m.user_id))
.collect::<Vec<_>>()
} else {
let mut user_ids = Vec::new();
let cursor = store.open_cursor_with_range(&range)?.await?;
if let Some(cursor) = cursor {
loop {
let value = cursor.value();
let member = self.deserialize_value::<RoomMember>(&value)?;
if memberships.matches(&member.membership) {
user_ids.push(member.user_id);
}
if !cursor.continue_cursor()?.await? {
break;
}
}
}
user_ids
};
Ok(user_ids)
}
async fn get_custom_value_for_js(&self, jskey: &JsValue) -> Result<Option<Vec<u8>>> {
self.inner
.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readonly)?
.object_store(keys::CUSTOM)?
.get(jskey)?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
fn encode_kv_data_key(&self, key: StateStoreDataKey<'_>) -> JsValue {
match key {
StateStoreDataKey::SyncToken => {
self.encode_key(StateStoreDataKey::SYNC_TOKEN, StateStoreDataKey::SYNC_TOKEN)
}
StateStoreDataKey::ServerCapabilities => self.encode_key(
StateStoreDataKey::SERVER_CAPABILITIES,
StateStoreDataKey::SERVER_CAPABILITIES,
),
StateStoreDataKey::Filter(filter_name) => {
self.encode_key(StateStoreDataKey::FILTER, (StateStoreDataKey::FILTER, filter_name))
}
StateStoreDataKey::UserAvatarUrl(user_id) => {
self.encode_key(keys::KV, (StateStoreDataKey::USER_AVATAR_URL, user_id))
}
StateStoreDataKey::RecentlyVisitedRooms(user_id) => {
self.encode_key(keys::KV, (StateStoreDataKey::RECENTLY_VISITED_ROOMS, user_id))
}
StateStoreDataKey::UtdHookManagerData => {
self.encode_key(keys::KV, StateStoreDataKey::UTD_HOOK_MANAGER_DATA)
}
StateStoreDataKey::ComposerDraft(room_id) => {
self.encode_key(keys::KV, (StateStoreDataKey::COMPOSER_DRAFT, room_id))
}
StateStoreDataKey::SeenKnockRequests(room_id) => {
self.encode_key(keys::KV, (StateStoreDataKey::SEEN_KNOCK_REQUESTS, room_id))
}
}
}
}
#[derive(Serialize, Deserialize)]
struct PersistedQueuedRequest {
pub room_id: OwnedRoomId,
kind: Option<QueuedRequestKind>,
transaction_id: OwnedTransactionId,
pub error: Option<QueueWedgeError>,
priority: Option<usize>,
is_wedged: Option<bool>,
event: Option<SerializableEventContent>,
}
impl PersistedQueuedRequest {
fn into_queued_request(self) -> Option<QueuedRequest> {
let kind =
self.kind.or_else(|| self.event.map(|content| QueuedRequestKind::Event { content }))?;
let error = match self.is_wedged {
Some(true) => {
Some(QueueWedgeError::GenericApiError {
msg: "local echo failed to send in a previous session".into(),
})
}
_ => self.error,
};
let priority = self.priority.unwrap_or(0);
Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority })
}
}
#[cfg(target_arch = "wasm32")]
macro_rules! impl_state_store {
({ $($body:tt)* }) => {
#[async_trait(?Send)]
impl StateStore for IndexeddbStateStore {
type Error = IndexeddbStateStoreError;
$($body)*
}
};
}
#[cfg(not(target_arch = "wasm32"))]
macro_rules! impl_state_store {
({ $($body:tt)* }) => {
impl IndexeddbStateStore {
$($body)*
}
};
}
impl_state_store!({
async fn get_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<Option<StateStoreDataValue>> {
let encoded_key = self.encode_kv_data_key(key);
let value = self
.inner
.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readonly)?
.object_store(keys::KV)?
.get(&encoded_key)?
.await?;
let value = match key {
StateStoreDataKey::SyncToken => value
.map(|f| self.deserialize_value::<String>(&f))
.transpose()?
.map(StateStoreDataValue::SyncToken),
StateStoreDataKey::ServerCapabilities => value
.map(|f| self.deserialize_value::<ServerCapabilities>(&f))
.transpose()?
.map(StateStoreDataValue::ServerCapabilities),
StateStoreDataKey::Filter(_) => value
.map(|f| self.deserialize_value::<String>(&f))
.transpose()?
.map(StateStoreDataValue::Filter),
StateStoreDataKey::UserAvatarUrl(_) => value
.map(|f| self.deserialize_value::<OwnedMxcUri>(&f))
.transpose()?
.map(StateStoreDataValue::UserAvatarUrl),
StateStoreDataKey::RecentlyVisitedRooms(_) => value
.map(|f| self.deserialize_value::<Vec<OwnedRoomId>>(&f))
.transpose()?
.map(StateStoreDataValue::RecentlyVisitedRooms),
StateStoreDataKey::UtdHookManagerData => value
.map(|f| self.deserialize_value::<GrowableBloom>(&f))
.transpose()?
.map(StateStoreDataValue::UtdHookManagerData),
StateStoreDataKey::ComposerDraft(_) => value
.map(|f| self.deserialize_value::<ComposerDraft>(&f))
.transpose()?
.map(StateStoreDataValue::ComposerDraft),
StateStoreDataKey::SeenKnockRequests(_) => value
.map(|f| self.deserialize_value::<BTreeMap<OwnedEventId, OwnedUserId>>(&f))
.transpose()?
.map(StateStoreDataValue::SeenKnockRequests),
};
Ok(value)
}
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<()> {
let encoded_key = self.encode_kv_data_key(key);
let serialized_value = match key {
StateStoreDataKey::SyncToken => self
.serialize_value(&value.into_sync_token().expect("Session data not a sync token")),
StateStoreDataKey::ServerCapabilities => self.serialize_value(
&value
.into_server_capabilities()
.expect("Session data not containing server capabilities"),
),
StateStoreDataKey::Filter(_) => {
self.serialize_value(&value.into_filter().expect("Session data not a filter"))
}
StateStoreDataKey::UserAvatarUrl(_) => self.serialize_value(
&value.into_user_avatar_url().expect("Session data not an user avatar url"),
),
StateStoreDataKey::RecentlyVisitedRooms(_) => self.serialize_value(
&value
.into_recently_visited_rooms()
.expect("Session data not a recently visited room list"),
),
StateStoreDataKey::UtdHookManagerData => self.serialize_value(
&value.into_utd_hook_manager_data().expect("Session data not UtdHookManagerData"),
),
StateStoreDataKey::ComposerDraft(_) => self.serialize_value(
&value.into_composer_draft().expect("Session data not a composer draft"),
),
StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value(
&value
.into_seen_knock_requests()
.expect("Session data is not a set of seen knock request ids"),
),
};
let tx =
self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::KV)?;
obj.put_key_val(&encoded_key, &serialized_value?)?;
tx.await.into_result()?;
Ok(())
}
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<()> {
let encoded_key = self.encode_kv_data_key(key);
let tx =
self.inner.transaction_on_one_with_mode(keys::KV, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::KV)?;
obj.delete(&encoded_key)?;
tx.await.into_result()?;
Ok(())
}
async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
let mut stores: HashSet<&'static str> = [
(changes.sync_token.is_some(), keys::KV),
(!changes.ambiguity_maps.is_empty(), keys::DISPLAY_NAMES),
(!changes.account_data.is_empty(), keys::ACCOUNT_DATA),
(!changes.presence.is_empty(), keys::PRESENCE),
(
!changes.profiles.is_empty() || !changes.profiles_to_delete.is_empty(),
keys::PROFILES,
),
(!changes.room_account_data.is_empty(), keys::ROOM_ACCOUNT_DATA),
(!changes.receipts.is_empty(), keys::ROOM_EVENT_RECEIPTS),
]
.iter()
.filter_map(|(id, key)| if *id { Some(*key) } else { None })
.collect();
if !changes.state.is_empty() {
stores.extend([
keys::ROOM_STATE,
keys::USER_IDS,
keys::STRIPPED_USER_IDS,
keys::STRIPPED_ROOM_STATE,
keys::PROFILES,
]);
}
if !changes.redactions.is_empty() {
stores.extend([keys::ROOM_STATE, keys::ROOM_INFOS]);
}
if !changes.room_infos.is_empty() {
stores.insert(keys::ROOM_INFOS);
}
if !changes.stripped_state.is_empty() {
stores.extend([keys::STRIPPED_ROOM_STATE, keys::STRIPPED_USER_IDS]);
}
if !changes.receipts.is_empty() {
stores.extend([keys::ROOM_EVENT_RECEIPTS, keys::ROOM_USER_RECEIPTS])
}
if stores.is_empty() {
return Ok(());
}
let stores: Vec<&'static str> = stores.into_iter().collect();
let tx =
self.inner.transaction_on_multi_with_mode(&stores, IdbTransactionMode::Readwrite)?;
if let Some(s) = &changes.sync_token {
tx.object_store(keys::KV)?.put_key_val(
&self.encode_kv_data_key(StateStoreDataKey::SyncToken),
&self.serialize_value(s)?,
)?;
}
if !changes.ambiguity_maps.is_empty() {
let store = tx.object_store(keys::DISPLAY_NAMES)?;
for (room_id, ambiguity_maps) in &changes.ambiguity_maps {
for (display_name, map) in ambiguity_maps {
let key = self.encode_key(
keys::DISPLAY_NAMES,
(
room_id,
display_name
.as_normalized_str()
.unwrap_or_else(|| display_name.as_raw_str()),
),
);
store.put_key_val(&key, &self.serialize_value(&map)?)?;
}
}
}
if !changes.account_data.is_empty() {
let store = tx.object_store(keys::ACCOUNT_DATA)?;
for (event_type, event) in &changes.account_data {
store.put_key_val(
&self.encode_key(keys::ACCOUNT_DATA, event_type),
&self.serialize_value(&event)?,
)?;
}
}
if !changes.room_account_data.is_empty() {
let store = tx.object_store(keys::ROOM_ACCOUNT_DATA)?;
for (room, events) in &changes.room_account_data {
for (event_type, event) in events {
let key = self.encode_key(keys::ROOM_ACCOUNT_DATA, (room, event_type));
store.put_key_val(&key, &self.serialize_value(&event)?)?;
}
}
}
if !changes.state.is_empty() {
let state = tx.object_store(keys::ROOM_STATE)?;
let profiles = tx.object_store(keys::PROFILES)?;
let user_ids = tx.object_store(keys::USER_IDS)?;
let stripped_state = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
let stripped_user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
for (room, user_ids) in &changes.profiles_to_delete {
for user_id in user_ids {
let key = self.encode_key(keys::PROFILES, (room, user_id));
profiles.delete(&key)?;
}
}
for (room, event_types) in &changes.state {
let profile_changes = changes.profiles.get(room);
for (event_type, events) in event_types {
for (state_key, raw_event) in events {
let key = self.encode_key(keys::ROOM_STATE, (room, event_type, state_key));
state.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
stripped_state.delete(&key)?;
if *event_type == StateEventType::RoomMember {
let event = match raw_event.deserialize_as::<SyncRoomMemberEvent>() {
Ok(ev) => ev,
Err(e) => {
let event_id: Option<String> =
raw_event.get_field("event_id").ok().flatten();
debug!(event_id, "Failed to deserialize member event: {e}");
continue;
}
};
let key = (room, state_key);
stripped_user_ids
.delete(&self.encode_key(keys::STRIPPED_USER_IDS, key))?;
user_ids.put_key_val_owned(
self.encode_key(keys::USER_IDS, key),
&self.serialize_value(&RoomMember::from(&event))?,
)?;
if let Some(profile) =
profile_changes.and_then(|p| p.get(event.state_key()))
{
profiles.put_key_val_owned(
self.encode_key(keys::PROFILES, key),
&self.serialize_value(&profile)?,
)?;
}
}
}
}
}
}
if !changes.room_infos.is_empty() {
let room_infos = tx.object_store(keys::ROOM_INFOS)?;
for (room_id, room_info) in &changes.room_infos {
room_infos.put_key_val(
&self.encode_key(keys::ROOM_INFOS, room_id),
&self.serialize_value(&room_info)?,
)?;
}
}
if !changes.presence.is_empty() {
let store = tx.object_store(keys::PRESENCE)?;
for (sender, event) in &changes.presence {
store.put_key_val(
&self.encode_key(keys::PRESENCE, sender),
&self.serialize_value(&event)?,
)?;
}
}
if !changes.stripped_state.is_empty() {
let store = tx.object_store(keys::STRIPPED_ROOM_STATE)?;
let user_ids = tx.object_store(keys::STRIPPED_USER_IDS)?;
for (room, event_types) in &changes.stripped_state {
for (event_type, events) in event_types {
for (state_key, raw_event) in events {
let key = self
.encode_key(keys::STRIPPED_ROOM_STATE, (room, event_type, state_key));
store.put_key_val(&key, &self.serialize_value(&raw_event)?)?;
if *event_type == StateEventType::RoomMember {
let event = match raw_event.deserialize_as::<StrippedRoomMemberEvent>()
{
Ok(ev) => ev,
Err(e) => {
let event_id: Option<String> =
raw_event.get_field("event_id").ok().flatten();
debug!(
event_id,
"Failed to deserialize stripped member event: {e}"
);
continue;
}
};
let key = (room, state_key);
user_ids.put_key_val_owned(
self.encode_key(keys::STRIPPED_USER_IDS, key),
&self.serialize_value(&RoomMember::from(&event))?,
)?;
}
}
}
}
}
if !changes.receipts.is_empty() {
let room_user_receipts = tx.object_store(keys::ROOM_USER_RECEIPTS)?;
let room_event_receipts = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
for (room, content) in &changes.receipts {
for (event_id, receipts) in &content.0 {
for (receipt_type, receipts) in receipts {
for (user_id, receipt) in receipts {
let key = match receipt.thread.as_str() {
Some(thread_id) => self.encode_key(
keys::ROOM_USER_RECEIPTS,
(room, receipt_type, thread_id, user_id),
),
None => self.encode_key(
keys::ROOM_USER_RECEIPTS,
(room, receipt_type, user_id),
),
};
if let Some((old_event, _)) =
room_user_receipts.get(&key)?.await?.and_then(|f| {
self.deserialize_value::<(OwnedEventId, Receipt)>(&f).ok()
})
{
let key = match receipt.thread.as_str() {
Some(thread_id) => self.encode_key(
keys::ROOM_EVENT_RECEIPTS,
(room, receipt_type, thread_id, old_event, user_id),
),
None => self.encode_key(
keys::ROOM_EVENT_RECEIPTS,
(room, receipt_type, old_event, user_id),
),
};
room_event_receipts.delete(&key)?;
}
room_user_receipts
.put_key_val(&key, &self.serialize_value(&(event_id, receipt))?)?;
let key = match receipt.thread.as_str() {
Some(thread_id) => self.encode_key(
keys::ROOM_EVENT_RECEIPTS,
(room, receipt_type, thread_id, event_id, user_id),
),
None => self.encode_key(
keys::ROOM_EVENT_RECEIPTS,
(room, receipt_type, event_id, user_id),
),
};
room_event_receipts
.put_key_val(&key, &self.serialize_value(&(user_id, receipt))?)?;
}
}
}
}
}
if !changes.redactions.is_empty() {
let state = tx.object_store(keys::ROOM_STATE)?;
let room_info = tx.object_store(keys::ROOM_INFOS)?;
for (room_id, redactions) in &changes.redactions {
let range = self.encode_to_range(keys::ROOM_STATE, room_id)?;
let Some(cursor) = state.open_cursor_with_range(&range)?.await? else { continue };
let mut room_version = None;
while let Some(key) = cursor.key() {
let raw_evt =
self.deserialize_value::<Raw<AnySyncStateEvent>>(&cursor.value())?;
if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
if let Some(redaction) = redactions.get(&event_id) {
let version = {
if room_version.is_none() {
room_version.replace(room_info
.get(&self.encode_key(keys::ROOM_INFOS, room_id))?
.await?
.and_then(|f| self.deserialize_value::<RoomInfo>(&f).ok())
.and_then(|info| info.room_version().cloned())
.unwrap_or_else(|| {
warn!(?room_id, "Unable to find the room version, assume version 9");
RoomVersionId::V9
})
);
}
room_version.as_ref().unwrap()
};
let redacted = redact(
raw_evt.deserialize_as::<CanonicalJsonObject>()?,
version,
Some(RedactedBecause::from_raw_event(redaction)?),
)
.map_err(StoreError::Redaction)?;
state.put_key_val(&key, &self.serialize_value(&redacted)?)?;
}
}
cursor.advance(1)?.await?;
}
}
}
tx.await.into_result().map_err(|e| e.into())
}
async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
self.inner
.transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?
.object_store(keys::PRESENCE)?
.get(&self.encode_key(keys::PRESENCE, user_id))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>> {
if user_ids.is_empty() {
return Ok(Vec::new());
}
let txn = self
.inner
.transaction_on_one_with_mode(keys::PRESENCE, IdbTransactionMode::Readonly)?;
let store = txn.object_store(keys::PRESENCE)?;
let mut events = Vec::with_capacity(user_ids.len());
for user_id in user_ids {
if let Some(event) = store
.get(&self.encode_key(keys::PRESENCE, user_id))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()?
{
events.push(event)
}
}
Ok(events)
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>> {
Ok(self
.get_state_events_for_keys(room_id, event_type, &[state_key])
.await?
.into_iter()
.next())
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>> {
let stripped_range =
self.encode_to_range(keys::STRIPPED_ROOM_STATE, (room_id, &event_type))?;
let stripped_events = self
.inner
.transaction_on_one_with_mode(keys::STRIPPED_ROOM_STATE, IdbTransactionMode::Readonly)?
.object_store(keys::STRIPPED_ROOM_STATE)?
.get_all_with_key(&stripped_range)?
.await?
.iter()
.filter_map(|f| {
self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Stripped)
})
.collect::<Vec<_>>();
if !stripped_events.is_empty() {
return Ok(stripped_events);
}
let range = self.encode_to_range(keys::ROOM_STATE, (room_id, event_type))?;
Ok(self
.inner
.transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?
.object_store(keys::ROOM_STATE)?
.get_all_with_key(&range)?
.await?
.iter()
.filter_map(|f| self.deserialize_value(&f).ok().map(RawAnySyncOrStrippedState::Sync))
.collect::<Vec<_>>())
}
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>> {
if state_keys.is_empty() {
return Ok(Vec::new());
}
let mut events = Vec::with_capacity(state_keys.len());
{
let txn = self.inner.transaction_on_one_with_mode(
keys::STRIPPED_ROOM_STATE,
IdbTransactionMode::Readonly,
)?;
let store = txn.object_store(keys::STRIPPED_ROOM_STATE)?;
for state_key in state_keys {
if let Some(event) =
store
.get(&self.encode_key(
keys::STRIPPED_ROOM_STATE,
(room_id, &event_type, state_key),
))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()?
{
events.push(RawAnySyncOrStrippedState::Stripped(event));
}
}
if !events.is_empty() {
return Ok(events);
}
}
let txn = self
.inner
.transaction_on_one_with_mode(keys::ROOM_STATE, IdbTransactionMode::Readonly)?;
let store = txn.object_store(keys::ROOM_STATE)?;
for state_key in state_keys {
if let Some(event) = store
.get(&self.encode_key(keys::ROOM_STATE, (room_id, &event_type, state_key)))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()?
{
events.push(RawAnySyncOrStrippedState::Sync(event));
}
}
Ok(events)
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>> {
self.inner
.transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?
.object_store(keys::PROFILES)?
.get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>> {
if user_ids.is_empty() {
return Ok(BTreeMap::new());
}
let txn = self
.inner
.transaction_on_one_with_mode(keys::PROFILES, IdbTransactionMode::Readonly)?;
let store = txn.object_store(keys::PROFILES)?;
let mut profiles = BTreeMap::new();
for user_id in user_ids {
if let Some(profile) = store
.get(&self.encode_key(keys::PROFILES, (room_id, user_id)))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()?
{
profiles.insert(user_id.as_ref(), profile);
}
}
Ok(profiles)
}
async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
let entries: Vec<_> = self
.inner
.transaction_on_one_with_mode(keys::ROOM_INFOS, IdbTransactionMode::Readonly)?
.object_store(keys::ROOM_INFOS)?
.get_all()?
.await?
.iter()
.filter_map(|f| self.deserialize_value::<RoomInfo>(&f).ok())
.collect();
Ok(entries)
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>> {
self.inner
.transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?
.object_store(keys::DISPLAY_NAMES)?
.get(&self.encode_key(
keys::DISPLAY_NAMES,
(
room_id,
display_name.as_normalized_str().unwrap_or_else(|| display_name.as_raw_str()),
),
))?
.await?
.map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
.unwrap_or_else(|| Ok(Default::default()))
}
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>> {
let mut map = HashMap::new();
if display_names.is_empty() {
return Ok(map);
}
let txn = self
.inner
.transaction_on_one_with_mode(keys::DISPLAY_NAMES, IdbTransactionMode::Readonly)?;
let store = txn.object_store(keys::DISPLAY_NAMES)?;
for display_name in display_names {
if let Some(user_ids) = store
.get(
&self.encode_key(
keys::DISPLAY_NAMES,
(
room_id,
display_name
.as_normalized_str()
.unwrap_or_else(|| display_name.as_raw_str()),
),
),
)?
.await?
.map(|f| self.deserialize_value::<BTreeSet<OwnedUserId>>(&f))
.transpose()?
{
map.insert(display_name, user_ids);
}
}
Ok(map)
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
self.inner
.transaction_on_one_with_mode(keys::ACCOUNT_DATA, IdbTransactionMode::Readonly)?
.object_store(keys::ACCOUNT_DATA)?
.get(&self.encode_key(keys::ACCOUNT_DATA, event_type))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
self.inner
.transaction_on_one_with_mode(keys::ROOM_ACCOUNT_DATA, IdbTransactionMode::Readonly)?
.object_store(keys::ROOM_ACCOUNT_DATA)?
.get(&self.encode_key(keys::ROOM_ACCOUNT_DATA, (room_id, event_type)))?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>> {
let key = match thread.as_str() {
Some(thread_id) => self
.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, thread_id, user_id)),
None => self.encode_key(keys::ROOM_USER_RECEIPTS, (room_id, receipt_type, user_id)),
};
self.inner
.transaction_on_one_with_mode(keys::ROOM_USER_RECEIPTS, IdbTransactionMode::Readonly)?
.object_store(keys::ROOM_USER_RECEIPTS)?
.get(&key)?
.await?
.map(|f| self.deserialize_value(&f))
.transpose()
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>> {
let range = match thread.as_str() {
Some(thread_id) => self.encode_to_range(
keys::ROOM_EVENT_RECEIPTS,
(room_id, receipt_type, thread_id, event_id),
),
None => {
self.encode_to_range(keys::ROOM_EVENT_RECEIPTS, (room_id, receipt_type, event_id))
}
}?;
let tx = self.inner.transaction_on_one_with_mode(
keys::ROOM_EVENT_RECEIPTS,
IdbTransactionMode::Readonly,
)?;
let store = tx.object_store(keys::ROOM_EVENT_RECEIPTS)?;
Ok(store
.get_all_with_key(&range)?
.await?
.iter()
.filter_map(|f| self.deserialize_value(&f).ok())
.collect::<Vec<_>>())
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let jskey = &JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
self.get_custom_value_for_js(jskey).await
}
async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
let prev = self.get_custom_value_for_js(&jskey).await?;
let tx =
self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
tx.object_store(keys::CUSTOM)?.put_key_val(&jskey, &self.serialize_value(&value)?)?;
tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
Ok(prev)
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
let jskey = JsValue::from_str(core::str::from_utf8(key).map_err(StoreError::Codec)?);
let prev = self.get_custom_value_for_js(&jskey).await?;
let tx =
self.inner.transaction_on_one_with_mode(keys::CUSTOM, IdbTransactionMode::Readwrite)?;
tx.object_store(keys::CUSTOM)?.delete(&jskey)?;
tx.await.into_result().map_err(IndexeddbStateStoreError::from)?;
Ok(prev)
}
async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
let direct_stores = [keys::ROOM_INFOS, keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE];
let prefixed_stores = [
keys::PROFILES,
keys::DISPLAY_NAMES,
keys::USER_IDS,
keys::ROOM_STATE,
keys::ROOM_ACCOUNT_DATA,
keys::ROOM_EVENT_RECEIPTS,
keys::ROOM_USER_RECEIPTS,
keys::STRIPPED_ROOM_STATE,
keys::STRIPPED_USER_IDS,
];
let all_stores = {
let mut v = Vec::new();
v.extend(prefixed_stores);
v.extend(direct_stores);
v
};
let tx = self
.inner
.transaction_on_multi_with_mode(&all_stores, IdbTransactionMode::Readwrite)?;
for store_name in direct_stores {
tx.object_store(store_name)?.delete(&self.encode_key(store_name, room_id))?;
}
for store_name in prefixed_stores {
let store = tx.object_store(store_name)?;
let range = self.encode_to_range(store_name, room_id)?;
for key in store.get_all_keys_with_key(&range)?.await?.iter() {
store.delete(&key)?;
}
}
tx.await.into_result().map_err(|e| e.into())
}
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>> {
let ids = self.get_user_ids_inner(room_id, memberships, true).await?;
if !ids.is_empty() {
return Ok(ids);
}
self.get_user_ids_inner(room_id, memberships, false).await
}
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
kind: QueuedRequestKind,
priority: usize,
) -> Result<()> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let tx = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
)?;
prev.push(PersistedQueuedRequest {
room_id: room_id.to_owned(),
kind: Some(kind),
transaction_id,
error: None,
is_wedged: None,
event: None,
priority: Some(priority),
});
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
Ok(())
}
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
kind: QueuedRequestKind,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let tx = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
)?;
if let Some(entry) = prev.iter_mut().find(|entry| entry.transaction_id == transaction_id) {
entry.kind = Some(kind);
entry.error = None;
entry.is_wedged = None;
entry.event = None;
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
Ok(true)
} else {
Ok(false)
}
}
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_multi_with_mode(
&[keys::ROOM_SEND_QUEUE, keys::DEPENDENT_SEND_QUEUE],
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
if let Some(val) = obj.get(&encoded_key)?.await? {
let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
if let Some(pos) = prev.iter().position(|item| item.transaction_id == transaction_id) {
prev.remove(pos);
if prev.is_empty() {
obj.delete(&encoded_key)?;
} else {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
}
tx.await.into_result()?;
return Ok(true);
}
}
Ok(false)
}
async fn load_send_queue_requests(&self, room_id: &RoomId) -> Result<Vec<QueuedRequest>> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let prev = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?
.object_store(keys::ROOM_SEND_QUEUE)?
.get(&encoded_key)?
.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
)?;
prev.sort_by(|lhs, rhs| rhs.priority.unwrap_or(0).cmp(&lhs.priority.unwrap_or(0)));
Ok(prev.into_iter().filter_map(PersistedQueuedRequest::into_queued_request).collect())
}
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<()> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);
let tx = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
if let Some(val) = obj.get(&encoded_key)?.await? {
let mut prev = self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val)?;
if let Some(request) =
prev.iter_mut().find(|item| item.transaction_id == transaction_id)
{
request.is_wedged = None;
request.error = error;
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
}
}
tx.await.into_result()?;
Ok(())
}
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>> {
let tx = self
.inner
.transaction_on_one_with_mode(keys::ROOM_SEND_QUEUE, IdbTransactionMode::Readwrite)?;
let obj = tx.object_store(keys::ROOM_SEND_QUEUE)?;
let all_entries = obj
.get_all()?
.await?
.into_iter()
.map(|item| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&item))
.collect::<Result<Vec<Vec<PersistedQueuedRequest>>, _>>()?
.into_iter()
.flat_map(|vec| vec.into_iter().map(|item| item.room_id))
.collect::<BTreeSet<_>>();
Ok(all_entries.into_iter().collect())
}
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<()> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;
prev.push(DependentQueuedRequest {
kind: content,
parent_transaction_id: parent_txn_id.to_owned(),
own_transaction_id: own_txn_id,
parent_key: None,
});
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
Ok(())
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;
let mut found = false;
for entry in prev.iter_mut() {
if entry.own_transaction_id == *own_transaction_id {
found = true;
entry.kind = new_content;
break;
}
}
if found {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
}
Ok(found)
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
parent_key: SentRequestKey,
) -> Result<usize> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
let prev = obj.get(&encoded_key)?.await?;
let mut prev = prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;
let mut num_updated = 0;
for entry in prev.iter_mut().filter(|entry| entry.parent_transaction_id == parent_txn_id) {
entry.parent_key = Some(parent_key.clone());
num_updated += 1;
}
if num_updated > 0 {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
tx.await.into_result()?;
}
Ok(num_updated)
}
async fn remove_dependent_queued_request(
&self,
room_id: &RoomId,
txn_id: &ChildTransactionId,
) -> Result<bool> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let tx = self.inner.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?;
let obj = tx.object_store(keys::DEPENDENT_SEND_QUEUE)?;
if let Some(val) = obj.get(&encoded_key)?.await? {
let mut prev = self.deserialize_value::<Vec<DependentQueuedRequest>>(&val)?;
if let Some(pos) = prev.iter().position(|item| item.own_transaction_id == *txn_id) {
prev.remove(pos);
if prev.is_empty() {
obj.delete(&encoded_key)?;
} else {
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;
}
tx.await.into_result()?;
return Ok(true);
}
}
Ok(false)
}
async fn load_dependent_queued_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<DependentQueuedRequest>> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);
let prev = self
.inner
.transaction_on_one_with_mode(
keys::DEPENDENT_SEND_QUEUE,
IdbTransactionMode::Readwrite,
)?
.object_store(keys::DEPENDENT_SEND_QUEUE)?
.get(&encoded_key)?
.await?;
prev.map_or_else(
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)
}
});
#[derive(Debug, Serialize, Deserialize)]
struct RoomMember {
user_id: OwnedUserId,
membership: MembershipState,
}
impl From<&SyncStateEvent<RoomMemberEventContent>> for RoomMember {
fn from(event: &SyncStateEvent<RoomMemberEventContent>) -> Self {
Self { user_id: event.state_key().clone(), membership: event.membership().clone() }
}
}
impl From<&StrippedRoomMemberEvent> for RoomMember {
fn from(event: &StrippedRoomMemberEvent) -> Self {
Self { user_id: event.state_key.clone(), membership: event.content.membership.clone() }
}
}
#[cfg(test)]
mod migration_tests {
use assert_matches2::assert_matches;
use matrix_sdk_base::store::{QueuedRequestKind, SerializableEventContent};
use ruma::{
events::room::message::RoomMessageEventContent, room_id, OwnedRoomId, OwnedTransactionId,
TransactionId,
};
use serde::{Deserialize, Serialize};
use crate::state_store::PersistedQueuedRequest;
#[derive(Serialize, Deserialize)]
struct OldPersistedQueuedRequest {
room_id: OwnedRoomId,
event: SerializableEventContent,
transaction_id: OwnedTransactionId,
is_wedged: bool,
}
#[test]
fn test_migrating_persisted_queue_event_serialization() {
let room_a_id = room_id!("!room_a:dummy.local");
let transaction_id = TransactionId::new();
let content =
SerializableEventContent::new(&RoomMessageEventContent::text_plain("Hello").into())
.unwrap();
let old_persisted_queue_event = OldPersistedQueuedRequest {
room_id: room_a_id.to_owned(),
event: content,
transaction_id: transaction_id.clone(),
is_wedged: true,
};
let serialized_persisted = serde_json::to_vec(&old_persisted_queue_event).unwrap();
let new_persisted: PersistedQueuedRequest =
serde_json::from_slice(&serialized_persisted).unwrap();
assert_eq!(new_persisted.is_wedged, Some(true));
assert!(new_persisted.error.is_none());
assert!(new_persisted.event.is_some());
assert!(new_persisted.kind.is_none());
let queued = new_persisted.into_queued_request().unwrap();
assert_matches!(queued.kind, QueuedRequestKind::Event { .. });
assert_eq!(queued.transaction_id, transaction_id);
assert!(queued.error.is_some());
}
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
use matrix_sdk_base::statestore_integration_tests;
use uuid::Uuid;
use super::{IndexeddbStateStore, Result};
async fn get_store() -> Result<IndexeddbStateStore> {
let db_name = format!("test-state-plain-{}", Uuid::new_v4().as_hyphenated());
Ok(IndexeddbStateStore::builder().name(db_name).build().await?)
}
statestore_integration_tests!();
}
#[cfg(all(test, target_arch = "wasm32"))]
mod encrypted_tests {
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
use matrix_sdk_base::statestore_integration_tests;
use uuid::Uuid;
use super::{IndexeddbStateStore, Result};
async fn get_store() -> Result<IndexeddbStateStore> {
let db_name = format!("test-state-encrypted-{}", Uuid::new_v4().as_hyphenated());
let passphrase = format!("some_passphrase-{}", Uuid::new_v4().as_hyphenated());
Ok(IndexeddbStateStore::builder().name(db_name).passphrase(passphrase).build().await?)
}
statestore_integration_tests!();
}