matrix_sdk_base/store/
integration_tests.rs

1//! Trait and macro of integration tests for StateStore implementations.
2
3use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11    api::{
12        FeatureFlag, MatrixVersion,
13        client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14    },
15    event_id,
16    events::{
17        AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18        AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19        RoomAccountDataEventType, StateEventType, SyncStateEvent,
20        presence::PresenceEvent,
21        receipt::{ReceiptThread, ReceiptType},
22        room::{
23            member::{
24                MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25                SyncRoomMemberEvent,
26            },
27            message::RoomMessageEventContent,
28            power_levels::RoomPowerLevelsEventContent,
29            topic::RoomTopicEventContent,
30        },
31    },
32    owned_event_id, owned_mxc_uri,
33    push::Ruleset,
34    room_id,
35    room_version_rules::AuthorizationRules,
36    serde::Raw,
37    uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42    DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
43    SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46    RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47    deserialized_responses::MemberEvent,
48    store::{
49        ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50        StoredThreadSubscription, ThreadSubscriptionStatus,
51    },
52};
53
54/// `StateStore` integration tests.
55///
56/// This trait is not meant to be used directly, but will be used with the
57/// `statestore_integration_tests!` macro.
58#[allow(async_fn_in_trait)]
59pub trait StateStoreIntegrationTests {
60    /// Populate the given `StateStore`.
61    async fn populate(&self) -> TestResult;
62    /// Test room topic redaction.
63    async fn test_topic_redaction(&self) -> TestResult;
64    /// Test populating the store.
65    async fn test_populate_store(&self) -> TestResult;
66    /// Test room member saving.
67    async fn test_member_saving(&self) -> TestResult;
68    /// Test filter saving.
69    async fn test_filter_saving(&self) -> TestResult;
70    /// Test saving a user avatar URL.
71    async fn test_user_avatar_url_saving(&self) -> TestResult;
72    /// Test sync token saving.
73    async fn test_sync_token_saving(&self) -> TestResult;
74    /// Test UtdHookManagerData saving.
75    async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
76    /// Test the saving of the OneTimeKeyAlreadyUploaded key/value data type.
77    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
78    /// Test stripped room member saving.
79    async fn test_stripped_member_saving(&self) -> TestResult;
80    /// Test room power levels saving.
81    async fn test_power_level_saving(&self) -> TestResult;
82    /// Test user receipts saving.
83    async fn test_receipts_saving(&self) -> TestResult;
84    /// Test custom storage.
85    async fn test_custom_storage(&self) -> TestResult;
86    /// Test stripped and non-stripped room member saving.
87    async fn test_stripped_non_stripped(&self) -> TestResult;
88    /// Test room removal.
89    async fn test_room_removal(&self) -> TestResult;
90    /// Test profile removal.
91    async fn test_profile_removal(&self) -> TestResult;
92    /// Test presence saving.
93    async fn test_presence_saving(&self) -> TestResult;
94    /// Test display names saving.
95    async fn test_display_names_saving(&self) -> TestResult;
96    /// Test operations with the send queue.
97    async fn test_send_queue(&self) -> TestResult;
98    /// Test priority of operations with the send queue.
99    async fn test_send_queue_priority(&self) -> TestResult;
100    /// Test operations related to send queue dependents.
101    async fn test_send_queue_dependents(&self) -> TestResult;
102    /// Test an update to a send queue dependent request.
103    async fn test_update_send_queue_dependent(&self) -> TestResult;
104    /// Test saving/restoring the supported versions of the server.
105    async fn test_supported_versions_saving(&self) -> TestResult;
106    /// Test saving/restoring the well-known info of the server.
107    async fn test_well_known_saving(&self) -> TestResult;
108    /// Test fetching room infos based on [`RoomLoadSettings`].
109    async fn test_get_room_infos(&self) -> TestResult;
110    /// Test loading thread subscriptions.
111    async fn test_thread_subscriptions(&self) -> TestResult;
112    /// Test thread subscription bumpstamp semantics.
113    async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
114    /// Test thread subscriptions bulk upsert, including bumpstamp semantics.
115    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
116}
117
118impl StateStoreIntegrationTests for DynStateStore {
119    async fn populate(&self) -> TestResult {
120        let f = EventFactory::new();
121        let mut changes = StateChanges::default();
122
123        let user_id = user_id();
124        let invited_user_id = invited_user_id();
125        let room_id = room_id();
126        let stripped_room_id = stripped_room_id();
127
128        changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
129
130        let presence_json: &JsonValue = &test_json::PRESENCE;
131        let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
132        let presence_event = presence_raw.deserialize()?;
133        changes.add_presence_event(presence_event, presence_raw);
134
135        let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
136            f.push_rules(Ruleset::server_default(user_id)).into();
137        let pushrules_event = pushrules_raw.deserialize()?;
138        changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
139
140        let mut room = RoomInfo::new(room_id, RoomState::Joined);
141        room.mark_as_left();
142
143        let tag_json: &JsonValue = &test_json::TAG;
144        let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
145        let tag_event = tag_raw.deserialize()?;
146        changes.add_room_account_data(room_id, tag_event, tag_raw);
147
148        let name_json: &JsonValue = &test_json::NAME;
149        let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
150        let name_event = name_raw.deserialize()?;
151        room.handle_state_event(&name_event);
152        changes.add_state_event(room_id, name_event, name_raw);
153
154        let topic_json: &JsonValue = &test_json::TOPIC;
155        let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
156        let topic_event = topic_raw.deserialize()?;
157        room.handle_state_event(&topic_event);
158        changes.add_state_event(room_id, topic_event, topic_raw);
159
160        let mut room_ambiguity_map = HashMap::new();
161        let mut room_profiles = BTreeMap::new();
162
163        let member_json: &JsonValue = &test_json::MEMBER;
164        let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
165        let displayname = DisplayName::new(
166            member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
167        );
168        room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
169        room_profiles.insert(user_id.to_owned(), (&member_event).into());
170
171        let member_state_raw =
172            serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
173        let member_state_event = member_state_raw.deserialize()?;
174        changes.add_state_event(room_id, member_state_event, member_state_raw);
175
176        let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
177        // FIXME: Should be stripped room member event
178        let invited_member_event: SyncRoomMemberEvent =
179            serde_json::from_value(invited_member_json.clone())?;
180        room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
181        room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
182
183        let invited_member_state_raw =
184            serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
185        let invited_member_state_event = invited_member_state_raw.deserialize()?;
186        changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
187
188        let receipt_content = f
189            .room(room_id)
190            .read_receipts()
191            .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
192            .into_content();
193        changes.add_receipts(room_id, receipt_content);
194
195        changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
196        changes.profiles.insert(room_id.to_owned(), room_profiles);
197        changes.add_room(room);
198
199        let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
200
201        let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
202        let stripped_name_raw =
203            serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
204        let stripped_name_event = stripped_name_raw.deserialize()?;
205        stripped_room.handle_stripped_state_event(&stripped_name_event);
206        changes.stripped_state.insert(
207            stripped_room_id.to_owned(),
208            BTreeMap::from([(
209                stripped_name_event.event_type(),
210                BTreeMap::from([(
211                    stripped_name_event.state_key().to_owned(),
212                    stripped_name_raw.clone(),
213                )]),
214            )]),
215        );
216
217        changes.add_room(stripped_room);
218
219        let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
220        let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
221        changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
222
223        self.save_changes(&changes).await?;
224
225        Ok(())
226    }
227
228    async fn test_topic_redaction(&self) -> TestResult {
229        let room_id = room_id();
230        self.populate().await?;
231
232        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
233        assert_eq!(
234            self.get_state_event_static::<RoomTopicEventContent>(room_id)
235                .await?
236                .expect("room topic found before redaction")
237                .deserialize()
238                .expect("can deserialize room topic before redaction")
239                .as_sync()
240                .expect("room topic is a sync state event")
241                .as_original()
242                .expect("room topic is not redacted yet")
243                .content
244                .topic,
245            "😀"
246        );
247
248        let mut changes = StateChanges::default();
249
250        let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
251        let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
252            .expect("topic redaction event making works");
253        let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
254
255        changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
256        self.save_changes(&changes).await?;
257
258        let redacted_event = self
259            .get_state_event_static::<RoomTopicEventContent>(room_id)
260            .await?
261            .expect("room topic found after redaction")
262            .deserialize()
263            .expect("can deserialize room topic after redaction");
264
265        assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
266
267        Ok(())
268    }
269
270    async fn test_populate_store(&self) -> TestResult {
271        let room_id = room_id();
272        let user_id = user_id();
273        let display_name = DisplayName::new("example");
274
275        self.populate().await?;
276
277        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
278        assert!(self.get_presence_event(user_id).await?.is_some());
279        assert_eq!(
280            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
281            2,
282            "Expected to find 2 room infos"
283        );
284        assert!(
285            self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
286        );
287
288        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
289        assert_eq!(
290            self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
291            1,
292            "Expected to find 1 room topic"
293        );
294        assert!(self.get_profile(room_id, user_id).await?.is_some());
295        assert!(self.get_member_event(room_id, user_id).await?.is_some());
296        assert_eq!(
297            self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
298            2,
299            "Expected to find 2 members for room"
300        );
301        assert_eq!(
302            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
303            1,
304            "Expected to find 1 invited user ids"
305        );
306        assert_eq!(
307            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
308            1,
309            "Expected to find 1 joined user ids"
310        );
311        assert_eq!(
312            self.get_users_with_display_name(room_id, &display_name).await?.len(),
313            2,
314            "Expected to find 2 display names for room"
315        );
316        assert!(
317            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
318                .await?
319                .is_some()
320        );
321        assert!(
322            self.get_user_room_receipt_event(
323                room_id,
324                ReceiptType::Read,
325                ReceiptThread::Unthreaded,
326                user_id
327            )
328            .await?
329            .is_some()
330        );
331        assert_eq!(
332            self.get_event_room_receipt_events(
333                room_id,
334                ReceiptType::Read,
335                ReceiptThread::Unthreaded,
336                first_receipt_event_id()
337            )
338            .await?
339            .len(),
340            1,
341            "Expected to find 1 read receipt"
342        );
343        Ok(())
344    }
345
346    async fn test_member_saving(&self) -> TestResult {
347        let room_id = room_id!("!test_member_saving:localhost");
348        let user_id = user_id();
349        let second_user_id = user_id!("@second:localhost");
350        let third_user_id = user_id!("@third:localhost");
351        let unknown_user_id = user_id!("@unknown:localhost");
352
353        // No event in store.
354        let mut user_ids = vec![user_id.to_owned()];
355        assert!(self.get_member_event(room_id, user_id).await?.is_none());
356        let member_events = self
357            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
358            .await;
359        assert!(member_events?.is_empty());
360        assert!(self.get_profile(room_id, user_id).await?.is_none());
361        let profiles = self.get_profiles(room_id, &user_ids).await;
362        assert!(profiles?.is_empty());
363
364        // One event in store.
365        let mut changes = StateChanges::default();
366        let raw_member_event = membership_event();
367        let profile = raw_member_event.deserialize()?.into();
368        changes
369            .state
370            .entry(room_id.to_owned())
371            .or_default()
372            .entry(StateEventType::RoomMember)
373            .or_default()
374            .insert(user_id.into(), raw_member_event.cast());
375        changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
376        self.save_changes(&changes).await?;
377
378        assert!(self.get_member_event(room_id, user_id).await?.is_some());
379        let member_events = self
380            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
381            .await;
382        assert_eq!(member_events?.len(), 1);
383        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
384        assert_eq!(members.len(), 1, "We expected to find members for the room");
385        assert!(self.get_profile(room_id, user_id).await?.is_some());
386        let profiles = self.get_profiles(room_id, &user_ids).await;
387        assert_eq!(profiles?.len(), 1);
388
389        // Several events in store.
390        let mut changes = StateChanges::default();
391        let changes_members = changes
392            .state
393            .entry(room_id.to_owned())
394            .or_default()
395            .entry(StateEventType::RoomMember)
396            .or_default();
397        let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
398        let raw_second_member_event =
399            custom_membership_event(second_user_id, event_id!("$second_member_event"));
400        let second_profile = raw_second_member_event.deserialize()?.into();
401        changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
402        changes_profiles.insert(second_user_id.to_owned(), second_profile);
403        let raw_third_member_event =
404            custom_membership_event(third_user_id, event_id!("$third_member_event"));
405        let third_profile = raw_third_member_event.deserialize()?.into();
406        changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
407        changes_profiles.insert(third_user_id.to_owned(), third_profile);
408        self.save_changes(&changes).await?;
409
410        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
411        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
412        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
413        let member_events = self
414            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
415            .await;
416        assert_eq!(member_events?.len(), 3);
417        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
418        assert_eq!(members.len(), 3, "We expected to find members for the room");
419        assert!(self.get_profile(room_id, second_user_id).await?.is_some());
420        assert!(self.get_profile(room_id, third_user_id).await?.is_some());
421        let profiles = self.get_profiles(room_id, &user_ids).await;
422        assert_eq!(profiles?.len(), 3);
423
424        // Several events in store with one unknown.
425        user_ids.push(unknown_user_id.to_owned());
426        let member_events = self
427            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
428            .await;
429        assert_eq!(member_events?.len(), 3);
430        let profiles = self.get_profiles(room_id, &user_ids).await;
431        assert_eq!(profiles?.len(), 3);
432
433        // Empty user IDs list.
434        let member_events = self
435            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
436                room_id,
437                &[],
438            )
439            .await;
440        assert!(member_events?.is_empty());
441        let profiles = self.get_profiles(room_id, &[]).await;
442        assert!(profiles?.is_empty());
443
444        Ok(())
445    }
446
447    async fn test_filter_saving(&self) -> TestResult {
448        let filter_name = "filter_name";
449        let filter_id = "filter_id_1234";
450
451        self.set_kv_data(
452            StateStoreDataKey::Filter(filter_name),
453            StateStoreDataValue::Filter(filter_id.to_owned()),
454        )
455        .await?;
456        assert_let!(
457            Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
458                self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
459        );
460        assert_eq!(stored_filter_id, filter_id);
461
462        self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
463        assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
464
465        Ok(())
466    }
467
468    async fn test_user_avatar_url_saving(&self) -> TestResult {
469        let user_id = user_id!("@alice:example.org");
470        let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
471
472        self.set_kv_data(
473            StateStoreDataKey::UserAvatarUrl(user_id),
474            StateStoreDataValue::UserAvatarUrl(url.clone()),
475        )
476        .await?;
477
478        assert_let!(
479            Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
480                self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
481        );
482        assert_eq!(stored_url, url);
483
484        self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
485        assert_matches!(
486            self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
487            Ok(None)
488        );
489
490        Ok(())
491    }
492
493    async fn test_supported_versions_saving(&self) -> TestResult {
494        let versions =
495            BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
496        let supported_versions = SupportedVersionsResponse {
497            versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
498            unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
499        };
500
501        self.set_kv_data(
502            StateStoreDataKey::SupportedVersions,
503            StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
504        )
505        .await?;
506
507        assert_let!(
508            Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
509                self.get_kv_data(StateStoreDataKey::SupportedVersions).await
510        );
511        assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
512        assert_eq!(supported_versions, stored_supported_versions);
513
514        let stored_supported = stored_supported_versions.supported_versions();
515        assert_eq!(stored_supported.versions, versions);
516        assert_eq!(stored_supported.features.len(), 1);
517        assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
518
519        self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
520        assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
521
522        Ok(())
523    }
524
525    async fn test_well_known_saving(&self) -> TestResult {
526        let well_known = WellKnownResponse {
527            homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
528            identity_server: None,
529            tile_server: None,
530            rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
531        };
532
533        self.set_kv_data(
534            StateStoreDataKey::WellKnown,
535            StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
536        )
537        .await?;
538
539        assert_let!(
540            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
541                self.get_kv_data(StateStoreDataKey::WellKnown).await
542        );
543        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
544        assert_eq!(stored_well_known, Some(well_known));
545
546        self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
547        assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
548
549        self.set_kv_data(
550            StateStoreDataKey::WellKnown,
551            StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
552        )
553        .await?;
554
555        assert_let!(
556            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
557                self.get_kv_data(StateStoreDataKey::WellKnown).await
558        );
559        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
560        assert_eq!(stored_well_known, None);
561
562        Ok(())
563    }
564
565    async fn test_sync_token_saving(&self) -> TestResult {
566        let sync_token_1 = "t392-516_47314_0_7_1";
567        let sync_token_2 = "t392-516_47314_0_7_2";
568
569        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
570
571        let changes =
572            StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
573        self.save_changes(&changes).await?;
574        assert_let!(
575            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
576                self.get_kv_data(StateStoreDataKey::SyncToken).await
577        );
578        assert_eq!(stored_sync_token, sync_token_1);
579
580        self.set_kv_data(
581            StateStoreDataKey::SyncToken,
582            StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
583        )
584        .await?;
585        assert_let!(
586            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
587                self.get_kv_data(StateStoreDataKey::SyncToken).await
588        );
589        assert_eq!(stored_sync_token, sync_token_2);
590
591        self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
592        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
593
594        Ok(())
595    }
596
597    async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
598        // Before any data is written, the getter should return None.
599        assert!(
600            self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
601                .await
602                .expect("Could not read data")
603                .is_none(),
604            "Store was not empty at start"
605        );
606
607        // Put some data in the store...
608        let data = GrowableBloomBuilder::new().build();
609        self.set_kv_data(
610            StateStoreDataKey::UtdHookManagerData,
611            StateStoreDataValue::UtdHookManagerData(data.clone()),
612        )
613        .await
614        .expect("Could not save data");
615
616        // ... and check it comes back.
617        let read_data = self
618            .get_kv_data(StateStoreDataKey::UtdHookManagerData)
619            .await
620            .expect("Could not read data")
621            .expect("no data found")
622            .into_utd_hook_manager_data()
623            .expect("not UtdHookManagerData");
624
625        assert_eq!(read_data, data);
626
627        Ok(())
628    }
629
630    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
631        // Before any data is written, the getter should return None.
632        assert!(
633            self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
634            "Store was not empty at start"
635        );
636
637        self.set_kv_data(
638            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
639            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
640        )
641        .await?;
642
643        let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
644        data.expect("The loaded data should be Some");
645
646        Ok(())
647    }
648
649    async fn test_stripped_member_saving(&self) -> TestResult {
650        let room_id = room_id!("!test_stripped_member_saving:localhost");
651        let user_id = user_id();
652        let second_user_id = user_id!("@second:localhost");
653        let third_user_id = user_id!("@third:localhost");
654        let unknown_user_id = user_id!("@unknown:localhost");
655
656        // No event in store.
657        assert!(self.get_member_event(room_id, user_id).await?.is_none());
658        let member_events = self
659            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
660                room_id,
661                &[user_id.to_owned()],
662            )
663            .await;
664        assert!(member_events?.is_empty());
665
666        // One event in store.
667        let mut changes = StateChanges::default();
668        changes
669            .stripped_state
670            .entry(room_id.to_owned())
671            .or_default()
672            .entry(StateEventType::RoomMember)
673            .or_default()
674            .insert(user_id.into(), stripped_membership_event().cast());
675        self.save_changes(&changes).await?;
676
677        assert!(self.get_member_event(room_id, user_id).await?.is_some());
678        let member_events = self
679            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
680                room_id,
681                &[user_id.to_owned()],
682            )
683            .await;
684        assert_eq!(member_events?.len(), 1);
685        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
686        assert_eq!(members.len(), 1, "We expected to find members for the room");
687
688        // Several events in store.
689        let mut changes = StateChanges::default();
690        let changes_members = changes
691            .stripped_state
692            .entry(room_id.to_owned())
693            .or_default()
694            .entry(StateEventType::RoomMember)
695            .or_default();
696        changes_members
697            .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
698        changes_members
699            .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
700        self.save_changes(&changes).await?;
701
702        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
703        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
704        let member_events = self
705            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
706                room_id,
707                &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
708            )
709            .await;
710        assert_eq!(member_events?.len(), 3);
711        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
712        assert_eq!(members.len(), 3, "We expected to find members for the room");
713
714        // Several events in store with one unknown.
715        let member_events = self
716            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
717                room_id,
718                &[
719                    user_id.to_owned(),
720                    second_user_id.to_owned(),
721                    third_user_id.to_owned(),
722                    unknown_user_id.to_owned(),
723                ],
724            )
725            .await;
726        assert_eq!(member_events?.len(), 3);
727
728        // Empty user IDs list.
729        let member_events = self
730            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
731                room_id,
732                &[],
733            )
734            .await;
735        assert!(member_events?.is_empty());
736
737        Ok(())
738    }
739
740    async fn test_power_level_saving(&self) -> TestResult {
741        let room_id = room_id!("!test_power_level_saving:localhost");
742
743        let raw_event = power_level_event();
744        let event = raw_event.deserialize()?;
745
746        assert!(
747            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
748        );
749        let mut changes = StateChanges::default();
750        changes.add_state_event(room_id, event, raw_event);
751
752        self.save_changes(&changes).await?;
753        assert!(
754            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
755        );
756
757        Ok(())
758    }
759
760    async fn test_receipts_saving(&self) -> TestResult {
761        let room_id = room_id!("!test_receipts_saving:localhost");
762
763        let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
764        let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
765
766        let first_receipt_ts = uint!(1436451550);
767        let second_receipt_ts = uint!(1436451653);
768        let third_receipt_ts = uint!(1436474532);
769
770        let first_receipt_event = serde_json::from_value(json!({
771            first_event_id: {
772                "m.read": {
773                    user_id(): {
774                        "ts": first_receipt_ts,
775                    }
776                }
777            }
778        }))?;
779
780        let second_receipt_event = serde_json::from_value(json!({
781            second_event_id: {
782                "m.read": {
783                    user_id(): {
784                        "ts": second_receipt_ts,
785                    }
786                }
787            }
788        }))?;
789
790        let third_receipt_event = serde_json::from_value(json!({
791            second_event_id: {
792                "m.read": {
793                    user_id(): {
794                        "ts": third_receipt_ts,
795                        "thread_id": "main",
796                    }
797                }
798            }
799        }))?;
800
801        assert!(
802            self.get_user_room_receipt_event(
803                room_id,
804                ReceiptType::Read,
805                ReceiptThread::Unthreaded,
806                user_id()
807            )
808            .await
809            .expect("failed to read unthreaded user room receipt")
810            .is_none()
811        );
812        assert!(
813            self.get_event_room_receipt_events(
814                room_id,
815                ReceiptType::Read,
816                ReceiptThread::Unthreaded,
817                first_event_id
818            )
819            .await
820            .expect("failed to read unthreaded event room receipt for 1")
821            .is_empty()
822        );
823        assert!(
824            self.get_event_room_receipt_events(
825                room_id,
826                ReceiptType::Read,
827                ReceiptThread::Unthreaded,
828                second_event_id
829            )
830            .await
831            .expect("failed to read unthreaded event room receipt for 2")
832            .is_empty()
833        );
834
835        let mut changes = StateChanges::default();
836        changes.add_receipts(room_id, first_receipt_event);
837
838        self.save_changes(&changes).await?;
839        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
840            .get_user_room_receipt_event(
841                room_id,
842                ReceiptType::Read,
843                ReceiptThread::Unthreaded,
844                user_id(),
845            )
846            .await
847            .expect("failed to read unthreaded user room receipt after save")
848            .unwrap();
849        assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
850        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
851        let first_event_unthreaded_receipts = self
852            .get_event_room_receipt_events(
853                room_id,
854                ReceiptType::Read,
855                ReceiptThread::Unthreaded,
856                first_event_id,
857            )
858            .await
859            .expect("failed to read unthreaded event room receipt for 1 after save");
860        assert_eq!(
861            first_event_unthreaded_receipts.len(),
862            1,
863            "Found a wrong number of unthreaded receipts for 1 after save"
864        );
865        assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
866        assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
867        assert!(
868            self.get_event_room_receipt_events(
869                room_id,
870                ReceiptType::Read,
871                ReceiptThread::Unthreaded,
872                second_event_id
873            )
874            .await
875            .expect("failed to read unthreaded event room receipt for 2 after save")
876            .is_empty()
877        );
878
879        let mut changes = StateChanges::default();
880        changes.add_receipts(room_id, second_receipt_event);
881
882        self.save_changes(&changes).await.expect("Saving works");
883        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
884            .get_user_room_receipt_event(
885                room_id,
886                ReceiptType::Read,
887                ReceiptThread::Unthreaded,
888                user_id(),
889            )
890            .await
891            .expect("Getting unthreaded user room receipt after save failed")
892            .unwrap();
893        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
894        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
895        assert!(
896            self.get_event_room_receipt_events(
897                room_id,
898                ReceiptType::Read,
899                ReceiptThread::Unthreaded,
900                first_event_id
901            )
902            .await
903            .expect("Getting unthreaded event room receipt events for first event failed")
904            .is_empty()
905        );
906        let second_event_unthreaded_receipts = self
907            .get_event_room_receipt_events(
908                room_id,
909                ReceiptType::Read,
910                ReceiptThread::Unthreaded,
911                second_event_id,
912            )
913            .await
914            .expect("Getting unthreaded event room receipt events for second event failed");
915        assert_eq!(
916            second_event_unthreaded_receipts.len(),
917            1,
918            "Found a wrong number of unthreaded receipts for second event after save"
919        );
920        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
921        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
922
923        assert!(
924            self.get_user_room_receipt_event(
925                room_id,
926                ReceiptType::Read,
927                ReceiptThread::Main,
928                user_id()
929            )
930            .await
931            .expect("failed to read threaded user room receipt")
932            .is_none()
933        );
934        assert!(
935            self.get_event_room_receipt_events(
936                room_id,
937                ReceiptType::Read,
938                ReceiptThread::Main,
939                second_event_id
940            )
941            .await
942            .expect("Getting threaded event room receipts for 2 failed")
943            .is_empty()
944        );
945
946        let mut changes = StateChanges::default();
947        changes.add_receipts(room_id, third_receipt_event);
948
949        self.save_changes(&changes).await.expect("Saving works");
950        // Unthreaded receipts should not have changed.
951        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
952            .get_user_room_receipt_event(
953                room_id,
954                ReceiptType::Read,
955                ReceiptThread::Unthreaded,
956                user_id(),
957            )
958            .await
959            .expect("Getting unthreaded user room receipt after save failed")
960            .unwrap();
961        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
962        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
963        let second_event_unthreaded_receipts = self
964            .get_event_room_receipt_events(
965                room_id,
966                ReceiptType::Read,
967                ReceiptThread::Unthreaded,
968                second_event_id,
969            )
970            .await
971            .expect("Getting unthreaded event room receipt events for second event failed");
972        assert_eq!(
973            second_event_unthreaded_receipts.len(),
974            1,
975            "Found a wrong number of unthreaded receipts for second event after save"
976        );
977        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
978        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
979        // Threaded receipts should have changed
980        let (threaded_user_receipt_event_id, threaded_user_receipt) = self
981            .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
982            .await
983            .expect("Getting threaded user room receipt after save failed")
984            .unwrap();
985        assert_eq!(threaded_user_receipt_event_id, second_event_id);
986        assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
987        let second_event_threaded_receipts = self
988            .get_event_room_receipt_events(
989                room_id,
990                ReceiptType::Read,
991                ReceiptThread::Main,
992                second_event_id,
993            )
994            .await
995            .expect("Getting threaded event room receipt events for second event failed");
996        assert_eq!(
997            second_event_threaded_receipts.len(),
998            1,
999            "Found a wrong number of threaded receipts for second event after save"
1000        );
1001        assert_eq!(second_event_threaded_receipts[0].0, user_id());
1002        assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1003
1004        Ok(())
1005    }
1006
1007    async fn test_custom_storage(&self) -> TestResult {
1008        let key = "my_key";
1009        let value = &[0, 1, 2, 3];
1010
1011        self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1012
1013        let read = self.get_custom_value(key.as_bytes()).await?;
1014
1015        assert_eq!(Some(value.as_ref()), read.as_deref());
1016
1017        Ok(())
1018    }
1019
1020    async fn test_stripped_non_stripped(&self) -> TestResult {
1021        let room_id = room_id!("!test_stripped_non_stripped:localhost");
1022        let user_id = user_id();
1023
1024        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1025        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1026
1027        let mut changes = StateChanges::default();
1028        changes
1029            .state
1030            .entry(room_id.to_owned())
1031            .or_default()
1032            .entry(StateEventType::RoomMember)
1033            .or_default()
1034            .insert(user_id.into(), membership_event().cast());
1035        changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1036        self.save_changes(&changes).await?;
1037
1038        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1039        assert!(matches!(member_event, MemberEvent::Sync(_)));
1040        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1041
1042        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1043        assert_eq!(members, vec![user_id.to_owned()]);
1044
1045        let mut changes = StateChanges::default();
1046        changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1047        changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1048        self.save_changes(&changes).await?;
1049
1050        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1051        assert!(matches!(member_event, MemberEvent::Stripped(_)));
1052        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1053
1054        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1055        assert_eq!(members, vec![user_id.to_owned()]);
1056
1057        Ok(())
1058    }
1059
1060    async fn test_room_removal(&self) -> TestResult {
1061        let room_id = room_id();
1062        let user_id = user_id();
1063        let display_name = DisplayName::new("example");
1064        let stripped_room_id = stripped_room_id();
1065
1066        self.populate().await?;
1067
1068        {
1069            // Add a send queue request in that room.
1070            let txn = TransactionId::new();
1071            let ev =
1072                SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1073            self.save_send_queue_request(
1074                room_id,
1075                txn.clone(),
1076                MilliSecondsSinceUnixEpoch::now(),
1077                ev.into(),
1078                0,
1079            )
1080            .await?;
1081
1082            // Add a single dependent queue request.
1083            self.save_dependent_queued_request(
1084                room_id,
1085                &txn,
1086                ChildTransactionId::new(),
1087                MilliSecondsSinceUnixEpoch::now(),
1088                DependentQueuedRequestKind::RedactEvent,
1089            )
1090            .await?;
1091        }
1092
1093        self.remove_room(room_id).await?;
1094
1095        assert_eq!(
1096            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1097            1,
1098            "room is still there"
1099        );
1100
1101        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1102        assert!(
1103            self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1104            "still state events found"
1105        );
1106        assert!(self.get_profile(room_id, user_id).await?.is_none());
1107        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1108        assert!(
1109            self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1110            "still user ids found"
1111        );
1112        assert!(
1113            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1114            "still invited user ids found"
1115        );
1116        assert!(
1117            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1118            "still joined users found"
1119        );
1120        assert!(
1121            self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1122            "still display names found"
1123        );
1124        assert!(
1125            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1126                .await?
1127                .is_none()
1128        );
1129        assert!(
1130            self.get_user_room_receipt_event(
1131                room_id,
1132                ReceiptType::Read,
1133                ReceiptThread::Unthreaded,
1134                user_id
1135            )
1136            .await?
1137            .is_none()
1138        );
1139        assert!(
1140            self.get_event_room_receipt_events(
1141                room_id,
1142                ReceiptType::Read,
1143                ReceiptThread::Unthreaded,
1144                first_receipt_event_id()
1145            )
1146            .await?
1147            .is_empty(),
1148            "still event recepts in the store"
1149        );
1150        assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1151        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1152
1153        self.remove_room(stripped_room_id).await?;
1154
1155        assert!(
1156            self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1157            "still room info found"
1158        );
1159        Ok(())
1160    }
1161
1162    async fn test_profile_removal(&self) -> TestResult {
1163        let room_id = room_id();
1164
1165        // Both the user id and invited user id get a profile in populate().
1166        let user_id = user_id();
1167        let invited_user_id = invited_user_id();
1168
1169        self.populate().await?;
1170
1171        let new_invite_member_json = json!({
1172            "content": {
1173                "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1174                "displayname": "example after update",
1175                "membership": "invite",
1176                "reason": "Looking for support"
1177            },
1178            "event_id": "$143273582443PhrSm:localhost",
1179            "origin_server_ts": 1432735824,
1180            "room_id": room_id,
1181            "sender": user_id,
1182            "state_key": invited_user_id,
1183            "type": "m.room.member",
1184        });
1185        let new_invite_member_event: SyncRoomMemberEvent =
1186            serde_json::from_value(new_invite_member_json.clone())?;
1187
1188        let mut changes = StateChanges {
1189            // Both get their profiles deleted…
1190            profiles_to_delete: [(
1191                room_id.to_owned(),
1192                vec![user_id.to_owned(), invited_user_id.to_owned()],
1193            )]
1194            .into(),
1195
1196            // …but the invited user get a new profile.
1197            profiles: {
1198                let mut map = BTreeMap::default();
1199                map.insert(
1200                    room_id.to_owned(),
1201                    [(invited_user_id.to_owned(), new_invite_member_event.into())]
1202                        .into_iter()
1203                        .collect(),
1204                );
1205                map
1206            },
1207
1208            ..StateChanges::default()
1209        };
1210
1211        let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1212            .expect("can create sync-state-event for topic");
1213        let event = raw.deserialize()?;
1214        changes.add_state_event(room_id, event, raw);
1215
1216        self.save_changes(&changes).await?;
1217
1218        // The profile for user has been removed.
1219        assert!(self.get_profile(room_id, user_id).await?.is_none());
1220        assert!(self.get_member_event(room_id, user_id).await?.is_some());
1221
1222        // The profile for the invited user has been updated.
1223        let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1224        assert_eq!(
1225            invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1226            Some("example after update")
1227        );
1228        assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1229
1230        Ok(())
1231    }
1232
1233    async fn test_presence_saving(&self) -> TestResult {
1234        let user_id = user_id();
1235        let second_user_id = user_id!("@second:localhost");
1236        let third_user_id = user_id!("@third:localhost");
1237        let unknown_user_id = user_id!("@unknown:localhost");
1238
1239        // No event in store.
1240        let mut user_ids = vec![user_id.to_owned()];
1241        let presence_event = self.get_presence_event(user_id).await;
1242        assert!(presence_event?.is_none());
1243        let presence_events = self.get_presence_events(&user_ids).await;
1244        assert!(presence_events?.is_empty());
1245
1246        // One event in store.
1247        let mut changes = StateChanges::default();
1248        changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1249        self.save_changes(&changes).await?;
1250
1251        let presence_event = self.get_presence_event(user_id).await;
1252        assert!(presence_event?.is_some());
1253        let presence_events = self.get_presence_events(&user_ids).await;
1254        assert_eq!(presence_events?.len(), 1);
1255
1256        // Several events in store.
1257        let mut changes = StateChanges::default();
1258        changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1259        changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1260        self.save_changes(&changes).await?;
1261
1262        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1263        let presence_event = self.get_presence_event(second_user_id).await;
1264        assert!(presence_event?.is_some());
1265        let presence_event = self.get_presence_event(third_user_id).await;
1266        assert!(presence_event?.is_some());
1267        let presence_events = self.get_presence_events(&user_ids).await;
1268        assert_eq!(presence_events?.len(), 3);
1269
1270        // Several events in store with one unknown.
1271        user_ids.push(unknown_user_id.to_owned());
1272        let member_events = self.get_presence_events(&user_ids).await;
1273        assert_eq!(member_events?.len(), 3);
1274
1275        // Empty user IDs list.
1276        let presence_events = self.get_presence_events(&[]).await;
1277        assert!(presence_events?.is_empty());
1278
1279        Ok(())
1280    }
1281
1282    async fn test_display_names_saving(&self) -> TestResult {
1283        let room_id = room_id!("!test_display_names_saving:localhost");
1284        let user_id = user_id();
1285        let user_display_name = DisplayName::new("User");
1286        let second_user_id = user_id!("@second:localhost");
1287        let third_user_id = user_id!("@third:localhost");
1288        let other_display_name = DisplayName::new("Raoul");
1289        let unknown_display_name = DisplayName::new("Unknown");
1290
1291        // No event in store.
1292        let mut display_names = vec![user_display_name.to_owned()];
1293        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1294        assert!(users.is_empty());
1295        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1296        assert!(names.is_empty());
1297
1298        // One event in store.
1299        let mut changes = StateChanges::default();
1300        changes
1301            .ambiguity_maps
1302            .entry(room_id.to_owned())
1303            .or_default()
1304            .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1305        self.save_changes(&changes).await?;
1306
1307        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1308        assert_eq!(users.len(), 1);
1309        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1310        assert_eq!(names.len(), 1);
1311        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1312
1313        // Several events in store.
1314        let mut changes = StateChanges::default();
1315        changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1316            other_display_name.to_owned(),
1317            [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1318        );
1319        self.save_changes(&changes).await?;
1320
1321        display_names.push(other_display_name.to_owned());
1322        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1323        assert_eq!(users.len(), 1);
1324        let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1325        assert_eq!(users.len(), 2);
1326        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1327        assert_eq!(names.len(), 2);
1328        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1329        assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1330
1331        // Several events in store with one unknown.
1332        display_names.push(unknown_display_name.to_owned());
1333        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1334        assert_eq!(names.len(), 2);
1335
1336        // Empty user IDs list.
1337        let names = self.get_users_with_display_names(room_id, &[]).await?;
1338        assert!(names.is_empty());
1339
1340        Ok(())
1341    }
1342
1343    #[allow(clippy::needless_range_loop)]
1344    async fn test_send_queue(&self) -> TestResult {
1345        let room_id = room_id!("!test_send_queue:localhost");
1346
1347        // No queued event in store at first.
1348        let events = self.load_send_queue_requests(room_id).await?;
1349        assert!(events.is_empty());
1350
1351        // Saving one thing should work.
1352        let txn0 = TransactionId::new();
1353        let event0 =
1354            SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1355        self.save_send_queue_request(
1356            room_id,
1357            txn0.clone(),
1358            MilliSecondsSinceUnixEpoch::now(),
1359            event0.into(),
1360            0,
1361        )
1362        .await?;
1363
1364        // Reading it will work.
1365        let pending = self.load_send_queue_requests(room_id).await?;
1366
1367        assert_eq!(pending.len(), 1);
1368        {
1369            assert_eq!(pending[0].transaction_id, txn0);
1370
1371            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1372            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1373            assert_eq!(content.body(), "msg0");
1374
1375            assert!(!pending[0].is_wedged());
1376        }
1377
1378        // Saving another three things should work.
1379        for i in 1..=3 {
1380            let txn = TransactionId::new();
1381            let event = SerializableEventContent::new(
1382                &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1383            )?;
1384
1385            self.save_send_queue_request(
1386                room_id,
1387                txn,
1388                MilliSecondsSinceUnixEpoch::now(),
1389                event.into(),
1390                0,
1391            )
1392            .await?;
1393        }
1394
1395        // Reading all the events should work.
1396        let pending = self.load_send_queue_requests(room_id).await?;
1397
1398        // All the events should be retrieved, in the same order.
1399        assert_eq!(pending.len(), 4);
1400
1401        assert_eq!(pending[0].transaction_id, txn0);
1402
1403        for i in 0..4 {
1404            let deserialized = pending[i].as_event().unwrap().deserialize()?;
1405            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1406            assert_eq!(content.body(), format!("msg{i}"));
1407            assert!(!pending[i].is_wedged());
1408        }
1409
1410        // Marking an event as wedged works.
1411        let txn2 = &pending[2].transaction_id;
1412        self.update_send_queue_request_status(
1413            room_id,
1414            txn2,
1415            Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1416        )
1417        .await?;
1418
1419        // And it is reflected.
1420        let pending = self.load_send_queue_requests(room_id).await?;
1421
1422        // All the events should be retrieved, in the same order.
1423        assert_eq!(pending.len(), 4);
1424        assert_eq!(pending[0].transaction_id, txn0);
1425        assert_eq!(pending[2].transaction_id, *txn2);
1426        assert!(pending[2].is_wedged());
1427        let error = pending[2].clone().error.unwrap();
1428        let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1429        assert_eq!(generic_error, "Oops");
1430        for i in 0..4 {
1431            if i != 2 {
1432                assert!(!pending[i].is_wedged());
1433            }
1434        }
1435
1436        // Updating an event will work, and reset its wedged state to false.
1437        let event0 = SerializableEventContent::new(
1438            &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1439        )?;
1440        self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1441
1442        // And it is reflected.
1443        let pending = self.load_send_queue_requests(room_id).await?;
1444
1445        assert_eq!(pending.len(), 4);
1446        {
1447            assert_eq!(pending[2].transaction_id, *txn2);
1448
1449            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1450            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1451            assert_eq!(content.body(), "wow that's a cool test");
1452
1453            assert!(!pending[2].is_wedged());
1454
1455            for i in 0..4 {
1456                if i != 2 {
1457                    let deserialized = pending[i].as_event().unwrap().deserialize()?;
1458                    assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1459                    assert_eq!(content.body(), format!("msg{i}"));
1460
1461                    assert!(!pending[i].is_wedged());
1462                }
1463            }
1464        }
1465
1466        // Removing an event works.
1467        self.remove_send_queue_request(room_id, &txn0).await?;
1468
1469        // And it is reflected.
1470        let pending = self.load_send_queue_requests(room_id).await?;
1471
1472        assert_eq!(pending.len(), 3);
1473        assert_eq!(pending[1].transaction_id, *txn2);
1474        for i in 0..3 {
1475            assert_ne!(pending[i].transaction_id, txn0);
1476        }
1477
1478        // Now add one event for two other rooms, remove one of the events, and then
1479        // query all the rooms which have outstanding unsent events.
1480
1481        // Add one event for room2.
1482        let room_id2 = room_id!("!test_send_queue_two:localhost");
1483        {
1484            let txn = TransactionId::new();
1485            let event = SerializableEventContent::new(
1486                &RoomMessageEventContent::text_plain("room2").into(),
1487            )?;
1488            self.save_send_queue_request(
1489                room_id2,
1490                txn.clone(),
1491                MilliSecondsSinceUnixEpoch::now(),
1492                event.into(),
1493                0,
1494            )
1495            .await?;
1496        }
1497
1498        // Add and remove one event for room3.
1499        {
1500            let room_id3 = room_id!("!test_send_queue_three:localhost");
1501            let txn = TransactionId::new();
1502            let event = SerializableEventContent::new(
1503                &RoomMessageEventContent::text_plain("room3").into(),
1504            )?;
1505            self.save_send_queue_request(
1506                room_id3,
1507                txn.clone(),
1508                MilliSecondsSinceUnixEpoch::now(),
1509                event.into(),
1510                0,
1511            )
1512            .await?;
1513
1514            self.remove_send_queue_request(room_id3, &txn).await?;
1515        }
1516
1517        // Query all the rooms which have unsent events. Per the previous steps,
1518        // it should be room1 and room2, not room3.
1519        let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1520        assert_eq!(outstanding_rooms.len(), 2);
1521        assert!(outstanding_rooms.iter().any(|room| room == room_id));
1522        assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1523
1524        Ok(())
1525    }
1526
1527    async fn test_send_queue_priority(&self) -> TestResult {
1528        let room_id = room_id!("!test_send_queue:localhost");
1529
1530        // No queued event in store at first.
1531        let events = self.load_send_queue_requests(room_id).await?;
1532        assert!(events.is_empty());
1533
1534        // Saving one request should work.
1535        let low0_txn = TransactionId::new();
1536        let ev0 =
1537            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1538        self.save_send_queue_request(
1539            room_id,
1540            low0_txn.clone(),
1541            MilliSecondsSinceUnixEpoch::now(),
1542            ev0.into(),
1543            2,
1544        )
1545        .await?;
1546
1547        // Saving one request with higher priority should work.
1548        let high_txn = TransactionId::new();
1549        let ev1 =
1550            SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1551        self.save_send_queue_request(
1552            room_id,
1553            high_txn.clone(),
1554            MilliSecondsSinceUnixEpoch::now(),
1555            ev1.into(),
1556            10,
1557        )
1558        .await?;
1559
1560        // Saving another request with the low priority should work.
1561        let low1_txn = TransactionId::new();
1562        let ev2 =
1563            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1564        self.save_send_queue_request(
1565            room_id,
1566            low1_txn.clone(),
1567            MilliSecondsSinceUnixEpoch::now(),
1568            ev2.into(),
1569            2,
1570        )
1571        .await?;
1572
1573        // The requests should be ordered from higher priority to lower, and when equal,
1574        // should use the insertion order instead.
1575        let pending = self.load_send_queue_requests(room_id).await?;
1576
1577        assert_eq!(pending.len(), 3);
1578        {
1579            assert_eq!(pending[0].transaction_id, high_txn);
1580
1581            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1582            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1583            assert_eq!(content.body(), "high");
1584        }
1585
1586        {
1587            assert_eq!(pending[1].transaction_id, low0_txn);
1588
1589            let deserialized = pending[1].as_event().unwrap().deserialize()?;
1590            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1591            assert_eq!(content.body(), "low0");
1592        }
1593
1594        {
1595            assert_eq!(pending[2].transaction_id, low1_txn);
1596
1597            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1598            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1599            assert_eq!(content.body(), "low1");
1600        }
1601
1602        Ok(())
1603    }
1604
1605    async fn test_send_queue_dependents(&self) -> TestResult {
1606        let room_id = room_id!("!test_send_queue_dependents:localhost");
1607
1608        // Save one send queue event to start with.
1609        let txn0 = TransactionId::new();
1610        let event0 =
1611            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1612        self.save_send_queue_request(
1613            room_id,
1614            txn0.clone(),
1615            MilliSecondsSinceUnixEpoch::now(),
1616            event0.clone().into(),
1617            0,
1618        )
1619        .await?;
1620
1621        // No dependents, to start with.
1622        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1623
1624        // Save a redaction for that event.
1625        let child_txn = ChildTransactionId::new();
1626        self.save_dependent_queued_request(
1627            room_id,
1628            &txn0,
1629            child_txn.clone(),
1630            MilliSecondsSinceUnixEpoch::now(),
1631            DependentQueuedRequestKind::RedactEvent,
1632        )
1633        .await?;
1634
1635        // It worked.
1636        let dependents = self.load_dependent_queued_requests(room_id).await?;
1637        assert_eq!(dependents.len(), 1);
1638        assert_eq!(dependents[0].parent_transaction_id, txn0);
1639        assert_eq!(dependents[0].own_transaction_id, child_txn);
1640        assert!(dependents[0].parent_key.is_none());
1641        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1642
1643        // Update the event id.
1644        let (event, event_type) = event0.raw();
1645        let event_id = owned_event_id!("$1");
1646        let num_updated = self
1647            .mark_dependent_queued_requests_as_ready(
1648                room_id,
1649                &txn0,
1650                SentRequestKey::Event {
1651                    event_id: event_id.clone(),
1652                    event: event.clone(),
1653                    event_type: event_type.to_owned(),
1654                },
1655            )
1656            .await?;
1657        assert_eq!(num_updated, 1);
1658
1659        // It worked.
1660        let dependents = self.load_dependent_queued_requests(room_id).await?;
1661        assert_eq!(dependents.len(), 1);
1662        assert_eq!(dependents[0].parent_transaction_id, txn0);
1663        assert_eq!(dependents[0].own_transaction_id, child_txn);
1664        assert_matches!(
1665            dependents[0].parent_key.as_ref(),
1666            Some(SentRequestKey::Event {
1667                event_id: received_event_id,
1668                event: received_event,
1669                event_type: received_event_type
1670            }) => {
1671                assert_eq!(received_event_id, &event_id);
1672                assert_eq!(received_event.json().to_string(), event.json().to_string());
1673                assert_eq!(received_event_type.as_str(), event_type);
1674            }
1675        );
1676        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1677
1678        // Now remove it.
1679        let removed = self
1680            .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1681            .await?;
1682        assert!(removed);
1683
1684        // It worked.
1685        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1686
1687        // Now, inserting a dependent event and removing the original send queue event
1688        // will NOT remove the dependent event.
1689        let txn1 = TransactionId::new();
1690        let event1 =
1691            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1692        self.save_send_queue_request(
1693            room_id,
1694            txn1.clone(),
1695            MilliSecondsSinceUnixEpoch::now(),
1696            event1.into(),
1697            0,
1698        )
1699        .await?;
1700
1701        self.save_dependent_queued_request(
1702            room_id,
1703            &txn0,
1704            ChildTransactionId::new(),
1705            MilliSecondsSinceUnixEpoch::now(),
1706            DependentQueuedRequestKind::RedactEvent,
1707        )
1708        .await?;
1709        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1710
1711        self.save_dependent_queued_request(
1712            room_id,
1713            &txn1,
1714            ChildTransactionId::new(),
1715            MilliSecondsSinceUnixEpoch::now(),
1716            DependentQueuedRequestKind::EditEvent {
1717                new_content: SerializableEventContent::new(
1718                    &RoomMessageEventContent::text_plain("edit").into(),
1719                )?,
1720            },
1721        )
1722        .await?;
1723        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1724
1725        // Remove event0 / txn0.
1726        let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1727        assert!(removed);
1728
1729        // This has removed none of the dependent events.
1730        let dependents = self.load_dependent_queued_requests(room_id).await?;
1731        assert_eq!(dependents.len(), 2);
1732
1733        Ok(())
1734    }
1735
1736    async fn test_update_send_queue_dependent(&self) -> TestResult {
1737        let room_id = room_id!("!test_send_queue_dependents:localhost");
1738
1739        let txn = TransactionId::new();
1740
1741        // Save a dependent redaction for an event.
1742        let child_txn = ChildTransactionId::new();
1743
1744        self.save_dependent_queued_request(
1745            room_id,
1746            &txn,
1747            child_txn.clone(),
1748            MilliSecondsSinceUnixEpoch::now(),
1749            DependentQueuedRequestKind::RedactEvent,
1750        )
1751        .await?;
1752
1753        // It worked.
1754        let dependents = self.load_dependent_queued_requests(room_id).await?;
1755        assert_eq!(dependents.len(), 1);
1756        assert_eq!(dependents[0].parent_transaction_id, txn);
1757        assert_eq!(dependents[0].own_transaction_id, child_txn);
1758        assert!(dependents[0].parent_key.is_none());
1759        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1760
1761        // Make it a reaction, instead of a redaction.
1762        self.update_dependent_queued_request(
1763            room_id,
1764            &child_txn,
1765            DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1766        )
1767        .await?;
1768
1769        // It worked.
1770        let dependents = self.load_dependent_queued_requests(room_id).await?;
1771        assert_eq!(dependents.len(), 1);
1772        assert_eq!(dependents[0].parent_transaction_id, txn);
1773        assert_eq!(dependents[0].own_transaction_id, child_txn);
1774        assert!(dependents[0].parent_key.is_none());
1775        assert_matches!(
1776            &dependents[0].kind,
1777            DependentQueuedRequestKind::ReactEvent { key } => {
1778                assert_eq!(key, "👍");
1779            }
1780        );
1781
1782        Ok(())
1783    }
1784
1785    async fn test_get_room_infos(&self) -> TestResult {
1786        let room_id_0 = room_id!("!r0");
1787        let room_id_1 = room_id!("!r1");
1788        let room_id_2 = room_id!("!r2");
1789
1790        // There is no room for the moment.
1791        {
1792            assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1793        }
1794
1795        // Save rooms.
1796        let mut changes = StateChanges::default();
1797        changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1798        changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1799        self.save_changes(&changes).await?;
1800
1801        // We can find all the rooms with `RoomLoadSettings::All`.
1802        {
1803            let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1804
1805            // (We need to sort by `room_id` so that the test is stable across all
1806            // `StateStore` implementations).
1807            all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1808
1809            assert_eq!(all_rooms.len(), 2);
1810            assert_eq!(all_rooms[0].room_id, room_id_0);
1811            assert_eq!(all_rooms[1].room_id, room_id_1);
1812        }
1813
1814        // We can find a single room with `RoomLoadSettings::One`.
1815        {
1816            let all_rooms =
1817                self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1818
1819            assert_eq!(all_rooms.len(), 1);
1820            assert_eq!(all_rooms[0].room_id, room_id_1);
1821        }
1822
1823        // `RoomLoadSetting::One` can result in loading zero room if the room is
1824        // unknown.
1825        {
1826            let all_rooms =
1827                self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1828
1829            assert_eq!(all_rooms.len(), 0);
1830        }
1831
1832        Ok(())
1833    }
1834
1835    async fn test_thread_subscriptions(&self) -> TestResult {
1836        let first_thread = event_id!("$t1");
1837        let second_thread = event_id!("$t2");
1838
1839        // At first, there is no thread subscription.
1840        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1841        assert!(maybe_sub.is_none());
1842
1843        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1844        assert!(maybe_sub.is_none());
1845
1846        // Setting the thread subscription works.
1847        self.upsert_thread_subscription(
1848            room_id(),
1849            first_thread,
1850            StoredThreadSubscription {
1851                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1852                bump_stamp: None,
1853            },
1854        )
1855        .await?;
1856
1857        self.upsert_thread_subscription(
1858            room_id(),
1859            second_thread,
1860            StoredThreadSubscription {
1861                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1862                bump_stamp: None,
1863            },
1864        )
1865        .await?;
1866
1867        // Now, reading the thread subscription returns the expected status.
1868        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1869        assert_eq!(
1870            maybe_sub,
1871            Some(StoredThreadSubscription {
1872                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1873                bump_stamp: None,
1874            })
1875        );
1876
1877        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1878        assert_eq!(
1879            maybe_sub,
1880            Some(StoredThreadSubscription {
1881                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1882                bump_stamp: None,
1883            })
1884        );
1885
1886        // We can override the thread subscription status.
1887        self.upsert_thread_subscription(
1888            room_id(),
1889            first_thread,
1890            StoredThreadSubscription {
1891                status: ThreadSubscriptionStatus::Unsubscribed,
1892                bump_stamp: None,
1893            },
1894        )
1895        .await?;
1896
1897        // And it's correctly reflected.
1898        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1899        assert_eq!(
1900            maybe_sub,
1901            Some(StoredThreadSubscription {
1902                status: ThreadSubscriptionStatus::Unsubscribed,
1903                bump_stamp: None,
1904            })
1905        );
1906
1907        // And the second thread is still subscribed.
1908        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1909        assert_eq!(
1910            maybe_sub,
1911            Some(StoredThreadSubscription {
1912                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1913                bump_stamp: None,
1914            })
1915        );
1916
1917        // We can remove a thread subscription.
1918        self.remove_thread_subscription(room_id(), second_thread).await?;
1919
1920        // And it's correctly reflected.
1921        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1922        assert_eq!(maybe_sub, None);
1923
1924        // And the first thread is still unsubscribed.
1925        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1926        assert_eq!(
1927            maybe_sub,
1928            Some(StoredThreadSubscription {
1929                status: ThreadSubscriptionStatus::Unsubscribed,
1930                bump_stamp: None,
1931            })
1932        );
1933
1934        // Removing a thread subscription for an unknown thread is a no-op.
1935        self.remove_thread_subscription(room_id(), second_thread).await?;
1936
1937        Ok(())
1938    }
1939
1940    async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult {
1941        let thread = event_id!("$fred");
1942
1943        // At first, there is no thread subscription.
1944        let sub = self.load_thread_subscription(room_id(), thread).await?;
1945        assert!(sub.is_none());
1946
1947        // Setting the thread subscription with some bumpstamp works.
1948        self.upsert_thread_subscription(
1949            room_id(),
1950            thread,
1951            StoredThreadSubscription {
1952                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1953                bump_stamp: Some(42),
1954            },
1955        )
1956        .await?;
1957
1958        let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1959        assert_eq!(
1960            sub,
1961            StoredThreadSubscription {
1962                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1963                bump_stamp: Some(42),
1964            }
1965        );
1966
1967        // Storing a subscription with an older bumpstamp has no effect.
1968        self.upsert_thread_subscription(
1969            room_id(),
1970            thread,
1971            StoredThreadSubscription {
1972                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1973                bump_stamp: Some(41),
1974            },
1975        )
1976        .await?;
1977
1978        let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1979        assert_eq!(
1980            sub,
1981            StoredThreadSubscription {
1982                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1983                bump_stamp: Some(42),
1984            }
1985        );
1986
1987        // Storing with no bumpstamps keeps the previous one.
1988        self.upsert_thread_subscription(
1989            room_id(),
1990            thread,
1991            StoredThreadSubscription {
1992                status: ThreadSubscriptionStatus::Unsubscribed,
1993                bump_stamp: None,
1994            },
1995        )
1996        .await?;
1997
1998        let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1999        assert_eq!(
2000            sub,
2001            StoredThreadSubscription {
2002                status: ThreadSubscriptionStatus::Unsubscribed,
2003                bump_stamp: Some(42),
2004            }
2005        );
2006
2007        Ok(())
2008    }
2009
2010    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
2011        let threads = [
2012            event_id!("$t1"),
2013            event_id!("$t2"),
2014            event_id!("$t3"),
2015            event_id!("$t4"),
2016            event_id!("$t5"),
2017            event_id!("$t6"),
2018        ];
2019        // Helper for building the input for `upsert_thread_subscriptions()`,
2020        // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
2021        let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
2022            threads
2023                .iter()
2024                .zip(subs)
2025                .map(|(&event_id, &sub)| (room_id(), event_id, sub))
2026                .collect::<Vec<_>>()
2027        };
2028
2029        // Test bump_stamp logic
2030        let initial_subscriptions = build_subscription_updates(&[
2031            StoredThreadSubscription {
2032                status: ThreadSubscriptionStatus::Unsubscribed,
2033                bump_stamp: None,
2034            },
2035            StoredThreadSubscription {
2036                status: ThreadSubscriptionStatus::Unsubscribed,
2037                bump_stamp: Some(14),
2038            },
2039            StoredThreadSubscription {
2040                status: ThreadSubscriptionStatus::Unsubscribed,
2041                bump_stamp: None,
2042            },
2043            StoredThreadSubscription {
2044                status: ThreadSubscriptionStatus::Unsubscribed,
2045                bump_stamp: Some(210),
2046            },
2047            StoredThreadSubscription {
2048                status: ThreadSubscriptionStatus::Unsubscribed,
2049                bump_stamp: Some(5),
2050            },
2051            StoredThreadSubscription {
2052                status: ThreadSubscriptionStatus::Unsubscribed,
2053                bump_stamp: Some(100),
2054            },
2055        ]);
2056
2057        let update_subscriptions = build_subscription_updates(&[
2058            StoredThreadSubscription {
2059                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2060                bump_stamp: None,
2061            },
2062            StoredThreadSubscription {
2063                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2064                bump_stamp: None,
2065            },
2066            StoredThreadSubscription {
2067                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2068                bump_stamp: Some(1101),
2069            },
2070            StoredThreadSubscription {
2071                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2072                bump_stamp: Some(222),
2073            },
2074            StoredThreadSubscription {
2075                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2076                bump_stamp: Some(1),
2077            },
2078            StoredThreadSubscription {
2079                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2080                bump_stamp: Some(100),
2081            },
2082        ]);
2083
2084        let expected_subscriptions = build_subscription_updates(&[
2085            // Status should be updated, because prev and new bump_stamp are both None
2086            StoredThreadSubscription {
2087                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2088                bump_stamp: None,
2089            },
2090            // Status should be updated, but keep initial bump_stamp (new is None)
2091            StoredThreadSubscription {
2092                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2093                bump_stamp: Some(14),
2094            },
2095            // Status should be updated and also bump_stamp should be updated (initial was None)
2096            StoredThreadSubscription {
2097                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2098                bump_stamp: Some(1101),
2099            },
2100            // Status should be updated and also bump_stamp should be updated (initial was lower)
2101            StoredThreadSubscription {
2102                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2103                bump_stamp: Some(222),
2104            },
2105            // Status shouldn't change, as new bump_stamp is lower
2106            StoredThreadSubscription {
2107                status: ThreadSubscriptionStatus::Unsubscribed,
2108                bump_stamp: Some(5),
2109            },
2110            // Status shouldn't change, as bump_stamp is equal to the previous one
2111            StoredThreadSubscription {
2112                status: ThreadSubscriptionStatus::Unsubscribed,
2113                bump_stamp: Some(100),
2114            },
2115        ]);
2116
2117        // Set the initial subscriptions
2118        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2119
2120        // Assert the subscriptions have been added
2121        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2122            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2123            assert_eq!(stored_subscription, Some(*expected_sub));
2124        }
2125
2126        // Update subscriptions
2127        self.upsert_thread_subscriptions(update_subscriptions).await?;
2128
2129        // Assert the expected subscriptions and bump_stamps
2130        for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2131            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2132            assert_eq!(stored_subscription, Some(*expected_sub));
2133        }
2134
2135        // Test just state changes, but first remove previous subscriptions
2136        for (room_id, thread_id, _) in &expected_subscriptions {
2137            self.remove_thread_subscription(room_id, thread_id).await?;
2138        }
2139
2140        let initial_subscriptions = build_subscription_updates(&[
2141            StoredThreadSubscription {
2142                status: ThreadSubscriptionStatus::Unsubscribed,
2143                bump_stamp: Some(1),
2144            },
2145            StoredThreadSubscription {
2146                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2147                bump_stamp: Some(1),
2148            },
2149            StoredThreadSubscription {
2150                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2151                bump_stamp: Some(1),
2152            },
2153        ]);
2154
2155        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2156
2157        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2158            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2159            assert_eq!(stored_subscription, Some(*expected_sub));
2160        }
2161
2162        let update_subscriptions = build_subscription_updates(&[
2163            StoredThreadSubscription {
2164                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2165                bump_stamp: Some(2),
2166            },
2167            StoredThreadSubscription {
2168                status: ThreadSubscriptionStatus::Unsubscribed,
2169                bump_stamp: Some(2),
2170            },
2171            StoredThreadSubscription {
2172                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2173                bump_stamp: Some(2),
2174            },
2175        ]);
2176
2177        self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2178
2179        for (room_id, thread_id, expected_sub) in &update_subscriptions {
2180            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2181            assert_eq!(stored_subscription, Some(*expected_sub));
2182        }
2183
2184        Ok(())
2185    }
2186}
2187
2188/// Macro building to allow your StateStore implementation to run the entire
2189/// tests suite locally.
2190///
2191/// You need to provide a `async fn get_store() -> StoreResult<impl StateStore>`
2192/// providing a fresh store on the same level you invoke the macro.
2193///
2194/// ## Usage Example:
2195/// ```no_run
2196/// # use matrix_sdk_base::store::{
2197/// #    StateStore,
2198/// #    MemoryStore as MyStore,
2199/// #    Result as StoreResult,
2200/// # };
2201///
2202/// #[cfg(test)]
2203/// mod tests {
2204///     use super::{MyStore, StateStore, StoreResult};
2205///
2206///     async fn get_store() -> StoreResult<impl StateStore> {
2207///         Ok(MyStore::new())
2208///     }
2209///
2210///     statestore_integration_tests!();
2211/// }
2212/// ```
2213#[allow(unused_macros, unused_extern_crates)]
2214#[macro_export]
2215macro_rules! statestore_integration_tests {
2216    () => {
2217        mod statestore_integration_tests {
2218            use matrix_sdk_test::{TestResult, async_test};
2219            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2220
2221            use super::get_store;
2222
2223            #[async_test]
2224            async fn test_topic_redaction() -> TestResult {
2225                let store = get_store().await?.into_state_store();
2226                store.test_topic_redaction().await
2227            }
2228
2229            #[async_test]
2230            async fn test_populate_store() -> TestResult {
2231                let store = get_store().await?.into_state_store();
2232                store.test_populate_store().await
2233            }
2234
2235            #[async_test]
2236            async fn test_member_saving() -> TestResult {
2237                let store = get_store().await?.into_state_store();
2238                store.test_member_saving().await
2239            }
2240
2241            #[async_test]
2242            async fn test_filter_saving() -> TestResult {
2243                let store = get_store().await?.into_state_store();
2244                store.test_filter_saving().await
2245            }
2246
2247            #[async_test]
2248            async fn test_user_avatar_url_saving() -> TestResult {
2249                let store = get_store().await?.into_state_store();
2250                store.test_user_avatar_url_saving().await
2251            }
2252
2253            #[async_test]
2254            async fn test_supported_versions_saving() -> TestResult {
2255                let store = get_store().await?.into_state_store();
2256                store.test_supported_versions_saving().await
2257            }
2258
2259            #[async_test]
2260            async fn test_well_known_saving() -> TestResult {
2261                let store = get_store().await?.into_state_store();
2262                store.test_well_known_saving().await
2263            }
2264
2265            #[async_test]
2266            async fn test_sync_token_saving() -> TestResult {
2267                let store = get_store().await?.into_state_store();
2268                store.test_sync_token_saving().await
2269            }
2270
2271            #[async_test]
2272            async fn test_utd_hook_manager_data_saving() -> TestResult {
2273                let store = get_store().await?.into_state_store();
2274                store.test_utd_hook_manager_data_saving().await
2275            }
2276
2277            #[async_test]
2278            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2279                let store = get_store().await?.into_state_store();
2280                store.test_one_time_key_already_uploaded_data_saving().await
2281            }
2282
2283            #[async_test]
2284            async fn test_stripped_member_saving() -> TestResult {
2285                let store = get_store().await?.into_state_store();
2286                store.test_stripped_member_saving().await
2287            }
2288
2289            #[async_test]
2290            async fn test_power_level_saving() -> TestResult {
2291                let store = get_store().await?.into_state_store();
2292                store.test_power_level_saving().await
2293            }
2294
2295            #[async_test]
2296            async fn test_receipts_saving() -> TestResult {
2297                let store = get_store().await?.into_state_store();
2298                store.test_receipts_saving().await
2299            }
2300
2301            #[async_test]
2302            async fn test_custom_storage() -> TestResult {
2303                let store = get_store().await?.into_state_store();
2304                store.test_custom_storage().await
2305            }
2306
2307            #[async_test]
2308            async fn test_stripped_non_stripped() -> TestResult {
2309                let store = get_store().await?.into_state_store();
2310                store.test_stripped_non_stripped().await
2311            }
2312
2313            #[async_test]
2314            async fn test_room_removal() -> TestResult {
2315                let store = get_store().await?.into_state_store();
2316                store.test_room_removal().await
2317            }
2318
2319            #[async_test]
2320            async fn test_profile_removal() -> TestResult {
2321                let store = get_store().await?.into_state_store();
2322                store.test_profile_removal().await
2323            }
2324
2325            #[async_test]
2326            async fn test_presence_saving() -> TestResult {
2327                let store = get_store().await?.into_state_store();
2328                store.test_presence_saving().await
2329            }
2330
2331            #[async_test]
2332            async fn test_display_names_saving() -> TestResult {
2333                let store = get_store().await?.into_state_store();
2334                store.test_display_names_saving().await
2335            }
2336
2337            #[async_test]
2338            async fn test_send_queue() -> TestResult {
2339                let store = get_store().await?.into_state_store();
2340                store.test_send_queue().await
2341            }
2342
2343            #[async_test]
2344            async fn test_send_queue_priority() -> TestResult {
2345                let store = get_store().await?.into_state_store();
2346                store.test_send_queue_priority().await
2347            }
2348
2349            #[async_test]
2350            async fn test_send_queue_dependents() -> TestResult {
2351                let store = get_store().await?.into_state_store();
2352                store.test_send_queue_dependents().await
2353            }
2354
2355            #[async_test]
2356            async fn test_update_send_queue_dependent() -> TestResult {
2357                let store = get_store().await?.into_state_store();
2358                store.test_update_send_queue_dependent().await
2359            }
2360
2361            #[async_test]
2362            async fn test_get_room_infos() -> TestResult {
2363                let store = get_store().await?.into_state_store();
2364                store.test_get_room_infos().await
2365            }
2366
2367            #[async_test]
2368            async fn test_thread_subscriptions() -> TestResult {
2369                let store = get_store().await?.into_state_store();
2370                store.test_thread_subscriptions().await
2371            }
2372
2373            #[async_test]
2374            async fn test_thread_subscriptions_bumpstamps() -> TestResult {
2375                let store = get_store().await?.into_state_store();
2376                store.test_thread_subscriptions_bumpstamps().await
2377            }
2378
2379            #[async_test]
2380            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2381                let store = get_store().await?.into_state_store();
2382                store.test_thread_subscriptions_bulk_upsert().await
2383            }
2384        }
2385    };
2386}
2387
2388fn user_id() -> &'static UserId {
2389    user_id!("@example:localhost")
2390}
2391
2392fn invited_user_id() -> &'static UserId {
2393    user_id!("@invited:localhost")
2394}
2395
2396fn room_id() -> &'static RoomId {
2397    room_id!("!test:localhost")
2398}
2399
2400fn stripped_room_id() -> &'static RoomId {
2401    room_id!("!stripped:localhost")
2402}
2403
2404fn first_receipt_event_id() -> &'static EventId {
2405    event_id!("$example")
2406}
2407
2408fn power_level_event() -> Raw<AnySyncStateEvent> {
2409    let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2410
2411    let event = json!({
2412        "event_id": "$h29iv0s8:example.com",
2413        "content": content,
2414        "sender": user_id(),
2415        "type": "m.room.power_levels",
2416        "origin_server_ts": 0u64,
2417        "state_key": "",
2418    });
2419
2420    serde_json::from_value(event).unwrap()
2421}
2422
2423fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2424    custom_stripped_membership_event(user_id())
2425}
2426
2427fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2428    let ev_json = json!({
2429        "type": "m.room.member",
2430        "content": RoomMemberEventContent::new(MembershipState::Join),
2431        "sender": user_id,
2432        "state_key": user_id,
2433    });
2434
2435    Raw::new(&ev_json).unwrap().cast_unchecked()
2436}
2437
2438fn membership_event() -> Raw<SyncRoomMemberEvent> {
2439    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2440}
2441
2442fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2443    let ev_json = json!({
2444        "type": "m.room.member",
2445        "content": RoomMemberEventContent::new(MembershipState::Join),
2446        "event_id": event_id,
2447        "origin_server_ts": 198,
2448        "sender": user_id,
2449        "state_key": user_id,
2450    });
2451
2452    Raw::new(&ev_json).unwrap().cast_unchecked()
2453}
2454
2455fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2456    let ev_json = json!({
2457        "content": {
2458            "presence": "online"
2459        },
2460        "sender": user_id,
2461    });
2462
2463    Raw::new(&ev_json).unwrap().cast_unchecked()
2464}