Skip to main content

matrix_sdk_base/store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The state store holds the overall state for rooms, users and their
//! profiles and their timelines. It is an overall cache for faster access
//! and convenience, accessible through `Store`.
//!
//! By implementing the `StateStore` trait, you can plug any storage backend
//! into the store for the actual storage. By default, an in-memory store is
//! used.
22
23use std::{
24    borrow::Borrow,
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    fmt,
27    ops::Deref,
28    result::Result as StdResult,
29    str::{FromStr, Utf8Error},
30    sync::{Arc, OnceLock, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36
37#[cfg(any(test, feature = "testing"))]
38#[macro_use]
39pub mod integration_tests;
40mod observable_map;
41mod traits;
42
43use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, locks::Mutex as SyncMutex};
44#[cfg(feature = "e2e-encryption")]
45use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
46pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
47use observable_map::ObservableMap;
48use ruma::{
49    EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
50    events::{
51        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52        AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
53        RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
54        StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
55        presence::PresenceEvent,
56        receipt::ReceiptEventContent,
57        room::{
58            create::RoomCreateEventContent,
59            member::{RoomMemberEventContent, StrippedRoomMemberEvent},
60            power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
61            redaction::SyncRoomRedactionEvent,
62        },
63    },
64    serde::Raw,
65};
66use serde::de::DeserializeOwned;
67use tokio::sync::{Mutex, RwLock, broadcast};
68use tracing::warn;
69pub use traits::compare_thread_subscription_bump_stamps;
70
71use crate::{
72    MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
73    deserialized_responses::DisplayName,
74    event_cache::store as event_cache_store,
75    media::store as media_store,
76    room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
77};
78
79pub(crate) mod ambiguity_map;
80mod avatar_cache;
81mod memory_store;
82pub mod migration_helpers;
83mod send_queue;
84
85pub use avatar_cache::AvatarCache;
86
87#[cfg(any(test, feature = "testing"))]
88pub use self::integration_tests::StateStoreIntegrationTests;
89#[cfg(feature = "unstable-msc4274")]
90pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
91pub use self::{
92    memory_store::MemoryStore,
93    send_queue::{
94        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
95        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
96        SentMediaInfo, SentRequestKey, SerializableEventContent,
97    },
98    traits::{
99        ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail,
100        DynStateStore, IncorrectMutexGuardError, IntoStateStore, SaveLockedStateStore, StateStore,
101        StateStoreDataKey, StateStoreDataValue, StateStoreExt, SupportedVersionsResponse,
102        ThreadSubscriptionCatchupToken, WellKnownResponse,
103    },
104};
105
106/// State store specific error type.
107#[derive(Debug, thiserror::Error)]
108pub enum StoreError {
109    /// An error happened in the underlying database backend.
110    #[error(transparent)]
111    Backend(Box<dyn std::error::Error + Send + Sync>),
112
113    /// An error happened while serializing or deserializing some data.
114    #[error(transparent)]
115    Json(#[from] serde_json::Error),
116
117    /// An error happened while deserializing a Matrix identifier, e.g. an user
118    /// id.
119    #[error(transparent)]
120    Identifier(#[from] ruma::IdParseError),
121
122    /// The store is locked with a passphrase and an incorrect passphrase was
123    /// given.
124    #[error("The store failed to be unlocked")]
125    StoreLocked,
126
127    /// An unencrypted store was tried to be unlocked with a passphrase.
128    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
129    UnencryptedStore,
130
131    /// The store failed to encrypt or decrypt some data.
132    #[error("Error encrypting or decrypting data from the store: {0}")]
133    Encryption(#[from] StoreEncryptionError),
134
135    /// The store failed to encode or decode some data.
136    #[error("Error encoding or decoding data from the store: {0}")]
137    Codec(#[from] Utf8Error),
138
139    /// The database format has changed in a backwards incompatible way.
140    #[error(
141        "The database format changed in an incompatible way, current \
142        version: {0}, latest version: {1}"
143    )]
144    UnsupportedDatabaseVersion(usize, usize),
145
146    /// Redacting an event in the store has failed.
147    ///
148    /// This should never happen.
149    #[error("Redaction failed: {0}")]
150    Redaction(#[source] ruma::canonical_json::RedactionError),
151
152    /// The store contains invalid data.
153    #[error("The store contains invalid data: {details}")]
154    InvalidData {
155        /// Details about which data is invalid, and how.
156        details: String,
157    },
158}
159
160impl StoreError {
161    /// Create a new [`Backend`][Self::Backend] error.
162    ///
163    /// Shorthand for `StoreError::Backend(Box::new(error))`.
164    #[inline]
165    pub fn backend<E>(error: E) -> Self
166    where
167        E: std::error::Error + Send + Sync + 'static,
168    {
169        Self::Backend(Box::new(error))
170    }
171}
172
173/// A `StateStore` specific result type.
174pub type Result<T, E = StoreError> = std::result::Result<T, E>;
175
176/// A state store wrapper for the SDK.
177///
178/// This adds additional higher level store functionality on top of a
179/// `StateStore` implementation.
180#[derive(Clone)]
181pub(crate) struct BaseStateStore {
182    pub(super) inner: SaveLockedStateStore,
183    session_meta: Arc<OnceLock<SessionMeta>>,
184    room_load_settings: Arc<RwLock<RoomLoadSettings>>,
185
186    /// A sender that is used to communicate changes to room information. Each
187    /// tick contains the room ID and the reasons that have generated this tick.
188    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
189
190    /// The current sync token that should be used for the next sync call.
191    pub(super) sync_token: Arc<RwLock<Option<String>>>,
192
193    /// All rooms the store knows about.
194    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
195
196    /// Which rooms have already logged a log line about missing room info, in
197    /// the context of response processors?
198    pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
199}
200
201impl BaseStateStore {
202    /// Create a new store, wrapping the given `StateStore`
203    pub fn new(inner: Arc<DynStateStore>) -> Self {
204        // Create the channel to receive `RoomInfoNotableUpdate`.
205        //
206        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
207        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
208        // trigger such amount of updates, it's a safe value.
209        //
210        // Also, note that it must not be zero, because (i) it will panic,
211        // (ii) a new user has no room, but can create rooms; remember that the
212        // channel's capacity is immutable.
213        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
214            broadcast::channel(500);
215
216        Self {
217            inner: SaveLockedStateStore::new(inner),
218            session_meta: Default::default(),
219            room_load_settings: Default::default(),
220            room_info_notable_update_sender,
221            sync_token: Default::default(),
222            rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
223            already_logged_missing_room: Default::default(),
224        }
225    }
226
227    /// Get access to the syncing lock.
228    pub fn lock(&self) -> &Mutex<()> {
229        self.inner.lock()
230    }
231
232    /// Set the [`SessionMeta`] into [`BaseStateStore::session_meta`].
233    ///
234    /// # Panics
235    ///
236    /// Panics if called twice.
237    pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
238        self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
239    }
240
241    /// Loads rooms from the given [`DynStateStore`] (in
242    /// [`BaseStateStore::new`]) into [`BaseStateStore::rooms`].
243    pub(crate) async fn load_rooms(
244        &self,
245        user_id: &UserId,
246        room_load_settings: RoomLoadSettings,
247    ) -> Result<()> {
248        *self.room_load_settings.write().await = room_load_settings.clone();
249
250        let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
251
252        let mut rooms = self.rooms.write().unwrap();
253
254        for room_info in room_infos {
255            let new_room = Room::restore(
256                user_id,
257                self.inner.clone(),
258                room_info,
259                self.room_info_notable_update_sender.clone(),
260            );
261            let new_room_id = new_room.room_id().to_owned();
262
263            rooms.insert(new_room_id, new_room);
264        }
265
266        Ok(())
267    }
268
269    /// Load room infos from the [`StateStore`] and applies migrations onto
270    /// them.
271    async fn load_and_migrate_room_infos(
272        &self,
273        room_load_settings: RoomLoadSettings,
274    ) -> Result<Vec<RoomInfo>> {
275        let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
276        let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
277
278        for room_info in room_infos.iter_mut() {
279            if room_info.apply_migrations(self.inner.clone()).await {
280                migrated_room_infos.push(room_info.clone());
281            }
282        }
283
284        if !migrated_room_infos.is_empty() {
285            let changes = StateChanges {
286                room_infos: migrated_room_infos
287                    .into_iter()
288                    .map(|room_info| (room_info.room_id.clone(), room_info))
289                    .collect(),
290                ..Default::default()
291            };
292
293            if let Err(error) = self.inner.save_changes(&changes).await {
294                warn!("Failed to save migrated room infos: {error}");
295            }
296        }
297
298        Ok(room_infos)
299    }
300
301    /// Load sync token from the [`StateStore`], and put it in
302    /// [`BaseStateStore::sync_token`].
303    pub(crate) async fn load_sync_token(&self) -> Result<()> {
304        let token =
305            self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
306        *self.sync_token.write().await = token;
307
308        Ok(())
309    }
310
311    /// Restore the session meta, sync token and rooms from an existing
312    /// [`BaseStateStore`].
313    #[cfg(any(feature = "e2e-encryption", test))]
314    pub(crate) async fn derive_from_other(&self, other: &Self) -> Result<()> {
315        let Some(session_meta) = other.session_meta.get() else {
316            return Ok(());
317        };
318
319        let room_load_settings = other.room_load_settings.read().await.clone();
320
321        self.load_rooms(&session_meta.user_id, room_load_settings).await?;
322        self.load_sync_token().await?;
323        self.set_session_meta(session_meta.clone());
324
325        Ok(())
326    }
327
328    /// The current [`SessionMeta`] containing our user ID and device ID.
329    pub fn session_meta(&self) -> Option<&SessionMeta> {
330        self.session_meta.get()
331    }
332
333    /// Get all the rooms this store knows about.
334    pub fn rooms(&self) -> Vec<Room> {
335        self.rooms.read().unwrap().iter().cloned().collect()
336    }
337
338    /// Get all the rooms this store knows about, filtered by state.
339    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
340        self.rooms
341            .read()
342            .unwrap()
343            .iter()
344            .filter(|room| filter.matches(room.state()))
345            .cloned()
346            .collect()
347    }
348
349    /// Get a stream of all the rooms changes, in addition to the existing
350    /// rooms.
351    pub fn rooms_stream(
352        &self,
353    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
354        self.rooms.read().unwrap().stream()
355    }
356
357    /// Get the room with the given room id.
358    pub fn room(&self, room_id: &RoomId) -> Option<Room> {
359        self.rooms.read().unwrap().get(room_id).cloned()
360    }
361
362    /// Check if a room exists.
363    pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
364        self.rooms.read().unwrap().get(room_id).is_some()
365    }
366
367    /// Lookup the `Room` for the given `RoomId`, or create one, if it didn't
368    /// exist yet in the store
369    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
370        let user_id =
371            &self.session_meta.get().expect("Creating room while not being logged in").user_id;
372
373        self.rooms
374            .write()
375            .unwrap()
376            .get_or_create(room_id, || {
377                Room::new(
378                    user_id,
379                    self.inner.clone(),
380                    room_id,
381                    room_state,
382                    self.room_info_notable_update_sender.clone(),
383                )
384            })
385            .clone()
386    }
387
388    /// Forget the room with the given room ID.
389    ///
390    /// # Arguments
391    ///
392    /// * `room_id` - The id of the room that should be forgotten.
393    pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
394        self.inner.remove_room(room_id).await?;
395        self.rooms.write().unwrap().remove(room_id);
396        Ok(())
397    }
398}
399
400#[cfg(not(tarpaulin_include))]
401impl fmt::Debug for BaseStateStore {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.debug_struct("Store")
404            .field("inner", &self.inner)
405            .field("session_meta", &self.session_meta)
406            .field("sync_token", &self.sync_token)
407            .field("rooms", &self.rooms)
408            .finish_non_exhaustive()
409    }
410}
411
412impl Deref for BaseStateStore {
413    type Target = SaveLockedStateStore;
414
415    fn deref(&self) -> &Self::Target {
416        &self.inner
417    }
418}
419
420/// Configure how many rooms will be restored when restoring the session with
421/// `BaseStateStore::load_rooms`.
422///
423/// <div class="warning">
424///
425/// # ⚠️ Be careful!
426///
427/// When loading a single room with [`RoomLoadSettings::One`], the in-memory
428/// state may not reflect the store state (in the databases). Thus, when one
429/// will get a room that exists in the store state but _not_ in the in-memory
430/// state, it will be created from scratch and, when saved, will override the
431/// data in the store state (in the databases). This can lead to weird
432/// behaviours.
433///
434/// This option is expected to be used as follows:
435///
436/// 1. Create a `BaseStateStore` with a [`StateStore`] based on SQLite for
437///    example,
438/// 2. Restore a session and load one room from the [`StateStore`] (in the case
439///    of dealing with a notification for example),
440/// 3. Derive the `BaseStateStore`, with `BaseStateStore::derive_from_other`,
441///    into another one with an in-memory [`StateStore`], such as
442///    [`MemoryStore`],
443/// 4. Work on this derived `BaseStateStore`.
444///
445/// Now, all operations happen in the [`MemoryStore`], not on the original store
446/// (SQLite in this example), thus protecting original data.
447///
448/// From a higher-level point of view, this is what
449/// [`BaseClient::clone_with_in_memory_state_store`] does.
450///
451/// </div>
452///
453/// [`BaseClient::clone_with_in_memory_state_store`]: crate::BaseClient::clone_with_in_memory_state_store
454#[derive(Clone, Debug, Default)]
455pub enum RoomLoadSettings {
456    /// Load all rooms from the [`StateStore`] into the in-memory state store
457    /// `BaseStateStore`.
458    ///
459    /// This is the default variant.
460    #[default]
461    All,
462
463    /// Load a single room from the [`StateStore`] into the in-memory state
464    /// store `BaseStateStore`.
465    ///
466    /// Please, be careful with this option. Read the documentation of
467    /// [`RoomLoadSettings`].
468    One(OwnedRoomId),
469}
470
/// Whether the user is subscribed to a thread or not.
///
/// Unsubscriptions are kept in the database too, since their bumpstamp
/// information (in `ThreadSubscription`) is required to order subscriptions
/// and unsubscriptions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadSubscriptionStatus {
    /// The user is subscribed to the related thread.
    Subscribed {
        /// `true` if a client made the subscription automatically, `false` if
        /// it results from a manual choice of the user.
        automatic: bool,
    },

    /// The user has been unsubscribed from the related thread.
    Unsubscribed,
}

