matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(feature = "e2e-encryption")]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{StreamExt, stream_select};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{OwnedUserId, UserId, events::room::member::SyncRoomMemberEvent};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    Client, Error, Result,
33    encryption::identities::{IdentityUpdates, UserIdentity},
34    event_handler::EventHandlerDropGuard,
35};
36
/// Support for creating a stream of batches of [`IdentityStatusChange`].
///
/// Internally, this subscribes to all identity changes, and to room events that
/// change the membership, and provides a stream of all changes to the identity
/// status of all room members.
///
/// This struct does not represent the actual stream; it is the state that is
/// used to produce the values of the stream.
///
/// The stream itself is created with
/// [`IdentityStatusChanges::create_stream`].
#[derive(Debug)]
pub struct IdentityStatusChanges {
    /// Who is in the room and who is in identity violation at this moment
    room_identity_state: RoomIdentityState<Room>,

    /// Dropped when we are dropped, and unregisters the event handler we
    /// registered to listen for room events. The field is never read; it is
    /// held only for the effect of dropping it.
    _drop_guard: EventHandlerDropGuard,
}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let mut state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            let mut current_state =
104                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106            if !current_state.is_empty() {
107                current_state.sort();
108                yield current_state;
109            }
110
111            while let Some(item) = unprocessed_stream.next().await {
112                let mut update = filter_non_self(
113                    state.room_identity_state.process_change(item).await,
114                    &own_user_id,
115                );
116                if !update.is_empty() {
117                    update.sort();
118                    yield update;
119                }
120            }
121        }))
122    }
123}
124
125fn filter_for_initial_update(
126    mut input: Vec<IdentityStatusChange>,
127    own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129    // We are never interested in changes to our own identity, and also for initial
130    // updates, we are only interested in "bad" states where we need to
131    // notify the user, so we can remove Verified states (Pinned states are
132    // already missing, because Pinned is considered the default).
133    input.retain(|change| {
134        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135    });
136
137    input
138}
139
140fn filter_non_self(
141    mut input: Vec<IdentityStatusChange>,
142    own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144    // We are never interested in changes to our own identity
145    input.retain(|change| change.user_id != own_user_id);
146    input
147}
148
/// Merges the identity-update stream and the room-membership stream into a
/// single stream of [`RoomIdentityChange`]s.
///
/// Both inputs must be `Unpin`. The merging itself is delegated to the
/// `stream_select!` macro, which also decides how items from the two inputs
/// are interleaved.
fn combine_streams(
    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
) -> impl Stream<Item = RoomIdentityChange> {
    stream_select!(identity_updates, room_member_events)
}
155
156async fn wrap_identity_updates(
157    client: &Client,
158) -> Result<impl Stream<Item = RoomIdentityChange> + use<>> {
159    Ok(client
160        .encryption()
161        .user_identities_stream()
162        .await?
163        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
164}
165
166fn to_base_updates(
167    input: IdentityUpdates,
168) -> matrix_sdk_base::crypto::store::types::IdentityUpdates {
169    matrix_sdk_base::crypto::store::types::IdentityUpdates {
170        new: to_base_identities(input.new),
171        changed: to_base_identities(input.changed),
172        unchanged: Default::default(),
173    }
174}
175
176fn to_base_identities(
177    input: BTreeMap<OwnedUserId, UserIdentity>,
178) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
179    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
180}
181
182fn wrap_room_member_events(
183    room: &Room,
184) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange> + use<>) {
185    let own_user_id = room.own_user_id().to_owned();
186    let room_id = room.room_id();
187    let (sender, receiver) = mpsc::channel(16);
188    let handle =
189        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
190            if *event.state_key() == own_user_id {
191                return;
192            }
193            let _: Result<_, _> =
194                sender.send(RoomIdentityChange::SyncRoomMemberEvent(Box::new(event))).await;
195        });
196    let drop_guard = room.client.event_handler_drop_guard(handle);
197    (drop_guard, ReceiverStream::new(receiver))
198}
199
200#[cfg(all(test, not(target_family = "wasm")))]
201mod tests {
202    use std::time::Duration;
203
204    use futures_util::{FutureExt as _, StreamExt as _, pin_mut};
205    use matrix_sdk_base::crypto::IdentityState;
206    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
207    use test_setup::TestSetup;
208
209    use crate::assert_next_with_timeout;
210
    // A pinned room member whose identity changes is reported as a
    // `PinViolation`.
    #[async_test]
    async fn test_when_user_becomes_unpinned_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And Bob's identity is pinned
        t.pin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob becomes unpinned
        t.unpin_bob().await;

        // Then we were notified about it, in a batch containing only Bob
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change.len(), 1);
    }
