Skip to main content

matrix_sdk_base/store/
integration_tests.rs

1//! Trait and macro of integration tests for StateStore implementations.
2
3use std::{
4    collections::{BTreeMap, BTreeSet, HashMap},
5    str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_common::ttl::TtlValue;
12use matrix_sdk_test::{TestResult, event_factory::EventFactory};
13use ruma::{
14    EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
15    api::{
16        FeatureFlag, MatrixVersion,
17        client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
18    },
19    event_id,
20    events::{
21        AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
22        AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
23        RoomAccountDataEventType, StateEventType, SyncStateEvent,
24        presence::PresenceEvent,
25        receipt::{ReceiptThread, ReceiptType},
26        room::{
27            member::{
28                MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
29                SyncRoomMemberEvent,
30            },
31            message::RoomMessageEventContent,
32            power_levels::RoomPowerLevelsEventContent,
33            redaction::SyncRoomRedactionEvent,
34            topic::RoomTopicEventContent,
35        },
36        tag::{TagInfo, TagName, Tags, UserTagName},
37    },
38    mxc_uri, owned_event_id, owned_mxc_uri,
39    presence::PresenceState,
40    push::Ruleset,
41    room_id,
42    room_version_rules::AuthorizationRules,
43    serde::Raw,
44    uint, user_id,
45};
46use serde_json::json;
47
48use super::{
49    DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
50    SupportedVersionsResponse, WellKnownResponse, send_queue::SentRequestKey,
51};
52use crate::{
53    RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
54    deserialized_responses::MemberEvent,
55    store::{
56        ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
57        StoredThreadSubscription, ThreadSubscriptionStatus,
58    },
59    utils::RawStateEventWithKeys,
60};
61
62/// `StateStore` integration tests.
63///
64/// This trait is not meant to be used directly, but will be used with the
65/// `statestore_integration_tests!` macro.
66#[allow(async_fn_in_trait)]
67pub trait StateStoreIntegrationTests {
68    /// Populate the given `StateStore`.
69    async fn populate(&self) -> TestResult;
70    /// Test room topic redaction.
71    async fn test_topic_redaction(&self) -> TestResult;
72    /// Test populating the store.
73    async fn test_populate_store(&self) -> TestResult;
74    /// Test room member saving.
75    async fn test_member_saving(&self) -> TestResult;
76    /// Test filter saving.
77    async fn test_filter_saving(&self) -> TestResult;
78    /// Test saving a user avatar URL.
79    async fn test_user_avatar_url_saving(&self) -> TestResult;
80    /// Test sync token saving.
81    async fn test_sync_token_saving(&self) -> TestResult;
82    /// Test UtdHookManagerData saving.
83    async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
84    /// Test the saving of the OneTimeKeyAlreadyUploaded key/value data type.
85    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
86    /// Test stripped room member saving.
87    async fn test_stripped_member_saving(&self) -> TestResult;
88    /// Test room power levels saving.
89    async fn test_power_level_saving(&self) -> TestResult;
90    /// Test user receipts saving.
91    async fn test_receipts_saving(&self) -> TestResult;
92    /// Test custom storage.
93    async fn test_custom_storage(&self) -> TestResult;
94    /// Test stripped and non-stripped room member saving.
95    async fn test_stripped_non_stripped(&self) -> TestResult;
96    /// Test room removal.
97    async fn test_room_removal(&self) -> TestResult;
98    /// Test profile removal.
99    async fn test_profile_removal(&self) -> TestResult;
100    /// Test presence saving.
101    async fn test_presence_saving(&self) -> TestResult;
102    /// Test display names saving.
103    async fn test_display_names_saving(&self) -> TestResult;
104    /// Test operations with the send queue.
105    async fn test_send_queue(&self) -> TestResult;
106    /// Test priority of operations with the send queue.
107    async fn test_send_queue_priority(&self) -> TestResult;
108    /// Test operations related to send queue dependents.
109    async fn test_send_queue_dependents(&self) -> TestResult;
110    /// Test an update to a send queue dependent request.
111    async fn test_update_send_queue_dependent(&self) -> TestResult;
112    /// Test saving/restoring the supported versions of the server.
113    async fn test_supported_versions_saving(&self) -> TestResult;
114    /// Test saving/restoring the well-known info of the server.
115    async fn test_well_known_saving(&self) -> TestResult;
116    /// Test fetching room infos based on [`RoomLoadSettings`].
117    async fn test_get_room_infos(&self) -> TestResult;
118    /// Test loading thread subscriptions.
119    async fn test_thread_subscriptions(&self) -> TestResult;
120    /// Test thread subscriptions bulk upsert, including bumpstamp semantics.
121    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
122}
123
124impl StateStoreIntegrationTests for DynStateStore {
125    async fn populate(&self) -> TestResult {
126        let mut changes = StateChanges::default();
127
128        let user_id = user_id();
129        let invited_user_id = invited_user_id();
130        let room_id = room_id();
131        let stripped_room_id = stripped_room_id();
132
133        changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
134
135        let f = EventFactory::new().sender(user_id);
136        let presence_raw: Raw<PresenceEvent> = f
137            .presence(PresenceState::Online)
138            .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
139            .currently_active(false)
140            .last_active_ago(1)
141            .status_msg("Making cupcakes")
142            .into();
143        let presence_event = presence_raw.deserialize()?;
144        changes.add_presence_event(presence_event, presence_raw);
145
146        let f = EventFactory::new().sender(user_id);
147        let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
148            f.push_rules(Ruleset::server_default(user_id)).into();
149        let pushrules_event = pushrules_raw.deserialize()?;
150        changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
151
152        let mut room = RoomInfo::new(room_id, RoomState::Joined);
153        room.mark_as_left();
154
155        let f = EventFactory::new().sender(user_id).room(room_id);
156        let mut tags = Tags::new();
157        tags.insert(TagName::Favorite, TagInfo::new());
158        tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
159        let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
160        let tag_event = tag_raw.deserialize()?;
161        changes.add_room_account_data(room_id, tag_event, tag_raw);
162
163        let f = EventFactory::new().sender(user_id).room(room_id);
164        let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
165        let name_event = name_raw.deserialize()?;
166        room.handle_state_event(
167            &mut RawStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
168                .expect("generated state event should be valid"),
169        );
170        changes.add_state_event(room_id, name_event, name_raw);
171
172        let f = EventFactory::new().sender(user_id);
173        let receipt_content = f
174            .room(room_id)
175            .read_receipts()
176            .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
177            .into_content();
178        changes.add_receipts(room_id, receipt_content);
179
180        let topic_event_id = topic_event_id();
181        let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
182            .room(room_id)
183            .sender(user_id)
184            .room_topic("😀")
185            .event_id(topic_event_id)
186            .prev_content(RoomTopicEventContent::new("test".to_owned()))
187            .into_raw_sync_state();
188        let topic_event = topic_raw.deserialize()?;
189        room.handle_state_event(
190            &mut RawStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
191                .expect("generated state event should be valid"),
192        );
193        changes.add_state_event(room_id, topic_event, topic_raw);
194
195        let mut room_ambiguity_map = HashMap::new();
196        let mut room_profiles = BTreeMap::new();
197
198        let f = EventFactory::new().sender(user_id).room(room_id);
199        let member_raw: Raw<SyncRoomMemberEvent> =
200            f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
201        let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
202        let displayname = DisplayName::new(
203            member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
204        );
205        room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
206        room_profiles.insert(user_id.to_owned(), (&member_event).into());
207
208        let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
209        let member_state_event = member_raw.deserialize()?;
210        changes.add_state_event(room_id, member_state_event, member_raw);
211
212        let f = EventFactory::new().sender(user_id).room(room_id);
213        let invited_member_raw: Raw<SyncRoomMemberEvent> = f
214            .member(invited_user_id)
215            .invited(invited_user_id)
216            .display_name("example")
217            .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
218            .reason("Looking for support")
219            .into();
220        // FIXME: Should be stripped room member event
221        let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
222        room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
223        room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
224
225        let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
226        let invited_member_state_event = invited_member_raw.deserialize()?;
227        changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
228
229        changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
230        changes.profiles.insert(room_id.to_owned(), room_profiles);
231        changes.add_room(room);
232
233        let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
234
235        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
236        let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
237        let mut stripped_name_event =
238            RawStateEventWithKeys::try_from_raw_state_event(stripped_name_raw).unwrap();
239        stripped_room.handle_stripped_state_event(&mut stripped_name_event);
240        changes.stripped_state.insert(
241            stripped_room_id.to_owned(),
242            BTreeMap::from([(
243                stripped_name_event.event_type,
244                BTreeMap::from([(stripped_name_event.state_key, stripped_name_event.raw)]),
245            )]),
246        );
247
248        changes.add_room(stripped_room);
249
250        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
251        let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
252            f.member(user_id).display_name("example").into();
253        changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
254
255        self.save_changes(&changes).await?;
256
257        Ok(())
258    }
259
260    async fn test_topic_redaction(&self) -> TestResult {
261        let room_id = room_id();
262        let f = EventFactory::new();
263        self.populate().await?;
264
265        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
266        assert_eq!(
267            self.get_state_event_static::<RoomTopicEventContent>(room_id)
268                .await?
269                .expect("room topic found before redaction")
270                .deserialize()
271                .expect("can deserialize room topic before redaction")
272                .as_sync()
273                .expect("room topic is a sync state event")
274                .as_original()
275                .expect("room topic is not redacted yet")
276                .content
277                .topic,
278            "😀"
279        );
280
281        let mut changes = StateChanges::default();
282
283        let topic_event_id = topic_event_id();
284        let redaction_evt: Raw<SyncRoomRedactionEvent> =
285            f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
286
287        changes.add_redaction(room_id, topic_event_id, redaction_evt);
288        self.save_changes(&changes).await?;
289
290        let redacted_event = self
291            .get_state_event_static::<RoomTopicEventContent>(room_id)
292            .await?
293            .expect("room topic found after redaction")
294            .deserialize()
295            .expect("can deserialize room topic after redaction");
296
297        assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
298
299        Ok(())
300    }
301
302    async fn test_populate_store(&self) -> TestResult {
303        let room_id = room_id();
304        let user_id = user_id();
305        let display_name = DisplayName::new("example");
306
307        self.populate().await?;
308
309        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
310        assert!(self.get_presence_event(user_id).await?.is_some());
311        assert_eq!(
312            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
313            2,
314            "Expected to find 2 room infos"
315        );
316        assert!(
317            self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
318        );
319
320        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
321        assert_eq!(
322            self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
323            1,
324            "Expected to find 1 room topic"
325        );
326        assert!(self.get_profile(room_id, user_id).await?.is_some());
327        assert!(self.get_member_event(room_id, user_id).await?.is_some());
328        assert_eq!(
329            self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
330            2,
331            "Expected to find 2 members for room"
332        );
333        assert_eq!(
334            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
335            1,
336            "Expected to find 1 invited user ids"
337        );
338        assert_eq!(
339            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
340            1,
341            "Expected to find 1 joined user ids"
342        );
343        assert_eq!(
344            self.get_users_with_display_name(room_id, &display_name).await?.len(),
345            2,
346            "Expected to find 2 display names for room"
347        );
348        assert!(
349            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
350                .await?
351                .is_some()
352        );
353        assert!(
354            self.get_user_room_receipt_event(
355                room_id,
356                ReceiptType::Read,
357                ReceiptThread::Unthreaded,
358                user_id
359            )
360            .await?
361            .is_some()
362        );
363        assert_eq!(
364            self.get_event_room_receipt_events(
365                room_id,
366                ReceiptType::Read,
367                ReceiptThread::Unthreaded,
368                first_receipt_event_id()
369            )
370            .await?
371            .len(),
372            1,
373            "Expected to find 1 read receipt"
374        );
375        Ok(())
376    }
377
378    async fn test_member_saving(&self) -> TestResult {
379        let room_id = room_id!("!test_member_saving:localhost");
380        let user_id = user_id();
381        let second_user_id = user_id!("@second:localhost");
382        let third_user_id = user_id!("@third:localhost");
383        let unknown_user_id = user_id!("@unknown:localhost");
384
385        // No event in store.
386        let mut user_ids = vec![user_id.to_owned()];
387        assert!(self.get_member_event(room_id, user_id).await?.is_none());
388        let member_events = self
389            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
390            .await;
391        assert!(member_events?.is_empty());
392        assert!(self.get_profile(room_id, user_id).await?.is_none());
393        let profiles = self.get_profiles(room_id, &user_ids).await;
394        assert!(profiles?.is_empty());
395
396        // One event in store.
397        let mut changes = StateChanges::default();
398        let raw_member_event = membership_event();
399        let profile = raw_member_event.deserialize()?.into();
400        changes
401            .state
402            .entry(room_id.to_owned())
403            .or_default()
404            .entry(StateEventType::RoomMember)
405            .or_default()
406            .insert(user_id.into(), raw_member_event.cast());
407        changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
408        self.save_changes(&changes).await?;
409
410        assert!(self.get_member_event(room_id, user_id).await?.is_some());
411        let member_events = self
412            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
413            .await;
414        assert_eq!(member_events?.len(), 1);
415        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
416        assert_eq!(members.len(), 1, "We expected to find members for the room");
417        assert!(self.get_profile(room_id, user_id).await?.is_some());
418        let profiles = self.get_profiles(room_id, &user_ids).await;
419        assert_eq!(profiles?.len(), 1);
420
421        // Several events in store.
422        let mut changes = StateChanges::default();
423        let changes_members = changes
424            .state
425            .entry(room_id.to_owned())
426            .or_default()
427            .entry(StateEventType::RoomMember)
428            .or_default();
429        let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
430        let raw_second_member_event =
431            custom_membership_event(second_user_id, event_id!("$second_member_event"));
432        let second_profile = raw_second_member_event.deserialize()?.into();
433        changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
434        changes_profiles.insert(second_user_id.to_owned(), second_profile);
435        let raw_third_member_event =
436            custom_membership_event(third_user_id, event_id!("$third_member_event"));
437        let third_profile = raw_third_member_event.deserialize()?.into();
438        changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
439        changes_profiles.insert(third_user_id.to_owned(), third_profile);
440        self.save_changes(&changes).await?;
441
442        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
443        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
444        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
445        let member_events = self
446            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
447            .await;
448        assert_eq!(member_events?.len(), 3);
449        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
450        assert_eq!(members.len(), 3, "We expected to find members for the room");
451        assert!(self.get_profile(room_id, second_user_id).await?.is_some());
452        assert!(self.get_profile(room_id, third_user_id).await?.is_some());
453        let profiles = self.get_profiles(room_id, &user_ids).await;
454        assert_eq!(profiles?.len(), 3);
455
456        // Several events in store with one unknown.
457        user_ids.push(unknown_user_id.to_owned());
458        let member_events = self
459            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
460            .await;
461        assert_eq!(member_events?.len(), 3);
462        let profiles = self.get_profiles(room_id, &user_ids).await;
463        assert_eq!(profiles?.len(), 3);
464
465        // Empty user IDs list.
466        let member_events = self
467            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
468                room_id,
469                &[],
470            )
471            .await;
472        assert!(member_events?.is_empty());
473        let profiles = self.get_profiles(room_id, &[]).await;
474        assert!(profiles?.is_empty());
475
476        Ok(())
477    }
478
479    async fn test_filter_saving(&self) -> TestResult {
480        let filter_name = "filter_name";
481        let filter_id = "filter_id_1234";
482
483        self.set_kv_data(
484            StateStoreDataKey::Filter(filter_name),
485            StateStoreDataValue::Filter(filter_id.to_owned()),
486        )
487        .await?;
488        assert_let!(
489            Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
490                self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
491        );
492        assert_eq!(stored_filter_id, filter_id);
493
494        self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
495        assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
496
497        Ok(())
498    }
499
500    async fn test_user_avatar_url_saving(&self) -> TestResult {
501        let user_id = user_id!("@alice:example.org");
502        let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
503
504        self.set_kv_data(
505            StateStoreDataKey::UserAvatarUrl(user_id),
506            StateStoreDataValue::UserAvatarUrl(url.clone()),
507        )
508        .await?;
509
510        assert_let!(
511            Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
512                self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
513        );
514        assert_eq!(stored_url, url);
515
516        self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
517        assert_matches!(
518            self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
519            Ok(None)
520        );
521
522        Ok(())
523    }
524
525    async fn test_supported_versions_saving(&self) -> TestResult {
526        let versions =
527            BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
528        let supported_versions = SupportedVersionsResponse {
529            versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
530            unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
531        };
532
533        self.set_kv_data(
534            StateStoreDataKey::SupportedVersions,
535            StateStoreDataValue::SupportedVersions(TtlValue::new(supported_versions.clone())),
536        )
537        .await?;
538
539        assert_let!(
540            Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
541                self.get_kv_data(StateStoreDataKey::SupportedVersions).await
542        );
543        let stored_supported_versions = stored_supported_versions.into_data();
544        assert_eq!(supported_versions, stored_supported_versions);
545
546        let stored_supported = stored_supported_versions.supported_versions();
547        assert_eq!(stored_supported.versions, versions);
548        assert_eq!(stored_supported.features.len(), 1);
549        assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
550
551        self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
552        assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
553
554        Ok(())
555    }
556
557    async fn test_well_known_saving(&self) -> TestResult {
558        let well_known = WellKnownResponse {
559            homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
560            identity_server: None,
561            tile_server: None,
562            rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
563        };
564
565        self.set_kv_data(
566            StateStoreDataKey::WellKnown,
567            StateStoreDataValue::WellKnown(TtlValue::new(Some(well_known.clone()))),
568        )
569        .await?;
570
571        assert_let!(
572            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
573                self.get_kv_data(StateStoreDataKey::WellKnown).await
574        );
575        let stored_well_known = stored_well_known.into_data();
576        assert_eq!(stored_well_known, Some(well_known));
577
578        self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
579        assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
580
581        self.set_kv_data(
582            StateStoreDataKey::WellKnown,
583            StateStoreDataValue::WellKnown(TtlValue::new(None)),
584        )
585        .await?;
586
587        assert_let!(
588            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
589                self.get_kv_data(StateStoreDataKey::WellKnown).await
590        );
591        let stored_well_known = stored_well_known.into_data();
592        assert_eq!(stored_well_known, None);
593
594        Ok(())
595    }
596
597    async fn test_sync_token_saving(&self) -> TestResult {
598        let sync_token_1 = "t392-516_47314_0_7_1";
599        let sync_token_2 = "t392-516_47314_0_7_2";
600
601        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
602
603        let changes =
604            StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
605        self.save_changes(&changes).await?;
606        assert_let!(
607            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
608                self.get_kv_data(StateStoreDataKey::SyncToken).await
609        );
610        assert_eq!(stored_sync_token, sync_token_1);
611
612        self.set_kv_data(
613            StateStoreDataKey::SyncToken,
614            StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
615        )
616        .await?;
617        assert_let!(
618            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
619                self.get_kv_data(StateStoreDataKey::SyncToken).await
620        );
621        assert_eq!(stored_sync_token, sync_token_2);
622
623        self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
624        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
625
626        Ok(())
627    }
628
629    async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
630        // Before any data is written, the getter should return None.
631        assert!(
632            self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
633                .await
634                .expect("Could not read data")
635                .is_none(),
636            "Store was not empty at start"
637        );
638
639        // Put some data in the store...
640        let data = GrowableBloomBuilder::new().build();
641        self.set_kv_data(
642            StateStoreDataKey::UtdHookManagerData,
643            StateStoreDataValue::UtdHookManagerData(data.clone()),
644        )
645        .await
646        .expect("Could not save data");
647
648        // ... and check it comes back.
649        let read_data = self
650            .get_kv_data(StateStoreDataKey::UtdHookManagerData)
651            .await
652            .expect("Could not read data")
653            .expect("no data found")
654            .into_utd_hook_manager_data()
655            .expect("not UtdHookManagerData");
656
657        assert_eq!(read_data, data);
658
659        Ok(())
660    }
661
662    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
663        // Before any data is written, the getter should return None.
664        assert!(
665            self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
666            "Store was not empty at start"
667        );
668
669        self.set_kv_data(
670            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
671            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
672        )
673        .await?;
674
675        let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
676        data.expect("The loaded data should be Some");
677
678        Ok(())
679    }
680
681    async fn test_stripped_member_saving(&self) -> TestResult {
682        let room_id = room_id!("!test_stripped_member_saving:localhost");
683        let user_id = user_id();
684        let second_user_id = user_id!("@second:localhost");
685        let third_user_id = user_id!("@third:localhost");
686        let unknown_user_id = user_id!("@unknown:localhost");
687
688        // No event in store.
689        assert!(self.get_member_event(room_id, user_id).await?.is_none());
690        let member_events = self
691            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
692                room_id,
693                &[user_id.to_owned()],
694            )
695            .await;
696        assert!(member_events?.is_empty());
697
698        // One event in store.
699        let mut changes = StateChanges::default();
700        changes
701            .stripped_state
702            .entry(room_id.to_owned())
703            .or_default()
704            .entry(StateEventType::RoomMember)
705            .or_default()
706            .insert(user_id.into(), stripped_membership_event().cast());
707        self.save_changes(&changes).await?;
708
709        assert!(self.get_member_event(room_id, user_id).await?.is_some());
710        let member_events = self
711            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
712                room_id,
713                &[user_id.to_owned()],
714            )
715            .await;
716        assert_eq!(member_events?.len(), 1);
717        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
718        assert_eq!(members.len(), 1, "We expected to find members for the room");
719
720        // Several events in store.
721        let mut changes = StateChanges::default();
722        let changes_members = changes
723            .stripped_state
724            .entry(room_id.to_owned())
725            .or_default()
726            .entry(StateEventType::RoomMember)
727            .or_default();
728        changes_members
729            .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
730        changes_members
731            .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
732        self.save_changes(&changes).await?;
733
734        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
735        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
736        let member_events = self
737            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
738                room_id,
739                &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
740            )
741            .await;
742        assert_eq!(member_events?.len(), 3);
743        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
744        assert_eq!(members.len(), 3, "We expected to find members for the room");
745
746        // Several events in store with one unknown.
747        let member_events = self
748            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
749                room_id,
750                &[
751                    user_id.to_owned(),
752                    second_user_id.to_owned(),
753                    third_user_id.to_owned(),
754                    unknown_user_id.to_owned(),
755                ],
756            )
757            .await;
758        assert_eq!(member_events?.len(), 3);
759
760        // Empty user IDs list.
761        let member_events = self
762            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
763                room_id,
764                &[],
765            )
766            .await;
767        assert!(member_events?.is_empty());
768
769        Ok(())
770    }
771
772    async fn test_power_level_saving(&self) -> TestResult {
773        let room_id = room_id!("!test_power_level_saving:localhost");
774
775        let raw_event = power_level_event();
776        let event = raw_event.deserialize()?;
777
778        assert!(
779            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
780        );
781        let mut changes = StateChanges::default();
782        changes.add_state_event(room_id, event, raw_event);
783
784        self.save_changes(&changes).await?;
785        assert!(
786            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
787        );
788
789        Ok(())
790    }
791
792    async fn test_receipts_saving(&self) -> TestResult {
793        let room_id = room_id!("!test_receipts_saving:localhost");
794
795        let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
796        let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
797
798        let first_receipt_ts = uint!(1436451550);
799        let second_receipt_ts = uint!(1436451653);
800        let third_receipt_ts = uint!(1436474532);
801
802        let first_receipt_event = serde_json::from_value(json!({
803            first_event_id: {
804                "m.read": {
805                    user_id(): {
806                        "ts": first_receipt_ts,
807                    }
808                }
809            }
810        }))?;
811
812        let second_receipt_event = serde_json::from_value(json!({
813            second_event_id: {
814                "m.read": {
815                    user_id(): {
816                        "ts": second_receipt_ts,
817                    }
818                }
819            }
820        }))?;
821
822        let third_receipt_event = serde_json::from_value(json!({
823            second_event_id: {
824                "m.read": {
825                    user_id(): {
826                        "ts": third_receipt_ts,
827                        "thread_id": "main",
828                    }
829                }
830            }
831        }))?;
832
833        assert!(
834            self.get_user_room_receipt_event(
835                room_id,
836                ReceiptType::Read,
837                ReceiptThread::Unthreaded,
838                user_id()
839            )
840            .await
841            .expect("failed to read unthreaded user room receipt")
842            .is_none()
843        );
844        assert!(
845            self.get_event_room_receipt_events(
846                room_id,
847                ReceiptType::Read,
848                ReceiptThread::Unthreaded,
849                first_event_id
850            )
851            .await
852            .expect("failed to read unthreaded event room receipt for 1")
853            .is_empty()
854        );
855        assert!(
856            self.get_event_room_receipt_events(
857                room_id,
858                ReceiptType::Read,
859                ReceiptThread::Unthreaded,
860                second_event_id
861            )
862            .await
863            .expect("failed to read unthreaded event room receipt for 2")
864            .is_empty()
865        );
866
867        let mut changes = StateChanges::default();
868        changes.add_receipts(room_id, first_receipt_event);
869
870        self.save_changes(&changes).await?;
871        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
872            .get_user_room_receipt_event(
873                room_id,
874                ReceiptType::Read,
875                ReceiptThread::Unthreaded,
876                user_id(),
877            )
878            .await
879            .expect("failed to read unthreaded user room receipt after save")
880            .unwrap();
881        assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
882        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
883        let first_event_unthreaded_receipts = self
884            .get_event_room_receipt_events(
885                room_id,
886                ReceiptType::Read,
887                ReceiptThread::Unthreaded,
888                first_event_id,
889            )
890            .await
891            .expect("failed to read unthreaded event room receipt for 1 after save");
892        assert_eq!(
893            first_event_unthreaded_receipts.len(),
894            1,
895            "Found a wrong number of unthreaded receipts for 1 after save"
896        );
897        assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
898        assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
899        assert!(
900            self.get_event_room_receipt_events(
901                room_id,
902                ReceiptType::Read,
903                ReceiptThread::Unthreaded,
904                second_event_id
905            )
906            .await
907            .expect("failed to read unthreaded event room receipt for 2 after save")
908            .is_empty()
909        );
910
911        let mut changes = StateChanges::default();
912        changes.add_receipts(room_id, second_receipt_event);
913
914        self.save_changes(&changes).await.expect("Saving works");
915        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
916            .get_user_room_receipt_event(
917                room_id,
918                ReceiptType::Read,
919                ReceiptThread::Unthreaded,
920                user_id(),
921            )
922            .await
923            .expect("Getting unthreaded user room receipt after save failed")
924            .unwrap();
925        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
926        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
927        assert!(
928            self.get_event_room_receipt_events(
929                room_id,
930                ReceiptType::Read,
931                ReceiptThread::Unthreaded,
932                first_event_id
933            )
934            .await
935            .expect("Getting unthreaded event room receipt events for first event failed")
936            .is_empty()
937        );
938        let second_event_unthreaded_receipts = self
939            .get_event_room_receipt_events(
940                room_id,
941                ReceiptType::Read,
942                ReceiptThread::Unthreaded,
943                second_event_id,
944            )
945            .await
946            .expect("Getting unthreaded event room receipt events for second event failed");
947        assert_eq!(
948            second_event_unthreaded_receipts.len(),
949            1,
950            "Found a wrong number of unthreaded receipts for second event after save"
951        );
952        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
953        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
954
955        assert!(
956            self.get_user_room_receipt_event(
957                room_id,
958                ReceiptType::Read,
959                ReceiptThread::Main,
960                user_id()
961            )
962            .await
963            .expect("failed to read threaded user room receipt")
964            .is_none()
965        );
966        assert!(
967            self.get_event_room_receipt_events(
968                room_id,
969                ReceiptType::Read,
970                ReceiptThread::Main,
971                second_event_id
972            )
973            .await
974            .expect("Getting threaded event room receipts for 2 failed")
975            .is_empty()
976        );
977
978        let mut changes = StateChanges::default();
979        changes.add_receipts(room_id, third_receipt_event);
980
981        self.save_changes(&changes).await.expect("Saving works");
982        // Unthreaded receipts should not have changed.
983        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
984            .get_user_room_receipt_event(
985                room_id,
986                ReceiptType::Read,
987                ReceiptThread::Unthreaded,
988                user_id(),
989            )
990            .await
991            .expect("Getting unthreaded user room receipt after save failed")
992            .unwrap();
993        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
994        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
995        let second_event_unthreaded_receipts = self
996            .get_event_room_receipt_events(
997                room_id,
998                ReceiptType::Read,
999                ReceiptThread::Unthreaded,
1000                second_event_id,
1001            )
1002            .await
1003            .expect("Getting unthreaded event room receipt events for second event failed");
1004        assert_eq!(
1005            second_event_unthreaded_receipts.len(),
1006            1,
1007            "Found a wrong number of unthreaded receipts for second event after save"
1008        );
1009        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1010        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1011        // Threaded receipts should have changed
1012        let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1013            .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1014            .await
1015            .expect("Getting threaded user room receipt after save failed")
1016            .unwrap();
1017        assert_eq!(threaded_user_receipt_event_id, second_event_id);
1018        assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1019        let second_event_threaded_receipts = self
1020            .get_event_room_receipt_events(
1021                room_id,
1022                ReceiptType::Read,
1023                ReceiptThread::Main,
1024                second_event_id,
1025            )
1026            .await
1027            .expect("Getting threaded event room receipt events for second event failed");
1028        assert_eq!(
1029            second_event_threaded_receipts.len(),
1030            1,
1031            "Found a wrong number of threaded receipts for second event after save"
1032        );
1033        assert_eq!(second_event_threaded_receipts[0].0, user_id());
1034        assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1035
1036        Ok(())
1037    }
1038
1039    async fn test_custom_storage(&self) -> TestResult {
1040        let key = "my_key";
1041        let value = &[0, 1, 2, 3];
1042
1043        self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1044
1045        let read = self.get_custom_value(key.as_bytes()).await?;
1046
1047        assert_eq!(Some(value.as_ref()), read.as_deref());
1048
1049        Ok(())
1050    }
1051
1052    async fn test_stripped_non_stripped(&self) -> TestResult {
1053        let room_id = room_id!("!test_stripped_non_stripped:localhost");
1054        let user_id = user_id();
1055
1056        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1057        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1058
1059        let mut changes = StateChanges::default();
1060        changes
1061            .state
1062            .entry(room_id.to_owned())
1063            .or_default()
1064            .entry(StateEventType::RoomMember)
1065            .or_default()
1066            .insert(user_id.into(), membership_event().cast());
1067        changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1068        self.save_changes(&changes).await?;
1069
1070        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1071        assert!(matches!(member_event, MemberEvent::Sync(_)));
1072        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1073
1074        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1075        assert_eq!(members, vec![user_id.to_owned()]);
1076
1077        let mut changes = StateChanges::default();
1078        changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1079        changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1080        self.save_changes(&changes).await?;
1081
1082        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1083        assert!(matches!(member_event, MemberEvent::Stripped(_)));
1084        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1085
1086        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1087        assert_eq!(members, vec![user_id.to_owned()]);
1088
1089        Ok(())
1090    }
1091
1092    async fn test_room_removal(&self) -> TestResult {
1093        let room_id = room_id();
1094        let user_id = user_id();
1095        let display_name = DisplayName::new("example");
1096        let stripped_room_id = stripped_room_id();
1097
1098        self.populate().await?;
1099
1100        {
1101            // Add a send queue request in that room.
1102            let txn = TransactionId::new();
1103            let ev =
1104                SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1105            self.save_send_queue_request(
1106                room_id,
1107                txn.clone(),
1108                MilliSecondsSinceUnixEpoch::now(),
1109                ev.into(),
1110                0,
1111            )
1112            .await?;
1113
1114            // Add a single dependent queue request.
1115            self.save_dependent_queued_request(
1116                room_id,
1117                &txn,
1118                ChildTransactionId::new(),
1119                MilliSecondsSinceUnixEpoch::now(),
1120                DependentQueuedRequestKind::RedactEvent,
1121            )
1122            .await?;
1123        }
1124
1125        self.remove_room(room_id).await?;
1126
1127        assert_eq!(
1128            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1129            1,
1130            "room is still there"
1131        );
1132
1133        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1134        assert!(
1135            self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1136            "still state events found"
1137        );
1138        assert!(self.get_profile(room_id, user_id).await?.is_none());
1139        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1140        assert!(
1141            self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1142            "still user ids found"
1143        );
1144        assert!(
1145            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1146            "still invited user ids found"
1147        );
1148        assert!(
1149            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1150            "still joined users found"
1151        );
1152        assert!(
1153            self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1154            "still display names found"
1155        );
1156        assert!(
1157            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1158                .await?
1159                .is_none()
1160        );
1161        assert!(
1162            self.get_user_room_receipt_event(
1163                room_id,
1164                ReceiptType::Read,
1165                ReceiptThread::Unthreaded,
1166                user_id
1167            )
1168            .await?
1169            .is_none()
1170        );
1171        assert!(
1172            self.get_event_room_receipt_events(
1173                room_id,
1174                ReceiptType::Read,
1175                ReceiptThread::Unthreaded,
1176                first_receipt_event_id()
1177            )
1178            .await?
1179            .is_empty(),
1180            "still event recepts in the store"
1181        );
1182        assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1183        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1184
1185        self.remove_room(stripped_room_id).await?;
1186
1187        assert!(
1188            self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1189            "still room info found"
1190        );
1191        Ok(())
1192    }
1193
1194    async fn test_profile_removal(&self) -> TestResult {
1195        let room_id = room_id();
1196
1197        // Both the user id and invited user id get a profile in populate().
1198        let user_id = user_id();
1199        let invited_user_id = invited_user_id();
1200
1201        self.populate().await?;
1202
1203        let new_invite_member_json = json!({
1204            "content": {
1205                "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1206                "displayname": "example after update",
1207                "membership": "invite",
1208                "reason": "Looking for support"
1209            },
1210            "event_id": "$143273582443PhrSm:localhost",
1211            "origin_server_ts": 1432735824,
1212            "room_id": room_id,
1213            "sender": user_id,
1214            "state_key": invited_user_id,
1215            "type": "m.room.member",
1216        });
1217        let new_invite_member_event: SyncRoomMemberEvent =
1218            serde_json::from_value(new_invite_member_json.clone())?;
1219
1220        let mut changes = StateChanges {
1221            // Both get their profiles deleted…
1222            profiles_to_delete: [(
1223                room_id.to_owned(),
1224                vec![user_id.to_owned(), invited_user_id.to_owned()],
1225            )]
1226            .into(),
1227
1228            // …but the invited user get a new profile.
1229            profiles: {
1230                let mut map = BTreeMap::default();
1231                map.insert(
1232                    room_id.to_owned(),
1233                    [(invited_user_id.to_owned(), new_invite_member_event.into())]
1234                        .into_iter()
1235                        .collect(),
1236                );
1237                map
1238            },
1239
1240            ..StateChanges::default()
1241        };
1242
1243        let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1244            .expect("can create sync-state-event for topic");
1245        let event = raw.deserialize()?;
1246        changes.add_state_event(room_id, event, raw);
1247
1248        self.save_changes(&changes).await?;
1249
1250        // The profile for user has been removed.
1251        assert!(self.get_profile(room_id, user_id).await?.is_none());
1252        assert!(self.get_member_event(room_id, user_id).await?.is_some());
1253
1254        // The profile for the invited user has been updated.
1255        let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1256        assert_eq!(
1257            invited_member_event.content.displayname.as_deref(),
1258            Some("example after update")
1259        );
1260        assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1261
1262        Ok(())
1263    }
1264
1265    async fn test_presence_saving(&self) -> TestResult {
1266        let user_id = user_id();
1267        let second_user_id = user_id!("@second:localhost");
1268        let third_user_id = user_id!("@third:localhost");
1269        let unknown_user_id = user_id!("@unknown:localhost");
1270
1271        // No event in store.
1272        let mut user_ids = vec![user_id.to_owned()];
1273        let presence_event = self.get_presence_event(user_id).await;
1274        assert!(presence_event?.is_none());
1275        let presence_events = self.get_presence_events(&user_ids).await;
1276        assert!(presence_events?.is_empty());
1277
1278        // One event in store.
1279        let mut changes = StateChanges::default();
1280        changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1281        self.save_changes(&changes).await?;
1282
1283        let presence_event = self.get_presence_event(user_id).await;
1284        assert!(presence_event?.is_some());
1285        let presence_events = self.get_presence_events(&user_ids).await;
1286        assert_eq!(presence_events?.len(), 1);
1287
1288        // Several events in store.
1289        let mut changes = StateChanges::default();
1290        changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1291        changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1292        self.save_changes(&changes).await?;
1293
1294        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1295        let presence_event = self.get_presence_event(second_user_id).await;
1296        assert!(presence_event?.is_some());
1297        let presence_event = self.get_presence_event(third_user_id).await;
1298        assert!(presence_event?.is_some());
1299        let presence_events = self.get_presence_events(&user_ids).await;
1300        assert_eq!(presence_events?.len(), 3);
1301
1302        // Several events in store with one unknown.
1303        user_ids.push(unknown_user_id.to_owned());
1304        let member_events = self.get_presence_events(&user_ids).await;
1305        assert_eq!(member_events?.len(), 3);
1306
1307        // Empty user IDs list.
1308        let presence_events = self.get_presence_events(&[]).await;
1309        assert!(presence_events?.is_empty());
1310
1311        Ok(())
1312    }
1313
1314    async fn test_display_names_saving(&self) -> TestResult {
1315        let room_id = room_id!("!test_display_names_saving:localhost");
1316        let user_id = user_id();
1317        let user_display_name = DisplayName::new("User");
1318        let second_user_id = user_id!("@second:localhost");
1319        let third_user_id = user_id!("@third:localhost");
1320        let other_display_name = DisplayName::new("Raoul");
1321        let unknown_display_name = DisplayName::new("Unknown");
1322
1323        // No event in store.
1324        let mut display_names = vec![user_display_name.to_owned()];
1325        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1326        assert!(users.is_empty());
1327        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1328        assert!(names.is_empty());
1329
1330        // One event in store.
1331        let mut changes = StateChanges::default();
1332        changes
1333            .ambiguity_maps
1334            .entry(room_id.to_owned())
1335            .or_default()
1336            .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1337        self.save_changes(&changes).await?;
1338
1339        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1340        assert_eq!(users.len(), 1);
1341        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1342        assert_eq!(names.len(), 1);
1343        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1344
1345        // Several events in store.
1346        let mut changes = StateChanges::default();
1347        changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1348            other_display_name.to_owned(),
1349            [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1350        );
1351        self.save_changes(&changes).await?;
1352
1353        display_names.push(other_display_name.to_owned());
1354        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1355        assert_eq!(users.len(), 1);
1356        let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1357        assert_eq!(users.len(), 2);
1358        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1359        assert_eq!(names.len(), 2);
1360        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1361        assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1362
1363        // Several events in store with one unknown.
1364        display_names.push(unknown_display_name.to_owned());
1365        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1366        assert_eq!(names.len(), 2);
1367
1368        // Empty user IDs list.
1369        let names = self.get_users_with_display_names(room_id, &[]).await?;
1370        assert!(names.is_empty());
1371
1372        Ok(())
1373    }
1374
1375    #[allow(clippy::needless_range_loop)]
1376    async fn test_send_queue(&self) -> TestResult {
1377        let room_id = room_id!("!test_send_queue:localhost");
1378
1379        // No queued event in store at first.
1380        let events = self.load_send_queue_requests(room_id).await?;
1381        assert!(events.is_empty());
1382
1383        // Saving one thing should work.
1384        let txn0 = TransactionId::new();
1385        let event0 =
1386            SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1387        self.save_send_queue_request(
1388            room_id,
1389            txn0.clone(),
1390            MilliSecondsSinceUnixEpoch::now(),
1391            event0.into(),
1392            0,
1393        )
1394        .await?;
1395
1396        // Reading it will work.
1397        let pending = self.load_send_queue_requests(room_id).await?;
1398
1399        assert_eq!(pending.len(), 1);
1400        {
1401            assert_eq!(pending[0].transaction_id, txn0);
1402
1403            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1404            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1405            assert_eq!(content.body(), "msg0");
1406
1407            assert!(!pending[0].is_wedged());
1408        }
1409
1410        // Saving another three things should work.
1411        for i in 1..=3 {
1412            let txn = TransactionId::new();
1413            let event = SerializableEventContent::new(
1414                &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1415            )?;
1416
1417            self.save_send_queue_request(
1418                room_id,
1419                txn,
1420                MilliSecondsSinceUnixEpoch::now(),
1421                event.into(),
1422                0,
1423            )
1424            .await?;
1425        }
1426
1427        // Reading all the events should work.
1428        let pending = self.load_send_queue_requests(room_id).await?;
1429
1430        // All the events should be retrieved, in the same order.
1431        assert_eq!(pending.len(), 4);
1432
1433        assert_eq!(pending[0].transaction_id, txn0);
1434
1435        for i in 0..4 {
1436            let deserialized = pending[i].as_event().unwrap().deserialize()?;
1437            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1438            assert_eq!(content.body(), format!("msg{i}"));
1439            assert!(!pending[i].is_wedged());
1440        }
1441
1442        // Marking an event as wedged works.
1443        let txn2 = &pending[2].transaction_id;
1444        self.update_send_queue_request_status(
1445            room_id,
1446            txn2,
1447            Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1448        )
1449        .await?;
1450
1451        // And it is reflected.
1452        let pending = self.load_send_queue_requests(room_id).await?;
1453
1454        // All the events should be retrieved, in the same order.
1455        assert_eq!(pending.len(), 4);
1456        assert_eq!(pending[0].transaction_id, txn0);
1457        assert_eq!(pending[2].transaction_id, *txn2);
1458        assert!(pending[2].is_wedged());
1459        let error = pending[2].clone().error.unwrap();
1460        let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1461        assert_eq!(generic_error, "Oops");
1462        for i in 0..4 {
1463            if i != 2 {
1464                assert!(!pending[i].is_wedged());
1465            }
1466        }
1467
1468        // Updating an event will work, and reset its wedged state to false.
1469        let event0 = SerializableEventContent::new(
1470            &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1471        )?;
1472        self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1473
1474        // And it is reflected.
1475        let pending = self.load_send_queue_requests(room_id).await?;
1476
1477        assert_eq!(pending.len(), 4);
1478        {
1479            assert_eq!(pending[2].transaction_id, *txn2);
1480
1481            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1482            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1483            assert_eq!(content.body(), "wow that's a cool test");
1484
1485            assert!(!pending[2].is_wedged());
1486
1487            for i in 0..4 {
1488                if i != 2 {
1489                    let deserialized = pending[i].as_event().unwrap().deserialize()?;
1490                    assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1491                    assert_eq!(content.body(), format!("msg{i}"));
1492
1493                    assert!(!pending[i].is_wedged());
1494                }
1495            }
1496        }
1497
1498        // Removing an event works.
1499        self.remove_send_queue_request(room_id, &txn0).await?;
1500
1501        // And it is reflected.
1502        let pending = self.load_send_queue_requests(room_id).await?;
1503
1504        assert_eq!(pending.len(), 3);
1505        assert_eq!(pending[1].transaction_id, *txn2);
1506        for i in 0..3 {
1507            assert_ne!(pending[i].transaction_id, txn0);
1508        }
1509
1510        // Now add one event for two other rooms, remove one of the events, and then
1511        // query all the rooms which have outstanding unsent events.
1512
1513        // Add one event for room2.
1514        let room_id2 = room_id!("!test_send_queue_two:localhost");
1515        {
1516            let txn = TransactionId::new();
1517            let event = SerializableEventContent::new(
1518                &RoomMessageEventContent::text_plain("room2").into(),
1519            )?;
1520            self.save_send_queue_request(
1521                room_id2,
1522                txn.clone(),
1523                MilliSecondsSinceUnixEpoch::now(),
1524                event.into(),
1525                0,
1526            )
1527            .await?;
1528        }
1529
1530        // Add and remove one event for room3.
1531        {
1532            let room_id3 = room_id!("!test_send_queue_three:localhost");
1533            let txn = TransactionId::new();
1534            let event = SerializableEventContent::new(
1535                &RoomMessageEventContent::text_plain("room3").into(),
1536            )?;
1537            self.save_send_queue_request(
1538                room_id3,
1539                txn.clone(),
1540                MilliSecondsSinceUnixEpoch::now(),
1541                event.into(),
1542                0,
1543            )
1544            .await?;
1545
1546            self.remove_send_queue_request(room_id3, &txn).await?;
1547        }
1548
1549        // Query all the rooms which have unsent events. Per the previous steps,
1550        // it should be room1 and room2, not room3.
1551        let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1552        assert_eq!(outstanding_rooms.len(), 2);
1553        assert!(outstanding_rooms.iter().any(|room| room == room_id));
1554        assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1555
1556        Ok(())
1557    }
1558
1559    async fn test_send_queue_priority(&self) -> TestResult {
1560        let room_id = room_id!("!test_send_queue:localhost");
1561
1562        // No queued event in store at first.
1563        let events = self.load_send_queue_requests(room_id).await?;
1564        assert!(events.is_empty());
1565
1566        // Saving one request should work.
1567        let low0_txn = TransactionId::new();
1568        let ev0 =
1569            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1570        self.save_send_queue_request(
1571            room_id,
1572            low0_txn.clone(),
1573            MilliSecondsSinceUnixEpoch::now(),
1574            ev0.into(),
1575            2,
1576        )
1577        .await?;
1578
1579        // Saving one request with higher priority should work.
1580        let high_txn = TransactionId::new();
1581        let ev1 =
1582            SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1583        self.save_send_queue_request(
1584            room_id,
1585            high_txn.clone(),
1586            MilliSecondsSinceUnixEpoch::now(),
1587            ev1.into(),
1588            10,
1589        )
1590        .await?;
1591
1592        // Saving another request with the low priority should work.
1593        let low1_txn = TransactionId::new();
1594        let ev2 =
1595            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1596        self.save_send_queue_request(
1597            room_id,
1598            low1_txn.clone(),
1599            MilliSecondsSinceUnixEpoch::now(),
1600            ev2.into(),
1601            2,
1602        )
1603        .await?;
1604
1605        // The requests should be ordered from higher priority to lower, and when equal,
1606        // should use the insertion order instead.
1607        let pending = self.load_send_queue_requests(room_id).await?;
1608
1609        assert_eq!(pending.len(), 3);
1610        {
1611            assert_eq!(pending[0].transaction_id, high_txn);
1612
1613            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1614            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1615            assert_eq!(content.body(), "high");
1616        }
1617
1618        {
1619            assert_eq!(pending[1].transaction_id, low0_txn);
1620
1621            let deserialized = pending[1].as_event().unwrap().deserialize()?;
1622            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1623            assert_eq!(content.body(), "low0");
1624        }
1625
1626        {
1627            assert_eq!(pending[2].transaction_id, low1_txn);
1628
1629            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1630            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1631            assert_eq!(content.body(), "low1");
1632        }
1633
1634        Ok(())
1635    }
1636
1637    async fn test_send_queue_dependents(&self) -> TestResult {
1638        let room_id = room_id!("!test_send_queue_dependents:localhost");
1639
1640        // Save one send queue event to start with.
1641        let txn0 = TransactionId::new();
1642        let event0 =
1643            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1644        self.save_send_queue_request(
1645            room_id,
1646            txn0.clone(),
1647            MilliSecondsSinceUnixEpoch::now(),
1648            event0.clone().into(),
1649            0,
1650        )
1651        .await?;
1652
1653        // No dependents, to start with.
1654        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1655
1656        // Save a redaction for that event.
1657        let child_txn = ChildTransactionId::new();
1658        self.save_dependent_queued_request(
1659            room_id,
1660            &txn0,
1661            child_txn.clone(),
1662            MilliSecondsSinceUnixEpoch::now(),
1663            DependentQueuedRequestKind::RedactEvent,
1664        )
1665        .await?;
1666
1667        // It worked.
1668        let dependents = self.load_dependent_queued_requests(room_id).await?;
1669        assert_eq!(dependents.len(), 1);
1670        assert_eq!(dependents[0].parent_transaction_id, txn0);
1671        assert_eq!(dependents[0].own_transaction_id, child_txn);
1672        assert!(dependents[0].parent_key.is_none());
1673        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1674
1675        // Update the event id.
1676        let (event, event_type) = event0.raw();
1677        let event_id = owned_event_id!("$1");
1678        let num_updated = self
1679            .mark_dependent_queued_requests_as_ready(
1680                room_id,
1681                &txn0,
1682                SentRequestKey::Event {
1683                    event_id: event_id.clone(),
1684                    event: event.clone(),
1685                    event_type: event_type.to_owned(),
1686                },
1687            )
1688            .await?;
1689        assert_eq!(num_updated, 1);
1690
1691        // It worked.
1692        let dependents = self.load_dependent_queued_requests(room_id).await?;
1693        assert_eq!(dependents.len(), 1);
1694        assert_eq!(dependents[0].parent_transaction_id, txn0);
1695        assert_eq!(dependents[0].own_transaction_id, child_txn);
1696        assert_matches!(
1697            dependents[0].parent_key.as_ref(),
1698            Some(SentRequestKey::Event {
1699                event_id: received_event_id,
1700                event: received_event,
1701                event_type: received_event_type
1702            }) => {
1703                assert_eq!(received_event_id, &event_id);
1704                assert_eq!(received_event.json().to_string(), event.json().to_string());
1705                assert_eq!(received_event_type.as_str(), event_type);
1706            }
1707        );
1708        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1709
1710        // Now remove it.
1711        let removed = self
1712            .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1713            .await?;
1714        assert!(removed);
1715
1716        // It worked.
1717        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1718
1719        // Now, inserting a dependent event and removing the original send queue event
1720        // will NOT remove the dependent event.
1721        let txn1 = TransactionId::new();
1722        let event1 =
1723            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1724        self.save_send_queue_request(
1725            room_id,
1726            txn1.clone(),
1727            MilliSecondsSinceUnixEpoch::now(),
1728            event1.into(),
1729            0,
1730        )
1731        .await?;
1732
1733        self.save_dependent_queued_request(
1734            room_id,
1735            &txn0,
1736            ChildTransactionId::new(),
1737            MilliSecondsSinceUnixEpoch::now(),
1738            DependentQueuedRequestKind::RedactEvent,
1739        )
1740        .await?;
1741        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1742
1743        self.save_dependent_queued_request(
1744            room_id,
1745            &txn1,
1746            ChildTransactionId::new(),
1747            MilliSecondsSinceUnixEpoch::now(),
1748            DependentQueuedRequestKind::EditEvent {
1749                new_content: SerializableEventContent::new(
1750                    &RoomMessageEventContent::text_plain("edit").into(),
1751                )?,
1752            },
1753        )
1754        .await?;
1755        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1756
1757        // Remove event0 / txn0.
1758        let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1759        assert!(removed);
1760
1761        // This has removed none of the dependent events.
1762        let dependents = self.load_dependent_queued_requests(room_id).await?;
1763        assert_eq!(dependents.len(), 2);
1764
1765        Ok(())
1766    }
1767
1768    async fn test_update_send_queue_dependent(&self) -> TestResult {
1769        let room_id = room_id!("!test_send_queue_dependents:localhost");
1770
1771        let txn = TransactionId::new();
1772
1773        // Save a dependent redaction for an event.
1774        let child_txn = ChildTransactionId::new();
1775
1776        self.save_dependent_queued_request(
1777            room_id,
1778            &txn,
1779            child_txn.clone(),
1780            MilliSecondsSinceUnixEpoch::now(),
1781            DependentQueuedRequestKind::RedactEvent,
1782        )
1783        .await?;
1784
1785        // It worked.
1786        let dependents = self.load_dependent_queued_requests(room_id).await?;
1787        assert_eq!(dependents.len(), 1);
1788        assert_eq!(dependents[0].parent_transaction_id, txn);
1789        assert_eq!(dependents[0].own_transaction_id, child_txn);
1790        assert!(dependents[0].parent_key.is_none());
1791        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1792
1793        // Make it a reaction, instead of a redaction.
1794        self.update_dependent_queued_request(
1795            room_id,
1796            &child_txn,
1797            DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1798        )
1799        .await?;
1800
1801        // It worked.
1802        let dependents = self.load_dependent_queued_requests(room_id).await?;
1803        assert_eq!(dependents.len(), 1);
1804        assert_eq!(dependents[0].parent_transaction_id, txn);
1805        assert_eq!(dependents[0].own_transaction_id, child_txn);
1806        assert!(dependents[0].parent_key.is_none());
1807        assert_matches!(
1808            &dependents[0].kind,
1809            DependentQueuedRequestKind::ReactEvent { key } => {
1810                assert_eq!(key, "👍");
1811            }
1812        );
1813
1814        Ok(())
1815    }
1816
1817    async fn test_get_room_infos(&self) -> TestResult {
1818        let room_id_0 = room_id!("!r0");
1819        let room_id_1 = room_id!("!r1");
1820        let room_id_2 = room_id!("!r2");
1821
1822        // There is no room for the moment.
1823        {
1824            assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1825        }
1826
1827        // Save rooms.
1828        let mut changes = StateChanges::default();
1829        changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1830        changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1831        self.save_changes(&changes).await?;
1832
1833        // We can find all the rooms with `RoomLoadSettings::All`.
1834        {
1835            let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1836
1837            // (We need to sort by `room_id` so that the test is stable across all
1838            // `StateStore` implementations).
1839            all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1840
1841            assert_eq!(all_rooms.len(), 2);
1842            assert_eq!(all_rooms[0].room_id, room_id_0);
1843            assert_eq!(all_rooms[1].room_id, room_id_1);
1844        }
1845
1846        // We can find a single room with `RoomLoadSettings::One`.
1847        {
1848            let all_rooms =
1849                self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1850
1851            assert_eq!(all_rooms.len(), 1);
1852            assert_eq!(all_rooms[0].room_id, room_id_1);
1853        }
1854
1855        // `RoomLoadSetting::One` can result in loading zero room if the room is
1856        // unknown.
1857        {
1858            let all_rooms =
1859                self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1860
1861            assert_eq!(all_rooms.len(), 0);
1862        }
1863
1864        Ok(())
1865    }
1866
1867    async fn test_thread_subscriptions(&self) -> TestResult {
1868        let first_thread = event_id!("$t1");
1869        let second_thread = event_id!("$t2");
1870
1871        // At first, there is no thread subscription.
1872        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1873        assert!(maybe_sub.is_none());
1874
1875        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1876        assert!(maybe_sub.is_none());
1877
1878        // Setting the thread subscription works.
1879        self.upsert_thread_subscriptions(vec![(
1880            room_id(),
1881            first_thread,
1882            StoredThreadSubscription {
1883                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1884                bump_stamp: None,
1885            },
1886        )])
1887        .await?;
1888
1889        self.upsert_thread_subscriptions(vec![(
1890            room_id(),
1891            second_thread,
1892            StoredThreadSubscription {
1893                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1894                bump_stamp: None,
1895            },
1896        )])
1897        .await?;
1898
1899        // Now, reading the thread subscription returns the expected status.
1900        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1901        assert_eq!(
1902            maybe_sub,
1903            Some(StoredThreadSubscription {
1904                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1905                bump_stamp: None,
1906            })
1907        );
1908
1909        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1910        assert_eq!(
1911            maybe_sub,
1912            Some(StoredThreadSubscription {
1913                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1914                bump_stamp: None,
1915            })
1916        );
1917
1918        // We can override the thread subscription status.
1919        self.upsert_thread_subscriptions(vec![(
1920            room_id(),
1921            first_thread,
1922            StoredThreadSubscription {
1923                status: ThreadSubscriptionStatus::Unsubscribed,
1924                bump_stamp: None,
1925            },
1926        )])
1927        .await?;
1928
1929        // And it's correctly reflected.
1930        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1931        assert_eq!(
1932            maybe_sub,
1933            Some(StoredThreadSubscription {
1934                status: ThreadSubscriptionStatus::Unsubscribed,
1935                bump_stamp: None,
1936            })
1937        );
1938
1939        // And the second thread is still subscribed.
1940        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1941        assert_eq!(
1942            maybe_sub,
1943            Some(StoredThreadSubscription {
1944                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1945                bump_stamp: None,
1946            })
1947        );
1948
1949        // We can remove a thread subscription.
1950        self.remove_thread_subscription(room_id(), second_thread).await?;
1951
1952        // And it's correctly reflected.
1953        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1954        assert_eq!(maybe_sub, None);
1955
1956        // And the first thread is still unsubscribed.
1957        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1958        assert_eq!(
1959            maybe_sub,
1960            Some(StoredThreadSubscription {
1961                status: ThreadSubscriptionStatus::Unsubscribed,
1962                bump_stamp: None,
1963            })
1964        );
1965
1966        // Removing a thread subscription for an unknown thread is a no-op.
1967        self.remove_thread_subscription(room_id(), second_thread).await?;
1968
1969        Ok(())
1970    }
1971
1972    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1973        let threads = [
1974            event_id!("$t1"),
1975            event_id!("$t2"),
1976            event_id!("$t3"),
1977            event_id!("$t4"),
1978            event_id!("$t5"),
1979            event_id!("$t6"),
1980        ];
1981        // Helper for building the input for `upsert_thread_subscriptions()`,
1982        // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
1983        let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1984            threads
1985                .iter()
1986                .zip(subs)
1987                .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1988                .collect::<Vec<_>>()
1989        };
1990
1991        // Test bump_stamp logic
1992        let initial_subscriptions = build_subscription_updates(&[
1993            StoredThreadSubscription {
1994                status: ThreadSubscriptionStatus::Unsubscribed,
1995                bump_stamp: None,
1996            },
1997            StoredThreadSubscription {
1998                status: ThreadSubscriptionStatus::Unsubscribed,
1999                bump_stamp: Some(14),
2000            },
2001            StoredThreadSubscription {
2002                status: ThreadSubscriptionStatus::Unsubscribed,
2003                bump_stamp: None,
2004            },
2005            StoredThreadSubscription {
2006                status: ThreadSubscriptionStatus::Unsubscribed,
2007                bump_stamp: Some(210),
2008            },
2009            StoredThreadSubscription {
2010                status: ThreadSubscriptionStatus::Unsubscribed,
2011                bump_stamp: Some(5),
2012            },
2013            StoredThreadSubscription {
2014                status: ThreadSubscriptionStatus::Unsubscribed,
2015                bump_stamp: Some(100),
2016            },
2017        ]);
2018
2019        let update_subscriptions = build_subscription_updates(&[
2020            StoredThreadSubscription {
2021                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2022                bump_stamp: None,
2023            },
2024            StoredThreadSubscription {
2025                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2026                bump_stamp: None,
2027            },
2028            StoredThreadSubscription {
2029                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2030                bump_stamp: Some(1101),
2031            },
2032            StoredThreadSubscription {
2033                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2034                bump_stamp: Some(222),
2035            },
2036            StoredThreadSubscription {
2037                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2038                bump_stamp: Some(1),
2039            },
2040            StoredThreadSubscription {
2041                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2042                bump_stamp: Some(100),
2043            },
2044        ]);
2045
2046        let expected_subscriptions = build_subscription_updates(&[
2047            // Status should be updated, because prev and new bump_stamp are both None
2048            StoredThreadSubscription {
2049                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2050                bump_stamp: None,
2051            },
2052            // Status should be updated, but keep initial bump_stamp (new is None)
2053            StoredThreadSubscription {
2054                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2055                bump_stamp: Some(14),
2056            },
2057            // Status should be updated and also bump_stamp should be updated (initial was None)
2058            StoredThreadSubscription {
2059                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2060                bump_stamp: Some(1101),
2061            },
2062            // Status should be updated and also bump_stamp should be updated (initial was lower)
2063            StoredThreadSubscription {
2064                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2065                bump_stamp: Some(222),
2066            },
2067            // Status shouldn't change, as new bump_stamp is lower
2068            StoredThreadSubscription {
2069                status: ThreadSubscriptionStatus::Unsubscribed,
2070                bump_stamp: Some(5),
2071            },
2072            // Status shouldn't change, as bump_stamp is equal to the previous one
2073            StoredThreadSubscription {
2074                status: ThreadSubscriptionStatus::Unsubscribed,
2075                bump_stamp: Some(100),
2076            },
2077        ]);
2078
2079        // Set the initial subscriptions
2080        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2081
2082        // Assert the subscriptions have been added
2083        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2084            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2085            assert_eq!(stored_subscription, Some(*expected_sub));
2086        }
2087
2088        // Update subscriptions
2089        self.upsert_thread_subscriptions(update_subscriptions).await?;
2090
2091        // Assert the expected subscriptions and bump_stamps
2092        for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2093            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2094            assert_eq!(stored_subscription, Some(*expected_sub));
2095        }
2096
2097        // Test just state changes, but first remove previous subscriptions
2098        for (room_id, thread_id, _) in &expected_subscriptions {
2099            self.remove_thread_subscription(room_id, thread_id).await?;
2100        }
2101
2102        let initial_subscriptions = build_subscription_updates(&[
2103            StoredThreadSubscription {
2104                status: ThreadSubscriptionStatus::Unsubscribed,
2105                bump_stamp: Some(1),
2106            },
2107            StoredThreadSubscription {
2108                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2109                bump_stamp: Some(1),
2110            },
2111            StoredThreadSubscription {
2112                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2113                bump_stamp: Some(1),
2114            },
2115        ]);
2116
2117        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2118
2119        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2120            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2121            assert_eq!(stored_subscription, Some(*expected_sub));
2122        }
2123
2124        let update_subscriptions = build_subscription_updates(&[
2125            StoredThreadSubscription {
2126                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2127                bump_stamp: Some(2),
2128            },
2129            StoredThreadSubscription {
2130                status: ThreadSubscriptionStatus::Unsubscribed,
2131                bump_stamp: Some(2),
2132            },
2133            StoredThreadSubscription {
2134                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2135                bump_stamp: Some(2),
2136            },
2137        ]);
2138
2139        self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2140
2141        for (room_id, thread_id, expected_sub) in &update_subscriptions {
2142            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2143            assert_eq!(stored_subscription, Some(*expected_sub));
2144        }
2145
2146        Ok(())
2147    }
2148}
2149
/// Macro generating the whole integration test suite for your `StateStore`
/// implementation, so that it can be run locally.
///
/// You need to provide an `async fn get_store() -> StoreResult<impl StateStore>`
/// returning a fresh store, at the same module level as the macro invocation.
2155///
2156/// ## Usage Example:
2157/// ```no_run
2158/// # use matrix_sdk_base::store::{
2159/// #    StateStore,
2160/// #    MemoryStore as MyStore,
2161/// #    Result as StoreResult,
2162/// # };
2163///
2164/// #[cfg(test)]
2165/// mod tests {
2166///     use super::{MyStore, StateStore, StoreResult};
2167///
2168///     async fn get_store() -> StoreResult<impl StateStore> {
2169///         Ok(MyStore::new())
2170///     }
2171///
2172///     statestore_integration_tests!();
2173/// }
2174/// ```
2175#[allow(unused_macros, unused_extern_crates)]
2176#[macro_export]
2177macro_rules! statestore_integration_tests {
2178    () => {
2179        mod statestore_integration_tests {
2180            use matrix_sdk_test::{TestResult, async_test};
2181            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2182
2183            use super::get_store;
2184
2185            #[async_test]
2186            async fn test_topic_redaction() -> TestResult {
2187                let store = get_store().await?.into_state_store();
2188                store.test_topic_redaction().await
2189            }
2190
2191            #[async_test]
2192            async fn test_populate_store() -> TestResult {
2193                let store = get_store().await?.into_state_store();
2194                store.test_populate_store().await
2195            }
2196
2197            #[async_test]
2198            async fn test_member_saving() -> TestResult {
2199                let store = get_store().await?.into_state_store();
2200                store.test_member_saving().await
2201            }
2202
2203            #[async_test]
2204            async fn test_filter_saving() -> TestResult {
2205                let store = get_store().await?.into_state_store();
2206                store.test_filter_saving().await
2207            }
2208
2209            #[async_test]
2210            async fn test_user_avatar_url_saving() -> TestResult {
2211                let store = get_store().await?.into_state_store();
2212                store.test_user_avatar_url_saving().await
2213            }
2214
2215            #[async_test]
2216            async fn test_supported_versions_saving() -> TestResult {
2217                let store = get_store().await?.into_state_store();
2218                store.test_supported_versions_saving().await
2219            }
2220
2221            #[async_test]
2222            async fn test_well_known_saving() -> TestResult {
2223                let store = get_store().await?.into_state_store();
2224                store.test_well_known_saving().await
2225            }
2226
2227            #[async_test]
2228            async fn test_sync_token_saving() -> TestResult {
2229                let store = get_store().await?.into_state_store();
2230                store.test_sync_token_saving().await
2231            }
2232
2233            #[async_test]
2234            async fn test_utd_hook_manager_data_saving() -> TestResult {
2235                let store = get_store().await?.into_state_store();
2236                store.test_utd_hook_manager_data_saving().await
2237            }
2238
2239            #[async_test]
2240            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2241                let store = get_store().await?.into_state_store();
2242                store.test_one_time_key_already_uploaded_data_saving().await
2243            }
2244
2245            #[async_test]
2246            async fn test_stripped_member_saving() -> TestResult {
2247                let store = get_store().await?.into_state_store();
2248                store.test_stripped_member_saving().await
2249            }
2250
2251            #[async_test]
2252            async fn test_power_level_saving() -> TestResult {
2253                let store = get_store().await?.into_state_store();
2254                store.test_power_level_saving().await
2255            }
2256
2257            #[async_test]
2258            async fn test_receipts_saving() -> TestResult {
2259                let store = get_store().await?.into_state_store();
2260                store.test_receipts_saving().await
2261            }
2262
2263            #[async_test]
2264            async fn test_custom_storage() -> TestResult {
2265                let store = get_store().await?.into_state_store();
2266                store.test_custom_storage().await
2267            }
2268
2269            #[async_test]
2270            async fn test_stripped_non_stripped() -> TestResult {
2271                let store = get_store().await?.into_state_store();
2272                store.test_stripped_non_stripped().await
2273            }
2274
2275            #[async_test]
2276            async fn test_room_removal() -> TestResult {
2277                let store = get_store().await?.into_state_store();
2278                store.test_room_removal().await
2279            }
2280
2281            #[async_test]
2282            async fn test_profile_removal() -> TestResult {
2283                let store = get_store().await?.into_state_store();
2284                store.test_profile_removal().await
2285            }
2286
2287            #[async_test]
2288            async fn test_presence_saving() -> TestResult {
2289                let store = get_store().await?.into_state_store();
2290                store.test_presence_saving().await
2291            }
2292
2293            #[async_test]
2294            async fn test_display_names_saving() -> TestResult {
2295                let store = get_store().await?.into_state_store();
2296                store.test_display_names_saving().await
2297            }
2298
2299            #[async_test]
2300            async fn test_send_queue() -> TestResult {
2301                let store = get_store().await?.into_state_store();
2302                store.test_send_queue().await
2303            }
2304
2305            #[async_test]
2306            async fn test_send_queue_priority() -> TestResult {
2307                let store = get_store().await?.into_state_store();
2308                store.test_send_queue_priority().await
2309            }
2310
2311            #[async_test]
2312            async fn test_send_queue_dependents() -> TestResult {
2313                let store = get_store().await?.into_state_store();
2314                store.test_send_queue_dependents().await
2315            }
2316
2317            #[async_test]
2318            async fn test_update_send_queue_dependent() -> TestResult {
2319                let store = get_store().await?.into_state_store();
2320                store.test_update_send_queue_dependent().await
2321            }
2322
2323            #[async_test]
2324            async fn test_get_room_infos() -> TestResult {
2325                let store = get_store().await?.into_state_store();
2326                store.test_get_room_infos().await
2327            }
2328
2329            #[async_test]
2330            async fn test_thread_subscriptions() -> TestResult {
2331                let store = get_store().await?.into_state_store();
2332                store.test_thread_subscriptions().await
2333            }
2334
2335            #[async_test]
2336            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2337                let store = get_store().await?.into_state_store();
2338                store.test_thread_subscriptions_bulk_upsert().await
2339            }
2340        }
2341    };
2342}
2343
2344fn user_id() -> &'static UserId {
2345    user_id!("@example:localhost")
2346}
2347
2348fn invited_user_id() -> &'static UserId {
2349    user_id!("@invited:localhost")
2350}
2351
2352fn room_id() -> &'static RoomId {
2353    room_id!("!test:localhost")
2354}
2355
2356fn stripped_room_id() -> &'static RoomId {
2357    room_id!("!stripped:localhost")
2358}
2359
2360fn first_receipt_event_id() -> &'static EventId {
2361    event_id!("$example")
2362}
2363
2364fn topic_event_id() -> &'static EventId {
2365    event_id!("$topic_event")
2366}
2367
2368fn power_level_event() -> Raw<AnySyncStateEvent> {
2369    let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2370
2371    let event = json!({
2372        "event_id": "$h29iv0s8:example.com",
2373        "content": content,
2374        "sender": user_id(),
2375        "type": "m.room.power_levels",
2376        "origin_server_ts": 0u64,
2377        "state_key": "",
2378    });
2379
2380    serde_json::from_value(event).unwrap()
2381}
2382
2383fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2384    custom_stripped_membership_event(user_id())
2385}
2386
2387fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2388    let ev_json = json!({
2389        "type": "m.room.member",
2390        "content": RoomMemberEventContent::new(MembershipState::Join),
2391        "sender": user_id,
2392        "state_key": user_id,
2393    });
2394
2395    Raw::new(&ev_json).unwrap().cast_unchecked()
2396}
2397
2398fn membership_event() -> Raw<SyncRoomMemberEvent> {
2399    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2400}
2401
2402fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2403    let ev_json = json!({
2404        "type": "m.room.member",
2405        "content": RoomMemberEventContent::new(MembershipState::Join),
2406        "event_id": event_id,
2407        "origin_server_ts": 198,
2408        "sender": user_id,
2409        "state_key": user_id,
2410    });
2411
2412    Raw::new(&ev_json).unwrap().cast_unchecked()
2413}
2414
2415fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2416    let ev_json = json!({
2417        "content": {
2418            "presence": "online"
2419        },
2420        "sender": user_id,
2421    });
2422
2423    Raw::new(&ev_json).unwrap().cast_unchecked()
2424}