matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    Client, Error, Result,
33    encryption::identities::{IdentityUpdates, UserIdentity},
34    event_handler::EventHandlerDropGuard,
35};
36
/// Support for creating a stream of batches of [`IdentityStatusChange`].
///
/// Internally, this subscribes to all identity changes, and to room events that
/// change the membership, and provides a stream of all changes to the identity
/// status of all room members.
///
/// This struct does not represent the actual stream, but the state that is used
/// to produce the values of the stream. The stream returned by
/// [`IdentityStatusChanges::create_stream`] takes ownership of this state, so
/// the state lives exactly as long as the stream does.
///
/// It does provide a method to create the stream:
/// [`IdentityStatusChanges::create_stream`].
#[derive(Debug)]
pub struct IdentityStatusChanges {
    /// Who is in the room and who is in identity violation at this moment
    room_identity_state: RoomIdentityState<Room>,

    /// Dropped when we are dropped, and unregisters the event handler we
    /// registered to listen for room events
    _drop_guard: EventHandlerDropGuard,
}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            // Force enclosing stream to take ownership of state, so that
104            // _drop_guard does not get dropped.
105            let mut state = state;
106
107            let mut current_state =
108                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
109
110            if !current_state.is_empty() {
111                current_state.sort();
112                yield current_state;
113            }
114
115            while let Some(item) = unprocessed_stream.next().await {
116                let mut update = filter_non_self(
117                    state.room_identity_state.process_change(item).await,
118                    &own_user_id,
119                );
120                if !update.is_empty() {
121                    update.sort();
122                    yield update;
123                }
124            }
125        }))
126    }
127}
128
129fn filter_for_initial_update(
130    mut input: Vec<IdentityStatusChange>,
131    own_user_id: &UserId,
132) -> Vec<IdentityStatusChange> {
133    // We are never interested in changes to our own identity, and also for initial
134    // updates, we are only interested in "bad" states where we need to
135    // notify the user, so we can remove Verified states (Pinned states are
136    // already missing, because Pinned is considered the default).
137    input.retain(|change| {
138        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
139    });
140
141    input
142}
143
144fn filter_non_self(
145    mut input: Vec<IdentityStatusChange>,
146    own_user_id: &UserId,
147) -> Vec<IdentityStatusChange> {
148    // We are never interested in changes to our own identity
149    input.retain(|change| change.user_id != own_user_id);
150    input
151}
152
/// Merge the stream of identity updates and the stream of room membership
/// events into a single stream of [`RoomIdentityChange`]s.
///
/// The merging itself is delegated to `stream_select!`; both inputs must be
/// `Unpin`.
fn combine_streams(
    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
    stream_select!(identity_updates, room_member_events)
}
159
160async fn wrap_identity_updates(
161    client: &Client,
162) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
163    Ok(client
164        .encryption()
165        .user_identities_stream()
166        .await?
167        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
168}
169
170fn to_base_updates(
171    input: IdentityUpdates,
172) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
173    matrix_sdk_base::crypto::store::types::IdentityUpdates {
174        new: to_base_identities(input.new),
175        changed: to_base_identities(input.changed),
176        unchanged: Default::default(),
177    }
178}
179
180fn to_base_identities(
181    input: BTreeMap<OwnedUserId, UserIdentity>,
182) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
183    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
184}
185
186fn wrap_room_member_events(
187    room: &Room,
188) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
189    let own_user_id = room.own_user_id().to_owned();
190    let room_id = room.room_id();
191    let (sender, receiver) = mpsc::channel(16);
192    let handle =
193        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
194            if *event.state_key() == own_user_id {
195                return;
196            }
197            let _: Result<_, _> =
198                sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
199        });
200    let drop_guard = room.client.event_handler_drop_guard(handle);
201    (drop_guard, ReceiverStream::new(receiver))
202}
203
204#[cfg(all(test, not(target_family = "wasm")))]
205mod tests {
206    use std::time::Duration;
207
208    use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
209    use matrix_sdk_base::crypto::IdentityState;
210    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
211    use test_setup::TestSetup;
212
213    use crate::assert_next_with_timeout;
214
215    #[async_test]
216    async fn test_when_user_becomes_unpinned_we_report_it() {
217        // Given a room containing us and Bob
218        let t = TestSetup::new_room_with_other_bob().await;
219
220        // And Bob's identity is pinned
221        t.pin_bob().await;
222
223        // And we are listening for identity changes
224        let stream = t.subscribe_to_identity_status_changes().await;
225        pin_mut!(stream);
226
227        // When Bob becomes unpinned
228        t.unpin_bob().await;
229
230        // Then we were notified about it
231        let change = assert_next_with_timeout!(stream);
232        assert_eq!(change[0].user_id, t.bob_user_id());
233        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
234        assert_eq!(change.len(), 1);
235    }
236
237    #[async_test]
238    async fn test_when_user_becomes_verification_violation_we_report_it() {
239        // Given a room containing us and Bob
240        let t = TestSetup::new_room_with_other_bob().await;
241
242        // And Bob's identity is verified
243        t.verify_bob().await;
244
245        // And we are listening for identity changes
246        let stream = t.subscribe_to_identity_status_changes().await;
247        pin_mut!(stream);
248
249        // When Bob's identity changes
250        t.unpin_bob().await;
251
252        // Then we were notified about a verification violation
253        let change = assert_next_with_timeout!(stream);
254        assert_eq!(change[0].user_id, t.bob_user_id());
255        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
256        assert_eq!(change.len(), 1);
257    }
258
259    #[async_test]
260    async fn test_when_user_becomes_pinned_we_report_it() {
261        // Given a room containing us and Bob
262        let t = TestSetup::new_room_with_other_bob().await;
263
264        // And Bob's identity is unpinned
265        t.unpin_bob().await;
266
267        // And we are listening for identity changes
268        let stream = t.subscribe_to_identity_status_changes().await;
269        pin_mut!(stream);
270
271        // When Bob becomes pinned
272        t.pin_bob().await;
273
274        // Then we were notified about the initial state of the room
275        let change1 = assert_next_with_timeout!(stream);
276        assert_eq!(change1[0].user_id, t.bob_user_id());
277        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
278        assert_eq!(change1.len(), 1);
279
280        // And the change when Bob became pinned
281        let change2 = assert_next_with_timeout!(stream);
282        assert_eq!(change2[0].user_id, t.bob_user_id());
283        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
284        assert_eq!(change2.len(), 1);
285    }
286
287    #[async_test]
288    async fn test_when_user_becomes_verified_we_report_it() {
289        // Given a room containing us and Bob
290        let t = TestSetup::new_room_with_other_bob().await;
291
292        // And we are listening for identity changes
293        let stream = t.subscribe_to_identity_status_changes().await;
294        pin_mut!(stream);
295
296        // When Bob becomes verified
297        t.verify_bob().await;
298
299        // Then we are notified about Bob being verified
300        let change = assert_next_with_timeout!(stream);
301        assert_eq!(change[0].user_id, t.bob_user_id());
302        assert_eq!(change[0].changed_to, IdentityState::Verified);
303        assert_eq!(change.len(), 1);
304
305        // (And then unpinned, so we have something to come through the stream)
306        t.unpin_bob().await;
307
308        // Then we are notified about the unpinning part
309        let change = assert_next_with_timeout!(stream);
310        assert_eq!(change[0].user_id, t.bob_user_id());
311        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
312        assert_eq!(change.len(), 1);
313    }
314
315    #[async_test]
316    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
317        // Given a room containing us and Bob
318        let t = TestSetup::new_room_with_other_bob().await;
319
320        // And Bob's identity is unpinned
321        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
322
323        // And we are listening for identity changes
324        let stream = t.subscribe_to_identity_status_changes().await;
325        pin_mut!(stream);
326
327        // When Bob becomes verified
328        t.verify_bob().await;
329
330        // Then we were notified about the initial state of the room
331        let change1 = assert_next_with_timeout!(stream);
332        assert_eq!(change1[0].user_id, t.bob_user_id());
333        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
334        assert_eq!(change1.len(), 1);
335
336        // And the change when Bob became verified
337        let change2 = assert_next_with_timeout!(stream);
338        assert_eq!(change2[0].user_id, t.bob_user_id());
339        assert_eq!(change2[0].changed_to, IdentityState::Verified);
340        assert_eq!(change2.len(), 1);
341    }
342
343    #[async_test]
344    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
345        // Given a room containing us and Bob
346        let t = TestSetup::new_room_with_other_bob().await;
347
348        // And Bob is in verification violation
349        t.verify_bob_with(
350            IdentityChangeDataSet::key_query_with_identity_b(),
351            IdentityChangeDataSet::master_signing_keys_b(),
352            IdentityChangeDataSet::self_signing_keys_b(),
353        )
354        .await;
355        t.unpin_bob().await;
356
357        // And we are listening for identity changes
358        let stream = t.subscribe_to_identity_status_changes().await;
359        pin_mut!(stream);
360
361        // When Bob becomes verified
362        t.verify_bob().await;
363
364        // Then we were notified about the initial state of the room
365        let change1 = assert_next_with_timeout!(stream);
366        assert_eq!(change1[0].user_id, t.bob_user_id());
367        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
368        assert_eq!(change1.len(), 1);
369
370        // And the change when Bob became verified
371        let change2 = assert_next_with_timeout!(stream);
372        assert_eq!(change2[0].user_id, t.bob_user_id());
373        assert_eq!(change2[0].changed_to, IdentityState::Verified);
374        assert_eq!(change2.len(), 1);
375    }
376
377    #[async_test]
378    async fn test_when_an_unpinned_user_joins_we_report_it() {
379        // Given a room containing just us
380        let mut t = TestSetup::new_just_me_room().await;
381
382        // And Bob's identity is unpinned
383        t.unpin_bob().await;
384
385        // And we are listening for identity changes
386        let stream = t.subscribe_to_identity_status_changes().await;
387        pin_mut!(stream);
388
389        // When Bob joins the room
390        t.bob_joins().await;
391
392        // Then we were notified about it
393        let change = assert_next_with_timeout!(stream);
394        assert_eq!(change[0].user_id, t.bob_user_id());
395        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
396        assert_eq!(change.len(), 1);
397    }
398
399    #[async_test]
400    async fn test_when_an_verification_violating_user_joins_we_report_it() {
401        // Given a room containing just us
402        let mut t = TestSetup::new_just_me_room().await;
403
404        // And Bob's identity is in verification violation
405        t.verify_bob().await;
406        t.unpin_bob().await;
407
408        // And we are listening for identity changes
409        let stream = t.subscribe_to_identity_status_changes().await;
410        pin_mut!(stream);
411
412        // When Bob joins the room
413        t.bob_joins().await;
414
415        // Then we were notified about it
416        let change = assert_next_with_timeout!(stream);
417        assert_eq!(change[0].user_id, t.bob_user_id());
418        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
419        assert_eq!(change.len(), 1);
420    }
421
422    #[async_test]
423    async fn test_when_a_verified_user_joins_we_dont_report_it() {
424        // Given a room containing just us
425        let mut t = TestSetup::new_just_me_room().await;
426
427        // And Bob's identity is verified
428        t.verify_bob().await;
429
430        // And we are listening for identity changes
431        let stream = t.subscribe_to_identity_status_changes().await;
432        pin_mut!(stream);
433
434        // When Bob joins the room
435        t.bob_joins().await;
436
437        // (Then becomes unpinned so we have something to report)
438        t.unpin_bob().await;
439
440        //// Then we were only notified about the unpin
441        let change = assert_next_with_timeout!(stream);
442        assert_eq!(change[0].user_id, t.bob_user_id());
443        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
444        assert_eq!(change.len(), 1);
445    }
446
447    #[async_test]
448    async fn test_when_a_pinned_user_joins_we_do_not_report() {
449        // Given a room containing just us
450        let mut t = TestSetup::new_just_me_room().await;
451
452        // And Bob's identity is unpinned
453        t.pin_bob().await;
454
455        // And we are listening for identity changes
456        let stream = t.subscribe_to_identity_status_changes().await;
457        pin_mut!(stream);
458
459        // When Bob joins the room
460        t.bob_joins().await;
461
462        // Then there is no notification
463        tokio::time::sleep(Duration::from_millis(200)).await;
464        let change = stream.next().now_or_never();
465        assert!(change.is_none());
466    }
467
468    #[async_test]
469    async fn test_when_an_unpinned_user_leaves_we_report_it() {
470        // Given a room containing us and Bob
471        let mut t = TestSetup::new_room_with_other_bob().await;
472
473        // And Bob's identity is unpinned
474        t.unpin_bob().await;
475
476        // And we are listening for identity changes
477        let stream = t.subscribe_to_identity_status_changes().await;
478        pin_mut!(stream);
479
480        // When Bob leaves the room
481        t.bob_leaves().await;
482
483        // Then we were notified about the initial state of the room
484        let change1 = assert_next_with_timeout!(stream);
485        assert_eq!(change1[0].user_id, t.bob_user_id());
486        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
487        assert_eq!(change1.len(), 1);
488
489        // And we were notified about the change when the user left
490        let change2 = assert_next_with_timeout!(stream);
491        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
492        // "you no longer need to notify about this user".
493        assert_eq!(change2[0].user_id, t.bob_user_id());
494        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
495        assert_eq!(change2.len(), 1);
496    }
497
498    #[async_test]
499    async fn test_multiple_identity_changes_are_reported() {
500        // Given a room containing just us
501        let mut t = TestSetup::new_just_me_room().await;
502
503        // And Bob's identity is unpinned
504        t.unpin_bob().await;
505
506        // And we are listening for identity changes
507        let stream = t.subscribe_to_identity_status_changes().await;
508        pin_mut!(stream);
509
510        // NOTE: below we pull the changes out of the subscription after each action.
511        // This makes sure that the identity changes and membership changes are properly
512        // ordered. If we pull them out later, the identity changes get shifted forward
513        // because they rely on less-complex async stuff under the hood. Calling
514        // next_change ends up winding the async machinery sufficiently that the
515        // membership change and any subsequent events have fully completed.
516
517        // When Bob joins the room ...
518        t.bob_joins().await;
519        let change1 = assert_next_with_timeout!(stream);
520
521        // ... becomes pinned ...
522        t.pin_bob().await;
523        let change2 = assert_next_with_timeout!(stream);
524
525        // ... leaves and joins again (ignored since they stay pinned) ...
526        t.bob_leaves().await;
527        t.bob_joins().await;
528
529        // ... becomes unpinned ...
530        t.unpin_bob().await;
531        let change3 = assert_next_with_timeout!(stream);
532
533        // ... and leaves.
534        t.bob_leaves().await;
535        let change4 = assert_next_with_timeout!(stream);
536
537        assert_eq!(change1[0].user_id, t.bob_user_id());
538        assert_eq!(change2[0].user_id, t.bob_user_id());
539        assert_eq!(change3[0].user_id, t.bob_user_id());
540        assert_eq!(change4[0].user_id, t.bob_user_id());
541
542        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
543        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
544        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
545        assert_eq!(change4[0].changed_to, IdentityState::Pinned);
546
547        assert_eq!(change1.len(), 1);
548        assert_eq!(change2.len(), 1);
549        assert_eq!(change3.len(), 1);
550        assert_eq!(change4.len(), 1);
551    }
552
553    #[async_test]
554    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
555        // Given a room containing Bob, who is unpinned
556        let t = TestSetup::new_room_with_other_bob().await;
557        t.unpin_bob().await;
558
559        // When we start listening for identity changes
560        let stream = t.subscribe_to_identity_status_changes().await;
561        pin_mut!(stream);
562
563        // Then we were immediately notified about Bob being unpinned
564        let change = assert_next_with_timeout!(stream);
565        assert_eq!(change[0].user_id, t.bob_user_id());
566        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
567        assert_eq!(change.len(), 1);
568    }
569
570    #[async_test]
571    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
572        // Given a room containing Bob, who is unpinned
573        let t = TestSetup::new_room_with_other_bob().await;
574        t.verify_bob().await;
575
576        // When we start listening for identity changes
577        let stream = t.subscribe_to_identity_status_changes().await;
578        pin_mut!(stream);
579
580        // (And we unpin so that something is available in the changes stream)
581        t.unpin_bob().await;
582
583        // Then we were only notified about the unpin, not being verified
584        let next_change = assert_next_with_timeout!(stream);
585
586        assert_eq!(next_change[0].user_id, t.bob_user_id());
587        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
588        assert_eq!(next_change.len(), 1);
589    }
590
    // TODO: I (andyb) haven't figured out how to test room membership changes that
    // affect our own user (they should not be shown). Specifically, I haven't
    // figured out how to get our own user into a non-pinned state.
