Skip to main content

matrix_sdk_base/store/
mod.rs

1// Copyright 2021 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The state store holds the overall state for rooms, users and their
//! profiles and their timelines. It is an overall cache for faster access
//! and convenience, accessible through `Store`.
18//!
19//! Implementing the `StateStore` trait, you can plug any storage backend
20//! into the store for the actual storage. By default this brings an in-memory
21//! store.
22
23use std::{
24    borrow::Borrow,
25    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26    fmt,
27    ops::Deref,
28    result::Result as StdResult,
29    str::{FromStr, Utf8Error},
30    sync::{Arc, OnceLock, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36
37#[cfg(any(test, feature = "testing"))]
38#[macro_use]
39pub mod integration_tests;
40mod observable_map;
41mod traits;
42
43use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, locks::Mutex as SyncMutex};
44#[cfg(feature = "e2e-encryption")]
45use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
46pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
47use observable_map::ObservableMap;
48use ruma::{
49    EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
50    events::{
51        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52        AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
53        RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
54        StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
55        presence::PresenceEvent,
56        receipt::ReceiptEventContent,
57        room::{
58            create::RoomCreateEventContent,
59            member::{RoomMemberEventContent, StrippedRoomMemberEvent},
60            power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
61            redaction::SyncRoomRedactionEvent,
62        },
63    },
64    serde::Raw,
65};
66use serde::de::DeserializeOwned;
67use tokio::sync::{Mutex, RwLock, broadcast};
68use tracing::warn;
69pub use traits::compare_thread_subscription_bump_stamps;
70
71use crate::{
72    MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
73    deserialized_responses::DisplayName,
74    event_cache::store as event_cache_store,
75    media::store as media_store,
76    room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
77};
78
79pub(crate) mod ambiguity_map;
80mod memory_store;
81pub mod migration_helpers;
82mod send_queue;
83
84#[cfg(any(test, feature = "testing"))]
85pub use self::integration_tests::StateStoreIntegrationTests;
86#[cfg(feature = "unstable-msc4274")]
87pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
88pub use self::{
89    memory_store::MemoryStore,
90    send_queue::{
91        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
92        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
93        SentMediaInfo, SentRequestKey, SerializableEventContent,
94    },
95    traits::{
96        ComposerDraft, ComposerDraftType, DraftAttachment, DraftAttachmentContent, DraftThumbnail,
97        DynStateStore, IntoStateStore, StateStore, StateStoreDataKey, StateStoreDataValue,
98        StateStoreExt, SupportedVersionsResponse, ThreadSubscriptionCatchupToken, TtlStoreValue,
99        WellKnownResponse,
100    },
101};
102
/// State store specific error type.
///
/// Variants whose payload is marked `#[from]` can be created from the
/// underlying error type with the `?` operator. The [`Backend`][Self::Backend]
/// variant has no such conversion; use [`StoreError::backend`] to build it.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An error happened in the underlying database backend.
    ///
    /// The backend's own error is boxed, and displayed transparently.
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),

    /// An error happened while serializing or deserializing some data.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// An error happened while deserializing a Matrix identifier, e.g. a user
    /// ID.
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),

    /// The store is locked with a passphrase and an incorrect passphrase was
    /// given.
    #[error("The store failed to be unlocked")]
    StoreLocked,

    /// An attempt was made to unlock an unencrypted store with a passphrase.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,

    /// The store failed to encrypt or decrypt some data.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// The store failed to encode or decode some data.
    ///
    /// The source is a [`Utf8Error`], i.e. some bytes were not valid UTF-8.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database format has changed in a backwards incompatible way.
    ///
    /// The first field is the current version of the database, the second
    /// field is the latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),

    /// Redacting an event in the store has failed.
    ///
    /// This should never happen.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),

    /// The store contains invalid data.
    #[error("The store contains invalid data: {details}")]
    InvalidData {
        /// Details about which data is invalid, and how.
        details: String,
    },
}
156
157impl StoreError {
158    /// Create a new [`Backend`][Self::Backend] error.
159    ///
160    /// Shorthand for `StoreError::Backend(Box::new(error))`.
161    #[inline]
162    pub fn backend<E>(error: E) -> Self
163    where
164        E: std::error::Error + Send + Sync + 'static,
165    {
166        Self::Backend(Box::new(error))
167    }
168}
169
170/// A `StateStore` specific result type.
171pub type Result<T, E = StoreError> = std::result::Result<T, E>;
172
/// A state store wrapper for the SDK.
///
/// This adds additional higher level store functionality on top of a
/// `StateStore` implementation.
///
/// Apart from the broadcast sender, every field is wrapped in an `Arc`, so a
/// clone of this struct refers to the same underlying values.
#[derive(Clone)]
pub(crate) struct BaseStateStore {
    /// The wrapped `StateStore` implementation. It is also what this type
    /// dereferences to.
    pub(super) inner: Arc<DynStateStore>,
    /// The session meta, set at most once with
    /// [`BaseStateStore::set_session_meta`].
    session_meta: Arc<OnceLock<SessionMeta>>,
    /// The settings that were last given to [`BaseStateStore::load_rooms`].
    room_load_settings: Arc<RwLock<RoomLoadSettings>>,

