1use std::{
4 collections::{BTreeMap, BTreeSet, HashMap},
5 str::FromStr,
6};
7
8use assert_matches::assert_matches;
9use assert_matches2::assert_let;
10use growable_bloom_filter::GrowableBloomBuilder;
11use matrix_sdk_test::{TestResult, event_factory::EventFactory};
12use ruma::{
13 EventId, MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, TransactionId, UserId,
14 api::{
15 FeatureFlag, MatrixVersion,
16 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
17 },
18 event_id,
19 events::{
20 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
21 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
22 RoomAccountDataEventType, StateEventType, SyncStateEvent,
23 presence::PresenceEvent,
24 receipt::{ReceiptThread, ReceiptType},
25 room::{
26 member::{
27 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
28 SyncRoomMemberEvent,
29 },
30 message::RoomMessageEventContent,
31 power_levels::RoomPowerLevelsEventContent,
32 redaction::SyncRoomRedactionEvent,
33 topic::RoomTopicEventContent,
34 },
35 tag::{TagInfo, TagName, Tags, UserTagName},
36 },
37 mxc_uri, owned_event_id, owned_mxc_uri,
38 presence::PresenceState,
39 push::Ruleset,
40 room_id,
41 room_version_rules::AuthorizationRules,
42 serde::Raw,
43 uint, user_id,
44};
45use serde_json::json;
46
47use super::{
48 DependentQueuedRequestKind, DisplayName, DynStateStore, RoomLoadSettings,
49 SupportedVersionsResponse, TtlStoreValue, WellKnownResponse, send_queue::SentRequestKey,
50};
51use crate::{
52 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
53 deserialized_responses::MemberEvent,
54 store::{
55 ChildTransactionId, QueueWedgeError, SerializableEventContent, StateStoreExt,
56 StoredThreadSubscription, ThreadSubscriptionStatus,
57 },
58 utils::RawStateEventWithKeys,
59};
60
61#[allow(async_fn_in_trait)]
66pub trait StateStoreIntegrationTests {
67 async fn populate(&self) -> TestResult;
69 async fn test_topic_redaction(&self) -> TestResult;
71 async fn test_populate_store(&self) -> TestResult;
73 async fn test_member_saving(&self) -> TestResult;
75 async fn test_filter_saving(&self) -> TestResult;
77 async fn test_user_avatar_url_saving(&self) -> TestResult;
79 async fn test_sync_token_saving(&self) -> TestResult;
81 async fn test_utd_hook_manager_data_saving(&self) -> TestResult;
83 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult;
85 async fn test_stripped_member_saving(&self) -> TestResult;
87 async fn test_power_level_saving(&self) -> TestResult;
89 async fn test_receipts_saving(&self) -> TestResult;
91 async fn test_custom_storage(&self) -> TestResult;
93 async fn test_stripped_non_stripped(&self) -> TestResult;
95 async fn test_room_removal(&self) -> TestResult;
97 async fn test_profile_removal(&self) -> TestResult;
99 async fn test_presence_saving(&self) -> TestResult;
101 async fn test_display_names_saving(&self) -> TestResult;
103 async fn test_send_queue(&self) -> TestResult;
105 async fn test_send_queue_priority(&self) -> TestResult;
107 async fn test_send_queue_dependents(&self) -> TestResult;
109 async fn test_update_send_queue_dependent(&self) -> TestResult;
111 async fn test_supported_versions_saving(&self) -> TestResult;
113 async fn test_well_known_saving(&self) -> TestResult;
115 async fn test_get_room_infos(&self) -> TestResult;
117 async fn test_thread_subscriptions(&self) -> TestResult;
119 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
121}
122
123impl StateStoreIntegrationTests for DynStateStore {
124 async fn populate(&self) -> TestResult {
125 let mut changes = StateChanges::default();
126
127 let user_id = user_id();
128 let invited_user_id = invited_user_id();
129 let room_id = room_id();
130 let stripped_room_id = stripped_room_id();
131
132 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
133
134 let f = EventFactory::new().sender(user_id);
135 let presence_raw: Raw<PresenceEvent> = f
136 .presence(PresenceState::Online)
137 .avatar_url(mxc_uri!("mxc://localhost/wefuiwegh8742w"))
138 .currently_active(false)
139 .last_active_ago(1)
140 .status_msg("Making cupcakes")
141 .into();
142 let presence_event = presence_raw.deserialize()?;
143 changes.add_presence_event(presence_event, presence_raw);
144
145 let f = EventFactory::new().sender(user_id);
146 let pushrules_raw: Raw<AnyGlobalAccountDataEvent> =
147 f.push_rules(Ruleset::server_default(user_id)).into();
148 let pushrules_event = pushrules_raw.deserialize()?;
149 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
150
151 let mut room = RoomInfo::new(room_id, RoomState::Joined);
152 room.mark_as_left();
153
154 let f = EventFactory::new().sender(user_id).room(room_id);
155 let mut tags = Tags::new();
156 tags.insert(TagName::Favorite, TagInfo::new());
157 tags.insert(TagName::User(UserTagName::from_str("u.work").unwrap()), TagInfo::new());
158 let tag_raw: Raw<AnyRoomAccountDataEvent> = f.tag(tags).into();
159 let tag_event = tag_raw.deserialize()?;
160 changes.add_room_account_data(room_id, tag_event, tag_raw);
161
162 let f = EventFactory::new().sender(user_id).room(room_id);
163 let name_raw: Raw<AnySyncStateEvent> = f.room_name("room name").into();
164 let name_event = name_raw.deserialize()?;
165 room.handle_state_event(
166 &mut RawStateEventWithKeys::try_from_raw_state_event(name_raw.clone())
167 .expect("generated state event should be valid"),
168 );
169 changes.add_state_event(room_id, name_event, name_raw);
170
171 let f = EventFactory::new().sender(user_id);
172 let receipt_content = f
173 .room(room_id)
174 .read_receipts()
175 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
176 .into_content();
177 changes.add_receipts(room_id, receipt_content);
178
179 let topic_event_id = topic_event_id();
180 let topic_raw: Raw<AnySyncStateEvent> = EventFactory::new()
181 .room(room_id)
182 .sender(user_id)
183 .room_topic("😀")
184 .event_id(topic_event_id)
185 .prev_content(RoomTopicEventContent::new("test".to_owned()))
186 .into_raw_sync_state();
187 let topic_event = topic_raw.deserialize()?;
188 room.handle_state_event(
189 &mut RawStateEventWithKeys::try_from_raw_state_event(topic_raw.clone())
190 .expect("generated state event should be valid"),
191 );
192 changes.add_state_event(room_id, topic_event, topic_raw);
193
194 let mut room_ambiguity_map = HashMap::new();
195 let mut room_profiles = BTreeMap::new();
196
197 let f = EventFactory::new().sender(user_id).room(room_id);
198 let member_raw: Raw<SyncRoomMemberEvent> =
199 f.member(user_id).display_name("example").previous(MembershipState::Invite).into();
200 let member_event: SyncRoomMemberEvent = member_raw.deserialize()?;
201 let displayname = DisplayName::new(
202 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
203 );
204 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
205 room_profiles.insert(user_id.to_owned(), (&member_event).into());
206
207 let member_raw: Raw<AnySyncStateEvent> = member_raw.cast_unchecked();
208 let member_state_event = member_raw.deserialize()?;
209 changes.add_state_event(room_id, member_state_event, member_raw);
210
211 let f = EventFactory::new().sender(user_id).room(room_id);
212 let invited_member_raw: Raw<SyncRoomMemberEvent> = f
213 .member(invited_user_id)
214 .invited(invited_user_id)
215 .display_name("example")
216 .avatar_url(mxc_uri!("mxc://localhost/SEsfnsuifSDFSSEF"))
217 .reason("Looking for support")
218 .into();
219 let invited_member_event: SyncRoomMemberEvent = invited_member_raw.deserialize()?;
221 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
222 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
223
224 let invited_member_raw: Raw<AnySyncStateEvent> = invited_member_raw.cast_unchecked();
225 let invited_member_state_event = invited_member_raw.deserialize()?;
226 changes.add_state_event(room_id, invited_member_state_event, invited_member_raw);
227
228 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
229 changes.profiles.insert(room_id.to_owned(), room_profiles);
230 changes.add_room(room);
231
232 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
233
234 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
235 let stripped_name_raw: Raw<AnyStrippedStateEvent> = f.room_name("room name").into();
236 let mut stripped_name_event =
237 RawStateEventWithKeys::try_from_raw_state_event(stripped_name_raw).unwrap();
238 stripped_room.handle_stripped_state_event(&mut stripped_name_event);
239 changes.stripped_state.insert(
240 stripped_room_id.to_owned(),
241 BTreeMap::from([(
242 stripped_name_event.event_type,
243 BTreeMap::from([(stripped_name_event.state_key, stripped_name_event.raw)]),
244 )]),
245 );
246
247 changes.add_room(stripped_room);
248
249 let f = EventFactory::new().sender(user_id).room(stripped_room_id);
250 let stripped_member_raw: Raw<StrippedRoomMemberEvent> =
251 f.member(user_id).display_name("example").into();
252 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_raw);
253
254 self.save_changes(&changes).await?;
255
256 Ok(())
257 }
258
259 async fn test_topic_redaction(&self) -> TestResult {
260 let room_id = room_id();
261 let f = EventFactory::new();
262 self.populate().await?;
263
264 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
265 assert_eq!(
266 self.get_state_event_static::<RoomTopicEventContent>(room_id)
267 .await?
268 .expect("room topic found before redaction")
269 .deserialize()
270 .expect("can deserialize room topic before redaction")
271 .as_sync()
272 .expect("room topic is a sync state event")
273 .as_original()
274 .expect("room topic is not redacted yet")
275 .content
276 .topic,
277 "😀"
278 );
279
280 let mut changes = StateChanges::default();
281
282 let topic_event_id = topic_event_id();
283 let redaction_evt: Raw<SyncRoomRedactionEvent> =
284 f.room(room_id).sender(user_id()).redaction(topic_event_id).into_raw();
285
286 changes.add_redaction(room_id, topic_event_id, redaction_evt);
287 self.save_changes(&changes).await?;
288
289 let redacted_event = self
290 .get_state_event_static::<RoomTopicEventContent>(room_id)
291 .await?
292 .expect("room topic found after redaction")
293 .deserialize()
294 .expect("can deserialize room topic after redaction");
295
296 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
297
298 Ok(())
299 }
300
301 async fn test_populate_store(&self) -> TestResult {
302 let room_id = room_id();
303 let user_id = user_id();
304 let display_name = DisplayName::new("example");
305
306 self.populate().await?;
307
308 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
309 assert!(self.get_presence_event(user_id).await?.is_some());
310 assert_eq!(
311 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
312 2,
313 "Expected to find 2 room infos"
314 );
315 assert!(
316 self.get_account_data_event(GlobalAccountDataEventType::PushRules).await?.is_some()
317 );
318
319 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
320 assert_eq!(
321 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
322 1,
323 "Expected to find 1 room topic"
324 );
325 assert!(self.get_profile(room_id, user_id).await?.is_some());
326 assert!(self.get_member_event(room_id, user_id).await?.is_some());
327 assert_eq!(
328 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
329 2,
330 "Expected to find 2 members for room"
331 );
332 assert_eq!(
333 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
334 1,
335 "Expected to find 1 invited user ids"
336 );
337 assert_eq!(
338 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
339 1,
340 "Expected to find 1 joined user ids"
341 );
342 assert_eq!(
343 self.get_users_with_display_name(room_id, &display_name).await?.len(),
344 2,
345 "Expected to find 2 display names for room"
346 );
347 assert!(
348 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
349 .await?
350 .is_some()
351 );
352 assert!(
353 self.get_user_room_receipt_event(
354 room_id,
355 ReceiptType::Read,
356 ReceiptThread::Unthreaded,
357 user_id
358 )
359 .await?
360 .is_some()
361 );
362 assert_eq!(
363 self.get_event_room_receipt_events(
364 room_id,
365 ReceiptType::Read,
366 ReceiptThread::Unthreaded,
367 first_receipt_event_id()
368 )
369 .await?
370 .len(),
371 1,
372 "Expected to find 1 read receipt"
373 );
374 Ok(())
375 }
376
377 async fn test_member_saving(&self) -> TestResult {
378 let room_id = room_id!("!test_member_saving:localhost");
379 let user_id = user_id();
380 let second_user_id = user_id!("@second:localhost");
381 let third_user_id = user_id!("@third:localhost");
382 let unknown_user_id = user_id!("@unknown:localhost");
383
384 let mut user_ids = vec![user_id.to_owned()];
386 assert!(self.get_member_event(room_id, user_id).await?.is_none());
387 let member_events = self
388 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
389 .await;
390 assert!(member_events?.is_empty());
391 assert!(self.get_profile(room_id, user_id).await?.is_none());
392 let profiles = self.get_profiles(room_id, &user_ids).await;
393 assert!(profiles?.is_empty());
394
395 let mut changes = StateChanges::default();
397 let raw_member_event = membership_event();
398 let profile = raw_member_event.deserialize()?.into();
399 changes
400 .state
401 .entry(room_id.to_owned())
402 .or_default()
403 .entry(StateEventType::RoomMember)
404 .or_default()
405 .insert(user_id.into(), raw_member_event.cast());
406 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
407 self.save_changes(&changes).await?;
408
409 assert!(self.get_member_event(room_id, user_id).await?.is_some());
410 let member_events = self
411 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
412 .await;
413 assert_eq!(member_events?.len(), 1);
414 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
415 assert_eq!(members.len(), 1, "We expected to find members for the room");
416 assert!(self.get_profile(room_id, user_id).await?.is_some());
417 let profiles = self.get_profiles(room_id, &user_ids).await;
418 assert_eq!(profiles?.len(), 1);
419
420 let mut changes = StateChanges::default();
422 let changes_members = changes
423 .state
424 .entry(room_id.to_owned())
425 .or_default()
426 .entry(StateEventType::RoomMember)
427 .or_default();
428 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
429 let raw_second_member_event =
430 custom_membership_event(second_user_id, event_id!("$second_member_event"));
431 let second_profile = raw_second_member_event.deserialize()?.into();
432 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
433 changes_profiles.insert(second_user_id.to_owned(), second_profile);
434 let raw_third_member_event =
435 custom_membership_event(third_user_id, event_id!("$third_member_event"));
436 let third_profile = raw_third_member_event.deserialize()?.into();
437 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
438 changes_profiles.insert(third_user_id.to_owned(), third_profile);
439 self.save_changes(&changes).await?;
440
441 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
442 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
443 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
444 let member_events = self
445 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
446 .await;
447 assert_eq!(member_events?.len(), 3);
448 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
449 assert_eq!(members.len(), 3, "We expected to find members for the room");
450 assert!(self.get_profile(room_id, second_user_id).await?.is_some());
451 assert!(self.get_profile(room_id, third_user_id).await?.is_some());
452 let profiles = self.get_profiles(room_id, &user_ids).await;
453 assert_eq!(profiles?.len(), 3);
454
455 user_ids.push(unknown_user_id.to_owned());
457 let member_events = self
458 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
459 .await;
460 assert_eq!(member_events?.len(), 3);
461 let profiles = self.get_profiles(room_id, &user_ids).await;
462 assert_eq!(profiles?.len(), 3);
463
464 let member_events = self
466 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
467 room_id,
468 &[],
469 )
470 .await;
471 assert!(member_events?.is_empty());
472 let profiles = self.get_profiles(room_id, &[]).await;
473 assert!(profiles?.is_empty());
474
475 Ok(())
476 }
477
478 async fn test_filter_saving(&self) -> TestResult {
479 let filter_name = "filter_name";
480 let filter_id = "filter_id_1234";
481
482 self.set_kv_data(
483 StateStoreDataKey::Filter(filter_name),
484 StateStoreDataValue::Filter(filter_id.to_owned()),
485 )
486 .await?;
487 assert_let!(
488 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
489 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
490 );
491 assert_eq!(stored_filter_id, filter_id);
492
493 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await?;
494 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
495
496 Ok(())
497 }
498
499 async fn test_user_avatar_url_saving(&self) -> TestResult {
500 let user_id = user_id!("@alice:example.org");
501 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
502
503 self.set_kv_data(
504 StateStoreDataKey::UserAvatarUrl(user_id),
505 StateStoreDataValue::UserAvatarUrl(url.clone()),
506 )
507 .await?;
508
509 assert_let!(
510 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
511 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
512 );
513 assert_eq!(stored_url, url);
514
515 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await?;
516 assert_matches!(
517 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
518 Ok(None)
519 );
520
521 Ok(())
522 }
523
524 async fn test_supported_versions_saving(&self) -> TestResult {
525 let versions =
526 BTreeSet::from([MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11]);
527 let supported_versions = SupportedVersionsResponse {
528 versions: versions.iter().map(|version| version.as_str().unwrap().to_owned()).collect(),
529 unstable_features: [("org.matrix.experimental".to_owned(), true)].into(),
530 };
531
532 self.set_kv_data(
533 StateStoreDataKey::SupportedVersions,
534 StateStoreDataValue::SupportedVersions(TtlStoreValue::new(supported_versions.clone())),
535 )
536 .await?;
537
538 assert_let!(
539 Ok(Some(StateStoreDataValue::SupportedVersions(stored_supported_versions))) =
540 self.get_kv_data(StateStoreDataKey::SupportedVersions).await
541 );
542 assert_let!(Some(stored_supported_versions) = stored_supported_versions.into_data());
543 assert_eq!(supported_versions, stored_supported_versions);
544
545 let stored_supported = stored_supported_versions.supported_versions();
546 assert_eq!(stored_supported.versions, versions);
547 assert_eq!(stored_supported.features.len(), 1);
548 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
549
550 self.remove_kv_data(StateStoreDataKey::SupportedVersions).await?;
551 assert_matches!(self.get_kv_data(StateStoreDataKey::SupportedVersions).await, Ok(None));
552
553 Ok(())
554 }
555
556 async fn test_well_known_saving(&self) -> TestResult {
557 let well_known = WellKnownResponse {
558 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
559 identity_server: None,
560 tile_server: None,
561 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
562 };
563
564 self.set_kv_data(
565 StateStoreDataKey::WellKnown,
566 StateStoreDataValue::WellKnown(TtlStoreValue::new(Some(well_known.clone()))),
567 )
568 .await?;
569
570 assert_let!(
571 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
572 self.get_kv_data(StateStoreDataKey::WellKnown).await
573 );
574 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
575 assert_eq!(stored_well_known, Some(well_known));
576
577 self.remove_kv_data(StateStoreDataKey::WellKnown).await?;
578 assert_matches!(self.get_kv_data(StateStoreDataKey::WellKnown).await, Ok(None));
579
580 self.set_kv_data(
581 StateStoreDataKey::WellKnown,
582 StateStoreDataValue::WellKnown(TtlStoreValue::new(None)),
583 )
584 .await?;
585
586 assert_let!(
587 Ok(Some(StateStoreDataValue::WellKnown(stored_well_known))) =
588 self.get_kv_data(StateStoreDataKey::WellKnown).await
589 );
590 assert_let!(Some(stored_well_known) = stored_well_known.into_data());
591 assert_eq!(stored_well_known, None);
592
593 Ok(())
594 }
595
596 async fn test_sync_token_saving(&self) -> TestResult {
597 let sync_token_1 = "t392-516_47314_0_7_1";
598 let sync_token_2 = "t392-516_47314_0_7_2";
599
600 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
601
602 let changes =
603 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
604 self.save_changes(&changes).await?;
605 assert_let!(
606 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
607 self.get_kv_data(StateStoreDataKey::SyncToken).await
608 );
609 assert_eq!(stored_sync_token, sync_token_1);
610
611 self.set_kv_data(
612 StateStoreDataKey::SyncToken,
613 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
614 )
615 .await?;
616 assert_let!(
617 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
618 self.get_kv_data(StateStoreDataKey::SyncToken).await
619 );
620 assert_eq!(stored_sync_token, sync_token_2);
621
622 self.remove_kv_data(StateStoreDataKey::SyncToken).await?;
623 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
624
625 Ok(())
626 }
627
628 async fn test_utd_hook_manager_data_saving(&self) -> TestResult {
629 assert!(
631 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
632 .await
633 .expect("Could not read data")
634 .is_none(),
635 "Store was not empty at start"
636 );
637
638 let data = GrowableBloomBuilder::new().build();
640 self.set_kv_data(
641 StateStoreDataKey::UtdHookManagerData,
642 StateStoreDataValue::UtdHookManagerData(data.clone()),
643 )
644 .await
645 .expect("Could not save data");
646
647 let read_data = self
649 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
650 .await
651 .expect("Could not read data")
652 .expect("no data found")
653 .into_utd_hook_manager_data()
654 .expect("not UtdHookManagerData");
655
656 assert_eq!(read_data, data);
657
658 Ok(())
659 }
660
661 async fn test_one_time_key_already_uploaded_data_saving(&self) -> TestResult {
662 assert!(
664 self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?.is_none(),
665 "Store was not empty at start"
666 );
667
668 self.set_kv_data(
669 StateStoreDataKey::OneTimeKeyAlreadyUploaded,
670 StateStoreDataValue::OneTimeKeyAlreadyUploaded,
671 )
672 .await?;
673
674 let data = self.get_kv_data(StateStoreDataKey::OneTimeKeyAlreadyUploaded).await?;
675 data.expect("The loaded data should be Some");
676
677 Ok(())
678 }
679
680 async fn test_stripped_member_saving(&self) -> TestResult {
681 let room_id = room_id!("!test_stripped_member_saving:localhost");
682 let user_id = user_id();
683 let second_user_id = user_id!("@second:localhost");
684 let third_user_id = user_id!("@third:localhost");
685 let unknown_user_id = user_id!("@unknown:localhost");
686
687 assert!(self.get_member_event(room_id, user_id).await?.is_none());
689 let member_events = self
690 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
691 room_id,
692 &[user_id.to_owned()],
693 )
694 .await;
695 assert!(member_events?.is_empty());
696
697 let mut changes = StateChanges::default();
699 changes
700 .stripped_state
701 .entry(room_id.to_owned())
702 .or_default()
703 .entry(StateEventType::RoomMember)
704 .or_default()
705 .insert(user_id.into(), stripped_membership_event().cast());
706 self.save_changes(&changes).await?;
707
708 assert!(self.get_member_event(room_id, user_id).await?.is_some());
709 let member_events = self
710 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
711 room_id,
712 &[user_id.to_owned()],
713 )
714 .await;
715 assert_eq!(member_events?.len(), 1);
716 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
717 assert_eq!(members.len(), 1, "We expected to find members for the room");
718
719 let mut changes = StateChanges::default();
721 let changes_members = changes
722 .stripped_state
723 .entry(room_id.to_owned())
724 .or_default()
725 .entry(StateEventType::RoomMember)
726 .or_default();
727 changes_members
728 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
729 changes_members
730 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
731 self.save_changes(&changes).await?;
732
733 assert!(self.get_member_event(room_id, second_user_id).await?.is_some());
734 assert!(self.get_member_event(room_id, third_user_id).await?.is_some());
735 let member_events = self
736 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
737 room_id,
738 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
739 )
740 .await;
741 assert_eq!(member_events?.len(), 3);
742 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
743 assert_eq!(members.len(), 3, "We expected to find members for the room");
744
745 let member_events = self
747 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
748 room_id,
749 &[
750 user_id.to_owned(),
751 second_user_id.to_owned(),
752 third_user_id.to_owned(),
753 unknown_user_id.to_owned(),
754 ],
755 )
756 .await;
757 assert_eq!(member_events?.len(), 3);
758
759 let member_events = self
761 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
762 room_id,
763 &[],
764 )
765 .await;
766 assert!(member_events?.is_empty());
767
768 Ok(())
769 }
770
771 async fn test_power_level_saving(&self) -> TestResult {
772 let room_id = room_id!("!test_power_level_saving:localhost");
773
774 let raw_event = power_level_event();
775 let event = raw_event.deserialize()?;
776
777 assert!(
778 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_none()
779 );
780 let mut changes = StateChanges::default();
781 changes.add_state_event(room_id, event, raw_event);
782
783 self.save_changes(&changes).await?;
784 assert!(
785 self.get_state_event(room_id, StateEventType::RoomPowerLevels, "").await?.is_some()
786 );
787
788 Ok(())
789 }
790
791 async fn test_receipts_saving(&self) -> TestResult {
792 let room_id = room_id!("!test_receipts_saving:localhost");
793
794 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
795 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
796
797 let first_receipt_ts = uint!(1436451550);
798 let second_receipt_ts = uint!(1436451653);
799 let third_receipt_ts = uint!(1436474532);
800
801 let first_receipt_event = serde_json::from_value(json!({
802 first_event_id: {
803 "m.read": {
804 user_id(): {
805 "ts": first_receipt_ts,
806 }
807 }
808 }
809 }))?;
810
811 let second_receipt_event = serde_json::from_value(json!({
812 second_event_id: {
813 "m.read": {
814 user_id(): {
815 "ts": second_receipt_ts,
816 }
817 }
818 }
819 }))?;
820
821 let third_receipt_event = serde_json::from_value(json!({
822 second_event_id: {
823 "m.read": {
824 user_id(): {
825 "ts": third_receipt_ts,
826 "thread_id": "main",
827 }
828 }
829 }
830 }))?;
831
832 assert!(
833 self.get_user_room_receipt_event(
834 room_id,
835 ReceiptType::Read,
836 ReceiptThread::Unthreaded,
837 user_id()
838 )
839 .await
840 .expect("failed to read unthreaded user room receipt")
841 .is_none()
842 );
843 assert!(
844 self.get_event_room_receipt_events(
845 room_id,
846 ReceiptType::Read,
847 ReceiptThread::Unthreaded,
848 first_event_id
849 )
850 .await
851 .expect("failed to read unthreaded event room receipt for 1")
852 .is_empty()
853 );
854 assert!(
855 self.get_event_room_receipt_events(
856 room_id,
857 ReceiptType::Read,
858 ReceiptThread::Unthreaded,
859 second_event_id
860 )
861 .await
862 .expect("failed to read unthreaded event room receipt for 2")
863 .is_empty()
864 );
865
866 let mut changes = StateChanges::default();
867 changes.add_receipts(room_id, first_receipt_event);
868
869 self.save_changes(&changes).await?;
870 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
871 .get_user_room_receipt_event(
872 room_id,
873 ReceiptType::Read,
874 ReceiptThread::Unthreaded,
875 user_id(),
876 )
877 .await
878 .expect("failed to read unthreaded user room receipt after save")
879 .unwrap();
880 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
881 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
882 let first_event_unthreaded_receipts = self
883 .get_event_room_receipt_events(
884 room_id,
885 ReceiptType::Read,
886 ReceiptThread::Unthreaded,
887 first_event_id,
888 )
889 .await
890 .expect("failed to read unthreaded event room receipt for 1 after save");
891 assert_eq!(
892 first_event_unthreaded_receipts.len(),
893 1,
894 "Found a wrong number of unthreaded receipts for 1 after save"
895 );
896 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
897 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
898 assert!(
899 self.get_event_room_receipt_events(
900 room_id,
901 ReceiptType::Read,
902 ReceiptThread::Unthreaded,
903 second_event_id
904 )
905 .await
906 .expect("failed to read unthreaded event room receipt for 2 after save")
907 .is_empty()
908 );
909
910 let mut changes = StateChanges::default();
911 changes.add_receipts(room_id, second_receipt_event);
912
913 self.save_changes(&changes).await.expect("Saving works");
914 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
915 .get_user_room_receipt_event(
916 room_id,
917 ReceiptType::Read,
918 ReceiptThread::Unthreaded,
919 user_id(),
920 )
921 .await
922 .expect("Getting unthreaded user room receipt after save failed")
923 .unwrap();
924 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
925 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
926 assert!(
927 self.get_event_room_receipt_events(
928 room_id,
929 ReceiptType::Read,
930 ReceiptThread::Unthreaded,
931 first_event_id
932 )
933 .await
934 .expect("Getting unthreaded event room receipt events for first event failed")
935 .is_empty()
936 );
937 let second_event_unthreaded_receipts = self
938 .get_event_room_receipt_events(
939 room_id,
940 ReceiptType::Read,
941 ReceiptThread::Unthreaded,
942 second_event_id,
943 )
944 .await
945 .expect("Getting unthreaded event room receipt events for second event failed");
946 assert_eq!(
947 second_event_unthreaded_receipts.len(),
948 1,
949 "Found a wrong number of unthreaded receipts for second event after save"
950 );
951 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
952 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
953
954 assert!(
955 self.get_user_room_receipt_event(
956 room_id,
957 ReceiptType::Read,
958 ReceiptThread::Main,
959 user_id()
960 )
961 .await
962 .expect("failed to read threaded user room receipt")
963 .is_none()
964 );
965 assert!(
966 self.get_event_room_receipt_events(
967 room_id,
968 ReceiptType::Read,
969 ReceiptThread::Main,
970 second_event_id
971 )
972 .await
973 .expect("Getting threaded event room receipts for 2 failed")
974 .is_empty()
975 );
976
977 let mut changes = StateChanges::default();
978 changes.add_receipts(room_id, third_receipt_event);
979
980 self.save_changes(&changes).await.expect("Saving works");
981 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
983 .get_user_room_receipt_event(
984 room_id,
985 ReceiptType::Read,
986 ReceiptThread::Unthreaded,
987 user_id(),
988 )
989 .await
990 .expect("Getting unthreaded user room receipt after save failed")
991 .unwrap();
992 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
993 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
994 let second_event_unthreaded_receipts = self
995 .get_event_room_receipt_events(
996 room_id,
997 ReceiptType::Read,
998 ReceiptThread::Unthreaded,
999 second_event_id,
1000 )
1001 .await
1002 .expect("Getting unthreaded event room receipt events for second event failed");
1003 assert_eq!(
1004 second_event_unthreaded_receipts.len(),
1005 1,
1006 "Found a wrong number of unthreaded receipts for second event after save"
1007 );
1008 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
1009 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
1010 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
1012 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
1013 .await
1014 .expect("Getting threaded user room receipt after save failed")
1015 .unwrap();
1016 assert_eq!(threaded_user_receipt_event_id, second_event_id);
1017 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
1018 let second_event_threaded_receipts = self
1019 .get_event_room_receipt_events(
1020 room_id,
1021 ReceiptType::Read,
1022 ReceiptThread::Main,
1023 second_event_id,
1024 )
1025 .await
1026 .expect("Getting threaded event room receipt events for second event failed");
1027 assert_eq!(
1028 second_event_threaded_receipts.len(),
1029 1,
1030 "Found a wrong number of threaded receipts for second event after save"
1031 );
1032 assert_eq!(second_event_threaded_receipts[0].0, user_id());
1033 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
1034
1035 Ok(())
1036 }
1037
1038 async fn test_custom_storage(&self) -> TestResult {
1039 let key = "my_key";
1040 let value = &[0, 1, 2, 3];
1041
1042 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
1043
1044 let read = self.get_custom_value(key.as_bytes()).await?;
1045
1046 assert_eq!(Some(value.as_ref()), read.as_deref());
1047
1048 Ok(())
1049 }
1050
1051 async fn test_stripped_non_stripped(&self) -> TestResult {
1052 let room_id = room_id!("!test_stripped_non_stripped:localhost");
1053 let user_id = user_id();
1054
1055 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1056 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1057
1058 let mut changes = StateChanges::default();
1059 changes
1060 .state
1061 .entry(room_id.to_owned())
1062 .or_default()
1063 .entry(StateEventType::RoomMember)
1064 .or_default()
1065 .insert(user_id.into(), membership_event().cast());
1066 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
1067 self.save_changes(&changes).await?;
1068
1069 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1070 assert!(matches!(member_event, MemberEvent::Sync(_)));
1071 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1072
1073 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1074 assert_eq!(members, vec![user_id.to_owned()]);
1075
1076 let mut changes = StateChanges::default();
1077 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
1078 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
1079 self.save_changes(&changes).await?;
1080
1081 let member_event = self.get_member_event(room_id, user_id).await?.unwrap().deserialize()?;
1082 assert!(matches!(member_event, MemberEvent::Stripped(_)));
1083 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 1);
1084
1085 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await?;
1086 assert_eq!(members, vec![user_id.to_owned()]);
1087
1088 Ok(())
1089 }
1090
1091 async fn test_room_removal(&self) -> TestResult {
1092 let room_id = room_id();
1093 let user_id = user_id();
1094 let display_name = DisplayName::new("example");
1095 let stripped_room_id = stripped_room_id();
1096
1097 self.populate().await?;
1098
1099 {
1100 let txn = TransactionId::new();
1102 let ev =
1103 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())?;
1104 self.save_send_queue_request(
1105 room_id,
1106 txn.clone(),
1107 MilliSecondsSinceUnixEpoch::now(),
1108 ev.into(),
1109 0,
1110 )
1111 .await?;
1112
1113 self.save_dependent_queued_request(
1115 room_id,
1116 &txn,
1117 ChildTransactionId::new(),
1118 MilliSecondsSinceUnixEpoch::now(),
1119 DependentQueuedRequestKind::RedactEvent,
1120 )
1121 .await?;
1122 }
1123
1124 self.remove_room(room_id).await?;
1125
1126 assert_eq!(
1127 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1128 1,
1129 "room is still there"
1130 );
1131
1132 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1133 assert!(
1134 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1135 "still state events found"
1136 );
1137 assert!(self.get_profile(room_id, user_id).await?.is_none());
1138 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1139 assert!(
1140 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1141 "still user ids found"
1142 );
1143 assert!(
1144 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1145 "still invited user ids found"
1146 );
1147 assert!(
1148 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1149 "still joined users found"
1150 );
1151 assert!(
1152 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1153 "still display names found"
1154 );
1155 assert!(
1156 self.get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1157 .await?
1158 .is_none()
1159 );
1160 assert!(
1161 self.get_user_room_receipt_event(
1162 room_id,
1163 ReceiptType::Read,
1164 ReceiptThread::Unthreaded,
1165 user_id
1166 )
1167 .await?
1168 .is_none()
1169 );
1170 assert!(
1171 self.get_event_room_receipt_events(
1172 room_id,
1173 ReceiptType::Read,
1174 ReceiptThread::Unthreaded,
1175 first_receipt_event_id()
1176 )
1177 .await?
1178 .is_empty(),
1179 "still event recepts in the store"
1180 );
1181 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1182 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1183
1184 self.remove_room(stripped_room_id).await?;
1185
1186 assert!(
1187 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1188 "still room info found"
1189 );
1190 Ok(())
1191 }
1192
1193 async fn test_profile_removal(&self) -> TestResult {
1194 let room_id = room_id();
1195
1196 let user_id = user_id();
1198 let invited_user_id = invited_user_id();
1199
1200 self.populate().await?;
1201
1202 let new_invite_member_json = json!({
1203 "content": {
1204 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1205 "displayname": "example after update",
1206 "membership": "invite",
1207 "reason": "Looking for support"
1208 },
1209 "event_id": "$143273582443PhrSm:localhost",
1210 "origin_server_ts": 1432735824,
1211 "room_id": room_id,
1212 "sender": user_id,
1213 "state_key": invited_user_id,
1214 "type": "m.room.member",
1215 });
1216 let new_invite_member_event: SyncRoomMemberEvent =
1217 serde_json::from_value(new_invite_member_json.clone())?;
1218
1219 let mut changes = StateChanges {
1220 profiles_to_delete: [(
1222 room_id.to_owned(),
1223 vec![user_id.to_owned(), invited_user_id.to_owned()],
1224 )]
1225 .into(),
1226
1227 profiles: {
1229 let mut map = BTreeMap::default();
1230 map.insert(
1231 room_id.to_owned(),
1232 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1233 .into_iter()
1234 .collect(),
1235 );
1236 map
1237 },
1238
1239 ..StateChanges::default()
1240 };
1241
1242 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1243 .expect("can create sync-state-event for topic");
1244 let event = raw.deserialize()?;
1245 changes.add_state_event(room_id, event, raw);
1246
1247 self.save_changes(&changes).await?;
1248
1249 assert!(self.get_profile(room_id, user_id).await?.is_none());
1251 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1252
1253 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1255 assert_eq!(
1256 invited_member_event.content.displayname.as_deref(),
1257 Some("example after update")
1258 );
1259 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1260
1261 Ok(())
1262 }
1263
1264 async fn test_presence_saving(&self) -> TestResult {
1265 let user_id = user_id();
1266 let second_user_id = user_id!("@second:localhost");
1267 let third_user_id = user_id!("@third:localhost");
1268 let unknown_user_id = user_id!("@unknown:localhost");
1269
1270 let mut user_ids = vec![user_id.to_owned()];
1272 let presence_event = self.get_presence_event(user_id).await;
1273 assert!(presence_event?.is_none());
1274 let presence_events = self.get_presence_events(&user_ids).await;
1275 assert!(presence_events?.is_empty());
1276
1277 let mut changes = StateChanges::default();
1279 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1280 self.save_changes(&changes).await?;
1281
1282 let presence_event = self.get_presence_event(user_id).await;
1283 assert!(presence_event?.is_some());
1284 let presence_events = self.get_presence_events(&user_ids).await;
1285 assert_eq!(presence_events?.len(), 1);
1286
1287 let mut changes = StateChanges::default();
1289 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1290 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1291 self.save_changes(&changes).await?;
1292
1293 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1294 let presence_event = self.get_presence_event(second_user_id).await;
1295 assert!(presence_event?.is_some());
1296 let presence_event = self.get_presence_event(third_user_id).await;
1297 assert!(presence_event?.is_some());
1298 let presence_events = self.get_presence_events(&user_ids).await;
1299 assert_eq!(presence_events?.len(), 3);
1300
1301 user_ids.push(unknown_user_id.to_owned());
1303 let member_events = self.get_presence_events(&user_ids).await;
1304 assert_eq!(member_events?.len(), 3);
1305
1306 let presence_events = self.get_presence_events(&[]).await;
1308 assert!(presence_events?.is_empty());
1309
1310 Ok(())
1311 }
1312
1313 async fn test_display_names_saving(&self) -> TestResult {
1314 let room_id = room_id!("!test_display_names_saving:localhost");
1315 let user_id = user_id();
1316 let user_display_name = DisplayName::new("User");
1317 let second_user_id = user_id!("@second:localhost");
1318 let third_user_id = user_id!("@third:localhost");
1319 let other_display_name = DisplayName::new("Raoul");
1320 let unknown_display_name = DisplayName::new("Unknown");
1321
1322 let mut display_names = vec![user_display_name.to_owned()];
1324 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1325 assert!(users.is_empty());
1326 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1327 assert!(names.is_empty());
1328
1329 let mut changes = StateChanges::default();
1331 changes
1332 .ambiguity_maps
1333 .entry(room_id.to_owned())
1334 .or_default()
1335 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1336 self.save_changes(&changes).await?;
1337
1338 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1339 assert_eq!(users.len(), 1);
1340 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1341 assert_eq!(names.len(), 1);
1342 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1343
1344 let mut changes = StateChanges::default();
1346 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1347 other_display_name.to_owned(),
1348 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1349 );
1350 self.save_changes(&changes).await?;
1351
1352 display_names.push(other_display_name.to_owned());
1353 let users = self.get_users_with_display_name(room_id, &user_display_name).await?;
1354 assert_eq!(users.len(), 1);
1355 let users = self.get_users_with_display_name(room_id, &other_display_name).await?;
1356 assert_eq!(users.len(), 2);
1357 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1358 assert_eq!(names.len(), 2);
1359 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1360 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1361
1362 display_names.push(unknown_display_name.to_owned());
1364 let names = self.get_users_with_display_names(room_id, &display_names).await?;
1365 assert_eq!(names.len(), 2);
1366
1367 let names = self.get_users_with_display_names(room_id, &[]).await?;
1369 assert!(names.is_empty());
1370
1371 Ok(())
1372 }
1373
1374 #[allow(clippy::needless_range_loop)]
1375 async fn test_send_queue(&self) -> TestResult {
1376 let room_id = room_id!("!test_send_queue:localhost");
1377
1378 let events = self.load_send_queue_requests(room_id).await?;
1380 assert!(events.is_empty());
1381
1382 let txn0 = TransactionId::new();
1384 let event0 =
1385 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())?;
1386 self.save_send_queue_request(
1387 room_id,
1388 txn0.clone(),
1389 MilliSecondsSinceUnixEpoch::now(),
1390 event0.into(),
1391 0,
1392 )
1393 .await?;
1394
1395 let pending = self.load_send_queue_requests(room_id).await?;
1397
1398 assert_eq!(pending.len(), 1);
1399 {
1400 assert_eq!(pending[0].transaction_id, txn0);
1401
1402 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1403 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1404 assert_eq!(content.body(), "msg0");
1405
1406 assert!(!pending[0].is_wedged());
1407 }
1408
1409 for i in 1..=3 {
1411 let txn = TransactionId::new();
1412 let event = SerializableEventContent::new(
1413 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1414 )?;
1415
1416 self.save_send_queue_request(
1417 room_id,
1418 txn,
1419 MilliSecondsSinceUnixEpoch::now(),
1420 event.into(),
1421 0,
1422 )
1423 .await?;
1424 }
1425
1426 let pending = self.load_send_queue_requests(room_id).await?;
1428
1429 assert_eq!(pending.len(), 4);
1431
1432 assert_eq!(pending[0].transaction_id, txn0);
1433
1434 for i in 0..4 {
1435 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1436 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1437 assert_eq!(content.body(), format!("msg{i}"));
1438 assert!(!pending[i].is_wedged());
1439 }
1440
1441 let txn2 = &pending[2].transaction_id;
1443 self.update_send_queue_request_status(
1444 room_id,
1445 txn2,
1446 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1447 )
1448 .await?;
1449
1450 let pending = self.load_send_queue_requests(room_id).await?;
1452
1453 assert_eq!(pending.len(), 4);
1455 assert_eq!(pending[0].transaction_id, txn0);
1456 assert_eq!(pending[2].transaction_id, *txn2);
1457 assert!(pending[2].is_wedged());
1458 let error = pending[2].clone().error.unwrap();
1459 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1460 assert_eq!(generic_error, "Oops");
1461 for i in 0..4 {
1462 if i != 2 {
1463 assert!(!pending[i].is_wedged());
1464 }
1465 }
1466
1467 let event0 = SerializableEventContent::new(
1469 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1470 )?;
1471 self.update_send_queue_request(room_id, txn2, event0.into()).await?;
1472
1473 let pending = self.load_send_queue_requests(room_id).await?;
1475
1476 assert_eq!(pending.len(), 4);
1477 {
1478 assert_eq!(pending[2].transaction_id, *txn2);
1479
1480 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1481 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1482 assert_eq!(content.body(), "wow that's a cool test");
1483
1484 assert!(!pending[2].is_wedged());
1485
1486 for i in 0..4 {
1487 if i != 2 {
1488 let deserialized = pending[i].as_event().unwrap().deserialize()?;
1489 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1490 assert_eq!(content.body(), format!("msg{i}"));
1491
1492 assert!(!pending[i].is_wedged());
1493 }
1494 }
1495 }
1496
1497 self.remove_send_queue_request(room_id, &txn0).await?;
1499
1500 let pending = self.load_send_queue_requests(room_id).await?;
1502
1503 assert_eq!(pending.len(), 3);
1504 assert_eq!(pending[1].transaction_id, *txn2);
1505 for i in 0..3 {
1506 assert_ne!(pending[i].transaction_id, txn0);
1507 }
1508
1509 let room_id2 = room_id!("!test_send_queue_two:localhost");
1514 {
1515 let txn = TransactionId::new();
1516 let event = SerializableEventContent::new(
1517 &RoomMessageEventContent::text_plain("room2").into(),
1518 )?;
1519 self.save_send_queue_request(
1520 room_id2,
1521 txn.clone(),
1522 MilliSecondsSinceUnixEpoch::now(),
1523 event.into(),
1524 0,
1525 )
1526 .await?;
1527 }
1528
1529 {
1531 let room_id3 = room_id!("!test_send_queue_three:localhost");
1532 let txn = TransactionId::new();
1533 let event = SerializableEventContent::new(
1534 &RoomMessageEventContent::text_plain("room3").into(),
1535 )?;
1536 self.save_send_queue_request(
1537 room_id3,
1538 txn.clone(),
1539 MilliSecondsSinceUnixEpoch::now(),
1540 event.into(),
1541 0,
1542 )
1543 .await?;
1544
1545 self.remove_send_queue_request(room_id3, &txn).await?;
1546 }
1547
1548 let outstanding_rooms = self.load_rooms_with_unsent_requests().await?;
1551 assert_eq!(outstanding_rooms.len(), 2);
1552 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1553 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1554
1555 Ok(())
1556 }
1557
1558 async fn test_send_queue_priority(&self) -> TestResult {
1559 let room_id = room_id!("!test_send_queue:localhost");
1560
1561 let events = self.load_send_queue_requests(room_id).await?;
1563 assert!(events.is_empty());
1564
1565 let low0_txn = TransactionId::new();
1567 let ev0 =
1568 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())?;
1569 self.save_send_queue_request(
1570 room_id,
1571 low0_txn.clone(),
1572 MilliSecondsSinceUnixEpoch::now(),
1573 ev0.into(),
1574 2,
1575 )
1576 .await?;
1577
1578 let high_txn = TransactionId::new();
1580 let ev1 =
1581 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())?;
1582 self.save_send_queue_request(
1583 room_id,
1584 high_txn.clone(),
1585 MilliSecondsSinceUnixEpoch::now(),
1586 ev1.into(),
1587 10,
1588 )
1589 .await?;
1590
1591 let low1_txn = TransactionId::new();
1593 let ev2 =
1594 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())?;
1595 self.save_send_queue_request(
1596 room_id,
1597 low1_txn.clone(),
1598 MilliSecondsSinceUnixEpoch::now(),
1599 ev2.into(),
1600 2,
1601 )
1602 .await?;
1603
1604 let pending = self.load_send_queue_requests(room_id).await?;
1607
1608 assert_eq!(pending.len(), 3);
1609 {
1610 assert_eq!(pending[0].transaction_id, high_txn);
1611
1612 let deserialized = pending[0].as_event().unwrap().deserialize()?;
1613 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1614 assert_eq!(content.body(), "high");
1615 }
1616
1617 {
1618 assert_eq!(pending[1].transaction_id, low0_txn);
1619
1620 let deserialized = pending[1].as_event().unwrap().deserialize()?;
1621 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1622 assert_eq!(content.body(), "low0");
1623 }
1624
1625 {
1626 assert_eq!(pending[2].transaction_id, low1_txn);
1627
1628 let deserialized = pending[2].as_event().unwrap().deserialize()?;
1629 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1630 assert_eq!(content.body(), "low1");
1631 }
1632
1633 Ok(())
1634 }
1635
1636 async fn test_send_queue_dependents(&self) -> TestResult {
1637 let room_id = room_id!("!test_send_queue_dependents:localhost");
1638
1639 let txn0 = TransactionId::new();
1641 let event0 =
1642 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())?;
1643 self.save_send_queue_request(
1644 room_id,
1645 txn0.clone(),
1646 MilliSecondsSinceUnixEpoch::now(),
1647 event0.clone().into(),
1648 0,
1649 )
1650 .await?;
1651
1652 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1654
1655 let child_txn = ChildTransactionId::new();
1657 self.save_dependent_queued_request(
1658 room_id,
1659 &txn0,
1660 child_txn.clone(),
1661 MilliSecondsSinceUnixEpoch::now(),
1662 DependentQueuedRequestKind::RedactEvent,
1663 )
1664 .await?;
1665
1666 let dependents = self.load_dependent_queued_requests(room_id).await?;
1668 assert_eq!(dependents.len(), 1);
1669 assert_eq!(dependents[0].parent_transaction_id, txn0);
1670 assert_eq!(dependents[0].own_transaction_id, child_txn);
1671 assert!(dependents[0].parent_key.is_none());
1672 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1673
1674 let (event, event_type) = event0.raw();
1676 let event_id = owned_event_id!("$1");
1677 let num_updated = self
1678 .mark_dependent_queued_requests_as_ready(
1679 room_id,
1680 &txn0,
1681 SentRequestKey::Event {
1682 event_id: event_id.clone(),
1683 event: event.clone(),
1684 event_type: event_type.to_owned(),
1685 },
1686 )
1687 .await?;
1688 assert_eq!(num_updated, 1);
1689
1690 let dependents = self.load_dependent_queued_requests(room_id).await?;
1692 assert_eq!(dependents.len(), 1);
1693 assert_eq!(dependents[0].parent_transaction_id, txn0);
1694 assert_eq!(dependents[0].own_transaction_id, child_txn);
1695 assert_matches!(
1696 dependents[0].parent_key.as_ref(),
1697 Some(SentRequestKey::Event {
1698 event_id: received_event_id,
1699 event: received_event,
1700 event_type: received_event_type
1701 }) => {
1702 assert_eq!(received_event_id, &event_id);
1703 assert_eq!(received_event.json().to_string(), event.json().to_string());
1704 assert_eq!(received_event_type.as_str(), event_type);
1705 }
1706 );
1707 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1708
1709 let removed = self
1711 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1712 .await?;
1713 assert!(removed);
1714
1715 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1717
1718 let txn1 = TransactionId::new();
1721 let event1 =
1722 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())?;
1723 self.save_send_queue_request(
1724 room_id,
1725 txn1.clone(),
1726 MilliSecondsSinceUnixEpoch::now(),
1727 event1.into(),
1728 0,
1729 )
1730 .await?;
1731
1732 self.save_dependent_queued_request(
1733 room_id,
1734 &txn0,
1735 ChildTransactionId::new(),
1736 MilliSecondsSinceUnixEpoch::now(),
1737 DependentQueuedRequestKind::RedactEvent,
1738 )
1739 .await?;
1740 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 1);
1741
1742 self.save_dependent_queued_request(
1743 room_id,
1744 &txn1,
1745 ChildTransactionId::new(),
1746 MilliSecondsSinceUnixEpoch::now(),
1747 DependentQueuedRequestKind::EditEvent {
1748 new_content: SerializableEventContent::new(
1749 &RoomMessageEventContent::text_plain("edit").into(),
1750 )?,
1751 },
1752 )
1753 .await?;
1754 assert_eq!(self.load_dependent_queued_requests(room_id).await?.len(), 2);
1755
1756 let removed = self.remove_send_queue_request(room_id, &txn0).await?;
1758 assert!(removed);
1759
1760 let dependents = self.load_dependent_queued_requests(room_id).await?;
1762 assert_eq!(dependents.len(), 2);
1763
1764 Ok(())
1765 }
1766
1767 async fn test_update_send_queue_dependent(&self) -> TestResult {
1768 let room_id = room_id!("!test_send_queue_dependents:localhost");
1769
1770 let txn = TransactionId::new();
1771
1772 let child_txn = ChildTransactionId::new();
1774
1775 self.save_dependent_queued_request(
1776 room_id,
1777 &txn,
1778 child_txn.clone(),
1779 MilliSecondsSinceUnixEpoch::now(),
1780 DependentQueuedRequestKind::RedactEvent,
1781 )
1782 .await?;
1783
1784 let dependents = self.load_dependent_queued_requests(room_id).await?;
1786 assert_eq!(dependents.len(), 1);
1787 assert_eq!(dependents[0].parent_transaction_id, txn);
1788 assert_eq!(dependents[0].own_transaction_id, child_txn);
1789 assert!(dependents[0].parent_key.is_none());
1790 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1791
1792 self.update_dependent_queued_request(
1794 room_id,
1795 &child_txn,
1796 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1797 )
1798 .await?;
1799
1800 let dependents = self.load_dependent_queued_requests(room_id).await?;
1802 assert_eq!(dependents.len(), 1);
1803 assert_eq!(dependents[0].parent_transaction_id, txn);
1804 assert_eq!(dependents[0].own_transaction_id, child_txn);
1805 assert!(dependents[0].parent_key.is_none());
1806 assert_matches!(
1807 &dependents[0].kind,
1808 DependentQueuedRequestKind::ReactEvent { key } => {
1809 assert_eq!(key, "👍");
1810 }
1811 );
1812
1813 Ok(())
1814 }
1815
1816 async fn test_get_room_infos(&self) -> TestResult {
1817 let room_id_0 = room_id!("!r0");
1818 let room_id_1 = room_id!("!r1");
1819 let room_id_2 = room_id!("!r2");
1820
1821 {
1823 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await?.len(), 0);
1824 }
1825
1826 let mut changes = StateChanges::default();
1828 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1829 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1830 self.save_changes(&changes).await?;
1831
1832 {
1834 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await?;
1835
1836 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1839
1840 assert_eq!(all_rooms.len(), 2);
1841 assert_eq!(all_rooms[0].room_id, room_id_0);
1842 assert_eq!(all_rooms[1].room_id, room_id_1);
1843 }
1844
1845 {
1847 let all_rooms =
1848 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await?;
1849
1850 assert_eq!(all_rooms.len(), 1);
1851 assert_eq!(all_rooms[0].room_id, room_id_1);
1852 }
1853
1854 {
1857 let all_rooms =
1858 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await?;
1859
1860 assert_eq!(all_rooms.len(), 0);
1861 }
1862
1863 Ok(())
1864 }
1865
1866 async fn test_thread_subscriptions(&self) -> TestResult {
1867 let first_thread = event_id!("$t1");
1868 let second_thread = event_id!("$t2");
1869
1870 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1872 assert!(maybe_sub.is_none());
1873
1874 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1875 assert!(maybe_sub.is_none());
1876
1877 self.upsert_thread_subscriptions(vec![(
1879 room_id(),
1880 first_thread,
1881 StoredThreadSubscription {
1882 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1883 bump_stamp: None,
1884 },
1885 )])
1886 .await?;
1887
1888 self.upsert_thread_subscriptions(vec![(
1889 room_id(),
1890 second_thread,
1891 StoredThreadSubscription {
1892 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1893 bump_stamp: None,
1894 },
1895 )])
1896 .await?;
1897
1898 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1900 assert_eq!(
1901 maybe_sub,
1902 Some(StoredThreadSubscription {
1903 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
1904 bump_stamp: None,
1905 })
1906 );
1907
1908 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1909 assert_eq!(
1910 maybe_sub,
1911 Some(StoredThreadSubscription {
1912 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1913 bump_stamp: None,
1914 })
1915 );
1916
1917 self.upsert_thread_subscriptions(vec![(
1919 room_id(),
1920 first_thread,
1921 StoredThreadSubscription {
1922 status: ThreadSubscriptionStatus::Unsubscribed,
1923 bump_stamp: None,
1924 },
1925 )])
1926 .await?;
1927
1928 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1930 assert_eq!(
1931 maybe_sub,
1932 Some(StoredThreadSubscription {
1933 status: ThreadSubscriptionStatus::Unsubscribed,
1934 bump_stamp: None,
1935 })
1936 );
1937
1938 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1940 assert_eq!(
1941 maybe_sub,
1942 Some(StoredThreadSubscription {
1943 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
1944 bump_stamp: None,
1945 })
1946 );
1947
1948 self.remove_thread_subscription(room_id(), second_thread).await?;
1950
1951 let maybe_sub = self.load_thread_subscription(room_id(), second_thread).await?;
1953 assert_eq!(maybe_sub, None);
1954
1955 let maybe_sub = self.load_thread_subscription(room_id(), first_thread).await?;
1957 assert_eq!(
1958 maybe_sub,
1959 Some(StoredThreadSubscription {
1960 status: ThreadSubscriptionStatus::Unsubscribed,
1961 bump_stamp: None,
1962 })
1963 );
1964
1965 self.remove_thread_subscription(room_id(), second_thread).await?;
1967
1968 Ok(())
1969 }
1970
1971 async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
1972 let threads = [
1973 event_id!("$t1"),
1974 event_id!("$t2"),
1975 event_id!("$t3"),
1976 event_id!("$t4"),
1977 event_id!("$t5"),
1978 event_id!("$t6"),
1979 ];
1980 let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
1983 threads
1984 .iter()
1985 .zip(subs)
1986 .map(|(&event_id, &sub)| (room_id(), event_id, sub))
1987 .collect::<Vec<_>>()
1988 };
1989
1990 let initial_subscriptions = build_subscription_updates(&[
1992 StoredThreadSubscription {
1993 status: ThreadSubscriptionStatus::Unsubscribed,
1994 bump_stamp: None,
1995 },
1996 StoredThreadSubscription {
1997 status: ThreadSubscriptionStatus::Unsubscribed,
1998 bump_stamp: Some(14),
1999 },
2000 StoredThreadSubscription {
2001 status: ThreadSubscriptionStatus::Unsubscribed,
2002 bump_stamp: None,
2003 },
2004 StoredThreadSubscription {
2005 status: ThreadSubscriptionStatus::Unsubscribed,
2006 bump_stamp: Some(210),
2007 },
2008 StoredThreadSubscription {
2009 status: ThreadSubscriptionStatus::Unsubscribed,
2010 bump_stamp: Some(5),
2011 },
2012 StoredThreadSubscription {
2013 status: ThreadSubscriptionStatus::Unsubscribed,
2014 bump_stamp: Some(100),
2015 },
2016 ]);
2017
2018 let update_subscriptions = build_subscription_updates(&[
2019 StoredThreadSubscription {
2020 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2021 bump_stamp: None,
2022 },
2023 StoredThreadSubscription {
2024 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2025 bump_stamp: None,
2026 },
2027 StoredThreadSubscription {
2028 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2029 bump_stamp: Some(1101),
2030 },
2031 StoredThreadSubscription {
2032 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2033 bump_stamp: Some(222),
2034 },
2035 StoredThreadSubscription {
2036 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2037 bump_stamp: Some(1),
2038 },
2039 StoredThreadSubscription {
2040 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2041 bump_stamp: Some(100),
2042 },
2043 ]);
2044
2045 let expected_subscriptions = build_subscription_updates(&[
2046 StoredThreadSubscription {
2048 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2049 bump_stamp: None,
2050 },
2051 StoredThreadSubscription {
2053 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2054 bump_stamp: Some(14),
2055 },
2056 StoredThreadSubscription {
2058 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2059 bump_stamp: Some(1101),
2060 },
2061 StoredThreadSubscription {
2063 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2064 bump_stamp: Some(222),
2065 },
2066 StoredThreadSubscription {
2068 status: ThreadSubscriptionStatus::Unsubscribed,
2069 bump_stamp: Some(5),
2070 },
2071 StoredThreadSubscription {
2073 status: ThreadSubscriptionStatus::Unsubscribed,
2074 bump_stamp: Some(100),
2075 },
2076 ]);
2077
2078 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2080
2081 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2083 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2084 assert_eq!(stored_subscription, Some(*expected_sub));
2085 }
2086
2087 self.upsert_thread_subscriptions(update_subscriptions).await?;
2089
2090 for (room_id, thread_id, expected_sub) in &expected_subscriptions {
2092 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2093 assert_eq!(stored_subscription, Some(*expected_sub));
2094 }
2095
2096 for (room_id, thread_id, _) in &expected_subscriptions {
2098 self.remove_thread_subscription(room_id, thread_id).await?;
2099 }
2100
2101 let initial_subscriptions = build_subscription_updates(&[
2102 StoredThreadSubscription {
2103 status: ThreadSubscriptionStatus::Unsubscribed,
2104 bump_stamp: Some(1),
2105 },
2106 StoredThreadSubscription {
2107 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2108 bump_stamp: Some(1),
2109 },
2110 StoredThreadSubscription {
2111 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2112 bump_stamp: Some(1),
2113 },
2114 ]);
2115
2116 self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;
2117
2118 for (room_id, thread_id, expected_sub) in &initial_subscriptions {
2119 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2120 assert_eq!(stored_subscription, Some(*expected_sub));
2121 }
2122
2123 let update_subscriptions = build_subscription_updates(&[
2124 StoredThreadSubscription {
2125 status: ThreadSubscriptionStatus::Subscribed { automatic: true },
2126 bump_stamp: Some(2),
2127 },
2128 StoredThreadSubscription {
2129 status: ThreadSubscriptionStatus::Unsubscribed,
2130 bump_stamp: Some(2),
2131 },
2132 StoredThreadSubscription {
2133 status: ThreadSubscriptionStatus::Subscribed { automatic: false },
2134 bump_stamp: Some(2),
2135 },
2136 ]);
2137
2138 self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;
2139
2140 for (room_id, thread_id, expected_sub) in &update_subscriptions {
2141 let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
2142 assert_eq!(stored_subscription, Some(*expected_sub));
2143 }
2144
2145 Ok(())
2146 }
2147}
2148
2149#[allow(unused_macros, unused_extern_crates)]
2175#[macro_export]
2176macro_rules! statestore_integration_tests {
2177 () => {
2178 mod statestore_integration_tests {
2179 use matrix_sdk_test::{TestResult, async_test};
2180 use $crate::store::{IntoStateStore, StateStoreIntegrationTests};
2181
2182 use super::get_store;
2183
2184 #[async_test]
2185 async fn test_topic_redaction() -> TestResult {
2186 let store = get_store().await?.into_state_store();
2187 store.test_topic_redaction().await
2188 }
2189
2190 #[async_test]
2191 async fn test_populate_store() -> TestResult {
2192 let store = get_store().await?.into_state_store();
2193 store.test_populate_store().await
2194 }
2195
2196 #[async_test]
2197 async fn test_member_saving() -> TestResult {
2198 let store = get_store().await?.into_state_store();
2199 store.test_member_saving().await
2200 }
2201
2202 #[async_test]
2203 async fn test_filter_saving() -> TestResult {
2204 let store = get_store().await?.into_state_store();
2205 store.test_filter_saving().await
2206 }
2207
2208 #[async_test]
2209 async fn test_user_avatar_url_saving() -> TestResult {
2210 let store = get_store().await?.into_state_store();
2211 store.test_user_avatar_url_saving().await
2212 }
2213
2214 #[async_test]
2215 async fn test_supported_versions_saving() -> TestResult {
2216 let store = get_store().await?.into_state_store();
2217 store.test_supported_versions_saving().await
2218 }
2219
2220 #[async_test]
2221 async fn test_well_known_saving() -> TestResult {
2222 let store = get_store().await?.into_state_store();
2223 store.test_well_known_saving().await
2224 }
2225
2226 #[async_test]
2227 async fn test_sync_token_saving() -> TestResult {
2228 let store = get_store().await?.into_state_store();
2229 store.test_sync_token_saving().await
2230 }
2231
2232 #[async_test]
2233 async fn test_utd_hook_manager_data_saving() -> TestResult {
2234 let store = get_store().await?.into_state_store();
2235 store.test_utd_hook_manager_data_saving().await
2236 }
2237
2238 #[async_test]
2239 async fn test_one_time_key_already_uploaded_data_saving() -> TestResult {
2240 let store = get_store().await?.into_state_store();
2241 store.test_one_time_key_already_uploaded_data_saving().await
2242 }
2243
2244 #[async_test]
2245 async fn test_stripped_member_saving() -> TestResult {
2246 let store = get_store().await?.into_state_store();
2247 store.test_stripped_member_saving().await
2248 }
2249
2250 #[async_test]
2251 async fn test_power_level_saving() -> TestResult {
2252 let store = get_store().await?.into_state_store();
2253 store.test_power_level_saving().await
2254 }
2255
2256 #[async_test]
2257 async fn test_receipts_saving() -> TestResult {
2258 let store = get_store().await?.into_state_store();
2259 store.test_receipts_saving().await
2260 }
2261
2262 #[async_test]
2263 async fn test_custom_storage() -> TestResult {
2264 let store = get_store().await?.into_state_store();
2265 store.test_custom_storage().await
2266 }
2267
2268 #[async_test]
2269 async fn test_stripped_non_stripped() -> TestResult {
2270 let store = get_store().await?.into_state_store();
2271 store.test_stripped_non_stripped().await
2272 }
2273
2274 #[async_test]
2275 async fn test_room_removal() -> TestResult {
2276 let store = get_store().await?.into_state_store();
2277 store.test_room_removal().await
2278 }
2279
2280 #[async_test]
2281 async fn test_profile_removal() -> TestResult {
2282 let store = get_store().await?.into_state_store();
2283 store.test_profile_removal().await
2284 }
2285
2286 #[async_test]
2287 async fn test_presence_saving() -> TestResult {
2288 let store = get_store().await?.into_state_store();
2289 store.test_presence_saving().await
2290 }
2291
2292 #[async_test]
2293 async fn test_display_names_saving() -> TestResult {
2294 let store = get_store().await?.into_state_store();
2295 store.test_display_names_saving().await
2296 }
2297
2298 #[async_test]
2299 async fn test_send_queue() -> TestResult {
2300 let store = get_store().await?.into_state_store();
2301 store.test_send_queue().await
2302 }
2303
2304 #[async_test]
2305 async fn test_send_queue_priority() -> TestResult {
2306 let store = get_store().await?.into_state_store();
2307 store.test_send_queue_priority().await
2308 }
2309
2310 #[async_test]
2311 async fn test_send_queue_dependents() -> TestResult {
2312 let store = get_store().await?.into_state_store();
2313 store.test_send_queue_dependents().await
2314 }
2315
2316 #[async_test]
2317 async fn test_update_send_queue_dependent() -> TestResult {
2318 let store = get_store().await?.into_state_store();
2319 store.test_update_send_queue_dependent().await
2320 }
2321
2322 #[async_test]
2323 async fn test_get_room_infos() -> TestResult {
2324 let store = get_store().await?.into_state_store();
2325 store.test_get_room_infos().await
2326 }
2327
2328 #[async_test]
2329 async fn test_thread_subscriptions() -> TestResult {
2330 let store = get_store().await?.into_state_store();
2331 store.test_thread_subscriptions().await
2332 }
2333
2334 #[async_test]
2335 async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
2336 let store = get_store().await?.into_state_store();
2337 store.test_thread_subscriptions_bulk_upsert().await
2338 }
2339 }
2340 };
2341}
2342
2343fn user_id() -> &'static UserId {
2344 user_id!("@example:localhost")
2345}
2346
2347fn invited_user_id() -> &'static UserId {
2348 user_id!("@invited:localhost")
2349}
2350
2351fn room_id() -> &'static RoomId {
2352 room_id!("!test:localhost")
2353}
2354
2355fn stripped_room_id() -> &'static RoomId {
2356 room_id!("!stripped:localhost")
2357}
2358
2359fn first_receipt_event_id() -> &'static EventId {
2360 event_id!("$example")
2361}
2362
2363fn topic_event_id() -> &'static EventId {
2364 event_id!("$topic_event")
2365}
2366
2367fn power_level_event() -> Raw<AnySyncStateEvent> {
2368 let content = RoomPowerLevelsEventContent::new(&AuthorizationRules::V1);
2369
2370 let event = json!({
2371 "event_id": "$h29iv0s8:example.com",
2372 "content": content,
2373 "sender": user_id(),
2374 "type": "m.room.power_levels",
2375 "origin_server_ts": 0u64,
2376 "state_key": "",
2377 });
2378
2379 serde_json::from_value(event).unwrap()
2380}
2381
2382fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
2383 custom_stripped_membership_event(user_id())
2384}
2385
2386fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
2387 let ev_json = json!({
2388 "type": "m.room.member",
2389 "content": RoomMemberEventContent::new(MembershipState::Join),
2390 "sender": user_id,
2391 "state_key": user_id,
2392 });
2393
2394 Raw::new(&ev_json).unwrap().cast_unchecked()
2395}
2396
2397fn membership_event() -> Raw<SyncRoomMemberEvent> {
2398 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
2399}
2400
2401fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
2402 let ev_json = json!({
2403 "type": "m.room.member",
2404 "content": RoomMemberEventContent::new(MembershipState::Join),
2405 "event_id": event_id,
2406 "origin_server_ts": 198,
2407 "sender": user_id,
2408 "state_key": user_id,
2409 });
2410
2411 Raw::new(&ev_json).unwrap().cast_unchecked()
2412}
2413
2414fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
2415 let ev_json = json!({
2416 "content": {
2417 "presence": "online"
2418 },
2419 "sender": user_id,
2420 });
2421
2422 Raw::new(&ev_json).unwrap().cast_unchecked()
2423}