Skip to main content

matrix_sdk_base/store/
traits.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    borrow::Borrow,
17    collections::{BTreeMap, BTreeSet, HashMap},
18    fmt,
19    ops::Deref,
20    sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30    api::{
31        MatrixVersion, SupportedVersions,
32        client::discovery::{
33            discover_homeserver::{
34                self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
35            },
36            get_capabilities::v3::Capabilities,
37        },
38    },
39    events::{
40        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
41        GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
42        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
43        RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
44        presence::PresenceEvent,
45        receipt::{Receipt, ReceiptThread, ReceiptType},
46    },
47    serde::Raw,
48};
49use serde::{Deserialize, Serialize};
50use thiserror::Error;
51use tokio::sync::{Mutex, MutexGuard};
52
53use super::{
54    ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
55    QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
56    send_queue::SentRequestKey,
57};
58use crate::{
59    MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
60    deserialized_responses::{
61        DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
62    },
63    store::StoredThreadSubscription,
64};
65
66/// An abstract state store trait that can be used to implement different stores
67/// for the SDK.
68#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70pub trait StateStore: AsyncTraitDeps {
71    /// The error type used by this state store.
72    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
73
74    /// Get key-value data from the store.
75    ///
76    /// # Arguments
77    ///
78    /// * `key` - The key to fetch data for.
79    async fn get_kv_data(
80        &self,
81        key: StateStoreDataKey<'_>,
82    ) -> Result<Option<StateStoreDataValue>, Self::Error>;
83
84    /// Put key-value data into the store.
85    ///
86    /// # Arguments
87    ///
88    /// * `key` - The key to identify the data in the store.
89    ///
90    /// * `value` - The data to insert.
91    ///
92    /// Panics if the key and value variants do not match.
93    async fn set_kv_data(
94        &self,
95        key: StateStoreDataKey<'_>,
96        value: StateStoreDataValue,
97    ) -> Result<(), Self::Error>;
98
99    /// Remove key-value data from the store.
100    ///
101    /// # Arguments
102    ///
103    /// * `key` - The key to remove the data for.
104    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
105
106    /// Save the set of state changes in the store.
107    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
108
109    /// Get the stored presence event for the given user.
110    ///
111    /// # Arguments
112    ///
113    /// * `user_id` - The id of the user for which we wish to fetch the presence
114    /// event for.
115    async fn get_presence_event(
116        &self,
117        user_id: &UserId,
118    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
119
120    /// Get the stored presence events for the given users.
121    ///
122    /// # Arguments
123    ///
124    /// * `user_ids` - The IDs of the users to fetch the presence events for.
125    async fn get_presence_events(
126        &self,
127        user_ids: &[OwnedUserId],
128    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
129
130    /// Get a state event out of the state store.
131    ///
132    /// # Arguments
133    ///
134    /// * `room_id` - The id of the room the state event was received for.
135    ///
136    /// * `event_type` - The event type of the state event.
137    async fn get_state_event(
138        &self,
139        room_id: &RoomId,
140        event_type: StateEventType,
141        state_key: &str,
142    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
143
144    /// Get a list of state events for a given room and `StateEventType`.
145    ///
146    /// # Arguments
147    ///
148    /// * `room_id` - The id of the room to find events for.
149    ///
150    /// * `event_type` - The event type.
151    async fn get_state_events(
152        &self,
153        room_id: &RoomId,
154        event_type: StateEventType,
155    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
156
157    /// Get a list of state events for a given room, `StateEventType`, and the
158    /// given state keys.
159    ///
160    /// # Arguments
161    ///
162    /// * `room_id` - The id of the room to find events for.
163    ///
164    /// * `event_type` - The event type.
165    ///
166    /// * `state_keys` - The list of state keys to find.
167    async fn get_state_events_for_keys(
168        &self,
169        room_id: &RoomId,
170        event_type: StateEventType,
171        state_keys: &[&str],
172    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
173
174    /// Get the current profile for the given user in the given room.
175    ///
176    /// # Arguments
177    ///
178    /// * `room_id` - The room id the profile is used in.
179    ///
180    /// * `user_id` - The id of the user the profile belongs to.
181    async fn get_profile(
182        &self,
183        room_id: &RoomId,
184        user_id: &UserId,
185    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
186
187    /// Get the current profiles for the given users in the given room.
188    ///
189    /// # Arguments
190    ///
191    /// * `room_id` - The ID of the room the profiles are used in.
192    ///
193    /// * `user_ids` - The IDs of the users the profiles belong to.
194    async fn get_profiles<'a>(
195        &self,
196        room_id: &RoomId,
197        user_ids: &'a [OwnedUserId],
198    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
199
200    /// Get the user ids of members for a given room with the given memberships,
201    /// for stripped and regular rooms alike.
202    async fn get_user_ids(
203        &self,
204        room_id: &RoomId,
205        memberships: RoomMemberships,
206    ) -> Result<Vec<OwnedUserId>, Self::Error>;
207
208    /// Get a set of pure `RoomInfo`s the store knows about.
209    async fn get_room_infos(
210        &self,
211        room_load_settings: &RoomLoadSettings,
212    ) -> Result<Vec<RoomInfo>, Self::Error>;
213
214    /// Get all the users that use the given display name in the given room.
215    ///
216    /// # Arguments
217    ///
218    /// * `room_id` - The id of the room for which the display name users should
219    /// be fetched for.
220    ///
221    /// * `display_name` - The display name that the users use.
222    async fn get_users_with_display_name(
223        &self,
224        room_id: &RoomId,
225        display_name: &DisplayName,
226    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
227
228    /// Get all the users that use the given display names in the given room.
229    ///
230    /// # Arguments
231    ///
232    /// * `room_id` - The ID of the room to fetch the display names for.
233    ///
234    /// * `display_names` - The display names that the users use.
235    async fn get_users_with_display_names<'a>(
236        &self,
237        room_id: &RoomId,
238        display_names: &'a [DisplayName],
239    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
240
241    /// Get an event out of the account data store.
242    ///
243    /// # Arguments
244    ///
245    /// * `event_type` - The event type of the account data event.
246    async fn get_account_data_event(
247        &self,
248        event_type: GlobalAccountDataEventType,
249    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
250
251    /// Get an event out of the room account data store.
252    ///
253    /// # Arguments
254    ///
255    /// * `room_id` - The id of the room for which the room account data event
256    ///   should
257    /// be fetched.
258    ///
259    /// * `event_type` - The event type of the room account data event.
260    async fn get_room_account_data_event(
261        &self,
262        room_id: &RoomId,
263        event_type: RoomAccountDataEventType,
264    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
265
266    /// Get a user's read receipt for a given room and receipt type and thread.
267    ///
268    /// # Arguments
269    ///
270    /// * `room_id` - The id of the room for which the receipt should be
271    ///   fetched.
272    ///
273    /// * `receipt_type` - The type of the receipt.
274    ///
275    /// * `thread` - The thread containing this receipt.
276    ///
277    /// * `user_id` - The id of the user for whom the receipt should be fetched.
278    async fn get_user_room_receipt_event(
279        &self,
280        room_id: &RoomId,
281        receipt_type: ReceiptType,
282        thread: ReceiptThread,
283        user_id: &UserId,
284    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
285
286    /// Get an event's read receipts for a given room, receipt type, and thread.
287    ///
288    /// # Arguments
289    ///
290    /// * `room_id` - The id of the room for which the receipts should be
291    ///   fetched.
292    ///
293    /// * `receipt_type` - The type of the receipts.
294    ///
295    /// * `thread` - The thread containing this receipt.
296    ///
297    /// * `event_id` - The id of the event for which the receipts should be
298    ///   fetched.
299    async fn get_event_room_receipt_events(
300        &self,
301        room_id: &RoomId,
302        receipt_type: ReceiptType,
303        thread: ReceiptThread,
304        event_id: &EventId,
305    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
306
307    /// Get arbitrary data from the custom store
308    ///
309    /// # Arguments
310    ///
311    /// * `key` - The key to fetch data for
312    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
313
314    /// Put arbitrary data into the custom store, return the data previously
315    /// stored
316    ///
317    /// # Arguments
318    ///
319    /// * `key` - The key to insert data into
320    ///
321    /// * `value` - The value to insert
322    async fn set_custom_value(
323        &self,
324        key: &[u8],
325        value: Vec<u8>,
326    ) -> Result<Option<Vec<u8>>, Self::Error>;
327
328    /// Put arbitrary data into the custom store, do not attempt to read any
329    /// previous data
330    ///
331    /// Optimization option for set_custom_values for stores that would perform
332    /// better withouts the extra read and the caller not needing that data
333    /// returned. Otherwise this just wraps around `set_custom_data` and
334    /// discards the result.
335    ///
336    /// # Arguments
337    ///
338    /// * `key` - The key to insert data into
339    ///
340    /// * `value` - The value to insert
341    async fn set_custom_value_no_read(
342        &self,
343        key: &[u8],
344        value: Vec<u8>,
345    ) -> Result<(), Self::Error> {
346        self.set_custom_value(key, value).await.map(|_| ())
347    }
348
349    /// Remove arbitrary data from the custom store and return it if existed
350    ///
351    /// # Arguments
352    ///
353    /// * `key` - The key to remove data from
354    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
355
356    /// Remove a room and all elements associated from the state store.
357    ///
358    /// # Arguments
359    ///
360    /// * `room_id` - The `RoomId` of the room to delete.
361    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
362
363    /// Save a request to be sent by a send queue later (e.g. sending an event).
364    ///
365    /// # Arguments
366    ///
367    /// * `room_id` - The `RoomId` of the send queue's room.
368    /// * `transaction_id` - The unique key identifying the event to be sent
369    ///   (and its transaction). Note: this is expected to be randomly generated
370    ///   and thus unique.
371    /// * `content` - Serializable event content to be sent.
372    async fn save_send_queue_request(
373        &self,
374        room_id: &RoomId,
375        transaction_id: OwnedTransactionId,
376        created_at: MilliSecondsSinceUnixEpoch,
377        request: QueuedRequestKind,
378        priority: usize,
379    ) -> Result<(), Self::Error>;
380
381    /// Updates a send queue request with the given content, and resets its
382    /// error status.
383    ///
384    /// # Arguments
385    ///
386    /// * `room_id` - The `RoomId` of the send queue's room.
387    /// * `transaction_id` - The unique key identifying the request to be sent
388    ///   (and its transaction).
389    /// * `content` - Serializable event content to replace the original one.
390    ///
391    /// Returns true if a request has been updated, or false otherwise.
392    async fn update_send_queue_request(
393        &self,
394        room_id: &RoomId,
395        transaction_id: &TransactionId,
396        content: QueuedRequestKind,
397    ) -> Result<bool, Self::Error>;
398
399    /// Remove a request previously inserted with
400    /// [`Self::save_send_queue_request`] from the database, based on its
401    /// transaction id.
402    ///
403    /// Returns true if something has been removed, or false otherwise.
404    async fn remove_send_queue_request(
405        &self,
406        room_id: &RoomId,
407        transaction_id: &TransactionId,
408    ) -> Result<bool, Self::Error>;
409
410    /// Loads all the send queue requests for the given room.
411    ///
412    /// The resulting vector of queued requests should be ordered from higher
413    /// priority to lower priority, and respect the insertion order when
414    /// priorities are equal.
415    async fn load_send_queue_requests(
416        &self,
417        room_id: &RoomId,
418    ) -> Result<Vec<QueuedRequest>, Self::Error>;
419
420    /// Updates the send queue error status (wedge) for a given send queue
421    /// request.
422    async fn update_send_queue_request_status(
423        &self,
424        room_id: &RoomId,
425        transaction_id: &TransactionId,
426        error: Option<QueueWedgeError>,
427    ) -> Result<(), Self::Error>;
428
429    /// Loads all the rooms which have any pending requests in their send queue.
430    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
431
432    /// Add a new entry to the list of dependent send queue requests for a
433    /// parent request.
434    async fn save_dependent_queued_request(
435        &self,
436        room_id: &RoomId,
437        parent_txn_id: &TransactionId,
438        own_txn_id: ChildTransactionId,
439        created_at: MilliSecondsSinceUnixEpoch,
440        content: DependentQueuedRequestKind,
441    ) -> Result<(), Self::Error>;
442
443    /// Mark a set of dependent send queue requests as ready, using a key
444    /// identifying the homeserver's response.
445    ///
446    /// ⚠ Beware! There's no verification applied that the parent key type is
447    /// compatible with the dependent event type. The invalid state may be
448    /// lazily filtered out in `load_dependent_queued_requests`.
449    ///
450    /// Returns the number of updated requests.
451    async fn mark_dependent_queued_requests_as_ready(
452        &self,
453        room_id: &RoomId,
454        parent_txn_id: &TransactionId,
455        sent_parent_key: SentRequestKey,
456    ) -> Result<usize, Self::Error>;
457
458    /// Update a dependent send queue request with the new content.
459    ///
460    /// Returns true if the request was found and could be updated.
461    async fn update_dependent_queued_request(
462        &self,
463        room_id: &RoomId,
464        own_transaction_id: &ChildTransactionId,
465        new_content: DependentQueuedRequestKind,
466    ) -> Result<bool, Self::Error>;
467
468    /// Remove a specific dependent send queue request by id.
469    ///
470    /// Returns true if the dependent send queue request has been indeed
471    /// removed.
472    async fn remove_dependent_queued_request(
473        &self,
474        room: &RoomId,
475        own_txn_id: &ChildTransactionId,
476    ) -> Result<bool, Self::Error>;
477
478    /// List all the dependent send queue requests.
479    ///
480    /// This returns absolutely all the dependent send queue requests, whether
481    /// they have a parent event id or not. As a contract for implementors, they
482    /// must be returned in insertion order.
483    async fn load_dependent_queued_requests(
484        &self,
485        room: &RoomId,
486    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
487
488    /// Inserts or updates multiple thread subscriptions.
489    ///
490    /// If the new thread subscription hasn't set a bumpstamp, and there was a
491    /// previous subscription in the database with a bumpstamp, the existing
492    /// bumpstamp is kept.
493    ///
494    /// If the new thread subscription has a bumpstamp that's lower than or
495    /// equal to a previous one, the existing subscription is kept, i.e.
496    /// this method must have no effect.
497    async fn upsert_thread_subscriptions(
498        &self,
499        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
500    ) -> Result<(), Self::Error>;
501
502    /// Remove a previous thread subscription for a given room and thread.
503    ///
504    /// Note: removing an unknown thread subscription is a no-op.
505    async fn remove_thread_subscription(
506        &self,
507        room: &RoomId,
508        thread_id: &EventId,
509    ) -> Result<(), Self::Error>;
510
511    /// Loads the current thread subscription for a given room and thread.
512    ///
513    /// Returns `None` if there was no entry for the given room/thread pair.
514    async fn load_thread_subscription(
515        &self,
516        room: &RoomId,
517        thread_id: &EventId,
518    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
519
520    /// Close the store, releasing all held resources (database connections,
521    /// file descriptors, file locks).
522    ///
523    /// In-flight operations complete before this method returns. After it
524    /// returns, operations will fail until [`Self::reopen()`] is called.
525    async fn close(&self) -> Result<(), Self::Error>;
526
527    /// Reopen the store after a [`Self::close()`], re-acquiring database
528    /// connections.
529    async fn reopen(&self) -> Result<(), Self::Error>;
530
531    /// Perform database optimizations if any are available, i.e. vacuuming in
532    /// SQLite.
533    ///
534    /// /// **Warning:** this was added to check if SQLite fragmentation was the
535    /// source of performance issues, **DO NOT use in production**.
536    #[doc(hidden)]
537    async fn optimize(&self) -> Result<(), Self::Error>;
538
539    /// Returns the size of the store in bytes, if known.
540    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
541}
542
543#[cfg_attr(target_family = "wasm", async_trait(?Send))]
544#[cfg_attr(not(target_family = "wasm"), async_trait)]
545impl<T: StateStore> StateStore for &T {
546    type Error = T::Error;
547
548    async fn get_kv_data(
549        &self,
550        key: StateStoreDataKey<'_>,
551    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
552        (*self).get_kv_data(key).await
553    }
554
555    async fn set_kv_data(
556        &self,
557        key: StateStoreDataKey<'_>,
558        value: StateStoreDataValue,
559    ) -> Result<(), Self::Error> {
560        (*self).set_kv_data(key, value).await
561    }
562
563    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
564        (*self).remove_kv_data(key).await
565    }
566
567    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
568        (*self).save_changes(changes).await
569    }
570
571    async fn get_presence_event(
572        &self,
573        user_id: &UserId,
574    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
575        (*self).get_presence_event(user_id).await
576    }
577
578    async fn get_presence_events(
579        &self,
580        user_ids: &[OwnedUserId],
581    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
582        (*self).get_presence_events(user_ids).await
583    }
584
585    async fn get_state_event(
586        &self,
587        room_id: &RoomId,
588        event_type: StateEventType,
589        state_key: &str,
590    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
591        (*self).get_state_event(room_id, event_type, state_key).await
592    }
593
594    async fn get_state_events(
595        &self,
596        room_id: &RoomId,
597        event_type: StateEventType,
598    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
599        (*self).get_state_events(room_id, event_type).await
600    }
601
602    async fn get_state_events_for_keys(
603        &self,
604        room_id: &RoomId,
605        event_type: StateEventType,
606        state_keys: &[&str],
607    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
608        (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
609    }
610
611    async fn get_profile(
612        &self,
613        room_id: &RoomId,
614        user_id: &UserId,
615    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
616        (*self).get_profile(room_id, user_id).await
617    }
618
619    async fn get_profiles<'a>(
620        &self,
621        room_id: &RoomId,
622        user_ids: &'a [OwnedUserId],
623    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
624        (*self).get_profiles(room_id, user_ids).await
625    }
626
627    async fn get_user_ids(
628        &self,
629        room_id: &RoomId,
630        memberships: RoomMemberships,
631    ) -> Result<Vec<OwnedUserId>, Self::Error> {
632        (*self).get_user_ids(room_id, memberships).await
633    }
634
635    async fn get_room_infos(
636        &self,
637        room_load_settings: &RoomLoadSettings,
638    ) -> Result<Vec<RoomInfo>, Self::Error> {
639        (*self).get_room_infos(room_load_settings).await
640    }
641
642    async fn get_users_with_display_name(
643        &self,
644        room_id: &RoomId,
645        display_name: &DisplayName,
646    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
647        (*self).get_users_with_display_name(room_id, display_name).await
648    }
649
650    async fn get_users_with_display_names<'a>(
651        &self,
652        room_id: &RoomId,
653        display_names: &'a [DisplayName],
654    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
655        (*self).get_users_with_display_names(room_id, display_names).await
656    }
657
658    async fn get_account_data_event(
659        &self,
660        event_type: GlobalAccountDataEventType,
661    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
662        (*self).get_account_data_event(event_type).await
663    }
664
665    async fn get_room_account_data_event(
666        &self,
667        room_id: &RoomId,
668        event_type: RoomAccountDataEventType,
669    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
670        (*self).get_room_account_data_event(room_id, event_type).await
671    }
672
673    async fn get_user_room_receipt_event(
674        &self,
675        room_id: &RoomId,
676        receipt_type: ReceiptType,
677        thread: ReceiptThread,
678        user_id: &UserId,
679    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
680        (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
681    }
682
683    async fn get_event_room_receipt_events(
684        &self,
685        room_id: &RoomId,
686        receipt_type: ReceiptType,
687        thread: ReceiptThread,
688        event_id: &EventId,
689    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
690        (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
691    }
692
693    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
694        (*self).get_custom_value(key).await
695    }
696
697    async fn set_custom_value(
698        &self,
699        key: &[u8],
700        value: Vec<u8>,
701    ) -> Result<Option<Vec<u8>>, Self::Error> {
702        (*self).set_custom_value(key, value).await
703    }
704
705    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
706        (*self).remove_custom_value(key).await
707    }
708
709    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
710        (*self).remove_room(room_id).await
711    }
712
713    async fn save_send_queue_request(
714        &self,
715        room_id: &RoomId,
716        transaction_id: OwnedTransactionId,
717        created_at: MilliSecondsSinceUnixEpoch,
718        request: QueuedRequestKind,
719        priority: usize,
720    ) -> Result<(), Self::Error> {
721        (*self)
722            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
723            .await
724    }
725
726    async fn update_send_queue_request(
727        &self,
728        room_id: &RoomId,
729        transaction_id: &TransactionId,
730        content: QueuedRequestKind,
731    ) -> Result<bool, Self::Error> {
732        (*self).update_send_queue_request(room_id, transaction_id, content).await
733    }
734
735    async fn remove_send_queue_request(
736        &self,
737        room_id: &RoomId,
738        transaction_id: &TransactionId,
739    ) -> Result<bool, Self::Error> {
740        (*self).remove_send_queue_request(room_id, transaction_id).await
741    }
742
743    async fn load_send_queue_requests(
744        &self,
745        room_id: &RoomId,
746    ) -> Result<Vec<QueuedRequest>, Self::Error> {
747        (*self).load_send_queue_requests(room_id).await
748    }
749
750    async fn update_send_queue_request_status(
751        &self,
752        room_id: &RoomId,
753        transaction_id: &TransactionId,
754        error: Option<QueueWedgeError>,
755    ) -> Result<(), Self::Error> {
756        (*self).update_send_queue_request_status(room_id, transaction_id, error).await
757    }
758
759    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
760        (*self).load_rooms_with_unsent_requests().await
761    }
762
763    async fn save_dependent_queued_request(
764        &self,
765        room_id: &RoomId,
766        parent_txn_id: &TransactionId,
767        own_txn_id: ChildTransactionId,
768        created_at: MilliSecondsSinceUnixEpoch,
769        content: DependentQueuedRequestKind,
770    ) -> Result<(), Self::Error> {
771        (*self)
772            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
773            .await
774    }
775
776    async fn mark_dependent_queued_requests_as_ready(
777        &self,
778        room_id: &RoomId,
779        parent_txn_id: &TransactionId,
780        sent_parent_key: SentRequestKey,
781    ) -> Result<usize, Self::Error> {
782        (*self)
783            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
784            .await
785    }
786
787    async fn update_dependent_queued_request(
788        &self,
789        room_id: &RoomId,
790        own_transaction_id: &ChildTransactionId,
791        new_content: DependentQueuedRequestKind,
792    ) -> Result<bool, Self::Error> {
793        (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
794    }
795
796    async fn remove_dependent_queued_request(
797        &self,
798        room: &RoomId,
799        own_txn_id: &ChildTransactionId,
800    ) -> Result<bool, Self::Error> {
801        (*self).remove_dependent_queued_request(room, own_txn_id).await
802    }
803
804    async fn load_dependent_queued_requests(
805        &self,
806        room: &RoomId,
807    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
808        (*self).load_dependent_queued_requests(room).await
809    }
810
811    async fn upsert_thread_subscriptions(
812        &self,
813        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
814    ) -> Result<(), Self::Error> {
815        (*self).upsert_thread_subscriptions(updates).await
816    }
817
818    async fn remove_thread_subscription(
819        &self,
820        room: &RoomId,
821        thread_id: &EventId,
822    ) -> Result<(), Self::Error> {
823        (*self).remove_thread_subscription(room, thread_id).await
824    }
825
826    async fn load_thread_subscription(
827        &self,
828        room: &RoomId,
829        thread_id: &EventId,
830    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
831        (*self).load_thread_subscription(room, thread_id).await
832    }
833
834    async fn close(&self) -> Result<(), Self::Error> {
835        (*self).close().await
836    }
837
838    async fn reopen(&self) -> Result<(), Self::Error> {
839        (*self).reopen().await
840    }
841
842    async fn optimize(&self) -> Result<(), Self::Error> {
843        (*self).optimize().await
844    }
845
846    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
847        (*self).get_size().await
848    }
849}
850
851#[cfg_attr(target_family = "wasm", async_trait(?Send))]
852#[cfg_attr(not(target_family = "wasm"), async_trait)]
853impl<T: StateStore + ?Sized> StateStore for Arc<T> {
854    type Error = T::Error;
855
856    async fn get_kv_data(
857        &self,
858        key: StateStoreDataKey<'_>,
859    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
860        self.deref().get_kv_data(key).await
861    }
862
863    async fn set_kv_data(
864        &self,
865        key: StateStoreDataKey<'_>,
866        value: StateStoreDataValue,
867    ) -> Result<(), Self::Error> {
868        self.deref().set_kv_data(key, value).await
869    }
870
871    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
872        self.deref().remove_kv_data(key).await
873    }
874
875    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
876        self.deref().save_changes(changes).await
877    }
878
879    async fn get_presence_event(
880        &self,
881        user_id: &UserId,
882    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
883        self.deref().get_presence_event(user_id).await
884    }
885
886    async fn get_presence_events(
887        &self,
888        user_ids: &[OwnedUserId],
889    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
890        self.deref().get_presence_events(user_ids).await
891    }
892
893    async fn get_state_event(
894        &self,
895        room_id: &RoomId,
896        event_type: StateEventType,
897        state_key: &str,
898    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
899        self.deref().get_state_event(room_id, event_type, state_key).await
900    }
901
902    async fn get_state_events(
903        &self,
904        room_id: &RoomId,
905        event_type: StateEventType,
906    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
907        self.deref().get_state_events(room_id, event_type).await
908    }
909
910    async fn get_state_events_for_keys(
911        &self,
912        room_id: &RoomId,
913        event_type: StateEventType,
914        state_keys: &[&str],
915    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
916        self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
917    }
918
919    async fn get_profile(
920        &self,
921        room_id: &RoomId,
922        user_id: &UserId,
923    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
924        self.deref().get_profile(room_id, user_id).await
925    }
926
927    async fn get_profiles<'a>(
928        &self,
929        room_id: &RoomId,
930        user_ids: &'a [OwnedUserId],
931    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
932        self.deref().get_profiles(room_id, user_ids).await
933    }
934
935    async fn get_user_ids(
936        &self,
937        room_id: &RoomId,
938        memberships: RoomMemberships,
939    ) -> Result<Vec<OwnedUserId>, Self::Error> {
940        self.deref().get_user_ids(room_id, memberships).await
941    }
942
943    async fn get_room_infos(
944        &self,
945        room_load_settings: &RoomLoadSettings,
946    ) -> Result<Vec<RoomInfo>, Self::Error> {
947        self.deref().get_room_infos(room_load_settings).await
948    }
949
950    async fn get_users_with_display_name(
951        &self,
952        room_id: &RoomId,
953        display_name: &DisplayName,
954    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
955        self.deref().get_users_with_display_name(room_id, display_name).await
956    }
957
958    async fn get_users_with_display_names<'a>(
959        &self,
960        room_id: &RoomId,
961        display_names: &'a [DisplayName],
962    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
963        self.deref().get_users_with_display_names(room_id, display_names).await
964    }
965
966    async fn get_account_data_event(
967        &self,
968        event_type: GlobalAccountDataEventType,
969    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
970        self.deref().get_account_data_event(event_type).await
971    }
972
973    async fn get_room_account_data_event(
974        &self,
975        room_id: &RoomId,
976        event_type: RoomAccountDataEventType,
977    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
978        self.deref().get_room_account_data_event(room_id, event_type).await
979    }
980
981    async fn get_user_room_receipt_event(
982        &self,
983        room_id: &RoomId,
984        receipt_type: ReceiptType,
985        thread: ReceiptThread,
986        user_id: &UserId,
987    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
988        self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
989    }
990
991    async fn get_event_room_receipt_events(
992        &self,
993        room_id: &RoomId,
994        receipt_type: ReceiptType,
995        thread: ReceiptThread,
996        event_id: &EventId,
997    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
998        self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
999    }
1000
1001    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1002        self.deref().get_custom_value(key).await
1003    }
1004
1005    async fn set_custom_value(
1006        &self,
1007        key: &[u8],
1008        value: Vec<u8>,
1009    ) -> Result<Option<Vec<u8>>, Self::Error> {
1010        self.deref().set_custom_value(key, value).await
1011    }
1012
1013    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1014        self.deref().remove_custom_value(key).await
1015    }
1016
1017    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1018        self.deref().remove_room(room_id).await
1019    }
1020
1021    async fn save_send_queue_request(
1022        &self,
1023        room_id: &RoomId,
1024        transaction_id: OwnedTransactionId,
1025        created_at: MilliSecondsSinceUnixEpoch,
1026        request: QueuedRequestKind,
1027        priority: usize,
1028    ) -> Result<(), Self::Error> {
1029        self.deref()
1030            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1031            .await
1032    }
1033
1034    async fn update_send_queue_request(
1035        &self,
1036        room_id: &RoomId,
1037        transaction_id: &TransactionId,
1038        content: QueuedRequestKind,
1039    ) -> Result<bool, Self::Error> {
1040        self.deref().update_send_queue_request(room_id, transaction_id, content).await
1041    }
1042
1043    async fn remove_send_queue_request(
1044        &self,
1045        room_id: &RoomId,
1046        transaction_id: &TransactionId,
1047    ) -> Result<bool, Self::Error> {
1048        self.deref().remove_send_queue_request(room_id, transaction_id).await
1049    }
1050
1051    async fn load_send_queue_requests(
1052        &self,
1053        room_id: &RoomId,
1054    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1055        self.deref().load_send_queue_requests(room_id).await
1056    }
1057
1058    async fn update_send_queue_request_status(
1059        &self,
1060        room_id: &RoomId,
1061        transaction_id: &TransactionId,
1062        error: Option<QueueWedgeError>,
1063    ) -> Result<(), Self::Error> {
1064        self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1065    }
1066
1067    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1068        self.deref().load_rooms_with_unsent_requests().await
1069    }
1070
1071    async fn save_dependent_queued_request(
1072        &self,
1073        room_id: &RoomId,
1074        parent_txn_id: &TransactionId,
1075        own_txn_id: ChildTransactionId,
1076        created_at: MilliSecondsSinceUnixEpoch,
1077        content: DependentQueuedRequestKind,
1078    ) -> Result<(), Self::Error> {
1079        self.deref()
1080            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1081            .await
1082    }
1083
1084    async fn mark_dependent_queued_requests_as_ready(
1085        &self,
1086        room_id: &RoomId,
1087        parent_txn_id: &TransactionId,
1088        sent_parent_key: SentRequestKey,
1089    ) -> Result<usize, Self::Error> {
1090        self.deref()
1091            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1092            .await
1093    }
1094
1095    async fn update_dependent_queued_request(
1096        &self,
1097        room_id: &RoomId,
1098        own_transaction_id: &ChildTransactionId,
1099        new_content: DependentQueuedRequestKind,
1100    ) -> Result<bool, Self::Error> {
1101        self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1102    }
1103
1104    async fn remove_dependent_queued_request(
1105        &self,
1106        room: &RoomId,
1107        own_txn_id: &ChildTransactionId,
1108    ) -> Result<bool, Self::Error> {
1109        self.deref().remove_dependent_queued_request(room, own_txn_id).await
1110    }
1111
1112    async fn load_dependent_queued_requests(
1113        &self,
1114        room: &RoomId,
1115    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1116        self.deref().load_dependent_queued_requests(room).await
1117    }
1118
1119    async fn upsert_thread_subscriptions(
1120        &self,
1121        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1122    ) -> Result<(), Self::Error> {
1123        self.deref().upsert_thread_subscriptions(updates).await
1124    }
1125
1126    async fn remove_thread_subscription(
1127        &self,
1128        room: &RoomId,
1129        thread_id: &EventId,
1130    ) -> Result<(), Self::Error> {
1131        self.deref().remove_thread_subscription(room, thread_id).await
1132    }
1133
1134    async fn load_thread_subscription(
1135        &self,
1136        room: &RoomId,
1137        thread_id: &EventId,
1138    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1139        self.deref().load_thread_subscription(room, thread_id).await
1140    }
1141
1142    async fn close(&self) -> Result<(), Self::Error> {
1143        self.deref().close().await
1144    }
1145
1146    async fn reopen(&self) -> Result<(), Self::Error> {
1147        self.deref().reopen().await
1148    }
1149
1150    async fn optimize(&self) -> Result<(), Self::Error> {
1151        self.deref().optimize().await
1152    }
1153
1154    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1155        self.deref().get_size().await
1156    }
1157}
1158
/// Newtype around a state store `T`.
///
/// The [`StateStore`] implementation for this wrapper forwards every call to
/// the wrapped store and converts the store's error into a [`StoreError`].
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

