1use std::{
24 borrow::Borrow,
25 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
26 fmt,
27 ops::Deref,
28 result::Result as StdResult,
29 str::{FromStr, Utf8Error},
30 sync::{Arc, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36use once_cell::sync::OnceCell;
37
38#[cfg(any(test, feature = "testing"))]
39#[macro_use]
40pub mod integration_tests;
41mod observable_map;
42mod traits;
43
44use matrix_sdk_common::locks::Mutex as SyncMutex;
45#[cfg(feature = "e2e-encryption")]
46use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
47pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
48use observable_map::ObservableMap;
49use ruma::{
50 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
51 events::{
52 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
53 AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
54 RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
55 StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
56 presence::PresenceEvent,
57 receipt::ReceiptEventContent,
58 room::{
59 create::RoomCreateEventContent,
60 member::{RoomMemberEventContent, StrippedRoomMemberEvent},
61 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
62 redaction::SyncRoomRedactionEvent,
63 },
64 },
65 serde::Raw,
66};
67use serde::de::DeserializeOwned;
68use tokio::sync::{Mutex, RwLock, broadcast};
69use tracing::warn;
70pub use traits::compare_thread_subscription_bump_stamps;
71
72use crate::{
73 MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
74 deserialized_responses::DisplayName,
75 event_cache::store as event_cache_store,
76 media::store as media_store,
77 room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
78};
79
80pub(crate) mod ambiguity_map;
81mod memory_store;
82pub mod migration_helpers;
83mod send_queue;
84
85#[cfg(any(test, feature = "testing"))]
86pub use self::integration_tests::StateStoreIntegrationTests;
87#[cfg(feature = "unstable-msc4274")]
88pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
89pub use self::{
90 memory_store::MemoryStore,
91 send_queue::{
92 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
93 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
94 SentMediaInfo, SentRequestKey, SerializableEventContent,
95 },
96 traits::{
97 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerInfo, StateStore,
98 StateStoreDataKey, StateStoreDataValue, StateStoreExt, ThreadSubscriptionCatchupToken,
99 WellKnownResponse,
100 },
101};
102
/// Errors that state store operations can produce.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An arbitrary error coming from a store backend, boxed so that any
    /// error type can be carried. See [`StoreError::backend`].
    #[error(transparent)]
    Backend(Box<dyn std::error::Error + Send + Sync>),

    /// A `serde_json` (de)serialization error.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// A Matrix identifier could not be parsed.
    #[error(transparent)]
    Identifier(#[from] ruma::IdParseError),

    /// The store could not be unlocked.
    #[error("The store failed to be unlocked")]
    StoreLocked,

    /// A passphrase was supplied for a store that is not encrypted.
    #[error("The store is not encrypted but was tried to be opened with a passphrase")]
    UnencryptedStore,

    /// Encrypting or decrypting stored data failed.
    #[error("Error encrypting or decrypting data from the store: {0}")]
    Encryption(#[from] StoreEncryptionError),

    /// Encoding or decoding stored data failed; carries the UTF-8 error that
    /// was encountered.
    #[error("Error encoding or decoding data from the store: {0}")]
    Codec(#[from] Utf8Error),

    /// The database version is not supported. The first value is the current
    /// version, the second one is the latest version.
    #[error(
        "The database format changed in an incompatible way, current \
        version: {0}, latest version: {1}"
    )]
    UnsupportedDatabaseVersion(usize, usize),

    /// Redacting an event failed; the redaction error is kept as the source.
    #[error("Redaction failed: {0}")]
    Redaction(#[source] ruma::canonical_json::RedactionError),

    /// The store holds data that is considered invalid.
    #[error("The store contains invalid data: {details}")]
    InvalidData {
        /// Free-form description of what is wrong with the data.
        details: String,
    },
}
156
157impl StoreError {
158 #[inline]
162 pub fn backend<E>(error: E) -> Self
163 where
164 E: std::error::Error + Send + Sync + 'static,
165 {
166 Self::Backend(Box::new(error))
167 }
168}
169
170pub type Result<T, E = StoreError> = std::result::Result<T, E>;
172
/// A wrapper around a [`DynStateStore`] that also keeps in-memory session
/// state: the session metadata, the sync token and the known rooms.
///
/// Every field is behind an `Arc`, so cloning this type yields a handle that
/// shares the same state.
#[derive(Clone)]
pub(crate) struct BaseStateStore {
    /// The underlying state store all persistence calls are forwarded to.
    pub(super) inner: Arc<DynStateStore>,
    /// The session metadata; can be set only once (see `set_session_meta`).
    session_meta: Arc<OnceCell<SessionMeta>>,
    /// The settings that were used the last time rooms were loaded.
    room_load_settings: Arc<RwLock<RoomLoadSettings>>,
    /// The current sync token, if any.
    pub(super) sync_token: Arc<RwLock<Option<String>>>,
    /// All rooms known to this store, keyed by room ID and observable as a
    /// stream (see `rooms_stream`).
    rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
    /// An async mutex handed out by `lock()`.
    // NOTE(review): what exactly callers serialize with this lock is not
    // visible in this file — confirm against the call sites.
    lock: Arc<Mutex<()>>,

    /// Room IDs recorded by code outside this file.
    // NOTE(review): judging by the name, this avoids logging the same missing
    // room more than once — confirm against the users of this field.
    pub(crate) already_logged_missing_room: Arc<SyncMutex<HashSet<OwnedRoomId>>>,
}
194
195impl BaseStateStore {
196 pub fn new(inner: Arc<DynStateStore>) -> Self {
198 Self {
199 inner,
200 session_meta: Default::default(),
201 room_load_settings: Default::default(),
202 sync_token: Default::default(),
203 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
204 lock: Default::default(),
205 already_logged_missing_room: Default::default(),
206 }
207 }
208
209 pub fn lock(&self) -> &Mutex<()> {
211 &self.lock
212 }
213
214 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
220 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
221 }
222
223 pub(crate) async fn load_rooms(
226 &self,
227 user_id: &UserId,
228 room_load_settings: RoomLoadSettings,
229 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
230 ) -> Result<()> {
231 *self.room_load_settings.write().await = room_load_settings.clone();
232
233 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
234
235 let mut rooms = self.rooms.write().unwrap();
236
237 for room_info in room_infos {
238 let new_room = Room::restore(
239 user_id,
240 self.inner.clone(),
241 room_info,
242 room_info_notable_update_sender.clone(),
243 );
244 let new_room_id = new_room.room_id().to_owned();
245
246 rooms.insert(new_room_id, new_room);
247 }
248
249 Ok(())
250 }
251
252 async fn load_and_migrate_room_infos(
255 &self,
256 room_load_settings: RoomLoadSettings,
257 ) -> Result<Vec<RoomInfo>> {
258 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
259 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
260
261 for room_info in room_infos.iter_mut() {
262 if room_info.apply_migrations(self.inner.clone()).await {
263 migrated_room_infos.push(room_info.clone());
264 }
265 }
266
267 if !migrated_room_infos.is_empty() {
268 let changes = StateChanges {
269 room_infos: migrated_room_infos
270 .into_iter()
271 .map(|room_info| (room_info.room_id.clone(), room_info))
272 .collect(),
273 ..Default::default()
274 };
275
276 if let Err(error) = self.inner.save_changes(&changes).await {
277 warn!("Failed to save migrated room infos: {error}");
278 }
279 }
280
281 Ok(room_infos)
282 }
283
284 pub(crate) async fn load_sync_token(&self) -> Result<()> {
287 let token =
288 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
289 *self.sync_token.write().await = token;
290
291 Ok(())
292 }
293
294 #[cfg(any(feature = "e2e-encryption", test))]
297 pub(crate) async fn derive_from_other(
298 &self,
299 other: &Self,
300 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
301 ) -> Result<()> {
302 let Some(session_meta) = other.session_meta.get() else {
303 return Ok(());
304 };
305
306 let room_load_settings = other.room_load_settings.read().await.clone();
307
308 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
309 .await?;
310 self.load_sync_token().await?;
311 self.set_session_meta(session_meta.clone());
312
313 Ok(())
314 }
315
316 pub fn session_meta(&self) -> Option<&SessionMeta> {
318 self.session_meta.get()
319 }
320
321 pub fn rooms(&self) -> Vec<Room> {
323 self.rooms.read().unwrap().iter().cloned().collect()
324 }
325
326 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
328 self.rooms
329 .read()
330 .unwrap()
331 .iter()
332 .filter(|room| filter.matches(room.state()))
333 .cloned()
334 .collect()
335 }
336
337 pub fn rooms_stream(
340 &self,
341 ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
342 self.rooms.read().unwrap().stream()
343 }
344
345 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
347 self.rooms.read().unwrap().get(room_id).cloned()
348 }
349
350 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
352 self.rooms.read().unwrap().get(room_id).is_some()
353 }
354
355 pub fn get_or_create_room(
358 &self,
359 room_id: &RoomId,
360 room_state: RoomState,
361 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
362 ) -> Room {
363 let user_id =
364 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
365
366 self.rooms
367 .write()
368 .unwrap()
369 .get_or_create(room_id, || {
370 Room::new(
371 user_id,
372 self.inner.clone(),
373 room_id,
374 room_state,
375 room_info_notable_update_sender,
376 )
377 })
378 .clone()
379 }
380
381 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
387 self.inner.remove_room(room_id).await?;
388 self.rooms.write().unwrap().remove(room_id);
389 Ok(())
390 }
391}
392
393#[cfg(not(tarpaulin_include))]
394impl fmt::Debug for BaseStateStore {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 f.debug_struct("Store")
397 .field("inner", &self.inner)
398 .field("session_meta", &self.session_meta)
399 .field("sync_token", &self.sync_token)
400 .field("rooms", &self.rooms)
401 .finish_non_exhaustive()
402 }
403}
404
405impl Deref for BaseStateStore {
406 type Target = DynStateStore;
407
408 fn deref(&self) -> &Self::Target {
409 self.inner.deref()
410 }
411}
412
/// Selects which rooms are loaded from the state store.
///
/// The value is handed to the state store's `get_room_infos` when rooms are
/// loaded, and the last value used is remembered by `BaseStateStore`.
#[derive(Clone, Debug, Default)]
pub enum RoomLoadSettings {
    /// Load all rooms. This is the default.
    #[default]
    All,

