Skip to main content

matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    Client, Error, Result,
33    encryption::identities::{IdentityUpdates, UserIdentity},
34    event_handler::EventHandlerDropGuard,
35};
36
37/// Support for creating a stream of batches of [`IdentityStatusChange`].
38///
39/// Internally, this subscribes to all identity changes, and to room events that
40/// change the membership, and provides a stream of all changes to the identity
41/// status of all room members.
42///
43/// This struct does not represent the actual stream, but the state that is used
44/// to produce the values of the stream.
45///
46/// It does provide a method to create the stream:
47/// [`IdentityStatusChanges::create_stream`].
48#[derive(Debug)]
49pub struct IdentityStatusChanges {
50    /// Who is in the room and who is in identity violation at this moment
51    room_identity_state: RoomIdentityState<Room>,
52
53    /// Dropped when we are dropped, and unregisters the event handler we
54    /// registered to listen for room events
55    _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            // Force enclosing stream to take ownership of state, so that
104            // _drop_guard does not get dropped.
105            let mut state = state;
106
107            let mut current_state =
108                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
109
110            if !current_state.is_empty() {
111                current_state.sort();
112                yield current_state;
113            }
114
115            while let Some(item) = unprocessed_stream.next().await {
116                let mut update = filter_non_self(
117                    state.room_identity_state.process_change(item).await,
118                    &own_user_id,
119                );
120                if !update.is_empty() {
121                    update.sort();
122                    yield update;
123                }
124            }
125        }))
126    }
127}
128
129fn filter_for_initial_update(
130    mut input: Vec<IdentityStatusChange>,
131    own_user_id: &UserId,
132) -> Vec<IdentityStatusChange> {
133    // We are never interested in changes to our own identity, and also for initial
134    // updates, we are only interested in "bad" states where we need to
135    // notify the user, so we can remove Verified states (Pinned states are
136    // already missing, because Pinned is considered the default).
137    input.retain(|change| {
138        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
139    });
140
141    input
142}
143
144fn filter_non_self(
145    mut input: Vec<IdentityStatusChange>,
146    own_user_id: &UserId,
147) -> Vec<IdentityStatusChange> {
148    // We are never interested in changes to our own identity
149    input.retain(|change| change.user_id != own_user_id);
150    input
151}
152
153fn combine_streams(
154    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
155    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
156) -> impl Stream<Item = RoomIdentityChange> {
157    stream_select!(identity_updates, room_member_events)
158}
159
160async fn wrap_identity_updates(
161    client: &Client,
162) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
163    Ok(client
164        .encryption()
165        .user_identities_stream()
166        .await?
167        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
168}
169
170fn to_base_updates(
171    input: IdentityUpdates,
172) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
173    matrix_sdk_base::crypto::store::types::IdentityUpdates {
174        new: to_base_identities(input.new),
175        changed: to_base_identities(input.changed),
176        unchanged: Default::default(),
177    }
178}
179
180fn to_base_identities(
181    input: BTreeMap<OwnedUserId, UserIdentity>,
182) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
183    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
184}
185
186fn wrap_room_member_events(
187    room: &Room,
188) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
189    let own_user_id = room.own_user_id().to_owned();
190    let room_id = room.room_id();
191    let (sender, receiver) = mpsc::channel(16);
192    let handle =
193        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
194            if *event.state_key() == own_user_id {
195                return;
196            }
197            let _: Result<_, _> =
198                sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
199        });
200    let drop_guard = room.client.event_handler_drop_guard(handle);
201    (drop_guard, ReceiverStream::new(receiver))
202}
203
204#[cfg(all(test, not(target_family = "wasm")))]
205mod tests {
206    use std::time::Duration;
207
208    use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
209    use matrix_sdk_base::crypto::IdentityState;
210    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
211    use test_setup::TestSetup;
212
213    use crate::assert_next_with_timeout;
214
215    #[async_test]
216    async fn test_when_user_becomes_unpinned_we_report_it() {
217        // Given a room containing us and Bob
218        let t = TestSetup::new_room_with_other_bob().await;
219
220        // And Bob's identity is pinned
221        t.pin_bob().await;
222
223        // And we are listening for identity changes
224        let stream = t.subscribe_to_identity_status_changes().await;
225        pin_mut!(stream);
226
227        // When Bob becomes unpinned
228        t.unpin_bob().await;
229
230        // Then we were notified about it
231        let change = assert_next_with_timeout!(stream);
232        assert_eq!(change[0].user_id, t.bob_user_id());
233        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
234        assert_eq!(change.len(), 1);
235    }
236
237    #[async_test]
238    async fn test_when_user_becomes_verification_violation_we_report_it() {
239        // Given a room containing us and Bob
240        let t = TestSetup::new_room_with_other_bob().await;
241
242        // And Bob's identity is verified
243        t.verify_bob().await;
244
245        // And we are listening for identity changes
246        let stream = t.subscribe_to_identity_status_changes().await;
247        pin_mut!(stream);
248
249        // When Bob's identity changes
250        t.unpin_bob().await;
251
252        // Then we were notified about a verification violation
253        let change = assert_next_with_timeout!(stream);
254        assert_eq!(change[0].user_id, t.bob_user_id());
255        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
256        assert_eq!(change.len(), 1);
257    }
258
259    #[async_test]
260    async fn test_when_user_becomes_pinned_we_report_it() {
261        // Given a room containing us and Bob
262        let t = TestSetup::new_room_with_other_bob().await;
263
264        // And Bob's identity is unpinned
265        t.unpin_bob().await;
266
267        // And we are listening for identity changes
268        let stream = t.subscribe_to_identity_status_changes().await;
269        pin_mut!(stream);
270
271        // When Bob becomes pinned
272        t.pin_bob().await;
273
274        // Then we were notified about the initial state of the room
275        let change1 = assert_next_with_timeout!(stream);
276        assert_eq!(change1[0].user_id, t.bob_user_id());
277        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
278        assert_eq!(change1.len(), 1);
279
280        // And the change when Bob became pinned
281        let change2 = assert_next_with_timeout!(stream);
282        assert_eq!(change2[0].user_id, t.bob_user_id());
283        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
284        assert_eq!(change2.len(), 1);
285    }
286
287    #[async_test]
288    async fn test_when_user_becomes_verified_we_report_it() {
289        // Given a room containing us and Bob
290        let t = TestSetup::new_room_with_other_bob().await;
291
292        // And we are listening for identity changes
293        let stream = t.subscribe_to_identity_status_changes().await;
294        pin_mut!(stream);
295
296        // When Bob becomes verified
297        t.verify_bob().await;
298
299        // Then we are notified about Bob being verified
300        let change = assert_next_with_timeout!(stream);
301        assert_eq!(change[0].user_id, t.bob_user_id());
302        assert_eq!(change[0].changed_to, IdentityState::Verified);
303        assert_eq!(change.len(), 1);
304
305        // (And then unpinned, so we have something to come through the stream)
306        t.unpin_bob().await;
307
308        // Then we are notified about the unpinning part
309        let change = assert_next_with_timeout!(stream);
310        assert_eq!(change[0].user_id, t.bob_user_id());
311        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
312        assert_eq!(change.len(), 1);
313    }
314
315    #[async_test]
316    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
317        // Given a room containing us and Bob
318        let t = TestSetup::new_room_with_other_bob().await;
319
320        // And Bob's identity is unpinned
321        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
322
323        // And we are listening for identity changes
324        let stream = t.subscribe_to_identity_status_changes().await;
325        pin_mut!(stream);
326
327        // When Bob becomes verified
328        t.verify_bob().await;
329
330        // Then we were notified about the initial state of the room
331        let change1 = assert_next_with_timeout!(stream);
332        assert_eq!(change1[0].user_id, t.bob_user_id());
333        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
334        assert_eq!(change1.len(), 1);
335
336        // And the change when Bob became verified
337        let change2 = assert_next_with_timeout!(stream);
338        assert_eq!(change2[0].user_id, t.bob_user_id());
339        assert_eq!(change2[0].changed_to, IdentityState::Verified);
340        assert_eq!(change2.len(), 1);
341    }
342
343    #[async_test]
344    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
345        // Given a room containing us and Bob
346        let t = TestSetup::new_room_with_other_bob().await;
347
348        // And Bob is in verification violation
349        t.verify_bob_with(
350            IdentityChangeDataSet::key_query_with_identity_b(),
351            IdentityChangeDataSet::master_signing_keys_b(),
352            IdentityChangeDataSet::self_signing_keys_b(),
353        )
354        .await;
355        t.unpin_bob().await;
356
357        // And we are listening for identity changes
358        let stream = t.subscribe_to_identity_status_changes().await;
359        pin_mut!(stream);
360
361        // When Bob becomes verified
362        t.verify_bob().await;
363
364        // Then we were notified about the initial state of the room
365        let change1 = assert_next_with_timeout!(stream);
366        assert_eq!(change1[0].user_id, t.bob_user_id());
367        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
368        assert_eq!(change1.len(), 1);
369
370        // And the change when Bob became verified
371        let change2 = assert_next_with_timeout!(stream);
372        assert_eq!(change2[0].user_id, t.bob_user_id());
373        assert_eq!(change2[0].changed_to, IdentityState::Verified);
374        assert_eq!(change2.len(), 1);
375    }
376
377    #[async_test]
378    async fn test_when_an_unpinned_user_joins_we_report_it() {
379        // Given a room containing just us
380        let mut t = TestSetup::new_just_me_room().await;
381
382        // And Bob's identity is unpinned
383        t.unpin_bob().await;
384
385        // And we are listening for identity changes
386        let stream = t.subscribe_to_identity_status_changes().await;
387        pin_mut!(stream);
388
389        // When Bob joins the room
390        t.bob_joins().await;
391
392        // Then we were notified about it
393        let change = assert_next_with_timeout!(stream);
394        assert_eq!(change[0].user_id, t.bob_user_id());
395        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
396        assert_eq!(change.len(), 1);
397    }
398
399    #[async_test]
400    async fn test_when_an_verification_violating_user_joins_we_report_it() {
401        // Given a room containing just us
402        let mut t = TestSetup::new_just_me_room().await;
403
404        // And Bob's identity is in verification violation
405        t.verify_bob().await;
406        t.unpin_bob().await;
407
408        // And we are listening for identity changes
409        let stream = t.subscribe_to_identity_status_changes().await;
410        pin_mut!(stream);
411
412        // When Bob joins the room
413        t.bob_joins().await;
414
415        // Then we were notified about it
416        let change = assert_next_with_timeout!(stream);
417        assert_eq!(change[0].user_id, t.bob_user_id());
418        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
419        assert_eq!(change.len(), 1);
420    }
421
422    #[async_test]
423    async fn test_when_a_verified_user_joins_we_dont_report_it() {
424        // Given a room containing just us
425        let mut t = TestSetup::new_just_me_room().await;
426
427        // And Bob's identity is verified
428        t.verify_bob().await;
429
430        // And we are listening for identity changes
431        let stream = t.subscribe_to_identity_status_changes().await;
432        pin_mut!(stream);
433
434        // When Bob joins the room
435        t.bob_joins().await;
436
437        // (Then becomes unpinned so we have something to report)
438        t.unpin_bob().await;
439
440        //// Then we were only notified about the unpin
441        let change = assert_next_with_timeout!(stream);
442        assert_eq!(change[0].user_id, t.bob_user_id());
443        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
444        assert_eq!(change.len(), 1);
445    }
446
447    #[async_test]
448    async fn test_when_a_pinned_user_joins_we_do_not_report() {
449        // Given a room containing just us
450        let mut t = TestSetup::new_just_me_room().await;
451
452        // And Bob's identity is unpinned
453        t.pin_bob().await;
454
455        // And we are listening for identity changes
456        let stream = t.subscribe_to_identity_status_changes().await;
457        pin_mut!(stream);
458
459        // When Bob joins the room
460        t.bob_joins().await;
461
462        // Then there is no notification
463        tokio::time::sleep(Duration::from_millis(200)).await;
464        let change = stream.next().now_or_never();
465        assert!(change.is_none());
466    }
467
468    #[async_test]
469    async fn test_when_an_unpinned_user_leaves_we_report_it() {
470        // Given a room containing us and Bob
471        let mut t = TestSetup::new_room_with_other_bob().await;
472
473        // And Bob's identity is unpinned
474        t.unpin_bob().await;
475
476        // And we are listening for identity changes
477        let stream = t.subscribe_to_identity_status_changes().await;
478        pin_mut!(stream);
479
480        // When Bob leaves the room
481        t.bob_leaves().await;
482
483        // Then we were notified about the initial state of the room
484        let change1 = assert_next_with_timeout!(stream);
485        assert_eq!(change1[0].user_id, t.bob_user_id());
486        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
487        assert_eq!(change1.len(), 1);
488
489        // And we were notified about the change when the user left
490        let change2 = assert_next_with_timeout!(stream);
491        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
492        // "you no longer need to notify about this user".
493        assert_eq!(change2[0].user_id, t.bob_user_id());
494        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
495        assert_eq!(change2.len(), 1);
496    }
497
498    #[async_test]
499    async fn test_multiple_identity_changes_are_reported() {
500        // Given a room containing just us
501        let mut t = TestSetup::new_just_me_room().await;
502
503        // And Bob's identity is unpinned
504        t.unpin_bob().await;
505
506        // And we are listening for identity changes
507        let stream = t.subscribe_to_identity_status_changes().await;
508        pin_mut!(stream);
509
510        // NOTE: below we pull the changes out of the subscription after each action.
511        // This makes sure that the identity changes and membership changes are properly
512        // ordered. If we pull them out later, the identity changes get shifted forward
513        // because they rely on less-complex async stuff under the hood. Calling
514        // next_change ends up winding the async machinery sufficiently that the
515        // membership change and any subsequent events have fully completed.
516
517        // When Bob joins the room ...
518        t.bob_joins().await;
519        let change1 = assert_next_with_timeout!(stream);
520
521        // ... becomes pinned ...
522        t.pin_bob().await;
523        let change2 = assert_next_with_timeout!(stream);
524
525        // ... leaves and joins again (ignored since they stay pinned) ...
526        t.bob_leaves().await;
527        t.bob_joins().await;
528
529        // ... becomes unpinned ...
530        t.unpin_bob().await;
531        let change3 = assert_next_with_timeout!(stream);
532
533        // ... and leaves.
534        t.bob_leaves().await;
535        let change4 = assert_next_with_timeout!(stream);
536
537        assert_eq!(change1[0].user_id, t.bob_user_id());
538        assert_eq!(change2[0].user_id, t.bob_user_id());
539        assert_eq!(change3[0].user_id, t.bob_user_id());
540        assert_eq!(change4[0].user_id, t.bob_user_id());
541
542        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
543        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
544        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
545        assert_eq!(change4[0].changed_to, IdentityState::Pinned);
546
547        assert_eq!(change1.len(), 1);
548        assert_eq!(change2.len(), 1);
549        assert_eq!(change3.len(), 1);
550        assert_eq!(change4.len(), 1);
551    }
552
553    #[async_test]
554    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
555        // Given a room containing Bob, who is unpinned
556        let t = TestSetup::new_room_with_other_bob().await;
557        t.unpin_bob().await;
558
559        // When we start listening for identity changes
560        let stream = t.subscribe_to_identity_status_changes().await;
561        pin_mut!(stream);
562
563        // Then we were immediately notified about Bob being unpinned
564        let change = assert_next_with_timeout!(stream);
565        assert_eq!(change[0].user_id, t.bob_user_id());
566        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
567        assert_eq!(change.len(), 1);
568    }
569
570    #[async_test]
571    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
572        // Given a room containing Bob, who is unpinned
573        let t = TestSetup::new_room_with_other_bob().await;
574        t.verify_bob().await;
575
576        // When we start listening for identity changes
577        let stream = t.subscribe_to_identity_status_changes().await;
578        pin_mut!(stream);
579
580        // (And we unpin so that something is available in the changes stream)
581        t.unpin_bob().await;
582
583        // Then we were only notified about the unpin, not being verified
584        let next_change = assert_next_with_timeout!(stream);
585
586        assert_eq!(next_change[0].user_id, t.bob_user_id());
587        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
588        assert_eq!(next_change.len(), 1);
589    }
590
    // TODO: I (andyb) haven't figured out how to test room membership changes that
    // affect our own user (they should not be shown). Specifically, I haven't
    // figured out how to get our own user into a non-pinned state.