// The wrapper contributes nothing to the debug output: formatting is handed
// to the wrapped value together with the caller's formatter (and its flags).
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
1168
1169#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1170#[cfg_attr(not(target_family = "wasm"), async_trait)]
1171impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1172    type Error = StoreError;
1173
1174    async fn get_kv_data(
1175        &self,
1176        key: StateStoreDataKey<'_>,
1177    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1178        self.0.get_kv_data(key).await.map_err(Into::into)
1179    }
1180
1181    async fn set_kv_data(
1182        &self,
1183        key: StateStoreDataKey<'_>,
1184        value: StateStoreDataValue,
1185    ) -> Result<(), Self::Error> {
1186        self.0.set_kv_data(key, value).await.map_err(Into::into)
1187    }
1188
1189    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1190        self.0.remove_kv_data(key).await.map_err(Into::into)
1191    }
1192
1193    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1194        self.0.save_changes(changes).await.map_err(Into::into)
1195    }
1196
1197    async fn get_presence_event(
1198        &self,
1199        user_id: &UserId,
1200    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1201        self.0.get_presence_event(user_id).await.map_err(Into::into)
1202    }
1203
1204    async fn get_presence_events(
1205        &self,
1206        user_ids: &[OwnedUserId],
1207    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1208        self.0.get_presence_events(user_ids).await.map_err(Into::into)
1209    }
1210
1211    async fn get_state_event(
1212        &self,
1213        room_id: &RoomId,
1214        event_type: StateEventType,
1215        state_key: &str,
1216    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1217        self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1218    }
1219
1220    async fn get_state_events(
1221        &self,
1222        room_id: &RoomId,
1223        event_type: StateEventType,
1224    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1225        self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1226    }
1227
1228    async fn get_state_events_for_keys(
1229        &self,
1230        room_id: &RoomId,
1231        event_type: StateEventType,
1232        state_keys: &[&str],
1233    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1234        self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1235    }
1236
1237    async fn get_profile(
1238        &self,
1239        room_id: &RoomId,
1240        user_id: &UserId,
1241    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1242        self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1243    }
1244
1245    async fn get_profiles<'a>(
1246        &self,
1247        room_id: &RoomId,
1248        user_ids: &'a [OwnedUserId],
1249    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1250        self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1251    }
1252
1253    async fn get_user_ids(
1254        &self,
1255        room_id: &RoomId,
1256        memberships: RoomMemberships,
1257    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1258        self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1259    }
1260
1261    async fn get_room_infos(
1262        &self,
1263        room_load_settings: &RoomLoadSettings,
1264    ) -> Result<Vec<RoomInfo>, Self::Error> {
1265        self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1266    }
1267
1268    async fn get_users_with_display_name(
1269        &self,
1270        room_id: &RoomId,
1271        display_name: &DisplayName,
1272    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1273        self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1274    }
1275
1276    async fn get_users_with_display_names<'a>(
1277        &self,
1278        room_id: &RoomId,
1279        display_names: &'a [DisplayName],
1280    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1281        self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1282    }
1283
1284    async fn get_account_data_event(
1285        &self,
1286        event_type: GlobalAccountDataEventType,
1287    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1288        self.0.get_account_data_event(event_type).await.map_err(Into::into)
1289    }
1290
1291    async fn get_room_account_data_event(
1292        &self,
1293        room_id: &RoomId,
1294        event_type: RoomAccountDataEventType,
1295    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1296        self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1297    }
1298
1299    async fn get_user_room_receipt_event(
1300        &self,
1301        room_id: &RoomId,
1302        receipt_type: ReceiptType,
1303        thread: ReceiptThread,
1304        user_id: &UserId,
1305    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1306        self.0
1307            .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1308            .await
1309            .map_err(Into::into)
1310    }
1311
1312    async fn get_event_room_receipt_events(
1313        &self,
1314        room_id: &RoomId,
1315        receipt_type: ReceiptType,
1316        thread: ReceiptThread,
1317        event_id: &EventId,
1318    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1319        self.0
1320            .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1321            .await
1322            .map_err(Into::into)
1323    }
1324
1325    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1326        self.0.get_custom_value(key).await.map_err(Into::into)
1327    }
1328
1329    async fn set_custom_value(
1330        &self,
1331        key: &[u8],
1332        value: Vec<u8>,
1333    ) -> Result<Option<Vec<u8>>, Self::Error> {
1334        self.0.set_custom_value(key, value).await.map_err(Into::into)
1335    }
1336
1337    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1338        self.0.remove_custom_value(key).await.map_err(Into::into)
1339    }
1340
1341    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1342        self.0.remove_room(room_id).await.map_err(Into::into)
1343    }
1344
1345    async fn save_send_queue_request(
1346        &self,
1347        room_id: &RoomId,
1348        transaction_id: OwnedTransactionId,
1349        created_at: MilliSecondsSinceUnixEpoch,
1350        content: QueuedRequestKind,
1351        priority: usize,
1352    ) -> Result<(), Self::Error> {
1353        self.0
1354            .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1355            .await
1356            .map_err(Into::into)
1357    }
1358
1359    async fn update_send_queue_request(
1360        &self,
1361        room_id: &RoomId,
1362        transaction_id: &TransactionId,
1363        content: QueuedRequestKind,
1364    ) -> Result<bool, Self::Error> {
1365        self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1366    }
1367
1368    async fn remove_send_queue_request(
1369        &self,
1370        room_id: &RoomId,
1371        transaction_id: &TransactionId,
1372    ) -> Result<bool, Self::Error> {
1373        self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1374    }
1375
1376    async fn load_send_queue_requests(
1377        &self,
1378        room_id: &RoomId,
1379    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1380        self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1381    }
1382
1383    async fn update_send_queue_request_status(
1384        &self,
1385        room_id: &RoomId,
1386        transaction_id: &TransactionId,
1387        error: Option<QueueWedgeError>,
1388    ) -> Result<(), Self::Error> {
1389        self.0
1390            .update_send_queue_request_status(room_id, transaction_id, error)
1391            .await
1392            .map_err(Into::into)
1393    }
1394
1395    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1396        self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1397    }
1398
1399    async fn save_dependent_queued_request(
1400        &self,
1401        room_id: &RoomId,
1402        parent_txn_id: &TransactionId,
1403        own_txn_id: ChildTransactionId,
1404        created_at: MilliSecondsSinceUnixEpoch,
1405        content: DependentQueuedRequestKind,
1406    ) -> Result<(), Self::Error> {
1407        self.0
1408            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1409            .await
1410            .map_err(Into::into)
1411    }
1412
1413    async fn mark_dependent_queued_requests_as_ready(
1414        &self,
1415        room_id: &RoomId,
1416        parent_txn_id: &TransactionId,
1417        sent_parent_key: SentRequestKey,
1418    ) -> Result<usize, Self::Error> {
1419        self.0
1420            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1421            .await
1422            .map_err(Into::into)
1423    }
1424
1425    async fn remove_dependent_queued_request(
1426        &self,
1427        room_id: &RoomId,
1428        own_txn_id: &ChildTransactionId,
1429    ) -> Result<bool, Self::Error> {
1430        self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1431    }
1432
1433    async fn load_dependent_queued_requests(
1434        &self,
1435        room_id: &RoomId,
1436    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1437        self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1438    }
1439
1440    async fn update_dependent_queued_request(
1441        &self,
1442        room_id: &RoomId,
1443        own_transaction_id: &ChildTransactionId,
1444        new_content: DependentQueuedRequestKind,
1445    ) -> Result<bool, Self::Error> {
1446        self.0
1447            .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1448            .await
1449            .map_err(Into::into)
1450    }
1451
1452    async fn upsert_thread_subscriptions(
1453        &self,
1454        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1455    ) -> Result<(), Self::Error> {
1456        self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1457    }
1458
1459    async fn load_thread_subscription(
1460        &self,
1461        room: &RoomId,
1462        thread_id: &EventId,
1463    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1464        self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1465    }
1466
1467    async fn remove_thread_subscription(
1468        &self,
1469        room: &RoomId,
1470        thread_id: &EventId,
1471    ) -> Result<(), Self::Error> {
1472        self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1473    }
1474
1475    async fn close(&self) -> Result<(), Self::Error> {
1476        self.0.close().await.map_err(Into::into)
1477    }
1478
1479    async fn reopen(&self) -> Result<(), Self::Error> {
1480        self.0.reopen().await.map_err(Into::into)
1481    }
1482
1483    async fn optimize(&self) -> Result<(), Self::Error> {
1484        self.0.optimize().await.map_err(Into::into)
1485    }
1486
1487    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1488        self.0.get_size().await.map_err(Into::into)
1489    }
1490}
1491
1492/// A wrapper around a [`StateStore`] that supports synchronizing calls to
1493/// [`StateStore::save_changes`].
1494#[derive(Debug, Clone)]
1495pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
1496    store: T,
1497    lock: Arc<Mutex<()>>,
1498}
1499
1500/// An error type that represents a scenario where a [`MutexGuard`] provided to
1501/// a function does not reference the underlying [`Mutex`] in the enclosing
1502/// [`SaveLockedStateStore`].
1503#[derive(Debug, Error)]
1504#[error("a mutex guard was provided, but it does not reference the correct mutex")]
1505pub struct IncorrectMutexGuardError;
1506
1507impl From<IncorrectMutexGuardError> for StoreError {
1508    fn from(value: IncorrectMutexGuardError) -> Self {
1509        Self::backend(value)
1510    }
1511}
1512
1513impl<T> SaveLockedStateStore<T> {
1514    /// Creates a new [`SaveLockedStateStore`] with the provided store.
1515    pub fn new(store: T) -> Self {
1516        Self { store, lock: Arc::new(Mutex::new(())) }
1517    }
1518
1519    /// Returns a reference to the underlying [`Mutex`] used to synchronize
1520    /// calls to [`StateStore::save_changes`].
1521    pub fn lock(&self) -> &Mutex<()> {
1522        self.lock.as_ref()
1523    }
1524}
1525
1526impl<T: StateStore> SaveLockedStateStore<T> {
1527    /// Provides a means of calling [`StateStore::save_changes`] when the caller
1528    /// has already acquired the underlying [`Mutex`]. Returns an error if
1529    /// the [`MutexGuard`] provided does not reference the underlying
1530    /// [`Mutex`].
1531    pub async fn save_changes_with_guard(
1532        &self,
1533        guard: &MutexGuard<'_, ()>,
1534        changes: &StateChanges,
1535    ) -> Result<(), StoreError> {
1536        if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1537            Err(IncorrectMutexGuardError.into())
1538        } else {
1539            self.store.save_changes(changes).await.map_err(Into::into)
1540        }
1541    }
1542}
1543
1544#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1545#[cfg_attr(not(target_family = "wasm"), async_trait)]
1546impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1547    type Error = T::Error;
1548
1549    async fn get_kv_data(
1550        &self,
1551        key: StateStoreDataKey<'_>,
1552    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1553        self.store.get_kv_data(key).await
1554    }
1555
1556    async fn set_kv_data(
1557        &self,
1558        key: StateStoreDataKey<'_>,
1559        value: StateStoreDataValue,
1560    ) -> Result<(), Self::Error> {
1561        self.store.set_kv_data(key, value).await
1562    }
1563
1564    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1565        self.store.remove_kv_data(key).await
1566    }
1567
1568    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1569        let _guard = self.lock.lock().await;
1570        self.store.save_changes(changes).await
1571    }
1572
1573    async fn get_presence_event(
1574        &self,
1575        user_id: &UserId,
1576    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1577        self.store.get_presence_event(user_id).await
1578    }
1579
1580    async fn get_presence_events(
1581        &self,
1582        user_ids: &[OwnedUserId],
1583    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1584        self.store.get_presence_events(user_ids).await
1585    }
1586
1587    async fn get_state_event(
1588        &self,
1589        room_id: &RoomId,
1590        event_type: StateEventType,
1591        state_key: &str,
1592    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1593        self.store.get_state_event(room_id, event_type, state_key).await
1594    }
1595
1596    async fn get_state_events(
1597        &self,
1598        room_id: &RoomId,
1599        event_type: StateEventType,
1600    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1601        self.store.get_state_events(room_id, event_type).await
1602    }
1603
1604    async fn get_state_events_for_keys(
1605        &self,
1606        room_id: &RoomId,
1607        event_type: StateEventType,
1608        state_keys: &[&str],
1609    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1610        self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1611    }
1612
1613    async fn get_profile(
1614        &self,
1615        room_id: &RoomId,
1616        user_id: &UserId,
1617    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1618        self.store.get_profile(room_id, user_id).await
1619    }
1620
1621    async fn get_profiles<'a>(
1622        &self,
1623        room_id: &RoomId,
1624        user_ids: &'a [OwnedUserId],
1625    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1626        self.store.get_profiles(room_id, user_ids).await
1627    }
1628
1629    async fn get_user_ids(
1630        &self,
1631        room_id: &RoomId,
1632        memberships: RoomMemberships,
1633    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1634        self.store.get_user_ids(room_id, memberships).await
1635    }
1636
1637    async fn get_room_infos(
1638        &self,
1639        room_load_settings: &RoomLoadSettings,
1640    ) -> Result<Vec<RoomInfo>, Self::Error> {
1641        self.store.get_room_infos(room_load_settings).await
1642    }
1643
1644    async fn get_users_with_display_name(
1645        &self,
1646        room_id: &RoomId,
1647        display_name: &DisplayName,
1648    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1649        self.store.get_users_with_display_name(room_id, display_name).await
1650    }
1651
1652    async fn get_users_with_display_names<'a>(
1653        &self,
1654        room_id: &RoomId,
1655        display_names: &'a [DisplayName],
1656    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1657        self.store.get_users_with_display_names(room_id, display_names).await
1658    }
1659
1660    async fn get_account_data_event(
1661        &self,
1662        event_type: GlobalAccountDataEventType,
1663    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1664        self.store.get_account_data_event(event_type).await
1665    }
1666
1667    async fn get_room_account_data_event(
1668        &self,
1669        room_id: &RoomId,
1670        event_type: RoomAccountDataEventType,
1671    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1672        self.store.get_room_account_data_event(room_id, event_type).await
1673    }
1674
1675    async fn get_user_room_receipt_event(
1676        &self,
1677        room_id: &RoomId,
1678        receipt_type: ReceiptType,
1679        thread: ReceiptThread,
1680        user_id: &UserId,
1681    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1682        self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1683    }
1684
1685    async fn get_event_room_receipt_events(
1686        &self,
1687        room_id: &RoomId,
1688        receipt_type: ReceiptType,
1689        thread: ReceiptThread,
1690        event_id: &EventId,
1691    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1692        self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1693    }
1694
1695    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1696        self.store.get_custom_value(key).await
1697    }
1698
1699    async fn set_custom_value(
1700        &self,
1701        key: &[u8],
1702        value: Vec<u8>,
1703    ) -> Result<Option<Vec<u8>>, Self::Error> {
1704        self.store.set_custom_value(key, value).await
1705    }
1706
1707    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1708        self.store.remove_custom_value(key).await
1709    }
1710
1711    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1712        self.store.remove_room(room_id).await
1713    }
1714
1715    async fn save_send_queue_request(
1716        &self,
1717        room_id: &RoomId,
1718        transaction_id: OwnedTransactionId,
1719        created_at: MilliSecondsSinceUnixEpoch,
1720        request: QueuedRequestKind,
1721        priority: usize,
1722    ) -> Result<(), Self::Error> {
1723        self.store
1724            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1725            .await
1726    }
1727
1728    async fn update_send_queue_request(
1729        &self,
1730        room_id: &RoomId,
1731        transaction_id: &TransactionId,
1732        content: QueuedRequestKind,
1733    ) -> Result<bool, Self::Error> {
1734        self.store.update_send_queue_request(room_id, transaction_id, content).await
1735    }
1736
1737    async fn remove_send_queue_request(
1738        &self,
1739        room_id: &RoomId,
1740        transaction_id: &TransactionId,
1741    ) -> Result<bool, Self::Error> {
1742        self.store.remove_send_queue_request(room_id, transaction_id).await
1743    }
1744
1745    async fn load_send_queue_requests(
1746        &self,
1747        room_id: &RoomId,
1748    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1749        self.store.load_send_queue_requests(room_id).await
1750    }
1751
1752    async fn update_send_queue_request_status(
1753        &self,
1754        room_id: &RoomId,
1755        transaction_id: &TransactionId,
1756        error: Option<QueueWedgeError>,
1757    ) -> Result<(), Self::Error> {
1758        self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1759    }
1760
1761    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1762        self.store.load_rooms_with_unsent_requests().await
1763    }
1764
1765    async fn save_dependent_queued_request(
1766        &self,
1767        room_id: &RoomId,
1768        parent_txn_id: &TransactionId,
1769        own_txn_id: ChildTransactionId,
1770        created_at: MilliSecondsSinceUnixEpoch,
1771        content: DependentQueuedRequestKind,
1772    ) -> Result<(), Self::Error> {
1773        self.store
1774            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1775            .await
1776    }
1777
1778    async fn mark_dependent_queued_requests_as_ready(
1779        &self,
1780        room_id: &RoomId,
1781        parent_txn_id: &TransactionId,
1782        sent_parent_key: SentRequestKey,
1783    ) -> Result<usize, Self::Error> {
1784        self.store
1785            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1786            .await
1787    }
1788
1789    async fn update_dependent_queued_request(
1790        &self,
1791        room_id: &RoomId,
1792        own_transaction_id: &ChildTransactionId,
1793        new_content: DependentQueuedRequestKind,
1794    ) -> Result<bool, Self::Error> {
1795        self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1796    }
1797
1798    async fn remove_dependent_queued_request(
1799        &self,
1800        room: &RoomId,
1801        own_txn_id: &ChildTransactionId,
1802    ) -> Result<bool, Self::Error> {
1803        self.store.remove_dependent_queued_request(room, own_txn_id).await
1804    }
1805
1806    async fn load_dependent_queued_requests(
1807        &self,
1808        room: &RoomId,
1809    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1810        self.store.load_dependent_queued_requests(room).await
1811    }
1812
1813    async fn upsert_thread_subscriptions(
1814        &self,
1815        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1816    ) -> Result<(), Self::Error> {
1817        self.store.upsert_thread_subscriptions(updates).await
1818    }
1819
1820    async fn load_thread_subscription(
1821        &self,
1822        room: &RoomId,
1823        thread_id: &EventId,
1824    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1825        self.store.load_thread_subscription(room, thread_id).await
1826    }
1827
1828    async fn remove_thread_subscription(
1829        &self,
1830        room: &RoomId,
1831        thread_id: &EventId,
1832    ) -> Result<(), Self::Error> {
1833        self.store.remove_thread_subscription(room, thread_id).await
1834    }
1835
1836    async fn close(&self) -> Result<(), Self::Error> {
1837        self.store.close().await
1838    }
1839
1840    async fn reopen(&self) -> Result<(), Self::Error> {
1841        self.store.reopen().await
1842    }
1843
1844    async fn optimize(&self) -> Result<(), Self::Error> {
1845        self.store.optimize().await
1846    }
1847
1848    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1849        self.store.get_size().await
1850    }
1851}
1852
1853/// Convenience functionality for state stores.
1854#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1855#[cfg_attr(not(target_family = "wasm"), async_trait)]
1856pub trait StateStoreExt: StateStore {
1857    /// Get a specific state event of statically-known type.
1858    ///
1859    /// # Arguments
1860    ///
1861    /// * `room_id` - The id of the room the state event was received for.
1862    async fn get_state_event_static<C>(
1863        &self,
1864        room_id: &RoomId,
1865    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1866    where
1867        C: StaticEventContent<IsPrefix = ruma::events::False>
1868            + StaticStateEventContent<StateKey = EmptyStateKey>
1869            + RedactContent,
1870        C::Redacted: RedactedStateEventContent,
1871    {
1872        Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1873    }
1874
1875    /// Get a specific state event of statically-known type.
1876    ///
1877    /// # Arguments
1878    ///
1879    /// * `room_id` - The id of the room the state event was received for.
1880    async fn get_state_event_static_for_key<C, K>(
1881        &self,
1882        room_id: &RoomId,
1883        state_key: &K,
1884    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1885    where
1886        C: StaticEventContent<IsPrefix = ruma::events::False>
1887            + StaticStateEventContent
1888            + RedactContent,
1889        C::StateKey: Borrow<K>,
1890        C::Redacted: RedactedStateEventContent,
1891        K: AsRef<str> + ?Sized + Sync,
1892    {
1893        Ok(self
1894            .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1895            .await?
1896            .map(|raw| raw.cast()))
1897    }
1898
1899    /// Get a list of state events of a statically-known type for a given room.
1900    ///
1901    /// # Arguments
1902    ///
1903    /// * `room_id` - The id of the room to find events for.
1904    async fn get_state_events_static<C>(
1905        &self,
1906        room_id: &RoomId,
1907    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1908    where
1909        C: StaticEventContent<IsPrefix = ruma::events::False>
1910            + StaticStateEventContent
1911            + RedactContent,
1912        C::Redacted: RedactedStateEventContent,
1913    {
1914        // FIXME: Could be more efficient, if we had streaming store accessor functions
1915        Ok(self
1916            .get_state_events(room_id, C::TYPE.into())
1917            .await?
1918            .into_iter()
1919            .map(|raw| raw.cast())
1920            .collect())
1921    }
1922
1923    /// Get a list of state events of a statically-known type for a given room
1924    /// and given state keys.
1925    ///
1926    /// # Arguments
1927    ///
1928    /// * `room_id` - The id of the room to find events for.
1929    ///
1930    /// * `state_keys` - The list of state keys to find.
1931    async fn get_state_events_for_keys_static<'a, C, K, I>(
1932        &self,
1933        room_id: &RoomId,
1934        state_keys: I,
1935    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1936    where
1937        C: StaticEventContent<IsPrefix = ruma::events::False>
1938            + StaticStateEventContent
1939            + RedactContent,
1940        C::StateKey: Borrow<K>,
1941        C::Redacted: RedactedStateEventContent,
1942        K: AsRef<str> + Sized + Sync + 'a,
1943        I: IntoIterator<Item = &'a K> + Send,
1944        I::IntoIter: Send,
1945    {
1946        Ok(self
1947            .get_state_events_for_keys(
1948                room_id,
1949                C::TYPE.into(),
1950                &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1951            )
1952            .await?
1953            .into_iter()
1954            .map(|raw| raw.cast())
1955            .collect())
1956    }
1957
1958    /// Get an event of a statically-known type from the account data store.
1959    async fn get_account_data_event_static<C>(
1960        &self,
1961    ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1962    where
1963        C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1964    {
1965        Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1966    }
1967
1968    /// Get an event of a statically-known type from the room account data
1969    /// store.
1970    ///
1971    /// # Arguments
1972    ///
1973    /// * `room_id` - The id of the room for which the room account data event
1974    ///   should be fetched.
1975    async fn get_room_account_data_event_static<C>(
1976        &self,
1977        room_id: &RoomId,
1978    ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1979    where
1980        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1981    {
1982        Ok(self
1983            .get_room_account_data_event(room_id, C::TYPE.into())
1984            .await?
1985            .map(Raw::cast_unchecked))
1986    }
1987
1988    /// Get the `MemberEvent` for the given state key in the given room id.
1989    ///
1990    /// # Arguments
1991    ///
1992    /// * `room_id` - The room id the member event belongs to.
1993    ///
1994    /// * `state_key` - The user id that the member event defines the state for.
1995    async fn get_member_event(
1996        &self,
1997        room_id: &RoomId,
1998        state_key: &UserId,
1999    ) -> Result<Option<RawMemberEvent>, Self::Error> {
2000        self.get_state_event_static_for_key(room_id, state_key).await
2001    }
2002}
2003
2004#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2005#[cfg_attr(not(target_family = "wasm"), async_trait)]
2006impl<T: StateStore + ?Sized> StateStoreExt for T {}
2007
/// A type-erased [`StateStore`].
///
/// The associated error type is fixed to [`StoreError`], so stores with
/// different concrete error types can sit behind the same trait object.
pub type DynStateStore = dyn StateStore<Error = StoreError>;
2010
/// A type that can be type-erased into `Arc<dyn StateStore>`.
///
/// This trait is not meant to be implemented directly outside
/// `matrix-sdk-base`, but it is automatically implemented for every sized,
/// `'static` type that implements `StateStore`.
pub trait IntoStateStore {
    /// Consume `self` and return it as a shared, type-erased state store.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
2020
2021impl<T> IntoStateStore for T
2022where
2023    T: StateStore + Sized + 'static,
2024{
2025    fn into_state_store(self) -> Arc<DynStateStore> {
2026        Arc::new(EraseStateStoreError(self))
2027    }
2028}
2029
/// Serialisable representation of get_supported_versions::Response.
///
/// Both fields are kept as un-typed strings; use
/// [`SupportedVersionsResponse::supported_versions`] to get the parsed form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// Versions supported by the remote server.
    pub versions: Vec<String>,

    /// List of unstable features and their enablement status.
    pub unstable_features: BTreeMap<String, bool>,
}
2039
2040impl SupportedVersionsResponse {
2041    /// Extracts known Matrix versions and features from the un-typed lists of
2042    /// strings.
2043    ///
2044    /// Note: Matrix versions and features that Ruma cannot parse, or does not
2045    /// know about, are discarded.
2046    pub fn supported_versions(&self) -> SupportedVersions {
2047        let mut supported_versions =
2048            SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2049
2050        // We need at least one supported version to be able to make requests, so we
2051        // default to Matrix 1.0.
2052        if supported_versions.versions.is_empty() {
2053            supported_versions.versions.insert(MatrixVersion::V1_0);
2054        }
2055
2056        supported_versions
2057    }
2058}
2059
/// A serialisable representation of discover_homeserver::Response.
///
/// It can be built from a `discover_homeserver::Response` through the `From`
/// implementation in this module.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Information about the homeserver to connect to.
    pub homeserver: HomeserverInfo,

    /// Information about the identity server to connect to.
    pub identity_server: Option<IdentityServerInfo>,

    /// Information about the tile server to use to display location data.
    pub tile_server: Option<TileServerInfo>,

    /// A list of the available MatrixRTC foci, ordered by priority.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
2075
2076impl From<discover_homeserver::Response> for WellKnownResponse {
2077    fn from(response: discover_homeserver::Response) -> Self {
2078        Self {
2079            homeserver: response.homeserver,
2080            identity_server: response.identity_server,
2081            tile_server: response.tile_server,
2082            rtc_foci: response.rtc_foci,
2083        }
2084    }
2085}
2086
/// A value for key-value data that should be persisted into the store.
///
/// The variants mirror those of [`StateStoreDataKey`] by name.
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// The sync token.
    SyncToken(String),

    /// The supported versions of the server.
    SupportedVersions(TtlValue<SupportedVersionsResponse>),

    /// The well-known information of the server.
    WellKnown(TtlValue<Option<WellKnownResponse>>),

    /// A filter with the given ID.
    Filter(String),

    /// The user avatar url.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of recently visited room identifiers for the current user.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Persistent data for
    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
    UtdHookManagerData(GrowableBloom),

    /// A unit value telling us that the client uploaded duplicate one-time
    /// keys.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft for the room.
    /// To learn more, see [`ComposerDraft`].
    ///
    /// [`ComposerDraft`]: ComposerDraft
    ComposerDraft(ComposerDraft),

    /// Knock requests marked as seen in a room, stored as a map from event ID
    /// to user ID.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// A list of tokens to continue thread subscriptions catchup.
    ///
    /// See documentation of [`ThreadSubscriptionCatchupToken`] for more
    /// details.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),

    /// The capabilities the homeserver supports or disables.
    HomeserverCapabilities(TtlValue<Capabilities>),
}
2134
/// Tokens to use when catching up on thread subscriptions.
///
/// These tokens are created when the client receives some thread subscriptions
/// from sync, but the sync indicates that there are more thread subscriptions
/// available on the server. In this case, it's expected that the client will
/// call the [MSC4308] companion endpoint to catch up (back-paginate) on
/// previous thread subscriptions.
///
/// [MSC4308]: https://github.com/matrix-org/matrix-spec-proposals/pull/4308
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The token to use as the lower bound when fetching new thread
    /// subscriptions.
    ///
    /// In sliding sync, this is the `prev_batch` value of a sliding sync
    /// response.
    pub from: String,

    /// The token to use as the upper bound when fetching new thread
    /// subscriptions, if there is one.
    ///
    /// In sliding sync, it must be set to the `pos` value of the sliding sync
    /// *request* whose response contained a `prev_batch` token.
    pub to: Option<String>,
}
2160
/// Current draft of the composer for the room.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content in plain text.
    pub plain_text: String,
    /// If the message is formatted in HTML, the HTML representation of the
    /// message.
    pub html_text: Option<String>,
    /// The type of draft.
    pub draft_type: ComposerDraftType,
    /// Attachments associated with this draft.
    ///
    /// Falls back to an empty list when the field is missing from the
    /// serialized data.
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
2175
/// An attachment stored with a composer draft.
///
/// Pairs a filename with the [`DraftAttachmentContent`] holding the data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The filename of the attachment.
    pub filename: String,
    /// The attachment content with type-specific data.
    pub content: DraftAttachmentContent,
}
2184
/// The content of a draft attachment with type-specific data.
///
/// Serialized in serde's internally tagged form: a `type` field carries the
/// variant name alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// Image attachment.
    Image {
        /// The image file data.
        data: Vec<u8>,
        /// MIME type.
        mimetype: Option<String>,
        /// File size in bytes.
        size: Option<u64>,
        /// Width in pixels.
        width: Option<u64>,
        /// Height in pixels.
        height: Option<u64>,
        /// BlurHash string.
        blurhash: Option<String>,
        /// Optional thumbnail.
        thumbnail: Option<DraftThumbnail>,
    },
    /// Video attachment.
    Video {
        /// The video file data.
        data: Vec<u8>,
        /// MIME type.
        mimetype: Option<String>,
        /// File size in bytes.
        size: Option<u64>,
        /// Width in pixels.
        width: Option<u64>,
        /// Height in pixels.
        height: Option<u64>,
        /// Duration of the video.
        duration: Option<std::time::Duration>,
        /// BlurHash string.
        blurhash: Option<String>,
        /// Optional thumbnail.
        thumbnail: Option<DraftThumbnail>,
    },
    /// Audio attachment.
    Audio {
        /// The audio file data.
        data: Vec<u8>,
        /// MIME type.
        mimetype: Option<String>,
        /// File size in bytes.
        size: Option<u64>,
        /// Duration of the audio.
        duration: Option<std::time::Duration>,
    },
    /// Generic file attachment.
    File {
        /// The file data.
        data: Vec<u8>,
        /// MIME type.
        mimetype: Option<String>,
        /// File size in bytes.
        size: Option<u64>,
    },
}
2246
/// Thumbnail data for a draft attachment.
///
/// Used by the `Image` and `Video` variants of [`DraftAttachmentContent`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The filename of the thumbnail.
    pub filename: String,
    /// The thumbnail image data.
    pub data: Vec<u8>,
    /// MIME type of the thumbnail.
    pub mimetype: Option<String>,
    /// Width in pixels.
    pub width: Option<u64>,
    /// Height in pixels.
    pub height: Option<u64>,
    /// File size in bytes.
    pub size: Option<u64>,
}
2263
/// The type of draft of the composer.
///
/// Stored in the `draft_type` field of [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// The draft is a new message.
    NewMessage,
    /// The draft is a reply to an event.
    Reply {
        /// The ID of the event being replied to.
        event_id: OwnedEventId,
    },
    /// The draft is an edit of an event.
    Edit {
        /// The ID of the event being edited.
        event_id: OwnedEventId,
    },
}
2280
/// Consuming accessors: each returns the payload of one variant as `Some`,
/// or `None` when `self` is any other variant.
impl StateStoreDataValue {
    /// Get this value if it is a sync token.
    pub fn into_sync_token(self) -> Option<String> {
        as_variant!(self, Self::SyncToken)
    }

