1#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 Client, Error, Result,
33 encryption::identities::{IdentityUpdates, UserIdentity},
34 event_handler::EventHandlerDropGuard,
35};
36
37#[derive(Debug)]
49pub struct IdentityStatusChanges {
50 room_identity_state: RoomIdentityState<Room>,
52
53 _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut state = state;
106
107 let mut current_state =
108 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
109
110 if !current_state.is_empty() {
111 current_state.sort();
112 yield current_state;
113 }
114
115 while let Some(item) = unprocessed_stream.next().await {
116 let mut update = filter_non_self(
117 state.room_identity_state.process_change(item).await,
118 &own_user_id,
119 );
120 if !update.is_empty() {
121 update.sort();
122 yield update;
123 }
124 }
125 }))
126 }
127}
128
129fn filter_for_initial_update(
130 mut input: Vec<IdentityStatusChange>,
131 own_user_id: &UserId,
132) -> Vec<IdentityStatusChange> {
133 input.retain(|change| {
138 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
139 });
140
141 input
142}
143
144fn filter_non_self(
145 mut input: Vec<IdentityStatusChange>,
146 own_user_id: &UserId,
147) -> Vec<IdentityStatusChange> {
148 input.retain(|change| change.user_id != own_user_id);
150 input
151}
152
153fn combine_streams(
154 identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
155 room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
156) -> impl Stream<Item = RoomIdentityChange> {
157 stream_select!(identity_updates, room_member_events)
158}
159
160async fn wrap_identity_updates(
161 client: &Client,
162) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
163 Ok(client
164 .encryption()
165 .user_identities_stream()
166 .await?
167 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
168}
169
170fn to_base_updates(
171 input: IdentityUpdates,
172) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
173 matrix_sdk_base::crypto::store::types::IdentityUpdates {
174 new: to_base_identities(input.new),
175 changed: to_base_identities(input.changed),
176 unchanged: Default::default(),
177 }
178}
179
180fn to_base_identities(
181 input: BTreeMap<OwnedUserId, UserIdentity>,
182) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
183 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
184}
185
186fn wrap_room_member_events(
187 room: &Room,
188) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
189 let own_user_id = room.own_user_id().to_owned();
190 let room_id = room.room_id();
191 let (sender, receiver) = mpsc::channel(16);
192 let handle =
193 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
194 if *event.state_key() == own_user_id {
195 return;
196 }
197 let _: Result<_, _> =
198 sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
199 });
200 let drop_guard = room.client.event_handler_drop_guard(handle);
201 (drop_guard, ReceiverStream::new(receiver))
202}
203
204#[cfg(all(test, not(target_family = "wasm")))]
205mod tests {
206 use std::time::Duration;
207
208 use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
209 use matrix_sdk_base::crypto::IdentityState;
210 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
211 use test_setup::TestSetup;
212
213 use crate::assert_next_with_timeout;
214
215 #[async_test]
216 async fn test_when_user_becomes_unpinned_we_report_it() {
217 let t = TestSetup::new_room_with_other_bob().await;
219
220 t.pin_bob().await;
222
223 let stream = t.subscribe_to_identity_status_changes().await;
225 pin_mut!(stream);
226
227 t.unpin_bob().await;
229
230 let change = assert_next_with_timeout!(stream);
232 assert_eq!(change[0].user_id, t.bob_user_id());
233 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
234 assert_eq!(change.len(), 1);
235 }
236
237 #[async_test]
238 async fn test_when_user_becomes_verification_violation_we_report_it() {
239 let t = TestSetup::new_room_with_other_bob().await;
241
242 t.verify_bob().await;
244
245 let stream = t.subscribe_to_identity_status_changes().await;
247 pin_mut!(stream);
248
249 t.unpin_bob().await;
251
252 let change = assert_next_with_timeout!(stream);
254 assert_eq!(change[0].user_id, t.bob_user_id());
255 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
256 assert_eq!(change.len(), 1);
257 }
258
259 #[async_test]
260 async fn test_when_user_becomes_pinned_we_report_it() {
261 let t = TestSetup::new_room_with_other_bob().await;
263
264 t.unpin_bob().await;
266
267 let stream = t.subscribe_to_identity_status_changes().await;
269 pin_mut!(stream);
270
271 t.pin_bob().await;
273
274 let change1 = assert_next_with_timeout!(stream);
276 assert_eq!(change1[0].user_id, t.bob_user_id());
277 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
278 assert_eq!(change1.len(), 1);
279
280 let change2 = assert_next_with_timeout!(stream);
282 assert_eq!(change2[0].user_id, t.bob_user_id());
283 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
284 assert_eq!(change2.len(), 1);
285 }
286
287 #[async_test]
288 async fn test_when_user_becomes_verified_we_report_it() {
289 let t = TestSetup::new_room_with_other_bob().await;
291
292 let stream = t.subscribe_to_identity_status_changes().await;
294 pin_mut!(stream);
295
296 t.verify_bob().await;
298
299 let change = assert_next_with_timeout!(stream);
301 assert_eq!(change[0].user_id, t.bob_user_id());
302 assert_eq!(change[0].changed_to, IdentityState::Verified);
303 assert_eq!(change.len(), 1);
304
305 t.unpin_bob().await;
307
308 let change = assert_next_with_timeout!(stream);
310 assert_eq!(change[0].user_id, t.bob_user_id());
311 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
312 assert_eq!(change.len(), 1);
313 }
314
315 #[async_test]
316 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
317 let t = TestSetup::new_room_with_other_bob().await;
319
320 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
322
323 let stream = t.subscribe_to_identity_status_changes().await;
325 pin_mut!(stream);
326
327 t.verify_bob().await;
329
330 let change1 = assert_next_with_timeout!(stream);
332 assert_eq!(change1[0].user_id, t.bob_user_id());
333 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
334 assert_eq!(change1.len(), 1);
335
336 let change2 = assert_next_with_timeout!(stream);
338 assert_eq!(change2[0].user_id, t.bob_user_id());
339 assert_eq!(change2[0].changed_to, IdentityState::Verified);
340 assert_eq!(change2.len(), 1);
341 }
342
343 #[async_test]
344 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
345 let t = TestSetup::new_room_with_other_bob().await;
347
348 t.verify_bob_with(
350 IdentityChangeDataSet::key_query_with_identity_b(),
351 IdentityChangeDataSet::master_signing_keys_b(),
352 IdentityChangeDataSet::self_signing_keys_b(),
353 )
354 .await;
355 t.unpin_bob().await;
356
357 let stream = t.subscribe_to_identity_status_changes().await;
359 pin_mut!(stream);
360
361 t.verify_bob().await;
363
364 let change1 = assert_next_with_timeout!(stream);
366 assert_eq!(change1[0].user_id, t.bob_user_id());
367 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
368 assert_eq!(change1.len(), 1);
369
370 let change2 = assert_next_with_timeout!(stream);
372 assert_eq!(change2[0].user_id, t.bob_user_id());
373 assert_eq!(change2[0].changed_to, IdentityState::Verified);
374 assert_eq!(change2.len(), 1);
375 }
376
377 #[async_test]
378 async fn test_when_an_unpinned_user_joins_we_report_it() {
379 let mut t = TestSetup::new_just_me_room().await;
381
382 t.unpin_bob().await;
384
385 let stream = t.subscribe_to_identity_status_changes().await;
387 pin_mut!(stream);
388
389 t.bob_joins().await;
391
392 let change = assert_next_with_timeout!(stream);
394 assert_eq!(change[0].user_id, t.bob_user_id());
395 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
396 assert_eq!(change.len(), 1);
397 }
398
399 #[async_test]
400 async fn test_when_an_verification_violating_user_joins_we_report_it() {
401 let mut t = TestSetup::new_just_me_room().await;
403
404 t.verify_bob().await;
406 t.unpin_bob().await;
407
408 let stream = t.subscribe_to_identity_status_changes().await;
410 pin_mut!(stream);
411
412 t.bob_joins().await;
414
415 let change = assert_next_with_timeout!(stream);
417 assert_eq!(change[0].user_id, t.bob_user_id());
418 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
419 assert_eq!(change.len(), 1);
420 }
421
422 #[async_test]
423 async fn test_when_a_verified_user_joins_we_dont_report_it() {
424 let mut t = TestSetup::new_just_me_room().await;
426
427 t.verify_bob().await;
429
430 let stream = t.subscribe_to_identity_status_changes().await;
432 pin_mut!(stream);
433
434 t.bob_joins().await;
436
437 t.unpin_bob().await;
439
440 let change = assert_next_with_timeout!(stream);
442 assert_eq!(change[0].user_id, t.bob_user_id());
443 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
444 assert_eq!(change.len(), 1);
445 }
446
447 #[async_test]
448 async fn test_when_a_pinned_user_joins_we_do_not_report() {
449 let mut t = TestSetup::new_just_me_room().await;
451
452 t.pin_bob().await;
454
455 let stream = t.subscribe_to_identity_status_changes().await;
457 pin_mut!(stream);
458
459 t.bob_joins().await;
461
462 tokio::time::sleep(Duration::from_millis(200)).await;
464 let change = stream.next().now_or_never();
465 assert!(change.is_none());
466 }
467
468 #[async_test]
469 async fn test_when_an_unpinned_user_leaves_we_report_it() {
470 let mut t = TestSetup::new_room_with_other_bob().await;
472
473 t.unpin_bob().await;
475
476 let stream = t.subscribe_to_identity_status_changes().await;
478 pin_mut!(stream);
479
480 t.bob_leaves().await;
482
483 let change1 = assert_next_with_timeout!(stream);
485 assert_eq!(change1[0].user_id, t.bob_user_id());
486 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
487 assert_eq!(change1.len(), 1);
488
489 let change2 = assert_next_with_timeout!(stream);
491 assert_eq!(change2[0].user_id, t.bob_user_id());
494 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
495 assert_eq!(change2.len(), 1);
496 }
497
498 #[async_test]
499 async fn test_multiple_identity_changes_are_reported() {
500 let mut t = TestSetup::new_just_me_room().await;
502
503 t.unpin_bob().await;
505
506 let stream = t.subscribe_to_identity_status_changes().await;
508 pin_mut!(stream);
509
510 t.bob_joins().await;
519 let change1 = assert_next_with_timeout!(stream);
520
521 t.pin_bob().await;
523 let change2 = assert_next_with_timeout!(stream);
524
525 t.bob_leaves().await;
527 t.bob_joins().await;
528
529 t.unpin_bob().await;
531 let change3 = assert_next_with_timeout!(stream);
532
533 t.bob_leaves().await;
535 let change4 = assert_next_with_timeout!(stream);
536
537 assert_eq!(change1[0].user_id, t.bob_user_id());
538 assert_eq!(change2[0].user_id, t.bob_user_id());
539 assert_eq!(change3[0].user_id, t.bob_user_id());
540 assert_eq!(change4[0].user_id, t.bob_user_id());
541
542 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
543 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
544 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
545 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
546
547 assert_eq!(change1.len(), 1);
548 assert_eq!(change2.len(), 1);
549 assert_eq!(change3.len(), 1);
550 assert_eq!(change4.len(), 1);
551 }
552
553 #[async_test]
554 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
555 let t = TestSetup::new_room_with_other_bob().await;
557 t.unpin_bob().await;
558
559 let stream = t.subscribe_to_identity_status_changes().await;
561 pin_mut!(stream);
562
563 let change = assert_next_with_timeout!(stream);
565 assert_eq!(change[0].user_id, t.bob_user_id());
566 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
567 assert_eq!(change.len(), 1);
568 }
569
570 #[async_test]
571 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
572 let t = TestSetup::new_room_with_other_bob().await;
574 t.verify_bob().await;
575
576 let stream = t.subscribe_to_identity_status_changes().await;
578 pin_mut!(stream);
579
580 t.unpin_bob().await;
582
583 let next_change = assert_next_with_timeout!(stream);
585
586 assert_eq!(next_change[0].user_id, t.bob_user_id());
587 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
588 assert_eq!(next_change.len(), 1);
589 }
590
591 mod test_setup {
596 use std::time::{SystemTime, UNIX_EPOCH};
597
598 use futures_core::Stream;
599 use matrix_sdk_base::{
600 RoomState,
601 crypto::{
602 IdentityStatusChange, OtherUserIdentity,
603 testing::simulate_key_query_response_for_verification,
604 },
605 };
606 use matrix_sdk_test::{
607 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
608 test_json, test_json::keys_query_sets::IdentityChangeDataSet,
609 };
610 use ruma::{
611 OwnedUserId, TransactionId, UserId,
612 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
613 events::room::member::MembershipState,
614 owned_user_id,
615 };
616 use serde_json::json;
617 use wiremock::{
618 Mock, MockServer, ResponseTemplate,
619 matchers::{header, method, path_regex},
620 };
621
622 use crate::{
623 Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
624 };
625
626 pub(super) struct TestSetup {
636 client: Client,
637 bob_user_id: OwnedUserId,
638 sync_response_builder: SyncResponseBuilder,
639 room: Room,
640 }
641
642 impl TestSetup {
643 pub(super) async fn new_just_me_room() -> Self {
644 let (client, user_id, mut sync_response_builder) = Self::init().await;
645 let room = create_just_me_room(&client, &mut sync_response_builder).await;
646 Self { client, bob_user_id: user_id, sync_response_builder, room }
647 }
648
649 pub(super) async fn new_room_with_other_bob() -> Self {
650 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
651 let room = create_room_with_other_member(
652 &mut sync_response_builder,
653 &client,
654 &bob_user_id,
655 )
656 .await;
657 Self { client, bob_user_id, sync_response_builder, room }
658 }
659
660 pub(super) fn bob_user_id(&self) -> &UserId {
661 &self.bob_user_id
662 }
663
664 pub(super) async fn pin_bob(&self) {
665 if self.bob_user_identity().await.is_some() {
666 assert!(
667 !self.bob_is_pinned().await,
668 "pin_bob() called when the identity is already pinned!"
669 );
670
671 self.bob_user_identity()
673 .await
674 .expect("User should exist")
675 .pin()
676 .await
677 .expect("Should not fail to pin");
678 } else {
679 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
681 .await;
682 }
683
684 assert!(self.bob_is_pinned().await);
686 }
687
688 pub(super) async fn unpin_bob(&self) {
689 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
690 }
691
692 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
693 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
694 serde_json::to_string(
695 key_query_response
696 .master_keys
697 .first_key_value()
698 .expect("Master key should have a value")
699 .1,
700 )
701 .expect("Should be able to serialise master key")
702 }
703
704 let a = IdentityChangeDataSet::key_query_with_identity_a();
705 let b = IdentityChangeDataSet::key_query_with_identity_b();
706 let requested_master_key = master_key_json(&requested);
707 let a_master_key = master_key_json(&a);
708
709 if requested_master_key == a_master_key {
713 self.change_bob_identity(b).await;
714 if !self.bob_is_pinned().await {
715 self.pin_bob().await;
716 }
717 self.change_bob_identity(a).await;
718 } else {
719 self.change_bob_identity(a).await;
720 if !self.bob_is_pinned().await {
721 self.pin_bob().await;
722 }
723 self.change_bob_identity(b).await;
724 }
725
726 assert!(!self.bob_is_pinned().await);
728 }
729
730 pub(super) async fn verify_bob(&self) {
731 self.verify_bob_with(
732 IdentityChangeDataSet::key_query_with_identity_a(),
733 IdentityChangeDataSet::master_signing_keys_a(),
734 IdentityChangeDataSet::self_signing_keys_a(),
735 )
736 .await;
737 }
738
739 pub(super) async fn verify_bob_with(
740 &self,
741 key_query: KeyQueryResponse,
742 master_signing_key: serde_json::Value,
743 self_signing_key: serde_json::Value,
744 ) {
745 self.change_bob_identity(key_query).await;
747
748 let my_user_id = self.client.user_id().expect("I should have a user id");
749 let my_identity = self
750 .client
751 .encryption()
752 .get_user_identity(my_user_id)
753 .await
754 .expect("Should not fail to get own user identity")
755 .expect("Should have an own user identity")
756 .underlying_identity()
757 .own()
758 .expect("Our own identity should be of type Own");
759
760 let signature_upload_request = self
762 .bob_crypto_other_identity()
763 .await
764 .verify()
765 .await
766 .expect("Should be able to verify other identity");
767
768 let verification_response = simulate_key_query_response_for_verification(
769 signature_upload_request,
770 my_identity,
771 my_user_id,
772 self.bob_user_id(),
773 master_signing_key,
774 self_signing_key,
775 );
776
777 self.client
779 .mark_request_as_sent(&TransactionId::new(), &verification_response)
780 .await
781 .unwrap();
782
783 assert!(self.bob_is_verified().await);
785 }
786
787 pub(super) async fn bob_joins(&mut self) {
788 self.bob_membership_change(MembershipState::Join).await;
789 }
790
791 pub(super) async fn bob_leaves(&mut self) {
792 self.bob_membership_change(MembershipState::Leave).await;
793 }
794
795 pub(super) async fn subscribe_to_identity_status_changes(
796 &self,
797 ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
798 self.room
799 .subscribe_to_identity_status_changes()
800 .await
801 .expect("Should be able to subscribe")
802 }
803
804 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
805 let (client, _server) = create_client_and_server().await;
806
807 client
809 .olm_machine()
810 .await
811 .as_ref()
812 .expect("We should have an Olm machine")
813 .bootstrap_cross_signing(true)
814 .await
815 .expect("Should be able to bootstrap cross-signing");
816
817 let bob_user_id = owned_user_id!("@bob:localhost");
820
821 let sync_response_builder = SyncResponseBuilder::default();
822
823 (client, bob_user_id, sync_response_builder)
824 }
825
826 async fn change_bob_identity(
827 &self,
828 key_query_response: get_keys::v3::Response,
829 ) -> OtherUserIdentity {
830 self.client
831 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
832 .await
833 .expect("Should not fail to send identity changes");
834
835 self.bob_crypto_other_identity().await
836 }
837
838 async fn bob_membership_change(&mut self, new_state: MembershipState) {
839 let sync_response = self
840 .sync_response_builder
841 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
842 StateTestEvent::Custom(sync_response_member(
843 &self.bob_user_id,
844 new_state.clone(),
845 )),
846 ))
847 .build_sync_response();
848 self.room.client.process_sync(sync_response).await.unwrap();
849
850 let m = self
852 .room
853 .get_member_no_sync(&self.bob_user_id)
854 .await
855 .expect("Should not fail to get member");
856
857 match (&new_state, m) {
858 (MembershipState::Leave, None) => {}
859 (_, None) => {
860 panic!("Member should exist")
861 }
862 (_, Some(m)) => {
863 assert_eq!(*m.membership(), new_state);
864 }
865 }
866 }
867
868 async fn bob_is_pinned(&self) -> bool {
869 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
870 }
871
872 async fn bob_is_verified(&self) -> bool {
873 self.bob_crypto_other_identity().await.is_verified()
874 }
875
876 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
877 self.bob_user_identity()
878 .await
879 .expect("User identity should exist")
880 .underlying_identity()
881 .other()
882 .expect("Identity should be Other, not Own")
883 }
884
885 async fn bob_user_identity(&self) -> Option<UserIdentity> {
886 self.client
887 .encryption()
888 .get_user_identity(&self.bob_user_id)
889 .await
890 .expect("Should not fail to get user identity")
891 }
892 }
893
894 async fn create_just_me_room(
895 client: &Client,
896 sync_response_builder: &mut SyncResponseBuilder,
897 ) -> Room {
898 let create_room_sync_response = sync_response_builder
899 .add_joined_room(
900 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
901 .add_state_event(StateTestEvent::Member),
902 )
903 .build_sync_response();
904 client.process_sync(create_room_sync_response).await.unwrap();
905 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
906 assert_eq!(room.state(), RoomState::Joined);
907 room
908 }
909
910 async fn create_room_with_other_member(
911 builder: &mut SyncResponseBuilder,
912 client: &Client,
913 other_user_id: &UserId,
914 ) -> Room {
915 let create_room_sync_response = builder
916 .add_joined_room(
917 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
918 .add_state_event(StateTestEvent::Member)
919 .add_state_event(StateTestEvent::Custom(sync_response_member(
920 other_user_id,
921 MembershipState::Join,
922 ))),
923 )
924 .build_sync_response();
925 client.process_sync(create_room_sync_response).await.unwrap();
926 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
927 room.inner.mark_members_synced();
928
929 assert_eq!(room.state(), RoomState::Joined);
930 assert_eq!(
931 *room
932 .get_member_no_sync(other_user_id)
933 .await
934 .expect("Should not fail to get member")
935 .expect("Member should exist")
936 .membership(),
937 MembershipState::Join
938 );
939 room
940 }
941
942 async fn create_client_and_server() -> (Client, MockServer) {
943 let server = MockServer::start().await;
944 mock_members_request(&server).await;
945 mock_secret_storage_default_key(&server).await;
946 let client = logged_in_client(Some(server.uri())).await;
947 (client, server)
948 }
949
950 async fn mock_members_request(server: &MockServer) {
951 Mock::given(method("GET"))
952 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
953 .and(header("authorization", "Bearer 1234"))
954 .respond_with(
955 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
956 )
957 .mount(server)
958 .await;
959 }
960
961 async fn mock_secret_storage_default_key(server: &MockServer) {
962 Mock::given(method("GET"))
963 .and(path_regex(
964 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
965 ))
966 .and(header("authorization", "Bearer 1234"))
967 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
968 .mount(server)
969 .await;
970 }
971
972 fn sync_response_member(
973 user_id: &UserId,
974 membership: MembershipState,
975 ) -> serde_json::Value {
976 json!({
977 "content": {
978 "membership": membership.to_string(),
979 },
980 "event_id": format!(
981 "$aa{}bb:localhost",
982 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
983 ),
984 "origin_server_ts": 1472735824,
985 "sender": "@example:localhost",
986 "state_key": user_id,
987 "type": "m.room.member",
988 "unsigned": {
989 "age": 1234
990 }
991 })
992 }
993 }
994}