matrix_sdk/room/
identity_status_changes.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Facility to track changes to the identity of members of rooms.
16#![cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
17
18use std::collections::BTreeMap;
19
20use async_stream::stream;
21use futures_core::Stream;
22use futures_util::{stream_select, StreamExt};
23use matrix_sdk_base::crypto::{
24    IdentityState, IdentityStatusChange, RoomIdentityChange, RoomIdentityState,
25};
26use ruma::{events::room::member::SyncRoomMemberEvent, OwnedUserId, UserId};
27use tokio::sync::mpsc;
28use tokio_stream::wrappers::ReceiverStream;
29
30use super::Room;
31use crate::{
32    encryption::identities::{IdentityUpdates, UserIdentity},
33    event_handler::EventHandlerDropGuard,
34    Client, Error, Result,
35};
36
37/// Support for creating a stream of batches of [`IdentityStatusChange`].
38///
39/// Internally, this subscribes to all identity changes, and to room events that
40/// change the membership, and provides a stream of all changes to the identity
41/// status of all room members.
42///
43/// This struct does not represent the actual stream, but the state that is used
44/// to produce the values of the stream.
45///
46/// It does provide a method to create the stream:
47/// [`IdentityStatusChanges::create_stream`].
48#[derive(Debug)]
49pub struct IdentityStatusChanges {
50    /// Who is in the room and who is in identity violation at this moment
51    room_identity_state: RoomIdentityState<Room>,
52
53    /// Dropped when we are dropped, and unregisters the event handler we
54    /// registered to listen for room events
55    _drop_guard: EventHandlerDropGuard,
56}
57
58impl IdentityStatusChanges {
59    /// Create a new stream of significant changes to the identity status of
60    /// members of a room.
61    ///
62    /// The "status" of an identity changes when our level of trust in it
63    /// changes.
64    ///
65    /// A "significant" change means a warning should either be added or removed
66    /// (e.g. the user changed from pinned to unpinned (show a warning) or
67    /// from verification violation to pinned (remove a warning). An
68    /// insignificant change would be from pinned to verified - no warning
69    /// is needed in this case.
70    ///
71    /// For example, if an identity is "pinned" i.e. not manually verified, but
72    /// known, and it becomes a "unpinned" i.e. unknown, because the
73    /// encryption keys are different and the user has not acknowledged
74    /// this, then this constitutes a status change. Also, if an identity is
75    /// "unpinned" and becomes "pinned", this is also a status change.
76    ///
77    /// The supplied stream is intended to provide enough information for a
78    /// client to display a list of room members whose identities have
79    /// changed, and allow the user to acknowledge this or act upon it.
80    ///
81    /// The first item in the stream provides the current state of the room:
82    /// each member of the room who is not in "pinned" or "verified" state will
83    /// be included (except the current user).
84    ///
85    /// Note: when an unpinned user leaves a room, an update is generated
86    /// stating that they have become pinned, even though they may not
87    /// necessarily have become pinned, but we don't care any more because they
88    /// left the room.
89    pub async fn create_stream(
90        room: Room,
91    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
92        let identity_updates = wrap_identity_updates(&room.client).await?;
93        let (drop_guard, room_member_events) = wrap_room_member_events(&room);
94        let mut unprocessed_stream = combine_streams(identity_updates, room_member_events);
95        let own_user_id = room.client.user_id().ok_or(Error::InsufficientData)?.to_owned();
96
97        let mut state = IdentityStatusChanges {
98            room_identity_state: RoomIdentityState::new(room).await,
99            _drop_guard: drop_guard,
100        };
101
102        Ok(stream!({
103            let mut current_state =
104                filter_for_initial_update(state.room_identity_state.current_state(), &own_user_id);
105
106            if !current_state.is_empty() {
107                current_state.sort();
108                yield current_state;
109            }
110
111            while let Some(item) = unprocessed_stream.next().await {
112                let mut update = filter_non_self(
113                    state.room_identity_state.process_change(item).await,
114                    &own_user_id,
115                );
116                if !update.is_empty() {
117                    update.sort();
118                    yield update;
119                }
120            }
121        }))
122    }
123}
124
125fn filter_for_initial_update(
126    mut input: Vec<IdentityStatusChange>,
127    own_user_id: &UserId,
128) -> Vec<IdentityStatusChange> {
129    // We are never interested in changes to our own identity, and also for initial
130    // updates, we are only interested in "bad" states where we need to
131    // notify the user, so we can remove Verified states (Pinned states are
132    // already missing, because Pinned is considered the default).
133    input.retain(|change| {
134        change.user_id != own_user_id && change.changed_to != IdentityState::Verified
135    });
136
137    input
138}
139
140fn filter_non_self(
141    mut input: Vec<IdentityStatusChange>,
142    own_user_id: &UserId,
143) -> Vec<IdentityStatusChange> {
144    // We are never interested in changes to our own identity
145    input.retain(|change| change.user_id != own_user_id);
146    input
147}
148
149fn combine_streams(
150    identity_updates: impl Stream<Item = RoomIdentityChange> + Unpin,
151    room_member_events: impl Stream<Item = RoomIdentityChange> + Unpin,
152) -> impl Stream<Item = RoomIdentityChange> {
153    stream_select!(identity_updates, room_member_events)
154}
155
156async fn wrap_identity_updates(client: &Client) -> Result<impl Stream<Item = RoomIdentityChange>> {
157    Ok(client
158        .encryption()
159        .user_identities_stream()
160        .await?
161        .map(|item| RoomIdentityChange::IdentityUpdates(to_base_updates(item))))
162}
163
164fn to_base_updates(input: IdentityUpdates) -> matrix_sdk_base::crypto::store::IdentityUpdates {
165    matrix_sdk_base::crypto::store::IdentityUpdates {
166        new: to_base_identities(input.new),
167        changed: to_base_identities(input.changed),
168        unchanged: Default::default(),
169    }
170}
171
172fn to_base_identities(
173    input: BTreeMap<OwnedUserId, UserIdentity>,
174) -> BTreeMap<OwnedUserId, matrix_sdk_base::crypto::UserIdentity> {
175    input.into_iter().map(|(k, v)| (k, v.underlying_identity())).collect()
176}
177
178fn wrap_room_member_events(
179    room: &Room,
180) -> (EventHandlerDropGuard, impl Stream<Item = RoomIdentityChange>) {
181    let own_user_id = room.own_user_id().to_owned();
182    let room_id = room.room_id();
183    let (sender, receiver) = mpsc::channel(16);
184    let handle =
185        room.client.add_room_event_handler(room_id, move |event: SyncRoomMemberEvent| async move {
186            if *event.state_key() == own_user_id {
187                return;
188            }
189            let _: Result<_, _> = sender.send(RoomIdentityChange::SyncRoomMemberEvent(event)).await;
190        });
191    let drop_guard = room.client.event_handler_drop_guard(handle);
192    (drop_guard, ReceiverStream::new(receiver))
193}
194
195#[cfg(test)]
196mod tests {
197    use std::time::Duration;
198
199    use futures_util::{pin_mut, FutureExt as _, StreamExt as _};
200    use matrix_sdk_base::crypto::IdentityState;
201    use matrix_sdk_test::{async_test, test_json::keys_query_sets::IdentityChangeDataSet};
202    use test_setup::TestSetup;
203
204    use crate::assert_next_with_timeout;
205
206    #[async_test]
207    async fn test_when_user_becomes_unpinned_we_report_it() {
208        // Given a room containing us and Bob
209        let t = TestSetup::new_room_with_other_bob().await;
210
211        // And Bob's identity is pinned
212        t.pin_bob().await;
213
214        // And we are listening for identity changes
215        let stream = t.subscribe_to_identity_status_changes().await;
216        pin_mut!(stream);
217
218        // When Bob becomes unpinned
219        t.unpin_bob().await;
220
221        // Then we were notified about it
222        let change = assert_next_with_timeout!(stream);
223        assert_eq!(change[0].user_id, t.bob_user_id());
224        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
225        assert_eq!(change.len(), 1);
226    }
227
228    #[async_test]
229    async fn test_when_user_becomes_verification_violation_we_report_it() {
230        // Given a room containing us and Bob
231        let t = TestSetup::new_room_with_other_bob().await;
232
233        // And Bob's identity is verified
234        t.verify_bob().await;
235
236        // And we are listening for identity changes
237        let stream = t.subscribe_to_identity_status_changes().await;
238        pin_mut!(stream);
239
240        // When Bob's identity changes
241        t.unpin_bob().await;
242
243        // Then we were notified about a verification violation
244        let change = assert_next_with_timeout!(stream);
245        assert_eq!(change[0].user_id, t.bob_user_id());
246        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
247        assert_eq!(change.len(), 1);
248    }
249
250    #[async_test]
251    async fn test_when_user_becomes_pinned_we_report_it() {
252        // Given a room containing us and Bob
253        let t = TestSetup::new_room_with_other_bob().await;
254
255        // And Bob's identity is unpinned
256        t.unpin_bob().await;
257
258        // And we are listening for identity changes
259        let stream = t.subscribe_to_identity_status_changes().await;
260        pin_mut!(stream);
261
262        // When Bob becomes pinned
263        t.pin_bob().await;
264
265        // Then we were notified about the initial state of the room
266        let change1 = assert_next_with_timeout!(stream);
267        assert_eq!(change1[0].user_id, t.bob_user_id());
268        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
269        assert_eq!(change1.len(), 1);
270
271        // And the change when Bob became pinned
272        let change2 = assert_next_with_timeout!(stream);
273        assert_eq!(change2[0].user_id, t.bob_user_id());
274        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
275        assert_eq!(change2.len(), 1);
276    }
277
278    #[async_test]
279    async fn test_when_user_becomes_verified_we_report_it() {
280        // Given a room containing us and Bob
281        let t = TestSetup::new_room_with_other_bob().await;
282
283        // And we are listening for identity changes
284        let stream = t.subscribe_to_identity_status_changes().await;
285        pin_mut!(stream);
286
287        // When Bob becomes verified
288        t.verify_bob().await;
289
290        // Then we are notified about Bob being verified
291        let change = assert_next_with_timeout!(stream);
292        assert_eq!(change[0].user_id, t.bob_user_id());
293        assert_eq!(change[0].changed_to, IdentityState::Verified);
294        assert_eq!(change.len(), 1);
295
296        // (And then unpinned, so we have something to come through the stream)
297        t.unpin_bob().await;
298
299        // Then we are notified about the unpinning part
300        let change = assert_next_with_timeout!(stream);
301        assert_eq!(change[0].user_id, t.bob_user_id());
302        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
303        assert_eq!(change.len(), 1);
304    }
305
306    #[async_test]
307    async fn test_when_an_unpinned_user_becomes_verified_we_report_it() {
308        // Given a room containing us and Bob
309        let t = TestSetup::new_room_with_other_bob().await;
310
311        // And Bob's identity is unpinned
312        t.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_a()).await;
313
314        // And we are listening for identity changes
315        let stream = t.subscribe_to_identity_status_changes().await;
316        pin_mut!(stream);
317
318        // When Bob becomes verified
319        t.verify_bob().await;
320
321        // Then we were notified about the initial state of the room
322        let change1 = assert_next_with_timeout!(stream);
323        assert_eq!(change1[0].user_id, t.bob_user_id());
324        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
325        assert_eq!(change1.len(), 1);
326
327        // And the change when Bob became verified
328        let change2 = assert_next_with_timeout!(stream);
329        assert_eq!(change2[0].user_id, t.bob_user_id());
330        assert_eq!(change2[0].changed_to, IdentityState::Verified);
331        assert_eq!(change2.len(), 1);
332    }
333
334    #[async_test]
335    async fn test_when_user_in_verification_violation_becomes_verified_we_report_it() {
336        // Given a room containing us and Bob
337        let t = TestSetup::new_room_with_other_bob().await;
338
339        // And Bob is in verification violation
340        t.verify_bob_with(
341            IdentityChangeDataSet::key_query_with_identity_b(),
342            IdentityChangeDataSet::master_signing_keys_b(),
343            IdentityChangeDataSet::self_signing_keys_b(),
344        )
345        .await;
346        t.unpin_bob().await;
347
348        // And we are listening for identity changes
349        let stream = t.subscribe_to_identity_status_changes().await;
350        pin_mut!(stream);
351
352        // When Bob becomes verified
353        t.verify_bob().await;
354
355        // Then we were notified about the initial state of the room
356        let change1 = assert_next_with_timeout!(stream);
357        assert_eq!(change1[0].user_id, t.bob_user_id());
358        assert_eq!(change1[0].changed_to, IdentityState::VerificationViolation);
359        assert_eq!(change1.len(), 1);
360
361        // And the change when Bob became verified
362        let change2 = assert_next_with_timeout!(stream);
363        assert_eq!(change2[0].user_id, t.bob_user_id());
364        assert_eq!(change2[0].changed_to, IdentityState::Verified);
365        assert_eq!(change2.len(), 1);
366    }
367
368    #[async_test]
369    async fn test_when_an_unpinned_user_joins_we_report_it() {
370        // Given a room containing just us
371        let mut t = TestSetup::new_just_me_room().await;
372
373        // And Bob's identity is unpinned
374        t.unpin_bob().await;
375
376        // And we are listening for identity changes
377        let stream = t.subscribe_to_identity_status_changes().await;
378        pin_mut!(stream);
379
380        // When Bob joins the room
381        t.bob_joins().await;
382
383        // Then we were notified about it
384        let change = assert_next_with_timeout!(stream);
385        assert_eq!(change[0].user_id, t.bob_user_id());
386        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
387        assert_eq!(change.len(), 1);
388    }
389
390    #[async_test]
391    async fn test_when_an_verification_violating_user_joins_we_report_it() {
392        // Given a room containing just us
393        let mut t = TestSetup::new_just_me_room().await;
394
395        // And Bob's identity is in verification violation
396        t.verify_bob().await;
397        t.unpin_bob().await;
398
399        // And we are listening for identity changes
400        let stream = t.subscribe_to_identity_status_changes().await;
401        pin_mut!(stream);
402
403        // When Bob joins the room
404        t.bob_joins().await;
405
406        // Then we were notified about it
407        let change = assert_next_with_timeout!(stream);
408        assert_eq!(change[0].user_id, t.bob_user_id());
409        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
410        assert_eq!(change.len(), 1);
411    }
412
413    #[async_test]
414    async fn test_when_a_verified_user_joins_we_dont_report_it() {
415        // Given a room containing just us
416        let mut t = TestSetup::new_just_me_room().await;
417
418        // And Bob's identity is verified
419        t.verify_bob().await;
420
421        // And we are listening for identity changes
422        let stream = t.subscribe_to_identity_status_changes().await;
423        pin_mut!(stream);
424
425        // When Bob joins the room
426        t.bob_joins().await;
427
428        // (Then becomes unpinned so we have something to report)
429        t.unpin_bob().await;
430
431        //// Then we were only notified about the unpin
432        let change = assert_next_with_timeout!(stream);
433        assert_eq!(change[0].user_id, t.bob_user_id());
434        assert_eq!(change[0].changed_to, IdentityState::VerificationViolation);
435        assert_eq!(change.len(), 1);
436    }
437
438    #[async_test]
439    async fn test_when_a_pinned_user_joins_we_do_not_report() {
440        // Given a room containing just us
441        let mut t = TestSetup::new_just_me_room().await;
442
443        // And Bob's identity is unpinned
444        t.pin_bob().await;
445
446        // And we are listening for identity changes
447        let stream = t.subscribe_to_identity_status_changes().await;
448        pin_mut!(stream);
449
450        // When Bob joins the room
451        t.bob_joins().await;
452
453        // Then there is no notification
454        tokio::time::sleep(Duration::from_millis(200)).await;
455        let change = stream.next().now_or_never();
456        assert!(change.is_none());
457    }
458
459    #[async_test]
460    async fn test_when_an_unpinned_user_leaves_we_report_it() {
461        // Given a room containing us and Bob
462        let mut t = TestSetup::new_room_with_other_bob().await;
463
464        // And Bob's identity is unpinned
465        t.unpin_bob().await;
466
467        // And we are listening for identity changes
468        let stream = t.subscribe_to_identity_status_changes().await;
469        pin_mut!(stream);
470
471        // When Bob leaves the room
472        t.bob_leaves().await;
473
474        // Then we were notified about the initial state of the room
475        let change1 = assert_next_with_timeout!(stream);
476        assert_eq!(change1[0].user_id, t.bob_user_id());
477        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
478        assert_eq!(change1.len(), 1);
479
480        // And we were notified about the change when the user left
481        let change2 = assert_next_with_timeout!(stream);
482        // Note: the user left the room, but we see that as them "becoming pinned" i.e.
483        // "you no longer need to notify about this user".
484        assert_eq!(change2[0].user_id, t.bob_user_id());
485        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
486        assert_eq!(change2.len(), 1);
487    }
488
489    #[async_test]
490    async fn test_multiple_identity_changes_are_reported() {
491        // Given a room containing just us
492        let mut t = TestSetup::new_just_me_room().await;
493
494        // And Bob's identity is unpinned
495        t.unpin_bob().await;
496
497        // And we are listening for identity changes
498        let stream = t.subscribe_to_identity_status_changes().await;
499        pin_mut!(stream);
500
501        // NOTE: below we pull the changes out of the subscription after each action.
502        // This makes sure that the identity changes and membership changes are properly
503        // ordered. If we pull them out later, the identity changes get shifted forward
504        // because they rely on less-complex async stuff under the hood. Calling
505        // next_change ends up winding the async machinery sufficiently that the
506        // membership change and any subsequent events have fully completed.
507
508        // When Bob joins the room ...
509        t.bob_joins().await;
510        let change1 = assert_next_with_timeout!(stream);
511
512        // ... becomes pinned ...
513        t.pin_bob().await;
514        let change2 = assert_next_with_timeout!(stream);
515
516        // ... leaves and joins again (ignored since they stay pinned) ...
517        t.bob_leaves().await;
518        t.bob_joins().await;
519
520        // ... becomes unpinned ...
521        t.unpin_bob().await;
522        let change3 = assert_next_with_timeout!(stream);
523
524        // ... and leaves.
525        t.bob_leaves().await;
526        let change4 = assert_next_with_timeout!(stream);
527
528        assert_eq!(change1[0].user_id, t.bob_user_id());
529        assert_eq!(change2[0].user_id, t.bob_user_id());
530        assert_eq!(change3[0].user_id, t.bob_user_id());
531        assert_eq!(change4[0].user_id, t.bob_user_id());
532
533        assert_eq!(change1[0].changed_to, IdentityState::PinViolation);
534        assert_eq!(change2[0].changed_to, IdentityState::Pinned);
535        assert_eq!(change3[0].changed_to, IdentityState::PinViolation);
536        assert_eq!(change4[0].changed_to, IdentityState::Pinned);
537
538        assert_eq!(change1.len(), 1);
539        assert_eq!(change2.len(), 1);
540        assert_eq!(change3.len(), 1);
541        assert_eq!(change4.len(), 1);
542    }
543
544    #[async_test]
545    async fn test_when_an_unpinned_user_is_already_present_we_report_it_immediately() {
546        // Given a room containing Bob, who is unpinned
547        let t = TestSetup::new_room_with_other_bob().await;
548        t.unpin_bob().await;
549
550        // When we start listening for identity changes
551        let stream = t.subscribe_to_identity_status_changes().await;
552        pin_mut!(stream);
553
554        // Then we were immediately notified about Bob being unpinned
555        let change = assert_next_with_timeout!(stream);
556        assert_eq!(change[0].user_id, t.bob_user_id());
557        assert_eq!(change[0].changed_to, IdentityState::PinViolation);
558        assert_eq!(change.len(), 1);
559    }
560
561    #[async_test]
562    async fn test_when_a_verified_user_is_already_present_we_dont_report_it() {
563        // Given a room containing Bob, who is unpinned
564        let t = TestSetup::new_room_with_other_bob().await;
565        t.verify_bob().await;
566
567        // When we start listening for identity changes
568        let stream = t.subscribe_to_identity_status_changes().await;
569        pin_mut!(stream);
570
571        // (And we unpin so that something is available in the changes stream)
572        t.unpin_bob().await;
573
574        // Then we were only notified about the unpin, not being verified
575        let next_change = assert_next_with_timeout!(stream);
576
577        assert_eq!(next_change[0].user_id, t.bob_user_id());
578        assert_eq!(next_change[0].changed_to, IdentityState::VerificationViolation);
579        assert_eq!(next_change.len(), 1);
580    }
581
582    // TODO: I (andyb) haven't figured out how to test room membership changes that
583    // affect our own user (they should not be shown). Specifically, I haven't
584    // figure out how to get out own user into a non-pinned state.
585
586    mod test_setup {
587        use std::time::{SystemTime, UNIX_EPOCH};
588
589        use futures_core::Stream;
590        use matrix_sdk_base::{
591            crypto::{
592                testing::simulate_key_query_response_for_verification, IdentityStatusChange,
593                OtherUserIdentity,
594            },
595            RoomState,
596        };
597        use matrix_sdk_test::{
598            test_json, test_json::keys_query_sets::IdentityChangeDataSet, JoinedRoomBuilder,
599            StateTestEvent, SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
600        };
601        use ruma::{
602            api::client::keys::{get_keys, get_keys::v3::Response as KeyQueryResponse},
603            events::room::member::MembershipState,
604            owned_user_id, OwnedUserId, TransactionId, UserId,
605        };
606        use serde_json::json;
607        use wiremock::{
608            matchers::{header, method, path_regex},
609            Mock, MockServer, ResponseTemplate,
610        };
611
612        use crate::{
613            encryption::identities::UserIdentity, test_utils::logged_in_client, Client, Room,
614        };
615
616        /// Sets up a client and a room and allows changing user identities and
617        /// room memberships. Note: most methods e.g. [`TestSetup::bob_user_id`]
618        /// are talking about the OTHER user, not our own user. Only
619        /// methods starting with `self_` are talking about this user.
620        ///
621        /// This user is called `@example:localhost` but is rarely used
622        /// mentioned.
623        ///
624        /// The other user is called `@bob:localhost`.
625        pub(super) struct TestSetup {
626            client: Client,
627            bob_user_id: OwnedUserId,
628            sync_response_builder: SyncResponseBuilder,
629            room: Room,
630        }
631
632        impl TestSetup {
633            pub(super) async fn new_just_me_room() -> Self {
634                let (client, user_id, mut sync_response_builder) = Self::init().await;
635                let room = create_just_me_room(&client, &mut sync_response_builder).await;
636                Self { client, bob_user_id: user_id, sync_response_builder, room }
637            }
638
639            pub(super) async fn new_room_with_other_bob() -> Self {
640                let (client, bob_user_id, mut sync_response_builder) = Self::init().await;
641                let room = create_room_with_other_member(
642                    &mut sync_response_builder,
643                    &client,
644                    &bob_user_id,
645                )
646                .await;
647                Self { client, bob_user_id, sync_response_builder, room }
648            }
649
650            pub(super) fn bob_user_id(&self) -> &UserId {
651                &self.bob_user_id
652            }
653
654            pub(super) async fn pin_bob(&self) {
655                if self.bob_user_identity().await.is_some() {
656                    assert!(
657                        !self.bob_is_pinned().await,
658                        "pin_bob() called when the identity is already pinned!"
659                    );
660
661                    // Pin it
662                    self.bob_user_identity()
663                        .await
664                        .expect("User should exist")
665                        .pin()
666                        .await
667                        .expect("Should not fail to pin");
668                } else {
669                    // There was no existing identity. Set one. It will be pinned by default.
670                    self.change_bob_identity(IdentityChangeDataSet::key_query_with_identity_a())
671                        .await;
672                }
673
674                // Sanity check: they are pinned
675                assert!(self.bob_is_pinned().await);
676            }
677
678            pub(super) async fn unpin_bob(&self) {
679                self.unpin_bob_with(IdentityChangeDataSet::key_query_with_identity_b()).await;
680            }
681
682            pub(super) async fn unpin_bob_with(&self, requested: KeyQueryResponse) {
683                fn master_key_json(key_query_response: &KeyQueryResponse) -> String {
684                    serde_json::to_string(
685                        key_query_response
686                            .master_keys
687                            .first_key_value()
688                            .expect("Master key should have a value")
689                            .1,
690                    )
691                    .expect("Should be able to serialise master key")
692                }
693
694                let a = IdentityChangeDataSet::key_query_with_identity_a();
695                let b = IdentityChangeDataSet::key_query_with_identity_b();
696                let requested_master_key = master_key_json(&requested);
697                let a_master_key = master_key_json(&a);
698
699                // Change/set their identity pin it, then change it again - this will definitely
700                // unpin, even if the first identity we supply is their very first, making them
701                // initially pinned.
702                if requested_master_key == a_master_key {
703                    self.change_bob_identity(b).await;
704                    if !self.bob_is_pinned().await {
705                        self.pin_bob().await;
706                    }
707                    self.change_bob_identity(a).await;
708                } else {
709                    self.change_bob_identity(a).await;
710                    if !self.bob_is_pinned().await {
711                        self.pin_bob().await;
712                    }
713                    self.change_bob_identity(b).await;
714                }
715
716                // Sanity: they are unpinned
717                assert!(!self.bob_is_pinned().await);
718            }
719
720            pub(super) async fn verify_bob(&self) {
721                self.verify_bob_with(
722                    IdentityChangeDataSet::key_query_with_identity_a(),
723                    IdentityChangeDataSet::master_signing_keys_a(),
724                    IdentityChangeDataSet::self_signing_keys_a(),
725                )
726                .await;
727            }
728
729            pub(super) async fn verify_bob_with(
730                &self,
731                key_query: KeyQueryResponse,
732                master_signing_key: serde_json::Value,
733                self_signing_key: serde_json::Value,
734            ) {
735                // Make sure the requested identity is set
736                self.change_bob_identity(key_query).await;
737
738                let my_user_id = self.client.user_id().expect("I should have a user id");
739                let my_identity = self
740                    .client
741                    .encryption()
742                    .get_user_identity(my_user_id)
743                    .await
744                    .expect("Should not fail to get own user identity")
745                    .expect("Should have an own user identity")
746                    .underlying_identity()
747                    .own()
748                    .expect("Our own identity should be of type Own");
749
750                // Get the request
751                let signature_upload_request = self
752                    .bob_crypto_other_identity()
753                    .await
754                    .verify()
755                    .await
756                    .expect("Should be able to verify other identity");
757
758                let verification_response = simulate_key_query_response_for_verification(
759                    signature_upload_request,
760                    my_identity,
761                    my_user_id,
762                    self.bob_user_id(),
763                    master_signing_key,
764                    self_signing_key,
765                );
766
767                // Receive the response into our client
768                self.client
769                    .mark_request_as_sent(&TransactionId::new(), &verification_response)
770                    .await
771                    .unwrap();
772
773                // Sanity: they are verified
774                assert!(self.bob_is_verified().await);
775            }
776
777            pub(super) async fn bob_joins(&mut self) {
778                self.bob_membership_change(MembershipState::Join).await;
779            }
780
781            pub(super) async fn bob_leaves(&mut self) {
782                self.bob_membership_change(MembershipState::Leave).await;
783            }
784
785            pub(super) async fn subscribe_to_identity_status_changes(
786                &self,
787            ) -> impl Stream<Item = Vec<IdentityStatusChange>> {
788                self.room
789                    .subscribe_to_identity_status_changes()
790                    .await
791                    .expect("Should be able to subscribe")
792            }
793
794            async fn init() -> (Client, OwnedUserId, SyncResponseBuilder) {
795                let (client, _server) = create_client_and_server().await;
796
797                // Ensure our user has cross-signing keys etc.
798                client
799                    .olm_machine()
800                    .await
801                    .as_ref()
802                    .expect("We should have an Olm machine")
803                    .bootstrap_cross_signing(true)
804                    .await
805                    .expect("Should be able to bootstrap cross-signing");
806
807                // Note: if you change the user_id, you will need to change lots of hard-coded
808                // stuff inside IdentityChangeDataSet
809                let bob_user_id = owned_user_id!("@bob:localhost");
810
811                let sync_response_builder = SyncResponseBuilder::default();
812
813                (client, bob_user_id, sync_response_builder)
814            }
815
816            async fn change_bob_identity(
817                &self,
818                key_query_response: get_keys::v3::Response,
819            ) -> OtherUserIdentity {
820                self.client
821                    .mark_request_as_sent(&TransactionId::new(), &key_query_response)
822                    .await
823                    .expect("Should not fail to send identity changes");
824
825                self.bob_crypto_other_identity().await
826            }
827
828            async fn bob_membership_change(&mut self, new_state: MembershipState) {
829                let sync_response = self
830                    .sync_response_builder
831                    .add_joined_room(JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID).add_state_event(
832                        StateTestEvent::Custom(sync_response_member(
833                            &self.bob_user_id,
834                            new_state.clone(),
835                        )),
836                    ))
837                    .build_sync_response();
838                self.room.client.process_sync(sync_response).await.unwrap();
839
840                // Make sure the membership stuck as expected
841                let m = self
842                    .room
843                    .get_member_no_sync(&self.bob_user_id)
844                    .await
845                    .expect("Should not fail to get member");
846
847                match (&new_state, m) {
848                    (MembershipState::Leave, None) => {}
849                    (_, None) => {
850                        panic!("Member should exist")
851                    }
852                    (_, Some(m)) => {
853                        assert_eq!(*m.membership(), new_state);
854                    }
855                };
856            }
857
858            async fn bob_is_pinned(&self) -> bool {
859                !self.bob_crypto_other_identity().await.identity_needs_user_approval()
860            }
861
862            async fn bob_is_verified(&self) -> bool {
863                self.bob_crypto_other_identity().await.is_verified()
864            }
865
866            async fn bob_crypto_other_identity(&self) -> OtherUserIdentity {
867                self.bob_user_identity()
868                    .await
869                    .expect("User identity should exist")
870                    .underlying_identity()
871                    .other()
872                    .expect("Identity should be Other, not Own")
873            }
874
875            async fn bob_user_identity(&self) -> Option<UserIdentity> {
876                self.client
877                    .encryption()
878                    .get_user_identity(&self.bob_user_id)
879                    .await
880                    .expect("Should not fail to get user identity")
881            }
882        }
883
884        async fn create_just_me_room(
885            client: &Client,
886            sync_response_builder: &mut SyncResponseBuilder,
887        ) -> Room {
888            let create_room_sync_response = sync_response_builder
889                .add_joined_room(
890                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
891                        .add_state_event(StateTestEvent::Member),
892                )
893                .build_sync_response();
894            client.process_sync(create_room_sync_response).await.unwrap();
895            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
896            assert_eq!(room.state(), RoomState::Joined);
897            room
898        }
899
900        async fn create_room_with_other_member(
901            builder: &mut SyncResponseBuilder,
902            client: &Client,
903            other_user_id: &UserId,
904        ) -> Room {
905            let create_room_sync_response = builder
906                .add_joined_room(
907                    JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
908                        .add_state_event(StateTestEvent::Member)
909                        .add_state_event(StateTestEvent::Custom(sync_response_member(
910                            other_user_id,
911                            MembershipState::Join,
912                        ))),
913                )
914                .build_sync_response();
915            client.process_sync(create_room_sync_response).await.unwrap();
916            let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
917            room.inner.mark_members_synced();
918
919            assert_eq!(room.state(), RoomState::Joined);
920            assert_eq!(
921                *room
922                    .get_member_no_sync(other_user_id)
923                    .await
924                    .expect("Should not fail to get member")
925                    .expect("Member should exist")
926                    .membership(),
927                MembershipState::Join
928            );
929            room
930        }
931
932        async fn create_client_and_server() -> (Client, MockServer) {
933            let server = MockServer::start().await;
934            mock_members_request(&server).await;
935            mock_secret_storage_default_key(&server).await;
936            let client = logged_in_client(Some(server.uri())).await;
937            (client, server)
938        }
939
940        async fn mock_members_request(server: &MockServer) {
941            Mock::given(method("GET"))
942                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/members"))
943                .and(header("authorization", "Bearer 1234"))
944                .respond_with(
945                    ResponseTemplate::new(200).set_body_json(&*test_json::members::MEMBERS),
946                )
947                .mount(server)
948                .await;
949        }
950
951        async fn mock_secret_storage_default_key(server: &MockServer) {
952            Mock::given(method("GET"))
953                .and(path_regex(
954                    r"^/_matrix/client/r0/user/.*/account_data/m.secret_storage.default_key",
955                ))
956                .and(header("authorization", "Bearer 1234"))
957                .respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
958                .mount(server)
959                .await;
960        }
961
962        fn sync_response_member(
963            user_id: &UserId,
964            membership: MembershipState,
965        ) -> serde_json::Value {
966            json!({
967                "content": {
968                    "membership": membership.to_string(),
969                },
970                "event_id": format!(
971                    "$aa{}bb:localhost",
972                    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() % 100_000
973                ),
974                "origin_server_ts": 1472735824,
975                "sender": "@example:localhost",
976                "state_key": user_id,
977                "type": "m.room.member",
978                "unsigned": {
979                    "age": 1234
980                }
981            })
982        }
983    }
984}