    /// Get this value if it is a filter.
    pub fn into_filter(self) -> Option<String> {
        as_variant!(self, Self::Filter)
    }

    /// Get this value if it is a user avatar url.
    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
        as_variant!(self, Self::UserAvatarUrl)
    }

    /// Get this value if it is a list of recently visited rooms.
    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
        as_variant!(self, Self::RecentlyVisitedRooms)
    }

    /// Get this value if it is the data for the `UtdHookManager`.
    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
        as_variant!(self, Self::UtdHookManagerData)
    }

    /// Get this value if it is a composer draft.
    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
        as_variant!(self, Self::ComposerDraft)
    }

    /// Get this value if it is the supported versions metadata.
    pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
        as_variant!(self, Self::SupportedVersions)
    }

    /// Get this value if it is the well-known metadata.
    pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
        as_variant!(self, Self::WellKnown)
    }

    /// Get this value if it is the data for the seen knock requests.
    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
        as_variant!(self, Self::SeenKnockRequests)
    }

    /// Get this value if it is the data for the thread subscriptions catchup
    /// tokens.
    pub fn into_thread_subscriptions_catchup_tokens(
        self,
    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
    }

    /// Get this value if it is the data for the capabilities the homeserver
    /// supports or disables.
    pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
        as_variant!(self, Self::HomeserverCapabilities)
    }
}
2341
/// A key for key-value data.
///
/// The variants mirror those of [`StateStoreDataValue`] by name.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// The sync token.
    SyncToken,

    /// The supported versions of the server.
    SupportedVersions,

    /// The well-known information of the server.
    WellKnown,

    /// A filter with the given name.
    Filter(&'a str),

    /// Avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// Recently visited room identifiers of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// Persistent data for
    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
    UtdHookManagerData,

    /// Data remembering if the client already reported that it has uploaded
    /// duplicate one-time keys.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft for the room.
    /// To learn more, see [`ComposerDraft`].
    ///
    /// NOTE(review): the optional event ID presumably identifies a thread
    /// within the room; confirm against callers.
    ///
    /// [`ComposerDraft`]: ComposerDraft
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// A list of knock request ids marked as seen in a room.
    SeenKnockRequests(&'a RoomId),

    /// A list of thread subscriptions catchup tokens.
    ThreadSubscriptionsCatchupTokens,

    /// A list of capabilities that the homeserver supports.
    HomeserverCapabilities,
}
2386
/// String constants for each [`StateStoreDataKey`] variant: a full key for
/// variants without a payload, a key prefix for variants that carry one.
impl StateStoreDataKey<'_> {
    /// Key to use for the [`SyncToken`][Self::SyncToken] variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// Key to use for the [`SupportedVersions`][Self::SupportedVersions]
    /// variant.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities"; // Note: this is the old name, kept for backwards compatibility.

    /// Key to use for the [`WellKnown`][Self::WellKnown]
    /// variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// Key prefix to use for the [`Filter`][Self::Filter] variant.
    pub const FILTER: &'static str = "filter";

    /// Key prefix to use for the [`UserAvatarUrl`][Self::UserAvatarUrl]
    /// variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// Key prefix to use for the
    /// [`RecentlyVisitedRooms`][Self::RecentlyVisitedRooms] variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// Key to use for the [`UtdHookManagerData`][Self::UtdHookManagerData]
    /// variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// Key to use for the flag remembering that we already reported that we
    /// uploaded duplicate one-time keys.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// Key prefix to use for the [`ComposerDraft`][Self::ComposerDraft]
    /// variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// Key prefix to use for the
    /// [`SeenKnockRequests`][Self::SeenKnockRequests] variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// Key to use for the
    /// [`ThreadSubscriptionsCatchupTokens`][Self::ThreadSubscriptionsCatchupTokens] variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";

    /// Key to use for the
    /// [`HomeserverCapabilities`][Self::HomeserverCapabilities] variant, which
    /// stores the homeserver's [`Capabilities`].
    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
}
2434
/// Decide whether a new thread subscription should replace the stored one,
/// based on their bump stamps, for a fixed room and thread root event id pair.
///
/// `new` may be rewritten in place: when the previous subscription has a bump
/// stamp and the new one has none, the previous stamp is copied into `new`.
///
/// Returns `true` if the new subscription should be stored, or `false` if it
/// should be ignored because its bump stamp is not strictly greater than the
/// previous one.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    match (previous, *new) {
        // The new subscription carries no bump stamp: inherit the previous
        // one; it should be updated soon via sync anyways.
        (Some(prev_bump), None) => {
            *new = Some(prev_bump);
            true
        }

        // Both have a bump stamp: only store the new subscription when its
        // stamp is strictly greater than the previous one.
        (Some(prev_bump), Some(new_bump)) => new_bump > prev_bump,

        // Nothing to compare against: keep the new bump stamp as it is.
        (None, _) => true,
    }
}
2465
#[cfg(test)]
mod tests {
    /// Tests for `SaveLockedStateStore` wrapped around a `MemoryStore`.
    mod save_locked_state_store {
        use std::time::Duration;

