1use std::{
24 borrow::Borrow,
25 collections::{BTreeMap, BTreeSet, HashMap},
26 fmt,
27 ops::Deref,
28 result::Result as StdResult,
29 str::{FromStr, Utf8Error},
30 sync::{Arc, RwLock as StdRwLock},
31};
32
33use eyeball_im::{Vector, VectorDiff};
34use futures_util::Stream;
35use matrix_sdk_common::ROOM_VERSION_RULES_FALLBACK;
36use once_cell::sync::OnceCell;
37
38#[cfg(any(test, feature = "testing"))]
39#[macro_use]
40pub mod integration_tests;
41mod observable_map;
42mod traits;
43
44#[cfg(feature = "e2e-encryption")]
45use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
46pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
47use observable_map::ObservableMap;
48use ruma::{
49 EventId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UserId,
50 events::{
51 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
52 AnySyncStateEvent, EmptyStateKey, GlobalAccountDataEventType, RedactContent,
53 RedactedStateEventContent, RoomAccountDataEventType, StateEventType, StaticEventContent,
54 StaticStateEventContent, StrippedStateEvent, SyncStateEvent,
55 presence::PresenceEvent,
56 receipt::ReceiptEventContent,
57 room::{
58 create::RoomCreateEventContent,
59 member::{RoomMemberEventContent, StrippedRoomMemberEvent},
60 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
61 redaction::SyncRoomRedactionEvent,
62 },
63 },
64 serde::Raw,
65};
66use serde::de::DeserializeOwned;
67use tokio::sync::{Mutex, RwLock, broadcast};
68use tracing::warn;
69pub use traits::compare_thread_subscription_bump_stamps;
70
71use crate::{
72 MinimalRoomMemberEvent, Room, RoomCreateWithCreatorEventContent, RoomStateFilter, SessionMeta,
73 deserialized_responses::DisplayName,
74 event_cache::store as event_cache_store,
75 media::store as media_store,
76 room::{RoomInfo, RoomInfoNotableUpdate, RoomState},
77};
78
79pub(crate) mod ambiguity_map;
80mod memory_store;
81pub mod migration_helpers;
82mod send_queue;
83
84#[cfg(any(test, feature = "testing"))]
85pub use self::integration_tests::StateStoreIntegrationTests;
86#[cfg(feature = "unstable-msc4274")]
87pub use self::send_queue::{AccumulatedSentMediaInfo, FinishGalleryItemInfo};
88pub use self::{
89 memory_store::MemoryStore,
90 send_queue::{
91 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
92 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
93 SentMediaInfo, SentRequestKey, SerializableEventContent,
94 },
95 traits::{
96 ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerInfo, StateStore,
97 StateStoreDataKey, StateStoreDataValue, StateStoreExt, ThreadSubscriptionCatchupToken,
98 WellKnownResponse,
99 },
100};
101
102#[derive(Debug, thiserror::Error)]
104pub enum StoreError {
105 #[error(transparent)]
107 Backend(Box<dyn std::error::Error + Send + Sync>),
108
109 #[error(transparent)]
111 Json(#[from] serde_json::Error),
112
113 #[error(transparent)]
116 Identifier(#[from] ruma::IdParseError),
117
118 #[error("The store failed to be unlocked")]
121 StoreLocked,
122
123 #[error("The store is not encrypted but was tried to be opened with a passphrase")]
125 UnencryptedStore,
126
127 #[error("Error encrypting or decrypting data from the store: {0}")]
129 Encryption(#[from] StoreEncryptionError),
130
131 #[error("Error encoding or decoding data from the store: {0}")]
133 Codec(#[from] Utf8Error),
134
135 #[error(
137 "The database format changed in an incompatible way, current \
138 version: {0}, latest version: {1}"
139 )]
140 UnsupportedDatabaseVersion(usize, usize),
141
142 #[error("Redaction failed: {0}")]
146 Redaction(#[source] ruma::canonical_json::RedactionError),
147
148 #[error("The store contains invalid data: {details}")]
150 InvalidData {
151 details: String,
153 },
154}
155
156impl StoreError {
157 #[inline]
161 pub fn backend<E>(error: E) -> Self
162 where
163 E: std::error::Error + Send + Sync + 'static,
164 {
165 Self::Backend(Box::new(error))
166 }
167}
168
169pub type Result<T, E = StoreError> = std::result::Result<T, E>;
171
172#[derive(Clone)]
177pub(crate) struct BaseStateStore {
178 pub(super) inner: Arc<DynStateStore>,
179 session_meta: Arc<OnceCell<SessionMeta>>,
180 room_load_settings: Arc<RwLock<RoomLoadSettings>>,
181 pub(super) sync_token: Arc<RwLock<Option<String>>>,
183 rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
185 sync_lock: Arc<Mutex<()>>,
188}
189
190impl BaseStateStore {
191 pub fn new(inner: Arc<DynStateStore>) -> Self {
193 Self {
194 inner,
195 session_meta: Default::default(),
196 room_load_settings: Default::default(),
197 sync_token: Default::default(),
198 rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
199 sync_lock: Default::default(),
200 }
201 }
202
203 pub fn sync_lock(&self) -> &Mutex<()> {
205 &self.sync_lock
206 }
207
208 pub(crate) fn set_session_meta(&self, session_meta: SessionMeta) {
214 self.session_meta.set(session_meta).expect("`SessionMeta` was already set");
215 }
216
217 pub(crate) async fn load_rooms(
220 &self,
221 user_id: &UserId,
222 room_load_settings: RoomLoadSettings,
223 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
224 ) -> Result<()> {
225 *self.room_load_settings.write().await = room_load_settings.clone();
226
227 let room_infos = self.load_and_migrate_room_infos(room_load_settings).await?;
228
229 let mut rooms = self.rooms.write().unwrap();
230
231 for room_info in room_infos {
232 let new_room = Room::restore(
233 user_id,
234 self.inner.clone(),
235 room_info,
236 room_info_notable_update_sender.clone(),
237 );
238 let new_room_id = new_room.room_id().to_owned();
239
240 rooms.insert(new_room_id, new_room);
241 }
242
243 Ok(())
244 }
245
246 async fn load_and_migrate_room_infos(
249 &self,
250 room_load_settings: RoomLoadSettings,
251 ) -> Result<Vec<RoomInfo>> {
252 let mut room_infos = self.inner.get_room_infos(&room_load_settings).await?;
253 let mut migrated_room_infos = Vec::with_capacity(room_infos.len());
254
255 for room_info in room_infos.iter_mut() {
256 if room_info.apply_migrations(self.inner.clone()).await {
257 migrated_room_infos.push(room_info.clone());
258 }
259 }
260
261 if !migrated_room_infos.is_empty() {
262 let changes = StateChanges {
263 room_infos: migrated_room_infos
264 .into_iter()
265 .map(|room_info| (room_info.room_id.clone(), room_info))
266 .collect(),
267 ..Default::default()
268 };
269
270 if let Err(error) = self.inner.save_changes(&changes).await {
271 warn!("Failed to save migrated room infos: {error}");
272 }
273 }
274
275 Ok(room_infos)
276 }
277
278 pub(crate) async fn load_sync_token(&self) -> Result<()> {
281 let token =
282 self.get_kv_data(StateStoreDataKey::SyncToken).await?.and_then(|s| s.into_sync_token());
283 *self.sync_token.write().await = token;
284
285 Ok(())
286 }
287
288 #[cfg(any(feature = "e2e-encryption", test))]
291 pub(crate) async fn derive_from_other(
292 &self,
293 other: &Self,
294 room_info_notable_update_sender: &broadcast::Sender<RoomInfoNotableUpdate>,
295 ) -> Result<()> {
296 let Some(session_meta) = other.session_meta.get() else {
297 return Ok(());
298 };
299
300 let room_load_settings = other.room_load_settings.read().await.clone();
301
302 self.load_rooms(&session_meta.user_id, room_load_settings, room_info_notable_update_sender)
303 .await?;
304 self.load_sync_token().await?;
305 self.set_session_meta(session_meta.clone());
306
307 Ok(())
308 }
309
310 pub fn session_meta(&self) -> Option<&SessionMeta> {
312 self.session_meta.get()
313 }
314
315 pub fn rooms(&self) -> Vec<Room> {
317 self.rooms.read().unwrap().iter().cloned().collect()
318 }
319
320 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
322 self.rooms
323 .read()
324 .unwrap()
325 .iter()
326 .filter(|room| filter.matches(room.state()))
327 .cloned()
328 .collect()
329 }
330
331 pub fn rooms_stream(
334 &self,
335 ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
336 self.rooms.read().unwrap().stream()
337 }
338
339 pub fn room(&self, room_id: &RoomId) -> Option<Room> {
341 self.rooms.read().unwrap().get(room_id).cloned()
342 }
343
344 pub(crate) fn room_exists(&self, room_id: &RoomId) -> bool {
346 self.rooms.read().unwrap().get(room_id).is_some()
347 }
348
349 pub fn get_or_create_room(
352 &self,
353 room_id: &RoomId,
354 room_state: RoomState,
355 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
356 ) -> Room {
357 let user_id =
358 &self.session_meta.get().expect("Creating room while not being logged in").user_id;
359
360 self.rooms
361 .write()
362 .unwrap()
363 .get_or_create(room_id, || {
364 Room::new(
365 user_id,
366 self.inner.clone(),
367 room_id,
368 room_state,
369 room_info_notable_update_sender,
370 )
371 })
372 .clone()
373 }
374
375 pub(crate) async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
381 self.inner.remove_room(room_id).await?;
382 self.rooms.write().unwrap().remove(room_id);
383 Ok(())
384 }
385}
386
387#[cfg(not(tarpaulin_include))]
388impl fmt::Debug for BaseStateStore {
389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390 f.debug_struct("Store")
391 .field("inner", &self.inner)
392 .field("session_meta", &self.session_meta)
393 .field("sync_token", &self.sync_token)
394 .field("rooms", &self.rooms)
395 .finish_non_exhaustive()
396 }
397}
398
399impl Deref for BaseStateStore {
400 type Target = DynStateStore;
401
402 fn deref(&self) -> &Self::Target {
403 self.inner.deref()
404 }
405}
406
407#[derive(Clone, Debug, Default)]
442pub enum RoomLoadSettings {
443 #[default]
448 All,
449
450 One(OwnedRoomId),
456}
457
458#[derive(Clone, Copy, Debug, PartialEq, Eq)]
464pub enum ThreadSubscriptionStatus {
465 Subscribed {
467 automatic: bool,
470 },
471
472 Unsubscribed,
474}
475
476impl FromStr for ThreadSubscriptionStatus {
477 type Err = ();
478
479 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
480 match s {
481 "automatic" => Ok(ThreadSubscriptionStatus::Subscribed { automatic: true }),
482 "manual" => Ok(ThreadSubscriptionStatus::Subscribed { automatic: false }),
483 "unsubscribed" => Ok(ThreadSubscriptionStatus::Unsubscribed),
484 _ => Err(()),
485 }
486 }
487}
488
489impl ThreadSubscriptionStatus {
490 pub fn as_str(&self) -> &'static str {
497 match self {
498 ThreadSubscriptionStatus::Subscribed { automatic } => {
499 if *automatic {
500 "automatic"
501 } else {
502 "manual"
503 }
504 }
505 ThreadSubscriptionStatus::Unsubscribed => "unsubscribed",
506 }
507 }
508}
509
510#[derive(Clone, Copy, Debug, PartialEq, Eq)]
512pub struct StoredThreadSubscription {
513 pub status: ThreadSubscriptionStatus,
515
516 pub bump_stamp: Option<u64>,
524}
525
526#[derive(Clone, Debug, Default)]
528pub struct StateChanges {
529 pub sync_token: Option<String>,
531 pub account_data: BTreeMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>,
533 pub presence: BTreeMap<OwnedUserId, Raw<PresenceEvent>>,
535
536 pub profiles: BTreeMap<OwnedRoomId, BTreeMap<OwnedUserId, MinimalRoomMemberEvent>>,
539
540 pub profiles_to_delete: BTreeMap<OwnedRoomId, Vec<OwnedUserId>>,
544
545 pub state:
548 BTreeMap<OwnedRoomId, BTreeMap<StateEventType, BTreeMap<String, Raw<AnySyncStateEvent>>>>,
549 pub room_account_data:
551 BTreeMap<OwnedRoomId, BTreeMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>,
552
553 pub room_infos: BTreeMap<OwnedRoomId, RoomInfo>,
555
556 pub receipts: BTreeMap<OwnedRoomId, ReceiptEventContent>,
558
559 pub redactions: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, Raw<SyncRoomRedactionEvent>>>,
562
563 pub stripped_state: BTreeMap<
566 OwnedRoomId,
567 BTreeMap<StateEventType, BTreeMap<String, Raw<AnyStrippedStateEvent>>>,
568 >,
569
570 pub ambiguity_maps: BTreeMap<OwnedRoomId, HashMap<DisplayName, BTreeSet<OwnedUserId>>>,
573}
574
575impl StateChanges {
576 pub fn new(sync_token: String) -> Self {
578 Self { sync_token: Some(sync_token), ..Default::default() }
579 }
580
581 pub fn add_presence_event(&mut self, event: PresenceEvent, raw_event: Raw<PresenceEvent>) {
583 self.presence.insert(event.sender, raw_event);
584 }
585
586 pub fn add_room(&mut self, room: RoomInfo) {
588 self.room_infos.insert(room.room_id.clone(), room);
589 }
590
591 pub fn add_room_account_data(
594 &mut self,
595 room_id: &RoomId,
596 event: AnyRoomAccountDataEvent,
597 raw_event: Raw<AnyRoomAccountDataEvent>,
598 ) {
599 self.room_account_data
600 .entry(room_id.to_owned())
601 .or_default()
602 .insert(event.event_type(), raw_event);
603 }
604
605 pub fn add_stripped_member(
608 &mut self,
609 room_id: &RoomId,
610 user_id: &UserId,
611 event: Raw<StrippedRoomMemberEvent>,
612 ) {
613 self.stripped_state
614 .entry(room_id.to_owned())
615 .or_default()
616 .entry(StateEventType::RoomMember)
617 .or_default()
618 .insert(user_id.into(), event.cast());
619 }
620
621 pub fn add_state_event(
624 &mut self,
625 room_id: &RoomId,
626 event: AnySyncStateEvent,
627 raw_event: Raw<AnySyncStateEvent>,
628 ) {
629 self.state
630 .entry(room_id.to_owned())
631 .or_default()
632 .entry(event.event_type())
633 .or_default()
634 .insert(event.state_key().to_owned(), raw_event);
635 }
636
637 pub fn add_redaction(
639 &mut self,
640 room_id: &RoomId,
641 redacted_event_id: &EventId,
642 redaction: Raw<SyncRoomRedactionEvent>,
643 ) {
644 self.redactions
645 .entry(room_id.to_owned())
646 .or_default()
647 .insert(redacted_event_id.to_owned(), redaction);
648 }
649
650 pub fn add_receipts(&mut self, room_id: &RoomId, event: ReceiptEventContent) {
653 self.receipts.insert(room_id.to_owned(), event);
654 }
655
656 pub(crate) fn state_static_for_key<C, K>(
660 &self,
661 room_id: &RoomId,
662 state_key: &K,
663 ) -> Option<&Raw<SyncStateEvent<C>>>
664 where
665 C: StaticEventContent<IsPrefix = ruma::events::False>
666 + StaticStateEventContent
667 + RedactContent,
668 C::Redacted: RedactedStateEventContent,
669 C::StateKey: Borrow<K>,
670 K: AsRef<str> + ?Sized,
671 {
672 self.state
673 .get(room_id)?
674 .get(&C::TYPE.into())?
675 .get(state_key.as_ref())
676 .map(Raw::cast_ref_unchecked)
677 }
678
679 pub(crate) fn stripped_state_static_for_key<C, K>(
683 &self,
684 room_id: &RoomId,
685 state_key: &K,
686 ) -> Option<&Raw<StrippedStateEvent<C::PossiblyRedacted>>>
687 where
688 C: StaticEventContent<IsPrefix = ruma::events::False> + StaticStateEventContent,
689 C::StateKey: Borrow<K>,
690 K: AsRef<str> + ?Sized,
691 {
692 self.stripped_state
693 .get(room_id)?
694 .get(&C::TYPE.into())?
695 .get(state_key.as_ref())
696 .map(Raw::cast_ref_unchecked)
697 }
698
699 pub(crate) fn any_state_static_for_key<C, K>(
704 &self,
705 room_id: &RoomId,
706 state_key: &K,
707 ) -> Option<StrippedStateEvent<C::PossiblyRedacted>>
708 where
709 C: StaticEventContent<IsPrefix = ruma::events::False>
710 + StaticStateEventContent
711 + RedactContent,
712 C::Redacted: RedactedStateEventContent,
713 C::PossiblyRedacted: StaticEventContent + DeserializeOwned,
714 C::StateKey: Borrow<K>,
715 K: AsRef<str> + ?Sized,
716 {
717 self.state_static_for_key::<C, K>(room_id, state_key)
718 .map(Raw::cast_ref)
719 .or_else(|| self.stripped_state_static_for_key::<C, K>(room_id, state_key))?
720 .deserialize()
721 .ok()
722 }
723
724 pub(crate) fn member(
727 &self,
728 room_id: &RoomId,
729 user_id: &UserId,
730 ) -> Option<StrippedRoomMemberEvent> {
731 self.any_state_static_for_key::<RoomMemberEventContent, _>(room_id, user_id)
732 }
733
734 pub(crate) fn create(&self, room_id: &RoomId) -> Option<RoomCreateWithCreatorEventContent> {
737 self.any_state_static_for_key::<RoomCreateEventContent, _>(room_id, &EmptyStateKey)
738 .map(|event| {
739 RoomCreateWithCreatorEventContent::from_event_content(event.content, event.sender)
740 })
741 .or_else(|| self.room_infos.get(room_id)?.create().cloned())
743 }
744
745 pub(crate) fn power_levels(&self, room_id: &RoomId) -> Option<RoomPowerLevels> {
748 let power_levels_content = self
749 .any_state_static_for_key::<RoomPowerLevelsEventContent, _>(room_id, &EmptyStateKey)?;
750
751 let create_content = self.create(room_id)?;
752 let rules = create_content.room_version.rules().unwrap_or(ROOM_VERSION_RULES_FALLBACK);
753 let creators = create_content.creators();
754
755 Some(power_levels_content.power_levels(&rules.authorization, creators))
756 }
757}
758
759#[derive(Clone)]
774pub struct StoreConfig {
775 #[cfg(feature = "e2e-encryption")]
776 pub(crate) crypto_store: Arc<DynCryptoStore>,
777 pub(crate) state_store: Arc<DynStateStore>,
778 pub(crate) event_cache_store: event_cache_store::EventCacheStoreLock,
779 pub(crate) media_store: media_store::MediaStoreLock,
780 cross_process_store_locks_holder_name: String,
781}
782
783#[cfg(not(tarpaulin_include))]
784impl fmt::Debug for StoreConfig {
785 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> StdResult<(), fmt::Error> {
786 fmt.debug_struct("StoreConfig").finish()
787 }
788}
789
790impl StoreConfig {
791 #[must_use]
796 pub fn new(cross_process_store_locks_holder_name: String) -> Self {
797 Self {
798 #[cfg(feature = "e2e-encryption")]
799 crypto_store: matrix_sdk_crypto::store::MemoryStore::new().into_crypto_store(),
800 state_store: Arc::new(MemoryStore::new()),
801 event_cache_store: event_cache_store::EventCacheStoreLock::new(
802 event_cache_store::MemoryStore::new(),
803 cross_process_store_locks_holder_name.clone(),
804 ),
805 media_store: media_store::MediaStoreLock::new(
806 media_store::MemoryMediaStore::new(),
807 cross_process_store_locks_holder_name.clone(),
808 ),
809 cross_process_store_locks_holder_name,
810 }
811 }
812
813 #[cfg(feature = "e2e-encryption")]
817 pub fn crypto_store(mut self, store: impl IntoCryptoStore) -> Self {
818 self.crypto_store = store.into_crypto_store();
819 self
820 }
821
822 pub fn state_store(mut self, store: impl IntoStateStore) -> Self {
824 self.state_store = store.into_state_store();
825 self
826 }
827
828 pub fn event_cache_store<S>(mut self, event_cache_store: S) -> Self
830 where
831 S: event_cache_store::IntoEventCacheStore,
832 {
833 self.event_cache_store = event_cache_store::EventCacheStoreLock::new(
834 event_cache_store,
835 self.cross_process_store_locks_holder_name.clone(),
836 );
837 self
838 }
839
840 pub fn media_store<S>(mut self, media_store: S) -> Self
842 where
843 S: media_store::IntoMediaStore,
844 {
845 self.media_store = media_store::MediaStoreLock::new(
846 media_store,
847 self.cross_process_store_locks_holder_name.clone(),
848 );
849 self
850 }
851}
852
853#[cfg(test)]
854mod tests {
855 use std::sync::Arc;
856
857 use assert_matches::assert_matches;
858 use matrix_sdk_test::async_test;
859 use ruma::{owned_device_id, owned_user_id, room_id, user_id};
860 use tokio::sync::broadcast;
861
862 use super::{BaseStateStore, MemoryStore, RoomLoadSettings};
863 use crate::{RoomInfo, RoomState, SessionMeta, StateChanges};
864
865 #[async_test]
866 async fn test_set_session_meta() {
867 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
868
869 let session_meta = SessionMeta {
870 user_id: owned_user_id!("@mnt_io:matrix.org"),
871 device_id: owned_device_id!("HELLOYOU"),
872 };
873
874 assert!(store.session_meta.get().is_none());
875
876 store.set_session_meta(session_meta.clone());
877
878 assert_eq!(store.session_meta.get(), Some(&session_meta));
879 }
880
881 #[async_test]
882 #[should_panic]
883 async fn test_set_session_meta_twice() {
884 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
885
886 let session_meta = SessionMeta {
887 user_id: owned_user_id!("@mnt_io:matrix.org"),
888 device_id: owned_device_id!("HELLOYOU"),
889 };
890
891 store.set_session_meta(session_meta.clone());
892 store.set_session_meta(session_meta);
894 }
895
896 #[async_test]
897 async fn test_derive_from_other() {
898 let other = BaseStateStore::new(Arc::new(MemoryStore::new()));
900
901 let session_meta = SessionMeta {
902 user_id: owned_user_id!("@mnt_io:matrix.org"),
903 device_id: owned_device_id!("HELLOYOU"),
904 };
905 let (room_info_notable_update_sender, _) = broadcast::channel(1);
906 let room_id_0 = room_id!("!r0");
907
908 other
909 .load_rooms(
910 &session_meta.user_id,
911 RoomLoadSettings::One(room_id_0.to_owned()),
912 &room_info_notable_update_sender,
913 )
914 .await
915 .unwrap();
916 other.set_session_meta(session_meta.clone());
917
918 let store = BaseStateStore::new(Arc::new(MemoryStore::new()));
920 store.derive_from_other(&other, &room_info_notable_update_sender).await.unwrap();
921
922 assert_eq!(store.session_meta.get(), Some(&session_meta));
924 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::One(ref room_id) => {
926 assert_eq!(room_id, room_id_0);
927 });
928 }
929
930 #[test]
931 fn test_room_load_settings_default() {
932 assert_matches!(RoomLoadSettings::default(), RoomLoadSettings::All);
933 }
934
935 #[async_test]
936 async fn test_load_all_rooms() {
937 let room_id_0 = room_id!("!r0");
938 let room_id_1 = room_id!("!r1");
939 let user_id = user_id!("@mnt_io:matrix.org");
940
941 let memory_state_store = Arc::new(MemoryStore::new());
942
943 {
945 let store = BaseStateStore::new(memory_state_store.clone());
946 let mut changes = StateChanges::default();
947 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
948 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
949
950 store.inner.save_changes(&changes).await.unwrap();
951 }
952
953 {
955 let store = BaseStateStore::new(memory_state_store.clone());
956 let (room_info_notable_update_sender, _) = broadcast::channel(2);
957
958 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
960
961 store
963 .load_rooms(user_id, RoomLoadSettings::All, &room_info_notable_update_sender)
964 .await
965 .unwrap();
966
967 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
969
970 let mut rooms = store.rooms();
972 rooms.sort_by(|a, b| a.room_id().cmp(b.room_id()));
973
974 assert_eq!(rooms.len(), 2);
975
976 assert_eq!(rooms[0].room_id(), room_id_0);
977 assert_eq!(rooms[0].own_user_id(), user_id);
978
979 assert_eq!(rooms[1].room_id(), room_id_1);
980 assert_eq!(rooms[1].own_user_id(), user_id);
981 }
982 }
983
984 #[async_test]
985 async fn test_load_one_room() {
986 let room_id_0 = room_id!("!r0");
987 let room_id_1 = room_id!("!r1");
988 let user_id = user_id!("@mnt_io:matrix.org");
989
990 let memory_state_store = Arc::new(MemoryStore::new());
991
992 {
994 let store = BaseStateStore::new(memory_state_store.clone());
995 let mut changes = StateChanges::default();
996 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
997 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
998
999 store.inner.save_changes(&changes).await.unwrap();
1000 }
1001
1002 {
1004 let store = BaseStateStore::new(memory_state_store.clone());
1005 let (room_info_notable_update_sender, _) = broadcast::channel(2);
1006
1007 assert_matches!(*store.room_load_settings.read().await, RoomLoadSettings::All);
1009
1010 store
1012 .load_rooms(
1013 user_id,
1014 RoomLoadSettings::One(room_id_1.to_owned()),
1015 &room_info_notable_update_sender,
1016 )
1017 .await
1018 .unwrap();
1019
1020 assert_matches!(
1022 *store.room_load_settings.read().await,
1023 RoomLoadSettings::One(ref room_id) => {
1024 assert_eq!(room_id, room_id_1);
1025 }
1026 );
1027
1028 let rooms = store.rooms();
1030 assert_eq!(rooms.len(), 1);
1031
1032 assert_eq!(rooms[0].room_id(), room_id_1);
1033 assert_eq!(rooms[0].own_user_id(), user_id);
1034 }
1035 }
1036}