    /// A sender that is used to communicate changes to room information. Each
    /// tick contains the room ID and the reasons that have generated this tick.
    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// The current sync token that should be used for the next sync call.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,

    /// All rooms the store knows about.
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,

    /// A lock to synchronize access to the store, such that data by the sync is
    /// never overwritten.
    lock: Arc<Mutex<()>>,

    /// Which rooms have already logged a log line about missing room info, in
    /// the context of response processors?
    pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
}
201
202impl BaseStateStore {
203    /// Create a new store, wrapping the given `StateStore`
204    pub fn new(inner: Arc<DynStateStore>) -> Self {
205        // Create the channel to receive `RoomInfoNotableUpdate`.
206        //
207        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
208        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
209        // trigger such amount of updates, it's a safe value.
210        //
211        // Also, note that it must not be zero, because (i) it will panic,
212        // (ii) a new user has no room, but can create rooms; remember that the
213        // channel's capacity is immutable.
214        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
215            broadcast::channel(500);
216
217        Self {
218            inner,
219            session_meta: Default::default(),
220            room_load_settings: Default::default(),
221            room_info_notable_update_sender,
222            sync_token: Default::default(),
223            rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
224            lock: Default::default(),
225            already_logged_missing_room: Default::default(),
226        }
227    }
228
229    /// Get access to the syncing lock.
230    pub fn lock(&self) -> &Mutex<()> {
231        &self.lock
232    }
233
234    /// Set the [`SessionMeta`] into [`BaseStateStore::session_meta`].
235    ///
236    /// # Panics
237    ///
238    /// Panics if called twice.
239    pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
240        self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
241    }
242
243    /// Loads rooms from the given [`DynStateStore`] (in
244    /// [`BaseStateStore::new`]) into [`BaseStateStore::rooms`].
245    pub(crate) async fn load_rooms(
246        &self,
247        user_id: &UserId,
248        room_load_settings: RoomLoadSettings,
249    ) -> Result<()> {
250        *self.room_load_settings.write().await = room_load_settings.clone();
251
252        let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
253
254        let mut rooms = self.rooms.write().unwrap();
255
256        for room_info in room_infos {
257            let new_room = Room::restore(
258                user_id,
259                self.inner.clone(),
260                room_info,
261                self.room_info_notable_update_sender.clone(),
262            );
263            let new_room_id = new_room.room_id().to_owned();
264
265            rooms.insert(new_room_id, new_room);
266        }
267
268        Ok(())
269    }
270
271    /// Load room infos from the [`StateStore`] and applies migrations onto
272    /// them.
273    async fn load_and_migrate_room_infos(
274        &self,
275        room_load_settings: RoomLoadSettings,
276    ) -> Result<Vec<RoomInfo>> {
277        let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
278        let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
279
280        for room_info in room_infos.iter_mut() {
281            if room_info.apply_migrations(self.inner.clone()).await {
282                migrated_room_infos.push(room_info.clone());
283            }
284        }
285
286        if !migrated_room_infos.is_empty() {
287            let changes = StateChanges {
288                room_infos: migrated_room_infos
289                    .into_iter()
290                    .map(|room_info| (room_info.room_id.clone(), room_info))
291                    .collect(),
292                ..Default::default()
293            };
294
295            if let Err(error) = self.inner.save_changes(&changes).await {
296                warn!("Failed to save migrated room infos: {error}");
297            }
298        }
299
300        Ok(room_infos)
301    }
302
303    /// Load sync token from the [`StateStore`], and put it in
304    /// [`BaseStateStore::sync_token`].
305    pub(crate) async fn load_sync_token(&self) -> Result<()> {
306        let token =
307            self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
308        *self.sync_token.write().await = token;
309
310        Ok(())
311    }
312
313    /// Restore the session meta, sync token and rooms from an existing
314    /// [`BaseStateStore`].
315    #[cfg(any(feature = "e2e-encryption", test))]
316    pub(crate) async fn derive_from_other(&self, other: &Self) -> Result<()> {
317        let Some(session_meta) = other.session_meta.get() else {
318            return Ok(());
319        };
320
321        let room_load_settings = other.room_load_settings.read().await.clone();
322
323        self.load_rooms(&session_meta.user_id, room_load_settings).await?;
324        self.load_sync_token().await?;
325        self.set_session_meta(session_meta.clone());
326
327        Ok(())
328    }
329
330    /// The current [`SessionMeta`] containing our user ID and device ID.
331    pub fn session_meta(&self) -> Option<&SessionMeta> {
332        self.session_meta.get()
333    }
334
335    /// Get all the rooms this store knows about.
336    pub fn rooms(&self) -> Vec<Room> {
337        self.rooms.read().unwrap().iter().cloned().collect()
338    }
339
340    /// Get all the rooms this store knows about, filtered by state.
341    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
342        self.rooms
343            .read()
344            .unwrap()
345            .iter()
346            .filter(|room| filter.matches(room.state()))
347            .cloned()
348            .collect()
349    }
350
351    /// Get a stream of all the rooms changes, in addition to the existing
352    /// rooms.
353    pub fn rooms_stream(
354        &self,
355    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
356        self.rooms.read().unwrap().stream()
357    }
358
359    /// Get the room with the given room id.
360    pub fn room(&self, room_id: &RoomId) -> Option<Room> {
361        self.rooms.read().unwrap().get(room_id).cloned()
362    }
363
364    /// Check if a room exists.
365    pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
366        self.rooms.read().unwrap().get(room_id).is_some()
367    }
368
369    /// Lookup the `Room` for the given `RoomId`, or create one, if it didn't
370    /// exist yet in the store
371    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
372        let user_id =
373            &self.session_meta.get().expect("Creating room while not being logged in").user_id;
374
375        self.rooms
376            .write()
377            .unwrap()
378            .get_or_create(room_id, || {
379                Room::new(
380                    user_id,
381                    self.inner.clone(),
382                    room_id,
383                    room_state,
384                    self.room_info_notable_update_sender.clone(),
385                )
386            })
387            .clone()
388    }
389
390    /// Forget the room with the given room ID.
391    ///
392    /// # Arguments
393    ///
394    /// * `room_id` - The id of the room that should be forgotten.
395    pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
396        self.inner.remove_room(room_id).await?;
397        self.rooms.write().unwrap().remove(room_id);
398        Ok(())
399    }
400}
401
402#[cfg(not(tarpaulin_include))]
403impl fmt::Debug for BaseStateStore {
404    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
405        f.debug_struct("Store")
406            .field("inner", &self.inner)
407            .field("session_meta", &self.session_meta)
408            .field("sync_token", &self.sync_token)
409            .field("rooms", &self.rooms)
410            .finish_non_exhaustive()
411    }
412}
413
414impl Deref for BaseStateStore {
415    type Target = DynStateStore;
416
417    fn deref(&self) -> &Self::Target {
418        self.inner.deref()
419    }
420}
421
/// Configure how many rooms will be restored when restoring the session with
/// `BaseStateStore::load_rooms`.
///
/// <div class="warning">
///
/// # ⚠️ Be careful!
///
/// When loading a single room with [`RoomLoadSettings::One`], the in-memory
/// state may not reflect the store state (in the databases). Thus, when one
/// gets a room that exists in the store state but _not_ in the in-memory
/// state, it will be created from scratch and, when saved, will override the
/// data in the store state (in the databases). This can lead to weird
/// behaviours.
///
/// This option is expected to be used as follows:
///
/// 1. Create a `BaseStateStore` with a [`StateStore`] based on SQLite for
///    example,
/// 2. Restore a session and load one room from the [`StateStore`] (in the case
///    of dealing with a notification for example),
/// 3. Derive the `BaseStateStore`, with `BaseStateStore::derive_from_other`,
///    into another one with an in-memory [`StateStore`], such as
///    [`MemoryStore`],
/// 4. Work on this derived `BaseStateStore`.
///
/// Now, all operations happen in the [`MemoryStore`], not on the original store
/// (SQLite in this example), thus protecting the original data.
///
/// From a higher-level point of view, this is what
/// [`BaseClient::clone_with_in_memory_state_store`] does.
///
/// </div>
///
/// [`BaseClient::clone_with_in_memory_state_store`]: crate::BaseClient::clone_with_in_memory_state_store
#[derive(Clone, Debug, Default)]
pub enum RoomLoadSettings {
    /// Load all rooms from the [`StateStore`] into the in-memory state store
    /// `BaseStateStore`.
    ///
    /// This is the default variant.
    #[default]
    All,