232
    // A verified room member whose identity changes is reported as a
    // `VerificationViolation`.
    #[async_test]
    async fn test_when_user_becomes_verification_violation_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And Bob's identity is verified
        t.verify_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob's identity changes
        t.unpin_bob().await;

        // Then we were notified about a verification violation, in a batch
        // containing only Bob
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(change.len(), 1);
    }
254
    // An unpinned room member shows up in the initial batch as a
    // `PinViolation`, and pinning them afterwards is reported as `Pinned`.
    #[async_test]
    async fn test_when_user_becomes_pinned_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And Bob's identity is unpinned
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob becomes pinned
        t.pin_bob().await;

        // Then we were notified about the initial state of the room
        let change1 = assert_next_with_timeout!(stream);
        assert_eq!(change1[0].user_id, t.bob_user_id());
        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change1.len(), 1);

        // And the change when Bob became pinned
        let change2 = assert_next_with_timeout!(stream);
        assert_eq!(change2[0].user_id, t.bob_user_id());
        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
        assert_eq!(change2.len(), 1);
    }
282
    // Verifying a room member while we are subscribed is reported as
    // `Verified`; a later identity change is then reported as a
    // `VerificationViolation`.
    #[async_test]
    async fn test_when_user_becomes_verified_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob becomes verified
        t.verify_bob().await;

        // Then we are notified about Bob being verified
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::Verified);
        assert_eq!(change.len(), 1);

        // (And then unpinned, so we have something to come through the stream)
        t.unpin_bob().await;

        // Then we are notified about the unpinning part
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(change.len(), 1);
    }
310
    // An unpinned room member shows up in the initial batch as a
    // `PinViolation`, and verifying them afterwards is reported as `Verified`.
    #[async_test]
    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And Bob's identity is unpinned (requesting identity A explicitly;
        // `verify_bob` below also uses the identity A data set)
        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob becomes verified
        t.verify_bob().await;

        // Then we were notified about the initial state of the room
        let change1 = assert_next_with_timeout!(stream);
        assert_eq!(change1[0].user_id, t.bob_user_id());
        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change1.len(), 1);

        // And the change when Bob became verified
        let change2 = assert_next_with_timeout!(stream);
        assert_eq!(change2[0].user_id, t.bob_user_id());
        assert_eq!(change2[0].changed_to, IdentityState::Verified);
        assert_eq!(change2.len(), 1);
    }
338
    // A room member in verification violation shows up in the initial batch,
    // and verifying them afterwards is reported as `Verified`.
    #[async_test]
    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
        // Given a room containing us and Bob
        let t = TestSetup::new_room_with_other_bob().await;

        // And Bob is in verification violation: first verify him using the
        // identity B data set, then change his identity
        t.verify_bob_with(
            IdentityChangeDataSet::key_query_with_identity_b(),
            IdentityChangeDataSet::master_signing_keys_b(),
            IdentityChangeDataSet::self_signing_keys_b(),
        )
        .await;
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob becomes verified
        t.verify_bob().await;

        // Then we were notified about the initial state of the room
        let change1 = assert_next_with_timeout!(stream);
        assert_eq!(change1[0].user_id, t.bob_user_id());
        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(change1.len(), 1);

        // And the change when Bob became verified
        let change2 = assert_next_with_timeout!(stream);
        assert_eq!(change2[0].user_id, t.bob_user_id());
        assert_eq!(change2[0].changed_to, IdentityState::Verified);
        assert_eq!(change2.len(), 1);
    }
372
    // A user who is already unpinned when they join the room is reported as a
    // `PinViolation`.
    #[async_test]
    async fn test_when_an_unpinned_user_joins_we_report_it() {
        // Given a room containing just us
        let mut t = TestSetup::new_just_me_room().await;

        // And Bob's identity is unpinned
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob joins the room
        t.bob_joins().await;

        // Then we were notified about it, in a batch containing only Bob
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change.len(), 1);
    }
394
    // A user who is already in verification violation when they join the room
    // is reported as a `VerificationViolation`.
    #[async_test]
    async fn test_when_an_verification_violating_user_joins_we_report_it() {
        // Given a room containing just us
        let mut t = TestSetup::new_just_me_room().await;

        // And Bob's identity is in verification violation (verified, then
        // changed)
        t.verify_bob().await;
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob joins the room
        t.bob_joins().await;

        // Then we were notified about it, in a batch containing only Bob
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(change.len(), 1);
    }
