matrix_sdk_crypto/identities/
manager.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use futures_util::future::join_all;
23use itertools::Itertools;
24use matrix_sdk_common::{executor::spawn, failures_cache::FailuresCache};
25use ruma::{
26    api::client::keys::get_keys::v3::Response as KeysQueryResponse, serde::Raw, OwnedDeviceId,
27    OwnedServerName, OwnedTransactionId, OwnedUserId, ServerName, TransactionId, UserId,
28};
29use tokio::sync::Mutex;
30use tracing::{debug, enabled, info, instrument, trace, warn, Level};
31
32use crate::{
33    error::OlmResult,
34    identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData},
35    olm::{InboundGroupSession, PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType},
36    store::{
37        caches::SequenceNumber, Changes, DeviceChanges, IdentityChanges, KeyQueryManager,
38        Result as StoreResult, Store, StoreCache, StoreCacheGuard, UserKeyQueryResult,
39    },
40    types::{
41        requests::KeysQueryRequest, CrossSigningKey, DeviceKeys, MasterPubkey, SelfSigningPubkey,
42        UserSigningPubkey,
43    },
44    CryptoStoreError, LocalTrust, OwnUserIdentity, SignatureError, UserIdentity,
45};
46
47enum DeviceChange {
48    New(DeviceData),
49    Updated(DeviceData),
50    None,
51}
52
53/// This enum helps us to distinguish between the changed and unchanged
54/// identity case.
55/// An unchanged identity means same cross signing keys as well as same
56/// set of signatures on the master key.
57enum IdentityUpdateResult {
58    Updated(UserIdentityData),
59    Unchanged(UserIdentityData),
60}
61
62#[derive(Debug, Clone)]
63pub(crate) struct IdentityManager {
64    /// Servers that have previously appeared in the `failures` section of a
65    /// `/keys/query` response.
66    ///
67    /// See also [`crate::session_manager::SessionManager::failures`].
68    failures: FailuresCache<OwnedServerName>,
69    store: Store,
70
71    pub(crate) key_query_manager: Arc<KeyQueryManager>,
72
73    /// Details of the current "in-flight" key query request, if any
74    keys_query_request_details: Arc<Mutex<Option<KeysQueryRequestDetails>>>,
75}
76
77/// Details of an in-flight key query request
78#[derive(Debug, Clone, Default)]
79struct KeysQueryRequestDetails {
80    /// The sequence number, to be passed to
81    /// `Store.mark_tracked_users_as_up_to_date`.
82    sequence_number: SequenceNumber,
83
84    /// A single batch of queries returned by the Store is broken up into one or
85    /// more actual KeysQueryRequests, each with their own request id. We
86    /// record the outstanding request ids here.
87    request_ids: HashSet<OwnedTransactionId>,
88}
89
90// Helper type to handle key query response
91struct KeySetInfo {
92    user_id: OwnedUserId,
93    master_key: MasterPubkey,
94    self_signing: SelfSigningPubkey,
95}
96
97impl IdentityManager {
98    const MAX_KEY_QUERY_USERS: usize = 250;
99
100    pub fn new(store: Store) -> Self {
101        let keys_query_request_details = Mutex::new(None);
102
103        IdentityManager {
104            store,
105            key_query_manager: Default::default(),
106            failures: Default::default(),
107            keys_query_request_details: keys_query_request_details.into(),
108        }
109    }
110
111    fn user_id(&self) -> &UserId {
112        &self.store.static_account().user_id
113    }
114
115    /// Receive a successful `/keys/query` response.
116    ///
117    /// Returns a list of devices newly discovered devices and devices that
118    /// changed.
119    ///
120    /// # Arguments
121    ///
122    /// * `request_id` - The request_id returned by `users_for_key_query` or
123    ///   `build_key_query_for_users`
124    /// * `response` - The response of the `/keys/query` request that the client
125    ///   performed.
126    pub async fn receive_keys_query_response(
127        &self,
128        request_id: &TransactionId,
129        response: &KeysQueryResponse,
130    ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
131        debug!(
132            ?request_id,
133            users = ?response.device_keys.keys().collect::<BTreeSet<_>>(),
134            failures = ?response.failures,
135            "Handling a `/keys/query` response"
136        );
137
138        // Parse the strings into server names and filter out our own server. We should
139        // never get failures from our own server but let's remove it as a
140        // precaution anyways.
141        let failed_servers = response
142            .failures
143            .keys()
144            .filter_map(|k| ServerName::parse(k).ok())
145            .filter(|s| s != self.user_id().server_name());
146        let successful_servers = response.device_keys.keys().map(|u| u.server_name());
147
148        // Append the new failed servers and remove any successful servers. We
149        // need to explicitly remove the successful servers because the cache
150        // doesn't automatically remove entries that elapse. Instead, the effect
151        // is that elapsed servers will be retried and their delays incremented.
152        self.failures.extend(failed_servers);
153        self.failures.remove(successful_servers);
154
155        let devices = self.handle_devices_from_key_query(response.device_keys.clone()).await?;
156        let (identities, cross_signing_identity) = self.handle_cross_signing_keys(response).await?;
157
158        let changes = Changes {
159            identities: identities.clone(),
160            devices: devices.clone(),
161            private_identity: cross_signing_identity,
162            ..Default::default()
163        };
164
165        self.store.save_changes(changes).await?;
166
167        // Update the sender data on any existing inbound group sessions based on the
168        // changes in this response.
169        //
170        // `update_sender_data_from_device_changes` relies on being able to look up the
171        // user identities from the store, so this has to happen *after* the
172        // changes from `handle_cross_signing_keys` are saved.
173        //
174        // Note: it might be possible for this to race against session creation. If a
175        // new session is received at the same time as a `/keys/query` response is being
176        // processed, it could be saved without up-to-date sender data, but it might be
177        // saved too late for it to be picked up by
178        // `update_sender_data_from_device_changes`. However, this should be rare,
179        // since, in general, /sync responses which might create a new session
180        // are not processed at the same time as /keys/query responses (assuming
181        // that the application does not call `OlmMachine::receive_sync_changes`
182        // at the same time as `OlmMachine::mark_request_as_sent`).
183        self.update_sender_data_from_device_changes(&devices).await?;
184
185        // if this request is one of those we expected to be in flight, pass the
186        // sequence number back to the store so that it can mark devices up to
187        // date
188        let sequence_number = {
189            let mut request_details = self.keys_query_request_details.lock().await;
190
191            request_details.as_mut().and_then(|details| {
192                if details.request_ids.remove(request_id) {
193                    Some(details.sequence_number)
194                } else {
195                    None
196                }
197            })
198        };
199
200        if let Some(sequence_number) = sequence_number {
201            let cache = self.store.cache().await?;
202            self.key_query_manager
203                .synced(&cache)
204                .await?
205                .mark_tracked_users_as_up_to_date(
206                    response.device_keys.keys().map(Deref::deref),
207                    sequence_number,
208                )
209                .await?;
210        }
211
212        if enabled!(Level::DEBUG) {
213            debug_log_keys_query_response(&devices, &identities, request_id);
214        }
215
216        Ok((devices, identities))
217    }
218
219    async fn update_or_create_device(
220        store: Store,
221        device_keys: DeviceKeys,
222    ) -> StoreResult<DeviceChange> {
223        let old_device =
224            store.get_device_data(&device_keys.user_id, &device_keys.device_id).await?;
225
226        if let Some(mut device) = old_device {
227            match device.update_device(&device_keys) {
228                Err(e) => {
229                    warn!(
230                        user_id = ?device.user_id(),
231                        device_id = ?device.device_id(),
232                        error = ?e,
233                        "Rejecting device update",
234                    );
235                    Ok(DeviceChange::None)
236                }
237                Ok(true) => Ok(DeviceChange::Updated(device)),
238                Ok(false) => Ok(DeviceChange::None),
239            }
240        } else {
241            match DeviceData::try_from(&device_keys) {
242                Ok(d) => {
243                    // If this is our own device, check that the server isn't
244                    // lying about our keys, also mark the device as locally
245                    // trusted.
246                    if d.user_id() == store.user_id() && d.device_id() == store.device_id() {
247                        let local_device_keys = store.static_account().unsigned_device_keys();
248
249                        if d.keys() == &local_device_keys.keys {
250                            d.set_trust_state(LocalTrust::Verified);
251
252                            trace!(
253                                user_id = ?d.user_id(),
254                                device_id = ?d.device_id(),
255                                keys = ?d.keys(),
256                                "Adding our own device to the device store, \
257                                marking it as locally verified",
258                            );
259
260                            Ok(DeviceChange::New(d))
261                        } else {
262                            Ok(DeviceChange::None)
263                        }
264                    } else {
265                        trace!(
266                            user_id = ?d.user_id(),
267                            device_id = ?d.device_id(),
268                            keys = ?d.keys(),
269                            "Adding a new device to the device store",
270                        );
271
272                        Ok(DeviceChange::New(d))
273                    }
274                }
275                Err(e) => {
276                    warn!(
277                        user_id = ?device_keys.user_id,
278                        device_id = ?device_keys.device_id,
279                        error = ?e,
280                        "Rejecting a previously unseen device",
281                    );
282
283                    Ok(DeviceChange::None)
284                }
285            }
286        }
287    }
288
289    async fn update_user_devices(
290        store: Store,
291        user_id: OwnedUserId,
292        device_map: BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
293    ) -> StoreResult<DeviceChanges> {
294        let own_device_id = store.static_account().device_id().to_owned();
295
296        let mut changes = DeviceChanges::default();
297
298        let current_devices: HashSet<OwnedDeviceId> = device_map.keys().cloned().collect();
299
300        let tasks = device_map.into_iter().filter_map(|(device_id, device_keys)| match device_keys
301            .deserialize_as::<DeviceKeys>(
302        ) {
303            Ok(device_keys) => {
304                if user_id != device_keys.user_id || device_id != device_keys.device_id {
305                    warn!(
306                        ?user_id,
307                        ?device_id,
308                        device_key_user = ?device_keys.user_id,
309                        device_key_device_id = ?device_keys.device_id,
310                        "Mismatch in the device keys payload",
311                    );
312                    None
313                } else {
314                    Some(spawn(Self::update_or_create_device(store.clone(), device_keys)))
315                }
316            }
317            Err(e) => {
318                warn!(
319                    ?user_id, ?device_id, error = ?e,
320                    "Device keys failed to deserialize",
321                );
322                None
323            }
324        });
325
326        let results = join_all(tasks).await;
327
328        for device in results {
329            let device = device.expect("Creating or updating a device panicked")?;
330
331            match device {
332                DeviceChange::New(d) => changes.new.push(d),
333                DeviceChange::Updated(d) => changes.changed.push(d),
334                DeviceChange::None => (),
335            }
336        }
337
338        let current_devices: HashSet<&OwnedDeviceId> = current_devices.iter().collect();
339        let stored_devices = store.get_device_data_for_user(&user_id).await?;
340        let stored_devices_set: HashSet<&OwnedDeviceId> = stored_devices.keys().collect();
341        let deleted_devices_set = stored_devices_set.difference(&current_devices);
342
343        let own_user_id = store.static_account().user_id();
344        for device_id in deleted_devices_set {
345            if user_id == *own_user_id && *device_id == &own_device_id {
346                let identity_keys = store.static_account().identity_keys();
347
348                warn!(
349                    user_id = ?own_user_id,
350                    device_id = ?own_device_id,
351                    curve25519_key = ?identity_keys.curve25519,
352                    ed25519_key = ?identity_keys.ed25519,
353                    "Our own device might have been deleted"
354                );
355            } else if let Some(device) = stored_devices.get(*device_id) {
356                device.mark_as_deleted();
357                changes.deleted.push(device.clone());
358            }
359        }
360
361        Ok(changes)
362    }
363
364    /// Handle the device keys part of a key query response.
365    ///
366    /// # Arguments
367    ///
368    /// * `device_keys_map` - A map holding the device keys of the users for
369    ///   which the key query was done.
370    ///
371    /// Returns a list of devices that changed. Changed here means either
372    /// they are new, one of their properties has changed or they got deleted.
373    async fn handle_devices_from_key_query(
374        &self,
375        device_keys_map: BTreeMap<
376            OwnedUserId,
377            BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
378        >,
379    ) -> StoreResult<DeviceChanges> {
380        let mut changes = DeviceChanges::default();
381
382        let tasks = device_keys_map.into_iter().map(|(user_id, device_keys_map)| {
383            spawn(Self::update_user_devices(self.store.clone(), user_id, device_keys_map))
384        });
385
386        let results = join_all(tasks).await;
387
388        for result in results {
389            let change_fragment = result.expect("Panic while updating user devices")?;
390
391            changes.extend(change_fragment);
392        }
393
394        Ok(changes)
395    }
396
397    /// Check if the given public identity matches our stored private one.
398    ///
399    /// If they don't match, this is an indication that our identity has been
400    /// rotated. In this case we return `Some(cleared_private_identity)`,
401    /// where `cleared_private_identity` is our currently-stored
402    /// private identity with the conflicting keys removed.
403    ///
404    /// Otherwise, assuming we do have a private master cross-signing key, we
405    /// mark the public identity as verified.
406    ///
407    /// # Returns
408    ///
409    /// If the private identity needs updating (because it does not match the
410    /// public keys), the updated private identity (which will need to be
411    /// persisted).
412    ///
413    /// Otherwise, `None`.
414    async fn check_private_identity(
415        &self,
416        identity: &OwnUserIdentityData,
417    ) -> Option<PrivateCrossSigningIdentity> {
418        let private_identity = self.store.private_identity();
419        let private_identity = private_identity.lock().await;
420        let result = private_identity.clear_if_differs(identity).await;
421
422        if result.any_differ() {
423            info!(cleared = ?result, "Removed some or all of our private cross signing keys");
424            Some((*private_identity).clone())
425        } else {
426            // If the master key didn't rotate above (`clear_if_differs`),
427            // then this means that the public part and the private parts of
428            // the master key match. We previously did a signature check, so
429            // this means that the private part of the master key has signed
430            // the identity. We can safely mark the public part of the
431            // identity as verified.
432            if private_identity.has_master_key().await && !identity.is_verified() {
433                trace!("Marked our own identity as verified");
434                identity.mark_as_verified()
435            }
436
437            None
438        }
439    }
440
441    /// Process an identity received in a `/keys/query` response that we
442    /// previously knew about.
443    ///
444    /// If the identity is our own, we will look for a user-signing key; if one
445    /// is not found, an error is returned. Otherwise, we then compare the
446    /// received public identity against our stored private identity;
447    /// if they match, the returned public identity is marked as verified and
448    /// `*changed_private_identity` is set to `None`. If they do *not* match,
449    /// it is an indication that our identity has been rotated, and
450    /// `*changed_private_identity` is set to our currently-stored private
451    /// identity with the conflicting keys removed (which will need to be
452    /// persisted).
453    ///
454    /// Whether the identity is our own or that of another, we check whether
455    /// there has been any change to the cross-signing keys, and classify
456    /// the result into [`IdentityUpdateResult::Updated`] or
457    /// [`IdentityUpdateResult::Unchanged`].
458    ///
459    /// # Arguments
460    ///
461    /// * `response` - The entire `/keys/query` response.
462    /// * `master_key` - The public master cross-signing key from the
463    ///   `/keys/query` response.
464    /// * `self_signing` - The public self-signing key from the `/keys/query`
465    ///   response.
466    /// * `i` - The existing identity for this user.
467    /// * `changed_private_identity` - Output parameter. Unchanged if the
468    ///   identity is that of another user. If it is our own, set to `None` or
469    ///   `Some` depending on whether our stored private identity needs
470    ///   updating. See above for more detail.
471    async fn handle_changed_identity(
472        &self,
473        response: &KeysQueryResponse,
474        maybe_verified_own_identity: Option<&OwnUserIdentity>,
475        master_key: MasterPubkey,
476        self_signing: SelfSigningPubkey,
477        i: UserIdentityData,
478        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
479    ) -> Result<IdentityUpdateResult, SignatureError> {
480        match i {
481            UserIdentityData::Own(mut identity) => {
482                let user_signing = self.get_user_signing_key_from_response(response)?;
483                let has_changed = identity.update(master_key, self_signing, user_signing)?;
484                *changed_private_identity = self.check_private_identity(&identity).await;
485                if has_changed {
486                    Ok(IdentityUpdateResult::Updated(identity.into()))
487                } else {
488                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
489                }
490            }
491            UserIdentityData::Other(mut identity) => {
492                let has_changed = identity.update(
493                    master_key,
494                    self_signing,
495                    maybe_verified_own_identity.map(|o| o.user_signing_key()),
496                )?;
497
498                if has_changed {
499                    Ok(IdentityUpdateResult::Updated(identity.into()))
500                } else {
501                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
502                }
503            }
504        }
505    }
506
507    /// Process an identity received in a `/keys/query` response that we didn't
508    /// previously know about.
509    ///
510    /// If the identity is our own, we will look for a user-signing key, and if
511    /// it is present and correct, all three keys will be returned in the
512    /// `IdentityChange` result; otherwise, an error is returned. We will also
513    /// compare the received public identity against our stored private
514    /// identity; if they match, the returned public identity is marked as
515    /// verified and `*changed_private_identity` is set to `None`. If they do
516    /// *not* match, it is an indication that our identity has been rotated,
517    /// and `*changed_private_identity` is set to our currently-stored
518    /// private identity with the conflicting keys removed (which will need
519    /// to be persisted).
520    ///
521    /// If the identity is that of another user, we just parse the keys into the
522    /// `IdentityChange` result, since all other checks have already been done.
523    ///
524    /// # Arguments
525    ///
526    /// * `response` - The entire `/keys/query` response.
527    /// * `master_key` - The public master cross-signing key from the
528    ///   `/keys/query` response.
529    /// * `self_signing` - The public self-signing key from the `/keys/query`
530    ///   response.
531    /// * `changed_private_identity` - Output parameter. Unchanged if the
532    ///   identity is that of another user. If it is our own, set to `None` or
533    ///   `Some` depending on whether our stored private identity needs
534    ///   updating. See above for more detail.
535    async fn handle_new_identity(
536        &self,
537        response: &KeysQueryResponse,
538        maybe_verified_own_identity: Option<&OwnUserIdentity>,
539        master_key: MasterPubkey,
540        self_signing: SelfSigningPubkey,
541        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
542    ) -> Result<UserIdentityData, SignatureError> {
543        if master_key.user_id() == self.user_id() {
544            // Own identity
545            let user_signing = self.get_user_signing_key_from_response(response)?;
546            let identity = OwnUserIdentityData::new(master_key, self_signing, user_signing)?;
547            *changed_private_identity = self.check_private_identity(&identity).await;
548            Ok(identity.into())
549        } else {
550            // First time seen, create the identity. The current MSK will be pinned.
551            let identity = OtherUserIdentityData::new(master_key, self_signing)?;
552            let is_verified = maybe_verified_own_identity
553                .is_some_and(|own_user_identity| own_user_identity.is_identity_signed(&identity));
554            if is_verified {
555                identity.mark_as_previously_verified();
556            }
557
558            Ok(identity.into())
559        }
560    }
561
562    /// Try to deserialize the master key and self-signing key of an
563    /// identity from a `/keys/query` response.
564    ///
565    /// Each user identity *must* at least contain a master and self-signing
566    /// key, and this function deserializes them. (Our own identity, in addition
567    /// to those two, also contains a user-signing key, but that is not
568    /// extracted here; see
569    /// [`IdentityManager::get_user_signing_key_from_response`])
570    ///
571    /// # Arguments
572    ///
573    ///  * `master_key` - The master key for a particular user from a
574    ///    `/keys/query` response.
575    ///  * `response` - The entire `/keys/query` response.
576    ///
577    /// # Returns
578    ///
579    /// `None` if the self-signing key couldn't be found in the response, or the
580    /// one of the keys couldn't be deserialized. Else, the deserialized
581    /// public keys.
582    fn get_minimal_set_of_keys(
583        master_key: &Raw<CrossSigningKey>,
584        response: &KeysQueryResponse,
585    ) -> Option<(MasterPubkey, SelfSigningPubkey)> {
586        match master_key.deserialize_as::<MasterPubkey>() {
587            Ok(master_key) => {
588                if let Some(self_signing) = response
589                    .self_signing_keys
590                    .get(master_key.user_id())
591                    .and_then(|k| k.deserialize_as::<SelfSigningPubkey>().ok())
592                {
593                    Some((master_key, self_signing))
594                } else {
595                    warn!("A user identity didn't contain a self signing pubkey or the key was invalid");
596                    None
597                }
598            }
599            Err(e) => {
600                warn!(
601                    error = ?e,
602                    "Couldn't update or create new user identity"
603                );
604                None
605            }
606        }
607    }
608
609    /// Try to deserialize the our user-signing key from a `/keys/query`
610    /// response.
611    ///
612    /// If a `/keys/query` response includes our own cross-signing keys, then it
613    /// should include our user-signing key. This method attempts to
614    /// extract, deserialize, and check the key from the response.
615    ///
616    /// # Arguments
617    ///
618    /// * `response` - the entire `/keys/query` response.
619    fn get_user_signing_key_from_response(
620        &self,
621        response: &KeysQueryResponse,
622    ) -> Result<UserSigningPubkey, SignatureError> {
623        let Some(user_signing) = response
624            .user_signing_keys
625            .get(self.user_id())
626            .and_then(|k| k.deserialize_as::<UserSigningPubkey>().ok())
627        else {
628            warn!(
629                "User identity for our own user didn't contain a user signing pubkey or the key \
630                    isn't valid",
631            );
632            return Err(SignatureError::MissingSigningKey);
633        };
634
635        if user_signing.user_id() != self.user_id() {
636            warn!(
637                expected = ?self.user_id(),
638                got = ?user_signing.user_id(),
639                "User ID mismatch in our user-signing key",
640            );
641            return Err(SignatureError::UserIdMismatch);
642        }
643
644        Ok(user_signing)
645    }
646
647    /// Process the cross-signing keys for a particular identity from a
648    /// `/keys/query` response.
649    ///
650    /// Checks that the keys are consistent, verifies the updates, and produces
651    /// a list of changes to be stored.
652    ///
653    /// # Arguments
654    ///
655    /// * `response` - The entire `/keys/query` response.
656    /// * `changes` - The identity results so far, which we will add to.
657    /// * `changed_identity` - Output parameter: Unchanged if the identity is
658    ///   that of another user. If it is our own, set to `None` or `Some`
659    ///   depending on whether our stored private identity needs updating.
660    /// * `maybe_verified_own_identity` - Own verified identity if any to check
661    ///   verification status of updated identity.
662    /// * `key_set_info` - The identity info as returned by the `/keys/query`
663    ///   response.
664    #[instrument(skip_all, fields(user_id))]
665    async fn update_or_create_identity(
666        &self,
667        response: &KeysQueryResponse,
668        changes: &mut IdentityChanges,
669        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
670        maybe_verified_own_identity: Option<&OwnUserIdentity>,
671        key_set_info: KeySetInfo,
672    ) -> StoreResult<()> {
673        let KeySetInfo { user_id, master_key, self_signing } = key_set_info;
674        if master_key.user_id() != user_id || self_signing.user_id() != user_id {
675            warn!(?user_id, "User ID mismatch in one of the cross signing keys");
676        } else if let Some(i) = self.store.get_user_identity(&user_id).await? {
677            // an identity we knew about before, which is being updated
678            match self
679                .handle_changed_identity(
680                    response,
681                    maybe_verified_own_identity,
682                    master_key,
683                    self_signing,
684                    i,
685                    changed_private_identity,
686                )
687                .await
688            {
689                Ok(IdentityUpdateResult::Updated(identity)) => {
690                    debug!(?identity, "Updated a user identity");
691                    changes.changed.push(identity);
692                }
693                Ok(IdentityUpdateResult::Unchanged(identity)) => {
694                    trace!(?identity, "Received an unchanged user identity");
695                    changes.unchanged.push(identity);
696                }
697                Err(e) => {
698                    warn!(error = ?e, "Couldn't update an existing user identity");
699                }
700            }
701        } else {
702            // an identity we did not know about before
703            match self
704                .handle_new_identity(
705                    response,
706                    maybe_verified_own_identity,
707                    master_key,
708                    self_signing,
709                    changed_private_identity,
710                )
711                .await
712            {
713                Ok(identity) => {
714                    trace!(?identity, "Created new user identity");
715                    changes.new.push(identity);
716                }
717                Err(e) => {
718                    warn!(error = ?e, "Couldn't create new user identity");
719                }
720            }
721        };
722
723        Ok(())
724    }
725
726    /// Handle the cross signing keys part of a key query response.
727    ///
728    /// # Arguments
729    ///
730    /// * `response` - The `/keys/query` response.
731    ///
732    /// # Returns
733    ///
734    /// The processed results, to be saved to the datastore, comprising:
735    ///
736    ///  * A list of public identities that were received, categorised as "new",
737    ///    "changed" or "unchanged".
738    ///
739    ///  * If our own identity was updated and did not match our private
740    ///    identity, an update to that private identity. Otherwise, `None`.
741    async fn handle_cross_signing_keys(
742        &self,
743        response: &KeysQueryResponse,
744    ) -> StoreResult<(IdentityChanges, Option<PrivateCrossSigningIdentity>)> {
745        let mut changes = IdentityChanges::default();
746        let mut changed_identity = None;
747
748        // We want to check if the updated/new other identities are trusted by us or
749        // not. This is based on the current verified state of the own identity.
750        let maybe_own_verified_identity = self
751            .store
752            .get_identity(self.user_id())
753            .await?
754            .and_then(UserIdentity::own)
755            .filter(|own| own.is_verified());
756
757        for (user_id, master_key) in &response.master_keys {
758            // Get the master and self-signing key for each identity; those are required for
759            // every user identity type. If we don't have those we skip over.
760            let Some((master_key, self_signing)) =
761                Self::get_minimal_set_of_keys(master_key.cast_ref(), response)
762            else {
763                continue;
764            };
765
766            let key_set_info = KeySetInfo { user_id: user_id.clone(), master_key, self_signing };
767
768            self.update_or_create_identity(
769                response,
770                &mut changes,
771                &mut changed_identity,
772                maybe_own_verified_identity.as_ref(),
773                key_set_info,
774            )
775            .await?;
776        }
777
778        Ok((changes, changed_identity))
779    }
780
781    /// Generate an "out-of-band" key query request for the given set of users.
782    ///
783    /// Unlike the regular key query requests returned by `users_for_key_query`,
784    /// there can be several of these in flight at once. This can be useful
785    /// if we need results to be as up-to-date as possible.
786    ///
787    /// Once the request has been made, the response can be fed back into the
788    /// IdentityManager and store by calling `receive_keys_query_response`.
789    ///
790    /// # Arguments
791    ///
792    /// * `users` - list of users whose keys should be queried
793    ///
794    /// # Returns
795    ///
796    /// A tuple containing the request ID for the request, and the request
797    /// itself.
798    pub(crate) fn build_key_query_for_users<'a>(
799        &self,
800        users: impl IntoIterator<Item = &'a UserId>,
801    ) -> (OwnedTransactionId, KeysQueryRequest) {
802        // Since this is an "out-of-band" request, we just make up a transaction ID and
803        // do not store the details in `self.keys_query_request_details`.
804        //
805        // `receive_keys_query_response` will process the response as normal, except
806        // that it will not mark the users as "up-to-date".
807
808        // We assume that there aren't too many users here; if we find a usecase that
809        // requires lots of users to be up-to-date we may need to rethink this.
810        (TransactionId::new(), KeysQueryRequest::new(users.into_iter().map(|u| u.to_owned())))
811    }
812
813    /// Get a list of key query requests needed.
814    ///
815    /// # Returns
816    ///
817    /// A map of a request ID to the `/keys/query` request.
818    ///
819    /// The response of a successful key query requests needs to be passed to
820    /// the [`OlmMachine`] with the [`receive_keys_query_response`].
821    ///
822    /// [`receive_keys_query_response`]: Self::receive_keys_query_response
823    pub async fn users_for_key_query(
824        &self,
825    ) -> StoreResult<BTreeMap<OwnedTransactionId, KeysQueryRequest>> {
826        // Forget about any previous key queries in flight.
827        *self.keys_query_request_details.lock().await = None;
828
829        // We always want to track our own user, but in case we aren't in an encrypted
830        // room yet, we won't be tracking ourselves yet. This ensures we are always
831        // tracking ourselves.
832        //
833        // The check for emptiness is done first for performance.
834        let (users, sequence_number) = {
835            let cache = self.store.cache().await?;
836            let key_query_manager = self.key_query_manager.synced(&cache).await?;
837
838            let (users, sequence_number) = key_query_manager.users_for_key_query().await;
839
840            if users.is_empty() && !key_query_manager.tracked_users().contains(self.user_id()) {
841                key_query_manager.mark_user_as_changed(self.user_id()).await?;
842                key_query_manager.users_for_key_query().await
843            } else {
844                (users, sequence_number)
845            }
846        };
847
848        if users.is_empty() {
849            Ok(BTreeMap::new())
850        } else {
851            // Let's remove users that are part of the `FailuresCache`. The cache, which is
852            // a TTL cache, remembers users for which a previous `/key/query` request has
853            // failed. We don't retry a `/keys/query` for such users for a
854            // certain amount of time.
855            let users = users.into_iter().filter(|u| !self.failures.contains(u.server_name()));
856
857            // We don't want to create a single `/keys/query` request with an infinite
858            // amount of users. Some servers will likely bail out after a
859            // certain amount of users and the responses will be large. In the
860            // case of a transmission error, we'll have to retransmit the large
861            // response.
862            //
863            // Convert the set of users into multiple /keys/query requests.
864            let requests: BTreeMap<_, _> = users
865                .chunks(Self::MAX_KEY_QUERY_USERS)
866                .into_iter()
867                .map(|user_chunk| {
868                    let request_id = TransactionId::new();
869                    let request = KeysQueryRequest::new(user_chunk);
870
871                    debug!(?request_id, users = ?request.device_keys.keys(), "Created a /keys/query request");
872
873                    (request_id, request)
874                })
875                .collect();
876
877            // Collect the request IDs, these will be used later in the
878            // `receive_keys_query_response()` method to figure out if the user can be
879            // marked as up-to-date/non-dirty.
880            let request_ids = requests.keys().cloned().collect();
881            let request_details = KeysQueryRequestDetails { sequence_number, request_ids };
882
883            *self.keys_query_request_details.lock().await = Some(request_details);
884
885            Ok(requests)
886        }
887    }
888
889    /// Receive the list of users that contained changed devices from the
890    /// `/sync` response.
891    ///
892    /// This will queue up the given user for a key query.
893    ///
894    /// Note: The user already needs to be tracked for it to be queued up for a
895    /// key query.
896    pub async fn receive_device_changes(
897        &self,
898        cache: &StoreCache,
899        users: impl Iterator<Item = &UserId>,
900    ) -> StoreResult<()> {
901        self.key_query_manager.synced(cache).await?.mark_tracked_users_as_changed(users).await
902    }
903
904    /// See the docs for [`OlmMachine::update_tracked_users()`].
905    pub async fn update_tracked_users(
906        &self,
907        users: impl IntoIterator<Item = &UserId>,
908    ) -> StoreResult<()> {
909        let cache = self.store.cache().await?;
910        self.key_query_manager.synced(&cache).await?.update_tracked_users(users.into_iter()).await
911    }
912
913    /// Retrieve a list of a user's current devices, so we can encrypt a message
914    /// to them.
915    ///
916    /// If we have not yet seen any devices for the user, and their device list
917    /// has been marked as outdated, then we wait for the `/keys/query` request
918    /// to complete. This helps ensure that we attempt at least once to fetch a
919    /// user's devices before encrypting to them.
920    pub async fn get_user_devices_for_encryption(
921        &self,
922        users: impl Iterator<Item = &UserId>,
923    ) -> StoreResult<HashMap<OwnedUserId, HashMap<OwnedDeviceId, DeviceData>>> {
924        // How long we wait for /keys/query to complete.
925        const KEYS_QUERY_WAIT_TIME: Duration = Duration::from_secs(5);
926
927        let mut devices_by_user = HashMap::new();
928        let mut users_with_no_devices_on_failed_servers = Vec::new();
929        let mut users_with_no_devices_on_unfailed_servers = Vec::new();
930
931        for user_id in users {
932            // First of all, check the store for this user.
933            let devices = self.store.get_device_data_for_user_filtered(user_id).await?;
934
935            // Now, look for users who have no devices at all.
936            //
937            // If a user has no devices at all, that implies we have never (successfully)
938            // done a `/keys/query` for them; we wait for one to complete if it is
939            // in flight. (Of course, the user might genuinely have no devices, but
940            // that's fine, it just means we redundantly grab the cache guard and
941            // check the pending-query flag.)
942            if !devices.is_empty() {
943                // This user has at least one known device.
944                //
945                // The device list may also be outdated in this case; but in this
946                // situation, we are racing between sending a message and retrieving their
947                // device list. That's an inherently racy situation and there is no real
948                // benefit to waiting for the `/keys/query` request to complete. So we don't
949                // bother.
950                //
951                // We just add their devices to the result and carry on.
952                devices_by_user.insert(user_id.to_owned(), devices);
953                continue;
954            }
955
956            // *However*, if the user's server is currently subject to a backoff due to
957            // previous failures, then `users_for_key_query` won't attempt to query
958            // for the user's devices, so there's no point waiting.
959            //
960            // XXX: this is racy. It's possible that:
961            //  * `failures` included the user's server when `users_for_key_query` was
962            //    called, so the user was not returned in the `KeyQueryRequest`, and:
963            //  * The backoff has now expired.
964            //
965            // In that case, we'll end up waiting for the *next* `users_for_key_query` call,
966            // which might not be for 30 seconds or so. (And by then, it might be `failed`
967            // again.)
968            if self.failures.contains(user_id.server_name()) {
969                users_with_no_devices_on_failed_servers.push(user_id);
970                continue;
971            }
972
973            users_with_no_devices_on_unfailed_servers.push(user_id);
974        }
975
976        if !users_with_no_devices_on_failed_servers.is_empty() {
977            info!(
978                ?users_with_no_devices_on_failed_servers,
979                "Not waiting for `/keys/query` for users whose server has previously failed"
980            );
981        }
982
983        if !users_with_no_devices_on_unfailed_servers.is_empty() {
984            // For each user with no devices, fire off a task to wait for a `/keys/query`
985            // result if one is pending.
986            //
987            // We don't actually update the `devices_by_user` map here since that could
988            // require concurrent access to it. Instead each task returns a
989            // `(OwnedUserId, HashMap)` pair (or rather, an `Option` of one) so that we can
990            // add the results to the map.
991            let results = join_all(
992                users_with_no_devices_on_unfailed_servers
993                    .into_iter()
994                    .map(|user_id| self.get_updated_keys_for_user(KEYS_QUERY_WAIT_TIME, user_id)),
995            )
996            .await;
997
998            // Once all the tasks have completed, process the results.
999            let mut updated_users = Vec::new();
1000            for result in results {
1001                if let Some((user_id, updated_devices)) = result? {
1002                    devices_by_user.insert(user_id.to_owned(), updated_devices);
1003                    updated_users.push(user_id);
1004                }
1005            }
1006
1007            if !updated_users.is_empty() {
1008                info!(
1009                    ?updated_users,
1010                    "Waited for `/keys/query` to complete for users who have no devices"
1011                );
1012            }
1013        }
1014
1015        Ok(devices_by_user)
1016    }
1017
1018    /// Helper for get_user_devices_for_encryption.
1019    ///
1020    /// Waits for any pending `/keys/query` for the given user. If one was
1021    /// pending, reloads the device list and returns `Some(user_id,
1022    /// device_list)`. If no request was pending, returns `None`.
1023    #[allow(clippy::type_complexity)]
1024    #[instrument(skip(self))]
1025    async fn get_updated_keys_for_user<'a>(
1026        &self,
1027        timeout_duration: Duration,
1028        user_id: &'a UserId,
1029    ) -> Result<Option<(&'a UserId, HashMap<OwnedDeviceId, DeviceData>)>, CryptoStoreError> {
1030        let cache = self.store.cache().await?;
1031        match self
1032            .key_query_manager
1033            .wait_if_user_key_query_pending(cache, timeout_duration, user_id)
1034            .await?
1035        {
1036            UserKeyQueryResult::WasPending => {
1037                Ok(Some((user_id, self.store.get_device_data_for_user_filtered(user_id).await?)))
1038            }
1039            _ => Ok(None),
1040        }
1041    }
1042
1043    /// Given a list of changed devices, update any [`InboundGroupSession`]s
1044    /// which were sent from those devices and which do not have complete
1045    /// sender data.
1046    async fn update_sender_data_from_device_changes(
1047        &self,
1048        device_changes: &DeviceChanges,
1049    ) -> Result<(), CryptoStoreError> {
1050        for device in device_changes.new.iter().chain(device_changes.changed.iter()) {
1051            // 1. Look for InboundGroupSessions from the device whose sender_data is
1052            //    UnknownDevice. For such sessions, we now have the device, and can update
1053            //    the sender_data accordingly.
1054            //
1055            // In theory, we only need to do this for new devices. In practice, I'm a bit
1056            // worried about races leading us to getting stuck in the
1057            // UnknownDevice state, so we'll paper over that by doing this check
1058            // on device updates too.
1059            self.update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice)
1060                .await?;
1061
1062            // 2. If, and only if, the device is now correctly cross-signed (ie,
1063            //    device.is_cross_signed_by_owner() is true, and we have the master
1064            //    cross-signing key for the owner), look for InboundGroupSessions from the
1065            //    device whose sender_data is DeviceInfo. We can also update the sender_data
1066            //    for these sessions.
1067            //
1068            // In theory, we can skip a couple of steps of the SenderDataFinder algorithm,
1069            // because we're doing the cross-signing check here. In practice,
1070            // it's *way* easier just to use the same logic.
1071            let device_owner_identity = self.store.get_user_identity(device.user_id()).await?;
1072            if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) {
1073                self.update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo)
1074                    .await?;
1075            }
1076        }
1077
1078        Ok(())
1079    }
1080
1081    /// Given a device, look for [`InboundGroupSession`]s whose sender data is
1082    /// in the given state, and update it.
1083    #[instrument(skip(self))]
1084    async fn update_sender_data_for_sessions_for_device(
1085        &self,
1086        device: &DeviceData,
1087        sender_data_type: SenderDataType,
1088    ) -> Result<(), CryptoStoreError> {
1089        const IGS_BATCH_SIZE: usize = 50;
1090
1091        let Some(curve_key) = device.curve25519_key() else { return Ok(()) };
1092
1093        let mut last_session_id: Option<String> = None;
1094        loop {
1095            let mut sessions = self
1096                .store
1097                .get_inbound_group_sessions_for_device_batch(
1098                    curve_key,
1099                    sender_data_type,
1100                    last_session_id,
1101                    IGS_BATCH_SIZE,
1102                )
1103                .await?;
1104
1105            if sessions.is_empty() {
1106                // end of the session list
1107                return Ok(());
1108            }
1109
1110            last_session_id = None;
1111            for session in &mut sessions {
1112                last_session_id = Some(session.session_id().to_owned());
1113                self.update_sender_data_for_session(session, device).await?;
1114            }
1115            self.store.save_inbound_group_sessions(&sessions).await?;
1116        }
1117    }
1118
1119    /// Update the sender data on the given inbound group session, using the
1120    /// given device data.
1121    #[instrument(skip(self, device, session), fields(session_id = session.session_id()))]
1122    async fn update_sender_data_for_session(
1123        &self,
1124        session: &mut InboundGroupSession,
1125        device: &DeviceData,
1126    ) -> Result<(), CryptoStoreError> {
1127        use crate::olm::sender_data_finder::SessionDeviceCheckError::*;
1128
1129        match SenderDataFinder::find_using_device_data(&self.store, device.clone(), session).await {
1130            Ok(sender_data) => {
1131                debug!(
1132                    "Updating existing InboundGroupSession with new SenderData {:?}",
1133                    sender_data
1134                );
1135                session.sender_data = sender_data;
1136            }
1137            Err(CryptoStoreError(e)) => {
1138                return Err(e);
1139            }
1140            Err(MismatchedIdentityKeys(e)) => {
1141                warn!(
1142                    ?session,
1143                    ?device,
1144                    "cannot update existing InboundGroupSession due to ownership error: {}",
1145                    e
1146                );
1147            }
1148        };
1149
1150        Ok(())
1151    }
1152
1153    /// Mark all tracked users as dirty.
1154    ///
1155    /// All users *whose device lists we are tracking* are flagged as needing a
1156    /// key query. Users whose devices we are not tracking are ignored.
1157    pub(crate) async fn mark_all_tracked_users_as_dirty(
1158        &self,
1159        store_cache: StoreCacheGuard,
1160    ) -> StoreResult<()> {
1161        let store_wrapper = store_cache.store_wrapper();
1162        let tracked_users = store_wrapper.load_tracked_users().await?;
1163
1164        self.key_query_manager
1165            .synced(&store_cache)
1166            .await?
1167            .mark_tracked_users_as_changed(
1168                tracked_users.iter().map(|tracked_user| tracked_user.user_id.as_ref()),
1169            )
1170            .await?;
1171
1172        Ok(())
1173    }
1174}
1175
1176/// Log information about what changed after processing a /keys/query response.
1177/// Only does anything if the DEBUG log level is enabled.
1178fn debug_log_keys_query_response(
1179    devices: &DeviceChanges,
1180    identities: &IdentityChanges,
1181    request_id: &TransactionId,
1182) {
1183    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1184    let changed_devices = devices.changed.iter().fold(BTreeMap::new(), |mut acc, d| {
1185        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1186        acc
1187    });
1188
1189    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1190    let new_devices = devices.new.iter().fold(BTreeMap::new(), |mut acc, d| {
1191        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1192        acc
1193    });
1194
1195    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1196    let deleted_devices = devices.deleted.iter().fold(BTreeMap::new(), |mut acc, d| {
1197        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1198        acc
1199    });
1200
1201    let new_identities = identities.new.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1202    let changed_identities =
1203        identities.changed.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1204
1205    debug!(
1206        ?request_id,
1207        ?new_devices,
1208        ?changed_devices,
1209        ?deleted_devices,
1210        ?new_identities,
1211        ?changed_identities,
1212        "Finished handling of the `/keys/query` response"
1213    );
1214}
1215
1216#[cfg(any(test, feature = "testing"))]
1217#[allow(dead_code)]
1218pub(crate) mod testing {
1219    use std::sync::Arc;
1220
1221    use matrix_sdk_test::ruma_response_from_json;
1222    use ruma::{
1223        api::client::keys::get_keys::v3::Response as KeyQueryResponse, device_id, user_id,
1224        DeviceId, UserId,
1225    };
1226    use serde_json::json;
1227    use tokio::sync::Mutex;
1228
1229    use crate::{
1230        identities::IdentityManager,
1231        olm::{Account, PrivateCrossSigningIdentity},
1232        store::{CryptoStoreWrapper, MemoryStore, PendingChanges, Store},
1233        types::{requests::UploadSigningKeysRequest, DeviceKeys},
1234        verification::VerificationMachine,
1235    };
1236
1237    pub fn user_id() -> &'static UserId {
1238        user_id!("@example:localhost")
1239    }
1240
1241    pub fn other_user_id() -> &'static UserId {
1242        user_id!("@example2:localhost")
1243    }
1244
1245    pub fn device_id() -> &'static DeviceId {
1246        device_id!("WSKKLTJZCL")
1247    }
1248
1249    pub(crate) async fn manager_test_helper(
1250        user_id: &UserId,
1251        device_id: &DeviceId,
1252    ) -> IdentityManager {
1253        let identity = PrivateCrossSigningIdentity::new(user_id.into());
1254        let identity = Arc::new(Mutex::new(identity));
1255        let user_id = user_id.to_owned();
1256        let account = Account::with_device_id(&user_id, device_id);
1257        let static_account = account.static_data().clone();
1258        let store = Arc::new(CryptoStoreWrapper::new(&user_id, device_id, MemoryStore::new()));
1259        let verification =
1260            VerificationMachine::new(static_account.clone(), identity.clone(), store.clone());
1261        let store = Store::new(static_account, identity, store, verification);
1262        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1263        IdentityManager::new(store)
1264    }
1265
1266    pub fn other_key_query() -> KeyQueryResponse {
1267        let data = &json!({
1268            "device_keys": {
1269                "@example2:localhost": {
1270                    "SKISMLNIMH": {
1271                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1272                        "device_id": "SKISMLNIMH",
1273                        "keys": {
1274                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1275                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1276                        },
1277                        "signatures": {
1278                            "@example2:localhost": {
1279                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1280                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1281                            }
1282                        },
1283                        "user_id": "@example2:localhost",
1284                        "unsigned": {
1285                            "device_display_name": "Riot Desktop (Linux)"
1286                        }
1287                    }
1288                }
1289            },
1290            "failures": {},
1291            "master_keys": {
1292                "@example2:localhost": {
1293                    "user_id": "@example2:localhost",
1294                    "usage": ["master"],
1295                    "keys": {
1296                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1297                    },
1298                    "signatures": {
1299                        "@example2:localhost": {
1300                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1301                        }
1302                    }
1303                }
1304            },
1305            "self_signing_keys": {
1306                "@example2:localhost": {
1307                    "user_id": "@example2:localhost",
1308                    "usage": ["self_signing"],
1309                    "keys": {
1310                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1311                    },
1312                    "signatures": {
1313                        "@example2:localhost": {
1314                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1315                        }
1316                    }
1317                }
1318            },
1319            "user_signing_keys": {}
1320        });
1321        ruma_response_from_json(data)
1322    }
1323
1324    // An updated version of `other_key_query` featuring an additional signature on
1325    // the master key *Note*: The added signature is actually not valid, but a
1326    // valid signature  is not required for our test.
1327    pub fn other_key_query_cross_signed() -> KeyQueryResponse {
1328        let data = json!({
1329            "device_keys": {
1330                "@example2:localhost": {
1331                    "SKISMLNIMH": {
1332                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1333                        "device_id": "SKISMLNIMH",
1334                        "keys": {
1335                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1336                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1337                        },
1338                        "signatures": {
1339                            "@example2:localhost": {
1340                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1341                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1342                            }
1343                        },
1344                        "user_id": "@example2:localhost",
1345                        "unsigned": {
1346                            "device_display_name": "Riot Desktop (Linux)"
1347                        }
1348                    }
1349                }
1350            },
1351            "failures": {},
1352            "master_keys": {
1353                "@example2:localhost": {
1354                    "user_id": "@example2:localhost",
1355                    "usage": ["master"],
1356                    "keys": {
1357                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1358                    },
1359                    "signatures": {
1360                        "@example2:localhost": {
1361                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1362                        },
1363                        // This is the added signature from alice USK compared to `other_key_query`. Note that actual signature is not valid.
1364                        "@alice:localhost": {
1365                            "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "NotAValidSignature+GNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1366                        }
1367                    }
1368                }
1369            },
1370            "self_signing_keys": {
1371                "@example2:localhost": {
1372                    "user_id": "@example2:localhost",
1373                    "usage": ["self_signing"],
1374                    "keys": {
1375                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1376                    },
1377                    "signatures": {
1378                        "@example2:localhost": {
1379                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1380                        }
1381                    }
1382                }
1383            },
1384            "user_signing_keys": {}
1385        });
1386        ruma_response_from_json(&data)
1387    }
1388
1389    /// Mocked response to a /keys/query request.
1390    pub fn own_key_query_with_user_id(user_id: &UserId) -> KeyQueryResponse {
1391        let data = json!({
1392          "device_keys": {
1393            user_id: {
1394              "WSKKLTJZCL": {
1395                "algorithms": [
1396                  "m.olm.v1.curve25519-aes-sha2",
1397                  "m.megolm.v1.aes-sha2"
1398                ],
1399                "device_id": "WSKKLTJZCL",
1400                "keys": {
1401                  "curve25519:WSKKLTJZCL": "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1402                  "ed25519:WSKKLTJZCL": "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1403                },
1404                "signatures": {
1405                  user_id: {
1406                    "ed25519:WSKKLTJZCL": "SKpIUnq7QK0xleav0PrIQyKjVm+TgZr7Yi8cKjLeZDtkgyToE2d4/e3Aj79dqOlLB92jFVE4d1cM/Ry04wFwCA",
1407                    "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "9UGu1iC5YhFCdELGfB29YaV+QE0t/X5UDSsPf4QcdZyXIwyp9zBbHX2lh9vWudNQ+akZpaq7ZRaaM+4TCnw/Ag"
1408                  }
1409                },
1410                "user_id": user_id,
1411                "unsigned": {
1412                  "device_display_name": "Cross signing capable"
1413                }
1414              },
1415              "LVWOVGOXME": {
1416                "algorithms": [
1417                  "m.olm.v1.curve25519-aes-sha2",
1418                  "m.megolm.v1.aes-sha2"
1419                ],
1420                "device_id": "LVWOVGOXME",
1421                "keys": {
1422                  "curve25519:LVWOVGOXME": "KMfWKUhnDW1D11hNzATs/Ax1FQRsJxKCWzq0NyGtIiI",
1423                  "ed25519:LVWOVGOXME": "k+NC3L7CBD6fBClcHBrKLOkqCyGNSKhWXiH5Q2STRnA"
1424                },
1425                "signatures": {
1426                  user_id: {
1427                    "ed25519:LVWOVGOXME": "39Ir5Bttpc5+bQwzLj7rkjm5E5/cp/JTbMJ/t0enj6J5w9MXVBFOUqqM2hpaRaRwILMMpwYbJ8IOGjl0Y/MGAw"
1428                  }
1429                },
1430                "user_id": user_id,
1431                "unsigned": {
1432                  "device_display_name": "Non-cross signing"
1433                }
1434              }
1435            }
1436          },
1437          "failures": {},
1438          "master_keys": {
1439            user_id: {
1440              "user_id": user_id,
1441              "usage": [
1442                "master"
1443              ],
1444              "keys": {
1445                "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0"
1446              },
1447              "signatures": {
1448                user_id: {
1449                  "ed25519:WSKKLTJZCL": "ZzJp1wtmRdykXAUEItEjNiFlBrxx8L6/Vaen9am8AuGwlxxJtOkuY4m+4MPLvDPOgavKHLsrRuNLAfCeakMlCQ"
1450                }
1451              }
1452            }
1453          },
1454          "self_signing_keys": {
1455            user_id: {
1456              "user_id": user_id,
1457              "usage": [
1458                "self_signing"
1459              ],
1460              "keys": {
1461                "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210"
1462              },
1463              "signatures": {
1464                user_id: {
1465                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "AC7oDUW4rUhtInwb4lAoBJ0wAuu4a5k+8e34B5+NKsDB8HXRwgVwUWN/MRWc/sJgtSbVlhzqS9THEmQQ1C51Bw"
1466                }
1467              }
1468            }
1469          },
1470          "user_signing_keys": {
1471            user_id: {
1472              "user_id": user_id,
1473              "usage": [
1474                "user_signing"
1475              ],
1476              "keys": {
1477                "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo"
1478              },
1479              "signatures": {
1480                user_id: {
1481                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "C4L2sx9frGqj8w41KyynHGqwUbbwBYRZpYCB+6QWnvQFA5Oi/1PJj8w5anwzEsoO0TWmLYmf7FXuAGewanOWDg"
1482                }
1483              }
1484            }
1485          }
1486        });
1487        ruma_response_from_json(&data)
1488    }
1489
1490    pub fn own_key_query() -> KeyQueryResponse {
1491        own_key_query_with_user_id(user_id())
1492    }
1493
1494    pub fn key_query(
1495        identity: UploadSigningKeysRequest,
1496        device_keys: DeviceKeys,
1497    ) -> KeyQueryResponse {
1498        let json = json!({
1499            "device_keys": {
1500                "@example:localhost": {
1501                    device_keys.device_id.to_string(): device_keys
1502                }
1503            },
1504            "failures": {},
1505            "master_keys": {
1506                "@example:localhost": identity.master_key
1507            },
1508            "self_signing_keys": {
1509                "@example:localhost": identity.self_signing_key
1510            },
1511            "user_signing_keys": {
1512                "@example:localhost": identity.user_signing_key
1513            },
1514          }
1515        );
1516
1517        ruma_response_from_json(&json)
1518    }
1519}
1520
1521#[cfg(test)]
1522pub(crate) mod tests {
1523    use std::ops::Deref;
1524
1525    use futures_util::pin_mut;
1526    use matrix_sdk_test::{async_test, ruma_response_from_json, test_json};
1527    use ruma::{
1528        api::client::keys::get_keys::v3::Response as KeysQueryResponse, device_id, user_id,
1529        TransactionId,
1530    };
1531    use serde_json::json;
1532    use stream_assert::{assert_closed, assert_pending, assert_ready};
1533
1534    use super::testing::{
1535        device_id, key_query, manager_test_helper, other_key_query, other_user_id, user_id,
1536    };
1537    use crate::{
1538        identities::manager::testing::{other_key_query_cross_signed, own_key_query},
1539        olm::PrivateCrossSigningIdentity,
1540        CrossSigningKeyExport, OlmMachine,
1541    };
1542
1543    fn key_query_with_failures() -> KeysQueryResponse {
1544        let response = json!({
1545            "device_keys": {
1546            },
1547            "failures": {
1548                "example.org": {
1549                    "errcode": "M_RESOURCE_LIMIT_EXCEEDED",
1550                    "error": "Not yet ready to retry",
1551                }
1552            }
1553        });
1554
1555        ruma_response_from_json(&response)
1556    }
1557
1558    #[async_test]
1559    async fn test_tracked_users() {
1560        let manager = manager_test_helper(user_id(), device_id()).await;
1561        let alice = user_id!("@alice:example.org");
1562
1563        let cache = manager.store.cache().await.unwrap();
1564        let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1565
1566        assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1567
1568        manager.receive_device_changes(&cache, [alice].iter().map(Deref::deref)).await.unwrap();
1569
1570        assert!(
1571            !key_query_manager.tracked_users().contains(alice),
1572            "Receiving a device changes update for a user we don't track does nothing"
1573        );
1574
1575        assert!(
1576            !key_query_manager.users_for_key_query().await.0.contains(alice),
1577            "The user we don't track doesn't end up in the `/keys/query` request"
1578        );
1579    }
1580
1581    #[async_test]
1582    async fn test_manager_creation() {
1583        let manager = manager_test_helper(user_id(), device_id()).await;
1584        let cache = manager.store.cache().await.unwrap();
1585        assert!(manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty())
1586    }
1587
1588    #[async_test]
1589    async fn test_manager_key_query_response() {
1590        let manager = manager_test_helper(user_id(), device_id()).await;
1591        let other_user = other_user_id();
1592        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1593        assert_eq!(devices.devices().count(), 0);
1594
1595        manager
1596            .receive_keys_query_response(&TransactionId::new(), &other_key_query())
1597            .await
1598            .unwrap();
1599
1600        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1601        assert_eq!(devices.devices().count(), 1);
1602
1603        let device = manager
1604            .store
1605            .get_device_data(other_user, device_id!("SKISMLNIMH"))
1606            .await
1607            .unwrap()
1608            .unwrap();
1609        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
1610        let identity = identity.other().unwrap();
1611
1612        assert!(identity.is_device_signed(&device));
1613    }
1614
1615    #[async_test]
1616    async fn test_manager_own_key_query_response() {
1617        let manager = manager_test_helper(user_id(), device_id()).await;
1618        let our_user = user_id();
1619        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1620        assert_eq!(devices.devices().count(), 0);
1621
1622        let private_identity = manager.store.private_identity();
1623        let private_identity = private_identity.lock().await;
1624        let identity_request = private_identity.as_upload_request().await;
1625        drop(private_identity);
1626
1627        let device_keys =
1628            manager.store.cache().await.unwrap().account().await.unwrap().device_keys();
1629        manager
1630            .receive_keys_query_response(
1631                &TransactionId::new(),
1632                &key_query(identity_request, device_keys),
1633            )
1634            .await
1635            .unwrap();
1636
1637        let identity = manager
1638            .store
1639            .get_user_identity(our_user)
1640            .await
1641            .unwrap()
1642            .expect("missing user identity");
1643        let identity = identity.own().expect("missing own identity");
1644        assert!(identity.is_verified());
1645
1646        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1647        assert_eq!(devices.devices().count(), 1);
1648
1649        let device =
1650            manager.store.get_device_data(our_user, device_id!(device_id())).await.unwrap();
1651
1652        assert!(device.is_some());
1653    }
1654
1655    #[async_test]
1656    async fn test_private_identity_invalidation_after_public_keys_change() {
1657        let user_id = user_id!("@example1:localhost");
1658        let manager = manager_test_helper(user_id, "DEVICEID".into()).await;
1659
1660        let identity_request = {
1661            let private_identity = manager.store.private_identity();
1662            let private_identity = private_identity.lock().await;
1663            private_identity.as_upload_request().await
1664        };
1665        let device_keys = manager.store.static_account().unsigned_device_keys();
1666
1667        let response = json!({
1668            "device_keys": {
1669                user_id: {
1670                    device_keys.device_id.to_string(): device_keys
1671                }
1672            },
1673            "master_keys": {
1674                user_id: identity_request.master_key,
1675            },
1676            "self_signing_keys": {
1677                user_id: identity_request.self_signing_key,
1678            },
1679            "user_signing_keys": {
1680                user_id: identity_request.user_signing_key,
1681            }
1682        });
1683
1684        let response = ruma_response_from_json(&response);
1685        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
1686
1687        let identity = manager.store.get_user_identity(user_id).await.unwrap().unwrap();
1688        let identity = identity.own().unwrap();
1689        assert!(identity.is_verified());
1690
1691        let identity_request = {
1692            let private_identity = PrivateCrossSigningIdentity::new(user_id.into());
1693            private_identity.as_upload_request().await
1694        };
1695
1696        let response = json!({
1697            "master_keys": {
1698                user_id: identity_request.master_key,
1699                "@example2:localhost": {
1700                    "user_id": "@example2:localhost",
1701                    "usage": ["master"],
1702                    "keys": {
1703                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1704                    },
1705                    "signatures": {
1706                        "@example2:localhost": {
1707                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1708                        }
1709                    }
1710                },
1711            },
1712            "self_signing_keys": {
1713                user_id: identity_request.self_signing_key,
1714                "@example2:localhost": {
1715                    "user_id": "@example2:localhost",
1716                    "usage": ["self_signing"],
1717                    "keys": {
1718                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1719                    },
1720                    "signatures": {
1721                        "@example2:localhost": {
1722                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1723                        }
1724                    }
1725                }
1726            },
1727            "user_signing_keys": {
1728                user_id: identity_request.user_signing_key,
1729            }
1730        });
1731
1732        let response = ruma_response_from_json(&response);
1733        let (_, private_identity) = manager.handle_cross_signing_keys(&response).await.unwrap();
1734
1735        assert!(private_identity.is_some());
1736        let private_identity = manager.store.private_identity();
1737        assert!(private_identity.lock().await.is_empty().await);
1738    }
1739
1740    #[async_test]
1741    async fn test_no_tracked_users_key_query_request() {
1742        let manager = manager_test_helper(user_id(), device_id()).await;
1743
1744        let cache = manager.store.cache().await.unwrap();
1745        assert!(
1746            manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty(),
1747            "No users are initially tracked"
1748        );
1749
1750        let requests = manager.users_for_key_query().await.unwrap();
1751        assert!(!requests.is_empty(), "We query the keys for our own user");
1752
1753        assert!(
1754            manager
1755                .key_query_manager
1756                .synced(&cache)
1757                .await
1758                .unwrap()
1759                .tracked_users()
1760                .contains(manager.user_id()),
1761            "Our own user is now tracked"
1762        );
1763    }
1764
1765    /// If a user is invalidated while a /keys/query request is in flight, that
1766    /// user is not removed from the list of outdated users when the
1767    /// response is received
1768    #[async_test]
1769    async fn test_invalidation_race_handling() {
1770        let manager = manager_test_helper(user_id(), device_id()).await;
1771        let alice = other_user_id();
1772        manager.update_tracked_users([alice]).await.unwrap();
1773
1774        // alice should be in the list of key queries
1775        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1776        assert!(req.device_keys.contains_key(alice));
1777
1778        // another invalidation turns up
1779        {
1780            let cache = manager.store.cache().await.unwrap();
1781            manager.receive_device_changes(&cache, [alice].into_iter()).await.unwrap();
1782        }
1783
1784        // the response from the query arrives
1785        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1786
1787        // alice should *still* be in the list of key queries
1788        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1789        assert!(req.device_keys.contains_key(alice));
1790
1791        // another key query response
1792        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1793
1794        // finally alice should not be in the list
1795        let queries = manager.users_for_key_query().await.unwrap();
1796        assert!(!queries.iter().any(|(_, r)| r.device_keys.contains_key(alice)));
1797    }
1798
1799    #[async_test]
1800    async fn test_failure_handling() {
1801        let manager = manager_test_helper(user_id(), device_id()).await;
1802        let alice = user_id!("@alice:example.org");
1803
1804        {
1805            let cache = manager.store.cache().await.unwrap();
1806            let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1807            assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1808
1809            key_query_manager.mark_user_as_changed(alice).await.unwrap();
1810
1811            assert!(
1812                key_query_manager.tracked_users().contains(alice),
1813                "Alice is tracked after being marked as tracked"
1814            );
1815        }
1816
1817        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1818        assert!(req.device_keys.contains_key(alice));
1819
1820        // a failure should stop us querying for the user's keys.
1821        let response = key_query_with_failures();
1822        manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1823        assert!(manager.failures.contains(alice.server_name()));
1824        assert!(!manager
1825            .users_for_key_query()
1826            .await
1827            .unwrap()
1828            .iter()
1829            .any(|(_, r)| r.device_keys.contains_key(alice)));
1830
1831        // clearing the failure flag should make the user reappear in the query list.
1832        manager.failures.remove([alice.server_name().to_owned()].iter());
1833        assert!(manager
1834            .users_for_key_query()
1835            .await
1836            .unwrap()
1837            .iter()
1838            .any(|(_, r)| r.device_keys.contains_key(alice)));
1839    }
1840
1841    #[async_test]
1842    async fn test_out_of_band_key_query() {
1843        // build the request
1844        let manager = manager_test_helper(user_id(), device_id()).await;
1845        let (reqid, req) = manager.build_key_query_for_users(vec![user_id()]);
1846        assert!(req.device_keys.contains_key(user_id()));
1847
1848        // make up a response and check it is processed
1849        let (device_changes, identity_changes) =
1850            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1851        assert_eq!(device_changes.new.len(), 1);
1852        assert_eq!(device_changes.new[0].device_id(), "LVWOVGOXME");
1853        assert_eq!(identity_changes.new.len(), 1);
1854        assert_eq!(identity_changes.new[0].user_id(), user_id());
1855
1856        let devices = manager.store.get_user_devices(user_id()).await.unwrap();
1857        assert_eq!(devices.devices().count(), 1);
1858        assert_eq!(devices.devices().next().unwrap().device_id(), "LVWOVGOXME");
1859    }
1860
1861    #[async_test]
1862    async fn test_invalid_key_response() {
1863        let my_user_id = user_id();
1864        let my_device_id = device_id();
1865        let manager = manager_test_helper(my_user_id, my_device_id).await;
1866
1867        // First of all, populate the store with good data
1868        let (reqid, _) = manager.build_key_query_for_users(vec![user_id()]);
1869        let (device_changes, identity_changes) =
1870            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1871        assert_eq!(device_changes.new.len(), 1);
1872        let test_device_id = device_changes.new.first().unwrap().device_id().to_owned();
1873        use crate::store::Changes;
1874        let changes =
1875            Changes { devices: device_changes, identities: identity_changes, ..Changes::default() };
1876        manager.store.save_changes(changes).await.unwrap();
1877
1878        // Now provide an invalid update
1879        let (reqid, _) = manager.build_key_query_for_users(vec![my_user_id]);
1880        let response = ruma_response_from_json(&json!({
1881            "device_keys": {
1882                my_user_id: {
1883                    test_device_id.as_str(): {
1884                        "algorithms": [
1885                            "m.olm.v1.curve25519-aes-sha2",
1886                        ],
1887                        "device_id": test_device_id.as_str(),
1888                        "keys": {
1889                            format!("curve25519:{}", test_device_id): "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1890                            format!("ed25519:{}", test_device_id): "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1891                        },
1892                        "signatures": {
1893                            my_user_id: {
1894                                // Not a valid signature.
1895                                format!("ed25519:{}", test_device_id): "imadethisup",
1896                            }
1897                        },
1898                        "user_id": my_user_id,
1899                    }
1900                }
1901            }
1902        }));
1903
1904        let (device_changes, identity_changes) =
1905            manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1906
1907        // The result should be empty
1908        assert_eq!(device_changes.new.len(), 0);
1909        assert_eq!(device_changes.changed.len(), 0);
1910        assert_eq!(device_changes.deleted.len(), 0);
1911        assert_eq!(identity_changes.new.len(), 0);
1912
1913        // And the device should not have been updated.
1914        let device =
1915            manager.store.get_user_devices(my_user_id).await.unwrap().get(&test_device_id).unwrap();
1916        assert_eq!(device.algorithms().len(), 2);
1917    }
1918
1919    #[async_test]
1920    async fn test_devices_stream() {
1921        let manager = manager_test_helper(user_id(), device_id()).await;
1922        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1923
1924        let stream = manager.store.devices_stream();
1925        pin_mut!(stream);
1926
1927        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1928
1929        let update = assert_ready!(stream);
1930        assert!(!update.new.is_empty(), "The device update should contain some devices");
1931    }
1932
1933    #[async_test]
1934    async fn test_identities_stream() {
1935        let manager = manager_test_helper(user_id(), device_id()).await;
1936        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1937
1938        let stream = manager.store.user_identities_stream();
1939        pin_mut!(stream);
1940
1941        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1942
1943        let update = assert_ready!(stream);
1944        assert!(!update.new.is_empty(), "The identities update should contain some identities");
1945    }
1946
1947    #[async_test]
1948    async fn test_identities_stream_raw() {
1949        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1950        let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1951
1952        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1953        pin_mut!(stream);
1954
1955        manager
1956            .as_ref()
1957            .unwrap()
1958            .receive_keys_query_response(&request_id, &own_key_query())
1959            .await
1960            .unwrap();
1961
1962        let (identity_update, _) = assert_ready!(stream);
1963        assert_eq!(identity_update.new.len(), 1);
1964        assert_eq!(identity_update.changed.len(), 0);
1965        assert_eq!(identity_update.unchanged.len(), 0);
1966        assert_eq!(identity_update.new[0].user_id(), user_id());
1967
1968        assert_pending!(stream);
1969
1970        let (new_request_id, _) =
1971            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1972
1973        // A second `/keys/query` response with the same result shouldn't fire a change
1974        // notification: the identity and device should be unchanged.
1975        manager
1976            .as_ref()
1977            .unwrap()
1978            .receive_keys_query_response(&new_request_id, &own_key_query())
1979            .await
1980            .unwrap();
1981
1982        assert_pending!(stream);
1983
1984        // dropping the manager (and hence dropping the store) should close the stream
1985        manager.take();
1986        assert_closed!(stream);
1987    }
1988
1989    #[async_test]
1990    async fn test_identities_stream_raw_signature_update() {
1991        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1992        let (request_id, _) =
1993            manager.as_ref().unwrap().build_key_query_for_users(vec![other_user_id()]);
1994
1995        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1996        pin_mut!(stream);
1997
1998        manager
1999            .as_ref()
2000            .unwrap()
2001            .receive_keys_query_response(&request_id, &other_key_query())
2002            .await
2003            .unwrap();
2004
2005        let (identity_update, _) = assert_ready!(stream);
2006        assert_eq!(identity_update.new.len(), 1);
2007        assert_eq!(identity_update.changed.len(), 0);
2008        assert_eq!(identity_update.unchanged.len(), 0);
2009        assert_eq!(identity_update.new[0].user_id(), other_user_id());
2010
2011        let initial_msk = identity_update.new[0].master_key().clone();
2012
2013        let (new_request_id, _) =
2014            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
2015        // There is a new signature on the msk, should trigger a change
2016        manager
2017            .as_ref()
2018            .unwrap()
2019            .receive_keys_query_response(&new_request_id, &other_key_query_cross_signed())
2020            .await
2021            .unwrap();
2022
2023        let (identity_update_2, _) = assert_ready!(stream);
2024        assert_eq!(identity_update_2.new.len(), 0);
2025        assert_eq!(identity_update_2.changed.len(), 1);
2026        assert_eq!(identity_update_2.unchanged.len(), 0);
2027
2028        let updated_msk = identity_update_2.changed[0].master_key().clone();
2029
2030        // Identity has a change (new signature) but it's the same msk
2031        assert_eq!(initial_msk, updated_msk);
2032
2033        assert_pending!(stream);
2034
2035        manager.take();
2036    }
2037
2038    #[async_test]
2039    async fn test_key_query_with_unknown_properties() {
2040        let manager = manager_test_helper(user_id(), device_id()).await;
2041        let other_user = user_id!("@example:localhost");
2042        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2043        assert_eq!(devices.devices().count(), 0);
2044
2045        let response = json!({
2046            "device_keys": {
2047                "@example:localhost": {
2048                    "OBEBOSKTBE": {
2049                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
2050                        "user_id": "@example:localhost",
2051                        "device_id": "OBEBOSKTBE",
2052                        "extra_property": "somevalue",
2053                        "keys": {
2054                            "curve25519:OBEBOSKTBE": "ECrdZebl0DskwbkxoztsiKPb6ivu7M2qQ70BFWwre3w",
2055                            "ed25519:OBEBOSKTBE": "hFWo+pG6TVWNzq/ZubUQVL5Ardu9rqHxpKkCbf1/KiA"
2056                        },
2057                        "signatures": {
2058                            "@example:localhost": {
2059                                "ed25519:OBEBOSKTBE": "6vyYUgX+IoT1x6Mvf0g/GEPVb2UI3brfL7WZ75WZ81sH4FBFgAzkkuGpw9suGLKXnlEdLH0suBzaT4esVhFDCw",
2060                            },
2061                        },
2062                    },
2063                },
2064            },
2065        });
2066
2067        let response = ruma_response_from_json(&response);
2068        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
2069
2070        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2071        assert_eq!(devices.devices().count(), 1);
2072
2073        manager.store.get_device_data(other_user, device_id!("OBEBOSKTBE")).await.unwrap().unwrap();
2074    }
2075
2076    #[async_test]
2077    async fn test_manager_identity_updates() {
2078        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2079
2080        let manager = manager_test_helper(user_id(), device_id()).await;
2081        let other_user = DataSet::user_id();
2082        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2083        assert_eq!(devices.devices().count(), 0);
2084
2085        let identity = manager.store.get_user_identity(other_user).await.unwrap();
2086        assert!(identity.is_none());
2087
2088        manager
2089            .receive_keys_query_response(
2090                &TransactionId::new(),
2091                &DataSet::key_query_with_identity_a(),
2092            )
2093            .await
2094            .unwrap();
2095
2096        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2097        let other_identity = identity.other().unwrap();
2098
2099        // We should now have an identity for the user but no pin violation
2100        // (pinned master key is the current one)
2101        assert!(!other_identity.has_pin_violation());
2102        let first_device =
2103            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2104        assert!(first_device.is_cross_signed_by_owner(&identity));
2105
2106        // We receive a new keys update for that user, with a new identity
2107        manager
2108            .receive_keys_query_response(
2109                &TransactionId::new(),
2110                &DataSet::key_query_with_identity_b(),
2111            )
2112            .await
2113            .unwrap();
2114
2115        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2116        let other_identity = identity.other().unwrap();
2117
2118        // The previous known identity has been replaced, there should be a pin
2119        // violation
2120        assert!(other_identity.has_pin_violation());
2121
2122        let second_device =
2123            manager.store.get_device_data(other_user, DataSet::device_b()).await.unwrap().unwrap();
2124
2125        // There is a new device signed by the new identity
2126        assert!(second_device.is_cross_signed_by_owner(&identity));
2127
2128        // The first device should not be signed by the new identity
2129        let first_device =
2130            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2131        assert!(!first_device.is_cross_signed_by_owner(&identity));
2132
2133        let remember_previous_identity = other_identity.clone();
2134        // We receive updated keys for that user, with no identity anymore.
2135        // Notice that there is no server API to delete identity, but we want to
2136        // test here that a home server cannot clear the identity and
2137        // subsequently serve a new one which would get automatically approved.
2138        manager
2139            .receive_keys_query_response(
2140                &TransactionId::new(),
2141                &DataSet::key_query_with_identity_no_identity(),
2142            )
2143            .await
2144            .unwrap();
2145
2146        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2147        let other_identity = identity.other().unwrap();
2148
2149        assert_eq!(other_identity, &remember_previous_identity);
2150        assert!(other_identity.has_pin_violation());
2151    }
2152
2153    #[async_test]
2154    async fn test_manager_resolve_identity_pin_violation() {
2155        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2156
2157        let manager = manager_test_helper(user_id(), device_id()).await;
2158        let other_user = DataSet::user_id();
2159
2160        manager
2161            .receive_keys_query_response(
2162                &TransactionId::new(),
2163                &DataSet::key_query_with_identity_a(),
2164            )
2165            .await
2166            .unwrap();
2167
2168        // We receive a new keys update for that user, with a new identity
2169        manager
2170            .receive_keys_query_response(
2171                &TransactionId::new(),
2172                &DataSet::key_query_with_identity_b(),
2173            )
2174            .await
2175            .unwrap();
2176
2177        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2178        let other_identity = identity.other().unwrap();
2179
2180        // We have a new identity now, so there should be a pin violation
2181        assert!(other_identity.has_pin_violation());
2182
2183        // Resolve the violation by pinning the new identity
2184        other_identity.pin();
2185
2186        assert!(!other_identity.has_pin_violation());
2187    }
2188
2189    // Set up a machine do initial own key query and import cross-signing secret to
2190    // make the current session verified.
2191    async fn common_verified_identity_changes_machine_setup() -> OlmMachine {
2192        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2193
2194        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2195
2196        let keys_query = DataSet::own_keys_query_response_1();
2197        let txn_id = TransactionId::new();
2198        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2199
2200        machine
2201            .import_cross_signing_keys(CrossSigningKeyExport {
2202                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2203                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2204                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2205            })
2206            .await
2207            .unwrap();
2208        machine
2209    }
2210    #[async_test]
2211    async fn test_manager_verified_latch_setup_on_new_identities() {
2212        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2213
2214        let machine = common_verified_identity_changes_machine_setup().await;
2215
2216        // ######
2217        // First test: Assert that the latch is properly set on new identities
2218        // ######
2219        let keys_query = DataSet::bob_keys_query_response_signed();
2220        let txn_id = TransactionId::new();
2221        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2222
2223        let own_identity =
2224            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2225        // For sanity check that own identity is trusted
2226        assert!(own_identity.is_verified());
2227
2228        let bob_identity =
2229            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2230        // The verified latch should be true
2231        assert!(bob_identity.was_previously_verified());
2232        // And bob is verified
2233        assert!(bob_identity.is_verified());
2234
2235        // ######
2236        // Second test: Assert that the local latch stays on if the identity is rotated
2237        // ######
2238        let keys_query = DataSet::bob_keys_query_response_rotated();
2239        let txn_id = TransactionId::new();
2240        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2241
2242        let bob_identity =
2243            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2244        // Bob is not verified anymore
2245        assert!(!bob_identity.is_verified());
2246        // The verified latch should still be true
2247        assert!(bob_identity.was_previously_verified());
2248        // Bob device_2 is self-signed even if there is this verification latch
2249        // violation
2250        let bob_device = machine
2251            .get_device(DataSet::bob_id(), DataSet::bob_device_2_id(), None)
2252            .await
2253            .unwrap()
2254            .unwrap();
2255        assert!(bob_identity.is_device_signed(&bob_device));
2256        // there is also a pin violation
2257        assert!(bob_identity.has_pin_violation());
2258        // Fixing the pin violation won't fix the verification latch violation
2259        bob_identity.pin_current_master_key().await.unwrap();
2260        assert!(!bob_identity.has_pin_violation());
2261        let has_latch_violation =
2262            bob_identity.was_previously_verified() && !bob_identity.is_verified();
2263        assert!(has_latch_violation);
2264    }
2265
2266    #[async_test]
2267    async fn test_manager_verified_identity_changes_setup_on_updated_identities() {
2268        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2269
2270        let machine = common_verified_identity_changes_machine_setup().await;
2271
2272        // ######
2273        // Get the Carol identity for the first time
2274        // ######
2275        let keys_query = DataSet::carol_keys_query_response_unsigned();
2276        let txn_id = TransactionId::new();
2277        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2278
2279        let carol_identity =
2280            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2281        // The identity is not verified
2282        assert!(!carol_identity.is_verified());
2283        // The verified latch is off
2284        assert!(!carol_identity.was_previously_verified());
2285
2286        // Carol is verified, likely from another session. Ensure the latch is updated
2287        // when the key query response is processed
2288        let keys_query = DataSet::carol_keys_query_response_signed();
2289        let txn_id = TransactionId::new();
2290        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2291
2292        let carol_identity = machine
2293            .get_identity(DataSet::carol_id(), None)
2294            .await
2295            .unwrap()
2296            .unwrap()
2297            .other()
2298            .unwrap();
2299        assert!(carol_identity.is_verified());
2300        // This should have updated the latch
2301        assert!(carol_identity.was_previously_verified());
2302        // It is the same identity, it's just signed now so no pin violation
2303        assert!(!carol_identity.has_pin_violation());
2304    }
2305
2306    // Set up a machine do initial own key query.
2307    // The cross signing secrets are not yet uploaded.
2308    // Then query keys for carol and bob (both signed by own identity)
2309    async fn common_verified_identity_changes_own_trust_change_machine_setup() -> OlmMachine {
2310        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2311
2312        // Start on a non-verified session
2313        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2314
2315        let keys_query = DataSet::own_keys_query_response_1();
2316        let txn_id = TransactionId::new();
2317        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2318
2319        // For sanity check that own identity is not trusted
2320        let own_identity =
2321            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2322        assert!(!own_identity.is_verified());
2323
2324        let keys_query = DataSet::own_keys_query_response_1();
2325        let txn_id = TransactionId::new();
2326        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2327
2328        // Get Bob and Carol already signed
2329        let keys_query = DataSet::bob_keys_query_response_signed();
2330        let txn_id = TransactionId::new();
2331        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2332
2333        let keys_query = DataSet::carol_keys_query_response_signed();
2334        let txn_id = TransactionId::new();
2335        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2336
2337        machine.update_tracked_users(vec![DataSet::bob_id(), DataSet::carol_id()]).await.unwrap();
2338
2339        machine
2340    }
2341
2342    #[async_test]
2343    async fn test_manager_verified_identity_changes_setup_on_own_identity_trust_change() {
2344        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2345        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2346
2347        let own_identity =
2348            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2349
2350        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2351        // Bob is verified by our identity but our own identity is not yet trusted
2352        assert!(!bob_identity.was_previously_verified());
2353        assert!(own_identity.is_identity_signed(&bob_identity.other().unwrap()));
2354
2355        let carol_identity =
2356            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2357        // Carol is verified by our identity but our own identity is not yet trusted
2358        assert!(!carol_identity.was_previously_verified());
2359        assert!(own_identity.is_identity_signed(&carol_identity.other().unwrap()));
2360
2361        // Marking our own identity as trusted should update the existing identities
2362        let _ = own_identity.verify().await;
2363
2364        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2365        assert!(own_identity.is_verified());
2366
2367        let carol_identity =
2368            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2369        assert!(carol_identity.is_verified());
2370        // The latch should be set now
2371        assert!(carol_identity.was_previously_verified());
2372
2373        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2374        assert!(bob_identity.is_verified());
2375        // The latch should be set now
2376        assert!(bob_identity.was_previously_verified());
2377    }
2378
2379    #[async_test]
2380    async fn test_manager_verified_identity_change_setup_on_import_secrets() {
2381        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2382        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2383
2384        let own_identity =
2385            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2386
2387        let bob_identity =
2388            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2389        // Carol is verified by our identity but our own identity is not yet trusted
2390        assert!(own_identity.is_identity_signed(&bob_identity));
2391        assert!(!bob_identity.was_previously_verified());
2392
2393        let carol_identity = machine
2394            .get_identity(DataSet::carol_id(), None)
2395            .await
2396            .unwrap()
2397            .unwrap()
2398            .other()
2399            .unwrap();
2400        // Carol is verified by our identity but our own identity is not yet trusted
2401        assert!(own_identity.is_identity_signed(&carol_identity));
2402        assert!(!carol_identity.was_previously_verified());
2403
2404        // Marking our own identity as trusted should update the existing identities
2405        machine
2406            .import_cross_signing_keys(CrossSigningKeyExport {
2407                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2408                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2409                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2410            })
2411            .await
2412            .unwrap();
2413
2414        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2415        assert!(own_identity.is_verified());
2416
2417        let carol_identity =
2418            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2419        assert!(carol_identity.is_verified());
2420        // The latch should be set now
2421        assert!(carol_identity.was_previously_verified());
2422
2423        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2424        assert!(bob_identity.is_verified());
2425        // The latch should be set now
2426        assert!(bob_identity.was_previously_verified());
2427    }
2428
2429    mod update_sender_data {
2430        use assert_matches::assert_matches;
2431        use matrix_sdk_test::async_test;
2432        use ruma::room_id;
2433
2434        use super::{device_id, manager_test_helper};
2435        use crate::{
2436            identities::manager::testing::{other_user_id, user_id},
2437            olm::{InboundGroupSession, SenderData},
2438            store::{Changes, DeviceChanges},
2439            Account, DeviceData, EncryptionSettings,
2440        };
2441
2442        #[async_test]
2443        async fn test_adds_device_info_to_existing_sessions() {
2444            let manager = manager_test_helper(user_id(), device_id()).await;
2445
2446            // Given that we have lots of sessions in the store, from each of two devices
2447            let account1 = Account::new(user_id());
2448            let account2 = Account::new(other_user_id());
2449
2450            let mut account1_sessions = Vec::new();
2451            for _ in 0..60 {
2452                account1_sessions.push(create_inbound_group_session(&account1).await);
2453            }
2454            let mut account2_sessions = Vec::new();
2455            for _ in 0..60 {
2456                account2_sessions.push(create_inbound_group_session(&account2).await);
2457            }
2458            manager
2459                .store
2460                .save_changes(Changes {
2461                    inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()]
2462                        .concat(),
2463                    ..Default::default()
2464                })
2465                .await
2466                .unwrap();
2467
2468            // When we get an update for one device
2469            let device_data = DeviceData::from_account(&account1);
2470            manager
2471                .update_sender_data_from_device_changes(&DeviceChanges {
2472                    changed: vec![device_data],
2473                    ..Default::default()
2474                })
2475                .await
2476                .unwrap();
2477
2478            // Then those sessions should be updated
2479            for session in account1_sessions {
2480                let updated = manager
2481                    .store
2482                    .get_inbound_group_session(session.room_id(), session.session_id())
2483                    .await
2484                    .unwrap()
2485                    .expect("Could not find session after update");
2486                assert_matches!(
2487                    updated.sender_data,
2488                    SenderData::DeviceInfo { .. },
2489                    "incorrect sender data for session {}",
2490                    session.session_id()
2491                );
2492            }
2493
2494            // ... and those from the other account should not
2495            for session in account2_sessions {
2496                let updated = manager
2497                    .store
2498                    .get_inbound_group_session(session.room_id(), session.session_id())
2499                    .await
2500                    .unwrap()
2501                    .expect("Could not find session after update");
2502                assert_matches!(updated.sender_data, SenderData::UnknownDevice { .. });
2503            }
2504        }
2505
2506        /// Create an InboundGroupSession sent from the given account
2507        async fn create_inbound_group_session(account: &Account) -> InboundGroupSession {
2508            let (_, igs) = account
2509                .create_group_session_pair(
2510                    room_id!("!test:room"),
2511                    EncryptionSettings::default(),
2512                    SenderData::unknown(),
2513                )
2514                .await
2515                .unwrap();
2516            igs
2517        }
2518    }
2519}