1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11 api::{
12 FeatureFlag, MatrixVersion,
13 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14 },
15 event_id,
16 events::{
17 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19 RoomAccountDataEventType, StateEventType, SyncStateEvent,
20 presence::PresenceEvent,
21 receipt::{ReceiptThread, ReceiptType},
22 room::{
23 member::{
24 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25 SyncRoomMemberEvent,
26 },
27 message::RoomMessageEventContent,
28 power_levels::RoomPowerLevelsEventContent,
29 topic::RoomTopicEventContent,
30 },
31 },
32 owned_event_id, owned_mxc_uri,
33 push::Ruleset,
34 room_id,
35 room_version_rules::AuthorizationRules,
36 serde::Raw,
37 uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings, ServerInfo,
43 WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47 deserialized_responses::MemberEvent,
48 store::{
49 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50 StoredThreadSubscription, ThreadSubscriptionStatus,
51 },
52};
53
54#[allow(async_fn_in_trait)]
59pub trait StateStoreIntegrationTests {
60 async fn populate(&self) -> TestResult;
62 async fn test_topic_redaction(&self) -> TestResult;
64 async fn test_populate_store(&self) -> TestResult;
66 async fn test_member_saving(&self) -> TestResult;
68 async fn test_filter_saving(&self) -> TestResult;
70 async fn test_user_avatar_url_saving(&self) -> TestResult;
72 async fn test_sync_token_saving(&self) -> TestResult;
74 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
76 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
78 async fn test_stripped_member_saving(&self) -> TestResult;
80 async fn test_power_level_saving(&self) -> TestResult;
82 async fn test_receipts_saving(&self) -> TestResult;
84 async fn test_custom_storage(&self) -> TestResult;
86 async fn test_stripped_non_stripped(&self) -> TestResult;
88 async fn test_room_removal(&self) -> TestResult;
90 async fn test_profile_removal(&self) -> TestResult;
92 async fn test_presence_saving(&self) -> TestResult;
94 async fn test_display_names_saving(&self) -> TestResult;
96 async fn test_send_queue(&self) -> TestResult;
98 async fn test_send_queue_priority(&self) -> TestResult;
100 async fn test_send_queue_dependents(&self) -> TestResult;
102 async fn test_update_send_queue_dependent(&self) -> TestResult;
104 async fn test_server_info_saving(&self) -> TestResult;
106 async fn test_get_room_infos(&self) -> TestResult;
108 async fn test_thread_subscriptions(&self) -> TestResult;
110 async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
112}
113
114impl StateStoreIntegrationTests for DynStateStore {
115 async fn populate(&self) -> TestResult {
116 let f = EventFactory::new();
117 let mut changes = StateChanges::default();
118
119 let user_id = user_id();
120 let invited_user_id = invited_user_id();
121 let room_id = room_id();
122 let stripped_room_id = stripped_room_id();
123
124 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
125
126 let presence_json: &JsonValue = &test_json::PRESENCE;
127 let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
128 let presence_event = presence_raw.deserialize()?;
129 changes.add_presence_event(presence_event, presence_raw);
130
131 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
132 f.push_rules(Ruleset::server_default(user_id)).into_raw();
133 let pushrules_event = pushrules_raw.deserialize()?;
134 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
135
136 let mut room = RoomInfo::new(room_id, RoomState::Joined);
137 room.mark_as_left();
138
139 let tag_json: &JsonValue = &test_json::TAG;
140 let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
141 let tag_event = tag_raw.deserialize()?;
142 changes.add_room_account_data(room_id, tag_event, tag_raw);
143
144 let name_json: &JsonValue = &test_json::NAME;
145 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
146 let name_event = name_raw.deserialize()?;
147 room.handle_state_event(&name_event);
148 changes.add_state_event(room_id, name_event, name_raw);
149
150 let topic_json: &JsonValue = &test_json::TOPIC;
151 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
152 let topic_event = topic_raw.deserialize()?;
153 room.handle_state_event(&topic_event);
154 changes.add_state_event(room_id, topic_event, topic_raw);
155
156 let mut room_ambiguity_map = HashMap::new();
157 let mut room_profiles = BTreeMap::new();
158
159 let member_json: &JsonValue = &test_json::MEMBER;
160 let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
161 let displayname = DisplayName::new(
162 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
163 );
164 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
165 room_profiles.insert(user_id.to_owned(), (&member_event).into());
166
167 let member_state_raw =
168 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
169 let member_state_event = member_state_raw.deserialize()?;
170 changes.add_state_event(room_id, member_state_event, member_state_raw);
171
172 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
173 let invited_member_event: SyncRoomMemberEvent =
175 serde_json::from_value(invited_member_json.clone())?;
176 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
177 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
178
179 let invited_member_state_raw =
180 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
181 let invited_member_state_event = invited_member_state_raw.deserialize()?;
182 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
183
184 let receipt_content = f
185 .room(room_id)
186 .read_receipts()
187 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
188 .into_content();
189 changes.add_receipts(room_id, receipt_content);
190
191 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
192 changes.profiles.insert(room_id.to_owned(), room_profiles);
193 changes.add_room(room);
194
195 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
196
197 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
198 let stripped_name_raw =
199 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
200 let stripped_name_event = stripped_name_raw.deserialize()?;
201 stripped_room.handle_stripped_state_event(&stripped_name_event);
202 changes.stripped_state.insert(
203 stripped_room_id.to_owned(),
204 BTreeMap::from([(
205 stripped_name_event.event_type(),
206 BTreeMap::from([(
207 stripped_name_event.state_key().to_owned(),
208 stripped_name_raw.clone(),
209 )]),
210 )]),
211 );
212
213 changes.add_room(stripped_room);
214
215 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
216 let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
217 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
218
219 self.save_changes(&changes).await?;
220
221 Ok(())
222 }
223
224 async fn test_topic_redaction(&self) -> TestResult {
225 let room_id = room_id();
226 self.populate().await?;
227
228 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
229 assert_eq!(
230 self.get_state_event_static::<RoomTopicEventContent>(room_id)
231 .await?
232 .expect("room topic found before redaction")
233 .deserialize()
234 .expect("can deserialize room topic before redaction")
235 .as_sync()
236 .expect("room topic is a sync state event")
237 .as_original()
238 .expect("room topic is not redacted yet")
239 .content
240 .topic,
241 "😀"
242 );
243
244 let mut changes = StateChanges::default();
245
246 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
247 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
248 .expect("topic redaction event making works");
249 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
250
251 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
252 self.save_changes(&changes).await?;
253
254 let redacted_event = self
255 .get_state_event_static::<RoomTopicEventContent>(room_id)
256 .await?
257 .expect("room topic found after redaction")
258 .deserialize()
259 .expect("can deserialize room topic after redaction");
260
261 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
262
263 Ok(())
264 }
265
266 async fn test_populate_store(&self) -> TestResult {
267 let room_id = room_id();
268 let user_id = user_id();
269 let display_name = DisplayName::new("example");
270
271 self.populate().await?;
272
273 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
274 assert!(self.get_presence_event(user_id).await?.is_some());
275 assert_eq!(
276 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
277 2,
278 "Expected to find 2 room infos"
279 );
280 assert!(
281 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
282 );
283
284 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
285 assert_eq!(
286 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
287 1,
288 "Expected to find 1 room topic"
289 );
290 assert!(self.get_profile(room_id, user_id).await?.is_some());
291 assert!(self.get_member_event(room_id, user_id).await?.is_some());
292 assert_eq!(
293 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
294 2,
295 "Expected to find 2 members for room"
296 );
297 assert_eq!(
298 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
299 1,
300 "Expected to find 1 invited user ids"
301 );
302 assert_eq!(
303 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
304 1,
305 "Expected to find 1 joined user ids"
306 );
307 assert_eq!(
308 self.get_users_with_display_name(room_id, &display_name).await?.len(),
309 2,
310 "Expected to find 2 display names for room"
311 );
312 assert!(
313 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
314 .await?
315 .is_some()
316 );
317 assert!(
318 self.get_user_room_receipt_event(
319 room_id,
320 ReceiptType::Read,
321 ReceiptThread::Unthreaded,
322 user_id
323 )
324 .await?
325 .is_some()
326 );
327 assert_eq!(
328 self.get_event_room_receipt_events(
329 room_id,
330 ReceiptType::Read,
331 ReceiptThread::Unthreaded,
332 first_receipt_event_id()
333 )
334 .await?
335 .len(),
336 1,
337 "Expected to find 1 read receipt"
338 );
339 Ok(())
340 }
341
342 async fn test_member_saving(&self) -> TestResult {
343 let room_id = room_id!("!test_member_saving:localhost");
344 let user_id = user_id();
345 let second_user_id = user_id!("@second:localhost");
346 let third_user_id = user_id!("@third:localhost");
347 let unknown_user_id = user_id!("@unknown:localhost");
348
349 let mut user_ids = vec![user_id.to_owned()];
351 assert!(self.get_member_event(room_id, user_id).await?.is_none());
352 let member_events = self
353 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
354 .await;
355 assert!(member_events?.is_empty());
356 assert!(self.get_profile(room_id, user_id).await?.is_none());
357 let profiles = self.get_profiles(room_id, &user_ids).await;
358 assert!(profiles?.is_empty());
359
360 let mut changes = StateChanges::default();
362 let raw_member_event = membership_event();
363 let profile = raw_member_event.deserialize()?.into();
364 changes
365 .state
366 .entry(room_id.to_owned())
367 .or_default()
368 .entry(StateEventType::RoomMember)
369 .or_default()
370 .insert(user_id.into(), raw_member_event.cast());
371 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
372 self.save_changes(&changes).await?;
373
374 assert!(self.get_member_event(room_id, user_id).await?.is_some());
375 let member_events = self
376 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
377 .await;
378 assert_eq!(member_events?.len(), 1);
379 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
380 assert_eq!(members.len(), 1, "We expected to find members for the room");
381 assert!(self.get_profile(room_id, user_id).await?.is_some());
382 let profiles = self.get_profiles(room_id, &user_ids).await;
383 assert_eq!(profiles?.len(), 1);
384
385 let mut changes = StateChanges::default();
387 let changes_members = changes
388 .state
389 .entry(room_id.to_owned())
390 .or_default()
391 .entry(StateEventType::RoomMember)
392 .or_default();
393 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
394 let raw_second_member_event =
395 custom_membership_event(second_user_id, event_id!("$second_member_event"));
396 let second_profile = raw_second_member_event.deserialize()?.into();
397 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
398 changes_profiles.insert(second_user_id.to_owned(), second_profile);
399 let raw_third_member_event =
400 custom_membership_event(third_user_id, event_id!("$third_member_event"));
401 let third_profile = raw_third_member_event.deserialize()?.into();
402 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
403 changes_profiles.insert(third_user_id.to_owned(), third_profile);
404 self.save_changes(&changes).await?;
405
406 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
407 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
408 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
409 let member_events = self
410 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
411 .await;
412 assert_eq!(member_events?.len(), 3);
413 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
414 assert_eq!(members.len(), 3, "We expected to find members for the room");
415 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
416 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
417 let profiles = self.get_profiles(room_id, &user_ids).await;
418 assert_eq!(profiles?.len(), 3);
419
420 user_ids.push(unknown_user_id.to_owned());
422 let member_events = self
423 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
424 .await;
425 assert_eq!(member_events?.len(), 3);
426 let profiles = self.get_profiles(room_id, &user_ids).await;
427 assert_eq!(profiles?.len(), 3);
428
429 let member_events = self
431 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
432 room_id,
433 &[],
434 )
435 .await;
436 assert!(member_events?.is_empty());
437 let profiles = self.get_profiles(room_id, &[]).await;
438 assert!(profiles?.is_empty());
439
440 Ok(())
441 }
442
443 async fn test_filter_saving(&self) -> TestResult {
444 let filter_name = "filter_name";
445 let filter_id = "filter_id_1234";
446
447 self.set_kv_data(
448 StateStoreDataKey::Filter(filter_name),
449 StateStoreDataValue::Filter(filter_id.to_owned()),
450 )
451 .await?;
452 assert_let!(
453 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
454 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
455 );
456 assert_eq!(stored_filter_id, filter_id);
457
458 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
459 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
460
461 Ok(())
462 }
463
464 async fn test_user_avatar_url_saving(&self) -> TestResult {
465 let user_id = user_id!("@alice:example.org");
466 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
467
468 self.set_kv_data(
469 StateStoreDataKey::UserAvatarUrl(user_id),
470 StateStoreDataValue::UserAvatarUrl(url.clone()),
471 )
472 .await?;
473
474 assert_let!(
475 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
476 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
477 );
478 assert_eq!(stored_url, url);
479
480 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
481 assert_matches!(
482 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
483 Ok(None)
484 );
485
486 Ok(())
487 }
488
489 async fn test_server_info_saving(&self) -> TestResult {
490 let versions =
491 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
492 let server_info = ServerInfo::new(
493 versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
494 [("org.matrix.experimental".to_owned(), true)].into(),
495 Some(WellKnownResponse {
496 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
497 identity_server: None,
498 tile_server: None,
499 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
500 }),
501 );
502
503 self.set_kv_data(
504 StateStoreDataKey::ServerInfo,
505 StateStoreDataValue::ServerInfo(server_info.clone()),
506 )
507 .await?;
508
509 assert_let!(
510 Ok(Some(StateStoreDataValue::ServerInfo(stored_info))) =
511 self.get_kv_data(StateStoreDataKey::ServerInfo).await
512 );
513 assert_eq!(stored_info, server_info);
514
515 let decoded_server_info = stored_info.maybe_decode().unwrap();
516 let stored_supported = decoded_server_info.supported_versions();
517
518 assert_eq!(stored_supported.versions, versions);
519 assert_eq!(stored_supported.features.len(), 1);
520 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
521
522 self.remove_kv_data(StateStoreDataKey::ServerInfo).await?;
523 assert_matches!(self.get_kv_data(StateStoreDataKey::ServerInfo).await, Ok(None));
524
525 Ok(())
526 }
527
528 async fn test_sync_token_saving(&self) -> TestResult {
529 let sync_token_1 = "t392-516_47314_0_7_1";
530 let sync_token_2 = "t392-516_47314_0_7_2";
531
532 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
533
534 let changes =
535 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
536 self.save_changes(&changes).await?;
537 assert_let!(
538 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
539 self.get_kv_data(StateStoreDataKey::SyncToken).await
540 );
541 assert_eq!(stored_sync_token, sync_token_1);
542
543 self.set_kv_data(
544 StateStoreDataKey::SyncToken,
545 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
546 )
547 .await?;
548 assert_let!(
549 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
550 self.get_kv_data(StateStoreDataKey::SyncToken).await
551 );
552 assert_eq!(stored_sync_token, sync_token_2);
553
554 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
555 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
556
557 Ok(())
558 }
559
560 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
561 assert!(
563 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
564 .await
565 .expect("Could not read data")
566 .is_none(),
567 "Store was not empty at start"
568 );
569
570 let data = GrowableBloomBuilder::new().build();
572 self.set_kv_data(
573 StateStoreDataKey::UtdHookManagerData,
574 StateStoreDataValue::UtdHookManagerData(data.clone()),
575 )
576 .await
577 .expect("Could not save data");
578
579 let read_data = self
581 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
582 .await
583 .expect("Could not read data")
584 .expect("no data found")
585 .into_utd_hook_manager_data()
586 .expect("not UtdHookManagerData");
587
588 assert_eq!(read_data, data);
589
590 Ok(())
591 }
592
593 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
594 assert!(
596 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
597 "Store was not empty at start"
598 );
599
600 self.set_kv_data(
601 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
602 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
603 )
604 .await?;
605
606 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
607 data.expect("The loaded data should be Some");
608
609 Ok(())
610 }
611
612 async fn test_stripped_member_saving(&self) -> TestResult {
613 let room_id = room_id!("!test_stripped_member_saving:localhost");
614 let user_id = user_id();
615 let second_user_id = user_id!("@second:localhost");
616 let third_user_id = user_id!("@third:localhost");
617 let unknown_user_id = user_id!("@unknown:localhost");
618
619 assert!(self.get_member_event(room_id, user_id).await?.is_none());
621 let member_events = self
622 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
623 room_id,
624 &[user_id.to_owned()],
625 )
626 .await;
627 assert!(member_events?.is_empty());
628
629 let mut changes = StateChanges::default();
631 changes
632 .stripped_state
633 .entry(room_id.to_owned())
634 .or_default()
635 .entry(StateEventType::RoomMember)
636 .or_default()
637 .insert(user_id.into(), stripped_membership_event().cast());
638 self.save_changes(&changes).await?;
639
640 assert!(self.get_member_event(room_id, user_id).await?.is_some());
641 let member_events = self
642 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
643 room_id,
644 &[user_id.to_owned()],
645 )
646 .await;
647 assert_eq!(member_events?.len(), 1);
648 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
649 assert_eq!(members.len(), 1, "We expected to find members for the room");
650
651 let mut changes = StateChanges::default();
653 let changes_members = changes
654 .stripped_state
655 .entry(room_id.to_owned())
656 .or_default()
657 .entry(StateEventType::RoomMember)
658 .or_default();
659 changes_members
660 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
661 changes_members
662 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
663 self.save_changes(&changes).await?;
664
665 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
666 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
667 let member_events = self
668 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
669 room_id,
670 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
671 )
672 .await;
673 assert_eq!(member_events?.len(), 3);
674 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
675 assert_eq!(members.len(), 3, "We expected to find members for the room");
676
677 let member_events = self
679 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
680 room_id,
681 &[
682 user_id.to_owned(),
683 second_user_id.to_owned(),
684 third_user_id.to_owned(),
685 unknown_user_id.to_owned(),
686 ],
687 )
688 .await;
689 assert_eq!(member_events?.len(), 3);
690
691 let member_events = self
693 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
694 room_id,
695 &[],
696 )
697 .await;
698 assert!(member_events?.is_empty());
699
700 Ok(())
701 }
702
703 async fn test_power_level_saving(&self) -> TestResult {
704 let room_id = room_id!("!test_power_level_saving:localhost");
705
706 let raw_event = power_level_event();
707 let event = raw_event.deserialize()?;
708
709 assert!(
710 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
711 );
712 let mut changes = StateChanges::default();
713 changes.add_state_event(room_id, event, raw_event);
714
715 self.save_changes(&changes).await?;
716 assert!(
717 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
718 );
719
720 Ok(())
721 }
722
723 async fn test_receipts_saving(&self) -> TestResult {
724 let room_id = room_id!("!test_receipts_saving:localhost");
725
726 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
727 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
728
729 let first_receipt_ts = uint!(1436451550);
730 let second_receipt_ts = uint!(1436451653);
731 let third_receipt_ts = uint!(1436474532);
732
733 let first_receipt_event = serde_json::from_value(json!({
734 first_event_id: {
735 "m.read": {
736 user_id(): {
737 "ts": first_receipt_ts,
738 }
739 }
740 }
741 }))?;
742
743 let second_receipt_event = serde_json::from_value(json!({
744 second_event_id: {
745 "m.read": {
746 user_id(): {
747 "ts": second_receipt_ts,
748 }
749 }
750 }
751 }))?;
752
753 let third_receipt_event = serde_json::from_value(json!({
754 second_event_id: {
755 "m.read": {
756 user_id(): {
757 "ts": third_receipt_ts,
758 "thread_id": "main",
759 }
760 }
761 }
762 }))?;
763
764 assert!(
765 self.get_user_room_receipt_event(
766 room_id,
767 ReceiptType::Read,
768 ReceiptThread::Unthreaded,
769 user_id()
770 )
771 .await
772 .expect("failed to read unthreaded user room receipt")
773 .is_none()
774 );
775 assert!(
776 self.get_event_room_receipt_events(
777 room_id,
778 ReceiptType::Read,
779 ReceiptThread::Unthreaded,
780 first_event_id
781 )
782 .await
783 .expect("failed to read unthreaded event room receipt for 1")
784 .is_empty()
785 );
786 assert!(
787 self.get_event_room_receipt_events(
788 room_id,
789 ReceiptType::Read,
790 ReceiptThread::Unthreaded,
791 second_event_id
792 )
793 .await
794 .expect("failed to read unthreaded event room receipt for 2")
795 .is_empty()
796 );
797
798 let mut changes = StateChanges::default();
799 changes.add_receipts(room_id, first_receipt_event);
800
801 self.save_changes(&changes).await?;
802 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
803 .get_user_room_receipt_event(
804 room_id,
805 ReceiptType::Read,
806 ReceiptThread::Unthreaded,
807 user_id(),
808 )
809 .await
810 .expect("failed to read unthreaded user room receipt after save")
811 .unwrap();
812 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
813 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
814 let first_event_unthreaded_receipts = self
815 .get_event_room_receipt_events(
816 room_id,
817 ReceiptType::Read,
818 ReceiptThread::Unthreaded,
819 first_event_id,
820 )
821 .await
822 .expect("failed to read unthreaded event room receipt for 1 after save");
823 assert_eq!(
824 first_event_unthreaded_receipts.len(),
825 1,
826 "Found a wrong number of unthreaded receipts for 1 after save"
827 );
828 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
829 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
830 assert!(
831 self.get_event_room_receipt_events(
832 room_id,
833 ReceiptType::Read,
834 ReceiptThread::Unthreaded,
835 second_event_id
836 )
837 .await
838 .expect("failed to read unthreaded event room receipt for 2 after save")
839 .is_empty()
840 );
841
842 let mut changes = StateChanges::default();
843 changes.add_receipts(room_id, second_receipt_event);
844
845 self.save_changes(&changes).await.expect("Saving works");
846 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
847 .get_user_room_receipt_event(
848 room_id,
849 ReceiptType::Read,
850 ReceiptThread::Unthreaded,
851 user_id(),
852 )
853 .await
854 .expect("Getting unthreaded user room receipt after save failed")
855 .unwrap();
856 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
857 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
858 assert!(
859 self.get_event_room_receipt_events(
860 room_id,
861 ReceiptType::Read,
862 ReceiptThread::Unthreaded,
863 first_event_id
864 )
865 .await
866 .expect("Getting unthreaded event room receipt events for first event failed")
867 .is_empty()
868 );
869 let second_event_unthreaded_receipts = self
870 .get_event_room_receipt_events(
871 room_id,
872 ReceiptType::Read,
873 ReceiptThread::Unthreaded,
874 second_event_id,
875 )
876 .await
877 .expect("Getting unthreaded event room receipt events for second event failed");
878 assert_eq!(
879 second_event_unthreaded_receipts.len(),
880 1,
881 "Found a wrong number of unthreaded receipts for second event after save"
882 );
883 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
884 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
885
886 assert!(
887 self.get_user_room_receipt_event(
888 room_id,
889 ReceiptType::Read,
890 ReceiptThread::Main,
891 user_id()
892 )
893 .await
894 .expect("failed to read threaded user room receipt")
895 .is_none()
896 );
897 assert!(
898 self.get_event_room_receipt_events(
899 room_id,
900 ReceiptType::Read,
901 ReceiptThread::Main,
902 second_event_id
903 )
904 .await
905 .expect("Getting threaded event room receipts for 2 failed")
906 .is_empty()
907 );
908
909 let mut changes = StateChanges::default();
910 changes.add_receipts(room_id, third_receipt_event);
911
912 self.save_changes(&changes).await.expect("Saving works");
913 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
915 .get_user_room_receipt_event(
916 room_id,
917 ReceiptType::Read,
918 ReceiptThread::Unthreaded,
919 user_id(),
920 )
921 .await
922 .expect("Getting unthreaded user room receipt after save failed")
923 .unwrap();
924 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
925 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
926 let second_event_unthreaded_receipts = self
927 .get_event_room_receipt_events(
928 room_id,
929 ReceiptType::Read,
930 ReceiptThread::Unthreaded,
931 second_event_id,
932 )
933 .await
934 .expect("Getting unthreaded event room receipt events for second event failed");
935 assert_eq!(
936 second_event_unthreaded_receipts.len(),
937 1,
938 "Found a wrong number of unthreaded receipts for second event after save"
939 );
940 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
941 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
942 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
944 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
945 .await
946 .expect("Getting threaded user room receipt after save failed")
947 .unwrap();
948 assert_eq!(threaded_user_receipt_event_id, second_event_id);
949 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
950 let second_event_threaded_receipts = self
951 .get_event_room_receipt_events(
952 room_id,
953 ReceiptType::Read,
954 ReceiptThread::Main,
955 second_event_id,
956 )
957 .await
958 .expect("Getting threaded event room receipt events for second event failed");
959 assert_eq!(
960 second_event_threaded_receipts.len(),
961 1,
962 "Found a wrong number of threaded receipts for second event after save"
963 );
964 assert_eq!(second_event_threaded_receipts[0].0, user_id());
965 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
966
967 Ok(())
968 }
969
970 async fn test_custom_storage(&self) -> TestResult {
971 let key = "my_key";
972 let value = &[0, 1, 2, 3];
973
974 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
975
976 let read = self.get_custom_value(key.as_bytes()).await?;
977
978 assert_eq!(Some(value.as_ref()), read.as_deref());
979
980 Ok(())
981 }
982
983 async fn test_stripped_non_stripped(&self) -> TestResult {
984 let room_id = room_id!("!test_stripped_non_stripped:localhost");
985 let user_id = user_id();
986
987 assert!(self.get_member_event(room_id, user_id).await?.is_none());
988 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
989
990 let mut changes = StateChanges::default();
991 changes
992 .state
993 .entry(room_id.to_owned())
994 .or_default()
995 .entry(StateEventType::RoomMember)
996 .or_default()
997 .insert(user_id.into(), membership_event().cast());
998 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
999 self.save_changes(&changes).await?;
1000
1001 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1002 assert!(matches!(member_event, MemberEvent::Sync(_)));
1003 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1004
1005 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1006 assert_eq!(members, vec![user_id.to_owned()]);
1007
1008 let mut changes = StateChanges::default();
1009 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1010 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1011 self.save_changes(&changes).await?;
1012
1013 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1014 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1015 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1016
1017 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1018 assert_eq!(members, vec![user_id.to_owned()]);
1019
1020 Ok(())
1021 }
1022
1023 async fn test_room_removal(&self) -> TestResult {
1024 let room_id = room_id();
1025 let user_id = user_id();
1026 let display_name = DisplayName::new("example");
1027 let stripped_room_id = stripped_room_id();
1028
1029 self.populate().await?;
1030
1031 {
1032 let txn = TransactionId::new();
1034 let ev =
1035 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1036 self.save_send_queue_request(
1037 room_id,
1038 txn.clone(),
1039 MilliSecondsSinceUnixEpoch::now(),
1040 ev.into(),
1041 0,
1042 )
1043 .await?;
1044
1045 self.save_dependent_queued_request(
1047 room_id,
1048 &txn,
1049 ChildTransactionId::new(),
1050 MilliSecondsSinceUnixEpoch::now(),
1051 DependentQueuedRequestKind::RedactEvent,
1052 )
1053 .await?;
1054 }
1055
1056 self.remove_room(room_id).await?;
1057
1058 assert_eq!(
1059 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1060 1,
1061 "room is still there"
1062 );
1063
1064 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1065 assert!(
1066 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1067 "still state events found"
1068 );
1069 assert!(self.get_profile(room_id, user_id).await?.is_none());
1070 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1071 assert!(
1072 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1073 "still user ids found"
1074 );
1075 assert!(
1076 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1077 "still invited user ids found"
1078 );
1079 assert!(
1080 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1081 "still joined users found"
1082 );
1083 assert!(
1084 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1085 "still display names found"
1086 );
1087 assert!(
1088 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1089 .await?
1090 .is_none()
1091 );
1092 assert!(
1093 self.get_user_room_receipt_event(
1094 room_id,
1095 ReceiptType::Read,
1096 ReceiptThread::Unthreaded,
1097 user_id
1098 )
1099 .await?
1100 .is_none()
1101 );
1102 assert!(
1103 self.get_event_room_receipt_events(
1104 room_id,
1105 ReceiptType::Read,
1106 ReceiptThread::Unthreaded,
1107 first_receipt_event_id()
1108 )
1109 .await?
1110 .is_empty(),
1111 "still event recepts in the store"
1112 );
1113 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1114 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1115
1116 self.remove_room(stripped_room_id).await?;
1117
1118 assert!(
1119 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1120 "still room info found"
1121 );
1122 Ok(())
1123 }
1124
1125 async fn test_profile_removal(&self) -> TestResult {
1126 let room_id = room_id();
1127
1128 let user_id = user_id();
1130 let invited_user_id = invited_user_id();
1131
1132 self.populate().await?;
1133
1134 let new_invite_member_json = json!({
1135 "content": {
1136 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1137 "displayname": "example after update",
1138 "membership": "invite",
1139 "reason": "Looking for support"
1140 },
1141 "event_id": "$143273582443PhrSm:localhost",
1142 "origin_server_ts": 1432735824,
1143 "room_id": room_id,
1144 "sender": user_id,
1145 "state_key": invited_user_id,
1146 "type": "m.room.member",
1147 });
1148 let new_invite_member_event: SyncRoomMemberEvent =
1149 serde_json::from_value(new_invite_member_json.clone())?;
1150
1151 let mut changes = StateChanges {
1152 profiles_to_delete: [(
1154 room_id.to_owned(),
1155 vec![user_id.to_owned(), invited_user_id.to_owned()],
1156 )]
1157 .into(),
1158
1159 profiles: {
1161 let mut map = BTreeMap::default();
1162 map.insert(
1163 room_id.to_owned(),
1164 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1165 .into_iter()
1166 .collect(),
1167 );
1168 map
1169 },
1170
1171 ..StateChanges::default()
1172 };
1173
1174 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1175 .expect("can create sync-state-event for topic");
1176 let event = raw.deserialize()?;
1177 changes.add_state_event(room_id, event, raw);
1178
1179 self.save_changes(&changes).await?;
1180
1181 assert!(self.get_profile(room_id, user_id).await?.is_none());
1183 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1184
1185 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1187 assert_eq!(
1188 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1189 Some("example after update")
1190 );
1191 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1192
1193 Ok(())
1194 }
1195
1196 async fn test_presence_saving(&self) -> TestResult {
1197 let user_id = user_id();
1198 let second_user_id = user_id!("@second:localhost");
1199 let third_user_id = user_id!("@third:localhost");
1200 let unknown_user_id = user_id!("@unknown:localhost");
1201
1202 let mut user_ids = vec![user_id.to_owned()];
1204 let presence_event = self.get_presence_event(user_id).await;
1205 assert!(presence_event?.is_none());
1206 let presence_events = self.get_presence_events(&user_ids).await;
1207 assert!(presence_events?.is_empty());
1208
1209 let mut changes = StateChanges::default();
1211 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1212 self.save_changes(&changes).await?;
1213
1214 let presence_event = self.get_presence_event(user_id).await;
1215 assert!(presence_event?.is_some());
1216 let presence_events = self.get_presence_events(&user_ids).await;
1217 assert_eq!(presence_events?.len(), 1);
1218
1219 let mut changes = StateChanges::default();
1221 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1222 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1223 self.save_changes(&changes).await?;
1224
1225 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1226 let presence_event = self.get_presence_event(second_user_id).await;
1227 assert!(presence_event?.is_some());
1228 let presence_event = self.get_presence_event(third_user_id).await;
1229 assert!(presence_event?.is_some());
1230 let presence_events = self.get_presence_events(&user_ids).await;
1231 assert_eq!(presence_events?.len(), 3);
1232
1233 user_ids.push(unknown_user_id.to_owned());
1235 let member_events = self.get_presence_events(&user_ids).await;
1236 assert_eq!(member_events?.len(), 3);
1237
1238 let presence_events = self.get_presence_events(&[]).await;
1240 assert!(presence_events?.is_empty());
1241
1242 Ok(())
1243 }
1244
1245 async fn test_display_names_saving(&self) -> TestResult {
1246 let room_id = room_id!("!test_display_names_saving:localhost");
1247 let user_id = user_id();
1248 let user_display_name = DisplayName::new("User");
1249 let second_user_id = user_id!("@second:localhost");
1250 let third_user_id = user_id!("@third:localhost");
1251 let other_display_name = DisplayName::new("Raoul");
1252 let unknown_display_name = DisplayName::new("Unknown");
1253
1254 let mut display_names = vec![user_display_name.to_owned()];
1256 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1257 assert!(users.is_empty());
1258 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1259 assert!(names.is_empty());
1260
1261 let mut changes = StateChanges::default();
1263 changes
1264 .ambiguity_maps
1265 .entry(room_id.to_owned())
1266 .or_default()
1267 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1268 self.save_changes(&changes).await?;
1269
1270 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1271 assert_eq!(users.len(), 1);
1272 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1273 assert_eq!(names.len(), 1);
1274 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1275
1276 let mut changes = StateChanges::default();
1278 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1279 other_display_name.to_owned(),
1280 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1281 );
1282 self.save_changes(&changes).await?;
1283
1284 display_names.push(other_display_name.to_owned());
1285 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1286 assert_eq!(users.len(), 1);
1287 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1288 assert_eq!(users.len(), 2);
1289 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1290 assert_eq!(names.len(), 2);
1291 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1292 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1293
1294 display_names.push(unknown_display_name.to_owned());
1296 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1297 assert_eq!(names.len(), 2);
1298
1299 let names = self.get_users_with_display_names(room_id, &[]).await?;
1301 assert!(names.is_empty());
1302
1303 Ok(())
1304 }
1305
1306 #[allow(clippy::needless_range_loop)]
1307 async fn test_send_queue(&self) -> TestResult {
1308 let room_id = room_id!("!test_send_queue:localhost");
1309
1310 let events = self.load_send_queue_requests(room_id).await?;
1312 assert!(events.is_empty());
1313
1314 let txn0 = TransactionId::new();
1316 let event0 =
1317 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1318 self.save_send_queue_request(
1319 room_id,
1320 txn0.clone(),
1321 MilliSecondsSinceUnixEpoch::now(),
1322 event0.into(),
1323 0,
1324 )
1325 .await?;
1326
1327 let pending = self.load_send_queue_requests(room_id).await?;
1329
1330 assert_eq!(pending.len(), 1);
1331 {
1332 assert_eq!(pending[0].transaction_id, txn0);
1333
1334 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1335 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1336 assert_eq!(content.body(), "msg0");
1337
1338 assert!(!pending[0].is_wedged());
1339 }
1340
1341 for i in 1..=3 {
1343 let txn = TransactionId::new();
1344 let event = SerializableEventContent::new(
1345 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1346 )?;
1347
1348 self.save_send_queue_request(
1349 room_id,
1350 txn,
1351 MilliSecondsSinceUnixEpoch::now(),
1352 event.into(),
1353 0,
1354 )
1355 .await?;
1356 }
1357
1358 let pending = self.load_send_queue_requests(room_id).await?;
1360
1361 assert_eq!(pending.len(), 4);
1363
1364 assert_eq!(pending[0].transaction_id, txn0);
1365
1366 for i in 0..4 {
1367 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1368 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1369 assert_eq!(content.body(), format!("msg{i}"));
1370 assert!(!pending[i].is_wedged());
1371 }
1372
1373 let txn2 = &pending[2].transaction_id;
1375 self.update_send_queue_request_status(
1376 room_id,
1377 txn2,
1378 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1379 )
1380 .await?;
1381
1382 let pending = self.load_send_queue_requests(room_id).await?;
1384
1385 assert_eq!(pending.len(), 4);
1387 assert_eq!(pending[0].transaction_id, txn0);
1388 assert_eq!(pending[2].transaction_id, *txn2);
1389 assert!(pending[2].is_wedged());
1390 let error = pending[2].clone().error.unwrap();
1391 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1392 assert_eq!(generic_error, "Oops");
1393 for i in 0..4 {
1394 if i != 2 {
1395 assert!(!pending[i].is_wedged());
1396 }
1397 }
1398
1399 let event0 = SerializableEventContent::new(
1401 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1402 )?;
1403 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1404
1405 let pending = self.load_send_queue_requests(room_id).await?;
1407
1408 assert_eq!(pending.len(), 4);
1409 {
1410 assert_eq!(pending[2].transaction_id, *txn2);
1411
1412 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1413 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1414 assert_eq!(content.body(), "wow that's a cool test");
1415
1416 assert!(!pending[2].is_wedged());
1417
1418 for i in 0..4 {
1419 if i != 2 {
1420 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1421 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1422 assert_eq!(content.body(), format!("msg{i}"));
1423
1424 assert!(!pending[i].is_wedged());
1425 }
1426 }
1427 }
1428
1429 self.remove_send_queue_request(room_id, &txn0).await?;
1431
1432 let pending = self.load_send_queue_requests(room_id).await?;
1434
1435 assert_eq!(pending.len(), 3);
1436 assert_eq!(pending[1].transaction_id, *txn2);
1437 for i in 0..3 {
1438 assert_ne!(pending[i].transaction_id, txn0);
1439 }
1440
1441 let room_id2 = room_id!("!test_send_queue_two:localhost");
1446 {
1447 let txn = TransactionId::new();
1448 let event = SerializableEventContent::new(
1449 &RoomMessageEventContent::text_plain("room2").into(),
1450 )?;
1451 self.save_send_queue_request(
1452 room_id2,
1453 txn.clone(),
1454 MilliSecondsSinceUnixEpoch::now(),
1455 event.into(),
1456 0,
1457 )
1458 .await?;
1459 }
1460
1461 {
1463 let room_id3 = room_id!("!test_send_queue_three:localhost");
1464 let txn = TransactionId::new();
1465 let event = SerializableEventContent::new(
1466 &RoomMessageEventContent::text_plain("room3").into(),
1467 )?;
1468 self.save_send_queue_request(
1469 room_id3,
1470 txn.clone(),
1471 MilliSecondsSinceUnixEpoch::now(),
1472 event.into(),
1473 0,
1474 )
1475 .await?;
1476
1477 self.remove_send_queue_request(room_id3, &txn).await?;
1478 }
1479
1480 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1483 assert_eq!(outstanding_rooms.len(), 2);
1484 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1485 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1486
1487 Ok(())
1488 }
1489
1490 async fn test_send_queue_priority(&self) -> TestResult {
1491 let room_id = room_id!("!test_send_queue:localhost");
1492
1493 let events = self.load_send_queue_requests(room_id).await?;
1495 assert!(events.is_empty());
1496
1497 let low0_txn = TransactionId::new();
1499 let ev0 =
1500 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1501 self.save_send_queue_request(
1502 room_id,
1503 low0_txn.clone(),
1504 MilliSecondsSinceUnixEpoch::now(),
1505 ev0.into(),
1506 2,
1507 )
1508 .await?;
1509
1510 let high_txn = TransactionId::new();
1512 let ev1 =
1513 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1514 self.save_send_queue_request(
1515 room_id,
1516 high_txn.clone(),
1517 MilliSecondsSinceUnixEpoch::now(),
1518 ev1.into(),
1519 10,
1520 )
1521 .await?;
1522
1523 let low1_txn = TransactionId::new();
1525 let ev2 =
1526 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1527 self.save_send_queue_request(
1528 room_id,
1529 low1_txn.clone(),
1530 MilliSecondsSinceUnixEpoch::now(),
1531 ev2.into(),
1532 2,
1533 )
1534 .await?;
1535
1536 let pending = self.load_send_queue_requests(room_id).await?;
1539
1540 assert_eq!(pending.len(), 3);
1541 {
1542 assert_eq!(pending[0].transaction_id, high_txn);
1543
1544 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1545 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1546 assert_eq!(content.body(), "high");
1547 }
1548
1549 {
1550 assert_eq!(pending[1].transaction_id, low0_txn);
1551
1552 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1553 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1554 assert_eq!(content.body(), "low0");
1555 }
1556
1557 {
1558 assert_eq!(pending[2].transaction_id, low1_txn);
1559
1560 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1561 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1562 assert_eq!(content.body(), "low1");
1563 }
1564
1565 Ok(())
1566 }
1567
1568 async fn test_send_queue_dependents(&self) -> TestResult {
1569 let room_id = room_id!("!test_send_queue_dependents:localhost");
1570
1571 let txn0 = TransactionId::new();
1573 let event0 =
1574 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1575 self.save_send_queue_request(
1576 room_id,
1577 txn0.clone(),
1578 MilliSecondsSinceUnixEpoch::now(),
1579 event0.into(),
1580 0,
1581 )
1582 .await?;
1583
1584 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1586
1587 let child_txn = ChildTransactionId::new();
1589 self.save_dependent_queued_request(
1590 room_id,
1591 &txn0,
1592 child_txn.clone(),
1593 MilliSecondsSinceUnixEpoch::now(),
1594 DependentQueuedRequestKind::RedactEvent,
1595 )
1596 .await?;
1597
1598 let dependents = self.load_dependent_queued_requests(room_id).await?;
1600 assert_eq!(dependents.len(), 1);
1601 assert_eq!(dependents[0].parent_transaction_id, txn0);
1602 assert_eq!(dependents[0].own_transaction_id, child_txn);
1603 assert!(dependents[0].parent_key.is_none());
1604 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1605
1606 let event_id = owned_event_id!("$1");
1608 let num_updated = self
1609 .mark_dependent_queued_requests_as_ready(
1610 room_id,
1611 &txn0,
1612 SentRequestKey::Event(event_id.clone()),
1613 )
1614 .await?;
1615 assert_eq!(num_updated, 1);
1616
1617 let dependents = self.load_dependent_queued_requests(room_id).await?;
1619 assert_eq!(dependents.len(), 1);
1620 assert_eq!(dependents[0].parent_transaction_id, txn0);
1621 assert_eq!(dependents[0].own_transaction_id, child_txn);
1622 assert_matches!(dependents[0].parent_key.as_ref(), Some(SentRequestKey::Event(eid)) => {
1623 assert_eq!(*eid, event_id);
1624 });
1625 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1626
1627 let removed = self
1629 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1630 .await?;
1631 assert!(removed);
1632
1633 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1635
1636 let txn1 = TransactionId::new();
1639 let event1 =
1640 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1641 self.save_send_queue_request(
1642 room_id,
1643 txn1.clone(),
1644 MilliSecondsSinceUnixEpoch::now(),
1645 event1.into(),
1646 0,
1647 )
1648 .await?;
1649
1650 self.save_dependent_queued_request(
1651 room_id,
1652 &txn0,
1653 ChildTransactionId::new(),
1654 MilliSecondsSinceUnixEpoch::now(),
1655 DependentQueuedRequestKind::RedactEvent,
1656 )
1657 .await?;
1658 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1659
1660 self.save_dependent_queued_request(
1661 room_id,
1662 &txn1,
1663 ChildTransactionId::new(),
1664 MilliSecondsSinceUnixEpoch::now(),
1665 DependentQueuedRequestKind::EditEvent {
1666 new_content: SerializableEventContent::new(
1667 &RoomMessageEventContent::text_plain("edit").into(),
1668 )?,
1669 },
1670 )
1671 .await?;
1672 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1673
1674 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1676 assert!(removed);
1677
1678 let dependents = self.load_dependent_queued_requests(room_id).await?;
1680 assert_eq!(dependents.len(), 2);
1681
1682 Ok(())
1683 }
1684
1685 async fn test_update_send_queue_dependent(&self) -> TestResult {
1686 let room_id = room_id!("!test_send_queue_dependents:localhost");
1687
1688 let txn = TransactionId::new();
1689
1690 let child_txn = ChildTransactionId::new();
1692
1693 self.save_dependent_queued_request(
1694 room_id,
1695 &txn,
1696 child_txn.clone(),
1697 MilliSecondsSinceUnixEpoch::now(),
1698 DependentQueuedRequestKind::RedactEvent,
1699 )
1700 .await?;
1701
1702 let dependents = self.load_dependent_queued_requests(room_id).await?;
1704 assert_eq!(dependents.len(), 1);
1705 assert_eq!(dependents[0].parent_transaction_id, txn);
1706 assert_eq!(dependents[0].own_transaction_id, child_txn);
1707 assert!(dependents[0].parent_key.is_none());
1708 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1709
1710 self.update_dependent_queued_request(
1712 room_id,
1713 &child_txn,
1714 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1715 )
1716 .await?;
1717
1718 let dependents = self.load_dependent_queued_requests(room_id).await?;
1720 assert_eq!(dependents.len(), 1);
1721 assert_eq!(dependents[0].parent_transaction_id, txn);
1722 assert_eq!(dependents[0].own_transaction_id, child_txn);
1723 assert!(dependents[0].parent_key.is_none());
1724 assert_matches!(
1725 &dependents[0].kind,
1726 DependentQueuedRequestKind::ReactEvent { key } => {
1727 assert_eq!(key, "👍");
1728 }
1729 );
1730
1731 Ok(())
1732 }
1733
1734 async fn test_get_room_infos(&self) -> TestResult {
1735 let room_id_0 = room_id!("!r0");
1736 let room_id_1 = room_id!("!r1");
1737 let room_id_2 = room_id!("!r2");
1738
1739 {
1741 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1742 }
1743
1744 let mut changes = StateChanges::default();
1746 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1747 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1748 self.save_changes(&changes).await?;
1749
1750 {
1752 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1753
1754 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1757
1758 assert_eq!(all_rooms.len(), 2);
1759 assert_eq!(all_rooms[0].room_id, room_id_0);
1760 assert_eq!(all_rooms[1].room_id, room_id_1);
1761 }
1762
1763 {
1765 let all_rooms =
1766 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1767
1768 assert_eq!(all_rooms.len(), 1);
1769 assert_eq!(all_rooms[0].room_id, room_id_1);
1770 }
1771
1772 {
1775 let all_rooms =
1776 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1777
1778 assert_eq!(all_rooms.len(), 0);
1779 }
1780
1781 Ok(())
1782 }
1783
1784 async fn test_thread_subscriptions(&self) -> TestResult {
1785 let first_thread = event_id!("$t1");
1786 let second_thread = event_id!("$t2");
1787
1788 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1790 assert!(maybe_sub.is_none());
1791
1792 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1793 assert!(maybe_sub.is_none());
1794
1795 self.upsert_thread_subscription(
1797 room_id(),
1798 first_thread,
1799 StoredThreadSubscription {
1800 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1801 bump_stamp: None,
1802 },
1803 )
1804 .await?;
1805
1806 self.upsert_thread_subscription(
1807 room_id(),
1808 second_thread,
1809 StoredThreadSubscription {
1810 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1811 bump_stamp: None,
1812 },
1813 )
1814 .await?;
1815
1816 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1818 assert_eq!(
1819 maybe_sub,
1820 Some(StoredThreadSubscription {
1821 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1822 bump_stamp: None,
1823 })
1824 );
1825
1826 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1827 assert_eq!(
1828 maybe_sub,
1829 Some(StoredThreadSubscription {
1830 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1831 bump_stamp: None,
1832 })
1833 );
1834
1835 self.upsert_thread_subscription(
1837 room_id(),
1838 first_thread,
1839 StoredThreadSubscription {
1840 status: ThreadSubscriptionStatus::Unsubscribed,
1841 bump_stamp: None,
1842 },
1843 )
1844 .await?;
1845
1846 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1848 assert_eq!(
1849 maybe_sub,
1850 Some(StoredThreadSubscription {
1851 status: ThreadSubscriptionStatus::Unsubscribed,
1852 bump_stamp: None,
1853 })
1854 );
1855
1856 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1858 assert_eq!(
1859 maybe_sub,
1860 Some(StoredThreadSubscription {
1861 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1862 bump_stamp: None,
1863 })
1864 );
1865
1866 self.remove_thread_subscription(room_id(), second_thread).await?;
1868
1869 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1871 assert_eq!(maybe_sub, None);
1872
1873 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1875 assert_eq!(
1876 maybe_sub,
1877 Some(StoredThreadSubscription {
1878 status: ThreadSubscriptionStatus::Unsubscribed,
1879 bump_stamp: None,
1880 })
1881 );
1882
1883 self.remove_thread_subscription(room_id(), second_thread).await?;
1885
1886 Ok(())
1887 }
1888
1889 async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult {
1890 let thread = event_id!("$fred");
1891
1892 let sub = self.load_thread_subscription(room_id(), thread).await?;
1894 assert!(sub.is_none());
1895
1896 self.upsert_thread_subscription(
1898 room_id(),
1899 thread,
1900 StoredThreadSubscription {
1901 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1902 bump_stamp: Some(42),
1903 },
1904 )
1905 .await?;
1906
1907 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1908 assert_eq!(
1909 sub,
1910 StoredThreadSubscription {
1911 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1912 bump_stamp: Some(42),
1913 }
1914 );
1915
1916 self.upsert_thread_subscription(
1918 room_id(),
1919 thread,
1920 StoredThreadSubscription {
1921 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1922 bump_stamp: Some(41),
1923 },
1924 )
1925 .await?;
1926
1927 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1928 assert_eq!(
1929 sub,
1930 StoredThreadSubscription {
1931 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1932 bump_stamp: Some(42),
1933 }
1934 );
1935
1936 self.upsert_thread_subscription(
1938 room_id(),
1939 thread,
1940 StoredThreadSubscription {
1941 status: ThreadSubscriptionStatus::Unsubscribed,
1942 bump_stamp: None,
1943 },
1944 )
1945 .await?;
1946
1947 let sub = self.load_thread_subscription(room_id(), thread).await?.unwrap();
1948 assert_eq!(
1949 sub,
1950 StoredThreadSubscription {
1951 status: ThreadSubscriptionStatus::Unsubscribed,
1952 bump_stamp: Some(42),
1953 }
1954 );
1955
1956 Ok(())
1957 }
1958}
1959
/// Generates a `statestore_integration_tests` module containing one
/// `#[async_test]` function per integration test visible here, each of which
/// builds a store and runs the matching `StateStoreIntegrationTests` method
/// on it.
///
/// The generated module does `use super::get_store;`, so the module invoking
/// this macro must define an async `get_store` function. Each test calls
/// `get_store().await?.into_state_store()`, so the returned value must
/// support `?` inside a `TestResult` function and provide `into_state_store`
/// (imported here through `IntoStateStore`).
///
/// NOTE(review): every test calls `get_store()` itself; presumably each call
/// is expected to return a fresh, empty store — confirm against implementors.
#[allow(unused_macros, unused_extern_crates)]
#[macro_export]
macro_rules! statestore_integration_tests {
    () => {
        mod statestore_integration_tests {
            use matrix_sdk_test::{TestResult, async_test};
            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};

            use super::get_store;

            #[async_test]
            async fn test_topic_redaction() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_topic_redaction().await
            }

            #[async_test]
            async fn test_populate_store() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_populate_store().await
            }

            #[async_test]
            async fn test_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_member_saving().await
            }

            #[async_test]
            async fn test_filter_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_filter_saving().await
            }

            #[async_test]
            async fn test_user_avatar_url_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_user_avatar_url_saving().await
            }

            #[async_test]
            async fn test_server_info_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_server_info_saving().await
            }

            #[async_test]
            async fn test_sync_token_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_sync_token_saving().await
            }

            #[async_test]
            async fn test_utd_hook_manager_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_utd_hook_manager_data_saving().await
            }

            #[async_test]
            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_one_time_key_already_uploaded_data_saving().await
            }

            #[async_test]
            async fn test_stripped_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_member_saving().await
            }

            #[async_test]
            async fn test_power_level_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_power_level_saving().await
            }

            #[async_test]
            async fn test_receipts_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_receipts_saving().await
            }

            #[async_test]
            async fn test_custom_storage() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_custom_storage().await
            }

            #[async_test]
            async fn test_stripped_non_stripped() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_non_stripped().await
            }

            #[async_test]
            async fn test_room_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_room_removal().await
            }

            #[async_test]
            async fn test_profile_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_profile_removal().await
            }

            #[async_test]
            async fn test_presence_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_presence_saving().await
            }

            #[async_test]
            async fn test_display_names_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_display_names_saving().await
            }

            #[async_test]
            async fn test_send_queue() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue().await
            }

            #[async_test]
            async fn test_send_queue_priority() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_priority().await
            }

            #[async_test]
            async fn test_send_queue_dependents() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_dependents().await
            }

            #[async_test]
            async fn test_update_send_queue_dependent() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_update_send_queue_dependent().await
            }

            #[async_test]
            async fn test_get_room_infos() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_get_room_infos().await
            }

            #[async_test]
            async fn test_thread_subscriptions() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_thread_subscriptions().await
            }

            #[async_test]
            async fn test_thread_subscriptions_bumpstamps() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_thread_subscriptions_bumpstamps().await
            }
        }
    };
}
2147
/// Returns the user ID `@example:localhost`, built with the `user_id!` macro.
///
/// Within this file it is the sender of `power_level_event()` and the user passed to
/// the membership helpers `membership_event()` and `stripped_membership_event()`.
fn user_id() -> &'static UserId {
    user_id!("@example:localhost")
}
2151
/// Returns the user ID `@invited:localhost`, built with the `user_id!` macro.
fn invited_user_id() -> &'static UserId {
    user_id!("@invited:localhost")
}
2155
/// Returns the room ID `!test:localhost`, built with the `room_id!` macro.
fn room_id() -> &'static RoomId {
    room_id!("!test:localhost")
}
2159
/// Returns the room ID `!stripped:localhost`, built with the `room_id!` macro.
fn stripped_room_id() -> &'static RoomId {
    room_id!("!stripped:localhost")
}
2163
/// Returns the event ID `$example`, built with the `event_id!` macro.
fn first_receipt_event_id() -> &'static EventId {
    event_id!("$example")
}
2167
2168fn power_level_event() -> Raw<AnySyncStateEvent> {
2169 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2170
2171 let event = json!({
2172 "event_id": "$h29iv0s8:example.com",
2173 "content": content,
2174 "sender": user_id(),
2175 "type": "m.room.power_levels",
2176 "origin_server_ts": 0u64,
2177 "state_key": "",
2178 });
2179
2180 serde_json::from_value(event).unwrap()
2181}
2182
2183fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2184 custom_stripped_membership_event(user_id())
2185}
2186
2187fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2188 let ev_json = json!({
2189 "type": "m.room.member",
2190 "content": RoomMemberEventContent::new(MembershipState::Join),
2191 "sender": user_id,
2192 "state_key": user_id,
2193 });
2194
2195 Raw::new(&ev_json).unwrap().cast_unchecked()
2196}
2197
2198fn membership_event() -> Raw<SyncRoomMemberEvent> {
2199 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2200}
2201
2202fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2203 let ev_json = json!({
2204 "type": "m.room.member",
2205 "content": RoomMemberEventContent::new(MembershipState::Join),
2206 "event_id": event_id,
2207 "origin_server_ts": 198,
2208 "sender": user_id,
2209 "state_key": user_id,
2210 });
2211
2212 Raw::new(&ev_json).unwrap().cast_unchecked()
2213}
2214
2215fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2216 let ev_json = json!({
2217 "content": {
2218 "presence": "online"
2219 },
2220 "sender": user_id,
2221 });
2222
2223 Raw::new(&ev_json).unwrap().cast_unchecked()
2224}