impl FromStr for ThreadSubscriptionStatus {
    type Err = ();

    /// Parse a status from the exact string produced by
    /// [`ThreadSubscriptionStatus::as_str`]; any other string is an error.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let status = match s {
            "automatic" => Self::Subscribed { automatic: true },
            "manual" => Self::Subscribed { automatic: false },
            "unsubscribed" => Self::Unsubscribed,
            _ => return Err(()),
        };

        Ok(status)
    }
}

impl ThreadSubscriptionStatus {
    /// Get the static string representing this status, so that it can be
    /// stored in a persistent format.
    ///
    /// Note: some database implementations serialize this value, so do not
    /// change it lightly, and keep it in sync with [`Self::from_str`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Subscribed { automatic: true } => "automatic",
            Self::Subscribed { automatic: false } => "manual",
            Self::Unsubscribed => "unsubscribed",
        }
    }
}
522
523/// A thread subscription, as saved in the state store.
524#[derive(Clone, Copy, Debug, PartialEq, Eq)]
525pub struct StoredThreadSubscription {
526    /// Current status of the subscription.
527    pub status: ThreadSubscriptionStatus,
528
529    /// An optional bump stamp, as defined in the MSC; the higher the value, the
530    /// most recent the thread subscription information is, and should be
531    /// remembered.
532    ///
533    /// If not set, this means it's a user-provided thread subscription, for
534    /// which we're waiting validation from a server (e.g. through a remote
535    /// echo via sync).
536    pub bump_stamp: Option<u64>,
537}
538
539/// Store state changes and pass them to the StateStore.
540#[derive(Clone, Debug, Default)]
541pub struct StateChanges {
542    /// The sync token that relates to this update.
543    pub sync_token: Option<String>,
544    /// A mapping of event type string to `AnyBasicEvent`.
545    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
546    /// A mapping of `UserId` to `PresenceEvent`.
547    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
548
549    /// A mapping of `RoomId` to a map of users and their
550    /// `MinimalRoomMemberEvent`.
551    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
552
553    /// A mapping of room profiles to delete.
554    ///
555    /// These are deleted *before* other room profiles are inserted.
556    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
557
558    /// A mapping of `RoomId` to a map of event type string to a state key and
559    /// `AnySyncStateEvent`.
560    pub state:
561        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
562    /// A mapping of `RoomId` to a map of event type string to `AnyBasicEvent`.
563    pub room_account_data:
564        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
565
566    /// A map of `OwnedRoomId` to `RoomInfo`.
567    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
568
569    /// A map of `RoomId` to `ReceiptEventContent`.
570    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
571
572    /// A map of `RoomId` to maps of `OwnedEventId` to be redacted by
573    /// `SyncRoomRedactionEvent`.
574    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
575
576    /// A mapping of `RoomId` to a map of event type to a map of state key to
577    /// `StrippedState`.
578    pub stripped_state: BTreeMap<
579        OwnedRoomId,
580        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
581    >,
582
583    /// A map from room id to a map of a display name and a set of user ids that
584    /// share that display name in the given room.
585    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
586}
587
588impl StateChanges {
589    /// Create a new `StateChanges` struct with the given sync_token.
590    pub fn new(sync_token: String) -> Self {
591        Self { sync_token: Some(sync_token), ..Default::default() }
592    }
593
594    /// Update the `StateChanges` struct with the given `PresenceEvent`.
595    pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
596        self.presence.insert(event.sender, raw_event);
597    }
598
599    /// Update the `StateChanges` struct with the given `RoomInfo`.
600    pub fn add_room(&mut self, room: RoomInfo) {
601        self.room_infos.insert(room.room_id.clone(), room);
602    }
603
604    /// Update the `StateChanges` struct with the given room with a new
605    /// `AnyBasicEvent`.
606    pub fn add_room_account_data(
607        &mut self,
608        room_id: &RoomId,
609        event: AnyRoomAccountDataEvent,
610        raw_event: Raw<AnyRoomAccountDataEvent>,
611    ) {
612        self.room_account_data
613            .entry(room_id.to_owned())
614            .or_default()
615            .insert(event.event_type(), raw_event);
616    }
617
618    /// Update the `StateChanges` struct with the given room with a new
619    /// `StrippedMemberEvent`.
620    pub fn add_stripped_member(
621        &mut self,
622        room_id: &RoomId,
623        user_id: &UserId,
624        event: Raw<StrippedRoomMemberEvent>,
625    ) {
626        self.stripped_state
627            .entry(room_id.to_owned())
628            .or_default()
629            .entry(StateEventType::RoomMember)
630            .or_default()
631            .insert(user_id.into(), event.cast());
632    }
633
634    /// Update the `StateChanges` struct with the given room with a new
635    /// `AnySyncStateEvent`.
636    pub fn add_state_event(
637        &mut self,
638        room_id: &RoomId,
639        event: AnySyncStateEvent,
640        raw_event: Raw<AnySyncStateEvent>,
641    ) {
642        self.state
643            .entry(room_id.to_owned())
644            .or_default()
645            .entry(event.event_type())
646            .or_default()
647            .insert(event.state_key().to_owned(), raw_event);
648    }
649
650    /// Redact an event in the room
651    pub fn add_redaction(
652        &mut self,
653        room_id: &RoomId,
654        redacted_event_id: &EventId,
655        redaction: Raw<SyncRoomRedactionEvent>,
656    ) {
657        self.redactions
658            .entry(room_id.to_owned())
659            .or_default()
660            .insert(redacted_event_id.to_owned(), redaction);
661    }
662
663    /// Update the `StateChanges` struct with the given room with a new
664    /// `Receipts`.
665    pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
666        self.receipts.insert(room_id.to_owned(), event);
667    }
668
669    /// Get a specific state event of statically-known type with the given state
670    /// key in the given room, if it is present in the `state` map of these
671    /// `StateChanges`.
672    pub(crate) fn state_static_for_key<C, K>(
673        &self,
674        room_id: &RoomId,
675        state_key: &K,
676    ) -> Option<&Raw<SyncStateEvent<C>>>
677    where
678        C: StaticEventContent<IsPrefix = ruma::events::False>
679            + StaticStateEventContent
680            + RedactContent,
681        C::Redacted: RedactedStateEventContent,
682        C::StateKey: Borrow<K>,
683        K: AsRef<str> + ?Sized,
684    {
685        self.state
686            .get(room_id)?
687            .get(&C::TYPE.into())?
688            .get(state_key.as_ref())
689            .map(Raw::cast_ref_unchecked)
690    }
691
692    /// Get a specific stripped state event of statically-known type with the
693    /// given state key in the given room, if it is present in the
694    /// `stripped_state` map of these `StateChanges`.
695    pub(crate) fn stripped_state_static_for_key<C, K>(
696        &self,
697        room_id: &RoomId,
698        state_key: &K,
699    ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
700    where
701        C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
702        C::StateKey: Borrow<K>,
703        K: AsRef<str> + ?Sized,
704    {
705        self.stripped_state
706            .get(room_id)?
707            .get(&C::TYPE.into())?
708            .get(state_key.as_ref())
709            .map(Raw::cast_ref_unchecked)
710    }
711
712    /// Get a specific state event of statically-known type with the given state
713    /// key in the given room, if it is present in the `state` or
714    /// `stripped_state` map of these `StateChanges` and it deserializes
715    /// successfully.
716    pub(crate) fn any_state_static_for_key<C, K>(
717        &self,
718        room_id: &RoomId,
719        state_key: &K,
720    ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
721    where
722        C: StaticEventContent<IsPrefix = ruma::events::False>
723            + StaticStateEventContent
724            + RedactContent,
725        C::Redacted: RedactedStateEventContent,
726        C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
727        C::StateKey: Borrow<K>,
728        K: AsRef<str> + ?Sized,
729    {
730        self.state_static_for_key::<C, K>(room_id, state_key)
731            .map(Raw::cast_ref)
732            .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
733            .deserialize()
734            .ok()
735    }
736
737    /// Get the member for the given user in the given room from an event
738    /// contained in these `StateChanges`, if any.
739    pub(crate) fn member(
740        &self,
741        room_id: &RoomId,
742        user_id: &UserId,
743    ) -> Option<StrippedRoomMemberEvent> {
744        self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
745    }
746
747    /// Get the create event for the given room from an event contained in these
748    /// `StateChanges`, if any.
749    pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
750        self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
751            .map(|event| {
752                RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
753            })
754            // Fallback to the content in the room info.
755            .or_else(|| self.room_infos.get(room_id)?.create().cloned())
756    }
757
758    /// Get the power levels for the given room from an event contained in these
759    /// `StateChanges`, if any.
760    pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
761        let power_levels_content = self
762            .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
763
764        let create_content = self.create(room_id)?;
765        let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
766        let creators = create_content.creators();
767
768        Some(power_levels_content.power_levels(&rules.authorization, creators))
769    }
770}
771
772/// Configuration for the various stores.
773///
774/// By default, this always includes a state store and an event cache store.
775/// When the `e2e-encryption` feature is enabled, this also includes a crypto
776/// store.
777///
778/// # Examples
779///
780/// ```
781/// # use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
782/// # use matrix_sdk_base::store::StoreConfig;
783/// #
784/// let store_config = StoreConfig::new(CrossProcessLockConfig::MultiProcess {
785///     holder_name: "cross-process-store-locks-holder-name".to_owned(),
786/// });
787/// ```
788#[derive(Clone)]
789pub struct StoreConfig {
790    #[cfg(feature = "e2e-encryption")]
791    pub(crate) crypto_store: Arc<DynCryptoStore>,
792    pub(crate) state_store: Arc<DynStateStore>,
793    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
794    pub(crate) media_store: media_store::MediaStoreLock,
795    cross_process_lock_config: CrossProcessLockConfig,
796}
797
798#[cfg(not(tarpaulin_include))]
799impl fmt::Debug for StoreConfig {
800    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
801        fmt.debug_struct("StoreConfig").finish()
802    }
803}
804
805impl StoreConfig {
806    /// Create a new default `StoreConfig`.
807    ///
808    /// To learn more about `cross_process_lock_config`, please read
809    /// [`CrossProcessLock::new`](matrix_sdk_common::cross_process_lock::CrossProcessLock::new).
810    #[must_use]
811    pub fn new(cross_process_lock_config: CrossProcessLockConfig) -> Self {
812        Self {
813            #[cfg(feature = "e2e-encryption")]
814            crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
815            state_store: Arc::new(MemoryStore::new()),
816            event_cache_store: event_cache_store::EventCacheStoreLock::new(
817                event_cache_store::MemoryStore::new(),
818                cross_process_lock_config.clone(),
819            ),
820            media_store: media_store::MediaStoreLock::new(
821                media_store::MemoryMediaStore::new(),
822                cross_process_lock_config.clone(),
823            ),
824            cross_process_lock_config,
825        }
826    }
827
828    /// Set a custom implementation of a `CryptoStore`.
829    ///
830    /// The crypto store must be opened before being set.
831    #[cfg(feature = "e2e-encryption")]
832    pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
833        self.crypto_store = store.into_crypto_store();
834        self
835    }
836
837    /// Set a custom implementation of a `StateStore`.
838    pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
839        self.state_store = store.into_state_store();
840        self
841    }
842
843    /// Set a custom implementation of an `EventCacheStore`.
844    pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
845    where
846        S: event_cache_store::IntoEventCacheStore,
847    {
848        self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
849            event_cache_store,
850            self.cross_process_lock_config.clone(),
851        );
852        self
853    }
854
855    /// Set a custom implementation of an `MediaStore`.
856    pub fn media_store<S>(mut self, media_store: S) -> Self
857    where
858        S: media_store::IntoMediaStore,
859    {
860        self.media_store =
861            media_store::MediaStoreLock::new(media_store, self.cross_process_lock_config.clone());
862        self
863    }
864}
865
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{RoomId, owned_device_id, owned_user_id, room_id, user_id};

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges, StateStore};

    /// Builds the `SessionMeta` used by the session-related tests.
    fn new_session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Creates an in-memory state store in which one joined room has been
    /// saved for each ID of `room_ids`.
    async fn memory_store_with_joined_rooms(room_ids: &[&RoomId]) -> Arc<MemoryStore> {
        let memory_state_store = Arc::new(MemoryStore::new());

        // The rooms are saved through a short-lived `BaseStateStore`, so that
        // the stores created by the tests have to load them from scratch.
        let store = BaseStateStore::new(memory_state_store.clone());
        let mut changes = StateChanges::default();

        for room_id in room_ids.iter().copied() {
            changes.add_room(RoomInfo::new(room_id, RoomState::Joined));
        }

        store.inner.save_changes(&changes).await.unwrap();

        memory_state_store
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = new_session_meta();

        // Nothing is set initially.
        assert!(store.session_meta.get().is_none());

        store.set_session_meta(session_meta.clone());

        // The value is now observable.
        assert_eq!(store.session_meta.get(), Some(&session_meta));
    }

    #[async_test]
    #[should_panic]
    async fn test_set_session_meta_twice() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = new_session_meta();

        store.set_session_meta(session_meta.clone());
        // Setting the session meta a second time must panic.
        store.set_session_meta(session_meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        let session_meta = new_session_meta();
        let room_id_0 = room_id!("!r0");

        // The store to derive from: it loads a single room and has a session.
        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let room_load_settings = RoomLoadSettings::One(room_id_0.to_owned());

        other.load_rooms(&session_meta.user_id, room_load_settings).await.unwrap();
        other.set_session_meta(session_meta.clone());

        // The store that derives from `other`.
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        store.derive_from_other(&other).await.unwrap();

        // `SessionMeta` is derived.
        assert_eq!(store.session_meta.get(), Some(&session_meta));

        // `RoomLoadSettings` is derived.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref derived_room_id) => {
                assert_eq!(derived_room_id, room_id_0);
            }
        );

        // The `RoomInfoNotableUpdate` channel is not derived: each store has
        // its own.
        let shares_channel = store
            .room_info_notable_update_sender
            .same_channel(&other.room_info_notable_update_sender);
        assert!(!shares_channel);
    }

    #[test]
    fn test_room_load_settings_default() {
        // The default is to load all rooms.
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // Initial state: two joined rooms are already saved.
        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        // Check a `BaseStateStore` is able to load all rooms.
        let store = BaseStateStore::new(memory_state_store);

        // Default value, before anything is loaded.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store.load_rooms(user_id, RoomLoadSettings::All).await.unwrap();

        // The settings used for the load are remembered.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Sort the loaded rooms by ID to get a deterministic order.
        let mut rooms = store.rooms();
        rooms.sort_by_key(|room| room.room_id().to_owned());

        let expected_room_ids = [room_id_0, room_id_1];
        assert_eq!(rooms.len(), expected_room_ids.len());

        for (room, expected_room_id) in rooms.iter().zip(expected_room_ids) {
            assert_eq!(room.room_id(), expected_room_id);
            assert_eq!(room.own_user_id(), user_id);
        }
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // Initial state: two joined rooms are already saved.
        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        // Check a `BaseStateStore` is able to load one room.
        let store = BaseStateStore::new(memory_state_store);

        // Default value, before anything is loaded.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Only ask for the second room.
        let room_load_settings = RoomLoadSettings::One(room_id_1.to_owned());
        store.load_rooms(user_id, room_load_settings).await.unwrap();

        // The settings used for the load are remembered.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref loaded_room_id) => {
                assert_eq!(loaded_room_id, room_id_1);
            }
        );

        // Only the requested room has been loaded.
        let rooms = store.rooms();
        assert_eq!(rooms.len(), 1);

        let room = &rooms[0];
        assert_eq!(room.room_id(), room_id_1);
        assert_eq!(room.own_user_id(), user_id);
    }
}