1use std::{
4 collections::{BTreeMap, BTreeSet, HashMap},
5 str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_test::{TestResult, event_factory::EventFactory};
12use ruma::{
13 EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
14 api::{
15 FeatureFlag, MatrixVersion,
16 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
17 },
18 event_id,
19 events::{
20 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
21 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
22 RoomAccountDataEventType, StateEventType, SyncStateEvent,
23 presence::PresenceEvent,
24 receipt::{ReceiptThread, ReceiptType},
25 room::{
26 member::{
27 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
28 SyncRoomMemberEvent,
29 },
30 message::RoomMessageEventContent,
31 power_levels::RoomPowerLevelsEventContent,
32 redaction::SyncRoomRedactionEvent,
33 topic::RoomTopicEventContent,
34 },
35 tag::{TagInfo, TagName, Tags, UserTagName},
36 },
37 mxc_uri, owned_event_id, owned_mxc_uri,
38 presence::PresenceState,
39 push::Ruleset,
40 room_id,
41 room_version_rules::AuthorizationRules,
42 serde::Raw,
43 uint, user_id,
44};
45use serde_json::json;
46
47use super::{
48 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
49 SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
50};
51use crate::{
52 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
53 deserialized_responses::MemberEvent,
54 store::{
55 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
56 StoredThreadSubscription, ThreadSubscriptionStatus,
57 },
58 utils::RawSyncStateEventWithKeys,
59};
60
61#[allow(async_fn_in_trait)]
66pub trait StateStoreIntegrationTests {
67 async fn populate(&self) -> TestResult;
69 async fn test_topic_redaction(&self) -> TestResult;
71 async fn test_populate_store(&self) -> TestResult;
73 async fn test_member_saving(&self) -> TestResult;
75 async fn test_filter_saving(&self) -> TestResult;
77 async fn test_user_avatar_url_saving(&self) -> TestResult;
79 async fn test_sync_token_saving(&self) -> TestResult;
81 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
83 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
85 async fn test_stripped_member_saving(&self) -> TestResult;
87 async fn test_power_level_saving(&self) -> TestResult;
89 async fn test_receipts_saving(&self) -> TestResult;
91 async fn test_custom_storage(&self) -> TestResult;
93 async fn test_stripped_non_stripped(&self) -> TestResult;
95 async fn test_room_removal(&self) -> TestResult;
97 async fn test_profile_removal(&self) -> TestResult;
99 async fn test_presence_saving(&self) -> TestResult;
101 async fn test_display_names_saving(&self) -> TestResult;
103 async fn test_send_queue(&self) -> TestResult;
105 async fn test_send_queue_priority(&self) -> TestResult;
107 async fn test_send_queue_dependents(&self) -> TestResult;
109 async fn test_update_send_queue_dependent(&self) -> TestResult;
111 async fn test_supported_versions_saving(&self) -> TestResult;
113 async fn test_well_known_saving(&self) -> TestResult;
115 async fn test_get_room_infos(&self) -> TestResult;
117 async fn test_thread_subscriptions(&self) -> TestResult;
119 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
121}
122
123impl StateStoreIntegrationTests for DynStateStore {
124 async fn populate(&self) -> TestResult {
125 let mut changes = StateChanges::default();
126
127 let user_id = user_id();
128 let invited_user_id = invited_user_id();
129 let room_id = room_id();
130 let stripped_room_id = stripped_room_id();
131
132 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
133
134 let f = EventFactory::new().sender(user_id);
135 let presence_raw: Raw<PresenceEvent> = f
136 .presence(PresenceState::Online)
137 .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
138 .currently_active(false)
139 .last_active_ago(1)
140 .status_msg("Making cupcakes")
141 .into();
142 let presence_event = presence_raw.deserialize()?;
143 changes.add_presence_event(presence_event, presence_raw);
144
145 let f = EventFactory::new().sender(user_id);
146 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
147 f.push_rules(Ruleset::server_default(user_id)).into();
148 let pushrules_event = pushrules_raw.deserialize()?;
149 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
150
151 let mut room = RoomInfo::new(room_id, RoomState::Joined);
152 room.mark_as_left();
153
154 let f = EventFactory::new().sender(user_id).room(room_id);
155 let mut tags = Tags::new();
156 tags.insert(TagName::Favorite, TagInfo::new());
157 tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
158 let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
159 let tag_event = tag_raw.deserialize()?;
160 changes.add_room_account_data(room_id, tag_event, tag_raw);
161
162 let f = EventFactory::new().sender(user_id).room(room_id);
163 let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
164 let name_event = name_raw.deserialize()?;
165 room.handle_state_event(
166 &mut RawSyncStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
167 .expect("generated state event should be valid"),
168 );
169 changes.add_state_event(room_id, name_event, name_raw);
170
171 let f = EventFactory::new().sender(user_id);
172 let receipt_content = f
173 .room(room_id)
174 .read_receipts()
175 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
176 .into_content();
177 changes.add_receipts(room_id, receipt_content);
178
179 let topic_event_id = topic_event_id();
180 let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
181 .room(room_id)
182 .sender(user_id)
183 .room_topic("😀")
184 .event_id(topic_event_id)
185 .prev_content(RoomTopicEventContent::new("test".to_owned()))
186 .into_raw_sync_state();
187 let topic_event = topic_raw.deserialize()?;
188 room.handle_state_event(
189 &mut RawSyncStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
190 .expect("generated state event should be valid"),
191 );
192 changes.add_state_event(room_id, topic_event, topic_raw);
193
194 let mut room_ambiguity_map = HashMap::new();
195 let mut room_profiles = BTreeMap::new();
196
197 let f = EventFactory::new().sender(user_id).room(room_id);
198 let member_raw: Raw<SyncRoomMemberEvent> =
199 f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
200 let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
201 let displayname = DisplayName::new(
202 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
203 );
204 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
205 room_profiles.insert(user_id.to_owned(), (&member_event).into());
206
207 let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
208 let member_state_event = member_raw.deserialize()?;
209 changes.add_state_event(room_id, member_state_event, member_raw);
210
211 let f = EventFactory::new().sender(user_id).room(room_id);
212 let invited_member_raw: Raw<SyncRoomMemberEvent> = f
213 .member(invited_user_id)
214 .invited(invited_user_id)
215 .display_name("example")
216 .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
217 .reason("Looking for support")
218 .into();
219 let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
221 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
222 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
223
224 let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
225 let invited_member_state_event = invited_member_raw.deserialize()?;
226 changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
227
228 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
229 changes.profiles.insert(room_id.to_owned(), room_profiles);
230 changes.add_room(room);
231
232 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
233
234 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
235 let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
236 let stripped_name_event = stripped_name_raw.deserialize()?;
237 stripped_room.handle_stripped_state_event(&stripped_name_event);
238 changes.stripped_state.insert(
239 stripped_room_id.to_owned(),
240 BTreeMap::from([(
241 stripped_name_event.event_type(),
242 BTreeMap::from([(
243 stripped_name_event.state_key().to_owned(),
244 stripped_name_raw.clone(),
245 )]),
246 )]),
247 );
248
249 changes.add_room(stripped_room);
250
251 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
252 let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
253 f.member(user_id).display_name("example").into();
254 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
255
256 self.save_changes(&changes).await?;
257
258 Ok(())
259 }
260
261 async fn test_topic_redaction(&self) -> TestResult {
262 let room_id = room_id();
263 let f = EventFactory::new();
264 self.populate().await?;
265
266 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
267 assert_eq!(
268 self.get_state_event_static::<RoomTopicEventContent>(room_id)
269 .await?
270 .expect("room topic found before redaction")
271 .deserialize()
272 .expect("can deserialize room topic before redaction")
273 .as_sync()
274 .expect("room topic is a sync state event")
275 .as_original()
276 .expect("room topic is not redacted yet")
277 .content
278 .topic,
279 "😀"
280 );
281
282 let mut changes = StateChanges::default();
283
284 let topic_event_id = topic_event_id();
285 let redaction_evt: Raw<SyncRoomRedactionEvent> =
286 f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
287
288 changes.add_redaction(room_id, topic_event_id, redaction_evt);
289 self.save_changes(&changes).await?;
290
291 let redacted_event = self
292 .get_state_event_static::<RoomTopicEventContent>(room_id)
293 .await?
294 .expect("room topic found after redaction")
295 .deserialize()
296 .expect("can deserialize room topic after redaction");
297
298 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
299
300 Ok(())
301 }
302
303 async fn test_populate_store(&self) -> TestResult {
304 let room_id = room_id();
305 let user_id = user_id();
306 let display_name = DisplayName::new("example");
307
308 self.populate().await?;
309
310 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
311 assert!(self.get_presence_event(user_id).await?.is_some());
312 assert_eq!(
313 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
314 2,
315 "Expected to find 2 room infos"
316 );
317 assert!(
318 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
319 );
320
321 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
322 assert_eq!(
323 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
324 1,
325 "Expected to find 1 room topic"
326 );
327 assert!(self.get_profile(room_id, user_id).await?.is_some());
328 assert!(self.get_member_event(room_id, user_id).await?.is_some());
329 assert_eq!(
330 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
331 2,
332 "Expected to find 2 members for room"
333 );
334 assert_eq!(
335 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
336 1,
337 "Expected to find 1 invited user ids"
338 );
339 assert_eq!(
340 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
341 1,
342 "Expected to find 1 joined user ids"
343 );
344 assert_eq!(
345 self.get_users_with_display_name(room_id, &display_name).await?.len(),
346 2,
347 "Expected to find 2 display names for room"
348 );
349 assert!(
350 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
351 .await?
352 .is_some()
353 );
354 assert!(
355 self.get_user_room_receipt_event(
356 room_id,
357 ReceiptType::Read,
358 ReceiptThread::Unthreaded,
359 user_id
360 )
361 .await?
362 .is_some()
363 );
364 assert_eq!(
365 self.get_event_room_receipt_events(
366 room_id,
367 ReceiptType::Read,
368 ReceiptThread::Unthreaded,
369 first_receipt_event_id()
370 )
371 .await?
372 .len(),
373 1,
374 "Expected to find 1 read receipt"
375 );
376 Ok(())
377 }
378
379 async fn test_member_saving(&self) -> TestResult {
380 let room_id = room_id!("!test_member_saving:localhost");
381 let user_id = user_id();
382 let second_user_id = user_id!("@second:localhost");
383 let third_user_id = user_id!("@third:localhost");
384 let unknown_user_id = user_id!("@unknown:localhost");
385
386 let mut user_ids = vec![user_id.to_owned()];
388 assert!(self.get_member_event(room_id, user_id).await?.is_none());
389 let member_events = self
390 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
391 .await;
392 assert!(member_events?.is_empty());
393 assert!(self.get_profile(room_id, user_id).await?.is_none());
394 let profiles = self.get_profiles(room_id, &user_ids).await;
395 assert!(profiles?.is_empty());
396
397 let mut changes = StateChanges::default();
399 let raw_member_event = membership_event();
400 let profile = raw_member_event.deserialize()?.into();
401 changes
402 .state
403 .entry(room_id.to_owned())
404 .or_default()
405 .entry(StateEventType::RoomMember)
406 .or_default()
407 .insert(user_id.into(), raw_member_event.cast());
408 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
409 self.save_changes(&changes).await?;
410
411 assert!(self.get_member_event(room_id, user_id).await?.is_some());
412 let member_events = self
413 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
414 .await;
415 assert_eq!(member_events?.len(), 1);
416 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
417 assert_eq!(members.len(), 1, "We expected to find members for the room");
418 assert!(self.get_profile(room_id, user_id).await?.is_some());
419 let profiles = self.get_profiles(room_id, &user_ids).await;
420 assert_eq!(profiles?.len(), 1);
421
422 let mut changes = StateChanges::default();
424 let changes_members = changes
425 .state
426 .entry(room_id.to_owned())
427 .or_default()
428 .entry(StateEventType::RoomMember)
429 .or_default();
430 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
431 let raw_second_member_event =
432 custom_membership_event(second_user_id, event_id!("$second_member_event"));
433 let second_profile = raw_second_member_event.deserialize()?.into();
434 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
435 changes_profiles.insert(second_user_id.to_owned(), second_profile);
436 let raw_third_member_event =
437 custom_membership_event(third_user_id, event_id!("$third_member_event"));
438 let third_profile = raw_third_member_event.deserialize()?.into();
439 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
440 changes_profiles.insert(third_user_id.to_owned(), third_profile);
441 self.save_changes(&changes).await?;
442
443 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
444 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
445 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
446 let member_events = self
447 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
448 .await;
449 assert_eq!(member_events?.len(), 3);
450 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
451 assert_eq!(members.len(), 3, "We expected to find members for the room");
452 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
453 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
454 let profiles = self.get_profiles(room_id, &user_ids).await;
455 assert_eq!(profiles?.len(), 3);
456
457 user_ids.push(unknown_user_id.to_owned());
459 let member_events = self
460 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
461 .await;
462 assert_eq!(member_events?.len(), 3);
463 let profiles = self.get_profiles(room_id, &user_ids).await;
464 assert_eq!(profiles?.len(), 3);
465
466 let member_events = self
468 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
469 room_id,
470 &[],
471 )
472 .await;
473 assert!(member_events?.is_empty());
474 let profiles = self.get_profiles(room_id, &[]).await;
475 assert!(profiles?.is_empty());
476
477 Ok(())
478 }
479
480 async fn test_filter_saving(&self) -> TestResult {
481 let filter_name = "filter_name";
482 let filter_id = "filter_id_1234";
483
484 self.set_kv_data(
485 StateStoreDataKey::Filter(filter_name),
486 StateStoreDataValue::Filter(filter_id.to_owned()),
487 )
488 .await?;
489 assert_let!(
490 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
491 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
492 );
493 assert_eq!(stored_filter_id, filter_id);
494
495 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
496 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
497
498 Ok(())
499 }
500
501 async fn test_user_avatar_url_saving(&self) -> TestResult {
502 let user_id = user_id!("@alice:example.org");
503 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
504
505 self.set_kv_data(
506 StateStoreDataKey::UserAvatarUrl(user_id),
507 StateStoreDataValue::UserAvatarUrl(url.clone()),
508 )
509 .await?;
510
511 assert_let!(
512 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
513 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
514 );
515 assert_eq!(stored_url, url);
516
517 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
518 assert_matches!(
519 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
520 Ok(None)
521 );
522
523 Ok(())
524 }
525
526 async fn test_supported_versions_saving(&self) -> TestResult {
527 let versions =
528 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
529 let supported_versions = SupportedVersionsResponse {
530 versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
531 unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
532 };
533
534 self.set_kv_data(
535 StateStoreDataKey::SupportedVersions,
536 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
537 )
538 .await?;
539
540 assert_let!(
541 Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
542 self.get_kv_data(StateStoreDataKey::SupportedVersions).await
543 );
544 assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
545 assert_eq!(supported_versions, stored_supported_versions);
546
547 let stored_supported = stored_supported_versions.supported_versions();
548 assert_eq!(stored_supported.versions, versions);
549 assert_eq!(stored_supported.features.len(), 1);
550 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
551
552 self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
553 assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
554
555 Ok(())
556 }
557
558 async fn test_well_known_saving(&self) -> TestResult {
559 let well_known = WellKnownResponse {
560 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
561 identity_server: None,
562 tile_server: None,
563 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
564 };
565
566 self.set_kv_data(
567 StateStoreDataKey::WellKnown,
568 StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
569 )
570 .await?;
571
572 assert_let!(
573 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
574 self.get_kv_data(StateStoreDataKey::WellKnown).await
575 );
576 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
577 assert_eq!(stored_well_known, Some(well_known));
578
579 self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
580 assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
581
582 self.set_kv_data(
583 StateStoreDataKey::WellKnown,
584 StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
585 )
586 .await?;
587
588 assert_let!(
589 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
590 self.get_kv_data(StateStoreDataKey::WellKnown).await
591 );
592 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
593 assert_eq!(stored_well_known, None);
594
595 Ok(())
596 }
597
598 async fn test_sync_token_saving(&self) -> TestResult {
599 let sync_token_1 = "t392-516_47314_0_7_1";
600 let sync_token_2 = "t392-516_47314_0_7_2";
601
602 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
603
604 let changes =
605 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
606 self.save_changes(&changes).await?;
607 assert_let!(
608 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
609 self.get_kv_data(StateStoreDataKey::SyncToken).await
610 );
611 assert_eq!(stored_sync_token, sync_token_1);
612
613 self.set_kv_data(
614 StateStoreDataKey::SyncToken,
615 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
616 )
617 .await?;
618 assert_let!(
619 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
620 self.get_kv_data(StateStoreDataKey::SyncToken).await
621 );
622 assert_eq!(stored_sync_token, sync_token_2);
623
624 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
625 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
626
627 Ok(())
628 }
629
630 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
631 assert!(
633 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
634 .await
635 .expect("Could not read data")
636 .is_none(),
637 "Store was not empty at start"
638 );
639
640 let data = GrowableBloomBuilder::new().build();
642 self.set_kv_data(
643 StateStoreDataKey::UtdHookManagerData,
644 StateStoreDataValue::UtdHookManagerData(data.clone()),
645 )
646 .await
647 .expect("Could not save data");
648
649 let read_data = self
651 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
652 .await
653 .expect("Could not read data")
654 .expect("no data found")
655 .into_utd_hook_manager_data()
656 .expect("not UtdHookManagerData");
657
658 assert_eq!(read_data, data);
659
660 Ok(())
661 }
662
663 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
664 assert!(
666 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
667 "Store was not empty at start"
668 );
669
670 self.set_kv_data(
671 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
672 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
673 )
674 .await?;
675
676 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
677 data.expect("The loaded data should be Some");
678
679 Ok(())
680 }
681
682 async fn test_stripped_member_saving(&self) -> TestResult {
683 let room_id = room_id!("!test_stripped_member_saving:localhost");
684 let user_id = user_id();
685 let second_user_id = user_id!("@second:localhost");
686 let third_user_id = user_id!("@third:localhost");
687 let unknown_user_id = user_id!("@unknown:localhost");
688
689 assert!(self.get_member_event(room_id, user_id).await?.is_none());
691 let member_events = self
692 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
693 room_id,
694 &[user_id.to_owned()],
695 )
696 .await;
697 assert!(member_events?.is_empty());
698
699 let mut changes = StateChanges::default();
701 changes
702 .stripped_state
703 .entry(room_id.to_owned())
704 .or_default()
705 .entry(StateEventType::RoomMember)
706 .or_default()
707 .insert(user_id.into(), stripped_membership_event().cast());
708 self.save_changes(&changes).await?;
709
710 assert!(self.get_member_event(room_id, user_id).await?.is_some());
711 let member_events = self
712 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
713 room_id,
714 &[user_id.to_owned()],
715 )
716 .await;
717 assert_eq!(member_events?.len(), 1);
718 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
719 assert_eq!(members.len(), 1, "We expected to find members for the room");
720
721 let mut changes = StateChanges::default();
723 let changes_members = changes
724 .stripped_state
725 .entry(room_id.to_owned())
726 .or_default()
727 .entry(StateEventType::RoomMember)
728 .or_default();
729 changes_members
730 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
731 changes_members
732 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
733 self.save_changes(&changes).await?;
734
735 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
736 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
737 let member_events = self
738 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
739 room_id,
740 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
741 )
742 .await;
743 assert_eq!(member_events?.len(), 3);
744 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
745 assert_eq!(members.len(), 3, "We expected to find members for the room");
746
747 let member_events = self
749 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
750 room_id,
751 &[
752 user_id.to_owned(),
753 second_user_id.to_owned(),
754 third_user_id.to_owned(),
755 unknown_user_id.to_owned(),
756 ],
757 )
758 .await;
759 assert_eq!(member_events?.len(), 3);
760
761 let member_events = self
763 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
764 room_id,
765 &[],
766 )
767 .await;
768 assert!(member_events?.is_empty());
769
770 Ok(())
771 }
772
773 async fn test_power_level_saving(&self) -> TestResult {
774 let room_id = room_id!("!test_power_level_saving:localhost");
775
776 let raw_event = power_level_event();
777 let event = raw_event.deserialize()?;
778
779 assert!(
780 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
781 );
782 let mut changes = StateChanges::default();
783 changes.add_state_event(room_id, event, raw_event);
784
785 self.save_changes(&changes).await?;
786 assert!(
787 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
788 );
789
790 Ok(())
791 }
792
793 async fn test_receipts_saving(&self) -> TestResult {
794 let room_id = room_id!("!test_receipts_saving:localhost");
795
796 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
797 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
798
799 let first_receipt_ts = uint!(1436451550);
800 let second_receipt_ts = uint!(1436451653);
801 let third_receipt_ts = uint!(1436474532);
802
803 let first_receipt_event = serde_json::from_value(json!({
804 first_event_id: {
805 "m.read": {
806 user_id(): {
807 "ts": first_receipt_ts,
808 }
809 }
810 }
811 }))?;
812
813 let second_receipt_event = serde_json::from_value(json!({
814 second_event_id: {
815 "m.read": {
816 user_id(): {
817 "ts": second_receipt_ts,
818 }
819 }
820 }
821 }))?;
822
823 let third_receipt_event = serde_json::from_value(json!({
824 second_event_id: {
825 "m.read": {
826 user_id(): {
827 "ts": third_receipt_ts,
828 "thread_id": "main",
829 }
830 }
831 }
832 }))?;
833
834 assert!(
835 self.get_user_room_receipt_event(
836 room_id,
837 ReceiptType::Read,
838 ReceiptThread::Unthreaded,
839 user_id()
840 )
841 .await
842 .expect("failed to read unthreaded user room receipt")
843 .is_none()
844 );
845 assert!(
846 self.get_event_room_receipt_events(
847 room_id,
848 ReceiptType::Read,
849 ReceiptThread::Unthreaded,
850 first_event_id
851 )
852 .await
853 .expect("failed to read unthreaded event room receipt for 1")
854 .is_empty()
855 );
856 assert!(
857 self.get_event_room_receipt_events(
858 room_id,
859 ReceiptType::Read,
860 ReceiptThread::Unthreaded,
861 second_event_id
862 )
863 .await
864 .expect("failed to read unthreaded event room receipt for 2")
865 .is_empty()
866 );
867
868 let mut changes = StateChanges::default();
869 changes.add_receipts(room_id, first_receipt_event);
870
871 self.save_changes(&changes).await?;
872 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
873 .get_user_room_receipt_event(
874 room_id,
875 ReceiptType::Read,
876 ReceiptThread::Unthreaded,
877 user_id(),
878 )
879 .await
880 .expect("failed to read unthreaded user room receipt after save")
881 .unwrap();
882 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
883 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
884 let first_event_unthreaded_receipts = self
885 .get_event_room_receipt_events(
886 room_id,
887 ReceiptType::Read,
888 ReceiptThread::Unthreaded,
889 first_event_id,
890 )
891 .await
892 .expect("failed to read unthreaded event room receipt for 1 after save");
893 assert_eq!(
894 first_event_unthreaded_receipts.len(),
895 1,
896 "Found a wrong number of unthreaded receipts for 1 after save"
897 );
898 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
899 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
900 assert!(
901 self.get_event_room_receipt_events(
902 room_id,
903 ReceiptType::Read,
904 ReceiptThread::Unthreaded,
905 second_event_id
906 )
907 .await
908 .expect("failed to read unthreaded event room receipt for 2 after save")
909 .is_empty()
910 );
911
912 let mut changes = StateChanges::default();
913 changes.add_receipts(room_id, second_receipt_event);
914
915 self.save_changes(&changes).await.expect("Saving works");
916 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
917 .get_user_room_receipt_event(
918 room_id,
919 ReceiptType::Read,
920 ReceiptThread::Unthreaded,
921 user_id(),
922 )
923 .await
924 .expect("Getting unthreaded user room receipt after save failed")
925 .unwrap();
926 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
927 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
928 assert!(
929 self.get_event_room_receipt_events(
930 room_id,
931 ReceiptType::Read,
932 ReceiptThread::Unthreaded,
933 first_event_id
934 )
935 .await
936 .expect("Getting unthreaded event room receipt events for first event failed")
937 .is_empty()
938 );
939 let second_event_unthreaded_receipts = self
940 .get_event_room_receipt_events(
941 room_id,
942 ReceiptType::Read,
943 ReceiptThread::Unthreaded,
944 second_event_id,
945 )
946 .await
947 .expect("Getting unthreaded event room receipt events for second event failed");
948 assert_eq!(
949 second_event_unthreaded_receipts.len(),
950 1,
951 "Found a wrong number of unthreaded receipts for second event after save"
952 );
953 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
954 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
955
956 assert!(
957 self.get_user_room_receipt_event(
958 room_id,
959 ReceiptType::Read,
960 ReceiptThread::Main,
961 user_id()
962 )
963 .await
964 .expect("failed to read threaded user room receipt")
965 .is_none()
966 );
967 assert!(
968 self.get_event_room_receipt_events(
969 room_id,
970 ReceiptType::Read,
971 ReceiptThread::Main,
972 second_event_id
973 )
974 .await
975 .expect("Getting threaded event room receipts for 2 failed")
976 .is_empty()
977 );
978
979 let mut changes = StateChanges::default();
980 changes.add_receipts(room_id, third_receipt_event);
981
982 self.save_changes(&changes).await.expect("Saving works");
983 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
985 .get_user_room_receipt_event(
986 room_id,
987 ReceiptType::Read,
988 ReceiptThread::Unthreaded,
989 user_id(),
990 )
991 .await
992 .expect("Getting unthreaded user room receipt after save failed")
993 .unwrap();
994 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
995 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
996 let second_event_unthreaded_receipts = self
997 .get_event_room_receipt_events(
998 room_id,
999 ReceiptType::Read,
1000 ReceiptThread::Unthreaded,
1001 second_event_id,
1002 )
1003 .await
1004 .expect("Getting unthreaded event room receipt events for second event failed");
1005 assert_eq!(
1006 second_event_unthreaded_receipts.len(),
1007 1,
1008 "Found a wrong number of unthreaded receipts for second event after save"
1009 );
1010 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1011 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1012 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1014 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1015 .await
1016 .expect("Getting threaded user room receipt after save failed")
1017 .unwrap();
1018 assert_eq!(threaded_user_receipt_event_id, second_event_id);
1019 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1020 let second_event_threaded_receipts = self
1021 .get_event_room_receipt_events(
1022 room_id,
1023 ReceiptType::Read,
1024 ReceiptThread::Main,
1025 second_event_id,
1026 )
1027 .await
1028 .expect("Getting threaded event room receipt events for second event failed");
1029 assert_eq!(
1030 second_event_threaded_receipts.len(),
1031 1,
1032 "Found a wrong number of threaded receipts for second event after save"
1033 );
1034 assert_eq!(second_event_threaded_receipts[0].0, user_id());
1035 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1036
1037 Ok(())
1038 }
1039
1040 async fn test_custom_storage(&self) -> TestResult {
1041 let key = "my_key";
1042 let value = &[0, 1, 2, 3];
1043
1044 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1045
1046 let read = self.get_custom_value(key.as_bytes()).await?;
1047
1048 assert_eq!(Some(value.as_ref()), read.as_deref());
1049
1050 Ok(())
1051 }
1052
1053 async fn test_stripped_non_stripped(&self) -> TestResult {
1054 let room_id = room_id!("!test_stripped_non_stripped:localhost");
1055 let user_id = user_id();
1056
1057 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1058 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1059
1060 let mut changes = StateChanges::default();
1061 changes
1062 .state
1063 .entry(room_id.to_owned())
1064 .or_default()
1065 .entry(StateEventType::RoomMember)
1066 .or_default()
1067 .insert(user_id.into(), membership_event().cast());
1068 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1069 self.save_changes(&changes).await?;
1070
1071 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1072 assert!(matches!(member_event, MemberEvent::Sync(_)));
1073 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1074
1075 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1076 assert_eq!(members, vec![user_id.to_owned()]);
1077
1078 let mut changes = StateChanges::default();
1079 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1080 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1081 self.save_changes(&changes).await?;
1082
1083 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1084 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1085 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1086
1087 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1088 assert_eq!(members, vec![user_id.to_owned()]);
1089
1090 Ok(())
1091 }
1092
1093 async fn test_room_removal(&self) -> TestResult {
1094 let room_id = room_id();
1095 let user_id = user_id();
1096 let display_name = DisplayName::new("example");
1097 let stripped_room_id = stripped_room_id();
1098
1099 self.populate().await?;
1100
1101 {
1102 let txn = TransactionId::new();
1104 let ev =
1105 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1106 self.save_send_queue_request(
1107 room_id,
1108 txn.clone(),
1109 MilliSecondsSinceUnixEpoch::now(),
1110 ev.into(),
1111 0,
1112 )
1113 .await?;
1114
1115 self.save_dependent_queued_request(
1117 room_id,
1118 &txn,
1119 ChildTransactionId::new(),
1120 MilliSecondsSinceUnixEpoch::now(),
1121 DependentQueuedRequestKind::RedactEvent,
1122 )
1123 .await?;
1124 }
1125
1126 self.remove_room(room_id).await?;
1127
1128 assert_eq!(
1129 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1130 1,
1131 "room is still there"
1132 );
1133
1134 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1135 assert!(
1136 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1137 "still state events found"
1138 );
1139 assert!(self.get_profile(room_id, user_id).await?.is_none());
1140 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1141 assert!(
1142 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1143 "still user ids found"
1144 );
1145 assert!(
1146 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1147 "still invited user ids found"
1148 );
1149 assert!(
1150 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1151 "still joined users found"
1152 );
1153 assert!(
1154 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1155 "still display names found"
1156 );
1157 assert!(
1158 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1159 .await?
1160 .is_none()
1161 );
1162 assert!(
1163 self.get_user_room_receipt_event(
1164 room_id,
1165 ReceiptType::Read,
1166 ReceiptThread::Unthreaded,
1167 user_id
1168 )
1169 .await?
1170 .is_none()
1171 );
1172 assert!(
1173 self.get_event_room_receipt_events(
1174 room_id,
1175 ReceiptType::Read,
1176 ReceiptThread::Unthreaded,
1177 first_receipt_event_id()
1178 )
1179 .await?
1180 .is_empty(),
1181 "still event recepts in the store"
1182 );
1183 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1184 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1185
1186 self.remove_room(stripped_room_id).await?;
1187
1188 assert!(
1189 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1190 "still room info found"
1191 );
1192 Ok(())
1193 }
1194
1195 async fn test_profile_removal(&self) -> TestResult {
1196 let room_id = room_id();
1197
1198 let user_id = user_id();
1200 let invited_user_id = invited_user_id();
1201
1202 self.populate().await?;
1203
1204 let new_invite_member_json = json!({
1205 "content": {
1206 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1207 "displayname": "example after update",
1208 "membership": "invite",
1209 "reason": "Looking for support"
1210 },
1211 "event_id": "$143273582443PhrSm:localhost",
1212 "origin_server_ts": 1432735824,
1213 "room_id": room_id,
1214 "sender": user_id,
1215 "state_key": invited_user_id,
1216 "type": "m.room.member",
1217 });
1218 let new_invite_member_event: SyncRoomMemberEvent =
1219 serde_json::from_value(new_invite_member_json.clone())?;
1220
1221 let mut changes = StateChanges {
1222 profiles_to_delete: [(
1224 room_id.to_owned(),
1225 vec![user_id.to_owned(), invited_user_id.to_owned()],
1226 )]
1227 .into(),
1228
1229 profiles: {
1231 let mut map = BTreeMap::default();
1232 map.insert(
1233 room_id.to_owned(),
1234 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1235 .into_iter()
1236 .collect(),
1237 );
1238 map
1239 },
1240
1241 ..StateChanges::default()
1242 };
1243
1244 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1245 .expect("can create sync-state-event for topic");
1246 let event = raw.deserialize()?;
1247 changes.add_state_event(room_id, event, raw);
1248
1249 self.save_changes(&changes).await?;
1250
1251 assert!(self.get_profile(room_id, user_id).await?.is_none());
1253 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1254
1255 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1257 assert_eq!(
1258 invited_member_event.content.displayname.as_deref(),
1259 Some("example after update")
1260 );
1261 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1262
1263 Ok(())
1264 }
1265
1266 async fn test_presence_saving(&self) -> TestResult {
1267 let user_id = user_id();
1268 let second_user_id = user_id!("@second:localhost");
1269 let third_user_id = user_id!("@third:localhost");
1270 let unknown_user_id = user_id!("@unknown:localhost");
1271
1272 let mut user_ids = vec![user_id.to_owned()];
1274 let presence_event = self.get_presence_event(user_id).await;
1275 assert!(presence_event?.is_none());
1276 let presence_events = self.get_presence_events(&user_ids).await;
1277 assert!(presence_events?.is_empty());
1278
1279 let mut changes = StateChanges::default();
1281 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1282 self.save_changes(&changes).await?;
1283
1284 let presence_event = self.get_presence_event(user_id).await;
1285 assert!(presence_event?.is_some());
1286 let presence_events = self.get_presence_events(&user_ids).await;
1287 assert_eq!(presence_events?.len(), 1);
1288
1289 let mut changes = StateChanges::default();
1291 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1292 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1293 self.save_changes(&changes).await?;
1294
1295 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1296 let presence_event = self.get_presence_event(second_user_id).await;
1297 assert!(presence_event?.is_some());
1298 let presence_event = self.get_presence_event(third_user_id).await;
1299 assert!(presence_event?.is_some());
1300 let presence_events = self.get_presence_events(&user_ids).await;
1301 assert_eq!(presence_events?.len(), 3);
1302
1303 user_ids.push(unknown_user_id.to_owned());
1305 let member_events = self.get_presence_events(&user_ids).await;
1306 assert_eq!(member_events?.len(), 3);
1307
1308 let presence_events = self.get_presence_events(&[]).await;
1310 assert!(presence_events?.is_empty());
1311
1312 Ok(())
1313 }
1314
1315 async fn test_display_names_saving(&self) -> TestResult {
1316 let room_id = room_id!("!test_display_names_saving:localhost");
1317 let user_id = user_id();
1318 let user_display_name = DisplayName::new("User");
1319 let second_user_id = user_id!("@second:localhost");
1320 let third_user_id = user_id!("@third:localhost");
1321 let other_display_name = DisplayName::new("Raoul");
1322 let unknown_display_name = DisplayName::new("Unknown");
1323
1324 let mut display_names = vec![user_display_name.to_owned()];
1326 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1327 assert!(users.is_empty());
1328 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1329 assert!(names.is_empty());
1330
1331 let mut changes = StateChanges::default();
1333 changes
1334 .ambiguity_maps
1335 .entry(room_id.to_owned())
1336 .or_default()
1337 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1338 self.save_changes(&changes).await?;
1339
1340 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1341 assert_eq!(users.len(), 1);
1342 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1343 assert_eq!(names.len(), 1);
1344 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1345
1346 let mut changes = StateChanges::default();
1348 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1349 other_display_name.to_owned(),
1350 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1351 );
1352 self.save_changes(&changes).await?;
1353
1354 display_names.push(other_display_name.to_owned());
1355 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1356 assert_eq!(users.len(), 1);
1357 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1358 assert_eq!(users.len(), 2);
1359 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1360 assert_eq!(names.len(), 2);
1361 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1362 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1363
1364 display_names.push(unknown_display_name.to_owned());
1366 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1367 assert_eq!(names.len(), 2);
1368
1369 let names = self.get_users_with_display_names(room_id, &[]).await?;
1371 assert!(names.is_empty());
1372
1373 Ok(())
1374 }
1375
1376 #[allow(clippy::needless_range_loop)]
1377 async fn test_send_queue(&self) -> TestResult {
1378 let room_id = room_id!("!test_send_queue:localhost");
1379
1380 let events = self.load_send_queue_requests(room_id).await?;
1382 assert!(events.is_empty());
1383
1384 let txn0 = TransactionId::new();
1386 let event0 =
1387 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1388 self.save_send_queue_request(
1389 room_id,
1390 txn0.clone(),
1391 MilliSecondsSinceUnixEpoch::now(),
1392 event0.into(),
1393 0,
1394 )
1395 .await?;
1396
1397 let pending = self.load_send_queue_requests(room_id).await?;
1399
1400 assert_eq!(pending.len(), 1);
1401 {
1402 assert_eq!(pending[0].transaction_id, txn0);
1403
1404 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1405 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1406 assert_eq!(content.body(), "msg0");
1407
1408 assert!(!pending[0].is_wedged());
1409 }
1410
1411 for i in 1..=3 {
1413 let txn = TransactionId::new();
1414 let event = SerializableEventContent::new(
1415 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1416 )?;
1417
1418 self.save_send_queue_request(
1419 room_id,
1420 txn,
1421 MilliSecondsSinceUnixEpoch::now(),
1422 event.into(),
1423 0,
1424 )
1425 .await?;
1426 }
1427
1428 let pending = self.load_send_queue_requests(room_id).await?;
1430
1431 assert_eq!(pending.len(), 4);
1433
1434 assert_eq!(pending[0].transaction_id, txn0);
1435
1436 for i in 0..4 {
1437 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1438 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1439 assert_eq!(content.body(), format!("msg{i}"));
1440 assert!(!pending[i].is_wedged());
1441 }
1442
1443 let txn2 = &pending[2].transaction_id;
1445 self.update_send_queue_request_status(
1446 room_id,
1447 txn2,
1448 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1449 )
1450 .await?;
1451
1452 let pending = self.load_send_queue_requests(room_id).await?;
1454
1455 assert_eq!(pending.len(), 4);
1457 assert_eq!(pending[0].transaction_id, txn0);
1458 assert_eq!(pending[2].transaction_id, *txn2);
1459 assert!(pending[2].is_wedged());
1460 let error = pending[2].clone().error.unwrap();
1461 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1462 assert_eq!(generic_error, "Oops");
1463 for i in 0..4 {
1464 if i != 2 {
1465 assert!(!pending[i].is_wedged());
1466 }
1467 }
1468
1469 let event0 = SerializableEventContent::new(
1471 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1472 )?;
1473 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1474
1475 let pending = self.load_send_queue_requests(room_id).await?;
1477
1478 assert_eq!(pending.len(), 4);
1479 {
1480 assert_eq!(pending[2].transaction_id, *txn2);
1481
1482 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1483 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1484 assert_eq!(content.body(), "wow that's a cool test");
1485
1486 assert!(!pending[2].is_wedged());
1487
1488 for i in 0..4 {
1489 if i != 2 {
1490 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1491 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1492 assert_eq!(content.body(), format!("msg{i}"));
1493
1494 assert!(!pending[i].is_wedged());
1495 }
1496 }
1497 }
1498
1499 self.remove_send_queue_request(room_id, &txn0).await?;
1501
1502 let pending = self.load_send_queue_requests(room_id).await?;
1504
1505 assert_eq!(pending.len(), 3);
1506 assert_eq!(pending[1].transaction_id, *txn2);
1507 for i in 0..3 {
1508 assert_ne!(pending[i].transaction_id, txn0);
1509 }
1510
1511 let room_id2 = room_id!("!test_send_queue_two:localhost");
1516 {
1517 let txn = TransactionId::new();
1518 let event = SerializableEventContent::new(
1519 &RoomMessageEventContent::text_plain("room2").into(),
1520 )?;
1521 self.save_send_queue_request(
1522 room_id2,
1523 txn.clone(),
1524 MilliSecondsSinceUnixEpoch::now(),
1525 event.into(),
1526 0,
1527 )
1528 .await?;
1529 }
1530
1531 {
1533 let room_id3 = room_id!("!test_send_queue_three:localhost");
1534 let txn = TransactionId::new();
1535 let event = SerializableEventContent::new(
1536 &RoomMessageEventContent::text_plain("room3").into(),
1537 )?;
1538 self.save_send_queue_request(
1539 room_id3,
1540 txn.clone(),
1541 MilliSecondsSinceUnixEpoch::now(),
1542 event.into(),
1543 0,
1544 )
1545 .await?;
1546
1547 self.remove_send_queue_request(room_id3, &txn).await?;
1548 }
1549
1550 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1553 assert_eq!(outstanding_rooms.len(), 2);
1554 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1555 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1556
1557 Ok(())
1558 }
1559
1560 async fn test_send_queue_priority(&self) -> TestResult {
1561 let room_id = room_id!("!test_send_queue:localhost");
1562
1563 let events = self.load_send_queue_requests(room_id).await?;
1565 assert!(events.is_empty());
1566
1567 let low0_txn = TransactionId::new();
1569 let ev0 =
1570 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1571 self.save_send_queue_request(
1572 room_id,
1573 low0_txn.clone(),
1574 MilliSecondsSinceUnixEpoch::now(),
1575 ev0.into(),
1576 2,
1577 )
1578 .await?;
1579
1580 let high_txn = TransactionId::new();
1582 let ev1 =
1583 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1584 self.save_send_queue_request(
1585 room_id,
1586 high_txn.clone(),
1587 MilliSecondsSinceUnixEpoch::now(),
1588 ev1.into(),
1589 10,
1590 )
1591 .await?;
1592
1593 let low1_txn = TransactionId::new();
1595 let ev2 =
1596 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1597 self.save_send_queue_request(
1598 room_id,
1599 low1_txn.clone(),
1600 MilliSecondsSinceUnixEpoch::now(),
1601 ev2.into(),
1602 2,
1603 )
1604 .await?;
1605
1606 let pending = self.load_send_queue_requests(room_id).await?;
1609
1610 assert_eq!(pending.len(), 3);
1611 {
1612 assert_eq!(pending[0].transaction_id, high_txn);
1613
1614 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1615 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1616 assert_eq!(content.body(), "high");
1617 }
1618
1619 {
1620 assert_eq!(pending[1].transaction_id, low0_txn);
1621
1622 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1623 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1624 assert_eq!(content.body(), "low0");
1625 }
1626
1627 {
1628 assert_eq!(pending[2].transaction_id, low1_txn);
1629
1630 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1631 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1632 assert_eq!(content.body(), "low1");
1633 }
1634
1635 Ok(())
1636 }
1637
1638 async fn test_send_queue_dependents(&self) -> TestResult {
1639 let room_id = room_id!("!test_send_queue_dependents:localhost");
1640
1641 let txn0 = TransactionId::new();
1643 let event0 =
1644 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1645 self.save_send_queue_request(
1646 room_id,
1647 txn0.clone(),
1648 MilliSecondsSinceUnixEpoch::now(),
1649 event0.clone().into(),
1650 0,
1651 )
1652 .await?;
1653
1654 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1656
1657 let child_txn = ChildTransactionId::new();
1659 self.save_dependent_queued_request(
1660 room_id,
1661 &txn0,
1662 child_txn.clone(),
1663 MilliSecondsSinceUnixEpoch::now(),
1664 DependentQueuedRequestKind::RedactEvent,
1665 )
1666 .await?;
1667
1668 let dependents = self.load_dependent_queued_requests(room_id).await?;
1670 assert_eq!(dependents.len(), 1);
1671 assert_eq!(dependents[0].parent_transaction_id, txn0);
1672 assert_eq!(dependents[0].own_transaction_id, child_txn);
1673 assert!(dependents[0].parent_key.is_none());
1674 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1675
1676 let (event, event_type) = event0.raw();
1678 let event_id = owned_event_id!("$1");
1679 let num_updated = self
1680 .mark_dependent_queued_requests_as_ready(
1681 room_id,
1682 &txn0,
1683 SentRequestKey::Event {
1684 event_id: event_id.clone(),
1685 event: event.clone(),
1686 event_type: event_type.to_owned(),
1687 },
1688 )
1689 .await?;
1690 assert_eq!(num_updated, 1);
1691
1692 let dependents = self.load_dependent_queued_requests(room_id).await?;
1694 assert_eq!(dependents.len(), 1);
1695 assert_eq!(dependents[0].parent_transaction_id, txn0);
1696 assert_eq!(dependents[0].own_transaction_id, child_txn);
1697 assert_matches!(
1698 dependents[0].parent_key.as_ref(),
1699 Some(SentRequestKey::Event {
1700 event_id: received_event_id,
1701 event: received_event,
1702 event_type: received_event_type
1703 }) => {
1704 assert_eq!(received_event_id, &event_id);
1705 assert_eq!(received_event.json().to_string(), event.json().to_string());
1706 assert_eq!(received_event_type.as_str(), event_type);
1707 }
1708 );
1709 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1710
1711 let removed = self
1713 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1714 .await?;
1715 assert!(removed);
1716
1717 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1719
1720 let txn1 = TransactionId::new();
1723 let event1 =
1724 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1725 self.save_send_queue_request(
1726 room_id,
1727 txn1.clone(),
1728 MilliSecondsSinceUnixEpoch::now(),
1729 event1.into(),
1730 0,
1731 )
1732 .await?;
1733
1734 self.save_dependent_queued_request(
1735 room_id,
1736 &txn0,
1737 ChildTransactionId::new(),
1738 MilliSecondsSinceUnixEpoch::now(),
1739 DependentQueuedRequestKind::RedactEvent,
1740 )
1741 .await?;
1742 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1743
1744 self.save_dependent_queued_request(
1745 room_id,
1746 &txn1,
1747 ChildTransactionId::new(),
1748 MilliSecondsSinceUnixEpoch::now(),
1749 DependentQueuedRequestKind::EditEvent {
1750 new_content: SerializableEventContent::new(
1751 &RoomMessageEventContent::text_plain("edit").into(),
1752 )?,
1753 },
1754 )
1755 .await?;
1756 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1757
1758 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1760 assert!(removed);
1761
1762 let dependents = self.load_dependent_queued_requests(room_id).await?;
1764 assert_eq!(dependents.len(), 2);
1765
1766 Ok(())
1767 }
1768
1769 async fn test_update_send_queue_dependent(&self) -> TestResult {
1770 let room_id = room_id!("!test_send_queue_dependents:localhost");
1771
1772 let txn = TransactionId::new();
1773
1774 let child_txn = ChildTransactionId::new();
1776
1777 self.save_dependent_queued_request(
1778 room_id,
1779 &txn,
1780 child_txn.clone(),
1781 MilliSecondsSinceUnixEpoch::now(),
1782 DependentQueuedRequestKind::RedactEvent,
1783 )
1784 .await?;
1785
1786 let dependents = self.load_dependent_queued_requests(room_id).await?;
1788 assert_eq!(dependents.len(), 1);
1789 assert_eq!(dependents[0].parent_transaction_id, txn);
1790 assert_eq!(dependents[0].own_transaction_id, child_txn);
1791 assert!(dependents[0].parent_key.is_none());
1792 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1793
1794 self.update_dependent_queued_request(
1796 room_id,
1797 &child_txn,
1798 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1799 )
1800 .await?;
1801
1802 let dependents = self.load_dependent_queued_requests(room_id).await?;
1804 assert_eq!(dependents.len(), 1);
1805 assert_eq!(dependents[0].parent_transaction_id, txn);
1806 assert_eq!(dependents[0].own_transaction_id, child_txn);
1807 assert!(dependents[0].parent_key.is_none());
1808 assert_matches!(
1809 &dependents[0].kind,
1810 DependentQueuedRequestKind::ReactEvent { key } => {
1811 assert_eq!(key, "👍");
1812 }
1813 );
1814
1815 Ok(())
1816 }
1817
1818 async fn test_get_room_infos(&self) -> TestResult {
1819 let room_id_0 = room_id!("!r0");
1820 let room_id_1 = room_id!("!r1");
1821 let room_id_2 = room_id!("!r2");
1822
1823 {
1825 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1826 }
1827
1828 let mut changes = StateChanges::default();
1830 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1831 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1832 self.save_changes(&changes).await?;
1833
1834 {
1836 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1837
1838 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1841
1842 assert_eq!(all_rooms.len(), 2);
1843 assert_eq!(all_rooms[0].room_id, room_id_0);
1844 assert_eq!(all_rooms[1].room_id, room_id_1);
1845 }
1846
1847 {
1849 let all_rooms =
1850 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1851
1852 assert_eq!(all_rooms.len(), 1);
1853 assert_eq!(all_rooms[0].room_id, room_id_1);
1854 }
1855
1856 {
1859 let all_rooms =
1860 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1861
1862 assert_eq!(all_rooms.len(), 0);
1863 }
1864
1865 Ok(())
1866 }
1867
1868 async fn test_thread_subscriptions(&self) -> TestResult {
1869 let first_thread = event_id!("$t1");
1870 let second_thread = event_id!("$t2");
1871
1872 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1874 assert!(maybe_sub.is_none());
1875
1876 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1877 assert!(maybe_sub.is_none());
1878
1879 self.upsert_thread_subscriptions(vec![(
1881 room_id(),
1882 first_thread,
1883 StoredThreadSubscription {
1884 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1885 bump_stamp: None,
1886 },
1887 )])
1888 .await?;
1889
1890 self.upsert_thread_subscriptions(vec![(
1891 room_id(),
1892 second_thread,
1893 StoredThreadSubscription {
1894 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1895 bump_stamp: None,
1896 },
1897 )])
1898 .await?;
1899
1900 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1902 assert_eq!(
1903 maybe_sub,
1904 Some(StoredThreadSubscription {
1905 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1906 bump_stamp: None,
1907 })
1908 );
1909
1910 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1911 assert_eq!(
1912 maybe_sub,
1913 Some(StoredThreadSubscription {
1914 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1915 bump_stamp: None,
1916 })
1917 );
1918
1919 self.upsert_thread_subscriptions(vec![(
1921 room_id(),
1922 first_thread,
1923 StoredThreadSubscription {
1924 status: ThreadSubscriptionStatus::Unsubscribed,
1925 bump_stamp: None,
1926 },
1927 )])
1928 .await?;
1929
1930 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1932 assert_eq!(
1933 maybe_sub,
1934 Some(StoredThreadSubscription {
1935 status: ThreadSubscriptionStatus::Unsubscribed,
1936 bump_stamp: None,
1937 })
1938 );
1939
1940 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1942 assert_eq!(
1943 maybe_sub,
1944 Some(StoredThreadSubscription {
1945 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1946 bump_stamp: None,
1947 })
1948 );
1949
1950 self.remove_thread_subscription(room_id(), second_thread).await?;
1952
1953 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1955 assert_eq!(maybe_sub, None);
1956
1957 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1959 assert_eq!(
1960 maybe_sub,
1961 Some(StoredThreadSubscription {
1962 status: ThreadSubscriptionStatus::Unsubscribed,
1963 bump_stamp: None,
1964 })
1965 );
1966
1967 self.remove_thread_subscription(room_id(), second_thread).await?;
1969
1970 Ok(())
1971 }
1972
1973 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1974 let threads = [
1975 event_id!("$t1"),
1976 event_id!("$t2"),
1977 event_id!("$t3"),
1978 event_id!("$t4"),
1979 event_id!("$t5"),
1980 event_id!("$t6"),
1981 ];
1982 let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1985 threads
1986 .iter()
1987 .zip(subs)
1988 .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1989 .collect::<Vec<_>>()
1990 };
1991
1992 let initial_subscriptions = build_subscription_updates(&[
1994 StoredThreadSubscription {
1995 status: ThreadSubscriptionStatus::Unsubscribed,
1996 bump_stamp: None,
1997 },
1998 StoredThreadSubscription {
1999 status: ThreadSubscriptionStatus::Unsubscribed,
2000 bump_stamp: Some(14),
2001 },
2002 StoredThreadSubscription {
2003 status: ThreadSubscriptionStatus::Unsubscribed,
2004 bump_stamp: None,
2005 },
2006 StoredThreadSubscription {
2007 status: ThreadSubscriptionStatus::Unsubscribed,
2008 bump_stamp: Some(210),
2009 },
2010 StoredThreadSubscription {
2011 status: ThreadSubscriptionStatus::Unsubscribed,
2012 bump_stamp: Some(5),
2013 },
2014 StoredThreadSubscription {
2015 status: ThreadSubscriptionStatus::Unsubscribed,
2016 bump_stamp: Some(100),
2017 },
2018 ]);
2019
2020 let update_subscriptions = build_subscription_updates(&[
2021 StoredThreadSubscription {
2022 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2023 bump_stamp: None,
2024 },
2025 StoredThreadSubscription {
2026 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2027 bump_stamp: None,
2028 },
2029 StoredThreadSubscription {
2030 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2031 bump_stamp: Some(1101),
2032 },
2033 StoredThreadSubscription {
2034 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2035 bump_stamp: Some(222),
2036 },
2037 StoredThreadSubscription {
2038 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2039 bump_stamp: Some(1),
2040 },
2041 StoredThreadSubscription {
2042 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2043 bump_stamp: Some(100),
2044 },
2045 ]);
2046
2047 let expected_subscriptions = build_subscription_updates(&[
2048 StoredThreadSubscription {
2050 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2051 bump_stamp: None,
2052 },
2053 StoredThreadSubscription {
2055 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2056 bump_stamp: Some(14),
2057 },
2058 StoredThreadSubscription {
2060 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2061 bump_stamp: Some(1101),
2062 },
2063 StoredThreadSubscription {
2065 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2066 bump_stamp: Some(222),
2067 },
2068 StoredThreadSubscription {
2070 status: ThreadSubscriptionStatus::Unsubscribed,
2071 bump_stamp: Some(5),
2072 },
2073 StoredThreadSubscription {
2075 status: ThreadSubscriptionStatus::Unsubscribed,
2076 bump_stamp: Some(100),
2077 },
2078 ]);
2079
2080 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2082
2083 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2085 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2086 assert_eq!(stored_subscription, Some(*expected_sub));
2087 }
2088
2089 self.upsert_thread_subscriptions(update_subscriptions).await?;
2091
2092 for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2094 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2095 assert_eq!(stored_subscription, Some(*expected_sub));
2096 }
2097
2098 for (room_id, thread_id, _) in &expected_subscriptions {
2100 self.remove_thread_subscription(room_id, thread_id).await?;
2101 }
2102
2103 let initial_subscriptions = build_subscription_updates(&[
2104 StoredThreadSubscription {
2105 status: ThreadSubscriptionStatus::Unsubscribed,
2106 bump_stamp: Some(1),
2107 },
2108 StoredThreadSubscription {
2109 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2110 bump_stamp: Some(1),
2111 },
2112 StoredThreadSubscription {
2113 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2114 bump_stamp: Some(1),
2115 },
2116 ]);
2117
2118 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2119
2120 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2121 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2122 assert_eq!(stored_subscription, Some(*expected_sub));
2123 }
2124
2125 let update_subscriptions = build_subscription_updates(&[
2126 StoredThreadSubscription {
2127 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2128 bump_stamp: Some(2),
2129 },
2130 StoredThreadSubscription {
2131 status: ThreadSubscriptionStatus::Unsubscribed,
2132 bump_stamp: Some(2),
2133 },
2134 StoredThreadSubscription {
2135 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2136 bump_stamp: Some(2),
2137 },
2138 ]);
2139
2140 self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2141
2142 for (room_id, thread_id, expected_sub) in &update_subscriptions {
2143 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2144 assert_eq!(stored_subscription, Some(*expected_sub));
2145 }
2146
2147 Ok(())
2148 }
2149}
2150
2151#[allow(unused_macros, unused_extern_crates)]
2177#[macro_export]
2178macro_rules! statestore_integration_tests {
2179 () => {
2180 mod statestore_integration_tests {
2181 use matrix_sdk_test::{TestResult, async_test};
2182 use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2183
2184 use super::get_store;
2185
2186 #[async_test]
2187 async fn test_topic_redaction() -> TestResult {
2188 let store = get_store().await?.into_state_store();
2189 store.test_topic_redaction().await
2190 }
2191
2192 #[async_test]
2193 async fn test_populate_store() -> TestResult {
2194 let store = get_store().await?.into_state_store();
2195 store.test_populate_store().await
2196 }
2197
2198 #[async_test]
2199 async fn test_member_saving() -> TestResult {
2200 let store = get_store().await?.into_state_store();
2201 store.test_member_saving().await
2202 }
2203
2204 #[async_test]
2205 async fn test_filter_saving() -> TestResult {
2206 let store = get_store().await?.into_state_store();
2207 store.test_filter_saving().await
2208 }
2209
2210 #[async_test]
2211 async fn test_user_avatar_url_saving() -> TestResult {
2212 let store = get_store().await?.into_state_store();
2213 store.test_user_avatar_url_saving().await
2214 }
2215
2216 #[async_test]
2217 async fn test_supported_versions_saving() -> TestResult {
2218 let store = get_store().await?.into_state_store();
2219 store.test_supported_versions_saving().await
2220 }
2221
2222 #[async_test]
2223 async fn test_well_known_saving() -> TestResult {
2224 let store = get_store().await?.into_state_store();
2225 store.test_well_known_saving().await
2226 }
2227
2228 #[async_test]
2229 async fn test_sync_token_saving() -> TestResult {
2230 let store = get_store().await?.into_state_store();
2231 store.test_sync_token_saving().await
2232 }
2233
2234 #[async_test]
2235 async fn test_utd_hook_manager_data_saving() -> TestResult {
2236 let store = get_store().await?.into_state_store();
2237 store.test_utd_hook_manager_data_saving().await
2238 }
2239
2240 #[async_test]
2241 async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2242 let store = get_store().await?.into_state_store();
2243 store.test_one_time_key_already_uploaded_data_saving().await
2244 }
2245
2246 #[async_test]
2247 async fn test_stripped_member_saving() -> TestResult {
2248 let store = get_store().await?.into_state_store();
2249 store.test_stripped_member_saving().await
2250 }
2251
2252 #[async_test]
2253 async fn test_power_level_saving() -> TestResult {
2254 let store = get_store().await?.into_state_store();
2255 store.test_power_level_saving().await
2256 }
2257
2258 #[async_test]
2259 async fn test_receipts_saving() -> TestResult {
2260 let store = get_store().await?.into_state_store();
2261 store.test_receipts_saving().await
2262 }
2263
2264 #[async_test]
2265 async fn test_custom_storage() -> TestResult {
2266 let store = get_store().await?.into_state_store();
2267 store.test_custom_storage().await
2268 }
2269
2270 #[async_test]
2271 async fn test_stripped_non_stripped() -> TestResult {
2272 let store = get_store().await?.into_state_store();
2273 store.test_stripped_non_stripped().await
2274 }
2275
2276 #[async_test]
2277 async fn test_room_removal() -> TestResult {
2278 let store = get_store().await?.into_state_store();
2279 store.test_room_removal().await
2280 }
2281
2282 #[async_test]
2283 async fn test_profile_removal() -> TestResult {
2284 let store = get_store().await?.into_state_store();
2285 store.test_profile_removal().await
2286 }
2287
2288 #[async_test]
2289 async fn test_presence_saving() -> TestResult {
2290 let store = get_store().await?.into_state_store();
2291 store.test_presence_saving().await
2292 }
2293
2294 #[async_test]
2295 async fn test_display_names_saving() -> TestResult {
2296 let store = get_store().await?.into_state_store();
2297 store.test_display_names_saving().await
2298 }
2299
2300 #[async_test]
2301 async fn test_send_queue() -> TestResult {
2302 let store = get_store().await?.into_state_store();
2303 store.test_send_queue().await
2304 }
2305
2306 #[async_test]
2307 async fn test_send_queue_priority() -> TestResult {
2308 let store = get_store().await?.into_state_store();
2309 store.test_send_queue_priority().await
2310 }
2311
2312 #[async_test]
2313 async fn test_send_queue_dependents() -> TestResult {
2314 let store = get_store().await?.into_state_store();
2315 store.test_send_queue_dependents().await
2316 }
2317
2318 #[async_test]
2319 async fn test_update_send_queue_dependent() -> TestResult {
2320 let store = get_store().await?.into_state_store();
2321 store.test_update_send_queue_dependent().await
2322 }
2323
2324 #[async_test]
2325 async fn test_get_room_infos() -> TestResult {
2326 let store = get_store().await?.into_state_store();
2327 store.test_get_room_infos().await
2328 }
2329
2330 #[async_test]
2331 async fn test_thread_subscriptions() -> TestResult {
2332 let store = get_store().await?.into_state_store();
2333 store.test_thread_subscriptions().await
2334 }
2335
2336 #[async_test]
2337 async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2338 let store = get_store().await?.into_state_store();
2339 store.test_thread_subscriptions_bulk_upsert().await
2340 }
2341 }
2342 };
2343}
2344
2345fn user_id() -> &'static UserId {
2346 user_id!("@example:localhost")
2347}
2348
2349fn invited_user_id() -> &'static UserId {
2350 user_id!("@invited:localhost")
2351}
2352
2353fn room_id() -> &'static RoomId {
2354 room_id!("!test:localhost")
2355}
2356
2357fn stripped_room_id() -> &'static RoomId {
2358 room_id!("!stripped:localhost")
2359}
2360
2361fn first_receipt_event_id() -> &'static EventId {
2362 event_id!("$example")
2363}
2364
2365fn topic_event_id() -> &'static EventId {
2366 event_id!("$topic_event")
2367}
2368
2369fn power_level_event() -> Raw<AnySyncStateEvent> {
2370 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2371
2372 let event = json!({
2373 "event_id": "$h29iv0s8:example.com",
2374 "content": content,
2375 "sender": user_id(),
2376 "type": "m.room.power_levels",
2377 "origin_server_ts": 0u64,
2378 "state_key": "",
2379 });
2380
2381 serde_json::from_value(event).unwrap()
2382}
2383
2384fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2385 custom_stripped_membership_event(user_id())
2386}
2387
2388fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2389 let ev_json = json!({
2390 "type": "m.room.member",
2391 "content": RoomMemberEventContent::new(MembershipState::Join),
2392 "sender": user_id,
2393 "state_key": user_id,
2394 });
2395
2396 Raw::new(&ev_json).unwrap().cast_unchecked()
2397}
2398
2399fn membership_event() -> Raw<SyncRoomMemberEvent> {
2400 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2401}
2402
2403fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2404 let ev_json = json!({
2405 "type": "m.room.member",
2406 "content": RoomMemberEventContent::new(MembershipState::Join),
2407 "event_id": event_id,
2408 "origin_server_ts": 198,
2409 "sender": user_id,
2410 "state_key": user_id,
2411 });
2412
2413 Raw::new(&ev_json).unwrap().cast_unchecked()
2414}
2415
2416fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2417 let ev_json = json!({
2418 "content": {
2419 "presence": "online"
2420 },
2421 "sender": user_id,
2422 });
2423
2424 Raw::new(&ev_json).unwrap().cast_unchecked()
2425}