1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use async_trait::async_trait;
8use growable_bloom_filter::GrowableBloomBuilder;
9use matrix_sdk_test::test_json;
10use ruma::{
11 api::MatrixVersion,
12 event_id,
13 events::{
14 presence::PresenceEvent,
15 receipt::{ReceiptThread, ReceiptType},
16 room::{
17 member::{
18 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
19 SyncRoomMemberEvent,
20 },
21 message::RoomMessageEventContent,
22 power_levels::RoomPowerLevelsEventContent,
23 topic::RoomTopicEventContent,
24 },
25 AnyEphemeralRoomEventContent, AnyGlobalAccountDataEvent, AnyMessageLikeEventContent,
26 AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
27 AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
28 SyncStateEvent,
29 },
30 owned_event_id, owned_mxc_uri, room_id,
31 serde::Raw,
32 uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
33 TransactionId, UserId,
34};
35use serde_json::{json, value::Value as JsonValue};
36
37use super::{
38 send_queue::SentRequestKey, DependentQueuedRequestKind, DisplayName, DynStateStore,
39 ServerCapabilities,
40};
41use crate::{
42 deserialized_responses::MemberEvent,
43 store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
44 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
45};
46
47#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
52#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
53pub trait StateStoreIntegrationTests {
54 async fn populate(&self) -> Result<()>;
56 async fn test_topic_redaction(&self) -> Result<()>;
58 async fn test_populate_store(&self) -> Result<()>;
60 async fn test_member_saving(&self);
62 async fn test_filter_saving(&self);
64 async fn test_user_avatar_url_saving(&self);
66 async fn test_sync_token_saving(&self);
68 async fn test_utd_hook_manager_data_saving(&self);
70 async fn test_stripped_member_saving(&self);
72 async fn test_power_level_saving(&self);
74 async fn test_receipts_saving(&self);
76 async fn test_custom_storage(&self) -> Result<()>;
78 async fn test_stripped_non_stripped(&self) -> Result<()>;
80 async fn test_room_removal(&self) -> Result<()>;
82 async fn test_profile_removal(&self) -> Result<()>;
84 async fn test_presence_saving(&self);
86 async fn test_display_names_saving(&self);
88 async fn test_send_queue(&self);
90 async fn test_send_queue_priority(&self);
92 async fn test_send_queue_dependents(&self);
94 async fn test_update_send_queue_dependent(&self);
96 async fn test_server_capabilities_saving(&self);
98}
99
100#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
101#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
102impl StateStoreIntegrationTests for DynStateStore {
103 async fn populate(&self) -> Result<()> {
104 let mut changes = StateChanges::default();
105
106 let user_id = user_id();
107 let invited_user_id = invited_user_id();
108 let room_id = room_id();
109 let stripped_room_id = stripped_room_id();
110
111 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
112
113 let presence_json: &JsonValue = &test_json::PRESENCE;
114 let presence_raw =
115 serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone()).unwrap();
116 let presence_event = presence_raw.deserialize().unwrap();
117 changes.add_presence_event(presence_event, presence_raw);
118
119 let pushrules_json: &JsonValue = &test_json::PUSH_RULES;
120 let pushrules_raw =
121 serde_json::from_value::<Raw<AnyGlobalAccountDataEvent>>(pushrules_json.clone())
122 .unwrap();
123 let pushrules_event = pushrules_raw.deserialize().unwrap();
124 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
125
126 let mut room = RoomInfo::new(room_id, RoomState::Joined);
127 room.mark_as_left();
128
129 let tag_json: &JsonValue = &test_json::TAG;
130 let tag_raw =
131 serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone()).unwrap();
132 let tag_event = tag_raw.deserialize().unwrap();
133 changes.add_room_account_data(room_id, tag_event, tag_raw);
134
135 let name_json: &JsonValue = &test_json::NAME;
136 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone()).unwrap();
137 let name_event = name_raw.deserialize().unwrap();
138 room.handle_state_event(&name_event);
139 changes.add_state_event(room_id, name_event, name_raw);
140
141 let topic_json: &JsonValue = &test_json::TOPIC;
142 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())
143 .expect("can create sync-state-event for topic");
144 let topic_event = topic_raw.deserialize().expect("can deserialize raw topic");
145 room.handle_state_event(&topic_event);
146 changes.add_state_event(room_id, topic_event, topic_raw);
147
148 let mut room_ambiguity_map = HashMap::new();
149 let mut room_profiles = BTreeMap::new();
150
151 let member_json: &JsonValue = &test_json::MEMBER;
152 let member_event: SyncRoomMemberEvent =
153 serde_json::from_value(member_json.clone()).unwrap();
154 let displayname = DisplayName::new(
155 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
156 );
157 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
158 room_profiles.insert(user_id.to_owned(), (&member_event).into());
159
160 let member_state_raw =
161 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone()).unwrap();
162 let member_state_event = member_state_raw.deserialize().unwrap();
163 changes.add_state_event(room_id, member_state_event, member_state_raw);
164
165 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
166 let invited_member_event: SyncRoomMemberEvent =
168 serde_json::from_value(invited_member_json.clone()).unwrap();
169 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
170 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
171
172 let invited_member_state_raw =
173 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone()).unwrap();
174 let invited_member_state_event = invited_member_state_raw.deserialize().unwrap();
175 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
176
177 let receipt_json: &JsonValue = &test_json::READ_RECEIPT;
178 let receipt_event =
179 serde_json::from_value::<AnySyncEphemeralRoomEvent>(receipt_json.clone()).unwrap();
180 let receipt_content = match receipt_event.content() {
181 AnyEphemeralRoomEventContent::Receipt(content) => content,
182 _ => panic!(),
183 };
184 changes.add_receipts(room_id, receipt_content);
185
186 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
187 changes.profiles.insert(room_id.to_owned(), room_profiles);
188 changes.add_room(room);
189
190 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
191
192 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
193 let stripped_name_raw =
194 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())
195 .unwrap();
196 let stripped_name_event = stripped_name_raw.deserialize().unwrap();
197 stripped_room.handle_stripped_state_event(&stripped_name_event);
198 changes.stripped_state.insert(
199 stripped_room_id.to_owned(),
200 BTreeMap::from([(
201 stripped_name_event.event_type(),
202 BTreeMap::from([(
203 stripped_name_event.state_key().to_owned(),
204 stripped_name_raw.clone(),
205 )]),
206 )]),
207 );
208
209 changes.add_room(stripped_room);
210
211 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
212 let stripped_member_event = Raw::new(&stripped_member_json.clone()).unwrap().cast();
213 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
214
215 self.save_changes(&changes).await?;
216
217 Ok(())
218 }
219
220 async fn test_topic_redaction(&self) -> Result<()> {
221 let room_id = room_id();
222 self.populate().await?;
223
224 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
225 assert_eq!(
226 self.get_state_event_static::<RoomTopicEventContent>(room_id)
227 .await?
228 .expect("room topic found before redaction")
229 .deserialize()
230 .expect("can deserialize room topic before redaction")
231 .as_sync()
232 .expect("room topic is a sync state event")
233 .as_original()
234 .expect("room topic is not redacted yet")
235 .content
236 .topic,
237 "😀"
238 );
239
240 let mut changes = StateChanges::default();
241
242 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
243 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
244 .expect("topic redaction event making works");
245 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts").unwrap().unwrap();
246
247 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
248 self.save_changes(&changes).await?;
249
250 let redacted_event = self
251 .get_state_event_static::<RoomTopicEventContent>(room_id)
252 .await?
253 .expect("room topic found after redaction")
254 .deserialize()
255 .expect("can deserialize room topic after redaction");
256
257 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
258
259 Ok(())
260 }
261
262 async fn test_populate_store(&self) -> Result<()> {
263 let room_id = room_id();
264 let user_id = user_id();
265 let display_name = DisplayName::new("example");
266
267 self.populate().await?;
268
269 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
270 assert!(self.get_presence_event(user_id).await?.is_some());
271 assert_eq!(self.get_room_infos().await?.len(), 2, "Expected to find 2 room infos");
272 assert!(self
273 .get_account_data_event(GlobalAccountDataEventType::PushRules)
274 .await?
275 .is_some());
276
277 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
278 assert_eq!(
279 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
280 1,
281 "Expected to find 1 room topic"
282 );
283 assert!(self.get_profile(room_id, user_id).await?.is_some());
284 assert!(self.get_member_event(room_id, user_id).await?.is_some());
285 assert_eq!(
286 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
287 2,
288 "Expected to find 2 members for room"
289 );
290 assert_eq!(
291 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
292 1,
293 "Expected to find 1 invited user ids"
294 );
295 assert_eq!(
296 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
297 1,
298 "Expected to find 1 joined user ids"
299 );
300 assert_eq!(
301 self.get_users_with_display_name(room_id, &display_name).await?.len(),
302 2,
303 "Expected to find 2 display names for room"
304 );
305 assert!(self
306 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
307 .await?
308 .is_some());
309 assert!(self
310 .get_user_room_receipt_event(
311 room_id,
312 ReceiptType::Read,
313 ReceiptThread::Unthreaded,
314 user_id
315 )
316 .await?
317 .is_some());
318 assert_eq!(
319 self.get_event_room_receipt_events(
320 room_id,
321 ReceiptType::Read,
322 ReceiptThread::Unthreaded,
323 first_receipt_event_id()
324 )
325 .await?
326 .len(),
327 1,
328 "Expected to find 1 read receipt"
329 );
330 Ok(())
331 }
332
333 async fn test_member_saving(&self) {
334 let room_id = room_id!("!test_member_saving:localhost");
335 let user_id = user_id();
336 let second_user_id = user_id!("@second:localhost");
337 let third_user_id = user_id!("@third:localhost");
338 let unknown_user_id = user_id!("@unknown:localhost");
339
340 let mut user_ids = vec![user_id.to_owned()];
342 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
343 let member_events = self
344 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
345 .await;
346 assert!(member_events.unwrap().is_empty());
347 assert!(self.get_profile(room_id, user_id).await.unwrap().is_none());
348 let profiles = self.get_profiles(room_id, &user_ids).await;
349 assert!(profiles.unwrap().is_empty());
350
351 let mut changes = StateChanges::default();
353 let raw_member_event = membership_event();
354 let profile = raw_member_event.deserialize().unwrap().into();
355 changes
356 .state
357 .entry(room_id.to_owned())
358 .or_default()
359 .entry(StateEventType::RoomMember)
360 .or_default()
361 .insert(user_id.into(), raw_member_event.cast());
362 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
363 self.save_changes(&changes).await.unwrap();
364
365 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
366 let member_events = self
367 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
368 .await;
369 assert_eq!(member_events.unwrap().len(), 1);
370 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
371 assert_eq!(members.len(), 1, "We expected to find members for the room");
372 assert!(self.get_profile(room_id, user_id).await.unwrap().is_some());
373 let profiles = self.get_profiles(room_id, &user_ids).await;
374 assert_eq!(profiles.unwrap().len(), 1);
375
376 let mut changes = StateChanges::default();
378 let changes_members = changes
379 .state
380 .entry(room_id.to_owned())
381 .or_default()
382 .entry(StateEventType::RoomMember)
383 .or_default();
384 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
385 let raw_second_member_event =
386 custom_membership_event(second_user_id, event_id!("$second_member_event"));
387 let second_profile = raw_second_member_event.deserialize().unwrap().into();
388 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
389 changes_profiles.insert(second_user_id.to_owned(), second_profile);
390 let raw_third_member_event =
391 custom_membership_event(third_user_id, event_id!("$third_member_event"));
392 let third_profile = raw_third_member_event.deserialize().unwrap().into();
393 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
394 changes_profiles.insert(third_user_id.to_owned(), third_profile);
395 self.save_changes(&changes).await.unwrap();
396
397 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
398 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
399 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
400 let member_events = self
401 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
402 .await;
403 assert_eq!(member_events.unwrap().len(), 3);
404 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
405 assert_eq!(members.len(), 3, "We expected to find members for the room");
406 assert!(self.get_profile(room_id, second_user_id).await.unwrap().is_some());
407 assert!(self.get_profile(room_id, third_user_id).await.unwrap().is_some());
408 let profiles = self.get_profiles(room_id, &user_ids).await;
409 assert_eq!(profiles.unwrap().len(), 3);
410
411 user_ids.push(unknown_user_id.to_owned());
413 let member_events = self
414 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
415 .await;
416 assert_eq!(member_events.unwrap().len(), 3);
417 let profiles = self.get_profiles(room_id, &user_ids).await;
418 assert_eq!(profiles.unwrap().len(), 3);
419
420 let member_events = self
422 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
423 room_id,
424 &[],
425 )
426 .await;
427 assert!(member_events.unwrap().is_empty());
428 let profiles = self.get_profiles(room_id, &[]).await;
429 assert!(profiles.unwrap().is_empty());
430 }
431
432 async fn test_filter_saving(&self) {
433 let filter_name = "filter_name";
434 let filter_id = "filter_id_1234";
435
436 self.set_kv_data(
437 StateStoreDataKey::Filter(filter_name),
438 StateStoreDataValue::Filter(filter_id.to_owned()),
439 )
440 .await
441 .unwrap();
442 assert_let!(
443 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
444 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
445 );
446 assert_eq!(stored_filter_id, filter_id);
447
448 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await.unwrap();
449 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
450 }
451
452 async fn test_user_avatar_url_saving(&self) {
453 let user_id = user_id!("@alice:example.org");
454 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
455
456 self.set_kv_data(
457 StateStoreDataKey::UserAvatarUrl(user_id),
458 StateStoreDataValue::UserAvatarUrl(url.clone()),
459 )
460 .await
461 .unwrap();
462
463 assert_let!(
464 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
465 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
466 );
467 assert_eq!(stored_url, url);
468
469 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await.unwrap();
470 assert_matches!(
471 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
472 Ok(None)
473 );
474 }
475
476 async fn test_server_capabilities_saving(&self) {
477 let versions = &[MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11];
478 let server_caps = ServerCapabilities::new(
479 versions,
480 [("org.matrix.experimental".to_owned(), true)].into(),
481 );
482
483 self.set_kv_data(
484 StateStoreDataKey::ServerCapabilities,
485 StateStoreDataValue::ServerCapabilities(server_caps.clone()),
486 )
487 .await
488 .unwrap();
489
490 assert_let!(
491 Ok(Some(StateStoreDataValue::ServerCapabilities(stored_caps))) =
492 self.get_kv_data(StateStoreDataKey::ServerCapabilities).await
493 );
494 assert_eq!(stored_caps, server_caps);
495
496 let (stored_versions, stored_features) = stored_caps.maybe_decode().unwrap();
497
498 assert_eq!(stored_versions, versions);
499 assert_eq!(stored_features.len(), 1);
500 assert_eq!(stored_features.get("org.matrix.experimental"), Some(&true));
501
502 self.remove_kv_data(StateStoreDataKey::ServerCapabilities).await.unwrap();
503 assert_matches!(self.get_kv_data(StateStoreDataKey::ServerCapabilities).await, Ok(None));
504 }
505
506 async fn test_sync_token_saving(&self) {
507 let sync_token_1 = "t392-516_47314_0_7_1";
508 let sync_token_2 = "t392-516_47314_0_7_2";
509
510 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
511
512 let changes =
513 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
514 self.save_changes(&changes).await.unwrap();
515 assert_let!(
516 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
517 self.get_kv_data(StateStoreDataKey::SyncToken).await
518 );
519 assert_eq!(stored_sync_token, sync_token_1);
520
521 self.set_kv_data(
522 StateStoreDataKey::SyncToken,
523 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
524 )
525 .await
526 .unwrap();
527 assert_let!(
528 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
529 self.get_kv_data(StateStoreDataKey::SyncToken).await
530 );
531 assert_eq!(stored_sync_token, sync_token_2);
532
533 self.remove_kv_data(StateStoreDataKey::SyncToken).await.unwrap();
534 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
535 }
536
537 async fn test_utd_hook_manager_data_saving(&self) {
538 assert!(
540 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
541 .await
542 .expect("Could not read data")
543 .is_none(),
544 "Store was not empty at start"
545 );
546
547 let data = GrowableBloomBuilder::new().build();
549 self.set_kv_data(
550 StateStoreDataKey::UtdHookManagerData,
551 StateStoreDataValue::UtdHookManagerData(data.clone()),
552 )
553 .await
554 .expect("Could not save data");
555
556 let read_data = self
558 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
559 .await
560 .expect("Could not read data")
561 .expect("no data found")
562 .into_utd_hook_manager_data()
563 .expect("not UtdHookManagerData");
564
565 assert_eq!(read_data, data);
566 }
567
568 async fn test_stripped_member_saving(&self) {
569 let room_id = room_id!("!test_stripped_member_saving:localhost");
570 let user_id = user_id();
571 let second_user_id = user_id!("@second:localhost");
572 let third_user_id = user_id!("@third:localhost");
573 let unknown_user_id = user_id!("@unknown:localhost");
574
575 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
577 let member_events = self
578 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
579 room_id,
580 &[user_id.to_owned()],
581 )
582 .await;
583 assert!(member_events.unwrap().is_empty());
584
585 let mut changes = StateChanges::default();
587 changes
588 .stripped_state
589 .entry(room_id.to_owned())
590 .or_default()
591 .entry(StateEventType::RoomMember)
592 .or_default()
593 .insert(user_id.into(), stripped_membership_event().cast());
594 self.save_changes(&changes).await.unwrap();
595
596 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
597 let member_events = self
598 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
599 room_id,
600 &[user_id.to_owned()],
601 )
602 .await;
603 assert_eq!(member_events.unwrap().len(), 1);
604 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
605 assert_eq!(members.len(), 1, "We expected to find members for the room");
606
607 let mut changes = StateChanges::default();
609 let changes_members = changes
610 .stripped_state
611 .entry(room_id.to_owned())
612 .or_default()
613 .entry(StateEventType::RoomMember)
614 .or_default();
615 changes_members
616 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
617 changes_members
618 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
619 self.save_changes(&changes).await.unwrap();
620
621 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
622 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
623 let member_events = self
624 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
625 room_id,
626 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
627 )
628 .await;
629 assert_eq!(member_events.unwrap().len(), 3);
630 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
631 assert_eq!(members.len(), 3, "We expected to find members for the room");
632
633 let member_events = self
635 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
636 room_id,
637 &[
638 user_id.to_owned(),
639 second_user_id.to_owned(),
640 third_user_id.to_owned(),
641 unknown_user_id.to_owned(),
642 ],
643 )
644 .await;
645 assert_eq!(member_events.unwrap().len(), 3);
646
647 let member_events = self
649 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
650 room_id,
651 &[],
652 )
653 .await;
654 assert!(member_events.unwrap().is_empty());
655 }
656
657 async fn test_power_level_saving(&self) {
658 let room_id = room_id!("!test_power_level_saving:localhost");
659
660 let raw_event = power_level_event();
661 let event = raw_event.deserialize().unwrap();
662
663 assert!(self
664 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
665 .await
666 .unwrap()
667 .is_none());
668 let mut changes = StateChanges::default();
669 changes.add_state_event(room_id, event, raw_event);
670
671 self.save_changes(&changes).await.unwrap();
672 assert!(self
673 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
674 .await
675 .unwrap()
676 .is_some());
677 }
678
679 async fn test_receipts_saving(&self) {
680 let room_id = room_id!("!test_receipts_saving:localhost");
681
682 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
683 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
684
685 let first_receipt_ts = uint!(1436451550);
686 let second_receipt_ts = uint!(1436451653);
687 let third_receipt_ts = uint!(1436474532);
688
689 let first_receipt_event = serde_json::from_value(json!({
690 first_event_id: {
691 "m.read": {
692 user_id(): {
693 "ts": first_receipt_ts,
694 }
695 }
696 }
697 }))
698 .expect("json creation failed");
699
700 let second_receipt_event = serde_json::from_value(json!({
701 second_event_id: {
702 "m.read": {
703 user_id(): {
704 "ts": second_receipt_ts,
705 }
706 }
707 }
708 }))
709 .expect("json creation failed");
710
711 let third_receipt_event = serde_json::from_value(json!({
712 second_event_id: {
713 "m.read": {
714 user_id(): {
715 "ts": third_receipt_ts,
716 "thread_id": "main",
717 }
718 }
719 }
720 }))
721 .expect("json creation failed");
722
723 assert!(self
724 .get_user_room_receipt_event(
725 room_id,
726 ReceiptType::Read,
727 ReceiptThread::Unthreaded,
728 user_id()
729 )
730 .await
731 .expect("failed to read unthreaded user room receipt")
732 .is_none());
733 assert!(self
734 .get_event_room_receipt_events(
735 room_id,
736 ReceiptType::Read,
737 ReceiptThread::Unthreaded,
738 first_event_id
739 )
740 .await
741 .expect("failed to read unthreaded event room receipt for 1")
742 .is_empty());
743 assert!(self
744 .get_event_room_receipt_events(
745 room_id,
746 ReceiptType::Read,
747 ReceiptThread::Unthreaded,
748 second_event_id
749 )
750 .await
751 .expect("failed to read unthreaded event room receipt for 2")
752 .is_empty());
753
754 let mut changes = StateChanges::default();
755 changes.add_receipts(room_id, first_receipt_event);
756
757 self.save_changes(&changes).await.expect("writing changes fauked");
758 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
759 .get_user_room_receipt_event(
760 room_id,
761 ReceiptType::Read,
762 ReceiptThread::Unthreaded,
763 user_id(),
764 )
765 .await
766 .expect("failed to read unthreaded user room receipt after save")
767 .unwrap();
768 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
769 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
770 let first_event_unthreaded_receipts = self
771 .get_event_room_receipt_events(
772 room_id,
773 ReceiptType::Read,
774 ReceiptThread::Unthreaded,
775 first_event_id,
776 )
777 .await
778 .expect("failed to read unthreaded event room receipt for 1 after save");
779 assert_eq!(
780 first_event_unthreaded_receipts.len(),
781 1,
782 "Found a wrong number of unthreaded receipts for 1 after save"
783 );
784 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
785 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
786 assert!(self
787 .get_event_room_receipt_events(
788 room_id,
789 ReceiptType::Read,
790 ReceiptThread::Unthreaded,
791 second_event_id
792 )
793 .await
794 .expect("failed to read unthreaded event room receipt for 2 after save")
795 .is_empty());
796
797 let mut changes = StateChanges::default();
798 changes.add_receipts(room_id, second_receipt_event);
799
800 self.save_changes(&changes).await.expect("Saving works");
801 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
802 .get_user_room_receipt_event(
803 room_id,
804 ReceiptType::Read,
805 ReceiptThread::Unthreaded,
806 user_id(),
807 )
808 .await
809 .expect("Getting unthreaded user room receipt after save failed")
810 .unwrap();
811 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
812 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
813 assert!(self
814 .get_event_room_receipt_events(
815 room_id,
816 ReceiptType::Read,
817 ReceiptThread::Unthreaded,
818 first_event_id
819 )
820 .await
821 .expect("Getting unthreaded event room receipt events for first event failed")
822 .is_empty());
823 let second_event_unthreaded_receipts = self
824 .get_event_room_receipt_events(
825 room_id,
826 ReceiptType::Read,
827 ReceiptThread::Unthreaded,
828 second_event_id,
829 )
830 .await
831 .expect("Getting unthreaded event room receipt events for second event failed");
832 assert_eq!(
833 second_event_unthreaded_receipts.len(),
834 1,
835 "Found a wrong number of unthreaded receipts for second event after save"
836 );
837 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
838 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
839
840 assert!(self
841 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
842 .await
843 .expect("failed to read threaded user room receipt")
844 .is_none());
845 assert!(self
846 .get_event_room_receipt_events(
847 room_id,
848 ReceiptType::Read,
849 ReceiptThread::Main,
850 second_event_id
851 )
852 .await
853 .expect("Getting threaded event room receipts for 2 failed")
854 .is_empty());
855
856 let mut changes = StateChanges::default();
857 changes.add_receipts(room_id, third_receipt_event);
858
859 self.save_changes(&changes).await.expect("Saving works");
860 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
862 .get_user_room_receipt_event(
863 room_id,
864 ReceiptType::Read,
865 ReceiptThread::Unthreaded,
866 user_id(),
867 )
868 .await
869 .expect("Getting unthreaded user room receipt after save failed")
870 .unwrap();
871 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
872 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
873 let second_event_unthreaded_receipts = self
874 .get_event_room_receipt_events(
875 room_id,
876 ReceiptType::Read,
877 ReceiptThread::Unthreaded,
878 second_event_id,
879 )
880 .await
881 .expect("Getting unthreaded event room receipt events for second event failed");
882 assert_eq!(
883 second_event_unthreaded_receipts.len(),
884 1,
885 "Found a wrong number of unthreaded receipts for second event after save"
886 );
887 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
888 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
889 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
891 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
892 .await
893 .expect("Getting threaded user room receipt after save failed")
894 .unwrap();
895 assert_eq!(threaded_user_receipt_event_id, second_event_id);
896 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
897 let second_event_threaded_receipts = self
898 .get_event_room_receipt_events(
899 room_id,
900 ReceiptType::Read,
901 ReceiptThread::Main,
902 second_event_id,
903 )
904 .await
905 .expect("Getting threaded event room receipt events for second event failed");
906 assert_eq!(
907 second_event_threaded_receipts.len(),
908 1,
909 "Found a wrong number of threaded receipts for second event after save"
910 );
911 assert_eq!(second_event_threaded_receipts[0].0, user_id());
912 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
913 }
914
915 async fn test_custom_storage(&self) -> Result<()> {
916 let key = "my_key";
917 let value = &[0, 1, 2, 3];
918
919 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
920
921 let read = self.get_custom_value(key.as_bytes()).await?;
922
923 assert_eq!(Some(value.as_ref()), read.as_deref());
924
925 Ok(())
926 }
927
928 async fn test_stripped_non_stripped(&self) -> Result<()> {
929 let room_id = room_id!("!test_stripped_non_stripped:localhost");
930 let user_id = user_id();
931
932 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
933 assert_eq!(self.get_room_infos().await.unwrap().len(), 0);
934
935 let mut changes = StateChanges::default();
936 changes
937 .state
938 .entry(room_id.to_owned())
939 .or_default()
940 .entry(StateEventType::RoomMember)
941 .or_default()
942 .insert(user_id.into(), membership_event().cast());
943 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
944 self.save_changes(&changes).await.unwrap();
945
946 let member_event =
947 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
948 assert!(matches!(member_event, MemberEvent::Sync(_)));
949 assert_eq!(self.get_room_infos().await.unwrap().len(), 1);
950
951 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
952 assert_eq!(members, vec![user_id.to_owned()]);
953
954 let mut changes = StateChanges::default();
955 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
956 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
957 self.save_changes(&changes).await.unwrap();
958
959 let member_event =
960 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
961 assert!(matches!(member_event, MemberEvent::Stripped(_)));
962 assert_eq!(self.get_room_infos().await.unwrap().len(), 1);
963
964 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
965 assert_eq!(members, vec![user_id.to_owned()]);
966
967 Ok(())
968 }
969
970 async fn test_room_removal(&self) -> Result<()> {
971 let room_id = room_id();
972 let user_id = user_id();
973 let display_name = DisplayName::new("example");
974 let stripped_room_id = stripped_room_id();
975
976 self.populate().await?;
977
978 {
979 let txn = TransactionId::new();
981 let ev =
982 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
983 .unwrap();
984 self.save_send_queue_request(
985 room_id,
986 txn.clone(),
987 MilliSecondsSinceUnixEpoch::now(),
988 ev.into(),
989 0,
990 )
991 .await?;
992
993 self.save_dependent_queued_request(
995 room_id,
996 &txn,
997 ChildTransactionId::new(),
998 MilliSecondsSinceUnixEpoch::now(),
999 DependentQueuedRequestKind::RedactEvent,
1000 )
1001 .await?;
1002 }
1003
1004 self.remove_room(room_id).await?;
1005
1006 assert_eq!(self.get_room_infos().await?.len(), 1, "room is still there");
1007
1008 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1009 assert!(
1010 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1011 "still state events found"
1012 );
1013 assert!(self.get_profile(room_id, user_id).await?.is_none());
1014 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1015 assert!(
1016 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1017 "still user ids found"
1018 );
1019 assert!(
1020 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1021 "still invited user ids found"
1022 );
1023 assert!(
1024 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1025 "still joined users found"
1026 );
1027 assert!(
1028 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1029 "still display names found"
1030 );
1031 assert!(self
1032 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1033 .await?
1034 .is_none());
1035 assert!(self
1036 .get_user_room_receipt_event(
1037 room_id,
1038 ReceiptType::Read,
1039 ReceiptThread::Unthreaded,
1040 user_id
1041 )
1042 .await?
1043 .is_none());
1044 assert!(
1045 self.get_event_room_receipt_events(
1046 room_id,
1047 ReceiptType::Read,
1048 ReceiptThread::Unthreaded,
1049 first_receipt_event_id()
1050 )
1051 .await?
1052 .is_empty(),
1053 "still event recepts in the store"
1054 );
1055 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1056 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1057
1058 self.remove_room(stripped_room_id).await?;
1059
1060 assert!(self.get_room_infos().await?.is_empty(), "still room info found");
1061 Ok(())
1062 }
1063
1064 async fn test_profile_removal(&self) -> Result<()> {
1065 let room_id = room_id();
1066
1067 let user_id = user_id();
1069 let invited_user_id = invited_user_id();
1070
1071 self.populate().await?;
1072
1073 let new_invite_member_json = json!({
1074 "content": {
1075 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1076 "displayname": "example after update",
1077 "membership": "invite",
1078 "reason": "Looking for support"
1079 },
1080 "event_id": "$143273582443PhrSm:localhost",
1081 "origin_server_ts": 1432735824,
1082 "room_id": room_id,
1083 "sender": user_id,
1084 "state_key": invited_user_id,
1085 "type": "m.room.member",
1086 });
1087 let new_invite_member_event: SyncRoomMemberEvent =
1088 serde_json::from_value(new_invite_member_json.clone()).unwrap();
1089
1090 let mut changes = StateChanges {
1091 profiles_to_delete: [(
1093 room_id.to_owned(),
1094 vec![user_id.to_owned(), invited_user_id.to_owned()],
1095 )]
1096 .into(),
1097
1098 profiles: {
1100 let mut map = BTreeMap::default();
1101 map.insert(
1102 room_id.to_owned(),
1103 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1104 .into_iter()
1105 .collect(),
1106 );
1107 map
1108 },
1109
1110 ..StateChanges::default()
1111 };
1112
1113 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1114 .expect("can create sync-state-event for topic");
1115 let event = raw.deserialize().unwrap();
1116 changes.add_state_event(room_id, event, raw);
1117
1118 self.save_changes(&changes).await.unwrap();
1119
1120 assert!(self.get_profile(room_id, user_id).await?.is_none());
1122 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1123
1124 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1126 assert_eq!(
1127 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1128 Some("example after update")
1129 );
1130 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1131
1132 Ok(())
1133 }
1134
1135 async fn test_presence_saving(&self) {
1136 let user_id = user_id();
1137 let second_user_id = user_id!("@second:localhost");
1138 let third_user_id = user_id!("@third:localhost");
1139 let unknown_user_id = user_id!("@unknown:localhost");
1140
1141 let mut user_ids = vec![user_id.to_owned()];
1143 let presence_event = self.get_presence_event(user_id).await;
1144 assert!(presence_event.unwrap().is_none());
1145 let presence_events = self.get_presence_events(&user_ids).await;
1146 assert!(presence_events.unwrap().is_empty());
1147
1148 let mut changes = StateChanges::default();
1150 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1151 self.save_changes(&changes).await.unwrap();
1152
1153 let presence_event = self.get_presence_event(user_id).await;
1154 assert!(presence_event.unwrap().is_some());
1155 let presence_events = self.get_presence_events(&user_ids).await;
1156 assert_eq!(presence_events.unwrap().len(), 1);
1157
1158 let mut changes = StateChanges::default();
1160 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1161 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1162 self.save_changes(&changes).await.unwrap();
1163
1164 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1165 let presence_event = self.get_presence_event(second_user_id).await;
1166 assert!(presence_event.unwrap().is_some());
1167 let presence_event = self.get_presence_event(third_user_id).await;
1168 assert!(presence_event.unwrap().is_some());
1169 let presence_events = self.get_presence_events(&user_ids).await;
1170 assert_eq!(presence_events.unwrap().len(), 3);
1171
1172 user_ids.push(unknown_user_id.to_owned());
1174 let member_events = self.get_presence_events(&user_ids).await;
1175 assert_eq!(member_events.unwrap().len(), 3);
1176
1177 let presence_events = self.get_presence_events(&[]).await;
1179 assert!(presence_events.unwrap().is_empty());
1180 }
1181
1182 async fn test_display_names_saving(&self) {
1183 let room_id = room_id!("!test_display_names_saving:localhost");
1184 let user_id = user_id();
1185 let user_display_name = DisplayName::new("User");
1186 let second_user_id = user_id!("@second:localhost");
1187 let third_user_id = user_id!("@third:localhost");
1188 let other_display_name = DisplayName::new("Raoul");
1189 let unknown_display_name = DisplayName::new("Unknown");
1190
1191 let mut display_names = vec![user_display_name.to_owned()];
1193 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1194 assert!(users.is_empty());
1195 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1196 assert!(names.is_empty());
1197
1198 let mut changes = StateChanges::default();
1200 changes
1201 .ambiguity_maps
1202 .entry(room_id.to_owned())
1203 .or_default()
1204 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1205 self.save_changes(&changes).await.unwrap();
1206
1207 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1208 assert_eq!(users.len(), 1);
1209 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1210 assert_eq!(names.len(), 1);
1211 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1212
1213 let mut changes = StateChanges::default();
1215 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1216 other_display_name.to_owned(),
1217 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1218 );
1219 self.save_changes(&changes).await.unwrap();
1220
1221 display_names.push(other_display_name.to_owned());
1222 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1223 assert_eq!(users.len(), 1);
1224 let users = self.get_users_with_display_name(room_id, &other_display_name).await.unwrap();
1225 assert_eq!(users.len(), 2);
1226 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1227 assert_eq!(names.len(), 2);
1228 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1229 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1230
1231 display_names.push(unknown_display_name.to_owned());
1233 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1234 assert_eq!(names.len(), 2);
1235
1236 let names = self.get_users_with_display_names(room_id, &[]).await;
1238 assert!(names.unwrap().is_empty());
1239 }
1240
1241 #[allow(clippy::needless_range_loop)]
1242 async fn test_send_queue(&self) {
1243 let room_id = room_id!("!test_send_queue:localhost");
1244
1245 let events = self.load_send_queue_requests(room_id).await.unwrap();
1247 assert!(events.is_empty());
1248
1249 let txn0 = TransactionId::new();
1251 let event0 =
1252 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
1253 .unwrap();
1254 self.save_send_queue_request(
1255 room_id,
1256 txn0.clone(),
1257 MilliSecondsSinceUnixEpoch::now(),
1258 event0.into(),
1259 0,
1260 )
1261 .await
1262 .unwrap();
1263
1264 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1266
1267 assert_eq!(pending.len(), 1);
1268 {
1269 assert_eq!(pending[0].transaction_id, txn0);
1270
1271 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1272 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1273 assert_eq!(content.body(), "msg0");
1274
1275 assert!(!pending[0].is_wedged());
1276 }
1277
1278 for i in 1..=3 {
1280 let txn = TransactionId::new();
1281 let event = SerializableEventContent::new(
1282 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1283 )
1284 .unwrap();
1285
1286 self.save_send_queue_request(
1287 room_id,
1288 txn,
1289 MilliSecondsSinceUnixEpoch::now(),
1290 event.into(),
1291 0,
1292 )
1293 .await
1294 .unwrap();
1295 }
1296
1297 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1299
1300 assert_eq!(pending.len(), 4);
1302
1303 assert_eq!(pending[0].transaction_id, txn0);
1304
1305 for i in 0..4 {
1306 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1307 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1308 assert_eq!(content.body(), format!("msg{i}"));
1309 assert!(!pending[i].is_wedged());
1310 }
1311
1312 let txn2 = &pending[2].transaction_id;
1314 self.update_send_queue_request_status(
1315 room_id,
1316 txn2,
1317 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1318 )
1319 .await
1320 .unwrap();
1321
1322 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1324
1325 assert_eq!(pending.len(), 4);
1327 assert_eq!(pending[0].transaction_id, txn0);
1328 assert_eq!(pending[2].transaction_id, *txn2);
1329 assert!(pending[2].is_wedged());
1330 let error = pending[2].clone().error.unwrap();
1331 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1332 assert_eq!(generic_error, "Oops");
1333 for i in 0..4 {
1334 if i != 2 {
1335 assert!(!pending[i].is_wedged());
1336 }
1337 }
1338
1339 let event0 = SerializableEventContent::new(
1341 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1342 )
1343 .unwrap();
1344 self.update_send_queue_request(room_id, txn2, event0.into()).await.unwrap();
1345
1346 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1348
1349 assert_eq!(pending.len(), 4);
1350 {
1351 assert_eq!(pending[2].transaction_id, *txn2);
1352
1353 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1354 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1355 assert_eq!(content.body(), "wow that's a cool test");
1356
1357 assert!(!pending[2].is_wedged());
1358
1359 for i in 0..4 {
1360 if i != 2 {
1361 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1362 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1363 assert_eq!(content.body(), format!("msg{i}"));
1364
1365 assert!(!pending[i].is_wedged());
1366 }
1367 }
1368 }
1369
1370 self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1372
1373 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1375
1376 assert_eq!(pending.len(), 3);
1377 assert_eq!(pending[1].transaction_id, *txn2);
1378 for i in 0..3 {
1379 assert_ne!(pending[i].transaction_id, txn0);
1380 }
1381
1382 let room_id2 = room_id!("!test_send_queue_two:localhost");
1387 {
1388 let txn = TransactionId::new();
1389 let event =
1390 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
1391 .unwrap();
1392 self.save_send_queue_request(
1393 room_id2,
1394 txn.clone(),
1395 MilliSecondsSinceUnixEpoch::now(),
1396 event.into(),
1397 0,
1398 )
1399 .await
1400 .unwrap();
1401 }
1402
1403 {
1405 let room_id3 = room_id!("!test_send_queue_three:localhost");
1406 let txn = TransactionId::new();
1407 let event =
1408 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
1409 .unwrap();
1410 self.save_send_queue_request(
1411 room_id3,
1412 txn.clone(),
1413 MilliSecondsSinceUnixEpoch::now(),
1414 event.into(),
1415 0,
1416 )
1417 .await
1418 .unwrap();
1419
1420 self.remove_send_queue_request(room_id3, &txn).await.unwrap();
1421 }
1422
1423 let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap();
1426 assert_eq!(outstanding_rooms.len(), 2);
1427 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1428 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1429 }
1430
1431 async fn test_send_queue_priority(&self) {
1432 let room_id = room_id!("!test_send_queue:localhost");
1433
1434 let events = self.load_send_queue_requests(room_id).await.unwrap();
1436 assert!(events.is_empty());
1437
1438 let low0_txn = TransactionId::new();
1440 let ev0 =
1441 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
1442 .unwrap();
1443 self.save_send_queue_request(
1444 room_id,
1445 low0_txn.clone(),
1446 MilliSecondsSinceUnixEpoch::now(),
1447 ev0.into(),
1448 2,
1449 )
1450 .await
1451 .unwrap();
1452
1453 let high_txn = TransactionId::new();
1455 let ev1 =
1456 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
1457 .unwrap();
1458 self.save_send_queue_request(
1459 room_id,
1460 high_txn.clone(),
1461 MilliSecondsSinceUnixEpoch::now(),
1462 ev1.into(),
1463 10,
1464 )
1465 .await
1466 .unwrap();
1467
1468 let low1_txn = TransactionId::new();
1470 let ev2 =
1471 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
1472 .unwrap();
1473 self.save_send_queue_request(
1474 room_id,
1475 low1_txn.clone(),
1476 MilliSecondsSinceUnixEpoch::now(),
1477 ev2.into(),
1478 2,
1479 )
1480 .await
1481 .unwrap();
1482
1483 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1486
1487 assert_eq!(pending.len(), 3);
1488 {
1489 assert_eq!(pending[0].transaction_id, high_txn);
1490
1491 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1492 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1493 assert_eq!(content.body(), "high");
1494 }
1495
1496 {
1497 assert_eq!(pending[1].transaction_id, low0_txn);
1498
1499 let deserialized = pending[1].as_event().unwrap().deserialize().unwrap();
1500 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1501 assert_eq!(content.body(), "low0");
1502 }
1503
1504 {
1505 assert_eq!(pending[2].transaction_id, low1_txn);
1506
1507 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1508 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1509 assert_eq!(content.body(), "low1");
1510 }
1511 }
1512
1513 async fn test_send_queue_dependents(&self) {
1514 let room_id = room_id!("!test_send_queue_dependents:localhost");
1515
1516 let txn0 = TransactionId::new();
1518 let event0 =
1519 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
1520 .unwrap();
1521 self.save_send_queue_request(
1522 room_id,
1523 txn0.clone(),
1524 MilliSecondsSinceUnixEpoch::now(),
1525 event0.into(),
1526 0,
1527 )
1528 .await
1529 .unwrap();
1530
1531 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1533
1534 let child_txn = ChildTransactionId::new();
1536 self.save_dependent_queued_request(
1537 room_id,
1538 &txn0,
1539 child_txn.clone(),
1540 MilliSecondsSinceUnixEpoch::now(),
1541 DependentQueuedRequestKind::RedactEvent,
1542 )
1543 .await
1544 .unwrap();
1545
1546 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1548 assert_eq!(dependents.len(), 1);
1549 assert_eq!(dependents[0].parent_transaction_id, txn0);
1550 assert_eq!(dependents[0].own_transaction_id, child_txn);
1551 assert!(dependents[0].parent_key.is_none());
1552 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1553
1554 let event_id = owned_event_id!("$1");
1556 let num_updated = self
1557 .mark_dependent_queued_requests_as_ready(
1558 room_id,
1559 &txn0,
1560 SentRequestKey::Event(event_id.clone()),
1561 )
1562 .await
1563 .unwrap();
1564 assert_eq!(num_updated, 1);
1565
1566 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1568 assert_eq!(dependents.len(), 1);
1569 assert_eq!(dependents[0].parent_transaction_id, txn0);
1570 assert_eq!(dependents[0].own_transaction_id, child_txn);
1571 assert_matches!(dependents[0].parent_key.as_ref(), Some(SentRequestKey::Event(eid)) => {
1572 assert_eq!(*eid, event_id);
1573 });
1574 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1575
1576 let removed = self
1578 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1579 .await
1580 .unwrap();
1581 assert!(removed);
1582
1583 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1585
1586 let txn1 = TransactionId::new();
1589 let event1 =
1590 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
1591 .unwrap();
1592 self.save_send_queue_request(
1593 room_id,
1594 txn1.clone(),
1595 MilliSecondsSinceUnixEpoch::now(),
1596 event1.into(),
1597 0,
1598 )
1599 .await
1600 .unwrap();
1601
1602 self.save_dependent_queued_request(
1603 room_id,
1604 &txn0,
1605 ChildTransactionId::new(),
1606 MilliSecondsSinceUnixEpoch::now(),
1607 DependentQueuedRequestKind::RedactEvent,
1608 )
1609 .await
1610 .unwrap();
1611 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1);
1612
1613 self.save_dependent_queued_request(
1614 room_id,
1615 &txn1,
1616 ChildTransactionId::new(),
1617 MilliSecondsSinceUnixEpoch::now(),
1618 DependentQueuedRequestKind::EditEvent {
1619 new_content: SerializableEventContent::new(
1620 &RoomMessageEventContent::text_plain("edit").into(),
1621 )
1622 .unwrap(),
1623 },
1624 )
1625 .await
1626 .unwrap();
1627 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2);
1628
1629 let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1631 assert!(removed);
1632
1633 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1635 assert_eq!(dependents.len(), 2);
1636 }
1637
1638 async fn test_update_send_queue_dependent(&self) {
1639 let room_id = room_id!("!test_send_queue_dependents:localhost");
1640
1641 let txn = TransactionId::new();
1642
1643 let child_txn = ChildTransactionId::new();
1645
1646 self.save_dependent_queued_request(
1647 room_id,
1648 &txn,
1649 child_txn.clone(),
1650 MilliSecondsSinceUnixEpoch::now(),
1651 DependentQueuedRequestKind::RedactEvent,
1652 )
1653 .await
1654 .unwrap();
1655
1656 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1658 assert_eq!(dependents.len(), 1);
1659 assert_eq!(dependents[0].parent_transaction_id, txn);
1660 assert_eq!(dependents[0].own_transaction_id, child_txn);
1661 assert!(dependents[0].parent_key.is_none());
1662 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1663
1664 self.update_dependent_queued_request(
1666 room_id,
1667 &child_txn,
1668 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1669 )
1670 .await
1671 .unwrap();
1672
1673 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1675 assert_eq!(dependents.len(), 1);
1676 assert_eq!(dependents[0].parent_transaction_id, txn);
1677 assert_eq!(dependents[0].own_transaction_id, child_txn);
1678 assert!(dependents[0].parent_key.is_none());
1679 assert_matches!(
1680 &dependents[0].kind,
1681 DependentQueuedRequestKind::ReactEvent { key } => {
1682 assert_eq!(key, "👍");
1683 }
1684 );
1685 }
1686}
1687
1688#[allow(unused_macros, unused_extern_crates)]
1714#[macro_export]
1715macro_rules! statestore_integration_tests {
1716 () => {
1717 mod statestore_integration_tests {
1718 use matrix_sdk_test::async_test;
1719 use $crate::store::{
1720 IntoStateStore, Result as StoreResult, StateStoreIntegrationTests,
1721 };
1722
1723 use super::get_store;
1724
1725 #[async_test]
1726 async fn test_topic_redaction() -> StoreResult<()> {
1727 let store = get_store().await?.into_state_store();
1728 store.test_topic_redaction().await
1729 }
1730
1731 #[async_test]
1732 async fn test_populate_store() -> StoreResult<()> {
1733 let store = get_store().await?.into_state_store();
1734 store.test_populate_store().await
1735 }
1736
1737 #[async_test]
1738 async fn test_member_saving() {
1739 let store = get_store().await.unwrap().into_state_store();
1740 store.test_member_saving().await
1741 }
1742
1743 #[async_test]
1744 async fn test_filter_saving() {
1745 let store = get_store().await.unwrap().into_state_store();
1746 store.test_filter_saving().await
1747 }
1748
1749 #[async_test]
1750 async fn test_user_avatar_url_saving() {
1751 let store = get_store().await.unwrap().into_state_store();
1752 store.test_user_avatar_url_saving().await
1753 }
1754
1755 #[async_test]
1756 async fn test_server_capabilities_saving() {
1757 let store = get_store().await.unwrap().into_state_store();
1758 store.test_server_capabilities_saving().await
1759 }
1760
1761 #[async_test]
1762 async fn test_sync_token_saving() {
1763 let store = get_store().await.unwrap().into_state_store();
1764 store.test_sync_token_saving().await
1765 }
1766
1767 #[async_test]
1768 async fn test_utd_hook_manager_data_saving() {
1769 let store = get_store().await.expect("creating store failed").into_state_store();
1770 store.test_utd_hook_manager_data_saving().await;
1771 }
1772
1773 #[async_test]
1774 async fn test_stripped_member_saving() {
1775 let store = get_store().await.unwrap().into_state_store();
1776 store.test_stripped_member_saving().await
1777 }
1778
1779 #[async_test]
1780 async fn test_power_level_saving() {
1781 let store = get_store().await.unwrap().into_state_store();
1782 store.test_power_level_saving().await
1783 }
1784
1785 #[async_test]
1786 async fn test_receipts_saving() {
1787 let store = get_store().await.expect("creating store failed").into_state_store();
1788 store.test_receipts_saving().await;
1789 }
1790
1791 #[async_test]
1792 async fn test_custom_storage() -> StoreResult<()> {
1793 let store = get_store().await?.into_state_store();
1794 store.test_custom_storage().await
1795 }
1796
1797 #[async_test]
1798 async fn test_stripped_non_stripped() -> StoreResult<()> {
1799 let store = get_store().await.unwrap().into_state_store();
1800 store.test_stripped_non_stripped().await
1801 }
1802
1803 #[async_test]
1804 async fn test_room_removal() -> StoreResult<()> {
1805 let store = get_store().await?.into_state_store();
1806 store.test_room_removal().await
1807 }
1808
1809 #[async_test]
1810 async fn test_profile_removal() -> StoreResult<()> {
1811 let store = get_store().await?.into_state_store();
1812 store.test_profile_removal().await
1813 }
1814
1815 #[async_test]
1816 async fn test_presence_saving() {
1817 let store = get_store().await.expect("creating store failed").into_state_store();
1818 store.test_presence_saving().await;
1819 }
1820
1821 #[async_test]
1822 async fn test_display_names_saving() {
1823 let store = get_store().await.expect("creating store failed").into_state_store();
1824 store.test_display_names_saving().await;
1825 }
1826
1827 #[async_test]
1828 async fn test_send_queue() {
1829 let store = get_store().await.expect("creating store failed").into_state_store();
1830 store.test_send_queue().await;
1831 }
1832
1833 #[async_test]
1834 async fn test_send_queue_priority() {
1835 let store = get_store().await.expect("creating store failed").into_state_store();
1836 store.test_send_queue_priority().await;
1837 }
1838
1839 #[async_test]
1840 async fn test_send_queue_dependents() {
1841 let store = get_store().await.expect("creating store failed").into_state_store();
1842 store.test_send_queue_dependents().await;
1843 }
1844
1845 #[async_test]
1846 async fn test_update_send_queue_dependent() {
1847 let store = get_store().await.expect("creating store failed").into_state_store();
1848 store.test_update_send_queue_dependent().await;
1849 }
1850 }
1851 };
1852}
1853
1854fn user_id() -> &'static UserId {
1855 user_id!("@example:localhost")
1856}
1857
1858fn invited_user_id() -> &'static UserId {
1859 user_id!("@invited:localhost")
1860}
1861
1862fn room_id() -> &'static RoomId {
1863 room_id!("!test:localhost")
1864}
1865
1866fn stripped_room_id() -> &'static RoomId {
1867 room_id!("!stripped:localhost")
1868}
1869
1870fn first_receipt_event_id() -> &'static EventId {
1871 event_id!("$example")
1872}
1873
1874fn power_level_event() -> Raw<AnySyncStateEvent> {
1875 let content = RoomPowerLevelsEventContent::default();
1876
1877 let event = json!({
1878 "event_id": "$h29iv0s8:example.com",
1879 "content": content,
1880 "sender": user_id(),
1881 "type": "m.room.power_levels",
1882 "origin_server_ts": 0u64,
1883 "state_key": "",
1884 });
1885
1886 serde_json::from_value(event).unwrap()
1887}
1888
1889fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
1890 custom_stripped_membership_event(user_id())
1891}
1892
1893fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
1894 let ev_json = json!({
1895 "type": "m.room.member",
1896 "content": RoomMemberEventContent::new(MembershipState::Join),
1897 "sender": user_id,
1898 "state_key": user_id,
1899 });
1900
1901 Raw::new(&ev_json).unwrap().cast()
1902}
1903
1904fn membership_event() -> Raw<SyncRoomMemberEvent> {
1905 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
1906}
1907
1908fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
1909 let ev_json = json!({
1910 "type": "m.room.member",
1911 "content": RoomMemberEventContent::new(MembershipState::Join),
1912 "event_id": event_id,
1913 "origin_server_ts": 198,
1914 "sender": user_id,
1915 "state_key": user_id,
1916 });
1917
1918 Raw::new(&ev_json).unwrap().cast()
1919}
1920
1921fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
1922 let ev_json = json!({
1923 "content": {
1924 "presence": "online"
1925 },
1926 "sender": user_id,
1927 });
1928
1929 Raw::new(&ev_json).unwrap().cast()
1930}