1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 ops::Deref,
20 sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30 api::{
31 MatrixVersion, SupportedVersions,
32 client::discovery::{
33 discover_homeserver::{
34 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
35 },
36 get_capabilities::v3::Capabilities,
37 },
38 },
39 events::{
40 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
41 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
42 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
43 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
44 presence::PresenceEvent,
45 receipt::{Receipt, ReceiptThread, ReceiptType},
46 },
47 serde::Raw,
48};
49use serde::{Deserialize, Serialize};
50use thiserror::Error;
51use tokio::sync::{Mutex, MutexGuard};
52
53use super::{
54 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
55 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
56 send_queue::SentRequestKey,
57};
58use crate::{
59 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
60 deserialized_responses::{
61 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
62 },
63 store::StoredThreadSubscription,
64};
65
66#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70pub trait StateStore: AsyncTraitDeps {
71 type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
73
74 async fn get_kv_data(
80 &self,
81 key: StateStoreDataKey<'_>,
82 ) -> Result<Option<StateStoreDataValue>, Self::Error>;
83
84 async fn set_kv_data(
94 &self,
95 key: StateStoreDataKey<'_>,
96 value: StateStoreDataValue,
97 ) -> Result<(), Self::Error>;
98
99 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
105
106 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
108
109 async fn get_presence_event(
116 &self,
117 user_id: &UserId,
118 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
119
120 async fn get_presence_events(
126 &self,
127 user_ids: &[OwnedUserId],
128 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
129
130 async fn get_state_event(
138 &self,
139 room_id: &RoomId,
140 event_type: StateEventType,
141 state_key: &str,
142 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
143
144 async fn get_state_events(
152 &self,
153 room_id: &RoomId,
154 event_type: StateEventType,
155 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
156
157 async fn get_state_events_for_keys(
168 &self,
169 room_id: &RoomId,
170 event_type: StateEventType,
171 state_keys: &[&str],
172 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
173
174 async fn get_profile(
182 &self,
183 room_id: &RoomId,
184 user_id: &UserId,
185 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
186
187 async fn get_profiles<'a>(
195 &self,
196 room_id: &RoomId,
197 user_ids: &'a [OwnedUserId],
198 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
199
200 async fn get_user_ids(
203 &self,
204 room_id: &RoomId,
205 memberships: RoomMemberships,
206 ) -> Result<Vec<OwnedUserId>, Self::Error>;
207
208 async fn get_room_infos(
210 &self,
211 room_load_settings: &RoomLoadSettings,
212 ) -> Result<Vec<RoomInfo>, Self::Error>;
213
214 async fn get_users_with_display_name(
223 &self,
224 room_id: &RoomId,
225 display_name: &DisplayName,
226 ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
227
228 async fn get_users_with_display_names<'a>(
236 &self,
237 room_id: &RoomId,
238 display_names: &'a [DisplayName],
239 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
240
241 async fn get_account_data_event(
247 &self,
248 event_type: GlobalAccountDataEventType,
249 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
250
251 async fn get_room_account_data_event(
261 &self,
262 room_id: &RoomId,
263 event_type: RoomAccountDataEventType,
264 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
265
266 async fn get_user_room_receipt_event(
279 &self,
280 room_id: &RoomId,
281 receipt_type: ReceiptType,
282 thread: ReceiptThread,
283 user_id: &UserId,
284 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
285
286 async fn get_event_room_receipt_events(
300 &self,
301 room_id: &RoomId,
302 receipt_type: ReceiptType,
303 thread: ReceiptThread,
304 event_id: &EventId,
305 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
306
307 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
313
314 async fn set_custom_value(
323 &self,
324 key: &[u8],
325 value: Vec<u8>,
326 ) -> Result<Option<Vec<u8>>, Self::Error>;
327
328 async fn set_custom_value_no_read(
342 &self,
343 key: &[u8],
344 value: Vec<u8>,
345 ) -> Result<(), Self::Error> {
346 self.set_custom_value(key, value).await.map(|_| ())
347 }
348
349 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
355
356 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
362
363 async fn save_send_queue_request(
373 &self,
374 room_id: &RoomId,
375 transaction_id: OwnedTransactionId,
376 created_at: MilliSecondsSinceUnixEpoch,
377 request: QueuedRequestKind,
378 priority: usize,
379 ) -> Result<(), Self::Error>;
380
381 async fn update_send_queue_request(
393 &self,
394 room_id: &RoomId,
395 transaction_id: &TransactionId,
396 content: QueuedRequestKind,
397 ) -> Result<bool, Self::Error>;
398
399 async fn remove_send_queue_request(
405 &self,
406 room_id: &RoomId,
407 transaction_id: &TransactionId,
408 ) -> Result<bool, Self::Error>;
409
410 async fn load_send_queue_requests(
416 &self,
417 room_id: &RoomId,
418 ) -> Result<Vec<QueuedRequest>, Self::Error>;
419
420 async fn update_send_queue_request_status(
423 &self,
424 room_id: &RoomId,
425 transaction_id: &TransactionId,
426 error: Option<QueueWedgeError>,
427 ) -> Result<(), Self::Error>;
428
429 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
431
432 async fn save_dependent_queued_request(
435 &self,
436 room_id: &RoomId,
437 parent_txn_id: &TransactionId,
438 own_txn_id: ChildTransactionId,
439 created_at: MilliSecondsSinceUnixEpoch,
440 content: DependentQueuedRequestKind,
441 ) -> Result<(), Self::Error>;
442
443 async fn mark_dependent_queued_requests_as_ready(
452 &self,
453 room_id: &RoomId,
454 parent_txn_id: &TransactionId,
455 sent_parent_key: SentRequestKey,
456 ) -> Result<usize, Self::Error>;
457
458 async fn update_dependent_queued_request(
462 &self,
463 room_id: &RoomId,
464 own_transaction_id: &ChildTransactionId,
465 new_content: DependentQueuedRequestKind,
466 ) -> Result<bool, Self::Error>;
467
468 async fn remove_dependent_queued_request(
473 &self,
474 room: &RoomId,
475 own_txn_id: &ChildTransactionId,
476 ) -> Result<bool, Self::Error>;
477
478 async fn load_dependent_queued_requests(
484 &self,
485 room: &RoomId,
486 ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
487
488 async fn upsert_thread_subscriptions(
498 &self,
499 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
500 ) -> Result<(), Self::Error>;
501
502 async fn remove_thread_subscription(
506 &self,
507 room: &RoomId,
508 thread_id: &EventId,
509 ) -> Result<(), Self::Error>;
510
511 async fn load_thread_subscription(
515 &self,
516 room: &RoomId,
517 thread_id: &EventId,
518 ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
519
520 async fn close(&self) -> Result<(), Self::Error>;
526
527 async fn reopen(&self) -> Result<(), Self::Error>;
530
531 #[doc(hidden)]
537 async fn optimize(&self) -> Result<(), Self::Error>;
538
539 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
541}
542
543#[cfg_attr(target_family = "wasm", async_trait(?Send))]
544#[cfg_attr(not(target_family = "wasm"), async_trait)]
545impl<T: StateStore> StateStore for &T {
546 type Error = T::Error;
547
548 async fn get_kv_data(
549 &self,
550 key: StateStoreDataKey<'_>,
551 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
552 (*self).get_kv_data(key).await
553 }
554
555 async fn set_kv_data(
556 &self,
557 key: StateStoreDataKey<'_>,
558 value: StateStoreDataValue,
559 ) -> Result<(), Self::Error> {
560 (*self).set_kv_data(key, value).await
561 }
562
563 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
564 (*self).remove_kv_data(key).await
565 }
566
567 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
568 (*self).save_changes(changes).await
569 }
570
571 async fn get_presence_event(
572 &self,
573 user_id: &UserId,
574 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
575 (*self).get_presence_event(user_id).await
576 }
577
578 async fn get_presence_events(
579 &self,
580 user_ids: &[OwnedUserId],
581 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
582 (*self).get_presence_events(user_ids).await
583 }
584
585 async fn get_state_event(
586 &self,
587 room_id: &RoomId,
588 event_type: StateEventType,
589 state_key: &str,
590 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
591 (*self).get_state_event(room_id, event_type, state_key).await
592 }
593
594 async fn get_state_events(
595 &self,
596 room_id: &RoomId,
597 event_type: StateEventType,
598 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
599 (*self).get_state_events(room_id, event_type).await
600 }
601
602 async fn get_state_events_for_keys(
603 &self,
604 room_id: &RoomId,
605 event_type: StateEventType,
606 state_keys: &[&str],
607 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
608 (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
609 }
610
611 async fn get_profile(
612 &self,
613 room_id: &RoomId,
614 user_id: &UserId,
615 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
616 (*self).get_profile(room_id, user_id).await
617 }
618
619 async fn get_profiles<'a>(
620 &self,
621 room_id: &RoomId,
622 user_ids: &'a [OwnedUserId],
623 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
624 (*self).get_profiles(room_id, user_ids).await
625 }
626
627 async fn get_user_ids(
628 &self,
629 room_id: &RoomId,
630 memberships: RoomMemberships,
631 ) -> Result<Vec<OwnedUserId>, Self::Error> {
632 (*self).get_user_ids(room_id, memberships).await
633 }
634
635 async fn get_room_infos(
636 &self,
637 room_load_settings: &RoomLoadSettings,
638 ) -> Result<Vec<RoomInfo>, Self::Error> {
639 (*self).get_room_infos(room_load_settings).await
640 }
641
642 async fn get_users_with_display_name(
643 &self,
644 room_id: &RoomId,
645 display_name: &DisplayName,
646 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
647 (*self).get_users_with_display_name(room_id, display_name).await
648 }
649
650 async fn get_users_with_display_names<'a>(
651 &self,
652 room_id: &RoomId,
653 display_names: &'a [DisplayName],
654 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
655 (*self).get_users_with_display_names(room_id, display_names).await
656 }
657
658 async fn get_account_data_event(
659 &self,
660 event_type: GlobalAccountDataEventType,
661 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
662 (*self).get_account_data_event(event_type).await
663 }
664
665 async fn get_room_account_data_event(
666 &self,
667 room_id: &RoomId,
668 event_type: RoomAccountDataEventType,
669 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
670 (*self).get_room_account_data_event(room_id, event_type).await
671 }
672
673 async fn get_user_room_receipt_event(
674 &self,
675 room_id: &RoomId,
676 receipt_type: ReceiptType,
677 thread: ReceiptThread,
678 user_id: &UserId,
679 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
680 (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
681 }
682
683 async fn get_event_room_receipt_events(
684 &self,
685 room_id: &RoomId,
686 receipt_type: ReceiptType,
687 thread: ReceiptThread,
688 event_id: &EventId,
689 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
690 (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
691 }
692
693 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
694 (*self).get_custom_value(key).await
695 }
696
697 async fn set_custom_value(
698 &self,
699 key: &[u8],
700 value: Vec<u8>,
701 ) -> Result<Option<Vec<u8>>, Self::Error> {
702 (*self).set_custom_value(key, value).await
703 }
704
705 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
706 (*self).remove_custom_value(key).await
707 }
708
709 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
710 (*self).remove_room(room_id).await
711 }
712
713 async fn save_send_queue_request(
714 &self,
715 room_id: &RoomId,
716 transaction_id: OwnedTransactionId,
717 created_at: MilliSecondsSinceUnixEpoch,
718 request: QueuedRequestKind,
719 priority: usize,
720 ) -> Result<(), Self::Error> {
721 (*self)
722 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
723 .await
724 }
725
726 async fn update_send_queue_request(
727 &self,
728 room_id: &RoomId,
729 transaction_id: &TransactionId,
730 content: QueuedRequestKind,
731 ) -> Result<bool, Self::Error> {
732 (*self).update_send_queue_request(room_id, transaction_id, content).await
733 }
734
735 async fn remove_send_queue_request(
736 &self,
737 room_id: &RoomId,
738 transaction_id: &TransactionId,
739 ) -> Result<bool, Self::Error> {
740 (*self).remove_send_queue_request(room_id, transaction_id).await
741 }
742
743 async fn load_send_queue_requests(
744 &self,
745 room_id: &RoomId,
746 ) -> Result<Vec<QueuedRequest>, Self::Error> {
747 (*self).load_send_queue_requests(room_id).await
748 }
749
750 async fn update_send_queue_request_status(
751 &self,
752 room_id: &RoomId,
753 transaction_id: &TransactionId,
754 error: Option<QueueWedgeError>,
755 ) -> Result<(), Self::Error> {
756 (*self).update_send_queue_request_status(room_id, transaction_id, error).await
757 }
758
759 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
760 (*self).load_rooms_with_unsent_requests().await
761 }
762
763 async fn save_dependent_queued_request(
764 &self,
765 room_id: &RoomId,
766 parent_txn_id: &TransactionId,
767 own_txn_id: ChildTransactionId,
768 created_at: MilliSecondsSinceUnixEpoch,
769 content: DependentQueuedRequestKind,
770 ) -> Result<(), Self::Error> {
771 (*self)
772 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
773 .await
774 }
775
776 async fn mark_dependent_queued_requests_as_ready(
777 &self,
778 room_id: &RoomId,
779 parent_txn_id: &TransactionId,
780 sent_parent_key: SentRequestKey,
781 ) -> Result<usize, Self::Error> {
782 (*self)
783 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
784 .await
785 }
786
787 async fn update_dependent_queued_request(
788 &self,
789 room_id: &RoomId,
790 own_transaction_id: &ChildTransactionId,
791 new_content: DependentQueuedRequestKind,
792 ) -> Result<bool, Self::Error> {
793 (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
794 }
795
796 async fn remove_dependent_queued_request(
797 &self,
798 room: &RoomId,
799 own_txn_id: &ChildTransactionId,
800 ) -> Result<bool, Self::Error> {
801 (*self).remove_dependent_queued_request(room, own_txn_id).await
802 }
803
804 async fn load_dependent_queued_requests(
805 &self,
806 room: &RoomId,
807 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
808 (*self).load_dependent_queued_requests(room).await
809 }
810
811 async fn upsert_thread_subscriptions(
812 &self,
813 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
814 ) -> Result<(), Self::Error> {
815 (*self).upsert_thread_subscriptions(updates).await
816 }
817
818 async fn remove_thread_subscription(
819 &self,
820 room: &RoomId,
821 thread_id: &EventId,
822 ) -> Result<(), Self::Error> {
823 (*self).remove_thread_subscription(room, thread_id).await
824 }
825
826 async fn load_thread_subscription(
827 &self,
828 room: &RoomId,
829 thread_id: &EventId,
830 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
831 (*self).load_thread_subscription(room, thread_id).await
832 }
833
834 async fn close(&self) -> Result<(), Self::Error> {
835 (*self).close().await
836 }
837
838 async fn reopen(&self) -> Result<(), Self::Error> {
839 (*self).reopen().await
840 }
841
842 async fn optimize(&self) -> Result<(), Self::Error> {
843 (*self).optimize().await
844 }
845
846 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
847 (*self).get_size().await
848 }
849}
850
851#[cfg_attr(target_family = "wasm", async_trait(?Send))]
852#[cfg_attr(not(target_family = "wasm"), async_trait)]
853impl<T: StateStore + ?Sized> StateStore for Arc<T> {
854 type Error = T::Error;
855
856 async fn get_kv_data(
857 &self,
858 key: StateStoreDataKey<'_>,
859 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
860 self.deref().get_kv_data(key).await
861 }
862
863 async fn set_kv_data(
864 &self,
865 key: StateStoreDataKey<'_>,
866 value: StateStoreDataValue,
867 ) -> Result<(), Self::Error> {
868 self.deref().set_kv_data(key, value).await
869 }
870
871 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
872 self.deref().remove_kv_data(key).await
873 }
874
875 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
876 self.deref().save_changes(changes).await
877 }
878
879 async fn get_presence_event(
880 &self,
881 user_id: &UserId,
882 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
883 self.deref().get_presence_event(user_id).await
884 }
885
886 async fn get_presence_events(
887 &self,
888 user_ids: &[OwnedUserId],
889 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
890 self.deref().get_presence_events(user_ids).await
891 }
892
893 async fn get_state_event(
894 &self,
895 room_id: &RoomId,
896 event_type: StateEventType,
897 state_key: &str,
898 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
899 self.deref().get_state_event(room_id, event_type, state_key).await
900 }
901
902 async fn get_state_events(
903 &self,
904 room_id: &RoomId,
905 event_type: StateEventType,
906 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
907 self.deref().get_state_events(room_id, event_type).await
908 }
909
910 async fn get_state_events_for_keys(
911 &self,
912 room_id: &RoomId,
913 event_type: StateEventType,
914 state_keys: &[&str],
915 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
916 self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
917 }
918
919 async fn get_profile(
920 &self,
921 room_id: &RoomId,
922 user_id: &UserId,
923 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
924 self.deref().get_profile(room_id, user_id).await
925 }
926
927 async fn get_profiles<'a>(
928 &self,
929 room_id: &RoomId,
930 user_ids: &'a [OwnedUserId],
931 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
932 self.deref().get_profiles(room_id, user_ids).await
933 }
934
935 async fn get_user_ids(
936 &self,
937 room_id: &RoomId,
938 memberships: RoomMemberships,
939 ) -> Result<Vec<OwnedUserId>, Self::Error> {
940 self.deref().get_user_ids(room_id, memberships).await
941 }
942
943 async fn get_room_infos(
944 &self,
945 room_load_settings: &RoomLoadSettings,
946 ) -> Result<Vec<RoomInfo>, Self::Error> {
947 self.deref().get_room_infos(room_load_settings).await
948 }
949
950 async fn get_users_with_display_name(
951 &self,
952 room_id: &RoomId,
953 display_name: &DisplayName,
954 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
955 self.deref().get_users_with_display_name(room_id, display_name).await
956 }
957
958 async fn get_users_with_display_names<'a>(
959 &self,
960 room_id: &RoomId,
961 display_names: &'a [DisplayName],
962 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
963 self.deref().get_users_with_display_names(room_id, display_names).await
964 }
965
966 async fn get_account_data_event(
967 &self,
968 event_type: GlobalAccountDataEventType,
969 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
970 self.deref().get_account_data_event(event_type).await
971 }
972
973 async fn get_room_account_data_event(
974 &self,
975 room_id: &RoomId,
976 event_type: RoomAccountDataEventType,
977 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
978 self.deref().get_room_account_data_event(room_id, event_type).await
979 }
980
981 async fn get_user_room_receipt_event(
982 &self,
983 room_id: &RoomId,
984 receipt_type: ReceiptType,
985 thread: ReceiptThread,
986 user_id: &UserId,
987 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
988 self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
989 }
990
991 async fn get_event_room_receipt_events(
992 &self,
993 room_id: &RoomId,
994 receipt_type: ReceiptType,
995 thread: ReceiptThread,
996 event_id: &EventId,
997 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
998 self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
999 }
1000
1001 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1002 self.deref().get_custom_value(key).await
1003 }
1004
1005 async fn set_custom_value(
1006 &self,
1007 key: &[u8],
1008 value: Vec<u8>,
1009 ) -> Result<Option<Vec<u8>>, Self::Error> {
1010 self.deref().set_custom_value(key, value).await
1011 }
1012
1013 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1014 self.deref().remove_custom_value(key).await
1015 }
1016
1017 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1018 self.deref().remove_room(room_id).await
1019 }
1020
1021 async fn save_send_queue_request(
1022 &self,
1023 room_id: &RoomId,
1024 transaction_id: OwnedTransactionId,
1025 created_at: MilliSecondsSinceUnixEpoch,
1026 request: QueuedRequestKind,
1027 priority: usize,
1028 ) -> Result<(), Self::Error> {
1029 self.deref()
1030 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1031 .await
1032 }
1033
1034 async fn update_send_queue_request(
1035 &self,
1036 room_id: &RoomId,
1037 transaction_id: &TransactionId,
1038 content: QueuedRequestKind,
1039 ) -> Result<bool, Self::Error> {
1040 self.deref().update_send_queue_request(room_id, transaction_id, content).await
1041 }
1042
1043 async fn remove_send_queue_request(
1044 &self,
1045 room_id: &RoomId,
1046 transaction_id: &TransactionId,
1047 ) -> Result<bool, Self::Error> {
1048 self.deref().remove_send_queue_request(room_id, transaction_id).await
1049 }
1050
1051 async fn load_send_queue_requests(
1052 &self,
1053 room_id: &RoomId,
1054 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1055 self.deref().load_send_queue_requests(room_id).await
1056 }
1057
1058 async fn update_send_queue_request_status(
1059 &self,
1060 room_id: &RoomId,
1061 transaction_id: &TransactionId,
1062 error: Option<QueueWedgeError>,
1063 ) -> Result<(), Self::Error> {
1064 self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1065 }
1066
1067 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1068 self.deref().load_rooms_with_unsent_requests().await
1069 }
1070
1071 async fn save_dependent_queued_request(
1072 &self,
1073 room_id: &RoomId,
1074 parent_txn_id: &TransactionId,
1075 own_txn_id: ChildTransactionId,
1076 created_at: MilliSecondsSinceUnixEpoch,
1077 content: DependentQueuedRequestKind,
1078 ) -> Result<(), Self::Error> {
1079 self.deref()
1080 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1081 .await
1082 }
1083
1084 async fn mark_dependent_queued_requests_as_ready(
1085 &self,
1086 room_id: &RoomId,
1087 parent_txn_id: &TransactionId,
1088 sent_parent_key: SentRequestKey,
1089 ) -> Result<usize, Self::Error> {
1090 self.deref()
1091 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1092 .await
1093 }
1094
1095 async fn update_dependent_queued_request(
1096 &self,
1097 room_id: &RoomId,
1098 own_transaction_id: &ChildTransactionId,
1099 new_content: DependentQueuedRequestKind,
1100 ) -> Result<bool, Self::Error> {
1101 self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1102 }
1103
1104 async fn remove_dependent_queued_request(
1105 &self,
1106 room: &RoomId,
1107 own_txn_id: &ChildTransactionId,
1108 ) -> Result<bool, Self::Error> {
1109 self.deref().remove_dependent_queued_request(room, own_txn_id).await
1110 }
1111
1112 async fn load_dependent_queued_requests(
1113 &self,
1114 room: &RoomId,
1115 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1116 self.deref().load_dependent_queued_requests(room).await
1117 }
1118
1119 async fn upsert_thread_subscriptions(
1120 &self,
1121 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1122 ) -> Result<(), Self::Error> {
1123 self.deref().upsert_thread_subscriptions(updates).await
1124 }
1125
1126 async fn remove_thread_subscription(
1127 &self,
1128 room: &RoomId,
1129 thread_id: &EventId,
1130 ) -> Result<(), Self::Error> {
1131 self.deref().remove_thread_subscription(room, thread_id).await
1132 }
1133
1134 async fn load_thread_subscription(
1135 &self,
1136 room: &RoomId,
1137 thread_id: &EventId,
1138 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1139 self.deref().load_thread_subscription(room, thread_id).await
1140 }
1141
1142 async fn close(&self) -> Result<(), Self::Error> {
1143 self.deref().close().await
1144 }
1145
1146 async fn reopen(&self) -> Result<(), Self::Error> {
1147 self.deref().reopen().await
1148 }
1149
1150 async fn optimize(&self) -> Result<(), Self::Error> {
1151 self.deref().optimize().await
1152 }
1153
1154 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1155 self.deref().get_size().await
1156 }
1157}
1158
1159#[repr(transparent)]
1160struct EraseStateStoreError<T>(T);
1161
1162#[cfg(not(tarpaulin_include))]
1163impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
1164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1165 self.0.fmt(f)
1166 }
1167}
1168
1169#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1170#[cfg_attr(not(target_family = "wasm"), async_trait)]
1171impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1172 type Error = StoreError;
1173
1174 async fn get_kv_data(
1175 &self,
1176 key: StateStoreDataKey<'_>,
1177 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1178 self.0.get_kv_data(key).await.map_err(Into::into)
1179 }
1180
1181 async fn set_kv_data(
1182 &self,
1183 key: StateStoreDataKey<'_>,
1184 value: StateStoreDataValue,
1185 ) -> Result<(), Self::Error> {
1186 self.0.set_kv_data(key, value).await.map_err(Into::into)
1187 }
1188
1189 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1190 self.0.remove_kv_data(key).await.map_err(Into::into)
1191 }
1192
1193 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1194 self.0.save_changes(changes).await.map_err(Into::into)
1195 }
1196
1197 async fn get_presence_event(
1198 &self,
1199 user_id: &UserId,
1200 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1201 self.0.get_presence_event(user_id).await.map_err(Into::into)
1202 }
1203
1204 async fn get_presence_events(
1205 &self,
1206 user_ids: &[OwnedUserId],
1207 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1208 self.0.get_presence_events(user_ids).await.map_err(Into::into)
1209 }
1210
1211 async fn get_state_event(
1212 &self,
1213 room_id: &RoomId,
1214 event_type: StateEventType,
1215 state_key: &str,
1216 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1217 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1218 }
1219
1220 async fn get_state_events(
1221 &self,
1222 room_id: &RoomId,
1223 event_type: StateEventType,
1224 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1225 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1226 }
1227
1228 async fn get_state_events_for_keys(
1229 &self,
1230 room_id: &RoomId,
1231 event_type: StateEventType,
1232 state_keys: &[&str],
1233 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1234 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1235 }
1236
1237 async fn get_profile(
1238 &self,
1239 room_id: &RoomId,
1240 user_id: &UserId,
1241 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1242 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1243 }
1244
1245 async fn get_profiles<'a>(
1246 &self,
1247 room_id: &RoomId,
1248 user_ids: &'a [OwnedUserId],
1249 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1250 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1251 }
1252
1253 async fn get_user_ids(
1254 &self,
1255 room_id: &RoomId,
1256 memberships: RoomMemberships,
1257 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1258 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1259 }
1260
1261 async fn get_room_infos(
1262 &self,
1263 room_load_settings: &RoomLoadSettings,
1264 ) -> Result<Vec<RoomInfo>, Self::Error> {
1265 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1266 }
1267
1268 async fn get_users_with_display_name(
1269 &self,
1270 room_id: &RoomId,
1271 display_name: &DisplayName,
1272 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1273 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1274 }
1275
1276 async fn get_users_with_display_names<'a>(
1277 &self,
1278 room_id: &RoomId,
1279 display_names: &'a [DisplayName],
1280 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1281 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1282 }
1283
1284 async fn get_account_data_event(
1285 &self,
1286 event_type: GlobalAccountDataEventType,
1287 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1288 self.0.get_account_data_event(event_type).await.map_err(Into::into)
1289 }
1290
1291 async fn get_room_account_data_event(
1292 &self,
1293 room_id: &RoomId,
1294 event_type: RoomAccountDataEventType,
1295 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1296 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1297 }
1298
1299 async fn get_user_room_receipt_event(
1300 &self,
1301 room_id: &RoomId,
1302 receipt_type: ReceiptType,
1303 thread: ReceiptThread,
1304 user_id: &UserId,
1305 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1306 self.0
1307 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1308 .await
1309 .map_err(Into::into)
1310 }
1311
1312 async fn get_event_room_receipt_events(
1313 &self,
1314 room_id: &RoomId,
1315 receipt_type: ReceiptType,
1316 thread: ReceiptThread,
1317 event_id: &EventId,
1318 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1319 self.0
1320 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1321 .await
1322 .map_err(Into::into)
1323 }
1324
1325 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1326 self.0.get_custom_value(key).await.map_err(Into::into)
1327 }
1328
1329 async fn set_custom_value(
1330 &self,
1331 key: &[u8],
1332 value: Vec<u8>,
1333 ) -> Result<Option<Vec<u8>>, Self::Error> {
1334 self.0.set_custom_value(key, value).await.map_err(Into::into)
1335 }
1336
1337 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1338 self.0.remove_custom_value(key).await.map_err(Into::into)
1339 }
1340
1341 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1342 self.0.remove_room(room_id).await.map_err(Into::into)
1343 }
1344
1345 async fn save_send_queue_request(
1346 &self,
1347 room_id: &RoomId,
1348 transaction_id: OwnedTransactionId,
1349 created_at: MilliSecondsSinceUnixEpoch,
1350 content: QueuedRequestKind,
1351 priority: usize,
1352 ) -> Result<(), Self::Error> {
1353 self.0
1354 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1355 .await
1356 .map_err(Into::into)
1357 }
1358
1359 async fn update_send_queue_request(
1360 &self,
1361 room_id: &RoomId,
1362 transaction_id: &TransactionId,
1363 content: QueuedRequestKind,
1364 ) -> Result<bool, Self::Error> {
1365 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1366 }
1367
1368 async fn remove_send_queue_request(
1369 &self,
1370 room_id: &RoomId,
1371 transaction_id: &TransactionId,
1372 ) -> Result<bool, Self::Error> {
1373 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1374 }
1375
1376 async fn load_send_queue_requests(
1377 &self,
1378 room_id: &RoomId,
1379 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1380 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1381 }
1382
1383 async fn update_send_queue_request_status(
1384 &self,
1385 room_id: &RoomId,
1386 transaction_id: &TransactionId,
1387 error: Option<QueueWedgeError>,
1388 ) -> Result<(), Self::Error> {
1389 self.0
1390 .update_send_queue_request_status(room_id, transaction_id, error)
1391 .await
1392 .map_err(Into::into)
1393 }
1394
1395 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1396 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1397 }
1398
1399 async fn save_dependent_queued_request(
1400 &self,
1401 room_id: &RoomId,
1402 parent_txn_id: &TransactionId,
1403 own_txn_id: ChildTransactionId,
1404 created_at: MilliSecondsSinceUnixEpoch,
1405 content: DependentQueuedRequestKind,
1406 ) -> Result<(), Self::Error> {
1407 self.0
1408 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1409 .await
1410 .map_err(Into::into)
1411 }
1412
1413 async fn mark_dependent_queued_requests_as_ready(
1414 &self,
1415 room_id: &RoomId,
1416 parent_txn_id: &TransactionId,
1417 sent_parent_key: SentRequestKey,
1418 ) -> Result<usize, Self::Error> {
1419 self.0
1420 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1421 .await
1422 .map_err(Into::into)
1423 }
1424
1425 async fn remove_dependent_queued_request(
1426 &self,
1427 room_id: &RoomId,
1428 own_txn_id: &ChildTransactionId,
1429 ) -> Result<bool, Self::Error> {
1430 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1431 }
1432
1433 async fn load_dependent_queued_requests(
1434 &self,
1435 room_id: &RoomId,
1436 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1437 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1438 }
1439
1440 async fn update_dependent_queued_request(
1441 &self,
1442 room_id: &RoomId,
1443 own_transaction_id: &ChildTransactionId,
1444 new_content: DependentQueuedRequestKind,
1445 ) -> Result<bool, Self::Error> {
1446 self.0
1447 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1448 .await
1449 .map_err(Into::into)
1450 }
1451
1452 async fn upsert_thread_subscriptions(
1453 &self,
1454 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1455 ) -> Result<(), Self::Error> {
1456 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1457 }
1458
1459 async fn load_thread_subscription(
1460 &self,
1461 room: &RoomId,
1462 thread_id: &EventId,
1463 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1464 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1465 }
1466
1467 async fn remove_thread_subscription(
1468 &self,
1469 room: &RoomId,
1470 thread_id: &EventId,
1471 ) -> Result<(), Self::Error> {
1472 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1473 }
1474
1475 async fn close(&self) -> Result<(), Self::Error> {
1476 self.0.close().await.map_err(Into::into)
1477 }
1478
1479 async fn reopen(&self) -> Result<(), Self::Error> {
1480 self.0.reopen().await.map_err(Into::into)
1481 }
1482
1483 async fn optimize(&self) -> Result<(), Self::Error> {
1484 self.0.optimize().await.map_err(Into::into)
1485 }
1486
1487 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1488 self.0.get_size().await.map_err(Into::into)
1489 }
1490}
1491
1492#[derive(Debug, Clone)]
1495pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
1496 store: T,
1497 lock: Arc<Mutex<()>>,
1498}
1499
1500#[derive(Debug, Error)]
1504#[error("a mutex guard was provided, but it does not reference the correct mutex")]
1505pub struct IncorrectMutexGuardError;
1506
1507impl From<IncorrectMutexGuardError> for StoreError {
1508 fn from(value: IncorrectMutexGuardError) -> Self {
1509 Self::backend(value)
1510 }
1511}
1512
1513impl<T> SaveLockedStateStore<T> {
1514 pub fn new(store: T) -> Self {
1516 Self { store, lock: Arc::new(Mutex::new(())) }
1517 }
1518
1519 pub fn lock(&self) -> &Mutex<()> {
1522 self.lock.as_ref()
1523 }
1524}
1525
1526impl<T: StateStore> SaveLockedStateStore<T> {
1527 pub async fn save_changes_with_guard(
1532 &self,
1533 guard: &MutexGuard<'_, ()>,
1534 changes: &StateChanges,
1535 ) -> Result<(), StoreError> {
1536 if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1537 Err(IncorrectMutexGuardError.into())
1538 } else {
1539 self.store.save_changes(changes).await.map_err(Into::into)
1540 }
1541 }
1542}
1543
1544#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1545#[cfg_attr(not(target_family = "wasm"), async_trait)]
1546impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1547 type Error = T::Error;
1548
1549 async fn get_kv_data(
1550 &self,
1551 key: StateStoreDataKey<'_>,
1552 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1553 self.store.get_kv_data(key).await
1554 }
1555
1556 async fn set_kv_data(
1557 &self,
1558 key: StateStoreDataKey<'_>,
1559 value: StateStoreDataValue,
1560 ) -> Result<(), Self::Error> {
1561 self.store.set_kv_data(key, value).await
1562 }
1563
1564 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1565 self.store.remove_kv_data(key).await
1566 }
1567
1568 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1569 let _guard = self.lock.lock().await;
1570 self.store.save_changes(changes).await
1571 }
1572
1573 async fn get_presence_event(
1574 &self,
1575 user_id: &UserId,
1576 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1577 self.store.get_presence_event(user_id).await
1578 }
1579
1580 async fn get_presence_events(
1581 &self,
1582 user_ids: &[OwnedUserId],
1583 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1584 self.store.get_presence_events(user_ids).await
1585 }
1586
1587 async fn get_state_event(
1588 &self,
1589 room_id: &RoomId,
1590 event_type: StateEventType,
1591 state_key: &str,
1592 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1593 self.store.get_state_event(room_id, event_type, state_key).await
1594 }
1595
1596 async fn get_state_events(
1597 &self,
1598 room_id: &RoomId,
1599 event_type: StateEventType,
1600 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1601 self.store.get_state_events(room_id, event_type).await
1602 }
1603
1604 async fn get_state_events_for_keys(
1605 &self,
1606 room_id: &RoomId,
1607 event_type: StateEventType,
1608 state_keys: &[&str],
1609 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1610 self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1611 }
1612
1613 async fn get_profile(
1614 &self,
1615 room_id: &RoomId,
1616 user_id: &UserId,
1617 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1618 self.store.get_profile(room_id, user_id).await
1619 }
1620
1621 async fn get_profiles<'a>(
1622 &self,
1623 room_id: &RoomId,
1624 user_ids: &'a [OwnedUserId],
1625 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1626 self.store.get_profiles(room_id, user_ids).await
1627 }
1628
1629 async fn get_user_ids(
1630 &self,
1631 room_id: &RoomId,
1632 memberships: RoomMemberships,
1633 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1634 self.store.get_user_ids(room_id, memberships).await
1635 }
1636
1637 async fn get_room_infos(
1638 &self,
1639 room_load_settings: &RoomLoadSettings,
1640 ) -> Result<Vec<RoomInfo>, Self::Error> {
1641 self.store.get_room_infos(room_load_settings).await
1642 }
1643
1644 async fn get_users_with_display_name(
1645 &self,
1646 room_id: &RoomId,
1647 display_name: &DisplayName,
1648 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1649 self.store.get_users_with_display_name(room_id, display_name).await
1650 }
1651
1652 async fn get_users_with_display_names<'a>(
1653 &self,
1654 room_id: &RoomId,
1655 display_names: &'a [DisplayName],
1656 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1657 self.store.get_users_with_display_names(room_id, display_names).await
1658 }
1659
1660 async fn get_account_data_event(
1661 &self,
1662 event_type: GlobalAccountDataEventType,
1663 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1664 self.store.get_account_data_event(event_type).await
1665 }
1666
1667 async fn get_room_account_data_event(
1668 &self,
1669 room_id: &RoomId,
1670 event_type: RoomAccountDataEventType,
1671 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1672 self.store.get_room_account_data_event(room_id, event_type).await
1673 }
1674
1675 async fn get_user_room_receipt_event(
1676 &self,
1677 room_id: &RoomId,
1678 receipt_type: ReceiptType,
1679 thread: ReceiptThread,
1680 user_id: &UserId,
1681 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1682 self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1683 }
1684
1685 async fn get_event_room_receipt_events(
1686 &self,
1687 room_id: &RoomId,
1688 receipt_type: ReceiptType,
1689 thread: ReceiptThread,
1690 event_id: &EventId,
1691 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1692 self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1693 }
1694
1695 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1696 self.store.get_custom_value(key).await
1697 }
1698
1699 async fn set_custom_value(
1700 &self,
1701 key: &[u8],
1702 value: Vec<u8>,
1703 ) -> Result<Option<Vec<u8>>, Self::Error> {
1704 self.store.set_custom_value(key, value).await
1705 }
1706
1707 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1708 self.store.remove_custom_value(key).await
1709 }
1710
1711 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1712 self.store.remove_room(room_id).await
1713 }
1714
1715 async fn save_send_queue_request(
1716 &self,
1717 room_id: &RoomId,
1718 transaction_id: OwnedTransactionId,
1719 created_at: MilliSecondsSinceUnixEpoch,
1720 request: QueuedRequestKind,
1721 priority: usize,
1722 ) -> Result<(), Self::Error> {
1723 self.store
1724 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1725 .await
1726 }
1727
1728 async fn update_send_queue_request(
1729 &self,
1730 room_id: &RoomId,
1731 transaction_id: &TransactionId,
1732 content: QueuedRequestKind,
1733 ) -> Result<bool, Self::Error> {
1734 self.store.update_send_queue_request(room_id, transaction_id, content).await
1735 }
1736
1737 async fn remove_send_queue_request(
1738 &self,
1739 room_id: &RoomId,
1740 transaction_id: &TransactionId,
1741 ) -> Result<bool, Self::Error> {
1742 self.store.remove_send_queue_request(room_id, transaction_id).await
1743 }
1744
1745 async fn load_send_queue_requests(
1746 &self,
1747 room_id: &RoomId,
1748 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1749 self.store.load_send_queue_requests(room_id).await
1750 }
1751
1752 async fn update_send_queue_request_status(
1753 &self,
1754 room_id: &RoomId,
1755 transaction_id: &TransactionId,
1756 error: Option<QueueWedgeError>,
1757 ) -> Result<(), Self::Error> {
1758 self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1759 }
1760
1761 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1762 self.store.load_rooms_with_unsent_requests().await
1763 }
1764
1765 async fn save_dependent_queued_request(
1766 &self,
1767 room_id: &RoomId,
1768 parent_txn_id: &TransactionId,
1769 own_txn_id: ChildTransactionId,
1770 created_at: MilliSecondsSinceUnixEpoch,
1771 content: DependentQueuedRequestKind,
1772 ) -> Result<(), Self::Error> {
1773 self.store
1774 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1775 .await
1776 }
1777
1778 async fn mark_dependent_queued_requests_as_ready(
1779 &self,
1780 room_id: &RoomId,
1781 parent_txn_id: &TransactionId,
1782 sent_parent_key: SentRequestKey,
1783 ) -> Result<usize, Self::Error> {
1784 self.store
1785 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1786 .await
1787 }
1788
1789 async fn update_dependent_queued_request(
1790 &self,
1791 room_id: &RoomId,
1792 own_transaction_id: &ChildTransactionId,
1793 new_content: DependentQueuedRequestKind,
1794 ) -> Result<bool, Self::Error> {
1795 self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1796 }
1797
1798 async fn remove_dependent_queued_request(
1799 &self,
1800 room: &RoomId,
1801 own_txn_id: &ChildTransactionId,
1802 ) -> Result<bool, Self::Error> {
1803 self.store.remove_dependent_queued_request(room, own_txn_id).await
1804 }
1805
1806 async fn load_dependent_queued_requests(
1807 &self,
1808 room: &RoomId,
1809 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1810 self.store.load_dependent_queued_requests(room).await
1811 }
1812
1813 async fn upsert_thread_subscriptions(
1814 &self,
1815 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1816 ) -> Result<(), Self::Error> {
1817 self.store.upsert_thread_subscriptions(updates).await
1818 }
1819
1820 async fn load_thread_subscription(
1821 &self,
1822 room: &RoomId,
1823 thread_id: &EventId,
1824 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1825 self.store.load_thread_subscription(room, thread_id).await
1826 }
1827
1828 async fn remove_thread_subscription(
1829 &self,
1830 room: &RoomId,
1831 thread_id: &EventId,
1832 ) -> Result<(), Self::Error> {
1833 self.store.remove_thread_subscription(room, thread_id).await
1834 }
1835
1836 async fn close(&self) -> Result<(), Self::Error> {
1837 self.store.close().await
1838 }
1839
1840 async fn reopen(&self) -> Result<(), Self::Error> {
1841 self.store.reopen().await
1842 }
1843
1844 async fn optimize(&self) -> Result<(), Self::Error> {
1845 self.store.optimize().await
1846 }
1847
1848 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1849 self.store.get_size().await
1850 }
1851}
1852
1853#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1855#[cfg_attr(not(target_family = "wasm"), async_trait)]
1856pub trait StateStoreExt: StateStore {
1857 async fn get_state_event_static<C>(
1863 &self,
1864 room_id: &RoomId,
1865 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1866 where
1867 C: StaticEventContent<IsPrefix = ruma::events::False>
1868 + StaticStateEventContent<StateKey = EmptyStateKey>
1869 + RedactContent,
1870 C::Redacted: RedactedStateEventContent,
1871 {
1872 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1873 }
1874
1875 async fn get_state_event_static_for_key<C, K>(
1881 &self,
1882 room_id: &RoomId,
1883 state_key: &K,
1884 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1885 where
1886 C: StaticEventContent<IsPrefix = ruma::events::False>
1887 + StaticStateEventContent
1888 + RedactContent,
1889 C::StateKey: Borrow<K>,
1890 C::Redacted: RedactedStateEventContent,
1891 K: AsRef<str> + ?Sized + Sync,
1892 {
1893 Ok(self
1894 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1895 .await?
1896 .map(|raw| raw.cast()))
1897 }
1898
1899 async fn get_state_events_static<C>(
1905 &self,
1906 room_id: &RoomId,
1907 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1908 where
1909 C: StaticEventContent<IsPrefix = ruma::events::False>
1910 + StaticStateEventContent
1911 + RedactContent,
1912 C::Redacted: RedactedStateEventContent,
1913 {
1914 Ok(self
1916 .get_state_events(room_id, C::TYPE.into())
1917 .await?
1918 .into_iter()
1919 .map(|raw| raw.cast())
1920 .collect())
1921 }
1922
1923 async fn get_state_events_for_keys_static<'a, C, K, I>(
1932 &self,
1933 room_id: &RoomId,
1934 state_keys: I,
1935 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1936 where
1937 C: StaticEventContent<IsPrefix = ruma::events::False>
1938 + StaticStateEventContent
1939 + RedactContent,
1940 C::StateKey: Borrow<K>,
1941 C::Redacted: RedactedStateEventContent,
1942 K: AsRef<str> + Sized + Sync + 'a,
1943 I: IntoIterator<Item = &'a K> + Send,
1944 I::IntoIter: Send,
1945 {
1946 Ok(self
1947 .get_state_events_for_keys(
1948 room_id,
1949 C::TYPE.into(),
1950 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1951 )
1952 .await?
1953 .into_iter()
1954 .map(|raw| raw.cast())
1955 .collect())
1956 }
1957
1958 async fn get_account_data_event_static<C>(
1960 &self,
1961 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1962 where
1963 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1964 {
1965 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1966 }
1967
1968 async fn get_room_account_data_event_static<C>(
1976 &self,
1977 room_id: &RoomId,
1978 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1979 where
1980 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1981 {
1982 Ok(self
1983 .get_room_account_data_event(room_id, C::TYPE.into())
1984 .await?
1985 .map(Raw::cast_unchecked))
1986 }
1987
1988 async fn get_member_event(
1996 &self,
1997 room_id: &RoomId,
1998 state_key: &UserId,
1999 ) -> Result<Option<RawMemberEvent>, Self::Error> {
2000 self.get_state_event_static_for_key(room_id, state_key).await
2001 }
2002}
2003
2004#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2005#[cfg_attr(not(target_family = "wasm"), async_trait)]
2006impl<T: StateStore + ?Sized> StateStoreExt for T {}
2007
2008pub type DynStateStore = dyn StateStore<Error = StoreError>;
2010
2011pub trait IntoStateStore {
2017 #[doc(hidden)]
2018 fn into_state_store(self) -> Arc<DynStateStore>;
2019}
2020
2021impl<T> IntoStateStore for T
2022where
2023 T: StateStore + Sized + 'static,
2024{
2025 fn into_state_store(self) -> Arc<DynStateStore> {
2026 Arc::new(EraseStateStoreError(self))
2027 }
2028}
2029
2030#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2032pub struct SupportedVersionsResponse {
2033 pub versions: Vec<String>,
2035
2036 pub unstable_features: BTreeMap<String, bool>,
2038}
2039
2040impl SupportedVersionsResponse {
2041 pub fn supported_versions(&self) -> SupportedVersions {
2047 let mut supported_versions =
2048 SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2049
2050 if supported_versions.versions.is_empty() {
2053 supported_versions.versions.insert(MatrixVersion::V1_0);
2054 }
2055
2056 supported_versions
2057 }
2058}
2059
2060#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2061pub struct WellKnownResponse {
2063 pub homeserver: HomeserverInfo,
2065
2066 pub identity_server: Option<IdentityServerInfo>,
2068
2069 pub tile_server: Option<TileServerInfo>,
2071
2072 pub rtc_foci: Vec<RtcFocusInfo>,
2074}
2075
2076impl From<discover_homeserver::Response> for WellKnownResponse {
2077 fn from(response: discover_homeserver::Response) -> Self {
2078 Self {
2079 homeserver: response.homeserver,
2080 identity_server: response.identity_server,
2081 tile_server: response.tile_server,
2082 rtc_foci: response.rtc_foci,
2083 }
2084 }
2085}
2086
2087#[derive(Debug, Clone)]
2089pub enum StateStoreDataValue {
2090 SyncToken(String),
2092
2093 SupportedVersions(TtlValue<SupportedVersionsResponse>),
2095
2096 WellKnown(TtlValue<Option<WellKnownResponse>>),
2098
2099 Filter(String),
2101
2102 UserAvatarUrl(OwnedMxcUri),
2104
2105 RecentlyVisitedRooms(Vec<OwnedRoomId>),
2107
2108 UtdHookManagerData(GrowableBloom),
2111
2112 OneTimeKeyAlreadyUploaded,
2115
2116 ComposerDraft(ComposerDraft),
2121
2122 SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),
2124
2125 ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
2130
2131 HomeserverCapabilities(TtlValue<Capabilities>),
2133}
2134
2135#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2145pub struct ThreadSubscriptionCatchupToken {
2146 pub from: String,
2152
2153 pub to: Option<String>,
2159}
2160
2161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2163pub struct ComposerDraft {
2164 pub plain_text: String,
2166 pub html_text: Option<String>,
2169 pub draft_type: ComposerDraftType,
2171 #[serde(default)]
2173 pub attachments: Vec<DraftAttachment>,
2174}
2175
2176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2178pub struct DraftAttachment {
2179 pub filename: String,
2181 pub content: DraftAttachmentContent,
2183}
2184
2185#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2187#[serde(tag = "type")]
2188pub enum DraftAttachmentContent {
2189 Image {
2191 data: Vec<u8>,
2193 mimetype: Option<String>,
2195 size: Option<u64>,
2197 width: Option<u64>,
2199 height: Option<u64>,
2201 blurhash: Option<String>,
2203 thumbnail: Option<DraftThumbnail>,
2205 },
2206 Video {
2208 data: Vec<u8>,
2210 mimetype: Option<String>,
2212 size: Option<u64>,
2214 width: Option<u64>,
2216 height: Option<u64>,
2218 duration: Option<std::time::Duration>,
2220 blurhash: Option<String>,
2222 thumbnail: Option<DraftThumbnail>,
2224 },
2225 Audio {
2227 data: Vec<u8>,
2229 mimetype: Option<String>,
2231 size: Option<u64>,
2233 duration: Option<std::time::Duration>,
2235 },
2236 File {
2238 data: Vec<u8>,
2240 mimetype: Option<String>,
2242 size: Option<u64>,
2244 },
2245}
2246
2247#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2249pub struct DraftThumbnail {
2250 pub filename: String,
2252 pub data: Vec<u8>,
2254 pub mimetype: Option<String>,
2256 pub width: Option<u64>,
2258 pub height: Option<u64>,
2260 pub size: Option<u64>,
2262}
2263
2264#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2266pub enum ComposerDraftType {
2267 NewMessage,
2269 Reply {
2271 event_id: OwnedEventId,
2273 },
2274 Edit {
2276 event_id: OwnedEventId,
2278 },
2279}
2280
2281impl StateStoreDataValue {
2282 pub fn into_sync_token(self) -> Option<String> {
2284 as_variant!(self, Self::SyncToken)
2285 }
2286
2287 pub fn into_filter(self) -> Option<String> {
2289 as_variant!(self, Self::Filter)
2290 }
2291
2292 pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
2294 as_variant!(self, Self::UserAvatarUrl)
2295 }
2296
2297 pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
2299 as_variant!(self, Self::RecentlyVisitedRooms)
2300 }
2301
2302 pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
2304 as_variant!(self, Self::UtdHookManagerData)
2305 }
2306
2307 pub fn into_composer_draft(self) -> Option<ComposerDraft> {
2309 as_variant!(self, Self::ComposerDraft)
2310 }
2311
2312 pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
2314 as_variant!(self, Self::SupportedVersions)
2315 }
2316
2317 pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
2319 as_variant!(self, Self::WellKnown)
2320 }
2321
2322 pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
2324 as_variant!(self, Self::SeenKnockRequests)
2325 }
2326
2327 pub fn into_thread_subscriptions_catchup_tokens(
2330 self,
2331 ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
2332 as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
2333 }
2334
2335 pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
2338 as_variant!(self, Self::HomeserverCapabilities)
2339 }
2340}
2341
2342#[derive(Debug, Clone, Copy)]
2344pub enum StateStoreDataKey<'a> {
2345 SyncToken,
2347
2348 SupportedVersions,
2350
2351 WellKnown,
2353
2354 Filter(&'a str),
2356
2357 UserAvatarUrl(&'a UserId),
2359
2360 RecentlyVisitedRooms(&'a UserId),
2362
2363 UtdHookManagerData,
2366
2367 OneTimeKeyAlreadyUploaded,
2370
2371 ComposerDraft(&'a RoomId, Option<&'a EventId>),
2376
2377 SeenKnockRequests(&'a RoomId),
2379
2380 ThreadSubscriptionsCatchupTokens,
2382
2383 HomeserverCapabilities,
2385}
2386
2387impl StateStoreDataKey<'_> {
2388 pub const SYNC_TOKEN: &'static str = "sync_token";
2390
2391 pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities"; pub const WELL_KNOWN: &'static str = "well_known";
2398
2399 pub const FILTER: &'static str = "filter";
2401
2402 pub const USER_AVATAR_URL: &'static str = "user_avatar_url";
2405
2406 pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";
2409
2410 pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";
2413
2414 pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";
2417
2418 pub const COMPOSER_DRAFT: &'static str = "composer_draft";
2421
2422 pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";
2425
2426 pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
2429 "thread_subscriptions_catchup_tokens";
2430
2431 pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
2433}
2434
2435pub fn compare_thread_subscription_bump_stamps(
2444 previous: Option<u64>,
2445 new: &mut Option<u64>,
2446) -> bool {
2447 match (previous, &new) {
2448 (Some(prev_bump), None) => {
2451 *new = Some(prev_bump);
2452 }
2453
2454 (Some(prev_bump), Some(new_bump)) if *new_bump <= prev_bump => {
2456 return false;
2457 }
2458
2459 _ => {}
2461 }
2462
2463 true
2464}
2465
2466#[cfg(test)]
2467mod tests {
2468 mod save_locked_state_store {
2469 use std::time::Duration;
2470
2471 use assert_matches::assert_matches;
2472 use futures_util::future::{self, Either};
2473 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
2474 use gloo_timers::future::sleep;
2475 use matrix_sdk_common::executor::spawn;
2476 use matrix_sdk_test::async_test;
2477 use tokio::sync::Mutex;
2478 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
2479 use tokio::time::sleep;
2480
2481 use crate::{
2482 StateChanges, StateStore,
2483 store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
2484 };
2485
2486 async fn get_store() -> Result<impl StateStore> {
2487 Ok(SaveLockedStateStore::new(MemoryStore::new()))
2488 }
2489
2490 statestore_integration_tests!();
2491
2492 #[async_test]
2493 async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
2494 let state_store = SaveLockedStateStore::new(MemoryStore::new());
2495 let state_changes = StateChanges::default();
2496 state_store
2497 .save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
2498 .await
2499 .expect("state store accepts guard for underlying mutex");
2500
2501 let mutex = Mutex::new(());
2502 state_store
2503 .save_changes_with_guard(&mutex.lock().await, &state_changes)
2504 .await
2505 .expect_err("state store does not accept guard for unknown mutex");
2506 }
2507
2508 #[derive(Debug)]
2509 struct Elapsed;
2510
2511 async fn timeout<F: Future + Unpin>(
2512 duration: Duration,
2513 f: F,
2514 ) -> Result<F::Output, Elapsed> {
2515 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
2516 {
2517 match future::select(sleep(duration), f).await {
2518 Either::Left(_) => return Err(Elapsed),
2519 Either::Right((output, _)) => Ok(output),
2520 }
2521 }
2522 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
2523 {
2524 tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
2525 }
2526 }
2527
2528 #[async_test]
2529 async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
2530 let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());
2531
2532 let lock_task = spawn({
2534 let state_store = state_store.clone();
2535 async move {
2536 let lock = state_store.lock();
2537 let _guard = lock.lock().await;
2538 sleep(Duration::from_secs(5)).await;
2539 }
2540 });
2541
2542 let save_task =
2544 spawn(async move { state_store.save_changes(&StateChanges::default()).await });
2545
2546 assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
2549 timeout(Duration::from_millis(100), save_task)
2550 .await
2551 .expect("task completes before timeout")
2552 .expect("task completes successfully")
2553 .expect("task saves changes");
2554 });
2555 }
2556 }
2557}