matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    encryption::identities::{IdentityUpdates, UserIdentity},
33    event_handler::EventHandlerDropGuard,
34    Client, Error, Result,
35};
36
37/// Support for creating a stream of batches of [`IdentityStatusChange`].
38///
39/// Internally, this subscribes to all identity changes, and to room events that
40/// change the membership, and provides a stream of all changes to the identity
41/// status of all room members.
42///
43/// This struct does not represent the actual stream, but the state that is used
44/// to produce the values of the stream.
45///
46/// It does provide a method to create the stream:
47/// [`IdentityStatusChanges::create_stream`].
48#[derive(Debug)]
49pub struct IdentityStatusChanges {
50    /// Who is in the room and who is in identity violation at this moment
51    room_identity_state: RoomIdentityState<Room>,
52
53    /// Dropped when we are dropped, and unregisters the event handler we
54    /// registered to listen for room events
55    _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let mut state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            let mut current_state =
104                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106            if !current_state.is_empty() {
107                current_state.sort();
108                yield current_state;
109            }
110
111            while let Some(item) = unprocessed_stream.next().await {
112                let mut update = filter_non_self(
113                    state.room_identity_state.process_change(item).await,
114                    &own_user_id,
115                );
116                if !update.is_empty() {
117                    update.sort();
118                    yield update;
119                }
120            }
121        }))
122    }
123}
124
125fn filter_for_initial_update(
126    mut input: Vec<IdentityStatusChange>,
127    own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129    // We are never interested in changes to our own identity, and also for initial
130    // updates, we are only interested in "bad" states where we need to
131    // notify the user, so we can remove Verified states (Pinned states are
132    // already missing, because Pinned is considered the default).
133    input.retain(|change| {
134        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135    });
136
137    input
138}
139
140fn filter_non_self(
141    mut input: Vec<IdentityStatusChange>,
142    own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144    // We are never interested in changes to our own identity
145    input.retain(|change| change.user_id != own_user_id);
146    input
147}
148
149fn combine_streams(
150    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
151    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
152) -> impl Stream<Item = RoomIdentityChange> {
153    stream_select!(identity_updates, room_member_events)
154}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157    Ok(client
158        .encryption()
159        .user_identities_stream()
160        .await?
161        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(
165    input: IdentityUpdates,
166) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
167    matrix_sdk_base::crypto::store::types::IdentityUpdates {
168        new: to_base_identities(input.new),
169        changed: to_base_identities(input.changed),
170        unchanged: Default::default(),
171    }
172}
173
174fn to_base_identities(
175    input: BTreeMap<OwnedUserId, UserIdentity>,
176) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
177    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
178}
179
180fn wrap_room_member_events(
181    room: &Room,
182) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
183    let own_user_id = room.own_user_id().to_owned();
184    let room_id = room.room_id();
185    let (sender, receiver) = mpsc::channel(16);
186    let handle =
187        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
188            if *event.state_key() == own_user_id {
189                return;
190            }
191            let _: Result<_, _> =
192                sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
193        });
194    let drop_guard = room.client.event_handler_drop_guard(handle);
195    (drop_guard, ReceiverStream::new(receiver))
196}
197
198#[cfg(all(test, not(target_family = "wasm")))]
199mod tests {
200    use std::time::Duration;
201
202    use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
203    use matrix_sdk_base::crypto::IdentityState;
204    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
205    use test_setup::TestSetup;
206
207    use crate::assert_next_with_timeout;
208
209    #[async_test]
210    async fn test_when_user_becomes_unpinned_we_report_it() {
211        // Given a room containing us and Bob
212        let t = TestSetup::new_room_with_other_bob().await;
213
214        // And Bob's identity is pinned
215        t.pin_bob().await;
216
217        // And we are listening for identity changes
218        let stream = t.subscribe_to_identity_status_changes().await;
219        pin_mut!(stream);
220
221        // When Bob becomes unpinned
222        t.unpin_bob().await;
223
224        // Then we were notified about it
225        let change = assert_next_with_timeout!(stream);
226        assert_eq!(change[0].user_id, t.bob_user_id());
227        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
228        assert_eq!(change.len(), 1);
229    }
230
231    #[async_test]
232    async fn test_when_user_becomes_verification_violation_we_report_it() {
233        // Given a room containing us and Bob
234        let t = TestSetup::new_room_with_other_bob().await;
235
236        // And Bob's identity is verified
237        t.verify_bob().await;
238
239        // And we are listening for identity changes
240        let stream = t.subscribe_to_identity_status_changes().await;
241        pin_mut!(stream);
242
243        // When Bob's identity changes
244        t.unpin_bob().await;
245
246        // Then we were notified about a verification violation
247        let change = assert_next_with_timeout!(stream);
248        assert_eq!(change[0].user_id, t.bob_user_id());
249        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
250        assert_eq!(change.len(), 1);
251    }
252
253    #[async_test]
254    async fn test_when_user_becomes_pinned_we_report_it() {
255        // Given a room containing us and Bob
256        let t = TestSetup::new_room_with_other_bob().await;
257
258        // And Bob's identity is unpinned
259        t.unpin_bob().await;
260
261        // And we are listening for identity changes
262        let stream = t.subscribe_to_identity_status_changes().await;
263        pin_mut!(stream);
264
265        // When Bob becomes pinned
266        t.pin_bob().await;
267
268        // Then we were notified about the initial state of the room
269        let change1 = assert_next_with_timeout!(stream);
270        assert_eq!(change1[0].user_id, t.bob_user_id());
271        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
272        assert_eq!(change1.len(), 1);
273
274        // And the change when Bob became pinned
275        let change2 = assert_next_with_timeout!(stream);
276        assert_eq!(change2[0].user_id, t.bob_user_id());
277        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
278        assert_eq!(change2.len(), 1);
279    }
280
281    #[async_test]
282    async fn test_when_user_becomes_verified_we_report_it() {
283        // Given a room containing us and Bob
284        let t = TestSetup::new_room_with_other_bob().await;
285
286        // And we are listening for identity changes
287        let stream = t.subscribe_to_identity_status_changes().await;
288        pin_mut!(stream);
289
290        // When Bob becomes verified
291        t.verify_bob().await;
292
293        // Then we are notified about Bob being verified
294        let change = assert_next_with_timeout!(stream);
295        assert_eq!(change[0].user_id, t.bob_user_id());
296        assert_eq!(change[0].changed_to, IdentityState::Verified);
297        assert_eq!(change.len(), 1);
298
299        // (And then unpinned, so we have something to come through the stream)
300        t.unpin_bob().await;
301
302        // Then we are notified about the unpinning part
303        let change = assert_next_with_timeout!(stream);
304        assert_eq!(change[0].user_id, t.bob_user_id());
305        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
306        assert_eq!(change.len(), 1);
307    }
308
309    #[async_test]
310    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
311        // Given a room containing us and Bob
312        let t = TestSetup::new_room_with_other_bob().await;
313
314        // And Bob's identity is unpinned
315        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
316
317        // And we are listening for identity changes
318        let stream = t.subscribe_to_identity_status_changes().await;
319        pin_mut!(stream);
320
321        // When Bob becomes verified
322        t.verify_bob().await;
323
324        // Then we were notified about the initial state of the room
325        let change1 = assert_next_with_timeout!(stream);
326        assert_eq!(change1[0].user_id, t.bob_user_id());
327        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
328        assert_eq!(change1.len(), 1);
329
330        // And the change when Bob became verified
331        let change2 = assert_next_with_timeout!(stream);
332        assert_eq!(change2[0].user_id, t.bob_user_id());
333        assert_eq!(change2[0].changed_to, IdentityState::Verified);
334        assert_eq!(change2.len(), 1);
335    }
336
337    #[async_test]
338    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
339        // Given a room containing us and Bob
340        let t = TestSetup::new_room_with_other_bob().await;
341
342        // And Bob is in verification violation
343        t.verify_bob_with(
344            IdentityChangeDataSet::key_query_with_identity_b(),
345            IdentityChangeDataSet::master_signing_keys_b(),
346            IdentityChangeDataSet::self_signing_keys_b(),
347        )
348        .await;
349        t.unpin_bob().await;
350
351        // And we are listening for identity changes
352        let stream = t.subscribe_to_identity_status_changes().await;
353        pin_mut!(stream);
354
355        // When Bob becomes verified
356        t.verify_bob().await;
357
358        // Then we were notified about the initial state of the room
359        let change1 = assert_next_with_timeout!(stream);
360        assert_eq!(change1[0].user_id, t.bob_user_id());
361        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
362        assert_eq!(change1.len(), 1);
363
364        // And the change when Bob became verified
365        let change2 = assert_next_with_timeout!(stream);
366        assert_eq!(change2[0].user_id, t.bob_user_id());
367        assert_eq!(change2[0].changed_to, IdentityState::Verified);
368        assert_eq!(change2.len(), 1);
369    }
370
371    #[async_test]
372    async fn test_when_an_unpinned_user_joins_we_report_it() {
373        // Given a room containing just us
374        let mut t = TestSetup::new_just_me_room().await;
375
376        // And Bob's identity is unpinned
377        t.unpin_bob().await;
378
379        // And we are listening for identity changes
380        let stream = t.subscribe_to_identity_status_changes().await;
381        pin_mut!(stream);
382
383        // When Bob joins the room
384        t.bob_joins().await;
385
386        // Then we were notified about it
387        let change = assert_next_with_timeout!(stream);
388        assert_eq!(change[0].user_id, t.bob_user_id());
389        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
390        assert_eq!(change.len(), 1);
391    }
392
393    #[async_test]
394    async fn test_when_an_verification_violating_user_joins_we_report_it() {
395        // Given a room containing just us
396        let mut t = TestSetup::new_just_me_room().await;
397
398        // And Bob's identity is in verification violation
399        t.verify_bob().await;
400        t.unpin_bob().await;
401
402        // And we are listening for identity changes
403        let stream = t.subscribe_to_identity_status_changes().await;
404        pin_mut!(stream);
405
406        // When Bob joins the room
407        t.bob_joins().await;
408
409        // Then we were notified about it
410        let change = assert_next_with_timeout!(stream);
411        assert_eq!(change[0].user_id, t.bob_user_id());
412        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
413        assert_eq!(change.len(), 1);
414    }
415
416    #[async_test]
417    async fn test_when_a_verified_user_joins_we_dont_report_it() {
418        // Given a room containing just us
419        let mut t = TestSetup::new_just_me_room().await;
420
421        // And Bob's identity is verified
422        t.verify_bob().await;
423
424        // And we are listening for identity changes
425        let stream = t.subscribe_to_identity_status_changes().await;
426        pin_mut!(stream);
427
428        // When Bob joins the room
429        t.bob_joins().await;
430
431        // (Then becomes unpinned so we have something to report)
432        t.unpin_bob().await;
433
434        //// Then we were only notified about the unpin
435        let change = assert_next_with_timeout!(stream);
436        assert_eq!(change[0].user_id, t.bob_user_id());
437        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
438        assert_eq!(change.len(), 1);
439    }
440
441    #[async_test]
442    async fn test_when_a_pinned_user_joins_we_do_not_report() {
443        // Given a room containing just us
444        let mut t = TestSetup::new_just_me_room().await;
445
446        // And Bob's identity is unpinned
447        t.pin_bob().await;
448
449        // And we are listening for identity changes
450        let stream = t.subscribe_to_identity_status_changes().await;
451        pin_mut!(stream);
452
453        // When Bob joins the room
454        t.bob_joins().await;
455
456        // Then there is no notification
457        tokio::time::sleep(Duration::from_millis(200)).await;
458        let change = stream.next().now_or_never();
459        assert!(change.is_none());
460    }
461
462    #[async_test]
463    async fn test_when_an_unpinned_user_leaves_we_report_it() {
464        // Given a room containing us and Bob
465        let mut t = TestSetup::new_room_with_other_bob().await;
466
467        // And Bob's identity is unpinned
468        t.unpin_bob().await;
469
470        // And we are listening for identity changes
471        let stream = t.subscribe_to_identity_status_changes().await;
472        pin_mut!(stream);
473
474        // When Bob leaves the room
475        t.bob_leaves().await;
476
477        // Then we were notified about the initial state of the room
478        let change1 = assert_next_with_timeout!(stream);
479        assert_eq!(change1[0].user_id, t.bob_user_id());
480        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
481        assert_eq!(change1.len(), 1);
482
483        // And we were notified about the change when the user left
484        let change2 = assert_next_with_timeout!(stream);
485        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
486        // "you no longer need to notify about this user".
487        assert_eq!(change2[0].user_id, t.bob_user_id());
488        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
489        assert_eq!(change2.len(), 1);
490    }
491
492    #[async_test]
493    async fn test_multiple_identity_changes_are_reported() {
494        // Given a room containing just us
495        let mut t = TestSetup::new_just_me_room().await;
496
497        // And Bob's identity is unpinned
498        t.unpin_bob().await;
499
500        // And we are listening for identity changes
501        let stream = t.subscribe_to_identity_status_changes().await;
502        pin_mut!(stream);
503
504        // NOTE: below we pull the changes out of the subscription after each action.
505        // This makes sure that the identity changes and membership changes are properly
506        // ordered. If we pull them out later, the identity changes get shifted forward
507        // because they rely on less-complex async stuff under the hood. Calling
508        // next_change ends up winding the async machinery sufficiently that the
509        // membership change and any subsequent events have fully completed.
510
511        // When Bob joins the room ...
512        t.bob_joins().await;
513        let change1 = assert_next_with_timeout!(stream);
514
515        // ... becomes pinned ...
516        t.pin_bob().await;
517        let change2 = assert_next_with_timeout!(stream);
518
519        // ... leaves and joins again (ignored since they stay pinned) ...
520        t.bob_leaves().await;
521        t.bob_joins().await;
522
523        // ... becomes unpinned ...
524        t.unpin_bob().await;
525        let change3 = assert_next_with_timeout!(stream);
526
527        // ... and leaves.
528        t.bob_leaves().await;
529        let change4 = assert_next_with_timeout!(stream);
530
531        assert_eq!(change1[0].user_id, t.bob_user_id());
532        assert_eq!(change2[0].user_id, t.bob_user_id());
533        assert_eq!(change3[0].user_id, t.bob_user_id());
534        assert_eq!(change4[0].user_id, t.bob_user_id());
535
536        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
537        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
538        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
539        assert_eq!(change4[0].changed_to, IdentityState::Pinned);
540
541        assert_eq!(change1.len(), 1);
542        assert_eq!(change2.len(), 1);
543        assert_eq!(change3.len(), 1);
544        assert_eq!(change4.len(), 1);
545    }
546
547    #[async_test]
548    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
549        // Given a room containing Bob, who is unpinned
550        let t = TestSetup::new_room_with_other_bob().await;
551        t.unpin_bob().await;
552
553        // When we start listening for identity changes
554        let stream = t.subscribe_to_identity_status_changes().await;
555        pin_mut!(stream);
556
557        // Then we were immediately notified about Bob being unpinned
558        let change = assert_next_with_timeout!(stream);
559        assert_eq!(change[0].user_id, t.bob_user_id());
560        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
561        assert_eq!(change.len(), 1);
562    }
563
564    #[async_test]
565    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
566        // Given a room containing Bob, who is unpinned
567        let t = TestSetup::new_room_with_other_bob().await;
568        t.verify_bob().await;
569
570        // When we start listening for identity changes
571        let stream = t.subscribe_to_identity_status_changes().await;
572        pin_mut!(stream);
573
574        // (And we unpin so that something is available in the changes stream)
575        t.unpin_bob().await;
576
577        // Then we were only notified about the unpin, not being verified
578        let next_change = assert_next_with_timeout!(stream);
579
580        assert_eq!(next_change[0].user_id, t.bob_user_id());
581        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
582        assert_eq!(next_change.len(), 1);
583    }
584
585    // TODO: I (andyb) haven't figured out how to test room membership changes that
586    // affect our own user (they should not be shown). Specifically, I haven't
587    // figure out how to get out own user into a non-pinned state.
588
589    mod test_setup {
590        use std::time::{SystemTime, UNIX_EPOCH};
591
592        use futures_core::Stream;
593        use matrix_sdk_base::{
594            crypto::{
595                testing::simulate_key_query_response_for_verification, IdentityStatusChange,
596                OtherUserIdentity,
597            },
598            RoomState,
599        };
600        use matrix_sdk_test::{
601            test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
602            StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
603        };
604        use ruma::{
605            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
606            events::room::member::MembershipState,
607            owned_user_id, OwnedUserId, TransactionId, UserId,
608        };
609        use serde_json::json;
610        use wiremock::{
611            matchers::{header, method, path_regex},
612            Mock, MockServer, ResponseTemplate,
613        };
614
615        use crate::{
616            encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
617        };
618
619        /// Sets up a client and a room and allows changing user identities and
620        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
621        /// are talking about the OTHER user, not our own user. Only
622        /// methods starting with `self_` are talking about this user.
623        ///
624        /// This user is called `@example:localhost` but is rarely used
625        /// mentioned.
626        ///
627        /// The other user is called `@bob:localhost`.
628        pub(super) struct TestSetup {
629            client: Client,
630            bob_user_id: OwnedUserId,
631            sync_response_builder: SyncResponseBuilder,
632            room: Room,
633        }
634
635        impl TestSetup {
636            pub(super) async fn new_just_me_room() -> Self {
637                let (client, user_id, mut sync_response_builder) = Self::init().await;
638                let room = create_just_me_room(&client, &mut sync_response_builder).await;
639                Self { client, bob_user_id: user_id, sync_response_builder, room }
640            }
641
642            pub(super) async fn new_room_with_other_bob() -> Self {
643                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
644                let room = create_room_with_other_member(
645                    &mut sync_response_builder,
646                    &client,
647                    &bob_user_id,
648                )
649                .await;
650                Self { client, bob_user_id, sync_response_builder, room }
651            }
652
653            pub(super) fn bob_user_id(&self) -> &UserId {
654                &self.bob_user_id
655            }
656
657            pub(super) async fn pin_bob(&self) {
658                if self.bob_user_identity().await.is_some() {
659                    assert!(
660                        !self.bob_is_pinned().await,
661                        "pin_bob() called when the identity is already pinned!"
662                    );
663
664                    // Pin it
665                    self.bob_user_identity()
666                        .await
667                        .expect("User should exist")
668                        .pin()
669                        .await
670                        .expect("Should not fail to pin");
671                } else {
672                    // There was no existing identity. Set one. It will be pinned by default.
673                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
674                        .await;
675                }
676
677                // Sanity check: they are pinned
678                assert!(self.bob_is_pinned().await);
679            }
680
681            pub(super) async fn unpin_bob(&self) {
682                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
683            }
684
685            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
686                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
687                    serde_json::to_string(
688                        key_query_response
689                            .master_keys
690                            .first_key_value()
691                            .expect("Master key should have a value")
692                            .1,
693                    )
694                    .expect("Should be able to serialise master key")
695                }
696
697                let a = IdentityChangeDataSet::key_query_with_identity_a();
698                let b = IdentityChangeDataSet::key_query_with_identity_b();
699                let requested_master_key = master_key_json(&requested);
700                let a_master_key = master_key_json(&a);
701
702                // Change/set their identity pin it, then change it again - this will definitely
703                // unpin, even if the first identity we supply is their very first, making them
704                // initially pinned.
705                if requested_master_key == a_master_key {
706                    self.change_bob_identity(b).await;
707                    if !self.bob_is_pinned().await {
708                        self.pin_bob().await;
709                    }
710                    self.change_bob_identity(a).await;
711                } else {
712                    self.change_bob_identity(a).await;
713                    if !self.bob_is_pinned().await {
714                        self.pin_bob().await;
715                    }
716                    self.change_bob_identity(b).await;
717                }
718
719                // Sanity: they are unpinned
720                assert!(!self.bob_is_pinned().await);
721            }
722
723            pub(super) async fn verify_bob(&self) {
724                self.verify_bob_with(
725                    IdentityChangeDataSet::key_query_with_identity_a(),
726                    IdentityChangeDataSet::master_signing_keys_a(),
727                    IdentityChangeDataSet::self_signing_keys_a(),
728                )
729                .await;
730            }
731
732            pub(super) async fn verify_bob_with(
733                &self,
734                key_query: KeyQueryResponse,
735                master_signing_key: serde_json::Value,
736                self_signing_key: serde_json::Value,
737            ) {
738                // Make sure the requested identity is set
739                self.change_bob_identity(key_query).await;
740
741                let my_user_id = self.client.user_id().expect("I should have a user id");
742                let my_identity = self
743                    .client
744                    .encryption()
745                    .get_user_identity(my_user_id)
746                    .await
747                    .expect("Should not fail to get own user identity")
748                    .expect("Should have an own user identity")
749                    .underlying_identity()
750                    .own()
751                    .expect("Our own identity should be of type Own");
752
753                // Get the request
754                let signature_upload_request = self
755                    .bob_crypto_other_identity()
756                    .await
757                    .verify()
758                    .await
759                    .expect("Should be able to verify other identity");
760
761                let verification_response = simulate_key_query_response_for_verification(
762                    signature_upload_request,
763                    my_identity,
764                    my_user_id,
765                    self.bob_user_id(),
766                    master_signing_key,
767                    self_signing_key,
768                );
769
770                // Receive the response into our client
771                self.client
772                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
773                    .await
774                    .unwrap();
775
776                // Sanity: they are verified
777                assert!(self.bob_is_verified().await);
778            }
779
780            pub(super) async fn bob_joins(&mut self) {
781                self.bob_membership_change(MembershipState::Join).await;
782            }
783
784            pub(super) async fn bob_leaves(&mut self) {
785                self.bob_membership_change(MembershipState::Leave).await;
786            }
787
788            pub(super) async fn subscribe_to_identity_status_changes(
789                &self,
790            ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
791                self.room
792                    .subscribe_to_identity_status_changes()
793                    .await
794                    .expect("Should be able to subscribe")
795            }
796
797            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
798                let (client, _server) = create_client_and_server().await;
799
800                // Ensure our user has cross-signing keys etc.
801                client
802                    .olm_machine()
803                    .await
804                    .as_ref()
805                    .expect("We should have an Olm machine")
806                    .bootstrap_cross_signing(true)
807                    .await
808                    .expect("Should be able to bootstrap cross-signing");
809
810                // Note: if you change the user_id, you will need to change lots of hard-coded
811                // stuff inside IdentityChangeDataSet
812                let bob_user_id = owned_user_id!("@bob:localhost");
813
814                let sync_response_builder = SyncResponseBuilder::default();
815
816                (client, bob_user_id, sync_response_builder)
817            }
818
819            async fn change_bob_identity(
820                &self,
821                key_query_response: get_keys::v3::Response,
822            ) -> OtherUserIdentity {
823                self.client
824                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
825                    .await
826                    .expect("Should not fail to send identity changes");
827
828                self.bob_crypto_other_identity().await
829            }
830
831            async fn bob_membership_change(&mut self, new_state: MembershipState) {
832                let sync_response = self
833                    .sync_response_builder
834                    .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
835                        StateTestEvent::Custom(sync_response_member(
836                            &self.bob_user_id,
837                            new_state.clone(),
838                        )),
839                    ))
840                    .build_sync_response();
841                self.room.client.process_sync(sync_response).await.unwrap();
842
843                // Make sure the membership stuck as expected
844                let m = self
845                    .room
846                    .get_member_no_sync(&self.bob_user_id)
847                    .await
848                    .expect("Should not fail to get member");
849
850                match (&new_state, m) {
851                    (MembershipState::Leave, None) => {}
852                    (_, None) => {
853                        panic!("Member should exist")
854                    }
855                    (_, Some(m)) => {
856                        assert_eq!(*m.membership(), new_state);
857                    }
858                };
859            }
860
861            async fn bob_is_pinned(&self) -> bool {
862                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
863            }
864
865            async fn bob_is_verified(&self) -> bool {
866                self.bob_crypto_other_identity().await.is_verified()
867            }
868
869            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
870                self.bob_user_identity()
871                    .await
872                    .expect("User identity should exist")
873                    .underlying_identity()
874                    .other()
875                    .expect("Identity should be Other, not Own")
876            }
877
878            async fn bob_user_identity(&self) -> Option<UserIdentity> {
879                self.client
880                    .encryption()
881                    .get_user_identity(&self.bob_user_id)
882                    .await
883                    .expect("Should not fail to get user identity")
884            }
885        }
886
887        async fn create_just_me_room(
888            client: &Client,
889            sync_response_builder: &mut SyncResponseBuilder,
890        ) -> Room {
891            let create_room_sync_response = sync_response_builder
892                .add_joined_room(
893                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
894                        .add_state_event(StateTestEvent::Member),
895                )
896                .build_sync_response();
897            client.process_sync(create_room_sync_response).await.unwrap();
898            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
899            assert_eq!(room.state(), RoomState::Joined);
900            room
901        }
902
903        async fn create_room_with_other_member(
904            builder: &mut SyncResponseBuilder,
905            client: &Client,
906            other_user_id: &UserId,
907        ) -> Room {
908            let create_room_sync_response = builder
909                .add_joined_room(
910                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
911                        .add_state_event(StateTestEvent::Member)
912                        .add_state_event(StateTestEvent::Custom(sync_response_member(
913                            other_user_id,
914                            MembershipState::Join,
915                        ))),
916                )
917                .build_sync_response();
918            client.process_sync(create_room_sync_response).await.unwrap();
919            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
920            room.inner.mark_members_synced();
921
922            assert_eq!(room.state(), RoomState::Joined);
923            assert_eq!(
924                *room
925                    .get_member_no_sync(other_user_id)
926                    .await
927                    .expect("Should not fail to get member")
928                    .expect("Member should exist")
929                    .membership(),
930                MembershipState::Join
931            );
932            room
933        }
934
935        async fn create_client_and_server() -> (Client, MockServer) {
936            let server = MockServer::start().await;
937            mock_members_request(&server).await;
938            mock_secret_storage_default_key(&server).await;
939            let client = logged_in_client(Some(server.uri())).await;
940            (client, server)
941        }
942
943        async fn mock_members_request(server: &MockServer) {
944            Mock::given(method("GET"))
945                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
946                .and(header("authorization", "Bearer 1234"))
947                .respond_with(
948                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
949                )
950                .mount(server)
951                .await;
952        }
953
954        async fn mock_secret_storage_default_key(server: &MockServer) {
955            Mock::given(method("GET"))
956                .and(path_regex(
957                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
958                ))
959                .and(header("authorization", "Bearer 1234"))
960                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
961                .mount(server)
962                .await;
963        }
964
965        fn sync_response_member(
966            user_id: &UserId,
967            membership: MembershipState,
968        ) -> serde_json::Value {
969            json!({
970                "content": {
971                    "membership": membership.to_string(),
972                },
973                "event_id": format!(
974                    "$aa{}bb:localhost",
975                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
976                ),
977                "origin_server_ts": 1472735824,
978                "sender": "@example:localhost",
979                "state_key": user_id,
980                "type": "m.room.member",
981                "unsigned": {
982                    "age": 1234
983                }
984            })
985        }
986    }
987}