    /// Load a single room from the [`StateStore`] into the in-memory state
    /// store `BaseStateStore`. The room to load is the one with the given
    /// room ID.
    ///
    /// Please, be careful with this option. Read the documentation of
    /// [`RoomLoadSettings`].
    One(OwnedRoomId),
}
472
/// The subscription status of a thread.
///
/// Unsubscriptions are kept in the database too: the bump stamp (see
/// `ThreadSubscription`) must stay around so that subscriptions and
/// unsubscriptions can be ordered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadSubscriptionStatus {
    /// The user is subscribed to the related thread.
    Subscribed {
        /// `true` if a client made the subscription automatically, `false`
        /// if the user chose it manually.
        automatic: bool,
    },

    /// The user has been unsubscribed from the related thread.
    Unsubscribed,
}

impl FromStr for ThreadSubscriptionStatus {
    type Err = ();

    /// Parse a status out of the string produced by
    /// [`ThreadSubscriptionStatus::as_str`].
    ///
    /// Any other string is rejected with `Err(())`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let status = match s {
            "automatic" => Self::Subscribed { automatic: true },
            "manual" => Self::Subscribed { automatic: false },
            "unsubscribed" => Self::Unsubscribed,
            _ => return Err(()),
        };

        Ok(status)
    }
}

