1use std::{
24 collections::{BTreeMap, BTreeSet, HashMap},
25 fmt,
26 ops::Deref,
27 result::Result as StdResult,
28 str::Utf8Error,
29 sync::{Arc, RwLock as StdRwLock},
30};
31
32use eyeball_im::{Vector, VectorDiff};
33use futures_util::Stream;
34use once_cell::sync::OnceCell;
35
36#[cfg(any(test, feature = "testing"))]
37#[macro_use]
38pub mod integration_tests;
39mod observable_map;
40mod traits;
41
42#[cfg(feature = "e2e-encryption")]
43use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
44pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
45use observable_map::ObservableMap;
46use ruma::{
47 events::{
48 presence::PresenceEvent,
49 receipt::ReceiptEventContent,
50 room::{member::StrippedRoomMemberEvent, redaction::SyncRoomRedactionEvent},
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
53 },
54 serde::Raw,
55 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
56};
57use tokio::sync::{broadcast, Mutex, RwLock};
58use tracing::warn;
59
60use crate::{
61 deserialized_responses::DisplayName,
62 event_cache::store as event_cache_store,
63 rooms::{normal::RoomInfoNotableUpdate, RoomInfo, RoomState},
64 MinimalRoomMemberEvent, Room, RoomStateFilter, SessionMeta,
65};
66
67pub(crate) mod ambiguity_map;
68mod memory_store;
69pub mod migration_helpers;
70mod send_queue;
71
72#[cfg(any(test, feature = "testing"))]
73pub use self::integration_tests::StateStoreIntegrationTests;
74pub use self::{
75 memory_store::MemoryStore,
76 send_queue::{
77 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
78 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
79 SentMediaInfo, SentRequestKey, SerializableEventContent,
80 },
81 traits::{
82 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities,
83 StateStore, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
84 },
85};
86
87#[derive(Debug, thiserror::Error)]
89pub enum StoreError {
90 #[error(transparent)]
92 Backend(Box<dyn std::error::Error + Send + Sync>),
93 #[error(transparent)]
95 Json(#[from] serde_json::Error),
96 #[error(transparent)]
99 Identifier(#[from] ruma::IdParseError),
100 #[error("The store failed to be unlocked")]
103 StoreLocked,
104 #[error("The store is not encrypted but was tried to be opened with a passphrase")]
106 UnencryptedStore,
107 #[error("Error encrypting or decrypting data from the store: {0}")]
109 Encryption(#[from] StoreEncryptionError),
110
111 #[error("Error encoding or decoding data from the store: {0}")]
113 Codec(#[from] Utf8Error),
114
115 #[error(
117 "The database format changed in an incompatible way, current \
118 version: {0}, latest version: {1}"
119 )]
120 UnsupportedDatabaseVersion(usize, usize),
121 #[error("Redaction failed: {0}")]
125 Redaction(#[source] ruma::canonical_json::RedactionError),
126}
127
128impl StoreError {
129 #[inline]
133 pub fn backend<E>(error: E) -> Self
134 where
135 E: std::error::Error + Send + Sync + 'static,
136 {
137 Self::Backend(Box::new(error))
138 }
139}
140
141pub type Result<T, E = StoreError> = std::result::Result<T, E>;
143
144#[derive(Clone)]
149pub(crate) struct BaseStateStore {
150 pub(super) inner: Arc<DynStateStore>,
151 session_meta: Arc<OnceCell<SessionMeta>>,
152 room_load_settings: Arc<RwLock<RoomLoadSettings>>,
153 pub(super) sync_token: Arc<RwLock<Option<String>>>,
155 rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
157 sync_lock: Arc<Mutex<()>>,
160}
161
162impl BaseStateStore {
163 pub fn new(inner: Arc<DynStateStore>) -> Self {
165 Self {
166 inner,
167 session_meta: Default::default(),
168 room_load_settings: Default::default(),
169 sync_token: Default::default(),
170 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
171 sync_lock: Default::default(),
172 }
173 }
174
175 pub fn sync_lock(&self) -> &Mutex<()> {
177 &self.sync_lock
178 }
179
180 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
186 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
187 }
188
189 pub(crate) async fn load_rooms(
192 &self,
193 user_id: &UserId,
194 room_load_settings: RoomLoadSettings,
195 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
196 ) -> Result<()> {
197 *self.room_load_settings.write().await = room_load_settings.clone();
198
199 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
200
201 let mut rooms = self.rooms.write().unwrap();
202
203 for room_info in room_infos {
204 let new_room = Room::restore(
205 user_id,
206 self.inner.clone(),
207 room_info,
208 room_info_notable_update_sender.clone(),
209 );
210 let new_room_id = new_room.room_id().to_owned();
211
212 rooms.insert(new_room_id, new_room);
213 }
214
215 Ok(())
216 }
217
218 async fn load_and_migrate_room_infos(
221 &self,
222 room_load_settings: RoomLoadSettings,
223 ) -> Result<Vec<RoomInfo>> {
224 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
225 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
226
227 for room_info in room_infos.iter_mut() {
228 if room_info.apply_migrations(self.inner.clone()).await {
229 migrated_room_infos.push(room_info.clone());
230 }
231 }
232
233 if !migrated_room_infos.is_empty() {
234 let changes = StateChanges {
235 room_infos: migrated_room_infos
236 .into_iter()
237 .map(|room_info| (room_info.room_id.clone(), room_info))
238 .collect(),
239 ..Default::default()
240 };
241
242 if let Err(error) = self.inner.save_changes(&changes).await {
243 warn!("Failed to save migrated room infos: {error}");
244 }
245 }
246
247 Ok(room_infos)
248 }
249
250 pub(crate) async fn load_sync_token(&self) -> Result<()> {
253 let token =
254 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
255 *self.sync_token.write().await = token;
256
257 Ok(())
258 }
259
260 #[cfg(any(feature = "e2e-encryption", test))]
263 pub(crate) async fn derive_from_other(
264 &self,
265 other: &Self,
266 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
267 ) -> Result<()> {
268 let Some(session_meta) = other.session_meta.get() else {
269 return Ok(());
270 };
271
272 let room_load_settings = other.room_load_settings.read().await.clone();
273
274 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
275 .await?;
276 self.load_sync_token().await?;
277 self.set_session_meta(session_meta.clone());
278
279 Ok(())
280 }
281
282 pub fn session_meta(&self) -> Option<&SessionMeta> {
284 self.session_meta.get()
285 }
286
287 pub fn rooms(&self) -> Vec<Room> {
289 self.rooms.read().unwrap().iter().cloned().collect()
290 }
291
292 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
294 self.rooms
295 .read()
296 .unwrap()
297 .iter()
298 .filter(|room| filter.matches(room.state()))
299 .cloned()
300 .collect()
301 }
302
303 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
306 self.rooms.read().unwrap().stream()
307 }
308
309 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
311 self.rooms.read().unwrap().get(room_id).cloned()
312 }
313
314 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
316 self.rooms.read().unwrap().get(room_id).is_some()
317 }
318
319 pub fn get_or_create_room(
322 &self,
323 room_id: &RoomId,
324 room_type: RoomState,
325 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
326 ) -> Room {
327 let user_id =
328 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
329
330 self.rooms
331 .write()
332 .unwrap()
333 .get_or_create(room_id, || {
334 Room::new(
335 user_id,
336 self.inner.clone(),
337 room_id,
338 room_type,
339 room_info_notable_update_sender,
340 )
341 })
342 .clone()
343 }
344
345 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
351 self.inner.remove_room(room_id).await?;
352 self.rooms.write().unwrap().remove(room_id);
353 Ok(())
354 }
355}
356
357#[cfg(not(tarpaulin_include))]
358impl fmt::Debug for BaseStateStore {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 f.debug_struct("Store")
361 .field("inner", &self.inner)
362 .field("session_meta", &self.session_meta)
363 .field("sync_token", &self.sync_token)
364 .field("rooms", &self.rooms)
365 .finish_non_exhaustive()
366 }
367}
368
369impl Deref for BaseStateStore {
370 type Target = DynStateStore;
371
372 fn deref(&self) -> &Self::Target {
373 self.inner.deref()
374 }
375}
376
377#[derive(Clone, Debug, Default)]
412pub enum RoomLoadSettings {
413 #[default]
418 All,
419
420 One(OwnedRoomId),
426}
427
428#[derive(Clone, Debug, Default)]
430pub struct StateChanges {
431 pub sync_token: Option<String>,
433 pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
435 pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
437
438 pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
441
442 pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
446
447 pub state:
450 BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
451 pub room_account_data:
453 BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
454
455 pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
457
458 pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
460
461 pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
464
465 pub stripped_state: BTreeMap<
468 OwnedRoomId,
469 BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
470 >,
471
472 pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
475}
476
477impl StateChanges {
478 pub fn new(sync_token: String) -> Self {
480 Self { sync_token: Some(sync_token), ..Default::default() }
481 }
482
483 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
485 self.presence.insert(event.sender, raw_event);
486 }
487
488 pub fn add_room(&mut self, room: RoomInfo) {
490 self.room_infos.insert(room.room_id.clone(), room);
491 }
492
493 pub fn add_room_account_data(
496 &mut self,
497 room_id: &RoomId,
498 event: AnyRoomAccountDataEvent,
499 raw_event: Raw<AnyRoomAccountDataEvent>,
500 ) {
501 self.room_account_data
502 .entry(room_id.to_owned())
503 .or_default()
504 .insert(event.event_type(), raw_event);
505 }
506
507 pub fn add_stripped_member(
510 &mut self,
511 room_id: &RoomId,
512 user_id: &UserId,
513 event: Raw<StrippedRoomMemberEvent>,
514 ) {
515 self.stripped_state
516 .entry(room_id.to_owned())
517 .or_default()
518 .entry(StateEventType::RoomMember)
519 .or_default()
520 .insert(user_id.into(), event.cast());
521 }
522
523 pub fn add_state_event(
526 &mut self,
527 room_id: &RoomId,
528 event: AnySyncStateEvent,
529 raw_event: Raw<AnySyncStateEvent>,
530 ) {
531 self.state
532 .entry(room_id.to_owned())
533 .or_default()
534 .entry(event.event_type())
535 .or_default()
536 .insert(event.state_key().to_owned(), raw_event);
537 }
538
539 pub fn add_redaction(
541 &mut self,
542 room_id: &RoomId,
543 redacted_event_id: &EventId,
544 redaction: Raw<SyncRoomRedactionEvent>,
545 ) {
546 self.redactions
547 .entry(room_id.to_owned())
548 .or_default()
549 .insert(redacted_event_id.to_owned(), redaction);
550 }
551
552 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
555 self.receipts.insert(room_id.to_owned(), event);
556 }
557}
558
559#[derive(Clone)]
574pub struct StoreConfig {
575 #[cfg(feature = "e2e-encryption")]
576 pub(crate) crypto_store: Arc<DynCryptoStore>,
577 pub(crate) state_store: Arc<DynStateStore>,
578 pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
579 cross_process_store_locks_holder_name: String,
580}
581
582#[cfg(not(tarpaulin_include))]
583impl fmt::Debug for StoreConfig {
584 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
585 fmt.debug_struct("StoreConfig").finish()
586 }
587}
588
589impl StoreConfig {
590 #[must_use]
595 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
596 Self {
597 #[cfg(feature = "e2e-encryption")]
598 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
599 state_store: Arc::new(MemoryStore::new()),
600 event_cache_store: event_cache_store::EventCacheStoreLock::new(
601 event_cache_store::MemoryStore::new(),
602 cross_process_store_locks_holder_name.clone(),
603 ),
604 cross_process_store_locks_holder_name,
605 }
606 }
607
608 #[cfg(feature = "e2e-encryption")]
612 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
613 self.crypto_store = store.into_crypto_store();
614 self
615 }
616
617 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
619 self.state_store = store.into_state_store();
620 self
621 }
622
623 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
625 where
626 S: event_cache_store::IntoEventCacheStore,
627 {
628 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
629 event_cache_store,
630 self.cross_process_store_locks_holder_name.clone(),
631 );
632 self
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use std::sync::Arc;
639
640 use assert_matches::assert_matches;
641 use matrix_sdk_test::async_test;
642 use ruma::{owned_device_id, owned_user_id, room_id, user_id};
643 use tokio::sync::broadcast;
644
645 use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
646 use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};
647
648 #[async_test]
649 async fn test_set_session_meta() {
650 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
651
652 let session_meta = SessionMeta {
653 user_id: owned_user_id!("@mnt_io:matrix.org"),
654 device_id: owned_device_id!("HELLOYOU"),
655 };
656
657 assert!(store.session_meta.get().is_none());
658
659 store.set_session_meta(session_meta.clone());
660
661 assert_eq!(store.session_meta.get(), Some(&session_meta));
662 }
663
664 #[async_test]
665 #[should_panic]
666 async fn test_set_session_meta_twice() {
667 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
668
669 let session_meta = SessionMeta {
670 user_id: owned_user_id!("@mnt_io:matrix.org"),
671 device_id: owned_device_id!("HELLOYOU"),
672 };
673
674 store.set_session_meta(session_meta.clone());
675 store.set_session_meta(session_meta);
677 }
678
679 #[async_test]
680 async fn test_derive_from_other() {
681 let other = BaseStateStore::new(Arc::new(MemoryStore::new()));
683
684 let session_meta = SessionMeta {
685 user_id: owned_user_id!("@mnt_io:matrix.org"),
686 device_id: owned_device_id!("HELLOYOU"),
687 };
688 let (room_info_notable_update_sender, _) = broadcast::channel(1);
689 let room_id_0 = room_id!("!r0");
690
691 other
692 .load_rooms(
693 &session_meta.user_id,
694 RoomLoadSettings::One(room_id_0.to_owned()),
695 &room_info_notable_update_sender,
696 )
697 .await
698 .unwrap();
699 other.set_session_meta(session_meta.clone());
700
701 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
703 store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();
704
705 assert_eq!(store.session_meta.get(), Some(&session_meta));
707 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
709 assert_eq!(room_id, room_id_0);
710 });
711 }
712
713 #[test]
714 fn test_room_load_settings_default() {
715 assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
716 }
717
718 #[async_test]
719 async fn test_load_all_rooms() {
720 let room_id_0 = room_id!("!r0");
721 let room_id_1 = room_id!("!r1");
722 let user_id = user_id!("@mnt_io:matrix.org");
723
724 let memory_state_store = Arc::new(MemoryStore::new());
725
726 {
728 let store = BaseStateStore::new(memory_state_store.clone());
729 let mut changes = StateChanges::default();
730 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
731 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
732
733 store.inner.save_changes(&changes).await.unwrap();
734 }
735
736 {
738 let store = BaseStateStore::new(memory_state_store.clone());
739 let (room_info_notable_update_sender, _) = broadcast::channel(2);
740
741 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
743
744 store
746 .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
747 .await
748 .unwrap();
749
750 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
752
753 let mut rooms = store.rooms();
755 rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));
756
757 assert_eq!(rooms.len(), 2);
758
759 assert_eq!(rooms[0].room_id(), room_id_0);
760 assert_eq!(rooms[0].own_user_id(), user_id);
761
762 assert_eq!(rooms[1].room_id(), room_id_1);
763 assert_eq!(rooms[1].own_user_id(), user_id);
764 }
765 }
766
767 #[async_test]
768 async fn test_load_one_room() {
769 let room_id_0 = room_id!("!r0");
770 let room_id_1 = room_id!("!r1");
771 let user_id = user_id!("@mnt_io:matrix.org");
772
773 let memory_state_store = Arc::new(MemoryStore::new());
774
775 {
777 let store = BaseStateStore::new(memory_state_store.clone());
778 let mut changes = StateChanges::default();
779 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
780 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
781
782 store.inner.save_changes(&changes).await.unwrap();
783 }
784
785 {
787 let store = BaseStateStore::new(memory_state_store.clone());
788 let (room_info_notable_update_sender, _) = broadcast::channel(2);
789
790 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
792
793 store
795 .load_rooms(
796 user_id,
797 RoomLoadSettings::One(room_id_1.to_owned()),
798 &room_info_notable_update_sender,
799 )
800 .await
801 .unwrap();
802
803 assert_matches!(
805 *store.room_load_settings.read().await,
806 RoomLoadSettings::One(ref room_id) => {
807 assert_eq!(room_id, room_id_1);
808 }
809 );
810
811 let rooms = store.rooms();
813 assert_eq!(rooms.len(), 1);
814
815 assert_eq!(rooms[0].room_id(), room_id_1);
816 assert_eq!(rooms[0].own_user_id(), user_id);
817 }
818 }
819}