1use std::{
4 collections::{BTreeMap, BTreeSet, HashMap},
5 str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_common::ttl::TtlValue;
12use matrix_sdk_test::{TestResult, event_factory::EventFactory};
13use ruma::{
14 EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
15 api::{
16 FeatureFlag, MatrixVersion,
17 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
18 },
19 event_id,
20 events::{
21 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
22 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
23 RoomAccountDataEventType, StateEventType, SyncStateEvent,
24 presence::PresenceEvent,
25 receipt::{ReceiptThread, ReceiptType},
26 room::{
27 member::{
28 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
29 SyncRoomMemberEvent,
30 },
31 message::RoomMessageEventContent,
32 power_levels::RoomPowerLevelsEventContent,
33 redaction::SyncRoomRedactionEvent,
34 topic::RoomTopicEventContent,
35 },
36 tag::{TagInfo, TagName, Tags, UserTagName},
37 },
38 mxc_uri, owned_event_id, owned_mxc_uri,
39 presence::PresenceState,
40 push::Ruleset,
41 room_id,
42 room_version_rules::AuthorizationRules,
43 serde::Raw,
44 uint, user_id,
45};
46use serde_json::json;
47
48use super::{
49 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
50 SupportedVersionsResponse, WellKnownResponse, send_queue::SentRequestKey,
51};
52use crate::{
53 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
54 deserialized_responses::MemberEvent,
55 store::{
56 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
57 StoredThreadSubscription, ThreadSubscriptionStatus,
58 },
59 utils::RawStateEventWithKeys,
60};
61
62#[allow(async_fn_in_trait)]
67pub trait StateStoreIntegrationTests {
68 async fn populate(&self) -> TestResult;
70 async fn test_topic_redaction(&self) -> TestResult;
72 async fn test_populate_store(&self) -> TestResult;
74 async fn test_member_saving(&self) -> TestResult;
76 async fn test_filter_saving(&self) -> TestResult;
78 async fn test_user_avatar_url_saving(&self) -> TestResult;
80 async fn test_sync_token_saving(&self) -> TestResult;
82 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
84 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
86 async fn test_stripped_member_saving(&self) -> TestResult;
88 async fn test_power_level_saving(&self) -> TestResult;
90 async fn test_receipts_saving(&self) -> TestResult;
92 async fn test_custom_storage(&self) -> TestResult;
94 async fn test_stripped_non_stripped(&self) -> TestResult;
96 async fn test_room_removal(&self) -> TestResult;
98 async fn test_profile_removal(&self) -> TestResult;
100 async fn test_presence_saving(&self) -> TestResult;
102 async fn test_display_names_saving(&self) -> TestResult;
104 async fn test_send_queue(&self) -> TestResult;
106 async fn test_send_queue_priority(&self) -> TestResult;
108 async fn test_send_queue_dependents(&self) -> TestResult;
110 async fn test_update_send_queue_dependent(&self) -> TestResult;
112 async fn test_supported_versions_saving(&self) -> TestResult;
114 async fn test_well_known_saving(&self) -> TestResult;
116 async fn test_get_room_infos(&self) -> TestResult;
118 async fn test_thread_subscriptions(&self) -> TestResult;
120 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
122}
123
124impl StateStoreIntegrationTests for DynStateStore {
125 async fn populate(&self) -> TestResult {
126 let mut changes = StateChanges::default();
127
128 let user_id = user_id();
129 let invited_user_id = invited_user_id();
130 let room_id = room_id();
131 let stripped_room_id = stripped_room_id();
132
133 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
134
135 let f = EventFactory::new().sender(user_id);
136 let presence_raw: Raw<PresenceEvent> = f
137 .presence(PresenceState::Online)
138 .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
139 .currently_active(false)
140 .last_active_ago(1)
141 .status_msg("Making cupcakes")
142 .into();
143 let presence_event = presence_raw.deserialize()?;
144 changes.add_presence_event(presence_event, presence_raw);
145
146 let f = EventFactory::new().sender(user_id);
147 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
148 f.push_rules(Ruleset::server_default(user_id)).into();
149 let pushrules_event = pushrules_raw.deserialize()?;
150 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
151
152 let mut room = RoomInfo::new(room_id, RoomState::Joined);
153 room.mark_as_left();
154
155 let f = EventFactory::new().sender(user_id).room(room_id);
156 let mut tags = Tags::new();
157 tags.insert(TagName::Favorite, TagInfo::new());
158 tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
159 let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
160 let tag_event = tag_raw.deserialize()?;
161 changes.add_room_account_data(room_id, tag_event, tag_raw);
162
163 let f = EventFactory::new().sender(user_id).room(room_id);
164 let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
165 let name_event = name_raw.deserialize()?;
166 room.handle_state_event(
167 &mut RawStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
168 .expect("generated state event should be valid"),
169 );
170 changes.add_state_event(room_id, name_event, name_raw);
171
172 let f = EventFactory::new().sender(user_id);
173 let receipt_content = f
174 .room(room_id)
175 .read_receipts()
176 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
177 .into_content();
178 changes.add_receipts(room_id, receipt_content);
179
180 let topic_event_id = topic_event_id();
181 let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
182 .room(room_id)
183 .sender(user_id)
184 .room_topic("😀")
185 .event_id(topic_event_id)
186 .prev_content(RoomTopicEventContent::new("test".to_owned()))
187 .into_raw_sync_state();
188 let topic_event = topic_raw.deserialize()?;
189 room.handle_state_event(
190 &mut RawStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
191 .expect("generated state event should be valid"),
192 );
193 changes.add_state_event(room_id, topic_event, topic_raw);
194
195 let mut room_ambiguity_map = HashMap::new();
196 let mut room_profiles = BTreeMap::new();
197
198 let f = EventFactory::new().sender(user_id).room(room_id);
199 let member_raw: Raw<SyncRoomMemberEvent> =
200 f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
201 let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
202 let displayname = DisplayName::new(
203 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
204 );
205 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
206 room_profiles.insert(user_id.to_owned(), (&member_event).into());
207
208 let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
209 let member_state_event = member_raw.deserialize()?;
210 changes.add_state_event(room_id, member_state_event, member_raw);
211
212 let f = EventFactory::new().sender(user_id).room(room_id);
213 let invited_member_raw: Raw<SyncRoomMemberEvent> = f
214 .member(invited_user_id)
215 .invited(invited_user_id)
216 .display_name("example")
217 .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
218 .reason("Looking for support")
219 .into();
220 let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
222 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
223 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
224
225 let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
226 let invited_member_state_event = invited_member_raw.deserialize()?;
227 changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
228
229 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
230 changes.profiles.insert(room_id.to_owned(), room_profiles);
231 changes.add_room(room);
232
233 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
234
235 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
236 let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
237 let mut stripped_name_event =
238 RawStateEventWithKeys::try_from_raw_state_event(stripped_name_raw).unwrap();
239 stripped_room.handle_stripped_state_event(&mut stripped_name_event);
240 changes.stripped_state.insert(
241 stripped_room_id.to_owned(),
242 BTreeMap::from([(
243 stripped_name_event.event_type,
244 BTreeMap::from([(stripped_name_event.state_key, stripped_name_event.raw)]),
245 )]),
246 );
247
248 changes.add_room(stripped_room);
249
250 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
251 let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
252 f.member(user_id).display_name("example").into();
253 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
254
255 self.save_changes(&changes).await?;
256
257 Ok(())
258 }
259
260 async fn test_topic_redaction(&self) -> TestResult {
261 let room_id = room_id();
262 let f = EventFactory::new();
263 self.populate().await?;
264
265 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
266 assert_eq!(
267 self.get_state_event_static::<RoomTopicEventContent>(room_id)
268 .await?
269 .expect("room topic found before redaction")
270 .deserialize()
271 .expect("can deserialize room topic before redaction")
272 .as_sync()
273 .expect("room topic is a sync state event")
274 .as_original()
275 .expect("room topic is not redacted yet")
276 .content
277 .topic,
278 "😀"
279 );
280
281 let mut changes = StateChanges::default();
282
283 let topic_event_id = topic_event_id();
284 let redaction_evt: Raw<SyncRoomRedactionEvent> =
285 f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
286
287 changes.add_redaction(room_id, topic_event_id, redaction_evt);
288 self.save_changes(&changes).await?;
289
290 let redacted_event = self
291 .get_state_event_static::<RoomTopicEventContent>(room_id)
292 .await?
293 .expect("room topic found after redaction")
294 .deserialize()
295 .expect("can deserialize room topic after redaction");
296
297 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
298
299 Ok(())
300 }
301
302 async fn test_populate_store(&self) -> TestResult {
303 let room_id = room_id();
304 let user_id = user_id();
305 let display_name = DisplayName::new("example");
306
307 self.populate().await?;
308
309 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
310 assert!(self.get_presence_event(user_id).await?.is_some());
311 assert_eq!(
312 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
313 2,
314 "Expected to find 2 room infos"
315 );
316 assert!(
317 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
318 );
319
320 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
321 assert_eq!(
322 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
323 1,
324 "Expected to find 1 room topic"
325 );
326 assert!(self.get_profile(room_id, user_id).await?.is_some());
327 assert!(self.get_member_event(room_id, user_id).await?.is_some());
328 assert_eq!(
329 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
330 2,
331 "Expected to find 2 members for room"
332 );
333 assert_eq!(
334 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
335 1,
336 "Expected to find 1 invited user ids"
337 );
338 assert_eq!(
339 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
340 1,
341 "Expected to find 1 joined user ids"
342 );
343 assert_eq!(
344 self.get_users_with_display_name(room_id, &display_name).await?.len(),
345 2,
346 "Expected to find 2 display names for room"
347 );
348 assert!(
349 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
350 .await?
351 .is_some()
352 );
353 assert!(
354 self.get_user_room_receipt_event(
355 room_id,
356 ReceiptType::Read,
357 ReceiptThread::Unthreaded,
358 user_id
359 )
360 .await?
361 .is_some()
362 );
363 assert_eq!(
364 self.get_event_room_receipt_events(
365 room_id,
366 ReceiptType::Read,
367 ReceiptThread::Unthreaded,
368 first_receipt_event_id()
369 )
370 .await?
371 .len(),
372 1,
373 "Expected to find 1 read receipt"
374 );
375 Ok(())
376 }
377
378 async fn test_member_saving(&self) -> TestResult {
379 let room_id = room_id!("!test_member_saving:localhost");
380 let user_id = user_id();
381 let second_user_id = user_id!("@second:localhost");
382 let third_user_id = user_id!("@third:localhost");
383 let unknown_user_id = user_id!("@unknown:localhost");
384
385 let mut user_ids = vec![user_id.to_owned()];
387 assert!(self.get_member_event(room_id, user_id).await?.is_none());
388 let member_events = self
389 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
390 .await;
391 assert!(member_events?.is_empty());
392 assert!(self.get_profile(room_id, user_id).await?.is_none());
393 let profiles = self.get_profiles(room_id, &user_ids).await;
394 assert!(profiles?.is_empty());
395
396 let mut changes = StateChanges::default();
398 let raw_member_event = membership_event();
399 let profile = raw_member_event.deserialize()?.into();
400 changes
401 .state
402 .entry(room_id.to_owned())
403 .or_default()
404 .entry(StateEventType::RoomMember)
405 .or_default()
406 .insert(user_id.into(), raw_member_event.cast());
407 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
408 self.save_changes(&changes).await?;
409
410 assert!(self.get_member_event(room_id, user_id).await?.is_some());
411 let member_events = self
412 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
413 .await;
414 assert_eq!(member_events?.len(), 1);
415 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
416 assert_eq!(members.len(), 1, "We expected to find members for the room");
417 assert!(self.get_profile(room_id, user_id).await?.is_some());
418 let profiles = self.get_profiles(room_id, &user_ids).await;
419 assert_eq!(profiles?.len(), 1);
420
421 let mut changes = StateChanges::default();
423 let changes_members = changes
424 .state
425 .entry(room_id.to_owned())
426 .or_default()
427 .entry(StateEventType::RoomMember)
428 .or_default();
429 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
430 let raw_second_member_event =
431 custom_membership_event(second_user_id, event_id!("$second_member_event"));
432 let second_profile = raw_second_member_event.deserialize()?.into();
433 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
434 changes_profiles.insert(second_user_id.to_owned(), second_profile);
435 let raw_third_member_event =
436 custom_membership_event(third_user_id, event_id!("$third_member_event"));
437 let third_profile = raw_third_member_event.deserialize()?.into();
438 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
439 changes_profiles.insert(third_user_id.to_owned(), third_profile);
440 self.save_changes(&changes).await?;
441
442 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
443 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
444 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
445 let member_events = self
446 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
447 .await;
448 assert_eq!(member_events?.len(), 3);
449 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
450 assert_eq!(members.len(), 3, "We expected to find members for the room");
451 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
452 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
453 let profiles = self.get_profiles(room_id, &user_ids).await;
454 assert_eq!(profiles?.len(), 3);
455
456 user_ids.push(unknown_user_id.to_owned());
458 let member_events = self
459 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
460 .await;
461 assert_eq!(member_events?.len(), 3);
462 let profiles = self.get_profiles(room_id, &user_ids).await;
463 assert_eq!(profiles?.len(), 3);
464
465 let member_events = self
467 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
468 room_id,
469 &[],
470 )
471 .await;
472 assert!(member_events?.is_empty());
473 let profiles = self.get_profiles(room_id, &[]).await;
474 assert!(profiles?.is_empty());
475
476 Ok(())
477 }
478
479 async fn test_filter_saving(&self) -> TestResult {
480 let filter_name = "filter_name";
481 let filter_id = "filter_id_1234";
482
483 self.set_kv_data(
484 StateStoreDataKey::Filter(filter_name),
485 StateStoreDataValue::Filter(filter_id.to_owned()),
486 )
487 .await?;
488 assert_let!(
489 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
490 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
491 );
492 assert_eq!(stored_filter_id, filter_id);
493
494 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
495 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
496
497 Ok(())
498 }
499
500 async fn test_user_avatar_url_saving(&self) -> TestResult {
501 let user_id = user_id!("@alice:example.org");
502 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
503
504 self.set_kv_data(
505 StateStoreDataKey::UserAvatarUrl(user_id),
506 StateStoreDataValue::UserAvatarUrl(url.clone()),
507 )
508 .await?;
509
510 assert_let!(
511 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
512 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
513 );
514 assert_eq!(stored_url, url);
515
516 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
517 assert_matches!(
518 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
519 Ok(None)
520 );
521
522 Ok(())
523 }
524
525 async fn test_supported_versions_saving(&self) -> TestResult {
526 let versions =
527 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
528 let supported_versions = SupportedVersionsResponse {
529 versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
530 unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
531 };
532
533 self.set_kv_data(
534 StateStoreDataKey::SupportedVersions,
535 StateStoreDataValue::SupportedVersions(TtlValue::new(supported_versions.clone())),
536 )
537 .await?;
538
539 assert_let!(
540 Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
541 self.get_kv_data(StateStoreDataKey::SupportedVersions).await
542 );
543 let stored_supported_versions = stored_supported_versions.into_data();
544 assert_eq!(supported_versions, stored_supported_versions);
545
546 let stored_supported = stored_supported_versions.supported_versions();
547 assert_eq!(stored_supported.versions, versions);
548 assert_eq!(stored_supported.features.len(), 1);
549 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
550
551 self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
552 assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
553
554 Ok(())
555 }
556
557 async fn test_well_known_saving(&self) -> TestResult {
558 let well_known = WellKnownResponse {
559 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
560 identity_server: None,
561 tile_server: None,
562 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
563 };
564
565 self.set_kv_data(
566 StateStoreDataKey::WellKnown,
567 StateStoreDataValue::WellKnown(TtlValue::new(Some(well_known.clone()))),
568 )
569 .await?;
570
571 assert_let!(
572 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
573 self.get_kv_data(StateStoreDataKey::WellKnown).await
574 );
575 let stored_well_known = stored_well_known.into_data();
576 assert_eq!(stored_well_known, Some(well_known));
577
578 self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
579 assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
580
581 self.set_kv_data(
582 StateStoreDataKey::WellKnown,
583 StateStoreDataValue::WellKnown(TtlValue::new(None)),
584 )
585 .await?;
586
587 assert_let!(
588 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
589 self.get_kv_data(StateStoreDataKey::WellKnown).await
590 );
591 let stored_well_known = stored_well_known.into_data();
592 assert_eq!(stored_well_known, None);
593
594 Ok(())
595 }
596
597 async fn test_sync_token_saving(&self) -> TestResult {
598 let sync_token_1 = "t392-516_47314_0_7_1";
599 let sync_token_2 = "t392-516_47314_0_7_2";
600
601 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
602
603 let changes =
604 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
605 self.save_changes(&changes).await?;
606 assert_let!(
607 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
608 self.get_kv_data(StateStoreDataKey::SyncToken).await
609 );
610 assert_eq!(stored_sync_token, sync_token_1);
611
612 self.set_kv_data(
613 StateStoreDataKey::SyncToken,
614 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
615 )
616 .await?;
617 assert_let!(
618 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
619 self.get_kv_data(StateStoreDataKey::SyncToken).await
620 );
621 assert_eq!(stored_sync_token, sync_token_2);
622
623 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
624 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
625
626 Ok(())
627 }
628
629 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
630 assert!(
632 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
633 .await
634 .expect("Could not read data")
635 .is_none(),
636 "Store was not empty at start"
637 );
638
639 let data = GrowableBloomBuilder::new().build();
641 self.set_kv_data(
642 StateStoreDataKey::UtdHookManagerData,
643 StateStoreDataValue::UtdHookManagerData(data.clone()),
644 )
645 .await
646 .expect("Could not save data");
647
648 let read_data = self
650 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
651 .await
652 .expect("Could not read data")
653 .expect("no data found")
654 .into_utd_hook_manager_data()
655 .expect("not UtdHookManagerData");
656
657 assert_eq!(read_data, data);
658
659 Ok(())
660 }
661
662 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
663 assert!(
665 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
666 "Store was not empty at start"
667 );
668
669 self.set_kv_data(
670 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
671 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
672 )
673 .await?;
674
675 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
676 data.expect("The loaded data should be Some");
677
678 Ok(())
679 }
680
681 async fn test_stripped_member_saving(&self) -> TestResult {
682 let room_id = room_id!("!test_stripped_member_saving:localhost");
683 let user_id = user_id();
684 let second_user_id = user_id!("@second:localhost");
685 let third_user_id = user_id!("@third:localhost");
686 let unknown_user_id = user_id!("@unknown:localhost");
687
688 assert!(self.get_member_event(room_id, user_id).await?.is_none());
690 let member_events = self
691 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
692 room_id,
693 &[user_id.to_owned()],
694 )
695 .await;
696 assert!(member_events?.is_empty());
697
698 let mut changes = StateChanges::default();
700 changes
701 .stripped_state
702 .entry(room_id.to_owned())
703 .or_default()
704 .entry(StateEventType::RoomMember)
705 .or_default()
706 .insert(user_id.into(), stripped_membership_event().cast());
707 self.save_changes(&changes).await?;
708
709 assert!(self.get_member_event(room_id, user_id).await?.is_some());
710 let member_events = self
711 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
712 room_id,
713 &[user_id.to_owned()],
714 )
715 .await;
716 assert_eq!(member_events?.len(), 1);
717 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
718 assert_eq!(members.len(), 1, "We expected to find members for the room");
719
720 let mut changes = StateChanges::default();
722 let changes_members = changes
723 .stripped_state
724 .entry(room_id.to_owned())
725 .or_default()
726 .entry(StateEventType::RoomMember)
727 .or_default();
728 changes_members
729 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
730 changes_members
731 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
732 self.save_changes(&changes).await?;
733
734 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
735 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
736 let member_events = self
737 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
738 room_id,
739 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
740 )
741 .await;
742 assert_eq!(member_events?.len(), 3);
743 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
744 assert_eq!(members.len(), 3, "We expected to find members for the room");
745
746 let member_events = self
748 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
749 room_id,
750 &[
751 user_id.to_owned(),
752 second_user_id.to_owned(),
753 third_user_id.to_owned(),
754 unknown_user_id.to_owned(),
755 ],
756 )
757 .await;
758 assert_eq!(member_events?.len(), 3);
759
760 let member_events = self
762 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
763 room_id,
764 &[],
765 )
766 .await;
767 assert!(member_events?.is_empty());
768
769 Ok(())
770 }
771
772 async fn test_power_level_saving(&self) -> TestResult {
773 let room_id = room_id!("!test_power_level_saving:localhost");
774
775 let raw_event = power_level_event();
776 let event = raw_event.deserialize()?;
777
778 assert!(
779 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
780 );
781 let mut changes = StateChanges::default();
782 changes.add_state_event(room_id, event, raw_event);
783
784 self.save_changes(&changes).await?;
785 assert!(
786 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
787 );
788
789 Ok(())
790 }
791
792 async fn test_receipts_saving(&self) -> TestResult {
793 let room_id = room_id!("!test_receipts_saving:localhost");
794
795 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
796 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
797
798 let first_receipt_ts = uint!(1436451550);
799 let second_receipt_ts = uint!(1436451653);
800 let third_receipt_ts = uint!(1436474532);
801
802 let first_receipt_event = serde_json::from_value(json!({
803 first_event_id: {
804 "m.read": {
805 user_id(): {
806 "ts": first_receipt_ts,
807 }
808 }
809 }
810 }))?;
811
812 let second_receipt_event = serde_json::from_value(json!({
813 second_event_id: {
814 "m.read": {
815 user_id(): {
816 "ts": second_receipt_ts,
817 }
818 }
819 }
820 }))?;
821
822 let third_receipt_event = serde_json::from_value(json!({
823 second_event_id: {
824 "m.read": {
825 user_id(): {
826 "ts": third_receipt_ts,
827 "thread_id": "main",
828 }
829 }
830 }
831 }))?;
832
833 assert!(
834 self.get_user_room_receipt_event(
835 room_id,
836 ReceiptType::Read,
837 ReceiptThread::Unthreaded,
838 user_id()
839 )
840 .await
841 .expect("failed to read unthreaded user room receipt")
842 .is_none()
843 );
844 assert!(
845 self.get_event_room_receipt_events(
846 room_id,
847 ReceiptType::Read,
848 ReceiptThread::Unthreaded,
849 first_event_id
850 )
851 .await
852 .expect("failed to read unthreaded event room receipt for 1")
853 .is_empty()
854 );
855 assert!(
856 self.get_event_room_receipt_events(
857 room_id,
858 ReceiptType::Read,
859 ReceiptThread::Unthreaded,
860 second_event_id
861 )
862 .await
863 .expect("failed to read unthreaded event room receipt for 2")
864 .is_empty()
865 );
866
867 let mut changes = StateChanges::default();
868 changes.add_receipts(room_id, first_receipt_event);
869
870 self.save_changes(&changes).await?;
871 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
872 .get_user_room_receipt_event(
873 room_id,
874 ReceiptType::Read,
875 ReceiptThread::Unthreaded,
876 user_id(),
877 )
878 .await
879 .expect("failed to read unthreaded user room receipt after save")
880 .unwrap();
881 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
882 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
883 let first_event_unthreaded_receipts = self
884 .get_event_room_receipt_events(
885 room_id,
886 ReceiptType::Read,
887 ReceiptThread::Unthreaded,
888 first_event_id,
889 )
890 .await
891 .expect("failed to read unthreaded event room receipt for 1 after save");
892 assert_eq!(
893 first_event_unthreaded_receipts.len(),
894 1,
895 "Found a wrong number of unthreaded receipts for 1 after save"
896 );
897 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
898 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
899 assert!(
900 self.get_event_room_receipt_events(
901 room_id,
902 ReceiptType::Read,
903 ReceiptThread::Unthreaded,
904 second_event_id
905 )
906 .await
907 .expect("failed to read unthreaded event room receipt for 2 after save")
908 .is_empty()
909 );
910
911 let mut changes = StateChanges::default();
912 changes.add_receipts(room_id, second_receipt_event);
913
914 self.save_changes(&changes).await.expect("Saving works");
915 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
916 .get_user_room_receipt_event(
917 room_id,
918 ReceiptType::Read,
919 ReceiptThread::Unthreaded,
920 user_id(),
921 )
922 .await
923 .expect("Getting unthreaded user room receipt after save failed")
924 .unwrap();
925 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
926 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
927 assert!(
928 self.get_event_room_receipt_events(
929 room_id,
930 ReceiptType::Read,
931 ReceiptThread::Unthreaded,
932 first_event_id
933 )
934 .await
935 .expect("Getting unthreaded event room receipt events for first event failed")
936 .is_empty()
937 );
938 let second_event_unthreaded_receipts = self
939 .get_event_room_receipt_events(
940 room_id,
941 ReceiptType::Read,
942 ReceiptThread::Unthreaded,
943 second_event_id,
944 )
945 .await
946 .expect("Getting unthreaded event room receipt events for second event failed");
947 assert_eq!(
948 second_event_unthreaded_receipts.len(),
949 1,
950 "Found a wrong number of unthreaded receipts for second event after save"
951 );
952 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
953 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
954
955 assert!(
956 self.get_user_room_receipt_event(
957 room_id,
958 ReceiptType::Read,
959 ReceiptThread::Main,
960 user_id()
961 )
962 .await
963 .expect("failed to read threaded user room receipt")
964 .is_none()
965 );
966 assert!(
967 self.get_event_room_receipt_events(
968 room_id,
969 ReceiptType::Read,
970 ReceiptThread::Main,
971 second_event_id
972 )
973 .await
974 .expect("Getting threaded event room receipts for 2 failed")
975 .is_empty()
976 );
977
978 let mut changes = StateChanges::default();
979 changes.add_receipts(room_id, third_receipt_event);
980
981 self.save_changes(&changes).await.expect("Saving works");
982 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
984 .get_user_room_receipt_event(
985 room_id,
986 ReceiptType::Read,
987 ReceiptThread::Unthreaded,
988 user_id(),
989 )
990 .await
991 .expect("Getting unthreaded user room receipt after save failed")
992 .unwrap();
993 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
994 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
995 let second_event_unthreaded_receipts = self
996 .get_event_room_receipt_events(
997 room_id,
998 ReceiptType::Read,
999 ReceiptThread::Unthreaded,
1000 second_event_id,
1001 )
1002 .await
1003 .expect("Getting unthreaded event room receipt events for second event failed");
1004 assert_eq!(
1005 second_event_unthreaded_receipts.len(),
1006 1,
1007 "Found a wrong number of unthreaded receipts for second event after save"
1008 );
1009 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1010 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1011 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1013 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1014 .await
1015 .expect("Getting threaded user room receipt after save failed")
1016 .unwrap();
1017 assert_eq!(threaded_user_receipt_event_id, second_event_id);
1018 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1019 let second_event_threaded_receipts = self
1020 .get_event_room_receipt_events(
1021 room_id,
1022 ReceiptType::Read,
1023 ReceiptThread::Main,
1024 second_event_id,
1025 )
1026 .await
1027 .expect("Getting threaded event room receipt events for second event failed");
1028 assert_eq!(
1029 second_event_threaded_receipts.len(),
1030 1,
1031 "Found a wrong number of threaded receipts for second event after save"
1032 );
1033 assert_eq!(second_event_threaded_receipts[0].0, user_id());
1034 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1035
1036 Ok(())
1037 }
1038
1039 async fn test_custom_storage(&self) -> TestResult {
1040 let key = "my_key";
1041 let value = &[0, 1, 2, 3];
1042
1043 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1044
1045 let read = self.get_custom_value(key.as_bytes()).await?;
1046
1047 assert_eq!(Some(value.as_ref()), read.as_deref());
1048
1049 Ok(())
1050 }
1051
1052 async fn test_stripped_non_stripped(&self) -> TestResult {
1053 let room_id = room_id!("!test_stripped_non_stripped:localhost");
1054 let user_id = user_id();
1055
1056 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1057 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1058
1059 let mut changes = StateChanges::default();
1060 changes
1061 .state
1062 .entry(room_id.to_owned())
1063 .or_default()
1064 .entry(StateEventType::RoomMember)
1065 .or_default()
1066 .insert(user_id.into(), membership_event().cast());
1067 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1068 self.save_changes(&changes).await?;
1069
1070 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1071 assert!(matches!(member_event, MemberEvent::Sync(_)));
1072 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1073
1074 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1075 assert_eq!(members, vec![user_id.to_owned()]);
1076
1077 let mut changes = StateChanges::default();
1078 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1079 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1080 self.save_changes(&changes).await?;
1081
1082 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1083 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1084 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1085
1086 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1087 assert_eq!(members, vec![user_id.to_owned()]);
1088
1089 Ok(())
1090 }
1091
1092 async fn test_room_removal(&self) -> TestResult {
1093 let room_id = room_id();
1094 let user_id = user_id();
1095 let display_name = DisplayName::new("example");
1096 let stripped_room_id = stripped_room_id();
1097
1098 self.populate().await?;
1099
1100 {
1101 let txn = TransactionId::new();
1103 let ev =
1104 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1105 self.save_send_queue_request(
1106 room_id,
1107 txn.clone(),
1108 MilliSecondsSinceUnixEpoch::now(),
1109 ev.into(),
1110 0,
1111 )
1112 .await?;
1113
1114 self.save_dependent_queued_request(
1116 room_id,
1117 &txn,
1118 ChildTransactionId::new(),
1119 MilliSecondsSinceUnixEpoch::now(),
1120 DependentQueuedRequestKind::RedactEvent,
1121 )
1122 .await?;
1123 }
1124
1125 self.remove_room(room_id).await?;
1126
1127 assert_eq!(
1128 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1129 1,
1130 "room is still there"
1131 );
1132
1133 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1134 assert!(
1135 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1136 "still state events found"
1137 );
1138 assert!(self.get_profile(room_id, user_id).await?.is_none());
1139 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1140 assert!(
1141 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1142 "still user ids found"
1143 );
1144 assert!(
1145 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1146 "still invited user ids found"
1147 );
1148 assert!(
1149 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1150 "still joined users found"
1151 );
1152 assert!(
1153 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1154 "still display names found"
1155 );
1156 assert!(
1157 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1158 .await?
1159 .is_none()
1160 );
1161 assert!(
1162 self.get_user_room_receipt_event(
1163 room_id,
1164 ReceiptType::Read,
1165 ReceiptThread::Unthreaded,
1166 user_id
1167 )
1168 .await?
1169 .is_none()
1170 );
1171 assert!(
1172 self.get_event_room_receipt_events(
1173 room_id,
1174 ReceiptType::Read,
1175 ReceiptThread::Unthreaded,
1176 first_receipt_event_id()
1177 )
1178 .await?
1179 .is_empty(),
1180 "still event recepts in the store"
1181 );
1182 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1183 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1184
1185 self.remove_room(stripped_room_id).await?;
1186
1187 assert!(
1188 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1189 "still room info found"
1190 );
1191 Ok(())
1192 }
1193
1194 async fn test_profile_removal(&self) -> TestResult {
1195 let room_id = room_id();
1196
1197 let user_id = user_id();
1199 let invited_user_id = invited_user_id();
1200
1201 self.populate().await?;
1202
1203 let new_invite_member_json = json!({
1204 "content": {
1205 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1206 "displayname": "example after update",
1207 "membership": "invite",
1208 "reason": "Looking for support"
1209 },
1210 "event_id": "$143273582443PhrSm:localhost",
1211 "origin_server_ts": 1432735824,
1212 "room_id": room_id,
1213 "sender": user_id,
1214 "state_key": invited_user_id,
1215 "type": "m.room.member",
1216 });
1217 let new_invite_member_event: SyncRoomMemberEvent =
1218 serde_json::from_value(new_invite_member_json.clone())?;
1219
1220 let mut changes = StateChanges {
1221 profiles_to_delete: [(
1223 room_id.to_owned(),
1224 vec![user_id.to_owned(), invited_user_id.to_owned()],
1225 )]
1226 .into(),
1227
1228 profiles: {
1230 let mut map = BTreeMap::default();
1231 map.insert(
1232 room_id.to_owned(),
1233 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1234 .into_iter()
1235 .collect(),
1236 );
1237 map
1238 },
1239
1240 ..StateChanges::default()
1241 };
1242
1243 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1244 .expect("can create sync-state-event for topic");
1245 let event = raw.deserialize()?;
1246 changes.add_state_event(room_id, event, raw);
1247
1248 self.save_changes(&changes).await?;
1249
1250 assert!(self.get_profile(room_id, user_id).await?.is_none());
1252 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1253
1254 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1256 assert_eq!(
1257 invited_member_event.content.displayname.as_deref(),
1258 Some("example after update")
1259 );
1260 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1261
1262 Ok(())
1263 }
1264
1265 async fn test_presence_saving(&self) -> TestResult {
1266 let user_id = user_id();
1267 let second_user_id = user_id!("@second:localhost");
1268 let third_user_id = user_id!("@third:localhost");
1269 let unknown_user_id = user_id!("@unknown:localhost");
1270
1271 let mut user_ids = vec![user_id.to_owned()];
1273 let presence_event = self.get_presence_event(user_id).await;
1274 assert!(presence_event?.is_none());
1275 let presence_events = self.get_presence_events(&user_ids).await;
1276 assert!(presence_events?.is_empty());
1277
1278 let mut changes = StateChanges::default();
1280 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1281 self.save_changes(&changes).await?;
1282
1283 let presence_event = self.get_presence_event(user_id).await;
1284 assert!(presence_event?.is_some());
1285 let presence_events = self.get_presence_events(&user_ids).await;
1286 assert_eq!(presence_events?.len(), 1);
1287
1288 let mut changes = StateChanges::default();
1290 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1291 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1292 self.save_changes(&changes).await?;
1293
1294 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1295 let presence_event = self.get_presence_event(second_user_id).await;
1296 assert!(presence_event?.is_some());
1297 let presence_event = self.get_presence_event(third_user_id).await;
1298 assert!(presence_event?.is_some());
1299 let presence_events = self.get_presence_events(&user_ids).await;
1300 assert_eq!(presence_events?.len(), 3);
1301
1302 user_ids.push(unknown_user_id.to_owned());
1304 let member_events = self.get_presence_events(&user_ids).await;
1305 assert_eq!(member_events?.len(), 3);
1306
1307 let presence_events = self.get_presence_events(&[]).await;
1309 assert!(presence_events?.is_empty());
1310
1311 Ok(())
1312 }
1313
1314 async fn test_display_names_saving(&self) -> TestResult {
1315 let room_id = room_id!("!test_display_names_saving:localhost");
1316 let user_id = user_id();
1317 let user_display_name = DisplayName::new("User");
1318 let second_user_id = user_id!("@second:localhost");
1319 let third_user_id = user_id!("@third:localhost");
1320 let other_display_name = DisplayName::new("Raoul");
1321 let unknown_display_name = DisplayName::new("Unknown");
1322
1323 let mut display_names = vec![user_display_name.to_owned()];
1325 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1326 assert!(users.is_empty());
1327 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1328 assert!(names.is_empty());
1329
1330 let mut changes = StateChanges::default();
1332 changes
1333 .ambiguity_maps
1334 .entry(room_id.to_owned())
1335 .or_default()
1336 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1337 self.save_changes(&changes).await?;
1338
1339 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1340 assert_eq!(users.len(), 1);
1341 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1342 assert_eq!(names.len(), 1);
1343 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1344
1345 let mut changes = StateChanges::default();
1347 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1348 other_display_name.to_owned(),
1349 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1350 );
1351 self.save_changes(&changes).await?;
1352
1353 display_names.push(other_display_name.to_owned());
1354 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1355 assert_eq!(users.len(), 1);
1356 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1357 assert_eq!(users.len(), 2);
1358 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1359 assert_eq!(names.len(), 2);
1360 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1361 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1362
1363 display_names.push(unknown_display_name.to_owned());
1365 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1366 assert_eq!(names.len(), 2);
1367
1368 let names = self.get_users_with_display_names(room_id, &[]).await?;
1370 assert!(names.is_empty());
1371
1372 Ok(())
1373 }
1374
1375 #[allow(clippy::needless_range_loop)]
1376 async fn test_send_queue(&self) -> TestResult {
1377 let room_id = room_id!("!test_send_queue:localhost");
1378
1379 let events = self.load_send_queue_requests(room_id).await?;
1381 assert!(events.is_empty());
1382
1383 let txn0 = TransactionId::new();
1385 let event0 =
1386 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1387 self.save_send_queue_request(
1388 room_id,
1389 txn0.clone(),
1390 MilliSecondsSinceUnixEpoch::now(),
1391 event0.into(),
1392 0,
1393 )
1394 .await?;
1395
1396 let pending = self.load_send_queue_requests(room_id).await?;
1398
1399 assert_eq!(pending.len(), 1);
1400 {
1401 assert_eq!(pending[0].transaction_id, txn0);
1402
1403 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1404 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1405 assert_eq!(content.body(), "msg0");
1406
1407 assert!(!pending[0].is_wedged());
1408 }
1409
1410 for i in 1..=3 {
1412 let txn = TransactionId::new();
1413 let event = SerializableEventContent::new(
1414 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1415 )?;
1416
1417 self.save_send_queue_request(
1418 room_id,
1419 txn,
1420 MilliSecondsSinceUnixEpoch::now(),
1421 event.into(),
1422 0,
1423 )
1424 .await?;
1425 }
1426
1427 let pending = self.load_send_queue_requests(room_id).await?;
1429
1430 assert_eq!(pending.len(), 4);
1432
1433 assert_eq!(pending[0].transaction_id, txn0);
1434
1435 for i in 0..4 {
1436 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1437 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1438 assert_eq!(content.body(), format!("msg{i}"));
1439 assert!(!pending[i].is_wedged());
1440 }
1441
1442 let txn2 = &pending[2].transaction_id;
1444 self.update_send_queue_request_status(
1445 room_id,
1446 txn2,
1447 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1448 )
1449 .await?;
1450
1451 let pending = self.load_send_queue_requests(room_id).await?;
1453
1454 assert_eq!(pending.len(), 4);
1456 assert_eq!(pending[0].transaction_id, txn0);
1457 assert_eq!(pending[2].transaction_id, *txn2);
1458 assert!(pending[2].is_wedged());
1459 let error = pending[2].clone().error.unwrap();
1460 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1461 assert_eq!(generic_error, "Oops");
1462 for i in 0..4 {
1463 if i != 2 {
1464 assert!(!pending[i].is_wedged());
1465 }
1466 }
1467
1468 let event0 = SerializableEventContent::new(
1470 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1471 )?;
1472 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1473
1474 let pending = self.load_send_queue_requests(room_id).await?;
1476
1477 assert_eq!(pending.len(), 4);
1478 {
1479 assert_eq!(pending[2].transaction_id, *txn2);
1480
1481 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1482 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1483 assert_eq!(content.body(), "wow that's a cool test");
1484
1485 assert!(!pending[2].is_wedged());
1486
1487 for i in 0..4 {
1488 if i != 2 {
1489 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1490 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1491 assert_eq!(content.body(), format!("msg{i}"));
1492
1493 assert!(!pending[i].is_wedged());
1494 }
1495 }
1496 }
1497
1498 self.remove_send_queue_request(room_id, &txn0).await?;
1500
1501 let pending = self.load_send_queue_requests(room_id).await?;
1503
1504 assert_eq!(pending.len(), 3);
1505 assert_eq!(pending[1].transaction_id, *txn2);
1506 for i in 0..3 {
1507 assert_ne!(pending[i].transaction_id, txn0);
1508 }
1509
1510 let room_id2 = room_id!("!test_send_queue_two:localhost");
1515 {
1516 let txn = TransactionId::new();
1517 let event = SerializableEventContent::new(
1518 &RoomMessageEventContent::text_plain("room2").into(),
1519 )?;
1520 self.save_send_queue_request(
1521 room_id2,
1522 txn.clone(),
1523 MilliSecondsSinceUnixEpoch::now(),
1524 event.into(),
1525 0,
1526 )
1527 .await?;
1528 }
1529
1530 {
1532 let room_id3 = room_id!("!test_send_queue_three:localhost");
1533 let txn = TransactionId::new();
1534 let event = SerializableEventContent::new(
1535 &RoomMessageEventContent::text_plain("room3").into(),
1536 )?;
1537 self.save_send_queue_request(
1538 room_id3,
1539 txn.clone(),
1540 MilliSecondsSinceUnixEpoch::now(),
1541 event.into(),
1542 0,
1543 )
1544 .await?;
1545
1546 self.remove_send_queue_request(room_id3, &txn).await?;
1547 }
1548
1549 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1552 assert_eq!(outstanding_rooms.len(), 2);
1553 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1554 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1555
1556 Ok(())
1557 }
1558
1559 async fn test_send_queue_priority(&self) -> TestResult {
1560 let room_id = room_id!("!test_send_queue:localhost");
1561
1562 let events = self.load_send_queue_requests(room_id).await?;
1564 assert!(events.is_empty());
1565
1566 let low0_txn = TransactionId::new();
1568 let ev0 =
1569 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1570 self.save_send_queue_request(
1571 room_id,
1572 low0_txn.clone(),
1573 MilliSecondsSinceUnixEpoch::now(),
1574 ev0.into(),
1575 2,
1576 )
1577 .await?;
1578
1579 let high_txn = TransactionId::new();
1581 let ev1 =
1582 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1583 self.save_send_queue_request(
1584 room_id,
1585 high_txn.clone(),
1586 MilliSecondsSinceUnixEpoch::now(),
1587 ev1.into(),
1588 10,
1589 )
1590 .await?;
1591
1592 let low1_txn = TransactionId::new();
1594 let ev2 =
1595 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1596 self.save_send_queue_request(
1597 room_id,
1598 low1_txn.clone(),
1599 MilliSecondsSinceUnixEpoch::now(),
1600 ev2.into(),
1601 2,
1602 )
1603 .await?;
1604
1605 let pending = self.load_send_queue_requests(room_id).await?;
1608
1609 assert_eq!(pending.len(), 3);
1610 {
1611 assert_eq!(pending[0].transaction_id, high_txn);
1612
1613 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1614 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1615 assert_eq!(content.body(), "high");
1616 }
1617
1618 {
1619 assert_eq!(pending[1].transaction_id, low0_txn);
1620
1621 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1622 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1623 assert_eq!(content.body(), "low0");
1624 }
1625
1626 {
1627 assert_eq!(pending[2].transaction_id, low1_txn);
1628
1629 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1630 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1631 assert_eq!(content.body(), "low1");
1632 }
1633
1634 Ok(())
1635 }
1636
1637 async fn test_send_queue_dependents(&self) -> TestResult {
1638 let room_id = room_id!("!test_send_queue_dependents:localhost");
1639
1640 let txn0 = TransactionId::new();
1642 let event0 =
1643 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1644 self.save_send_queue_request(
1645 room_id,
1646 txn0.clone(),
1647 MilliSecondsSinceUnixEpoch::now(),
1648 event0.clone().into(),
1649 0,
1650 )
1651 .await?;
1652
1653 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1655
1656 let child_txn = ChildTransactionId::new();
1658 self.save_dependent_queued_request(
1659 room_id,
1660 &txn0,
1661 child_txn.clone(),
1662 MilliSecondsSinceUnixEpoch::now(),
1663 DependentQueuedRequestKind::RedactEvent,
1664 )
1665 .await?;
1666
1667 let dependents = self.load_dependent_queued_requests(room_id).await?;
1669 assert_eq!(dependents.len(), 1);
1670 assert_eq!(dependents[0].parent_transaction_id, txn0);
1671 assert_eq!(dependents[0].own_transaction_id, child_txn);
1672 assert!(dependents[0].parent_key.is_none());
1673 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1674
1675 let (event, event_type) = event0.raw();
1677 let event_id = owned_event_id!("$1");
1678 let num_updated = self
1679 .mark_dependent_queued_requests_as_ready(
1680 room_id,
1681 &txn0,
1682 SentRequestKey::Event {
1683 event_id: event_id.clone(),
1684 event: event.clone(),
1685 event_type: event_type.to_owned(),
1686 },
1687 )
1688 .await?;
1689 assert_eq!(num_updated, 1);
1690
1691 let dependents = self.load_dependent_queued_requests(room_id).await?;
1693 assert_eq!(dependents.len(), 1);
1694 assert_eq!(dependents[0].parent_transaction_id, txn0);
1695 assert_eq!(dependents[0].own_transaction_id, child_txn);
1696 assert_matches!(
1697 dependents[0].parent_key.as_ref(),
1698 Some(SentRequestKey::Event {
1699 event_id: received_event_id,
1700 event: received_event,
1701 event_type: received_event_type
1702 }) => {
1703 assert_eq!(received_event_id, &event_id);
1704 assert_eq!(received_event.json().to_string(), event.json().to_string());
1705 assert_eq!(received_event_type.as_str(), event_type);
1706 }
1707 );
1708 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1709
1710 let removed = self
1712 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1713 .await?;
1714 assert!(removed);
1715
1716 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1718
1719 let txn1 = TransactionId::new();
1722 let event1 =
1723 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1724 self.save_send_queue_request(
1725 room_id,
1726 txn1.clone(),
1727 MilliSecondsSinceUnixEpoch::now(),
1728 event1.into(),
1729 0,
1730 )
1731 .await?;
1732
1733 self.save_dependent_queued_request(
1734 room_id,
1735 &txn0,
1736 ChildTransactionId::new(),
1737 MilliSecondsSinceUnixEpoch::now(),
1738 DependentQueuedRequestKind::RedactEvent,
1739 )
1740 .await?;
1741 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1742
1743 self.save_dependent_queued_request(
1744 room_id,
1745 &txn1,
1746 ChildTransactionId::new(),
1747 MilliSecondsSinceUnixEpoch::now(),
1748 DependentQueuedRequestKind::EditEvent {
1749 new_content: SerializableEventContent::new(
1750 &RoomMessageEventContent::text_plain("edit").into(),
1751 )?,
1752 },
1753 )
1754 .await?;
1755 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1756
1757 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1759 assert!(removed);
1760
1761 let dependents = self.load_dependent_queued_requests(room_id).await?;
1763 assert_eq!(dependents.len(), 2);
1764
1765 Ok(())
1766 }
1767
1768 async fn test_update_send_queue_dependent(&self) -> TestResult {
1769 let room_id = room_id!("!test_send_queue_dependents:localhost");
1770
1771 let txn = TransactionId::new();
1772
1773 let child_txn = ChildTransactionId::new();
1775
1776 self.save_dependent_queued_request(
1777 room_id,
1778 &txn,
1779 child_txn.clone(),
1780 MilliSecondsSinceUnixEpoch::now(),
1781 DependentQueuedRequestKind::RedactEvent,
1782 )
1783 .await?;
1784
1785 let dependents = self.load_dependent_queued_requests(room_id).await?;
1787 assert_eq!(dependents.len(), 1);
1788 assert_eq!(dependents[0].parent_transaction_id, txn);
1789 assert_eq!(dependents[0].own_transaction_id, child_txn);
1790 assert!(dependents[0].parent_key.is_none());
1791 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1792
1793 self.update_dependent_queued_request(
1795 room_id,
1796 &child_txn,
1797 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1798 )
1799 .await?;
1800
1801 let dependents = self.load_dependent_queued_requests(room_id).await?;
1803 assert_eq!(dependents.len(), 1);
1804 assert_eq!(dependents[0].parent_transaction_id, txn);
1805 assert_eq!(dependents[0].own_transaction_id, child_txn);
1806 assert!(dependents[0].parent_key.is_none());
1807 assert_matches!(
1808 &dependents[0].kind,
1809 DependentQueuedRequestKind::ReactEvent { key } => {
1810 assert_eq!(key, "👍");
1811 }
1812 );
1813
1814 Ok(())
1815 }
1816
1817 async fn test_get_room_infos(&self) -> TestResult {
1818 let room_id_0 = room_id!("!r0");
1819 let room_id_1 = room_id!("!r1");
1820 let room_id_2 = room_id!("!r2");
1821
1822 {
1824 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1825 }
1826
1827 let mut changes = StateChanges::default();
1829 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1830 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1831 self.save_changes(&changes).await?;
1832
1833 {
1835 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1836
1837 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1840
1841 assert_eq!(all_rooms.len(), 2);
1842 assert_eq!(all_rooms[0].room_id, room_id_0);
1843 assert_eq!(all_rooms[1].room_id, room_id_1);
1844 }
1845
1846 {
1848 let all_rooms =
1849 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1850
1851 assert_eq!(all_rooms.len(), 1);
1852 assert_eq!(all_rooms[0].room_id, room_id_1);
1853 }
1854
1855 {
1858 let all_rooms =
1859 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1860
1861 assert_eq!(all_rooms.len(), 0);
1862 }
1863
1864 Ok(())
1865 }
1866
1867 async fn test_thread_subscriptions(&self) -> TestResult {
1868 let first_thread = event_id!("$t1");
1869 let second_thread = event_id!("$t2");
1870
1871 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1873 assert!(maybe_sub.is_none());
1874
1875 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1876 assert!(maybe_sub.is_none());
1877
1878 self.upsert_thread_subscriptions(vec![(
1880 room_id(),
1881 first_thread,
1882 StoredThreadSubscription {
1883 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1884 bump_stamp: None,
1885 },
1886 )])
1887 .await?;
1888
1889 self.upsert_thread_subscriptions(vec![(
1890 room_id(),
1891 second_thread,
1892 StoredThreadSubscription {
1893 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1894 bump_stamp: None,
1895 },
1896 )])
1897 .await?;
1898
1899 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1901 assert_eq!(
1902 maybe_sub,
1903 Some(StoredThreadSubscription {
1904 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1905 bump_stamp: None,
1906 })
1907 );
1908
1909 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1910 assert_eq!(
1911 maybe_sub,
1912 Some(StoredThreadSubscription {
1913 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1914 bump_stamp: None,
1915 })
1916 );
1917
1918 self.upsert_thread_subscriptions(vec![(
1920 room_id(),
1921 first_thread,
1922 StoredThreadSubscription {
1923 status: ThreadSubscriptionStatus::Unsubscribed,
1924 bump_stamp: None,
1925 },
1926 )])
1927 .await?;
1928
1929 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1931 assert_eq!(
1932 maybe_sub,
1933 Some(StoredThreadSubscription {
1934 status: ThreadSubscriptionStatus::Unsubscribed,
1935 bump_stamp: None,
1936 })
1937 );
1938
1939 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1941 assert_eq!(
1942 maybe_sub,
1943 Some(StoredThreadSubscription {
1944 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1945 bump_stamp: None,
1946 })
1947 );
1948
1949 self.remove_thread_subscription(room_id(), second_thread).await?;
1951
1952 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1954 assert_eq!(maybe_sub, None);
1955
1956 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1958 assert_eq!(
1959 maybe_sub,
1960 Some(StoredThreadSubscription {
1961 status: ThreadSubscriptionStatus::Unsubscribed,
1962 bump_stamp: None,
1963 })
1964 );
1965
1966 self.remove_thread_subscription(room_id(), second_thread).await?;
1968
1969 Ok(())
1970 }
1971
1972 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1973 let threads = [
1974 event_id!("$t1"),
1975 event_id!("$t2"),
1976 event_id!("$t3"),
1977 event_id!("$t4"),
1978 event_id!("$t5"),
1979 event_id!("$t6"),
1980 ];
1981 let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1984 threads
1985 .iter()
1986 .zip(subs)
1987 .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1988 .collect::<Vec<_>>()
1989 };
1990
1991 let initial_subscriptions = build_subscription_updates(&[
1993 StoredThreadSubscription {
1994 status: ThreadSubscriptionStatus::Unsubscribed,
1995 bump_stamp: None,
1996 },
1997 StoredThreadSubscription {
1998 status: ThreadSubscriptionStatus::Unsubscribed,
1999 bump_stamp: Some(14),
2000 },
2001 StoredThreadSubscription {
2002 status: ThreadSubscriptionStatus::Unsubscribed,
2003 bump_stamp: None,
2004 },
2005 StoredThreadSubscription {
2006 status: ThreadSubscriptionStatus::Unsubscribed,
2007 bump_stamp: Some(210),
2008 },
2009 StoredThreadSubscription {
2010 status: ThreadSubscriptionStatus::Unsubscribed,
2011 bump_stamp: Some(5),
2012 },
2013 StoredThreadSubscription {
2014 status: ThreadSubscriptionStatus::Unsubscribed,
2015 bump_stamp: Some(100),
2016 },
2017 ]);
2018
2019 let update_subscriptions = build_subscription_updates(&[
2020 StoredThreadSubscription {
2021 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2022 bump_stamp: None,
2023 },
2024 StoredThreadSubscription {
2025 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2026 bump_stamp: None,
2027 },
2028 StoredThreadSubscription {
2029 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2030 bump_stamp: Some(1101),
2031 },
2032 StoredThreadSubscription {
2033 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2034 bump_stamp: Some(222),
2035 },
2036 StoredThreadSubscription {
2037 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2038 bump_stamp: Some(1),
2039 },
2040 StoredThreadSubscription {
2041 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2042 bump_stamp: Some(100),
2043 },
2044 ]);
2045
2046 let expected_subscriptions = build_subscription_updates(&[
2047 StoredThreadSubscription {
2049 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2050 bump_stamp: None,
2051 },
2052 StoredThreadSubscription {
2054 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2055 bump_stamp: Some(14),
2056 },
2057 StoredThreadSubscription {
2059 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2060 bump_stamp: Some(1101),
2061 },
2062 StoredThreadSubscription {
2064 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2065 bump_stamp: Some(222),
2066 },
2067 StoredThreadSubscription {
2069 status: ThreadSubscriptionStatus::Unsubscribed,
2070 bump_stamp: Some(5),
2071 },
2072 StoredThreadSubscription {
2074 status: ThreadSubscriptionStatus::Unsubscribed,
2075 bump_stamp: Some(100),
2076 },
2077 ]);
2078
2079 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2081
2082 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2084 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2085 assert_eq!(stored_subscription, Some(*expected_sub));
2086 }
2087
2088 self.upsert_thread_subscriptions(update_subscriptions).await?;
2090
2091 for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2093 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2094 assert_eq!(stored_subscription, Some(*expected_sub));
2095 }
2096
2097 for (room_id, thread_id, _) in &expected_subscriptions {
2099 self.remove_thread_subscription(room_id, thread_id).await?;
2100 }
2101
2102 let initial_subscriptions = build_subscription_updates(&[
2103 StoredThreadSubscription {
2104 status: ThreadSubscriptionStatus::Unsubscribed,
2105 bump_stamp: Some(1),
2106 },
2107 StoredThreadSubscription {
2108 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2109 bump_stamp: Some(1),
2110 },
2111 StoredThreadSubscription {
2112 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2113 bump_stamp: Some(1),
2114 },
2115 ]);
2116
2117 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2118
2119 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2120 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2121 assert_eq!(stored_subscription, Some(*expected_sub));
2122 }
2123
2124 let update_subscriptions = build_subscription_updates(&[
2125 StoredThreadSubscription {
2126 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2127 bump_stamp: Some(2),
2128 },
2129 StoredThreadSubscription {
2130 status: ThreadSubscriptionStatus::Unsubscribed,
2131 bump_stamp: Some(2),
2132 },
2133 StoredThreadSubscription {
2134 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2135 bump_stamp: Some(2),
2136 },
2137 ]);
2138
2139 self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2140
2141 for (room_id, thread_id, expected_sub) in &update_subscriptions {
2142 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2143 assert_eq!(stored_subscription, Some(*expected_sub));
2144 }
2145
2146 Ok(())
2147 }
2148}
2149
#[allow(unused_macros, unused_extern_crates)]
/// Generates a `statestore_integration_tests` module containing one async
/// test per method of the `StateStoreIntegrationTests` trait listed below.
///
/// The expansion contains `use super::get_store;`, so an async `get_store`
/// function must be visible in the module where this macro is invoked. Every
/// generated test does the same three things:
///
/// 1. awaits `get_store()` and propagates any error with `?`,
/// 2. converts the result with `IntoStateStore::into_state_store()`,
/// 3. awaits the trait method that has the same name as the test.
///
/// Each test calls `get_store()` on its own; the tests presumably rely on
/// getting a fresh, empty store from every call (confirm against the
/// individual test bodies).
///
/// The macro takes no arguments.
#[macro_export]
macro_rules! statestore_integration_tests {
    () => {
        mod statestore_integration_tests {
            use matrix_sdk_test::{TestResult, async_test};
            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};

            // Supplied by the module that invokes the macro.
            use super::get_store;

            #[async_test]
            async fn test_topic_redaction() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_topic_redaction().await
            }

            #[async_test]
            async fn test_populate_store() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_populate_store().await
            }

            #[async_test]
            async fn test_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_member_saving().await
            }

            #[async_test]
            async fn test_filter_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_filter_saving().await
            }

            #[async_test]
            async fn test_user_avatar_url_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_user_avatar_url_saving().await
            }

            #[async_test]
            async fn test_supported_versions_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_supported_versions_saving().await
            }

            #[async_test]
            async fn test_well_known_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_well_known_saving().await
            }

            #[async_test]
            async fn test_sync_token_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_sync_token_saving().await
            }

            #[async_test]
            async fn test_utd_hook_manager_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_utd_hook_manager_data_saving().await
            }

            #[async_test]
            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_one_time_key_already_uploaded_data_saving().await
            }

            #[async_test]
            async fn test_stripped_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_member_saving().await
            }

            #[async_test]
            async fn test_power_level_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_power_level_saving().await
            }

            #[async_test]
            async fn test_receipts_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_receipts_saving().await
            }

            #[async_test]
            async fn test_custom_storage() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_custom_storage().await
            }

            #[async_test]
            async fn test_stripped_non_stripped() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_non_stripped().await
            }

            #[async_test]
            async fn test_room_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_room_removal().await
            }

            #[async_test]
            async fn test_profile_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_profile_removal().await
            }

            #[async_test]
            async fn test_presence_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_presence_saving().await
            }

            #[async_test]
            async fn test_display_names_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_display_names_saving().await
            }

            #[async_test]
            async fn test_send_queue() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue().await
            }

            #[async_test]
            async fn test_send_queue_priority() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_priority().await
            }

            #[async_test]
            async fn test_send_queue_dependents() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_dependents().await
            }

            #[async_test]
            async fn test_update_send_queue_dependent() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_update_send_queue_dependent().await
            }

            #[async_test]
            async fn test_get_room_infos() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_get_room_infos().await
            }

            #[async_test]
            async fn test_thread_subscriptions() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_thread_subscriptions().await
            }

            #[async_test]
            async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_thread_subscriptions_bulk_upsert().await
            }
        }
    };
}
2343
/// Returns the fixed user ID `@example:localhost`.
///
/// This is the sender used by `power_level_event()` and the default user for
/// `membership_event()` and `stripped_membership_event()`.
fn user_id() -> &'static UserId {
    user_id!("@example:localhost")
}
2347
/// Returns the fixed user ID `@invited:localhost`.
fn invited_user_id() -> &'static UserId {
    user_id!("@invited:localhost")
}
2351
/// Returns the fixed room ID `!test:localhost`.
fn room_id() -> &'static RoomId {
    room_id!("!test:localhost")
}
2355
/// Returns the fixed room ID `!stripped:localhost`.
fn stripped_room_id() -> &'static RoomId {
    room_id!("!stripped:localhost")
}
2359
/// Returns the fixed event ID `$example`.
fn first_receipt_event_id() -> &'static EventId {
    event_id!("$example")
}
2363
/// Returns the fixed event ID `$topic_event`.
fn topic_event_id() -> &'static EventId {
    event_id!("$topic_event")
}
2367
2368fn power_level_event() -> Raw<AnySyncStateEvent> {
2369 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2370
2371 let event = json!({
2372 "event_id": "$h29iv0s8:example.com",
2373 "content": content,
2374 "sender": user_id(),
2375 "type": "m.room.power_levels",
2376 "origin_server_ts": 0u64,
2377 "state_key": "",
2378 });
2379
2380 serde_json::from_value(event).unwrap()
2381}
2382
2383fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2384 custom_stripped_membership_event(user_id())
2385}
2386
2387fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2388 let ev_json = json!({
2389 "type": "m.room.member",
2390 "content": RoomMemberEventContent::new(MembershipState::Join),
2391 "sender": user_id,
2392 "state_key": user_id,
2393 });
2394
2395 Raw::new(&ev_json).unwrap().cast_unchecked()
2396}
2397
2398fn membership_event() -> Raw<SyncRoomMemberEvent> {
2399 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2400}
2401
2402fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2403 let ev_json = json!({
2404 "type": "m.room.member",
2405 "content": RoomMemberEventContent::new(MembershipState::Join),
2406 "event_id": event_id,
2407 "origin_server_ts": 198,
2408 "sender": user_id,
2409 "state_key": user_id,
2410 });
2411
2412 Raw::new(&ev_json).unwrap().cast_unchecked()
2413}
2414
2415fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2416 let ev_json = json!({
2417 "content": {
2418 "presence": "online"
2419 },
2420 "sender": user_id,
2421 });
2422
2423 Raw::new(&ev_json).unwrap().cast_unchecked()
2424}