matrix_sdk_crypto/identities/
manager.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use futures_util::future::join_all;
23use itertools::Itertools;
24use matrix_sdk_common::{executor::spawn, failures_cache::FailuresCache};
25use ruma::{
26    OwnedDeviceId, OwnedServerName, OwnedTransactionId, OwnedUserId, ServerName, TransactionId,
27    UserId, api::client::keys::get_keys::v3::Response as KeysQueryResponse, serde::Raw,
28};
29use tokio::sync::Mutex;
30use tracing::{Level, debug, enabled, info, instrument, trace, warn};
31
32use crate::{
33    CryptoStoreError, LocalTrust, OwnUserIdentity, SignatureError, UserIdentity,
34    error::OlmResult,
35    identities::{DeviceData, OtherUserIdentityData, OwnUserIdentityData, UserIdentityData},
36    olm::{
37        InboundGroupSession, PrivateCrossSigningIdentity, SenderDataFinder, SenderDataType,
38        sender_data_finder::SessionDeviceCheckError,
39    },
40    store::{
41        KeyQueryManager, Result as StoreResult, Store,
42        caches::{SequenceNumber, StoreCache, StoreCacheGuard},
43        types::{Changes, DeviceChanges, IdentityChanges, UserKeyQueryResult},
44    },
45    types::{
46        CrossSigningKey, DeviceKeys, MasterPubkey, SelfSigningPubkey, UserSigningPubkey,
47        requests::KeysQueryRequest,
48    },
49};
50
51enum DeviceChange {
52    New(DeviceData),
53    Updated(DeviceData),
54    None,
55}
56
57/// This enum helps us to distinguish between the changed and unchanged
58/// identity case.
59/// An unchanged identity means same cross signing keys as well as same
60/// set of signatures on the master key.
61enum IdentityUpdateResult {
62    Updated(UserIdentityData),
63    Unchanged(UserIdentityData),
64}
65
66#[derive(Debug, Clone)]
67pub(crate) struct IdentityManager {
68    /// Servers that have previously appeared in the `failures` section of a
69    /// `/keys/query` response.
70    ///
71    /// See also [`crate::session_manager::SessionManager::failures`].
72    failures: FailuresCache<OwnedServerName>,
73    store: Store,
74
75    pub(crate) key_query_manager: Arc<KeyQueryManager>,
76
77    /// Details of the current "in-flight" key query request, if any
78    keys_query_request_details: Arc<Mutex<Option<KeysQueryRequestDetails>>>,
79}
80
81/// Details of an in-flight key query request
82#[derive(Debug, Clone, Default)]
83struct KeysQueryRequestDetails {
84    /// The sequence number, to be passed to
85    /// `Store.mark_tracked_users_as_up_to_date`.
86    sequence_number: SequenceNumber,
87
88    /// A single batch of queries returned by the Store is broken up into one or
89    /// more actual KeysQueryRequests, each with their own request id. We
90    /// record the outstanding request ids here.
91    request_ids: HashSet<OwnedTransactionId>,
92}
93
94// Helper type to handle key query response
95struct KeySetInfo {
96    user_id: OwnedUserId,
97    master_key: MasterPubkey,
98    self_signing: SelfSigningPubkey,
99}
100
101impl IdentityManager {
102    const MAX_KEY_QUERY_USERS: usize = 250;
103
104    pub fn new(store: Store) -> Self {
105        let keys_query_request_details = Mutex::new(None);
106
107        IdentityManager {
108            store,
109            key_query_manager: Default::default(),
110            failures: Default::default(),
111            keys_query_request_details: keys_query_request_details.into(),
112        }
113    }
114
115    fn user_id(&self) -> &UserId {
116        &self.store.static_account().user_id
117    }
118
119    /// Receive a successful `/keys/query` response.
120    ///
121    /// Returns a list of devices newly discovered devices and devices that
122    /// changed.
123    ///
124    /// # Arguments
125    ///
126    /// * `request_id` - The request_id returned by `users_for_key_query` or
127    ///   `build_key_query_for_users`
128    /// * `response` - The response of the `/keys/query` request that the client
129    ///   performed.
130    pub async fn receive_keys_query_response(
131        &self,
132        request_id: &TransactionId,
133        response: &KeysQueryResponse,
134    ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
135        debug!(
136            ?request_id,
137            users = ?response.device_keys.keys().collect::<BTreeSet<_>>(),
138            failures = ?response.failures,
139            "Handling a `/keys/query` response"
140        );
141
142        // Parse the strings into server names and filter out our own server. We should
143        // never get failures from our own server but let's remove it as a
144        // precaution anyways.
145        let failed_servers = response
146            .failures
147            .keys()
148            .filter_map(|k| ServerName::parse(k).ok())
149            .filter(|s| s != self.user_id().server_name());
150        let successful_servers = response.device_keys.keys().map(|u| u.server_name());
151
152        // Append the new failed servers and remove any successful servers. We
153        // need to explicitly remove the successful servers because the cache
154        // doesn't automatically remove entries that elapse. Instead, the effect
155        // is that elapsed servers will be retried and their delays incremented.
156        self.failures.extend(failed_servers);
157        self.failures.remove(successful_servers);
158
159        let devices = self.handle_devices_from_key_query(response.device_keys.clone()).await?;
160        let (identities, cross_signing_identity) = self.handle_cross_signing_keys(response).await?;
161
162        let changes = Changes {
163            identities: identities.clone(),
164            devices: devices.clone(),
165            private_identity: cross_signing_identity,
166            ..Default::default()
167        };
168
169        self.store.save_changes(changes).await?;
170
171        // Update the sender data on any existing inbound group sessions based on the
172        // changes in this response.
173        //
174        // `update_sender_data_from_device_changes` relies on being able to look up the
175        // user identities from the store, so this has to happen *after* the
176        // changes from `handle_cross_signing_keys` are saved.
177        //
178        // Note: it might be possible for this to race against session creation. If a
179        // new session is received at the same time as a `/keys/query` response is being
180        // processed, it could be saved without up-to-date sender data, but it might be
181        // saved too late for it to be picked up by
182        // `update_sender_data_from_device_changes`. However, this should be rare,
183        // since, in general, /sync responses which might create a new session
184        // are not processed at the same time as /keys/query responses (assuming
185        // that the application does not call `OlmMachine::receive_sync_changes`
186        // at the same time as `OlmMachine::mark_request_as_sent`).
187        self.update_sender_data_from_device_changes(&devices).await?;
188
189        // if this request is one of those we expected to be in flight, pass the
190        // sequence number back to the store so that it can mark devices up to
191        // date
192        let sequence_number = {
193            let mut request_details = self.keys_query_request_details.lock().await;
194
195            request_details.as_mut().and_then(|details| {
196                if details.request_ids.remove(request_id) {
197                    Some(details.sequence_number)
198                } else {
199                    None
200                }
201            })
202        };
203
204        if let Some(sequence_number) = sequence_number {
205            let cache = self.store.cache().await?;
206            self.key_query_manager
207                .synced(&cache)
208                .await?
209                .mark_tracked_users_as_up_to_date(
210                    response.device_keys.keys().map(Deref::deref),
211                    sequence_number,
212                )
213                .await?;
214        }
215
216        if enabled!(Level::DEBUG) {
217            debug_log_keys_query_response(&devices, &identities, request_id);
218        }
219
220        Ok((devices, identities))
221    }
222
223    async fn update_or_create_device(
224        store: Store,
225        device_keys: DeviceKeys,
226    ) -> StoreResult<DeviceChange> {
227        let old_device =
228            store.get_device_data(&device_keys.user_id, &device_keys.device_id).await?;
229
230        if let Some(mut device) = old_device {
231            match device.update_device(&device_keys) {
232                Err(e) => {
233                    warn!(
234                        user_id = ?device.user_id(),
235                        device_id = ?device.device_id(),
236                        error = ?e,
237                        "Rejecting device update",
238                    );
239                    Ok(DeviceChange::None)
240                }
241                Ok(true) => Ok(DeviceChange::Updated(device)),
242                Ok(false) => Ok(DeviceChange::None),
243            }
244        } else {
245            match DeviceData::try_from(&device_keys) {
246                Ok(d) => {
247                    // If this is our own device, check that the server isn't
248                    // lying about our keys, also mark the device as locally
249                    // trusted.
250                    if d.user_id() == store.user_id() && d.device_id() == store.device_id() {
251                        let local_device_keys = store.static_account().unsigned_device_keys();
252
253                        if d.keys() == &local_device_keys.keys {
254                            d.set_trust_state(LocalTrust::Verified);
255
256                            trace!(
257                                user_id = ?d.user_id(),
258                                device_id = ?d.device_id(),
259                                keys = ?d.keys(),
260                                "Adding our own device to the device store, \
261                                marking it as locally verified",
262                            );
263
264                            Ok(DeviceChange::New(d))
265                        } else {
266                            Ok(DeviceChange::None)
267                        }
268                    } else {
269                        trace!(
270                            user_id = ?d.user_id(),
271                            device_id = ?d.device_id(),
272                            keys = ?d.keys(),
273                            "Adding a new device to the device store",
274                        );
275
276                        Ok(DeviceChange::New(d))
277                    }
278                }
279                Err(e) => {
280                    warn!(
281                        user_id = ?device_keys.user_id,
282                        device_id = ?device_keys.device_id,
283                        error = ?e,
284                        "Rejecting a previously unseen device",
285                    );
286
287                    Ok(DeviceChange::None)
288                }
289            }
290        }
291    }
292
293    async fn update_user_devices(
294        store: Store,
295        user_id: OwnedUserId,
296        device_map: BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
297    ) -> StoreResult<DeviceChanges> {
298        let own_device_id = store.static_account().device_id().to_owned();
299
300        let mut changes = DeviceChanges::default();
301
302        let current_devices: HashSet<OwnedDeviceId> = device_map.keys().cloned().collect();
303
304        let tasks = device_map.into_iter().filter_map(|(device_id, device_keys)| match device_keys
305            .deserialize_as::<DeviceKeys>(
306        ) {
307            Ok(device_keys) => {
308                if user_id != device_keys.user_id || device_id != device_keys.device_id {
309                    warn!(
310                        ?user_id,
311                        ?device_id,
312                        device_key_user = ?device_keys.user_id,
313                        device_key_device_id = ?device_keys.device_id,
314                        "Mismatch in the device keys payload",
315                    );
316                    None
317                } else {
318                    Some(spawn(Self::update_or_create_device(store.clone(), device_keys)))
319                }
320            }
321            Err(e) => {
322                warn!(
323                    ?user_id, ?device_id, error = ?e,
324                    "Device keys failed to deserialize",
325                );
326                None
327            }
328        });
329
330        let results = join_all(tasks).await;
331
332        for device in results {
333            let device = device.expect("Creating or updating a device panicked")?;
334
335            match device {
336                DeviceChange::New(d) => changes.new.push(d),
337                DeviceChange::Updated(d) => changes.changed.push(d),
338                DeviceChange::None => (),
339            }
340        }
341
342        let current_devices: HashSet<&OwnedDeviceId> = current_devices.iter().collect();
343        let stored_devices = store.get_device_data_for_user(&user_id).await?;
344        let stored_devices_set: HashSet<&OwnedDeviceId> = stored_devices.keys().collect();
345        let deleted_devices_set = stored_devices_set.difference(&current_devices);
346
347        let own_user_id = store.static_account().user_id();
348        for device_id in deleted_devices_set {
349            if user_id == *own_user_id && *device_id == &own_device_id {
350                let identity_keys = store.static_account().identity_keys();
351
352                warn!(
353                    user_id = ?own_user_id,
354                    device_id = ?own_device_id,
355                    curve25519_key = ?identity_keys.curve25519,
356                    ed25519_key = ?identity_keys.ed25519,
357                    "Our own device might have been deleted"
358                );
359            } else if let Some(device) = stored_devices.get(*device_id) {
360                device.mark_as_deleted();
361                changes.deleted.push(device.clone());
362            }
363        }
364
365        Ok(changes)
366    }
367
368    /// Handle the device keys part of a key query response.
369    ///
370    /// # Arguments
371    ///
372    /// * `device_keys_map` - A map holding the device keys of the users for
373    ///   which the key query was done.
374    ///
375    /// Returns a list of devices that changed. Changed here means either
376    /// they are new, one of their properties has changed or they got deleted.
377    async fn handle_devices_from_key_query(
378        &self,
379        device_keys_map: BTreeMap<
380            OwnedUserId,
381            BTreeMap<OwnedDeviceId, Raw<ruma::encryption::DeviceKeys>>,
382        >,
383    ) -> StoreResult<DeviceChanges> {
384        let mut changes = DeviceChanges::default();
385
386        let tasks = device_keys_map.into_iter().map(|(user_id, device_keys_map)| {
387            spawn(Self::update_user_devices(self.store.clone(), user_id, device_keys_map))
388        });
389
390        let results = join_all(tasks).await;
391
392        for result in results {
393            let change_fragment = result.expect("Panic while updating user devices")?;
394
395            changes.extend(change_fragment);
396        }
397
398        Ok(changes)
399    }
400
401    /// Check if the given public identity matches our stored private one.
402    ///
403    /// If they don't match, this is an indication that our identity has been
404    /// rotated. In this case we return `Some(cleared_private_identity)`,
405    /// where `cleared_private_identity` is our currently-stored
406    /// private identity with the conflicting keys removed.
407    ///
408    /// Otherwise, assuming we do have a private master cross-signing key, we
409    /// mark the public identity as verified.
410    ///
411    /// # Returns
412    ///
413    /// If the private identity needs updating (because it does not match the
414    /// public keys), the updated private identity (which will need to be
415    /// persisted).
416    ///
417    /// Otherwise, `None`.
418    async fn check_private_identity(
419        &self,
420        identity: &OwnUserIdentityData,
421    ) -> Option<PrivateCrossSigningIdentity> {
422        let private_identity = self.store.private_identity();
423        let private_identity = private_identity.lock().await;
424        let result = private_identity.clear_if_differs(identity).await;
425
426        if result.any_differ() {
427            info!(cleared = ?result, "Removed some or all of our private cross signing keys");
428            Some((*private_identity).clone())
429        } else {
430            // If the master key didn't rotate above (`clear_if_differs`),
431            // then this means that the public part and the private parts of
432            // the master key match. We previously did a signature check, so
433            // this means that the private part of the master key has signed
434            // the identity. We can safely mark the public part of the
435            // identity as verified.
436            if private_identity.has_master_key().await && !identity.is_verified() {
437                trace!("Marked our own identity as verified");
438                identity.mark_as_verified()
439            }
440
441            None
442        }
443    }
444
445    /// Process an identity received in a `/keys/query` response that we
446    /// previously knew about.
447    ///
448    /// If the identity is our own, we will look for a user-signing key; if one
449    /// is not found, an error is returned. Otherwise, we then compare the
450    /// received public identity against our stored private identity;
451    /// if they match, the returned public identity is marked as verified and
452    /// `*changed_private_identity` is set to `None`. If they do *not* match,
453    /// it is an indication that our identity has been rotated, and
454    /// `*changed_private_identity` is set to our currently-stored private
455    /// identity with the conflicting keys removed (which will need to be
456    /// persisted).
457    ///
458    /// Whether the identity is our own or that of another, we check whether
459    /// there has been any change to the cross-signing keys, and classify
460    /// the result into [`IdentityUpdateResult::Updated`] or
461    /// [`IdentityUpdateResult::Unchanged`].
462    ///
463    /// # Arguments
464    ///
465    /// * `response` - The entire `/keys/query` response.
466    /// * `master_key` - The public master cross-signing key from the
467    ///   `/keys/query` response.
468    /// * `self_signing` - The public self-signing key from the `/keys/query`
469    ///   response.
470    /// * `i` - The existing identity for this user.
471    /// * `changed_private_identity` - Output parameter. Unchanged if the
472    ///   identity is that of another user. If it is our own, set to `None` or
473    ///   `Some` depending on whether our stored private identity needs
474    ///   updating. See above for more detail.
475    async fn handle_changed_identity(
476        &self,
477        response: &KeysQueryResponse,
478        maybe_verified_own_identity: Option<&OwnUserIdentity>,
479        master_key: MasterPubkey,
480        self_signing: SelfSigningPubkey,
481        i: UserIdentityData,
482        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
483    ) -> Result<IdentityUpdateResult, SignatureError> {
484        match i {
485            UserIdentityData::Own(mut identity) => {
486                let user_signing = self.get_user_signing_key_from_response(response)?;
487                let has_changed = identity.update(master_key, self_signing, user_signing)?;
488                *changed_private_identity = self.check_private_identity(&identity).await;
489                if has_changed {
490                    Ok(IdentityUpdateResult::Updated(identity.into()))
491                } else {
492                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
493                }
494            }
495            UserIdentityData::Other(mut identity) => {
496                let has_changed = identity.update(
497                    master_key,
498                    self_signing,
499                    maybe_verified_own_identity.map(|o| o.user_signing_key()),
500                )?;
501
502                if has_changed {
503                    Ok(IdentityUpdateResult::Updated(identity.into()))
504                } else {
505                    Ok(IdentityUpdateResult::Unchanged(identity.into()))
506                }
507            }
508        }
509    }
510
511    /// Process an identity received in a `/keys/query` response that we didn't
512    /// previously know about.
513    ///
514    /// If the identity is our own, we will look for a user-signing key, and if
515    /// it is present and correct, all three keys will be returned in the
516    /// `IdentityChange` result; otherwise, an error is returned. We will also
517    /// compare the received public identity against our stored private
518    /// identity; if they match, the returned public identity is marked as
519    /// verified and `*changed_private_identity` is set to `None`. If they do
520    /// *not* match, it is an indication that our identity has been rotated,
521    /// and `*changed_private_identity` is set to our currently-stored
522    /// private identity with the conflicting keys removed (which will need
523    /// to be persisted).
524    ///
525    /// If the identity is that of another user, we just parse the keys into the
526    /// `IdentityChange` result, since all other checks have already been done.
527    ///
528    /// # Arguments
529    ///
530    /// * `response` - The entire `/keys/query` response.
531    /// * `master_key` - The public master cross-signing key from the
532    ///   `/keys/query` response.
533    /// * `self_signing` - The public self-signing key from the `/keys/query`
534    ///   response.
535    /// * `changed_private_identity` - Output parameter. Unchanged if the
536    ///   identity is that of another user. If it is our own, set to `None` or
537    ///   `Some` depending on whether our stored private identity needs
538    ///   updating. See above for more detail.
539    async fn handle_new_identity(
540        &self,
541        response: &KeysQueryResponse,
542        maybe_verified_own_identity: Option<&OwnUserIdentity>,
543        master_key: MasterPubkey,
544        self_signing: SelfSigningPubkey,
545        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
546    ) -> Result<UserIdentityData, SignatureError> {
547        if master_key.user_id() == self.user_id() {
548            // Own identity
549            let user_signing = self.get_user_signing_key_from_response(response)?;
550            let identity = OwnUserIdentityData::new(master_key, self_signing, user_signing)?;
551            *changed_private_identity = self.check_private_identity(&identity).await;
552            Ok(identity.into())
553        } else {
554            // First time seen, create the identity. The current MSK will be pinned.
555            let identity = OtherUserIdentityData::new(master_key, self_signing)?;
556            let is_verified = maybe_verified_own_identity
557                .is_some_and(|own_user_identity| own_user_identity.is_identity_signed(&identity));
558            if is_verified {
559                identity.mark_as_previously_verified();
560            }
561
562            Ok(identity.into())
563        }
564    }
565
566    /// Try to deserialize the master key and self-signing key of an
567    /// identity from a `/keys/query` response.
568    ///
569    /// Each user identity *must* at least contain a master and self-signing
570    /// key, and this function deserializes them. (Our own identity, in addition
571    /// to those two, also contains a user-signing key, but that is not
572    /// extracted here; see
573    /// [`IdentityManager::get_user_signing_key_from_response`])
574    ///
575    /// # Arguments
576    ///
577    ///  * `master_key` - The master key for a particular user from a
578    ///    `/keys/query` response.
579    ///  * `response` - The entire `/keys/query` response.
580    ///
581    /// # Returns
582    ///
583    /// `None` if the self-signing key couldn't be found in the response, or the
584    /// one of the keys couldn't be deserialized. Else, the deserialized
585    /// public keys.
586    fn get_minimal_set_of_keys(
587        master_key: &Raw<CrossSigningKey>,
588        response: &KeysQueryResponse,
589    ) -> Option<(MasterPubkey, SelfSigningPubkey)> {
590        match master_key.deserialize_as_unchecked::<MasterPubkey>() {
591            Ok(master_key) => {
592                if let Some(self_signing) = response
593                    .self_signing_keys
594                    .get(master_key.user_id())
595                    .and_then(|k| k.deserialize_as_unchecked::<SelfSigningPubkey>().ok())
596                {
597                    Some((master_key, self_signing))
598                } else {
599                    warn!(
600                        "A user identity didn't contain a self signing pubkey or the key was invalid"
601                    );
602                    None
603                }
604            }
605            Err(e) => {
606                warn!(
607                    error = ?e,
608                    "Couldn't update or create new user identity"
609                );
610                None
611            }
612        }
613    }
614
615    /// Try to deserialize the our user-signing key from a `/keys/query`
616    /// response.
617    ///
618    /// If a `/keys/query` response includes our own cross-signing keys, then it
619    /// should include our user-signing key. This method attempts to
620    /// extract, deserialize, and check the key from the response.
621    ///
622    /// # Arguments
623    ///
624    /// * `response` - the entire `/keys/query` response.
625    fn get_user_signing_key_from_response(
626        &self,
627        response: &KeysQueryResponse,
628    ) -> Result<UserSigningPubkey, SignatureError> {
629        let Some(user_signing) = response
630            .user_signing_keys
631            .get(self.user_id())
632            .and_then(|k| k.deserialize_as_unchecked::<UserSigningPubkey>().ok())
633        else {
634            warn!(
635                "User identity for our own user didn't contain a user signing pubkey or the key \
636                    isn't valid",
637            );
638            return Err(SignatureError::MissingSigningKey);
639        };
640
641        if user_signing.user_id() != self.user_id() {
642            warn!(
643                expected = ?self.user_id(),
644                got = ?user_signing.user_id(),
645                "User ID mismatch in our user-signing key",
646            );
647            return Err(SignatureError::UserIdMismatch);
648        }
649
650        Ok(user_signing)
651    }
652
653    /// Process the cross-signing keys for a particular identity from a
654    /// `/keys/query` response.
655    ///
656    /// Checks that the keys are consistent, verifies the updates, and produces
657    /// a list of changes to be stored.
658    ///
659    /// # Arguments
660    ///
661    /// * `response` - The entire `/keys/query` response.
662    /// * `changes` - The identity results so far, which we will add to.
663    /// * `changed_identity` - Output parameter: Unchanged if the identity is
664    ///   that of another user. If it is our own, set to `None` or `Some`
665    ///   depending on whether our stored private identity needs updating.
666    /// * `maybe_verified_own_identity` - Own verified identity if any to check
667    ///   verification status of updated identity.
668    /// * `key_set_info` - The identity info as returned by the `/keys/query`
669    ///   response.
670    #[instrument(skip_all, fields(user_id))]
671    async fn update_or_create_identity(
672        &self,
673        response: &KeysQueryResponse,
674        changes: &mut IdentityChanges,
675        changed_private_identity: &mut Option<PrivateCrossSigningIdentity>,
676        maybe_verified_own_identity: Option<&OwnUserIdentity>,
677        key_set_info: KeySetInfo,
678    ) -> StoreResult<()> {
679        let KeySetInfo { user_id, master_key, self_signing } = key_set_info;
680        if master_key.user_id() != user_id || self_signing.user_id() != user_id {
681            warn!(?user_id, "User ID mismatch in one of the cross signing keys");
682        } else if let Some(i) = self.store.get_user_identity(&user_id).await? {
683            // an identity we knew about before, which is being updated
684            match self
685                .handle_changed_identity(
686                    response,
687                    maybe_verified_own_identity,
688                    master_key,
689                    self_signing,
690                    i,
691                    changed_private_identity,
692                )
693                .await
694            {
695                Ok(IdentityUpdateResult::Updated(identity)) => {
696                    trace!(?identity, "Updated a user identity");
697                    changes.changed.push(identity);
698                }
699                Ok(IdentityUpdateResult::Unchanged(identity)) => {
700                    trace!(?identity, "Received an unchanged user identity");
701                    changes.unchanged.push(identity);
702                }
703                Err(e) => {
704                    warn!(error = ?e, "Couldn't update an existing user identity");
705                }
706            }
707        } else {
708            // an identity we did not know about before
709            match self
710                .handle_new_identity(
711                    response,
712                    maybe_verified_own_identity,
713                    master_key,
714                    self_signing,
715                    changed_private_identity,
716                )
717                .await
718            {
719                Ok(identity) => {
720                    trace!(?identity, "Created new user identity");
721                    changes.new.push(identity);
722                }
723                Err(e) => {
724                    warn!(error = ?e, "Couldn't create new user identity");
725                }
726            }
727        }
728
729        Ok(())
730    }
731
732    /// Handle the cross signing keys part of a key query response.
733    ///
734    /// # Arguments
735    ///
736    /// * `response` - The `/keys/query` response.
737    ///
738    /// # Returns
739    ///
740    /// The processed results, to be saved to the datastore, comprising:
741    ///
742    ///  * A list of public identities that were received, categorised as "new",
743    ///    "changed" or "unchanged".
744    ///
745    ///  * If our own identity was updated and did not match our private
746    ///    identity, an update to that private identity. Otherwise, `None`.
747    async fn handle_cross_signing_keys(
748        &self,
749        response: &KeysQueryResponse,
750    ) -> StoreResult<(IdentityChanges, Option<PrivateCrossSigningIdentity>)> {
751        let mut changes = IdentityChanges::default();
752        let mut changed_identity = None;
753
754        // We want to check if the updated/new other identities are trusted by us or
755        // not. This is based on the current verified state of the own identity.
756        let maybe_own_verified_identity = self
757            .store
758            .get_identity(self.user_id())
759            .await?
760            .and_then(UserIdentity::own)
761            .filter(|own| own.is_verified());
762
763        for (user_id, master_key) in &response.master_keys {
764            // Get the master and self-signing key for each identity; those are required for
765            // every user identity type. If we don't have those we skip over.
766            let Some((master_key, self_signing)) =
767                Self::get_minimal_set_of_keys(master_key.cast_ref(), response)
768            else {
769                continue;
770            };
771
772            let key_set_info = KeySetInfo { user_id: user_id.clone(), master_key, self_signing };
773
774            self.update_or_create_identity(
775                response,
776                &mut changes,
777                &mut changed_identity,
778                maybe_own_verified_identity.as_ref(),
779                key_set_info,
780            )
781            .await?;
782        }
783
784        Ok((changes, changed_identity))
785    }
786
787    /// Generate an "out-of-band" key query request for the given set of users.
788    ///
789    /// Unlike the regular key query requests returned by `users_for_key_query`,
790    /// there can be several of these in flight at once. This can be useful
791    /// if we need results to be as up-to-date as possible.
792    ///
793    /// Once the request has been made, the response can be fed back into the
794    /// IdentityManager and store by calling `receive_keys_query_response`.
795    ///
796    /// # Arguments
797    ///
798    /// * `users` - list of users whose keys should be queried
799    ///
800    /// # Returns
801    ///
802    /// A tuple containing the request ID for the request, and the request
803    /// itself.
804    pub(crate) fn build_key_query_for_users<'a>(
805        &self,
806        users: impl IntoIterator<Item = &'a UserId>,
807    ) -> (OwnedTransactionId, KeysQueryRequest) {
808        // Since this is an "out-of-band" request, we just make up a transaction ID and
809        // do not store the details in `self.keys_query_request_details`.
810        //
811        // `receive_keys_query_response` will process the response as normal, except
812        // that it will not mark the users as "up-to-date".
813
814        // We assume that there aren't too many users here; if we find a usecase that
815        // requires lots of users to be up-to-date we may need to rethink this.
816        (TransactionId::new(), KeysQueryRequest::new(users.into_iter().map(|u| u.to_owned())))
817    }
818
819    /// Get a list of key query requests needed.
820    ///
821    /// # Returns
822    ///
823    /// A map of a request ID to the `/keys/query` request.
824    ///
825    /// The response of a successful key query requests needs to be passed to
826    /// the [`OlmMachine`] with the [`receive_keys_query_response`].
827    ///
828    /// [`receive_keys_query_response`]: Self::receive_keys_query_response
829    pub async fn users_for_key_query(
830        &self,
831    ) -> StoreResult<BTreeMap<OwnedTransactionId, KeysQueryRequest>> {
832        // Forget about any previous key queries in flight.
833        *self.keys_query_request_details.lock().await = None;
834
835        // We always want to track our own user, but in case we aren't in an encrypted
836        // room yet, we won't be tracking ourselves yet. This ensures we are always
837        // tracking ourselves.
838        //
839        // The check for emptiness is done first for performance.
840        let (users, sequence_number) = {
841            let cache = self.store.cache().await?;
842            let key_query_manager = self.key_query_manager.synced(&cache).await?;
843
844            let (users, sequence_number) = key_query_manager.users_for_key_query().await;
845
846            if users.is_empty() && !key_query_manager.tracked_users().contains(self.user_id()) {
847                key_query_manager.mark_user_as_changed(self.user_id()).await?;
848                key_query_manager.users_for_key_query().await
849            } else {
850                (users, sequence_number)
851            }
852        };
853
854        if users.is_empty() {
855            Ok(BTreeMap::new())
856        } else {
857            // Let's remove users that are part of the `FailuresCache`. The cache, which is
858            // a TTL cache, remembers users for which a previous `/key/query` request has
859            // failed. We don't retry a `/keys/query` for such users for a
860            // certain amount of time.
861            let users = users.into_iter().filter(|u| !self.failures.contains(u.server_name()));
862
863            // We don't want to create a single `/keys/query` request with an infinite
864            // amount of users. Some servers will likely bail out after a
865            // certain amount of users and the responses will be large. In the
866            // case of a transmission error, we'll have to retransmit the large
867            // response.
868            //
869            // Convert the set of users into multiple /keys/query requests.
870            let requests: BTreeMap<_, _> = users
871                .chunks(Self::MAX_KEY_QUERY_USERS)
872                .into_iter()
873                .map(|user_chunk| {
874                    let request_id = TransactionId::new();
875                    let request = KeysQueryRequest::new(user_chunk);
876
877                    debug!(?request_id, users = ?request.device_keys.keys(), "Created a /keys/query request");
878
879                    (request_id, request)
880                })
881                .collect();
882
883            // Collect the request IDs, these will be used later in the
884            // `receive_keys_query_response()` method to figure out if the user can be
885            // marked as up-to-date/non-dirty.
886            let request_ids = requests.keys().cloned().collect();
887            let request_details = KeysQueryRequestDetails { sequence_number, request_ids };
888
889            *self.keys_query_request_details.lock().await = Some(request_details);
890
891            Ok(requests)
892        }
893    }
894
895    /// Receive the list of users that contained changed devices from the
896    /// `/sync` response.
897    ///
898    /// This will queue up the given user for a key query.
899    ///
900    /// Note: The user already needs to be tracked for it to be queued up for a
901    /// key query.
902    pub async fn receive_device_changes(
903        &self,
904        cache: &StoreCache,
905        users: impl Iterator<Item = &UserId>,
906    ) -> StoreResult<()> {
907        self.key_query_manager.synced(cache).await?.mark_tracked_users_as_changed(users).await
908    }
909
910    /// See the docs for [`OlmMachine::update_tracked_users()`].
911    pub async fn update_tracked_users(
912        &self,
913        users: impl IntoIterator<Item = &UserId>,
914    ) -> StoreResult<()> {
915        let cache = self.store.cache().await?;
916        self.key_query_manager.synced(&cache).await?.update_tracked_users(users.into_iter()).await
917    }
918
919    /// Retrieve a list of a user's current devices, so we can encrypt a message
920    /// to them.
921    ///
922    /// If we have not yet seen any devices for the user, and their device list
923    /// has been marked as outdated, then we wait for the `/keys/query` request
924    /// to complete. This helps ensure that we attempt at least once to fetch a
925    /// user's devices before encrypting to them.
926    pub async fn get_user_devices_for_encryption(
927        &self,
928        users: impl Iterator<Item = &UserId>,
929    ) -> StoreResult<HashMap<OwnedUserId, HashMap<OwnedDeviceId, DeviceData>>> {
930        // How long we wait for /keys/query to complete.
931        const KEYS_QUERY_WAIT_TIME: Duration = Duration::from_secs(5);
932
933        let mut devices_by_user = HashMap::new();
934        let mut users_with_no_devices_on_failed_servers = Vec::new();
935        let mut users_with_no_devices_on_unfailed_servers = Vec::new();
936
937        for user_id in users {
938            // First of all, check the store for this user.
939            let devices = self.store.get_device_data_for_user_filtered(user_id).await?;
940
941            // Now, look for users who have no devices at all.
942            //
943            // If a user has no devices at all, that implies we have never (successfully)
944            // done a `/keys/query` for them; we wait for one to complete if it is
945            // in flight. (Of course, the user might genuinely have no devices, but
946            // that's fine, it just means we redundantly grab the cache guard and
947            // check the pending-query flag.)
948            if !devices.is_empty() {
949                // This user has at least one known device.
950                //
951                // The device list may also be outdated in this case; but in this
952                // situation, we are racing between sending a message and retrieving their
953                // device list. That's an inherently racy situation and there is no real
954                // benefit to waiting for the `/keys/query` request to complete. So we don't
955                // bother.
956                //
957                // We just add their devices to the result and carry on.
958                devices_by_user.insert(user_id.to_owned(), devices);
959                continue;
960            }
961
962            // *However*, if the user's server is currently subject to a backoff due to
963            // previous failures, then `users_for_key_query` won't attempt to query
964            // for the user's devices, so there's no point waiting.
965            //
966            // XXX: this is racy. It's possible that:
967            //  * `failures` included the user's server when `users_for_key_query` was
968            //    called, so the user was not returned in the `KeyQueryRequest`, and:
969            //  * The backoff has now expired.
970            //
971            // In that case, we'll end up waiting for the *next* `users_for_key_query` call,
972            // which might not be for 30 seconds or so. (And by then, it might be `failed`
973            // again.)
974            if self.failures.contains(user_id.server_name()) {
975                users_with_no_devices_on_failed_servers.push(user_id);
976                continue;
977            }
978
979            users_with_no_devices_on_unfailed_servers.push(user_id);
980        }
981
982        if !users_with_no_devices_on_failed_servers.is_empty() {
983            info!(
984                ?users_with_no_devices_on_failed_servers,
985                "Not waiting for `/keys/query` for users whose server has previously failed"
986            );
987        }
988
989        if !users_with_no_devices_on_unfailed_servers.is_empty() {
990            // For each user with no devices, fire off a task to wait for a `/keys/query`
991            // result if one is pending.
992            //
993            // We don't actually update the `devices_by_user` map here since that could
994            // require concurrent access to it. Instead each task returns a
995            // `(OwnedUserId, HashMap)` pair (or rather, an `Option` of one) so that we can
996            // add the results to the map.
997            let results = join_all(
998                users_with_no_devices_on_unfailed_servers
999                    .into_iter()
1000                    .map(|user_id| self.get_updated_keys_for_user(KEYS_QUERY_WAIT_TIME, user_id)),
1001            )
1002            .await;
1003
1004            // Once all the tasks have completed, process the results.
1005            let mut updated_users = Vec::new();
1006            for result in results {
1007                if let Some((user_id, updated_devices)) = result? {
1008                    devices_by_user.insert(user_id.to_owned(), updated_devices);
1009                    updated_users.push(user_id);
1010                }
1011            }
1012
1013            if !updated_users.is_empty() {
1014                info!(
1015                    ?updated_users,
1016                    "Waited for `/keys/query` to complete for users who have no devices"
1017                );
1018            }
1019        }
1020
1021        Ok(devices_by_user)
1022    }
1023
1024    /// Helper for get_user_devices_for_encryption.
1025    ///
1026    /// Waits for any pending `/keys/query` for the given user. If one was
1027    /// pending, reloads the device list and returns `Some(user_id,
1028    /// device_list)`. If no request was pending, returns `None`.
1029    #[allow(clippy::type_complexity)]
1030    #[instrument(skip(self))]
1031    async fn get_updated_keys_for_user<'a>(
1032        &self,
1033        timeout_duration: Duration,
1034        user_id: &'a UserId,
1035    ) -> Result<Option<(&'a UserId, HashMap<OwnedDeviceId, DeviceData>)>, CryptoStoreError> {
1036        let cache = self.store.cache().await?;
1037        match self
1038            .key_query_manager
1039            .wait_if_user_key_query_pending(cache, timeout_duration, user_id)
1040            .await?
1041        {
1042            UserKeyQueryResult::WasPending => {
1043                Ok(Some((user_id, self.store.get_device_data_for_user_filtered(user_id).await?)))
1044            }
1045            _ => Ok(None),
1046        }
1047    }
1048
1049    /// Given a list of changed devices, update any [`InboundGroupSession`]s
1050    /// which were sent from those devices and which do not have complete
1051    /// sender data.
1052    async fn update_sender_data_from_device_changes(
1053        &self,
1054        device_changes: &DeviceChanges,
1055    ) -> Result<(), CryptoStoreError> {
1056        for device in device_changes.new.iter().chain(device_changes.changed.iter()) {
1057            // 1. Look for InboundGroupSessions from the device whose sender_data is
1058            //    UnknownDevice. For such sessions, we now have the device, and can update
1059            //    the sender_data accordingly.
1060            //
1061            // In theory, we only need to do this for new devices. In practice, I'm a bit
1062            // worried about races leading us to getting stuck in the
1063            // UnknownDevice state, so we'll paper over that by doing this check
1064            // on device updates too.
1065            self.update_sender_data_for_sessions_for_device(device, SenderDataType::UnknownDevice)
1066                .await?;
1067
1068            // 2. If, and only if, the device is now correctly cross-signed (ie,
1069            //    device.is_cross_signed_by_owner() is true, and we have the master
1070            //    cross-signing key for the owner), look for InboundGroupSessions from the
1071            //    device whose sender_data is DeviceInfo. We can also update the sender_data
1072            //    for these sessions.
1073            //
1074            // In theory, we can skip a couple of steps of the SenderDataFinder algorithm,
1075            // because we're doing the cross-signing check here. In practice,
1076            // it's *way* easier just to use the same logic.
1077            let device_owner_identity = self.store.get_user_identity(device.user_id()).await?;
1078            if device_owner_identity.is_some_and(|id| device.is_cross_signed_by_owner(&id)) {
1079                self.update_sender_data_for_sessions_for_device(device, SenderDataType::DeviceInfo)
1080                    .await?;
1081            }
1082        }
1083
1084        Ok(())
1085    }
1086
1087    /// Given a device, look for [`InboundGroupSession`]s whose sender data is
1088    /// in the given state, and update it.
1089    #[instrument(skip(self))]
1090    async fn update_sender_data_for_sessions_for_device(
1091        &self,
1092        device: &DeviceData,
1093        sender_data_type: SenderDataType,
1094    ) -> Result<(), CryptoStoreError> {
1095        const IGS_BATCH_SIZE: usize = 50;
1096
1097        let Some(curve_key) = device.curve25519_key() else { return Ok(()) };
1098
1099        let mut last_session_id: Option<String> = None;
1100        loop {
1101            let mut sessions = self
1102                .store
1103                .get_inbound_group_sessions_for_device_batch(
1104                    curve_key,
1105                    sender_data_type,
1106                    last_session_id,
1107                    IGS_BATCH_SIZE,
1108                )
1109                .await?;
1110
1111            if sessions.is_empty() {
1112                // end of the session list
1113                return Ok(());
1114            }
1115
1116            last_session_id = None;
1117            for session in &mut sessions {
1118                last_session_id = Some(session.session_id().to_owned());
1119                self.update_sender_data_for_session(session, device).await?;
1120            }
1121            self.store.save_inbound_group_sessions(&sessions).await?;
1122        }
1123    }
1124
1125    /// Update the sender data on the given inbound group session, using the
1126    /// given device data.
1127    #[instrument(skip(self, device, session), fields(session_id = session.session_id()))]
1128    async fn update_sender_data_for_session(
1129        &self,
1130        session: &mut InboundGroupSession,
1131        device: &DeviceData,
1132    ) -> Result<(), CryptoStoreError> {
1133        match SenderDataFinder::find_using_device_data(&self.store, device.clone(), session).await {
1134            Ok(sender_data) => {
1135                debug!("Updating existing InboundGroupSession with new SenderData {sender_data:?}");
1136                session.sender_data = sender_data;
1137            }
1138            Err(SessionDeviceCheckError::CryptoStoreError(e)) => {
1139                return Err(e);
1140            }
1141            Err(SessionDeviceCheckError::MismatchedIdentityKeys(e)) => {
1142                warn!(
1143                    ?session,
1144                    ?device,
1145                    "cannot update existing InboundGroupSession due to ownership error: {e}",
1146                );
1147            }
1148        }
1149
1150        Ok(())
1151    }
1152
1153    /// Mark all tracked users as dirty.
1154    ///
1155    /// All users *whose device lists we are tracking* are flagged as needing a
1156    /// key query. Users whose devices we are not tracking are ignored.
1157    pub(crate) async fn mark_all_tracked_users_as_dirty(
1158        &self,
1159        store_cache: StoreCacheGuard,
1160    ) -> StoreResult<()> {
1161        let store_wrapper = store_cache.store_wrapper();
1162        let tracked_users = store_wrapper.load_tracked_users().await?;
1163
1164        self.key_query_manager
1165            .synced(&store_cache)
1166            .await?
1167            .mark_tracked_users_as_changed(
1168                tracked_users.iter().map(|tracked_user| tracked_user.user_id.as_ref()),
1169            )
1170            .await?;
1171
1172        Ok(())
1173    }
1174}
1175
1176/// Log information about what changed after processing a /keys/query response.
1177/// Only does anything if the DEBUG log level is enabled.
1178fn debug_log_keys_query_response(
1179    devices: &DeviceChanges,
1180    identities: &IdentityChanges,
1181    request_id: &TransactionId,
1182) {
1183    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1184    let changed_devices = devices.changed.iter().fold(BTreeMap::new(), |mut acc, d| {
1185        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1186        acc
1187    });
1188
1189    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1190    let new_devices = devices.new.iter().fold(BTreeMap::new(), |mut acc, d| {
1191        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1192        acc
1193    });
1194
1195    #[allow(unknown_lints, clippy::unwrap_or_default)] // false positive
1196    let deleted_devices = devices.deleted.iter().fold(BTreeMap::new(), |mut acc, d| {
1197        acc.entry(d.user_id()).or_insert_with(BTreeSet::new).insert(d.device_id());
1198        acc
1199    });
1200
1201    let new_identities = identities.new.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1202    let changed_identities =
1203        identities.changed.iter().map(|i| i.user_id()).collect::<BTreeSet<_>>();
1204
1205    debug!(
1206        ?request_id,
1207        ?new_devices,
1208        ?changed_devices,
1209        ?deleted_devices,
1210        ?new_identities,
1211        ?changed_identities,
1212        "Finished handling of the `/keys/query` response"
1213    );
1214}
1215
1216#[cfg(any(test, feature = "testing"))]
1217#[allow(dead_code)]
1218pub(crate) mod testing {
1219    use std::sync::Arc;
1220
1221    use matrix_sdk_test::ruma_response_from_json;
1222    use ruma::{
1223        DeviceId, UserId, api::client::keys::get_keys::v3::Response as KeyQueryResponse, device_id,
1224        user_id,
1225    };
1226    use serde_json::json;
1227    use tokio::sync::Mutex;
1228
1229    use crate::{
1230        identities::IdentityManager,
1231        olm::{Account, PrivateCrossSigningIdentity},
1232        store::{CryptoStoreWrapper, MemoryStore, Store, types::PendingChanges},
1233        types::{DeviceKeys, requests::UploadSigningKeysRequest},
1234        verification::VerificationMachine,
1235    };
1236
    /// Returns the fixed user ID `@example:localhost` for use in tests.
    ///
    /// NOTE(review): presumably this is the ID of our own account in the
    /// tests — confirm against the callers.
    pub fn user_id() -> &'static UserId {
        user_id!("@example:localhost")
    }
