1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 sync::Arc,
20};
21
22use as_variant::as_variant;
23use async_trait::async_trait;
24use growable_bloom_filter::GrowableBloom;
25use matrix_sdk_common::AsyncTraitDeps;
26use ruma::{
27 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
28 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
29 api::{
30 SupportedVersions,
31 client::discovery::discover_homeserver::{
32 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
33 },
34 },
35 events::{
36 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
37 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
38 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
39 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
40 presence::PresenceEvent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 },
43 serde::Raw,
44 time::SystemTime,
45};
46use serde::{Deserialize, Serialize};
47
48use super::{
49 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
50 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
51 send_queue::SentRequestKey,
52};
53use crate::{
54 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
55 deserialized_responses::{
56 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
57 },
58 store::StoredThreadSubscription,
59};
60
61#[cfg_attr(target_family = "wasm", async_trait(?Send))]
64#[cfg_attr(not(target_family = "wasm"), async_trait)]
65pub trait StateStore: AsyncTraitDeps {
66 type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
68
69 async fn get_kv_data(
75 &self,
76 key: StateStoreDataKey<'_>,
77 ) -> Result<Option<StateStoreDataValue>, Self::Error>;
78
79 async fn set_kv_data(
89 &self,
90 key: StateStoreDataKey<'_>,
91 value: StateStoreDataValue,
92 ) -> Result<(), Self::Error>;
93
94 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
100
101 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
103
104 async fn get_presence_event(
111 &self,
112 user_id: &UserId,
113 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
114
115 async fn get_presence_events(
121 &self,
122 user_ids: &[OwnedUserId],
123 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
124
125 async fn get_state_event(
133 &self,
134 room_id: &RoomId,
135 event_type: StateEventType,
136 state_key: &str,
137 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
138
139 async fn get_state_events(
147 &self,
148 room_id: &RoomId,
149 event_type: StateEventType,
150 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
151
152 async fn get_state_events_for_keys(
163 &self,
164 room_id: &RoomId,
165 event_type: StateEventType,
166 state_keys: &[&str],
167 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
168
169 async fn get_profile(
177 &self,
178 room_id: &RoomId,
179 user_id: &UserId,
180 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
181
182 async fn get_profiles<'a>(
190 &self,
191 room_id: &RoomId,
192 user_ids: &'a [OwnedUserId],
193 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
194
195 async fn get_user_ids(
198 &self,
199 room_id: &RoomId,
200 memberships: RoomMemberships,
201 ) -> Result<Vec<OwnedUserId>, Self::Error>;
202
203 async fn get_room_infos(
205 &self,
206 room_load_settings: &RoomLoadSettings,
207 ) -> Result<Vec<RoomInfo>, Self::Error>;
208
209 async fn get_users_with_display_name(
218 &self,
219 room_id: &RoomId,
220 display_name: &DisplayName,
221 ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
222
223 async fn get_users_with_display_names<'a>(
231 &self,
232 room_id: &RoomId,
233 display_names: &'a [DisplayName],
234 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
235
236 async fn get_account_data_event(
242 &self,
243 event_type: GlobalAccountDataEventType,
244 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
245
246 async fn get_room_account_data_event(
256 &self,
257 room_id: &RoomId,
258 event_type: RoomAccountDataEventType,
259 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
260
261 async fn get_user_room_receipt_event(
274 &self,
275 room_id: &RoomId,
276 receipt_type: ReceiptType,
277 thread: ReceiptThread,
278 user_id: &UserId,
279 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
280
281 async fn get_event_room_receipt_events(
295 &self,
296 room_id: &RoomId,
297 receipt_type: ReceiptType,
298 thread: ReceiptThread,
299 event_id: &EventId,
300 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
301
302 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
308
309 async fn set_custom_value(
318 &self,
319 key: &[u8],
320 value: Vec<u8>,
321 ) -> Result<Option<Vec<u8>>, Self::Error>;
322
323 async fn set_custom_value_no_read(
337 &self,
338 key: &[u8],
339 value: Vec<u8>,
340 ) -> Result<(), Self::Error> {
341 self.set_custom_value(key, value).await.map(|_| ())
342 }
343
344 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
350
351 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
357
358 async fn save_send_queue_request(
368 &self,
369 room_id: &RoomId,
370 transaction_id: OwnedTransactionId,
371 created_at: MilliSecondsSinceUnixEpoch,
372 request: QueuedRequestKind,
373 priority: usize,
374 ) -> Result<(), Self::Error>;
375
376 async fn update_send_queue_request(
388 &self,
389 room_id: &RoomId,
390 transaction_id: &TransactionId,
391 content: QueuedRequestKind,
392 ) -> Result<bool, Self::Error>;
393
394 async fn remove_send_queue_request(
400 &self,
401 room_id: &RoomId,
402 transaction_id: &TransactionId,
403 ) -> Result<bool, Self::Error>;
404
405 async fn load_send_queue_requests(
411 &self,
412 room_id: &RoomId,
413 ) -> Result<Vec<QueuedRequest>, Self::Error>;
414
415 async fn update_send_queue_request_status(
418 &self,
419 room_id: &RoomId,
420 transaction_id: &TransactionId,
421 error: Option<QueueWedgeError>,
422 ) -> Result<(), Self::Error>;
423
424 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
426
427 async fn save_dependent_queued_request(
430 &self,
431 room_id: &RoomId,
432 parent_txn_id: &TransactionId,
433 own_txn_id: ChildTransactionId,
434 created_at: MilliSecondsSinceUnixEpoch,
435 content: DependentQueuedRequestKind,
436 ) -> Result<(), Self::Error>;
437
438 async fn mark_dependent_queued_requests_as_ready(
447 &self,
448 room_id: &RoomId,
449 parent_txn_id: &TransactionId,
450 sent_parent_key: SentRequestKey,
451 ) -> Result<usize, Self::Error>;
452
453 async fn update_dependent_queued_request(
457 &self,
458 room_id: &RoomId,
459 own_transaction_id: &ChildTransactionId,
460 new_content: DependentQueuedRequestKind,
461 ) -> Result<bool, Self::Error>;
462
463 async fn remove_dependent_queued_request(
468 &self,
469 room: &RoomId,
470 own_txn_id: &ChildTransactionId,
471 ) -> Result<bool, Self::Error>;
472
473 async fn load_dependent_queued_requests(
479 &self,
480 room: &RoomId,
481 ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
482
483 async fn upsert_thread_subscription(
493 &self,
494 room: &RoomId,
495 thread_id: &EventId,
496 subscription: StoredThreadSubscription,
497 ) -> Result<(), Self::Error>;
498
499 async fn upsert_thread_subscriptions(
509 &self,
510 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
511 ) -> Result<(), Self::Error>;
512
513 async fn remove_thread_subscription(
517 &self,
518 room: &RoomId,
519 thread_id: &EventId,
520 ) -> Result<(), Self::Error>;
521
522 async fn load_thread_subscription(
526 &self,
527 room: &RoomId,
528 thread_id: &EventId,
529 ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
530
531 #[doc(hidden)]
537 async fn optimize(&self) -> Result<(), Self::Error>;
538
539 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
541}
542
/// Newtype around a store `T`.
///
/// `#[repr(transparent)]` guarantees that the wrapper has the same layout as
/// the wrapped value.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

/// Debug-formats the wrapper exactly like the wrapped value, so the newtype
/// never shows up in debug output.
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
552
553#[cfg_attr(target_family = "wasm", async_trait(?Send))]
554#[cfg_attr(not(target_family = "wasm"), async_trait)]
555impl<T: StateStore> StateStore for EraseStateStoreError<T> {
556 type Error = StoreError;
557
558 async fn get_kv_data(
559 &self,
560 key: StateStoreDataKey<'_>,
561 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
562 self.0.get_kv_data(key).await.map_err(Into::into)
563 }
564
565 async fn set_kv_data(
566 &self,
567 key: StateStoreDataKey<'_>,
568 value: StateStoreDataValue,
569 ) -> Result<(), Self::Error> {
570 self.0.set_kv_data(key, value).await.map_err(Into::into)
571 }
572
573 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
574 self.0.remove_kv_data(key).await.map_err(Into::into)
575 }
576
577 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
578 self.0.save_changes(changes).await.map_err(Into::into)
579 }
580
581 async fn get_presence_event(
582 &self,
583 user_id: &UserId,
584 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
585 self.0.get_presence_event(user_id).await.map_err(Into::into)
586 }
587
588 async fn get_presence_events(
589 &self,
590 user_ids: &[OwnedUserId],
591 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
592 self.0.get_presence_events(user_ids).await.map_err(Into::into)
593 }
594
595 async fn get_state_event(
596 &self,
597 room_id: &RoomId,
598 event_type: StateEventType,
599 state_key: &str,
600 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
601 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
602 }
603
604 async fn get_state_events(
605 &self,
606 room_id: &RoomId,
607 event_type: StateEventType,
608 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
609 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
610 }
611
612 async fn get_state_events_for_keys(
613 &self,
614 room_id: &RoomId,
615 event_type: StateEventType,
616 state_keys: &[&str],
617 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
618 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
619 }
620
621 async fn get_profile(
622 &self,
623 room_id: &RoomId,
624 user_id: &UserId,
625 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
626 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
627 }
628
629 async fn get_profiles<'a>(
630 &self,
631 room_id: &RoomId,
632 user_ids: &'a [OwnedUserId],
633 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
634 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
635 }
636
637 async fn get_user_ids(
638 &self,
639 room_id: &RoomId,
640 memberships: RoomMemberships,
641 ) -> Result<Vec<OwnedUserId>, Self::Error> {
642 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
643 }
644
645 async fn get_room_infos(
646 &self,
647 room_load_settings: &RoomLoadSettings,
648 ) -> Result<Vec<RoomInfo>, Self::Error> {
649 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
650 }
651
652 async fn get_users_with_display_name(
653 &self,
654 room_id: &RoomId,
655 display_name: &DisplayName,
656 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
657 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
658 }
659
660 async fn get_users_with_display_names<'a>(
661 &self,
662 room_id: &RoomId,
663 display_names: &'a [DisplayName],
664 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
665 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
666 }
667
668 async fn get_account_data_event(
669 &self,
670 event_type: GlobalAccountDataEventType,
671 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
672 self.0.get_account_data_event(event_type).await.map_err(Into::into)
673 }
674
675 async fn get_room_account_data_event(
676 &self,
677 room_id: &RoomId,
678 event_type: RoomAccountDataEventType,
679 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
680 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
681 }
682
683 async fn get_user_room_receipt_event(
684 &self,
685 room_id: &RoomId,
686 receipt_type: ReceiptType,
687 thread: ReceiptThread,
688 user_id: &UserId,
689 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
690 self.0
691 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
692 .await
693 .map_err(Into::into)
694 }
695
696 async fn get_event_room_receipt_events(
697 &self,
698 room_id: &RoomId,
699 receipt_type: ReceiptType,
700 thread: ReceiptThread,
701 event_id: &EventId,
702 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
703 self.0
704 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
705 .await
706 .map_err(Into::into)
707 }
708
709 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
710 self.0.get_custom_value(key).await.map_err(Into::into)
711 }
712
713 async fn set_custom_value(
714 &self,
715 key: &[u8],
716 value: Vec<u8>,
717 ) -> Result<Option<Vec<u8>>, Self::Error> {
718 self.0.set_custom_value(key, value).await.map_err(Into::into)
719 }
720
721 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
722 self.0.remove_custom_value(key).await.map_err(Into::into)
723 }
724
725 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
726 self.0.remove_room(room_id).await.map_err(Into::into)
727 }
728
729 async fn save_send_queue_request(
730 &self,
731 room_id: &RoomId,
732 transaction_id: OwnedTransactionId,
733 created_at: MilliSecondsSinceUnixEpoch,
734 content: QueuedRequestKind,
735 priority: usize,
736 ) -> Result<(), Self::Error> {
737 self.0
738 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
739 .await
740 .map_err(Into::into)
741 }
742
743 async fn update_send_queue_request(
744 &self,
745 room_id: &RoomId,
746 transaction_id: &TransactionId,
747 content: QueuedRequestKind,
748 ) -> Result<bool, Self::Error> {
749 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
750 }
751
752 async fn remove_send_queue_request(
753 &self,
754 room_id: &RoomId,
755 transaction_id: &TransactionId,
756 ) -> Result<bool, Self::Error> {
757 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
758 }
759
760 async fn load_send_queue_requests(
761 &self,
762 room_id: &RoomId,
763 ) -> Result<Vec<QueuedRequest>, Self::Error> {
764 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
765 }
766
767 async fn update_send_queue_request_status(
768 &self,
769 room_id: &RoomId,
770 transaction_id: &TransactionId,
771 error: Option<QueueWedgeError>,
772 ) -> Result<(), Self::Error> {
773 self.0
774 .update_send_queue_request_status(room_id, transaction_id, error)
775 .await
776 .map_err(Into::into)
777 }
778
779 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
780 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
781 }
782
783 async fn save_dependent_queued_request(
784 &self,
785 room_id: &RoomId,
786 parent_txn_id: &TransactionId,
787 own_txn_id: ChildTransactionId,
788 created_at: MilliSecondsSinceUnixEpoch,
789 content: DependentQueuedRequestKind,
790 ) -> Result<(), Self::Error> {
791 self.0
792 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
793 .await
794 .map_err(Into::into)
795 }
796
797 async fn mark_dependent_queued_requests_as_ready(
798 &self,
799 room_id: &RoomId,
800 parent_txn_id: &TransactionId,
801 sent_parent_key: SentRequestKey,
802 ) -> Result<usize, Self::Error> {
803 self.0
804 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
805 .await
806 .map_err(Into::into)
807 }
808
809 async fn remove_dependent_queued_request(
810 &self,
811 room_id: &RoomId,
812 own_txn_id: &ChildTransactionId,
813 ) -> Result<bool, Self::Error> {
814 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
815 }
816
817 async fn load_dependent_queued_requests(
818 &self,
819 room_id: &RoomId,
820 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
821 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
822 }
823
824 async fn update_dependent_queued_request(
825 &self,
826 room_id: &RoomId,
827 own_transaction_id: &ChildTransactionId,
828 new_content: DependentQueuedRequestKind,
829 ) -> Result<bool, Self::Error> {
830 self.0
831 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
832 .await
833 .map_err(Into::into)
834 }
835
836 async fn upsert_thread_subscription(
837 &self,
838 room: &RoomId,
839 thread_id: &EventId,
840 subscription: StoredThreadSubscription,
841 ) -> Result<(), Self::Error> {
842 self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
843 }
844
845 async fn upsert_thread_subscriptions(
846 &self,
847 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
848 ) -> Result<(), Self::Error> {
849 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
850 }
851
852 async fn load_thread_subscription(
853 &self,
854 room: &RoomId,
855 thread_id: &EventId,
856 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
857 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
858 }
859
860 async fn remove_thread_subscription(
861 &self,
862 room: &RoomId,
863 thread_id: &EventId,
864 ) -> Result<(), Self::Error> {
865 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
866 }
867
868 async fn optimize(&self) -> Result<(), Self::Error> {
869 self.0.optimize().await.map_err(Into::into)
870 }
871
872 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
873 self.0.get_size().await.map_err(Into::into)
874 }
875}
876
877#[cfg_attr(target_family = "wasm", async_trait(?Send))]
879#[cfg_attr(not(target_family = "wasm"), async_trait)]
880pub trait StateStoreExt: StateStore {
881 async fn get_state_event_static<C>(
887 &self,
888 room_id: &RoomId,
889 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
890 where
891 C: StaticEventContent<IsPrefix = ruma::events::False>
892 + StaticStateEventContent<StateKey = EmptyStateKey>
893 + RedactContent,
894 C::Redacted: RedactedStateEventContent,
895 {
896 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
897 }
898
899 async fn get_state_event_static_for_key<C, K>(
905 &self,
906 room_id: &RoomId,
907 state_key: &K,
908 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
909 where
910 C: StaticEventContent<IsPrefix = ruma::events::False>
911 + StaticStateEventContent
912 + RedactContent,
913 C::StateKey: Borrow<K>,
914 C::Redacted: RedactedStateEventContent,
915 K: AsRef<str> + ?Sized + Sync,
916 {
917 Ok(self
918 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
919 .await?
920 .map(|raw| raw.cast()))
921 }
922
923 async fn get_state_events_static<C>(
929 &self,
930 room_id: &RoomId,
931 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
932 where
933 C: StaticEventContent<IsPrefix = ruma::events::False>
934 + StaticStateEventContent
935 + RedactContent,
936 C::Redacted: RedactedStateEventContent,
937 {
938 Ok(self
940 .get_state_events(room_id, C::TYPE.into())
941 .await?
942 .into_iter()
943 .map(|raw| raw.cast())
944 .collect())
945 }
946
947 async fn get_state_events_for_keys_static<'a, C, K, I>(
956 &self,
957 room_id: &RoomId,
958 state_keys: I,
959 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
960 where
961 C: StaticEventContent<IsPrefix = ruma::events::False>
962 + StaticStateEventContent
963 + RedactContent,
964 C::StateKey: Borrow<K>,
965 C::Redacted: RedactedStateEventContent,
966 K: AsRef<str> + Sized + Sync + 'a,
967 I: IntoIterator<Item = &'a K> + Send,
968 I::IntoIter: Send,
969 {
970 Ok(self
971 .get_state_events_for_keys(
972 room_id,
973 C::TYPE.into(),
974 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
975 )
976 .await?
977 .into_iter()
978 .map(|raw| raw.cast())
979 .collect())
980 }
981
982 async fn get_account_data_event_static<C>(
984 &self,
985 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
986 where
987 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
988 {
989 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
990 }
991
992 async fn get_room_account_data_event_static<C>(
1000 &self,
1001 room_id: &RoomId,
1002 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1003 where
1004 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1005 {
1006 Ok(self
1007 .get_room_account_data_event(room_id, C::TYPE.into())
1008 .await?
1009 .map(Raw::cast_unchecked))
1010 }
1011
1012 async fn get_member_event(
1020 &self,
1021 room_id: &RoomId,
1022 state_key: &UserId,
1023 ) -> Result<Option<RawMemberEvent>, Self::Error> {
1024 self.get_state_event_static_for_key(room_id, state_key).await
1025 }
1026}
1027
1028#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1029#[cfg_attr(not(target_family = "wasm"), async_trait)]
1030impl<T: StateStore + ?Sized> StateStoreExt for T {}
1031
/// A `StateStore` trait object whose error type is fixed to `StoreError`.
pub type DynStateStore = dyn StateStore<Error = StoreError>;
1034
/// Conversion of a concrete state store into a shared, type-erased
/// `Arc<DynStateStore>`.
pub trait IntoStateStore {
    /// Consume `self` and return it as an `Arc<DynStateStore>`.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
1044
1045impl<T> IntoStateStore for T
1046where
1047 T: StateStore + Sized + 'static,
1048{
1049 fn into_state_store(self) -> Arc<DynStateStore> {
1050 Arc::new(EraseStateStoreError(self))
1051 }
1052}
1053
1054impl<T> IntoStateStore for Arc<T>
1057where
1058 T: StateStore + 'static,
1059{
1060 fn into_state_store(self) -> Arc<DynStateStore> {
1061 let ptr: *const T = Arc::into_raw(self);
1062 let ptr_erased = ptr as *const EraseStateStoreError<T>;
1063 unsafe { Arc::from_raw(ptr_erased) }
1066 }
1067}
1068
/// A value stored together with the time it was last fetched, so that it can
/// be treated as expired once it gets too old.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlStoreValue<T> {
    /// The wrapped data.
    ///
    /// Flattened by serde: its fields are serialized at the same level as
    /// `last_fetch_ts`.
    #[serde(flatten)]
    data: T,

    /// When the data was last fetched, in milliseconds since the Unix epoch.
    last_fetch_ts: f64,
}
1080
1081impl<T> TtlStoreValue<T> {
1082 pub const STALE_THRESHOLD: f64 = (1000 * 60 * 60 * 24 * 7) as _; pub fn new(data: T) -> Self {
1087 Self { data, last_fetch_ts: now_timestamp_ms() }
1088 }
1089
1090 pub fn into_data(self) -> Option<T> {
1092 if now_timestamp_ms() - self.last_fetch_ts >= Self::STALE_THRESHOLD {
1093 None
1094 } else {
1095 Some(self.data)
1096 }
1097 }
1098}
1099
/// Storable form of a homeserver's supported versions: a list of version
/// strings plus a map of unstable feature flags.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// The version strings.
    pub versions: Vec<String>,

    /// Unstable feature names mapped to a boolean flag.
    pub unstable_features: BTreeMap<String, bool>,
}
1109
1110impl SupportedVersionsResponse {
1111 pub fn supported_versions(&self) -> SupportedVersions {
1117 SupportedVersions::from_parts(&self.versions, &self.unstable_features)
1118 }
1119}
1120
/// Storable copy of the fields of a `discover_homeserver::Response`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Information about the homeserver.
    pub homeserver: HomeserverInfo,

    /// Information about the identity server, if any.
    pub identity_server: Option<IdentityServerInfo>,

    /// Information about the tile server, if any.
    pub tile_server: Option<TileServerInfo>,

    /// The RTC foci from the response.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
1136
1137impl From<discover_homeserver::Response> for WellKnownResponse {
1138 fn from(response: discover_homeserver::Response) -> Self {
1139 Self {
1140 homeserver: response.homeserver,
1141 identity_server: response.identity_server,
1142 tile_server: response.tile_server,
1143 rtc_foci: response.rtc_foci,
1144 }
1145 }
1146}
1147
/// Current time as fractional milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_timestamp_ms() -> f64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("System clock was before 1970.");

    since_epoch.as_secs_f64() * 1000.0
}
1156
/// A value for the typed key/value API of `StateStore`.
///
/// The variants mirror, by name, the variants of `StateStoreDataKey`.
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token.
    SyncToken(String),

    /// The supported versions of the server, with the time they were fetched.
    SupportedVersions(TtlStoreValue<SupportedVersionsResponse>),

    /// The well-known response (or `None`), with the time it was fetched.
    WellKnown(TtlStoreValue<Option<WellKnownResponse>>),

    /// A filter, stored as a string.
    Filter(String),

    /// The MXC URI of a user avatar.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Bloom filter data for the UTD hook manager.
    UtdHookManagerData(GrowableBloom),

    /// Marker without payload.
    ///
    /// NOTE(review): presumably records that a one-time key has already been
    /// uploaded — confirm against the users of this variant.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft.
    ComposerDraft(ComposerDraft),

    /// Seen knock requests, as a map from event ID to user ID.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// Catch-up tokens for thread subscriptions.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
}
1201
/// A pair of tokens describing a thread subscriptions catch-up range.
///
/// NOTE(review): presumably these are pagination tokens for a thread
/// subscriptions endpoint — confirm against the code that produces them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The token marking the start of the range.
    pub from: String,

    /// The optional token marking the end of the range.
    pub to: Option<String>,
}
1227
/// A draft of a message being composed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content as plain text.
    pub plain_text: String,
    /// The draft content as HTML, if any.
    pub html_text: Option<String>,
    /// What kind of message the draft is for.
    pub draft_type: ComposerDraftType,
    /// Attachments of the draft.
    ///
    /// Defaults to an empty list when the field is missing from the serialized
    /// form.
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
1242
/// A file attached to a [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The file name of the attachment.
    pub filename: String,
    /// The attachment's data and type-specific metadata.
    pub content: DraftAttachmentContent,
}
1251
/// The data of a draft attachment together with metadata that depends on the
/// kind of media.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// An image attachment.
    Image {
        /// The raw bytes of the image.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known. NOTE(review): presumably in bytes — confirm.
        size: Option<u64>,
        /// The width, if known. NOTE(review): presumably in pixels — confirm.
        width: Option<u64>,
        /// The height, if known. NOTE(review): presumably in pixels — confirm.
        height: Option<u64>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// A video attachment.
    Video {
        /// The raw bytes of the video.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known. NOTE(review): presumably in bytes — confirm.
        size: Option<u64>,
        /// The width, if known. NOTE(review): presumably in pixels — confirm.
        width: Option<u64>,
        /// The height, if known. NOTE(review): presumably in pixels — confirm.
        height: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// An audio attachment.
    Audio {
        /// The raw bytes of the audio.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known. NOTE(review): presumably in bytes — confirm.
        size: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
    },
    /// An attachment of any other kind.
    File {
        /// The raw bytes of the file.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known. NOTE(review): presumably in bytes — confirm.
        size: Option<u64>,
    },
}
1313
/// A thumbnail belonging to an image or video draft attachment.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The file name of the thumbnail.
    pub filename: String,
    /// The raw bytes of the thumbnail.
    pub data: Vec<u8>,
    /// The MIME type, if known.
    pub mimetype: Option<String>,
    /// The width, if known. NOTE(review): presumably in pixels — confirm.
    pub width: Option<u64>,
    /// The height, if known. NOTE(review): presumably in pixels — confirm.
    pub height: Option<u64>,
    /// The size, if known. NOTE(review): presumably in bytes — confirm.
    pub size: Option<u64>,
}
1330
/// The kind of message a [`ComposerDraft`] is for.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// The draft is a new message.
    NewMessage,
    /// The draft is a reply.
    Reply {
        /// The event being replied to.
        event_id: OwnedEventId,
    },
    /// The draft is an edit.
    Edit {
        /// The event being edited.
        event_id: OwnedEventId,
    },
}
1347
1348impl StateStoreDataValue {
1349 pub fn into_sync_token(self) -> Option<String> {
1351 as_variant!(self, Self::SyncToken)
1352 }
1353
1354 pub fn into_filter(self) -> Option<String> {
1356 as_variant!(self, Self::Filter)
1357 }
1358
1359 pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
1361 as_variant!(self, Self::UserAvatarUrl)
1362 }
1363
1364 pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
1366 as_variant!(self, Self::RecentlyVisitedRooms)
1367 }
1368
1369 pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
1371 as_variant!(self, Self::UtdHookManagerData)
1372 }
1373
1374 pub fn into_composer_draft(self) -> Option<ComposerDraft> {
1376 as_variant!(self, Self::ComposerDraft)
1377 }
1378
1379 pub fn into_supported_versions(self) -> Option<TtlStoreValue<SupportedVersionsResponse>> {
1381 as_variant!(self, Self::SupportedVersions)
1382 }
1383
1384 pub fn into_well_known(self) -> Option<TtlStoreValue<Option<WellKnownResponse>>> {
1386 as_variant!(self, Self::WellKnown)
1387 }
1388
1389 pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
1391 as_variant!(self, Self::SeenKnockRequests)
1392 }
1393
1394 pub fn into_thread_subscriptions_catchup_tokens(
1397 self,
1398 ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
1399 as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
1400 }
1401}
1402
/// A key for the typed key/value API of `StateStore`.
///
/// The variants mirror, by name, the variants of `StateStoreDataValue`; some of
/// them carry borrowed identifiers that scope the key.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// The sync token.
    SyncToken,

    /// The supported versions of the server.
    SupportedVersions,

    /// The well-known response.
    WellKnown,

    /// A filter, identified by the given string.
    Filter(&'a str),

    /// The avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// The recently visited rooms of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// The data of the UTD hook manager.
    UtdHookManagerData,

    /// The "one-time key already uploaded" marker.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft for the given room.
    ///
    /// NOTE(review): the optional event ID presumably identifies a thread
    /// within the room — confirm.
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// The seen knock requests of the given room.
    SeenKnockRequests(&'a RoomId),

    /// The catch-up tokens for thread subscriptions.
    ThreadSubscriptionsCatchupTokens,
}
1444
/// String names, one per key variant.
///
/// NOTE(review): presumably used by store backends to build their storage
/// keys — confirm. The values are persisted identifiers and must not be
/// changed casually.
impl StateStoreDataKey<'_> {
    /// Name associated with the `SyncToken` variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// Name associated with the `SupportedVersions` variant.
    ///
    /// NOTE(review): the value is `server_capabilities` rather than something
    /// derived from the variant name, presumably for backwards compatibility
    /// with existing stores — confirm.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";

    /// Name associated with the `WellKnown` variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// Name associated with the `Filter` variant.
    pub const FILTER: &'static str = "filter";

    /// Name associated with the `UserAvatarUrl` variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// Name associated with the `RecentlyVisitedRooms` variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// Name associated with the `UtdHookManagerData` variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// Name associated with the `OneTimeKeyAlreadyUploaded` variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// Name associated with the `ComposerDraft` variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// Name associated with the `SeenKnockRequests` variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// Name associated with the `ThreadSubscriptionsCatchupTokens` variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";
}
1489
/// Decide whether a thread subscription update should go ahead, based on the
/// bump stamps of the previous and the new subscription.
///
/// * With no previous bump stamp, the update always goes ahead and `new` is
///   left untouched.
/// * With a previous bump stamp and no new one, the previous stamp is copied
///   into `new` and the update goes ahead.
/// * With both present, the update only goes ahead when the new bump stamp is
///   strictly greater than the previous one.
///
/// Returns `true` when the update should go ahead, `false` otherwise.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    // Nothing to compare against: accept whatever the new value is.
    let Some(prev_bump) = previous else {
        return true;
    };

    match *new {
        // Carry the previous stamp over so that it is not lost.
        None => {
            *new = Some(prev_bump);
            true
        }

        // Equal or older stamps are rejected.
        Some(new_bump) => new_bump > prev_bump,
    }
}
1520
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{SupportedVersionsResponse, TtlStoreValue, now_timestamp_ms};

    /// A value fetched longer ago than the stale threshold yields no data,
    /// while a freshly created one does.
    #[test]
    fn test_stale_ttl_store_value() {
        // One millisecond past the threshold: definitely stale.
        let ttl_value = TtlStoreValue {
            data: (),
            last_fetch_ts: now_timestamp_ms() - TtlStoreValue::<()>::STALE_THRESHOLD - 1.0,
        };
        assert!(ttl_value.into_data().is_none());

        // Created just now: definitely fresh.
        let ttl_value = TtlStoreValue::new(());
        assert!(ttl_value.into_data().is_some());
    }

    /// The wrapped data is flattened next to `last_fetch_ts` when serialized,
    /// and both survive a JSON round trip.
    #[test]
    fn test_stale_ttl_store_value_serialize_roundtrip() {
        let server_info = SupportedVersionsResponse {
            versions: vec!["1.2".to_owned(), "1.3".to_owned(), "1.4".to_owned()],
            unstable_features: [("org.matrix.msc3916.stable".to_owned(), true)].into(),
        };
        let ttl_value = TtlStoreValue { data: server_info.clone(), last_fetch_ts: 1000.0 };
        let json = json!({
            "versions": ["1.2", "1.3", "1.4"],
            "unstable_features": {
                "org.matrix.msc3916.stable": true,
            },
            "last_fetch_ts": 1000.0,
        });

        assert_eq!(serde_json::to_value(&ttl_value).unwrap(), json);

        let deserialized =
            serde_json::from_value::<TtlStoreValue<SupportedVersionsResponse>>(json).unwrap();
        assert_eq!(deserialized.data, server_info);
        // Compare the absolute difference: a signed difference would also pass
        // for a deserialized timestamp that is far too small.
        assert!((deserialized.last_fetch_ts - ttl_value.last_fetch_ts).abs() < 0.0001);
    }
}