594
595    mod test_setup {
596        use std::time::{SystemTime, UNIX_EPOCH};
597
598        use futures_core::Stream;
599        use matrix_sdk_base::{
600            RoomState,
601            crypto::{
602                IdentityStatusChange, OtherUserIdentity,
603                testing::simulate_key_query_response_for_verification,
604            },
605        };
606        use matrix_sdk_test::{
607            DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
608            test_json, test_json::keys_query_sets::IdentityChangeDataSet,
609        };
610        use ruma::{
611            OwnedUserId, TransactionId, UserId,
612            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
613            events::room::member::MembershipState,
614            owned_user_id,
615        };
616        use serde_json::json;
617        use wiremock::{
618            Mock, MockServer, ResponseTemplate,
619            matchers::{header, method, path_regex},
620        };
621
622        use crate::{
623            Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
624        };
625
        /// Sets up a client and a room and allows changing user identities and
        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
        /// are talking about the OTHER user, not our own user. Only
        /// methods starting with `self_` are talking about this user.
        ///
        /// This user is called `@example:localhost` but is rarely
        /// mentioned.
        ///
        /// The other user is called `@bob:localhost`.
        pub(super) struct TestSetup {
            client: Client,
            bob_user_id: OwnedUserId,
            sync_response_builder: SyncResponseBuilder,
            room: Room,
        }
641
642        impl TestSetup {
643            pub(super) async fn new_just_me_room() -> Self {
644                let (client, user_id, mut sync_response_builder) = Self::init().await;
645                let room = create_just_me_room(&client, &mut sync_response_builder).await;
646                Self { client, bob_user_id: user_id, sync_response_builder, room }
647            }
648
649            pub(super) async fn new_room_with_other_bob() -> Self {
650                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
651                let room = create_room_with_other_member(
652                    &mut sync_response_builder,
653                    &client,
654                    &bob_user_id,
655                )
656                .await;
657                Self { client, bob_user_id, sync_response_builder, room }
658            }
659
            /// The user ID of Bob, the OTHER user (not our own user).
            pub(super) fn bob_user_id(&self) -> &UserId {
                &self.bob_user_id
            }
663
664            pub(super) async fn pin_bob(&self) {
665                if self.bob_user_identity().await.is_some() {
666                    assert!(
667                        !self.bob_is_pinned().await,
668                        "pin_bob() called when the identity is already pinned!"
669                    );
670
671                    // Pin it
672                    self.bob_user_identity()
673                        .await
674                        .expect("User should exist")
675                        .pin()
676                        .await
677                        .expect("Should not fail to pin");
678                } else {
679                    // There was no existing identity. Set one. It will be pinned by default.
680                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
681                        .await;
682                }
683
684                // Sanity check: they are pinned
685                assert!(self.bob_is_pinned().await);
686            }
687
            /// Put Bob into an unpinned state by calling
            /// [`TestSetup::unpin_bob_with`] with identity B of
            /// `IdentityChangeDataSet`.
            pub(super) async fn unpin_bob(&self) {
                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
            }
691
692            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
693                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
694                    serde_json::to_string(
695                        key_query_response
696                            .master_keys
697                            .first_key_value()
698                            .expect("Master key should have a value")
699                            .1,
700                    )
701                    .expect("Should be able to serialise master key")
702                }
703
704                let a = IdentityChangeDataSet::key_query_with_identity_a();
705                let b = IdentityChangeDataSet::key_query_with_identity_b();
706                let requested_master_key = master_key_json(&requested);
707                let a_master_key = master_key_json(&a);
708
709                // Change/set their identity pin it, then change it again - this will definitely
710                // unpin, even if the first identity we supply is their very first, making them
711                // initially pinned.
712                if requested_master_key == a_master_key {
713                    self.change_bob_identity(b).await;
714                    if !self.bob_is_pinned().await {
715                        self.pin_bob().await;
716                    }
717                    self.change_bob_identity(a).await;
718                } else {
719                    self.change_bob_identity(a).await;
720                    if !self.bob_is_pinned().await {
721                        self.pin_bob().await;
722                    }
723                    self.change_bob_identity(b).await;
724                }
725
726                // Sanity: they are unpinned
727                assert!(!self.bob_is_pinned().await);
728            }
729
            /// Verify Bob using identity A of `IdentityChangeDataSet`, together
            /// with the matching master and self-signing keys. See
            /// [`TestSetup::verify_bob_with`].
            pub(super) async fn verify_bob(&self) {
                self.verify_bob_with(
                    IdentityChangeDataSet::key_query_with_identity_a(),
                    IdentityChangeDataSet::master_signing_keys_a(),
                    IdentityChangeDataSet::self_signing_keys_a(),
                )
                .await;
            }
