1#![cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24 IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32 encryption::identities::{IdentityUpdates, UserIdentity},
33 event_handler::EventHandlerDropGuard,
34 Client, Error, Result,
35};
36
37#[derive(Debug)]
49pub struct IdentityStatusChanges {
50 room_identity_state: RoomIdentityState<Room>,
52
53 _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59 pub async fn create_stream(
90 room: Room,
91 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92 let identity_updates = wrap_identity_updates(&room.client).await?;
93 let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94 let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95 let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97 let mut state = IdentityStatusChanges {
98 room_identity_state: RoomIdentityState::new(room).await,
99 _drop_guard: drop_guard,
100 };
101
102 Ok(stream!({
103 let mut current_state =
104 filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106 if !current_state.is_empty() {
107 current_state.sort();
108 yield current_state;
109 }
110
111 while let Some(item) = unprocessed_stream.next().await {
112 let mut update = filter_non_self(
113 state.room_identity_state.process_change(item).await,
114 &own_user_id,
115 );
116 if !update.is_empty() {
117 update.sort();
118 yield update;
119 }
120 }
121 }))
122 }
123}
124
125fn filter_for_initial_update(
126 mut input: Vec<IdentityStatusChange>,
127 own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129 input.retain(|change| {
134 change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135 });
136
137 input
138}
139
140fn filter_non_self(
141 mut input: Vec<IdentityStatusChange>,
142 own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144 input.retain(|change| change.user_id != own_user_id);
146 input
147}
148
149fn combine_streams(
150 identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
151 room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
152) -> impl Stream<Item = RoomIdentityChange> {
153 stream_select!(identity_updates, room_member_events)
154}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157 Ok(client
158 .encryption()
159 .user_identities_stream()
160 .await?
161 .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(input: IdentityUpdates) -> matrix_sdk_base::crypto::store::IdentityUpdates {
165 matrix_sdk_base::crypto::store::IdentityUpdates {
166 new: to_base_identities(input.new),
167 changed: to_base_identities(input.changed),
168 unchanged: Default::default(),
169 }
170}
171
172fn to_base_identities(
173 input: BTreeMap<OwnedUserId, UserIdentity>,
174) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
175 input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
176}
177
178fn wrap_room_member_events(
179 room: &Room,
180) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
181 let own_user_id = room.own_user_id().to_owned();
182 let room_id = room.room_id();
183 let (sender, receiver) = mpsc::channel(16);
184 let handle =
185 room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
186 if *event.state_key() == own_user_id {
187 return;
188 }
189 let _: Result<_, _> = sender.send(RoomIdentityChange::SyncRoomMemberEvent(event)).await;
190 });
191 let drop_guard = room.client.event_handler_drop_guard(handle);
192 (drop_guard, ReceiverStream::new(receiver))
193}
194
195#[cfg(test)]
196mod tests {
197 use std::time::Duration;
198
199 use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
200 use matrix_sdk_base::crypto::IdentityState;
201 use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
202 use test_setup::TestSetup;
203
204 use crate::assert_next_with_timeout;
205
206 #[async_test]
207 async fn test_when_user_becomes_unpinned_we_report_it() {
208 let t = TestSetup::new_room_with_other_bob().await;
210
211 t.pin_bob().await;
213
214 let stream = t.subscribe_to_identity_status_changes().await;
216 pin_mut!(stream);
217
218 t.unpin_bob().await;
220
221 let change = assert_next_with_timeout!(stream);
223 assert_eq!(change[0].user_id, t.bob_user_id());
224 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
225 assert_eq!(change.len(), 1);
226 }
227
228 #[async_test]
229 async fn test_when_user_becomes_verification_violation_we_report_it() {
230 let t = TestSetup::new_room_with_other_bob().await;
232
233 t.verify_bob().await;
235
236 let stream = t.subscribe_to_identity_status_changes().await;
238 pin_mut!(stream);
239
240 t.unpin_bob().await;
242
243 let change = assert_next_with_timeout!(stream);
245 assert_eq!(change[0].user_id, t.bob_user_id());
246 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
247 assert_eq!(change.len(), 1);
248 }
249
250 #[async_test]
251 async fn test_when_user_becomes_pinned_we_report_it() {
252 let t = TestSetup::new_room_with_other_bob().await;
254
255 t.unpin_bob().await;
257
258 let stream = t.subscribe_to_identity_status_changes().await;
260 pin_mut!(stream);
261
262 t.pin_bob().await;
264
265 let change1 = assert_next_with_timeout!(stream);
267 assert_eq!(change1[0].user_id, t.bob_user_id());
268 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
269 assert_eq!(change1.len(), 1);
270
271 let change2 = assert_next_with_timeout!(stream);
273 assert_eq!(change2[0].user_id, t.bob_user_id());
274 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
275 assert_eq!(change2.len(), 1);
276 }
277
278 #[async_test]
279 async fn test_when_user_becomes_verified_we_report_it() {
280 let t = TestSetup::new_room_with_other_bob().await;
282
283 let stream = t.subscribe_to_identity_status_changes().await;
285 pin_mut!(stream);
286
287 t.verify_bob().await;
289
290 let change = assert_next_with_timeout!(stream);
292 assert_eq!(change[0].user_id, t.bob_user_id());
293 assert_eq!(change[0].changed_to, IdentityState::Verified);
294 assert_eq!(change.len(), 1);
295
296 t.unpin_bob().await;
298
299 let change = assert_next_with_timeout!(stream);
301 assert_eq!(change[0].user_id, t.bob_user_id());
302 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
303 assert_eq!(change.len(), 1);
304 }
305
306 #[async_test]
307 async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
308 let t = TestSetup::new_room_with_other_bob().await;
310
311 t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
313
314 let stream = t.subscribe_to_identity_status_changes().await;
316 pin_mut!(stream);
317
318 t.verify_bob().await;
320
321 let change1 = assert_next_with_timeout!(stream);
323 assert_eq!(change1[0].user_id, t.bob_user_id());
324 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
325 assert_eq!(change1.len(), 1);
326
327 let change2 = assert_next_with_timeout!(stream);
329 assert_eq!(change2[0].user_id, t.bob_user_id());
330 assert_eq!(change2[0].changed_to, IdentityState::Verified);
331 assert_eq!(change2.len(), 1);
332 }
333
334 #[async_test]
335 async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
336 let t = TestSetup::new_room_with_other_bob().await;
338
339 t.verify_bob_with(
341 IdentityChangeDataSet::key_query_with_identity_b(),
342 IdentityChangeDataSet::master_signing_keys_b(),
343 IdentityChangeDataSet::self_signing_keys_b(),
344 )
345 .await;
346 t.unpin_bob().await;
347
348 let stream = t.subscribe_to_identity_status_changes().await;
350 pin_mut!(stream);
351
352 t.verify_bob().await;
354
355 let change1 = assert_next_with_timeout!(stream);
357 assert_eq!(change1[0].user_id, t.bob_user_id());
358 assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
359 assert_eq!(change1.len(), 1);
360
361 let change2 = assert_next_with_timeout!(stream);
363 assert_eq!(change2[0].user_id, t.bob_user_id());
364 assert_eq!(change2[0].changed_to, IdentityState::Verified);
365 assert_eq!(change2.len(), 1);
366 }
367
368 #[async_test]
369 async fn test_when_an_unpinned_user_joins_we_report_it() {
370 let mut t = TestSetup::new_just_me_room().await;
372
373 t.unpin_bob().await;
375
376 let stream = t.subscribe_to_identity_status_changes().await;
378 pin_mut!(stream);
379
380 t.bob_joins().await;
382
383 let change = assert_next_with_timeout!(stream);
385 assert_eq!(change[0].user_id, t.bob_user_id());
386 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
387 assert_eq!(change.len(), 1);
388 }
389
390 #[async_test]
391 async fn test_when_an_verification_violating_user_joins_we_report_it() {
392 let mut t = TestSetup::new_just_me_room().await;
394
395 t.verify_bob().await;
397 t.unpin_bob().await;
398
399 let stream = t.subscribe_to_identity_status_changes().await;
401 pin_mut!(stream);
402
403 t.bob_joins().await;
405
406 let change = assert_next_with_timeout!(stream);
408 assert_eq!(change[0].user_id, t.bob_user_id());
409 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
410 assert_eq!(change.len(), 1);
411 }
412
413 #[async_test]
414 async fn test_when_a_verified_user_joins_we_dont_report_it() {
415 let mut t = TestSetup::new_just_me_room().await;
417
418 t.verify_bob().await;
420
421 let stream = t.subscribe_to_identity_status_changes().await;
423 pin_mut!(stream);
424
425 t.bob_joins().await;
427
428 t.unpin_bob().await;
430
431 let change = assert_next_with_timeout!(stream);
433 assert_eq!(change[0].user_id, t.bob_user_id());
434 assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
435 assert_eq!(change.len(), 1);
436 }
437
438 #[async_test]
439 async fn test_when_a_pinned_user_joins_we_do_not_report() {
440 let mut t = TestSetup::new_just_me_room().await;
442
443 t.pin_bob().await;
445
446 let stream = t.subscribe_to_identity_status_changes().await;
448 pin_mut!(stream);
449
450 t.bob_joins().await;
452
453 tokio::time::sleep(Duration::from_millis(200)).await;
455 let change = stream.next().now_or_never();
456 assert!(change.is_none());
457 }
458
459 #[async_test]
460 async fn test_when_an_unpinned_user_leaves_we_report_it() {
461 let mut t = TestSetup::new_room_with_other_bob().await;
463
464 t.unpin_bob().await;
466
467 let stream = t.subscribe_to_identity_status_changes().await;
469 pin_mut!(stream);
470
471 t.bob_leaves().await;
473
474 let change1 = assert_next_with_timeout!(stream);
476 assert_eq!(change1[0].user_id, t.bob_user_id());
477 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
478 assert_eq!(change1.len(), 1);
479
480 let change2 = assert_next_with_timeout!(stream);
482 assert_eq!(change2[0].user_id, t.bob_user_id());
485 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
486 assert_eq!(change2.len(), 1);
487 }
488
489 #[async_test]
490 async fn test_multiple_identity_changes_are_reported() {
491 let mut t = TestSetup::new_just_me_room().await;
493
494 t.unpin_bob().await;
496
497 let stream = t.subscribe_to_identity_status_changes().await;
499 pin_mut!(stream);
500
501 t.bob_joins().await;
510 let change1 = assert_next_with_timeout!(stream);
511
512 t.pin_bob().await;
514 let change2 = assert_next_with_timeout!(stream);
515
516 t.bob_leaves().await;
518 t.bob_joins().await;
519
520 t.unpin_bob().await;
522 let change3 = assert_next_with_timeout!(stream);
523
524 t.bob_leaves().await;
526 let change4 = assert_next_with_timeout!(stream);
527
528 assert_eq!(change1[0].user_id, t.bob_user_id());
529 assert_eq!(change2[0].user_id, t.bob_user_id());
530 assert_eq!(change3[0].user_id, t.bob_user_id());
531 assert_eq!(change4[0].user_id, t.bob_user_id());
532
533 assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
534 assert_eq!(change2[0].changed_to, IdentityState::Pinned);
535 assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
536 assert_eq!(change4[0].changed_to, IdentityState::Pinned);
537
538 assert_eq!(change1.len(), 1);
539 assert_eq!(change2.len(), 1);
540 assert_eq!(change3.len(), 1);
541 assert_eq!(change4.len(), 1);
542 }
543
544 #[async_test]
545 async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
546 let t = TestSetup::new_room_with_other_bob().await;
548 t.unpin_bob().await;
549
550 let stream = t.subscribe_to_identity_status_changes().await;
552 pin_mut!(stream);
553
554 let change = assert_next_with_timeout!(stream);
556 assert_eq!(change[0].user_id, t.bob_user_id());
557 assert_eq!(change[0].changed_to, IdentityState::PinViolation);
558 assert_eq!(change.len(), 1);
559 }
560
561 #[async_test]
562 async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
563 let t = TestSetup::new_room_with_other_bob().await;
565 t.verify_bob().await;
566
567 let stream = t.subscribe_to_identity_status_changes().await;
569 pin_mut!(stream);
570
571 t.unpin_bob().await;
573
574 let next_change = assert_next_with_timeout!(stream);
576
577 assert_eq!(next_change[0].user_id, t.bob_user_id());
578 assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
579 assert_eq!(next_change.len(), 1);
580 }
581
582 mod test_setup {
587 use std::time::{SystemTime, UNIX_EPOCH};
588
589 use futures_core::Stream;
590 use matrix_sdk_base::{
591 crypto::{
592 testing::simulate_key_query_response_for_verification, IdentityStatusChange,
593 OtherUserIdentity,
594 },
595 RoomState,
596 };
597 use matrix_sdk_test::{
598 test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
599 StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
600 };
601 use ruma::{
602 api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
603 events::room::member::MembershipState,
604 owned_user_id, OwnedUserId, TransactionId, UserId,
605 };
606 use serde_json::json;
607 use wiremock::{
608 matchers::{header, method, path_regex},
609 Mock, MockServer, ResponseTemplate,
610 };
611
612 use crate::{
613 encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
614 };
615
616 pub(super) struct TestSetup {
626 client: Client,
627 bob_user_id: OwnedUserId,
628 sync_response_builder: SyncResponseBuilder,
629 room: Room,
630 }
631
632 impl TestSetup {
633 pub(super) async fn new_just_me_room() -> Self {
634 let (client, user_id, mut sync_response_builder) = Self::init().await;
635 let room = create_just_me_room(&client, &mut sync_response_builder).await;
636 Self { client, bob_user_id: user_id, sync_response_builder, room }
637 }
638
639 pub(super) async fn new_room_with_other_bob() -> Self {
640 let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
641 let room = create_room_with_other_member(
642 &mut sync_response_builder,
643 &client,
644 &bob_user_id,
645 )
646 .await;
647 Self { client, bob_user_id, sync_response_builder, room }
648 }
649
650 pub(super) fn bob_user_id(&self) -> &UserId {
651 &self.bob_user_id
652 }
653
654 pub(super) async fn pin_bob(&self) {
655 if self.bob_user_identity().await.is_some() {
656 assert!(
657 !self.bob_is_pinned().await,
658 "pin_bob() called when the identity is already pinned!"
659 );
660
661 self.bob_user_identity()
663 .await
664 .expect("User should exist")
665 .pin()
666 .await
667 .expect("Should not fail to pin");
668 } else {
669 self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
671 .await;
672 }
673
674 assert!(self.bob_is_pinned().await);
676 }
677
678 pub(super) async fn unpin_bob(&self) {
679 self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
680 }
681
682 pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
683 fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
684 serde_json::to_string(
685 key_query_response
686 .master_keys
687 .first_key_value()
688 .expect("Master key should have a value")
689 .1,
690 )
691 .expect("Should be able to serialise master key")
692 }
693
694 let a = IdentityChangeDataSet::key_query_with_identity_a();
695 let b = IdentityChangeDataSet::key_query_with_identity_b();
696 let requested_master_key = master_key_json(&requested);
697 let a_master_key = master_key_json(&a);
698
699 if requested_master_key == a_master_key {
703 self.change_bob_identity(b).await;
704 if !self.bob_is_pinned().await {
705 self.pin_bob().await;
706 }
707 self.change_bob_identity(a).await;
708 } else {
709 self.change_bob_identity(a).await;
710 if !self.bob_is_pinned().await {
711 self.pin_bob().await;
712 }
713 self.change_bob_identity(b).await;
714 }
715
716 assert!(!self.bob_is_pinned().await);
718 }
719
720 pub(super) async fn verify_bob(&self) {
721 self.verify_bob_with(
722 IdentityChangeDataSet::key_query_with_identity_a(),
723 IdentityChangeDataSet::master_signing_keys_a(),
724 IdentityChangeDataSet::self_signing_keys_a(),
725 )
726 .await;
727 }
728
729 pub(super) async fn verify_bob_with(
730 &self,
731 key_query: KeyQueryResponse,
732 master_signing_key: serde_json::Value,
733 self_signing_key: serde_json::Value,
734 ) {
735 self.change_bob_identity(key_query).await;
737
738 let my_user_id = self.client.user_id().expect("I should have a user id");
739 let my_identity = self
740 .client
741 .encryption()
742 .get_user_identity(my_user_id)
743 .await
744 .expect("Should not fail to get own user identity")
745 .expect("Should have an own user identity")
746 .underlying_identity()
747 .own()
748 .expect("Our own identity should be of type Own");
749
750 let signature_upload_request = self
752 .bob_crypto_other_identity()
753 .await
754 .verify()
755 .await
756 .expect("Should be able to verify other identity");
757
758 let verification_response = simulate_key_query_response_for_verification(
759 signature_upload_request,
760 my_identity,
761 my_user_id,
762 self.bob_user_id(),
763 master_signing_key,
764 self_signing_key,
765 );
766
767 self.client
769 .mark_request_as_sent(&TransactionId::new(), &verification_response)
770 .await
771 .unwrap();
772
773 assert!(self.bob_is_verified().await);
775 }
776
777 pub(super) async fn bob_joins(&mut self) {
778 self.bob_membership_change(MembershipState::Join).await;
779 }
780
781 pub(super) async fn bob_leaves(&mut self) {
782 self.bob_membership_change(MembershipState::Leave).await;
783 }
784
785 pub(super) async fn subscribe_to_identity_status_changes(
786 &self,
787 ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
788 self.room
789 .subscribe_to_identity_status_changes()
790 .await
791 .expect("Should be able to subscribe")
792 }
793
794 async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
795 let (client, _server) = create_client_and_server().await;
796
797 client
799 .olm_machine()
800 .await
801 .as_ref()
802 .expect("We should have an Olm machine")
803 .bootstrap_cross_signing(true)
804 .await
805 .expect("Should be able to bootstrap cross-signing");
806
807 let bob_user_id = owned_user_id!("@bob:localhost");
810
811 let sync_response_builder = SyncResponseBuilder::default();
812
813 (client, bob_user_id, sync_response_builder)
814 }
815
816 async fn change_bob_identity(
817 &self,
818 key_query_response: get_keys::v3::Response,
819 ) -> OtherUserIdentity {
820 self.client
821 .mark_request_as_sent(&TransactionId::new(), &key_query_response)
822 .await
823 .expect("Should not fail to send identity changes");
824
825 self.bob_crypto_other_identity().await
826 }
827
828 async fn bob_membership_change(&mut self, new_state: MembershipState) {
829 let sync_response = self
830 .sync_response_builder
831 .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
832 StateTestEvent::Custom(sync_response_member(
833 &self.bob_user_id,
834 new_state.clone(),
835 )),
836 ))
837 .build_sync_response();
838 self.room.client.process_sync(sync_response).await.unwrap();
839
840 let m = self
842 .room
843 .get_member_no_sync(&self.bob_user_id)
844 .await
845 .expect("Should not fail to get member");
846
847 match (&new_state, m) {
848 (MembershipState::Leave, None) => {}
849 (_, None) => {
850 panic!("Member should exist")
851 }
852 (_, Some(m)) => {
853 assert_eq!(*m.membership(), new_state);
854 }
855 };
856 }
857
858 async fn bob_is_pinned(&self) -> bool {
859 !self.bob_crypto_other_identity().await.identity_needs_user_approval()
860 }
861
862 async fn bob_is_verified(&self) -> bool {
863 self.bob_crypto_other_identity().await.is_verified()
864 }
865
866 async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
867 self.bob_user_identity()
868 .await
869 .expect("User identity should exist")
870 .underlying_identity()
871 .other()
872 .expect("Identity should be Other, not Own")
873 }
874
875 async fn bob_user_identity(&self) -> Option<UserIdentity> {
876 self.client
877 .encryption()
878 .get_user_identity(&self.bob_user_id)
879 .await
880 .expect("Should not fail to get user identity")
881 }
882 }
883
884 async fn create_just_me_room(
885 client: &Client,
886 sync_response_builder: &mut SyncResponseBuilder,
887 ) -> Room {
888 let create_room_sync_response = sync_response_builder
889 .add_joined_room(
890 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
891 .add_state_event(StateTestEvent::Member),
892 )
893 .build_sync_response();
894 client.process_sync(create_room_sync_response).await.unwrap();
895 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
896 assert_eq!(room.state(), RoomState::Joined);
897 room
898 }
899
900 async fn create_room_with_other_member(
901 builder: &mut SyncResponseBuilder,
902 client: &Client,
903 other_user_id: &UserId,
904 ) -> Room {
905 let create_room_sync_response = builder
906 .add_joined_room(
907 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
908 .add_state_event(StateTestEvent::Member)
909 .add_state_event(StateTestEvent::Custom(sync_response_member(
910 other_user_id,
911 MembershipState::Join,
912 ))),
913 )
914 .build_sync_response();
915 client.process_sync(create_room_sync_response).await.unwrap();
916 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
917 room.inner.mark_members_synced();
918
919 assert_eq!(room.state(), RoomState::Joined);
920 assert_eq!(
921 *room
922 .get_member_no_sync(other_user_id)
923 .await
924 .expect("Should not fail to get member")
925 .expect("Member should exist")
926 .membership(),
927 MembershipState::Join
928 );
929 room
930 }
931
932 async fn create_client_and_server() -> (Client, MockServer) {
933 let server = MockServer::start().await;
934 mock_members_request(&server).await;
935 mock_secret_storage_default_key(&server).await;
936 let client = logged_in_client(Some(server.uri())).await;
937 (client, server)
938 }
939
940 async fn mock_members_request(server: &MockServer) {
941 Mock::given(method("GET"))
942 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
943 .and(header("authorization", "Bearer 1234"))
944 .respond_with(
945 ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
946 )
947 .mount(server)
948 .await;
949 }
950
951 async fn mock_secret_storage_default_key(server: &MockServer) {
952 Mock::given(method("GET"))
953 .and(path_regex(
954 r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
955 ))
956 .and(header("authorization", "Bearer 1234"))
957 .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
958 .mount(server)
959 .await;
960 }
961
962 fn sync_response_member(
963 user_id: &UserId,
964 membership: MembershipState,
965 ) -> serde_json::Value {
966 json!({
967 "content": {
968 "membership": membership.to_string(),
969 },
970 "event_id": format!(
971 "$aa{}bb:localhost",
972 SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
973 ),
974 "origin_server_ts": 1472735824,
975 "sender": "@example:localhost",
976 "state_key": user_id,
977 "type": "m.room.member",
978 "unsigned": {
979 "age": 1234
980 }
981 })
982 }
983 }
984}