1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 sync::Arc,
20};
21
22use as_variant::as_variant;
23use async_trait::async_trait;
24use growable_bloom_filter::GrowableBloom;
25use matrix_sdk_common::AsyncTraitDeps;
26use ruma::{
27 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
28 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
29 api::{
30 SupportedVersions,
31 client::discovery::{
32 discover_homeserver::{
33 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
34 },
35 get_capabilities::v3::Capabilities,
36 },
37 },
38 events::{
39 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
40 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
41 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
42 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
43 presence::PresenceEvent,
44 receipt::{Receipt, ReceiptThread, ReceiptType},
45 },
46 serde::Raw,
47 time::SystemTime,
48};
49use serde::{Deserialize, Serialize};
50
51use super::{
52 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
53 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
54 send_queue::SentRequestKey,
55};
56use crate::{
57 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
58 deserialized_responses::{
59 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
60 },
61 store::StoredThreadSubscription,
62};
63
64#[cfg_attr(target_family = "wasm", async_trait(?Send))]
67#[cfg_attr(not(target_family = "wasm"), async_trait)]
68pub trait StateStore: AsyncTraitDeps {
69 type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
71
72 async fn get_kv_data(
78 &self,
79 key: StateStoreDataKey<'_>,
80 ) -> Result<Option<StateStoreDataValue>, Self::Error>;
81
82 async fn set_kv_data(
92 &self,
93 key: StateStoreDataKey<'_>,
94 value: StateStoreDataValue,
95 ) -> Result<(), Self::Error>;
96
97 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
103
104 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
106
107 async fn get_presence_event(
114 &self,
115 user_id: &UserId,
116 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
117
118 async fn get_presence_events(
124 &self,
125 user_ids: &[OwnedUserId],
126 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
127
128 async fn get_state_event(
136 &self,
137 room_id: &RoomId,
138 event_type: StateEventType,
139 state_key: &str,
140 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
141
142 async fn get_state_events(
150 &self,
151 room_id: &RoomId,
152 event_type: StateEventType,
153 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
154
155 async fn get_state_events_for_keys(
166 &self,
167 room_id: &RoomId,
168 event_type: StateEventType,
169 state_keys: &[&str],
170 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
171
172 async fn get_profile(
180 &self,
181 room_id: &RoomId,
182 user_id: &UserId,
183 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
184
185 async fn get_profiles<'a>(
193 &self,
194 room_id: &RoomId,
195 user_ids: &'a [OwnedUserId],
196 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
197
198 async fn get_user_ids(
201 &self,
202 room_id: &RoomId,
203 memberships: RoomMemberships,
204 ) -> Result<Vec<OwnedUserId>, Self::Error>;
205
206 async fn get_room_infos(
208 &self,
209 room_load_settings: &RoomLoadSettings,
210 ) -> Result<Vec<RoomInfo>, Self::Error>;
211
212 async fn get_users_with_display_name(
221 &self,
222 room_id: &RoomId,
223 display_name: &DisplayName,
224 ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
225
226 async fn get_users_with_display_names<'a>(
234 &self,
235 room_id: &RoomId,
236 display_names: &'a [DisplayName],
237 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
238
239 async fn get_account_data_event(
245 &self,
246 event_type: GlobalAccountDataEventType,
247 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
248
249 async fn get_room_account_data_event(
259 &self,
260 room_id: &RoomId,
261 event_type: RoomAccountDataEventType,
262 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
263
264 async fn get_user_room_receipt_event(
277 &self,
278 room_id: &RoomId,
279 receipt_type: ReceiptType,
280 thread: ReceiptThread,
281 user_id: &UserId,
282 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
283
284 async fn get_event_room_receipt_events(
298 &self,
299 room_id: &RoomId,
300 receipt_type: ReceiptType,
301 thread: ReceiptThread,
302 event_id: &EventId,
303 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
304
305 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
311
312 async fn set_custom_value(
321 &self,
322 key: &[u8],
323 value: Vec<u8>,
324 ) -> Result<Option<Vec<u8>>, Self::Error>;
325
326 async fn set_custom_value_no_read(
340 &self,
341 key: &[u8],
342 value: Vec<u8>,
343 ) -> Result<(), Self::Error> {
344 self.set_custom_value(key, value).await.map(|_| ())
345 }
346
347 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
353
354 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
360
361 async fn save_send_queue_request(
371 &self,
372 room_id: &RoomId,
373 transaction_id: OwnedTransactionId,
374 created_at: MilliSecondsSinceUnixEpoch,
375 request: QueuedRequestKind,
376 priority: usize,
377 ) -> Result<(), Self::Error>;
378
379 async fn update_send_queue_request(
391 &self,
392 room_id: &RoomId,
393 transaction_id: &TransactionId,
394 content: QueuedRequestKind,
395 ) -> Result<bool, Self::Error>;
396
397 async fn remove_send_queue_request(
403 &self,
404 room_id: &RoomId,
405 transaction_id: &TransactionId,
406 ) -> Result<bool, Self::Error>;
407
408 async fn load_send_queue_requests(
414 &self,
415 room_id: &RoomId,
416 ) -> Result<Vec<QueuedRequest>, Self::Error>;
417
418 async fn update_send_queue_request_status(
421 &self,
422 room_id: &RoomId,
423 transaction_id: &TransactionId,
424 error: Option<QueueWedgeError>,
425 ) -> Result<(), Self::Error>;
426
427 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
429
430 async fn save_dependent_queued_request(
433 &self,
434 room_id: &RoomId,
435 parent_txn_id: &TransactionId,
436 own_txn_id: ChildTransactionId,
437 created_at: MilliSecondsSinceUnixEpoch,
438 content: DependentQueuedRequestKind,
439 ) -> Result<(), Self::Error>;
440
441 async fn mark_dependent_queued_requests_as_ready(
450 &self,
451 room_id: &RoomId,
452 parent_txn_id: &TransactionId,
453 sent_parent_key: SentRequestKey,
454 ) -> Result<usize, Self::Error>;
455
456 async fn update_dependent_queued_request(
460 &self,
461 room_id: &RoomId,
462 own_transaction_id: &ChildTransactionId,
463 new_content: DependentQueuedRequestKind,
464 ) -> Result<bool, Self::Error>;
465
466 async fn remove_dependent_queued_request(
471 &self,
472 room: &RoomId,
473 own_txn_id: &ChildTransactionId,
474 ) -> Result<bool, Self::Error>;
475
476 async fn load_dependent_queued_requests(
482 &self,
483 room: &RoomId,
484 ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
485
486 async fn upsert_thread_subscriptions(
496 &self,
497 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
498 ) -> Result<(), Self::Error>;
499
500 async fn remove_thread_subscription(
504 &self,
505 room: &RoomId,
506 thread_id: &EventId,
507 ) -> Result<(), Self::Error>;
508
509 async fn load_thread_subscription(
513 &self,
514 room: &RoomId,
515 thread_id: &EventId,
516 ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
517
518 #[doc(hidden)]
524 async fn optimize(&self) -> Result<(), Self::Error>;
525
526 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
528}
529
/// Newtype around a value of type `T`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as `T`, which the
/// pointer cast in `IntoStateStore for Arc<T>` relies on.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);
532
533#[cfg(not(tarpaulin_include))]
534impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
535 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536 self.0.fmt(f)
537 }
538}
539
540#[cfg_attr(target_family = "wasm", async_trait(?Send))]
541#[cfg_attr(not(target_family = "wasm"), async_trait)]
542impl<T: StateStore> StateStore for EraseStateStoreError<T> {
543 type Error = StoreError;
544
545 async fn get_kv_data(
546 &self,
547 key: StateStoreDataKey<'_>,
548 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
549 self.0.get_kv_data(key).await.map_err(Into::into)
550 }
551
552 async fn set_kv_data(
553 &self,
554 key: StateStoreDataKey<'_>,
555 value: StateStoreDataValue,
556 ) -> Result<(), Self::Error> {
557 self.0.set_kv_data(key, value).await.map_err(Into::into)
558 }
559
560 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
561 self.0.remove_kv_data(key).await.map_err(Into::into)
562 }
563
564 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
565 self.0.save_changes(changes).await.map_err(Into::into)
566 }
567
568 async fn get_presence_event(
569 &self,
570 user_id: &UserId,
571 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
572 self.0.get_presence_event(user_id).await.map_err(Into::into)
573 }
574
575 async fn get_presence_events(
576 &self,
577 user_ids: &[OwnedUserId],
578 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
579 self.0.get_presence_events(user_ids).await.map_err(Into::into)
580 }
581
582 async fn get_state_event(
583 &self,
584 room_id: &RoomId,
585 event_type: StateEventType,
586 state_key: &str,
587 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
588 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
589 }
590
591 async fn get_state_events(
592 &self,
593 room_id: &RoomId,
594 event_type: StateEventType,
595 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
596 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
597 }
598
599 async fn get_state_events_for_keys(
600 &self,
601 room_id: &RoomId,
602 event_type: StateEventType,
603 state_keys: &[&str],
604 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
605 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
606 }
607
608 async fn get_profile(
609 &self,
610 room_id: &RoomId,
611 user_id: &UserId,
612 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
613 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
614 }
615
616 async fn get_profiles<'a>(
617 &self,
618 room_id: &RoomId,
619 user_ids: &'a [OwnedUserId],
620 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
621 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
622 }
623
624 async fn get_user_ids(
625 &self,
626 room_id: &RoomId,
627 memberships: RoomMemberships,
628 ) -> Result<Vec<OwnedUserId>, Self::Error> {
629 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
630 }
631
632 async fn get_room_infos(
633 &self,
634 room_load_settings: &RoomLoadSettings,
635 ) -> Result<Vec<RoomInfo>, Self::Error> {
636 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
637 }
638
639 async fn get_users_with_display_name(
640 &self,
641 room_id: &RoomId,
642 display_name: &DisplayName,
643 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
644 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
645 }
646
647 async fn get_users_with_display_names<'a>(
648 &self,
649 room_id: &RoomId,
650 display_names: &'a [DisplayName],
651 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
652 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
653 }
654
655 async fn get_account_data_event(
656 &self,
657 event_type: GlobalAccountDataEventType,
658 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
659 self.0.get_account_data_event(event_type).await.map_err(Into::into)
660 }
661
662 async fn get_room_account_data_event(
663 &self,
664 room_id: &RoomId,
665 event_type: RoomAccountDataEventType,
666 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
667 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
668 }
669
670 async fn get_user_room_receipt_event(
671 &self,
672 room_id: &RoomId,
673 receipt_type: ReceiptType,
674 thread: ReceiptThread,
675 user_id: &UserId,
676 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
677 self.0
678 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
679 .await
680 .map_err(Into::into)
681 }
682
683 async fn get_event_room_receipt_events(
684 &self,
685 room_id: &RoomId,
686 receipt_type: ReceiptType,
687 thread: ReceiptThread,
688 event_id: &EventId,
689 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
690 self.0
691 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
692 .await
693 .map_err(Into::into)
694 }
695
696 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
697 self.0.get_custom_value(key).await.map_err(Into::into)
698 }
699
700 async fn set_custom_value(
701 &self,
702 key: &[u8],
703 value: Vec<u8>,
704 ) -> Result<Option<Vec<u8>>, Self::Error> {
705 self.0.set_custom_value(key, value).await.map_err(Into::into)
706 }
707
708 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
709 self.0.remove_custom_value(key).await.map_err(Into::into)
710 }
711
712 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
713 self.0.remove_room(room_id).await.map_err(Into::into)
714 }
715
716 async fn save_send_queue_request(
717 &self,
718 room_id: &RoomId,
719 transaction_id: OwnedTransactionId,
720 created_at: MilliSecondsSinceUnixEpoch,
721 content: QueuedRequestKind,
722 priority: usize,
723 ) -> Result<(), Self::Error> {
724 self.0
725 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
726 .await
727 .map_err(Into::into)
728 }
729
730 async fn update_send_queue_request(
731 &self,
732 room_id: &RoomId,
733 transaction_id: &TransactionId,
734 content: QueuedRequestKind,
735 ) -> Result<bool, Self::Error> {
736 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
737 }
738
739 async fn remove_send_queue_request(
740 &self,
741 room_id: &RoomId,
742 transaction_id: &TransactionId,
743 ) -> Result<bool, Self::Error> {
744 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
745 }
746
747 async fn load_send_queue_requests(
748 &self,
749 room_id: &RoomId,
750 ) -> Result<Vec<QueuedRequest>, Self::Error> {
751 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
752 }
753
754 async fn update_send_queue_request_status(
755 &self,
756 room_id: &RoomId,
757 transaction_id: &TransactionId,
758 error: Option<QueueWedgeError>,
759 ) -> Result<(), Self::Error> {
760 self.0
761 .update_send_queue_request_status(room_id, transaction_id, error)
762 .await
763 .map_err(Into::into)
764 }
765
766 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
767 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
768 }
769
770 async fn save_dependent_queued_request(
771 &self,
772 room_id: &RoomId,
773 parent_txn_id: &TransactionId,
774 own_txn_id: ChildTransactionId,
775 created_at: MilliSecondsSinceUnixEpoch,
776 content: DependentQueuedRequestKind,
777 ) -> Result<(), Self::Error> {
778 self.0
779 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
780 .await
781 .map_err(Into::into)
782 }
783
784 async fn mark_dependent_queued_requests_as_ready(
785 &self,
786 room_id: &RoomId,
787 parent_txn_id: &TransactionId,
788 sent_parent_key: SentRequestKey,
789 ) -> Result<usize, Self::Error> {
790 self.0
791 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
792 .await
793 .map_err(Into::into)
794 }
795
796 async fn remove_dependent_queued_request(
797 &self,
798 room_id: &RoomId,
799 own_txn_id: &ChildTransactionId,
800 ) -> Result<bool, Self::Error> {
801 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
802 }
803
804 async fn load_dependent_queued_requests(
805 &self,
806 room_id: &RoomId,
807 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
808 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
809 }
810
811 async fn update_dependent_queued_request(
812 &self,
813 room_id: &RoomId,
814 own_transaction_id: &ChildTransactionId,
815 new_content: DependentQueuedRequestKind,
816 ) -> Result<bool, Self::Error> {
817 self.0
818 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
819 .await
820 .map_err(Into::into)
821 }
822
823 async fn upsert_thread_subscriptions(
824 &self,
825 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
826 ) -> Result<(), Self::Error> {
827 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
828 }
829
830 async fn load_thread_subscription(
831 &self,
832 room: &RoomId,
833 thread_id: &EventId,
834 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
835 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
836 }
837
838 async fn remove_thread_subscription(
839 &self,
840 room: &RoomId,
841 thread_id: &EventId,
842 ) -> Result<(), Self::Error> {
843 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
844 }
845
846 async fn optimize(&self) -> Result<(), Self::Error> {
847 self.0.optimize().await.map_err(Into::into)
848 }
849
850 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
851 self.0.get_size().await.map_err(Into::into)
852 }
853}
854
855#[cfg_attr(target_family = "wasm", async_trait(?Send))]
857#[cfg_attr(not(target_family = "wasm"), async_trait)]
858pub trait StateStoreExt: StateStore {
859 async fn get_state_event_static<C>(
865 &self,
866 room_id: &RoomId,
867 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
868 where
869 C: StaticEventContent<IsPrefix = ruma::events::False>
870 + StaticStateEventContent<StateKey = EmptyStateKey>
871 + RedactContent,
872 C::Redacted: RedactedStateEventContent,
873 {
874 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
875 }
876
877 async fn get_state_event_static_for_key<C, K>(
883 &self,
884 room_id: &RoomId,
885 state_key: &K,
886 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
887 where
888 C: StaticEventContent<IsPrefix = ruma::events::False>
889 + StaticStateEventContent
890 + RedactContent,
891 C::StateKey: Borrow<K>,
892 C::Redacted: RedactedStateEventContent,
893 K: AsRef<str> + ?Sized + Sync,
894 {
895 Ok(self
896 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
897 .await?
898 .map(|raw| raw.cast()))
899 }
900
901 async fn get_state_events_static<C>(
907 &self,
908 room_id: &RoomId,
909 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
910 where
911 C: StaticEventContent<IsPrefix = ruma::events::False>
912 + StaticStateEventContent
913 + RedactContent,
914 C::Redacted: RedactedStateEventContent,
915 {
916 Ok(self
918 .get_state_events(room_id, C::TYPE.into())
919 .await?
920 .into_iter()
921 .map(|raw| raw.cast())
922 .collect())
923 }
924
925 async fn get_state_events_for_keys_static<'a, C, K, I>(
934 &self,
935 room_id: &RoomId,
936 state_keys: I,
937 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
938 where
939 C: StaticEventContent<IsPrefix = ruma::events::False>
940 + StaticStateEventContent
941 + RedactContent,
942 C::StateKey: Borrow<K>,
943 C::Redacted: RedactedStateEventContent,
944 K: AsRef<str> + Sized + Sync + 'a,
945 I: IntoIterator<Item = &'a K> + Send,
946 I::IntoIter: Send,
947 {
948 Ok(self
949 .get_state_events_for_keys(
950 room_id,
951 C::TYPE.into(),
952 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
953 )
954 .await?
955 .into_iter()
956 .map(|raw| raw.cast())
957 .collect())
958 }
959
960 async fn get_account_data_event_static<C>(
962 &self,
963 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
964 where
965 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
966 {
967 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
968 }
969
970 async fn get_room_account_data_event_static<C>(
978 &self,
979 room_id: &RoomId,
980 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
981 where
982 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
983 {
984 Ok(self
985 .get_room_account_data_event(room_id, C::TYPE.into())
986 .await?
987 .map(Raw::cast_unchecked))
988 }
989
990 async fn get_member_event(
998 &self,
999 room_id: &RoomId,
1000 state_key: &UserId,
1001 ) -> Result<Option<RawMemberEvent>, Self::Error> {
1002 self.get_state_event_static_for_key(room_id, state_key).await
1003 }
1004}
1005
// Blanket implementation: every `StateStore` (sized or not, so trait objects
// are included) gets the provided `StateStoreExt` methods.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStoreExt for T {}
1009
/// A [`StateStore`] trait object whose error type is fixed to
/// [`StoreError`].
pub type DynStateStore = dyn StateStore<Error = StoreError>;
1012
/// Conversion of a value into a shared, type-erased state store.
pub trait IntoStateStore {
    /// Consumes `self` and returns it as an `Arc<DynStateStore>`.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
1022
1023impl<T> IntoStateStore for T
1024where
1025 T: StateStore + Sized + 'static,
1026{
1027 fn into_state_store(self) -> Arc<DynStateStore> {
1028 Arc::new(EraseStateStoreError(self))
1029 }
1030}
1031
1032impl<T> IntoStateStore for Arc<T>
1035where
1036 T: StateStore + 'static,
1037{
1038 fn into_state_store(self) -> Arc<DynStateStore> {
1039 let ptr: *const T = Arc::into_raw(self);
1040 let ptr_erased = ptr as *const EraseStateStoreError<T>;
1041 unsafe { Arc::from_raw(ptr_erased) }
1044 }
1045}
1046
/// A value paired with the time at which it was last fetched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlStoreValue<T> {
    /// The wrapped data. Flattened during (de)serialization, so its fields
    /// sit next to `last_fetch_ts`.
    #[serde(flatten)]
    data: T,

    /// Timestamp of the last fetch, in milliseconds since the Unix epoch
    /// (`TtlStoreValue::new` fills it from `now_timestamp_ms()`).
    last_fetch_ts: f64,
}
1058
1059impl<T> TtlStoreValue<T> {
1060 pub const STALE_THRESHOLD: f64 = (1000 * 60 * 60 * 24 * 7) as _; pub fn new(data: T) -> Self {
1065 Self { data, last_fetch_ts: now_timestamp_ms() }
1066 }
1067
1068 pub fn into_data(self) -> Option<T> {
1070 if now_timestamp_ms() - self.last_fetch_ts >= Self::STALE_THRESHOLD {
1071 None
1072 } else {
1073 Some(self.data)
1074 }
1075 }
1076}
1077
/// Serializable pair of version strings and unstable feature flags, from
/// which a [`SupportedVersions`] can be built.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// The version strings.
    pub versions: Vec<String>,

    /// Unstable features by name, each mapped to a boolean flag.
    pub unstable_features: BTreeMap<String, bool>,
}
1087
1088impl SupportedVersionsResponse {
1089 pub fn supported_versions(&self) -> SupportedVersions {
1095 SupportedVersions::from_parts(&self.versions, &self.unstable_features)
1096 }
1097}
1098
/// Serializable subset of a `discover_homeserver::Response` (see the `From`
/// implementation on this type).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Information about the homeserver.
    pub homeserver: HomeserverInfo,

    /// Information about the identity server, if any.
    pub identity_server: Option<IdentityServerInfo>,

    /// Information about the tile server, if any.
    pub tile_server: Option<TileServerInfo>,

    /// Information about the RTC foci.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
1114
1115impl From<discover_homeserver::Response> for WellKnownResponse {
1116 fn from(response: discover_homeserver::Response) -> Self {
1117 Self {
1118 homeserver: response.homeserver,
1119 identity_server: response.identity_server,
1120 tile_server: response.tile_server,
1121 rtc_foci: response.rtc_foci,
1122 }
1123 }
1124}
1125
/// Returns the current system time as fractional milliseconds since the Unix
/// epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_timestamp_ms() -> f64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("System clock was before 1970.");

    since_epoch.as_secs_f64() * 1000.0
}
1134
/// A value that can be stored in the key-value part of a state store.
///
/// The variant names mirror those of [`StateStoreDataKey`].
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token, as a string.
    SyncToken(String),

    /// Supported versions, stored with a time-to-live.
    SupportedVersions(TtlStoreValue<SupportedVersionsResponse>),

    /// An optional well-known response, stored with a time-to-live.
    WellKnown(TtlStoreValue<Option<WellKnownResponse>>),

    /// A filter, as a string.
    Filter(String),

    /// A user avatar URL.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Data for the UTD hook manager, held in a bloom filter.
    UtdHookManagerData(GrowableBloom),

    /// Marker variant without payload; presumably records that a one-time
    /// key has already been uploaded (confirm against the callers).
    OneTimeKeyAlreadyUploaded,

    /// A composer draft.
    ComposerDraft(ComposerDraft),

    /// Seen knock requests, as a map from event ID to user ID.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// A list of catch-up tokens for thread subscriptions.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),

    /// The capabilities of the homeserver.
    HomeserverCapabilities(Capabilities),
}
1182
/// A catch-up token for thread subscriptions, delimited by a mandatory
/// `from` and an optional `to` marker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The `from` marker of the range.
    pub from: String,

    /// The `to` marker of the range, if any.
    pub to: Option<String>,
}
1208
/// A draft kept for the message composer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content as plain text.
    pub plain_text: String,
    /// The draft content as HTML, if any.
    pub html_text: Option<String>,
    /// The type of the draft.
    pub draft_type: ComposerDraftType,
    /// Attachments of the draft. Falls back to an empty list when the field
    /// is absent during deserialization.
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
1223
/// An attachment stored along with a [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The filename of the attachment.
    pub filename: String,
    /// The attachment content and its type-specific metadata.
    pub content: DraftAttachmentContent,
}
1232
/// The content of a draft attachment, by media kind.
///
/// Serialized with an internal `type` tag naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// Image content.
    Image {
        /// The image bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// Video content.
    Video {
        /// The video bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// Audio content.
    Audio {
        /// The audio bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
    },
    /// Generic file content.
    File {
        /// The file bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
    },
}
1294
/// A thumbnail attached to image or video draft content.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The filename of the thumbnail.
    pub filename: String,
    /// The thumbnail bytes.
    pub data: Vec<u8>,
    /// The MIME type, if known.
    pub mimetype: Option<String>,
    /// The width, if known.
    pub width: Option<u64>,
    /// The height, if known.
    pub height: Option<u64>,
    /// The size, if known.
    pub size: Option<u64>,
}
1311
/// The type of a [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// The draft is a new message.
    NewMessage,
    /// The draft is a reply.
    Reply {
        /// ID of the event the reply refers to.
        event_id: OwnedEventId,
    },
    /// The draft is an edit.
    Edit {
        /// ID of the event the edit refers to.
        event_id: OwnedEventId,
    },
}
1328
1329impl StateStoreDataValue {
1330 pub fn into_sync_token(self) -> Option<String> {
1332 as_variant!(self, Self::SyncToken)
1333 }
1334
1335 pub fn into_filter(self) -> Option<String> {
1337 as_variant!(self, Self::Filter)
1338 }
1339
1340 pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
1342 as_variant!(self, Self::UserAvatarUrl)
1343 }
1344
1345 pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
1347 as_variant!(self, Self::RecentlyVisitedRooms)
1348 }
1349
1350 pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
1352 as_variant!(self, Self::UtdHookManagerData)
1353 }
1354
1355 pub fn into_composer_draft(self) -> Option<ComposerDraft> {
1357 as_variant!(self, Self::ComposerDraft)
1358 }
1359
1360 pub fn into_supported_versions(self) -> Option<TtlStoreValue<SupportedVersionsResponse>> {
1362 as_variant!(self, Self::SupportedVersions)
1363 }
1364
1365 pub fn into_well_known(self) -> Option<TtlStoreValue<Option<WellKnownResponse>>> {
1367 as_variant!(self, Self::WellKnown)
1368 }
1369
1370 pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
1372 as_variant!(self, Self::SeenKnockRequests)
1373 }
1374
1375 pub fn into_thread_subscriptions_catchup_tokens(
1378 self,
1379 ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
1380 as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
1381 }
1382
1383 pub fn into_homeserver_capabilities(self) -> Option<Capabilities> {
1386 as_variant!(self, Self::HomeserverCapabilities)
1387 }
1388}
1389
/// A key for the key-value part of a state store.
///
/// The variant names mirror those of [`StateStoreDataValue`].
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// Key for the sync token.
    SyncToken,

    /// Key for the supported versions.
    SupportedVersions,

    /// Key for the well-known response.
    WellKnown,

    /// Key for a filter, identified by a string.
    Filter(&'a str),

    /// Key for the avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// Key for the recently visited rooms of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// Key for the UTD hook manager data.
    UtdHookManagerData,

    /// Key matching the payload-less
    /// `StateStoreDataValue::OneTimeKeyAlreadyUploaded` marker.
    OneTimeKeyAlreadyUploaded,

    /// Key for the composer draft of the given room, further qualified by an
    /// optional event ID (presumably a thread root; confirm against the
    /// callers).
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// Key for the seen knock requests of the given room.
    SeenKnockRequests(&'a RoomId),

    /// Key for the thread subscriptions catch-up tokens.
    ThreadSubscriptionsCatchupTokens,

    /// Key for the homeserver capabilities.
    HomeserverCapabilities,
}
1434
// String constants, one per `StateStoreDataKey` variant (matched by name).
// Presumably used by store implementations to build their storage keys;
// confirm against the implementations before changing any value.
impl StateStoreDataKey<'_> {
    /// String constant for the `SyncToken` variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// String constant for the `SupportedVersions` variant.
    ///
    /// NOTE(review): the value is `server_capabilities`, not
    /// `supported_versions`. Presumably an older name retained so that
    /// existing stores keep working; confirm before changing it.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";

    /// String constant for the `WellKnown` variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// String constant for the `Filter` variant.
    pub const FILTER: &'static str = "filter";

    /// String constant for the `UserAvatarUrl` variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// String constant for the `RecentlyVisitedRooms` variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// String constant for the `UtdHookManagerData` variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// String constant for the `OneTimeKeyAlreadyUploaded` variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// String constant for the `ComposerDraft` variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// String constant for the `SeenKnockRequests` variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// String constant for the `ThreadSubscriptionsCatchupTokens` variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";

    /// String constant for the `HomeserverCapabilities` variant.
    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
}
1482
/// Decides whether a thread subscription carrying the bump stamp `new` should
/// be stored, given the bump stamp of the `previous` subscription.
///
/// - Without a previous bump stamp, returns `true` and leaves `new` as is.
/// - With a previous bump stamp but no new one, the previous stamp is copied
///   into `new` and `true` is returned.
/// - With both present, returns `true` only if the new stamp is strictly
///   greater than the previous one; `new` is left as is.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    let Some(prev_bump) = previous else {
        // Nothing to compare against: the new value wins.
        return true;
    };

    match *new {
        // Carry the previous bump stamp over rather than losing it.
        None => {
            *new = Some(prev_bump);
            true
        }

        // Only a strictly newer bump stamp is worth storing.
        Some(new_bump) => new_bump > prev_bump,
    }
}
1513
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{SupportedVersionsResponse, TtlStoreValue, now_timestamp_ms};

    /// A value fetched longer ago than the stale threshold yields no data,
    /// while a freshly created value does.
    #[test]
    fn test_stale_ttl_store_value() {
        // Last fetched just over the stale threshold ago.
        let ttl_value = TtlStoreValue {
            data: (),
            last_fetch_ts: now_timestamp_ms() - TtlStoreValue::<()>::STALE_THRESHOLD - 1.0,
        };
        assert!(ttl_value.into_data().is_none());

        // Fetched right now.
        let ttl_value = TtlStoreValue::new(());
        assert!(ttl_value.into_data().is_some());
    }

    /// The wrapped data is flattened next to `last_fetch_ts` in the
    /// serialized form, and deserialization restores both parts.
    #[test]
    fn test_stale_ttl_store_value_serialize_roundtrip() {
        let server_info = SupportedVersionsResponse {
            versions: vec!["1.2".to_owned(), "1.3".to_owned(), "1.4".to_owned()],
            unstable_features: [("org.matrix.msc3916.stable".to_owned(), true)].into(),
        };
        let ttl_value = TtlStoreValue { data: server_info.clone(), last_fetch_ts: 1000.0 };
        let json = json!({
            "versions": ["1.2", "1.3", "1.4"],
            "unstable_features": {
                "org.matrix.msc3916.stable": true,
            },
            "last_fetch_ts": 1000.0,
        });

        assert_eq!(serde_json::to_value(&ttl_value).unwrap(), json);

        let deserialized =
            serde_json::from_value::<TtlStoreValue<SupportedVersionsResponse>>(json).unwrap();
        assert_eq!(deserialized.data, server_info);
        // Compare the absolute difference: a one-sided check would also pass
        // for an arbitrarily smaller deserialized timestamp.
        assert!((deserialized.last_fetch_ts - ttl_value.last_fetch_ts).abs() < 0.0001);
    }
}