417
    // A verified user joining the room produces no update of its own: the
    // first batch we receive is the one for the later identity change.
    #[async_test]
    async fn test_when_a_verified_user_joins_we_dont_report_it() {
        // Given a room containing just us
        let mut t = TestSetup::new_just_me_room().await;

        // And Bob's identity is verified
        t.verify_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob joins the room
        t.bob_joins().await;

        // (Then becomes unpinned so we have something to report)
        t.unpin_bob().await;

        // Then we were only notified about the unpin
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(change.len(), 1);
    }
442
    // A pinned user joining the room produces no update at all.
    #[async_test]
    async fn test_when_a_pinned_user_joins_we_do_not_report() {
        // Given a room containing just us
        let mut t = TestSetup::new_just_me_room().await;

        // And Bob's identity is pinned
        t.pin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob joins the room
        t.bob_joins().await;

        // Then there is no notification. We wait a little first (presumably so
        // that any update that was going to arrive has had time to do so), then
        // poll the stream exactly once without waiting: `None` from
        // `now_or_never` means no item was ready.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let change = stream.next().now_or_never();
        assert!(change.is_none());
    }
463
    // An unpinned room member shows up in the initial batch as a
    // `PinViolation`, and is reported as `Pinned` when they leave the room.
    #[async_test]
    async fn test_when_an_unpinned_user_leaves_we_report_it() {
        // Given a room containing us and Bob
        let mut t = TestSetup::new_room_with_other_bob().await;

        // And Bob's identity is unpinned
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // When Bob leaves the room
        t.bob_leaves().await;

        // Then we were notified about the initial state of the room
        let change1 = assert_next_with_timeout!(stream);
        assert_eq!(change1[0].user_id, t.bob_user_id());
        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change1.len(), 1);

        // And we were notified about the change when the user left
        let change2 = assert_next_with_timeout!(stream);
        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
        // "you no longer need to notify about this user".
        assert_eq!(change2[0].user_id, t.bob_user_id());
        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
        assert_eq!(change2.len(), 1);
    }
493
    // A sequence of membership and identity changes for the same user yields
    // one single-entry batch per significant change, in the order in which the
    // changes happened.
    #[async_test]
    async fn test_multiple_identity_changes_are_reported() {
        // Given a room containing just us
        let mut t = TestSetup::new_just_me_room().await;

        // And Bob's identity is unpinned
        t.unpin_bob().await;

        // And we are listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // NOTE: below we pull the changes out of the subscription after each action.
        // This makes sure that the identity changes and membership changes are properly
        // ordered. If we pull them out later, the identity changes get shifted forward
        // because they rely on less-complex async stuff under the hood. Waiting on
        // `assert_next_with_timeout!` ends up winding the async machinery sufficiently
        // that the membership change and any subsequent events have fully completed.

        // When Bob joins the room ...
        t.bob_joins().await;
        let change1 = assert_next_with_timeout!(stream);

        // ... becomes pinned ...
        t.pin_bob().await;
        let change2 = assert_next_with_timeout!(stream);

        // ... leaves and joins again (ignored since they stay pinned) ...
        t.bob_leaves().await;
        t.bob_joins().await;

        // ... becomes unpinned ...
        t.unpin_bob().await;
        let change3 = assert_next_with_timeout!(stream);

        // ... and leaves.
        t.bob_leaves().await;
        let change4 = assert_next_with_timeout!(stream);

        // Then every batch is about Bob ...
        assert_eq!(change1[0].user_id, t.bob_user_id());
        assert_eq!(change2[0].user_id, t.bob_user_id());
        assert_eq!(change3[0].user_id, t.bob_user_id());
        assert_eq!(change4[0].user_id, t.bob_user_id());

        // ... the reported states alternate between violation and pinned ...
        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change4[0].changed_to, IdentityState::Pinned);

        // ... and no batch contains anything else.
        assert_eq!(change1.len(), 1);
        assert_eq!(change2.len(), 1);
        assert_eq!(change3.len(), 1);
        assert_eq!(change4.len(), 1);
    }
548
    // A room member who is already unpinned when we subscribe is reported in
    // the first batch, without any further change being needed.
    #[async_test]
    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
        // Given a room containing Bob, who is unpinned
        let t = TestSetup::new_room_with_other_bob().await;
        t.unpin_bob().await;

        // When we start listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // Then we were immediately notified about Bob being unpinned
        let change = assert_next_with_timeout!(stream);
        assert_eq!(change[0].user_id, t.bob_user_id());
        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
        assert_eq!(change.len(), 1);
    }
