1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11 api::{
12 FeatureFlag, MatrixVersion,
13 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14 },
15 event_id,
16 events::{
17 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19 RoomAccountDataEventType, StateEventType, SyncStateEvent,
20 presence::PresenceEvent,
21 receipt::{ReceiptThread, ReceiptType},
22 room::{
23 member::{
24 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25 SyncRoomMemberEvent,
26 },
27 message::RoomMessageEventContent,
28 power_levels::RoomPowerLevelsEventContent,
29 topic::RoomTopicEventContent,
30 },
31 },
32 owned_event_id, owned_mxc_uri,
33 push::Ruleset,
34 room_id,
35 room_version_rules::AuthorizationRules,
36 serde::Raw,
37 uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
43 SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47 deserialized_responses::MemberEvent,
48 store::{
49 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50 StoredThreadSubscription, ThreadSubscriptionStatus,
51 },
52 utils::RawSyncStateEventWithKeys,
53};
54
55#[allow(async_fn_in_trait)]
60pub trait StateStoreIntegrationTests {
61 async fn populate(&self) -> TestResult;
63 async fn test_topic_redaction(&self) -> TestResult;
65 async fn test_populate_store(&self) -> TestResult;
67 async fn test_member_saving(&self) -> TestResult;
69 async fn test_filter_saving(&self) -> TestResult;
71 async fn test_user_avatar_url_saving(&self) -> TestResult;
73 async fn test_sync_token_saving(&self) -> TestResult;
75 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
77 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
79 async fn test_stripped_member_saving(&self) -> TestResult;
81 async fn test_power_level_saving(&self) -> TestResult;
83 async fn test_receipts_saving(&self) -> TestResult;
85 async fn test_custom_storage(&self) -> TestResult;
87 async fn test_stripped_non_stripped(&self) -> TestResult;
89 async fn test_room_removal(&self) -> TestResult;
91 async fn test_profile_removal(&self) -> TestResult;
93 async fn test_presence_saving(&self) -> TestResult;
95 async fn test_display_names_saving(&self) -> TestResult;
97 async fn test_send_queue(&self) -> TestResult;
99 async fn test_send_queue_priority(&self) -> TestResult;
101 async fn test_send_queue_dependents(&self) -> TestResult;
103 async fn test_update_send_queue_dependent(&self) -> TestResult;
105 async fn test_supported_versions_saving(&self) -> TestResult;
107 async fn test_well_known_saving(&self) -> TestResult;
109 async fn test_get_room_infos(&self) -> TestResult;
111 async fn test_thread_subscriptions(&self) -> TestResult;
113 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
115}
116
117impl StateStoreIntegrationTests for DynStateStore {
118 async fn populate(&self) -> TestResult {
119 let f = EventFactory::new();
120 let mut changes = StateChanges::default();
121
122 let user_id = user_id();
123 let invited_user_id = invited_user_id();
124 let room_id = room_id();
125 let stripped_room_id = stripped_room_id();
126
127 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
128
129 let presence_json: &JsonValue = &test_json::PRESENCE;
130 let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
131 let presence_event = presence_raw.deserialize()?;
132 changes.add_presence_event(presence_event, presence_raw);
133
134 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
135 f.push_rules(Ruleset::server_default(user_id)).into();
136 let pushrules_event = pushrules_raw.deserialize()?;
137 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
138
139 let mut room = RoomInfo::new(room_id, RoomState::Joined);
140 room.mark_as_left();
141
142 let tag_json: &JsonValue = &test_json::TAG;
143 let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
144 let tag_event = tag_raw.deserialize()?;
145 changes.add_room_account_data(room_id, tag_event, tag_raw);
146
147 let name_json: &JsonValue = &test_json::NAME;
148 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
149 let name_event = name_raw.deserialize()?;
150 room.handle_state_event(
151 &mut RawSyncStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
152 .expect("generated state event should be valid"),
153 );
154 changes.add_state_event(room_id, name_event, name_raw);
155
156 let topic_json: &JsonValue = &test_json::TOPIC;
157 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
158 let topic_event = topic_raw.deserialize()?;
159 room.handle_state_event(
160 &mut RawSyncStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
161 .expect("generated state event should be valid"),
162 );
163 changes.add_state_event(room_id, topic_event, topic_raw);
164
165 let mut room_ambiguity_map = HashMap::new();
166 let mut room_profiles = BTreeMap::new();
167
168 let member_json: &JsonValue = &test_json::MEMBER;
169 let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
170 let displayname = DisplayName::new(
171 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
172 );
173 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
174 room_profiles.insert(user_id.to_owned(), (&member_event).into());
175
176 let member_state_raw =
177 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
178 let member_state_event = member_state_raw.deserialize()?;
179 changes.add_state_event(room_id, member_state_event, member_state_raw);
180
181 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
182 let invited_member_event: SyncRoomMemberEvent =
184 serde_json::from_value(invited_member_json.clone())?;
185 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
186 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
187
188 let invited_member_state_raw =
189 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
190 let invited_member_state_event = invited_member_state_raw.deserialize()?;
191 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
192
193 let receipt_content = f
194 .room(room_id)
195 .read_receipts()
196 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
197 .into_content();
198 changes.add_receipts(room_id, receipt_content);
199
200 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
201 changes.profiles.insert(room_id.to_owned(), room_profiles);
202 changes.add_room(room);
203
204 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
205
206 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
207 let stripped_name_raw =
208 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
209 let stripped_name_event = stripped_name_raw.deserialize()?;
210 stripped_room.handle_stripped_state_event(&stripped_name_event);
211 changes.stripped_state.insert(
212 stripped_room_id.to_owned(),
213 BTreeMap::from([(
214 stripped_name_event.event_type(),
215 BTreeMap::from([(
216 stripped_name_event.state_key().to_owned(),
217 stripped_name_raw.clone(),
218 )]),
219 )]),
220 );
221
222 changes.add_room(stripped_room);
223
224 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
225 let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
226 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
227
228 self.save_changes(&changes).await?;
229
230 Ok(())
231 }
232
233 async fn test_topic_redaction(&self) -> TestResult {
234 let room_id = room_id();
235 self.populate().await?;
236
237 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
238 assert_eq!(
239 self.get_state_event_static::<RoomTopicEventContent>(room_id)
240 .await?
241 .expect("room topic found before redaction")
242 .deserialize()
243 .expect("can deserialize room topic before redaction")
244 .as_sync()
245 .expect("room topic is a sync state event")
246 .as_original()
247 .expect("room topic is not redacted yet")
248 .content
249 .topic,
250 "😀"
251 );
252
253 let mut changes = StateChanges::default();
254
255 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
256 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
257 .expect("topic redaction event making works");
258 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
259
260 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
261 self.save_changes(&changes).await?;
262
263 let redacted_event = self
264 .get_state_event_static::<RoomTopicEventContent>(room_id)
265 .await?
266 .expect("room topic found after redaction")
267 .deserialize()
268 .expect("can deserialize room topic after redaction");
269
270 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
271
272 Ok(())
273 }
274
275 async fn test_populate_store(&self) -> TestResult {
276 let room_id = room_id();
277 let user_id = user_id();
278 let display_name = DisplayName::new("example");
279
280 self.populate().await?;
281
282 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
283 assert!(self.get_presence_event(user_id).await?.is_some());
284 assert_eq!(
285 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
286 2,
287 "Expected to find 2 room infos"
288 );
289 assert!(
290 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
291 );
292
293 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
294 assert_eq!(
295 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
296 1,
297 "Expected to find 1 room topic"
298 );
299 assert!(self.get_profile(room_id, user_id).await?.is_some());
300 assert!(self.get_member_event(room_id, user_id).await?.is_some());
301 assert_eq!(
302 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
303 2,
304 "Expected to find 2 members for room"
305 );
306 assert_eq!(
307 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
308 1,
309 "Expected to find 1 invited user ids"
310 );
311 assert_eq!(
312 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
313 1,
314 "Expected to find 1 joined user ids"
315 );
316 assert_eq!(
317 self.get_users_with_display_name(room_id, &display_name).await?.len(),
318 2,
319 "Expected to find 2 display names for room"
320 );
321 assert!(
322 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
323 .await?
324 .is_some()
325 );
326 assert!(
327 self.get_user_room_receipt_event(
328 room_id,
329 ReceiptType::Read,
330 ReceiptThread::Unthreaded,
331 user_id
332 )
333 .await?
334 .is_some()
335 );
336 assert_eq!(
337 self.get_event_room_receipt_events(
338 room_id,
339 ReceiptType::Read,
340 ReceiptThread::Unthreaded,
341 first_receipt_event_id()
342 )
343 .await?
344 .len(),
345 1,
346 "Expected to find 1 read receipt"
347 );
348 Ok(())
349 }
350
351 async fn test_member_saving(&self) -> TestResult {
352 let room_id = room_id!("!test_member_saving:localhost");
353 let user_id = user_id();
354 let second_user_id = user_id!("@second:localhost");
355 let third_user_id = user_id!("@third:localhost");
356 let unknown_user_id = user_id!("@unknown:localhost");
357
358 let mut user_ids = vec![user_id.to_owned()];
360 assert!(self.get_member_event(room_id, user_id).await?.is_none());
361 let member_events = self
362 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
363 .await;
364 assert!(member_events?.is_empty());
365 assert!(self.get_profile(room_id, user_id).await?.is_none());
366 let profiles = self.get_profiles(room_id, &user_ids).await;
367 assert!(profiles?.is_empty());
368
369 let mut changes = StateChanges::default();
371 let raw_member_event = membership_event();
372 let profile = raw_member_event.deserialize()?.into();
373 changes
374 .state
375 .entry(room_id.to_owned())
376 .or_default()
377 .entry(StateEventType::RoomMember)
378 .or_default()
379 .insert(user_id.into(), raw_member_event.cast());
380 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
381 self.save_changes(&changes).await?;
382
383 assert!(self.get_member_event(room_id, user_id).await?.is_some());
384 let member_events = self
385 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
386 .await;
387 assert_eq!(member_events?.len(), 1);
388 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
389 assert_eq!(members.len(), 1, "We expected to find members for the room");
390 assert!(self.get_profile(room_id, user_id).await?.is_some());
391 let profiles = self.get_profiles(room_id, &user_ids).await;
392 assert_eq!(profiles?.len(), 1);
393
394 let mut changes = StateChanges::default();
396 let changes_members = changes
397 .state
398 .entry(room_id.to_owned())
399 .or_default()
400 .entry(StateEventType::RoomMember)
401 .or_default();
402 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
403 let raw_second_member_event =
404 custom_membership_event(second_user_id, event_id!("$second_member_event"));
405 let second_profile = raw_second_member_event.deserialize()?.into();
406 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
407 changes_profiles.insert(second_user_id.to_owned(), second_profile);
408 let raw_third_member_event =
409 custom_membership_event(third_user_id, event_id!("$third_member_event"));
410 let third_profile = raw_third_member_event.deserialize()?.into();
411 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
412 changes_profiles.insert(third_user_id.to_owned(), third_profile);
413 self.save_changes(&changes).await?;
414
415 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
416 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
417 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
418 let member_events = self
419 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
420 .await;
421 assert_eq!(member_events?.len(), 3);
422 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
423 assert_eq!(members.len(), 3, "We expected to find members for the room");
424 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
425 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
426 let profiles = self.get_profiles(room_id, &user_ids).await;
427 assert_eq!(profiles?.len(), 3);
428
429 user_ids.push(unknown_user_id.to_owned());
431 let member_events = self
432 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
433 .await;
434 assert_eq!(member_events?.len(), 3);
435 let profiles = self.get_profiles(room_id, &user_ids).await;
436 assert_eq!(profiles?.len(), 3);
437
438 let member_events = self
440 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
441 room_id,
442 &[],
443 )
444 .await;
445 assert!(member_events?.is_empty());
446 let profiles = self.get_profiles(room_id, &[]).await;
447 assert!(profiles?.is_empty());
448
449 Ok(())
450 }
451
452 async fn test_filter_saving(&self) -> TestResult {
453 let filter_name = "filter_name";
454 let filter_id = "filter_id_1234";
455
456 self.set_kv_data(
457 StateStoreDataKey::Filter(filter_name),
458 StateStoreDataValue::Filter(filter_id.to_owned()),
459 )
460 .await?;
461 assert_let!(
462 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
463 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
464 );
465 assert_eq!(stored_filter_id, filter_id);
466
467 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
468 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
469
470 Ok(())
471 }
472
473 async fn test_user_avatar_url_saving(&self) -> TestResult {
474 let user_id = user_id!("@alice:example.org");
475 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
476
477 self.set_kv_data(
478 StateStoreDataKey::UserAvatarUrl(user_id),
479 StateStoreDataValue::UserAvatarUrl(url.clone()),
480 )
481 .await?;
482
483 assert_let!(
484 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
485 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
486 );
487 assert_eq!(stored_url, url);
488
489 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
490 assert_matches!(
491 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
492 Ok(None)
493 );
494
495 Ok(())
496 }
497
498 async fn test_supported_versions_saving(&self) -> TestResult {
499 let versions =
500 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
501 let supported_versions = SupportedVersionsResponse {
502 versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
503 unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
504 };
505
506 self.set_kv_data(
507 StateStoreDataKey::SupportedVersions,
508 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
509 )
510 .await?;
511
512 assert_let!(
513 Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
514 self.get_kv_data(StateStoreDataKey::SupportedVersions).await
515 );
516 assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
517 assert_eq!(supported_versions, stored_supported_versions);
518
519 let stored_supported = stored_supported_versions.supported_versions();
520 assert_eq!(stored_supported.versions, versions);
521 assert_eq!(stored_supported.features.len(), 1);
522 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
523
524 self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
525 assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
526
527 Ok(())
528 }
529
530 async fn test_well_known_saving(&self) -> TestResult {
531 let well_known = WellKnownResponse {
532 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
533 identity_server: None,
534 tile_server: None,
535 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
536 };
537
538 self.set_kv_data(
539 StateStoreDataKey::WellKnown,
540 StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
541 )
542 .await?;
543
544 assert_let!(
545 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
546 self.get_kv_data(StateStoreDataKey::WellKnown).await
547 );
548 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
549 assert_eq!(stored_well_known, Some(well_known));
550
551 self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
552 assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
553
554 self.set_kv_data(
555 StateStoreDataKey::WellKnown,
556 StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
557 )
558 .await?;
559
560 assert_let!(
561 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
562 self.get_kv_data(StateStoreDataKey::WellKnown).await
563 );
564 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
565 assert_eq!(stored_well_known, None);
566
567 Ok(())
568 }
569
570 async fn test_sync_token_saving(&self) -> TestResult {
571 let sync_token_1 = "t392-516_47314_0_7_1";
572 let sync_token_2 = "t392-516_47314_0_7_2";
573
574 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
575
576 let changes =
577 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
578 self.save_changes(&changes).await?;
579 assert_let!(
580 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
581 self.get_kv_data(StateStoreDataKey::SyncToken).await
582 );
583 assert_eq!(stored_sync_token, sync_token_1);
584
585 self.set_kv_data(
586 StateStoreDataKey::SyncToken,
587 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
588 )
589 .await?;
590 assert_let!(
591 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
592 self.get_kv_data(StateStoreDataKey::SyncToken).await
593 );
594 assert_eq!(stored_sync_token, sync_token_2);
595
596 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
597 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
598
599 Ok(())
600 }
601
602 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
603 assert!(
605 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
606 .await
607 .expect("Could not read data")
608 .is_none(),
609 "Store was not empty at start"
610 );
611
612 let data = GrowableBloomBuilder::new().build();
614 self.set_kv_data(
615 StateStoreDataKey::UtdHookManagerData,
616 StateStoreDataValue::UtdHookManagerData(data.clone()),
617 )
618 .await
619 .expect("Could not save data");
620
621 let read_data = self
623 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
624 .await
625 .expect("Could not read data")
626 .expect("no data found")
627 .into_utd_hook_manager_data()
628 .expect("not UtdHookManagerData");
629
630 assert_eq!(read_data, data);
631
632 Ok(())
633 }
634
635 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
636 assert!(
638 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
639 "Store was not empty at start"
640 );
641
642 self.set_kv_data(
643 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
644 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
645 )
646 .await?;
647
648 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
649 data.expect("The loaded data should be Some");
650
651 Ok(())
652 }
653
654 async fn test_stripped_member_saving(&self) -> TestResult {
655 let room_id = room_id!("!test_stripped_member_saving:localhost");
656 let user_id = user_id();
657 let second_user_id = user_id!("@second:localhost");
658 let third_user_id = user_id!("@third:localhost");
659 let unknown_user_id = user_id!("@unknown:localhost");
660
661 assert!(self.get_member_event(room_id, user_id).await?.is_none());
663 let member_events = self
664 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
665 room_id,
666 &[user_id.to_owned()],
667 )
668 .await;
669 assert!(member_events?.is_empty());
670
671 let mut changes = StateChanges::default();
673 changes
674 .stripped_state
675 .entry(room_id.to_owned())
676 .or_default()
677 .entry(StateEventType::RoomMember)
678 .or_default()
679 .insert(user_id.into(), stripped_membership_event().cast());
680 self.save_changes(&changes).await?;
681
682 assert!(self.get_member_event(room_id, user_id).await?.is_some());
683 let member_events = self
684 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
685 room_id,
686 &[user_id.to_owned()],
687 )
688 .await;
689 assert_eq!(member_events?.len(), 1);
690 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
691 assert_eq!(members.len(), 1, "We expected to find members for the room");
692
693 let mut changes = StateChanges::default();
695 let changes_members = changes
696 .stripped_state
697 .entry(room_id.to_owned())
698 .or_default()
699 .entry(StateEventType::RoomMember)
700 .or_default();
701 changes_members
702 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
703 changes_members
704 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
705 self.save_changes(&changes).await?;
706
707 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
708 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
709 let member_events = self
710 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
711 room_id,
712 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
713 )
714 .await;
715 assert_eq!(member_events?.len(), 3);
716 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
717 assert_eq!(members.len(), 3, "We expected to find members for the room");
718
719 let member_events = self
721 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
722 room_id,
723 &[
724 user_id.to_owned(),
725 second_user_id.to_owned(),
726 third_user_id.to_owned(),
727 unknown_user_id.to_owned(),
728 ],
729 )
730 .await;
731 assert_eq!(member_events?.len(), 3);
732
733 let member_events = self
735 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
736 room_id,
737 &[],
738 )
739 .await;
740 assert!(member_events?.is_empty());
741
742 Ok(())
743 }
744
745 async fn test_power_level_saving(&self) -> TestResult {
746 let room_id = room_id!("!test_power_level_saving:localhost");
747
748 let raw_event = power_level_event();
749 let event = raw_event.deserialize()?;
750
751 assert!(
752 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
753 );
754 let mut changes = StateChanges::default();
755 changes.add_state_event(room_id, event, raw_event);
756
757 self.save_changes(&changes).await?;
758 assert!(
759 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
760 );
761
762 Ok(())
763 }
764
765 async fn test_receipts_saving(&self) -> TestResult {
766 let room_id = room_id!("!test_receipts_saving:localhost");
767
768 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
769 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
770
771 let first_receipt_ts = uint!(1436451550);
772 let second_receipt_ts = uint!(1436451653);
773 let third_receipt_ts = uint!(1436474532);
774
775 let first_receipt_event = serde_json::from_value(json!({
776 first_event_id: {
777 "m.read": {
778 user_id(): {
779 "ts": first_receipt_ts,
780 }
781 }
782 }
783 }))?;
784
785 let second_receipt_event = serde_json::from_value(json!({
786 second_event_id: {
787 "m.read": {
788 user_id(): {
789 "ts": second_receipt_ts,
790 }
791 }
792 }
793 }))?;
794
795 let third_receipt_event = serde_json::from_value(json!({
796 second_event_id: {
797 "m.read": {
798 user_id(): {
799 "ts": third_receipt_ts,
800 "thread_id": "main",
801 }
802 }
803 }
804 }))?;
805
806 assert!(
807 self.get_user_room_receipt_event(
808 room_id,
809 ReceiptType::Read,
810 ReceiptThread::Unthreaded,
811 user_id()
812 )
813 .await
814 .expect("failed to read unthreaded user room receipt")
815 .is_none()
816 );
817 assert!(
818 self.get_event_room_receipt_events(
819 room_id,
820 ReceiptType::Read,
821 ReceiptThread::Unthreaded,
822 first_event_id
823 )
824 .await
825 .expect("failed to read unthreaded event room receipt for 1")
826 .is_empty()
827 );
828 assert!(
829 self.get_event_room_receipt_events(
830 room_id,
831 ReceiptType::Read,
832 ReceiptThread::Unthreaded,
833 second_event_id
834 )
835 .await
836 .expect("failed to read unthreaded event room receipt for 2")
837 .is_empty()
838 );
839
840 let mut changes = StateChanges::default();
841 changes.add_receipts(room_id, first_receipt_event);
842
843 self.save_changes(&changes).await?;
844 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
845 .get_user_room_receipt_event(
846 room_id,
847 ReceiptType::Read,
848 ReceiptThread::Unthreaded,
849 user_id(),
850 )
851 .await
852 .expect("failed to read unthreaded user room receipt after save")
853 .unwrap();
854 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
855 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
856 let first_event_unthreaded_receipts = self
857 .get_event_room_receipt_events(
858 room_id,
859 ReceiptType::Read,
860 ReceiptThread::Unthreaded,
861 first_event_id,
862 )
863 .await
864 .expect("failed to read unthreaded event room receipt for 1 after save");
865 assert_eq!(
866 first_event_unthreaded_receipts.len(),
867 1,
868 "Found a wrong number of unthreaded receipts for 1 after save"
869 );
870 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
871 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
872 assert!(
873 self.get_event_room_receipt_events(
874 room_id,
875 ReceiptType::Read,
876 ReceiptThread::Unthreaded,
877 second_event_id
878 )
879 .await
880 .expect("failed to read unthreaded event room receipt for 2 after save")
881 .is_empty()
882 );
883
884 let mut changes = StateChanges::default();
885 changes.add_receipts(room_id, second_receipt_event);
886
887 self.save_changes(&changes).await.expect("Saving works");
888 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
889 .get_user_room_receipt_event(
890 room_id,
891 ReceiptType::Read,
892 ReceiptThread::Unthreaded,
893 user_id(),
894 )
895 .await
896 .expect("Getting unthreaded user room receipt after save failed")
897 .unwrap();
898 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
899 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
900 assert!(
901 self.get_event_room_receipt_events(
902 room_id,
903 ReceiptType::Read,
904 ReceiptThread::Unthreaded,
905 first_event_id
906 )
907 .await
908 .expect("Getting unthreaded event room receipt events for first event failed")
909 .is_empty()
910 );
911 let second_event_unthreaded_receipts = self
912 .get_event_room_receipt_events(
913 room_id,
914 ReceiptType::Read,
915 ReceiptThread::Unthreaded,
916 second_event_id,
917 )
918 .await
919 .expect("Getting unthreaded event room receipt events for second event failed");
920 assert_eq!(
921 second_event_unthreaded_receipts.len(),
922 1,
923 "Found a wrong number of unthreaded receipts for second event after save"
924 );
925 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
926 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
927
928 assert!(
929 self.get_user_room_receipt_event(
930 room_id,
931 ReceiptType::Read,
932 ReceiptThread::Main,
933 user_id()
934 )
935 .await
936 .expect("failed to read threaded user room receipt")
937 .is_none()
938 );
939 assert!(
940 self.get_event_room_receipt_events(
941 room_id,
942 ReceiptType::Read,
943 ReceiptThread::Main,
944 second_event_id
945 )
946 .await
947 .expect("Getting threaded event room receipts for 2 failed")
948 .is_empty()
949 );
950
951 let mut changes = StateChanges::default();
952 changes.add_receipts(room_id, third_receipt_event);
953
954 self.save_changes(&changes).await.expect("Saving works");
955 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
957 .get_user_room_receipt_event(
958 room_id,
959 ReceiptType::Read,
960 ReceiptThread::Unthreaded,
961 user_id(),
962 )
963 .await
964 .expect("Getting unthreaded user room receipt after save failed")
965 .unwrap();
966 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
967 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
968 let second_event_unthreaded_receipts = self
969 .get_event_room_receipt_events(
970 room_id,
971 ReceiptType::Read,
972 ReceiptThread::Unthreaded,
973 second_event_id,
974 )
975 .await
976 .expect("Getting unthreaded event room receipt events for second event failed");
977 assert_eq!(
978 second_event_unthreaded_receipts.len(),
979 1,
980 "Found a wrong number of unthreaded receipts for second event after save"
981 );
982 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
983 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
984 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
986 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
987 .await
988 .expect("Getting threaded user room receipt after save failed")
989 .unwrap();
990 assert_eq!(threaded_user_receipt_event_id, second_event_id);
991 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
992 let second_event_threaded_receipts = self
993 .get_event_room_receipt_events(
994 room_id,
995 ReceiptType::Read,
996 ReceiptThread::Main,
997 second_event_id,
998 )
999 .await
1000 .expect("Getting threaded event room receipt events for second event failed");
1001 assert_eq!(
1002 second_event_threaded_receipts.len(),
1003 1,
1004 "Found a wrong number of threaded receipts for second event after save"
1005 );
1006 assert_eq!(second_event_threaded_receipts[0].0, user_id());
1007 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1008
1009 Ok(())
1010 }
1011
1012 async fn test_custom_storage(&self) -> TestResult {
1013 let key = "my_key";
1014 let value = &[0, 1, 2, 3];
1015
1016 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1017
1018 let read = self.get_custom_value(key.as_bytes()).await?;
1019
1020 assert_eq!(Some(value.as_ref()), read.as_deref());
1021
1022 Ok(())
1023 }
1024
1025 async fn test_stripped_non_stripped(&self) -> TestResult {
1026 let room_id = room_id!("!test_stripped_non_stripped:localhost");
1027 let user_id = user_id();
1028
1029 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1030 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1031
1032 let mut changes = StateChanges::default();
1033 changes
1034 .state
1035 .entry(room_id.to_owned())
1036 .or_default()
1037 .entry(StateEventType::RoomMember)
1038 .or_default()
1039 .insert(user_id.into(), membership_event().cast());
1040 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1041 self.save_changes(&changes).await?;
1042
1043 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1044 assert!(matches!(member_event, MemberEvent::Sync(_)));
1045 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1046
1047 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1048 assert_eq!(members, vec![user_id.to_owned()]);
1049
1050 let mut changes = StateChanges::default();
1051 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1052 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1053 self.save_changes(&changes).await?;
1054
1055 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1056 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1057 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1058
1059 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1060 assert_eq!(members, vec![user_id.to_owned()]);
1061
1062 Ok(())
1063 }
1064
1065 async fn test_room_removal(&self) -> TestResult {
1066 let room_id = room_id();
1067 let user_id = user_id();
1068 let display_name = DisplayName::new("example");
1069 let stripped_room_id = stripped_room_id();
1070
1071 self.populate().await?;
1072
1073 {
1074 let txn = TransactionId::new();
1076 let ev =
1077 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1078 self.save_send_queue_request(
1079 room_id,
1080 txn.clone(),
1081 MilliSecondsSinceUnixEpoch::now(),
1082 ev.into(),
1083 0,
1084 )
1085 .await?;
1086
1087 self.save_dependent_queued_request(
1089 room_id,
1090 &txn,
1091 ChildTransactionId::new(),
1092 MilliSecondsSinceUnixEpoch::now(),
1093 DependentQueuedRequestKind::RedactEvent,
1094 )
1095 .await?;
1096 }
1097
1098 self.remove_room(room_id).await?;
1099
1100 assert_eq!(
1101 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1102 1,
1103 "room is still there"
1104 );
1105
1106 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1107 assert!(
1108 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1109 "still state events found"
1110 );
1111 assert!(self.get_profile(room_id, user_id).await?.is_none());
1112 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1113 assert!(
1114 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1115 "still user ids found"
1116 );
1117 assert!(
1118 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1119 "still invited user ids found"
1120 );
1121 assert!(
1122 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1123 "still joined users found"
1124 );
1125 assert!(
1126 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1127 "still display names found"
1128 );
1129 assert!(
1130 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1131 .await?
1132 .is_none()
1133 );
1134 assert!(
1135 self.get_user_room_receipt_event(
1136 room_id,
1137 ReceiptType::Read,
1138 ReceiptThread::Unthreaded,
1139 user_id
1140 )
1141 .await?
1142 .is_none()
1143 );
1144 assert!(
1145 self.get_event_room_receipt_events(
1146 room_id,
1147 ReceiptType::Read,
1148 ReceiptThread::Unthreaded,
1149 first_receipt_event_id()
1150 )
1151 .await?
1152 .is_empty(),
1153 "still event recepts in the store"
1154 );
1155 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1156 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1157
1158 self.remove_room(stripped_room_id).await?;
1159
1160 assert!(
1161 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1162 "still room info found"
1163 );
1164 Ok(())
1165 }
1166
1167 async fn test_profile_removal(&self) -> TestResult {
1168 let room_id = room_id();
1169
1170 let user_id = user_id();
1172 let invited_user_id = invited_user_id();
1173
1174 self.populate().await?;
1175
1176 let new_invite_member_json = json!({
1177 "content": {
1178 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1179 "displayname": "example after update",
1180 "membership": "invite",
1181 "reason": "Looking for support"
1182 },
1183 "event_id": "$143273582443PhrSm:localhost",
1184 "origin_server_ts": 1432735824,
1185 "room_id": room_id,
1186 "sender": user_id,
1187 "state_key": invited_user_id,
1188 "type": "m.room.member",
1189 });
1190 let new_invite_member_event: SyncRoomMemberEvent =
1191 serde_json::from_value(new_invite_member_json.clone())?;
1192
1193 let mut changes = StateChanges {
1194 profiles_to_delete: [(
1196 room_id.to_owned(),
1197 vec![user_id.to_owned(), invited_user_id.to_owned()],
1198 )]
1199 .into(),
1200
1201 profiles: {
1203 let mut map = BTreeMap::default();
1204 map.insert(
1205 room_id.to_owned(),
1206 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1207 .into_iter()
1208 .collect(),
1209 );
1210 map
1211 },
1212
1213 ..StateChanges::default()
1214 };
1215
1216 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1217 .expect("can create sync-state-event for topic");
1218 let event = raw.deserialize()?;
1219 changes.add_state_event(room_id, event, raw);
1220
1221 self.save_changes(&changes).await?;
1222
1223 assert!(self.get_profile(room_id, user_id).await?.is_none());
1225 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1226
1227 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1229 assert_eq!(
1230 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1231 Some("example after update")
1232 );
1233 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1234
1235 Ok(())
1236 }
1237
1238 async fn test_presence_saving(&self) -> TestResult {
1239 let user_id = user_id();
1240 let second_user_id = user_id!("@second:localhost");
1241 let third_user_id = user_id!("@third:localhost");
1242 let unknown_user_id = user_id!("@unknown:localhost");
1243
1244 let mut user_ids = vec![user_id.to_owned()];
1246 let presence_event = self.get_presence_event(user_id).await;
1247 assert!(presence_event?.is_none());
1248 let presence_events = self.get_presence_events(&user_ids).await;
1249 assert!(presence_events?.is_empty());
1250
1251 let mut changes = StateChanges::default();
1253 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1254 self.save_changes(&changes).await?;
1255
1256 let presence_event = self.get_presence_event(user_id).await;
1257 assert!(presence_event?.is_some());
1258 let presence_events = self.get_presence_events(&user_ids).await;
1259 assert_eq!(presence_events?.len(), 1);
1260
1261 let mut changes = StateChanges::default();
1263 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1264 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1265 self.save_changes(&changes).await?;
1266
1267 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1268 let presence_event = self.get_presence_event(second_user_id).await;
1269 assert!(presence_event?.is_some());
1270 let presence_event = self.get_presence_event(third_user_id).await;
1271 assert!(presence_event?.is_some());
1272 let presence_events = self.get_presence_events(&user_ids).await;
1273 assert_eq!(presence_events?.len(), 3);
1274
1275 user_ids.push(unknown_user_id.to_owned());
1277 let member_events = self.get_presence_events(&user_ids).await;
1278 assert_eq!(member_events?.len(), 3);
1279
1280 let presence_events = self.get_presence_events(&[]).await;
1282 assert!(presence_events?.is_empty());
1283
1284 Ok(())
1285 }
1286
1287 async fn test_display_names_saving(&self) -> TestResult {
1288 let room_id = room_id!("!test_display_names_saving:localhost");
1289 let user_id = user_id();
1290 let user_display_name = DisplayName::new("User");
1291 let second_user_id = user_id!("@second:localhost");
1292 let third_user_id = user_id!("@third:localhost");
1293 let other_display_name = DisplayName::new("Raoul");
1294 let unknown_display_name = DisplayName::new("Unknown");
1295
1296 let mut display_names = vec![user_display_name.to_owned()];
1298 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1299 assert!(users.is_empty());
1300 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1301 assert!(names.is_empty());
1302
1303 let mut changes = StateChanges::default();
1305 changes
1306 .ambiguity_maps
1307 .entry(room_id.to_owned())
1308 .or_default()
1309 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1310 self.save_changes(&changes).await?;
1311
1312 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1313 assert_eq!(users.len(), 1);
1314 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1315 assert_eq!(names.len(), 1);
1316 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1317
1318 let mut changes = StateChanges::default();
1320 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1321 other_display_name.to_owned(),
1322 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1323 );
1324 self.save_changes(&changes).await?;
1325
1326 display_names.push(other_display_name.to_owned());
1327 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1328 assert_eq!(users.len(), 1);
1329 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1330 assert_eq!(users.len(), 2);
1331 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1332 assert_eq!(names.len(), 2);
1333 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1334 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1335
1336 display_names.push(unknown_display_name.to_owned());
1338 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1339 assert_eq!(names.len(), 2);
1340
1341 let names = self.get_users_with_display_names(room_id, &[]).await?;
1343 assert!(names.is_empty());
1344
1345 Ok(())
1346 }
1347
1348 #[allow(clippy::needless_range_loop)]
1349 async fn test_send_queue(&self) -> TestResult {
1350 let room_id = room_id!("!test_send_queue:localhost");
1351
1352 let events = self.load_send_queue_requests(room_id).await?;
1354 assert!(events.is_empty());
1355
1356 let txn0 = TransactionId::new();
1358 let event0 =
1359 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1360 self.save_send_queue_request(
1361 room_id,
1362 txn0.clone(),
1363 MilliSecondsSinceUnixEpoch::now(),
1364 event0.into(),
1365 0,
1366 )
1367 .await?;
1368
1369 let pending = self.load_send_queue_requests(room_id).await?;
1371
1372 assert_eq!(pending.len(), 1);
1373 {
1374 assert_eq!(pending[0].transaction_id, txn0);
1375
1376 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1377 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1378 assert_eq!(content.body(), "msg0");
1379
1380 assert!(!pending[0].is_wedged());
1381 }
1382
1383 for i in 1..=3 {
1385 let txn = TransactionId::new();
1386 let event = SerializableEventContent::new(
1387 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1388 )?;
1389
1390 self.save_send_queue_request(
1391 room_id,
1392 txn,
1393 MilliSecondsSinceUnixEpoch::now(),
1394 event.into(),
1395 0,
1396 )
1397 .await?;
1398 }
1399
1400 let pending = self.load_send_queue_requests(room_id).await?;
1402
1403 assert_eq!(pending.len(), 4);
1405
1406 assert_eq!(pending[0].transaction_id, txn0);
1407
1408 for i in 0..4 {
1409 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1410 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1411 assert_eq!(content.body(), format!("msg{i}"));
1412 assert!(!pending[i].is_wedged());
1413 }
1414
1415 let txn2 = &pending[2].transaction_id;
1417 self.update_send_queue_request_status(
1418 room_id,
1419 txn2,
1420 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1421 )
1422 .await?;
1423
1424 let pending = self.load_send_queue_requests(room_id).await?;
1426
1427 assert_eq!(pending.len(), 4);
1429 assert_eq!(pending[0].transaction_id, txn0);
1430 assert_eq!(pending[2].transaction_id, *txn2);
1431 assert!(pending[2].is_wedged());
1432 let error = pending[2].clone().error.unwrap();
1433 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1434 assert_eq!(generic_error, "Oops");
1435 for i in 0..4 {
1436 if i != 2 {
1437 assert!(!pending[i].is_wedged());
1438 }
1439 }
1440
1441 let event0 = SerializableEventContent::new(
1443 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1444 )?;
1445 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1446
1447 let pending = self.load_send_queue_requests(room_id).await?;
1449
1450 assert_eq!(pending.len(), 4);
1451 {
1452 assert_eq!(pending[2].transaction_id, *txn2);
1453
1454 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1455 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1456 assert_eq!(content.body(), "wow that's a cool test");
1457
1458 assert!(!pending[2].is_wedged());
1459
1460 for i in 0..4 {
1461 if i != 2 {
1462 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1463 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1464 assert_eq!(content.body(), format!("msg{i}"));
1465
1466 assert!(!pending[i].is_wedged());
1467 }
1468 }
1469 }
1470
1471 self.remove_send_queue_request(room_id, &txn0).await?;
1473
1474 let pending = self.load_send_queue_requests(room_id).await?;
1476
1477 assert_eq!(pending.len(), 3);
1478 assert_eq!(pending[1].transaction_id, *txn2);
1479 for i in 0..3 {
1480 assert_ne!(pending[i].transaction_id, txn0);
1481 }
1482
1483 let room_id2 = room_id!("!test_send_queue_two:localhost");
1488 {
1489 let txn = TransactionId::new();
1490 let event = SerializableEventContent::new(
1491 &RoomMessageEventContent::text_plain("room2").into(),
1492 )?;
1493 self.save_send_queue_request(
1494 room_id2,
1495 txn.clone(),
1496 MilliSecondsSinceUnixEpoch::now(),
1497 event.into(),
1498 0,
1499 )
1500 .await?;
1501 }
1502
1503 {
1505 let room_id3 = room_id!("!test_send_queue_three:localhost");
1506 let txn = TransactionId::new();
1507 let event = SerializableEventContent::new(
1508 &RoomMessageEventContent::text_plain("room3").into(),
1509 )?;
1510 self.save_send_queue_request(
1511 room_id3,
1512 txn.clone(),
1513 MilliSecondsSinceUnixEpoch::now(),
1514 event.into(),
1515 0,
1516 )
1517 .await?;
1518
1519 self.remove_send_queue_request(room_id3, &txn).await?;
1520 }
1521
1522 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1525 assert_eq!(outstanding_rooms.len(), 2);
1526 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1527 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1528
1529 Ok(())
1530 }
1531
1532 async fn test_send_queue_priority(&self) -> TestResult {
1533 let room_id = room_id!("!test_send_queue:localhost");
1534
1535 let events = self.load_send_queue_requests(room_id).await?;
1537 assert!(events.is_empty());
1538
1539 let low0_txn = TransactionId::new();
1541 let ev0 =
1542 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1543 self.save_send_queue_request(
1544 room_id,
1545 low0_txn.clone(),
1546 MilliSecondsSinceUnixEpoch::now(),
1547 ev0.into(),
1548 2,
1549 )
1550 .await?;
1551
1552 let high_txn = TransactionId::new();
1554 let ev1 =
1555 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1556 self.save_send_queue_request(
1557 room_id,
1558 high_txn.clone(),
1559 MilliSecondsSinceUnixEpoch::now(),
1560 ev1.into(),
1561 10,
1562 )
1563 .await?;
1564
1565 let low1_txn = TransactionId::new();
1567 let ev2 =
1568 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1569 self.save_send_queue_request(
1570 room_id,
1571 low1_txn.clone(),
1572 MilliSecondsSinceUnixEpoch::now(),
1573 ev2.into(),
1574 2,
1575 )
1576 .await?;
1577
1578 let pending = self.load_send_queue_requests(room_id).await?;
1581
1582 assert_eq!(pending.len(), 3);
1583 {
1584 assert_eq!(pending[0].transaction_id, high_txn);
1585
1586 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1587 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1588 assert_eq!(content.body(), "high");
1589 }
1590
1591 {
1592 assert_eq!(pending[1].transaction_id, low0_txn);
1593
1594 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1595 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1596 assert_eq!(content.body(), "low0");
1597 }
1598
1599 {
1600 assert_eq!(pending[2].transaction_id, low1_txn);
1601
1602 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1603 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1604 assert_eq!(content.body(), "low1");
1605 }
1606
1607 Ok(())
1608 }
1609
1610 async fn test_send_queue_dependents(&self) -> TestResult {
1611 let room_id = room_id!("!test_send_queue_dependents:localhost");
1612
1613 let txn0 = TransactionId::new();
1615 let event0 =
1616 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1617 self.save_send_queue_request(
1618 room_id,
1619 txn0.clone(),
1620 MilliSecondsSinceUnixEpoch::now(),
1621 event0.clone().into(),
1622 0,
1623 )
1624 .await?;
1625
1626 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1628
1629 let child_txn = ChildTransactionId::new();
1631 self.save_dependent_queued_request(
1632 room_id,
1633 &txn0,
1634 child_txn.clone(),
1635 MilliSecondsSinceUnixEpoch::now(),
1636 DependentQueuedRequestKind::RedactEvent,
1637 )
1638 .await?;
1639
1640 let dependents = self.load_dependent_queued_requests(room_id).await?;
1642 assert_eq!(dependents.len(), 1);
1643 assert_eq!(dependents[0].parent_transaction_id, txn0);
1644 assert_eq!(dependents[0].own_transaction_id, child_txn);
1645 assert!(dependents[0].parent_key.is_none());
1646 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1647
1648 let (event, event_type) = event0.raw();
1650 let event_id = owned_event_id!("$1");
1651 let num_updated = self
1652 .mark_dependent_queued_requests_as_ready(
1653 room_id,
1654 &txn0,
1655 SentRequestKey::Event {
1656 event_id: event_id.clone(),
1657 event: event.clone(),
1658 event_type: event_type.to_owned(),
1659 },
1660 )
1661 .await?;
1662 assert_eq!(num_updated, 1);
1663
1664 let dependents = self.load_dependent_queued_requests(room_id).await?;
1666 assert_eq!(dependents.len(), 1);
1667 assert_eq!(dependents[0].parent_transaction_id, txn0);
1668 assert_eq!(dependents[0].own_transaction_id, child_txn);
1669 assert_matches!(
1670 dependents[0].parent_key.as_ref(),
1671 Some(SentRequestKey::Event {
1672 event_id: received_event_id,
1673 event: received_event,
1674 event_type: received_event_type
1675 }) => {
1676 assert_eq!(received_event_id, &event_id);
1677 assert_eq!(received_event.json().to_string(), event.json().to_string());
1678 assert_eq!(received_event_type.as_str(), event_type);
1679 }
1680 );
1681 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1682
1683 let removed = self
1685 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1686 .await?;
1687 assert!(removed);
1688
1689 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1691
1692 let txn1 = TransactionId::new();
1695 let event1 =
1696 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1697 self.save_send_queue_request(
1698 room_id,
1699 txn1.clone(),
1700 MilliSecondsSinceUnixEpoch::now(),
1701 event1.into(),
1702 0,
1703 )
1704 .await?;
1705
1706 self.save_dependent_queued_request(
1707 room_id,
1708 &txn0,
1709 ChildTransactionId::new(),
1710 MilliSecondsSinceUnixEpoch::now(),
1711 DependentQueuedRequestKind::RedactEvent,
1712 )
1713 .await?;
1714 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1715
1716 self.save_dependent_queued_request(
1717 room_id,
1718 &txn1,
1719 ChildTransactionId::new(),
1720 MilliSecondsSinceUnixEpoch::now(),
1721 DependentQueuedRequestKind::EditEvent {
1722 new_content: SerializableEventContent::new(
1723 &RoomMessageEventContent::text_plain("edit").into(),
1724 )?,
1725 },
1726 )
1727 .await?;
1728 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1729
1730 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1732 assert!(removed);
1733
1734 let dependents = self.load_dependent_queued_requests(room_id).await?;
1736 assert_eq!(dependents.len(), 2);
1737
1738 Ok(())
1739 }
1740
1741 async fn test_update_send_queue_dependent(&self) -> TestResult {
1742 let room_id = room_id!("!test_send_queue_dependents:localhost");
1743
1744 let txn = TransactionId::new();
1745
1746 let child_txn = ChildTransactionId::new();
1748
1749 self.save_dependent_queued_request(
1750 room_id,
1751 &txn,
1752 child_txn.clone(),
1753 MilliSecondsSinceUnixEpoch::now(),
1754 DependentQueuedRequestKind::RedactEvent,
1755 )
1756 .await?;
1757
1758 let dependents = self.load_dependent_queued_requests(room_id).await?;
1760 assert_eq!(dependents.len(), 1);
1761 assert_eq!(dependents[0].parent_transaction_id, txn);
1762 assert_eq!(dependents[0].own_transaction_id, child_txn);
1763 assert!(dependents[0].parent_key.is_none());
1764 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1765
1766 self.update_dependent_queued_request(
1768 room_id,
1769 &child_txn,
1770 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1771 )
1772 .await?;
1773
1774 let dependents = self.load_dependent_queued_requests(room_id).await?;
1776 assert_eq!(dependents.len(), 1);
1777 assert_eq!(dependents[0].parent_transaction_id, txn);
1778 assert_eq!(dependents[0].own_transaction_id, child_txn);
1779 assert!(dependents[0].parent_key.is_none());
1780 assert_matches!(
1781 &dependents[0].kind,
1782 DependentQueuedRequestKind::ReactEvent { key } => {
1783 assert_eq!(key, "👍");
1784 }
1785 );
1786
1787 Ok(())
1788 }
1789
1790 async fn test_get_room_infos(&self) -> TestResult {
1791 let room_id_0 = room_id!("!r0");
1792 let room_id_1 = room_id!("!r1");
1793 let room_id_2 = room_id!("!r2");
1794
1795 {
1797 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1798 }
1799
1800 let mut changes = StateChanges::default();
1802 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1803 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1804 self.save_changes(&changes).await?;
1805
1806 {
1808 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1809
1810 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1813
1814 assert_eq!(all_rooms.len(), 2);
1815 assert_eq!(all_rooms[0].room_id, room_id_0);
1816 assert_eq!(all_rooms[1].room_id, room_id_1);
1817 }
1818
1819 {
1821 let all_rooms =
1822 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1823
1824 assert_eq!(all_rooms.len(), 1);
1825 assert_eq!(all_rooms[0].room_id, room_id_1);
1826 }
1827
1828 {
1831 let all_rooms =
1832 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1833
1834 assert_eq!(all_rooms.len(), 0);
1835 }
1836
1837 Ok(())
1838 }
1839
1840 async fn test_thread_subscriptions(&self) -> TestResult {
1841 let first_thread = event_id!("$t1");
1842 let second_thread = event_id!("$t2");
1843
1844 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1846 assert!(maybe_sub.is_none());
1847
1848 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1849 assert!(maybe_sub.is_none());
1850
1851 self.upsert_thread_subscriptions(vec![(
1853 room_id(),
1854 first_thread,
1855 StoredThreadSubscription {
1856 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1857 bump_stamp: None,
1858 },
1859 )])
1860 .await?;
1861
1862 self.upsert_thread_subscriptions(vec![(
1863 room_id(),
1864 second_thread,
1865 StoredThreadSubscription {
1866 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1867 bump_stamp: None,
1868 },
1869 )])
1870 .await?;
1871
1872 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1874 assert_eq!(
1875 maybe_sub,
1876 Some(StoredThreadSubscription {
1877 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1878 bump_stamp: None,
1879 })
1880 );
1881
1882 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1883 assert_eq!(
1884 maybe_sub,
1885 Some(StoredThreadSubscription {
1886 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1887 bump_stamp: None,
1888 })
1889 );
1890
1891 self.upsert_thread_subscriptions(vec![(
1893 room_id(),
1894 first_thread,
1895 StoredThreadSubscription {
1896 status: ThreadSubscriptionStatus::Unsubscribed,
1897 bump_stamp: None,
1898 },
1899 )])
1900 .await?;
1901
1902 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1904 assert_eq!(
1905 maybe_sub,
1906 Some(StoredThreadSubscription {
1907 status: ThreadSubscriptionStatus::Unsubscribed,
1908 bump_stamp: None,
1909 })
1910 );
1911
1912 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1914 assert_eq!(
1915 maybe_sub,
1916 Some(StoredThreadSubscription {
1917 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1918 bump_stamp: None,
1919 })
1920 );
1921
1922 self.remove_thread_subscription(room_id(), second_thread).await?;
1924
1925 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1927 assert_eq!(maybe_sub, None);
1928
1929 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1931 assert_eq!(
1932 maybe_sub,
1933 Some(StoredThreadSubscription {
1934 status: ThreadSubscriptionStatus::Unsubscribed,
1935 bump_stamp: None,
1936 })
1937 );
1938
1939 self.remove_thread_subscription(room_id(), second_thread).await?;
1941
1942 Ok(())
1943 }
1944
1945 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1946 let threads = [
1947 event_id!("$t1"),
1948 event_id!("$t2"),
1949 event_id!("$t3"),
1950 event_id!("$t4"),
1951 event_id!("$t5"),
1952 event_id!("$t6"),
1953 ];
1954 let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1957 threads
1958 .iter()
1959 .zip(subs)
1960 .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1961 .collect::<Vec<_>>()
1962 };
1963
1964 let initial_subscriptions = build_subscription_updates(&[
1966 StoredThreadSubscription {
1967 status: ThreadSubscriptionStatus::Unsubscribed,
1968 bump_stamp: None,
1969 },
1970 StoredThreadSubscription {
1971 status: ThreadSubscriptionStatus::Unsubscribed,
1972 bump_stamp: Some(14),
1973 },
1974 StoredThreadSubscription {
1975 status: ThreadSubscriptionStatus::Unsubscribed,
1976 bump_stamp: None,
1977 },
1978 StoredThreadSubscription {
1979 status: ThreadSubscriptionStatus::Unsubscribed,
1980 bump_stamp: Some(210),
1981 },
1982 StoredThreadSubscription {
1983 status: ThreadSubscriptionStatus::Unsubscribed,
1984 bump_stamp: Some(5),
1985 },
1986 StoredThreadSubscription {
1987 status: ThreadSubscriptionStatus::Unsubscribed,
1988 bump_stamp: Some(100),
1989 },
1990 ]);
1991
1992 let update_subscriptions = build_subscription_updates(&[
1993 StoredThreadSubscription {
1994 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1995 bump_stamp: None,
1996 },
1997 StoredThreadSubscription {
1998 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1999 bump_stamp: None,
2000 },
2001 StoredThreadSubscription {
2002 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2003 bump_stamp: Some(1101),
2004 },
2005 StoredThreadSubscription {
2006 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2007 bump_stamp: Some(222),
2008 },
2009 StoredThreadSubscription {
2010 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2011 bump_stamp: Some(1),
2012 },
2013 StoredThreadSubscription {
2014 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2015 bump_stamp: Some(100),
2016 },
2017 ]);
2018
2019 let expected_subscriptions = build_subscription_updates(&[
2020 StoredThreadSubscription {
2022 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2023 bump_stamp: None,
2024 },
2025 StoredThreadSubscription {
2027 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2028 bump_stamp: Some(14),
2029 },
2030 StoredThreadSubscription {
2032 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2033 bump_stamp: Some(1101),
2034 },
2035 StoredThreadSubscription {
2037 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2038 bump_stamp: Some(222),
2039 },
2040 StoredThreadSubscription {
2042 status: ThreadSubscriptionStatus::Unsubscribed,
2043 bump_stamp: Some(5),
2044 },
2045 StoredThreadSubscription {
2047 status: ThreadSubscriptionStatus::Unsubscribed,
2048 bump_stamp: Some(100),
2049 },
2050 ]);
2051
2052 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2054
2055 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2057 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2058 assert_eq!(stored_subscription, Some(*expected_sub));
2059 }
2060
2061 self.upsert_thread_subscriptions(update_subscriptions).await?;
2063
2064 for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2066 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2067 assert_eq!(stored_subscription, Some(*expected_sub));
2068 }
2069
2070 for (room_id, thread_id, _) in &expected_subscriptions {
2072 self.remove_thread_subscription(room_id, thread_id).await?;
2073 }
2074
2075 let initial_subscriptions = build_subscription_updates(&[
2076 StoredThreadSubscription {
2077 status: ThreadSubscriptionStatus::Unsubscribed,
2078 bump_stamp: Some(1),
2079 },
2080 StoredThreadSubscription {
2081 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2082 bump_stamp: Some(1),
2083 },
2084 StoredThreadSubscription {
2085 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2086 bump_stamp: Some(1),
2087 },
2088 ]);
2089
2090 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2091
2092 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2093 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2094 assert_eq!(stored_subscription, Some(*expected_sub));
2095 }
2096
2097 let update_subscriptions = build_subscription_updates(&[
2098 StoredThreadSubscription {
2099 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2100 bump_stamp: Some(2),
2101 },
2102 StoredThreadSubscription {
2103 status: ThreadSubscriptionStatus::Unsubscribed,
2104 bump_stamp: Some(2),
2105 },
2106 StoredThreadSubscription {
2107 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2108 bump_stamp: Some(2),
2109 },
2110 ]);
2111
2112 self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2113
2114 for (room_id, thread_id, expected_sub) in &update_subscriptions {
2115 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2116 assert_eq!(stored_subscription, Some(*expected_sub));
2117 }
2118
2119 Ok(())
2120 }
2121}
2122
2123#[allow(unused_macros, unused_extern_crates)]
2149#[macro_export]
2150macro_rules! statestore_integration_tests {
2151 () => {
2152 mod statestore_integration_tests {
2153 use matrix_sdk_test::{TestResult, async_test};
2154 use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2155
2156 use super::get_store;
2157
2158 #[async_test]
2159 async fn test_topic_redaction() -> TestResult {
2160 let store = get_store().await?.into_state_store();
2161 store.test_topic_redaction().await
2162 }
2163
2164 #[async_test]
2165 async fn test_populate_store() -> TestResult {
2166 let store = get_store().await?.into_state_store();
2167 store.test_populate_store().await
2168 }
2169
2170 #[async_test]
2171 async fn test_member_saving() -> TestResult {
2172 let store = get_store().await?.into_state_store();
2173 store.test_member_saving().await
2174 }
2175
2176 #[async_test]
2177 async fn test_filter_saving() -> TestResult {
2178 let store = get_store().await?.into_state_store();
2179 store.test_filter_saving().await
2180 }
2181
2182 #[async_test]
2183 async fn test_user_avatar_url_saving() -> TestResult {
2184 let store = get_store().await?.into_state_store();
2185 store.test_user_avatar_url_saving().await
2186 }
2187
2188 #[async_test]
2189 async fn test_supported_versions_saving() -> TestResult {
2190 let store = get_store().await?.into_state_store();
2191 store.test_supported_versions_saving().await
2192 }
2193
2194 #[async_test]
2195 async fn test_well_known_saving() -> TestResult {
2196 let store = get_store().await?.into_state_store();
2197 store.test_well_known_saving().await
2198 }
2199
2200 #[async_test]
2201 async fn test_sync_token_saving() -> TestResult {
2202 let store = get_store().await?.into_state_store();
2203 store.test_sync_token_saving().await
2204 }
2205
2206 #[async_test]
2207 async fn test_utd_hook_manager_data_saving() -> TestResult {
2208 let store = get_store().await?.into_state_store();
2209 store.test_utd_hook_manager_data_saving().await
2210 }
2211
2212 #[async_test]
2213 async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2214 let store = get_store().await?.into_state_store();
2215 store.test_one_time_key_already_uploaded_data_saving().await
2216 }
2217
2218 #[async_test]
2219 async fn test_stripped_member_saving() -> TestResult {
2220 let store = get_store().await?.into_state_store();
2221 store.test_stripped_member_saving().await
2222 }
2223
2224 #[async_test]
2225 async fn test_power_level_saving() -> TestResult {
2226 let store = get_store().await?.into_state_store();
2227 store.test_power_level_saving().await
2228 }
2229
2230 #[async_test]
2231 async fn test_receipts_saving() -> TestResult {
2232 let store = get_store().await?.into_state_store();
2233 store.test_receipts_saving().await
2234 }
2235
2236 #[async_test]
2237 async fn test_custom_storage() -> TestResult {
2238 let store = get_store().await?.into_state_store();
2239 store.test_custom_storage().await
2240 }
2241
2242 #[async_test]
2243 async fn test_stripped_non_stripped() -> TestResult {
2244 let store = get_store().await?.into_state_store();
2245 store.test_stripped_non_stripped().await
2246 }
2247
2248 #[async_test]
2249 async fn test_room_removal() -> TestResult {
2250 let store = get_store().await?.into_state_store();
2251 store.test_room_removal().await
2252 }
2253
2254 #[async_test]
2255 async fn test_profile_removal() -> TestResult {
2256 let store = get_store().await?.into_state_store();
2257 store.test_profile_removal().await
2258 }
2259
2260 #[async_test]
2261 async fn test_presence_saving() -> TestResult {
2262 let store = get_store().await?.into_state_store();
2263 store.test_presence_saving().await
2264 }
2265
2266 #[async_test]
2267 async fn test_display_names_saving() -> TestResult {
2268 let store = get_store().await?.into_state_store();
2269 store.test_display_names_saving().await
2270 }
2271
2272 #[async_test]
2273 async fn test_send_queue() -> TestResult {
2274 let store = get_store().await?.into_state_store();
2275 store.test_send_queue().await
2276 }
2277
2278 #[async_test]
2279 async fn test_send_queue_priority() -> TestResult {
2280 let store = get_store().await?.into_state_store();
2281 store.test_send_queue_priority().await
2282 }
2283
2284 #[async_test]
2285 async fn test_send_queue_dependents() -> TestResult {
2286 let store = get_store().await?.into_state_store();
2287 store.test_send_queue_dependents().await
2288 }
2289
2290 #[async_test]
2291 async fn test_update_send_queue_dependent() -> TestResult {
2292 let store = get_store().await?.into_state_store();
2293 store.test_update_send_queue_dependent().await
2294 }
2295
2296 #[async_test]
2297 async fn test_get_room_infos() -> TestResult {
2298 let store = get_store().await?.into_state_store();
2299 store.test_get_room_infos().await
2300 }
2301
2302 #[async_test]
2303 async fn test_thread_subscriptions() -> TestResult {
2304 let store = get_store().await?.into_state_store();
2305 store.test_thread_subscriptions().await
2306 }
2307
2308 #[async_test]
2309 async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2310 let store = get_store().await?.into_state_store();
2311 store.test_thread_subscriptions_bulk_upsert().await
2312 }
2313 }
2314 };
2315}
2316
2317fn user_id() -> &'static UserId {
2318 user_id!("@example:localhost")
2319}
2320
2321fn invited_user_id() -> &'static UserId {
2322 user_id!("@invited:localhost")
2323}
2324
2325fn room_id() -> &'static RoomId {
2326 room_id!("!test:localhost")
2327}
2328
2329fn stripped_room_id() -> &'static RoomId {
2330 room_id!("!stripped:localhost")
2331}
2332
2333fn first_receipt_event_id() -> &'static EventId {
2334 event_id!("$example")
2335}
2336
2337fn power_level_event() -> Raw<AnySyncStateEvent> {
2338 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2339
2340 let event = json!({
2341 "event_id": "$h29iv0s8:example.com",
2342 "content": content,
2343 "sender": user_id(),
2344 "type": "m.room.power_levels",
2345 "origin_server_ts": 0u64,
2346 "state_key": "",
2347 });
2348
2349 serde_json::from_value(event).unwrap()
2350}
2351
2352fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2353 custom_stripped_membership_event(user_id())
2354}
2355
2356fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2357 let ev_json = json!({
2358 "type": "m.room.member",
2359 "content": RoomMemberEventContent::new(MembershipState::Join),
2360 "sender": user_id,
2361 "state_key": user_id,
2362 });
2363
2364 Raw::new(&ev_json).unwrap().cast_unchecked()
2365}
2366
2367fn membership_event() -> Raw<SyncRoomMemberEvent> {
2368 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2369}
2370
2371fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2372 let ev_json = json!({
2373 "type": "m.room.member",
2374 "content": RoomMemberEventContent::new(MembershipState::Join),
2375 "event_id": event_id,
2376 "origin_server_ts": 198,
2377 "sender": user_id,
2378 "state_key": user_id,
2379 });
2380
2381 Raw::new(&ev_json).unwrap().cast_unchecked()
2382}
2383
2384fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2385 let ev_json = json!({
2386 "content": {
2387 "presence": "online"
2388 },
2389 "sender": user_id,
2390 });
2391
2392 Raw::new(&ev_json).unwrap().cast_unchecked()
2393}