Skip to main content

matrix_sdk_base/store/
integration_tests.rs

1//! Trait and macro of integration tests for StateStore implementations.
2
3use std::{
4    collections::{BTreeMap, BTreeSet, HashMap},
5    str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_test::{TestResult, event_factory::EventFactory};
12use ruma::{
13    EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
14    api::{
15        FeatureFlag, MatrixVersion,
16        client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
17    },
18    event_id,
19    events::{
20        AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
21        AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
22        RoomAccountDataEventType, StateEventType, SyncStateEvent,
23        presence::PresenceEvent,
24        receipt::{ReceiptThread, ReceiptType},
25        room::{
26            member::{
27                MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
28                SyncRoomMemberEvent,
29            },
30            message::RoomMessageEventContent,
31            power_levels::RoomPowerLevelsEventContent,
32            redaction::SyncRoomRedactionEvent,
33            topic::RoomTopicEventContent,
34        },
35        tag::{TagInfo, TagName, Tags, UserTagName},
36    },
37    mxc_uri, owned_event_id, owned_mxc_uri,
38    presence::PresenceState,
39    push::Ruleset,
40    room_id,
41    room_version_rules::AuthorizationRules,
42    serde::Raw,
43    uint, user_id,
44};
45use serde_json::json;
46
47use super::{
48    DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
49    SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
50};
51use crate::{
52    RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
53    deserialized_responses::MemberEvent,
54    store::{
55        ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
56        StoredThreadSubscription, ThreadSubscriptionStatus,
57    },
58    utils::RawSyncStateEventWithKeys,
59};
60
61/// `StateStore` integration tests.
62///
63/// This trait is not meant to be used directly, but will be used with the
64/// `statestore_integration_tests!` macro.
65#[allow(async_fn_in_trait)]
66pub trait StateStoreIntegrationTests {
67    /// Populate the given `StateStore`.
68    async fn populate(&self) -> TestResult;
69    /// Test room topic redaction.
70    async fn test_topic_redaction(&self) -> TestResult;
71    /// Test populating the store.
72    async fn test_populate_store(&self) -> TestResult;
73    /// Test room member saving.
74    async fn test_member_saving(&self) -> TestResult;
75    /// Test filter saving.
76    async fn test_filter_saving(&self) -> TestResult;
77    /// Test saving a user avatar URL.
78    async fn test_user_avatar_url_saving(&self) -> TestResult;
79    /// Test sync token saving.
80    async fn test_sync_token_saving(&self) -> TestResult;
81    /// Test UtdHookManagerData saving.
82    async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
83    /// Test the saving of the OneTimeKeyAlreadyUploaded key/value data type.
84    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
85    /// Test stripped room member saving.
86    async fn test_stripped_member_saving(&self) -> TestResult;
87    /// Test room power levels saving.
88    async fn test_power_level_saving(&self) -> TestResult;
89    /// Test user receipts saving.
90    async fn test_receipts_saving(&self) -> TestResult;
91    /// Test custom storage.
92    async fn test_custom_storage(&self) -> TestResult;
93    /// Test stripped and non-stripped room member saving.
94    async fn test_stripped_non_stripped(&self) -> TestResult;
95    /// Test room removal.
96    async fn test_room_removal(&self) -> TestResult;
97    /// Test profile removal.
98    async fn test_profile_removal(&self) -> TestResult;
99    /// Test presence saving.
100    async fn test_presence_saving(&self) -> TestResult;
101    /// Test display names saving.
102    async fn test_display_names_saving(&self) -> TestResult;
103    /// Test operations with the send queue.
104    async fn test_send_queue(&self) -> TestResult;
105    /// Test priority of operations with the send queue.
106    async fn test_send_queue_priority(&self) -> TestResult;
107    /// Test operations related to send queue dependents.
108    async fn test_send_queue_dependents(&self) -> TestResult;
109    /// Test an update to a send queue dependent request.
110    async fn test_update_send_queue_dependent(&self) -> TestResult;
111    /// Test saving/restoring the supported versions of the server.
112    async fn test_supported_versions_saving(&self) -> TestResult;
113    /// Test saving/restoring the well-known info of the server.
114    async fn test_well_known_saving(&self) -> TestResult;
115    /// Test fetching room infos based on [`RoomLoadSettings`].
116    async fn test_get_room_infos(&self) -> TestResult;
117    /// Test loading thread subscriptions.
118    async fn test_thread_subscriptions(&self) -> TestResult;
119    /// Test thread subscriptions bulk upsert, including bumpstamp semantics.
120    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
121}
122
123impl StateStoreIntegrationTests for DynStateStore {
124    async fn populate(&self) -> TestResult {
125        let mut changes = StateChanges::default();
126
127        let user_id = user_id();
128        let invited_user_id = invited_user_id();
129        let room_id = room_id();
130        let stripped_room_id = stripped_room_id();
131
132        changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
133
134        let f = EventFactory::new().sender(user_id);
135        let presence_raw: Raw<PresenceEvent> = f
136            .presence(PresenceState::Online)
137            .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
138            .currently_active(false)
139            .last_active_ago(1)
140            .status_msg("Making cupcakes")
141            .into();
142        let presence_event = presence_raw.deserialize()?;
143        changes.add_presence_event(presence_event, presence_raw);
144
145        let f = EventFactory::new().sender(user_id);
146        let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
147            f.push_rules(Ruleset::server_default(user_id)).into();
148        let pushrules_event = pushrules_raw.deserialize()?;
149        changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
150
151        let mut room = RoomInfo::new(room_id, RoomState::Joined);
152        room.mark_as_left();
153
154        let f = EventFactory::new().sender(user_id).room(room_id);
155        let mut tags = Tags::new();
156        tags.insert(TagName::Favorite, TagInfo::new());
157        tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
158        let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
159        let tag_event = tag_raw.deserialize()?;
160        changes.add_room_account_data(room_id, tag_event, tag_raw);
161
162        let f = EventFactory::new().sender(user_id).room(room_id);
163        let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
164        let name_event = name_raw.deserialize()?;
165        room.handle_state_event(
166            &mut RawSyncStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
167                .expect("generated state event should be valid"),
168        );
169        changes.add_state_event(room_id, name_event, name_raw);
170
171        let f = EventFactory::new().sender(user_id);
172        let receipt_content = f
173            .room(room_id)
174            .read_receipts()
175            .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
176            .into_content();
177        changes.add_receipts(room_id, receipt_content);
178
179        let topic_event_id = topic_event_id();
180        let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
181            .room(room_id)
182            .sender(user_id)
183            .room_topic("😀")
184            .event_id(topic_event_id)
185            .prev_content(RoomTopicEventContent::new("test".to_owned()))
186            .into_raw_sync_state();
187        let topic_event = topic_raw.deserialize()?;
188        room.handle_state_event(
189            &mut RawSyncStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
190                .expect("generated state event should be valid"),
191        );
192        changes.add_state_event(room_id, topic_event, topic_raw);
193
194        let mut room_ambiguity_map = HashMap::new();
195        let mut room_profiles = BTreeMap::new();
196
197        let f = EventFactory::new().sender(user_id).room(room_id);
198        let member_raw: Raw<SyncRoomMemberEvent> =
199            f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
200        let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
201        let displayname = DisplayName::new(
202            member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
203        );
204        room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
205        room_profiles.insert(user_id.to_owned(), (&member_event).into());
206
207        let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
208        let member_state_event = member_raw.deserialize()?;
209        changes.add_state_event(room_id, member_state_event, member_raw);
210
211        let f = EventFactory::new().sender(user_id).room(room_id);
212        let invited_member_raw: Raw<SyncRoomMemberEvent> = f
213            .member(invited_user_id)
214            .invited(invited_user_id)
215            .display_name("example")
216            .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
217            .reason("Looking for support")
218            .into();
219        // FIXME: Should be stripped room member event
220        let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
221        room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
222        room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
223
224        let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
225        let invited_member_state_event = invited_member_raw.deserialize()?;
226        changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
227
228        changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
229        changes.profiles.insert(room_id.to_owned(), room_profiles);
230        changes.add_room(room);
231
232        let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
233
234        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
235        let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
236        let stripped_name_event = stripped_name_raw.deserialize()?;
237        stripped_room.handle_stripped_state_event(&stripped_name_event);
238        changes.stripped_state.insert(
239            stripped_room_id.to_owned(),
240            BTreeMap::from([(
241                stripped_name_event.event_type(),
242                BTreeMap::from([(
243                    stripped_name_event.state_key().to_owned(),
244                    stripped_name_raw.clone(),
245                )]),
246            )]),
247        );
248
249        changes.add_room(stripped_room);
250
251        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
252        let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
253            f.member(user_id).display_name("example").into();
254        changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
255
256        self.save_changes(&changes).await?;
257
258        Ok(())
259    }
260
261    async fn test_topic_redaction(&self) -> TestResult {
262        let room_id = room_id();
263        let f = EventFactory::new();
264        self.populate().await?;
265
266        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
267        assert_eq!(
268            self.get_state_event_static::<RoomTopicEventContent>(room_id)
269                .await?
270                .expect("room topic found before redaction")
271                .deserialize()
272                .expect("can deserialize room topic before redaction")
273                .as_sync()
274                .expect("room topic is a sync state event")
275                .as_original()
276                .expect("room topic is not redacted yet")
277                .content
278                .topic,
279            "😀"
280        );
281
282        let mut changes = StateChanges::default();
283
284        let topic_event_id = topic_event_id();
285        let redaction_evt: Raw<SyncRoomRedactionEvent> =
286            f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
287
288        changes.add_redaction(room_id, topic_event_id, redaction_evt);
289        self.save_changes(&changes).await?;
290
291        let redacted_event = self
292            .get_state_event_static::<RoomTopicEventContent>(room_id)
293            .await?
294            .expect("room topic found after redaction")
295            .deserialize()
296            .expect("can deserialize room topic after redaction");
297
298        assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
299
300        Ok(())
301    }
302
303    async fn test_populate_store(&self) -> TestResult {
304        let room_id = room_id();
305        let user_id = user_id();
306        let display_name = DisplayName::new("example");
307
308        self.populate().await?;
309
310        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
311        assert!(self.get_presence_event(user_id).await?.is_some());
312        assert_eq!(
313            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
314            2,
315            "Expected to find 2 room infos"
316        );
317        assert!(
318            self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
319        );
320
321        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
322        assert_eq!(
323            self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
324            1,
325            "Expected to find 1 room topic"
326        );
327        assert!(self.get_profile(room_id, user_id).await?.is_some());
328        assert!(self.get_member_event(room_id, user_id).await?.is_some());
329        assert_eq!(
330            self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
331            2,
332            "Expected to find 2 members for room"
333        );
334        assert_eq!(
335            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
336            1,
337            "Expected to find 1 invited user ids"
338        );
339        assert_eq!(
340            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
341            1,
342            "Expected to find 1 joined user ids"
343        );
344        assert_eq!(
345            self.get_users_with_display_name(room_id, &display_name).await?.len(),
346            2,
347            "Expected to find 2 display names for room"
348        );
349        assert!(
350            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
351                .await?
352                .is_some()
353        );
354        assert!(
355            self.get_user_room_receipt_event(
356                room_id,
357                ReceiptType::Read,
358                ReceiptThread::Unthreaded,
359                user_id
360            )
361            .await?
362            .is_some()
363        );
364        assert_eq!(
365            self.get_event_room_receipt_events(
366                room_id,
367                ReceiptType::Read,
368                ReceiptThread::Unthreaded,
369                first_receipt_event_id()
370            )
371            .await?
372            .len(),
373            1,
374            "Expected to find 1 read receipt"
375        );
376        Ok(())
377    }
378
379    async fn test_member_saving(&self) -> TestResult {
380        let room_id = room_id!("!test_member_saving:localhost");
381        let user_id = user_id();
382        let second_user_id = user_id!("@second:localhost");
383        let third_user_id = user_id!("@third:localhost");
384        let unknown_user_id = user_id!("@unknown:localhost");
385
386        // No event in store.
387        let mut user_ids = vec![user_id.to_owned()];
388        assert!(self.get_member_event(room_id, user_id).await?.is_none());
389        let member_events = self
390            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
391            .await;
392        assert!(member_events?.is_empty());
393        assert!(self.get_profile(room_id, user_id).await?.is_none());
394        let profiles = self.get_profiles(room_id, &user_ids).await;
395        assert!(profiles?.is_empty());
396
397        // One event in store.
398        let mut changes = StateChanges::default();
399        let raw_member_event = membership_event();
400        let profile = raw_member_event.deserialize()?.into();
401        changes
402            .state
403            .entry(room_id.to_owned())
404            .or_default()
405            .entry(StateEventType::RoomMember)
406            .or_default()
407            .insert(user_id.into(), raw_member_event.cast());
408        changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
409        self.save_changes(&changes).await?;
410
411        assert!(self.get_member_event(room_id, user_id).await?.is_some());
412        let member_events = self
413            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
414            .await;
415        assert_eq!(member_events?.len(), 1);
416        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
417        assert_eq!(members.len(), 1, "We expected to find members for the room");
418        assert!(self.get_profile(room_id, user_id).await?.is_some());
419        let profiles = self.get_profiles(room_id, &user_ids).await;
420        assert_eq!(profiles?.len(), 1);
421
422        // Several events in store.
423        let mut changes = StateChanges::default();
424        let changes_members = changes
425            .state
426            .entry(room_id.to_owned())
427            .or_default()
428            .entry(StateEventType::RoomMember)
429            .or_default();
430        let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
431        let raw_second_member_event =
432            custom_membership_event(second_user_id, event_id!("$second_member_event"));
433        let second_profile = raw_second_member_event.deserialize()?.into();
434        changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
435        changes_profiles.insert(second_user_id.to_owned(), second_profile);
436        let raw_third_member_event =
437            custom_membership_event(third_user_id, event_id!("$third_member_event"));
438        let third_profile = raw_third_member_event.deserialize()?.into();
439        changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
440        changes_profiles.insert(third_user_id.to_owned(), third_profile);
441        self.save_changes(&changes).await?;
442
443        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
444        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
445        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
446        let member_events = self
447            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
448            .await;
449        assert_eq!(member_events?.len(), 3);
450        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
451        assert_eq!(members.len(), 3, "We expected to find members for the room");
452        assert!(self.get_profile(room_id, second_user_id).await?.is_some());
453        assert!(self.get_profile(room_id, third_user_id).await?.is_some());
454        let profiles = self.get_profiles(room_id, &user_ids).await;
455        assert_eq!(profiles?.len(), 3);
456
457        // Several events in store with one unknown.
458        user_ids.push(unknown_user_id.to_owned());
459        let member_events = self
460            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
461            .await;
462        assert_eq!(member_events?.len(), 3);
463        let profiles = self.get_profiles(room_id, &user_ids).await;
464        assert_eq!(profiles?.len(), 3);
465
466        // Empty user IDs list.
467        let member_events = self
468            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
469                room_id,
470                &[],
471            )
472            .await;
473        assert!(member_events?.is_empty());
474        let profiles = self.get_profiles(room_id, &[]).await;
475        assert!(profiles?.is_empty());
476
477        Ok(())
478    }
479
480    async fn test_filter_saving(&self) -> TestResult {
481        let filter_name = "filter_name";
482        let filter_id = "filter_id_1234";
483
484        self.set_kv_data(
485            StateStoreDataKey::Filter(filter_name),
486            StateStoreDataValue::Filter(filter_id.to_owned()),
487        )
488        .await?;
489        assert_let!(
490            Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
491                self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
492        );
493        assert_eq!(stored_filter_id, filter_id);
494
495        self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
496        assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
497
498        Ok(())
499    }
500
501    async fn test_user_avatar_url_saving(&self) -> TestResult {
502        let user_id = user_id!("@alice:example.org");
503        let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
504
505        self.set_kv_data(
506            StateStoreDataKey::UserAvatarUrl(user_id),
507            StateStoreDataValue::UserAvatarUrl(url.clone()),
508        )
509        .await?;
510
511        assert_let!(
512            Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
513                self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
514        );
515        assert_eq!(stored_url, url);
516
517        self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
518        assert_matches!(
519            self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
520            Ok(None)
521        );
522
523        Ok(())
524    }
525
526    async fn test_supported_versions_saving(&self) -> TestResult {
527        let versions =
528            BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
529        let supported_versions = SupportedVersionsResponse {
530            versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
531            unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
532        };
533
534        self.set_kv_data(
535            StateStoreDataKey::SupportedVersions,
536            StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
537        )
538        .await?;
539
540        assert_let!(
541            Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
542                self.get_kv_data(StateStoreDataKey::SupportedVersions).await
543        );
544        assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
545        assert_eq!(supported_versions, stored_supported_versions);
546
547        let stored_supported = stored_supported_versions.supported_versions();
548        assert_eq!(stored_supported.versions, versions);
549        assert_eq!(stored_supported.features.len(), 1);
550        assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
551
552        self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
553        assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
554
555        Ok(())
556    }
557
558    async fn test_well_known_saving(&self) -> TestResult {
559        let well_known = WellKnownResponse {
560            homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
561            identity_server: None,
562            tile_server: None,
563            rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
564        };
565
566        self.set_kv_data(
567            StateStoreDataKey::WellKnown,
568            StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
569        )
570        .await?;
571
572        assert_let!(
573            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
574                self.get_kv_data(StateStoreDataKey::WellKnown).await
575        );
576        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
577        assert_eq!(stored_well_known, Some(well_known));
578
579        self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
580        assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
581
582        self.set_kv_data(
583            StateStoreDataKey::WellKnown,
584            StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
585        )
586        .await?;
587
588        assert_let!(
589            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
590                self.get_kv_data(StateStoreDataKey::WellKnown).await
591        );
592        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
593        assert_eq!(stored_well_known, None);
594
595        Ok(())
596    }
597
598    async fn test_sync_token_saving(&self) -> TestResult {
599        let sync_token_1 = "t392-516_47314_0_7_1";
600        let sync_token_2 = "t392-516_47314_0_7_2";
601
602        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
603
604        let changes =
605            StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
606        self.save_changes(&changes).await?;
607        assert_let!(
608            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
609                self.get_kv_data(StateStoreDataKey::SyncToken).await
610        );
611        assert_eq!(stored_sync_token, sync_token_1);
612
613        self.set_kv_data(
614            StateStoreDataKey::SyncToken,
615            StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
616        )
617        .await?;
618        assert_let!(
619            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
620                self.get_kv_data(StateStoreDataKey::SyncToken).await
621        );
622        assert_eq!(stored_sync_token, sync_token_2);
623
624        self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
625        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
626
627        Ok(())
628    }
629
630    async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
631        // Before any data is written, the getter should return None.
632        assert!(
633            self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
634                .await
635                .expect("Could not read data")
636                .is_none(),
637            "Store was not empty at start"
638        );
639
640        // Put some data in the store...
641        let data = GrowableBloomBuilder::new().build();
642        self.set_kv_data(
643            StateStoreDataKey::UtdHookManagerData,
644            StateStoreDataValue::UtdHookManagerData(data.clone()),
645        )
646        .await
647        .expect("Could not save data");
648
649        // ... and check it comes back.
650        let read_data = self
651            .get_kv_data(StateStoreDataKey::UtdHookManagerData)
652            .await
653            .expect("Could not read data")
654            .expect("no data found")
655            .into_utd_hook_manager_data()
656            .expect("not UtdHookManagerData");
657
658        assert_eq!(read_data, data);
659
660        Ok(())
661    }
662
663    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
664        // Before any data is written, the getter should return None.
665        assert!(
666            self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
667            "Store was not empty at start"
668        );
669
670        self.set_kv_data(
671            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
672            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
673        )
674        .await?;
675
676        let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
677        data.expect("The loaded data should be Some");
678
679        Ok(())
680    }
681
682    async fn test_stripped_member_saving(&self) -> TestResult {
683        let room_id = room_id!("!test_stripped_member_saving:localhost");
684        let user_id = user_id();
685        let second_user_id = user_id!("@second:localhost");
686        let third_user_id = user_id!("@third:localhost");
687        let unknown_user_id = user_id!("@unknown:localhost");
688
689        // No event in store.
690        assert!(self.get_member_event(room_id, user_id).await?.is_none());
691        let member_events = self
692            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
693                room_id,
694                &[user_id.to_owned()],
695            )
696            .await;
697        assert!(member_events?.is_empty());
698
699        // One event in store.
700        let mut changes = StateChanges::default();
701        changes
702            .stripped_state
703            .entry(room_id.to_owned())
704            .or_default()
705            .entry(StateEventType::RoomMember)
706            .or_default()
707            .insert(user_id.into(), stripped_membership_event().cast());
708        self.save_changes(&changes).await?;
709
710        assert!(self.get_member_event(room_id, user_id).await?.is_some());
711        let member_events = self
712            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
713                room_id,
714                &[user_id.to_owned()],
715            )
716            .await;
717        assert_eq!(member_events?.len(), 1);
718        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
719        assert_eq!(members.len(), 1, "We expected to find members for the room");
720
721        // Several events in store.
722        let mut changes = StateChanges::default();
723        let changes_members = changes
724            .stripped_state
725            .entry(room_id.to_owned())
726            .or_default()
727            .entry(StateEventType::RoomMember)
728            .or_default();
729        changes_members
730            .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
731        changes_members
732            .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
733        self.save_changes(&changes).await?;
734
735        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
736        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
737        let member_events = self
738            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
739                room_id,
740                &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
741            )
742            .await;
743        assert_eq!(member_events?.len(), 3);
744        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
745        assert_eq!(members.len(), 3, "We expected to find members for the room");
746
747        // Several events in store with one unknown.
748        let member_events = self
749            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
750                room_id,
751                &[
752                    user_id.to_owned(),
753                    second_user_id.to_owned(),
754                    third_user_id.to_owned(),
755                    unknown_user_id.to_owned(),
756                ],
757            )
758            .await;
759        assert_eq!(member_events?.len(), 3);
760
761        // Empty user IDs list.
762        let member_events = self
763            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
764                room_id,
765                &[],
766            )
767            .await;
768        assert!(member_events?.is_empty());
769
770        Ok(())
771    }
772
773    async fn test_power_level_saving(&self) -> TestResult {
774        let room_id = room_id!("!test_power_level_saving:localhost");
775
776        let raw_event = power_level_event();
777        let event = raw_event.deserialize()?;
778
779        assert!(
780            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
781        );
782        let mut changes = StateChanges::default();
783        changes.add_state_event(room_id, event, raw_event);
784
785        self.save_changes(&changes).await?;
786        assert!(
787            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
788        );
789
790        Ok(())
791    }
792
793    async fn test_receipts_saving(&self) -> TestResult {
794        let room_id = room_id!("!test_receipts_saving:localhost");
795
796        let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
797        let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
798
799        let first_receipt_ts = uint!(1436451550);
800        let second_receipt_ts = uint!(1436451653);
801        let third_receipt_ts = uint!(1436474532);
802
803        let first_receipt_event = serde_json::from_value(json!({
804            first_event_id: {
805                "m.read": {
806                    user_id(): {
807                        "ts": first_receipt_ts,
808                    }
809                }
810            }
811        }))?;
812
813        let second_receipt_event = serde_json::from_value(json!({
814            second_event_id: {
815                "m.read": {
816                    user_id(): {
817                        "ts": second_receipt_ts,
818                    }
819                }
820            }
821        }))?;
822
823        let third_receipt_event = serde_json::from_value(json!({
824            second_event_id: {
825                "m.read": {
826                    user_id(): {
827                        "ts": third_receipt_ts,
828                        "thread_id": "main",
829                    }
830                }
831            }
832        }))?;
833
834        assert!(
835            self.get_user_room_receipt_event(
836                room_id,
837                ReceiptType::Read,
838                ReceiptThread::Unthreaded,
839                user_id()
840            )
841            .await
842            .expect("failed to read unthreaded user room receipt")
843            .is_none()
844        );
845        assert!(
846            self.get_event_room_receipt_events(
847                room_id,
848                ReceiptType::Read,
849                ReceiptThread::Unthreaded,
850                first_event_id
851            )
852            .await
853            .expect("failed to read unthreaded event room receipt for 1")
854            .is_empty()
855        );
856        assert!(
857            self.get_event_room_receipt_events(
858                room_id,
859                ReceiptType::Read,
860                ReceiptThread::Unthreaded,
861                second_event_id
862            )
863            .await
864            .expect("failed to read unthreaded event room receipt for 2")
865            .is_empty()
866        );
867
868        let mut changes = StateChanges::default();
869        changes.add_receipts(room_id, first_receipt_event);
870
871        self.save_changes(&changes).await?;
872        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
873            .get_user_room_receipt_event(
874                room_id,
875                ReceiptType::Read,
876                ReceiptThread::Unthreaded,
877                user_id(),
878            )
879            .await
880            .expect("failed to read unthreaded user room receipt after save")
881            .unwrap();
882        assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
883        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
884        let first_event_unthreaded_receipts = self
885            .get_event_room_receipt_events(
886                room_id,
887                ReceiptType::Read,
888                ReceiptThread::Unthreaded,
889                first_event_id,
890            )
891            .await
892            .expect("failed to read unthreaded event room receipt for 1 after save");
893        assert_eq!(
894            first_event_unthreaded_receipts.len(),
895            1,
896            "Found a wrong number of unthreaded receipts for 1 after save"
897        );
898        assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
899        assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
900        assert!(
901            self.get_event_room_receipt_events(
902                room_id,
903                ReceiptType::Read,
904                ReceiptThread::Unthreaded,
905                second_event_id
906            )
907            .await
908            .expect("failed to read unthreaded event room receipt for 2 after save")
909            .is_empty()
910        );
911
912        let mut changes = StateChanges::default();
913        changes.add_receipts(room_id, second_receipt_event);
914
915        self.save_changes(&changes).await.expect("Saving works");
916        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
917            .get_user_room_receipt_event(
918                room_id,
919                ReceiptType::Read,
920                ReceiptThread::Unthreaded,
921                user_id(),
922            )
923            .await
924            .expect("Getting unthreaded user room receipt after save failed")
925            .unwrap();
926        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
927        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
928        assert!(
929            self.get_event_room_receipt_events(
930                room_id,
931                ReceiptType::Read,
932                ReceiptThread::Unthreaded,
933                first_event_id
934            )
935            .await
936            .expect("Getting unthreaded event room receipt events for first event failed")
937            .is_empty()
938        );
939        let second_event_unthreaded_receipts = self
940            .get_event_room_receipt_events(
941                room_id,
942                ReceiptType::Read,
943                ReceiptThread::Unthreaded,
944                second_event_id,
945            )
946            .await
947            .expect("Getting unthreaded event room receipt events for second event failed");
948        assert_eq!(
949            second_event_unthreaded_receipts.len(),
950            1,
951            "Found a wrong number of unthreaded receipts for second event after save"
952        );
953        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
954        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
955
956        assert!(
957            self.get_user_room_receipt_event(
958                room_id,
959                ReceiptType::Read,
960                ReceiptThread::Main,
961                user_id()
962            )
963            .await
964            .expect("failed to read threaded user room receipt")
965            .is_none()
966        );
967        assert!(
968            self.get_event_room_receipt_events(
969                room_id,
970                ReceiptType::Read,
971                ReceiptThread::Main,
972                second_event_id
973            )
974            .await
975            .expect("Getting threaded event room receipts for 2 failed")
976            .is_empty()
977        );
978
979        let mut changes = StateChanges::default();
980        changes.add_receipts(room_id, third_receipt_event);
981
982        self.save_changes(&changes).await.expect("Saving works");
983        // Unthreaded receipts should not have changed.
984        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
985            .get_user_room_receipt_event(
986                room_id,
987                ReceiptType::Read,
988                ReceiptThread::Unthreaded,
989                user_id(),
990            )
991            .await
992            .expect("Getting unthreaded user room receipt after save failed")
993            .unwrap();
994        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
995        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
996        let second_event_unthreaded_receipts = self
997            .get_event_room_receipt_events(
998                room_id,
999                ReceiptType::Read,
1000                ReceiptThread::Unthreaded,
1001                second_event_id,
1002            )
1003            .await
1004            .expect("Getting unthreaded event room receipt events for second event failed");
1005        assert_eq!(
1006            second_event_unthreaded_receipts.len(),
1007            1,
1008            "Found a wrong number of unthreaded receipts for second event after save"
1009        );
1010        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1011        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1012        // Threaded receipts should have changed
1013        let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1014            .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1015            .await
1016            .expect("Getting threaded user room receipt after save failed")
1017            .unwrap();
1018        assert_eq!(threaded_user_receipt_event_id, second_event_id);
1019        assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1020        let second_event_threaded_receipts = self
1021            .get_event_room_receipt_events(
1022                room_id,
1023                ReceiptType::Read,
1024                ReceiptThread::Main,
1025                second_event_id,
1026            )
1027            .await
1028            .expect("Getting threaded event room receipt events for second event failed");
1029        assert_eq!(
1030            second_event_threaded_receipts.len(),
1031            1,
1032            "Found a wrong number of threaded receipts for second event after save"
1033        );
1034        assert_eq!(second_event_threaded_receipts[0].0, user_id());
1035        assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1036
1037        Ok(())
1038    }
1039
1040    async fn test_custom_storage(&self) -> TestResult {
1041        let key = "my_key";
1042        let value = &[0, 1, 2, 3];
1043
1044        self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1045
1046        let read = self.get_custom_value(key.as_bytes()).await?;
1047
1048        assert_eq!(Some(value.as_ref()), read.as_deref());
1049
1050        Ok(())
1051    }
1052
1053    async fn test_stripped_non_stripped(&self) -> TestResult {
1054        let room_id = room_id!("!test_stripped_non_stripped:localhost");
1055        let user_id = user_id();
1056
1057        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1058        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1059
1060        let mut changes = StateChanges::default();
1061        changes
1062            .state
1063            .entry(room_id.to_owned())
1064            .or_default()
1065            .entry(StateEventType::RoomMember)
1066            .or_default()
1067            .insert(user_id.into(), membership_event().cast());
1068        changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1069        self.save_changes(&changes).await?;
1070
1071        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1072        assert!(matches!(member_event, MemberEvent::Sync(_)));
1073        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1074
1075        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1076        assert_eq!(members, vec![user_id.to_owned()]);
1077
1078        let mut changes = StateChanges::default();
1079        changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1080        changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1081        self.save_changes(&changes).await?;
1082
1083        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1084        assert!(matches!(member_event, MemberEvent::Stripped(_)));
1085        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1086
1087        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1088        assert_eq!(members, vec![user_id.to_owned()]);
1089
1090        Ok(())
1091    }
1092
1093    async fn test_room_removal(&self) -> TestResult {
1094        let room_id = room_id();
1095        let user_id = user_id();
1096        let display_name = DisplayName::new("example");
1097        let stripped_room_id = stripped_room_id();
1098
1099        self.populate().await?;
1100
1101        {
1102            // Add a send queue request in that room.
1103            let txn = TransactionId::new();
1104            let ev =
1105                SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1106            self.save_send_queue_request(
1107                room_id,
1108                txn.clone(),
1109                MilliSecondsSinceUnixEpoch::now(),
1110                ev.into(),
1111                0,
1112            )
1113            .await?;
1114
1115            // Add a single dependent queue request.
1116            self.save_dependent_queued_request(
1117                room_id,
1118                &txn,
1119                ChildTransactionId::new(),
1120                MilliSecondsSinceUnixEpoch::now(),
1121                DependentQueuedRequestKind::RedactEvent,
1122            )
1123            .await?;
1124        }
1125
1126        self.remove_room(room_id).await?;
1127
1128        assert_eq!(
1129            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1130            1,
1131            "room is still there"
1132        );
1133
1134        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1135        assert!(
1136            self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1137            "still state events found"
1138        );
1139        assert!(self.get_profile(room_id, user_id).await?.is_none());
1140        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1141        assert!(
1142            self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1143            "still user ids found"
1144        );
1145        assert!(
1146            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1147            "still invited user ids found"
1148        );
1149        assert!(
1150            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1151            "still joined users found"
1152        );
1153        assert!(
1154            self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1155            "still display names found"
1156        );
1157        assert!(
1158            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1159                .await?
1160                .is_none()
1161        );
1162        assert!(
1163            self.get_user_room_receipt_event(
1164                room_id,
1165                ReceiptType::Read,
1166                ReceiptThread::Unthreaded,
1167                user_id
1168            )
1169            .await?
1170            .is_none()
1171        );
1172        assert!(
1173            self.get_event_room_receipt_events(
1174                room_id,
1175                ReceiptType::Read,
1176                ReceiptThread::Unthreaded,
1177                first_receipt_event_id()
1178            )
1179            .await?
1180            .is_empty(),
1181            "still event recepts in the store"
1182        );
1183        assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1184        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1185
1186        self.remove_room(stripped_room_id).await?;
1187
1188        assert!(
1189            self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1190            "still room info found"
1191        );
1192        Ok(())
1193    }
1194
1195    async fn test_profile_removal(&self) -> TestResult {
1196        let room_id = room_id();
1197
1198        // Both the user id and invited user id get a profile in populate().
1199        let user_id = user_id();
1200        let invited_user_id = invited_user_id();
1201
1202        self.populate().await?;
1203
1204        let new_invite_member_json = json!({
1205            "content": {
1206                "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1207                "displayname": "example after update",
1208                "membership": "invite",
1209                "reason": "Looking for support"
1210            },
1211            "event_id": "$143273582443PhrSm:localhost",
1212            "origin_server_ts": 1432735824,
1213            "room_id": room_id,
1214            "sender": user_id,
1215            "state_key": invited_user_id,
1216            "type": "m.room.member",
1217        });
1218        let new_invite_member_event: SyncRoomMemberEvent =
1219            serde_json::from_value(new_invite_member_json.clone())?;
1220
1221        let mut changes = StateChanges {
1222            // Both get their profiles deleted…
1223            profiles_to_delete: [(
1224                room_id.to_owned(),
1225                vec![user_id.to_owned(), invited_user_id.to_owned()],
1226            )]
1227            .into(),
1228
1229            // …but the invited user get a new profile.
1230            profiles: {
1231                let mut map = BTreeMap::default();
1232                map.insert(
1233                    room_id.to_owned(),
1234                    [(invited_user_id.to_owned(), new_invite_member_event.into())]
1235                        .into_iter()
1236                        .collect(),
1237                );
1238                map
1239            },
1240
1241            ..StateChanges::default()
1242        };
1243
1244        let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1245            .expect("can create sync-state-event for topic");
1246        let event = raw.deserialize()?;
1247        changes.add_state_event(room_id, event, raw);
1248
1249        self.save_changes(&changes).await?;
1250
1251        // The profile for user has been removed.
1252        assert!(self.get_profile(room_id, user_id).await?.is_none());
1253        assert!(self.get_member_event(room_id, user_id).await?.is_some());
1254
1255        // The profile for the invited user has been updated.
1256        let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1257        assert_eq!(
1258            invited_member_event.content.displayname.as_deref(),
1259            Some("example after update")
1260        );
1261        assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1262
1263        Ok(())
1264    }
1265
1266    async fn test_presence_saving(&self) -> TestResult {
1267        let user_id = user_id();
1268        let second_user_id = user_id!("@second:localhost");
1269        let third_user_id = user_id!("@third:localhost");
1270        let unknown_user_id = user_id!("@unknown:localhost");
1271
1272        // No event in store.
1273        let mut user_ids = vec![user_id.to_owned()];
1274        let presence_event = self.get_presence_event(user_id).await;
1275        assert!(presence_event?.is_none());
1276        let presence_events = self.get_presence_events(&user_ids).await;
1277        assert!(presence_events?.is_empty());
1278
1279        // One event in store.
1280        let mut changes = StateChanges::default();
1281        changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1282        self.save_changes(&changes).await?;
1283
1284        let presence_event = self.get_presence_event(user_id).await;
1285        assert!(presence_event?.is_some());
1286        let presence_events = self.get_presence_events(&user_ids).await;
1287        assert_eq!(presence_events?.len(), 1);
1288
1289        // Several events in store.
1290        let mut changes = StateChanges::default();
1291        changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1292        changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1293        self.save_changes(&changes).await?;
1294
1295        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1296        let presence_event = self.get_presence_event(second_user_id).await;
1297        assert!(presence_event?.is_some());
1298        let presence_event = self.get_presence_event(third_user_id).await;
1299        assert!(presence_event?.is_some());
1300        let presence_events = self.get_presence_events(&user_ids).await;
1301        assert_eq!(presence_events?.len(), 3);
1302
1303        // Several events in store with one unknown.
1304        user_ids.push(unknown_user_id.to_owned());
1305        let member_events = self.get_presence_events(&user_ids).await;
1306        assert_eq!(member_events?.len(), 3);
1307
1308        // Empty user IDs list.
1309        let presence_events = self.get_presence_events(&[]).await;
1310        assert!(presence_events?.is_empty());
1311
1312        Ok(())
1313    }
1314
1315    async fn test_display_names_saving(&self) -> TestResult {
1316        let room_id = room_id!("!test_display_names_saving:localhost");
1317        let user_id = user_id();
1318        let user_display_name = DisplayName::new("User");
1319        let second_user_id = user_id!("@second:localhost");
1320        let third_user_id = user_id!("@third:localhost");
1321        let other_display_name = DisplayName::new("Raoul");
1322        let unknown_display_name = DisplayName::new("Unknown");
1323
1324        // No event in store.
1325        let mut display_names = vec![user_display_name.to_owned()];
1326        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1327        assert!(users.is_empty());
1328        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1329        assert!(names.is_empty());
1330
1331        // One event in store.
1332        let mut changes = StateChanges::default();
1333        changes
1334            .ambiguity_maps
1335            .entry(room_id.to_owned())
1336            .or_default()
1337            .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1338        self.save_changes(&changes).await?;
1339
1340        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1341        assert_eq!(users.len(), 1);
1342        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1343        assert_eq!(names.len(), 1);
1344        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1345
1346        // Several events in store.
1347        let mut changes = StateChanges::default();
1348        changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1349            other_display_name.to_owned(),
1350            [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1351        );
1352        self.save_changes(&changes).await?;
1353
1354        display_names.push(other_display_name.to_owned());
1355        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1356        assert_eq!(users.len(), 1);
1357        let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1358        assert_eq!(users.len(), 2);
1359        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1360        assert_eq!(names.len(), 2);
1361        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1362        assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1363
1364        // Several events in store with one unknown.
1365        display_names.push(unknown_display_name.to_owned());
1366        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1367        assert_eq!(names.len(), 2);
1368
1369        // Empty user IDs list.
1370        let names = self.get_users_with_display_names(room_id, &[]).await?;
1371        assert!(names.is_empty());
1372
1373        Ok(())
1374    }
1375
1376    #[allow(clippy::needless_range_loop)]
1377    async fn test_send_queue(&self) -> TestResult {
1378        let room_id = room_id!("!test_send_queue:localhost");
1379
1380        // No queued event in store at first.
1381        let events = self.load_send_queue_requests(room_id).await?;
1382        assert!(events.is_empty());
1383
1384        // Saving one thing should work.
1385        let txn0 = TransactionId::new();
1386        let event0 =
1387            SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1388        self.save_send_queue_request(
1389            room_id,
1390            txn0.clone(),
1391            MilliSecondsSinceUnixEpoch::now(),
1392            event0.into(),
1393            0,
1394        )
1395        .await?;
1396
1397        // Reading it will work.
1398        let pending = self.load_send_queue_requests(room_id).await?;
1399
1400        assert_eq!(pending.len(), 1);
1401        {
1402            assert_eq!(pending[0].transaction_id, txn0);
1403
1404            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1405            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1406            assert_eq!(content.body(), "msg0");
1407
1408            assert!(!pending[0].is_wedged());
1409        }
1410
1411        // Saving another three things should work.
1412        for i in 1..=3 {
1413            let txn = TransactionId::new();
1414            let event = SerializableEventContent::new(
1415                &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1416            )?;
1417
1418            self.save_send_queue_request(
1419                room_id,
1420                txn,
1421                MilliSecondsSinceUnixEpoch::now(),
1422                event.into(),
1423                0,
1424            )
1425            .await?;
1426        }
1427
1428        // Reading all the events should work.
1429        let pending = self.load_send_queue_requests(room_id).await?;
1430
1431        // All the events should be retrieved, in the same order.
1432        assert_eq!(pending.len(), 4);
1433
1434        assert_eq!(pending[0].transaction_id, txn0);
1435
1436        for i in 0..4 {
1437            let deserialized = pending[i].as_event().unwrap().deserialize()?;
1438            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1439            assert_eq!(content.body(), format!("msg{i}"));
1440            assert!(!pending[i].is_wedged());
1441        }
1442
1443        // Marking an event as wedged works.
1444        let txn2 = &pending[2].transaction_id;
1445        self.update_send_queue_request_status(
1446            room_id,
1447            txn2,
1448            Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1449        )
1450        .await?;
1451
1452        // And it is reflected.
1453        let pending = self.load_send_queue_requests(room_id).await?;
1454
1455        // All the events should be retrieved, in the same order.
1456        assert_eq!(pending.len(), 4);
1457        assert_eq!(pending[0].transaction_id, txn0);
1458        assert_eq!(pending[2].transaction_id, *txn2);
1459        assert!(pending[2].is_wedged());
1460        let error = pending[2].clone().error.unwrap();
1461        let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1462        assert_eq!(generic_error, "Oops");
1463        for i in 0..4 {
1464            if i != 2 {
1465                assert!(!pending[i].is_wedged());
1466            }
1467        }
1468
1469        // Updating an event will work, and reset its wedged state to false.
1470        let event0 = SerializableEventContent::new(
1471            &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1472        )?;
1473        self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1474
1475        // And it is reflected.
1476        let pending = self.load_send_queue_requests(room_id).await?;
1477
1478        assert_eq!(pending.len(), 4);
1479        {
1480            assert_eq!(pending[2].transaction_id, *txn2);
1481
1482            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1483            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1484            assert_eq!(content.body(), "wow that's a cool test");
1485
1486            assert!(!pending[2].is_wedged());
1487
1488            for i in 0..4 {
1489                if i != 2 {
1490                    let deserialized = pending[i].as_event().unwrap().deserialize()?;
1491                    assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1492                    assert_eq!(content.body(), format!("msg{i}"));
1493
1494                    assert!(!pending[i].is_wedged());
1495                }
1496            }
1497        }
1498
1499        // Removing an event works.
1500        self.remove_send_queue_request(room_id, &txn0).await?;
1501
1502        // And it is reflected.
1503        let pending = self.load_send_queue_requests(room_id).await?;
1504
1505        assert_eq!(pending.len(), 3);
1506        assert_eq!(pending[1].transaction_id, *txn2);
1507        for i in 0..3 {
1508            assert_ne!(pending[i].transaction_id, txn0);
1509        }
1510
1511        // Now add one event for two other rooms, remove one of the events, and then
1512        // query all the rooms which have outstanding unsent events.
1513
1514        // Add one event for room2.
1515        let room_id2 = room_id!("!test_send_queue_two:localhost");
1516        {
1517            let txn = TransactionId::new();
1518            let event = SerializableEventContent::new(
1519                &RoomMessageEventContent::text_plain("room2").into(),
1520            )?;
1521            self.save_send_queue_request(
1522                room_id2,
1523                txn.clone(),
1524                MilliSecondsSinceUnixEpoch::now(),
1525                event.into(),
1526                0,
1527            )
1528            .await?;
1529        }
1530
1531        // Add and remove one event for room3.
1532        {
1533            let room_id3 = room_id!("!test_send_queue_three:localhost");
1534            let txn = TransactionId::new();
1535            let event = SerializableEventContent::new(
1536                &RoomMessageEventContent::text_plain("room3").into(),
1537            )?;
1538            self.save_send_queue_request(
1539                room_id3,
1540                txn.clone(),
1541                MilliSecondsSinceUnixEpoch::now(),
1542                event.into(),
1543                0,
1544            )
1545            .await?;
1546
1547            self.remove_send_queue_request(room_id3, &txn).await?;
1548        }
1549
1550        // Query all the rooms which have unsent events. Per the previous steps,
1551        // it should be room1 and room2, not room3.
1552        let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1553        assert_eq!(outstanding_rooms.len(), 2);
1554        assert!(outstanding_rooms.iter().any(|room| room == room_id));
1555        assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1556
1557        Ok(())
1558    }
1559
1560    async fn test_send_queue_priority(&self) -> TestResult {
1561        let room_id = room_id!("!test_send_queue:localhost");
1562
1563        // No queued event in store at first.
1564        let events = self.load_send_queue_requests(room_id).await?;
1565        assert!(events.is_empty());
1566
1567        // Saving one request should work.
1568        let low0_txn = TransactionId::new();
1569        let ev0 =
1570            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1571        self.save_send_queue_request(
1572            room_id,
1573            low0_txn.clone(),
1574            MilliSecondsSinceUnixEpoch::now(),
1575            ev0.into(),
1576            2,
1577        )
1578        .await?;
1579
1580        // Saving one request with higher priority should work.
1581        let high_txn = TransactionId::new();
1582        let ev1 =
1583            SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1584        self.save_send_queue_request(
1585            room_id,
1586            high_txn.clone(),
1587            MilliSecondsSinceUnixEpoch::now(),
1588            ev1.into(),
1589            10,
1590        )
1591        .await?;
1592
1593        // Saving another request with the low priority should work.
1594        let low1_txn = TransactionId::new();
1595        let ev2 =
1596            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1597        self.save_send_queue_request(
1598            room_id,
1599            low1_txn.clone(),
1600            MilliSecondsSinceUnixEpoch::now(),
1601            ev2.into(),
1602            2,
1603        )
1604        .await?;
1605
1606        // The requests should be ordered from higher priority to lower, and when equal,
1607        // should use the insertion order instead.
1608        let pending = self.load_send_queue_requests(room_id).await?;
1609
1610        assert_eq!(pending.len(), 3);
1611        {
1612            assert_eq!(pending[0].transaction_id, high_txn);
1613
1614            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1615            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1616            assert_eq!(content.body(), "high");
1617        }
1618
1619        {
1620            assert_eq!(pending[1].transaction_id, low0_txn);
1621
1622            let deserialized = pending[1].as_event().unwrap().deserialize()?;
1623            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1624            assert_eq!(content.body(), "low0");
1625        }
1626
1627        {
1628            assert_eq!(pending[2].transaction_id, low1_txn);
1629
1630            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1631            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1632            assert_eq!(content.body(), "low1");
1633        }
1634
1635        Ok(())
1636    }
1637
1638    async fn test_send_queue_dependents(&self) -> TestResult {
1639        let room_id = room_id!("!test_send_queue_dependents:localhost");
1640
1641        // Save one send queue event to start with.
1642        let txn0 = TransactionId::new();
1643        let event0 =
1644            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1645        self.save_send_queue_request(
1646            room_id,
1647            txn0.clone(),
1648            MilliSecondsSinceUnixEpoch::now(),
1649            event0.clone().into(),
1650            0,
1651        )
1652        .await?;
1653
1654        // No dependents, to start with.
1655        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1656
1657        // Save a redaction for that event.
1658        let child_txn = ChildTransactionId::new();
1659        self.save_dependent_queued_request(
1660            room_id,
1661            &txn0,
1662            child_txn.clone(),
1663            MilliSecondsSinceUnixEpoch::now(),
1664            DependentQueuedRequestKind::RedactEvent,
1665        )
1666        .await?;
1667
1668        // It worked.
1669        let dependents = self.load_dependent_queued_requests(room_id).await?;
1670        assert_eq!(dependents.len(), 1);
1671        assert_eq!(dependents[0].parent_transaction_id, txn0);
1672        assert_eq!(dependents[0].own_transaction_id, child_txn);
1673        assert!(dependents[0].parent_key.is_none());
1674        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1675
1676        // Update the event id.
1677        let (event, event_type) = event0.raw();
1678        let event_id = owned_event_id!("$1");
1679        let num_updated = self
1680            .mark_dependent_queued_requests_as_ready(
1681                room_id,
1682                &txn0,
1683                SentRequestKey::Event {
1684                    event_id: event_id.clone(),
1685                    event: event.clone(),
1686                    event_type: event_type.to_owned(),
1687                },
1688            )
1689            .await?;
1690        assert_eq!(num_updated, 1);
1691
1692        // It worked.
1693        let dependents = self.load_dependent_queued_requests(room_id).await?;
1694        assert_eq!(dependents.len(), 1);
1695        assert_eq!(dependents[0].parent_transaction_id, txn0);
1696        assert_eq!(dependents[0].own_transaction_id, child_txn);
1697        assert_matches!(
1698            dependents[0].parent_key.as_ref(),
1699            Some(SentRequestKey::Event {
1700                event_id: received_event_id,
1701                event: received_event,
1702                event_type: received_event_type
1703            }) => {
1704                assert_eq!(received_event_id, &event_id);
1705                assert_eq!(received_event.json().to_string(), event.json().to_string());
1706                assert_eq!(received_event_type.as_str(), event_type);
1707            }
1708        );
1709        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1710
1711        // Now remove it.
1712        let removed = self
1713            .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1714            .await?;
1715        assert!(removed);
1716
1717        // It worked.
1718        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1719
1720        // Now, inserting a dependent event and removing the original send queue event
1721        // will NOT remove the dependent event.
1722        let txn1 = TransactionId::new();
1723        let event1 =
1724            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1725        self.save_send_queue_request(
1726            room_id,
1727            txn1.clone(),
1728            MilliSecondsSinceUnixEpoch::now(),
1729            event1.into(),
1730            0,
1731        )
1732        .await?;
1733
1734        self.save_dependent_queued_request(
1735            room_id,
1736            &txn0,
1737            ChildTransactionId::new(),
1738            MilliSecondsSinceUnixEpoch::now(),
1739            DependentQueuedRequestKind::RedactEvent,
1740        )
1741        .await?;
1742        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1743
1744        self.save_dependent_queued_request(
1745            room_id,
1746            &txn1,
1747            ChildTransactionId::new(),
1748            MilliSecondsSinceUnixEpoch::now(),
1749            DependentQueuedRequestKind::EditEvent {
1750                new_content: SerializableEventContent::new(
1751                    &RoomMessageEventContent::text_plain("edit").into(),
1752                )?,
1753            },
1754        )
1755        .await?;
1756        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1757
1758        // Remove event0 / txn0.
1759        let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1760        assert!(removed);
1761
1762        // This has removed none of the dependent events.
1763        let dependents = self.load_dependent_queued_requests(room_id).await?;
1764        assert_eq!(dependents.len(), 2);
1765
1766        Ok(())
1767    }
1768
1769    async fn test_update_send_queue_dependent(&self) -> TestResult {
1770        let room_id = room_id!("!test_send_queue_dependents:localhost");
1771
1772        let txn = TransactionId::new();
1773
1774        // Save a dependent redaction for an event.
1775        let child_txn = ChildTransactionId::new();
1776
1777        self.save_dependent_queued_request(
1778            room_id,
1779            &txn,
1780            child_txn.clone(),
1781            MilliSecondsSinceUnixEpoch::now(),
1782            DependentQueuedRequestKind::RedactEvent,
1783        )
1784        .await?;
1785
1786        // It worked.
1787        let dependents = self.load_dependent_queued_requests(room_id).await?;
1788        assert_eq!(dependents.len(), 1);
1789        assert_eq!(dependents[0].parent_transaction_id, txn);
1790        assert_eq!(dependents[0].own_transaction_id, child_txn);
1791        assert!(dependents[0].parent_key.is_none());
1792        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1793
1794        // Make it a reaction, instead of a redaction.
1795        self.update_dependent_queued_request(
1796            room_id,
1797            &child_txn,
1798            DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1799        )
1800        .await?;
1801
1802        // It worked.
1803        let dependents = self.load_dependent_queued_requests(room_id).await?;
1804        assert_eq!(dependents.len(), 1);
1805        assert_eq!(dependents[0].parent_transaction_id, txn);
1806        assert_eq!(dependents[0].own_transaction_id, child_txn);
1807        assert!(dependents[0].parent_key.is_none());
1808        assert_matches!(
1809            &dependents[0].kind,
1810            DependentQueuedRequestKind::ReactEvent { key } => {
1811                assert_eq!(key, "👍");
1812            }
1813        );
1814
1815        Ok(())
1816    }
1817
1818    async fn test_get_room_infos(&self) -> TestResult {
1819        let room_id_0 = room_id!("!r0");
1820        let room_id_1 = room_id!("!r1");
1821        let room_id_2 = room_id!("!r2");
1822
1823        // There is no room for the moment.
1824        {
1825            assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1826        }
1827
1828        // Save rooms.
1829        let mut changes = StateChanges::default();
1830        changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1831        changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1832        self.save_changes(&changes).await?;
1833
1834        // We can find all the rooms with `RoomLoadSettings::All`.
1835        {
1836            let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1837
1838            // (We need to sort by `room_id` so that the test is stable across all
1839            // `StateStore` implementations).
1840            all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1841
1842            assert_eq!(all_rooms.len(), 2);
1843            assert_eq!(all_rooms[0].room_id, room_id_0);
1844            assert_eq!(all_rooms[1].room_id, room_id_1);
1845        }
1846
1847        // We can find a single room with `RoomLoadSettings::One`.
1848        {
1849            let all_rooms =
1850                self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1851
1852            assert_eq!(all_rooms.len(), 1);
1853            assert_eq!(all_rooms[0].room_id, room_id_1);
1854        }
1855
1856        // `RoomLoadSetting::One` can result in loading zero room if the room is
1857        // unknown.
1858        {
1859            let all_rooms =
1860                self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1861
1862            assert_eq!(all_rooms.len(), 0);
1863        }
1864
1865        Ok(())
1866    }
1867
1868    async fn test_thread_subscriptions(&self) -> TestResult {
1869        let first_thread = event_id!("$t1");
1870        let second_thread = event_id!("$t2");
1871
1872        // At first, there is no thread subscription.
1873        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1874        assert!(maybe_sub.is_none());
1875
1876        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1877        assert!(maybe_sub.is_none());
1878
1879        // Setting the thread subscription works.
1880        self.upsert_thread_subscriptions(vec![(
1881            room_id(),
1882            first_thread,
1883            StoredThreadSubscription {
1884                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1885                bump_stamp: None,
1886            },
1887        )])
1888        .await?;
1889
1890        self.upsert_thread_subscriptions(vec![(
1891            room_id(),
1892            second_thread,
1893            StoredThreadSubscription {
1894                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1895                bump_stamp: None,
1896            },
1897        )])
1898        .await?;
1899
1900        // Now, reading the thread subscription returns the expected status.
1901        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1902        assert_eq!(
1903            maybe_sub,
1904            Some(StoredThreadSubscription {
1905                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1906                bump_stamp: None,
1907            })
1908        );
1909
1910        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1911        assert_eq!(
1912            maybe_sub,
1913            Some(StoredThreadSubscription {
1914                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1915                bump_stamp: None,
1916            })
1917        );
1918
1919        // We can override the thread subscription status.
1920        self.upsert_thread_subscriptions(vec![(
1921            room_id(),
1922            first_thread,
1923            StoredThreadSubscription {
1924                status: ThreadSubscriptionStatus::Unsubscribed,
1925                bump_stamp: None,
1926            },
1927        )])
1928        .await?;
1929
1930        // And it's correctly reflected.
1931        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1932        assert_eq!(
1933            maybe_sub,
1934            Some(StoredThreadSubscription {
1935                status: ThreadSubscriptionStatus::Unsubscribed,
1936                bump_stamp: None,
1937            })
1938        );
1939
1940        // And the second thread is still subscribed.
1941        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1942        assert_eq!(
1943            maybe_sub,
1944            Some(StoredThreadSubscription {
1945                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1946                bump_stamp: None,
1947            })
1948        );
1949
1950        // We can remove a thread subscription.
1951        self.remove_thread_subscription(room_id(), second_thread).await?;
1952
1953        // And it's correctly reflected.
1954        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1955        assert_eq!(maybe_sub, None);
1956
1957        // And the first thread is still unsubscribed.
1958        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1959        assert_eq!(
1960            maybe_sub,
1961            Some(StoredThreadSubscription {
1962                status: ThreadSubscriptionStatus::Unsubscribed,
1963                bump_stamp: None,
1964            })
1965        );
1966
1967        // Removing a thread subscription for an unknown thread is a no-op.
1968        self.remove_thread_subscription(room_id(), second_thread).await?;
1969
1970        Ok(())
1971    }
1972
1973    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1974        let threads = [
1975            event_id!("$t1"),
1976            event_id!("$t2"),
1977            event_id!("$t3"),
1978            event_id!("$t4"),
1979            event_id!("$t5"),
1980            event_id!("$t6"),
1981        ];
1982        // Helper for building the input for `upsert_thread_subscriptions()`,
1983        // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
1984        let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1985            threads
1986                .iter()
1987                .zip(subs)
1988                .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1989                .collect::<Vec<_>>()
1990        };
1991
1992        // Test bump_stamp logic
1993        let initial_subscriptions = build_subscription_updates(&[
1994            StoredThreadSubscription {
1995                status: ThreadSubscriptionStatus::Unsubscribed,
1996                bump_stamp: None,
1997            },
1998            StoredThreadSubscription {
1999                status: ThreadSubscriptionStatus::Unsubscribed,
2000                bump_stamp: Some(14),
2001            },
2002            StoredThreadSubscription {
2003                status: ThreadSubscriptionStatus::Unsubscribed,
2004                bump_stamp: None,
2005            },
2006            StoredThreadSubscription {
2007                status: ThreadSubscriptionStatus::Unsubscribed,
2008                bump_stamp: Some(210),
2009            },
2010            StoredThreadSubscription {
2011                status: ThreadSubscriptionStatus::Unsubscribed,
2012                bump_stamp: Some(5),
2013            },
2014            StoredThreadSubscription {
2015                status: ThreadSubscriptionStatus::Unsubscribed,
2016                bump_stamp: Some(100),
2017            },
2018        ]);
2019
2020        let update_subscriptions = build_subscription_updates(&[
2021            StoredThreadSubscription {
2022                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2023                bump_stamp: None,
2024            },
2025            StoredThreadSubscription {
2026                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2027                bump_stamp: None,
2028            },
2029            StoredThreadSubscription {
2030                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2031                bump_stamp: Some(1101),
2032            },
2033            StoredThreadSubscription {
2034                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2035                bump_stamp: Some(222),
2036            },
2037            StoredThreadSubscription {
2038                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2039                bump_stamp: Some(1),
2040            },
2041            StoredThreadSubscription {
2042                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2043                bump_stamp: Some(100),
2044            },
2045        ]);
2046
2047        let expected_subscriptions = build_subscription_updates(&[
2048            // Status should be updated, because prev and new bump_stamp are both None
2049            StoredThreadSubscription {
2050                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2051                bump_stamp: None,
2052            },
2053            // Status should be updated, but keep initial bump_stamp (new is None)
2054            StoredThreadSubscription {
2055                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2056                bump_stamp: Some(14),
2057            },
2058            // Status should be updated and also bump_stamp should be updated (initial was None)
2059            StoredThreadSubscription {
2060                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2061                bump_stamp: Some(1101),
2062            },
2063            // Status should be updated and also bump_stamp should be updated (initial was lower)
2064            StoredThreadSubscription {
2065                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2066                bump_stamp: Some(222),
2067            },
2068            // Status shouldn't change, as new bump_stamp is lower
2069            StoredThreadSubscription {
2070                status: ThreadSubscriptionStatus::Unsubscribed,
2071                bump_stamp: Some(5),
2072            },
2073            // Status shouldn't change, as bump_stamp is equal to the previous one
2074            StoredThreadSubscription {
2075                status: ThreadSubscriptionStatus::Unsubscribed,
2076                bump_stamp: Some(100),
2077            },
2078        ]);
2079
2080        // Set the initial subscriptions
2081        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2082
2083        // Assert the subscriptions have been added
2084        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2085            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2086            assert_eq!(stored_subscription, Some(*expected_sub));
2087        }
2088
2089        // Update subscriptions
2090        self.upsert_thread_subscriptions(update_subscriptions).await?;
2091
2092        // Assert the expected subscriptions and bump_stamps
2093        for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2094            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2095            assert_eq!(stored_subscription, Some(*expected_sub));
2096        }
2097
2098        // Test just state changes, but first remove previous subscriptions
2099        for (room_id, thread_id, _) in &expected_subscriptions {
2100            self.remove_thread_subscription(room_id, thread_id).await?;
2101        }
2102
2103        let initial_subscriptions = build_subscription_updates(&[
2104            StoredThreadSubscription {
2105                status: ThreadSubscriptionStatus::Unsubscribed,
2106                bump_stamp: Some(1),
2107            },
2108            StoredThreadSubscription {
2109                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2110                bump_stamp: Some(1),
2111            },
2112            StoredThreadSubscription {
2113                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2114                bump_stamp: Some(1),
2115            },
2116        ]);
2117
2118        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2119
2120        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2121            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2122            assert_eq!(stored_subscription, Some(*expected_sub));
2123        }
2124
2125        let update_subscriptions = build_subscription_updates(&[
2126            StoredThreadSubscription {
2127                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2128                bump_stamp: Some(2),
2129            },
2130            StoredThreadSubscription {
2131                status: ThreadSubscriptionStatus::Unsubscribed,
2132                bump_stamp: Some(2),
2133            },
2134            StoredThreadSubscription {
2135                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2136                bump_stamp: Some(2),
2137            },
2138        ]);
2139
2140        self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2141
2142        for (room_id, thread_id, expected_sub) in &update_subscriptions {
2143            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2144            assert_eq!(stored_subscription, Some(*expected_sub));
2145        }
2146
2147        Ok(())
2148    }
2149}
2150
/// Macro generating the entire integration test suite, so that your
/// `StateStore` implementation can run it locally.
///
/// You need to provide an `async fn get_store() -> StoreResult<impl StateStore>`
/// returning a fresh store, at the same level where you invoke the macro.
2156///
2157/// ## Usage Example:
2158/// ```no_run
2159/// # use matrix_sdk_base::store::{
2160/// #    StateStore,
2161/// #    MemoryStore as MyStore,
2162/// #    Result as StoreResult,
2163/// # };
2164///
2165/// #[cfg(test)]
2166/// mod tests {
2167///     use super::{MyStore, StateStore, StoreResult};
2168///
2169///     async fn get_store() -> StoreResult<impl StateStore> {
2170///         Ok(MyStore::new())
2171///     }
2172///
2173///     statestore_integration_tests!();
2174/// }
2175/// ```
2176#[allow(unused_macros, unused_extern_crates)]
2177#[macro_export]
2178macro_rules! statestore_integration_tests {
2179    () => {
2180        mod statestore_integration_tests {
2181            use matrix_sdk_test::{TestResult, async_test};
2182            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2183
2184            use super::get_store;
2185
2186            #[async_test]
2187            async fn test_topic_redaction() -> TestResult {
2188                let store = get_store().await?.into_state_store();
2189                store.test_topic_redaction().await
2190            }
2191
2192            #[async_test]
2193            async fn test_populate_store() -> TestResult {
2194                let store = get_store().await?.into_state_store();
2195                store.test_populate_store().await
2196            }
2197
2198            #[async_test]
2199            async fn test_member_saving() -> TestResult {
2200                let store = get_store().await?.into_state_store();
2201                store.test_member_saving().await
2202            }
2203
2204            #[async_test]
2205            async fn test_filter_saving() -> TestResult {
2206                let store = get_store().await?.into_state_store();
2207                store.test_filter_saving().await
2208            }
2209
2210            #[async_test]
2211            async fn test_user_avatar_url_saving() -> TestResult {
2212                let store = get_store().await?.into_state_store();
2213                store.test_user_avatar_url_saving().await
2214            }
2215
2216            #[async_test]
2217            async fn test_supported_versions_saving() -> TestResult {
2218                let store = get_store().await?.into_state_store();
2219                store.test_supported_versions_saving().await
2220            }
2221
2222            #[async_test]
2223            async fn test_well_known_saving() -> TestResult {
2224                let store = get_store().await?.into_state_store();
2225                store.test_well_known_saving().await
2226            }
2227
2228            #[async_test]
2229            async fn test_sync_token_saving() -> TestResult {
2230                let store = get_store().await?.into_state_store();
2231                store.test_sync_token_saving().await
2232            }
2233
2234            #[async_test]
2235            async fn test_utd_hook_manager_data_saving() -> TestResult {
2236                let store = get_store().await?.into_state_store();
2237                store.test_utd_hook_manager_data_saving().await
2238            }
2239
2240            #[async_test]
2241            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2242                let store = get_store().await?.into_state_store();
2243                store.test_one_time_key_already_uploaded_data_saving().await
2244            }
2245
2246            #[async_test]
2247            async fn test_stripped_member_saving() -> TestResult {
2248                let store = get_store().await?.into_state_store();
2249                store.test_stripped_member_saving().await
2250            }
2251
2252            #[async_test]
2253            async fn test_power_level_saving() -> TestResult {
2254                let store = get_store().await?.into_state_store();
2255                store.test_power_level_saving().await
2256            }
2257
2258            #[async_test]
2259            async fn test_receipts_saving() -> TestResult {
2260                let store = get_store().await?.into_state_store();
2261                store.test_receipts_saving().await
2262            }
2263
2264            #[async_test]
2265            async fn test_custom_storage() -> TestResult {
2266                let store = get_store().await?.into_state_store();
2267                store.test_custom_storage().await
2268            }
2269
2270            #[async_test]
2271            async fn test_stripped_non_stripped() -> TestResult {
2272                let store = get_store().await?.into_state_store();
2273                store.test_stripped_non_stripped().await
2274            }
2275
2276            #[async_test]
2277            async fn test_room_removal() -> TestResult {
2278                let store = get_store().await?.into_state_store();
2279                store.test_room_removal().await
2280            }
2281
2282            #[async_test]
2283            async fn test_profile_removal() -> TestResult {
2284                let store = get_store().await?.into_state_store();
2285                store.test_profile_removal().await
2286            }
2287
2288            #[async_test]
2289            async fn test_presence_saving() -> TestResult {
2290                let store = get_store().await?.into_state_store();
2291                store.test_presence_saving().await
2292            }
2293
2294            #[async_test]
2295            async fn test_display_names_saving() -> TestResult {
2296                let store = get_store().await?.into_state_store();
2297                store.test_display_names_saving().await
2298            }
2299
2300            #[async_test]
2301            async fn test_send_queue() -> TestResult {
2302                let store = get_store().await?.into_state_store();
2303                store.test_send_queue().await
2304            }
2305
2306            #[async_test]
2307            async fn test_send_queue_priority() -> TestResult {
2308                let store = get_store().await?.into_state_store();
2309                store.test_send_queue_priority().await
2310            }
2311
2312            #[async_test]
2313            async fn test_send_queue_dependents() -> TestResult {
2314                let store = get_store().await?.into_state_store();
2315                store.test_send_queue_dependents().await
2316            }
2317
2318            #[async_test]
2319            async fn test_update_send_queue_dependent() -> TestResult {
2320                let store = get_store().await?.into_state_store();
2321                store.test_update_send_queue_dependent().await
2322            }
2323
2324            #[async_test]
2325            async fn test_get_room_infos() -> TestResult {
2326                let store = get_store().await?.into_state_store();
2327                store.test_get_room_infos().await
2328            }
2329
2330            #[async_test]
2331            async fn test_thread_subscriptions() -> TestResult {
2332                let store = get_store().await?.into_state_store();
2333                store.test_thread_subscriptions().await
2334            }
2335
2336            #[async_test]
2337            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2338                let store = get_store().await?.into_state_store();
2339                store.test_thread_subscriptions_bulk_upsert().await
2340            }
2341        }
2342    };
2343}
2344
2345fn user_id() -> &'static UserId {
2346    user_id!("@example:localhost")
2347}
2348
2349fn invited_user_id() -> &'static UserId {
2350    user_id!("@invited:localhost")
2351}
2352
2353fn room_id() -> &'static RoomId {
2354    room_id!("!test:localhost")
2355}
2356
2357fn stripped_room_id() -> &'static RoomId {
2358    room_id!("!stripped:localhost")
2359}
2360
2361fn first_receipt_event_id() -> &'static EventId {
2362    event_id!("$example")
2363}
2364
2365fn topic_event_id() -> &'static EventId {
2366    event_id!("$topic_event")
2367}
2368
2369fn power_level_event() -> Raw<AnySyncStateEvent> {
2370    let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2371
2372    let event = json!({
2373        "event_id": "$h29iv0s8:example.com",
2374        "content": content,
2375        "sender": user_id(),
2376        "type": "m.room.power_levels",
2377        "origin_server_ts": 0u64,
2378        "state_key": "",
2379    });
2380
2381    serde_json::from_value(event).unwrap()
2382}
2383
2384fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2385    custom_stripped_membership_event(user_id())
2386}
2387
2388fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2389    let ev_json = json!({
2390        "type": "m.room.member",
2391        "content": RoomMemberEventContent::new(MembershipState::Join),
2392        "sender": user_id,
2393        "state_key": user_id,
2394    });
2395
2396    Raw::new(&ev_json).unwrap().cast_unchecked()
2397}
2398
2399fn membership_event() -> Raw<SyncRoomMemberEvent> {
2400    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2401}
2402
2403fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2404    let ev_json = json!({
2405        "type": "m.room.member",
2406        "content": RoomMemberEventContent::new(MembershipState::Join),
2407        "event_id": event_id,
2408        "origin_server_ts": 198,
2409        "sender": user_id,
2410        "state_key": user_id,
2411    });
2412
2413    Raw::new(&ev_json).unwrap().cast_unchecked()
2414}
2415
2416fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2417    let ev_json = json!({
2418        "content": {
2419            "presence": "online"
2420        },
2421        "sender": user_id,
2422    });
2423
2424    Raw::new(&ev_json).unwrap().cast_unchecked()
2425}