565
    // A room member who is already verified when we subscribe is not part of
    // the initial state: the first batch we receive is the one for the later
    // identity change.
    #[async_test]
    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
        // Given a room containing Bob, who is verified
        let t = TestSetup::new_room_with_other_bob().await;
        t.verify_bob().await;

        // When we start listening for identity changes
        let stream = t.subscribe_to_identity_status_changes().await;
        pin_mut!(stream);

        // (And we unpin so that something is available in the changes stream)
        t.unpin_bob().await;

        // Then we were only notified about the unpin, not being verified
        let next_change = assert_next_with_timeout!(stream);

        assert_eq!(next_change[0].user_id, t.bob_user_id());
        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
        assert_eq!(next_change.len(), 1);
    }
586
    // TODO: I (andyb) haven't figured out how to test room membership changes that
    // affect our own user (they should not be shown). Specifically, I haven't
    // figured out how to get our own user into a non-pinned state.
590
591    mod test_setup {
592        use std::time::{SystemTime, UNIX_EPOCH};
593
594        use futures_core::Stream;
595        use matrix_sdk_base::{
596            RoomState,
597            crypto::{
598                IdentityStatusChange, OtherUserIdentity,
599                testing::simulate_key_query_response_for_verification,
600            },
601        };
602        use matrix_sdk_test::{
603            DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder,
604            test_json, test_json::keys_query_sets::IdentityChangeDataSet,
605        };
606        use ruma::{
607            OwnedUserId, TransactionId, UserId,
608            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
609            events::room::member::MembershipState,
610            owned_user_id,
611        };
612        use serde_json::json;
613        use wiremock::{
614            Mock, MockServer, ResponseTemplate,
615            matchers::{header, method, path_regex},
616        };
617
618        use crate::{
619            Client, Room, encryption::identities::UserIdentity, test_utils::logged_in_client,
620        };
621
        /// Sets up a client and a room and allows changing user identities and
        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
        /// are talking about the OTHER user, not our own user. Only
        /// methods starting with `self_` are talking about this user.
        ///
        /// This user is called `@example:localhost` but is rarely mentioned.
        ///
        /// The other user is called `@bob:localhost`.
        pub(super) struct TestSetup {
            /// The client acting as our own user.
            client: Client,
            /// The user ID of the other user, Bob.
            bob_user_id: OwnedUserId,
            /// Builder for sync responses; passed to the room-creation helpers
            /// and kept afterwards (presumably for the membership changes made
            /// by `bob_joins` / `bob_leaves`).
            sync_response_builder: SyncResponseBuilder,
            /// The room whose identity status changes the tests subscribe to.
            room: Room,
        }
637
638        impl TestSetup {
639            pub(super) async fn new_just_me_room() -> Self {
640                let (client, user_id, mut sync_response_builder) = Self::init().await;
641                let room = create_just_me_room(&client, &mut sync_response_builder).await;
642                Self { client, bob_user_id: user_id, sync_response_builder, room }
643            }
644
645            pub(super) async fn new_room_with_other_bob() -> Self {
646                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
647                let room = create_room_with_other_member(
648                    &mut sync_response_builder,
649                    &client,
650                    &bob_user_id,
651                )
652                .await;
653                Self { client, bob_user_id, sync_response_builder, room }
654            }
655
            /// Returns the user ID of the other user (Bob), not of our own user.
            pub(super) fn bob_user_id(&self) -> &UserId {
                &self.bob_user_id
            }
659
660            pub(super) async fn pin_bob(&self) {
661                if self.bob_user_identity().await.is_some() {
662                    assert!(
663                        !self.bob_is_pinned().await,
664                        "pin_bob() called when the identity is already pinned!"
665                    );
666
667                    // Pin it
668                    self.bob_user_identity()
669                        .await
670                        .expect("User should exist")
671                        .pin()
672                        .await
673                        .expect("Should not fail to pin");
674                } else {
675                    // There was no existing identity. Set one. It will be pinned by default.
676                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
677                        .await;
678                }
679
680                // Sanity check: they are pinned
681                assert!(self.bob_is_pinned().await);
682            }
683
            /// Leaves Bob unpinned, by calling [`TestSetup::unpin_bob_with`] with
            /// the identity B data set.
            pub(super) async fn unpin_bob(&self) {
                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
            }
687
688            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
689                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
690                    serde_json::to_string(
691                        key_query_response
692                            .master_keys
693                            .first_key_value()
694                            .expect("Master key should have a value")
695                            .1,
696                    )
697                    .expect("Should be able to serialise master key")
698                }
699
700                let a = IdentityChangeDataSet::key_query_with_identity_a();
701                let b = IdentityChangeDataSet::key_query_with_identity_b();
702                let requested_master_key = master_key_json(&requested);
703                let a_master_key = master_key_json(&a);
704
705                // Change/set their identity pin it, then change it again - this will definitely
706                // unpin, even if the first identity we supply is their very first, making them
707                // initially pinned.
708                if requested_master_key == a_master_key {
709                    self.change_bob_identity(b).await;
710                    if !self.bob_is_pinned().await {
711                        self.pin_bob().await;
712                    }
713                    self.change_bob_identity(a).await;
714                } else {
715                    self.change_bob_identity(a).await;
716                    if !self.bob_is_pinned().await {
717                        self.pin_bob().await;
718                    }
719                    self.change_bob_identity(b).await;
720                }
721
722                // Sanity: they are unpinned
723                assert!(!self.bob_is_pinned().await);
724            }
725
            /// Verifies Bob, by calling [`TestSetup::verify_bob_with`] with the
            /// key query response, master signing keys and self-signing keys of
            /// the identity A data set.
            pub(super) async fn verify_bob(&self) {
                self.verify_bob_with(
                    IdentityChangeDataSet::key_query_with_identity_a(),
                    IdentityChangeDataSet::master_signing_keys_a(),
                    IdentityChangeDataSet::self_signing_keys_a(),
                )
                .await;
            }
