matrix_sdk/encryption/backups/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Room key backup support
16//!
17//! This module implements support for server-side key backups[[1]]. The module
18//! allows you to connect to an existing backup, create or delete backups from
19//! the homeserver, and download room keys from a backup.
20//!
21//! [1]: https://spec.matrix.org/unstable/client-server-api/#server-side-key-backups
22
23use std::collections::{BTreeMap, BTreeSet};
24
25use futures_core::Stream;
26use futures_util::StreamExt;
27#[cfg(feature = "experimental-encrypted-state-events")]
28use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
29use matrix_sdk_base::crypto::{
30    OlmMachine, RoomKeyImportResult,
31    backups::MegolmV1BackupKey,
32    store::types::BackupDecryptionKey,
33    types::{RoomKeyBackupInfo, requests::KeysBackupRequest},
34};
35#[cfg(feature = "experimental-encrypted-state-events")]
36use ruma::serde::JsonCastable;
37use ruma::{
38    OwnedRoomId, RoomId, TransactionId,
39    api::client::{
40        backup::{
41            RoomKeyBackup, add_backup_keys, create_backup_version, get_backup_keys,
42            get_backup_keys_for_room, get_backup_keys_for_session, get_latest_backup_info,
43        },
44        error::ErrorKind,
45    },
46    events::{
47        room::encrypted::OriginalSyncRoomEncryptedEvent,
48        secret::{request::SecretName, send::ToDeviceSecretSendEvent},
49    },
50    serde::Raw,
51};
52use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
53use tracing::{Span, error, info, instrument, trace, warn};
54
55pub mod futures;
56pub(crate) mod types;
57
58use matrix_sdk_base::crypto::olm::ExportedRoomKey;
59pub use types::{BackupState, UploadState};
60
61use self::futures::WaitForSteadyState;
62use crate::{Client, Error, Room, encryption::BackupDownloadStrategy};
63
/// The backups manager for the [`Client`].
///
/// This is a thin handle around a [`Client`]; the methods on this type reach
/// the homeserver, the locks and the backup state through that client.
#[derive(Debug, Clone)]
pub struct Backups {
    /// The client this backups manager operates on.
    pub(super) client: Client,
}
69
70impl Backups {
71    /// Create a new backup version, encrypted with a new backup recovery key.
72    ///
73    /// The backup recovery key will be persisted locally and shared with
74    /// trusted devices as `m.secret.send` to-device messages.
75    ///
76    /// After the backup has been created, all room keys will be uploaded to the
77    /// homeserver.
78    ///
79    /// *Warning*: This will overwrite any existing backup.
80    ///
81    /// # Examples
82    ///
83    /// ```no_run
84    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
85    /// # use url::Url;
86    /// # async {
87    /// # let homeserver = Url::parse("http://example.com")?;
88    /// # let client = Client::new(homeserver).await?;
89    /// let backups = client.encryption().backups();
90    /// backups.create().await?;
91    ///
92    /// assert_eq!(backups.state(), BackupState::Enabled);
93    /// # anyhow::Ok(()) };
94    /// ```
95    pub async fn create(&self) -> Result<(), Error> {
96        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
97        let _guard = self.client.locks().backup_modify_lock.lock().await;
98
99        self.set_state(BackupState::Creating);
100
101        // Create a future so we can catch errors and go back to the `Unknown`
102        // state. This is a hack to get around the lack of `try` blocks in Rust.
103        let future = async {
104            let olm_machine = self.client.olm_machine().await;
105            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
106
107            // Create a new backup recovery key.
108            let decryption_key = BackupDecryptionKey::new().expect(
109                "We should be able to generate enough randomness to create a new backup recovery \
110                 key",
111            );
112
113            // Get the info about the new backup key, this needs to be uploaded to the
114            // homeserver[1].
115            //
116            // We need to sign the `RoomKeyBackupInfo` so other clients which might want
117            // to start using the backup without having access to the
118            // `BackupDecryptionKey` can do so, as per [spec]:
119            //
120            // Clients must only store keys in backups after they have ensured that the
121            // `auth_data` has not been tampered with. This can be done either by:
122            //
123            //  * checking that it is signed by the user's master cross-signing key or by a
124            //    verified device belonging to the same user, or
125            //  * by deriving the public key from a private key that it obtained from a
126            //    trusted source. Trusted sources for the private key include the user
127            //    entering the key, retrieving the key stored in secret storage, or
128            //    obtaining the key via secret sharing from a verified device belonging to
129            //    the same user.
130            //
131            //
132            // [1]: https://spec.matrix.org/v1.8/client-server-api/#post_matrixclientv3room_keysversion
133            // [spec]: https://spec.matrix.org/v1.8/client-server-api/#server-side-key-backups
134            let mut backup_info = decryption_key.to_backup_info();
135
136            if let Err(e) = olm_machine.backup_machine().sign_backup(&mut backup_info).await {
137                warn!("Unable to sign the newly created backup version: {e:?}");
138            }
139
140            let algorithm = Raw::new(&backup_info)?.cast();
141            let request = create_backup_version::v3::Request::new(algorithm);
142            let response = self.client.send(request).await?;
143            let version = response.version;
144
145            // Reset any state we might have had before the new backup was created.
146            // TODO: This should remove the old stored key and version.
147            olm_machine.backup_machine().disable_backup().await?;
148
149            let backup_key = decryption_key.megolm_v1_public_key();
150
151            // Save the newly created keys and the version we received from the server.
152            olm_machine
153                .backup_machine()
154                .save_decryption_key(Some(decryption_key), Some(version.to_owned()))
155                .await?;
156
157            // Enable the backup and start the upload of room keys.
158            self.enable(olm_machine, backup_key, version).await?;
159
160            Ok(())
161        };
162
163        let result = future.await;
164
165        if result.is_err() {
166            self.set_state(BackupState::Unknown);
167        }
168
169        result
170    }
171
172    /// Disable and delete the currently active backup only if previously
173    /// enabled before, otherwise an error will be returned.
174    ///
175    /// For a more aggressive variant see [`Backups::disable_and_delete`] which
176    /// will delete the remote backup without checking the local state.
177    ///
178    /// # Examples
179    ///
180    /// ```no_run
181    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
182    /// # use url::Url;
183    /// # async {
184    /// # let homeserver = Url::parse("http://example.com")?;
185    /// # let client = Client::new(homeserver).await?;
186    /// let backups = client.encryption().backups();
187    /// backups.disable().await?;
188    ///
189    /// assert_eq!(backups.state(), BackupState::Unknown);
190    /// # anyhow::Ok(()) };
191    /// ```
192    #[instrument(skip_all, fields(version))]
193    pub async fn disable(&self) -> Result<(), Error> {
194        let _guard = self.client.locks().backup_modify_lock.lock().await;
195
196        self.set_state(BackupState::Disabling);
197
198        // Create a future so we can catch errors and go back to the `Unknown` state.
199        let future = async {
200            let olm_machine = self.client.olm_machine().await;
201            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
202
203            let backup_keys = olm_machine.backup_machine().get_backup_keys().await?;
204
205            if let Some(version) = backup_keys.backup_version {
206                Span::current().record("version", &version);
207                info!("Deleting and disabling backup");
208
209                self.delete_backup_from_server(version).await?;
210                info!("Backup successfully deleted");
211
212                olm_machine.backup_machine().disable_backup().await?;
213
214                info!("Backup successfully disabled and deleted");
215
216                Ok(())
217            } else {
218                info!("Backup is not enabled, can't disable it");
219                Err(Error::BackupNotEnabled)
220            }
221        };
222
223        let result = future.await;
224
225        self.set_state(BackupState::Unknown);
226
227        result
228    }
229
230    /// Completely disable and delete the active backup both locally
231    /// and from the backend no matter if previously setup locally
232    /// or not.
233    ///
234    /// ⚠️ This method is mainly used when resetting the crypto identity
235    /// and for most other use cases its safer [`Backups::disable`] counterpart
236    /// should be used.
237    ///
238    /// It will fetch the current backup version from the backend and delete it
239    /// before proceeding to disabling local backups as well
240    ///
241    /// # Examples
242    ///
243    /// ```no_run
244    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
245    /// # use url::Url;
246    /// # async {
247    /// # let homeserver = Url::parse("http://example.com")?;
248    /// # let client = Client::new(homeserver).await?;
249    /// let backups = client.encryption().backups();
250    /// backups.disable_and_delete().await?;
251    ///
252    /// assert_eq!(backups.state(), BackupState::Unknown);
253    /// # anyhow::Ok(()) };
254    /// ```
255    pub async fn disable_and_delete(&self) -> Result<(), Error> {
256        let _guard = self.client.locks().backup_modify_lock.lock().await;
257
258        self.set_state(BackupState::Disabling);
259
260        // Create a future so we can catch errors and go back to the `Unknown` state.
261        let future = async {
262            let response = self.get_current_version().await?;
263
264            if let Some(response) = response {
265                self.delete_backup_from_server(response.version).await?;
266            }
267
268            let olm_machine = self.client.olm_machine().await;
269            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
270
271            olm_machine.backup_machine().disable_backup().await?;
272
273            Ok(())
274        };
275
276        let result = future.await;
277
278        self.set_state(BackupState::Unknown);
279
280        result
281    }
282
283    /// Returns a future to wait for room keys to be uploaded.
284    ///
285    /// Awaiting the future will wake up a task to upload room keys which have
286    /// not yet been uploaded to the homeserver. It will then wait for the task
287    /// to finish uploading.
288    ///
289    /// # Examples
290    ///
291    /// ```no_run
292    /// # use matrix_sdk::{Client, encryption::backups::UploadState};
293    /// # use url::Url;
294    /// # async {
295    /// # let homeserver = Url::parse("http://example.com")?;
296    /// # let client = Client::new(homeserver).await?;
297    /// use futures_util::StreamExt;
298    ///
299    /// let backups = client.encryption().backups();
300    /// let wait_for_steady_state = backups.wait_for_steady_state();
301    ///
302    /// let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
303    ///
304    /// tokio::spawn(async move {
305    ///     while let Some(update) = progress_stream.next().await {
306    ///         let Ok(update) = update else { break };
307    ///
308    ///         match update {
309    ///             UploadState::Uploading(counts) => {
310    ///                 println!(
311    ///                     "Uploaded {} out of {} room keys.",
312    ///                     counts.backed_up, counts.total
313    ///                 );
314    ///             }
315    ///             UploadState::Error => break,
316    ///             UploadState::Done => break,
317    ///             _ => (),
318    ///         }
319    ///     }
320    /// });
321    ///
322    /// wait_for_steady_state.await?;
323    ///
324    /// # anyhow::Ok(()) };
325    /// ```
326    pub fn wait_for_steady_state(&self) -> WaitForSteadyState<'_> {
327        WaitForSteadyState {
328            backups: self,
329            progress: self.client.inner.e2ee.backup_state.upload_progress.clone(),
330            timeout: None,
331        }
332    }
333
334    /// Get a stream of updates to the [`BackupState`].
335    ///
336    /// This method will send out the current state as the first update.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
342    /// # use url::Url;
343    /// # async {
344    /// # let homeserver = Url::parse("http://example.com")?;
345    /// # let client = Client::new(homeserver).await?;
346    /// use futures_util::StreamExt;
347    ///
348    /// let backups = client.encryption().backups();
349    ///
350    /// let mut state_stream = backups.state_stream();
351    ///
352    /// while let Some(update) = state_stream.next().await {
353    ///     let Ok(update) = update else { break };
354    ///
355    ///     match update {
356    ///         BackupState::Enabled => {
357    ///             println!("Backups have been enabled");
358    ///         }
359    ///         _ => (),
360    ///     }
361    /// }
362    /// # anyhow::Ok(()) };
363    /// ```
364    pub fn state_stream(
365        &self,
366    ) -> impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>> + use<> {
367        self.client.inner.e2ee.backup_state.global_state.subscribe()
368    }
369
370    /// Get the current [`BackupState`] for this [`Client`].
371    pub fn state(&self) -> BackupState {
372        self.client.inner.e2ee.backup_state.global_state.get()
373    }
374
375    /// Are backups enabled for the current [`Client`]?
376    ///
377    /// This method will check if we locally have an active backup key and
378    /// backup version and are ready to upload room keys to a backup.
379    pub async fn are_enabled(&self) -> bool {
380        let olm_machine = self.client.olm_machine().await;
381
382        if let Some(machine) = olm_machine.as_ref() {
383            machine.backup_machine().enabled().await
384        } else {
385            false
386        }
387    }
388
389    /// Does a backup exist on the server?
390    ///
391    /// This method will request info about the current backup from the
392    /// homeserver and if a backup exists return `true`, otherwise `false`.
393    pub async fn fetch_exists_on_server(&self) -> Result<bool, Error> {
394        let exists_on_server = self.get_current_version().await?.is_some();
395        self.client.inner.e2ee.backup_state.set_backup_exists_on_server(exists_on_server);
396        Ok(exists_on_server)
397    }
398
399    /// Does a backup exist on the server?
400    ///
401    /// This method is identical to [`Self::fetch_exists_on_server`] except that
402    /// we cache the latest answer in memory and only empty the cache if the
403    /// local device adds or deletes a backup itself.
404    ///
405    /// Do not use this method if you need an accurate answer about whether a
406    /// backup exists - instead use [`Self::fetch_exists_on_server`]. This
407    /// method is useful when performance is more important than guaranteed
408    /// accuracy, such as when classifying UTDs.
409    pub async fn exists_on_server(&self) -> Result<bool, Error> {
410        // If we have an answer cached, return it immediately
411        if let Some(cached_value) = self.client.inner.e2ee.backup_state.backup_exists_on_server() {
412            return Ok(cached_value);
413        }
414
415        // Otherwise, delegate to fetch_exists_on_server. (It will update the cached
416        // value for us.)
417        self.fetch_exists_on_server().await
418    }
419
420    /// Subscribe to a stream that notifies when a room key for the specified
421    /// room is downloaded from the key backup.
422    pub fn room_keys_for_room_stream(
423        &self,
424        room_id: &RoomId,
425    ) -> impl Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>> + use<>
426    {
427        let room_id = room_id.to_owned();
428
429        // TODO: This is a bit crap to say the least. The type is
430        // non-descriptive and doesn't even contain all the important data. It
431        // should be a stream of `RoomKeyInfo` like the OlmMachine has... But on
432        // the other hand we should just be able to use the corresponding
433        // OlmMachine stream and remove this. Currently we can't do this because
434        // the OlmMachine gets destroyed and recreated all the time to be able
435        // to support the notifications-related multiprocessing on iOS.
436        self.room_keys_stream().filter_map(move |import_result| {
437            let room_id = room_id.to_owned();
438
439            async move {
440                match import_result {
441                    Ok(mut import_result) => import_result.keys.remove(&room_id).map(Ok),
442                    Err(e) => Some(Err(e)),
443                }
444            }
445        })
446    }
447
448    /// Download all room keys for a certain room from the server-side key
449    /// backup.
450    pub async fn download_room_keys_for_room(&self, room_id: &RoomId) -> Result<(), Error> {
451        let olm_machine = self.client.olm_machine().await;
452        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
453
454        let backup_keys = olm_machine.store().load_backup_keys().await?;
455
456        if let Some(decryption_key) = backup_keys.decryption_key
457            && let Some(version) = backup_keys.backup_version
458        {
459            let request =
460                get_backup_keys_for_room::v3::Request::new(version.clone(), room_id.to_owned());
461            let response = self.client.send(request).await?;
462
463            // Transform response to standard format (map of room ID -> room key).
464            let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
465                room_id.to_owned(),
466                RoomKeyBackup::new(response.sessions),
467            )]));
468
469            self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
470                .await?;
471        }
472
473        Ok(())
474    }
475
476    /// Download a single room key from the server-side key backup.
477    ///
478    /// Returns `true` if we managed to download a room key, `false` or an error
479    /// if we failed to download it. `false` indicates that there was no
480    /// error, we just don't have backups enabled so we can't download a
481    /// room key.
482    pub async fn download_room_key(
483        &self,
484        room_id: &RoomId,
485        session_id: &str,
486    ) -> Result<bool, Error> {
487        let olm_machine = self.client.olm_machine().await;
488        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
489
490        let backup_keys = olm_machine.store().load_backup_keys().await?;
491
492        if let Some(decryption_key) = backup_keys.decryption_key {
493            if let Some(version) = backup_keys.backup_version {
494                let request = get_backup_keys_for_session::v3::Request::new(
495                    version.clone(),
496                    room_id.to_owned(),
497                    session_id.to_owned(),
498                );
499                let response = self.client.send(request).await?;
500
501                // Transform response to standard format (map of room ID -> room key).
502                let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
503                    room_id.to_owned(),
504                    RoomKeyBackup::new(BTreeMap::from([(
505                        session_id.to_owned(),
506                        response.key_data,
507                    )])),
508                )]));
509
510                self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
511                    .await?;
512
513                Ok(true)
514            } else {
515                Ok(false)
516            }
517        } else {
518            Ok(false)
519        }
520    }
521
522    /// Set the state of the backup.
523    fn set_state(&self, new_state: BackupState) {
524        let old_state = self.client.inner.e2ee.backup_state.global_state.set(new_state);
525
526        if old_state != new_state {
527            info!("Backup state changed from {old_state:?} to {new_state:?}");
528        }
529    }
530
531    /// Set the backup state to the `Enabled` variant and insert the backup key
532    /// and version into the [`OlmMachine`].
533    async fn enable(
534        &self,
535        olm_machine: &OlmMachine,
536        backup_key: MegolmV1BackupKey,
537        version: String,
538    ) -> Result<(), Error> {
539        backup_key.set_version(version);
540        olm_machine.backup_machine().enable_backup_v1(backup_key).await?;
541
542        self.set_state(BackupState::Enabled);
543
544        Ok(())
545    }
546
547    /// Decrypt and forward a response containing backed up room keys to the
548    /// [`OlmMachine`].
549    async fn handle_downloaded_room_keys(
550        &self,
551        backed_up_keys: get_backup_keys::v3::Response,
552        backup_decryption_key: BackupDecryptionKey,
553        backup_version: &str,
554        olm_machine: &OlmMachine,
555    ) -> Result<(), Error> {
556        let mut decrypted_room_keys: Vec<_> = Vec::new();
557
558        for (room_id, room_keys) in backed_up_keys.rooms {
559            for (session_id, room_key) in room_keys.sessions {
560                let room_key = match room_key.deserialize() {
561                    Ok(k) => k,
562                    Err(e) => {
563                        warn!(
564                            "Couldn't deserialize a room key we downloaded from backups, session \
565                             ID: {session_id}, error: {e:?}"
566                        );
567                        continue;
568                    }
569                };
570
571                let room_key =
572                    match backup_decryption_key.decrypt_session_data(room_key.session_data) {
573                        Ok(k) => k,
574                        Err(e) => {
575                            warn!(
576                                "Couldn't decrypt a room key we downloaded from backups, session \
577                                 ID: {session_id}, error: {e:?}"
578                            );
579                            continue;
580                        }
581                    };
582
583                decrypted_room_keys.push(ExportedRoomKey::from_backed_up_room_key(
584                    room_id.to_owned(),
585                    session_id,
586                    room_key,
587                ));
588            }
589        }
590
591        let result = olm_machine
592            .store()
593            .import_room_keys(decrypted_room_keys, Some(backup_version), |_, _| {})
594            .await?;
595
596        // Since we can't use the usual room keys stream from the `OlmMachine`
597        // we're going to send things out in our own custom broadcaster.
598        let _ = self.client.inner.e2ee.backup_state.room_keys_broadcaster.send(result);
599
600        Ok(())
601    }
602
603    /// Download all room keys from the backup on the homeserver.
604    async fn download_all_room_keys(
605        &self,
606        decryption_key: BackupDecryptionKey,
607        version: String,
608    ) -> Result<(), Error> {
609        let request = get_backup_keys::v3::Request::new(version.clone());
610        let response = self.client.send(request).await?;
611
612        let olm_machine = self.client.olm_machine().await;
613        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
614
615        self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine).await?;
616
617        Ok(())
618    }
619
620    fn room_keys_stream(
621        &self,
622    ) -> impl Stream<Item = Result<RoomKeyImportResult, BroadcastStreamRecvError>> + use<> {
623        BroadcastStream::new(self.client.inner.e2ee.backup_state.room_keys_broadcaster.subscribe())
624    }
625
626    /// Get info about the currently active backup from the server.
627    async fn get_current_version(
628        &self,
629    ) -> Result<Option<get_latest_backup_info::v3::Response>, Error> {
630        let request = get_latest_backup_info::v3::Request::new();
631
632        match self.client.send(request).await {
633            Ok(r) => Ok(Some(r)),
634            Err(e) => {
635                if let Some(kind) = e.client_api_error_kind() {
636                    if kind == &ErrorKind::NotFound { Ok(None) } else { Err(e.into()) }
637                } else {
638                    Err(e.into())
639                }
640            }
641        }
642    }
643
644    async fn delete_backup_from_server(&self, version: String) -> Result<(), Error> {
645        let request = ruma::api::client::backup::delete_backup_version::v3::Request::new(version);
646
647        let ret = match self.client.send(request).await {
648            Ok(_) => Ok(()),
649            Err(e) => {
650                if let Some(kind) = e.client_api_error_kind() {
651                    if kind == &ErrorKind::NotFound { Ok(()) } else { Err(e.into()) }
652                } else {
653                    Err(e.into())
654                }
655            }
656        };
657
658        // If the request succeeded, the backup is gone. If it failed, we are not really
659        // sure what the backup state is. Either way, clear the cache so we check next
660        // time we need to know.
661        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
662
663        ret
664    }
665
666    #[instrument(skip(self, olm_machine, request))]
667    async fn send_backup_request(
668        &self,
669        olm_machine: &OlmMachine,
670        request_id: &TransactionId,
671        request: KeysBackupRequest,
672    ) -> Result<(), Error> {
673        trace!("Uploading some room keys");
674
675        let add_backup_keys = add_backup_keys::v3::Request::new(request.version, request.rooms);
676
677        match self.client.send(add_backup_keys).await {
678            Ok(response) => {
679                olm_machine.mark_request_as_sent(request_id, &response).await?;
680
681                let new_counts = olm_machine.backup_machine().room_key_counts().await?;
682
683                self.client
684                    .inner
685                    .e2ee
686                    .backup_state
687                    .upload_progress
688                    .set(UploadState::Uploading(new_counts));
689
690                let delay =
691                    self.client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned();
692                crate::sleep::sleep(delay).await;
693
694                Ok(())
695            }
696            Err(error) => {
697                if let Some(kind) = error.client_api_error_kind() {
698                    match kind {
699                        ErrorKind::NotFound => {
700                            warn!(
701                                "No backup found on the server, the backup likely got deleted, \
702                                 disabling backups."
703                            );
704
705                            self.handle_deleted_backup_version(olm_machine).await?;
706                        }
707                        ErrorKind::WrongRoomKeysVersion { current_version } => {
708                            warn!(
709                                new_version = current_version,
710                                "A new backup version was found on the server, disabling backups."
711                            );
712
713                            // TODO: If we're verified and there are other devices besides us,
714                            // request the new backup key over `m.secret.send`.
715
716                            self.handle_deleted_backup_version(olm_machine).await?;
717                        }
718
719                        _ => (),
720                    }
721                }
722
723                Err(error.into())
724            }
725        }
726    }
727
728    /// Poll the [`OlmMachine`] for room keys which need to be backed up and
729    /// send out the request to the homeserver.
730    ///
731    /// This should only be called by the [`BackupUploadingTask`].
732    ///
733    /// [`BackupUploadingTask`]: crate::client::tasks::BackupUploadingTask
734    pub(crate) async fn backup_room_keys(&self) -> Result<(), Error> {
735        let _guard = self.client.locks().backup_upload_lock.lock().await;
736
737        let olm_machine = self.client.olm_machine().await;
738        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
739
740        while let Some((request_id, request)) = olm_machine.backup_machine().backup().await? {
741            self.send_backup_request(olm_machine, &request_id, request).await?;
742        }
743
744        self.client.inner.e2ee.backup_state.upload_progress.set(UploadState::Done);
745
746        Ok(())
747    }
748
749    /// Set up a `m.secret.send` listener and re-enable backups if we have a
750    /// backup recovery key stored.
751    pub(crate) async fn setup_and_resume(&self) -> Result<(), Error> {
752        info!("Setting up secret listeners and trying to resume backups");
753
754        self.client.add_event_handler(Self::secret_send_event_handler);
755
756        if self.client.inner.e2ee.encryption_settings.backup_download_strategy
757            == BackupDownloadStrategy::AfterDecryptionFailure
758        {
759            self.client.add_event_handler(Self::utd_event_handler);
760        }
761
762        self.maybe_resume_backups().await?;
763
764        Ok(())
765    }
766
767    /// Try to enable backups with the given backup recovery key.
768    ///
    /// This should be called if we receive a backup recovery key, either:
