matrix_sdk_base/store/
integration_tests.rs

1//! Trait and macro of integration tests for StateStore implementations.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11    api::{
12        FeatureFlag, MatrixVersion,
13        client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14    },
15    event_id,
16    events::{
17        AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18        AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19        RoomAccountDataEventType, StateEventType, SyncStateEvent,
20        presence::PresenceEvent,
21        receipt::{ReceiptThread, ReceiptType},
22        room::{
23            member::{
24                MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25                SyncRoomMemberEvent,
26            },
27            message::RoomMessageEventContent,
28            power_levels::RoomPowerLevelsEventContent,
29            topic::RoomTopicEventContent,
30        },
31    },
32    owned_event_id, owned_mxc_uri,
33    push::Ruleset,
34    room_id,
35    room_version_rules::AuthorizationRules,
36    serde::Raw,
37    uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42    DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
43    SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46    RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47    deserialized_responses::MemberEvent,
48    store::{
49        ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50        StoredThreadSubscription, ThreadSubscriptionStatus,
51    },
52    utils::RawSyncStateEventWithKeys,
53};
54
55/// `StateStore` integration tests.
56///
57/// This trait is not meant to be used directly, but will be used with the
58/// `statestore_integration_tests!` macro.
59#[allow(async_fn_in_trait)]
60pub trait StateStoreIntegrationTests {
61    /// Populate the given `StateStore`.
62    async fn populate(&self) -> TestResult;
63    /// Test room topic redaction.
64    async fn test_topic_redaction(&self) -> TestResult;
65    /// Test populating the store.
66    async fn test_populate_store(&self) -> TestResult;
67    /// Test room member saving.
68    async fn test_member_saving(&self) -> TestResult;
69    /// Test filter saving.
70    async fn test_filter_saving(&self) -> TestResult;
71    /// Test saving a user avatar URL.
72    async fn test_user_avatar_url_saving(&self) -> TestResult;
73    /// Test sync token saving.
74    async fn test_sync_token_saving(&self) -> TestResult;
75    /// Test UtdHookManagerData saving.
76    async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
77    /// Test the saving of the OneTimeKeyAlreadyUploaded key/value data type.
78    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
79    /// Test stripped room member saving.
80    async fn test_stripped_member_saving(&self) -> TestResult;
81    /// Test room power levels saving.
82    async fn test_power_level_saving(&self) -> TestResult;
83    /// Test user receipts saving.
84    async fn test_receipts_saving(&self) -> TestResult;
85    /// Test custom storage.
86    async fn test_custom_storage(&self) -> TestResult;
87    /// Test stripped and non-stripped room member saving.
88    async fn test_stripped_non_stripped(&self) -> TestResult;
89    /// Test room removal.
90    async fn test_room_removal(&self) -> TestResult;
91    /// Test profile removal.
92    async fn test_profile_removal(&self) -> TestResult;
93    /// Test presence saving.
94    async fn test_presence_saving(&self) -> TestResult;
95    /// Test display names saving.
96    async fn test_display_names_saving(&self) -> TestResult;
97    /// Test operations with the send queue.
98    async fn test_send_queue(&self) -> TestResult;
99    /// Test priority of operations with the send queue.
100    async fn test_send_queue_priority(&self) -> TestResult;
101    /// Test operations related to send queue dependents.
102    async fn test_send_queue_dependents(&self) -> TestResult;
103    /// Test an update to a send queue dependent request.
104    async fn test_update_send_queue_dependent(&self) -> TestResult;
105    /// Test saving/restoring the supported versions of the server.
106    async fn test_supported_versions_saving(&self) -> TestResult;
107    /// Test saving/restoring the well-known info of the server.
108    async fn test_well_known_saving(&self) -> TestResult;
109    /// Test fetching room infos based on [`RoomLoadSettings`].
110    async fn test_get_room_infos(&self) -> TestResult;
111    /// Test loading thread subscriptions.
112    async fn test_thread_subscriptions(&self) -> TestResult;
113    /// Test thread subscriptions bulk upsert, including bumpstamp semantics.
114    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
115}
116
117impl StateStoreIntegrationTests for DynStateStore {
118    async fn populate(&self) -> TestResult {
119        let f = EventFactory::new();
120        let mut changes = StateChanges::default();
121
122        let user_id = user_id();
123        let invited_user_id = invited_user_id();
124        let room_id = room_id();
125        let stripped_room_id = stripped_room_id();
126
127        changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
128
129        let presence_json: &JsonValue = &test_json::PRESENCE;
130        let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
131        let presence_event = presence_raw.deserialize()?;
132        changes.add_presence_event(presence_event, presence_raw);
133
134        let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
135            f.push_rules(Ruleset::server_default(user_id)).into();
136        let pushrules_event = pushrules_raw.deserialize()?;
137        changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
138
139        let mut room = RoomInfo::new(room_id, RoomState::Joined);
140        room.mark_as_left();
141
142        let tag_json: &JsonValue = &test_json::TAG;
143        let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
144        let tag_event = tag_raw.deserialize()?;
145        changes.add_room_account_data(room_id, tag_event, tag_raw);
146
147        let name_json: &JsonValue = &test_json::NAME;
148        let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
149        let name_event = name_raw.deserialize()?;
150        room.handle_state_event(
151            &mut RawSyncStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
152                .expect("generated state event should be valid"),
153        );
154        changes.add_state_event(room_id, name_event, name_raw);
155
156        let topic_json: &JsonValue = &test_json::TOPIC;
157        let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
158        let topic_event = topic_raw.deserialize()?;
159        room.handle_state_event(
160            &mut RawSyncStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
161                .expect("generated state event should be valid"),
162        );
163        changes.add_state_event(room_id, topic_event, topic_raw);
164
165        let mut room_ambiguity_map = HashMap::new();
166        let mut room_profiles = BTreeMap::new();
167
168        let member_json: &JsonValue = &test_json::MEMBER;
169        let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
170        let displayname = DisplayName::new(
171            member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
172        );
173        room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
174        room_profiles.insert(user_id.to_owned(), (&member_event).into());
175
176        let member_state_raw =
177            serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
178        let member_state_event = member_state_raw.deserialize()?;
179        changes.add_state_event(room_id, member_state_event, member_state_raw);
180
181        let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
182        // FIXME: Should be stripped room member event
183        let invited_member_event: SyncRoomMemberEvent =
184            serde_json::from_value(invited_member_json.clone())?;
185        room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
186        room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
187
188        let invited_member_state_raw =
189            serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
190        let invited_member_state_event = invited_member_state_raw.deserialize()?;
191        changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
192
193        let receipt_content = f
194            .room(room_id)
195            .read_receipts()
196            .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
197            .into_content();
198        changes.add_receipts(room_id, receipt_content);
199
200        changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
201        changes.profiles.insert(room_id.to_owned(), room_profiles);
202        changes.add_room(room);
203
204        let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
205
206        let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
207        let stripped_name_raw =
208            serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
209        let stripped_name_event = stripped_name_raw.deserialize()?;
210        stripped_room.handle_stripped_state_event(&stripped_name_event);
211        changes.stripped_state.insert(
212            stripped_room_id.to_owned(),
213            BTreeMap::from([(
214                stripped_name_event.event_type(),
215                BTreeMap::from([(
216                    stripped_name_event.state_key().to_owned(),
217                    stripped_name_raw.clone(),
218                )]),
219            )]),
220        );
221
222        changes.add_room(stripped_room);
223
224        let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
225        let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
226        changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
227
228        self.save_changes(&changes).await?;
229
230        Ok(())
231    }
232
233    async fn test_topic_redaction(&self) -> TestResult {
234        let room_id = room_id();
235        self.populate().await?;
236
237        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
238        assert_eq!(
239            self.get_state_event_static::<RoomTopicEventContent>(room_id)
240                .await?
241                .expect("room topic found before redaction")
242                .deserialize()
243                .expect("can deserialize room topic before redaction")
244                .as_sync()
245                .expect("room topic is a sync state event")
246                .as_original()
247                .expect("room topic is not redacted yet")
248                .content
249                .topic,
250            "😀"
251        );
252
253        let mut changes = StateChanges::default();
254
255        let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
256        let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
257            .expect("topic redaction event making works");
258        let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
259
260        changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
261        self.save_changes(&changes).await?;
262
263        let redacted_event = self
264            .get_state_event_static::<RoomTopicEventContent>(room_id)
265            .await?
266            .expect("room topic found after redaction")
267            .deserialize()
268            .expect("can deserialize room topic after redaction");
269
270        assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
271
272        Ok(())
273    }
274
275    async fn test_populate_store(&self) -> TestResult {
276        let room_id = room_id();
277        let user_id = user_id();
278        let display_name = DisplayName::new("example");
279
280        self.populate().await?;
281
282        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
283        assert!(self.get_presence_event(user_id).await?.is_some());
284        assert_eq!(
285            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
286            2,
287            "Expected to find 2 room infos"
288        );
289        assert!(
290            self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
291        );
292
293        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
294        assert_eq!(
295            self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
296            1,
297            "Expected to find 1 room topic"
298        );
299        assert!(self.get_profile(room_id, user_id).await?.is_some());
300        assert!(self.get_member_event(room_id, user_id).await?.is_some());
301        assert_eq!(
302            self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
303            2,
304            "Expected to find 2 members for room"
305        );
306        assert_eq!(
307            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
308            1,
309            "Expected to find 1 invited user ids"
310        );
311        assert_eq!(
312            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
313            1,
314            "Expected to find 1 joined user ids"
315        );
316        assert_eq!(
317            self.get_users_with_display_name(room_id, &display_name).await?.len(),
318            2,
319            "Expected to find 2 display names for room"
320        );
321        assert!(
322            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
323                .await?
324                .is_some()
325        );
326        assert!(
327            self.get_user_room_receipt_event(
328                room_id,
329                ReceiptType::Read,
330                ReceiptThread::Unthreaded,
331                user_id
332            )
333            .await?
334            .is_some()
335        );
336        assert_eq!(
337            self.get_event_room_receipt_events(
338                room_id,
339                ReceiptType::Read,
340                ReceiptThread::Unthreaded,
341                first_receipt_event_id()
342            )
343            .await?
344            .len(),
345            1,
346            "Expected to find 1 read receipt"
347        );
348        Ok(())
349    }
350
351    async fn test_member_saving(&self) -> TestResult {
352        let room_id = room_id!("!test_member_saving:localhost");
353        let user_id = user_id();
354        let second_user_id = user_id!("@second:localhost");
355        let third_user_id = user_id!("@third:localhost");
356        let unknown_user_id = user_id!("@unknown:localhost");
357
358        // No event in store.
359        let mut user_ids = vec![user_id.to_owned()];
360        assert!(self.get_member_event(room_id, user_id).await?.is_none());
361        let member_events = self
362            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
363            .await;
364        assert!(member_events?.is_empty());
365        assert!(self.get_profile(room_id, user_id).await?.is_none());
366        let profiles = self.get_profiles(room_id, &user_ids).await;
367        assert!(profiles?.is_empty());
368
369        // One event in store.
370        let mut changes = StateChanges::default();
371        let raw_member_event = membership_event();
372        let profile = raw_member_event.deserialize()?.into();
373        changes
374            .state
375            .entry(room_id.to_owned())
376            .or_default()
377            .entry(StateEventType::RoomMember)
378            .or_default()
379            .insert(user_id.into(), raw_member_event.cast());
380        changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
381        self.save_changes(&changes).await?;
382
383        assert!(self.get_member_event(room_id, user_id).await?.is_some());
384        let member_events = self
385            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
386            .await;
387        assert_eq!(member_events?.len(), 1);
388        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
389        assert_eq!(members.len(), 1, "We expected to find members for the room");
390        assert!(self.get_profile(room_id, user_id).await?.is_some());
391        let profiles = self.get_profiles(room_id, &user_ids).await;
392        assert_eq!(profiles?.len(), 1);
393
394        // Several events in store.
395        let mut changes = StateChanges::default();
396        let changes_members = changes
397            .state
398            .entry(room_id.to_owned())
399            .or_default()
400            .entry(StateEventType::RoomMember)
401            .or_default();
402        let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
403        let raw_second_member_event =
404            custom_membership_event(second_user_id, event_id!("$second_member_event"));
405        let second_profile = raw_second_member_event.deserialize()?.into();
406        changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
407        changes_profiles.insert(second_user_id.to_owned(), second_profile);
408        let raw_third_member_event =
409            custom_membership_event(third_user_id, event_id!("$third_member_event"));
410        let third_profile = raw_third_member_event.deserialize()?.into();
411        changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
412        changes_profiles.insert(third_user_id.to_owned(), third_profile);
413        self.save_changes(&changes).await?;
414
415        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
416        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
417        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
418        let member_events = self
419            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
420            .await;
421        assert_eq!(member_events?.len(), 3);
422        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
423        assert_eq!(members.len(), 3, "We expected to find members for the room");
424        assert!(self.get_profile(room_id, second_user_id).await?.is_some());
425        assert!(self.get_profile(room_id, third_user_id).await?.is_some());
426        let profiles = self.get_profiles(room_id, &user_ids).await;
427        assert_eq!(profiles?.len(), 3);
428
429        // Several events in store with one unknown.
430        user_ids.push(unknown_user_id.to_owned());
431        let member_events = self
432            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
433            .await;
434        assert_eq!(member_events?.len(), 3);
435        let profiles = self.get_profiles(room_id, &user_ids).await;
436        assert_eq!(profiles?.len(), 3);
437
438        // Empty user IDs list.
439        let member_events = self
440            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
441                room_id,
442                &[],
443            )
444            .await;
445        assert!(member_events?.is_empty());
446        let profiles = self.get_profiles(room_id, &[]).await;
447        assert!(profiles?.is_empty());
448
449        Ok(())
450    }
451
452    async fn test_filter_saving(&self) -> TestResult {
453        let filter_name = "filter_name";
454        let filter_id = "filter_id_1234";
455
456        self.set_kv_data(
457            StateStoreDataKey::Filter(filter_name),
458            StateStoreDataValue::Filter(filter_id.to_owned()),
459        )
460        .await?;
461        assert_let!(
462            Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
463                self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
464        );
465        assert_eq!(stored_filter_id, filter_id);
466
467        self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
468        assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
469
470        Ok(())
471    }
472
473    async fn test_user_avatar_url_saving(&self) -> TestResult {
474        let user_id = user_id!("@alice:example.org");
475        let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
476
477        self.set_kv_data(
478            StateStoreDataKey::UserAvatarUrl(user_id),
479            StateStoreDataValue::UserAvatarUrl(url.clone()),
480        )
481        .await?;
482
483        assert_let!(
484            Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
485                self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
486        );
487        assert_eq!(stored_url, url);
488
489        self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
490        assert_matches!(
491            self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
492            Ok(None)
493        );
494
495        Ok(())
496    }
497
498    async fn test_supported_versions_saving(&self) -> TestResult {
499        let versions =
500            BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
501        let supported_versions = SupportedVersionsResponse {
502            versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
503            unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
504        };
505
506        self.set_kv_data(
507            StateStoreDataKey::SupportedVersions,
508            StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
509        )
510        .await?;
511
512        assert_let!(
513            Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
514                self.get_kv_data(StateStoreDataKey::SupportedVersions).await
515        );
516        assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
517        assert_eq!(supported_versions, stored_supported_versions);
518
519        let stored_supported = stored_supported_versions.supported_versions();
520        assert_eq!(stored_supported.versions, versions);
521        assert_eq!(stored_supported.features.len(), 1);
522        assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
523
524        self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
525        assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
526
527        Ok(())
528    }
529
530    async fn test_well_known_saving(&self) -> TestResult {
531        let well_known = WellKnownResponse {
532            homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
533            identity_server: None,
534            tile_server: None,
535            rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
536        };
537
538        self.set_kv_data(
539            StateStoreDataKey::WellKnown,
540            StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
541        )
542        .await?;
543
544        assert_let!(
545            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
546                self.get_kv_data(StateStoreDataKey::WellKnown).await
547        );
548        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
549        assert_eq!(stored_well_known, Some(well_known));
550
551        self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
552        assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
553
554        self.set_kv_data(
555            StateStoreDataKey::WellKnown,
556            StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
557        )
558        .await?;
559
560        assert_let!(
561            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
562                self.get_kv_data(StateStoreDataKey::WellKnown).await
563        );
564        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
565        assert_eq!(stored_well_known, None);
566
567        Ok(())
568    }
569
570    async fn test_sync_token_saving(&self) -> TestResult {
571        let sync_token_1 = "t392-516_47314_0_7_1";
572        let sync_token_2 = "t392-516_47314_0_7_2";
573
574        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
575
576        let changes =
577            StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
578        self.save_changes(&changes).await?;
579        assert_let!(
580            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
581                self.get_kv_data(StateStoreDataKey::SyncToken).await
582        );
583        assert_eq!(stored_sync_token, sync_token_1);
584
585        self.set_kv_data(
586            StateStoreDataKey::SyncToken,
587            StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
588        )
589        .await?;
590        assert_let!(
591            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
592                self.get_kv_data(StateStoreDataKey::SyncToken).await
593        );
594        assert_eq!(stored_sync_token, sync_token_2);
595
596        self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
597        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
598
599        Ok(())
600    }
601
602    async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
603        // Before any data is written, the getter should return None.
604        assert!(
605            self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
606                .await
607                .expect("Could not read data")
608                .is_none(),
609            "Store was not empty at start"
610        );
611
612        // Put some data in the store...
613        let data = GrowableBloomBuilder::new().build();
614        self.set_kv_data(
615            StateStoreDataKey::UtdHookManagerData,
616            StateStoreDataValue::UtdHookManagerData(data.clone()),
617        )
618        .await
619        .expect("Could not save data");
620
621        // ... and check it comes back.
622        let read_data = self
623            .get_kv_data(StateStoreDataKey::UtdHookManagerData)
624            .await
625            .expect("Could not read data")
626            .expect("no data found")
627            .into_utd_hook_manager_data()
628            .expect("not UtdHookManagerData");
629
630        assert_eq!(read_data, data);
631
632        Ok(())
633    }
634
635    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
636        // Before any data is written, the getter should return None.
637        assert!(
638            self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
639            "Store was not empty at start"
640        );
641
642        self.set_kv_data(
643            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
644            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
645        )
646        .await?;
647
648        let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
649        data.expect("The loaded data should be Some");
650
651        Ok(())
652    }
653
654    async fn test_stripped_member_saving(&self) -> TestResult {
655        let room_id = room_id!("!test_stripped_member_saving:localhost");
656        let user_id = user_id();
657        let second_user_id = user_id!("@second:localhost");
658        let third_user_id = user_id!("@third:localhost");
659        let unknown_user_id = user_id!("@unknown:localhost");
660
661        // No event in store.
662        assert!(self.get_member_event(room_id, user_id).await?.is_none());
663        let member_events = self
664            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
665                room_id,
666                &[user_id.to_owned()],
667            )
668            .await;
669        assert!(member_events?.is_empty());
670
671        // One event in store.
672        let mut changes = StateChanges::default();
673        changes
674            .stripped_state
675            .entry(room_id.to_owned())
676            .or_default()
677            .entry(StateEventType::RoomMember)
678            .or_default()
679            .insert(user_id.into(), stripped_membership_event().cast());
680        self.save_changes(&changes).await?;
681
682        assert!(self.get_member_event(room_id, user_id).await?.is_some());
683        let member_events = self
684            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
685                room_id,
686                &[user_id.to_owned()],
687            )
688            .await;
689        assert_eq!(member_events?.len(), 1);
690        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
691        assert_eq!(members.len(), 1, "We expected to find members for the room");
692
693        // Several events in store.
694        let mut changes = StateChanges::default();
695        let changes_members = changes
696            .stripped_state
697            .entry(room_id.to_owned())
698            .or_default()
699            .entry(StateEventType::RoomMember)
700            .or_default();
701        changes_members
702            .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
703        changes_members
704            .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
705        self.save_changes(&changes).await?;
706
707        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
708        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
709        let member_events = self
710            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
711                room_id,
712                &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
713            )
714            .await;
715        assert_eq!(member_events?.len(), 3);
716        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
717        assert_eq!(members.len(), 3, "We expected to find members for the room");
718
719        // Several events in store with one unknown.
720        let member_events = self
721            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
722                room_id,
723                &[
724                    user_id.to_owned(),
725                    second_user_id.to_owned(),
726                    third_user_id.to_owned(),
727                    unknown_user_id.to_owned(),
728                ],
729            )
730            .await;
731        assert_eq!(member_events?.len(), 3);
732
733        // Empty user IDs list.
734        let member_events = self
735            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
736                room_id,
737                &[],
738            )
739            .await;
740        assert!(member_events?.is_empty());
741
742        Ok(())
743    }
744
745    async fn test_power_level_saving(&self) -> TestResult {
746        let room_id = room_id!("!test_power_level_saving:localhost");
747
748        let raw_event = power_level_event();
749        let event = raw_event.deserialize()?;
750
751        assert!(
752            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
753        );
754        let mut changes = StateChanges::default();
755        changes.add_state_event(room_id, event, raw_event);
756
757        self.save_changes(&changes).await?;
758        assert!(
759            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
760        );
761
762        Ok(())
763    }
764
765    async fn test_receipts_saving(&self) -> TestResult {
766        let room_id = room_id!("!test_receipts_saving:localhost");
767
768        let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
769        let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
770
771        let first_receipt_ts = uint!(1436451550);
772        let second_receipt_ts = uint!(1436451653);
773        let third_receipt_ts = uint!(1436474532);
774
775        let first_receipt_event = serde_json::from_value(json!({
776            first_event_id: {
777                "m.read": {
778                    user_id(): {
779                        "ts": first_receipt_ts,
780                    }
781                }
782            }
783        }))?;
784
785        let second_receipt_event = serde_json::from_value(json!({
786            second_event_id: {
787                "m.read": {
788                    user_id(): {
789                        "ts": second_receipt_ts,
790                    }
791                }
792            }
793        }))?;
794
795        let third_receipt_event = serde_json::from_value(json!({
796            second_event_id: {
797                "m.read": {
798                    user_id(): {
799                        "ts": third_receipt_ts,
800                        "thread_id": "main",
801                    }
802                }
803            }
804        }))?;
805
806        assert!(
807            self.get_user_room_receipt_event(
808                room_id,
809                ReceiptType::Read,
810                ReceiptThread::Unthreaded,
811                user_id()
812            )
813            .await
814            .expect("failed to read unthreaded user room receipt")
815            .is_none()
816        );
817        assert!(
818            self.get_event_room_receipt_events(
819                room_id,
820                ReceiptType::Read,
821                ReceiptThread::Unthreaded,
822                first_event_id
823            )
824            .await
825            .expect("failed to read unthreaded event room receipt for 1")
826            .is_empty()
827        );
828        assert!(
829            self.get_event_room_receipt_events(
830                room_id,
831                ReceiptType::Read,
832                ReceiptThread::Unthreaded,
833                second_event_id
834            )
835            .await
836            .expect("failed to read unthreaded event room receipt for 2")
837            .is_empty()
838        );
839
840        let mut changes = StateChanges::default();
841        changes.add_receipts(room_id, first_receipt_event);
842
843        self.save_changes(&changes).await?;
844        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
845            .get_user_room_receipt_event(
846                room_id,
847                ReceiptType::Read,
848                ReceiptThread::Unthreaded,
849                user_id(),
850            )
851            .await
852            .expect("failed to read unthreaded user room receipt after save")
853            .unwrap();
854        assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
855        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
856        let first_event_unthreaded_receipts = self
857            .get_event_room_receipt_events(
858                room_id,
859                ReceiptType::Read,
860                ReceiptThread::Unthreaded,
861                first_event_id,
862            )
863            .await
864            .expect("failed to read unthreaded event room receipt for 1 after save");
865        assert_eq!(
866            first_event_unthreaded_receipts.len(),
867            1,
868            "Found a wrong number of unthreaded receipts for 1 after save"
869        );
870        assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
871        assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
872        assert!(
873            self.get_event_room_receipt_events(
874                room_id,
875                ReceiptType::Read,
876                ReceiptThread::Unthreaded,
877                second_event_id
878            )
879            .await
880            .expect("failed to read unthreaded event room receipt for 2 after save")
881            .is_empty()
882        );
883
884        let mut changes = StateChanges::default();
885        changes.add_receipts(room_id, second_receipt_event);
886
887        self.save_changes(&changes).await.expect("Saving works");
888        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
889            .get_user_room_receipt_event(
890                room_id,
891                ReceiptType::Read,
892                ReceiptThread::Unthreaded,
893                user_id(),
894            )
895            .await
896            .expect("Getting unthreaded user room receipt after save failed")
897            .unwrap();
898        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
899        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
900        assert!(
901            self.get_event_room_receipt_events(
902                room_id,
903                ReceiptType::Read,
904                ReceiptThread::Unthreaded,
905                first_event_id
906            )
907            .await
908            .expect("Getting unthreaded event room receipt events for first event failed")
909            .is_empty()
910        );
911        let second_event_unthreaded_receipts = self
912            .get_event_room_receipt_events(
913                room_id,
914                ReceiptType::Read,
915                ReceiptThread::Unthreaded,
916                second_event_id,
917            )
918            .await
919            .expect("Getting unthreaded event room receipt events for second event failed");
920        assert_eq!(
921            second_event_unthreaded_receipts.len(),
922            1,
923            "Found a wrong number of unthreaded receipts for second event after save"
924        );
925        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
926        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
927
928        assert!(
929            self.get_user_room_receipt_event(
930                room_id,
931                ReceiptType::Read,
932                ReceiptThread::Main,
933                user_id()
934            )
935            .await
936            .expect("failed to read threaded user room receipt")
937            .is_none()
938        );
939        assert!(
940            self.get_event_room_receipt_events(
941                room_id,
942                ReceiptType::Read,
943                ReceiptThread::Main,
944                second_event_id
945            )
946            .await
947            .expect("Getting threaded event room receipts for 2 failed")
948            .is_empty()
949        );
950
951        let mut changes = StateChanges::default();
952        changes.add_receipts(room_id, third_receipt_event);
953
954        self.save_changes(&changes).await.expect("Saving works");
955        // Unthreaded receipts should not have changed.
956        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
957            .get_user_room_receipt_event(
958                room_id,
959                ReceiptType::Read,
960                ReceiptThread::Unthreaded,
961                user_id(),
962            )
963            .await
964            .expect("Getting unthreaded user room receipt after save failed")
965            .unwrap();
966        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
967        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
968        let second_event_unthreaded_receipts = self
969            .get_event_room_receipt_events(
970                room_id,
971                ReceiptType::Read,
972                ReceiptThread::Unthreaded,
973                second_event_id,
974            )
975            .await
976            .expect("Getting unthreaded event room receipt events for second event failed");
977        assert_eq!(
978            second_event_unthreaded_receipts.len(),
979            1,
980            "Found a wrong number of unthreaded receipts for second event after save"
981        );
982        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
983        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
984        // Threaded receipts should have changed
985        let (threaded_user_receipt_event_id, threaded_user_receipt) = self
986            .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
987            .await
988            .expect("Getting threaded user room receipt after save failed")
989            .unwrap();
990        assert_eq!(threaded_user_receipt_event_id, second_event_id);
991        assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
992        let second_event_threaded_receipts = self
993            .get_event_room_receipt_events(
994                room_id,
995                ReceiptType::Read,
996                ReceiptThread::Main,
997                second_event_id,
998            )
999            .await
1000            .expect("Getting threaded event room receipt events for second event failed");
1001        assert_eq!(
1002            second_event_threaded_receipts.len(),
1003            1,
1004            "Found a wrong number of threaded receipts for second event after save"
1005        );
1006        assert_eq!(second_event_threaded_receipts[0].0, user_id());
1007        assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1008
1009        Ok(())
1010    }
1011
1012    async fn test_custom_storage(&self) -> TestResult {
1013        let key = "my_key";
1014        let value = &[0, 1, 2, 3];
1015
1016        self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1017
1018        let read = self.get_custom_value(key.as_bytes()).await?;
1019
1020        assert_eq!(Some(value.as_ref()), read.as_deref());
1021
1022        Ok(())
1023    }
1024
1025    async fn test_stripped_non_stripped(&self) -> TestResult {
1026        let room_id = room_id!("!test_stripped_non_stripped:localhost");
1027        let user_id = user_id();
1028
1029        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1030        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1031
1032        let mut changes = StateChanges::default();
1033        changes
1034            .state
1035            .entry(room_id.to_owned())
1036            .or_default()
1037            .entry(StateEventType::RoomMember)
1038            .or_default()
1039            .insert(user_id.into(), membership_event().cast());
1040        changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1041        self.save_changes(&changes).await?;
1042
1043        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1044        assert!(matches!(member_event, MemberEvent::Sync(_)));
1045        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1046
1047        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1048        assert_eq!(members, vec![user_id.to_owned()]);
1049
1050        let mut changes = StateChanges::default();
1051        changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1052        changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1053        self.save_changes(&changes).await?;
1054
1055        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1056        assert!(matches!(member_event, MemberEvent::Stripped(_)));
1057        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1058
1059        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1060        assert_eq!(members, vec![user_id.to_owned()]);
1061
1062        Ok(())
1063    }
1064
1065    async fn test_room_removal(&self) -> TestResult {
1066        let room_id = room_id();
1067        let user_id = user_id();
1068        let display_name = DisplayName::new("example");
1069        let stripped_room_id = stripped_room_id();
1070
1071        self.populate().await?;
1072
1073        {
1074            // Add a send queue request in that room.
1075            let txn = TransactionId::new();
1076            let ev =
1077                SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1078            self.save_send_queue_request(
1079                room_id,
1080                txn.clone(),
1081                MilliSecondsSinceUnixEpoch::now(),
1082                ev.into(),
1083                0,
1084            )
1085            .await?;
1086
1087            // Add a single dependent queue request.
1088            self.save_dependent_queued_request(
1089                room_id,
1090                &txn,
1091                ChildTransactionId::new(),
1092                MilliSecondsSinceUnixEpoch::now(),
1093                DependentQueuedRequestKind::RedactEvent,
1094            )
1095            .await?;
1096        }
1097
1098        self.remove_room(room_id).await?;
1099
1100        assert_eq!(
1101            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1102            1,
1103            "room is still there"
1104        );
1105
1106        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1107        assert!(
1108            self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1109            "still state events found"
1110        );
1111        assert!(self.get_profile(room_id, user_id).await?.is_none());
1112        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1113        assert!(
1114            self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1115            "still user ids found"
1116        );
1117        assert!(
1118            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1119            "still invited user ids found"
1120        );
1121        assert!(
1122            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1123            "still joined users found"
1124        );
1125        assert!(
1126            self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1127            "still display names found"
1128        );
1129        assert!(
1130            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1131                .await?
1132                .is_none()
1133        );
1134        assert!(
1135            self.get_user_room_receipt_event(
1136                room_id,
1137                ReceiptType::Read,
1138                ReceiptThread::Unthreaded,
1139                user_id
1140            )
1141            .await?
1142            .is_none()
1143        );
1144        assert!(
1145            self.get_event_room_receipt_events(
1146                room_id,
1147                ReceiptType::Read,
1148                ReceiptThread::Unthreaded,
1149                first_receipt_event_id()
1150            )
1151            .await?
1152            .is_empty(),
1153            "still event recepts in the store"
1154        );
1155        assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1156        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1157
1158        self.remove_room(stripped_room_id).await?;
1159
1160        assert!(
1161            self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1162            "still room info found"
1163        );
1164        Ok(())
1165    }
1166
1167    async fn test_profile_removal(&self) -> TestResult {
1168        let room_id = room_id();
1169
1170        // Both the user id and invited user id get a profile in populate().
1171        let user_id = user_id();
1172        let invited_user_id = invited_user_id();
1173
1174        self.populate().await?;
1175
1176        let new_invite_member_json = json!({
1177            "content": {
1178                "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1179                "displayname": "example after update",
1180                "membership": "invite",
1181                "reason": "Looking for support"
1182            },
1183            "event_id": "$143273582443PhrSm:localhost",
1184            "origin_server_ts": 1432735824,
1185            "room_id": room_id,
1186            "sender": user_id,
1187            "state_key": invited_user_id,
1188            "type": "m.room.member",
1189        });
1190        let new_invite_member_event: SyncRoomMemberEvent =
1191            serde_json::from_value(new_invite_member_json.clone())?;
1192
1193        let mut changes = StateChanges {
1194            // Both get their profiles deleted…
1195            profiles_to_delete: [(
1196                room_id.to_owned(),
1197                vec![user_id.to_owned(), invited_user_id.to_owned()],
1198            )]
1199            .into(),
1200
1201            // …but the invited user get a new profile.
1202            profiles: {
1203                let mut map = BTreeMap::default();
1204                map.insert(
1205                    room_id.to_owned(),
1206                    [(invited_user_id.to_owned(), new_invite_member_event.into())]
1207                        .into_iter()
1208                        .collect(),
1209                );
1210                map
1211            },
1212
1213            ..StateChanges::default()
1214        };
1215
1216        let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1217            .expect("can create sync-state-event for topic");
1218        let event = raw.deserialize()?;
1219        changes.add_state_event(room_id, event, raw);
1220
1221        self.save_changes(&changes).await?;
1222
1223        // The profile for user has been removed.
1224        assert!(self.get_profile(room_id, user_id).await?.is_none());
1225        assert!(self.get_member_event(room_id, user_id).await?.is_some());
1226
1227        // The profile for the invited user has been updated.
1228        let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1229        assert_eq!(
1230            invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1231            Some("example after update")
1232        );
1233        assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1234
1235        Ok(())
1236    }
1237
1238    async fn test_presence_saving(&self) -> TestResult {
1239        let user_id = user_id();
1240        let second_user_id = user_id!("@second:localhost");
1241        let third_user_id = user_id!("@third:localhost");
1242        let unknown_user_id = user_id!("@unknown:localhost");
1243
1244        // No event in store.
1245        let mut user_ids = vec![user_id.to_owned()];
1246        let presence_event = self.get_presence_event(user_id).await;
1247        assert!(presence_event?.is_none());
1248        let presence_events = self.get_presence_events(&user_ids).await;
1249        assert!(presence_events?.is_empty());
1250
1251        // One event in store.
1252        let mut changes = StateChanges::default();
1253        changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1254        self.save_changes(&changes).await?;
1255
1256        let presence_event = self.get_presence_event(user_id).await;
1257        assert!(presence_event?.is_some());
1258        let presence_events = self.get_presence_events(&user_ids).await;
1259        assert_eq!(presence_events?.len(), 1);
1260
1261        // Several events in store.
1262        let mut changes = StateChanges::default();
1263        changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1264        changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1265        self.save_changes(&changes).await?;
1266
1267        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1268        let presence_event = self.get_presence_event(second_user_id).await;
1269        assert!(presence_event?.is_some());
1270        let presence_event = self.get_presence_event(third_user_id).await;
1271        assert!(presence_event?.is_some());
1272        let presence_events = self.get_presence_events(&user_ids).await;
1273        assert_eq!(presence_events?.len(), 3);
1274
1275        // Several events in store with one unknown.
1276        user_ids.push(unknown_user_id.to_owned());
1277        let member_events = self.get_presence_events(&user_ids).await;
1278        assert_eq!(member_events?.len(), 3);
1279
1280        // Empty user IDs list.
1281        let presence_events = self.get_presence_events(&[]).await;
1282        assert!(presence_events?.is_empty());
1283
1284        Ok(())
1285    }
1286
1287    async fn test_display_names_saving(&self) -> TestResult {
1288        let room_id = room_id!("!test_display_names_saving:localhost");
1289        let user_id = user_id();
1290        let user_display_name = DisplayName::new("User");
1291        let second_user_id = user_id!("@second:localhost");
1292        let third_user_id = user_id!("@third:localhost");
1293        let other_display_name = DisplayName::new("Raoul");
1294        let unknown_display_name = DisplayName::new("Unknown");
1295
1296        // No event in store.
1297        let mut display_names = vec![user_display_name.to_owned()];
1298        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1299        assert!(users.is_empty());
1300        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1301        assert!(names.is_empty());
1302
1303        // One event in store.
1304        let mut changes = StateChanges::default();
1305        changes
1306            .ambiguity_maps
1307            .entry(room_id.to_owned())
1308            .or_default()
1309            .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1310        self.save_changes(&changes).await?;
1311
1312        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1313        assert_eq!(users.len(), 1);
1314        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1315        assert_eq!(names.len(), 1);
1316        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1317
1318        // Several events in store.
1319        let mut changes = StateChanges::default();
1320        changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1321            other_display_name.to_owned(),
1322            [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1323        );
1324        self.save_changes(&changes).await?;
1325
1326        display_names.push(other_display_name.to_owned());
1327        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1328        assert_eq!(users.len(), 1);
1329        let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1330        assert_eq!(users.len(), 2);
1331        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1332        assert_eq!(names.len(), 2);
1333        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1334        assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1335
1336        // Several events in store with one unknown.
1337        display_names.push(unknown_display_name.to_owned());
1338        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1339        assert_eq!(names.len(), 2);
1340
1341        // Empty user IDs list.
1342        let names = self.get_users_with_display_names(room_id, &[]).await?;
1343        assert!(names.is_empty());
1344
1345        Ok(())
1346    }
1347
1348    #[allow(clippy::needless_range_loop)]
1349    async fn test_send_queue(&self) -> TestResult {
1350        let room_id = room_id!("!test_send_queue:localhost");
1351
1352        // No queued event in store at first.
1353        let events = self.load_send_queue_requests(room_id).await?;
1354        assert!(events.is_empty());
1355
1356        // Saving one thing should work.
1357        let txn0 = TransactionId::new();
1358        let event0 =
1359            SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1360        self.save_send_queue_request(
1361            room_id,
1362            txn0.clone(),
1363            MilliSecondsSinceUnixEpoch::now(),
1364            event0.into(),
1365            0,
1366        )
1367        .await?;
1368
1369        // Reading it will work.
1370        let pending = self.load_send_queue_requests(room_id).await?;
1371
1372        assert_eq!(pending.len(), 1);
1373        {
1374            assert_eq!(pending[0].transaction_id, txn0);
1375
1376            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1377            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1378            assert_eq!(content.body(), "msg0");
1379
1380            assert!(!pending[0].is_wedged());
1381        }
1382
1383        // Saving another three things should work.
1384        for i in 1..=3 {
1385            let txn = TransactionId::new();
1386            let event = SerializableEventContent::new(
1387                &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1388            )?;
1389
1390            self.save_send_queue_request(
1391                room_id,
1392                txn,
1393                MilliSecondsSinceUnixEpoch::now(),
1394                event.into(),
1395                0,
1396            )
1397            .await?;
1398        }
1399
1400        // Reading all the events should work.
1401        let pending = self.load_send_queue_requests(room_id).await?;
1402
1403        // All the events should be retrieved, in the same order.
1404        assert_eq!(pending.len(), 4);
1405
1406        assert_eq!(pending[0].transaction_id, txn0);
1407
1408        for i in 0..4 {
1409            let deserialized = pending[i].as_event().unwrap().deserialize()?;
1410            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1411            assert_eq!(content.body(), format!("msg{i}"));
1412            assert!(!pending[i].is_wedged());
1413        }
1414
1415        // Marking an event as wedged works.
1416        let txn2 = &pending[2].transaction_id;
1417        self.update_send_queue_request_status(
1418            room_id,
1419            txn2,
1420            Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1421        )
1422        .await?;
1423
1424        // And it is reflected.
1425        let pending = self.load_send_queue_requests(room_id).await?;
1426
1427        // All the events should be retrieved, in the same order.
1428        assert_eq!(pending.len(), 4);
1429        assert_eq!(pending[0].transaction_id, txn0);
1430        assert_eq!(pending[2].transaction_id, *txn2);
1431        assert!(pending[2].is_wedged());
1432        let error = pending[2].clone().error.unwrap();
1433        let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1434        assert_eq!(generic_error, "Oops");
1435        for i in 0..4 {
1436            if i != 2 {
1437                assert!(!pending[i].is_wedged());
1438            }
1439        }
1440
1441        // Updating an event will work, and reset its wedged state to false.
1442        let event0 = SerializableEventContent::new(
1443            &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1444        )?;
1445        self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1446
1447        // And it is reflected.
1448        let pending = self.load_send_queue_requests(room_id).await?;
1449
1450        assert_eq!(pending.len(), 4);
1451        {
1452            assert_eq!(pending[2].transaction_id, *txn2);
1453
1454            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1455            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1456            assert_eq!(content.body(), "wow that's a cool test");
1457
1458            assert!(!pending[2].is_wedged());
1459
1460            for i in 0..4 {
1461                if i != 2 {
1462                    let deserialized = pending[i].as_event().unwrap().deserialize()?;
1463                    assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1464                    assert_eq!(content.body(), format!("msg{i}"));
1465
1466                    assert!(!pending[i].is_wedged());
1467                }
1468            }
1469        }
1470
1471        // Removing an event works.
1472        self.remove_send_queue_request(room_id, &txn0).await?;
1473
1474        // And it is reflected.
1475        let pending = self.load_send_queue_requests(room_id).await?;
1476
1477        assert_eq!(pending.len(), 3);
1478        assert_eq!(pending[1].transaction_id, *txn2);
1479        for i in 0..3 {
1480            assert_ne!(pending[i].transaction_id, txn0);
1481        }
1482
1483        // Now add one event for two other rooms, remove one of the events, and then
1484        // query all the rooms which have outstanding unsent events.
1485
1486        // Add one event for room2.
1487        let room_id2 = room_id!("!test_send_queue_two:localhost");
1488        {
1489            let txn = TransactionId::new();
1490            let event = SerializableEventContent::new(
1491                &RoomMessageEventContent::text_plain("room2").into(),
1492            )?;
1493            self.save_send_queue_request(
1494                room_id2,
1495                txn.clone(),
1496                MilliSecondsSinceUnixEpoch::now(),
1497                event.into(),
1498                0,
1499            )
1500            .await?;
1501        }
1502
1503        // Add and remove one event for room3.
1504        {
1505            let room_id3 = room_id!("!test_send_queue_three:localhost");
1506            let txn = TransactionId::new();
1507            let event = SerializableEventContent::new(
1508                &RoomMessageEventContent::text_plain("room3").into(),
1509            )?;
1510            self.save_send_queue_request(
1511                room_id3,
1512                txn.clone(),
1513                MilliSecondsSinceUnixEpoch::now(),
1514                event.into(),
1515                0,
1516            )
1517            .await?;
1518
1519            self.remove_send_queue_request(room_id3, &txn).await?;
1520        }
1521
1522        // Query all the rooms which have unsent events. Per the previous steps,
1523        // it should be room1 and room2, not room3.
1524        let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1525        assert_eq!(outstanding_rooms.len(), 2);
1526        assert!(outstanding_rooms.iter().any(|room| room == room_id));
1527        assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1528
1529        Ok(())
1530    }
1531
1532    async fn test_send_queue_priority(&self) -> TestResult {
1533        let room_id = room_id!("!test_send_queue:localhost");
1534
1535        // No queued event in store at first.
1536        let events = self.load_send_queue_requests(room_id).await?;
1537        assert!(events.is_empty());
1538
1539        // Saving one request should work.
1540        let low0_txn = TransactionId::new();
1541        let ev0 =
1542            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1543        self.save_send_queue_request(
1544            room_id,
1545            low0_txn.clone(),
1546            MilliSecondsSinceUnixEpoch::now(),
1547            ev0.into(),
1548            2,
1549        )
1550        .await?;
1551
1552        // Saving one request with higher priority should work.
1553        let high_txn = TransactionId::new();
1554        let ev1 =
1555            SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1556        self.save_send_queue_request(
1557            room_id,
1558            high_txn.clone(),
1559            MilliSecondsSinceUnixEpoch::now(),
1560            ev1.into(),
1561            10,
1562        )
1563        .await?;
1564
1565        // Saving another request with the low priority should work.
1566        let low1_txn = TransactionId::new();
1567        let ev2 =
1568            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1569        self.save_send_queue_request(
1570            room_id,
1571            low1_txn.clone(),
1572            MilliSecondsSinceUnixEpoch::now(),
1573            ev2.into(),
1574            2,
1575        )
1576        .await?;
1577
1578        // The requests should be ordered from higher priority to lower, and when equal,
1579        // should use the insertion order instead.
1580        let pending = self.load_send_queue_requests(room_id).await?;
1581
1582        assert_eq!(pending.len(), 3);
1583        {
1584            assert_eq!(pending[0].transaction_id, high_txn);
1585
1586            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1587            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1588            assert_eq!(content.body(), "high");
1589        }
1590
1591        {
1592            assert_eq!(pending[1].transaction_id, low0_txn);
1593
1594            let deserialized = pending[1].as_event().unwrap().deserialize()?;
1595            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1596            assert_eq!(content.body(), "low0");
1597        }
1598
1599        {
1600            assert_eq!(pending[2].transaction_id, low1_txn);
1601
1602            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1603            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1604            assert_eq!(content.body(), "low1");
1605        }
1606
1607        Ok(())
1608    }
1609
1610    async fn test_send_queue_dependents(&self) -> TestResult {
1611        let room_id = room_id!("!test_send_queue_dependents:localhost");
1612
1613        // Save one send queue event to start with.
1614        let txn0 = TransactionId::new();
1615        let event0 =
1616            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1617        self.save_send_queue_request(
1618            room_id,
1619            txn0.clone(),
1620            MilliSecondsSinceUnixEpoch::now(),
1621            event0.clone().into(),
1622            0,
1623        )
1624        .await?;
1625
1626        // No dependents, to start with.
1627        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1628
1629        // Save a redaction for that event.
1630        let child_txn = ChildTransactionId::new();
1631        self.save_dependent_queued_request(
1632            room_id,
1633            &txn0,
1634            child_txn.clone(),
1635            MilliSecondsSinceUnixEpoch::now(),
1636            DependentQueuedRequestKind::RedactEvent,
1637        )
1638        .await?;
1639
1640        // It worked.
1641        let dependents = self.load_dependent_queued_requests(room_id).await?;
1642        assert_eq!(dependents.len(), 1);
1643        assert_eq!(dependents[0].parent_transaction_id, txn0);
1644        assert_eq!(dependents[0].own_transaction_id, child_txn);
1645        assert!(dependents[0].parent_key.is_none());
1646        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1647
1648        // Update the event id.
1649        let (event, event_type) = event0.raw();
1650        let event_id = owned_event_id!("$1");
1651        let num_updated = self
1652            .mark_dependent_queued_requests_as_ready(
1653                room_id,
1654                &txn0,
1655                SentRequestKey::Event {
1656                    event_id: event_id.clone(),
1657                    event: event.clone(),
1658                    event_type: event_type.to_owned(),
1659                },
1660            )
1661            .await?;
1662        assert_eq!(num_updated, 1);
1663
1664        // It worked.
1665        let dependents = self.load_dependent_queued_requests(room_id).await?;
1666        assert_eq!(dependents.len(), 1);
1667        assert_eq!(dependents[0].parent_transaction_id, txn0);
1668        assert_eq!(dependents[0].own_transaction_id, child_txn);
1669        assert_matches!(
1670            dependents[0].parent_key.as_ref(),
1671            Some(SentRequestKey::Event {
1672                event_id: received_event_id,
1673                event: received_event,
1674                event_type: received_event_type
1675            }) => {
1676                assert_eq!(received_event_id, &event_id);
1677                assert_eq!(received_event.json().to_string(), event.json().to_string());
1678                assert_eq!(received_event_type.as_str(), event_type);
1679            }
1680        );
1681        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1682
1683        // Now remove it.
1684        let removed = self
1685            .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1686            .await?;
1687        assert!(removed);
1688
1689        // It worked.
1690        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1691
1692        // Now, inserting a dependent event and removing the original send queue event
1693        // will NOT remove the dependent event.
1694        let txn1 = TransactionId::new();
1695        let event1 =
1696            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1697        self.save_send_queue_request(
1698            room_id,
1699            txn1.clone(),
1700            MilliSecondsSinceUnixEpoch::now(),
1701            event1.into(),
1702            0,
1703        )
1704        .await?;
1705
1706        self.save_dependent_queued_request(
1707            room_id,
1708            &txn0,
1709            ChildTransactionId::new(),
1710            MilliSecondsSinceUnixEpoch::now(),
1711            DependentQueuedRequestKind::RedactEvent,
1712        )
1713        .await?;
1714        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1715
1716        self.save_dependent_queued_request(
1717            room_id,
1718            &txn1,
1719            ChildTransactionId::new(),
1720            MilliSecondsSinceUnixEpoch::now(),
1721            DependentQueuedRequestKind::EditEvent {
1722                new_content: SerializableEventContent::new(
1723                    &RoomMessageEventContent::text_plain("edit").into(),
1724                )?,
1725            },
1726        )
1727        .await?;
1728        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1729
1730        // Remove event0 / txn0.
1731        let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1732        assert!(removed);
1733
1734        // This has removed none of the dependent events.
1735        let dependents = self.load_dependent_queued_requests(room_id).await?;
1736        assert_eq!(dependents.len(), 2);
1737
1738        Ok(())
1739    }
1740
1741    async fn test_update_send_queue_dependent(&self) -> TestResult {
1742        let room_id = room_id!("!test_send_queue_dependents:localhost");
1743
1744        let txn = TransactionId::new();
1745
1746        // Save a dependent redaction for an event.
1747        let child_txn = ChildTransactionId::new();
1748
1749        self.save_dependent_queued_request(
1750            room_id,
1751            &txn,
1752            child_txn.clone(),
1753            MilliSecondsSinceUnixEpoch::now(),
1754            DependentQueuedRequestKind::RedactEvent,
1755        )
1756        .await?;
1757
1758        // It worked.
1759        let dependents = self.load_dependent_queued_requests(room_id).await?;
1760        assert_eq!(dependents.len(), 1);
1761        assert_eq!(dependents[0].parent_transaction_id, txn);
1762        assert_eq!(dependents[0].own_transaction_id, child_txn);
1763        assert!(dependents[0].parent_key.is_none());
1764        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1765
1766        // Make it a reaction, instead of a redaction.
1767        self.update_dependent_queued_request(
1768            room_id,
1769            &child_txn,
1770            DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1771        )
1772        .await?;
1773
1774        // It worked.
1775        let dependents = self.load_dependent_queued_requests(room_id).await?;
1776        assert_eq!(dependents.len(), 1);
1777        assert_eq!(dependents[0].parent_transaction_id, txn);
1778        assert_eq!(dependents[0].own_transaction_id, child_txn);
1779        assert!(dependents[0].parent_key.is_none());
1780        assert_matches!(
1781            &dependents[0].kind,
1782            DependentQueuedRequestKind::ReactEvent { key } => {
1783                assert_eq!(key, "👍");
1784            }
1785        );
1786
1787        Ok(())
1788    }
1789
1790    async fn test_get_room_infos(&self) -> TestResult {
1791        let room_id_0 = room_id!("!r0");
1792        let room_id_1 = room_id!("!r1");
1793        let room_id_2 = room_id!("!r2");
1794
1795        // There is no room for the moment.
1796        {
1797            assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1798        }
1799
1800        // Save rooms.
1801        let mut changes = StateChanges::default();
1802        changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1803        changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1804        self.save_changes(&changes).await?;
1805
1806        // We can find all the rooms with `RoomLoadSettings::All`.
1807        {
1808            let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1809
1810            // (We need to sort by `room_id` so that the test is stable across all
1811            // `StateStore` implementations).
1812            all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1813
1814            assert_eq!(all_rooms.len(), 2);
1815            assert_eq!(all_rooms[0].room_id, room_id_0);
1816            assert_eq!(all_rooms[1].room_id, room_id_1);
1817        }
1818
1819        // We can find a single room with `RoomLoadSettings::One`.
1820        {
1821            let all_rooms =
1822                self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1823
1824            assert_eq!(all_rooms.len(), 1);
1825            assert_eq!(all_rooms[0].room_id, room_id_1);
1826        }
1827
1828        // `RoomLoadSetting::One` can result in loading zero room if the room is
1829        // unknown.
1830        {
1831            let all_rooms =
1832                self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1833
1834            assert_eq!(all_rooms.len(), 0);
1835        }
1836
1837        Ok(())
1838    }
1839
1840    async fn test_thread_subscriptions(&self) -> TestResult {
1841        let first_thread = event_id!("$t1");
1842        let second_thread = event_id!("$t2");
1843
1844        // At first, there is no thread subscription.
1845        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1846        assert!(maybe_sub.is_none());
1847
1848        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1849        assert!(maybe_sub.is_none());
1850
1851        // Setting the thread subscription works.
1852        self.upsert_thread_subscriptions(vec![(
1853            room_id(),
1854            first_thread,
1855            StoredThreadSubscription {
1856                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1857                bump_stamp: None,
1858            },
1859        )])
1860        .await?;
1861
1862        self.upsert_thread_subscriptions(vec![(
1863            room_id(),
1864            second_thread,
1865            StoredThreadSubscription {
1866                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1867                bump_stamp: None,
1868            },
1869        )])
1870        .await?;
1871
1872        // Now, reading the thread subscription returns the expected status.
1873        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1874        assert_eq!(
1875            maybe_sub,
1876            Some(StoredThreadSubscription {
1877                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1878                bump_stamp: None,
1879            })
1880        );
1881
1882        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1883        assert_eq!(
1884            maybe_sub,
1885            Some(StoredThreadSubscription {
1886                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1887                bump_stamp: None,
1888            })
1889        );
1890
1891        // We can override the thread subscription status.
1892        self.upsert_thread_subscriptions(vec![(
1893            room_id(),
1894            first_thread,
1895            StoredThreadSubscription {
1896                status: ThreadSubscriptionStatus::Unsubscribed,
1897                bump_stamp: None,
1898            },
1899        )])
1900        .await?;
1901
1902        // And it's correctly reflected.
1903        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1904        assert_eq!(
1905            maybe_sub,
1906            Some(StoredThreadSubscription {
1907                status: ThreadSubscriptionStatus::Unsubscribed,
1908                bump_stamp: None,
1909            })
1910        );
1911
1912        // And the second thread is still subscribed.
1913        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1914        assert_eq!(
1915            maybe_sub,
1916            Some(StoredThreadSubscription {
1917                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1918                bump_stamp: None,
1919            })
1920        );
1921
1922        // We can remove a thread subscription.
1923        self.remove_thread_subscription(room_id(), second_thread).await?;
1924
1925        // And it's correctly reflected.
1926        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1927        assert_eq!(maybe_sub, None);
1928
1929        // And the first thread is still unsubscribed.
1930        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1931        assert_eq!(
1932            maybe_sub,
1933            Some(StoredThreadSubscription {
1934                status: ThreadSubscriptionStatus::Unsubscribed,
1935                bump_stamp: None,
1936            })
1937        );
1938
1939        // Removing a thread subscription for an unknown thread is a no-op.
1940        self.remove_thread_subscription(room_id(), second_thread).await?;
1941
1942        Ok(())
1943    }
1944
1945    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1946        let threads = [
1947            event_id!("$t1"),
1948            event_id!("$t2"),
1949            event_id!("$t3"),
1950            event_id!("$t4"),
1951            event_id!("$t5"),
1952            event_id!("$t6"),
1953        ];
1954        // Helper for building the input for `upsert_thread_subscriptions()`,
1955        // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
1956        let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1957            threads
1958                .iter()
1959                .zip(subs)
1960                .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1961                .collect::<Vec<_>>()
1962        };
1963
1964        // Test bump_stamp logic
1965        let initial_subscriptions = build_subscription_updates(&[
1966            StoredThreadSubscription {
1967                status: ThreadSubscriptionStatus::Unsubscribed,
1968                bump_stamp: None,
1969            },
1970            StoredThreadSubscription {
1971                status: ThreadSubscriptionStatus::Unsubscribed,
1972                bump_stamp: Some(14),
1973            },
1974            StoredThreadSubscription {
1975                status: ThreadSubscriptionStatus::Unsubscribed,
1976                bump_stamp: None,
1977            },
1978            StoredThreadSubscription {
1979                status: ThreadSubscriptionStatus::Unsubscribed,
1980                bump_stamp: Some(210),
1981            },
1982            StoredThreadSubscription {
1983                status: ThreadSubscriptionStatus::Unsubscribed,
1984                bump_stamp: Some(5),
1985            },
1986            StoredThreadSubscription {
1987                status: ThreadSubscriptionStatus::Unsubscribed,
1988                bump_stamp: Some(100),
1989            },
1990        ]);
1991
1992        let update_subscriptions = build_subscription_updates(&[
1993            StoredThreadSubscription {
1994                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1995                bump_stamp: None,
1996            },
1997            StoredThreadSubscription {
1998                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1999                bump_stamp: None,
2000            },
2001            StoredThreadSubscription {
2002                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2003                bump_stamp: Some(1101),
2004            },
2005            StoredThreadSubscription {
2006                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2007                bump_stamp: Some(222),
2008            },
2009            StoredThreadSubscription {
2010                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2011                bump_stamp: Some(1),
2012            },
2013            StoredThreadSubscription {
2014                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2015                bump_stamp: Some(100),
2016            },
2017        ]);
2018
2019        let expected_subscriptions = build_subscription_updates(&[
2020            // Status should be updated, because prev and new bump_stamp are both None
2021            StoredThreadSubscription {
2022                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2023                bump_stamp: None,
2024            },
2025            // Status should be updated, but keep initial bump_stamp (new is None)
2026            StoredThreadSubscription {
2027                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2028                bump_stamp: Some(14),
2029            },
2030            // Status should be updated and also bump_stamp should be updated (initial was None)
2031            StoredThreadSubscription {
2032                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2033                bump_stamp: Some(1101),
2034            },
2035            // Status should be updated and also bump_stamp should be updated (initial was lower)
2036            StoredThreadSubscription {
2037                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2038                bump_stamp: Some(222),
2039            },
2040            // Status shouldn't change, as new bump_stamp is lower
2041            StoredThreadSubscription {
2042                status: ThreadSubscriptionStatus::Unsubscribed,
2043                bump_stamp: Some(5),
2044            },
2045            // Status shouldn't change, as bump_stamp is equal to the previous one
2046            StoredThreadSubscription {
2047                status: ThreadSubscriptionStatus::Unsubscribed,
2048                bump_stamp: Some(100),
2049            },
2050        ]);
2051
2052        // Set the initial subscriptions
2053        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2054
2055        // Assert the subscriptions have been added
2056        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2057            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2058            assert_eq!(stored_subscription, Some(*expected_sub));
2059        }
2060
2061        // Update subscriptions
2062        self.upsert_thread_subscriptions(update_subscriptions).await?;
2063
2064        // Assert the expected subscriptions and bump_stamps
2065        for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2066            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2067            assert_eq!(stored_subscription, Some(*expected_sub));
2068        }
2069
2070        // Test just state changes, but first remove previous subscriptions
2071        for (room_id, thread_id, _) in &expected_subscriptions {
2072            self.remove_thread_subscription(room_id, thread_id).await?;
2073        }
2074
2075        let initial_subscriptions = build_subscription_updates(&[
2076            StoredThreadSubscription {
2077                status: ThreadSubscriptionStatus::Unsubscribed,
2078                bump_stamp: Some(1),
2079            },
2080            StoredThreadSubscription {
2081                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2082                bump_stamp: Some(1),
2083            },
2084            StoredThreadSubscription {
2085                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2086                bump_stamp: Some(1),
2087            },
2088        ]);
2089
2090        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2091
2092        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2093            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2094            assert_eq!(stored_subscription, Some(*expected_sub));
2095        }
2096
2097        let update_subscriptions = build_subscription_updates(&[
2098            StoredThreadSubscription {
2099                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2100                bump_stamp: Some(2),
2101            },
2102            StoredThreadSubscription {
2103                status: ThreadSubscriptionStatus::Unsubscribed,
2104                bump_stamp: Some(2),
2105            },
2106            StoredThreadSubscription {
2107                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2108                bump_stamp: Some(2),
2109            },
2110        ]);
2111
2112        self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2113
2114        for (room_id, thread_id, expected_sub) in &update_subscriptions {
2115            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2116            assert_eq!(stored_subscription, Some(*expected_sub));
2117        }
2118
2119        Ok(())
2120    }
2121}
2122
2123/// Macro building to allow your StateStore implementation to run the entire
2124/// tests suite locally.
2125///
2126/// You need to provide a `async fn get_store() -> StoreResult<impl StateStore>`
2127/// providing a fresh store on the same level you invoke the macro.
2128///
2129/// ## Usage Example:
2130/// ```no_run
2131/// # use matrix_sdk_base::store::{
2132/// #    StateStore,
2133/// #    MemoryStore as MyStore,
2134/// #    Result as StoreResult,
2135/// # };
2136///
2137/// #[cfg(test)]
2138/// mod tests {
2139///     use super::{MyStore, StateStore, StoreResult};
2140///
2141///     async fn get_store() -> StoreResult<impl StateStore> {
2142///         Ok(MyStore::new())
2143///     }
2144///
2145///     statestore_integration_tests!();
2146/// }
2147/// ```
2148#[allow(unused_macros, unused_extern_crates)]
2149#[macro_export]
2150macro_rules! statestore_integration_tests {
2151    () => {
2152        mod statestore_integration_tests {
2153            use matrix_sdk_test::{TestResult, async_test};
2154            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2155
2156            use super::get_store;
2157
2158            #[async_test]
2159            async fn test_topic_redaction() -> TestResult {
2160                let store = get_store().await?.into_state_store();
2161                store.test_topic_redaction().await
2162            }
2163
2164            #[async_test]
2165            async fn test_populate_store() -> TestResult {
2166                let store = get_store().await?.into_state_store();
2167                store.test_populate_store().await
2168            }
2169
2170            #[async_test]
2171            async fn test_member_saving() -> TestResult {
2172                let store = get_store().await?.into_state_store();
2173                store.test_member_saving().await
2174            }
2175
2176            #[async_test]
2177            async fn test_filter_saving() -> TestResult {
2178                let store = get_store().await?.into_state_store();
2179                store.test_filter_saving().await
2180            }
2181
2182            #[async_test]
2183            async fn test_user_avatar_url_saving() -> TestResult {
2184                let store = get_store().await?.into_state_store();
2185                store.test_user_avatar_url_saving().await
2186            }
2187
2188            #[async_test]
2189            async fn test_supported_versions_saving() -> TestResult {
2190                let store = get_store().await?.into_state_store();
2191                store.test_supported_versions_saving().await
2192            }
2193
2194            #[async_test]
2195            async fn test_well_known_saving() -> TestResult {
2196                let store = get_store().await?.into_state_store();
2197                store.test_well_known_saving().await
2198            }
2199
2200            #[async_test]
2201            async fn test_sync_token_saving() -> TestResult {
2202                let store = get_store().await?.into_state_store();
2203                store.test_sync_token_saving().await
2204            }
2205
2206            #[async_test]
2207            async fn test_utd_hook_manager_data_saving() -> TestResult {
2208                let store = get_store().await?.into_state_store();
2209                store.test_utd_hook_manager_data_saving().await
2210            }
2211
2212            #[async_test]
2213            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2214                let store = get_store().await?.into_state_store();
2215                store.test_one_time_key_already_uploaded_data_saving().await
2216            }
2217
2218            #[async_test]
2219            async fn test_stripped_member_saving() -> TestResult {
2220                let store = get_store().await?.into_state_store();
2221                store.test_stripped_member_saving().await
2222            }
2223
2224            #[async_test]
2225            async fn test_power_level_saving() -> TestResult {
2226                let store = get_store().await?.into_state_store();
2227                store.test_power_level_saving().await
2228            }
2229
2230            #[async_test]
2231            async fn test_receipts_saving() -> TestResult {
2232                let store = get_store().await?.into_state_store();
2233                store.test_receipts_saving().await
2234            }
2235
2236            #[async_test]
2237            async fn test_custom_storage() -> TestResult {
2238                let store = get_store().await?.into_state_store();
2239                store.test_custom_storage().await
2240            }
2241
2242            #[async_test]
2243            async fn test_stripped_non_stripped() -> TestResult {
2244                let store = get_store().await?.into_state_store();
2245                store.test_stripped_non_stripped().await
2246            }
2247
2248            #[async_test]
2249            async fn test_room_removal() -> TestResult {
2250                let store = get_store().await?.into_state_store();
2251                store.test_room_removal().await
2252            }
2253
2254            #[async_test]
2255            async fn test_profile_removal() -> TestResult {
2256                let store = get_store().await?.into_state_store();
2257                store.test_profile_removal().await
2258            }
2259
2260            #[async_test]
2261            async fn test_presence_saving() -> TestResult {
2262                let store = get_store().await?.into_state_store();
2263                store.test_presence_saving().await
2264            }
2265
2266            #[async_test]
2267            async fn test_display_names_saving() -> TestResult {
2268                let store = get_store().await?.into_state_store();
2269                store.test_display_names_saving().await
2270            }
2271
2272            #[async_test]
2273            async fn test_send_queue() -> TestResult {
2274                let store = get_store().await?.into_state_store();
2275                store.test_send_queue().await
2276            }
2277
2278            #[async_test]
2279            async fn test_send_queue_priority() -> TestResult {
2280                let store = get_store().await?.into_state_store();
2281                store.test_send_queue_priority().await
2282            }
2283
2284            #[async_test]
2285            async fn test_send_queue_dependents() -> TestResult {
2286                let store = get_store().await?.into_state_store();
2287                store.test_send_queue_dependents().await
2288            }
2289
2290            #[async_test]
2291            async fn test_update_send_queue_dependent() -> TestResult {
2292                let store = get_store().await?.into_state_store();
2293                store.test_update_send_queue_dependent().await
2294            }
2295
2296            #[async_test]
2297            async fn test_get_room_infos() -> TestResult {
2298                let store = get_store().await?.into_state_store();
2299                store.test_get_room_infos().await
2300            }
2301
2302            #[async_test]
2303            async fn test_thread_subscriptions() -> TestResult {
2304                let store = get_store().await?.into_state_store();
2305                store.test_thread_subscriptions().await
2306            }
2307
2308            #[async_test]
2309            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2310                let store = get_store().await?.into_state_store();
2311                store.test_thread_subscriptions_bulk_upsert().await
2312            }
2313        }
2314    };
2315}
2316
2317fn user_id() -> &'static UserId {
2318    user_id!("@example:localhost")
2319}
2320
2321fn invited_user_id() -> &'static UserId {
2322    user_id!("@invited:localhost")
2323}
2324
2325fn room_id() -> &'static RoomId {
2326    room_id!("!test:localhost")
2327}
2328
2329fn stripped_room_id() -> &'static RoomId {
2330    room_id!("!stripped:localhost")
2331}
2332
2333fn first_receipt_event_id() -> &'static EventId {
2334    event_id!("$example")
2335}
2336
2337fn power_level_event() -> Raw<AnySyncStateEvent> {
2338    let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2339
2340    let event = json!({
2341        "event_id": "$h29iv0s8:example.com",
2342        "content": content,
2343        "sender": user_id(),
2344        "type": "m.room.power_levels",
2345        "origin_server_ts": 0u64,
2346        "state_key": "",
2347    });
2348
2349    serde_json::from_value(event).unwrap()
2350}
2351
2352fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2353    custom_stripped_membership_event(user_id())
2354}
2355
2356fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2357    let ev_json = json!({
2358        "type": "m.room.member",
2359        "content": RoomMemberEventContent::new(MembershipState::Join),
2360        "sender": user_id,
2361        "state_key": user_id,
2362    });
2363
2364    Raw::new(&ev_json).unwrap().cast_unchecked()
2365}
2366
2367fn membership_event() -> Raw<SyncRoomMemberEvent> {
2368    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2369}
2370
2371fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2372    let ev_json = json!({
2373        "type": "m.room.member",
2374        "content": RoomMemberEventContent::new(MembershipState::Join),
2375        "event_id": event_id,
2376        "origin_server_ts": 198,
2377        "sender": user_id,
2378        "state_key": user_id,
2379    });
2380
2381    Raw::new(&ev_json).unwrap().cast_unchecked()
2382}
2383
2384fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2385    let ev_json = json!({
2386        "content": {
2387            "presence": "online"
2388        },
2389        "sender": user_id,
2390    });
2391
2392    Raw::new(&ev_json).unwrap().cast_unchecked()
2393}