impl ThreadSubscriptionStatus {
    /// The static string that represents this status in a persistent
    /// format.
    ///
    /// Note: some database implementations serialize this value, so don't
    /// change it lightly, and keep it in sync with [`Self::from_str`].
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Subscribed { automatic: true } => "automatic",
            Self::Subscribed { automatic: false } => "manual",
            Self::Unsubscribed => "unsubscribed",
        }
    }
}
524
/// A thread subscription, as saved in the state store.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StoredThreadSubscription {
    /// Current status of the subscription.
    pub status: ThreadSubscriptionStatus,

    /// An optional bump stamp, as defined in the MSC; the higher the value,
    /// the more recent the thread subscription information is, and the more
    /// it should be remembered.
    ///
    /// If not set, this means it's a user-provided thread subscription, for
    /// which we're waiting for validation from a server (e.g. through a
    /// remote echo via sync).
    pub bump_stamp: Option<u64>,
}
540
/// Store state changes and pass them to the StateStore.
///
/// The `Default` value has no sync token and only empty maps.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token that relates to this update.
    pub sync_token: Option<String>,
    /// A mapping of global account data event type to the raw
    /// `AnyGlobalAccountDataEvent`.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// A mapping of `UserId` to the raw `PresenceEvent`.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// A mapping of `RoomId` to a map of users and their
    /// `MinimalRoomMemberEvent`.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// A mapping of `RoomId` to the users whose room profile must be deleted.
    ///
    /// These are deleted *before* other room profiles are inserted.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// A mapping of `RoomId` to a map of state event type to a map of state
    /// key to the raw `AnySyncStateEvent`.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// A mapping of `RoomId` to a map of room account data event type to the
    /// raw `AnyRoomAccountDataEvent`.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// A map of `OwnedRoomId` to `RoomInfo`.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// A map of `RoomId` to `ReceiptEventContent`.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// A map of `RoomId` to maps of the `OwnedEventId` of the event to be
    /// redacted to the raw `SyncRoomRedactionEvent` redacting it.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// A mapping of `RoomId` to a map of state event type to a map of state
    /// key to the raw `AnyStrippedStateEvent`.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// A map from room id to a map of a display name and a set of user ids that
    /// share that display name in the given room.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