594
595    mod test_setup {
596        use futures_core::Stream;
597        use matrix_sdk_base::{
598            RoomState,
599            crypto::{
600                IdentityStatusChange, OtherUserIdentity,
601                testing::simulate_key_query_response_for_verification,
602            },
603        };
604        use matrix_sdk_test::{
605            DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder,
606            event_factory::EventFactory, test_json,
607            test_json::keys_query_sets::IdentityChangeDataSet,
608        };
609        use ruma::{
610            OwnedUserId, TransactionId, UserId,
611            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
612            events::room::member::MembershipState,
613            owned_user_id, user_id,
614        };
615        use serde_json::json;
616        use wiremock::{
617            Mock, MockServer, ResponseTemplate,
618            matchers::{header, method, path_regex},
619        };
620
621        use crate::{
622            Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
623        };
624
625        /// Sets up a client and a room and allows changing user identities and
626        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
627        /// are talking about the OTHER user, not our own user. Only
628        /// methods starting with `self_` are talking about this user.
629        ///
630        /// This user is called `@example:localhost` but is rarely used
631        /// mentioned.
632        ///
633        /// The other user is called `@bob:localhost`.
634        pub(super) struct TestSetup {
635            client: Client,
636            bob_user_id: OwnedUserId,
637            sync_response_builder: SyncResponseBuilder,
638            room: Room,
639        }
640
641        impl TestSetup {
642            pub(super) async fn new_just_me_room() -> Self {
643                let (client, user_id, mut sync_response_builder) = Self::init().await;
644                let room = create_just_me_room(&client, &mut sync_response_builder).await;
645                Self { client, bob_user_id: user_id, sync_response_builder, room }
646            }
647
648            pub(super) async fn new_room_with_other_bob() -> Self {
649                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
650                let room = create_room_with_other_member(
651                    &mut sync_response_builder,
652                    &client,
653                    &bob_user_id,
654                )
655                .await;
656                Self { client, bob_user_id, sync_response_builder, room }
657            }
658
659            pub(super) fn bob_user_id(&self) -> &UserId {
660                &self.bob_user_id
661            }
662
663            pub(super) async fn pin_bob(&self) {
664                if self.bob_user_identity().await.is_some() {
665                    assert!(
666                        !self.bob_is_pinned().await,
667                        "pin_bob() called when the identity is already pinned!"
668                    );
669
670                    // Pin it
671                    self.bob_user_identity()
672                        .await
673                        .expect("User should exist")
674                        .pin()
675                        .await
676                        .expect("Should not fail to pin");
677                } else {
678                    // There was no existing identity. Set one. It will be pinned by default.
679                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
680                        .await;
681                }
682
683                // Sanity check: they are pinned
684                assert!(self.bob_is_pinned().await);
685            }
686
687            pub(super) async fn unpin_bob(&self) {
688                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
689            }
690
691            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
692                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
693                    serde_json::to_string(
694                        key_query_response
695                            .master_keys
696                            .first_key_value()
697                            .expect("Master key should have a value")
698                            .1,
699                    )
700                    .expect("Should be able to serialise master key")
701                }
702
703                let a = IdentityChangeDataSet::key_query_with_identity_a();
704                let b = IdentityChangeDataSet::key_query_with_identity_b();
705                let requested_master_key = master_key_json(&requested);
706                let a_master_key = master_key_json(&a);
707
708                // Change/set their identity pin it, then change it again - this will definitely
709                // unpin, even if the first identity we supply is their very first, making them
710                // initially pinned.
711                if requested_master_key == a_master_key {
712                    self.change_bob_identity(b).await;
713                    if !self.bob_is_pinned().await {
714                        self.pin_bob().await;
715                    }
716                    self.change_bob_identity(a).await;
717                } else {
718                    self.change_bob_identity(a).await;
719                    if !self.bob_is_pinned().await {
720                        self.pin_bob().await;
721                    }
722                    self.change_bob_identity(b).await;
723                }
724
725                // Sanity: they are unpinned
726                assert!(!self.bob_is_pinned().await);
727            }
728
729            pub(super) async fn verify_bob(&self) {
730                self.verify_bob_with(
731                    IdentityChangeDataSet::key_query_with_identity_a(),
732                    IdentityChangeDataSet::master_signing_keys_a(),
733                    IdentityChangeDataSet::self_signing_keys_a(),
734                )
735                .await;
736            }
737
738            pub(super) async fn verify_bob_with(
739                &self,
740                key_query: KeyQueryResponse,
741                master_signing_key: serde_json::Value,
742                self_signing_key: serde_json::Value,
743            ) {
744                // Make sure the requested identity is set
745                self.change_bob_identity(key_query).await;
746
747                let my_user_id = self.client.user_id().expect("I should have a user id");
748                let my_identity = self
749                    .client
750                    .encryption()
751                    .get_user_identity(my_user_id)
752                    .await
753                    .expect("Should not fail to get own user identity")
754                    .expect("Should have an own user identity")
755                    .underlying_identity()
756                    .own()
757                    .expect("Our own identity should be of type Own");
758
759                // Get the request
760                let signature_upload_request = self
761                    .bob_crypto_other_identity()
762                    .await
763                    .verify()
764                    .await
765                    .expect("Should be able to verify other identity");
766
767                let verification_response = simulate_key_query_response_for_verification(
768                    signature_upload_request,
769                    my_identity,
770                    my_user_id,
771                    self.bob_user_id(),
772                    master_signing_key,
773                    self_signing_key,
774                );
775
776                // Receive the response into our client
777                self.client
778                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
779                    .await
780                    .unwrap();
781
782                // Sanity: they are verified
783                assert!(self.bob_is_verified().await);
784            }
785
786            pub(super) async fn bob_joins(&mut self) {
787                self.bob_membership_change(MembershipState::Join).await;
788            }
789
790            pub(super) async fn bob_leaves(&mut self) {
791                self.bob_membership_change(MembershipState::Leave).await;
792            }
793
794            pub(super) async fn subscribe_to_identity_status_changes(
795                &self,
796            ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
797                self.room
798                    .subscribe_to_identity_status_changes()
799                    .await
800                    .expect("Should be able to subscribe")
801            }
802
803            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
804                let (client, _server) = create_client_and_server().await;
805
806                // Ensure our user has cross-signing keys etc.
807                client
808                    .olm_machine()
809                    .await
810                    .as_ref()
811                    .expect("We should have an Olm machine")
812                    .bootstrap_cross_signing(true)
813                    .await
814                    .expect("Should be able to bootstrap cross-signing");
815
816                // Note: if you change the user_id, you will need to change lots of hard-coded
817                // stuff inside IdentityChangeDataSet
818                let bob_user_id = owned_user_id!("@bob:localhost");
819
820                let sync_response_builder = SyncResponseBuilder::default();
821
822                (client, bob_user_id, sync_response_builder)
823            }
824
825            async fn change_bob_identity(
826                &self,
827                key_query_response: get_keys::v3::Response,
828            ) -> OtherUserIdentity {
829                self.client
830                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
831                    .await
832                    .expect("Should not fail to send identity changes");
833
834                self.bob_crypto_other_identity().await
835            }
836
837            async fn bob_membership_change(&mut self, new_state: MembershipState) {
838                let f = EventFactory::new().sender(user_id!("@example:localhost"));
839                let sync_response = self
840                    .sync_response_builder
841                    .add_joined_room(
842                        JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
843                            f.member(&self.bob_user_id).membership(new_state.clone()),
844                        ),
845                    )
846                    .build_sync_response();
847                self.room.client.process_sync(sync_response).await.unwrap();
848
849                // Make sure the membership stuck as expected
850                let m = self
851                    .room
852                    .get_member_no_sync(&self.bob_user_id)
853                    .await
854                    .expect("Should not fail to get member");
855
856                match (&new_state, m) {
857                    (MembershipState::Leave, None) => {}
858                    (_, None) => {
859                        panic!("Member should exist")
860                    }
861                    (_, Some(m)) => {
862                        assert_eq!(*m.membership(), new_state);
863                    }
864                }
865            }
866
867            async fn bob_is_pinned(&self) -> bool {
868                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
869            }
870
871            async fn bob_is_verified(&self) -> bool {
872                self.bob_crypto_other_identity().await.is_verified()
873            }
874
875            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
876                self.bob_user_identity()
877                    .await
878                    .expect("User identity should exist")
879                    .underlying_identity()
880                    .other()
881                    .expect("Identity should be Other, not Own")
882            }
883
884            async fn bob_user_identity(&self) -> Option<UserIdentity> {
885                self.client
886                    .encryption()
887                    .get_user_identity(&self.bob_user_id)
888                    .await
889                    .expect("Should not fail to get user identity")
890            }
891        }
892
893        async fn create_just_me_room(
894            client: &Client,
895            sync_response_builder: &mut SyncResponseBuilder,
896        ) -> Room {
897            let f = EventFactory::new().sender(user_id!("@example:localhost"));
898            let create_room_sync_response = sync_response_builder
899                .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
900                    f.member(user_id!("@example:localhost")).display_name("example"),
901                ))
902                .build_sync_response();
903            client.process_sync(create_room_sync_response).await.unwrap();
904            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
905            assert_eq!(room.state(), RoomState::Joined);
906            room
907        }
908
909        async fn create_room_with_other_member(
910            builder: &mut SyncResponseBuilder,
911            client: &Client,
912            other_user_id: &UserId,
913        ) -> Room {
914            let f = EventFactory::new().sender(user_id!("@example:localhost"));
915            let create_room_sync_response = builder
916                .add_joined_room(
917                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
918                        .add_state_event(
919                            f.member(user_id!("@example:localhost")).display_name("example"),
920                        )
921                        .add_state_event(f.member(other_user_id).membership(MembershipState::Join)),
922                )
923                .build_sync_response();
924            client.process_sync(create_room_sync_response).await.unwrap();
925            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
926            room.inner.mark_members_synced();
927
928            assert_eq!(room.state(), RoomState::Joined);
929            assert_eq!(
930                *room
931                    .get_member_no_sync(other_user_id)
932                    .await
933                    .expect("Should not fail to get member")
934                    .expect("Member should exist")
935                    .membership(),
936                MembershipState::Join
937            );
938            room
939        }
940
941        async fn create_client_and_server() -> (Client, MockServer) {
942            let server = MockServer::start().await;
943            mock_members_request(&server).await;
944            mock_secret_storage_default_key(&server).await;
945            let client = logged_in_client(Some(server.uri())).await;
946            (client, server)
947        }
948
949        async fn mock_members_request(server: &MockServer) {
950            Mock::given(method("GET"))
951                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
952                .and(header("authorization", "Bearer 1234"))
953                .respond_with(
954                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
955                )
956                .mount(server)
957                .await;
958        }
959
960        async fn mock_secret_storage_default_key(server: &MockServer) {
961            Mock::given(method("GET"))
962                .and(path_regex(
963                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
964                ))
965                .and(header("authorization", "Bearer 1234"))
966                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
967                .mount(server)
968                .await;
969        }
970    }
971}