1#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 Client, Error, Result,
33 encryption::identities::{IdentityUpdates, UserIdentity},
34 event_handler::EventHandlerDropGuard,
35};
36
37#[derive(Debug)]
49pub struct IdentityStatusChanges {
50 room_identity_state: RoomIdentityState<Room>,
52
53 _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let mut state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut current_state =
104 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106 if !current_state.is_empty() {
107 current_state.sort();
108 yield current_state;
109 }
110
111 while let Some(item) = unprocessed_stream.next().await {
112 let mut update = filter_non_self(
113 state.room_identity_state.process_change(item).await,
114 &own_user_id,
115 );
116 if !update.is_empty() {
117 update.sort();
118 yield update;
119 }
120 }
121 }))
122 }
123}
124
125fn filter_for_initial_update(
126 mut input: Vec<IdentityStatusChange>,
127 own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129 input.retain(|change| {
134 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135 });
136
137 input
138}
139
140fn filter_non_self(
141 mut input: Vec<IdentityStatusChange>,
142 own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144 input.retain(|change| change.user_id != own_user_id);
146 input
147}
148
149fn combine_streams(
150 identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
151 room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
152) -> impl Stream<Item = RoomIdentityChange> {
153 stream_select!(identity_updates, room_member_events)
154}
155
156async fn wrap_identity_updates(
157 client: &Client,
158) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
159 Ok(client
160 .encryption()
161 .user_identities_stream()
162 .await?
163 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
164}
165
166fn to_base_updates(
167 input: IdentityUpdates,
168) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
169 matrix_sdk_base::crypto::store::types::IdentityUpdates {
170 new: to_base_identities(input.new),
171 changed: to_base_identities(input.changed),
172 unchanged: Default::default(),
173 }
174}
175
176fn to_base_identities(
177 input: BTreeMap<OwnedUserId, UserIdentity>,
178) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
179 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
180}
181
182fn wrap_room_member_events(
183 room: &Room,
184) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
185 let own_user_id = room.own_user_id().to_owned();
186 let room_id = room.room_id();
187 let (sender, receiver) = mpsc::channel(16);
188 let handle =
189 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
190 if *event.state_key() == own_user_id {
191 return;
192 }
193 let _: Result<_, _> =
194 sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
195 });
196 let drop_guard = room.client.event_handler_drop_guard(handle);
197 (drop_guard, ReceiverStream::new(receiver))
198}
199
200#[cfg(all(test, not(target_family = "wasm")))]
201mod tests {
202 use std::time::Duration;
203
204 use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
205 use matrix_sdk_base::crypto::IdentityState;
206 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
207 use test_setup::TestSetup;
208
209 use crate::assert_next_with_timeout;
210
211 #[async_test]
212 async fn test_when_user_becomes_unpinned_we_report_it() {
213 let t = TestSetup::new_room_with_other_bob().await;
215
216 t.pin_bob().await;
218
219 let stream = t.subscribe_to_identity_status_changes().await;
221 pin_mut!(stream);
222
223 t.unpin_bob().await;
225
226 let change = assert_next_with_timeout!(stream);
228 assert_eq!(change[0].user_id, t.bob_user_id());
229 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
230 assert_eq!(change.len(), 1);
231 }
232
233 #[async_test]
234 async fn test_when_user_becomes_verification_violation_we_report_it() {
235 let t = TestSetup::new_room_with_other_bob().await;
237
238 t.verify_bob().await;
240
241 let stream = t.subscribe_to_identity_status_changes().await;
243 pin_mut!(stream);
244
245 t.unpin_bob().await;
247
248 let change = assert_next_with_timeout!(stream);
250 assert_eq!(change[0].user_id, t.bob_user_id());
251 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
252 assert_eq!(change.len(), 1);
253 }
254
255 #[async_test]
256 async fn test_when_user_becomes_pinned_we_report_it() {
257 let t = TestSetup::new_room_with_other_bob().await;
259
260 t.unpin_bob().await;
262
263 let stream = t.subscribe_to_identity_status_changes().await;
265 pin_mut!(stream);
266
267 t.pin_bob().await;
269
270 let change1 = assert_next_with_timeout!(stream);
272 assert_eq!(change1[0].user_id, t.bob_user_id());
273 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
274 assert_eq!(change1.len(), 1);
275
276 let change2 = assert_next_with_timeout!(stream);
278 assert_eq!(change2[0].user_id, t.bob_user_id());
279 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
280 assert_eq!(change2.len(), 1);
281 }
282
283 #[async_test]
284 async fn test_when_user_becomes_verified_we_report_it() {
285 let t = TestSetup::new_room_with_other_bob().await;
287
288 let stream = t.subscribe_to_identity_status_changes().await;
290 pin_mut!(stream);
291
292 t.verify_bob().await;
294
295 let change = assert_next_with_timeout!(stream);
297 assert_eq!(change[0].user_id, t.bob_user_id());
298 assert_eq!(change[0].changed_to, IdentityState::Verified);
299 assert_eq!(change.len(), 1);
300
301 t.unpin_bob().await;
303
304 let change = assert_next_with_timeout!(stream);
306 assert_eq!(change[0].user_id, t.bob_user_id());
307 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
308 assert_eq!(change.len(), 1);
309 }
310
311 #[async_test]
312 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
313 let t = TestSetup::new_room_with_other_bob().await;
315
316 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
318
319 let stream = t.subscribe_to_identity_status_changes().await;
321 pin_mut!(stream);
322
323 t.verify_bob().await;
325
326 let change1 = assert_next_with_timeout!(stream);
328 assert_eq!(change1[0].user_id, t.bob_user_id());
329 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
330 assert_eq!(change1.len(), 1);
331
332 let change2 = assert_next_with_timeout!(stream);
334 assert_eq!(change2[0].user_id, t.bob_user_id());
335 assert_eq!(change2[0].changed_to, IdentityState::Verified);
336 assert_eq!(change2.len(), 1);
337 }
338
339 #[async_test]
340 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
341 let t = TestSetup::new_room_with_other_bob().await;
343
344 t.verify_bob_with(
346 IdentityChangeDataSet::key_query_with_identity_b(),
347 IdentityChangeDataSet::master_signing_keys_b(),
348 IdentityChangeDataSet::self_signing_keys_b(),
349 )
350 .await;
351 t.unpin_bob().await;
352
353 let stream = t.subscribe_to_identity_status_changes().await;
355 pin_mut!(stream);
356
357 t.verify_bob().await;
359
360 let change1 = assert_next_with_timeout!(stream);
362 assert_eq!(change1[0].user_id, t.bob_user_id());
363 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
364 assert_eq!(change1.len(), 1);
365
366 let change2 = assert_next_with_timeout!(stream);
368 assert_eq!(change2[0].user_id, t.bob_user_id());
369 assert_eq!(change2[0].changed_to, IdentityState::Verified);
370 assert_eq!(change2.len(), 1);
371 }
372
373 #[async_test]
374 async fn test_when_an_unpinned_user_joins_we_report_it() {
375 let mut t = TestSetup::new_just_me_room().await;
377
378 t.unpin_bob().await;
380
381 let stream = t.subscribe_to_identity_status_changes().await;
383 pin_mut!(stream);
384
385 t.bob_joins().await;
387
388 let change = assert_next_with_timeout!(stream);
390 assert_eq!(change[0].user_id, t.bob_user_id());
391 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
392 assert_eq!(change.len(), 1);
393 }
394
395 #[async_test]
396 async fn test_when_an_verification_violating_user_joins_we_report_it() {
397 let mut t = TestSetup::new_just_me_room().await;
399
400 t.verify_bob().await;
402 t.unpin_bob().await;
403
404 let stream = t.subscribe_to_identity_status_changes().await;
406 pin_mut!(stream);
407
408 t.bob_joins().await;
410
411 let change = assert_next_with_timeout!(stream);
413 assert_eq!(change[0].user_id, t.bob_user_id());
414 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
415 assert_eq!(change.len(), 1);
416 }
417
418 #[async_test]
419 async fn test_when_a_verified_user_joins_we_dont_report_it() {
420 let mut t = TestSetup::new_just_me_room().await;
422
423 t.verify_bob().await;
425
426 let stream = t.subscribe_to_identity_status_changes().await;
428 pin_mut!(stream);
429
430 t.bob_joins().await;
432
433 t.unpin_bob().await;
435
436 let change = assert_next_with_timeout!(stream);
438 assert_eq!(change[0].user_id, t.bob_user_id());
439 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
440 assert_eq!(change.len(), 1);
441 }
442
443 #[async_test]
444 async fn test_when_a_pinned_user_joins_we_do_not_report() {
445 let mut t = TestSetup::new_just_me_room().await;
447
448 t.pin_bob().await;
450
451 let stream = t.subscribe_to_identity_status_changes().await;
453 pin_mut!(stream);
454
455 t.bob_joins().await;
457
458 tokio::time::sleep(Duration::from_millis(200)).await;
460 let change = stream.next().now_or_never();
461 assert!(change.is_none());
462 }
463
464 #[async_test]
465 async fn test_when_an_unpinned_user_leaves_we_report_it() {
466 let mut t = TestSetup::new_room_with_other_bob().await;
468
469 t.unpin_bob().await;
471
472 let stream = t.subscribe_to_identity_status_changes().await;
474 pin_mut!(stream);
475
476 t.bob_leaves().await;
478
479 let change1 = assert_next_with_timeout!(stream);
481 assert_eq!(change1[0].user_id, t.bob_user_id());
482 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
483 assert_eq!(change1.len(), 1);
484
485 let change2 = assert_next_with_timeout!(stream);
487 assert_eq!(change2[0].user_id, t.bob_user_id());
490 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
491 assert_eq!(change2.len(), 1);
492 }
493
494 #[async_test]
495 async fn test_multiple_identity_changes_are_reported() {
496 let mut t = TestSetup::new_just_me_room().await;
498
499 t.unpin_bob().await;
501
502 let stream = t.subscribe_to_identity_status_changes().await;
504 pin_mut!(stream);
505
506 t.bob_joins().await;
515 let change1 = assert_next_with_timeout!(stream);
516
517 t.pin_bob().await;
519 let change2 = assert_next_with_timeout!(stream);
520
521 t.bob_leaves().await;
523 t.bob_joins().await;
524
525 t.unpin_bob().await;
527 let change3 = assert_next_with_timeout!(stream);
528
529 t.bob_leaves().await;
531 let change4 = assert_next_with_timeout!(stream);
532
533 assert_eq!(change1[0].user_id, t.bob_user_id());
534 assert_eq!(change2[0].user_id, t.bob_user_id());
535 assert_eq!(change3[0].user_id, t.bob_user_id());
536 assert_eq!(change4[0].user_id, t.bob_user_id());
537
538 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
539 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
540 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
541 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
542
543 assert_eq!(change1.len(), 1);
544 assert_eq!(change2.len(), 1);
545 assert_eq!(change3.len(), 1);
546 assert_eq!(change4.len(), 1);
547 }
548
549 #[async_test]
550 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
551 let t = TestSetup::new_room_with_other_bob().await;
553 t.unpin_bob().await;
554
555 let stream = t.subscribe_to_identity_status_changes().await;
557 pin_mut!(stream);
558
559 let change = assert_next_with_timeout!(stream);
561 assert_eq!(change[0].user_id, t.bob_user_id());
562 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
563 assert_eq!(change.len(), 1);
564 }
565
566 #[async_test]
567 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
568 let t = TestSetup::new_room_with_other_bob().await;
570 t.verify_bob().await;
571
572 let stream = t.subscribe_to_identity_status_changes().await;
574 pin_mut!(stream);
575
576 t.unpin_bob().await;
578
579 let next_change = assert_next_with_timeout!(stream);
581
582 assert_eq!(next_change[0].user_id, t.bob_user_id());
583 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
584 assert_eq!(next_change.len(), 1);
585 }
586
587 mod test_setup {
592 use std::time::{SystemTime, UNIX_EPOCH};
593
594 use futures_core::Stream;
595 use matrix_sdk_base::{
596 RoomState,
597 crypto::{
598 IdentityStatusChange, OtherUserIdentity,
599 testing::simulate_key_query_response_for_verification,
600 },
601 };
602 use matrix_sdk_test::{
603 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
604 test_json, test_json::keys_query_sets::IdentityChangeDataSet,
605 };
606 use ruma::{
607 OwnedUserId, TransactionId, UserId,
608 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
609 events::room::member::MembershipState,
610 owned_user_id,
611 };
612 use serde_json::json;
613 use wiremock::{
614 Mock, MockServer, ResponseTemplate,
615 matchers::{header, method, path_regex},
616 };
617
618 use crate::{
619 Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
620 };
621
622 pub(super) struct TestSetup {
632 client: Client,
633 bob_user_id: OwnedUserId,
634 sync_response_builder: SyncResponseBuilder,
635 room: Room,
636 }
637
638 impl TestSetup {
639 pub(super) async fn new_just_me_room() -> Self {
640 let (client, user_id, mut sync_response_builder) = Self::init().await;
641 let room = create_just_me_room(&client, &mut sync_response_builder).await;
642 Self { client, bob_user_id: user_id, sync_response_builder, room }
643 }
644
645 pub(super) async fn new_room_with_other_bob() -> Self {
646 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
647 let room = create_room_with_other_member(
648 &mut sync_response_builder,
649 &client,
650 &bob_user_id,
651 )
652 .await;
653 Self { client, bob_user_id, sync_response_builder, room }
654 }
655
656 pub(super) fn bob_user_id(&self) -> &UserId {
657 &self.bob_user_id
658 }
659
660 pub(super) async fn pin_bob(&self) {
661 if self.bob_user_identity().await.is_some() {
662 assert!(
663 !self.bob_is_pinned().await,
664 "pin_bob() called when the identity is already pinned!"
665 );
666
667 self.bob_user_identity()
669 .await
670 .expect("User should exist")
671 .pin()
672 .await
673 .expect("Should not fail to pin");
674 } else {
675 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
677 .await;
678 }
679
680 assert!(self.bob_is_pinned().await);
682 }
683
684 pub(super) async fn unpin_bob(&self) {
685 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
686 }
687
688 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
689 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
690 serde_json::to_string(
691 key_query_response
692 .master_keys
693 .first_key_value()
694 .expect("Master key should have a value")
695 .1,
696 )
697 .expect("Should be able to serialise master key")
698 }
699
700 let a = IdentityChangeDataSet::key_query_with_identity_a();
701 let b = IdentityChangeDataSet::key_query_with_identity_b();
702 let requested_master_key = master_key_json(&requested);
703 let a_master_key = master_key_json(&a);
704
705 if requested_master_key == a_master_key {
709 self.change_bob_identity(b).await;
710 if !self.bob_is_pinned().await {
711 self.pin_bob().await;
712 }
713 self.change_bob_identity(a).await;
714 } else {
715 self.change_bob_identity(a).await;
716 if !self.bob_is_pinned().await {
717 self.pin_bob().await;
718 }
719 self.change_bob_identity(b).await;
720 }
721
722 assert!(!self.bob_is_pinned().await);
724 }
725
726 pub(super) async fn verify_bob(&self) {
727 self.verify_bob_with(
728 IdentityChangeDataSet::key_query_with_identity_a(),
729 IdentityChangeDataSet::master_signing_keys_a(),
730 IdentityChangeDataSet::self_signing_keys_a(),
731 )
732 .await;
733 }
734
735 pub(super) async fn verify_bob_with(
736 &self,
737 key_query: KeyQueryResponse,
738 master_signing_key: serde_json::Value,
739 self_signing_key: serde_json::Value,
740 ) {
741 self.change_bob_identity(key_query).await;
743
744 let my_user_id = self.client.user_id().expect("I should have a user id");
745 let my_identity = self
746 .client
747 .encryption()
748 .get_user_identity(my_user_id)
749 .await
750 .expect("Should not fail to get own user identity")
751 .expect("Should have an own user identity")
752 .underlying_identity()
753 .own()
754 .expect("Our own identity should be of type Own");
755
756 let signature_upload_request = self
758 .bob_crypto_other_identity()
759 .await
760 .verify()
761 .await
762 .expect("Should be able to verify other identity");
763
764 let verification_response = simulate_key_query_response_for_verification(
765 signature_upload_request,
766 my_identity,
767 my_user_id,
768 self.bob_user_id(),
769 master_signing_key,
770 self_signing_key,
771 );
772
773 self.client
775 .mark_request_as_sent(&TransactionId::new(), &verification_response)
776 .await
777 .unwrap();
778
779 assert!(self.bob_is_verified().await);
781 }
782
783 pub(super) async fn bob_joins(&mut self) {
784 self.bob_membership_change(MembershipState::Join).await;
785 }
786
787 pub(super) async fn bob_leaves(&mut self) {
788 self.bob_membership_change(MembershipState::Leave).await;
789 }
790
791 pub(super) async fn subscribe_to_identity_status_changes(
792 &self,
793 ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
794 self.room
795 .subscribe_to_identity_status_changes()
796 .await
797 .expect("Should be able to subscribe")
798 }
799
800 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
801 let (client, _server) = create_client_and_server().await;
802
803 client
805 .olm_machine()
806 .await
807 .as_ref()
808 .expect("We should have an Olm machine")
809 .bootstrap_cross_signing(true)
810 .await
811 .expect("Should be able to bootstrap cross-signing");
812
813 let bob_user_id = owned_user_id!("@bob:localhost");
816
817 let sync_response_builder = SyncResponseBuilder::default();
818
819 (client, bob_user_id, sync_response_builder)
820 }
821
822 async fn change_bob_identity(
823 &self,
824 key_query_response: get_keys::v3::Response,
825 ) -> OtherUserIdentity {
826 self.client
827 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
828 .await
829 .expect("Should not fail to send identity changes");
830
831 self.bob_crypto_other_identity().await
832 }
833
834 async fn bob_membership_change(&mut self, new_state: MembershipState) {
835 let sync_response = self
836 .sync_response_builder
837 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
838 StateTestEvent::Custom(sync_response_member(
839 &self.bob_user_id,
840 new_state.clone(),
841 )),
842 ))
843 .build_sync_response();
844 self.room.client.process_sync(sync_response).await.unwrap();
845
846 let m = self
848 .room
849 .get_member_no_sync(&self.bob_user_id)
850 .await
851 .expect("Should not fail to get member");
852
853 match (&new_state, m) {
854 (MembershipState::Leave, None) => {}
855 (_, None) => {
856 panic!("Member should exist")
857 }
858 (_, Some(m)) => {
859 assert_eq!(*m.membership(), new_state);
860 }
861 }
862 }
863
864 async fn bob_is_pinned(&self) -> bool {
865 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
866 }
867
868 async fn bob_is_verified(&self) -> bool {
869 self.bob_crypto_other_identity().await.is_verified()
870 }
871
872 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
873 self.bob_user_identity()
874 .await
875 .expect("User identity should exist")
876 .underlying_identity()
877 .other()
878 .expect("Identity should be Other, not Own")
879 }
880
881 async fn bob_user_identity(&self) -> Option<UserIdentity> {
882 self.client
883 .encryption()
884 .get_user_identity(&self.bob_user_id)
885 .await
886 .expect("Should not fail to get user identity")
887 }
888 }
889
890 async fn create_just_me_room(
891 client: &Client,
892 sync_response_builder: &mut SyncResponseBuilder,
893 ) -> Room {
894 let create_room_sync_response = sync_response_builder
895 .add_joined_room(
896 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
897 .add_state_event(StateTestEvent::Member),
898 )
899 .build_sync_response();
900 client.process_sync(create_room_sync_response).await.unwrap();
901 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
902 assert_eq!(room.state(), RoomState::Joined);
903 room
904 }
905
906 async fn create_room_with_other_member(
907 builder: &mut SyncResponseBuilder,
908 client: &Client,
909 other_user_id: &UserId,
910 ) -> Room {
911 let create_room_sync_response = builder
912 .add_joined_room(
913 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
914 .add_state_event(StateTestEvent::Member)
915 .add_state_event(StateTestEvent::Custom(sync_response_member(
916 other_user_id,
917 MembershipState::Join,
918 ))),
919 )
920 .build_sync_response();
921 client.process_sync(create_room_sync_response).await.unwrap();
922 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
923 room.inner.mark_members_synced();
924
925 assert_eq!(room.state(), RoomState::Joined);
926 assert_eq!(
927 *room
928 .get_member_no_sync(other_user_id)
929 .await
930 .expect("Should not fail to get member")
931 .expect("Member should exist")
932 .membership(),
933 MembershipState::Join
934 );
935 room
936 }
937
938 async fn create_client_and_server() -> (Client, MockServer) {
939 let server = MockServer::start().await;
940 mock_members_request(&server).await;
941 mock_secret_storage_default_key(&server).await;
942 let client = logged_in_client(Some(server.uri())).await;
943 (client, server)
944 }
945
946 async fn mock_members_request(server: &MockServer) {
947 Mock::given(method("GET"))
948 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
949 .and(header("authorization", "Bearer 1234"))
950 .respond_with(
951 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
952 )
953 .mount(server)
954 .await;
955 }
956
957 async fn mock_secret_storage_default_key(server: &MockServer) {
958 Mock::given(method("GET"))
959 .and(path_regex(
960 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
961 ))
962 .and(header("authorization", "Bearer 1234"))
963 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
964 .mount(server)
965 .await;
966 }
967
968 fn sync_response_member(
969 user_id: &UserId,
970 membership: MembershipState,
971 ) -> serde_json::Value {
972 json!({
973 "content": {
974 "membership": membership.to_string(),
975 },
976 "event_id": format!(
977 "$aa{}bb:localhost",
978 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
979 ),
980 "origin_server_ts": 1472735824,
981 "sender": "@example:localhost",
982 "state_key": user_id,
983 "type": "m.room.member",
984 "unsigned": {
985 "age": 1234
986 }
987 })
988 }
989 }
990}