589
590impl StateChanges {
591    /// Create a new `StateChanges` struct with the given sync_token.
592    pub fn new(sync_token: String) -> Self {
593        Self { sync_token: Some(sync_token), ..Default::default() }
594    }
595
596    /// Update the `StateChanges` struct with the given `PresenceEvent`.
597    pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
598        self.presence.insert(event.sender, raw_event);
599    }
600
601    /// Update the `StateChanges` struct with the given `RoomInfo`.
602    pub fn add_room(&mut self, room: RoomInfo) {
603        self.room_infos.insert(room.room_id.clone(), room);
604    }
605
606    /// Update the `StateChanges` struct with the given room with a new
607    /// `AnyBasicEvent`.
608    pub fn add_room_account_data(
609        &mut self,
610        room_id: &RoomId,
611        event: AnyRoomAccountDataEvent,
612        raw_event: Raw<AnyRoomAccountDataEvent>,
613    ) {
614        self.room_account_data
615            .entry(room_id.to_owned())
616            .or_default()
617            .insert(event.event_type(), raw_event);
618    }
619
620    /// Update the `StateChanges` struct with the given room with a new
621    /// `StrippedMemberEvent`.
622    pub fn add_stripped_member(
623        &mut self,
624        room_id: &RoomId,
625        user_id: &UserId,
626        event: Raw<StrippedRoomMemberEvent>,
627    ) {
628        self.stripped_state
629            .entry(room_id.to_owned())
630            .or_default()
631            .entry(StateEventType::RoomMember)
632            .or_default()
633            .insert(user_id.into(), event.cast());
634    }
635
636    /// Update the `StateChanges` struct with the given room with a new
637    /// `AnySyncStateEvent`.
638    pub fn add_state_event(
639        &mut self,
640        room_id: &RoomId,
641        event: AnySyncStateEvent,
642        raw_event: Raw<AnySyncStateEvent>,
643    ) {
644        self.state
645            .entry(room_id.to_owned())
646            .or_default()
647            .entry(event.event_type())
648            .or_default()
649            .insert(event.state_key().to_owned(), raw_event);
650    }
651
652    /// Redact an event in the room
653    pub fn add_redaction(
654        &mut self,
655        room_id: &RoomId,
656        redacted_event_id: &EventId,
657        redaction: Raw<SyncRoomRedactionEvent>,
658    ) {
659        self.redactions
660            .entry(room_id.to_owned())
661            .or_default()
662            .insert(redacted_event_id.to_owned(), redaction);
663    }
664
665    /// Update the `StateChanges` struct with the given room with a new
666    /// `Receipts`.
667    pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
668        self.receipts.insert(room_id.to_owned(), event);
669    }
670
671    /// Get a specific state event of statically-known type with the given state
672    /// key in the given room, if it is present in the `state` map of these
673    /// `StateChanges`.
674    pub(crate) fn state_static_for_key<C, K>(
675        &self,
676        room_id: &RoomId,
677        state_key: &K,
678    ) -> Option<&Raw<SyncStateEvent<C>>>
679    where
680        C: StaticEventContent<IsPrefix = ruma::events::False>
681            + StaticStateEventContent
682            + RedactContent,
683        C::Redacted: RedactedStateEventContent,
684        C::StateKey: Borrow<K>,
685        K: AsRef<str> + ?Sized,
686    {
687        self.state
688            .get(room_id)?
689            .get(&C::TYPE.into())?
690            .get(state_key.as_ref())
691            .map(Raw::cast_ref_unchecked)
692    }
693
694    /// Get a specific stripped state event of statically-known type with the
695    /// given state key in the given room, if it is present in the
696    /// `stripped_state` map of these `StateChanges`.
697    pub(crate) fn stripped_state_static_for_key<C, K>(
698        &self,
699        room_id: &RoomId,
700        state_key: &K,
701    ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
702    where
703        C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
704        C::StateKey: Borrow<K>,
705        K: AsRef<str> + ?Sized,
706    {
707        self.stripped_state
708            .get(room_id)?
709            .get(&C::TYPE.into())?
710            .get(state_key.as_ref())
711            .map(Raw::cast_ref_unchecked)
712    }
713
714    /// Get a specific state event of statically-known type with the given state
715    /// key in the given room, if it is present in the `state` or
716    /// `stripped_state` map of these `StateChanges` and it deserializes
717    /// successfully.
718    pub(crate) fn any_state_static_for_key<C, K>(
719        &self,
720        room_id: &RoomId,
721        state_key: &K,
722    ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
723    where
724        C: StaticEventContent<IsPrefix = ruma::events::False>
725            + StaticStateEventContent
726            + RedactContent,
727        C::Redacted: RedactedStateEventContent,
728        C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
729        C::StateKey: Borrow<K>,
730        K: AsRef<str> + ?Sized,
731    {
732        self.state_static_for_key::<C, K>(room_id, state_key)
733            .map(Raw::cast_ref)
734            .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
735            .deserialize()
736            .ok()
737    }
738
739    /// Get the member for the given user in the given room from an event
740    /// contained in these `StateChanges`, if any.
741    pub(crate) fn member(
742        &self,
743        room_id: &RoomId,
744        user_id: &UserId,
745    ) -> Option<StrippedRoomMemberEvent> {
746        self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
747    }
748
749    /// Get the create event for the given room from an event contained in these
750    /// `StateChanges`, if any.
751    pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
752        self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
753            .map(|event| {
754                RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
755            })
756            // Fallback to the content in the room info.
757            .or_else(|| self.room_infos.get(room_id)?.create().cloned())
758    }
759
760    /// Get the power levels for the given room from an event contained in these
761    /// `StateChanges`, if any.
762    pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
763        let power_levels_content = self
764            .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
765
766        let create_content = self.create(room_id)?;
767        let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
768        let creators = create_content.creators();
769
770        Some(power_levels_content.power_levels(&rules.authorization, creators))
771    }
772}
773
/// Configuration for the various stores.
///
/// By default, this always includes a state store, an event cache store and a
/// media store. When the `e2e-encryption` feature is enabled, this also
/// includes a crypto store.
///
/// # Examples
///
/// ```
/// # use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
/// # use matrix_sdk_base::store::StoreConfig;
/// #
/// let store_config = StoreConfig::new(CrossProcessLockConfig::MultiProcess {
///     holder_name: "cross-process-store-locks-holder-name".to_owned(),
/// });
/// ```
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store; only present with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in its lock.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The media store, wrapped in its lock.
    pub(crate) media_store: media_store::MediaStoreLock,
    /// The cross-process lock configuration, handed to the event cache store
    /// lock and to the media store lock when they are created.
    cross_process_lock_config: CrossProcessLockConfig,
}
799
800#[cfg(not(tarpaulin_include))]
801impl fmt::Debug for StoreConfig {
802    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
803        fmt.debug_struct("StoreConfig").finish()
804    }
805}
806
807impl StoreConfig {
808    /// Create a new default `StoreConfig`.
809    ///
810    /// To learn more about `cross_process_lock_config`, please read
811    /// [`CrossProcessLock::new`](matrix_sdk_common::cross_process_lock::CrossProcessLock::new).
812    #[must_use]
813    pub fn new(cross_process_lock_config: CrossProcessLockConfig) -> Self {
814        Self {
815            #[cfg(feature = "e2e-encryption")]
816            crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
817            state_store: Arc::new(MemoryStore::new()),
818            event_cache_store: event_cache_store::EventCacheStoreLock::new(
819                event_cache_store::MemoryStore::new(),
820                cross_process_lock_config.clone(),
821            ),
822            media_store: media_store::MediaStoreLock::new(
823                media_store::MemoryMediaStore::new(),
824                cross_process_lock_config.clone(),
825            ),
826            cross_process_lock_config,
827        }
828    }
829
830    /// Set a custom implementation of a `CryptoStore`.
831    ///
832    /// The crypto store must be opened before being set.
833    #[cfg(feature = "e2e-encryption")]
834    pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
835        self.crypto_store = store.into_crypto_store();
836        self
837    }
838
839    /// Set a custom implementation of a `StateStore`.
840    pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
841        self.state_store = store.into_state_store();
842        self
843    }
844
845    /// Set a custom implementation of an `EventCacheStore`.
846    pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
847    where
848        S: event_cache_store::IntoEventCacheStore,
849    {
850        self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
851            event_cache_store,
852            self.cross_process_lock_config.clone(),
853        );
854        self
855    }
856
857    /// Set a custom implementation of an `MediaStore`.
858    pub fn media_store<S>(mut self, media_store: S) -> Self
859    where
860        S: media_store::IntoMediaStore,
861    {
862        self.media_store =
863            media_store::MediaStoreLock::new(media_store, self.cross_process_lock_config.clone());
864        self
865    }
866}
867
868#[cfg(test)]
869mod tests {
870    use std::{ops::Not, sync::Arc};
871
872    use assert_matches::assert_matches;
873    use matrix_sdk_test::async_test;
874    use ruma::{owned_device_id, owned_user_id, room_id, user_id};
875
876    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
877    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};
878
879    #[async_test]
880    async fn test_set_session_meta() {
881        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
882
883        let session_meta = SessionMeta {
884            user_id: owned_user_id!("@mnt_io:matrix.org"),
885            device_id: owned_device_id!("HELLOYOU"),
886        };
887
888        assert!(store.session_meta.get().is_none());
889
890        store.set_session_meta(session_meta.clone());
891
892        assert_eq!(store.session_meta.get(), Some(&session_meta));
893    }
894
895    #[async_test]
896    #[should_panic]
897    async fn test_set_session_meta_twice() {
898        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
899
900        let session_meta = SessionMeta {
901            user_id: owned_user_id!("@mnt_io:matrix.org"),
902            device_id: owned_device_id!("HELLOYOU"),
903        };
904
905        store.set_session_meta(session_meta.clone());
906        // Kaboom.
907        store.set_session_meta(session_meta);
908    }
909
910    #[async_test]
911    async fn test_derive_from_other() {
912        // The first store.
913        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));
914
915        let session_meta = SessionMeta {
916            user_id: owned_user_id!("@mnt_io:matrix.org"),
917            device_id: owned_device_id!("HELLOYOU"),
918        };
919        let room_id_0 = room_id!("!r0");
920
921        other
922            .load_rooms(&session_meta.user_id, RoomLoadSettings::One(room_id_0.to_owned()))
923            .await
924            .unwrap();
925        other.set_session_meta(session_meta.clone());
926
927        // Derive another store.
928        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
929        store.derive_from_other(&other).await.unwrap();
930
931        // `SessionMeta` is derived.
932        assert_eq!(store.session_meta.get(), Some(&session_meta));
933        // `RoomLoadSettings` is derived.
934        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
935            assert_eq!(room_id, room_id_0);
936        });
937
938        // The `RoomInfoNotableUpdate` is not derived. Every one has its own channel.
939        assert!(
940            store
941                .room_info_notable_update_sender
942                .same_channel(&other.room_info_notable_update_sender)
943                .not()
944        );
945    }
946
947    #[test]
948    fn test_room_load_settings_default() {
949        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
950    }
951
952    #[async_test]
953    async fn test_load_all_rooms() {
954        let room_id_0 = room_id!("!r0");
955        let room_id_1 = room_id!("!r1");
956        let user_id = user_id!("@mnt_io:matrix.org");
957
958        let memory_state_store = Arc::new(MemoryStore::new());
959
960        // Initial state.
961        {
962            let store = BaseStateStore::new(memory_state_store.clone());
963            let mut changes = StateChanges::default();
964            changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
965            changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
966
967            store.inner.save_changes(&changes).await.unwrap();
968        }
969
970        // Check a `BaseStateStore` is able to load all rooms.
971        {
972            let store = BaseStateStore::new(memory_state_store.clone());
973
974            // Default value.
975            assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
976
977            // Load rooms.
978            store.load_rooms(user_id, RoomLoadSettings::All).await.unwrap();
979
980            // Check the last room load settings.
981            assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
982
983            // Check the loaded rooms.
984            let mut rooms = store.rooms();
985            rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));
986
987            assert_eq!(rooms.len(), 2);
988
989            assert_eq!(rooms[0].room_id(), room_id_0);
990            assert_eq!(rooms[0].own_user_id(), user_id);
991
992            assert_eq!(rooms[1].room_id(), room_id_1);
993            assert_eq!(rooms[1].own_user_id(), user_id);
994        }
995    }
996
997    #[async_test]
998    async fn test_load_one_room() {
999        let room_id_0 = room_id!("!r0");
1000        let room_id_1 = room_id!("!r1");
1001        let user_id = user_id!("@mnt_io:matrix.org");
1002
1003        let memory_state_store = Arc::new(MemoryStore::new());
1004
1005        // Initial state.
1006        {
1007            let store = BaseStateStore::new(memory_state_store.clone());
1008            let mut changes = StateChanges::default();
1009            changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1010            changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1011
1012            store.inner.save_changes(&changes).await.unwrap();
1013        }
1014
1015        // Check a `BaseStateStore` is able to load one room.
1016        {
1017            let store = BaseStateStore::new(memory_state_store.clone());
1018
1019            // Default value.
1020            assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
1021
1022            // Load rooms.
1023            store.load_rooms(user_id, RoomLoadSettings::One(room_id_1.to_owned())).await.unwrap();
1024
1025            // Check the last room load settings.
1026            assert_matches!(
1027                *store.room_load_settings.read().await,
1028                RoomLoadSettings::One(ref room_id) => {
1029                    assert_eq!(room_id, room_id_1);
1030                }
1031            );
1032
1033            // Check the loaded rooms.
1034            let rooms = store.rooms();
1035            assert_eq!(rooms.len(), 1);
1036
1037            assert_eq!(rooms[0].room_id(), room_id_1);
1038            assert_eq!(rooms[0].own_user_id(), user_id);
1039        }
1040    }
1041}