1#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 encryption::identities::{IdentityUpdates, UserIdentity},
33 event_handler::EventHandlerDropGuard,
34 Client, Error, Result,
35};
36
37#[derive(Debug)]
49pub struct IdentityStatusChanges {
50 room_identity_state: RoomIdentityState<Room>,
52
53 _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let mut state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut current_state =
104 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106 if !current_state.is_empty() {
107 current_state.sort();
108 yield current_state;
109 }
110
111 while let Some(item) = unprocessed_stream.next().await {
112 let mut update = filter_non_self(
113 state.room_identity_state.process_change(item).await,
114 &own_user_id,
115 );
116 if !update.is_empty() {
117 update.sort();
118 yield update;
119 }
120 }
121 }))
122 }
123}
124
125fn filter_for_initial_update(
126 mut input: Vec<IdentityStatusChange>,
127 own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129 input.retain(|change| {
134 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135 });
136
137 input
138}
139
140fn filter_non_self(
141 mut input: Vec<IdentityStatusChange>,
142 own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144 input.retain(|change| change.user_id != own_user_id);
146 input
147}
148
149fn combine_streams(
150 identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
151 room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
152) -> impl Stream<Item = RoomIdentityChange> {
153 stream_select!(identity_updates, room_member_events)
154}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157 Ok(client
158 .encryption()
159 .user_identities_stream()
160 .await?
161 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(
165 input: IdentityUpdates,
166) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
167 matrix_sdk_base::crypto::store::types::IdentityUpdates {
168 new: to_base_identities(input.new),
169 changed: to_base_identities(input.changed),
170 unchanged: Default::default(),
171 }
172}
173
174fn to_base_identities(
175 input: BTreeMap<OwnedUserId, UserIdentity>,
176) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
177 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
178}
179
180fn wrap_room_member_events(
181 room: &Room,
182) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
183 let own_user_id = room.own_user_id().to_owned();
184 let room_id = room.room_id();
185 let (sender, receiver) = mpsc::channel(16);
186 let handle =
187 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
188 if *event.state_key() == own_user_id {
189 return;
190 }
191 let _: Result<_, _> =
192 sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
193 });
194 let drop_guard = room.client.event_handler_drop_guard(handle);
195 (drop_guard, ReceiverStream::new(receiver))
196}
197
198#[cfg(all(test, not(target_family = "wasm")))]
199mod tests {
200 use std::time::Duration;
201
202 use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
203 use matrix_sdk_base::crypto::IdentityState;
204 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
205 use test_setup::TestSetup;
206
207 use crate::assert_next_with_timeout;
208
209 #[async_test]
210 async fn test_when_user_becomes_unpinned_we_report_it() {
211 let t = TestSetup::new_room_with_other_bob().await;
213
214 t.pin_bob().await;
216
217 let stream = t.subscribe_to_identity_status_changes().await;
219 pin_mut!(stream);
220
221 t.unpin_bob().await;
223
224 let change = assert_next_with_timeout!(stream);
226 assert_eq!(change[0].user_id, t.bob_user_id());
227 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
228 assert_eq!(change.len(), 1);
229 }
230
231 #[async_test]
232 async fn test_when_user_becomes_verification_violation_we_report_it() {
233 let t = TestSetup::new_room_with_other_bob().await;
235
236 t.verify_bob().await;
238
239 let stream = t.subscribe_to_identity_status_changes().await;
241 pin_mut!(stream);
242
243 t.unpin_bob().await;
245
246 let change = assert_next_with_timeout!(stream);
248 assert_eq!(change[0].user_id, t.bob_user_id());
249 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
250 assert_eq!(change.len(), 1);
251 }
252
253 #[async_test]
254 async fn test_when_user_becomes_pinned_we_report_it() {
255 let t = TestSetup::new_room_with_other_bob().await;
257
258 t.unpin_bob().await;
260
261 let stream = t.subscribe_to_identity_status_changes().await;
263 pin_mut!(stream);
264
265 t.pin_bob().await;
267
268 let change1 = assert_next_with_timeout!(stream);
270 assert_eq!(change1[0].user_id, t.bob_user_id());
271 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
272 assert_eq!(change1.len(), 1);
273
274 let change2 = assert_next_with_timeout!(stream);
276 assert_eq!(change2[0].user_id, t.bob_user_id());
277 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
278 assert_eq!(change2.len(), 1);
279 }
280
281 #[async_test]
282 async fn test_when_user_becomes_verified_we_report_it() {
283 let t = TestSetup::new_room_with_other_bob().await;
285
286 let stream = t.subscribe_to_identity_status_changes().await;
288 pin_mut!(stream);
289
290 t.verify_bob().await;
292
293 let change = assert_next_with_timeout!(stream);
295 assert_eq!(change[0].user_id, t.bob_user_id());
296 assert_eq!(change[0].changed_to, IdentityState::Verified);
297 assert_eq!(change.len(), 1);
298
299 t.unpin_bob().await;
301
302 let change = assert_next_with_timeout!(stream);
304 assert_eq!(change[0].user_id, t.bob_user_id());
305 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
306 assert_eq!(change.len(), 1);
307 }
308
309 #[async_test]
310 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
311 let t = TestSetup::new_room_with_other_bob().await;
313
314 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
316
317 let stream = t.subscribe_to_identity_status_changes().await;
319 pin_mut!(stream);
320
321 t.verify_bob().await;
323
324 let change1 = assert_next_with_timeout!(stream);
326 assert_eq!(change1[0].user_id, t.bob_user_id());
327 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
328 assert_eq!(change1.len(), 1);
329
330 let change2 = assert_next_with_timeout!(stream);
332 assert_eq!(change2[0].user_id, t.bob_user_id());
333 assert_eq!(change2[0].changed_to, IdentityState::Verified);
334 assert_eq!(change2.len(), 1);
335 }
336
337 #[async_test]
338 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
339 let t = TestSetup::new_room_with_other_bob().await;
341
342 t.verify_bob_with(
344 IdentityChangeDataSet::key_query_with_identity_b(),
345 IdentityChangeDataSet::master_signing_keys_b(),
346 IdentityChangeDataSet::self_signing_keys_b(),
347 )
348 .await;
349 t.unpin_bob().await;
350
351 let stream = t.subscribe_to_identity_status_changes().await;
353 pin_mut!(stream);
354
355 t.verify_bob().await;
357
358 let change1 = assert_next_with_timeout!(stream);
360 assert_eq!(change1[0].user_id, t.bob_user_id());
361 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
362 assert_eq!(change1.len(), 1);
363
364 let change2 = assert_next_with_timeout!(stream);
366 assert_eq!(change2[0].user_id, t.bob_user_id());
367 assert_eq!(change2[0].changed_to, IdentityState::Verified);
368 assert_eq!(change2.len(), 1);
369 }
370
371 #[async_test]
372 async fn test_when_an_unpinned_user_joins_we_report_it() {
373 let mut t = TestSetup::new_just_me_room().await;
375
376 t.unpin_bob().await;
378
379 let stream = t.subscribe_to_identity_status_changes().await;
381 pin_mut!(stream);
382
383 t.bob_joins().await;
385
386 let change = assert_next_with_timeout!(stream);
388 assert_eq!(change[0].user_id, t.bob_user_id());
389 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
390 assert_eq!(change.len(), 1);
391 }
392
393 #[async_test]
394 async fn test_when_an_verification_violating_user_joins_we_report_it() {
395 let mut t = TestSetup::new_just_me_room().await;
397
398 t.verify_bob().await;
400 t.unpin_bob().await;
401
402 let stream = t.subscribe_to_identity_status_changes().await;
404 pin_mut!(stream);
405
406 t.bob_joins().await;
408
409 let change = assert_next_with_timeout!(stream);
411 assert_eq!(change[0].user_id, t.bob_user_id());
412 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
413 assert_eq!(change.len(), 1);
414 }
415
416 #[async_test]
417 async fn test_when_a_verified_user_joins_we_dont_report_it() {
418 let mut t = TestSetup::new_just_me_room().await;
420
421 t.verify_bob().await;
423
424 let stream = t.subscribe_to_identity_status_changes().await;
426 pin_mut!(stream);
427
428 t.bob_joins().await;
430
431 t.unpin_bob().await;
433
434 let change = assert_next_with_timeout!(stream);
436 assert_eq!(change[0].user_id, t.bob_user_id());
437 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
438 assert_eq!(change.len(), 1);
439 }
440
441 #[async_test]
442 async fn test_when_a_pinned_user_joins_we_do_not_report() {
443 let mut t = TestSetup::new_just_me_room().await;
445
446 t.pin_bob().await;
448
449 let stream = t.subscribe_to_identity_status_changes().await;
451 pin_mut!(stream);
452
453 t.bob_joins().await;
455
456 tokio::time::sleep(Duration::from_millis(200)).await;
458 let change = stream.next().now_or_never();
459 assert!(change.is_none());
460 }
461
462 #[async_test]
463 async fn test_when_an_unpinned_user_leaves_we_report_it() {
464 let mut t = TestSetup::new_room_with_other_bob().await;
466
467 t.unpin_bob().await;
469
470 let stream = t.subscribe_to_identity_status_changes().await;
472 pin_mut!(stream);
473
474 t.bob_leaves().await;
476
477 let change1 = assert_next_with_timeout!(stream);
479 assert_eq!(change1[0].user_id, t.bob_user_id());
480 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
481 assert_eq!(change1.len(), 1);
482
483 let change2 = assert_next_with_timeout!(stream);
485 assert_eq!(change2[0].user_id, t.bob_user_id());
488 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
489 assert_eq!(change2.len(), 1);
490 }
491
492 #[async_test]
493 async fn test_multiple_identity_changes_are_reported() {
494 let mut t = TestSetup::new_just_me_room().await;
496
497 t.unpin_bob().await;
499
500 let stream = t.subscribe_to_identity_status_changes().await;
502 pin_mut!(stream);
503
504 t.bob_joins().await;
513 let change1 = assert_next_with_timeout!(stream);
514
515 t.pin_bob().await;
517 let change2 = assert_next_with_timeout!(stream);
518
519 t.bob_leaves().await;
521 t.bob_joins().await;
522
523 t.unpin_bob().await;
525 let change3 = assert_next_with_timeout!(stream);
526
527 t.bob_leaves().await;
529 let change4 = assert_next_with_timeout!(stream);
530
531 assert_eq!(change1[0].user_id, t.bob_user_id());
532 assert_eq!(change2[0].user_id, t.bob_user_id());
533 assert_eq!(change3[0].user_id, t.bob_user_id());
534 assert_eq!(change4[0].user_id, t.bob_user_id());
535
536 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
537 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
538 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
539 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
540
541 assert_eq!(change1.len(), 1);
542 assert_eq!(change2.len(), 1);
543 assert_eq!(change3.len(), 1);
544 assert_eq!(change4.len(), 1);
545 }
546
547 #[async_test]
548 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
549 let t = TestSetup::new_room_with_other_bob().await;
551 t.unpin_bob().await;
552
553 let stream = t.subscribe_to_identity_status_changes().await;
555 pin_mut!(stream);
556
557 let change = assert_next_with_timeout!(stream);
559 assert_eq!(change[0].user_id, t.bob_user_id());
560 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
561 assert_eq!(change.len(), 1);
562 }
563
564 #[async_test]
565 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
566 let t = TestSetup::new_room_with_other_bob().await;
568 t.verify_bob().await;
569
570 let stream = t.subscribe_to_identity_status_changes().await;
572 pin_mut!(stream);
573
574 t.unpin_bob().await;
576
577 let next_change = assert_next_with_timeout!(stream);
579
580 assert_eq!(next_change[0].user_id, t.bob_user_id());
581 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
582 assert_eq!(next_change.len(), 1);
583 }
584
585 mod test_setup {
590 use std::time::{SystemTime, UNIX_EPOCH};
591
592 use futures_core::Stream;
593 use matrix_sdk_base::{
594 crypto::{
595 testing::simulate_key_query_response_for_verification, IdentityStatusChange,
596 OtherUserIdentity,
597 },
598 RoomState,
599 };
600 use matrix_sdk_test::{
601 test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
602 StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
603 };
604 use ruma::{
605 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
606 events::room::member::MembershipState,
607 owned_user_id, OwnedUserId, TransactionId, UserId,
608 };
609 use serde_json::json;
610 use wiremock::{
611 matchers::{header, method, path_regex},
612 Mock, MockServer, ResponseTemplate,
613 };
614
615 use crate::{
616 encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
617 };
618
619 pub(super) struct TestSetup {
629 client: Client,
630 bob_user_id: OwnedUserId,
631 sync_response_builder: SyncResponseBuilder,
632 room: Room,
633 }
634
635 impl TestSetup {
636 pub(super) async fn new_just_me_room() -> Self {
637 let (client, user_id, mut sync_response_builder) = Self::init().await;
638 let room = create_just_me_room(&client, &mut sync_response_builder).await;
639 Self { client, bob_user_id: user_id, sync_response_builder, room }
640 }
641
642 pub(super) async fn new_room_with_other_bob() -> Self {
643 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
644 let room = create_room_with_other_member(
645 &mut sync_response_builder,
646 &client,
647 &bob_user_id,
648 )
649 .await;
650 Self { client, bob_user_id, sync_response_builder, room }
651 }
652
653 pub(super) fn bob_user_id(&self) -> &UserId {
654 &self.bob_user_id
655 }
656
657 pub(super) async fn pin_bob(&self) {
658 if self.bob_user_identity().await.is_some() {
659 assert!(
660 !self.bob_is_pinned().await,
661 "pin_bob() called when the identity is already pinned!"
662 );
663
664 self.bob_user_identity()
666 .await
667 .expect("User should exist")
668 .pin()
669 .await
670 .expect("Should not fail to pin");
671 } else {
672 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
674 .await;
675 }
676
677 assert!(self.bob_is_pinned().await);
679 }
680
681 pub(super) async fn unpin_bob(&self) {
682 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
683 }
684
685 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
686 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
687 serde_json::to_string(
688 key_query_response
689 .master_keys
690 .first_key_value()
691 .expect("Master key should have a value")
692 .1,
693 )
694 .expect("Should be able to serialise master key")
695 }
696
697 let a = IdentityChangeDataSet::key_query_with_identity_a();
698 let b = IdentityChangeDataSet::key_query_with_identity_b();
699 let requested_master_key = master_key_json(&requested);
700 let a_master_key = master_key_json(&a);
701
702 if requested_master_key == a_master_key {
706 self.change_bob_identity(b).await;
707 if !self.bob_is_pinned().await {
708 self.pin_bob().await;
709 }
710 self.change_bob_identity(a).await;
711 } else {
712 self.change_bob_identity(a).await;
713 if !self.bob_is_pinned().await {
714 self.pin_bob().await;
715 }
716 self.change_bob_identity(b).await;
717 }
718
719 assert!(!self.bob_is_pinned().await);
721 }
722
723 pub(super) async fn verify_bob(&self) {
724 self.verify_bob_with(
725 IdentityChangeDataSet::key_query_with_identity_a(),
726 IdentityChangeDataSet::master_signing_keys_a(),
727 IdentityChangeDataSet::self_signing_keys_a(),
728 )
729 .await;
730 }
731
732 pub(super) async fn verify_bob_with(
733 &self,
734 key_query: KeyQueryResponse,
735 master_signing_key: serde_json::Value,
736 self_signing_key: serde_json::Value,
737 ) {
738 self.change_bob_identity(key_query).await;
740
741 let my_user_id = self.client.user_id().expect("I should have a user id");
742 let my_identity = self
743 .client
744 .encryption()
745 .get_user_identity(my_user_id)
746 .await
747 .expect("Should not fail to get own user identity")
748 .expect("Should have an own user identity")
749 .underlying_identity()
750 .own()
751 .expect("Our own identity should be of type Own");
752
753 let signature_upload_request = self
755 .bob_crypto_other_identity()
756 .await
757 .verify()
758 .await
759 .expect("Should be able to verify other identity");
760
761 let verification_response = simulate_key_query_response_for_verification(
762 signature_upload_request,
763 my_identity,
764 my_user_id,
765 self.bob_user_id(),
766 master_signing_key,
767 self_signing_key,
768 );
769
770 self.client
772 .mark_request_as_sent(&TransactionId::new(), &verification_response)
773 .await
774 .unwrap();
775
776 assert!(self.bob_is_verified().await);
778 }
779
780 pub(super) async fn bob_joins(&mut self) {
781 self.bob_membership_change(MembershipState::Join).await;
782 }
783
784 pub(super) async fn bob_leaves(&mut self) {
785 self.bob_membership_change(MembershipState::Leave).await;
786 }
787
788 pub(super) async fn subscribe_to_identity_status_changes(
789 &self,
790 ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
791 self.room
792 .subscribe_to_identity_status_changes()
793 .await
794 .expect("Should be able to subscribe")
795 }
796
797 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
798 let (client, _server) = create_client_and_server().await;
799
800 client
802 .olm_machine()
803 .await
804 .as_ref()
805 .expect("We should have an Olm machine")
806 .bootstrap_cross_signing(true)
807 .await
808 .expect("Should be able to bootstrap cross-signing");
809
810 let bob_user_id = owned_user_id!("@bob:localhost");
813
814 let sync_response_builder = SyncResponseBuilder::default();
815
816 (client, bob_user_id, sync_response_builder)
817 }
818
819 async fn change_bob_identity(
820 &self,
821 key_query_response: get_keys::v3::Response,
822 ) -> OtherUserIdentity {
823 self.client
824 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
825 .await
826 .expect("Should not fail to send identity changes");
827
828 self.bob_crypto_other_identity().await
829 }
830
831 async fn bob_membership_change(&mut self, new_state: MembershipState) {
832 let sync_response = self
833 .sync_response_builder
834 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
835 StateTestEvent::Custom(sync_response_member(
836 &self.bob_user_id,
837 new_state.clone(),
838 )),
839 ))
840 .build_sync_response();
841 self.room.client.process_sync(sync_response).await.unwrap();
842
843 let m = self
845 .room
846 .get_member_no_sync(&self.bob_user_id)
847 .await
848 .expect("Should not fail to get member");
849
850 match (&new_state, m) {
851 (MembershipState::Leave, None) => {}
852 (_, None) => {
853 panic!("Member should exist")
854 }
855 (_, Some(m)) => {
856 assert_eq!(*m.membership(), new_state);
857 }
858 };
859 }
860
861 async fn bob_is_pinned(&self) -> bool {
862 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
863 }
864
865 async fn bob_is_verified(&self) -> bool {
866 self.bob_crypto_other_identity().await.is_verified()
867 }
868
869 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
870 self.bob_user_identity()
871 .await
872 .expect("User identity should exist")
873 .underlying_identity()
874 .other()
875 .expect("Identity should be Other, not Own")
876 }
877
878 async fn bob_user_identity(&self) -> Option<UserIdentity> {
879 self.client
880 .encryption()
881 .get_user_identity(&self.bob_user_id)
882 .await
883 .expect("Should not fail to get user identity")
884 }
885 }
886
887 async fn create_just_me_room(
888 client: &Client,
889 sync_response_builder: &mut SyncResponseBuilder,
890 ) -> Room {
891 let create_room_sync_response = sync_response_builder
892 .add_joined_room(
893 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
894 .add_state_event(StateTestEvent::Member),
895 )
896 .build_sync_response();
897 client.process_sync(create_room_sync_response).await.unwrap();
898 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
899 assert_eq!(room.state(), RoomState::Joined);
900 room
901 }
902
903 async fn create_room_with_other_member(
904 builder: &mut SyncResponseBuilder,
905 client: &Client,
906 other_user_id: &UserId,
907 ) -> Room {
908 let create_room_sync_response = builder
909 .add_joined_room(
910 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
911 .add_state_event(StateTestEvent::Member)
912 .add_state_event(StateTestEvent::Custom(sync_response_member(
913 other_user_id,
914 MembershipState::Join,
915 ))),
916 )
917 .build_sync_response();
918 client.process_sync(create_room_sync_response).await.unwrap();
919 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
920 room.inner.mark_members_synced();
921
922 assert_eq!(room.state(), RoomState::Joined);
923 assert_eq!(
924 *room
925 .get_member_no_sync(other_user_id)
926 .await
927 .expect("Should not fail to get member")
928 .expect("Member should exist")
929 .membership(),
930 MembershipState::Join
931 );
932 room
933 }
934
935 async fn create_client_and_server() -> (Client, MockServer) {
936 let server = MockServer::start().await;
937 mock_members_request(&server).await;
938 mock_secret_storage_default_key(&server).await;
939 let client = logged_in_client(Some(server.uri())).await;
940 (client, server)
941 }
942
943 async fn mock_members_request(server: &MockServer) {
944 Mock::given(method("GET"))
945 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
946 .and(header("authorization", "Bearer 1234"))
947 .respond_with(
948 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
949 )
950 .mount(server)
951 .await;
952 }
953
954 async fn mock_secret_storage_default_key(server: &MockServer) {
955 Mock::given(method("GET"))
956 .and(path_regex(
957 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
958 ))
959 .and(header("authorization", "Bearer 1234"))
960 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
961 .mount(server)
962 .await;
963 }
964
965 fn sync_response_member(
966 user_id: &UserId,
967 membership: MembershipState,
968 ) -> serde_json::Value {
969 json!({
970 "content": {
971 "membership": membership.to_string(),
972 },
973 "event_id": format!(
974 "$aa{}bb:localhost",
975 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
976 ),
977 "origin_server_ts": 1472735824,
978 "sender": "@example:localhost",
979 "state_key": user_id,
980 "type": "m.room.member",
981 "unsigned": {
982 "age": 1234
983 }
984 })
985 }
986 }
987}