1240
    /// Returns the fixed user ID `@example2:localhost` for use in tests.
    ///
    /// This is the user whose keys are contained in the response built by
    /// `other_key_query`.
    pub fn other_user_id() -> &'static UserId {
        user_id!("@example2:localhost")
    }
1244
    /// Returns the fixed device ID `WSKKLTJZCL` for use in tests.
    ///
    /// A device with this ID is part of the response built by
    /// `own_key_query_with_user_id`.
    pub fn device_id() -> &'static DeviceId {
        device_id!("WSKKLTJZCL")
    }
1248
1249    pub(crate) async fn manager_test_helper(
1250        user_id: &UserId,
1251        device_id: &DeviceId,
1252    ) -> IdentityManager {
1253        let identity = PrivateCrossSigningIdentity::new(user_id.into());
1254        let identity = Arc::new(Mutex::new(identity));
1255        let user_id = user_id.to_owned();
1256        let account = Account::with_device_id(&user_id, device_id);
1257        let static_account = account.static_data().clone();
1258        let store = Arc::new(CryptoStoreWrapper::new(&user_id, device_id, MemoryStore::new()));
1259        let verification =
1260            VerificationMachine::new(static_account.clone(), identity.clone(), store.clone());
1261        let store = Store::new(static_account, identity, store, verification);
1262        store.save_pending_changes(PendingChanges { account: Some(account) }).await.unwrap();
1263        IdentityManager::new(store)
1264    }
1265
1266    pub fn other_key_query() -> KeyQueryResponse {
1267        let data = &json!({
1268            "device_keys": {
1269                "@example2:localhost": {
1270                    "SKISMLNIMH": {
1271                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1272                        "device_id": "SKISMLNIMH",
1273                        "keys": {
1274                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1275                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1276                        },
1277                        "signatures": {
1278                            "@example2:localhost": {
1279                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1280                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1281                            }
1282                        },
1283                        "user_id": "@example2:localhost",
1284                        "unsigned": {
1285                            "device_display_name": "Riot Desktop (Linux)"
1286                        }
1287                    }
1288                }
1289            },
1290            "failures": {},
1291            "master_keys": {
1292                "@example2:localhost": {
1293                    "user_id": "@example2:localhost",
1294                    "usage": ["master"],
1295                    "keys": {
1296                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1297                    },
1298                    "signatures": {
1299                        "@example2:localhost": {
1300                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1301                        }
1302                    }
1303                }
1304            },
1305            "self_signing_keys": {
1306                "@example2:localhost": {
1307                    "user_id": "@example2:localhost",
1308                    "usage": ["self_signing"],
1309                    "keys": {
1310                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1311                    },
1312                    "signatures": {
1313                        "@example2:localhost": {
1314                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1315                        }
1316                    }
1317                }
1318            },
1319            "user_signing_keys": {}
1320        });
1321        ruma_response_from_json(data)
1322    }
1323
1324    // An updated version of `other_key_query` featuring an additional signature on
1325    // the master key *Note*: The added signature is actually not valid, but a
1326    // valid signature  is not required for our test.
1327    pub fn other_key_query_cross_signed() -> KeyQueryResponse {
1328        let data = json!({
1329            "device_keys": {
1330                "@example2:localhost": {
1331                    "SKISMLNIMH": {
1332                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
1333                        "device_id": "SKISMLNIMH",
1334                        "keys": {
1335                            "curve25519:SKISMLNIMH": "qO9xFazIcW8dE0oqHGMojGgJwbBpMOhGnIfJy2pzvmI",
1336                            "ed25519:SKISMLNIMH": "y3wV3AoyIGREqrJJVH8DkQtlwHBUxoZ9ApP76kFgXQ8"
1337                        },
1338                        "signatures": {
1339                            "@example2:localhost": {
1340                                "ed25519:SKISMLNIMH": "YwbT35rbjKoYFZVU1tQP8MsL06+znVNhNzUMPt6jTEYRBFoC4GDq9hQEJBiFSq37r1jvLMteggVAWw37fs1yBA",
1341                                "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "PWuuTE/aTkp1EJQkPHhRx2BxbF+wjMIDFxDRp7JAerlMkDsNFUTfRRusl6vqROPU36cl+yY8oeJTZGFkU6+pBQ"
1342                            }
1343                        },
1344                        "user_id": "@example2:localhost",
1345                        "unsigned": {
1346                            "device_display_name": "Riot Desktop (Linux)"
1347                        }
1348                    }
1349                }
1350            },
1351            "failures": {},
1352            "master_keys": {
1353                "@example2:localhost": {
1354                    "user_id": "@example2:localhost",
1355                    "usage": ["master"],
1356                    "keys": {
1357                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1358                    },
1359                    "signatures": {
1360                        "@example2:localhost": {
1361                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1362                        },
1363                        // This is the added signature from alice USK compared to `other_key_query`. Note that actual signature is not valid.
1364                        "@alice:localhost": {
1365                            "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "NotAValidSignature+GNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1366                        }
1367                    }
1368                }
1369            },
1370            "self_signing_keys": {
1371                "@example2:localhost": {
1372                    "user_id": "@example2:localhost",
1373                    "usage": ["self_signing"],
1374                    "keys": {
1375                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1376                    },
1377                    "signatures": {
1378                        "@example2:localhost": {
1379                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1380                        }
1381                    }
1382                }
1383            },
1384            "user_signing_keys": {}
1385        });
1386        ruma_response_from_json(&data)
1387    }
1388
1389    /// Mocked response to a /keys/query request.
1390    pub fn own_key_query_with_user_id(user_id: &UserId) -> KeyQueryResponse {
1391        let data = json!({
1392          "device_keys": {
1393            user_id: {
1394              "WSKKLTJZCL": {
1395                "algorithms": [
1396                  "m.olm.v1.curve25519-aes-sha2",
1397                  "m.megolm.v1.aes-sha2"
1398                ],
1399                "device_id": "WSKKLTJZCL",
1400                "keys": {
1401                  "curve25519:WSKKLTJZCL": "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1402                  "ed25519:WSKKLTJZCL": "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1403                },
1404                "signatures": {
1405                  user_id: {
1406                    "ed25519:WSKKLTJZCL": "SKpIUnq7QK0xleav0PrIQyKjVm+TgZr7Yi8cKjLeZDtkgyToE2d4/e3Aj79dqOlLB92jFVE4d1cM/Ry04wFwCA",
1407                    "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "9UGu1iC5YhFCdELGfB29YaV+QE0t/X5UDSsPf4QcdZyXIwyp9zBbHX2lh9vWudNQ+akZpaq7ZRaaM+4TCnw/Ag"
1408                  }
1409                },
1410                "user_id": user_id,
1411                "unsigned": {
1412                  "device_display_name": "Cross signing capable"
1413                }
1414              },
1415              "LVWOVGOXME": {
1416                "algorithms": [
1417                  "m.olm.v1.curve25519-aes-sha2",
1418                  "m.megolm.v1.aes-sha2"
1419                ],
1420                "device_id": "LVWOVGOXME",
1421                "keys": {
1422                  "curve25519:LVWOVGOXME": "KMfWKUhnDW1D11hNzATs/Ax1FQRsJxKCWzq0NyGtIiI",
1423                  "ed25519:LVWOVGOXME": "k+NC3L7CBD6fBClcHBrKLOkqCyGNSKhWXiH5Q2STRnA"
1424                },
1425                "signatures": {
1426                  user_id: {
1427                    "ed25519:LVWOVGOXME": "39Ir5Bttpc5+bQwzLj7rkjm5E5/cp/JTbMJ/t0enj6J5w9MXVBFOUqqM2hpaRaRwILMMpwYbJ8IOGjl0Y/MGAw"
1428                  }
1429                },
1430                "user_id": user_id,
1431                "unsigned": {
1432                  "device_display_name": "Non-cross signing"
1433                }
1434              }
1435            }
1436          },
1437          "failures": {},
1438          "master_keys": {
1439            user_id: {
1440              "user_id": user_id,
1441              "usage": [
1442                "master"
1443              ],
1444              "keys": {
1445                "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0"
1446              },
1447              "signatures": {
1448                user_id: {
1449                  "ed25519:WSKKLTJZCL": "ZzJp1wtmRdykXAUEItEjNiFlBrxx8L6/Vaen9am8AuGwlxxJtOkuY4m+4MPLvDPOgavKHLsrRuNLAfCeakMlCQ"
1450                }
1451              }
1452            }
1453          },
1454          "self_signing_keys": {
1455            user_id: {
1456              "user_id": user_id,
1457              "usage": [
1458                "self_signing"
1459              ],
1460              "keys": {
1461                "ed25519:0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210": "0C8lCBxrvrv/O7BQfsKnkYogHZX3zAgw3RfJuyiq210"
1462              },
1463              "signatures": {
1464                user_id: {
1465                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "AC7oDUW4rUhtInwb4lAoBJ0wAuu4a5k+8e34B5+NKsDB8HXRwgVwUWN/MRWc/sJgtSbVlhzqS9THEmQQ1C51Bw"
1466                }
1467              }
1468            }
1469          },
1470          "user_signing_keys": {
1471            user_id: {
1472              "user_id": user_id,
1473              "usage": [
1474                "user_signing"
1475              ],
1476              "keys": {
1477                "ed25519:DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo": "DU9z4gBFKFKCk7a13sW9wjT0Iyg7Hqv5f0BPM7DEhPo"
1478              },
1479              "signatures": {
1480                user_id: {
1481                  "ed25519:rJ2TAGkEOP6dX41Ksll6cl8K3J48l8s/59zaXyvl2p0": "C4L2sx9frGqj8w41KyynHGqwUbbwBYRZpYCB+6QWnvQFA5Oi/1PJj8w5anwzEsoO0TWmLYmf7FXuAGewanOWDg"
1482                }
1483              }
1484            }
1485          }
1486        });
1487        ruma_response_from_json(&data)
1488    }
1489
1490    pub fn own_key_query() -> KeyQueryResponse {
1491        own_key_query_with_user_id(user_id())
1492    }
1493
1494    pub fn key_query(
1495        identity: UploadSigningKeysRequest,
1496        device_keys: DeviceKeys,
1497    ) -> KeyQueryResponse {
1498        let json = json!({
1499            "device_keys": {
1500                "@example:localhost": {
1501                    device_keys.device_id.to_string(): device_keys
1502                }
1503            },
1504            "failures": {},
1505            "master_keys": {
1506                "@example:localhost": identity.master_key
1507            },
1508            "self_signing_keys": {
1509                "@example:localhost": identity.self_signing_key
1510            },
1511            "user_signing_keys": {
1512                "@example:localhost": identity.user_signing_key
1513            },
1514          }
1515        );
1516
1517        ruma_response_from_json(&json)
1518    }
1519}
1520
1521#[cfg(test)]
1522pub(crate) mod tests {
1523    use std::ops::Deref;
1524
1525    use futures_util::pin_mut;
1526    use matrix_sdk_test::{async_test, ruma_response_from_json, test_json};
1527    use ruma::{
1528        TransactionId, api::client::keys::get_keys::v3::Response as KeysQueryResponse, device_id,
1529        user_id,
1530    };
1531    use serde_json::json;
1532    use stream_assert::{assert_closed, assert_pending, assert_ready};
1533
1534    use super::testing::{
1535        device_id, key_query, manager_test_helper, other_key_query, other_user_id, user_id,
1536    };
1537    use crate::{
1538        CrossSigningKeyExport, OlmMachine,
1539        identities::manager::testing::{other_key_query_cross_signed, own_key_query},
1540        olm::PrivateCrossSigningIdentity,
1541        store::types::Changes,
1542    };
1543
1544    fn key_query_with_failures() -> KeysQueryResponse {
1545        let response = json!({
1546            "device_keys": {
1547            },
1548            "failures": {
1549                "example.org": {
1550                    "errcode": "M_RESOURCE_LIMIT_EXCEEDED",
1551                    "error": "Not yet ready to retry",
1552                }
1553            }
1554        });
1555
1556        ruma_response_from_json(&response)
1557    }
1558
1559    #[async_test]
1560    async fn test_tracked_users() {
1561        let manager = manager_test_helper(user_id(), device_id()).await;
1562        let alice = user_id!("@alice:example.org");
1563
1564        let cache = manager.store.cache().await.unwrap();
1565        let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1566
1567        assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1568
1569        manager.receive_device_changes(&cache, [alice].iter().map(Deref::deref)).await.unwrap();
1570
1571        assert!(
1572            !key_query_manager.tracked_users().contains(alice),
1573            "Receiving a device changes update for a user we don't track does nothing"
1574        );
1575
1576        assert!(
1577            !key_query_manager.users_for_key_query().await.0.contains(alice),
1578            "The user we don't track doesn't end up in the `/keys/query` request"
1579        );
1580    }
1581
1582    #[async_test]
1583    async fn test_manager_creation() {
1584        let manager = manager_test_helper(user_id(), device_id()).await;
1585        let cache = manager.store.cache().await.unwrap();
1586        assert!(manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty())
1587    }
1588
1589    #[async_test]
1590    async fn test_manager_key_query_response() {
1591        let manager = manager_test_helper(user_id(), device_id()).await;
1592        let other_user = other_user_id();
1593        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1594        assert_eq!(devices.devices().count(), 0);
1595
1596        manager
1597            .receive_keys_query_response(&TransactionId::new(), &other_key_query())
1598            .await
1599            .unwrap();
1600
1601        let devices = manager.store.get_user_devices(other_user).await.unwrap();
1602        assert_eq!(devices.devices().count(), 1);
1603
1604        let device = manager
1605            .store
1606            .get_device_data(other_user, device_id!("SKISMLNIMH"))
1607            .await
1608            .unwrap()
1609            .unwrap();
1610        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
1611        let identity = identity.other().unwrap();
1612
1613        assert!(identity.is_device_signed(&device));
1614    }
1615
1616    #[async_test]
1617    async fn test_manager_own_key_query_response() {
1618        let manager = manager_test_helper(user_id(), device_id()).await;
1619        let our_user = user_id();
1620        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1621        assert_eq!(devices.devices().count(), 0);
1622
1623        let private_identity = manager.store.private_identity();
1624        let private_identity = private_identity.lock().await;
1625        let identity_request = private_identity.as_upload_request().await;
1626        drop(private_identity);
1627
1628        let device_keys =
1629            manager.store.cache().await.unwrap().account().await.unwrap().device_keys();
1630        manager
1631            .receive_keys_query_response(
1632                &TransactionId::new(),
1633                &key_query(identity_request, device_keys),
1634            )
1635            .await
1636            .unwrap();
1637
1638        let identity = manager
1639            .store
1640            .get_user_identity(our_user)
1641            .await
1642            .unwrap()
1643            .expect("missing user identity");
1644        let identity = identity.own().expect("missing own identity");
1645        assert!(identity.is_verified());
1646
1647        let devices = manager.store.get_user_devices(our_user).await.unwrap();
1648        assert_eq!(devices.devices().count(), 1);
1649
1650        let device =
1651            manager.store.get_device_data(our_user, device_id!(device_id())).await.unwrap();
1652
1653        assert!(device.is_some());
1654    }
1655
1656    #[async_test]
1657    async fn test_private_identity_invalidation_after_public_keys_change() {
1658        let user_id = user_id!("@example1:localhost");
1659        let manager = manager_test_helper(user_id, "DEVICEID".into()).await;
1660
1661        let identity_request = {
1662            let private_identity = manager.store.private_identity();
1663            let private_identity = private_identity.lock().await;
1664            private_identity.as_upload_request().await
1665        };
1666        let device_keys = manager.store.static_account().unsigned_device_keys();
1667
1668        let response = json!({
1669            "device_keys": {
1670                user_id: {
1671                    device_keys.device_id.to_string(): device_keys
1672                }
1673            },
1674            "master_keys": {
1675                user_id: identity_request.master_key,
1676            },
1677            "self_signing_keys": {
1678                user_id: identity_request.self_signing_key,
1679            },
1680            "user_signing_keys": {
1681                user_id: identity_request.user_signing_key,
1682            }
1683        });
1684
1685        let response = ruma_response_from_json(&response);
1686        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
1687
1688        let identity = manager.store.get_user_identity(user_id).await.unwrap().unwrap();
1689        let identity = identity.own().unwrap();
1690        assert!(identity.is_verified());
1691
1692        let identity_request = {
1693            let private_identity = PrivateCrossSigningIdentity::new(user_id.into());
1694            private_identity.as_upload_request().await
1695        };
1696
1697        let response = json!({
1698            "master_keys": {
1699                user_id: identity_request.master_key,
1700                "@example2:localhost": {
1701                    "user_id": "@example2:localhost",
1702                    "usage": ["master"],
1703                    "keys": {
1704                        "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do"
1705                    },
1706                    "signatures": {
1707                        "@example2:localhost": {
1708                            "ed25519:SKISMLNIMH": "KdUZqzt8VScGNtufuQ8lOf25byYLWIhmUYpPENdmM8nsldexD7vj+Sxoo7PknnTX/BL9h2N7uBq0JuykjunCAw"
1709                        }
1710                    }
1711                },
1712            },
1713            "self_signing_keys": {
1714                user_id: identity_request.self_signing_key,
1715                "@example2:localhost": {
1716                    "user_id": "@example2:localhost",
1717                    "usage": ["self_signing"],
1718                    "keys": {
1719                        "ed25519:ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc": "ZtFrSkJ1qB8Jph/ql9Eo/lKpIYCzwvKAKXfkaS4XZNc"
1720                    },
1721                    "signatures": {
1722                        "@example2:localhost": {
1723                            "ed25519:kC/HmRYw4HNqUp/i4BkwYENrf+hd9tvdB7A1YOf5+Do": "W/O8BnmiUETPpH02mwYaBgvvgF/atXnusmpSTJZeUSH/vHg66xiZOhveQDG4cwaW8iMa+t9N4h1DWnRoHB4mCQ"
1724                        }
1725                    }
1726                }
1727            },
1728            "user_signing_keys": {
1729                user_id: identity_request.user_signing_key,
1730            }
1731        });
1732
1733        let response = ruma_response_from_json(&response);
1734        let (_, private_identity) = manager.handle_cross_signing_keys(&response).await.unwrap();
1735
1736        assert!(private_identity.is_some());
1737        let private_identity = manager.store.private_identity();
1738        assert!(private_identity.lock().await.is_empty().await);
1739    }
1740
1741    #[async_test]
1742    async fn test_no_tracked_users_key_query_request() {
1743        let manager = manager_test_helper(user_id(), device_id()).await;
1744
1745        let cache = manager.store.cache().await.unwrap();
1746        assert!(
1747            manager.key_query_manager.synced(&cache).await.unwrap().tracked_users().is_empty(),
1748            "No users are initially tracked"
1749        );
1750
1751        let requests = manager.users_for_key_query().await.unwrap();
1752        assert!(!requests.is_empty(), "We query the keys for our own user");
1753
1754        assert!(
1755            manager
1756                .key_query_manager
1757                .synced(&cache)
1758                .await
1759                .unwrap()
1760                .tracked_users()
1761                .contains(manager.user_id()),
1762            "Our own user is now tracked"
1763        );
1764    }
1765
1766    /// If a user is invalidated while a /keys/query request is in flight, that
1767    /// user is not removed from the list of outdated users when the
1768    /// response is received
1769    #[async_test]
1770    async fn test_invalidation_race_handling() {
1771        let manager = manager_test_helper(user_id(), device_id()).await;
1772        let alice = other_user_id();
1773        manager.update_tracked_users([alice]).await.unwrap();
1774
1775        // alice should be in the list of key queries
1776        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1777        assert!(req.device_keys.contains_key(alice));
1778
1779        // another invalidation turns up
1780        {
1781            let cache = manager.store.cache().await.unwrap();
1782            manager.receive_device_changes(&cache, [alice].into_iter()).await.unwrap();
1783        }
1784
1785        // the response from the query arrives
1786        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1787
1788        // alice should *still* be in the list of key queries
1789        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1790        assert!(req.device_keys.contains_key(alice));
1791
1792        // another key query response
1793        manager.receive_keys_query_response(&reqid, &other_key_query()).await.unwrap();
1794
1795        // finally alice should not be in the list
1796        let queries = manager.users_for_key_query().await.unwrap();
1797        assert!(!queries.iter().any(|(_, r)| r.device_keys.contains_key(alice)));
1798    }
1799
1800    #[async_test]
1801    async fn test_failure_handling() {
1802        let manager = manager_test_helper(user_id(), device_id()).await;
1803        let alice = user_id!("@alice:example.org");
1804
1805        {
1806            let cache = manager.store.cache().await.unwrap();
1807            let key_query_manager = manager.key_query_manager.synced(&cache).await.unwrap();
1808            assert!(key_query_manager.tracked_users().is_empty(), "No users are initially tracked");
1809
1810            key_query_manager.mark_user_as_changed(alice).await.unwrap();
1811
1812            assert!(
1813                key_query_manager.tracked_users().contains(alice),
1814                "Alice is tracked after being marked as tracked"
1815            );
1816        }
1817
1818        let (reqid, req) = manager.users_for_key_query().await.unwrap().pop_first().unwrap();
1819        assert!(req.device_keys.contains_key(alice));
1820
1821        // a failure should stop us querying for the user's keys.
1822        let response = key_query_with_failures();
1823        manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1824        assert!(manager.failures.contains(alice.server_name()));
1825        assert!(
1826            !manager
1827                .users_for_key_query()
1828                .await
1829                .unwrap()
1830                .iter()
1831                .any(|(_, r)| r.device_keys.contains_key(alice))
1832        );
1833
1834        // clearing the failure flag should make the user reappear in the query list.
1835        manager.failures.remove([alice.server_name().to_owned()].iter());
1836        assert!(
1837            manager
1838                .users_for_key_query()
1839                .await
1840                .unwrap()
1841                .iter()
1842                .any(|(_, r)| r.device_keys.contains_key(alice))
1843        );
1844    }
1845
1846    #[async_test]
1847    async fn test_out_of_band_key_query() {
1848        // build the request
1849        let manager = manager_test_helper(user_id(), device_id()).await;
1850        let (reqid, req) = manager.build_key_query_for_users(vec![user_id()]);
1851        assert!(req.device_keys.contains_key(user_id()));
1852
1853        // make up a response and check it is processed
1854        let (device_changes, identity_changes) =
1855            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1856        assert_eq!(device_changes.new.len(), 1);
1857        assert_eq!(device_changes.new[0].device_id(), "LVWOVGOXME");
1858        assert_eq!(identity_changes.new.len(), 1);
1859        assert_eq!(identity_changes.new[0].user_id(), user_id());
1860
1861        let devices = manager.store.get_user_devices(user_id()).await.unwrap();
1862        assert_eq!(devices.devices().count(), 1);
1863        assert_eq!(devices.devices().next().unwrap().device_id(), "LVWOVGOXME");
1864    }
1865
1866    #[async_test]
1867    async fn test_invalid_key_response() {
1868        let my_user_id = user_id();
1869        let my_device_id = device_id();
1870        let manager = manager_test_helper(my_user_id, my_device_id).await;
1871
1872        // First of all, populate the store with good data
1873        let (reqid, _) = manager.build_key_query_for_users(vec![user_id()]);
1874        let (device_changes, identity_changes) =
1875            manager.receive_keys_query_response(&reqid, &own_key_query()).await.unwrap();
1876        assert_eq!(device_changes.new.len(), 1);
1877        let test_device_id = device_changes.new.first().unwrap().device_id().to_owned();
1878        let changes =
1879            Changes { devices: device_changes, identities: identity_changes, ..Changes::default() };
1880        manager.store.save_changes(changes).await.unwrap();
1881
1882        // Now provide an invalid update
1883        let (reqid, _) = manager.build_key_query_for_users(vec![my_user_id]);
1884        let response = ruma_response_from_json(&json!({
1885            "device_keys": {
1886                my_user_id: {
1887                    test_device_id.as_str(): {
1888                        "algorithms": [
1889                            "m.olm.v1.curve25519-aes-sha2",
1890                        ],
1891                        "device_id": test_device_id.as_str(),
1892                        "keys": {
1893                            format!("curve25519:{}", test_device_id): "wnip2tbJBJxrFayC88NNJpm61TeSNgYcqBH4T9yEDhU",
1894                            format!("ed25519:{}", test_device_id): "lQ+eshkhgKoo+qp9Qgnj3OX5PBoWMU5M9zbuEevwYqE"
1895                        },
1896                        "signatures": {
1897                            my_user_id: {
1898                                // Not a valid signature.
1899                                format!("ed25519:{}", test_device_id): "imadethisup",
1900                            }
1901                        },
1902                        "user_id": my_user_id,
1903                    }
1904                }
1905            }
1906        }));
1907
1908        let (device_changes, identity_changes) =
1909            manager.receive_keys_query_response(&reqid, &response).await.unwrap();
1910
1911        // The result should be empty
1912        assert_eq!(device_changes.new.len(), 0);
1913        assert_eq!(device_changes.changed.len(), 0);
1914        assert_eq!(device_changes.deleted.len(), 0);
1915        assert_eq!(identity_changes.new.len(), 0);
1916
1917        // And the device should not have been updated.
1918        let device =
1919            manager.store.get_user_devices(my_user_id).await.unwrap().get(&test_device_id).unwrap();
1920        assert_eq!(device.algorithms().len(), 2);
1921    }
1922
1923    #[async_test]
1924    async fn test_devices_stream() {
1925        let manager = manager_test_helper(user_id(), device_id()).await;
1926        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1927
1928        let stream = manager.store.devices_stream();
1929        pin_mut!(stream);
1930
1931        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1932
1933        let update = assert_ready!(stream);
1934        assert!(!update.new.is_empty(), "The device update should contain some devices");
1935    }
1936
1937    #[async_test]
1938    async fn test_identities_stream() {
1939        let manager = manager_test_helper(user_id(), device_id()).await;
1940        let (request_id, _) = manager.build_key_query_for_users(vec![user_id()]);
1941
1942        let stream = manager.store.user_identities_stream();
1943        pin_mut!(stream);
1944
1945        manager.receive_keys_query_response(&request_id, &own_key_query()).await.unwrap();
1946
1947        let update = assert_ready!(stream);
1948        assert!(!update.new.is_empty(), "The identities update should contain some identities");
1949    }
1950
1951    #[async_test]
1952    async fn test_identities_stream_raw() {
1953        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1954        let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1955
1956        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
1957        pin_mut!(stream);
1958
1959        manager
1960            .as_ref()
1961            .unwrap()
1962            .receive_keys_query_response(&request_id, &own_key_query())
1963            .await
1964            .unwrap();
1965
1966        let (identity_update, _) = assert_ready!(stream);
1967        assert_eq!(identity_update.new.len(), 1);
1968        assert_eq!(identity_update.changed.len(), 0);
1969        assert_eq!(identity_update.unchanged.len(), 0);
1970        assert_eq!(identity_update.new[0].user_id(), user_id());
1971
1972        assert_pending!(stream);
1973
1974        let (new_request_id, _) =
1975            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
1976
1977        // A second `/keys/query` response with the same result shouldn't fire a change
1978        // notification: the identity and device should be unchanged.
1979        manager
1980            .as_ref()
1981            .unwrap()
1982            .receive_keys_query_response(&new_request_id, &own_key_query())
1983            .await
1984            .unwrap();
1985
1986        assert_pending!(stream);
1987
1988        // dropping the manager (and hence dropping the store) should close the stream
1989        manager.take();
1990        assert_closed!(stream);
1991    }
1992
1993    #[async_test]
1994    async fn test_identities_stream_raw_signature_update() {
1995        let mut manager = Some(manager_test_helper(user_id(), device_id()).await);
1996        let (request_id, _) =
1997            manager.as_ref().unwrap().build_key_query_for_users(vec![other_user_id()]);
1998
1999        let stream = manager.as_ref().unwrap().store.identities_stream_raw();
2000        pin_mut!(stream);
2001
2002        manager
2003            .as_ref()
2004            .unwrap()
2005            .receive_keys_query_response(&request_id, &other_key_query())
2006            .await
2007            .unwrap();
2008
2009        let (identity_update, _) = assert_ready!(stream);
2010        assert_eq!(identity_update.new.len(), 1);
2011        assert_eq!(identity_update.changed.len(), 0);
2012        assert_eq!(identity_update.unchanged.len(), 0);
2013        assert_eq!(identity_update.new[0].user_id(), other_user_id());
2014
2015        let initial_msk = identity_update.new[0].master_key().clone();
2016
2017        let (new_request_id, _) =
2018            manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
2019        // There is a new signature on the msk, should trigger a change
2020        manager
2021            .as_ref()
2022            .unwrap()
2023            .receive_keys_query_response(&new_request_id, &other_key_query_cross_signed())
2024            .await
2025            .unwrap();
2026
2027        let (identity_update_2, _) = assert_ready!(stream);
2028        assert_eq!(identity_update_2.new.len(), 0);
2029        assert_eq!(identity_update_2.changed.len(), 1);
2030        assert_eq!(identity_update_2.unchanged.len(), 0);
2031
2032        let updated_msk = identity_update_2.changed[0].master_key().clone();
2033
2034        // Identity has a change (new signature) but it's the same msk
2035        assert_eq!(initial_msk, updated_msk);
2036
2037        assert_pending!(stream);
2038
2039        manager.take();
2040    }
2041
2042    #[async_test]
2043    async fn test_key_query_with_unknown_properties() {
2044        let manager = manager_test_helper(user_id(), device_id()).await;
2045        let other_user = user_id!("@example:localhost");
2046        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2047        assert_eq!(devices.devices().count(), 0);
2048
2049        let response = json!({
2050            "device_keys": {
2051                "@example:localhost": {
2052                    "OBEBOSKTBE": {
2053                        "algorithms": ["m.olm.v1.curve25519-aes-sha2", "m.megolm.v1.aes-sha2"],
2054                        "user_id": "@example:localhost",
2055                        "device_id": "OBEBOSKTBE",
2056                        "extra_property": "somevalue",
2057                        "keys": {
2058                            "curve25519:OBEBOSKTBE": "ECrdZebl0DskwbkxoztsiKPb6ivu7M2qQ70BFWwre3w",
2059                            "ed25519:OBEBOSKTBE": "hFWo+pG6TVWNzq/ZubUQVL5Ardu9rqHxpKkCbf1/KiA"
2060                        },
2061                        "signatures": {
2062                            "@example:localhost": {
2063                                "ed25519:OBEBOSKTBE": "6vyYUgX+IoT1x6Mvf0g/GEPVb2UI3brfL7WZ75WZ81sH4FBFgAzkkuGpw9suGLKXnlEdLH0suBzaT4esVhFDCw",
2064                            },
2065                        },
2066                    },
2067                },
2068            },
2069        });
2070
2071        let response = ruma_response_from_json(&response);
2072        manager.receive_keys_query_response(&TransactionId::new(), &response).await.unwrap();
2073
2074        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2075        assert_eq!(devices.devices().count(), 1);
2076
2077        manager.store.get_device_data(other_user, device_id!("OBEBOSKTBE")).await.unwrap().unwrap();
2078    }
2079
2080    #[async_test]
2081    async fn test_manager_identity_updates() {
2082        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2083
2084        let manager = manager_test_helper(user_id(), device_id()).await;
2085        let other_user = DataSet::user_id();
2086        let devices = manager.store.get_user_devices(other_user).await.unwrap();
2087        assert_eq!(devices.devices().count(), 0);
2088
2089        let identity = manager.store.get_user_identity(other_user).await.unwrap();
2090        assert!(identity.is_none());
2091
2092        manager
2093            .receive_keys_query_response(
2094                &TransactionId::new(),
2095                &DataSet::key_query_with_identity_a(),
2096            )
2097            .await
2098            .unwrap();
2099
2100        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2101        let other_identity = identity.other().unwrap();
2102
2103        // We should now have an identity for the user but no pin violation
2104        // (pinned master key is the current one)
2105        assert!(!other_identity.has_pin_violation());
2106        let first_device =
2107            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2108        assert!(first_device.is_cross_signed_by_owner(&identity));
2109
2110        // We receive a new keys update for that user, with a new identity
2111        manager
2112            .receive_keys_query_response(
2113                &TransactionId::new(),
2114                &DataSet::key_query_with_identity_b(),
2115            )
2116            .await
2117            .unwrap();
2118
2119        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2120        let other_identity = identity.other().unwrap();
2121
2122        // The previous known identity has been replaced, there should be a pin
2123        // violation
2124        assert!(other_identity.has_pin_violation());
2125
2126        let second_device =
2127            manager.store.get_device_data(other_user, DataSet::device_b()).await.unwrap().unwrap();
2128
2129        // There is a new device signed by the new identity
2130        assert!(second_device.is_cross_signed_by_owner(&identity));
2131
2132        // The first device should not be signed by the new identity
2133        let first_device =
2134            manager.store.get_device_data(other_user, DataSet::device_a()).await.unwrap().unwrap();
2135        assert!(!first_device.is_cross_signed_by_owner(&identity));
2136
2137        let remember_previous_identity = other_identity.clone();
2138        // We receive updated keys for that user, with no identity anymore.
2139        // Notice that there is no server API to delete identity, but we want to
2140        // test here that a home server cannot clear the identity and
2141        // subsequently serve a new one which would get automatically approved.
2142        manager
2143            .receive_keys_query_response(
2144                &TransactionId::new(),
2145                &DataSet::key_query_with_identity_no_identity(),
2146            )
2147            .await
2148            .unwrap();
2149
2150        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2151        let other_identity = identity.other().unwrap();
2152
2153        assert_eq!(other_identity, &remember_previous_identity);
2154        assert!(other_identity.has_pin_violation());
2155    }
2156
2157    #[async_test]
2158    async fn test_manager_resolve_identity_pin_violation() {
2159        use test_json::keys_query_sets::IdentityChangeDataSet as DataSet;
2160
2161        let manager = manager_test_helper(user_id(), device_id()).await;
2162        let other_user = DataSet::user_id();
2163
2164        manager
2165            .receive_keys_query_response(
2166                &TransactionId::new(),
2167                &DataSet::key_query_with_identity_a(),
2168            )
2169            .await
2170            .unwrap();
2171
2172        // We receive a new keys update for that user, with a new identity
2173        manager
2174            .receive_keys_query_response(
2175                &TransactionId::new(),
2176                &DataSet::key_query_with_identity_b(),
2177            )
2178            .await
2179            .unwrap();
2180
2181        let identity = manager.store.get_user_identity(other_user).await.unwrap().unwrap();
2182        let other_identity = identity.other().unwrap();
2183
2184        // We have a new identity now, so there should be a pin violation
2185        assert!(other_identity.has_pin_violation());
2186
2187        // Resolve the violation by pinning the new identity
2188        other_identity.pin();
2189
2190        assert!(!other_identity.has_pin_violation());
2191    }
2192
2193    // Set up a machine do initial own key query and import cross-signing secret to
2194    // make the current session verified.
2195    async fn common_verified_identity_changes_machine_setup() -> OlmMachine {
2196        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2197
2198        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2199
2200        let keys_query = DataSet::own_keys_query_response_1();
2201        let txn_id = TransactionId::new();
2202        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2203
2204        machine
2205            .import_cross_signing_keys(CrossSigningKeyExport {
2206                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2207                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2208                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2209            })
2210            .await
2211            .unwrap();
2212        machine
2213    }
2214    #[async_test]
2215    async fn test_manager_verified_latch_setup_on_new_identities() {
2216        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2217
2218        let machine = common_verified_identity_changes_machine_setup().await;
2219
2220        // ######
2221        // First test: Assert that the latch is properly set on new identities
2222        // ######
2223        let keys_query = DataSet::bob_keys_query_response_signed();
2224        let txn_id = TransactionId::new();
2225        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2226
2227        let own_identity =
2228            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2229        // For sanity check that own identity is trusted
2230        assert!(own_identity.is_verified());
2231
2232        let bob_identity =
2233            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2234        // The verified latch should be true
2235        assert!(bob_identity.was_previously_verified());
2236        // And bob is verified
2237        assert!(bob_identity.is_verified());
2238
2239        // ######
2240        // Second test: Assert that the local latch stays on if the identity is rotated
2241        // ######
2242        let keys_query = DataSet::bob_keys_query_response_rotated();
2243        let txn_id = TransactionId::new();
2244        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2245
2246        let bob_identity =
2247            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2248        // Bob is not verified anymore
2249        assert!(!bob_identity.is_verified());
2250        // The verified latch should still be true
2251        assert!(bob_identity.was_previously_verified());
2252        // Bob device_2 is self-signed even if there is this verification latch
2253        // violation
2254        let bob_device = machine
2255            .get_device(DataSet::bob_id(), DataSet::bob_device_2_id(), None)
2256            .await
2257            .unwrap()
2258            .unwrap();
2259        assert!(bob_identity.is_device_signed(&bob_device));
2260        // there is also a pin violation
2261        assert!(bob_identity.has_pin_violation());
2262        // Fixing the pin violation won't fix the verification latch violation
2263        bob_identity.pin_current_master_key().await.unwrap();
2264        assert!(!bob_identity.has_pin_violation());
2265        let has_latch_violation =
2266            bob_identity.was_previously_verified() && !bob_identity.is_verified();
2267        assert!(has_latch_violation);
2268    }
2269
2270    #[async_test]
2271    async fn test_manager_verified_identity_changes_setup_on_updated_identities() {
2272        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2273
2274        let machine = common_verified_identity_changes_machine_setup().await;
2275
2276        // ######
2277        // Get the Carol identity for the first time
2278        // ######
2279        let keys_query = DataSet::carol_keys_query_response_unsigned();
2280        let txn_id = TransactionId::new();
2281        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2282
2283        let carol_identity =
2284            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2285        // The identity is not verified
2286        assert!(!carol_identity.is_verified());
2287        // The verified latch is off
2288        assert!(!carol_identity.was_previously_verified());
2289
2290        // Carol is verified, likely from another session. Ensure the latch is updated
2291        // when the key query response is processed
2292        let keys_query = DataSet::carol_keys_query_response_signed();
2293        let txn_id = TransactionId::new();
2294        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2295
2296        let carol_identity = machine
2297            .get_identity(DataSet::carol_id(), None)
2298            .await
2299            .unwrap()
2300            .unwrap()
2301            .other()
2302            .unwrap();
2303        assert!(carol_identity.is_verified());
2304        // This should have updated the latch
2305        assert!(carol_identity.was_previously_verified());
2306        // It is the same identity, it's just signed now so no pin violation
2307        assert!(!carol_identity.has_pin_violation());
2308    }
2309
2310    // Set up a machine do initial own key query.
2311    // The cross signing secrets are not yet uploaded.
2312    // Then query keys for carol and bob (both signed by own identity)
2313    async fn common_verified_identity_changes_own_trust_change_machine_setup() -> OlmMachine {
2314        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2315
2316        // Start on a non-verified session
2317        let machine = OlmMachine::new(DataSet::own_id(), device_id!("LOCAL")).await;
2318
2319        let keys_query = DataSet::own_keys_query_response_1();
2320        let txn_id = TransactionId::new();
2321        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2322
2323        // For sanity check that own identity is not trusted
2324        let own_identity =
2325            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2326        assert!(!own_identity.is_verified());
2327
2328        let keys_query = DataSet::own_keys_query_response_1();
2329        let txn_id = TransactionId::new();
2330        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2331
2332        // Get Bob and Carol already signed
2333        let keys_query = DataSet::bob_keys_query_response_signed();
2334        let txn_id = TransactionId::new();
2335        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2336
2337        let keys_query = DataSet::carol_keys_query_response_signed();
2338        let txn_id = TransactionId::new();
2339        machine.mark_request_as_sent(&txn_id, &keys_query).await.unwrap();
2340
2341        machine.update_tracked_users(vec![DataSet::bob_id(), DataSet::carol_id()]).await.unwrap();
2342
2343        machine
2344    }
2345
2346    #[async_test]
2347    async fn test_manager_verified_identity_changes_setup_on_own_identity_trust_change() {
2348        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2349        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2350
2351        let own_identity =
2352            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2353
2354        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2355        // Bob is verified by our identity but our own identity is not yet trusted
2356        assert!(!bob_identity.was_previously_verified());
2357        assert!(own_identity.is_identity_signed(&bob_identity.other().unwrap()));
2358
2359        let carol_identity =
2360            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2361        // Carol is verified by our identity but our own identity is not yet trusted
2362        assert!(!carol_identity.was_previously_verified());
2363        assert!(own_identity.is_identity_signed(&carol_identity.other().unwrap()));
2364
2365        // Marking our own identity as trusted should update the existing identities
2366        let _ = own_identity.verify().await;
2367
2368        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2369        assert!(own_identity.is_verified());
2370
2371        let carol_identity =
2372            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2373        assert!(carol_identity.is_verified());
2374        // The latch should be set now
2375        assert!(carol_identity.was_previously_verified());
2376
2377        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2378        assert!(bob_identity.is_verified());
2379        // The latch should be set now
2380        assert!(bob_identity.was_previously_verified());
2381    }
2382
2383    #[async_test]
2384    async fn test_manager_verified_identity_change_setup_on_import_secrets() {
2385        use test_json::keys_query_sets::VerificationViolationTestData as DataSet;
2386        let machine = common_verified_identity_changes_own_trust_change_machine_setup().await;
2387
2388        let own_identity =
2389            machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap().own().unwrap();
2390
2391        let bob_identity =
2392            machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap().other().unwrap();
2393        // Carol is verified by our identity but our own identity is not yet trusted
2394        assert!(own_identity.is_identity_signed(&bob_identity));
2395        assert!(!bob_identity.was_previously_verified());
2396
2397        let carol_identity = machine
2398            .get_identity(DataSet::carol_id(), None)
2399            .await
2400            .unwrap()
2401            .unwrap()
2402            .other()
2403            .unwrap();
2404        // Carol is verified by our identity but our own identity is not yet trusted
2405        assert!(own_identity.is_identity_signed(&carol_identity));
2406        assert!(!carol_identity.was_previously_verified());
2407
2408        // Marking our own identity as trusted should update the existing identities
2409        machine
2410            .import_cross_signing_keys(CrossSigningKeyExport {
2411                master_key: DataSet::MASTER_KEY_PRIVATE_EXPORT.to_owned().into(),
2412                self_signing_key: DataSet::SELF_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2413                user_signing_key: DataSet::USER_SIGNING_KEY_PRIVATE_EXPORT.to_owned().into(),
2414            })
2415            .await
2416            .unwrap();
2417
2418        let own_identity = machine.get_identity(DataSet::own_id(), None).await.unwrap().unwrap();
2419        assert!(own_identity.is_verified());
2420
2421        let carol_identity =
2422            machine.get_identity(DataSet::carol_id(), None).await.unwrap().unwrap();
2423        assert!(carol_identity.is_verified());
2424        // The latch should be set now
2425        assert!(carol_identity.was_previously_verified());
2426
2427        let bob_identity = machine.get_identity(DataSet::bob_id(), None).await.unwrap().unwrap();
2428        assert!(bob_identity.is_verified());
2429        // The latch should be set now
2430        assert!(bob_identity.was_previously_verified());
2431    }
2432
2433    mod update_sender_data {
2434        use assert_matches::assert_matches;
2435        use matrix_sdk_test::async_test;
2436        use ruma::room_id;
2437
2438        use super::{device_id, manager_test_helper};
2439        use crate::{
2440            Account, DeviceData, EncryptionSettings,
2441            identities::manager::testing::{other_user_id, user_id},
2442            olm::{InboundGroupSession, SenderData},
2443            store::types::{Changes, DeviceChanges},
2444        };
2445
2446        #[async_test]
2447        async fn test_adds_device_info_to_existing_sessions() {
2448            let manager = manager_test_helper(user_id(), device_id()).await;
2449
2450            // Given that we have lots of sessions in the store, from each of two devices
2451            let account1 = Account::new(user_id());
2452            let account2 = Account::new(other_user_id());
2453
2454            let mut account1_sessions = Vec::new();
2455            for _ in 0..60 {
2456                account1_sessions.push(create_inbound_group_session(&account1).await);
2457            }
2458            let mut account2_sessions = Vec::new();
2459            for _ in 0..60 {
2460                account2_sessions.push(create_inbound_group_session(&account2).await);
2461            }
2462            manager
2463                .store
2464                .save_changes(Changes {
2465                    inbound_group_sessions: [account1_sessions.clone(), account2_sessions.clone()]
2466                        .concat(),
2467                    ..Default::default()
2468                })
2469                .await
2470                .unwrap();
2471
2472            // When we get an update for one device
2473            let device_data = DeviceData::from_account(&account1);
2474            manager
2475                .update_sender_data_from_device_changes(&DeviceChanges {
2476                    changed: vec![device_data],
2477                    ..Default::default()
2478                })
2479                .await
2480                .unwrap();
2481
2482            // Then those sessions should be updated
2483            for session in account1_sessions {
2484                let updated = manager
2485                    .store
2486                    .get_inbound_group_session(session.room_id(), session.session_id())
2487                    .await
2488                    .unwrap()
2489                    .expect("Could not find session after update");
2490                assert_matches!(
2491                    updated.sender_data,
2492                    SenderData::DeviceInfo { .. },
2493                    "incorrect sender data for session {}",
2494                    session.session_id()
2495                );
2496            }
2497
2498            // ... and those from the other account should not
2499            for session in account2_sessions {
2500                let updated = manager
2501                    .store
2502                    .get_inbound_group_session(session.room_id(), session.session_id())
2503                    .await
2504                    .unwrap()
2505                    .expect("Could not find session after update");
2506                assert_matches!(updated.sender_data, SenderData::UnknownDevice { .. });
2507            }
2508        }
2509
2510        /// Create an InboundGroupSession sent from the given account
2511        async fn create_inbound_group_session(account: &Account) -> InboundGroupSession {
2512            let (_, igs) = account
2513                .create_group_session_pair(
2514                    room_id!("!test:room"),
2515                    EncryptionSettings::default(),
2516                    SenderData::unknown(),
2517                )
2518                .await
2519                .unwrap();
2520            igs
2521        }
2522    }
2523}