Skip to main content

matrix_sdk_base/store/
integration_tests.rs

1//! Trait and macro of integration tests for StateStore implementations.
2
3use std::{
4    collections::{BTreeMap, BTreeSet, HashMap},
5    str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_test::{TestResult, event_factory::EventFactory};
12use ruma::{
13    EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
14    api::{
15        FeatureFlag, MatrixVersion,
16        client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
17    },
18    event_id,
19    events::{
20        AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
21        AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
22        RoomAccountDataEventType, StateEventType, SyncStateEvent,
23        presence::PresenceEvent,
24        receipt::{ReceiptThread, ReceiptType},
25        room::{
26            member::{
27                MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
28                SyncRoomMemberEvent,
29            },
30            message::RoomMessageEventContent,
31            power_levels::RoomPowerLevelsEventContent,
32            redaction::SyncRoomRedactionEvent,
33            topic::RoomTopicEventContent,
34        },
35        tag::{TagInfo, TagName, Tags, UserTagName},
36    },
37    mxc_uri, owned_event_id, owned_mxc_uri,
38    presence::PresenceState,
39    push::Ruleset,
40    room_id,
41    room_version_rules::AuthorizationRules,
42    serde::Raw,
43    uint, user_id,
44};
45use serde_json::json;
46
47use super::{
48    DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
49    SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
50};
51use crate::{
52    RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
53    deserialized_responses::MemberEvent,
54    store::{
55        ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
56        StoredThreadSubscription, ThreadSubscriptionStatus,
57    },
58    utils::RawStateEventWithKeys,
59};
60
61/// `StateStore` integration tests.
62///
63/// This trait is not meant to be used directly, but will be used with the
64/// `statestore_integration_tests!` macro.
65#[allow(async_fn_in_trait)]
66pub trait StateStoreIntegrationTests {
67    /// Populate the given `StateStore`.
68    async fn populate(&self) -> TestResult;
69    /// Test room topic redaction.
70    async fn test_topic_redaction(&self) -> TestResult;
71    /// Test populating the store.
72    async fn test_populate_store(&self) -> TestResult;
73    /// Test room member saving.
74    async fn test_member_saving(&self) -> TestResult;
75    /// Test filter saving.
76    async fn test_filter_saving(&self) -> TestResult;
77    /// Test saving a user avatar URL.
78    async fn test_user_avatar_url_saving(&self) -> TestResult;
79    /// Test sync token saving.
80    async fn test_sync_token_saving(&self) -> TestResult;
81    /// Test UtdHookManagerData saving.
82    async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
83    /// Test the saving of the OneTimeKeyAlreadyUploaded key/value data type.
84    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
85    /// Test stripped room member saving.
86    async fn test_stripped_member_saving(&self) -> TestResult;
87    /// Test room power levels saving.
88    async fn test_power_level_saving(&self) -> TestResult;
89    /// Test user receipts saving.
90    async fn test_receipts_saving(&self) -> TestResult;
91    /// Test custom storage.
92    async fn test_custom_storage(&self) -> TestResult;
93    /// Test stripped and non-stripped room member saving.
94    async fn test_stripped_non_stripped(&self) -> TestResult;
95    /// Test room removal.
96    async fn test_room_removal(&self) -> TestResult;
97    /// Test profile removal.
98    async fn test_profile_removal(&self) -> TestResult;
99    /// Test presence saving.
100    async fn test_presence_saving(&self) -> TestResult;
101    /// Test display names saving.
102    async fn test_display_names_saving(&self) -> TestResult;
103    /// Test operations with the send queue.
104    async fn test_send_queue(&self) -> TestResult;
105    /// Test priority of operations with the send queue.
106    async fn test_send_queue_priority(&self) -> TestResult;
107    /// Test operations related to send queue dependents.
108    async fn test_send_queue_dependents(&self) -> TestResult;
109    /// Test an update to a send queue dependent request.
110    async fn test_update_send_queue_dependent(&self) -> TestResult;
111    /// Test saving/restoring the supported versions of the server.
112    async fn test_supported_versions_saving(&self) -> TestResult;
113    /// Test saving/restoring the well-known info of the server.
114    async fn test_well_known_saving(&self) -> TestResult;
115    /// Test fetching room infos based on [`RoomLoadSettings`].
116    async fn test_get_room_infos(&self) -> TestResult;
117    /// Test loading thread subscriptions.
118    async fn test_thread_subscriptions(&self) -> TestResult;
119    /// Test thread subscriptions bulk upsert, including bumpstamp semantics.
120    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
121}
122
123impl StateStoreIntegrationTests for DynStateStore {
124    async fn populate(&self) -> TestResult {
125        let mut changes = StateChanges::default();
126
127        let user_id = user_id();
128        let invited_user_id = invited_user_id();
129        let room_id = room_id();
130        let stripped_room_id = stripped_room_id();
131
132        changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
133
134        let f = EventFactory::new().sender(user_id);
135        let presence_raw: Raw<PresenceEvent> = f
136            .presence(PresenceState::Online)
137            .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
138            .currently_active(false)
139            .last_active_ago(1)
140            .status_msg("Making cupcakes")
141            .into();
142        let presence_event = presence_raw.deserialize()?;
143        changes.add_presence_event(presence_event, presence_raw);
144
145        let f = EventFactory::new().sender(user_id);
146        let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
147            f.push_rules(Ruleset::server_default(user_id)).into();
148        let pushrules_event = pushrules_raw.deserialize()?;
149        changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
150
151        let mut room = RoomInfo::new(room_id, RoomState::Joined);
152        room.mark_as_left();
153
154        let f = EventFactory::new().sender(user_id).room(room_id);
155        let mut tags = Tags::new();
156        tags.insert(TagName::Favorite, TagInfo::new());
157        tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
158        let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
159        let tag_event = tag_raw.deserialize()?;
160        changes.add_room_account_data(room_id, tag_event, tag_raw);
161
162        let f = EventFactory::new().sender(user_id).room(room_id);
163        let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
164        let name_event = name_raw.deserialize()?;
165        room.handle_state_event(
166            &mut RawStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
167                .expect("generated state event should be valid"),
168        );
169        changes.add_state_event(room_id, name_event, name_raw);
170
171        let f = EventFactory::new().sender(user_id);
172        let receipt_content = f
173            .room(room_id)
174            .read_receipts()
175            .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
176            .into_content();
177        changes.add_receipts(room_id, receipt_content);
178
179        let topic_event_id = topic_event_id();
180        let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
181            .room(room_id)
182            .sender(user_id)
183            .room_topic("😀")
184            .event_id(topic_event_id)
185            .prev_content(RoomTopicEventContent::new("test".to_owned()))
186            .into_raw_sync_state();
187        let topic_event = topic_raw.deserialize()?;
188        room.handle_state_event(
189            &mut RawStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
190                .expect("generated state event should be valid"),
191        );
192        changes.add_state_event(room_id, topic_event, topic_raw);
193
194        let mut room_ambiguity_map = HashMap::new();
195        let mut room_profiles = BTreeMap::new();
196
197        let f = EventFactory::new().sender(user_id).room(room_id);
198        let member_raw: Raw<SyncRoomMemberEvent> =
199            f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
200        let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
201        let displayname = DisplayName::new(
202            member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
203        );
204        room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
205        room_profiles.insert(user_id.to_owned(), (&member_event).into());
206
207        let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
208        let member_state_event = member_raw.deserialize()?;
209        changes.add_state_event(room_id, member_state_event, member_raw);
210
211        let f = EventFactory::new().sender(user_id).room(room_id);
212        let invited_member_raw: Raw<SyncRoomMemberEvent> = f
213            .member(invited_user_id)
214            .invited(invited_user_id)
215            .display_name("example")
216            .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
217            .reason("Looking for support")
218            .into();
219        // FIXME: Should be stripped room member event
220        let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
221        room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
222        room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
223
224        let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
225        let invited_member_state_event = invited_member_raw.deserialize()?;
226        changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
227
228        changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
229        changes.profiles.insert(room_id.to_owned(), room_profiles);
230        changes.add_room(room);
231
232        let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
233
234        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
235        let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
236        let mut stripped_name_event =
237            RawStateEventWithKeys::try_from_raw_state_event(stripped_name_raw).unwrap();
238        stripped_room.handle_stripped_state_event(&mut stripped_name_event);
239        changes.stripped_state.insert(
240            stripped_room_id.to_owned(),
241            BTreeMap::from([(
242                stripped_name_event.event_type,
243                BTreeMap::from([(stripped_name_event.state_key, stripped_name_event.raw)]),
244            )]),
245        );
246
247        changes.add_room(stripped_room);
248
249        let f = EventFactory::new().sender(user_id).room(stripped_room_id);
250        let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
251            f.member(user_id).display_name("example").into();
252        changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
253
254        self.save_changes(&changes).await?;
255
256        Ok(())
257    }
258
259    async fn test_topic_redaction(&self) -> TestResult {
260        let room_id = room_id();
261        let f = EventFactory::new();
262        self.populate().await?;
263
264        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
265        assert_eq!(
266            self.get_state_event_static::<RoomTopicEventContent>(room_id)
267                .await?
268                .expect("room topic found before redaction")
269                .deserialize()
270                .expect("can deserialize room topic before redaction")
271                .as_sync()
272                .expect("room topic is a sync state event")
273                .as_original()
274                .expect("room topic is not redacted yet")
275                .content
276                .topic,
277            "😀"
278        );
279
280        let mut changes = StateChanges::default();
281
282        let topic_event_id = topic_event_id();
283        let redaction_evt: Raw<SyncRoomRedactionEvent> =
284            f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
285
286        changes.add_redaction(room_id, topic_event_id, redaction_evt);
287        self.save_changes(&changes).await?;
288
289        let redacted_event = self
290            .get_state_event_static::<RoomTopicEventContent>(room_id)
291            .await?
292            .expect("room topic found after redaction")
293            .deserialize()
294            .expect("can deserialize room topic after redaction");
295
296        assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
297
298        Ok(())
299    }
300
301    async fn test_populate_store(&self) -> TestResult {
302        let room_id = room_id();
303        let user_id = user_id();
304        let display_name = DisplayName::new("example");
305
306        self.populate().await?;
307
308        assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
309        assert!(self.get_presence_event(user_id).await?.is_some());
310        assert_eq!(
311            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
312            2,
313            "Expected to find 2 room infos"
314        );
315        assert!(
316            self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
317        );
318
319        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
320        assert_eq!(
321            self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
322            1,
323            "Expected to find 1 room topic"
324        );
325        assert!(self.get_profile(room_id, user_id).await?.is_some());
326        assert!(self.get_member_event(room_id, user_id).await?.is_some());
327        assert_eq!(
328            self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
329            2,
330            "Expected to find 2 members for room"
331        );
332        assert_eq!(
333            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
334            1,
335            "Expected to find 1 invited user ids"
336        );
337        assert_eq!(
338            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
339            1,
340            "Expected to find 1 joined user ids"
341        );
342        assert_eq!(
343            self.get_users_with_display_name(room_id, &display_name).await?.len(),
344            2,
345            "Expected to find 2 display names for room"
346        );
347        assert!(
348            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
349                .await?
350                .is_some()
351        );
352        assert!(
353            self.get_user_room_receipt_event(
354                room_id,
355                ReceiptType::Read,
356                ReceiptThread::Unthreaded,
357                user_id
358            )
359            .await?
360            .is_some()
361        );
362        assert_eq!(
363            self.get_event_room_receipt_events(
364                room_id,
365                ReceiptType::Read,
366                ReceiptThread::Unthreaded,
367                first_receipt_event_id()
368            )
369            .await?
370            .len(),
371            1,
372            "Expected to find 1 read receipt"
373        );
374        Ok(())
375    }
376
377    async fn test_member_saving(&self) -> TestResult {
378        let room_id = room_id!("!test_member_saving:localhost");
379        let user_id = user_id();
380        let second_user_id = user_id!("@second:localhost");
381        let third_user_id = user_id!("@third:localhost");
382        let unknown_user_id = user_id!("@unknown:localhost");
383
384        // No event in store.
385        let mut user_ids = vec![user_id.to_owned()];
386        assert!(self.get_member_event(room_id, user_id).await?.is_none());
387        let member_events = self
388            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
389            .await;
390        assert!(member_events?.is_empty());
391        assert!(self.get_profile(room_id, user_id).await?.is_none());
392        let profiles = self.get_profiles(room_id, &user_ids).await;
393        assert!(profiles?.is_empty());
394
395        // One event in store.
396        let mut changes = StateChanges::default();
397        let raw_member_event = membership_event();
398        let profile = raw_member_event.deserialize()?.into();
399        changes
400            .state
401            .entry(room_id.to_owned())
402            .or_default()
403            .entry(StateEventType::RoomMember)
404            .or_default()
405            .insert(user_id.into(), raw_member_event.cast());
406        changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
407        self.save_changes(&changes).await?;
408
409        assert!(self.get_member_event(room_id, user_id).await?.is_some());
410        let member_events = self
411            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
412            .await;
413        assert_eq!(member_events?.len(), 1);
414        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
415        assert_eq!(members.len(), 1, "We expected to find members for the room");
416        assert!(self.get_profile(room_id, user_id).await?.is_some());
417        let profiles = self.get_profiles(room_id, &user_ids).await;
418        assert_eq!(profiles?.len(), 1);
419
420        // Several events in store.
421        let mut changes = StateChanges::default();
422        let changes_members = changes
423            .state
424            .entry(room_id.to_owned())
425            .or_default()
426            .entry(StateEventType::RoomMember)
427            .or_default();
428        let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
429        let raw_second_member_event =
430            custom_membership_event(second_user_id, event_id!("$second_member_event"));
431        let second_profile = raw_second_member_event.deserialize()?.into();
432        changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
433        changes_profiles.insert(second_user_id.to_owned(), second_profile);
434        let raw_third_member_event =
435            custom_membership_event(third_user_id, event_id!("$third_member_event"));
436        let third_profile = raw_third_member_event.deserialize()?.into();
437        changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
438        changes_profiles.insert(third_user_id.to_owned(), third_profile);
439        self.save_changes(&changes).await?;
440
441        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
442        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
443        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
444        let member_events = self
445            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
446            .await;
447        assert_eq!(member_events?.len(), 3);
448        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
449        assert_eq!(members.len(), 3, "We expected to find members for the room");
450        assert!(self.get_profile(room_id, second_user_id).await?.is_some());
451        assert!(self.get_profile(room_id, third_user_id).await?.is_some());
452        let profiles = self.get_profiles(room_id, &user_ids).await;
453        assert_eq!(profiles?.len(), 3);
454
455        // Several events in store with one unknown.
456        user_ids.push(unknown_user_id.to_owned());
457        let member_events = self
458            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
459            .await;
460        assert_eq!(member_events?.len(), 3);
461        let profiles = self.get_profiles(room_id, &user_ids).await;
462        assert_eq!(profiles?.len(), 3);
463
464        // Empty user IDs list.
465        let member_events = self
466            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
467                room_id,
468                &[],
469            )
470            .await;
471        assert!(member_events?.is_empty());
472        let profiles = self.get_profiles(room_id, &[]).await;
473        assert!(profiles?.is_empty());
474
475        Ok(())
476    }
477
478    async fn test_filter_saving(&self) -> TestResult {
479        let filter_name = "filter_name";
480        let filter_id = "filter_id_1234";
481
482        self.set_kv_data(
483            StateStoreDataKey::Filter(filter_name),
484            StateStoreDataValue::Filter(filter_id.to_owned()),
485        )
486        .await?;
487        assert_let!(
488            Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
489                self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
490        );
491        assert_eq!(stored_filter_id, filter_id);
492
493        self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
494        assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
495
496        Ok(())
497    }
498
499    async fn test_user_avatar_url_saving(&self) -> TestResult {
500        let user_id = user_id!("@alice:example.org");
501        let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
502
503        self.set_kv_data(
504            StateStoreDataKey::UserAvatarUrl(user_id),
505            StateStoreDataValue::UserAvatarUrl(url.clone()),
506        )
507        .await?;
508
509        assert_let!(
510            Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
511                self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
512        );
513        assert_eq!(stored_url, url);
514
515        self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
516        assert_matches!(
517            self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
518            Ok(None)
519        );
520
521        Ok(())
522    }
523
524    async fn test_supported_versions_saving(&self) -> TestResult {
525        let versions =
526            BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
527        let supported_versions = SupportedVersionsResponse {
528            versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
529            unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
530        };
531
532        self.set_kv_data(
533            StateStoreDataKey::SupportedVersions,
534            StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
535        )
536        .await?;
537
538        assert_let!(
539            Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
540                self.get_kv_data(StateStoreDataKey::SupportedVersions).await
541        );
542        assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
543        assert_eq!(supported_versions, stored_supported_versions);
544
545        let stored_supported = stored_supported_versions.supported_versions();
546        assert_eq!(stored_supported.versions, versions);
547        assert_eq!(stored_supported.features.len(), 1);
548        assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
549
550        self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
551        assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
552
553        Ok(())
554    }
555
556    async fn test_well_known_saving(&self) -> TestResult {
557        let well_known = WellKnownResponse {
558            homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
559            identity_server: None,
560            tile_server: None,
561            rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
562        };
563
564        self.set_kv_data(
565            StateStoreDataKey::WellKnown,
566            StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
567        )
568        .await?;
569
570        assert_let!(
571            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
572                self.get_kv_data(StateStoreDataKey::WellKnown).await
573        );
574        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
575        assert_eq!(stored_well_known, Some(well_known));
576
577        self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
578        assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
579
580        self.set_kv_data(
581            StateStoreDataKey::WellKnown,
582            StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
583        )
584        .await?;
585
586        assert_let!(
587            Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
588                self.get_kv_data(StateStoreDataKey::WellKnown).await
589        );
590        assert_let!(Some(stored_well_known) = stored_well_known.into_data());
591        assert_eq!(stored_well_known, None);
592
593        Ok(())
594    }
595
596    async fn test_sync_token_saving(&self) -> TestResult {
597        let sync_token_1 = "t392-516_47314_0_7_1";
598        let sync_token_2 = "t392-516_47314_0_7_2";
599
600        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
601
602        let changes =
603            StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
604        self.save_changes(&changes).await?;
605        assert_let!(
606            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
607                self.get_kv_data(StateStoreDataKey::SyncToken).await
608        );
609        assert_eq!(stored_sync_token, sync_token_1);
610
611        self.set_kv_data(
612            StateStoreDataKey::SyncToken,
613            StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
614        )
615        .await?;
616        assert_let!(
617            Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
618                self.get_kv_data(StateStoreDataKey::SyncToken).await
619        );
620        assert_eq!(stored_sync_token, sync_token_2);
621
622        self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
623        assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
624
625        Ok(())
626    }
627
628    async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
629        // Before any data is written, the getter should return None.
630        assert!(
631            self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
632                .await
633                .expect("Could not read data")
634                .is_none(),
635            "Store was not empty at start"
636        );
637
638        // Put some data in the store...
639        let data = GrowableBloomBuilder::new().build();
640        self.set_kv_data(
641            StateStoreDataKey::UtdHookManagerData,
642            StateStoreDataValue::UtdHookManagerData(data.clone()),
643        )
644        .await
645        .expect("Could not save data");
646
647        // ... and check it comes back.
648        let read_data = self
649            .get_kv_data(StateStoreDataKey::UtdHookManagerData)
650            .await
651            .expect("Could not read data")
652            .expect("no data found")
653            .into_utd_hook_manager_data()
654            .expect("not UtdHookManagerData");
655
656        assert_eq!(read_data, data);
657
658        Ok(())
659    }
660
661    async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
662        // Before any data is written, the getter should return None.
663        assert!(
664            self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
665            "Store was not empty at start"
666        );
667
668        self.set_kv_data(
669            StateStoreDataKey::OneTimeKeyAlreadyUploaded,
670            StateStoreDataValue::OneTimeKeyAlreadyUploaded,
671        )
672        .await?;
673
674        let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
675        data.expect("The loaded data should be Some");
676
677        Ok(())
678    }
679
680    async fn test_stripped_member_saving(&self) -> TestResult {
681        let room_id = room_id!("!test_stripped_member_saving:localhost");
682        let user_id = user_id();
683        let second_user_id = user_id!("@second:localhost");
684        let third_user_id = user_id!("@third:localhost");
685        let unknown_user_id = user_id!("@unknown:localhost");
686
687        // No event in store.
688        assert!(self.get_member_event(room_id, user_id).await?.is_none());
689        let member_events = self
690            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
691                room_id,
692                &[user_id.to_owned()],
693            )
694            .await;
695        assert!(member_events?.is_empty());
696
697        // One event in store.
698        let mut changes = StateChanges::default();
699        changes
700            .stripped_state
701            .entry(room_id.to_owned())
702            .or_default()
703            .entry(StateEventType::RoomMember)
704            .or_default()
705            .insert(user_id.into(), stripped_membership_event().cast());
706        self.save_changes(&changes).await?;
707
708        assert!(self.get_member_event(room_id, user_id).await?.is_some());
709        let member_events = self
710            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
711                room_id,
712                &[user_id.to_owned()],
713            )
714            .await;
715        assert_eq!(member_events?.len(), 1);
716        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
717        assert_eq!(members.len(), 1, "We expected to find members for the room");
718
719        // Several events in store.
720        let mut changes = StateChanges::default();
721        let changes_members = changes
722            .stripped_state
723            .entry(room_id.to_owned())
724            .or_default()
725            .entry(StateEventType::RoomMember)
726            .or_default();
727        changes_members
728            .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
729        changes_members
730            .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
731        self.save_changes(&changes).await?;
732
733        assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
734        assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
735        let member_events = self
736            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
737                room_id,
738                &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
739            )
740            .await;
741        assert_eq!(member_events?.len(), 3);
742        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
743        assert_eq!(members.len(), 3, "We expected to find members for the room");
744
745        // Several events in store with one unknown.
746        let member_events = self
747            .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
748                room_id,
749                &[
750                    user_id.to_owned(),
751                    second_user_id.to_owned(),
752                    third_user_id.to_owned(),
753                    unknown_user_id.to_owned(),
754                ],
755            )
756            .await;
757        assert_eq!(member_events?.len(), 3);
758
759        // Empty user IDs list.
760        let member_events = self
761            .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
762                room_id,
763                &[],
764            )
765            .await;
766        assert!(member_events?.is_empty());
767
768        Ok(())
769    }
770
771    async fn test_power_level_saving(&self) -> TestResult {
772        let room_id = room_id!("!test_power_level_saving:localhost");
773
774        let raw_event = power_level_event();
775        let event = raw_event.deserialize()?;
776
777        assert!(
778            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
779        );
780        let mut changes = StateChanges::default();
781        changes.add_state_event(room_id, event, raw_event);
782
783        self.save_changes(&changes).await?;
784        assert!(
785            self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
786        );
787
788        Ok(())
789    }
790
791    async fn test_receipts_saving(&self) -> TestResult {
792        let room_id = room_id!("!test_receipts_saving:localhost");
793
794        let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
795        let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
796
797        let first_receipt_ts = uint!(1436451550);
798        let second_receipt_ts = uint!(1436451653);
799        let third_receipt_ts = uint!(1436474532);
800
801        let first_receipt_event = serde_json::from_value(json!({
802            first_event_id: {
803                "m.read": {
804                    user_id(): {
805                        "ts": first_receipt_ts,
806                    }
807                }
808            }
809        }))?;
810
811        let second_receipt_event = serde_json::from_value(json!({
812            second_event_id: {
813                "m.read": {
814                    user_id(): {
815                        "ts": second_receipt_ts,
816                    }
817                }
818            }
819        }))?;
820
821        let third_receipt_event = serde_json::from_value(json!({
822            second_event_id: {
823                "m.read": {
824                    user_id(): {
825                        "ts": third_receipt_ts,
826                        "thread_id": "main",
827                    }
828                }
829            }
830        }))?;
831
832        assert!(
833            self.get_user_room_receipt_event(
834                room_id,
835                ReceiptType::Read,
836                ReceiptThread::Unthreaded,
837                user_id()
838            )
839            .await
840            .expect("failed to read unthreaded user room receipt")
841            .is_none()
842        );
843        assert!(
844            self.get_event_room_receipt_events(
845                room_id,
846                ReceiptType::Read,
847                ReceiptThread::Unthreaded,
848                first_event_id
849            )
850            .await
851            .expect("failed to read unthreaded event room receipt for 1")
852            .is_empty()
853        );
854        assert!(
855            self.get_event_room_receipt_events(
856                room_id,
857                ReceiptType::Read,
858                ReceiptThread::Unthreaded,
859                second_event_id
860            )
861            .await
862            .expect("failed to read unthreaded event room receipt for 2")
863            .is_empty()
864        );
865
866        let mut changes = StateChanges::default();
867        changes.add_receipts(room_id, first_receipt_event);
868
869        self.save_changes(&changes).await?;
870        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
871            .get_user_room_receipt_event(
872                room_id,
873                ReceiptType::Read,
874                ReceiptThread::Unthreaded,
875                user_id(),
876            )
877            .await
878            .expect("failed to read unthreaded user room receipt after save")
879            .unwrap();
880        assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
881        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
882        let first_event_unthreaded_receipts = self
883            .get_event_room_receipt_events(
884                room_id,
885                ReceiptType::Read,
886                ReceiptThread::Unthreaded,
887                first_event_id,
888            )
889            .await
890            .expect("failed to read unthreaded event room receipt for 1 after save");
891        assert_eq!(
892            first_event_unthreaded_receipts.len(),
893            1,
894            "Found a wrong number of unthreaded receipts for 1 after save"
895        );
896        assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
897        assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
898        assert!(
899            self.get_event_room_receipt_events(
900                room_id,
901                ReceiptType::Read,
902                ReceiptThread::Unthreaded,
903                second_event_id
904            )
905            .await
906            .expect("failed to read unthreaded event room receipt for 2 after save")
907            .is_empty()
908        );
909
910        let mut changes = StateChanges::default();
911        changes.add_receipts(room_id, second_receipt_event);
912
913        self.save_changes(&changes).await.expect("Saving works");
914        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
915            .get_user_room_receipt_event(
916                room_id,
917                ReceiptType::Read,
918                ReceiptThread::Unthreaded,
919                user_id(),
920            )
921            .await
922            .expect("Getting unthreaded user room receipt after save failed")
923            .unwrap();
924        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
925        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
926        assert!(
927            self.get_event_room_receipt_events(
928                room_id,
929                ReceiptType::Read,
930                ReceiptThread::Unthreaded,
931                first_event_id
932            )
933            .await
934            .expect("Getting unthreaded event room receipt events for first event failed")
935            .is_empty()
936        );
937        let second_event_unthreaded_receipts = self
938            .get_event_room_receipt_events(
939                room_id,
940                ReceiptType::Read,
941                ReceiptThread::Unthreaded,
942                second_event_id,
943            )
944            .await
945            .expect("Getting unthreaded event room receipt events for second event failed");
946        assert_eq!(
947            second_event_unthreaded_receipts.len(),
948            1,
949            "Found a wrong number of unthreaded receipts for second event after save"
950        );
951        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
952        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
953
954        assert!(
955            self.get_user_room_receipt_event(
956                room_id,
957                ReceiptType::Read,
958                ReceiptThread::Main,
959                user_id()
960            )
961            .await
962            .expect("failed to read threaded user room receipt")
963            .is_none()
964        );
965        assert!(
966            self.get_event_room_receipt_events(
967                room_id,
968                ReceiptType::Read,
969                ReceiptThread::Main,
970                second_event_id
971            )
972            .await
973            .expect("Getting threaded event room receipts for 2 failed")
974            .is_empty()
975        );
976
977        let mut changes = StateChanges::default();
978        changes.add_receipts(room_id, third_receipt_event);
979
980        self.save_changes(&changes).await.expect("Saving works");
981        // Unthreaded receipts should not have changed.
982        let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
983            .get_user_room_receipt_event(
984                room_id,
985                ReceiptType::Read,
986                ReceiptThread::Unthreaded,
987                user_id(),
988            )
989            .await
990            .expect("Getting unthreaded user room receipt after save failed")
991            .unwrap();
992        assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
993        assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
994        let second_event_unthreaded_receipts = self
995            .get_event_room_receipt_events(
996                room_id,
997                ReceiptType::Read,
998                ReceiptThread::Unthreaded,
999                second_event_id,
1000            )
1001            .await
1002            .expect("Getting unthreaded event room receipt events for second event failed");
1003        assert_eq!(
1004            second_event_unthreaded_receipts.len(),
1005            1,
1006            "Found a wrong number of unthreaded receipts for second event after save"
1007        );
1008        assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1009        assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1010        // Threaded receipts should have changed
1011        let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1012            .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1013            .await
1014            .expect("Getting threaded user room receipt after save failed")
1015            .unwrap();
1016        assert_eq!(threaded_user_receipt_event_id, second_event_id);
1017        assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1018        let second_event_threaded_receipts = self
1019            .get_event_room_receipt_events(
1020                room_id,
1021                ReceiptType::Read,
1022                ReceiptThread::Main,
1023                second_event_id,
1024            )
1025            .await
1026            .expect("Getting threaded event room receipt events for second event failed");
1027        assert_eq!(
1028            second_event_threaded_receipts.len(),
1029            1,
1030            "Found a wrong number of threaded receipts for second event after save"
1031        );
1032        assert_eq!(second_event_threaded_receipts[0].0, user_id());
1033        assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1034
1035        Ok(())
1036    }
1037
1038    async fn test_custom_storage(&self) -> TestResult {
1039        let key = "my_key";
1040        let value = &[0, 1, 2, 3];
1041
1042        self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1043
1044        let read = self.get_custom_value(key.as_bytes()).await?;
1045
1046        assert_eq!(Some(value.as_ref()), read.as_deref());
1047
1048        Ok(())
1049    }
1050
1051    async fn test_stripped_non_stripped(&self) -> TestResult {
1052        let room_id = room_id!("!test_stripped_non_stripped:localhost");
1053        let user_id = user_id();
1054
1055        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1056        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1057
1058        let mut changes = StateChanges::default();
1059        changes
1060            .state
1061            .entry(room_id.to_owned())
1062            .or_default()
1063            .entry(StateEventType::RoomMember)
1064            .or_default()
1065            .insert(user_id.into(), membership_event().cast());
1066        changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1067        self.save_changes(&changes).await?;
1068
1069        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1070        assert!(matches!(member_event, MemberEvent::Sync(_)));
1071        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1072
1073        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1074        assert_eq!(members, vec![user_id.to_owned()]);
1075
1076        let mut changes = StateChanges::default();
1077        changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1078        changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1079        self.save_changes(&changes).await?;
1080
1081        let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1082        assert!(matches!(member_event, MemberEvent::Stripped(_)));
1083        assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1084
1085        let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1086        assert_eq!(members, vec![user_id.to_owned()]);
1087
1088        Ok(())
1089    }
1090
1091    async fn test_room_removal(&self) -> TestResult {
1092        let room_id = room_id();
1093        let user_id = user_id();
1094        let display_name = DisplayName::new("example");
1095        let stripped_room_id = stripped_room_id();
1096
1097        self.populate().await?;
1098
1099        {
1100            // Add a send queue request in that room.
1101            let txn = TransactionId::new();
1102            let ev =
1103                SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1104            self.save_send_queue_request(
1105                room_id,
1106                txn.clone(),
1107                MilliSecondsSinceUnixEpoch::now(),
1108                ev.into(),
1109                0,
1110            )
1111            .await?;
1112
1113            // Add a single dependent queue request.
1114            self.save_dependent_queued_request(
1115                room_id,
1116                &txn,
1117                ChildTransactionId::new(),
1118                MilliSecondsSinceUnixEpoch::now(),
1119                DependentQueuedRequestKind::RedactEvent,
1120            )
1121            .await?;
1122        }
1123
1124        self.remove_room(room_id).await?;
1125
1126        assert_eq!(
1127            self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1128            1,
1129            "room is still there"
1130        );
1131
1132        assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1133        assert!(
1134            self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1135            "still state events found"
1136        );
1137        assert!(self.get_profile(room_id, user_id).await?.is_none());
1138        assert!(self.get_member_event(room_id, user_id).await?.is_none());
1139        assert!(
1140            self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1141            "still user ids found"
1142        );
1143        assert!(
1144            self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1145            "still invited user ids found"
1146        );
1147        assert!(
1148            self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1149            "still joined users found"
1150        );
1151        assert!(
1152            self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1153            "still display names found"
1154        );
1155        assert!(
1156            self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1157                .await?
1158                .is_none()
1159        );
1160        assert!(
1161            self.get_user_room_receipt_event(
1162                room_id,
1163                ReceiptType::Read,
1164                ReceiptThread::Unthreaded,
1165                user_id
1166            )
1167            .await?
1168            .is_none()
1169        );
1170        assert!(
1171            self.get_event_room_receipt_events(
1172                room_id,
1173                ReceiptType::Read,
1174                ReceiptThread::Unthreaded,
1175                first_receipt_event_id()
1176            )
1177            .await?
1178            .is_empty(),
1179            "still event recepts in the store"
1180        );
1181        assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1182        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1183
1184        self.remove_room(stripped_room_id).await?;
1185
1186        assert!(
1187            self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1188            "still room info found"
1189        );
1190        Ok(())
1191    }
1192
1193    async fn test_profile_removal(&self) -> TestResult {
1194        let room_id = room_id();
1195
1196        // Both the user id and invited user id get a profile in populate().
1197        let user_id = user_id();
1198        let invited_user_id = invited_user_id();
1199
1200        self.populate().await?;
1201
1202        let new_invite_member_json = json!({
1203            "content": {
1204                "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1205                "displayname": "example after update",
1206                "membership": "invite",
1207                "reason": "Looking for support"
1208            },
1209            "event_id": "$143273582443PhrSm:localhost",
1210            "origin_server_ts": 1432735824,
1211            "room_id": room_id,
1212            "sender": user_id,
1213            "state_key": invited_user_id,
1214            "type": "m.room.member",
1215        });
1216        let new_invite_member_event: SyncRoomMemberEvent =
1217            serde_json::from_value(new_invite_member_json.clone())?;
1218
1219        let mut changes = StateChanges {
1220            // Both get their profiles deleted…
1221            profiles_to_delete: [(
1222                room_id.to_owned(),
1223                vec![user_id.to_owned(), invited_user_id.to_owned()],
1224            )]
1225            .into(),
1226
1227            // …but the invited user get a new profile.
1228            profiles: {
1229                let mut map = BTreeMap::default();
1230                map.insert(
1231                    room_id.to_owned(),
1232                    [(invited_user_id.to_owned(), new_invite_member_event.into())]
1233                        .into_iter()
1234                        .collect(),
1235                );
1236                map
1237            },
1238
1239            ..StateChanges::default()
1240        };
1241
1242        let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1243            .expect("can create sync-state-event for topic");
1244        let event = raw.deserialize()?;
1245        changes.add_state_event(room_id, event, raw);
1246
1247        self.save_changes(&changes).await?;
1248
1249        // The profile for user has been removed.
1250        assert!(self.get_profile(room_id, user_id).await?.is_none());
1251        assert!(self.get_member_event(room_id, user_id).await?.is_some());
1252
1253        // The profile for the invited user has been updated.
1254        let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1255        assert_eq!(
1256            invited_member_event.content.displayname.as_deref(),
1257            Some("example after update")
1258        );
1259        assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1260
1261        Ok(())
1262    }
1263
1264    async fn test_presence_saving(&self) -> TestResult {
1265        let user_id = user_id();
1266        let second_user_id = user_id!("@second:localhost");
1267        let third_user_id = user_id!("@third:localhost");
1268        let unknown_user_id = user_id!("@unknown:localhost");
1269
1270        // No event in store.
1271        let mut user_ids = vec![user_id.to_owned()];
1272        let presence_event = self.get_presence_event(user_id).await;
1273        assert!(presence_event?.is_none());
1274        let presence_events = self.get_presence_events(&user_ids).await;
1275        assert!(presence_events?.is_empty());
1276
1277        // One event in store.
1278        let mut changes = StateChanges::default();
1279        changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1280        self.save_changes(&changes).await?;
1281
1282        let presence_event = self.get_presence_event(user_id).await;
1283        assert!(presence_event?.is_some());
1284        let presence_events = self.get_presence_events(&user_ids).await;
1285        assert_eq!(presence_events?.len(), 1);
1286
1287        // Several events in store.
1288        let mut changes = StateChanges::default();
1289        changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1290        changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1291        self.save_changes(&changes).await?;
1292
1293        user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1294        let presence_event = self.get_presence_event(second_user_id).await;
1295        assert!(presence_event?.is_some());
1296        let presence_event = self.get_presence_event(third_user_id).await;
1297        assert!(presence_event?.is_some());
1298        let presence_events = self.get_presence_events(&user_ids).await;
1299        assert_eq!(presence_events?.len(), 3);
1300
1301        // Several events in store with one unknown.
1302        user_ids.push(unknown_user_id.to_owned());
1303        let member_events = self.get_presence_events(&user_ids).await;
1304        assert_eq!(member_events?.len(), 3);
1305
1306        // Empty user IDs list.
1307        let presence_events = self.get_presence_events(&[]).await;
1308        assert!(presence_events?.is_empty());
1309
1310        Ok(())
1311    }
1312
1313    async fn test_display_names_saving(&self) -> TestResult {
1314        let room_id = room_id!("!test_display_names_saving:localhost");
1315        let user_id = user_id();
1316        let user_display_name = DisplayName::new("User");
1317        let second_user_id = user_id!("@second:localhost");
1318        let third_user_id = user_id!("@third:localhost");
1319        let other_display_name = DisplayName::new("Raoul");
1320        let unknown_display_name = DisplayName::new("Unknown");
1321
1322        // No event in store.
1323        let mut display_names = vec![user_display_name.to_owned()];
1324        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1325        assert!(users.is_empty());
1326        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1327        assert!(names.is_empty());
1328
1329        // One event in store.
1330        let mut changes = StateChanges::default();
1331        changes
1332            .ambiguity_maps
1333            .entry(room_id.to_owned())
1334            .or_default()
1335            .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1336        self.save_changes(&changes).await?;
1337
1338        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1339        assert_eq!(users.len(), 1);
1340        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1341        assert_eq!(names.len(), 1);
1342        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1343
1344        // Several events in store.
1345        let mut changes = StateChanges::default();
1346        changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1347            other_display_name.to_owned(),
1348            [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1349        );
1350        self.save_changes(&changes).await?;
1351
1352        display_names.push(other_display_name.to_owned());
1353        let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1354        assert_eq!(users.len(), 1);
1355        let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1356        assert_eq!(users.len(), 2);
1357        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1358        assert_eq!(names.len(), 2);
1359        assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1360        assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1361
1362        // Several events in store with one unknown.
1363        display_names.push(unknown_display_name.to_owned());
1364        let names = self.get_users_with_display_names(room_id, &display_names).await?;
1365        assert_eq!(names.len(), 2);
1366
1367        // Empty user IDs list.
1368        let names = self.get_users_with_display_names(room_id, &[]).await?;
1369        assert!(names.is_empty());
1370
1371        Ok(())
1372    }
1373
1374    #[allow(clippy::needless_range_loop)]
1375    async fn test_send_queue(&self) -> TestResult {
1376        let room_id = room_id!("!test_send_queue:localhost");
1377
1378        // No queued event in store at first.
1379        let events = self.load_send_queue_requests(room_id).await?;
1380        assert!(events.is_empty());
1381
1382        // Saving one thing should work.
1383        let txn0 = TransactionId::new();
1384        let event0 =
1385            SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1386        self.save_send_queue_request(
1387            room_id,
1388            txn0.clone(),
1389            MilliSecondsSinceUnixEpoch::now(),
1390            event0.into(),
1391            0,
1392        )
1393        .await?;
1394
1395        // Reading it will work.
1396        let pending = self.load_send_queue_requests(room_id).await?;
1397
1398        assert_eq!(pending.len(), 1);
1399        {
1400            assert_eq!(pending[0].transaction_id, txn0);
1401
1402            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1403            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1404            assert_eq!(content.body(), "msg0");
1405
1406            assert!(!pending[0].is_wedged());
1407        }
1408
1409        // Saving another three things should work.
1410        for i in 1..=3 {
1411            let txn = TransactionId::new();
1412            let event = SerializableEventContent::new(
1413                &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1414            )?;
1415
1416            self.save_send_queue_request(
1417                room_id,
1418                txn,
1419                MilliSecondsSinceUnixEpoch::now(),
1420                event.into(),
1421                0,
1422            )
1423            .await?;
1424        }
1425
1426        // Reading all the events should work.
1427        let pending = self.load_send_queue_requests(room_id).await?;
1428
1429        // All the events should be retrieved, in the same order.
1430        assert_eq!(pending.len(), 4);
1431
1432        assert_eq!(pending[0].transaction_id, txn0);
1433
1434        for i in 0..4 {
1435            let deserialized = pending[i].as_event().unwrap().deserialize()?;
1436            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1437            assert_eq!(content.body(), format!("msg{i}"));
1438            assert!(!pending[i].is_wedged());
1439        }
1440
1441        // Marking an event as wedged works.
1442        let txn2 = &pending[2].transaction_id;
1443        self.update_send_queue_request_status(
1444            room_id,
1445            txn2,
1446            Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1447        )
1448        .await?;
1449
1450        // And it is reflected.
1451        let pending = self.load_send_queue_requests(room_id).await?;
1452
1453        // All the events should be retrieved, in the same order.
1454        assert_eq!(pending.len(), 4);
1455        assert_eq!(pending[0].transaction_id, txn0);
1456        assert_eq!(pending[2].transaction_id, *txn2);
1457        assert!(pending[2].is_wedged());
1458        let error = pending[2].clone().error.unwrap();
1459        let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1460        assert_eq!(generic_error, "Oops");
1461        for i in 0..4 {
1462            if i != 2 {
1463                assert!(!pending[i].is_wedged());
1464            }
1465        }
1466
1467        // Updating an event will work, and reset its wedged state to false.
1468        let event0 = SerializableEventContent::new(
1469            &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1470        )?;
1471        self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1472
1473        // And it is reflected.
1474        let pending = self.load_send_queue_requests(room_id).await?;
1475
1476        assert_eq!(pending.len(), 4);
1477        {
1478            assert_eq!(pending[2].transaction_id, *txn2);
1479
1480            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1481            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1482            assert_eq!(content.body(), "wow that's a cool test");
1483
1484            assert!(!pending[2].is_wedged());
1485
1486            for i in 0..4 {
1487                if i != 2 {
1488                    let deserialized = pending[i].as_event().unwrap().deserialize()?;
1489                    assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1490                    assert_eq!(content.body(), format!("msg{i}"));
1491
1492                    assert!(!pending[i].is_wedged());
1493                }
1494            }
1495        }
1496
1497        // Removing an event works.
1498        self.remove_send_queue_request(room_id, &txn0).await?;
1499
1500        // And it is reflected.
1501        let pending = self.load_send_queue_requests(room_id).await?;
1502
1503        assert_eq!(pending.len(), 3);
1504        assert_eq!(pending[1].transaction_id, *txn2);
1505        for i in 0..3 {
1506            assert_ne!(pending[i].transaction_id, txn0);
1507        }
1508
1509        // Now add one event for two other rooms, remove one of the events, and then
1510        // query all the rooms which have outstanding unsent events.
1511
1512        // Add one event for room2.
1513        let room_id2 = room_id!("!test_send_queue_two:localhost");
1514        {
1515            let txn = TransactionId::new();
1516            let event = SerializableEventContent::new(
1517                &RoomMessageEventContent::text_plain("room2").into(),
1518            )?;
1519            self.save_send_queue_request(
1520                room_id2,
1521                txn.clone(),
1522                MilliSecondsSinceUnixEpoch::now(),
1523                event.into(),
1524                0,
1525            )
1526            .await?;
1527        }
1528
1529        // Add and remove one event for room3.
1530        {
1531            let room_id3 = room_id!("!test_send_queue_three:localhost");
1532            let txn = TransactionId::new();
1533            let event = SerializableEventContent::new(
1534                &RoomMessageEventContent::text_plain("room3").into(),
1535            )?;
1536            self.save_send_queue_request(
1537                room_id3,
1538                txn.clone(),
1539                MilliSecondsSinceUnixEpoch::now(),
1540                event.into(),
1541                0,
1542            )
1543            .await?;
1544
1545            self.remove_send_queue_request(room_id3, &txn).await?;
1546        }
1547
1548        // Query all the rooms which have unsent events. Per the previous steps,
1549        // it should be room1 and room2, not room3.
1550        let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1551        assert_eq!(outstanding_rooms.len(), 2);
1552        assert!(outstanding_rooms.iter().any(|room| room == room_id));
1553        assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1554
1555        Ok(())
1556    }
1557
1558    async fn test_send_queue_priority(&self) -> TestResult {
1559        let room_id = room_id!("!test_send_queue:localhost");
1560
1561        // No queued event in store at first.
1562        let events = self.load_send_queue_requests(room_id).await?;
1563        assert!(events.is_empty());
1564
1565        // Saving one request should work.
1566        let low0_txn = TransactionId::new();
1567        let ev0 =
1568            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1569        self.save_send_queue_request(
1570            room_id,
1571            low0_txn.clone(),
1572            MilliSecondsSinceUnixEpoch::now(),
1573            ev0.into(),
1574            2,
1575        )
1576        .await?;
1577
1578        // Saving one request with higher priority should work.
1579        let high_txn = TransactionId::new();
1580        let ev1 =
1581            SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1582        self.save_send_queue_request(
1583            room_id,
1584            high_txn.clone(),
1585            MilliSecondsSinceUnixEpoch::now(),
1586            ev1.into(),
1587            10,
1588        )
1589        .await?;
1590
1591        // Saving another request with the low priority should work.
1592        let low1_txn = TransactionId::new();
1593        let ev2 =
1594            SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1595        self.save_send_queue_request(
1596            room_id,
1597            low1_txn.clone(),
1598            MilliSecondsSinceUnixEpoch::now(),
1599            ev2.into(),
1600            2,
1601        )
1602        .await?;
1603
1604        // The requests should be ordered from higher priority to lower, and when equal,
1605        // should use the insertion order instead.
1606        let pending = self.load_send_queue_requests(room_id).await?;
1607
1608        assert_eq!(pending.len(), 3);
1609        {
1610            assert_eq!(pending[0].transaction_id, high_txn);
1611
1612            let deserialized = pending[0].as_event().unwrap().deserialize()?;
1613            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1614            assert_eq!(content.body(), "high");
1615        }
1616
1617        {
1618            assert_eq!(pending[1].transaction_id, low0_txn);
1619
1620            let deserialized = pending[1].as_event().unwrap().deserialize()?;
1621            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1622            assert_eq!(content.body(), "low0");
1623        }
1624
1625        {
1626            assert_eq!(pending[2].transaction_id, low1_txn);
1627
1628            let deserialized = pending[2].as_event().unwrap().deserialize()?;
1629            assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1630            assert_eq!(content.body(), "low1");
1631        }
1632
1633        Ok(())
1634    }
1635
1636    async fn test_send_queue_dependents(&self) -> TestResult {
1637        let room_id = room_id!("!test_send_queue_dependents:localhost");
1638
1639        // Save one send queue event to start with.
1640        let txn0 = TransactionId::new();
1641        let event0 =
1642            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1643        self.save_send_queue_request(
1644            room_id,
1645            txn0.clone(),
1646            MilliSecondsSinceUnixEpoch::now(),
1647            event0.clone().into(),
1648            0,
1649        )
1650        .await?;
1651
1652        // No dependents, to start with.
1653        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1654
1655        // Save a redaction for that event.
1656        let child_txn = ChildTransactionId::new();
1657        self.save_dependent_queued_request(
1658            room_id,
1659            &txn0,
1660            child_txn.clone(),
1661            MilliSecondsSinceUnixEpoch::now(),
1662            DependentQueuedRequestKind::RedactEvent,
1663        )
1664        .await?;
1665
1666        // It worked.
1667        let dependents = self.load_dependent_queued_requests(room_id).await?;
1668        assert_eq!(dependents.len(), 1);
1669        assert_eq!(dependents[0].parent_transaction_id, txn0);
1670        assert_eq!(dependents[0].own_transaction_id, child_txn);
1671        assert!(dependents[0].parent_key.is_none());
1672        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1673
1674        // Update the event id.
1675        let (event, event_type) = event0.raw();
1676        let event_id = owned_event_id!("$1");
1677        let num_updated = self
1678            .mark_dependent_queued_requests_as_ready(
1679                room_id,
1680                &txn0,
1681                SentRequestKey::Event {
1682                    event_id: event_id.clone(),
1683                    event: event.clone(),
1684                    event_type: event_type.to_owned(),
1685                },
1686            )
1687            .await?;
1688        assert_eq!(num_updated, 1);
1689
1690        // It worked.
1691        let dependents = self.load_dependent_queued_requests(room_id).await?;
1692        assert_eq!(dependents.len(), 1);
1693        assert_eq!(dependents[0].parent_transaction_id, txn0);
1694        assert_eq!(dependents[0].own_transaction_id, child_txn);
1695        assert_matches!(
1696            dependents[0].parent_key.as_ref(),
1697            Some(SentRequestKey::Event {
1698                event_id: received_event_id,
1699                event: received_event,
1700                event_type: received_event_type
1701            }) => {
1702                assert_eq!(received_event_id, &event_id);
1703                assert_eq!(received_event.json().to_string(), event.json().to_string());
1704                assert_eq!(received_event_type.as_str(), event_type);
1705            }
1706        );
1707        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1708
1709        // Now remove it.
1710        let removed = self
1711            .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1712            .await?;
1713        assert!(removed);
1714
1715        // It worked.
1716        assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1717
1718        // Now, inserting a dependent event and removing the original send queue event
1719        // will NOT remove the dependent event.
1720        let txn1 = TransactionId::new();
1721        let event1 =
1722            SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1723        self.save_send_queue_request(
1724            room_id,
1725            txn1.clone(),
1726            MilliSecondsSinceUnixEpoch::now(),
1727            event1.into(),
1728            0,
1729        )
1730        .await?;
1731
1732        self.save_dependent_queued_request(
1733            room_id,
1734            &txn0,
1735            ChildTransactionId::new(),
1736            MilliSecondsSinceUnixEpoch::now(),
1737            DependentQueuedRequestKind::RedactEvent,
1738        )
1739        .await?;
1740        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1741
1742        self.save_dependent_queued_request(
1743            room_id,
1744            &txn1,
1745            ChildTransactionId::new(),
1746            MilliSecondsSinceUnixEpoch::now(),
1747            DependentQueuedRequestKind::EditEvent {
1748                new_content: SerializableEventContent::new(
1749                    &RoomMessageEventContent::text_plain("edit").into(),
1750                )?,
1751            },
1752        )
1753        .await?;
1754        assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1755
1756        // Remove event0 / txn0.
1757        let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1758        assert!(removed);
1759
1760        // This has removed none of the dependent events.
1761        let dependents = self.load_dependent_queued_requests(room_id).await?;
1762        assert_eq!(dependents.len(), 2);
1763
1764        Ok(())
1765    }
1766
1767    async fn test_update_send_queue_dependent(&self) -> TestResult {
1768        let room_id = room_id!("!test_send_queue_dependents:localhost");
1769
1770        let txn = TransactionId::new();
1771
1772        // Save a dependent redaction for an event.
1773        let child_txn = ChildTransactionId::new();
1774
1775        self.save_dependent_queued_request(
1776            room_id,
1777            &txn,
1778            child_txn.clone(),
1779            MilliSecondsSinceUnixEpoch::now(),
1780            DependentQueuedRequestKind::RedactEvent,
1781        )
1782        .await?;
1783
1784        // It worked.
1785        let dependents = self.load_dependent_queued_requests(room_id).await?;
1786        assert_eq!(dependents.len(), 1);
1787        assert_eq!(dependents[0].parent_transaction_id, txn);
1788        assert_eq!(dependents[0].own_transaction_id, child_txn);
1789        assert!(dependents[0].parent_key.is_none());
1790        assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1791
1792        // Make it a reaction, instead of a redaction.
1793        self.update_dependent_queued_request(
1794            room_id,
1795            &child_txn,
1796            DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1797        )
1798        .await?;
1799
1800        // It worked.
1801        let dependents = self.load_dependent_queued_requests(room_id).await?;
1802        assert_eq!(dependents.len(), 1);
1803        assert_eq!(dependents[0].parent_transaction_id, txn);
1804        assert_eq!(dependents[0].own_transaction_id, child_txn);
1805        assert!(dependents[0].parent_key.is_none());
1806        assert_matches!(
1807            &dependents[0].kind,
1808            DependentQueuedRequestKind::ReactEvent { key } => {
1809                assert_eq!(key, "👍");
1810            }
1811        );
1812
1813        Ok(())
1814    }
1815
1816    async fn test_get_room_infos(&self) -> TestResult {
1817        let room_id_0 = room_id!("!r0");
1818        let room_id_1 = room_id!("!r1");
1819        let room_id_2 = room_id!("!r2");
1820
1821        // There is no room for the moment.
1822        {
1823            assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1824        }
1825
1826        // Save rooms.
1827        let mut changes = StateChanges::default();
1828        changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1829        changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1830        self.save_changes(&changes).await?;
1831
1832        // We can find all the rooms with `RoomLoadSettings::All`.
1833        {
1834            let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1835
1836            // (We need to sort by `room_id` so that the test is stable across all
1837            // `StateStore` implementations).
1838            all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1839
1840            assert_eq!(all_rooms.len(), 2);
1841            assert_eq!(all_rooms[0].room_id, room_id_0);
1842            assert_eq!(all_rooms[1].room_id, room_id_1);
1843        }
1844
1845        // We can find a single room with `RoomLoadSettings::One`.
1846        {
1847            let all_rooms =
1848                self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1849
1850            assert_eq!(all_rooms.len(), 1);
1851            assert_eq!(all_rooms[0].room_id, room_id_1);
1852        }
1853
1854        // `RoomLoadSetting::One` can result in loading zero room if the room is
1855        // unknown.
1856        {
1857            let all_rooms =
1858                self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1859
1860            assert_eq!(all_rooms.len(), 0);
1861        }
1862
1863        Ok(())
1864    }
1865
1866    async fn test_thread_subscriptions(&self) -> TestResult {
1867        let first_thread = event_id!("$t1");
1868        let second_thread = event_id!("$t2");
1869
1870        // At first, there is no thread subscription.
1871        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1872        assert!(maybe_sub.is_none());
1873
1874        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1875        assert!(maybe_sub.is_none());
1876
1877        // Setting the thread subscription works.
1878        self.upsert_thread_subscriptions(vec![(
1879            room_id(),
1880            first_thread,
1881            StoredThreadSubscription {
1882                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1883                bump_stamp: None,
1884            },
1885        )])
1886        .await?;
1887
1888        self.upsert_thread_subscriptions(vec![(
1889            room_id(),
1890            second_thread,
1891            StoredThreadSubscription {
1892                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1893                bump_stamp: None,
1894            },
1895        )])
1896        .await?;
1897
1898        // Now, reading the thread subscription returns the expected status.
1899        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1900        assert_eq!(
1901            maybe_sub,
1902            Some(StoredThreadSubscription {
1903                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1904                bump_stamp: None,
1905            })
1906        );
1907
1908        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1909        assert_eq!(
1910            maybe_sub,
1911            Some(StoredThreadSubscription {
1912                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1913                bump_stamp: None,
1914            })
1915        );
1916
1917        // We can override the thread subscription status.
1918        self.upsert_thread_subscriptions(vec![(
1919            room_id(),
1920            first_thread,
1921            StoredThreadSubscription {
1922                status: ThreadSubscriptionStatus::Unsubscribed,
1923                bump_stamp: None,
1924            },
1925        )])
1926        .await?;
1927
1928        // And it's correctly reflected.
1929        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1930        assert_eq!(
1931            maybe_sub,
1932            Some(StoredThreadSubscription {
1933                status: ThreadSubscriptionStatus::Unsubscribed,
1934                bump_stamp: None,
1935            })
1936        );
1937
1938        // And the second thread is still subscribed.
1939        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1940        assert_eq!(
1941            maybe_sub,
1942            Some(StoredThreadSubscription {
1943                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1944                bump_stamp: None,
1945            })
1946        );
1947
1948        // We can remove a thread subscription.
1949        self.remove_thread_subscription(room_id(), second_thread).await?;
1950
1951        // And it's correctly reflected.
1952        let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1953        assert_eq!(maybe_sub, None);
1954
1955        // And the first thread is still unsubscribed.
1956        let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1957        assert_eq!(
1958            maybe_sub,
1959            Some(StoredThreadSubscription {
1960                status: ThreadSubscriptionStatus::Unsubscribed,
1961                bump_stamp: None,
1962            })
1963        );
1964
1965        // Removing a thread subscription for an unknown thread is a no-op.
1966        self.remove_thread_subscription(room_id(), second_thread).await?;
1967
1968        Ok(())
1969    }
1970
1971    async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1972        let threads = [
1973            event_id!("$t1"),
1974            event_id!("$t2"),
1975            event_id!("$t3"),
1976            event_id!("$t4"),
1977            event_id!("$t5"),
1978            event_id!("$t6"),
1979        ];
1980        // Helper for building the input for `upsert_thread_subscriptions()`,
1981        // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
1982        let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1983            threads
1984                .iter()
1985                .zip(subs)
1986                .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1987                .collect::<Vec<_>>()
1988        };
1989
1990        // Test bump_stamp logic
1991        let initial_subscriptions = build_subscription_updates(&[
1992            StoredThreadSubscription {
1993                status: ThreadSubscriptionStatus::Unsubscribed,
1994                bump_stamp: None,
1995            },
1996            StoredThreadSubscription {
1997                status: ThreadSubscriptionStatus::Unsubscribed,
1998                bump_stamp: Some(14),
1999            },
2000            StoredThreadSubscription {
2001                status: ThreadSubscriptionStatus::Unsubscribed,
2002                bump_stamp: None,
2003            },
2004            StoredThreadSubscription {
2005                status: ThreadSubscriptionStatus::Unsubscribed,
2006                bump_stamp: Some(210),
2007            },
2008            StoredThreadSubscription {
2009                status: ThreadSubscriptionStatus::Unsubscribed,
2010                bump_stamp: Some(5),
2011            },
2012            StoredThreadSubscription {
2013                status: ThreadSubscriptionStatus::Unsubscribed,
2014                bump_stamp: Some(100),
2015            },
2016        ]);
2017
2018        let update_subscriptions = build_subscription_updates(&[
2019            StoredThreadSubscription {
2020                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2021                bump_stamp: None,
2022            },
2023            StoredThreadSubscription {
2024                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2025                bump_stamp: None,
2026            },
2027            StoredThreadSubscription {
2028                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2029                bump_stamp: Some(1101),
2030            },
2031            StoredThreadSubscription {
2032                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2033                bump_stamp: Some(222),
2034            },
2035            StoredThreadSubscription {
2036                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2037                bump_stamp: Some(1),
2038            },
2039            StoredThreadSubscription {
2040                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2041                bump_stamp: Some(100),
2042            },
2043        ]);
2044
2045        let expected_subscriptions = build_subscription_updates(&[
2046            // Status should be updated, because prev and new bump_stamp are both None
2047            StoredThreadSubscription {
2048                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2049                bump_stamp: None,
2050            },
2051            // Status should be updated, but keep initial bump_stamp (new is None)
2052            StoredThreadSubscription {
2053                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2054                bump_stamp: Some(14),
2055            },
2056            // Status should be updated and also bump_stamp should be updated (initial was None)
2057            StoredThreadSubscription {
2058                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2059                bump_stamp: Some(1101),
2060            },
2061            // Status should be updated and also bump_stamp should be updated (initial was lower)
2062            StoredThreadSubscription {
2063                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2064                bump_stamp: Some(222),
2065            },
2066            // Status shouldn't change, as new bump_stamp is lower
2067            StoredThreadSubscription {
2068                status: ThreadSubscriptionStatus::Unsubscribed,
2069                bump_stamp: Some(5),
2070            },
2071            // Status shouldn't change, as bump_stamp is equal to the previous one
2072            StoredThreadSubscription {
2073                status: ThreadSubscriptionStatus::Unsubscribed,
2074                bump_stamp: Some(100),
2075            },
2076        ]);
2077
2078        // Set the initial subscriptions
2079        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2080
2081        // Assert the subscriptions have been added
2082        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2083            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2084            assert_eq!(stored_subscription, Some(*expected_sub));
2085        }
2086
2087        // Update subscriptions
2088        self.upsert_thread_subscriptions(update_subscriptions).await?;
2089
2090        // Assert the expected subscriptions and bump_stamps
2091        for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2092            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2093            assert_eq!(stored_subscription, Some(*expected_sub));
2094        }
2095
2096        // Test just state changes, but first remove previous subscriptions
2097        for (room_id, thread_id, _) in &expected_subscriptions {
2098            self.remove_thread_subscription(room_id, thread_id).await?;
2099        }
2100
2101        let initial_subscriptions = build_subscription_updates(&[
2102            StoredThreadSubscription {
2103                status: ThreadSubscriptionStatus::Unsubscribed,
2104                bump_stamp: Some(1),
2105            },
2106            StoredThreadSubscription {
2107                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2108                bump_stamp: Some(1),
2109            },
2110            StoredThreadSubscription {
2111                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2112                bump_stamp: Some(1),
2113            },
2114        ]);
2115
2116        self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2117
2118        for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2119            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2120            assert_eq!(stored_subscription, Some(*expected_sub));
2121        }
2122
2123        let update_subscriptions = build_subscription_updates(&[
2124            StoredThreadSubscription {
2125                status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2126                bump_stamp: Some(2),
2127            },
2128            StoredThreadSubscription {
2129                status: ThreadSubscriptionStatus::Unsubscribed,
2130                bump_stamp: Some(2),
2131            },
2132            StoredThreadSubscription {
2133                status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2134                bump_stamp: Some(2),
2135            },
2136        ]);
2137
2138        self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2139
2140        for (room_id, thread_id, expected_sub) in &update_subscriptions {
2141            let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2142            assert_eq!(stored_subscription, Some(*expected_sub));
2143        }
2144
2145        Ok(())
2146    }
2147}
2148
2149/// Macro building to allow your StateStore implementation to run the entire
2150/// tests suite locally.
2151///
2152/// You need to provide a `async fn get_store() -> StoreResult<impl StateStore>`
2153/// providing a fresh store on the same level you invoke the macro.
2154///
2155/// ## Usage Example:
2156/// ```no_run
2157/// # use matrix_sdk_base::store::{
2158/// #    StateStore,
2159/// #    MemoryStore as MyStore,
2160/// #    Result as StoreResult,
2161/// # };
2162///
2163/// #[cfg(test)]
2164/// mod tests {
2165///     use super::{MyStore, StateStore, StoreResult};
2166///
2167///     async fn get_store() -> StoreResult<impl StateStore> {
2168///         Ok(MyStore::new())
2169///     }
2170///
2171///     statestore_integration_tests!();
2172/// }
2173/// ```
2174#[allow(unused_macros, unused_extern_crates)]
2175#[macro_export]
2176macro_rules! statestore_integration_tests {
2177    () => {
2178        mod statestore_integration_tests {
2179            use matrix_sdk_test::{TestResult, async_test};
2180            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2181
2182            use super::get_store;
2183
2184            #[async_test]
2185            async fn test_topic_redaction() -> TestResult {
2186                let store = get_store().await?.into_state_store();
2187                store.test_topic_redaction().await
2188            }
2189
2190            #[async_test]
2191            async fn test_populate_store() -> TestResult {
2192                let store = get_store().await?.into_state_store();
2193                store.test_populate_store().await
2194            }
2195
2196            #[async_test]
2197            async fn test_member_saving() -> TestResult {
2198                let store = get_store().await?.into_state_store();
2199                store.test_member_saving().await
2200            }
2201
2202            #[async_test]
2203            async fn test_filter_saving() -> TestResult {
2204                let store = get_store().await?.into_state_store();
2205                store.test_filter_saving().await
2206            }
2207
2208            #[async_test]
2209            async fn test_user_avatar_url_saving() -> TestResult {
2210                let store = get_store().await?.into_state_store();
2211                store.test_user_avatar_url_saving().await
2212            }
2213
2214            #[async_test]
2215            async fn test_supported_versions_saving() -> TestResult {
2216                let store = get_store().await?.into_state_store();
2217                store.test_supported_versions_saving().await
2218            }
2219
2220            #[async_test]
2221            async fn test_well_known_saving() -> TestResult {
2222                let store = get_store().await?.into_state_store();
2223                store.test_well_known_saving().await
2224            }
2225
2226            #[async_test]
2227            async fn test_sync_token_saving() -> TestResult {
2228                let store = get_store().await?.into_state_store();
2229                store.test_sync_token_saving().await
2230            }
2231
2232            #[async_test]
2233            async fn test_utd_hook_manager_data_saving() -> TestResult {
2234                let store = get_store().await?.into_state_store();
2235                store.test_utd_hook_manager_data_saving().await
2236            }
2237
2238            #[async_test]
2239            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2240                let store = get_store().await?.into_state_store();
2241                store.test_one_time_key_already_uploaded_data_saving().await
2242            }
2243
2244            #[async_test]
2245            async fn test_stripped_member_saving() -> TestResult {
2246                let store = get_store().await?.into_state_store();
2247                store.test_stripped_member_saving().await
2248            }
2249
2250            #[async_test]
2251            async fn test_power_level_saving() -> TestResult {
2252                let store = get_store().await?.into_state_store();
2253                store.test_power_level_saving().await
2254            }
2255
2256            #[async_test]
2257            async fn test_receipts_saving() -> TestResult {
2258                let store = get_store().await?.into_state_store();
2259                store.test_receipts_saving().await
2260            }
2261
2262            #[async_test]
2263            async fn test_custom_storage() -> TestResult {
2264                let store = get_store().await?.into_state_store();
2265                store.test_custom_storage().await
2266            }
2267
2268            #[async_test]
2269            async fn test_stripped_non_stripped() -> TestResult {
2270                let store = get_store().await?.into_state_store();
2271                store.test_stripped_non_stripped().await
2272            }
2273
2274            #[async_test]
2275            async fn test_room_removal() -> TestResult {
2276                let store = get_store().await?.into_state_store();
2277                store.test_room_removal().await
2278            }
2279
2280            #[async_test]
2281            async fn test_profile_removal() -> TestResult {
2282                let store = get_store().await?.into_state_store();
2283                store.test_profile_removal().await
2284            }
2285
2286            #[async_test]
2287            async fn test_presence_saving() -> TestResult {
2288                let store = get_store().await?.into_state_store();
2289                store.test_presence_saving().await
2290            }
2291
2292            #[async_test]
2293            async fn test_display_names_saving() -> TestResult {
2294                let store = get_store().await?.into_state_store();
2295                store.test_display_names_saving().await
2296            }
2297
2298            #[async_test]
2299            async fn test_send_queue() -> TestResult {
2300                let store = get_store().await?.into_state_store();
2301                store.test_send_queue().await
2302            }
2303
2304            #[async_test]
2305            async fn test_send_queue_priority() -> TestResult {
2306                let store = get_store().await?.into_state_store();
2307                store.test_send_queue_priority().await
2308            }
2309
2310            #[async_test]
2311            async fn test_send_queue_dependents() -> TestResult {
2312                let store = get_store().await?.into_state_store();
2313                store.test_send_queue_dependents().await
2314            }
2315
2316            #[async_test]
2317            async fn test_update_send_queue_dependent() -> TestResult {
2318                let store = get_store().await?.into_state_store();
2319                store.test_update_send_queue_dependent().await
2320            }
2321
2322            #[async_test]
2323            async fn test_get_room_infos() -> TestResult {
2324                let store = get_store().await?.into_state_store();
2325                store.test_get_room_infos().await
2326            }
2327
2328            #[async_test]
2329            async fn test_thread_subscriptions() -> TestResult {
2330                let store = get_store().await?.into_state_store();
2331                store.test_thread_subscriptions().await
2332            }
2333
2334            #[async_test]
2335            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2336                let store = get_store().await?.into_state_store();
2337                store.test_thread_subscriptions_bulk_upsert().await
2338            }
2339        }
2340    };
2341}
2342
2343fn user_id() -> &'static UserId {
2344    user_id!("@example:localhost")
2345}
2346
2347fn invited_user_id() -> &'static UserId {
2348    user_id!("@invited:localhost")
2349}
2350
2351fn room_id() -> &'static RoomId {
2352    room_id!("!test:localhost")
2353}
2354
2355fn stripped_room_id() -> &'static RoomId {
2356    room_id!("!stripped:localhost")
2357}
2358
2359fn first_receipt_event_id() -> &'static EventId {
2360    event_id!("$example")
2361}
2362
2363fn topic_event_id() -> &'static EventId {
2364    event_id!("$topic_event")
2365}
2366
2367fn power_level_event() -> Raw<AnySyncStateEvent> {
2368    let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2369
2370    let event = json!({
2371        "event_id": "$h29iv0s8:example.com",
2372        "content": content,
2373        "sender": user_id(),
2374        "type": "m.room.power_levels",
2375        "origin_server_ts": 0u64,
2376        "state_key": "",
2377    });
2378
2379    serde_json::from_value(event).unwrap()
2380}
2381
2382fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2383    custom_stripped_membership_event(user_id())
2384}
2385
2386fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2387    let ev_json = json!({
2388        "type": "m.room.member",
2389        "content": RoomMemberEventContent::new(MembershipState::Join),
2390        "sender": user_id,
2391        "state_key": user_id,
2392    });
2393
2394    Raw::new(&ev_json).unwrap().cast_unchecked()
2395}
2396
2397fn membership_event() -> Raw<SyncRoomMemberEvent> {
2398    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2399}
2400
2401fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2402    let ev_json = json!({
2403        "type": "m.room.member",
2404        "content": RoomMemberEventContent::new(MembershipState::Join),
2405        "event_id": event_id,
2406        "origin_server_ts": 198,
2407        "sender": user_id,
2408        "state_key": user_id,
2409    });
2410
2411    Raw::new(&ev_json).unwrap().cast_unchecked()
2412}
2413
2414fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2415    let ev_json = json!({
2416        "content": {
2417            "presence": "online"
2418        },
2419        "sender": user_id,
2420    });
2421
2422    Raw::new(&ev_json).unwrap().cast_unchecked()
2423}