770    ///
771    /// * As an `m.secret.send` to-device message from a trusted device.
772    /// * From 4S (i.e. from the `m.megolm_backup.v1` event global account
773    ///   data).
774    ///
775    /// In both cases the method will compare the currently active backup
776    /// version to the backup recovery key's version and, if there is a match,
777    /// activate backups on this device and start uploading room keys to the
778    /// backup.
779    ///
780    /// Returns true if backups were just enabled or were already enabled,
781    /// otherwise false.
782    #[instrument(skip_all)]
783    pub(crate) async fn maybe_enable_backups(
784        &self,
785        maybe_recovery_key: &str,
786    ) -> Result<bool, Error> {
787        let _guard = self.client.locks().backup_modify_lock.lock().await;
788
789        // Create a future here which allows us to catch any failure that might happen
790        // so we can later on fall back to the correct `BackupState`.
791        let future = async {
792            self.set_state(BackupState::Enabling);
793
794            let olm_machine = self.client.olm_machine().await;
795            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
796            let backup_machine = olm_machine.backup_machine();
797
798            let decryption_key =
799                BackupDecryptionKey::from_base64(maybe_recovery_key).map_err(|e| {
800                    <serde_json::Error as serde::de::Error>::custom(format!(
801                        "Couldn't deserialize the backup recovery key: {e:?}"
802                    ))
803                })?;
804
805            // Let's try to see if there's a backup on the homeserver.
806            let current_version = self.get_current_version().await?;
807
808            let Some(current_version) = current_version else {
809                warn!("Tried to enable backups, but no backup version was found on the server.");
810                return Ok(false);
811            };
812
813            Span::current().record("backup_version", &current_version.version);
814
815            let backup_info: RoomKeyBackupInfo = current_version.algorithm.deserialize_as()?;
816            let stored_keys = backup_machine.get_backup_keys().await?;
817
818            if stored_keys.backup_version.as_ref() == Some(&current_version.version)
819                && self.are_enabled().await
820            {
821                // If we already have a backup enabled which is using the currently active
822                // backup version, do nothing but tell the caller using the return value that
823                // backups are enabled.
824                Ok(true)
825            } else if decryption_key.backup_key_matches(&backup_info) {
826                info!(
827                    "We have found the correct backup recovery key. Storing the backup recovery \
828                     key and enabling backups."
829                );
830
831                // We're enabling a new backup, reset the `backed_up` flags on the room keys and
832                // remove any key/version we might have.
833                backup_machine.disable_backup().await?;
834
835                let backup_key = decryption_key.megolm_v1_public_key();
836                backup_key.set_version(current_version.version.to_owned());
837
838                // Persist the new keys and enable the backup.
839                backup_machine
840                    .save_decryption_key(
841                        Some(decryption_key.to_owned()),
842                        Some(current_version.version.to_owned()),
843                    )
844                    .await?;
845                backup_machine.enable_backup_v1(backup_key).await?;
846
847                // If the user has set up the client to download any room keys, do so now. This
848                // is not really useful in a real scenario since the API to
849                // download room keys is not paginated.
850                //
851                // You need to download all room keys at once, parse a potentially huge JSON
852                // response and decrypt all the room keys found in the backup.
853                //
854                // This doesn't work for any sizeable account.
855                if self.client.inner.e2ee.encryption_settings.backup_download_strategy
856                    == BackupDownloadStrategy::OneShot
857                {
858                    self.set_state(BackupState::Downloading);
859
860                    if let Err(e) =
861                        self.download_all_room_keys(decryption_key, current_version.version).await
862                    {
863                        warn!("Couldn't automatically download all room keys from backup: {e:?}");
864                    }
865                }
866
867                // Trigger the upload of any room keys we might need to upload.
868                self.maybe_trigger_backup();
869
870                Ok(true)
871            } else {
872                let derived_key = decryption_key.megolm_v1_public_key();
873                let downloaded_key = current_version.algorithm;
874
875                warn!(
876                    ?derived_key,
877                    ?downloaded_key,
878                    "Found an active backup but the recovery key we received isn't the one used for \
879                     this backup version"
880                );
881
882                Ok(false)
883            }
884        };
885
886        match future.await {
887            Ok(enabled) => {
888                if enabled {
889                    self.set_state(BackupState::Enabled);
890                } else {
891                    self.set_state(BackupState::Unknown);
892                }
893
894                Ok(enabled)
895            }
896            Err(e) => {
897                self.set_state(BackupState::Unknown);
898
899                Err(e)
900            }
901        }
902    }
903
904    /// Try to resume backups from a backup recovery key we have found in the
905    /// crypto store.
906    ///
907    /// Returns true if backups have been resumed, false otherwise.
908    async fn resume_backup_from_stored_backup_key(
909        &self,
910        olm_machine: &OlmMachine,
911    ) -> Result<bool, Error> {
912        let backup_keys = olm_machine.store().load_backup_keys().await?;
913
914        if let Some(decryption_key) = backup_keys.decryption_key {
915            if let Some(version) = backup_keys.backup_version {
916                let backup_key = decryption_key.megolm_v1_public_key();
917
918                self.enable(olm_machine, backup_key, version).await?;
919
920                Ok(true)
921            } else {
922                Ok(false)
923            }
924        } else {
925            Ok(false)
926        }
927    }
928
929    /// Try to resume backups by iterating through the `m.secret.send` to-device
930    /// messages the [`OlmMachine`] has received and stored in the secret inbox.
931    async fn maybe_resume_from_secret_inbox(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
932        let secrets = olm_machine.store().get_secrets_from_inbox(&SecretName::RecoveryKey).await?;
933
934        for secret in secrets {
935            if self.maybe_enable_backups(&secret.event.content.secret).await? {
936                break;
937            }
938        }
939
940        olm_machine.store().delete_secrets_from_inbox(&SecretName::RecoveryKey).await?;
941
942        Ok(())
943    }
944
945    /// Check and re-enable a backup if we have a backup recovery key locally.
946    async fn maybe_resume_backups(&self) -> Result<(), Error> {
947        let olm_machine = self.client.olm_machine().await;
948        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
949
950        // Let us first check if we have a stored backup recovery key and a backup
951        // version.
952        if !self.resume_backup_from_stored_backup_key(olm_machine).await? {
953            // We didn't manage to enable backups from a stored backup recovery key, let us
954            // check our secret inbox. Perhaps we can find a valid key there.
955            self.maybe_resume_from_secret_inbox(olm_machine).await?;
956        }
957
958        Ok(())
959    }
960
961    /// Listen for `m.secret.send` to-device messages and check the secret inbox
962    /// if we do receive one.
963    #[instrument(skip_all)]
964    pub(crate) async fn secret_send_event_handler(_: ToDeviceSecretSendEvent, client: Client) {
965        let olm_machine = client.olm_machine().await;
966
967        // TODO: Because of our crude multi-process support, which reloads the whole
968        // [`OlmMachine`] the `secrets_stream` might stop giving you updates. Once
969        // that's fixed, stop listening to individual secret send events and
970        // listen to the secrets stream.
971        if let Some(olm_machine) = olm_machine.as_ref() {
972            if let Err(e) =
973                client.encryption().backups().maybe_resume_from_secret_inbox(olm_machine).await
974            {
975                error!("Could not handle `m.secret.send` event: {e:?}");
976            }
977        } else {
978            error!("Tried to handle a `m.secret.send` event but no OlmMachine was initialized");
979        }
980    }
981
982    /// Handle UTD events by triggering download from key backup.
983    ///
984    /// This function is registered as an event handler; it exists to deal
985    /// with cases where [`Room::decrypt_event`] is not called and instead the
986    /// event should be decrypted by the time this crate sees the event, such as
987    /// for events received via `/sync` (as opposed to via `/messages`,
988    /// `/context`, etc.)
989    #[allow(clippy::unused_async)] // Because it's used as an event handler, which must be async.
990    pub(crate) async fn utd_event_handler(
991        event: Raw<OriginalSyncRoomEncryptedEvent>,
992        room: Room,
993        client: Client,
994    ) {
995        client.encryption().backups().maybe_download_room_key(room.room_id().to_owned(), event);
996    }
997
998    /// Send a notification to the task responsible for key backup downloads
999    /// that it should attempt to download the keys for the given event.
1000    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1001    pub(crate) fn maybe_download_room_key(
1002        &self,
1003        room_id: OwnedRoomId,
1004        event: Raw<OriginalSyncRoomEncryptedEvent>,
1005    ) {
1006        let tasks = self.client.inner.e2ee.tasks.lock();
1007        if let Some(task) = tasks.download_room_keys.as_ref() {
1008            task.trigger_download_for_utd_event(room_id, event);
1009        }
1010    }
1011
1012    /// Send a notification to the task responsible for key backup downloads
1013    /// that it should attempt to download the keys for the given event.
1014    #[cfg(feature = "experimental-encrypted-state-events")]
1015    pub(crate) fn maybe_download_room_key<T: JsonCastable<EncryptedEvent>>(
1016        &self,
1017        room_id: OwnedRoomId,
1018        event: Raw<T>,
1019    ) {
1020        let tasks = self.client.inner.e2ee.tasks.lock();
1021        if let Some(task) = tasks.download_room_keys.as_ref() {
1022            task.trigger_download_for_utd_event(room_id, event);
1023        }
1024    }
1025
1026    /// Send a notification to the task which is responsible for uploading room
1027    /// keys to the backup that it might have new room keys to back up.
1028    pub(crate) fn maybe_trigger_backup(&self) {
1029        let tasks = self.client.inner.e2ee.tasks.lock();
1030
1031        if let Some(tasks) = tasks.upload_room_keys.as_ref() {
1032            tasks.trigger_upload();
1033        }
1034    }
1035
1036    /// Disable our backups locally if we notice that the backup has been
1037    /// removed on the homeserver.
1038    async fn handle_deleted_backup_version(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
1039        olm_machine.backup_machine().disable_backup().await?;
1040        self.set_state(BackupState::Unknown);
1041
1042        Ok(())
1043    }
1044}
1045
1046#[cfg(all(test, not(target_family = "wasm")))]
1047mod test {
1048    use std::time::Duration;
1049
1050    use matrix_sdk_test::async_test;
1051    use serde_json::json;
1052    use wiremock::{
1053        Mock, MockServer, ResponseTemplate,
1054        matchers::{header, method, path},
1055    };
1056
1057    use super::*;
1058    use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};
1059
1060    fn room_key() -> ExportedRoomKey {
1061        let json = json!({
1062            "algorithm": "m.megolm.v1.aes-sha2",
1063            "room_id": "!DovneieKSTkdHKpIXy:morpheus.localhost",
1064            "sender_key": "DeHIg4gwhClxzFYcmNntPNF9YtsdZbmMy8+3kzCMXHA",
1065            "session_id": "gM8i47Xhu0q52xLfgUXzanCMpLinoyVyH7R58cBuVBU",
1066            "session_key": "AQAAAABvWMNZjKFtebYIePKieQguozuoLgzeY6wKcyJjLJcJtQgy1dPqTBD12U+XrYLrRHn\
1067                            lKmxoozlhFqJl456+9hlHCL+yq+6ScFuBHtJepnY1l2bdLb4T0JMDkNsNErkiLiLnD6yp3J\
1068                            DSjIhkdHxmup/huygrmroq6/L5TaThEoqvW4DPIuO14btKudsS34FF82pwjKS4p6Mlch+0e\
1069                            fHAblQV",
1070            "sender_claimed_keys":{},
1071            "forwarding_curve25519_key_chain":[]
1072        });
1073
1074        serde_json::from_value(json)
1075            .expect("We should be able to deserialize our exported room key")
1076    }
1077
1078    async fn backup_disabling_test_body(
1079        client: &Client,
1080        server: &MockServer,
1081        put_response: ResponseTemplate,
1082    ) {
1083        let _post_scope = Mock::given(method("POST"))
1084            .and(path("_matrix/client/unstable/room_keys/version"))
1085            .and(header("authorization", "Bearer 1234"))
1086            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1087              "version": "1"
1088            })))
1089            .expect(1)
1090            .named("POST for the backup creation")
1091            .mount_as_scoped(server)
1092            .await;
1093
1094        let _put_scope = Mock::given(method("PUT"))
1095            .and(path("_matrix/client/unstable/room_keys/keys"))
1096            .and(header("authorization", "Bearer 1234"))
1097            .respond_with(put_response)
1098            .expect(1)
1099            .named("POST for the backup creation")
1100            .mount_as_scoped(server)
1101            .await;
1102
1103        client
1104            .encryption()
1105            .backups()
1106            .create()
1107            .await
1108            .expect("We should be able to create a new backup");
1109
1110        assert_eq!(client.encryption().backups().state(), BackupState::Enabled);
1111
1112        client
1113            .encryption()
1114            .backups()
1115            .backup_room_keys()
1116            .await
1117            .expect_err("Backups should be disabled");
1118
1119        assert_eq!(client.encryption().backups().state(), BackupState::Unknown);
1120    }
1121
1122    #[async_test]
1123    async fn test_backup_disabling_after_remote_deletion() {
1124        let server = MockServer::start().await;
1125        let client = logged_in_client(Some(server.uri())).await;
1126
1127        {
1128            let machine = client.olm_machine().await;
1129            machine
1130                .as_ref()
1131                .unwrap()
1132                .store()
1133                .import_exported_room_keys(vec![room_key()], |_, _| {})
1134                .await
1135                .expect("We should be able to import a room key");
1136        }
1137
1138        backup_disabling_test_body(
1139            &client,
1140            &server,
1141            ResponseTemplate::new(404).set_body_json(json!({
1142                "errcode": "M_NOT_FOUND",
1143                "error": "Unknown backup version"
1144            })),
1145        )
1146        .await;
1147
1148        backup_disabling_test_body(
1149            &client,
1150            &server,
1151            ResponseTemplate::new(403).set_body_json(json!({
1152                "current_version": "42",
1153                "errcode": "M_WRONG_ROOM_KEYS_VERSION",
1154                "error": "Wrong backup version."
1155            })),
1156        )
1157        .await;
1158
1159        server.verify().await;
1160    }
1161
1162    #[async_test]
1163    async fn test_when_a_backup_exists_then_fetch_exists_on_server_returns_true() {
1164        let server = MatrixMockServer::new().await;
1165        let client = server.client_builder().build().await;
1166
1167        server.mock_room_keys_version().exists().expect(1).mount().await;
1168
1169        let exists = client
1170            .encryption()
1171            .backups()
1172            .fetch_exists_on_server()
1173            .await
1174            .expect("We should be able to check if backups exist on the server");
1175
1176        assert!(exists, "We should deduce that a backup exists on the server");
1177    }
1178
1179    #[async_test]
1180    async fn test_repeated_calls_to_fetch_exists_on_server_makes_repeated_requests() {
1181        let server = MatrixMockServer::new().await;
1182        let client = server.client_builder().build().await;
1183
1184        // Expect 2 requests to the server
1185        server.mock_room_keys_version().exists().expect(2).mount().await;
1186
1187        let backups = client.encryption().backups();
1188
1189        // Call fetch_exists_on_server twice
1190        backups.fetch_exists_on_server().await.unwrap();
1191        let exists = backups.fetch_exists_on_server().await.unwrap();
1192
1193        assert!(exists, "We should deduce that a backup exists on the server");
1194    }
1195
1196    #[async_test]
1197    async fn test_when_no_backup_exists_then_fetch_exists_on_server_returns_false() {
1198        let server = MatrixMockServer::new().await;
1199        let client = server.client_builder().build().await;
1200
1201        server.mock_room_keys_version().none().expect(1).mount().await;
1202
1203        let exists = client
1204            .encryption()
1205            .backups()
1206            .fetch_exists_on_server()
1207            .await
1208            .expect("We should be able to check if backups exist on the server");
1209
1210        assert!(!exists, "We should deduce that no backup exists on the server");
1211    }
1212
1213    #[async_test]
1214    async fn test_when_server_returns_an_error_then_fetch_exists_on_server_returns_an_error() {
1215        let server = MatrixMockServer::new().await;
1216        let client = server.client_builder().build().await;
1217
1218        {
1219            let _scope =
1220                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1221
1222            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1223                "If the /version endpoint returns a non 404 error we should throw an error",
1224            );
1225        }
1226
1227        {
1228            let _scope =
1229                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1230
1231            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1232                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1233            );
1234        }
1235    }
1236
1237    #[async_test]
1238    async fn test_when_a_backup_exists_then_exists_on_server_returns_true() {
1239        let server = MatrixMockServer::new().await;
1240        let client = server.client_builder().build().await;
1241
1242        server.mock_room_keys_version().exists().expect(1).mount().await;
1243
1244        let exists = client
1245            .encryption()
1246            .backups()
1247            .exists_on_server()
1248            .await
1249            .expect("We should be able to check if backups exist on the server");
1250
1251        assert!(exists, "We should deduce that a backup exists on the server");
1252    }
1253
1254    #[async_test]
1255    async fn test_when_no_backup_exists_then_exists_on_server_returns_false() {
1256        let server = MatrixMockServer::new().await;
1257        let client = server.client_builder().build().await;
1258
1259        server.mock_room_keys_version().none().expect(1).mount().await;
1260
1261        let exists = client
1262            .encryption()
1263            .backups()
1264            .exists_on_server()
1265            .await
1266            .expect("We should be able to check if backups exist on the server");
1267
1268        assert!(!exists, "We should deduce that no backup exists on the server");
1269    }
1270
1271    #[async_test]
1272    async fn test_when_server_returns_an_error_then_exists_on_server_returns_an_error() {
1273        let server = MatrixMockServer::new().await;
1274        let client = server.client_builder().build().await;
1275
1276        {
1277            let _scope =
1278                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1279
1280            client.encryption().backups().exists_on_server().await.expect_err(
1281                "If the /version endpoint returns a non 404 error we should throw an error",
1282            );
1283        }
1284
1285        {
1286            let _scope =
1287                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1288
1289            client.encryption().backups().exists_on_server().await.expect_err(
1290                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1291            );
1292        }
1293    }
1294
1295    #[async_test]
1296    async fn test_repeated_calls_to_exists_on_server_do_not_make_additional_requests() {
1297        let server = MatrixMockServer::new().await;
1298        let client = server.client_builder().build().await;
1299
1300        // Create a mock stating that the request should only be made once
1301        server.mock_room_keys_version().exists().expect(1).mount().await;
1302
1303        let backups = client.encryption().backups();
1304
1305        // Call exists_on_server several times
1306        backups.exists_on_server().await.unwrap();
1307        backups.exists_on_server().await.unwrap();
1308        backups.exists_on_server().await.unwrap();
1309
1310        let exists = backups
1311            .exists_on_server()
1312            .await
1313            .expect("We should be able to check if backups exist on the server");
1314
1315        assert!(exists, "We should deduce that a backup exists on the server");
1316
1317        // We check expectations here, confirming that only one call was made
1318    }
1319
1320    #[async_test]
1321    async fn test_adding_a_backup_invalidates_exists_on_server_cache() {
1322        let server = MatrixMockServer::new().await;
1323        let client = server.client_builder().build().await;
1324        let backups = client.encryption().backups();
1325
1326        {
1327            let _scope = server.mock_room_keys_version().none().expect(1).mount_as_scoped().await;
1328
1329            // Call exists_on_server to fill the cache
1330            let exists = backups.exists_on_server().await.unwrap();
1331            assert!(!exists, "No backup exists at this point");
1332        }
1333
1334        // Create a new backup. Should invalidate the cache
1335        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1336        backups.create().await.expect("Failed to create a backup");
1337
1338        server.mock_room_keys_version().exists().expect(1).mount().await;
1339        let exists = backups
1340            .exists_on_server()
1341            .await
1342            .expect("We should be able to check if backups exist on the server");
1343
1344        assert!(exists, "But now a backup does exist");
1345    }
1346
1347    #[async_test]
1348    async fn test_removing_a_backup_invalidates_exists_on_server_cache() {
1349        let server = MatrixMockServer::new().await;
1350        let client = server.client_builder().build().await;
1351        let backups = client.encryption().backups();
1352
1353        {
1354            let _scope = server.mock_room_keys_version().exists().expect(1).mount_as_scoped().await;
1355
1356            // Call exists_on_server to fill the cache
1357            let exists = backups.exists_on_server().await.unwrap();
1358            assert!(exists, "A backup exists at this point");
1359        }
1360
1361        // Delete the backup. Should invalidate the cache
1362        server.mock_delete_room_keys_version().ok().expect(1).mount().await;
1363        backups.delete_backup_from_server("1".to_owned()).await.expect("Failed to delete a backup");
1364
1365        server.mock_room_keys_version().none().expect(1).mount().await;
1366        let exists = backups
1367            .exists_on_server()
1368            .await
1369            .expect("We should be able to check if backups exist on the server");
1370
1371        assert!(!exists, "But now there is no backup");
1372    }
1373
1374    #[async_test]
1375    async fn test_waiting_for_steady_state_resets_the_delay() {
1376        let server = MatrixMockServer::new().await;
1377        let client = server.client_builder().build().await;
1378
1379        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1380
1381        client
1382            .encryption()
1383            .backups()
1384            .create()
1385            .await
1386            .expect("We should be able to create a new backup");
1387
1388        let backups = client.encryption().backups();
1389
1390        let old_duration =
1391            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1392
1393        let wait_for_steady_state =
1394            backups.wait_for_steady_state().with_delay(Duration::from_nanos(100));
1395
1396        let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
1397
1398        let task = matrix_sdk_common::executor::spawn({
1399            let client = client.to_owned();
1400            async move {
1401                while let Some(state) = progress_stream.next().await {
1402                    let Ok(state) = state else {
1403                        panic!("Error while waiting for the upload state")
1404                    };
1405
1406                    match state {
1407                        UploadState::Idle => (),
1408                        UploadState::Done => {
1409                            let current_delay = {
1410                                client
1411                                    .inner
1412                                    .e2ee
1413                                    .backup_state
1414                                    .upload_delay
1415                                    .read()
1416                                    .unwrap()
1417                                    .to_owned()
1418                            };
1419
1420                            assert_ne!(current_delay, old_duration);
1421                            break;
1422                        }
1423                        _ => panic!("We should not have entered any other state"),
1424                    }
1425                }
1426            }
1427        });
1428
1429        wait_for_steady_state.await.expect("We should be able to wait for the steady state");
1430        task.await.unwrap();
1431
1432        let current_duration =
1433            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1434
1435        assert_eq!(old_duration, current_duration);
1436    }
1437}