1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11 api::{
12 FeatureFlag, MatrixVersion,
13 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14 },
15 event_id,
16 events::{
17 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19 RoomAccountDataEventType, StateEventType, SyncStateEvent,
20 presence::PresenceEvent,
21 receipt::{ReceiptThread, ReceiptType},
22 room::{
23 member::{
24 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25 SyncRoomMemberEvent,
26 },
27 message::RoomMessageEventContent,
28 power_levels::RoomPowerLevelsEventContent,
29 topic::RoomTopicEventContent,
30 },
31 },
32 owned_event_id, owned_mxc_uri,
33 push::Ruleset,
34 room_id,
35 room_version_rules::AuthorizationRules,
36 serde::Raw,
37 uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
43 SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47 deserialized_responses::MemberEvent,
48 store::{
49 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50 StoredThreadSubscription, ThreadSubscriptionStatus,
51 },
52};
53
54#[allow(async_fn_in_trait)]
59pub trait StateStoreIntegrationTests {
60 async fn populate(&self) -> TestResult;
62 async fn test_topic_redaction(&self) -> TestResult;
64 async fn test_populate_store(&self) -> TestResult;
66 async fn test_member_saving(&self) -> TestResult;
68 async fn test_filter_saving(&self) -> TestResult;
70 async fn test_user_avatar_url_saving(&self) -> TestResult;
72 async fn test_sync_token_saving(&self) -> TestResult;
74 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
76 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
78 async fn test_stripped_member_saving(&self) -> TestResult;
80 async fn test_power_level_saving(&self) -> TestResult;
82 async fn test_receipts_saving(&self) -> TestResult;
84 async fn test_custom_storage(&self) -> TestResult;
86 async fn test_stripped_non_stripped(&self) -> TestResult;
88 async fn test_room_removal(&self) -> TestResult;
90 async fn test_profile_removal(&self) -> TestResult;
92 async fn test_presence_saving(&self) -> TestResult;
94 async fn test_display_names_saving(&self) -> TestResult;
96 async fn test_send_queue(&self) -> TestResult;
98 async fn test_send_queue_priority(&self) -> TestResult;
100 async fn test_send_queue_dependents(&self) -> TestResult;
102 async fn test_update_send_queue_dependent(&self) -> TestResult;
104 async fn test_supported_versions_saving(&self) -> TestResult;
106 async fn test_well_known_saving(&self) -> TestResult;
108 async fn test_get_room_infos(&self) -> TestResult;
110 async fn test_thread_subscriptions(&self) -> TestResult;
112 async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
114 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
116}
117
118impl StateStoreIntegrationTests for DynStateStore {
119 async fn populate(&self) -> TestResult {
120 let f = EventFactory::new();
121 let mut changes = StateChanges::default();
122
123 let user_id = user_id();
124 let invited_user_id = invited_user_id();
125 let room_id = room_id();
126 let stripped_room_id = stripped_room_id();
127
128 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
129
130 let presence_json: &JsonValue = &test_json::PRESENCE;
131 let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
132 let presence_event = presence_raw.deserialize()?;
133 changes.add_presence_event(presence_event, presence_raw);
134
135 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
136 f.push_rules(Ruleset::server_default(user_id)).into();
137 let pushrules_event = pushrules_raw.deserialize()?;
138 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
139
140 let mut room = RoomInfo::new(room_id, RoomState::Joined);
141 room.mark_as_left();
142
143 let tag_json: &JsonValue = &test_json::TAG;
144 let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
145 let tag_event = tag_raw.deserialize()?;
146 changes.add_room_account_data(room_id, tag_event, tag_raw);
147
148 let name_json: &JsonValue = &test_json::NAME;
149 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
150 let name_event = name_raw.deserialize()?;
151 room.handle_state_event(&name_event);
152 changes.add_state_event(room_id, name_event, name_raw);
153
154 let topic_json: &JsonValue = &test_json::TOPIC;
155 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
156 let topic_event = topic_raw.deserialize()?;
157 room.handle_state_event(&topic_event);
158 changes.add_state_event(room_id, topic_event, topic_raw);
159
160 let mut room_ambiguity_map = HashMap::new();
161 let mut room_profiles = BTreeMap::new();
162
163 let member_json: &JsonValue = &test_json::MEMBER;
164 let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
165 let displayname = DisplayName::new(
166 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
167 );
168 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
169 room_profiles.insert(user_id.to_owned(), (&member_event).into());
170
171 let member_state_raw =
172 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
173 let member_state_event = member_state_raw.deserialize()?;
174 changes.add_state_event(room_id, member_state_event, member_state_raw);
175
176 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
177 let invited_member_event: SyncRoomMemberEvent =
179 serde_json::from_value(invited_member_json.clone())?;
180 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
181 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
182
183 let invited_member_state_raw =
184 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
185 let invited_member_state_event = invited_member_state_raw.deserialize()?;
186 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
187
188 let receipt_content = f
189 .room(room_id)
190 .read_receipts()
191 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
192 .into_content();
193 changes.add_receipts(room_id, receipt_content);
194
195 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
196 changes.profiles.insert(room_id.to_owned(), room_profiles);
197 changes.add_room(room);
198
199 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
200
201 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
202 let stripped_name_raw =
203 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
204 let stripped_name_event = stripped_name_raw.deserialize()?;
205 stripped_room.handle_stripped_state_event(&stripped_name_event);
206 changes.stripped_state.insert(
207 stripped_room_id.to_owned(),
208 BTreeMap::from([(
209 stripped_name_event.event_type(),
210 BTreeMap::from([(
211 stripped_name_event.state_key().to_owned(),
212 stripped_name_raw.clone(),
213 )]),
214 )]),
215 );
216
217 changes.add_room(stripped_room);
218
219 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
220 let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
221 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
222
223 self.save_changes(&changes).await?;
224
225 Ok(())
226 }
227
228 async fn test_topic_redaction(&self) -> TestResult {
229 let room_id = room_id();
230 self.populate().await?;
231
232 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
233 assert_eq!(
234 self.get_state_event_static::<RoomTopicEventContent>(room_id)
235 .await?
236 .expect("room topic found before redaction")
237 .deserialize()
238 .expect("can deserialize room topic before redaction")
239 .as_sync()
240 .expect("room topic is a sync state event")
241 .as_original()
242 .expect("room topic is not redacted yet")
243 .content
244 .topic,
245 "😀"
246 );
247
248 let mut changes = StateChanges::default();
249
250 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
251 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
252 .expect("topic redaction event making works");
253 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
254
255 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
256 self.save_changes(&changes).await?;
257
258 let redacted_event = self
259 .get_state_event_static::<RoomTopicEventContent>(room_id)
260 .await?
261 .expect("room topic found after redaction")
262 .deserialize()
263 .expect("can deserialize room topic after redaction");
264
265 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
266
267 Ok(())
268 }
269
270 async fn test_populate_store(&self) -> TestResult {
271 let room_id = room_id();
272 let user_id = user_id();
273 let display_name = DisplayName::new("example");
274
275 self.populate().await?;
276
277 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
278 assert!(self.get_presence_event(user_id).await?.is_some());
279 assert_eq!(
280 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
281 2,
282 "Expected to find 2 room infos"
283 );
284 assert!(
285 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
286 );
287
288 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
289 assert_eq!(
290 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
291 1,
292 "Expected to find 1 room topic"
293 );
294 assert!(self.get_profile(room_id, user_id).await?.is_some());
295 assert!(self.get_member_event(room_id, user_id).await?.is_some());
296 assert_eq!(
297 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
298 2,
299 "Expected to find 2 members for room"
300 );
301 assert_eq!(
302 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
303 1,
304 "Expected to find 1 invited user ids"
305 );
306 assert_eq!(
307 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
308 1,
309 "Expected to find 1 joined user ids"
310 );
311 assert_eq!(
312 self.get_users_with_display_name(room_id, &display_name).await?.len(),
313 2,
314 "Expected to find 2 display names for room"
315 );
316 assert!(
317 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
318 .await?
319 .is_some()
320 );
321 assert!(
322 self.get_user_room_receipt_event(
323 room_id,
324 ReceiptType::Read,
325 ReceiptThread::Unthreaded,
326 user_id
327 )
328 .await?
329 .is_some()
330 );
331 assert_eq!(
332 self.get_event_room_receipt_events(
333 room_id,
334 ReceiptType::Read,
335 ReceiptThread::Unthreaded,
336 first_receipt_event_id()
337 )
338 .await?
339 .len(),
340 1,
341 "Expected to find 1 read receipt"
342 );
343 Ok(())
344 }
345
346 async fn test_member_saving(&self) -> TestResult {
347 let room_id = room_id!("!test_member_saving:localhost");
348 let user_id = user_id();
349 let second_user_id = user_id!("@second:localhost");
350 let third_user_id = user_id!("@third:localhost");
351 let unknown_user_id = user_id!("@unknown:localhost");
352
353 let mut user_ids = vec![user_id.to_owned()];
355 assert!(self.get_member_event(room_id, user_id).await?.is_none());
356 let member_events = self
357 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
358 .await;
359 assert!(member_events?.is_empty());
360 assert!(self.get_profile(room_id, user_id).await?.is_none());
361 let profiles = self.get_profiles(room_id, &user_ids).await;
362 assert!(profiles?.is_empty());
363
364 let mut changes = StateChanges::default();
366 let raw_member_event = membership_event();
367 let profile = raw_member_event.deserialize()?.into();
368 changes
369 .state
370 .entry(room_id.to_owned())
371 .or_default()
372 .entry(StateEventType::RoomMember)
373 .or_default()
374 .insert(user_id.into(), raw_member_event.cast());
375 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
376 self.save_changes(&changes).await?;
377
378 assert!(self.get_member_event(room_id, user_id).await?.is_some());
379 let member_events = self
380 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
381 .await;
382 assert_eq!(member_events?.len(), 1);
383 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
384 assert_eq!(members.len(), 1, "We expected to find members for the room");
385 assert!(self.get_profile(room_id, user_id).await?.is_some());
386 let profiles = self.get_profiles(room_id, &user_ids).await;
387 assert_eq!(profiles?.len(), 1);
388
389 let mut changes = StateChanges::default();
391 let changes_members = changes
392 .state
393 .entry(room_id.to_owned())
394 .or_default()
395 .entry(StateEventType::RoomMember)
396 .or_default();
397 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
398 let raw_second_member_event =
399 custom_membership_event(second_user_id, event_id!("$second_member_event"));
400 let second_profile = raw_second_member_event.deserialize()?.into();
401 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
402 changes_profiles.insert(second_user_id.to_owned(), second_profile);
403 let raw_third_member_event =
404 custom_membership_event(third_user_id, event_id!("$third_member_event"));
405 let third_profile = raw_third_member_event.deserialize()?.into();
406 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
407 changes_profiles.insert(third_user_id.to_owned(), third_profile);
408 self.save_changes(&changes).await?;
409
410 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
411 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
412 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
413 let member_events = self
414 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
415 .await;
416 assert_eq!(member_events?.len(), 3);
417 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
418 assert_eq!(members.len(), 3, "We expected to find members for the room");
419 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
420 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
421 let profiles = self.get_profiles(room_id, &user_ids).await;
422 assert_eq!(profiles?.len(), 3);
423
424 user_ids.push(unknown_user_id.to_owned());
426 let member_events = self
427 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
428 .await;
429 assert_eq!(member_events?.len(), 3);
430 let profiles = self.get_profiles(room_id, &user_ids).await;
431 assert_eq!(profiles?.len(), 3);
432
433 let member_events = self
435 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
436 room_id,
437 &[],
438 )
439 .await;
440 assert!(member_events?.is_empty());
441 let profiles = self.get_profiles(room_id, &[]).await;
442 assert!(profiles?.is_empty());
443
444 Ok(())
445 }
446
447 async fn test_filter_saving(&self) -> TestResult {
448 let filter_name = "filter_name";
449 let filter_id = "filter_id_1234";
450
451 self.set_kv_data(
452 StateStoreDataKey::Filter(filter_name),
453 StateStoreDataValue::Filter(filter_id.to_owned()),
454 )
455 .await?;
456 assert_let!(
457 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
458 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
459 );
460 assert_eq!(stored_filter_id, filter_id);
461
462 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
463 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
464
465 Ok(())
466 }
467
468 async fn test_user_avatar_url_saving(&self) -> TestResult {
469 let user_id = user_id!("@alice:example.org");
470 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
471
472 self.set_kv_data(
473 StateStoreDataKey::UserAvatarUrl(user_id),
474 StateStoreDataValue::UserAvatarUrl(url.clone()),
475 )
476 .await?;
477
478 assert_let!(
479 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
480 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
481 );
482 assert_eq!(stored_url, url);
483
484 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
485 assert_matches!(
486 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
487 Ok(None)
488 );
489
490 Ok(())
491 }
492
493 async fn test_supported_versions_saving(&self) -> TestResult {
494 let versions =
495 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
496 let supported_versions = SupportedVersionsResponse {
497 versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
498 unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
499 };
500
501 self.set_kv_data(
502 StateStoreDataKey::SupportedVersions,
503 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
504 )
505 .await?;
506
507 assert_let!(
508 Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
509 self.get_kv_data(StateStoreDataKey::SupportedVersions).await
510 );
511 assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
512 assert_eq!(supported_versions, stored_supported_versions);
513
514 let stored_supported = stored_supported_versions.supported_versions();
515 assert_eq!(stored_supported.versions, versions);
516 assert_eq!(stored_supported.features.len(), 1);
517 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
518
519 self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
520 assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
521
522 Ok(())
523 }
524
525 async fn test_well_known_saving(&self) -> TestResult {
526 let well_known = WellKnownResponse {
527 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
528 identity_server: None,
529 tile_server: None,
530 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
531 };
532
533 self.set_kv_data(
534 StateStoreDataKey::WellKnown,
535 StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
536 )
537 .await?;
538
539 assert_let!(
540 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
541 self.get_kv_data(StateStoreDataKey::WellKnown).await
542 );
543 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
544 assert_eq!(stored_well_known, Some(well_known));
545
546 self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
547 assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
548
549 self.set_kv_data(
550 StateStoreDataKey::WellKnown,
551 StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
552 )
553 .await?;
554
555 assert_let!(
556 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
557 self.get_kv_data(StateStoreDataKey::WellKnown).await
558 );
559 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
560 assert_eq!(stored_well_known, None);
561
562 Ok(())
563 }
564
565 async fn test_sync_token_saving(&self) -> TestResult {
566 let sync_token_1 = "t392-516_47314_0_7_1";
567 let sync_token_2 = "t392-516_47314_0_7_2";
568
569 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
570
571 let changes =
572 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
573 self.save_changes(&changes).await?;
574 assert_let!(
575 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
576 self.get_kv_data(StateStoreDataKey::SyncToken).await
577 );
578 assert_eq!(stored_sync_token, sync_token_1);
579
580 self.set_kv_data(
581 StateStoreDataKey::SyncToken,
582 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
583 )
584 .await?;
585 assert_let!(
586 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
587 self.get_kv_data(StateStoreDataKey::SyncToken).await
588 );
589 assert_eq!(stored_sync_token, sync_token_2);
590
591 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
592 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
593
594 Ok(())
595 }
596
597 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
598 assert!(
600 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
601 .await
602 .expect("Could not read data")
603 .is_none(),
604 "Store was not empty at start"
605 );
606
607 let data = GrowableBloomBuilder::new().build();
609 self.set_kv_data(
610 StateStoreDataKey::UtdHookManagerData,
611 StateStoreDataValue::UtdHookManagerData(data.clone()),
612 )
613 .await
614 .expect("Could not save data");
615
616 let read_data = self
618 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
619 .await
620 .expect("Could not read data")
621 .expect("no data found")
622 .into_utd_hook_manager_data()
623 .expect("not UtdHookManagerData");
624
625 assert_eq!(read_data, data);
626
627 Ok(())
628 }
629
630 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
631 assert!(
633 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
634 "Store was not empty at start"
635 );
636
637 self.set_kv_data(
638 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
639 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
640 )
641 .await?;
642
643 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
644 data.expect("The loaded data should be Some");
645
646 Ok(())
647 }
648
649 async fn test_stripped_member_saving(&self) -> TestResult {
650 let room_id = room_id!("!test_stripped_member_saving:localhost");
651 let user_id = user_id();
652 let second_user_id = user_id!("@second:localhost");
653 let third_user_id = user_id!("@third:localhost");
654 let unknown_user_id = user_id!("@unknown:localhost");
655
656 assert!(self.get_member_event(room_id, user_id).await?.is_none());
658 let member_events = self
659 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
660 room_id,
661 &[user_id.to_owned()],
662 )
663 .await;
664 assert!(member_events?.is_empty());
665
666 let mut changes = StateChanges::default();
668 changes
669 .stripped_state
670 .entry(room_id.to_owned())
671 .or_default()
672 .entry(StateEventType::RoomMember)
673 .or_default()
674 .insert(user_id.into(), stripped_membership_event().cast());
675 self.save_changes(&changes).await?;
676
677 assert!(self.get_member_event(room_id, user_id).await?.is_some());
678 let member_events = self
679 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
680 room_id,
681 &[user_id.to_owned()],
682 )
683 .await;
684 assert_eq!(member_events?.len(), 1);
685 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
686 assert_eq!(members.len(), 1, "We expected to find members for the room");
687
688 let mut changes = StateChanges::default();
690 let changes_members = changes
691 .stripped_state
692 .entry(room_id.to_owned())
693 .or_default()
694 .entry(StateEventType::RoomMember)
695 .or_default();
696 changes_members
697 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
698 changes_members
699 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
700 self.save_changes(&changes).await?;
701
702 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
703 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
704 let member_events = self
705 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
706 room_id,
707 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
708 )
709 .await;
710 assert_eq!(member_events?.len(), 3);
711 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
712 assert_eq!(members.len(), 3, "We expected to find members for the room");
713
714 let member_events = self
716 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
717 room_id,
718 &[
719 user_id.to_owned(),
720 second_user_id.to_owned(),
721 third_user_id.to_owned(),
722 unknown_user_id.to_owned(),
723 ],
724 )
725 .await;
726 assert_eq!(member_events?.len(), 3);
727
728 let member_events = self
730 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
731 room_id,
732 &[],
733 )
734 .await;
735 assert!(member_events?.is_empty());
736
737 Ok(())
738 }
739
740 async fn test_power_level_saving(&self) -> TestResult {
741 let room_id = room_id!("!test_power_level_saving:localhost");
742
743 let raw_event = power_level_event();
744 let event = raw_event.deserialize()?;
745
746 assert!(
747 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
748 );
749 let mut changes = StateChanges::default();
750 changes.add_state_event(room_id, event, raw_event);
751
752 self.save_changes(&changes).await?;
753 assert!(
754 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
755 );
756
757 Ok(())
758 }
759
760 async fn test_receipts_saving(&self) -> TestResult {
761 let room_id = room_id!("!test_receipts_saving:localhost");
762
763 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
764 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
765
766 let first_receipt_ts = uint!(1436451550);
767 let second_receipt_ts = uint!(1436451653);
768 let third_receipt_ts = uint!(1436474532);
769
770 let first_receipt_event = serde_json::from_value(json!({
771 first_event_id: {
772 "m.read": {
773 user_id(): {
774 "ts": first_receipt_ts,
775 }
776 }
777 }
778 }))?;
779
780 let second_receipt_event = serde_json::from_value(json!({
781 second_event_id: {
782 "m.read": {
783 user_id(): {
784 "ts": second_receipt_ts,
785 }
786 }
787 }
788 }))?;
789
790 let third_receipt_event = serde_json::from_value(json!({
791 second_event_id: {
792 "m.read": {
793 user_id(): {
794 "ts": third_receipt_ts,
795 "thread_id": "main",
796 }
797 }
798 }
799 }))?;
800
801 assert!(
802 self.get_user_room_receipt_event(
803 room_id,
804 ReceiptType::Read,
805 ReceiptThread::Unthreaded,
806 user_id()
807 )
808 .await
809 .expect("failed to read unthreaded user room receipt")
810 .is_none()
811 );
812 assert!(
813 self.get_event_room_receipt_events(
814 room_id,
815 ReceiptType::Read,
816 ReceiptThread::Unthreaded,
817 first_event_id
818 )
819 .await
820 .expect("failed to read unthreaded event room receipt for 1")
821 .is_empty()
822 );
823 assert!(
824 self.get_event_room_receipt_events(
825 room_id,
826 ReceiptType::Read,
827 ReceiptThread::Unthreaded,
828 second_event_id
829 )
830 .await
831 .expect("failed to read unthreaded event room receipt for 2")
832 .is_empty()
833 );
834
835 let mut changes = StateChanges::default();
836 changes.add_receipts(room_id, first_receipt_event);
837
838 self.save_changes(&changes).await?;
839 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
840 .get_user_room_receipt_event(
841 room_id,
842 ReceiptType::Read,
843 ReceiptThread::Unthreaded,
844 user_id(),
845 )
846 .await
847 .expect("failed to read unthreaded user room receipt after save")
848 .unwrap();
849 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
850 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
851 let first_event_unthreaded_receipts = self
852 .get_event_room_receipt_events(
853 room_id,
854 ReceiptType::Read,
855 ReceiptThread::Unthreaded,
856 first_event_id,
857 )
858 .await
859 .expect("failed to read unthreaded event room receipt for 1 after save");
860 assert_eq!(
861 first_event_unthreaded_receipts.len(),
862 1,
863 "Found a wrong number of unthreaded receipts for 1 after save"
864 );
865 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
866 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
867 assert!(
868 self.get_event_room_receipt_events(
869 room_id,
870 ReceiptType::Read,
871 ReceiptThread::Unthreaded,
872 second_event_id
873 )
874 .await
875 .expect("failed to read unthreaded event room receipt for 2 after save")
876 .is_empty()
877 );
878
879 let mut changes = StateChanges::default();
880 changes.add_receipts(room_id, second_receipt_event);
881
882 self.save_changes(&changes).await.expect("Saving works");
883 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
884 .get_user_room_receipt_event(
885 room_id,
886 ReceiptType::Read,
887 ReceiptThread::Unthreaded,
888 user_id(),
889 )
890 .await
891 .expect("Getting unthreaded user room receipt after save failed")
892 .unwrap();
893 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
894 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
895 assert!(
896 self.get_event_room_receipt_events(
897 room_id,
898 ReceiptType::Read,
899 ReceiptThread::Unthreaded,
900 first_event_id
901 )
902 .await
903 .expect("Getting unthreaded event room receipt events for first event failed")
904 .is_empty()
905 );
906 let second_event_unthreaded_receipts = self
907 .get_event_room_receipt_events(
908 room_id,
909 ReceiptType::Read,
910 ReceiptThread::Unthreaded,
911 second_event_id,
912 )
913 .await
914 .expect("Getting unthreaded event room receipt events for second event failed");
915 assert_eq!(
916 second_event_unthreaded_receipts.len(),
917 1,
918 "Found a wrong number of unthreaded receipts for second event after save"
919 );
920 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
921 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
922
923 assert!(
924 self.get_user_room_receipt_event(
925 room_id,
926 ReceiptType::Read,
927 ReceiptThread::Main,
928 user_id()
929 )
930 .await
931 .expect("failed to read threaded user room receipt")
932 .is_none()
933 );
934 assert!(
935 self.get_event_room_receipt_events(
936 room_id,
937 ReceiptType::Read,
938 ReceiptThread::Main,
939 second_event_id
940 )
941 .await
942 .expect("Getting threaded event room receipts for 2 failed")
943 .is_empty()
944 );
945
946 let mut changes = StateChanges::default();
947 changes.add_receipts(room_id, third_receipt_event);
948
949 self.save_changes(&changes).await.expect("Saving works");
950 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
952 .get_user_room_receipt_event(
953 room_id,
954 ReceiptType::Read,
955 ReceiptThread::Unthreaded,
956 user_id(),
957 )
958 .await
959 .expect("Getting unthreaded user room receipt after save failed")
960 .unwrap();
961 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
962 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
963 let second_event_unthreaded_receipts = self
964 .get_event_room_receipt_events(
965 room_id,
966 ReceiptType::Read,
967 ReceiptThread::Unthreaded,
968 second_event_id,
969 )
970 .await
971 .expect("Getting unthreaded event room receipt events for second event failed");
972 assert_eq!(
973 second_event_unthreaded_receipts.len(),
974 1,
975 "Found a wrong number of unthreaded receipts for second event after save"
976 );
977 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
978 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
979 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
981 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
982 .await
983 .expect("Getting threaded user room receipt after save failed")
984 .unwrap();
985 assert_eq!(threaded_user_receipt_event_id, second_event_id);
986 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
987 let second_event_threaded_receipts = self
988 .get_event_room_receipt_events(
989 room_id,
990 ReceiptType::Read,
991 ReceiptThread::Main,
992 second_event_id,
993 )
994 .await
995 .expect("Getting threaded event room receipt events for second event failed");
996 assert_eq!(
997 second_event_threaded_receipts.len(),
998 1,
999 "Found a wrong number of threaded receipts for second event after save"
1000 );
1001 assert_eq!(second_event_threaded_receipts[0].0, user_id());
1002 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1003
1004 Ok(())
1005 }
1006
1007 async fn test_custom_storage(&self) -> TestResult {
1008 let key = "my_key";
1009 let value = &[0, 1, 2, 3];
1010
1011 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1012
1013 let read = self.get_custom_value(key.as_bytes()).await?;
1014
1015 assert_eq!(Some(value.as_ref()), read.as_deref());
1016
1017 Ok(())
1018 }
1019
1020 async fn test_stripped_non_stripped(&self) -> TestResult {
1021 let room_id = room_id!("!test_stripped_non_stripped:localhost");
1022 let user_id = user_id();
1023
1024 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1025 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1026
1027 let mut changes = StateChanges::default();
1028 changes
1029 .state
1030 .entry(room_id.to_owned())
1031 .or_default()
1032 .entry(StateEventType::RoomMember)
1033 .or_default()
1034 .insert(user_id.into(), membership_event().cast());
1035 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1036 self.save_changes(&changes).await?;
1037
1038 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1039 assert!(matches!(member_event, MemberEvent::Sync(_)));
1040 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1041
1042 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1043 assert_eq!(members, vec![user_id.to_owned()]);
1044
1045 let mut changes = StateChanges::default();
1046 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1047 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1048 self.save_changes(&changes).await?;
1049
1050 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1051 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1052 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1053
1054 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1055 assert_eq!(members, vec![user_id.to_owned()]);
1056
1057 Ok(())
1058 }
1059
1060 async fn test_room_removal(&self) -> TestResult {
1061 let room_id = room_id();
1062 let user_id = user_id();
1063 let display_name = DisplayName::new("example");
1064 let stripped_room_id = stripped_room_id();
1065
1066 self.populate().await?;
1067
1068 {
1069 let txn = TransactionId::new();
1071 let ev =
1072 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1073 self.save_send_queue_request(
1074 room_id,
1075 txn.clone(),
1076 MilliSecondsSinceUnixEpoch::now(),
1077 ev.into(),
1078 0,
1079 )
1080 .await?;
1081
1082 self.save_dependent_queued_request(
1084 room_id,
1085 &txn,
1086 ChildTransactionId::new(),
1087 MilliSecondsSinceUnixEpoch::now(),
1088 DependentQueuedRequestKind::RedactEvent,
1089 )
1090 .await?;
1091 }
1092
1093 self.remove_room(room_id).await?;
1094
1095 assert_eq!(
1096 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1097 1,
1098 "room is still there"
1099 );
1100
1101 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1102 assert!(
1103 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1104 "still state events found"
1105 );
1106 assert!(self.get_profile(room_id, user_id).await?.is_none());
1107 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1108 assert!(
1109 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1110 "still user ids found"
1111 );
1112 assert!(
1113 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1114 "still invited user ids found"
1115 );
1116 assert!(
1117 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1118 "still joined users found"
1119 );
1120 assert!(
1121 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1122 "still display names found"
1123 );
1124 assert!(
1125 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1126 .await?
1127 .is_none()
1128 );
1129 assert!(
1130 self.get_user_room_receipt_event(
1131 room_id,
1132 ReceiptType::Read,
1133 ReceiptThread::Unthreaded,
1134 user_id
1135 )
1136 .await?
1137 .is_none()
1138 );
1139 assert!(
1140 self.get_event_room_receipt_events(
1141 room_id,
1142 ReceiptType::Read,
1143 ReceiptThread::Unthreaded,
1144 first_receipt_event_id()
1145 )
1146 .await?
1147 .is_empty(),
1148 "still event recepts in the store"
1149 );
1150 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1151 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1152
1153 self.remove_room(stripped_room_id).await?;
1154
1155 assert!(
1156 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1157 "still room info found"
1158 );
1159 Ok(())
1160 }
1161
1162 async fn test_profile_removal(&self) -> TestResult {
1163 let room_id = room_id();
1164
1165 let user_id = user_id();
1167 let invited_user_id = invited_user_id();
1168
1169 self.populate().await?;
1170
1171 let new_invite_member_json = json!({
1172 "content": {
1173 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1174 "displayname": "example after update",
1175 "membership": "invite",
1176 "reason": "Looking for support"
1177 },
1178 "event_id": "$143273582443PhrSm:localhost",
1179 "origin_server_ts": 1432735824,
1180 "room_id": room_id,
1181 "sender": user_id,
1182 "state_key": invited_user_id,
1183 "type": "m.room.member",
1184 });
1185 let new_invite_member_event: SyncRoomMemberEvent =
1186 serde_json::from_value(new_invite_member_json.clone())?;
1187
1188 let mut changes = StateChanges {
1189 profiles_to_delete: [(
1191 room_id.to_owned(),
1192 vec![user_id.to_owned(), invited_user_id.to_owned()],
1193 )]
1194 .into(),
1195
1196 profiles: {
1198 let mut map = BTreeMap::default();
1199 map.insert(
1200 room_id.to_owned(),
1201 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1202 .into_iter()
1203 .collect(),
1204 );
1205 map
1206 },
1207
1208 ..StateChanges::default()
1209 };
1210
1211 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1212 .expect("can create sync-state-event for topic");
1213 let event = raw.deserialize()?;
1214 changes.add_state_event(room_id, event, raw);
1215
1216 self.save_changes(&changes).await?;
1217
1218 assert!(self.get_profile(room_id, user_id).await?.is_none());
1220 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1221
1222 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1224 assert_eq!(
1225 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1226 Some("example after update")
1227 );
1228 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1229
1230 Ok(())
1231 }
1232
1233 async fn test_presence_saving(&self) -> TestResult {
1234 let user_id = user_id();
1235 let second_user_id = user_id!("@second:localhost");
1236 let third_user_id = user_id!("@third:localhost");
1237 let unknown_user_id = user_id!("@unknown:localhost");
1238
1239 let mut user_ids = vec![user_id.to_owned()];
1241 let presence_event = self.get_presence_event(user_id).await;
1242 assert!(presence_event?.is_none());
1243 let presence_events = self.get_presence_events(&user_ids).await;
1244 assert!(presence_events?.is_empty());
1245
1246 let mut changes = StateChanges::default();
1248 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1249 self.save_changes(&changes).await?;
1250
1251 let presence_event = self.get_presence_event(user_id).await;
1252 assert!(presence_event?.is_some());
1253 let presence_events = self.get_presence_events(&user_ids).await;
1254 assert_eq!(presence_events?.len(), 1);
1255
1256 let mut changes = StateChanges::default();
1258 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1259 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1260 self.save_changes(&changes).await?;
1261
1262 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1263 let presence_event = self.get_presence_event(second_user_id).await;
1264 assert!(presence_event?.is_some());
1265 let presence_event = self.get_presence_event(third_user_id).await;
1266 assert!(presence_event?.is_some());
1267 let presence_events = self.get_presence_events(&user_ids).await;
1268 assert_eq!(presence_events?.len(), 3);
1269
1270 user_ids.push(unknown_user_id.to_owned());
1272 let member_events = self.get_presence_events(&user_ids).await;
1273 assert_eq!(member_events?.len(), 3);
1274
1275 let presence_events = self.get_presence_events(&[]).await;
1277 assert!(presence_events?.is_empty());
1278
1279 Ok(())
1280 }
1281
1282 async fn test_display_names_saving(&self) -> TestResult {
1283 let room_id = room_id!("!test_display_names_saving:localhost");
1284 let user_id = user_id();
1285 let user_display_name = DisplayName::new("User");
1286 let second_user_id = user_id!("@second:localhost");
1287 let third_user_id = user_id!("@third:localhost");
1288 let other_display_name = DisplayName::new("Raoul");
1289 let unknown_display_name = DisplayName::new("Unknown");
1290
1291 let mut display_names = vec![user_display_name.to_owned()];
1293 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1294 assert!(users.is_empty());
1295 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1296 assert!(names.is_empty());
1297
1298 let mut changes = StateChanges::default();
1300 changes
1301 .ambiguity_maps
1302 .entry(room_id.to_owned())
1303 .or_default()
1304 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1305 self.save_changes(&changes).await?;
1306
1307 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1308 assert_eq!(users.len(), 1);
1309 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1310 assert_eq!(names.len(), 1);
1311 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1312
1313 let mut changes = StateChanges::default();
1315 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1316 other_display_name.to_owned(),
1317 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1318 );
1319 self.save_changes(&changes).await?;
1320
1321 display_names.push(other_display_name.to_owned());
1322 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1323 assert_eq!(users.len(), 1);
1324 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1325 assert_eq!(users.len(), 2);
1326 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1327 assert_eq!(names.len(), 2);
1328 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1329 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1330
1331 display_names.push(unknown_display_name.to_owned());
1333 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1334 assert_eq!(names.len(), 2);
1335
1336 let names = self.get_users_with_display_names(room_id, &[]).await?;
1338 assert!(names.is_empty());
1339
1340 Ok(())
1341 }
1342
1343 #[allow(clippy::needless_range_loop)]
1344 async fn test_send_queue(&self) -> TestResult {
1345 let room_id = room_id!("!test_send_queue:localhost");
1346
1347 let events = self.load_send_queue_requests(room_id).await?;
1349 assert!(events.is_empty());
1350
1351 let txn0 = TransactionId::new();
1353 let event0 =
1354 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1355 self.save_send_queue_request(
1356 room_id,
1357 txn0.clone(),
1358 MilliSecondsSinceUnixEpoch::now(),
1359 event0.into(),
1360 0,
1361 )
1362 .await?;
1363
1364 let pending = self.load_send_queue_requests(room_id).await?;
1366
1367 assert_eq!(pending.len(), 1);
1368 {
1369 assert_eq!(pending[0].transaction_id, txn0);
1370
1371 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1372 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1373 assert_eq!(content.body(), "msg0");
1374
1375 assert!(!pending[0].is_wedged());
1376 }
1377
1378 for i in 1..=3 {
1380 let txn = TransactionId::new();
1381 let event = SerializableEventContent::new(
1382 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1383 )?;
1384
1385 self.save_send_queue_request(
1386 room_id,
1387 txn,
1388 MilliSecondsSinceUnixEpoch::now(),
1389 event.into(),
1390 0,
1391 )
1392 .await?;
1393 }
1394
1395 let pending = self.load_send_queue_requests(room_id).await?;
1397
1398 assert_eq!(pending.len(), 4);
1400
1401 assert_eq!(pending[0].transaction_id, txn0);
1402
1403 for i in 0..4 {
1404 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1405 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1406 assert_eq!(content.body(), format!("msg{i}"));
1407 assert!(!pending[i].is_wedged());
1408 }
1409
1410 let txn2 = &pending[2].transaction_id;
1412 self.update_send_queue_request_status(
1413 room_id,
1414 txn2,
1415 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1416 )
1417 .await?;
1418
1419 let pending = self.load_send_queue_requests(room_id).await?;
1421
1422 assert_eq!(pending.len(), 4);
1424 assert_eq!(pending[0].transaction_id, txn0);
1425 assert_eq!(pending[2].transaction_id, *txn2);
1426 assert!(pending[2].is_wedged());
1427 let error = pending[2].clone().error.unwrap();
1428 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1429 assert_eq!(generic_error, "Oops");
1430 for i in 0..4 {
1431 if i != 2 {
1432 assert!(!pending[i].is_wedged());
1433 }
1434 }
1435
1436 let event0 = SerializableEventContent::new(
1438 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1439 )?;
1440 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1441
1442 let pending = self.load_send_queue_requests(room_id).await?;
1444
1445 assert_eq!(pending.len(), 4);
1446 {
1447 assert_eq!(pending[2].transaction_id, *txn2);
1448
1449 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1450 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1451 assert_eq!(content.body(), "wow that's a cool test");
1452
1453 assert!(!pending[2].is_wedged());
1454
1455 for i in 0..4 {
1456 if i != 2 {
1457 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1458 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1459 assert_eq!(content.body(), format!("msg{i}"));
1460
1461 assert!(!pending[i].is_wedged());
1462 }
1463 }
1464 }
1465
1466 self.remove_send_queue_request(room_id, &txn0).await?;
1468
1469 let pending = self.load_send_queue_requests(room_id).await?;
1471
1472 assert_eq!(pending.len(), 3);
1473 assert_eq!(pending[1].transaction_id, *txn2);
1474 for i in 0..3 {
1475 assert_ne!(pending[i].transaction_id, txn0);
1476 }
1477
1478 let room_id2 = room_id!("!test_send_queue_two:localhost");
1483 {
1484 let txn = TransactionId::new();
1485 let event = SerializableEventContent::new(
1486 &RoomMessageEventContent::text_plain("room2").into(),
1487 )?;
1488 self.save_send_queue_request(
1489 room_id2,
1490 txn.clone(),
1491 MilliSecondsSinceUnixEpoch::now(),
1492 event.into(),
1493 0,
1494 )
1495 .await?;
1496 }
1497
1498 {
1500 let room_id3 = room_id!("!test_send_queue_three:localhost");
1501 let txn = TransactionId::new();
1502 let event = SerializableEventContent::new(
1503 &RoomMessageEventContent::text_plain("room3").into(),
1504 )?;
1505 self.save_send_queue_request(
1506 room_id3,
1507 txn.clone(),
1508 MilliSecondsSinceUnixEpoch::now(),
1509 event.into(),
1510 0,
1511 )
1512 .await?;
1513
1514 self.remove_send_queue_request(room_id3, &txn).await?;
1515 }
1516
1517 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1520 assert_eq!(outstanding_rooms.len(), 2);
1521 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1522 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1523
1524 Ok(())
1525 }
1526
1527 async fn test_send_queue_priority(&self) -> TestResult {
1528 let room_id = room_id!("!test_send_queue:localhost");
1529
1530 let events = self.load_send_queue_requests(room_id).await?;
1532 assert!(events.is_empty());
1533
1534 let low0_txn = TransactionId::new();
1536 let ev0 =
1537 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1538 self.save_send_queue_request(
1539 room_id,
1540 low0_txn.clone(),
1541 MilliSecondsSinceUnixEpoch::now(),
1542 ev0.into(),
1543 2,
1544 )
1545 .await?;
1546
1547 let high_txn = TransactionId::new();
1549 let ev1 =
1550 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1551 self.save_send_queue_request(
1552 room_id,
1553 high_txn.clone(),
1554 MilliSecondsSinceUnixEpoch::now(),
1555 ev1.into(),
1556 10,
1557 )
1558 .await?;
1559
1560 let low1_txn = TransactionId::new();
1562 let ev2 =
1563 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1564 self.save_send_queue_request(
1565 room_id,
1566 low1_txn.clone(),
1567 MilliSecondsSinceUnixEpoch::now(),
1568 ev2.into(),
1569 2,
1570 )
1571 .await?;
1572
1573 let pending = self.load_send_queue_requests(room_id).await?;
1576
1577 assert_eq!(pending.len(), 3);
1578 {
1579 assert_eq!(pending[0].transaction_id, high_txn);
1580
1581 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1582 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1583 assert_eq!(content.body(), "high");
1584 }
1585
1586 {
1587 assert_eq!(pending[1].transaction_id, low0_txn);
1588
1589 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1590 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1591 assert_eq!(content.body(), "low0");
1592 }
1593
1594 {
1595 assert_eq!(pending[2].transaction_id, low1_txn);
1596
1597 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1598 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1599 assert_eq!(content.body(), "low1");
1600 }
1601
1602 Ok(())
1603 }
1604
1605 async fn test_send_queue_dependents(&self) -> TestResult {
1606 let room_id = room_id!("!test_send_queue_dependents:localhost");
1607
1608 let txn0 = TransactionId::new();
1610 let event0 =
1611 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1612 self.save_send_queue_request(
1613 room_id,
1614 txn0.clone(),
1615 MilliSecondsSinceUnixEpoch::now(),
1616 event0.clone().into(),
1617 0,
1618 )
1619 .await?;
1620
1621 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1623
1624 let child_txn = ChildTransactionId::new();
1626 self.save_dependent_queued_request(
1627 room_id,
1628 &txn0,
1629 child_txn.clone(),
1630 MilliSecondsSinceUnixEpoch::now(),
1631 DependentQueuedRequestKind::RedactEvent,
1632 )
1633 .await?;
1634
1635 let dependents = self.load_dependent_queued_requests(room_id).await?;
1637 assert_eq!(dependents.len(), 1);
1638 assert_eq!(dependents[0].parent_transaction_id, txn0);
1639 assert_eq!(dependents[0].own_transaction_id, child_txn);
1640 assert!(dependents[0].parent_key.is_none());
1641 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1642
1643 let (event, event_type) = event0.raw();
1645 let event_id = owned_event_id!("$1");
1646 let num_updated = self
1647 .mark_dependent_queued_requests_as_ready(
1648 room_id,
1649 &txn0,
1650 SentRequestKey::Event {
1651 event_id: event_id.clone(),
1652 event: event.clone(),
1653 event_type: event_type.to_owned(),
1654 },
1655 )
1656 .await?;
1657 assert_eq!(num_updated, 1);
1658
1659 let dependents = self.load_dependent_queued_requests(room_id).await?;
1661 assert_eq!(dependents.len(), 1);
1662 assert_eq!(dependents[0].parent_transaction_id, txn0);
1663 assert_eq!(dependents[0].own_transaction_id, child_txn);
1664 assert_matches!(
1665 dependents[0].parent_key.as_ref(),
1666 Some(SentRequestKey::Event {
1667 event_id: received_event_id,
1668 event: received_event,
1669 event_type: received_event_type
1670 }) => {
1671 assert_eq!(received_event_id, &event_id);
1672 assert_eq!(received_event.json().to_string(), event.json().to_string());
1673 assert_eq!(received_event_type.as_str(), event_type);
1674 }
1675 );
1676 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1677
1678 let removed = self
1680 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1681 .await?;
1682 assert!(removed);
1683
1684 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1686
1687 let txn1 = TransactionId::new();
1690 let event1 =
1691 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1692 self.save_send_queue_request(
1693 room_id,
1694 txn1.clone(),
1695 MilliSecondsSinceUnixEpoch::now(),
1696 event1.into(),
1697 0,
1698 )
1699 .await?;
1700
1701 self.save_dependent_queued_request(
1702 room_id,
1703 &txn0,
1704 ChildTransactionId::new(),
1705 MilliSecondsSinceUnixEpoch::now(),
1706 DependentQueuedRequestKind::RedactEvent,
1707 )
1708 .await?;
1709 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1710
1711 self.save_dependent_queued_request(
1712 room_id,
1713 &txn1,
1714 ChildTransactionId::new(),
1715 MilliSecondsSinceUnixEpoch::now(),
1716 DependentQueuedRequestKind::EditEvent {
1717 new_content: SerializableEventContent::new(
1718 &RoomMessageEventContent::text_plain("edit").into(),
1719 )?,
1720 },
1721 )
1722 .await?;
1723 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1724
1725 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1727 assert!(removed);
1728
1729 let dependents = self.load_dependent_queued_requests(room_id).await?;
1731 assert_eq!(dependents.len(), 2);
1732
1733 Ok(())
1734 }
1735
1736 async fn test_update_send_queue_dependent(&self) -> TestResult {
1737 let room_id = room_id!("!test_send_queue_dependents:localhost");
1738
1739 let txn = TransactionId::new();
1740
1741 let child_txn = ChildTransactionId::new();
1743
1744 self.save_dependent_queued_request(
1745 room_id,
1746 &txn,
1747 child_txn.clone(),
1748 MilliSecondsSinceUnixEpoch::now(),
1749 DependentQueuedRequestKind::RedactEvent,
1750 )
1751 .await?;
1752
1753 let dependents = self.load_dependent_queued_requests(room_id).await?;
1755 assert_eq!(dependents.len(), 1);
1756 assert_eq!(dependents[0].parent_transaction_id, txn);
1757 assert_eq!(dependents[0].own_transaction_id, child_txn);
1758 assert!(dependents[0].parent_key.is_none());
1759 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1760
1761 self.update_dependent_queued_request(
1763 room_id,
1764 &child_txn,
1765 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1766 )
1767 .await?;
1768
1769 let dependents = self.load_dependent_queued_requests(room_id).await?;
1771 assert_eq!(dependents.len(), 1);
1772 assert_eq!(dependents[0].parent_transaction_id, txn);
1773 assert_eq!(dependents[0].own_transaction_id, child_txn);
1774 assert!(dependents[0].parent_key.is_none());
1775 assert_matches!(
1776 &dependents[0].kind,
1777 DependentQueuedRequestKind::ReactEvent { key } => {
1778 assert_eq!(key, "👍");
1779 }
1780 );
1781
1782 Ok(())
1783 }
1784
1785 async fn test_get_room_infos(&self) -> TestResult {
1786 let room_id_0 = room_id!("!r0");
1787 let room_id_1 = room_id!("!r1");
1788 let room_id_2 = room_id!("!r2");
1789
1790 {
1792 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1793 }
1794
1795 let mut changes = StateChanges::default();
1797 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1798 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1799 self.save_changes(&changes).await?;
1800
1801 {
1803 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1804
1805 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1808
1809 assert_eq!(all_rooms.len(), 2);
1810 assert_eq!(all_rooms[0].room_id, room_id_0);
1811 assert_eq!(all_rooms[1].room_id, room_id_1);
1812 }
1813
1814 {
1816 let all_rooms =
1817 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1818
1819 assert_eq!(all_rooms.len(), 1);
1820 assert_eq!(all_rooms[0].room_id, room_id_1);
1821 }
1822
1823 {
1826 let all_rooms =
1827 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1828
1829 assert_eq!(all_rooms.len(), 0);
1830 }
1831
1832 Ok(())
1833 }
1834
1835 async fn test_thread_subscriptions(&self) -> TestResult {
1836 let first_thread = event_id!("$t1");
1837 let second_thread = event_id!("$t2");
1838
1839 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1841 assert!(maybe_sub.is_none());
1842
1843 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1844 assert!(maybe_sub.is_none());
1845
1846 self.upsert_thread_subscription(
1848 room_id(),
1849 first_thread,
1850 StoredThreadSubscription {
1851 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1852 bump_stamp: None,
1853 },
1854 )
1855 .await?;
1856
1857 self.upsert_thread_subscription(
1858 room_id(),
1859 second_thread,
1860 StoredThreadSubscription {
1861 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1862 bump_stamp: None,
1863 },
1864 )
1865 .await?;
1866
1867 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1869 assert_eq!(
1870 maybe_sub,
1871 Some(StoredThreadSubscription {
1872 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1873 bump_stamp: None,
1874 })
1875 );
1876
1877 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1878 assert_eq!(
1879 maybe_sub,
1880 Some(StoredThreadSubscription {
1881 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1882 bump_stamp: None,
1883 })
1884 );
1885
1886 self.upsert_thread_subscription(
1888 room_id(),
1889 first_thread,
1890 StoredThreadSubscription {
1891 status: ThreadSubscriptionStatus::Unsubscribed,
1892 bump_stamp: None,
1893 },
1894 )
1895 .await?;
1896
1897 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1899 assert_eq!(
1900 maybe_sub,
1901 Some(StoredThreadSubscription {
1902 status: ThreadSubscriptionStatus::Unsubscribed,
1903 bump_stamp: None,
1904 })
1905 );
1906
1907 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1909 assert_eq!(
1910 maybe_sub,
1911 Some(StoredThreadSubscription {
1912 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1913 bump_stamp: None,
1914 })
1915 );
1916
1917 self.remove_thread_subscription(room_id(), second_thread).await?;
1919
1920 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1922 assert_eq!(maybe_sub, None);
1923
1924 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1926 assert_eq!(
1927 maybe_sub,
1928 Some(StoredThreadSubscription {
1929 status: ThreadSubscriptionStatus::Unsubscribed,
1930 bump_stamp: None,
1931 })
1932 );
1933
1934 self.remove_thread_subscription(room_id(), second_thread).await?;
1936
1937 Ok(())
1938 }
1939
1940 async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult {
1941 let thread = event_id!("$fred");
1942
1943 let sub = self.load_thread_subscription(room_id(), thread).await?;
1945 assert!(sub.is_none());
1946
1947 self.upsert_thread_subscription(
1949 room_id(),
1950 thread,
1951 StoredThreadSubscription {
1952 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1953 bump_stamp: Some(42),
1954 },
1955 )
1956 .await?;
1957
1958 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1959 assert_eq!(
1960 sub,
1961 StoredThreadSubscription {
1962 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1963 bump_stamp: Some(42),
1964 }
1965 );
1966
1967 self.upsert_thread_subscription(
1969 room_id(),
1970 thread,
1971 StoredThreadSubscription {
1972 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1973 bump_stamp: Some(41),
1974 },
1975 )
1976 .await?;
1977
1978 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1979 assert_eq!(
1980 sub,
1981 StoredThreadSubscription {
1982 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1983 bump_stamp: Some(42),
1984 }
1985 );
1986
1987 self.upsert_thread_subscription(
1989 room_id(),
1990 thread,
1991 StoredThreadSubscription {
1992 status: ThreadSubscriptionStatus::Unsubscribed,
1993 bump_stamp: None,
1994 },
1995 )
1996 .await?;
1997
1998 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1999 assert_eq!(
2000 sub,
2001 StoredThreadSubscription {
2002 status: ThreadSubscriptionStatus::Unsubscribed,
2003 bump_stamp: Some(42),
2004 }
2005 );
2006
2007 Ok(())
2008 }
2009
2010 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
2011 let threads = [
2012 event_id!("$t1"),
2013 event_id!("$t2"),
2014 event_id!("$t3"),
2015 event_id!("$t4"),
2016 event_id!("$t5"),
2017 event_id!("$t6"),
2018 ];
2019 let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
2022 threads
2023 .iter()
2024 .zip(subs)
2025 .map(|(&event_id, &sub)| (room_id(), event_id, sub))
2026 .collect::<Vec<_>>()
2027 };
2028
2029 let initial_subscriptions = build_subscription_updates(&[
2031 StoredThreadSubscription {
2032 status: ThreadSubscriptionStatus::Unsubscribed,
2033 bump_stamp: None,
2034 },
2035 StoredThreadSubscription {
2036 status: ThreadSubscriptionStatus::Unsubscribed,
2037 bump_stamp: Some(14),
2038 },
2039 StoredThreadSubscription {
2040 status: ThreadSubscriptionStatus::Unsubscribed,
2041 bump_stamp: None,
2042 },
2043 StoredThreadSubscription {
2044 status: ThreadSubscriptionStatus::Unsubscribed,
2045 bump_stamp: Some(210),
2046 },
2047 StoredThreadSubscription {
2048 status: ThreadSubscriptionStatus::Unsubscribed,
2049 bump_stamp: Some(5),
2050 },
2051 StoredThreadSubscription {
2052 status: ThreadSubscriptionStatus::Unsubscribed,
2053 bump_stamp: Some(100),
2054 },
2055 ]);
2056
2057 let update_subscriptions = build_subscription_updates(&[
2058 StoredThreadSubscription {
2059 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2060 bump_stamp: None,
2061 },
2062 StoredThreadSubscription {
2063 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2064 bump_stamp: None,
2065 },
2066 StoredThreadSubscription {
2067 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2068 bump_stamp: Some(1101),
2069 },
2070 StoredThreadSubscription {
2071 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2072 bump_stamp: Some(222),
2073 },
2074 StoredThreadSubscription {
2075 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2076 bump_stamp: Some(1),
2077 },
2078 StoredThreadSubscription {
2079 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2080 bump_stamp: Some(100),
2081 },
2082 ]);
2083
2084 let expected_subscriptions = build_subscription_updates(&[
2085 StoredThreadSubscription {
2087 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2088 bump_stamp: None,
2089 },
2090 StoredThreadSubscription {
2092 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2093 bump_stamp: Some(14),
2094 },
2095 StoredThreadSubscription {
2097 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2098 bump_stamp: Some(1101),
2099 },
2100 StoredThreadSubscription {
2102 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2103 bump_stamp: Some(222),
2104 },
2105 StoredThreadSubscription {
2107 status: ThreadSubscriptionStatus::Unsubscribed,
2108 bump_stamp: Some(5),
2109 },
2110 StoredThreadSubscription {
2112 status: ThreadSubscriptionStatus::Unsubscribed,
2113 bump_stamp: Some(100),
2114 },
2115 ]);
2116
2117 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2119
2120 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2122 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2123 assert_eq!(stored_subscription, Some(*expected_sub));
2124 }
2125
2126 self.upsert_thread_subscriptions(update_subscriptions).await?;
2128
2129 for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2131 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2132 assert_eq!(stored_subscription, Some(*expected_sub));
2133 }
2134
2135 for (room_id, thread_id, _) in &expected_subscriptions {
2137 self.remove_thread_subscription(room_id, thread_id).await?;
2138 }
2139
2140 let initial_subscriptions = build_subscription_updates(&[
2141 StoredThreadSubscription {
2142 status: ThreadSubscriptionStatus::Unsubscribed,
2143 bump_stamp: Some(1),
2144 },
2145 StoredThreadSubscription {
2146 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2147 bump_stamp: Some(1),
2148 },
2149 StoredThreadSubscription {
2150 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2151 bump_stamp: Some(1),
2152 },
2153 ]);
2154
2155 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2156
2157 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2158 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2159 assert_eq!(stored_subscription, Some(*expected_sub));
2160 }
2161
2162 let update_subscriptions = build_subscription_updates(&[
2163 StoredThreadSubscription {
2164 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2165 bump_stamp: Some(2),
2166 },
2167 StoredThreadSubscription {
2168 status: ThreadSubscriptionStatus::Unsubscribed,
2169 bump_stamp: Some(2),
2170 },
2171 StoredThreadSubscription {
2172 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2173 bump_stamp: Some(2),
2174 },
2175 ]);
2176
2177 self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2178
2179 for (room_id, thread_id, expected_sub) in &update_subscriptions {
2180 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2181 assert_eq!(stored_subscription, Some(*expected_sub));
2182 }
2183
2184 Ok(())
2185 }
2186}
2187
2188#[allow(unused_macros, unused_extern_crates)]
2214#[macro_export]
2215macro_rules! statestore_integration_tests {
2216 () => {
2217 mod statestore_integration_tests {
2218 use matrix_sdk_test::{TestResult, async_test};
2219 use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2220
2221 use super::get_store;
2222
2223 #[async_test]
2224 async fn test_topic_redaction() -> TestResult {
2225 let store = get_store().await?.into_state_store();
2226 store.test_topic_redaction().await
2227 }
2228
2229 #[async_test]
2230 async fn test_populate_store() -> TestResult {
2231 let store = get_store().await?.into_state_store();
2232 store.test_populate_store().await
2233 }
2234
2235 #[async_test]
2236 async fn test_member_saving() -> TestResult {
2237 let store = get_store().await?.into_state_store();
2238 store.test_member_saving().await
2239 }
2240
2241 #[async_test]
2242 async fn test_filter_saving() -> TestResult {
2243 let store = get_store().await?.into_state_store();
2244 store.test_filter_saving().await
2245 }
2246
2247 #[async_test]
2248 async fn test_user_avatar_url_saving() -> TestResult {
2249 let store = get_store().await?.into_state_store();
2250 store.test_user_avatar_url_saving().await
2251 }
2252
2253 #[async_test]
2254 async fn test_supported_versions_saving() -> TestResult {
2255 let store = get_store().await?.into_state_store();
2256 store.test_supported_versions_saving().await
2257 }
2258
2259 #[async_test]
2260 async fn test_well_known_saving() -> TestResult {
2261 let store = get_store().await?.into_state_store();
2262 store.test_well_known_saving().await
2263 }
2264
2265 #[async_test]
2266 async fn test_sync_token_saving() -> TestResult {
2267 let store = get_store().await?.into_state_store();
2268 store.test_sync_token_saving().await
2269 }
2270
2271 #[async_test]
2272 async fn test_utd_hook_manager_data_saving() -> TestResult {
2273 let store = get_store().await?.into_state_store();
2274 store.test_utd_hook_manager_data_saving().await
2275 }
2276
2277 #[async_test]
2278 async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2279 let store = get_store().await?.into_state_store();
2280 store.test_one_time_key_already_uploaded_data_saving().await
2281 }
2282
2283 #[async_test]
2284 async fn test_stripped_member_saving() -> TestResult {
2285 let store = get_store().await?.into_state_store();
2286 store.test_stripped_member_saving().await
2287 }
2288
2289 #[async_test]
2290 async fn test_power_level_saving() -> TestResult {
2291 let store = get_store().await?.into_state_store();
2292 store.test_power_level_saving().await
2293 }
2294
2295 #[async_test]
2296 async fn test_receipts_saving() -> TestResult {
2297 let store = get_store().await?.into_state_store();
2298 store.test_receipts_saving().await
2299 }
2300
2301 #[async_test]
2302 async fn test_custom_storage() -> TestResult {
2303 let store = get_store().await?.into_state_store();
2304 store.test_custom_storage().await
2305 }
2306
2307 #[async_test]
2308 async fn test_stripped_non_stripped() -> TestResult {
2309 let store = get_store().await?.into_state_store();
2310 store.test_stripped_non_stripped().await
2311 }
2312
2313 #[async_test]
2314 async fn test_room_removal() -> TestResult {
2315 let store = get_store().await?.into_state_store();
2316 store.test_room_removal().await
2317 }
2318
2319 #[async_test]
2320 async fn test_profile_removal() -> TestResult {
2321 let store = get_store().await?.into_state_store();
2322 store.test_profile_removal().await
2323 }
2324
2325 #[async_test]
2326 async fn test_presence_saving() -> TestResult {
2327 let store = get_store().await?.into_state_store();
2328 store.test_presence_saving().await
2329 }
2330
2331 #[async_test]
2332 async fn test_display_names_saving() -> TestResult {
2333 let store = get_store().await?.into_state_store();
2334 store.test_display_names_saving().await
2335 }
2336
2337 #[async_test]
2338 async fn test_send_queue() -> TestResult {
2339 let store = get_store().await?.into_state_store();
2340 store.test_send_queue().await
2341 }
2342
2343 #[async_test]
2344 async fn test_send_queue_priority() -> TestResult {
2345 let store = get_store().await?.into_state_store();
2346 store.test_send_queue_priority().await
2347 }
2348
2349 #[async_test]
2350 async fn test_send_queue_dependents() -> TestResult {
2351 let store = get_store().await?.into_state_store();
2352 store.test_send_queue_dependents().await
2353 }
2354
2355 #[async_test]
2356 async fn test_update_send_queue_dependent() -> TestResult {
2357 let store = get_store().await?.into_state_store();
2358 store.test_update_send_queue_dependent().await
2359 }
2360
2361 #[async_test]
2362 async fn test_get_room_infos() -> TestResult {
2363 let store = get_store().await?.into_state_store();
2364 store.test_get_room_infos().await
2365 }
2366
2367 #[async_test]
2368 async fn test_thread_subscriptions() -> TestResult {
2369 let store = get_store().await?.into_state_store();
2370 store.test_thread_subscriptions().await
2371 }
2372
2373 #[async_test]
2374 async fn test_thread_subscriptions_bumpstamps() -> TestResult {
2375 let store = get_store().await?.into_state_store();
2376 store.test_thread_subscriptions_bumpstamps().await
2377 }
2378
2379 #[async_test]
2380 async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2381 let store = get_store().await?.into_state_store();
2382 store.test_thread_subscriptions_bulk_upsert().await
2383 }
2384 }
2385 };
2386}
2387
2388fn user_id() -> &'static UserId {
2389 user_id!("@example:localhost")
2390}
2391
2392fn invited_user_id() -> &'static UserId {
2393 user_id!("@invited:localhost")
2394}
2395
2396fn room_id() -> &'static RoomId {
2397 room_id!("!test:localhost")
2398}
2399
2400fn stripped_room_id() -> &'static RoomId {
2401 room_id!("!stripped:localhost")
2402}
2403
2404fn first_receipt_event_id() -> &'static EventId {
2405 event_id!("$example")
2406}
2407
2408fn power_level_event() -> Raw<AnySyncStateEvent> {
2409 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2410
2411 let event = json!({
2412 "event_id": "$h29iv0s8:example.com",
2413 "content": content,
2414 "sender": user_id(),
2415 "type": "m.room.power_levels",
2416 "origin_server_ts": 0u64,
2417 "state_key": "",
2418 });
2419
2420 serde_json::from_value(event).unwrap()
2421}
2422
2423fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2424 custom_stripped_membership_event(user_id())
2425}
2426
2427fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2428 let ev_json = json!({
2429 "type": "m.room.member",
2430 "content": RoomMemberEventContent::new(MembershipState::Join),
2431 "sender": user_id,
2432 "state_key": user_id,
2433 });
2434
2435 Raw::new(&ev_json).unwrap().cast_unchecked()
2436}
2437
2438fn membership_event() -> Raw<SyncRoomMemberEvent> {
2439 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2440}
2441
2442fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2443 let ev_json = json!({
2444 "type": "m.room.member",
2445 "content": RoomMemberEventContent::new(MembershipState::Join),
2446 "event_id": event_id,
2447 "origin_server_ts": 198,
2448 "sender": user_id,
2449 "state_key": user_id,
2450 });
2451
2452 Raw::new(&ev_json).unwrap().cast_unchecked()
2453}
2454
2455fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2456 let ev_json = json!({
2457 "content": {
2458 "presence": "online"
2459 },
2460 "sender": user_id,
2461 });
2462
2463 Raw::new(&ev_json).unwrap().cast_unchecked()
2464}