734
735            pub(super) async fn verify_bob_with(
736                &self,
737                key_query: KeyQueryResponse,
738                master_signing_key: serde_json::Value,
739                self_signing_key: serde_json::Value,
740            ) {
741                // Make sure the requested identity is set
742                self.change_bob_identity(key_query).await;
743
744                let my_user_id = self.client.user_id().expect("I should have a user id");
745                let my_identity = self
746                    .client
747                    .encryption()
748                    .get_user_identity(my_user_id)
749                    .await
750                    .expect("Should not fail to get own user identity")
751                    .expect("Should have an own user identity")
752                    .underlying_identity()
753                    .own()
754                    .expect("Our own identity should be of type Own");
755
756                // Get the request
757                let signature_upload_request = self
758                    .bob_crypto_other_identity()
759                    .await
760                    .verify()
761                    .await
762                    .expect("Should be able to verify other identity");
763
764                let verification_response = simulate_key_query_response_for_verification(
765                    signature_upload_request,
766                    my_identity,
767                    my_user_id,
768                    self.bob_user_id(),
769                    master_signing_key,
770                    self_signing_key,
771                );
772
773                // Receive the response into our client
774                self.client
775                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
776                    .await
777                    .unwrap();
778
779                // Sanity: they are verified
780                assert!(self.bob_is_verified().await);
781            }
782
783            pub(super) async fn bob_joins(&mut self) {
784                self.bob_membership_change(MembershipState::Join).await;
785            }
786
787            pub(super) async fn bob_leaves(&mut self) {
788                self.bob_membership_change(MembershipState::Leave).await;
789            }
790
791            pub(super) async fn subscribe_to_identity_status_changes(
792                &self,
793            ) -> impl Stream<Item = Vec<IdentityStatusChange>> + use<> {
794                self.room
795                    .subscribe_to_identity_status_changes()
796                    .await
797                    .expect("Should be able to subscribe")
798            }
799
800            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
801                let (client, _server) = create_client_and_server().await;
802
803                // Ensure our user has cross-signing keys etc.
804                client
805                    .olm_machine()
806                    .await
807                    .as_ref()
808                    .expect("We should have an Olm machine")
809                    .bootstrap_cross_signing(true)
810                    .await
811                    .expect("Should be able to bootstrap cross-signing");
812
813                // Note: if you change the user_id, you will need to change lots of hard-coded
814                // stuff inside IdentityChangeDataSet
815                let bob_user_id = owned_user_id!("@bob:localhost");
816
817                let sync_response_builder = SyncResponseBuilder::default();
818
819                (client, bob_user_id, sync_response_builder)
820            }
821
822            async fn change_bob_identity(
823                &self,
824                key_query_response: get_keys::v3::Response,
825            ) -> OtherUserIdentity {
826                self.client
827                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
828                    .await
829                    .expect("Should not fail to send identity changes");
830
831                self.bob_crypto_other_identity().await
832            }
833
834            async fn bob_membership_change(&mut self, new_state: MembershipState) {
835                let sync_response = self
836                    .sync_response_builder
837                    .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
838                        StateTestEvent::Custom(sync_response_member(
839                            &self.bob_user_id,
840                            new_state.clone(),
841                        )),
842                    ))
843                    .build_sync_response();
844                self.room.client.process_sync(sync_response).await.unwrap();
845
846                // Make sure the membership stuck as expected
847                let m = self
848                    .room
849                    .get_member_no_sync(&self.bob_user_id)
850                    .await
851                    .expect("Should not fail to get member");
852
853                match (&new_state, m) {
854                    (MembershipState::Leave, None) => {}
855                    (_, None) => {
856                        panic!("Member should exist")
857                    }
858                    (_, Some(m)) => {
859                        assert_eq!(*m.membership(), new_state);
860                    }
861                }
862            }
863
864            async fn bob_is_pinned(&self) -> bool {
865                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
866            }
867
868            async fn bob_is_verified(&self) -> bool {
869                self.bob_crypto_other_identity().await.is_verified()
870            }
871
872            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
873                self.bob_user_identity()
874                    .await
875                    .expect("User identity should exist")
876                    .underlying_identity()
877                    .other()
878                    .expect("Identity should be Other, not Own")
879            }
880
881            async fn bob_user_identity(&self) -> Option<UserIdentity> {
882                self.client
883                    .encryption()
884                    .get_user_identity(&self.bob_user_id)
885                    .await
886                    .expect("Should not fail to get user identity")
887            }
888        }
889
890        async fn create_just_me_room(
891            client: &Client,
892            sync_response_builder: &mut SyncResponseBuilder,
893        ) -> Room {
894            let create_room_sync_response = sync_response_builder
895                .add_joined_room(
896                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
897                        .add_state_event(StateTestEvent::Member),
898                )
899                .build_sync_response();
900            client.process_sync(create_room_sync_response).await.unwrap();
901            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
902            assert_eq!(room.state(), RoomState::Joined);
903            room
904        }
905
906        async fn create_room_with_other_member(
907            builder: &mut SyncResponseBuilder,
908            client: &Client,
909            other_user_id: &UserId,
910        ) -> Room {
911            let create_room_sync_response = builder
912                .add_joined_room(
913                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
914                        .add_state_event(StateTestEvent::Member)
915                        .add_state_event(StateTestEvent::Custom(sync_response_member(
916                            other_user_id,
917                            MembershipState::Join,
918                        ))),
919                )
920                .build_sync_response();
921            client.process_sync(create_room_sync_response).await.unwrap();
922            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
923            room.inner.mark_members_synced();
924
925            assert_eq!(room.state(), RoomState::Joined);
926            assert_eq!(
927                *room
928                    .get_member_no_sync(other_user_id)
929                    .await
930                    .expect("Should not fail to get member")
931                    .expect("Member should exist")
932                    .membership(),
933                MembershipState::Join
934            );
935            room
936        }
937
938        async fn create_client_and_server() -> (Client, MockServer) {
939            let server = MockServer::start().await;
940            mock_members_request(&server).await;
941            mock_secret_storage_default_key(&server).await;
942            let client = logged_in_client(Some(server.uri())).await;
943            (client, server)
944        }
945
946        async fn mock_members_request(server: &MockServer) {
947            Mock::given(method("GET"))
948                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
949                .and(header("authorization", "Bearer 1234"))
950                .respond_with(
951                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
952                )
953                .mount(server)
954                .await;
955        }
956
957        async fn mock_secret_storage_default_key(server: &MockServer) {
958            Mock::given(method("GET"))
959                .and(path_regex(
960                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
961                ))
962                .and(header("authorization", "Bearer 1234"))
963                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
964                .mount(server)
965                .await;
966        }
967
968        fn sync_response_member(
969            user_id: &UserId,
970            membership: MembershipState,
971        ) -> serde_json::Value {
972            json!({
973                "content": {
974                    "membership": membership.to_string(),
975                },
976                "event_id": format!(
977                    "$aa{}bb:localhost",
978                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
979                ),
980                "origin_server_ts": 1472735824,
981                "sender": "@example:localhost",
982                "state_key": user_id,
983                "type": "m.room.member",
984                "unsigned": {
985                    "age": 1234
986                }
987            })
988        }
989    }
990}