1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 sync::Arc,
20};
21
22use as_variant::as_variant;
23use async_trait::async_trait;
24use growable_bloom_filter::GrowableBloom;
25use matrix_sdk_common::AsyncTraitDeps;
26use ruma::{
27 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
28 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
29 api::{
30 SupportedVersions,
31 client::discovery::discover_homeserver::{
32 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
33 },
34 },
35 events::{
36 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
37 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
38 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
39 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
40 presence::PresenceEvent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 },
43 serde::Raw,
44 time::SystemTime,
45};
46use serde::{Deserialize, Serialize};
47
48use super::{
49 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
50 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
51 send_queue::SentRequestKey,
52};
53use crate::{
54 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
55 deserialized_responses::{
56 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
57 },
58 store::StoredThreadSubscription,
59};
60
/// Storage backend interface for persisted client state.
///
/// The methods cover generic key-value data, sync changes, presence, room
/// state, member profiles, account data, receipts, opaque custom values, the
/// send queue (including dependent requests) and thread subscriptions.
///
/// NOTE(review): apart from `set_custom_value_no_read`, this trait only
/// declares signatures. The per-method descriptions below are derived from
/// the names and types and should be confirmed against the implementations.
// On wasm targets `async_trait(?Send)` is used, so the generated futures are
// not required to be `Send`; everywhere else the default form is used.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait StateStore: AsyncTraitDeps {
    /// Error type returned by every method of the store.
    ///
    /// It must be debug-printable, convertible into [`StoreError`] and
    /// constructible from a [`serde_json::Error`].
    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;

    /// Fetch the value stored for the given key-value `key`, if any.
    async fn get_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
    ) -> Result<Option<StateStoreDataValue>, Self::Error>;

    /// Store `value` under the given key-value `key`.
    ///
    /// NOTE(review): presumably `value` must be the variant matching `key`;
    /// nothing in the signature enforces it.
    async fn set_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
        value: StateStoreDataValue,
    ) -> Result<(), Self::Error>;

    /// Remove whatever is stored under the given key-value `key`.
    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;

    /// Persist the given set of state changes.
    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;

    /// Fetch the stored presence event of `user_id`, if any.
    async fn get_presence_event(
        &self,
        user_id: &UserId,
    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;

    /// Fetch the stored presence events of the given users.
    ///
    /// NOTE(review): the result is a plain `Vec`, so users without a stored
    /// event are presumably just absent from it — confirm.
    async fn get_presence_events(
        &self,
        user_ids: &[OwnedUserId],
    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;

    /// Fetch a single state event of `room_id`, identified by its event type
    /// and state key.
    async fn get_state_event(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_key: &str,
    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the state events of `room_id` that have the given event type.
    async fn get_state_events(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the state events of `room_id` that have the given event type and
    /// one of the given state keys.
    async fn get_state_events_for_keys(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_keys: &[&str],
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the stored profile of `user_id` in `room_id`, if any.
    async fn get_profile(
        &self,
        room_id: &RoomId,
        user_id: &UserId,
    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;

    /// Fetch the stored profiles of several users in `room_id`.
    ///
    /// The keys of the returned map borrow from `user_ids`.
    async fn get_profiles<'a>(
        &self,
        room_id: &RoomId,
        user_ids: &'a [OwnedUserId],
    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;

    /// Fetch the IDs of the users of `room_id`, filtered by `memberships`.
    async fn get_user_ids(
        &self,
        room_id: &RoomId,
        memberships: RoomMemberships,
    ) -> Result<Vec<OwnedUserId>, Self::Error>;

    /// Fetch the stored [`RoomInfo`]s, as directed by `room_load_settings`.
    async fn get_room_infos(
        &self,
        room_load_settings: &RoomLoadSettings,
    ) -> Result<Vec<RoomInfo>, Self::Error>;

    /// Fetch the users of `room_id` that use the given display name.
    async fn get_users_with_display_name(
        &self,
        room_id: &RoomId,
        display_name: &DisplayName,
    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;

    /// Fetch, for each of the given display names, the users of `room_id`
    /// that use it.
    ///
    /// The keys of the returned map borrow from `display_names`.
    async fn get_users_with_display_names<'a>(
        &self,
        room_id: &RoomId,
        display_names: &'a [DisplayName],
    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;

    /// Fetch the global account data event of the given type, if any.
    async fn get_account_data_event(
        &self,
        event_type: GlobalAccountDataEventType,
    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;

    /// Fetch the account data event of the given type scoped to `room_id`, if
    /// any.
    async fn get_room_account_data_event(
        &self,
        room_id: &RoomId,
        event_type: RoomAccountDataEventType,
    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;

    /// Fetch the receipt of `user_id` in `room_id` for the given receipt type
    /// and thread, together with the ID of the event it points at.
    async fn get_user_room_receipt_event(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        user_id: &UserId,
    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;

    /// Fetch the receipts attached to `event_id` in `room_id` for the given
    /// receipt type and thread, each paired with the user it belongs to.
    async fn get_event_room_receipt_events(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        event_id: &EventId,
    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;

    /// Fetch the opaque custom value stored under `key`, if any.
    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store the opaque custom `value` under `key`.
    ///
    /// NOTE(review): the returned `Option<Vec<u8>>` is presumably the value
    /// previously stored under `key` — confirm with implementors.
    async fn set_custom_value(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store the opaque custom `value` under `key` without reporting what
    /// `set_custom_value` returns.
    ///
    /// The default implementation simply calls
    /// [`set_custom_value`](Self::set_custom_value) and discards the returned
    /// `Option`; implementors may override it.
    async fn set_custom_value_no_read(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<(), Self::Error> {
        self.set_custom_value(key, value).await.map(|_| ())
    }

    /// Remove the opaque custom value stored under `key`.
    ///
    /// NOTE(review): the returned `Option<Vec<u8>>` is presumably the removed
    /// value — confirm with implementors.
    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Remove the data stored for `room_id`.
    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;

    /// Persist a send-queue request for `room_id`.
    ///
    /// The request is identified by `transaction_id` and stored together with
    /// its creation time and priority.
    async fn save_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        request: QueuedRequestKind,
        priority: usize,
    ) -> Result<(), Self::Error>;

    /// Replace the content of the send-queue request identified by
    /// `transaction_id` in `room_id`.
    ///
    /// NOTE(review): the `bool` presumably tells whether a matching request
    /// existed and was updated — confirm.
    async fn update_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        content: QueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove the send-queue request identified by `transaction_id` in
    /// `room_id`.
    ///
    /// NOTE(review): the `bool` presumably tells whether a matching request
    /// existed and was removed — confirm.
    async fn remove_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the send-queue requests stored for `room_id`.
    async fn load_send_queue_requests(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<QueuedRequest>, Self::Error>;

    /// Set (`Some`) or clear (`None`) the error recorded on the send-queue
    /// request identified by `transaction_id` in `room_id`.
    async fn update_send_queue_request_status(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        error: Option<QueueWedgeError>,
    ) -> Result<(), Self::Error>;

    /// Load the IDs of the rooms that still have unsent send-queue requests.
    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;

    /// Persist a request that depends on the parent request identified by
    /// `parent_txn_id` in `room_id`.
    async fn save_dependent_queued_request(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        own_txn_id: ChildTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        content: DependentQueuedRequestKind,
    ) -> Result<(), Self::Error>;

    /// Mark the requests depending on `parent_txn_id` in `room_id` as ready,
    /// recording `sent_parent_key` for them.
    ///
    /// NOTE(review): the `usize` is presumably the number of dependent
    /// requests that were updated — confirm.
    async fn mark_dependent_queued_requests_as_ready(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        sent_parent_key: SentRequestKey,
    ) -> Result<usize, Self::Error>;

    /// Replace the content of the dependent request identified by
    /// `own_transaction_id` in `room_id`.
    ///
    /// NOTE(review): the `bool` presumably tells whether a matching request
    /// existed and was updated — confirm.
    async fn update_dependent_queued_request(
        &self,
        room_id: &RoomId,
        own_transaction_id: &ChildTransactionId,
        new_content: DependentQueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove the dependent request identified by `own_txn_id` in `room`.
    ///
    /// NOTE(review): the `bool` presumably tells whether a matching request
    /// existed and was removed — confirm.
    async fn remove_dependent_queued_request(
        &self,
        room: &RoomId,
        own_txn_id: &ChildTransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the dependent requests stored for `room`.
    async fn load_dependent_queued_requests(
        &self,
        room: &RoomId,
    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;

    /// Insert or update the subscription stored for thread `thread_id` in
    /// `room`.
    async fn upsert_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
        subscription: StoredThreadSubscription,
    ) -> Result<(), Self::Error>;

    /// Remove the subscription stored for thread `thread_id` in `room`.
    async fn remove_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<(), Self::Error>;

    /// Load the subscription stored for thread `thread_id` in `room`, if any.
    async fn load_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
}
517
/// Newtype around a store `T`.
///
/// `#[repr(transparent)]` guarantees that the wrapper has exactly the same
/// layout as `T`.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

/// Debug output is delegated to the wrapped value, so the wrapper itself
/// never shows up in the formatted text.
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
527
528#[cfg_attr(target_family = "wasm", async_trait(?Send))]
529#[cfg_attr(not(target_family = "wasm"), async_trait)]
530impl<T: StateStore> StateStore for EraseStateStoreError<T> {
531 type Error = StoreError;
532
533 async fn get_kv_data(
534 &self,
535 key: StateStoreDataKey<'_>,
536 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
537 self.0.get_kv_data(key).await.map_err(Into::into)
538 }
539
540 async fn set_kv_data(
541 &self,
542 key: StateStoreDataKey<'_>,
543 value: StateStoreDataValue,
544 ) -> Result<(), Self::Error> {
545 self.0.set_kv_data(key, value).await.map_err(Into::into)
546 }
547
548 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
549 self.0.remove_kv_data(key).await.map_err(Into::into)
550 }
551
552 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
553 self.0.save_changes(changes).await.map_err(Into::into)
554 }
555
556 async fn get_presence_event(
557 &self,
558 user_id: &UserId,
559 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
560 self.0.get_presence_event(user_id).await.map_err(Into::into)
561 }
562
563 async fn get_presence_events(
564 &self,
565 user_ids: &[OwnedUserId],
566 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
567 self.0.get_presence_events(user_ids).await.map_err(Into::into)
568 }
569
570 async fn get_state_event(
571 &self,
572 room_id: &RoomId,
573 event_type: StateEventType,
574 state_key: &str,
575 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
576 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
577 }
578
579 async fn get_state_events(
580 &self,
581 room_id: &RoomId,
582 event_type: StateEventType,
583 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
584 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
585 }
586
587 async fn get_state_events_for_keys(
588 &self,
589 room_id: &RoomId,
590 event_type: StateEventType,
591 state_keys: &[&str],
592 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
593 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
594 }
595
596 async fn get_profile(
597 &self,
598 room_id: &RoomId,
599 user_id: &UserId,
600 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
601 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
602 }
603
604 async fn get_profiles<'a>(
605 &self,
606 room_id: &RoomId,
607 user_ids: &'a [OwnedUserId],
608 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
609 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
610 }
611
612 async fn get_user_ids(
613 &self,
614 room_id: &RoomId,
615 memberships: RoomMemberships,
616 ) -> Result<Vec<OwnedUserId>, Self::Error> {
617 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
618 }
619
620 async fn get_room_infos(
621 &self,
622 room_load_settings: &RoomLoadSettings,
623 ) -> Result<Vec<RoomInfo>, Self::Error> {
624 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
625 }
626
627 async fn get_users_with_display_name(
628 &self,
629 room_id: &RoomId,
630 display_name: &DisplayName,
631 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
632 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
633 }
634
635 async fn get_users_with_display_names<'a>(
636 &self,
637 room_id: &RoomId,
638 display_names: &'a [DisplayName],
639 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
640 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
641 }
642
643 async fn get_account_data_event(
644 &self,
645 event_type: GlobalAccountDataEventType,
646 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
647 self.0.get_account_data_event(event_type).await.map_err(Into::into)
648 }
649
650 async fn get_room_account_data_event(
651 &self,
652 room_id: &RoomId,
653 event_type: RoomAccountDataEventType,
654 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
655 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
656 }
657
658 async fn get_user_room_receipt_event(
659 &self,
660 room_id: &RoomId,
661 receipt_type: ReceiptType,
662 thread: ReceiptThread,
663 user_id: &UserId,
664 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
665 self.0
666 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
667 .await
668 .map_err(Into::into)
669 }
670
671 async fn get_event_room_receipt_events(
672 &self,
673 room_id: &RoomId,
674 receipt_type: ReceiptType,
675 thread: ReceiptThread,
676 event_id: &EventId,
677 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
678 self.0
679 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
680 .await
681 .map_err(Into::into)
682 }
683
684 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
685 self.0.get_custom_value(key).await.map_err(Into::into)
686 }
687
688 async fn set_custom_value(
689 &self,
690 key: &[u8],
691 value: Vec<u8>,
692 ) -> Result<Option<Vec<u8>>, Self::Error> {
693 self.0.set_custom_value(key, value).await.map_err(Into::into)
694 }
695
696 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
697 self.0.remove_custom_value(key).await.map_err(Into::into)
698 }
699
700 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
701 self.0.remove_room(room_id).await.map_err(Into::into)
702 }
703
704 async fn save_send_queue_request(
705 &self,
706 room_id: &RoomId,
707 transaction_id: OwnedTransactionId,
708 created_at: MilliSecondsSinceUnixEpoch,
709 content: QueuedRequestKind,
710 priority: usize,
711 ) -> Result<(), Self::Error> {
712 self.0
713 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
714 .await
715 .map_err(Into::into)
716 }
717
718 async fn update_send_queue_request(
719 &self,
720 room_id: &RoomId,
721 transaction_id: &TransactionId,
722 content: QueuedRequestKind,
723 ) -> Result<bool, Self::Error> {
724 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
725 }
726
727 async fn remove_send_queue_request(
728 &self,
729 room_id: &RoomId,
730 transaction_id: &TransactionId,
731 ) -> Result<bool, Self::Error> {
732 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
733 }
734
735 async fn load_send_queue_requests(
736 &self,
737 room_id: &RoomId,
738 ) -> Result<Vec<QueuedRequest>, Self::Error> {
739 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
740 }
741
742 async fn update_send_queue_request_status(
743 &self,
744 room_id: &RoomId,
745 transaction_id: &TransactionId,
746 error: Option<QueueWedgeError>,
747 ) -> Result<(), Self::Error> {
748 self.0
749 .update_send_queue_request_status(room_id, transaction_id, error)
750 .await
751 .map_err(Into::into)
752 }
753
754 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
755 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
756 }
757
758 async fn save_dependent_queued_request(
759 &self,
760 room_id: &RoomId,
761 parent_txn_id: &TransactionId,
762 own_txn_id: ChildTransactionId,
763 created_at: MilliSecondsSinceUnixEpoch,
764 content: DependentQueuedRequestKind,
765 ) -> Result<(), Self::Error> {
766 self.0
767 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
768 .await
769 .map_err(Into::into)
770 }
771
772 async fn mark_dependent_queued_requests_as_ready(
773 &self,
774 room_id: &RoomId,
775 parent_txn_id: &TransactionId,
776 sent_parent_key: SentRequestKey,
777 ) -> Result<usize, Self::Error> {
778 self.0
779 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
780 .await
781 .map_err(Into::into)
782 }
783
784 async fn remove_dependent_queued_request(
785 &self,
786 room_id: &RoomId,
787 own_txn_id: &ChildTransactionId,
788 ) -> Result<bool, Self::Error> {
789 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
790 }
791
792 async fn load_dependent_queued_requests(
793 &self,
794 room_id: &RoomId,
795 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
796 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
797 }
798
799 async fn update_dependent_queued_request(
800 &self,
801 room_id: &RoomId,
802 own_transaction_id: &ChildTransactionId,
803 new_content: DependentQueuedRequestKind,
804 ) -> Result<bool, Self::Error> {
805 self.0
806 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
807 .await
808 .map_err(Into::into)
809 }
810
811 async fn upsert_thread_subscription(
812 &self,
813 room: &RoomId,
814 thread_id: &EventId,
815 subscription: StoredThreadSubscription,
816 ) -> Result<(), Self::Error> {
817 self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
818 }
819
820 async fn load_thread_subscription(
821 &self,
822 room: &RoomId,
823 thread_id: &EventId,
824 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
825 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
826 }
827
828 async fn remove_thread_subscription(
829 &self,
830 room: &RoomId,
831 thread_id: &EventId,
832 ) -> Result<(), Self::Error> {
833 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
834 }
835}
836
837#[cfg_attr(target_family = "wasm", async_trait(?Send))]
839#[cfg_attr(not(target_family = "wasm"), async_trait)]
840pub trait StateStoreExt: StateStore {
841 async fn get_state_event_static<C>(
847 &self,
848 room_id: &RoomId,
849 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
850 where
851 C: StaticEventContent<IsPrefix = ruma::events::False>
852 + StaticStateEventContent<StateKey = EmptyStateKey>
853 + RedactContent,
854 C::Redacted: RedactedStateEventContent,
855 {
856 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
857 }
858
859 async fn get_state_event_static_for_key<C, K>(
865 &self,
866 room_id: &RoomId,
867 state_key: &K,
868 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
869 where
870 C: StaticEventContent<IsPrefix = ruma::events::False>
871 + StaticStateEventContent
872 + RedactContent,
873 C::StateKey: Borrow<K>,
874 C::Redacted: RedactedStateEventContent,
875 K: AsRef<str> + ?Sized + Sync,
876 {
877 Ok(self
878 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
879 .await?
880 .map(|raw| raw.cast()))
881 }
882
883 async fn get_state_events_static<C>(
889 &self,
890 room_id: &RoomId,
891 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
892 where
893 C: StaticEventContent<IsPrefix = ruma::events::False>
894 + StaticStateEventContent
895 + RedactContent,
896 C::Redacted: RedactedStateEventContent,
897 {
898 Ok(self
900 .get_state_events(room_id, C::TYPE.into())
901 .await?
902 .into_iter()
903 .map(|raw| raw.cast())
904 .collect())
905 }
906
907 async fn get_state_events_for_keys_static<'a, C, K, I>(
916 &self,
917 room_id: &RoomId,
918 state_keys: I,
919 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
920 where
921 C: StaticEventContent<IsPrefix = ruma::events::False>
922 + StaticStateEventContent
923 + RedactContent,
924 C::StateKey: Borrow<K>,
925 C::Redacted: RedactedStateEventContent,
926 K: AsRef<str> + Sized + Sync + 'a,
927 I: IntoIterator<Item = &'a K> + Send,
928 I::IntoIter: Send,
929 {
930 Ok(self
931 .get_state_events_for_keys(
932 room_id,
933 C::TYPE.into(),
934 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
935 )
936 .await?
937 .into_iter()
938 .map(|raw| raw.cast())
939 .collect())
940 }
941
942 async fn get_account_data_event_static<C>(
944 &self,
945 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
946 where
947 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
948 {
949 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
950 }
951
952 async fn get_room_account_data_event_static<C>(
960 &self,
961 room_id: &RoomId,
962 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
963 where
964 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
965 {
966 Ok(self
967 .get_room_account_data_event(room_id, C::TYPE.into())
968 .await?
969 .map(Raw::cast_unchecked))
970 }
971
972 async fn get_member_event(
980 &self,
981 room_id: &RoomId,
982 state_key: &UserId,
983 ) -> Result<Option<RawMemberEvent>, Self::Error> {
984 self.get_state_event_static_for_key(room_id, state_key).await
985 }
986}
987
988#[cfg_attr(target_family = "wasm", async_trait(?Send))]
989#[cfg_attr(not(target_family = "wasm"), async_trait)]
990impl<T: StateStore + ?Sized> StateStoreExt for T {}
991
/// A [`StateStore`] trait object whose error type is fixed to
/// [`StoreError`].
pub type DynStateStore = dyn StateStore<Error = StoreError>;
994
/// Conversion of a value into a shared, type-erased state store
/// (`Arc<DynStateStore>`).
pub trait IntoStateStore {
    /// Perform the conversion, consuming `self`.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
1004
1005impl<T> IntoStateStore for T
1006where
1007 T: StateStore + Sized + 'static,
1008{
1009 fn into_state_store(self) -> Arc<DynStateStore> {
1010 Arc::new(EraseStateStoreError(self))
1011 }
1012}
1013
1014impl<T> IntoStateStore for Arc<T>
1017where
1018 T: StateStore + 'static,
1019{
1020 fn into_state_store(self) -> Arc<DynStateStore> {
1021 let ptr: *const T = Arc::into_raw(self);
1022 let ptr_erased = ptr as *const EraseStateStoreError<T>;
1023 unsafe { Arc::from_raw(ptr_erased) }
1026 }
1027}
1028
/// Serializable record of information about a server, together with the time
/// at which the record was created.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ServerInfo {
    /// Version strings; passed to `SupportedVersions::from_parts` by
    /// `supported_versions`.
    pub versions: Vec<String>,

    /// Unstable feature flags by name; passed to
    /// `SupportedVersions::from_parts` by `supported_versions`.
    pub unstable_features: BTreeMap<String, bool>,

    /// Optional well-known data; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub well_known: Option<WellKnownResponse>,

    /// Creation time of this record in milliseconds since the Unix epoch, as
    /// produced by `now_timestamp_ms`; used by `maybe_decode` to detect stale
    /// records.
    last_fetch_ts: f64,
}
1047
1048impl ServerInfo {
1049 pub const STALE_THRESHOLD: f64 = (1000 * 60 * 60 * 24 * 7) as _; pub fn new(
1054 versions: Vec<String>,
1055 unstable_features: BTreeMap<String, bool>,
1056 well_known: Option<WellKnownResponse>,
1057 ) -> Self {
1058 Self { versions, unstable_features, well_known, last_fetch_ts: now_timestamp_ms() }
1059 }
1060
1061 pub fn maybe_decode(&self) -> Option<Self> {
1067 if now_timestamp_ms() - self.last_fetch_ts >= Self::STALE_THRESHOLD {
1068 None
1069 } else {
1070 Some(self.clone())
1071 }
1072 }
1073
1074 pub fn supported_versions(&self) -> SupportedVersions {
1080 SupportedVersions::from_parts(&self.versions, &self.unstable_features)
1081 }
1082}
1083
/// Serializable copy of the fields of a `discover_homeserver::Response`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Homeserver information from the response.
    pub homeserver: HomeserverInfo,

    /// Identity server information from the response, if any.
    pub identity_server: Option<IdentityServerInfo>,

    /// Tile server information from the response, if any.
    pub tile_server: Option<TileServerInfo>,

    /// RTC focus entries from the response.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
1099
1100impl From<discover_homeserver::Response> for WellKnownResponse {
1101 fn from(response: discover_homeserver::Response) -> Self {
1102 Self {
1103 homeserver: response.homeserver,
1104 identity_server: response.identity_server,
1105 tile_server: response.tile_server,
1106 rtc_foci: response.rtc_foci,
1107 }
1108 }
1109}
1110
/// Current wall-clock time as fractional milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_timestamp_ms() -> f64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("System clock was before 1970.");

    since_epoch.as_secs_f64() * 1000.0
}
1119
/// A value of the store's key-value data.
///
/// The variants mirror those of `StateStoreDataKey`. The descriptions below
/// are derived from the variant names and payload types.
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token.
    SyncToken(String),

    /// Server information, see [`ServerInfo`].
    ServerInfo(ServerInfo),

    /// A filter, held as a string (presumably a filter ID — confirm).
    Filter(String),

    /// The MXC URI of a user's avatar.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Bloom filter data belonging to the UTD hook manager.
    UtdHookManagerData(GrowableBloom),

    /// Marker without payload recording that a one-time key was already
    /// uploaded.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft, see [`ComposerDraft`].
    ComposerDraft(ComposerDraft),

    /// Seen knock requests, as a map from event ID to user ID.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// Catch-up tokens for thread subscriptions, see
    /// [`ThreadSubscriptionCatchupToken`].
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
}
1161
/// A pair of tokens used when catching up on thread subscriptions.
///
/// NOTE(review): the exact meaning of the two tokens is not visible in this
/// file; presumably they delimit a range to fetch — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The mandatory `from` token.
    pub from: String,

    /// The optional `to` token.
    pub to: Option<String>,
}
1187
/// A message draft kept for the composer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft as plain text.
    pub plain_text: String,
    /// The draft as HTML, if there is an HTML form.
    pub html_text: Option<String>,
    /// What kind of message is being drafted.
    pub draft_type: ComposerDraftType,
}
1199
/// The kind of message a [`ComposerDraft`] is for.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// A new message.
    NewMessage,
    /// A reply to an existing event.
    Reply {
        /// ID of the event being replied to.
        event_id: OwnedEventId,
    },
    /// An edit of an existing event.
    Edit {
        /// ID of the event being edited.
        event_id: OwnedEventId,
    },
}
1216
1217impl StateStoreDataValue {
1218 pub fn into_sync_token(self) -> Option<String> {
1220 as_variant!(self, Self::SyncToken)
1221 }
1222
1223 pub fn into_filter(self) -> Option<String> {
1225 as_variant!(self, Self::Filter)
1226 }
1227
1228 pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
1230 as_variant!(self, Self::UserAvatarUrl)
1231 }
1232
1233 pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
1235 as_variant!(self, Self::RecentlyVisitedRooms)
1236 }
1237
1238 pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
1240 as_variant!(self, Self::UtdHookManagerData)
1241 }
1242
1243 pub fn into_composer_draft(self) -> Option<ComposerDraft> {
1245 as_variant!(self, Self::ComposerDraft)
1246 }
1247
1248 pub fn into_server_info(self) -> Option<ServerInfo> {
1250 as_variant!(self, Self::ServerInfo)
1251 }
1252
1253 pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
1255 as_variant!(self, Self::SeenKnockRequests)
1256 }
1257
1258 pub fn into_thread_subscriptions_catchup_tokens(
1261 self,
1262 ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
1263 as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
1264 }
1265}
1266
/// A key of the store's key-value data.
///
/// The variants mirror those of `StateStoreDataValue`; some carry borrowed
/// identifiers that scope the key. The descriptions below are derived from
/// the variant names and payload types.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// Key of the sync token.
    SyncToken,

    /// Key of the server information.
    ServerInfo,

    /// Key of a filter, scoped by a string (presumably the filter name —
    /// confirm).
    Filter(&'a str),

    /// Key of the avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// Key of the recently visited rooms of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// Key of the UTD hook manager data.
    UtdHookManagerData,

    /// Key of the "one-time key already uploaded" marker.
    OneTimeKeyAlreadyUploaded,

    /// Key of the composer draft of the given room, optionally scoped by an
    /// event ID (presumably a thread root — confirm).
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// Key of the seen knock requests of the given room.
    SeenKnockRequests(&'a RoomId),

    /// Key of the thread subscriptions catch-up tokens.
    ThreadSubscriptionsCatchupTokens,
}
1305
/// String identifiers, one per [`StateStoreDataKey`] variant.
///
/// NOTE(review): how stores turn these strings into actual storage keys is
/// not visible in this file.
impl StateStoreDataKey<'_> {
    /// Identifier for the `SyncToken` variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// Identifier for the `ServerInfo` variant.
    ///
    /// NOTE(review): the value is `"server_capabilities"`, not
    /// `"server_info"` — presumably kept for compatibility with data that is
    /// already stored; confirm before changing it.
    pub const SERVER_INFO: &'static str = "server_capabilities";

    /// Identifier for the `Filter` variant.
    pub const FILTER: &'static str = "filter";

    /// Identifier for the `UserAvatarUrl` variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// Identifier for the `RecentlyVisitedRooms` variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// Identifier for the `UtdHookManagerData` variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// Identifier for the `OneTimeKeyAlreadyUploaded` variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// Identifier for the `ComposerDraft` variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// Identifier for the `SeenKnockRequests` variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// Identifier for the `ThreadSubscriptionsCatchupTokens` variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";
}
1346
/// Compare the bump stamp of a stored thread subscription (`previous`) with
/// the one of an incoming update (`new`).
///
/// Returns `false` only when both stamps are present and the new one is not
/// strictly greater than the previous one; returns `true` in every other
/// case. When `previous` is present and `new` is absent, `new` is set to the
/// previous stamp, so the caller ends up carrying the known stamp forward.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    // Without a previous stamp there is nothing to compare against.
    let Some(prev_bump) = previous else {
        return true;
    };

    match *new {
        // Both stamps known: only a strictly newer stamp passes.
        Some(new_bump) => new_bump > prev_bump,

        // No new stamp: carry the previous one over.
        None => {
            *new = Some(prev_bump);
            true
        }
    }
}
1377
#[cfg(test)]
mod tests {
    use super::{ServerInfo, now_timestamp_ms};

    /// A record older than the stale threshold is rejected by `maybe_decode`;
    /// a record fetched a millisecond ago is returned.
    #[test]
    fn test_stale_server_info() {
        // Just past the threshold: one millisecond older than allowed.
        let stale_ts = now_timestamp_ms() - ServerInfo::STALE_THRESHOLD - 1.0;
        let mut info = ServerInfo {
            versions: Default::default(),
            unstable_features: Default::default(),
            well_known: Default::default(),
            last_fetch_ts: stale_ts,
        };
        assert!(info.maybe_decode().is_none());

        // Refresh the timestamp and the same record becomes valid again.
        info.last_fetch_ts = now_timestamp_ms() - 1.0;
        assert!(info.maybe_decode().is_some());
    }
}