    /// Load a single room, identified by its room ID.
    One(OwnedRoomId),
}
463
/// The subscription status of a thread.
///
/// Each value has a string form, see [`ThreadSubscriptionStatus::as_str`] and
/// the [`FromStr`] implementation, which are inverses of each other.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadSubscriptionStatus {
    /// The thread is subscribed to.
    Subscribed {
        /// Whether the subscription is an automatic one (string form
        /// `"automatic"`) or not (string form `"manual"`).
        automatic: bool,
    },

    /// The thread is unsubscribed from (string form `"unsubscribed"`).
    Unsubscribed,
}

impl FromStr for ThreadSubscriptionStatus {
    type Err = ();

    /// Parses the exact strings `"automatic"`, `"manual"` and
    /// `"unsubscribed"`; anything else (including a different casing) is
    /// rejected with `Err(())`.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let status = match s {
            "automatic" => Self::Subscribed { automatic: true },
            "manual" => Self::Subscribed { automatic: false },
            "unsubscribed" => Self::Unsubscribed,
            _ => return Err(()),
        };

        Ok(status)
    }
}

impl ThreadSubscriptionStatus {
    /// Returns the string form of this status, which can be parsed back with
    /// [`FromStr`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Subscribed { automatic: true } => "automatic",
            Self::Subscribed { automatic: false } => "manual",
            Self::Unsubscribed => "unsubscribed",
        }
    }
}
515
/// A thread subscription as kept in the store: a status plus an optional
/// bump stamp.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct StoredThreadSubscription {
    /// The subscription status of the thread.
    pub status: ThreadSubscriptionStatus,

    /// An optional bump stamp attached to the subscription.
    // NOTE(review): presumably used to order competing subscription updates
    // (see `compare_thread_subscription_bump_stamps`) — confirm in `traits`.
    pub bump_stamp: Option<u64>,
}
531
/// A set of changes to be saved to a state store in one go.
///
/// Starts out empty (see `Default`) and is filled either through the public
/// fields directly or through the `add_*` helper methods.
#[derive(Clone, Debug, Default)]
pub struct StateChanges {
    /// The sync token associated with these changes, if any.
    pub sync_token: Option<String>,
    /// Global account data events, keyed by event type.
    pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
    /// Presence events, keyed by the user they were sent by.
    pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,