        use assert_matches::assert_matches;
        use futures_util::future::{self, Either};
        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
        use gloo_timers::future::sleep;
        use matrix_sdk_common::executor::spawn;
        use matrix_sdk_test::async_test;
        use tokio::sync::Mutex;
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        use tokio::time::sleep;

        use crate::{
            StateChanges, StateStore,
            store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
        };

        /// Store constructor used by the shared integration test suite
        /// instantiated below.
        async fn get_store() -> Result<impl StateStore> {
            Ok(SaveLockedStateStore::new(MemoryStore::new()))
        }

        statestore_integration_tests!();

        /// Saving with a guard from the store's own lock succeeds; saving
        /// with a guard from an unrelated mutex is rejected.
        #[async_test]
        async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new());
            let state_changes = StateChanges::default();
            state_store
                .save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
                .await
                .expect("state store accepts guard for underlying mutex");

            let mutex = Mutex::new(());
            state_store
                .save_changes_with_guard(&mutex.lock().await, &state_changes)
                .await
                .expect_err("state store does not accept guard for unknown mutex");
        }

        /// Error returned by [`timeout`] when the duration elapses first.
        #[derive(Debug)]
        struct Elapsed;

        /// Await `f` for at most `duration`.
        ///
        /// On browser wasm targets this races the future against the
        /// `gloo_timers` sleep; everywhere else it uses
        /// `tokio::time::timeout`.
        async fn timeout<F: Future + Unpin>(
            duration: Duration,
            f: F,
        ) -> Result<F::Output, Elapsed> {
            #[cfg(all(target_family = "wasm", target_os = "unknown"))]
            {
                match future::select(sleep(duration), f).await {
                    Either::Left(_) => return Err(Elapsed),
                    Either::Right((output, _)) => Ok(output),
                }
            }
            #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
            {
                tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
            }
        }

        /// `save_changes` must not complete while another task holds the
        /// store's save lock.
        #[async_test]
        async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());

            // Acquire lock and hold it for 5 seconds
            let lock_task = spawn({
                let state_store = state_store.clone();
                async move {
                    let lock = state_store.lock();
                    let _guard = lock.lock().await;
                    sleep(Duration::from_secs(5)).await;
                }
            });

            // Try to save changes to the state store while the lock is held by another task
            let save_task =
                spawn(async move { state_store.save_changes(&StateChanges::default()).await });

            // NOTE(review): this relies on `lock_task` acquiring the lock before
            // `save_task` first runs; confirm the executor guarantees that ordering.
            //
            // Ensure that the second task does not progress until the first task has
            // completed and therefore released the save lock
            assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
                timeout(Duration::from_millis(100), save_task)
                    .await
                    .expect("task completes before timeout")
                    .expect("task completes successfully")
                    .expect("task saves changes");
            });
        }
    }
}