738
739            pub(super) async fn verify_bob_with(
740                &self,
741                key_query: KeyQueryResponse,
742                master_signing_key: serde_json::Value,
743                self_signing_key: serde_json::Value,
744            ) {
745                // Make sure the requested identity is set
746                self.change_bob_identity(key_query).await;
747
748                let my_user_id = self.client.user_id().expect("I should have a user id");
749                let my_identity = self
750                    .client
751                    .encryption()
752                    .get_user_identity(my_user_id)
753                    .await
754                    .expect("Should not fail to get own user identity")
755                    .expect("Should have an own user identity")
756                    .underlying_identity()
757                    .own()
758                    .expect("Our own identity should be of type Own");
759
760                // Get the request
761                let signature_upload_request = self
762                    .bob_crypto_other_identity()
763                    .await
764                    .verify()
765                    .await
766                    .expect("Should be able to verify other identity");
767
768                let verification_response = simulate_key_query_response_for_verification(
769                    signature_upload_request,
770                    my_identity,
771                    my_user_id,
772                    self.bob_user_id(),
773                    master_signing_key,
774                    self_signing_key,
775                );
776
777                // Receive the response into our client
778                self.client
779                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
780                    .await
781                    .unwrap();
782
783                // Sanity: they are verified
784                assert!(self.bob_is_verified().await);
785            }
786
787            pub(super) async fn bob_joins(&mut self) {
788                self.bob_membership_change(MembershipState::Join).await;
789            }
790
791            pub(super) async fn bob_leaves(&mut self) {
792                self.bob_membership_change(MembershipState::Leave).await;
793            }
794
795            pub(super) async fn subscribe_to_identity_status_changes(
796                &self,
797            ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
798                self.room
799                    .subscribe_to_identity_status_changes()
800                    .await
801                    .expect("Should be able to subscribe")
802            }
803
804            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
805                let (client, _server) = create_client_and_server().await;
806
807                // Ensure our user has cross-signing keys etc.
808                client
809                    .olm_machine()
810                    .await
811                    .as_ref()
812                    .expect("We should have an Olm machine")
813                    .bootstrap_cross_signing(true)
814                    .await
815                    .expect("Should be able to bootstrap cross-signing");
816
817                // Note: if you change the user_id, you will need to change lots of hard-coded
818                // stuff inside IdentityChangeDataSet
819                let bob_user_id = owned_user_id!("@bob:localhost");
820
821                let sync_response_builder = SyncResponseBuilder::default();
822
823                (client, bob_user_id, sync_response_builder)
824            }
825
826            async fn change_bob_identity(
827                &self,
828                key_query_response: get_keys::v3::Response,
829            ) -> OtherUserIdentity {
830                self.client
831                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
832                    .await
833                    .expect("Should not fail to send identity changes");
834
835                self.bob_crypto_other_identity().await
836            }
837
838            async fn bob_membership_change(&mut self, new_state: MembershipState) {
839                let sync_response = self
840                    .sync_response_builder
841                    .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
842                        StateTestEvent::Custom(sync_response_member(
843                            &self.bob_user_id,
844                            new_state.clone(),
845                        )),
846                    ))
847                    .build_sync_response();
848                self.room.client.process_sync(sync_response).await.unwrap();
849
850                // Make sure the membership stuck as expected
851                let m = self
852                    .room
853                    .get_member_no_sync(&self.bob_user_id)
854                    .await
855                    .expect("Should not fail to get member");
856
857                match (&new_state, m) {
858                    (MembershipState::Leave, None) => {}
859                    (_, None) => {
860                        panic!("Member should exist")
861                    }
862                    (_, Some(m)) => {
863                        assert_eq!(*m.membership(), new_state);
864                    }
865                }
866            }
867
868            async fn bob_is_pinned(&self) -> bool {
869                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
870            }
871
872            async fn bob_is_verified(&self) -> bool {
873                self.bob_crypto_other_identity().await.is_verified()
874            }
875
876            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
877                self.bob_user_identity()
878                    .await
879                    .expect("User identity should exist")
880                    .underlying_identity()
881                    .other()
882                    .expect("Identity should be Other, not Own")
883            }
884
885            async fn bob_user_identity(&self) -> Option<UserIdentity> {
886                self.client
887                    .encryption()
888                    .get_user_identity(&self.bob_user_id)
889                    .await
890                    .expect("Should not fail to get user identity")
891            }
892        }
893
894        async fn create_just_me_room(
895            client: &Client,
896            sync_response_builder: &mut SyncResponseBuilder,
897        ) -> Room {
898            let create_room_sync_response = sync_response_builder
899                .add_joined_room(
900                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
901                        .add_state_event(StateTestEvent::Member),
902                )
903                .build_sync_response();
904            client.process_sync(create_room_sync_response).await.unwrap();
905            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
906            assert_eq!(room.state(), RoomState::Joined);
907            room
908        }
909
910        async fn create_room_with_other_member(
911            builder: &mut SyncResponseBuilder,
912            client: &Client,
913            other_user_id: &UserId,
914        ) -> Room {
915            let create_room_sync_response = builder
916                .add_joined_room(
917                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
918                        .add_state_event(StateTestEvent::Member)
919                        .add_state_event(StateTestEvent::Custom(sync_response_member(
920                            other_user_id,
921                            MembershipState::Join,
922                        ))),
923                )
924                .build_sync_response();
925            client.process_sync(create_room_sync_response).await.unwrap();
926            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
927            room.inner.mark_members_synced();
928
929            assert_eq!(room.state(), RoomState::Joined);
930            assert_eq!(
931                *room
932                    .get_member_no_sync(other_user_id)
933                    .await
934                    .expect("Should not fail to get member")
935                    .expect("Member should exist")
936                    .membership(),
937                MembershipState::Join
938            );
939            room
940        }
941
942        async fn create_client_and_server() -> (Client, MockServer) {
943            let server = MockServer::start().await;
944            mock_members_request(&server).await;
945            mock_secret_storage_default_key(&server).await;
946            let client = logged_in_client(Some(server.uri())).await;
947            (client, server)
948        }
949
950        async fn mock_members_request(server: &MockServer) {
951            Mock::given(method("GET"))
952                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
953                .and(header("authorization", "Bearer 1234"))
954                .respond_with(
955                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
956                )
957                .mount(server)
958                .await;
959        }
960
961        async fn mock_secret_storage_default_key(server: &MockServer) {
962            Mock::given(method("GET"))
963                .and(path_regex(
964                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
965                ))
966                .and(header("authorization", "Bearer 1234"))
967                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
968                .mount(server)
969                .await;
970        }
971
972        fn sync_response_member(
973            user_id: &UserId,
974            membership: MembershipState,
975        ) -> serde_json::Value {
976            json!({
977                "content": {
978                    "membership": membership.to_string(),
979                },
980                "event_id": format!(
981                    "$aa{}bb:localhost",
982                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
983                ),
984                "origin_server_ts": 1472735824,
985                "sender": "@example:localhost",
986                "state_key": user_id,
987                "type": "m.room.member",
988                "unsigned": {
989                    "age": 1234
990                }
991            })
992        }
993    }
994}