1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 sync::Arc,
20};
21
22use as_variant::as_variant;
23use async_trait::async_trait;
24use growable_bloom_filter::GrowableBloom;
25use matrix_sdk_common::AsyncTraitDeps;
26use ruma::{
27 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
28 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
29 api::{
30 SupportedVersions,
31 client::discovery::discover_homeserver::{
32 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
33 },
34 },
35 events::{
36 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
37 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
38 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
39 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
40 presence::PresenceEvent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 },
43 serde::Raw,
44 time::SystemTime,
45};
46use serde::{Deserialize, Serialize};
47
48use super::{
49 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
50 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
51 send_queue::SentRequestKey,
52};
53use crate::{
54 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
55 deserialized_responses::{
56 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
57 },
58 store::StoredThreadSubscription,
59};
60
/// An abstract, asynchronous storage backend for client state.
///
/// Every method is fallible with the implementor-chosen
/// [`StateStore::Error`]. On `wasm` targets the `async_trait` expansion is
/// configured with `?Send`, so the returned futures are not required to be
/// `Send` there.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait StateStore: AsyncTraitDeps {
    /// The error type of this store. It must be convertible into
    /// [`StoreError`] and constructible from a `serde_json::Error`.
    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;

    /// Look up the key-value data stored under `key`.
    ///
    /// Returns `Ok(None)` when nothing is available for that key.
    async fn get_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
    ) -> Result<Option<StateStoreDataValue>, Self::Error>;

    /// Store `value` under `key` in the key-value data.
    ///
    /// NOTE(review): presumably `value` is expected to be the variant that
    /// matches `key`, and an existing entry is replaced — confirm with
    /// implementors.
    async fn set_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
        value: StateStoreDataValue,
    ) -> Result<(), Self::Error>;

    /// Remove the key-value data stored under `key`.
    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;

    /// Persist the given set of [`StateChanges`].
    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;

    /// Get the raw presence event stored for `user_id`, if any.
    async fn get_presence_event(
        &self,
        user_id: &UserId,
    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;

    /// Get the raw presence events stored for the given users.
    ///
    /// NOTE(review): presumably users without a stored event are simply
    /// absent from the result — confirm.
    async fn get_presence_events(
        &self,
        user_ids: &[OwnedUserId],
    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;

    /// Get a single state event of `event_type` with the given `state_key`
    /// in room `room_id`, if one is stored.
    async fn get_state_event(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_key: &str,
    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;

    /// Get the state events of `event_type` stored for room `room_id`.
    async fn get_state_events(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Get the state events of `event_type` in room `room_id` restricted to
    /// the given `state_keys`.
    async fn get_state_events_for_keys(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_keys: &[&str],
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Get the stored profile of `user_id` in room `room_id`, if any.
    async fn get_profile(
        &self,
        room_id: &RoomId,
        user_id: &UserId,
    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;

    /// Get the stored profiles of several users in room `room_id`.
    ///
    /// The keys of the returned map borrow from `user_ids`.
    async fn get_profiles<'a>(
        &self,
        room_id: &RoomId,
        user_ids: &'a [OwnedUserId],
    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;

    /// Get the user IDs of room `room_id` selected by the `memberships`
    /// filter.
    async fn get_user_ids(
        &self,
        room_id: &RoomId,
        memberships: RoomMemberships,
    ) -> Result<Vec<OwnedUserId>, Self::Error>;

    /// Load stored [`RoomInfo`]s according to `room_load_settings`.
    async fn get_room_infos(
        &self,
        room_load_settings: &RoomLoadSettings,
    ) -> Result<Vec<RoomInfo>, Self::Error>;

    /// Get the set of users in room `room_id` associated with
    /// `display_name`.
    async fn get_users_with_display_name(
        &self,
        room_id: &RoomId,
        display_name: &DisplayName,
    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;

    /// Batched variant of [`Self::get_users_with_display_name`].
    ///
    /// The keys of the returned map borrow from `display_names`.
    async fn get_users_with_display_names<'a>(
        &self,
        room_id: &RoomId,
        display_names: &'a [DisplayName],
    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;

    /// Get the raw global account data event of `event_type`, if stored.
    async fn get_account_data_event(
        &self,
        event_type: GlobalAccountDataEventType,
    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;

    /// Get the raw account data event of `event_type` scoped to room
    /// `room_id`, if stored.
    async fn get_room_account_data_event(
        &self,
        room_id: &RoomId,
        event_type: RoomAccountDataEventType,
    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;

    /// Get the receipt of `receipt_type` and `thread` stored for `user_id`
    /// in room `room_id`.
    ///
    /// On success the result pairs the receipt with an event ID.
    /// NOTE(review): presumably the event the receipt points at — confirm.
    async fn get_user_room_receipt_event(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        user_id: &UserId,
    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;

    /// Get the receipts of `receipt_type` and `thread` stored for
    /// `event_id` in room `room_id`, each paired with a user ID.
    async fn get_event_room_receipt_events(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        event_id: &EventId,
    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;

    /// Get the custom byte value stored under `key`, if any.
    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store the custom byte `value` under `key`.
    ///
    /// NOTE(review): the returned `Option` is presumably the previously
    /// stored value — confirm with implementors.
    async fn set_custom_value(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store the custom byte `value` under `key` without returning anything
    /// on success.
    ///
    /// The default implementation calls [`Self::set_custom_value`] and
    /// discards its returned value; implementors may override it.
    async fn set_custom_value_no_read(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<(), Self::Error> {
        self.set_custom_value(key, value).await.map(|_| ())
    }

    /// Remove the custom value stored under `key`.
    ///
    /// NOTE(review): the returned `Option` is presumably the removed value —
    /// confirm with implementors.
    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Remove the data stored for room `room_id`.
    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;

    /// Save a send queue request for room `room_id`, identified by
    /// `transaction_id`, together with its creation timestamp and priority.
    async fn save_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        request: QueuedRequestKind,
        priority: usize,
    ) -> Result<(), Self::Error>;

    /// Replace the content of the send queue request identified by
    /// `transaction_id` in room `room_id`.
    ///
    /// NOTE(review): the `bool` presumably reports whether a matching
    /// request was found and updated — confirm.
    async fn update_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        content: QueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove the send queue request identified by `transaction_id` in room
    /// `room_id`.
    ///
    /// NOTE(review): the `bool` presumably reports whether a matching
    /// request was found and removed — confirm.
    async fn remove_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the send queue requests stored for room `room_id`.
    async fn load_send_queue_requests(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<QueuedRequest>, Self::Error>;

    /// Update the error status of the send queue request identified by
    /// `transaction_id` in room `room_id`; `None` carries no error.
    async fn update_send_queue_request_status(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        error: Option<QueueWedgeError>,
    ) -> Result<(), Self::Error>;

    /// Load the IDs of the rooms that have unsent requests.
    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;

    /// Save a request that depends on the parent request `parent_txn_id`
    /// in room `room_id`, under its own transaction ID `own_txn_id`.
    async fn save_dependent_queued_request(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        own_txn_id: ChildTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        content: DependentQueuedRequestKind,
    ) -> Result<(), Self::Error>;

    /// Mark the requests depending on `parent_txn_id` in room `room_id` as
    /// ready, recording `sent_parent_key`.
    ///
    /// NOTE(review): the `usize` is presumably the number of dependent
    /// requests that were updated — confirm.
    async fn mark_dependent_queued_requests_as_ready(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        sent_parent_key: SentRequestKey,
    ) -> Result<usize, Self::Error>;

    /// Replace the content of the dependent request identified by
    /// `own_transaction_id` in room `room_id`.
    ///
    /// NOTE(review): the `bool` presumably reports whether a matching
    /// request was found — confirm.
    async fn update_dependent_queued_request(
        &self,
        room_id: &RoomId,
        own_transaction_id: &ChildTransactionId,
        new_content: DependentQueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove the dependent request identified by `own_txn_id` in `room`.
    ///
    /// NOTE(review): the `bool` presumably reports whether a matching
    /// request was found — confirm.
    async fn remove_dependent_queued_request(
        &self,
        room: &RoomId,
        own_txn_id: &ChildTransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the dependent queued requests stored for `room`.
    async fn load_dependent_queued_requests(
        &self,
        room: &RoomId,
    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;

    /// Insert or update a batch of thread subscriptions, each given as a
    /// `(room, thread root event, subscription)` tuple.
    async fn upsert_thread_subscriptions(
        &self,
        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
    ) -> Result<(), Self::Error>;

    /// Remove the subscription stored for thread `thread_id` in `room`.
    async fn remove_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<(), Self::Error>;

    /// Load the subscription stored for thread `thread_id` in `room`, if
    /// any.
    async fn load_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;

    /// Ask the store to optimize itself. Hidden from the generated
    /// documentation.
    #[doc(hidden)]
    async fn optimize(&self) -> Result<(), Self::Error>;

    /// Report the size of the store, or `None` when it is not available.
    ///
    /// NOTE(review): the unit is presumably bytes — confirm.
    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
}
526
/// Newtype around a state store whose [`StateStore`] implementation reports
/// every error as a `StoreError`.
///
/// `#[repr(transparent)]` guarantees the wrapper has the same layout as the
/// wrapped `T`.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    /// Formats exactly like the wrapped value; the wrapper itself never
    /// shows up in debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
536
537#[cfg_attr(target_family = "wasm", async_trait(?Send))]
538#[cfg_attr(not(target_family = "wasm"), async_trait)]
539impl<T: StateStore> StateStore for EraseStateStoreError<T> {
540 type Error = StoreError;
541
542 async fn get_kv_data(
543 &self,
544 key: StateStoreDataKey<'_>,
545 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
546 self.0.get_kv_data(key).await.map_err(Into::into)
547 }
548
549 async fn set_kv_data(
550 &self,
551 key: StateStoreDataKey<'_>,
552 value: StateStoreDataValue,
553 ) -> Result<(), Self::Error> {
554 self.0.set_kv_data(key, value).await.map_err(Into::into)
555 }
556
557 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
558 self.0.remove_kv_data(key).await.map_err(Into::into)
559 }
560
561 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
562 self.0.save_changes(changes).await.map_err(Into::into)
563 }
564
565 async fn get_presence_event(
566 &self,
567 user_id: &UserId,
568 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
569 self.0.get_presence_event(user_id).await.map_err(Into::into)
570 }
571
572 async fn get_presence_events(
573 &self,
574 user_ids: &[OwnedUserId],
575 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
576 self.0.get_presence_events(user_ids).await.map_err(Into::into)
577 }
578
579 async fn get_state_event(
580 &self,
581 room_id: &RoomId,
582 event_type: StateEventType,
583 state_key: &str,
584 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
585 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
586 }
587
588 async fn get_state_events(
589 &self,
590 room_id: &RoomId,
591 event_type: StateEventType,
592 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
593 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
594 }
595
596 async fn get_state_events_for_keys(
597 &self,
598 room_id: &RoomId,
599 event_type: StateEventType,
600 state_keys: &[&str],
601 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
602 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
603 }
604
605 async fn get_profile(
606 &self,
607 room_id: &RoomId,
608 user_id: &UserId,
609 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
610 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
611 }
612
613 async fn get_profiles<'a>(
614 &self,
615 room_id: &RoomId,
616 user_ids: &'a [OwnedUserId],
617 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
618 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
619 }
620
621 async fn get_user_ids(
622 &self,
623 room_id: &RoomId,
624 memberships: RoomMemberships,
625 ) -> Result<Vec<OwnedUserId>, Self::Error> {
626 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
627 }
628
629 async fn get_room_infos(
630 &self,
631 room_load_settings: &RoomLoadSettings,
632 ) -> Result<Vec<RoomInfo>, Self::Error> {
633 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
634 }
635
636 async fn get_users_with_display_name(
637 &self,
638 room_id: &RoomId,
639 display_name: &DisplayName,
640 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
641 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
642 }
643
644 async fn get_users_with_display_names<'a>(
645 &self,
646 room_id: &RoomId,
647 display_names: &'a [DisplayName],
648 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
649 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
650 }
651
652 async fn get_account_data_event(
653 &self,
654 event_type: GlobalAccountDataEventType,
655 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
656 self.0.get_account_data_event(event_type).await.map_err(Into::into)
657 }
658
659 async fn get_room_account_data_event(
660 &self,
661 room_id: &RoomId,
662 event_type: RoomAccountDataEventType,
663 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
664 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
665 }
666
667 async fn get_user_room_receipt_event(
668 &self,
669 room_id: &RoomId,
670 receipt_type: ReceiptType,
671 thread: ReceiptThread,
672 user_id: &UserId,
673 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
674 self.0
675 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
676 .await
677 .map_err(Into::into)
678 }
679
680 async fn get_event_room_receipt_events(
681 &self,
682 room_id: &RoomId,
683 receipt_type: ReceiptType,
684 thread: ReceiptThread,
685 event_id: &EventId,
686 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
687 self.0
688 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
689 .await
690 .map_err(Into::into)
691 }
692
693 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
694 self.0.get_custom_value(key).await.map_err(Into::into)
695 }
696
697 async fn set_custom_value(
698 &self,
699 key: &[u8],
700 value: Vec<u8>,
701 ) -> Result<Option<Vec<u8>>, Self::Error> {
702 self.0.set_custom_value(key, value).await.map_err(Into::into)
703 }
704
705 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
706 self.0.remove_custom_value(key).await.map_err(Into::into)
707 }
708
709 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
710 self.0.remove_room(room_id).await.map_err(Into::into)
711 }
712
713 async fn save_send_queue_request(
714 &self,
715 room_id: &RoomId,
716 transaction_id: OwnedTransactionId,
717 created_at: MilliSecondsSinceUnixEpoch,
718 content: QueuedRequestKind,
719 priority: usize,
720 ) -> Result<(), Self::Error> {
721 self.0
722 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
723 .await
724 .map_err(Into::into)
725 }
726
727 async fn update_send_queue_request(
728 &self,
729 room_id: &RoomId,
730 transaction_id: &TransactionId,
731 content: QueuedRequestKind,
732 ) -> Result<bool, Self::Error> {
733 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
734 }
735
736 async fn remove_send_queue_request(
737 &self,
738 room_id: &RoomId,
739 transaction_id: &TransactionId,
740 ) -> Result<bool, Self::Error> {
741 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
742 }
743
744 async fn load_send_queue_requests(
745 &self,
746 room_id: &RoomId,
747 ) -> Result<Vec<QueuedRequest>, Self::Error> {
748 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
749 }
750
751 async fn update_send_queue_request_status(
752 &self,
753 room_id: &RoomId,
754 transaction_id: &TransactionId,
755 error: Option<QueueWedgeError>,
756 ) -> Result<(), Self::Error> {
757 self.0
758 .update_send_queue_request_status(room_id, transaction_id, error)
759 .await
760 .map_err(Into::into)
761 }
762
763 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
764 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
765 }
766
767 async fn save_dependent_queued_request(
768 &self,
769 room_id: &RoomId,
770 parent_txn_id: &TransactionId,
771 own_txn_id: ChildTransactionId,
772 created_at: MilliSecondsSinceUnixEpoch,
773 content: DependentQueuedRequestKind,
774 ) -> Result<(), Self::Error> {
775 self.0
776 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
777 .await
778 .map_err(Into::into)
779 }
780
781 async fn mark_dependent_queued_requests_as_ready(
782 &self,
783 room_id: &RoomId,
784 parent_txn_id: &TransactionId,
785 sent_parent_key: SentRequestKey,
786 ) -> Result<usize, Self::Error> {
787 self.0
788 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
789 .await
790 .map_err(Into::into)
791 }
792
793 async fn remove_dependent_queued_request(
794 &self,
795 room_id: &RoomId,
796 own_txn_id: &ChildTransactionId,
797 ) -> Result<bool, Self::Error> {
798 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
799 }
800
801 async fn load_dependent_queued_requests(
802 &self,
803 room_id: &RoomId,
804 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
805 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
806 }
807
808 async fn update_dependent_queued_request(
809 &self,
810 room_id: &RoomId,
811 own_transaction_id: &ChildTransactionId,
812 new_content: DependentQueuedRequestKind,
813 ) -> Result<bool, Self::Error> {
814 self.0
815 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
816 .await
817 .map_err(Into::into)
818 }
819
820 async fn upsert_thread_subscriptions(
821 &self,
822 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
823 ) -> Result<(), Self::Error> {
824 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
825 }
826
827 async fn load_thread_subscription(
828 &self,
829 room: &RoomId,
830 thread_id: &EventId,
831 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
832 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
833 }
834
835 async fn remove_thread_subscription(
836 &self,
837 room: &RoomId,
838 thread_id: &EventId,
839 ) -> Result<(), Self::Error> {
840 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
841 }
842
843 async fn optimize(&self) -> Result<(), Self::Error> {
844 self.0.optimize().await.map_err(Into::into)
845 }
846
847 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
848 self.0.get_size().await.map_err(Into::into)
849 }
850}
851
852#[cfg_attr(target_family = "wasm", async_trait(?Send))]
854#[cfg_attr(not(target_family = "wasm"), async_trait)]
855pub trait StateStoreExt: StateStore {
856 async fn get_state_event_static<C>(
862 &self,
863 room_id: &RoomId,
864 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
865 where
866 C: StaticEventContent<IsPrefix = ruma::events::False>
867 + StaticStateEventContent<StateKey = EmptyStateKey>
868 + RedactContent,
869 C::Redacted: RedactedStateEventContent,
870 {
871 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
872 }
873
874 async fn get_state_event_static_for_key<C, K>(
880 &self,
881 room_id: &RoomId,
882 state_key: &K,
883 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
884 where
885 C: StaticEventContent<IsPrefix = ruma::events::False>
886 + StaticStateEventContent
887 + RedactContent,
888 C::StateKey: Borrow<K>,
889 C::Redacted: RedactedStateEventContent,
890 K: AsRef<str> + ?Sized + Sync,
891 {
892 Ok(self
893 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
894 .await?
895 .map(|raw| raw.cast()))
896 }
897
898 async fn get_state_events_static<C>(
904 &self,
905 room_id: &RoomId,
906 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
907 where
908 C: StaticEventContent<IsPrefix = ruma::events::False>
909 + StaticStateEventContent
910 + RedactContent,
911 C::Redacted: RedactedStateEventContent,
912 {
913 Ok(self
915 .get_state_events(room_id, C::TYPE.into())
916 .await?
917 .into_iter()
918 .map(|raw| raw.cast())
919 .collect())
920 }
921
922 async fn get_state_events_for_keys_static<'a, C, K, I>(
931 &self,
932 room_id: &RoomId,
933 state_keys: I,
934 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
935 where
936 C: StaticEventContent<IsPrefix = ruma::events::False>
937 + StaticStateEventContent
938 + RedactContent,
939 C::StateKey: Borrow<K>,
940 C::Redacted: RedactedStateEventContent,
941 K: AsRef<str> + Sized + Sync + 'a,
942 I: IntoIterator<Item = &'a K> + Send,
943 I::IntoIter: Send,
944 {
945 Ok(self
946 .get_state_events_for_keys(
947 room_id,
948 C::TYPE.into(),
949 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
950 )
951 .await?
952 .into_iter()
953 .map(|raw| raw.cast())
954 .collect())
955 }
956
957 async fn get_account_data_event_static<C>(
959 &self,
960 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
961 where
962 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
963 {
964 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
965 }
966
967 async fn get_room_account_data_event_static<C>(
975 &self,
976 room_id: &RoomId,
977 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
978 where
979 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
980 {
981 Ok(self
982 .get_room_account_data_event(room_id, C::TYPE.into())
983 .await?
984 .map(Raw::cast_unchecked))
985 }
986
987 async fn get_member_event(
995 &self,
996 room_id: &RoomId,
997 state_key: &UserId,
998 ) -> Result<Option<RawMemberEvent>, Self::Error> {
999 self.get_state_event_static_for_key(room_id, state_key).await
1000 }
1001}
1002
// Blanket implementation: every `StateStore`, including unsized ones such as
// trait objects (`?Sized`), gets the `StateStoreExt` helpers for free.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStoreExt for T {}
1006
/// A type-erased [`StateStore`] whose error type is fixed to [`StoreError`].
pub type DynStateStore = dyn StateStore<Error = StoreError>;
1009
/// Conversion of a value into a shared, type-erased state store
/// (`Arc<DynStateStore>`).
pub trait IntoStateStore {
    /// Perform the conversion. Hidden from the generated documentation.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
1019
1020impl<T> IntoStateStore for T
1021where
1022 T: StateStore + Sized + 'static,
1023{
1024 fn into_state_store(self) -> Arc<DynStateStore> {
1025 Arc::new(EraseStateStoreError(self))
1026 }
1027}
1028
1029impl<T> IntoStateStore for Arc<T>
1032where
1033 T: StateStore + 'static,
1034{
1035 fn into_state_store(self) -> Arc<DynStateStore> {
1036 let ptr: *const T = Arc::into_raw(self);
1037 let ptr_erased = ptr as *const EraseStateStoreError<T>;
1038 unsafe { Arc::from_raw(ptr_erased) }
1041 }
1042}
1043
/// A value stored together with the timestamp at which it was last fetched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TtlStoreValue<T> {
    /// The wrapped data. Because of `#[serde(flatten)]` its fields are
    /// serialized at the same level as `last_fetch_ts`.
    #[serde(flatten)]
    data: T,

    /// Timestamp of the last fetch, in milliseconds since the Unix epoch
    /// (set from `now_timestamp_ms()` in `TtlStoreValue::new`).
    last_fetch_ts: f64,
}
1055
1056impl<T> TtlStoreValue<T> {
1057 pub const STALE_THRESHOLD: f64 = (1000 * 60 * 60 * 24 * 7) as _; pub fn new(data: T) -> Self {
1062 Self { data, last_fetch_ts: now_timestamp_ms() }
1063 }
1064
1065 pub fn into_data(self) -> Option<T> {
1067 if now_timestamp_ms() - self.last_fetch_ts >= Self::STALE_THRESHOLD {
1068 None
1069 } else {
1070 Some(self.data)
1071 }
1072 }
1073}
1074
/// Storable form of a server's supported-versions information.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// The version strings, kept as plain strings.
    pub versions: Vec<String>,

    /// Unstable feature flags, keyed by feature name.
    pub unstable_features: BTreeMap<String, bool>,
}
1084
1085impl SupportedVersionsResponse {
1086 pub fn supported_versions(&self) -> SupportedVersions {
1092 SupportedVersions::from_parts(&self.versions, &self.unstable_features)
1093 }
1094}
1095
/// Storable form of a homeserver discovery (`discover_homeserver`) response.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Information about the homeserver.
    pub homeserver: HomeserverInfo,

    /// Information about the identity server, if present.
    pub identity_server: Option<IdentityServerInfo>,

    /// Information about the tile server, if present.
    pub tile_server: Option<TileServerInfo>,

    /// The RTC foci.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
1111
1112impl From<discover_homeserver::Response> for WellKnownResponse {
1113 fn from(response: discover_homeserver::Response) -> Self {
1114 Self {
1115 homeserver: response.homeserver,
1116 identity_server: response.identity_server,
1117 tile_server: response.tile_server,
1118 rtc_foci: response.rtc_foci,
1119 }
1120 }
1121}
1122
/// Current time as fractional milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn now_timestamp_ms() -> f64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .expect("System clock was before 1970.");

    since_epoch.as_secs_f64() * 1000.0
}
1131
/// A value of the state store's key-value data.
///
/// Each variant has a same-named variant in `StateStoreDataKey`.
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token.
    SyncToken(String),

    /// Supported-versions information, with its fetch timestamp.
    SupportedVersions(TtlStoreValue<SupportedVersionsResponse>),

    /// Well-known information, with its fetch timestamp. The inner `Option`
    /// allows storing the absence of a response.
    WellKnown(TtlStoreValue<Option<WellKnownResponse>>),

    /// A filter, stored as a string.
    /// NOTE(review): presumably the filter ID — confirm.
    Filter(String),

    /// A user's avatar URL.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Bloom filter data for the UTD hook manager.
    UtdHookManagerData(GrowableBloom),

    /// Marker without payload indicating that a one-time key was already
    /// uploaded.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft.
    ComposerDraft(ComposerDraft),

    /// Seen knock requests, as a map from event ID to user ID.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// Catch-up tokens for thread subscriptions.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
}
1176
/// A catch-up token for thread subscriptions, made of a `from` token and an
/// optional `to` token.
///
/// NOTE(review): presumably these delimit a range of the thread
/// subscriptions list that still has to be fetched — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The token to start from.
    pub from: String,

    /// The token to stop at, if any.
    pub to: Option<String>,
}
1202
/// A draft of a message being written in the composer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content as plain text.
    pub plain_text: String,
    /// The draft content as HTML, if available.
    pub html_text: Option<String>,
    /// What kind of message is being drafted.
    pub draft_type: ComposerDraftType,
    /// Attachments of the draft. Defaults to an empty list when the field
    /// is missing from the serialized data (`#[serde(default)]`).
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
1217
/// An attachment stored with a composer draft.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The attachment's file name.
    pub filename: String,
    /// The attachment's content and metadata.
    pub content: DraftAttachmentContent,
}
1226
/// The content of a draft attachment.
///
/// Serialized as an internally tagged enum: the variant name is stored in a
/// `type` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// An image attachment.
    Image {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// A video attachment.
    Video {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// An audio attachment.
    Audio {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
    },
    /// A generic file attachment.
    File {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
    },
}
1288
/// A thumbnail stored with a draft attachment.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The thumbnail's file name.
    pub filename: String,
    /// The raw bytes.
    pub data: Vec<u8>,
    /// The MIME type, if known.
    pub mimetype: Option<String>,
    /// The width, if known.
    pub width: Option<u64>,
    /// The height, if known.
    pub height: Option<u64>,
    /// The size, if known.
    pub size: Option<u64>,
}
1305
/// The kind of message a composer draft is for.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// A new message.
    NewMessage,
    /// A reply to another event.
    Reply {
        /// The event being replied to.
        event_id: OwnedEventId,
    },
    /// An edit of another event.
    Edit {
        /// The event being edited.
        event_id: OwnedEventId,
    },
}
1322
/// Consuming accessors: each returns the payload when `self` is the
/// matching variant and `None` for every other variant.
impl StateStoreDataValue {
    /// Get the payload if this is a `SyncToken`.
    pub fn into_sync_token(self) -> Option<String> {
        as_variant!(self, Self::SyncToken)
    }

    /// Get the payload if this is a `Filter`.
    pub fn into_filter(self) -> Option<String> {
        as_variant!(self, Self::Filter)
    }

    /// Get the payload if this is a `UserAvatarUrl`.
    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
        as_variant!(self, Self::UserAvatarUrl)
    }

    /// Get the payload if this is a `RecentlyVisitedRooms`.
    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
        as_variant!(self, Self::RecentlyVisitedRooms)
    }

    /// Get the payload if this is a `UtdHookManagerData`.
    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
        as_variant!(self, Self::UtdHookManagerData)
    }

    /// Get the payload if this is a `ComposerDraft`.
    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
        as_variant!(self, Self::ComposerDraft)
    }

    /// Get the payload if this is a `SupportedVersions`.
    pub fn into_supported_versions(self) -> Option<TtlStoreValue<SupportedVersionsResponse>> {
        as_variant!(self, Self::SupportedVersions)
    }

    /// Get the payload if this is a `WellKnown`.
    pub fn into_well_known(self) -> Option<TtlStoreValue<Option<WellKnownResponse>>> {
        as_variant!(self, Self::WellKnown)
    }

    /// Get the payload if this is a `SeenKnockRequests`.
    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
        as_variant!(self, Self::SeenKnockRequests)
    }

    /// Get the payload if this is a `ThreadSubscriptionsCatchupTokens`.
    pub fn into_thread_subscriptions_catchup_tokens(
        self,
    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
    }
}
1377
/// A key of the state store's key-value data.
///
/// Each variant has a same-named variant in `StateStoreDataValue`; some
/// keys carry borrowed parameters that scope the entry.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// The sync token.
    SyncToken,

    /// The supported-versions information.
    SupportedVersions,

    /// The well-known information.
    WellKnown,

    /// A filter, identified by a string.
    /// NOTE(review): presumably the filter's name — confirm.
    Filter(&'a str),

    /// The avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// The recently visited rooms of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// The UTD hook manager data.
    UtdHookManagerData,

    /// The marker recording that a one-time key was already uploaded.
    OneTimeKeyAlreadyUploaded,

    /// The composer draft of the given room, with an optional event ID.
    /// NOTE(review): the event ID is presumably a thread root — confirm.
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// The seen knock requests of the given room.
    SeenKnockRequests(&'a RoomId),

    /// The thread subscriptions catch-up tokens.
    ThreadSubscriptionsCatchupTokens,
}
1419
/// String constants, one per `StateStoreDataKey` variant.
///
/// NOTE(review): presumably used by store implementations as key names or
/// prefixes — confirm; changing a string would then orphan existing data.
impl StateStoreDataKey<'_> {
    /// String for the `SyncToken` variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// String for the `SupportedVersions` variant.
    ///
    /// NOTE(review): the value is "server_capabilities" and does not match
    /// the variant name; presumably kept for compatibility with stored
    /// data — confirm.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";

    /// String for the `WellKnown` variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// String for the `Filter` variant.
    pub const FILTER: &'static str = "filter";

    /// String for the `UserAvatarUrl` variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// String for the `RecentlyVisitedRooms` variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// String for the `UtdHookManagerData` variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// String for the `OneTimeKeyAlreadyUploaded` variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// String for the `ComposerDraft` variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// String for the `SeenKnockRequests` variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// String for the `ThreadSubscriptionsCatchupTokens` variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";
}
1464
/// Compare the bump stamp of a previously stored thread subscription with
/// the bump stamp of a new one.
///
/// Returns `false` only when both stamps are present and the new one is not
/// strictly greater than the previous one. Returns `true` in every other
/// case.
///
/// When a previous stamp exists and `new` is `None`, `new` is overwritten
/// with the previous stamp. Otherwise `new` is left untouched.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    // Without a previous stamp there is nothing to compare against.
    let Some(prev_bump) = previous else {
        return true;
    };

    match *new {
        // Carry the previous stamp over to the new subscription.
        None => {
            *new = Some(prev_bump);
            true
        }
        // Only a strictly newer stamp is accepted.
        Some(new_bump) => new_bump > prev_bump,
    }
}
1495
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{SupportedVersionsResponse, TtlStoreValue, now_timestamp_ms};

    /// A value older than the stale threshold yields no data, a fresh one
    /// does.
    #[test]
    fn test_stale_ttl_store_value() {
        // Fetched one millisecond beyond the staleness threshold.
        let ttl_value = TtlStoreValue {
            data: (),
            last_fetch_ts: now_timestamp_ms() - TtlStoreValue::<()>::STALE_THRESHOLD - 1.0,
        };
        assert!(ttl_value.into_data().is_none());

        // Fetched just now.
        let ttl_value = TtlStoreValue::new(());
        assert!(ttl_value.into_data().is_some());
    }

    /// The wrapped data is flattened next to `last_fetch_ts` when
    /// serialized, and both survive a round trip.
    #[test]
    fn test_stale_ttl_store_value_serialize_roundtrip() {
        let server_info = SupportedVersionsResponse {
            versions: vec!["1.2".to_owned(), "1.3".to_owned(), "1.4".to_owned()],
            unstable_features: [("org.matrix.msc3916.stable".to_owned(), true)].into(),
        };
        let ttl_value = TtlStoreValue { data: server_info.clone(), last_fetch_ts: 1000.0 };
        let json = json!({
            "versions": ["1.2", "1.3", "1.4"],
            "unstable_features": {
                "org.matrix.msc3916.stable": true,
            },
            "last_fetch_ts": 1000.0,
        });

        assert_eq!(serde_json::to_value(&ttl_value).unwrap(), json);

        let deserialized =
            serde_json::from_value::<TtlStoreValue<SupportedVersionsResponse>>(json).unwrap();
        assert_eq!(deserialized.data, server_info);
        // Compare the absolute difference: a signed difference would also
        // accept any deserialized timestamp smaller than the expected one.
        assert!((deserialized.last_fetch_ts - ttl_value.last_fetch_ts).abs() < 0.0001);
    }
}