1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{TestResult, event_factory::EventFactory, test_json};
9use ruma::{
10 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId, TransactionId, UserId,
11 api::{
12 FeatureFlag, MatrixVersion,
13 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
14 },
15 event_id,
16 events::{
17 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
18 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
19 RoomAccountDataEventType, StateEventType, SyncStateEvent,
20 presence::PresenceEvent,
21 receipt::{ReceiptThread, ReceiptType},
22 room::{
23 member::{
24 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
25 SyncRoomMemberEvent,
26 },
27 message::RoomMessageEventContent,
28 power_levels::RoomPowerLevelsEventContent,
29 topic::RoomTopicEventContent,
30 },
31 },
32 owned_event_id, owned_mxc_uri,
33 push::Ruleset,
34 room_id,
35 room_version_rules::AuthorizationRules,
36 serde::Raw,
37 uint, user_id,
38};
39use serde_json::{json, value::Value as JsonValue};
40
41use super::{
42 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings, ServerInfo,
43 WellKnownResponse, send_queue::SentRequestKey,
44};
45use crate::{
46 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
47 deserialized_responses::MemberEvent,
48 store::{
49 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
50 ThreadSubscription,
51 },
52};
53
54#[allow(async_fn_in_trait)]
59pub trait StateStoreIntegrationTests {
60 async fn populate(&self) -> TestResult;
62 async fn test_topic_redaction(&self) -> TestResult;
64 async fn test_populate_store(&self) -> TestResult;
66 async fn test_member_saving(&self) -> TestResult;
68 async fn test_filter_saving(&self) -> TestResult;
70 async fn test_user_avatar_url_saving(&self) -> TestResult;
72 async fn test_sync_token_saving(&self) -> TestResult;
74 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
76 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
78 async fn test_stripped_member_saving(&self) -> TestResult;
80 async fn test_power_level_saving(&self) -> TestResult;
82 async fn test_receipts_saving(&self) -> TestResult;
84 async fn test_custom_storage(&self) -> TestResult;
86 async fn test_stripped_non_stripped(&self) -> TestResult;
88 async fn test_room_removal(&self) -> TestResult;
90 async fn test_profile_removal(&self) -> TestResult;
92 async fn test_presence_saving(&self) -> TestResult;
94 async fn test_display_names_saving(&self) -> TestResult;
96 async fn test_send_queue(&self) -> TestResult;
98 async fn test_send_queue_priority(&self) -> TestResult;
100 async fn test_send_queue_dependents(&self) -> TestResult;
102 async fn test_update_send_queue_dependent(&self) -> TestResult;
104 async fn test_server_info_saving(&self) -> TestResult;
106 async fn test_get_room_infos(&self) -> TestResult;
108 async fn test_thread_subscriptions(&self) -> TestResult;
110}
111
112impl StateStoreIntegrationTests for DynStateStore {
113 async fn populate(&self) -> TestResult {
114 let f = EventFactory::new();
115 let mut changes = StateChanges::default();
116
117 let user_id = user_id();
118 let invited_user_id = invited_user_id();
119 let room_id = room_id();
120 let stripped_room_id = stripped_room_id();
121
122 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
123
124 let presence_json: &JsonValue = &test_json::PRESENCE;
125 let presence_raw = serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone())?;
126 let presence_event = presence_raw.deserialize()?;
127 changes.add_presence_event(presence_event, presence_raw);
128
129 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
130 f.push_rules(Ruleset::server_default(user_id)).into_raw();
131 let pushrules_event = pushrules_raw.deserialize()?;
132 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
133
134 let mut room = RoomInfo::new(room_id, RoomState::Joined);
135 room.mark_as_left();
136
137 let tag_json: &JsonValue = &test_json::TAG;
138 let tag_raw = serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone())?;
139 let tag_event = tag_raw.deserialize()?;
140 changes.add_room_account_data(room_id, tag_event, tag_raw);
141
142 let name_json: &JsonValue = &test_json::NAME;
143 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone())?;
144 let name_event = name_raw.deserialize()?;
145 room.handle_state_event(&name_event);
146 changes.add_state_event(room_id, name_event, name_raw);
147
148 let topic_json: &JsonValue = &test_json::TOPIC;
149 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())?;
150 let topic_event = topic_raw.deserialize()?;
151 room.handle_state_event(&topic_event);
152 changes.add_state_event(room_id, topic_event, topic_raw);
153
154 let mut room_ambiguity_map = HashMap::new();
155 let mut room_profiles = BTreeMap::new();
156
157 let member_json: &JsonValue = &test_json::MEMBER;
158 let member_event: SyncRoomMemberEvent = serde_json::from_value(member_json.clone())?;
159 let displayname = DisplayName::new(
160 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
161 );
162 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
163 room_profiles.insert(user_id.to_owned(), (&member_event).into());
164
165 let member_state_raw =
166 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone())?;
167 let member_state_event = member_state_raw.deserialize()?;
168 changes.add_state_event(room_id, member_state_event, member_state_raw);
169
170 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
171 let invited_member_event: SyncRoomMemberEvent =
173 serde_json::from_value(invited_member_json.clone())?;
174 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
175 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
176
177 let invited_member_state_raw =
178 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone())?;
179 let invited_member_state_event = invited_member_state_raw.deserialize()?;
180 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
181
182 let receipt_content = f
183 .room(room_id)
184 .read_receipts()
185 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
186 .into_content();
187 changes.add_receipts(room_id, receipt_content);
188
189 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
190 changes.profiles.insert(room_id.to_owned(), room_profiles);
191 changes.add_room(room);
192
193 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
194
195 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
196 let stripped_name_raw =
197 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())?;
198 let stripped_name_event = stripped_name_raw.deserialize()?;
199 stripped_room.handle_stripped_state_event(&stripped_name_event);
200 changes.stripped_state.insert(
201 stripped_room_id.to_owned(),
202 BTreeMap::from([(
203 stripped_name_event.event_type(),
204 BTreeMap::from([(
205 stripped_name_event.state_key().to_owned(),
206 stripped_name_raw.clone(),
207 )]),
208 )]),
209 );
210
211 changes.add_room(stripped_room);
212
213 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
214 let stripped_member_event = Raw::new(&stripped_member_json.clone())?.cast_unchecked();
215 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
216
217 self.save_changes(&changes).await?;
218
219 Ok(())
220 }
221
222 async fn test_topic_redaction(&self) -> TestResult {
223 let room_id = room_id();
224 self.populate().await?;
225
226 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
227 assert_eq!(
228 self.get_state_event_static::<RoomTopicEventContent>(room_id)
229 .await?
230 .expect("room topic found before redaction")
231 .deserialize()
232 .expect("can deserialize room topic before redaction")
233 .as_sync()
234 .expect("room topic is a sync state event")
235 .as_original()
236 .expect("room topic is not redacted yet")
237 .content
238 .topic,
239 "😀"
240 );
241
242 let mut changes = StateChanges::default();
243
244 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
245 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
246 .expect("topic redaction event making works");
247 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts")?.unwrap();
248
249 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
250 self.save_changes(&changes).await?;
251
252 let redacted_event = self
253 .get_state_event_static::<RoomTopicEventContent>(room_id)
254 .await?
255 .expect("room topic found after redaction")
256 .deserialize()
257 .expect("can deserialize room topic after redaction");
258
259 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
260
261 Ok(())
262 }
263
264 async fn test_populate_store(&self) -> TestResult {
265 let room_id = room_id();
266 let user_id = user_id();
267 let display_name = DisplayName::new("example");
268
269 self.populate().await?;
270
271 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
272 assert!(self.get_presence_event(user_id).await?.is_some());
273 assert_eq!(
274 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
275 2,
276 "Expected to find 2 room infos"
277 );
278 assert!(
279 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
280 );
281
282 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
283 assert_eq!(
284 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
285 1,
286 "Expected to find 1 room topic"
287 );
288 assert!(self.get_profile(room_id, user_id).await?.is_some());
289 assert!(self.get_member_event(room_id, user_id).await?.is_some());
290 assert_eq!(
291 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
292 2,
293 "Expected to find 2 members for room"
294 );
295 assert_eq!(
296 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
297 1,
298 "Expected to find 1 invited user ids"
299 );
300 assert_eq!(
301 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
302 1,
303 "Expected to find 1 joined user ids"
304 );
305 assert_eq!(
306 self.get_users_with_display_name(room_id, &display_name).await?.len(),
307 2,
308 "Expected to find 2 display names for room"
309 );
310 assert!(
311 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
312 .await?
313 .is_some()
314 );
315 assert!(
316 self.get_user_room_receipt_event(
317 room_id,
318 ReceiptType::Read,
319 ReceiptThread::Unthreaded,
320 user_id
321 )
322 .await?
323 .is_some()
324 );
325 assert_eq!(
326 self.get_event_room_receipt_events(
327 room_id,
328 ReceiptType::Read,
329 ReceiptThread::Unthreaded,
330 first_receipt_event_id()
331 )
332 .await?
333 .len(),
334 1,
335 "Expected to find 1 read receipt"
336 );
337 Ok(())
338 }
339
340 async fn test_member_saving(&self) -> TestResult {
341 let room_id = room_id!("!test_member_saving:localhost");
342 let user_id = user_id();
343 let second_user_id = user_id!("@second:localhost");
344 let third_user_id = user_id!("@third:localhost");
345 let unknown_user_id = user_id!("@unknown:localhost");
346
347 let mut user_ids = vec![user_id.to_owned()];
349 assert!(self.get_member_event(room_id, user_id).await?.is_none());
350 let member_events = self
351 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
352 .await;
353 assert!(member_events?.is_empty());
354 assert!(self.get_profile(room_id, user_id).await?.is_none());
355 let profiles = self.get_profiles(room_id, &user_ids).await;
356 assert!(profiles?.is_empty());
357
358 let mut changes = StateChanges::default();
360 let raw_member_event = membership_event();
361 let profile = raw_member_event.deserialize()?.into();
362 changes
363 .state
364 .entry(room_id.to_owned())
365 .or_default()
366 .entry(StateEventType::RoomMember)
367 .or_default()
368 .insert(user_id.into(), raw_member_event.cast());
369 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
370 self.save_changes(&changes).await?;
371
372 assert!(self.get_member_event(room_id, user_id).await?.is_some());
373 let member_events = self
374 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
375 .await;
376 assert_eq!(member_events?.len(), 1);
377 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
378 assert_eq!(members.len(), 1, "We expected to find members for the room");
379 assert!(self.get_profile(room_id, user_id).await?.is_some());
380 let profiles = self.get_profiles(room_id, &user_ids).await;
381 assert_eq!(profiles?.len(), 1);
382
383 let mut changes = StateChanges::default();
385 let changes_members = changes
386 .state
387 .entry(room_id.to_owned())
388 .or_default()
389 .entry(StateEventType::RoomMember)
390 .or_default();
391 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
392 let raw_second_member_event =
393 custom_membership_event(second_user_id, event_id!("$second_member_event"));
394 let second_profile = raw_second_member_event.deserialize()?.into();
395 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
396 changes_profiles.insert(second_user_id.to_owned(), second_profile);
397 let raw_third_member_event =
398 custom_membership_event(third_user_id, event_id!("$third_member_event"));
399 let third_profile = raw_third_member_event.deserialize()?.into();
400 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
401 changes_profiles.insert(third_user_id.to_owned(), third_profile);
402 self.save_changes(&changes).await?;
403
404 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
405 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
406 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
407 let member_events = self
408 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
409 .await;
410 assert_eq!(member_events?.len(), 3);
411 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
412 assert_eq!(members.len(), 3, "We expected to find members for the room");
413 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
414 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
415 let profiles = self.get_profiles(room_id, &user_ids).await;
416 assert_eq!(profiles?.len(), 3);
417
418 user_ids.push(unknown_user_id.to_owned());
420 let member_events = self
421 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
422 .await;
423 assert_eq!(member_events?.len(), 3);
424 let profiles = self.get_profiles(room_id, &user_ids).await;
425 assert_eq!(profiles?.len(), 3);
426
427 let member_events = self
429 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
430 room_id,
431 &[],
432 )
433 .await;
434 assert!(member_events?.is_empty());
435 let profiles = self.get_profiles(room_id, &[]).await;
436 assert!(profiles?.is_empty());
437
438 Ok(())
439 }
440
441 async fn test_filter_saving(&self) -> TestResult {
442 let filter_name = "filter_name";
443 let filter_id = "filter_id_1234";
444
445 self.set_kv_data(
446 StateStoreDataKey::Filter(filter_name),
447 StateStoreDataValue::Filter(filter_id.to_owned()),
448 )
449 .await?;
450 assert_let!(
451 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
452 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
453 );
454 assert_eq!(stored_filter_id, filter_id);
455
456 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
457 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
458
459 Ok(())
460 }
461
462 async fn test_user_avatar_url_saving(&self) -> TestResult {
463 let user_id = user_id!("@alice:example.org");
464 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
465
466 self.set_kv_data(
467 StateStoreDataKey::UserAvatarUrl(user_id),
468 StateStoreDataValue::UserAvatarUrl(url.clone()),
469 )
470 .await?;
471
472 assert_let!(
473 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
474 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
475 );
476 assert_eq!(stored_url, url);
477
478 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
479 assert_matches!(
480 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
481 Ok(None)
482 );
483
484 Ok(())
485 }
486
487 async fn test_server_info_saving(&self) -> TestResult {
488 let versions =
489 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
490 let server_info = ServerInfo::new(
491 versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
492 [("org.matrix.experimental".to_owned(), true)].into(),
493 Some(WellKnownResponse {
494 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
495 identity_server: None,
496 tile_server: None,
497 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
498 }),
499 );
500
501 self.set_kv_data(
502 StateStoreDataKey::ServerInfo,
503 StateStoreDataValue::ServerInfo(server_info.clone()),
504 )
505 .await?;
506
507 assert_let!(
508 Ok(Some(StateStoreDataValue::ServerInfo(stored_info))) =
509 self.get_kv_data(StateStoreDataKey::ServerInfo).await
510 );
511 assert_eq!(stored_info, server_info);
512
513 let decoded_server_info = stored_info.maybe_decode().unwrap();
514 let stored_supported = decoded_server_info.supported_versions();
515
516 assert_eq!(stored_supported.versions, versions);
517 assert_eq!(stored_supported.features.len(), 1);
518 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
519
520 self.remove_kv_data(StateStoreDataKey::ServerInfo).await?;
521 assert_matches!(self.get_kv_data(StateStoreDataKey::ServerInfo).await, Ok(None));
522
523 Ok(())
524 }
525
526 async fn test_sync_token_saving(&self) -> TestResult {
527 let sync_token_1 = "t392-516_47314_0_7_1";
528 let sync_token_2 = "t392-516_47314_0_7_2";
529
530 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
531
532 let changes =
533 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
534 self.save_changes(&changes).await?;
535 assert_let!(
536 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
537 self.get_kv_data(StateStoreDataKey::SyncToken).await
538 );
539 assert_eq!(stored_sync_token, sync_token_1);
540
541 self.set_kv_data(
542 StateStoreDataKey::SyncToken,
543 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
544 )
545 .await?;
546 assert_let!(
547 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
548 self.get_kv_data(StateStoreDataKey::SyncToken).await
549 );
550 assert_eq!(stored_sync_token, sync_token_2);
551
552 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
553 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
554
555 Ok(())
556 }
557
558 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
559 assert!(
561 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
562 .await
563 .expect("Could not read data")
564 .is_none(),
565 "Store was not empty at start"
566 );
567
568 let data = GrowableBloomBuilder::new().build();
570 self.set_kv_data(
571 StateStoreDataKey::UtdHookManagerData,
572 StateStoreDataValue::UtdHookManagerData(data.clone()),
573 )
574 .await
575 .expect("Could not save data");
576
577 let read_data = self
579 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
580 .await
581 .expect("Could not read data")
582 .expect("no data found")
583 .into_utd_hook_manager_data()
584 .expect("not UtdHookManagerData");
585
586 assert_eq!(read_data, data);
587
588 Ok(())
589 }
590
591 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
592 assert!(
594 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
595 "Store was not empty at start"
596 );
597
598 self.set_kv_data(
599 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
600 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
601 )
602 .await?;
603
604 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
605 data.expect("The loaded data should be Some");
606
607 Ok(())
608 }
609
610 async fn test_stripped_member_saving(&self) -> TestResult {
611 let room_id = room_id!("!test_stripped_member_saving:localhost");
612 let user_id = user_id();
613 let second_user_id = user_id!("@second:localhost");
614 let third_user_id = user_id!("@third:localhost");
615 let unknown_user_id = user_id!("@unknown:localhost");
616
617 assert!(self.get_member_event(room_id, user_id).await?.is_none());
619 let member_events = self
620 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
621 room_id,
622 &[user_id.to_owned()],
623 )
624 .await;
625 assert!(member_events?.is_empty());
626
627 let mut changes = StateChanges::default();
629 changes
630 .stripped_state
631 .entry(room_id.to_owned())
632 .or_default()
633 .entry(StateEventType::RoomMember)
634 .or_default()
635 .insert(user_id.into(), stripped_membership_event().cast());
636 self.save_changes(&changes).await?;
637
638 assert!(self.get_member_event(room_id, user_id).await?.is_some());
639 let member_events = self
640 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
641 room_id,
642 &[user_id.to_owned()],
643 )
644 .await;
645 assert_eq!(member_events?.len(), 1);
646 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
647 assert_eq!(members.len(), 1, "We expected to find members for the room");
648
649 let mut changes = StateChanges::default();
651 let changes_members = changes
652 .stripped_state
653 .entry(room_id.to_owned())
654 .or_default()
655 .entry(StateEventType::RoomMember)
656 .or_default();
657 changes_members
658 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
659 changes_members
660 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
661 self.save_changes(&changes).await?;
662
663 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
664 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
665 let member_events = self
666 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
667 room_id,
668 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
669 )
670 .await;
671 assert_eq!(member_events?.len(), 3);
672 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
673 assert_eq!(members.len(), 3, "We expected to find members for the room");
674
675 let member_events = self
677 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
678 room_id,
679 &[
680 user_id.to_owned(),
681 second_user_id.to_owned(),
682 third_user_id.to_owned(),
683 unknown_user_id.to_owned(),
684 ],
685 )
686 .await;
687 assert_eq!(member_events?.len(), 3);
688
689 let member_events = self
691 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
692 room_id,
693 &[],
694 )
695 .await;
696 assert!(member_events?.is_empty());
697
698 Ok(())
699 }
700
701 async fn test_power_level_saving(&self) -> TestResult {
702 let room_id = room_id!("!test_power_level_saving:localhost");
703
704 let raw_event = power_level_event();
705 let event = raw_event.deserialize()?;
706
707 assert!(
708 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
709 );
710 let mut changes = StateChanges::default();
711 changes.add_state_event(room_id, event, raw_event);
712
713 self.save_changes(&changes).await?;
714 assert!(
715 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
716 );
717
718 Ok(())
719 }
720
721 async fn test_receipts_saving(&self) -> TestResult {
722 let room_id = room_id!("!test_receipts_saving:localhost");
723
724 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
725 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
726
727 let first_receipt_ts = uint!(1436451550);
728 let second_receipt_ts = uint!(1436451653);
729 let third_receipt_ts = uint!(1436474532);
730
731 let first_receipt_event = serde_json::from_value(json!({
732 first_event_id: {
733 "m.read": {
734 user_id(): {
735 "ts": first_receipt_ts,
736 }
737 }
738 }
739 }))?;
740
741 let second_receipt_event = serde_json::from_value(json!({
742 second_event_id: {
743 "m.read": {
744 user_id(): {
745 "ts": second_receipt_ts,
746 }
747 }
748 }
749 }))?;
750
751 let third_receipt_event = serde_json::from_value(json!({
752 second_event_id: {
753 "m.read": {
754 user_id(): {
755 "ts": third_receipt_ts,
756 "thread_id": "main",
757 }
758 }
759 }
760 }))?;
761
762 assert!(
763 self.get_user_room_receipt_event(
764 room_id,
765 ReceiptType::Read,
766 ReceiptThread::Unthreaded,
767 user_id()
768 )
769 .await
770 .expect("failed to read unthreaded user room receipt")
771 .is_none()
772 );
773 assert!(
774 self.get_event_room_receipt_events(
775 room_id,
776 ReceiptType::Read,
777 ReceiptThread::Unthreaded,
778 first_event_id
779 )
780 .await
781 .expect("failed to read unthreaded event room receipt for 1")
782 .is_empty()
783 );
784 assert!(
785 self.get_event_room_receipt_events(
786 room_id,
787 ReceiptType::Read,
788 ReceiptThread::Unthreaded,
789 second_event_id
790 )
791 .await
792 .expect("failed to read unthreaded event room receipt for 2")
793 .is_empty()
794 );
795
796 let mut changes = StateChanges::default();
797 changes.add_receipts(room_id, first_receipt_event);
798
799 self.save_changes(&changes).await?;
800 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
801 .get_user_room_receipt_event(
802 room_id,
803 ReceiptType::Read,
804 ReceiptThread::Unthreaded,
805 user_id(),
806 )
807 .await
808 .expect("failed to read unthreaded user room receipt after save")
809 .unwrap();
810 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
811 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
812 let first_event_unthreaded_receipts = self
813 .get_event_room_receipt_events(
814 room_id,
815 ReceiptType::Read,
816 ReceiptThread::Unthreaded,
817 first_event_id,
818 )
819 .await
820 .expect("failed to read unthreaded event room receipt for 1 after save");
821 assert_eq!(
822 first_event_unthreaded_receipts.len(),
823 1,
824 "Found a wrong number of unthreaded receipts for 1 after save"
825 );
826 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
827 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
828 assert!(
829 self.get_event_room_receipt_events(
830 room_id,
831 ReceiptType::Read,
832 ReceiptThread::Unthreaded,
833 second_event_id
834 )
835 .await
836 .expect("failed to read unthreaded event room receipt for 2 after save")
837 .is_empty()
838 );
839
840 let mut changes = StateChanges::default();
841 changes.add_receipts(room_id, second_receipt_event);
842
843 self.save_changes(&changes).await.expect("Saving works");
844 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
845 .get_user_room_receipt_event(
846 room_id,
847 ReceiptType::Read,
848 ReceiptThread::Unthreaded,
849 user_id(),
850 )
851 .await
852 .expect("Getting unthreaded user room receipt after save failed")
853 .unwrap();
854 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
855 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
856 assert!(
857 self.get_event_room_receipt_events(
858 room_id,
859 ReceiptType::Read,
860 ReceiptThread::Unthreaded,
861 first_event_id
862 )
863 .await
864 .expect("Getting unthreaded event room receipt events for first event failed")
865 .is_empty()
866 );
867 let second_event_unthreaded_receipts = self
868 .get_event_room_receipt_events(
869 room_id,
870 ReceiptType::Read,
871 ReceiptThread::Unthreaded,
872 second_event_id,
873 )
874 .await
875 .expect("Getting unthreaded event room receipt events for second event failed");
876 assert_eq!(
877 second_event_unthreaded_receipts.len(),
878 1,
879 "Found a wrong number of unthreaded receipts for second event after save"
880 );
881 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
882 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
883
884 assert!(
885 self.get_user_room_receipt_event(
886 room_id,
887 ReceiptType::Read,
888 ReceiptThread::Main,
889 user_id()
890 )
891 .await
892 .expect("failed to read threaded user room receipt")
893 .is_none()
894 );
895 assert!(
896 self.get_event_room_receipt_events(
897 room_id,
898 ReceiptType::Read,
899 ReceiptThread::Main,
900 second_event_id
901 )
902 .await
903 .expect("Getting threaded event room receipts for 2 failed")
904 .is_empty()
905 );
906
907 let mut changes = StateChanges::default();
908 changes.add_receipts(room_id, third_receipt_event);
909
910 self.save_changes(&changes).await.expect("Saving works");
911 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
913 .get_user_room_receipt_event(
914 room_id,
915 ReceiptType::Read,
916 ReceiptThread::Unthreaded,
917 user_id(),
918 )
919 .await
920 .expect("Getting unthreaded user room receipt after save failed")
921 .unwrap();
922 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
923 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
924 let second_event_unthreaded_receipts = self
925 .get_event_room_receipt_events(
926 room_id,
927 ReceiptType::Read,
928 ReceiptThread::Unthreaded,
929 second_event_id,
930 )
931 .await
932 .expect("Getting unthreaded event room receipt events for second event failed");
933 assert_eq!(
934 second_event_unthreaded_receipts.len(),
935 1,
936 "Found a wrong number of unthreaded receipts for second event after save"
937 );
938 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
939 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
940 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
942 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
943 .await
944 .expect("Getting threaded user room receipt after save failed")
945 .unwrap();
946 assert_eq!(threaded_user_receipt_event_id, second_event_id);
947 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
948 let second_event_threaded_receipts = self
949 .get_event_room_receipt_events(
950 room_id,
951 ReceiptType::Read,
952 ReceiptThread::Main,
953 second_event_id,
954 )
955 .await
956 .expect("Getting threaded event room receipt events for second event failed");
957 assert_eq!(
958 second_event_threaded_receipts.len(),
959 1,
960 "Found a wrong number of threaded receipts for second event after save"
961 );
962 assert_eq!(second_event_threaded_receipts[0].0, user_id());
963 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
964
965 Ok(())
966 }
967
968 async fn test_custom_storage(&self) -> TestResult {
969 let key = "my_key";
970 let value = &[0, 1, 2, 3];
971
972 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
973
974 let read = self.get_custom_value(key.as_bytes()).await?;
975
976 assert_eq!(Some(value.as_ref()), read.as_deref());
977
978 Ok(())
979 }
980
981 async fn test_stripped_non_stripped(&self) -> TestResult {
982 let room_id = room_id!("!test_stripped_non_stripped:localhost");
983 let user_id = user_id();
984
985 assert!(self.get_member_event(room_id, user_id).await?.is_none());
986 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
987
988 let mut changes = StateChanges::default();
989 changes
990 .state
991 .entry(room_id.to_owned())
992 .or_default()
993 .entry(StateEventType::RoomMember)
994 .or_default()
995 .insert(user_id.into(), membership_event().cast());
996 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
997 self.save_changes(&changes).await?;
998
999 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1000 assert!(matches!(member_event, MemberEvent::Sync(_)));
1001 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1002
1003 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1004 assert_eq!(members, vec![user_id.to_owned()]);
1005
1006 let mut changes = StateChanges::default();
1007 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1008 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1009 self.save_changes(&changes).await?;
1010
1011 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1012 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1013 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1014
1015 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1016 assert_eq!(members, vec![user_id.to_owned()]);
1017
1018 Ok(())
1019 }
1020
1021 async fn test_room_removal(&self) -> TestResult {
1022 let room_id = room_id();
1023 let user_id = user_id();
1024 let display_name = DisplayName::new("example");
1025 let stripped_room_id = stripped_room_id();
1026
1027 self.populate().await?;
1028
1029 {
1030 let txn = TransactionId::new();
1032 let ev =
1033 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1034 self.save_send_queue_request(
1035 room_id,
1036 txn.clone(),
1037 MilliSecondsSinceUnixEpoch::now(),
1038 ev.into(),
1039 0,
1040 )
1041 .await?;
1042
1043 self.save_dependent_queued_request(
1045 room_id,
1046 &txn,
1047 ChildTransactionId::new(),
1048 MilliSecondsSinceUnixEpoch::now(),
1049 DependentQueuedRequestKind::RedactEvent,
1050 )
1051 .await?;
1052 }
1053
1054 self.remove_room(room_id).await?;
1055
1056 assert_eq!(
1057 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1058 1,
1059 "room is still there"
1060 );
1061
1062 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1063 assert!(
1064 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1065 "still state events found"
1066 );
1067 assert!(self.get_profile(room_id, user_id).await?.is_none());
1068 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1069 assert!(
1070 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1071 "still user ids found"
1072 );
1073 assert!(
1074 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1075 "still invited user ids found"
1076 );
1077 assert!(
1078 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1079 "still joined users found"
1080 );
1081 assert!(
1082 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1083 "still display names found"
1084 );
1085 assert!(
1086 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1087 .await?
1088 .is_none()
1089 );
1090 assert!(
1091 self.get_user_room_receipt_event(
1092 room_id,
1093 ReceiptType::Read,
1094 ReceiptThread::Unthreaded,
1095 user_id
1096 )
1097 .await?
1098 .is_none()
1099 );
1100 assert!(
1101 self.get_event_room_receipt_events(
1102 room_id,
1103 ReceiptType::Read,
1104 ReceiptThread::Unthreaded,
1105 first_receipt_event_id()
1106 )
1107 .await?
1108 .is_empty(),
1109 "still event recepts in the store"
1110 );
1111 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1112 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1113
1114 self.remove_room(stripped_room_id).await?;
1115
1116 assert!(
1117 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1118 "still room info found"
1119 );
1120 Ok(())
1121 }
1122
1123 async fn test_profile_removal(&self) -> TestResult {
1124 let room_id = room_id();
1125
1126 let user_id = user_id();
1128 let invited_user_id = invited_user_id();
1129
1130 self.populate().await?;
1131
1132 let new_invite_member_json = json!({
1133 "content": {
1134 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1135 "displayname": "example after update",
1136 "membership": "invite",
1137 "reason": "Looking for support"
1138 },
1139 "event_id": "$143273582443PhrSm:localhost",
1140 "origin_server_ts": 1432735824,
1141 "room_id": room_id,
1142 "sender": user_id,
1143 "state_key": invited_user_id,
1144 "type": "m.room.member",
1145 });
1146 let new_invite_member_event: SyncRoomMemberEvent =
1147 serde_json::from_value(new_invite_member_json.clone())?;
1148
1149 let mut changes = StateChanges {
1150 profiles_to_delete: [(
1152 room_id.to_owned(),
1153 vec![user_id.to_owned(), invited_user_id.to_owned()],
1154 )]
1155 .into(),
1156
1157 profiles: {
1159 let mut map = BTreeMap::default();
1160 map.insert(
1161 room_id.to_owned(),
1162 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1163 .into_iter()
1164 .collect(),
1165 );
1166 map
1167 },
1168
1169 ..StateChanges::default()
1170 };
1171
1172 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1173 .expect("can create sync-state-event for topic");
1174 let event = raw.deserialize()?;
1175 changes.add_state_event(room_id, event, raw);
1176
1177 self.save_changes(&changes).await?;
1178
1179 assert!(self.get_profile(room_id, user_id).await?.is_none());
1181 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1182
1183 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1185 assert_eq!(
1186 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1187 Some("example after update")
1188 );
1189 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1190
1191 Ok(())
1192 }
1193
1194 async fn test_presence_saving(&self) -> TestResult {
1195 let user_id = user_id();
1196 let second_user_id = user_id!("@second:localhost");
1197 let third_user_id = user_id!("@third:localhost");
1198 let unknown_user_id = user_id!("@unknown:localhost");
1199
1200 let mut user_ids = vec![user_id.to_owned()];
1202 let presence_event = self.get_presence_event(user_id).await;
1203 assert!(presence_event?.is_none());
1204 let presence_events = self.get_presence_events(&user_ids).await;
1205 assert!(presence_events?.is_empty());
1206
1207 let mut changes = StateChanges::default();
1209 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1210 self.save_changes(&changes).await?;
1211
1212 let presence_event = self.get_presence_event(user_id).await;
1213 assert!(presence_event?.is_some());
1214 let presence_events = self.get_presence_events(&user_ids).await;
1215 assert_eq!(presence_events?.len(), 1);
1216
1217 let mut changes = StateChanges::default();
1219 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1220 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1221 self.save_changes(&changes).await?;
1222
1223 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1224 let presence_event = self.get_presence_event(second_user_id).await;
1225 assert!(presence_event?.is_some());
1226 let presence_event = self.get_presence_event(third_user_id).await;
1227 assert!(presence_event?.is_some());
1228 let presence_events = self.get_presence_events(&user_ids).await;
1229 assert_eq!(presence_events?.len(), 3);
1230
1231 user_ids.push(unknown_user_id.to_owned());
1233 let member_events = self.get_presence_events(&user_ids).await;
1234 assert_eq!(member_events?.len(), 3);
1235
1236 let presence_events = self.get_presence_events(&[]).await;
1238 assert!(presence_events?.is_empty());
1239
1240 Ok(())
1241 }
1242
1243 async fn test_display_names_saving(&self) -> TestResult {
1244 let room_id = room_id!("!test_display_names_saving:localhost");
1245 let user_id = user_id();
1246 let user_display_name = DisplayName::new("User");
1247 let second_user_id = user_id!("@second:localhost");
1248 let third_user_id = user_id!("@third:localhost");
1249 let other_display_name = DisplayName::new("Raoul");
1250 let unknown_display_name = DisplayName::new("Unknown");
1251
1252 let mut display_names = vec![user_display_name.to_owned()];
1254 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1255 assert!(users.is_empty());
1256 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1257 assert!(names.is_empty());
1258
1259 let mut changes = StateChanges::default();
1261 changes
1262 .ambiguity_maps
1263 .entry(room_id.to_owned())
1264 .or_default()
1265 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1266 self.save_changes(&changes).await?;
1267
1268 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1269 assert_eq!(users.len(), 1);
1270 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1271 assert_eq!(names.len(), 1);
1272 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1273
1274 let mut changes = StateChanges::default();
1276 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1277 other_display_name.to_owned(),
1278 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1279 );
1280 self.save_changes(&changes).await?;
1281
1282 display_names.push(other_display_name.to_owned());
1283 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1284 assert_eq!(users.len(), 1);
1285 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1286 assert_eq!(users.len(), 2);
1287 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1288 assert_eq!(names.len(), 2);
1289 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1290 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1291
1292 display_names.push(unknown_display_name.to_owned());
1294 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1295 assert_eq!(names.len(), 2);
1296
1297 let names = self.get_users_with_display_names(room_id, &[]).await?;
1299 assert!(names.is_empty());
1300
1301 Ok(())
1302 }
1303
1304 #[allow(clippy::needless_range_loop)]
1305 async fn test_send_queue(&self) -> TestResult {
1306 let room_id = room_id!("!test_send_queue:localhost");
1307
1308 let events = self.load_send_queue_requests(room_id).await?;
1310 assert!(events.is_empty());
1311
1312 let txn0 = TransactionId::new();
1314 let event0 =
1315 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1316 self.save_send_queue_request(
1317 room_id,
1318 txn0.clone(),
1319 MilliSecondsSinceUnixEpoch::now(),
1320 event0.into(),
1321 0,
1322 )
1323 .await?;
1324
1325 let pending = self.load_send_queue_requests(room_id).await?;
1327
1328 assert_eq!(pending.len(), 1);
1329 {
1330 assert_eq!(pending[0].transaction_id, txn0);
1331
1332 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1333 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1334 assert_eq!(content.body(), "msg0");
1335
1336 assert!(!pending[0].is_wedged());
1337 }
1338
1339 for i in 1..=3 {
1341 let txn = TransactionId::new();
1342 let event = SerializableEventContent::new(
1343 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1344 )?;
1345
1346 self.save_send_queue_request(
1347 room_id,
1348 txn,
1349 MilliSecondsSinceUnixEpoch::now(),
1350 event.into(),
1351 0,
1352 )
1353 .await?;
1354 }
1355
1356 let pending = self.load_send_queue_requests(room_id).await?;
1358
1359 assert_eq!(pending.len(), 4);
1361
1362 assert_eq!(pending[0].transaction_id, txn0);
1363
1364 for i in 0..4 {
1365 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1366 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1367 assert_eq!(content.body(), format!("msg{i}"));
1368 assert!(!pending[i].is_wedged());
1369 }
1370
1371 let txn2 = &pending[2].transaction_id;
1373 self.update_send_queue_request_status(
1374 room_id,
1375 txn2,
1376 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1377 )
1378 .await?;
1379
1380 let pending = self.load_send_queue_requests(room_id).await?;
1382
1383 assert_eq!(pending.len(), 4);
1385 assert_eq!(pending[0].transaction_id, txn0);
1386 assert_eq!(pending[2].transaction_id, *txn2);
1387 assert!(pending[2].is_wedged());
1388 let error = pending[2].clone().error.unwrap();
1389 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1390 assert_eq!(generic_error, "Oops");
1391 for i in 0..4 {
1392 if i != 2 {
1393 assert!(!pending[i].is_wedged());
1394 }
1395 }
1396
1397 let event0 = SerializableEventContent::new(
1399 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1400 )?;
1401 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1402
1403 let pending = self.load_send_queue_requests(room_id).await?;
1405
1406 assert_eq!(pending.len(), 4);
1407 {
1408 assert_eq!(pending[2].transaction_id, *txn2);
1409
1410 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1411 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1412 assert_eq!(content.body(), "wow that's a cool test");
1413
1414 assert!(!pending[2].is_wedged());
1415
1416 for i in 0..4 {
1417 if i != 2 {
1418 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1419 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1420 assert_eq!(content.body(), format!("msg{i}"));
1421
1422 assert!(!pending[i].is_wedged());
1423 }
1424 }
1425 }
1426
1427 self.remove_send_queue_request(room_id, &txn0).await?;
1429
1430 let pending = self.load_send_queue_requests(room_id).await?;
1432
1433 assert_eq!(pending.len(), 3);
1434 assert_eq!(pending[1].transaction_id, *txn2);
1435 for i in 0..3 {
1436 assert_ne!(pending[i].transaction_id, txn0);
1437 }
1438
1439 let room_id2 = room_id!("!test_send_queue_two:localhost");
1444 {
1445 let txn = TransactionId::new();
1446 let event = SerializableEventContent::new(
1447 &RoomMessageEventContent::text_plain("room2").into(),
1448 )?;
1449 self.save_send_queue_request(
1450 room_id2,
1451 txn.clone(),
1452 MilliSecondsSinceUnixEpoch::now(),
1453 event.into(),
1454 0,
1455 )
1456 .await?;
1457 }
1458
1459 {
1461 let room_id3 = room_id!("!test_send_queue_three:localhost");
1462 let txn = TransactionId::new();
1463 let event = SerializableEventContent::new(
1464 &RoomMessageEventContent::text_plain("room3").into(),
1465 )?;
1466 self.save_send_queue_request(
1467 room_id3,
1468 txn.clone(),
1469 MilliSecondsSinceUnixEpoch::now(),
1470 event.into(),
1471 0,
1472 )
1473 .await?;
1474
1475 self.remove_send_queue_request(room_id3, &txn).await?;
1476 }
1477
1478 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1481 assert_eq!(outstanding_rooms.len(), 2);
1482 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1483 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1484
1485 Ok(())
1486 }
1487
1488 async fn test_send_queue_priority(&self) -> TestResult {
1489 let room_id = room_id!("!test_send_queue:localhost");
1490
1491 let events = self.load_send_queue_requests(room_id).await?;
1493 assert!(events.is_empty());
1494
1495 let low0_txn = TransactionId::new();
1497 let ev0 =
1498 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1499 self.save_send_queue_request(
1500 room_id,
1501 low0_txn.clone(),
1502 MilliSecondsSinceUnixEpoch::now(),
1503 ev0.into(),
1504 2,
1505 )
1506 .await?;
1507
1508 let high_txn = TransactionId::new();
1510 let ev1 =
1511 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1512 self.save_send_queue_request(
1513 room_id,
1514 high_txn.clone(),
1515 MilliSecondsSinceUnixEpoch::now(),
1516 ev1.into(),
1517 10,
1518 )
1519 .await?;
1520
1521 let low1_txn = TransactionId::new();
1523 let ev2 =
1524 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1525 self.save_send_queue_request(
1526 room_id,
1527 low1_txn.clone(),
1528 MilliSecondsSinceUnixEpoch::now(),
1529 ev2.into(),
1530 2,
1531 )
1532 .await?;
1533
1534 let pending = self.load_send_queue_requests(room_id).await?;
1537
1538 assert_eq!(pending.len(), 3);
1539 {
1540 assert_eq!(pending[0].transaction_id, high_txn);
1541
1542 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1543 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1544 assert_eq!(content.body(), "high");
1545 }
1546
1547 {
1548 assert_eq!(pending[1].transaction_id, low0_txn);
1549
1550 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1551 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1552 assert_eq!(content.body(), "low0");
1553 }
1554
1555 {
1556 assert_eq!(pending[2].transaction_id, low1_txn);
1557
1558 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1559 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1560 assert_eq!(content.body(), "low1");
1561 }
1562
1563 Ok(())
1564 }
1565
1566 async fn test_send_queue_dependents(&self) -> TestResult {
1567 let room_id = room_id!("!test_send_queue_dependents:localhost");
1568
1569 let txn0 = TransactionId::new();
1571 let event0 =
1572 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1573 self.save_send_queue_request(
1574 room_id,
1575 txn0.clone(),
1576 MilliSecondsSinceUnixEpoch::now(),
1577 event0.into(),
1578 0,
1579 )
1580 .await?;
1581
1582 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1584
1585 let child_txn = ChildTransactionId::new();
1587 self.save_dependent_queued_request(
1588 room_id,
1589 &txn0,
1590 child_txn.clone(),
1591 MilliSecondsSinceUnixEpoch::now(),
1592 DependentQueuedRequestKind::RedactEvent,
1593 )
1594 .await?;
1595
1596 let dependents = self.load_dependent_queued_requests(room_id).await?;
1598 assert_eq!(dependents.len(), 1);
1599 assert_eq!(dependents[0].parent_transaction_id, txn0);
1600 assert_eq!(dependents[0].own_transaction_id, child_txn);
1601 assert!(dependents[0].parent_key.is_none());
1602 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1603
1604 let event_id = owned_event_id!("$1");
1606 let num_updated = self
1607 .mark_dependent_queued_requests_as_ready(
1608 room_id,
1609 &txn0,
1610 SentRequestKey::Event(event_id.clone()),
1611 )
1612 .await?;
1613 assert_eq!(num_updated, 1);
1614
1615 let dependents = self.load_dependent_queued_requests(room_id).await?;
1617 assert_eq!(dependents.len(), 1);
1618 assert_eq!(dependents[0].parent_transaction_id, txn0);
1619 assert_eq!(dependents[0].own_transaction_id, child_txn);
1620 assert_matches!(dependents[0].parent_key.as_ref(), Some(SentRequestKey::Event(eid)) => {
1621 assert_eq!(*eid, event_id);
1622 });
1623 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1624
1625 let removed = self
1627 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1628 .await?;
1629 assert!(removed);
1630
1631 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1633
1634 let txn1 = TransactionId::new();
1637 let event1 =
1638 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1639 self.save_send_queue_request(
1640 room_id,
1641 txn1.clone(),
1642 MilliSecondsSinceUnixEpoch::now(),
1643 event1.into(),
1644 0,
1645 )
1646 .await?;
1647
1648 self.save_dependent_queued_request(
1649 room_id,
1650 &txn0,
1651 ChildTransactionId::new(),
1652 MilliSecondsSinceUnixEpoch::now(),
1653 DependentQueuedRequestKind::RedactEvent,
1654 )
1655 .await?;
1656 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1657
1658 self.save_dependent_queued_request(
1659 room_id,
1660 &txn1,
1661 ChildTransactionId::new(),
1662 MilliSecondsSinceUnixEpoch::now(),
1663 DependentQueuedRequestKind::EditEvent {
1664 new_content: SerializableEventContent::new(
1665 &RoomMessageEventContent::text_plain("edit").into(),
1666 )?,
1667 },
1668 )
1669 .await?;
1670 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1671
1672 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1674 assert!(removed);
1675
1676 let dependents = self.load_dependent_queued_requests(room_id).await?;
1678 assert_eq!(dependents.len(), 2);
1679
1680 Ok(())
1681 }
1682
1683 async fn test_update_send_queue_dependent(&self) -> TestResult {
1684 let room_id = room_id!("!test_send_queue_dependents:localhost");
1685
1686 let txn = TransactionId::new();
1687
1688 let child_txn = ChildTransactionId::new();
1690
1691 self.save_dependent_queued_request(
1692 room_id,
1693 &txn,
1694 child_txn.clone(),
1695 MilliSecondsSinceUnixEpoch::now(),
1696 DependentQueuedRequestKind::RedactEvent,
1697 )
1698 .await?;
1699
1700 let dependents = self.load_dependent_queued_requests(room_id).await?;
1702 assert_eq!(dependents.len(), 1);
1703 assert_eq!(dependents[0].parent_transaction_id, txn);
1704 assert_eq!(dependents[0].own_transaction_id, child_txn);
1705 assert!(dependents[0].parent_key.is_none());
1706 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1707
1708 self.update_dependent_queued_request(
1710 room_id,
1711 &child_txn,
1712 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1713 )
1714 .await?;
1715
1716 let dependents = self.load_dependent_queued_requests(room_id).await?;
1718 assert_eq!(dependents.len(), 1);
1719 assert_eq!(dependents[0].parent_transaction_id, txn);
1720 assert_eq!(dependents[0].own_transaction_id, child_txn);
1721 assert!(dependents[0].parent_key.is_none());
1722 assert_matches!(
1723 &dependents[0].kind,
1724 DependentQueuedRequestKind::ReactEvent { key } => {
1725 assert_eq!(key, "👍");
1726 }
1727 );
1728
1729 Ok(())
1730 }
1731
1732 async fn test_get_room_infos(&self) -> TestResult {
1733 let room_id_0 = room_id!("!r0");
1734 let room_id_1 = room_id!("!r1");
1735 let room_id_2 = room_id!("!r2");
1736
1737 {
1739 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1740 }
1741
1742 let mut changes = StateChanges::default();
1744 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1745 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1746 self.save_changes(&changes).await?;
1747
1748 {
1750 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1751
1752 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1755
1756 assert_eq!(all_rooms.len(), 2);
1757 assert_eq!(all_rooms[0].room_id, room_id_0);
1758 assert_eq!(all_rooms[1].room_id, room_id_1);
1759 }
1760
1761 {
1763 let all_rooms =
1764 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1765
1766 assert_eq!(all_rooms.len(), 1);
1767 assert_eq!(all_rooms[0].room_id, room_id_1);
1768 }
1769
1770 {
1773 let all_rooms =
1774 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1775
1776 assert_eq!(all_rooms.len(), 0);
1777 }
1778
1779 Ok(())
1780 }
1781
1782 async fn test_thread_subscriptions(&self) -> TestResult {
1783 let first_thread = event_id!("$t1");
1784 let second_thread = event_id!("$t2");
1785
1786 let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
1788 assert!(maybe_status.is_none());
1789
1790 let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
1791 assert!(maybe_status.is_none());
1792
1793 self.upsert_thread_subscription(
1795 room_id(),
1796 first_thread,
1797 ThreadSubscription { automatic: true },
1798 )
1799 .await?;
1800
1801 self.upsert_thread_subscription(
1802 room_id(),
1803 second_thread,
1804 ThreadSubscription { automatic: false },
1805 )
1806 .await?;
1807
1808 let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
1810 assert_eq!(maybe_status, Some(ThreadSubscription { automatic: true }));
1811 let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
1812 assert_eq!(maybe_status, Some(ThreadSubscription { automatic: false }));
1813
1814 self.upsert_thread_subscription(
1816 room_id(),
1817 first_thread,
1818 ThreadSubscription { automatic: false },
1819 )
1820 .await?;
1821
1822 let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
1824 assert_eq!(maybe_status, Some(ThreadSubscription { automatic: false }));
1825 let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
1827 assert_eq!(maybe_status, Some(ThreadSubscription { automatic: false }));
1828
1829 self.remove_thread_subscription(room_id(), second_thread).await?;
1831
1832 let maybe_status = self.load_thread_subscription(room_id(), second_thread).await?;
1834 assert_eq!(maybe_status, None);
1835 let maybe_status = self.load_thread_subscription(room_id(), first_thread).await?;
1837 assert_eq!(maybe_status, Some(ThreadSubscription { automatic: false }));
1838
1839 self.remove_thread_subscription(room_id(), second_thread).await?;
1841
1842 Ok(())
1843 }
1844}
1845
/// Generates a `statestore_integration_tests` module holding one
/// `#[async_test]` function per integration test listed below.
///
/// Every generated test calls `get_store().await?`, converts the result with
/// `into_state_store()` and then runs the `StateStoreIntegrationTests` method
/// of the same name on it.
///
/// The module invoking this macro has to define `get_store` itself: the
/// generated module imports it with `use super::get_store`. Since each test
/// awaits it and applies `?`, it is expected to be an `async` function
/// returning a `Result`; the success value presumably has to implement
/// `IntoStateStore`, the trait imported for the `into_state_store()` call.
#[allow(unused_macros, unused_extern_crates)]
#[macro_export]
macro_rules! statestore_integration_tests {
    () => {
        mod statestore_integration_tests {
            use matrix_sdk_test::{TestResult, async_test};
            use $crate::store::{IntoStateStore, StateStoreIntegrationTests};

            // Provided by the module that invokes the macro.
            use super::get_store;

            #[async_test]
            async fn test_topic_redaction() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_topic_redaction().await
            }

            #[async_test]
            async fn test_populate_store() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_populate_store().await
            }

            #[async_test]
            async fn test_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_member_saving().await
            }

            #[async_test]
            async fn test_filter_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_filter_saving().await
            }

            #[async_test]
            async fn test_user_avatar_url_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_user_avatar_url_saving().await
            }

            #[async_test]
            async fn test_server_info_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_server_info_saving().await
            }

            #[async_test]
            async fn test_sync_token_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_sync_token_saving().await
            }

            #[async_test]
            async fn test_utd_hook_manager_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_utd_hook_manager_data_saving().await
            }

            #[async_test]
            async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_one_time_key_already_uploaded_data_saving().await
            }

            #[async_test]
            async fn test_stripped_member_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_member_saving().await
            }

            #[async_test]
            async fn test_power_level_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_power_level_saving().await
            }

            #[async_test]
            async fn test_receipts_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_receipts_saving().await
            }

            #[async_test]
            async fn test_custom_storage() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_custom_storage().await
            }

            #[async_test]
            async fn test_stripped_non_stripped() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_stripped_non_stripped().await
            }

            #[async_test]
            async fn test_room_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_room_removal().await
            }

            #[async_test]
            async fn test_profile_removal() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_profile_removal().await
            }

            #[async_test]
            async fn test_presence_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_presence_saving().await
            }

            #[async_test]
            async fn test_display_names_saving() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_display_names_saving().await
            }

            #[async_test]
            async fn test_send_queue() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue().await
            }

            #[async_test]
            async fn test_send_queue_priority() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_priority().await
            }

            #[async_test]
            async fn test_send_queue_dependents() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_send_queue_dependents().await
            }

            #[async_test]
            async fn test_update_send_queue_dependent() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_update_send_queue_dependent().await
            }

            #[async_test]
            async fn test_get_room_infos() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_get_room_infos().await
            }

            #[async_test]
            async fn test_thread_subscriptions() -> TestResult {
                let store = get_store().await?.into_state_store();
                store.test_thread_subscriptions().await
            }
        }
    };
}
2027
/// Returns the fixed user id `@example:localhost` used by the tests in this
/// file.
fn user_id() -> &'static UserId {
    user_id!("@example:localhost")
}
2031
/// Returns the fixed user id `@invited:localhost`, used by the tests as the
/// invited user.
fn invited_user_id() -> &'static UserId {
    user_id!("@invited:localhost")
}
2035
/// Returns the fixed room id `!test:localhost` used by the tests in this
/// file.
fn room_id() -> &'static RoomId {
    room_id!("!test:localhost")
}
2039
/// Returns the fixed room id `!stripped:localhost`, used by the tests as the
/// stripped room.
fn stripped_room_id() -> &'static RoomId {
    room_id!("!stripped:localhost")
}
2043
/// Returns the fixed event id `$example`, used by the receipt-related
/// assertions of the tests.
fn first_receipt_event_id() -> &'static EventId {
    event_id!("$example")
}
2047
2048fn power_level_event() -> Raw<AnySyncStateEvent> {
2049 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2050
2051 let event = json!({
2052 "event_id": "$h29iv0s8:example.com",
2053 "content": content,
2054 "sender": user_id(),
2055 "type": "m.room.power_levels",
2056 "origin_server_ts": 0u64,
2057 "state_key": "",
2058 });
2059
2060 serde_json::from_value(event).unwrap()
2061}
2062
/// Builds the stripped membership event of [`user_id()`]; shorthand for
/// [`custom_stripped_membership_event`] with that user.
fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
    custom_stripped_membership_event(user_id())
}
2066
2067fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2068 let ev_json = json!({
2069 "type": "m.room.member",
2070 "content": RoomMemberEventContent::new(MembershipState::Join),
2071 "sender": user_id,
2072 "state_key": user_id,
2073 });
2074
2075 Raw::new(&ev_json).unwrap().cast_unchecked()
2076}
2077
/// Builds the sync membership event of [`user_id()`] with the event id
/// `$h29iv0s8:example.com`; shorthand for [`custom_membership_event`].
fn membership_event() -> Raw<SyncRoomMemberEvent> {
    custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
}
2081
2082fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2083 let ev_json = json!({
2084 "type": "m.room.member",
2085 "content": RoomMemberEventContent::new(MembershipState::Join),
2086 "event_id": event_id,
2087 "origin_server_ts": 198,
2088 "sender": user_id,
2089 "state_key": user_id,
2090 });
2091
2092 Raw::new(&ev_json).unwrap().cast_unchecked()
2093}
2094
2095fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2096 let ev_json = json!({
2097 "content": {
2098 "presence": "online"
2099 },
2100 "sender": user_id,
2101 });
2102
2103 Raw::new(&ev_json).unwrap().cast_unchecked()
2104}