    /// Member profiles, keyed by room ID and then by user ID.
    pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,

    /// User IDs whose profile should be deleted, keyed by room ID.
    pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,

    /// State events, keyed by room ID, then event type, then state key.
    pub state:
        BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
    /// Room account data events, keyed by room ID and then by event type.
    pub room_account_data:
        BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,

    /// Room infos, keyed by room ID.
    pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,

    /// Receipt event contents, keyed by room ID.
    pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,

    /// Redaction events, keyed by room ID and then by the ID of the event
    /// being redacted.
    pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,

    /// Stripped state events, keyed by room ID, then event type, then state
    /// key.
    pub stripped_state: BTreeMap<
        OwnedRoomId,
        BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
    >,

    /// For each room, the user IDs that share a given display name.
    pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
}
580
581impl StateChanges {
582 pub fn new(sync_token: String) -> Self {
584 Self { sync_token: Some(sync_token), ..Default::default() }
585 }
586
587 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
589 self.presence.insert(event.sender, raw_event);
590 }
591
592 pub fn add_room(&mut self, room: RoomInfo) {
594 self.room_infos.insert(room.room_id.clone(), room);
595 }
596
597 pub fn add_room_account_data(
600 &mut self,
601 room_id: &RoomId,
602 event: AnyRoomAccountDataEvent,
603 raw_event: Raw<AnyRoomAccountDataEvent>,
604 ) {
605 self.room_account_data
606 .entry(room_id.to_owned())
607 .or_default()
608 .insert(event.event_type(), raw_event);
609 }
610
611 pub fn add_stripped_member(
614 &mut self,
615 room_id: &RoomId,
616 user_id: &UserId,
617 event: Raw<StrippedRoomMemberEvent>,
618 ) {
619 self.stripped_state
620 .entry(room_id.to_owned())
621 .or_default()
622 .entry(StateEventType::RoomMember)
623 .or_default()
624 .insert(user_id.into(), event.cast());
625 }
626
627 pub fn add_state_event(
630 &mut self,
631 room_id: &RoomId,
632 event: AnySyncStateEvent,
633 raw_event: Raw<AnySyncStateEvent>,
634 ) {
635 self.state
636 .entry(room_id.to_owned())
637 .or_default()
638 .entry(event.event_type())
639 .or_default()
640 .insert(event.state_key().to_owned(), raw_event);
641 }
642
643 pub fn add_redaction(
645 &mut self,
646 room_id: &RoomId,
647 redacted_event_id: &EventId,
648 redaction: Raw<SyncRoomRedactionEvent>,
649 ) {
650 self.redactions
651 .entry(room_id.to_owned())
652 .or_default()
653 .insert(redacted_event_id.to_owned(), redaction);
654 }
655
656 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
659 self.receipts.insert(room_id.to_owned(), event);
660 }
661
662 pub(crate) fn state_static_for_key<C, K>(
666 &self,
667 room_id: &RoomId,
668 state_key: &K,
669 ) -> Option<&Raw<SyncStateEvent<C>>>
670 where
671 C: StaticEventContent<IsPrefix = ruma::events::False>
672 + StaticStateEventContent
673 + RedactContent,
674 C::Redacted: RedactedStateEventContent,
675 C::StateKey: Borrow<K>,
676 K: AsRef<str> + ?Sized,
677 {
678 self.state
679 .get(room_id)?
680 .get(&C::TYPE.into())?
681 .get(state_key.as_ref())
682 .map(Raw::cast_ref_unchecked)
683 }
684
685 pub(crate) fn stripped_state_static_for_key<C, K>(
689 &self,
690 room_id: &RoomId,
691 state_key: &K,
692 ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
693 where
694 C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
695 C::StateKey: Borrow<K>,
696 K: AsRef<str> + ?Sized,
697 {
698 self.stripped_state
699 .get(room_id)?
700 .get(&C::TYPE.into())?
701 .get(state_key.as_ref())
702 .map(Raw::cast_ref_unchecked)
703 }
704
705 pub(crate) fn any_state_static_for_key<C, K>(
710 &self,
711 room_id: &RoomId,
712 state_key: &K,
713 ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
714 where
715 C: StaticEventContent<IsPrefix = ruma::events::False>
716 + StaticStateEventContent
717 + RedactContent,
718 C::Redacted: RedactedStateEventContent,
719 C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
720 C::StateKey: Borrow<K>,
721 K: AsRef<str> + ?Sized,
722 {
723 self.state_static_for_key::<C, K>(room_id, state_key)
724 .map(Raw::cast_ref)
725 .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
726 .deserialize()
727 .ok()
728 }
729
730 pub(crate) fn member(
733 &self,
734 room_id: &RoomId,
735 user_id: &UserId,
736 ) -> Option<StrippedRoomMemberEvent> {
737 self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
738 }
739
740 pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
743 self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
744 .map(|event| {
745 RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
746 })
747 .or_else(|| self.room_infos.get(room_id)?.create().cloned())
749 }
750
751 pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
754 let power_levels_content = self
755 .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
756
757 let create_content = self.create(room_id)?;
758 let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
759 let creators = create_content.creators();
760
761 Some(power_levels_content.power_levels(&rules.authorization, creators))
762 }
763}
764
/// Configuration holding the stores to be used together.
///
/// [`StoreConfig::new`] fills every slot with an in-memory store; each slot
/// can then be replaced through the builder methods.
#[derive(Clone)]
pub struct StoreConfig {
    /// The crypto store; only present with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store: Arc<DynCryptoStore>,
    /// The state store.
    pub(crate) state_store: Arc<DynStateStore>,
    /// The event cache store, wrapped in its lock type.
    pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
    /// The media store, wrapped in its lock type.
    pub(crate) media_store: media_store::MediaStoreLock,
    /// The holder name passed to the store locks whenever one is created.
    cross_process_store_locks_holder_name: String,
}
788
789#[cfg(not(tarpaulin_include))]
790impl fmt::Debug for StoreConfig {
791 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
792 fmt.debug_struct("StoreConfig").finish()
793 }
794}
795
796impl StoreConfig {
797 #[must_use]
802 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
803 Self {
804 #[cfg(feature = "e2e-encryption")]
805 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
806 state_store: Arc::new(MemoryStore::new()),
807 event_cache_store: event_cache_store::EventCacheStoreLock::new(
808 event_cache_store::MemoryStore::new(),
809 cross_process_store_locks_holder_name.clone(),
810 ),
811 media_store: media_store::MediaStoreLock::new(
812 media_store::MemoryMediaStore::new(),
813 cross_process_store_locks_holder_name.clone(),
814 ),
815 cross_process_store_locks_holder_name,
816 }
817 }
818
819 #[cfg(feature = "e2e-encryption")]
823 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
824 self.crypto_store = store.into_crypto_store();
825 self
826 }
827
828 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
830 self.state_store = store.into_state_store();
831 self
832 }
833
834 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
836 where
837 S: event_cache_store::IntoEventCacheStore,
838 {
839 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
840 event_cache_store,
841 self.cross_process_store_locks_holder_name.clone(),
842 );
843 self
844 }
845
846 pub fn media_store<S>(mut self, media_store: S) -> Self
848 where
849 S: media_store::IntoMediaStore,
850 {
851 self.media_store = media_store::MediaStoreLock::new(
852 media_store,
853 self.cross_process_store_locks_holder_name.clone(),
854 );
855 self
856 }
857}
858
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use matrix_sdk_test::async_test;
    use ruma::{RoomId, owned_device_id, owned_user_id, room_id, user_id};
    use tokio::sync::broadcast;

    use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
    use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};

    /// Builds the session metadata shared by the tests of this module.
    fn session_meta() -> SessionMeta {
        SessionMeta {
            user_id: owned_user_id!("@mnt_io:matrix.org"),
            device_id: owned_device_id!("HELLOYOU"),
        }
    }

    /// Returns a memory store in which a joined room has been saved for each
    /// of the given room IDs.
    async fn memory_store_with_joined_rooms(room_ids: &[&RoomId]) -> Arc<MemoryStore> {
        let memory_state_store = Arc::new(MemoryStore::new());

        // Go through a `BaseStateStore`, exactly like a first session would.
        let store = BaseStateStore::new(memory_state_store.clone());
        let mut changes = StateChanges::default();

        for room_id in room_ids {
            changes.add_room(RoomInfo::new(room_id, RoomState::Joined));
        }

        store.inner.save_changes(&changes).await.unwrap();

        memory_state_store
    }

    #[async_test]
    async fn test_set_session_meta() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = session_meta();

        // Nothing is set initially.
        assert!(store.session_meta.get().is_none());

        store.set_session_meta(session_meta.clone());

        assert_eq!(store.session_meta.get(), Some(&session_meta));
    }

    #[async_test]
    // Pin the panic to the one raised by `set_session_meta`, so that an
    // unrelated panic cannot make this test pass.
    #[should_panic(expected = "`SessionMeta` was already set")]
    async fn test_set_session_meta_twice() {
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        let session_meta = session_meta();

        store.set_session_meta(session_meta.clone());
        // Setting it a second time must panic.
        store.set_session_meta(session_meta);
    }

    #[async_test]
    async fn test_derive_from_other() {
        // The store whose session state will be derived from.
        let other = BaseStateStore::new(Arc::new(MemoryStore::new()));

        let session_meta = session_meta();
        let (room_info_notable_update_sender, _) = broadcast::channel(1);
        let room_id_0 = room_id!("!r0");

        other
            .load_rooms(
                &session_meta.user_id,
                RoomLoadSettings::One(room_id_0.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();
        other.set_session_meta(session_meta.clone());

        // Derive a fresh store from `other`.
        let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
        store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();

        // The session metadata has been copied over…
        assert_eq!(store.session_meta.get(), Some(&session_meta));
        // … and so have the room load settings.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
            assert_eq!(room_id, room_id_0);
        });
    }

    #[test]
    fn test_room_load_settings_default() {
        assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
    }

    #[async_test]
    async fn test_load_all_rooms() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // A first store saves two rooms.
        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        // A second store, on top of the same data, loads all the rooms.
        let store = BaseStateStore::new(memory_state_store);
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // Default value.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
            .await
            .unwrap();

        // Still the same value after the load.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        // Sort the rooms so the assertions below do not depend on the order
        // in which they are returned.
        let mut rooms = store.rooms();
        rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));

        assert_eq!(rooms.len(), 2);

        assert_eq!(rooms[0].room_id(), room_id_0);
        assert_eq!(rooms[0].own_user_id(), user_id);

        assert_eq!(rooms[1].room_id(), room_id_1);
        assert_eq!(rooms[1].own_user_id(), user_id);
    }

    #[async_test]
    async fn test_load_one_room() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let user_id = user_id!("@mnt_io:matrix.org");

        // A first store saves two rooms.
        let memory_state_store = memory_store_with_joined_rooms(&[room_id_0, room_id_1]).await;

        // A second store, on top of the same data, loads a single room.
        let store = BaseStateStore::new(memory_state_store);
        let (room_info_notable_update_sender, _) = broadcast::channel(2);

        // Default value.
        assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);

        store
            .load_rooms(
                user_id,
                RoomLoadSettings::One(room_id_1.to_owned()),
                &room_info_notable_update_sender,
            )
            .await
            .unwrap();

        // The settings used for the load have been remembered.
        assert_matches!(
            *store.room_load_settings.read().await,
            RoomLoadSettings::One(ref room_id) => {
                assert_eq!(room_id, room_id_1);
            }
        );

        // Only the requested room has been loaded.
        let rooms = store.rooms();
        assert_eq!(rooms.len(), 1);

        assert_eq!(rooms[0].room_id(), room_id_1);
        assert_eq!(rooms[0].own_user_id(), user_id);
    }
}