1#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 Client, Error, Result,
33 encryption::identities::{IdentityUpdates, UserIdentity},
34 event_handler::EventHandlerDropGuard,
35};
36
/// State behind the stream of identity status changes for a single room.
///
/// Constructed by [`IdentityStatusChanges::create_stream`], which moves the
/// value into the stream it returns.
#[derive(Debug)]
pub struct IdentityStatusChanges {
    /// Per-room identity state. `create_stream` reads its `current_state()`
    /// for the initial batch and feeds every incoming [`RoomIdentityChange`]
    /// through its `process_change()`.
    room_identity_state: RoomIdentityState<Room>,

    /// Guard for the room member event handler registered in
    /// `wrap_room_member_events`. It is never read; it is only kept alive
    /// alongside the state.
    // NOTE(review): presumably dropping the guard unregisters the handler —
    // confirm against `EventHandlerDropGuard`.
    _drop_guard: EventHandlerDropGuard,
}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut state = state;
106
107 let mut current_state =
108 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
109
110 if !current_state.is_empty() {
111 current_state.sort();
112 yield current_state;
113 }
114
115 while let Some(item) = unprocessed_stream.next().await {
116 let mut update = filter_non_self(
117 state.room_identity_state.process_change(item).await,
118 &own_user_id,
119 );
120 if !update.is_empty() {
121 update.sort();
122 yield update;
123 }
124 }
125 }))
126 }
127}
128
129fn filter_for_initial_update(
130 mut input: Vec<IdentityStatusChange>,
131 own_user_id: &UserId,
132) -> Vec<IdentityStatusChange> {
133 input.retain(|change| {
138 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
139 });
140
141 input
142}
143
144fn filter_non_self(
145 mut input: Vec<IdentityStatusChange>,
146 own_user_id: &UserId,
147) -> Vec<IdentityStatusChange> {
148 input.retain(|change| change.user_id != own_user_id);
150 input
151}
152
/// Merge the identity-update stream and the room-member-event stream into a
/// single stream of [`RoomIdentityChange`]s using `stream_select!`.
///
/// Both inputs must be `Unpin`, as required by the bounds on the parameters.
fn combine_streams(
    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
    stream_select!(identity_updates, room_member_events)
}
159
160async fn wrap_identity_updates(
161 client: &Client,
162) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
163 Ok(client
164 .encryption()
165 .user_identities_stream()
166 .await?
167 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
168}
169
170fn to_base_updates(
171 input: IdentityUpdates,
172) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
173 matrix_sdk_base::crypto::store::types::IdentityUpdates {
174 new: to_base_identities(input.new),
175 changed: to_base_identities(input.changed),
176 unchanged: Default::default(),
177 }
178}
179
180fn to_base_identities(
181 input: BTreeMap<OwnedUserId, UserIdentity>,
182) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
183 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
184}
185
186fn wrap_room_member_events(
187 room: &Room,
188) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
189 let own_user_id = room.own_user_id().to_owned();
190 let room_id = room.room_id();
191 let (sender, receiver) = mpsc::channel(16);
192 let handle =
193 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
194 if *event.state_key() == own_user_id {
195 return;
196 }
197 let _: Result<_, _> =
198 sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
199 });
200 let drop_guard = room.client.event_handler_drop_guard(handle);
201 (drop_guard, ReceiverStream::new(receiver))
202}
203
204#[cfg(all(test, not(target_family = "wasm")))]
205mod tests {
206 use std::time::Duration;
207
208 use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
209 use matrix_sdk_base::crypto::IdentityState;
210 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
211 use test_setup::TestSetup;
212
213 use crate::assert_next_with_timeout;
214
215 #[async_test]
216 async fn test_when_user_becomes_unpinned_we_report_it() {
217 let t = TestSetup::new_room_with_other_bob().await;
219
220 t.pin_bob().await;
222
223 let stream = t.subscribe_to_identity_status_changes().await;
225 pin_mut!(stream);
226
227 t.unpin_bob().await;
229
230 let change = assert_next_with_timeout!(stream);
232 assert_eq!(change[0].user_id, t.bob_user_id());
233 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
234 assert_eq!(change.len(), 1);
235 }
236
237 #[async_test]
238 async fn test_when_user_becomes_verification_violation_we_report_it() {
239 let t = TestSetup::new_room_with_other_bob().await;
241
242 t.verify_bob().await;
244
245 let stream = t.subscribe_to_identity_status_changes().await;
247 pin_mut!(stream);
248
249 t.unpin_bob().await;
251
252 let change = assert_next_with_timeout!(stream);
254 assert_eq!(change[0].user_id, t.bob_user_id());
255 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
256 assert_eq!(change.len(), 1);
257 }
258
259 #[async_test]
260 async fn test_when_user_becomes_pinned_we_report_it() {
261 let t = TestSetup::new_room_with_other_bob().await;
263
264 t.unpin_bob().await;
266
267 let stream = t.subscribe_to_identity_status_changes().await;
269 pin_mut!(stream);
270
271 t.pin_bob().await;
273
274 let change1 = assert_next_with_timeout!(stream);
276 assert_eq!(change1[0].user_id, t.bob_user_id());
277 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
278 assert_eq!(change1.len(), 1);
279
280 let change2 = assert_next_with_timeout!(stream);
282 assert_eq!(change2[0].user_id, t.bob_user_id());
283 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
284 assert_eq!(change2.len(), 1);
285 }
286
287 #[async_test]
288 async fn test_when_user_becomes_verified_we_report_it() {
289 let t = TestSetup::new_room_with_other_bob().await;
291
292 let stream = t.subscribe_to_identity_status_changes().await;
294 pin_mut!(stream);
295
296 t.verify_bob().await;
298
299 let change = assert_next_with_timeout!(stream);
301 assert_eq!(change[0].user_id, t.bob_user_id());
302 assert_eq!(change[0].changed_to, IdentityState::Verified);
303 assert_eq!(change.len(), 1);
304
305 t.unpin_bob().await;
307
308 let change = assert_next_with_timeout!(stream);
310 assert_eq!(change[0].user_id, t.bob_user_id());
311 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
312 assert_eq!(change.len(), 1);
313 }
314
315 #[async_test]
316 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
317 let t = TestSetup::new_room_with_other_bob().await;
319
320 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
322
323 let stream = t.subscribe_to_identity_status_changes().await;
325 pin_mut!(stream);
326
327 t.verify_bob().await;
329
330 let change1 = assert_next_with_timeout!(stream);
332 assert_eq!(change1[0].user_id, t.bob_user_id());
333 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
334 assert_eq!(change1.len(), 1);
335
336 let change2 = assert_next_with_timeout!(stream);
338 assert_eq!(change2[0].user_id, t.bob_user_id());
339 assert_eq!(change2[0].changed_to, IdentityState::Verified);
340 assert_eq!(change2.len(), 1);
341 }
342
343 #[async_test]
344 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
345 let t = TestSetup::new_room_with_other_bob().await;
347
348 t.verify_bob_with(
350 IdentityChangeDataSet::key_query_with_identity_b(),
351 IdentityChangeDataSet::master_signing_keys_b(),
352 IdentityChangeDataSet::self_signing_keys_b(),
353 )
354 .await;
355 t.unpin_bob().await;
356
357 let stream = t.subscribe_to_identity_status_changes().await;
359 pin_mut!(stream);
360
361 t.verify_bob().await;
363
364 let change1 = assert_next_with_timeout!(stream);
366 assert_eq!(change1[0].user_id, t.bob_user_id());
367 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
368 assert_eq!(change1.len(), 1);
369
370 let change2 = assert_next_with_timeout!(stream);
372 assert_eq!(change2[0].user_id, t.bob_user_id());
373 assert_eq!(change2[0].changed_to, IdentityState::Verified);
374 assert_eq!(change2.len(), 1);
375 }
376
377 #[async_test]
378 async fn test_when_an_unpinned_user_joins_we_report_it() {
379 let mut t = TestSetup::new_just_me_room().await;
381
382 t.unpin_bob().await;
384
385 let stream = t.subscribe_to_identity_status_changes().await;
387 pin_mut!(stream);
388
389 t.bob_joins().await;
391
392 let change = assert_next_with_timeout!(stream);
394 assert_eq!(change[0].user_id, t.bob_user_id());
395 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
396 assert_eq!(change.len(), 1);
397 }
398
399 #[async_test]
400 async fn test_when_an_verification_violating_user_joins_we_report_it() {
401 let mut t = TestSetup::new_just_me_room().await;
403
404 t.verify_bob().await;
406 t.unpin_bob().await;
407
408 let stream = t.subscribe_to_identity_status_changes().await;
410 pin_mut!(stream);
411
412 t.bob_joins().await;
414
415 let change = assert_next_with_timeout!(stream);
417 assert_eq!(change[0].user_id, t.bob_user_id());
418 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
419 assert_eq!(change.len(), 1);
420 }
421
422 #[async_test]
423 async fn test_when_a_verified_user_joins_we_dont_report_it() {
424 let mut t = TestSetup::new_just_me_room().await;
426
427 t.verify_bob().await;
429
430 let stream = t.subscribe_to_identity_status_changes().await;
432 pin_mut!(stream);
433
434 t.bob_joins().await;
436
437 t.unpin_bob().await;
439
440 let change = assert_next_with_timeout!(stream);
442 assert_eq!(change[0].user_id, t.bob_user_id());
443 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
444 assert_eq!(change.len(), 1);
445 }
446
447 #[async_test]
448 async fn test_when_a_pinned_user_joins_we_do_not_report() {
449 let mut t = TestSetup::new_just_me_room().await;
451
452 t.pin_bob().await;
454
455 let stream = t.subscribe_to_identity_status_changes().await;
457 pin_mut!(stream);
458
459 t.bob_joins().await;
461
462 tokio::time::sleep(Duration::from_millis(200)).await;
464 let change = stream.next().now_or_never();
465 assert!(change.is_none());
466 }
467
468 #[async_test]
469 async fn test_when_an_unpinned_user_leaves_we_report_it() {
470 let mut t = TestSetup::new_room_with_other_bob().await;
472
473 t.unpin_bob().await;
475
476 let stream = t.subscribe_to_identity_status_changes().await;
478 pin_mut!(stream);
479
480 t.bob_leaves().await;
482
483 let change1 = assert_next_with_timeout!(stream);
485 assert_eq!(change1[0].user_id, t.bob_user_id());
486 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
487 assert_eq!(change1.len(), 1);
488
489 let change2 = assert_next_with_timeout!(stream);
491 assert_eq!(change2[0].user_id, t.bob_user_id());
494 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
495 assert_eq!(change2.len(), 1);
496 }
497
498 #[async_test]
499 async fn test_multiple_identity_changes_are_reported() {
500 let mut t = TestSetup::new_just_me_room().await;
502
503 t.unpin_bob().await;
505
506 let stream = t.subscribe_to_identity_status_changes().await;
508 pin_mut!(stream);
509
510 t.bob_joins().await;
519 let change1 = assert_next_with_timeout!(stream);
520
521 t.pin_bob().await;
523 let change2 = assert_next_with_timeout!(stream);
524
525 t.bob_leaves().await;
527 t.bob_joins().await;
528
529 t.unpin_bob().await;
531 let change3 = assert_next_with_timeout!(stream);
532
533 t.bob_leaves().await;
535 let change4 = assert_next_with_timeout!(stream);
536
537 assert_eq!(change1[0].user_id, t.bob_user_id());
538 assert_eq!(change2[0].user_id, t.bob_user_id());
539 assert_eq!(change3[0].user_id, t.bob_user_id());
540 assert_eq!(change4[0].user_id, t.bob_user_id());
541
542 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
543 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
544 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
545 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
546
547 assert_eq!(change1.len(), 1);
548 assert_eq!(change2.len(), 1);
549 assert_eq!(change3.len(), 1);
550 assert_eq!(change4.len(), 1);
551 }
552
553 #[async_test]
554 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
555 let t = TestSetup::new_room_with_other_bob().await;
557 t.unpin_bob().await;
558
559 let stream = t.subscribe_to_identity_status_changes().await;
561 pin_mut!(stream);
562
563 let change = assert_next_with_timeout!(stream);
565 assert_eq!(change[0].user_id, t.bob_user_id());
566 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
567 assert_eq!(change.len(), 1);
568 }
569
570 #[async_test]
571 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
572 let t = TestSetup::new_room_with_other_bob().await;
574 t.verify_bob().await;
575
576 let stream = t.subscribe_to_identity_status_changes().await;
578 pin_mut!(stream);
579
580 t.unpin_bob().await;
582
583 let next_change = assert_next_with_timeout!(stream);
585
586 assert_eq!(next_change[0].user_id, t.bob_user_id());
587 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
588 assert_eq!(next_change.len(), 1);
589 }
590
591 mod test_setup {
596 use futures_core::Stream;
597 use matrix_sdk_base::{
598 RoomState,
599 crypto::{
600 IdentityStatusChange, OtherUserIdentity,
601 testing::simulate_key_query_response_for_verification,
602 },
603 };
604 use matrix_sdk_test::{
605 DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder,
606 event_factory::EventFactory, test_json,
607 test_json::keys_query_sets::IdentityChangeDataSet,
608 };
609 use ruma::{
610 OwnedUserId, TransactionId, UserId,
611 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
612 events::room::member::MembershipState,
613 owned_user_id, user_id,
614 };
615 use serde_json::json;
616 use wiremock::{
617 Mock, MockServer, ResponseTemplate,
618 matchers::{header, method, path_regex},
619 };
620
621 use crate::{
622 Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
623 };
624
/// Shared fixture for the identity status change tests: a logged-in client,
/// a room, and helpers for manipulating the identity and room membership of
/// one other user, "Bob".
pub(super) struct TestSetup {
    /// The logged-in client under test.
    client: Client,
    /// The ID of the other user whose identity and membership the tests
    /// manipulate.
    bob_user_id: OwnedUserId,
    /// Builder used to produce the sync responses that create the room and
    /// change Bob's membership.
    sync_response_builder: SyncResponseBuilder,
    /// The room whose identity status changes are observed.
    room: Room,
}
640
641 impl TestSetup {
642 pub(super) async fn new_just_me_room() -> Self {
643 let (client, user_id, mut sync_response_builder) = Self::init().await;
644 let room = create_just_me_room(&client, &mut sync_response_builder).await;
645 Self { client, bob_user_id: user_id, sync_response_builder, room }
646 }
647
648 pub(super) async fn new_room_with_other_bob() -> Self {
649 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
650 let room = create_room_with_other_member(
651 &mut sync_response_builder,
652 &client,
653 &bob_user_id,
654 )
655 .await;
656 Self { client, bob_user_id, sync_response_builder, room }
657 }
658
/// The user ID of Bob, the other user this fixture manipulates.
pub(super) fn bob_user_id(&self) -> &UserId {
    &self.bob_user_id
}
662
663 pub(super) async fn pin_bob(&self) {
664 if self.bob_user_identity().await.is_some() {
665 assert!(
666 !self.bob_is_pinned().await,
667 "pin_bob() called when the identity is already pinned!"
668 );
669
670 self.bob_user_identity()
672 .await
673 .expect("User should exist")
674 .pin()
675 .await
676 .expect("Should not fail to pin");
677 } else {
678 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
680 .await;
681 }
682
683 assert!(self.bob_is_pinned().await);
685 }
686
687 pub(super) async fn unpin_bob(&self) {
688 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
689 }
690
691 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
692 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
693 serde_json::to_string(
694 key_query_response
695 .master_keys
696 .first_key_value()
697 .expect("Master key should have a value")
698 .1,
699 )
700 .expect("Should be able to serialise master key")
701 }
702
703 let a = IdentityChangeDataSet::key_query_with_identity_a();
704 let b = IdentityChangeDataSet::key_query_with_identity_b();
705 let requested_master_key = master_key_json(&requested);
706 let a_master_key = master_key_json(&a);
707
708 if requested_master_key == a_master_key {
712 self.change_bob_identity(b).await;
713 if !self.bob_is_pinned().await {
714 self.pin_bob().await;
715 }
716 self.change_bob_identity(a).await;
717 } else {
718 self.change_bob_identity(a).await;
719 if !self.bob_is_pinned().await {
720 self.pin_bob().await;
721 }
722 self.change_bob_identity(b).await;
723 }
724
725 assert!(!self.bob_is_pinned().await);
727 }
728
729 pub(super) async fn verify_bob(&self) {
730 self.verify_bob_with(
731 IdentityChangeDataSet::key_query_with_identity_a(),
732 IdentityChangeDataSet::master_signing_keys_a(),
733 IdentityChangeDataSet::self_signing_keys_a(),
734 )
735 .await;
736 }
737
738 pub(super) async fn verify_bob_with(
739 &self,
740 key_query: KeyQueryResponse,
741 master_signing_key: serde_json::Value,
742 self_signing_key: serde_json::Value,
743 ) {
744 self.change_bob_identity(key_query).await;
746
747 let my_user_id = self.client.user_id().expect("I should have a user id");
748 let my_identity = self
749 .client
750 .encryption()
751 .get_user_identity(my_user_id)
752 .await
753 .expect("Should not fail to get own user identity")
754 .expect("Should have an own user identity")
755 .underlying_identity()
756 .own()
757 .expect("Our own identity should be of type Own");
758
759 let signature_upload_request = self
761 .bob_crypto_other_identity()
762 .await
763 .verify()
764 .await
765 .expect("Should be able to verify other identity");
766
767 let verification_response = simulate_key_query_response_for_verification(
768 signature_upload_request,
769 my_identity,
770 my_user_id,
771 self.bob_user_id(),
772 master_signing_key,
773 self_signing_key,
774 );
775
776 self.client
778 .mark_request_as_sent(&TransactionId::new(), &verification_response)
779 .await
780 .unwrap();
781
782 assert!(self.bob_is_verified().await);
784 }
785
786 pub(super) async fn bob_joins(&mut self) {
787 self.bob_membership_change(MembershipState::Join).await;
788 }
789
790 pub(super) async fn bob_leaves(&mut self) {
791 self.bob_membership_change(MembershipState::Leave).await;
792 }
793
794 pub(super) async fn subscribe_to_identity_status_changes(
795 &self,
796 ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
797 self.room
798 .subscribe_to_identity_status_changes()
799 .await
800 .expect("Should be able to subscribe")
801 }
802
803 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
804 let (client, _server) = create_client_and_server().await;
805
806 client
808 .olm_machine()
809 .await
810 .as_ref()
811 .expect("We should have an Olm machine")
812 .bootstrap_cross_signing(true)
813 .await
814 .expect("Should be able to bootstrap cross-signing");
815
816 let bob_user_id = owned_user_id!("@bob:localhost");
819
820 let sync_response_builder = SyncResponseBuilder::default();
821
822 (client, bob_user_id, sync_response_builder)
823 }
824
825 async fn change_bob_identity(
826 &self,
827 key_query_response: get_keys::v3::Response,
828 ) -> OtherUserIdentity {
829 self.client
830 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
831 .await
832 .expect("Should not fail to send identity changes");
833
834 self.bob_crypto_other_identity().await
835 }
836
837 async fn bob_membership_change(&mut self, new_state: MembershipState) {
838 let f = EventFactory::new().sender(user_id!("@example:localhost"));
839 let sync_response = self
840 .sync_response_builder
841 .add_joined_room(
842 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
843 f.member(&self.bob_user_id).membership(new_state.clone()),
844 ),
845 )
846 .build_sync_response();
847 self.room.client.process_sync(sync_response).await.unwrap();
848
849 let m = self
851 .room
852 .get_member_no_sync(&self.bob_user_id)
853 .await
854 .expect("Should not fail to get member");
855
856 match (&new_state, m) {
857 (MembershipState::Leave, None) => {}
858 (_, None) => {
859 panic!("Member should exist")
860 }
861 (_, Some(m)) => {
862 assert_eq!(*m.membership(), new_state);
863 }
864 }
865 }
866
867 async fn bob_is_pinned(&self) -> bool {
868 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
869 }
870
871 async fn bob_is_verified(&self) -> bool {
872 self.bob_crypto_other_identity().await.is_verified()
873 }
874
875 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
876 self.bob_user_identity()
877 .await
878 .expect("User identity should exist")
879 .underlying_identity()
880 .other()
881 .expect("Identity should be Other, not Own")
882 }
883
884 async fn bob_user_identity(&self) -> Option<UserIdentity> {
885 self.client
886 .encryption()
887 .get_user_identity(&self.bob_user_id)
888 .await
889 .expect("Should not fail to get user identity")
890 }
891 }
892
893 async fn create_just_me_room(
894 client: &Client,
895 sync_response_builder: &mut SyncResponseBuilder,
896 ) -> Room {
897 let f = EventFactory::new().sender(user_id!("@example:localhost"));
898 let create_room_sync_response = sync_response_builder
899 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
900 f.member(user_id!("@example:localhost")).display_name("example"),
901 ))
902 .build_sync_response();
903 client.process_sync(create_room_sync_response).await.unwrap();
904 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
905 assert_eq!(room.state(), RoomState::Joined);
906 room
907 }
908
909 async fn create_room_with_other_member(
910 builder: &mut SyncResponseBuilder,
911 client: &Client,
912 other_user_id: &UserId,
913 ) -> Room {
914 let f = EventFactory::new().sender(user_id!("@example:localhost"));
915 let create_room_sync_response = builder
916 .add_joined_room(
917 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
918 .add_state_event(
919 f.member(user_id!("@example:localhost")).display_name("example"),
920 )
921 .add_state_event(f.member(other_user_id).membership(MembershipState::Join)),
922 )
923 .build_sync_response();
924 client.process_sync(create_room_sync_response).await.unwrap();
925 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
926 room.inner.mark_members_synced();
927
928 assert_eq!(room.state(), RoomState::Joined);
929 assert_eq!(
930 *room
931 .get_member_no_sync(other_user_id)
932 .await
933 .expect("Should not fail to get member")
934 .expect("Member should exist")
935 .membership(),
936 MembershipState::Join
937 );
938 room
939 }
940
941 async fn create_client_and_server() -> (Client, MockServer) {
942 let server = MockServer::start().await;
943 mock_members_request(&server).await;
944 mock_secret_storage_default_key(&server).await;
945 let client = logged_in_client(Some(server.uri())).await;
946 (client, server)
947 }
948
949 async fn mock_members_request(server: &MockServer) {
950 Mock::given(method("GET"))
951 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
952 .and(header("authorization", "Bearer 1234"))
953 .respond_with(
954 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
955 )
956 .mount(server)
957 .await;
958 }
959
960 async fn mock_secret_storage_default_key(server: &MockServer) {
961 Mock::given(method("GET"))
962 .and(path_regex(
963 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
964 ))
965 .and(header("authorization", "Bearer 1234"))
966 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
967 .mount(server)
968 .await;
969 }
970 }
971}