matrix_sdk/encryption/backups/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Room key backup support
16//!
17//! This module implements support for server-side key backups[[1]]. The module
18//! allows you to connect to an existing backup, create or delete backups from
19//! the homeserver, and download room keys from a backup.
20//!
21//! [1]: https://spec.matrix.org/unstable/client-server-api/#server-side-key-backups
22
23use std::collections::{BTreeMap, BTreeSet};
24
25use futures_core::Stream;
26use futures_util::StreamExt;
27#[cfg(feature = "experimental-encrypted-state-events")]
28use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
29use matrix_sdk_base::crypto::{
30    OlmMachine, RoomKeyImportResult,
31    backups::MegolmV1BackupKey,
32    store::types::BackupDecryptionKey,
33    types::{RoomKeyBackupInfo, requests::KeysBackupRequest},
34};
35#[cfg(feature = "experimental-encrypted-state-events")]
36use ruma::serde::JsonCastable;
37use ruma::{
38    OwnedRoomId, RoomId, TransactionId,
39    api::client::{
40        backup::{
41            RoomKeyBackup, add_backup_keys, create_backup_version, get_backup_keys,
42            get_backup_keys_for_room, get_backup_keys_for_session, get_latest_backup_info,
43        },
44        error::ErrorKind,
45    },
46    events::{
47        room::encrypted::OriginalSyncRoomEncryptedEvent,
48        secret::{request::SecretName, send::ToDeviceSecretSendEvent},
49    },
50    serde::Raw,
51};
52use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
53use tracing::{Span, error, info, instrument, trace, warn};
54
55pub mod futures;
56pub(crate) mod types;
57
58pub use types::{BackupState, UploadState};
59
60use self::futures::WaitForSteadyState;
61use crate::{
62    Client, Error, Room, crypto::olm::ExportedRoomKey, encryption::BackupDownloadStrategy,
63};
64
65/// The backups manager for the [`Client`].
66#[derive(Debug, Clone)]
67pub struct Backups {
68    pub(super) client: Client,
69}
70
71impl Backups {
72    /// Create a new backup version, encrypted with a new backup recovery key.
73    ///
74    /// The backup recovery key will be persisted locally and shared with
75    /// trusted devices as `m.secret.send` to-device messages.
76    ///
77    /// After the backup has been created, all room keys will be uploaded to the
78    /// homeserver.
79    ///
80    /// *Warning*: This will overwrite any existing backup.
81    ///
82    /// # Examples
83    ///
84    /// ```no_run
85    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
86    /// # use url::Url;
87    /// # async {
88    /// # let homeserver = Url::parse("http://example.com")?;
89    /// # let client = Client::new(homeserver).await?;
90    /// let backups = client.encryption().backups();
91    /// backups.create().await?;
92    ///
93    /// assert_eq!(backups.state(), BackupState::Enabled);
94    /// # anyhow::Ok(()) };
95    /// ```
96    pub async fn create(&self) -> Result<(), Error> {
97        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
98        let _guard = self.client.locks().backup_modify_lock.lock().await;
99
100        self.set_state(BackupState::Creating);
101
102        // Create a future so we can catch errors and go back to the `Unknown`
103        // state. This is a hack to get around the lack of `try` blocks in Rust.
104        let future = async {
105            let olm_machine = self.client.olm_machine().await;
106            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
107
108            // Create a new backup recovery key.
109            let decryption_key = BackupDecryptionKey::new().expect(
110                "We should be able to generate enough randomness to create a new backup recovery \
111                 key",
112            );
113
114            // Get the info about the new backup key, this needs to be uploaded to the
115            // homeserver[1].
116            //
117            // We need to sign the `RoomKeyBackupInfo` so other clients which might want
118            // to start using the backup without having access to the
119            // `BackupDecryptionKey` can do so, as per [spec]:
120            //
121            // Clients must only store keys in backups after they have ensured that the
122            // `auth_data` has not been tampered with. This can be done either by:
123            //
124            //  * checking that it is signed by the user's master cross-signing key or by a
125            //    verified device belonging to the same user, or
126            //  * by deriving the public key from a private key that it obtained from a
127            //    trusted source. Trusted sources for the private key include the user
128            //    entering the key, retrieving the key stored in secret storage, or
129            //    obtaining the key via secret sharing from a verified device belonging to
130            //    the same user.
131            //
132            //
133            // [1]: https://spec.matrix.org/v1.8/client-server-api/#post_matrixclientv3room_keysversion
134            // [spec]: https://spec.matrix.org/v1.8/client-server-api/#server-side-key-backups
135            let mut backup_info = decryption_key.to_backup_info();
136
137            if let Err(e) = olm_machine.backup_machine().sign_backup(&mut backup_info).await {
138                warn!("Unable to sign the newly created backup version: {e:?}");
139            }
140
141            let algorithm = Raw::new(&backup_info)?.cast();
142            let request = create_backup_version::v3::Request::new(algorithm);
143            let response = self.client.send(request).await?;
144            let version = response.version;
145
146            // Reset any state we might have had before the new backup was created.
147            // TODO: This should remove the old stored key and version.
148            olm_machine.backup_machine().disable_backup().await?;
149
150            let backup_key = decryption_key.megolm_v1_public_key();
151
152            // Save the newly created keys and the version we received from the server.
153            olm_machine
154                .backup_machine()
155                .save_decryption_key(Some(decryption_key), Some(version.to_owned()))
156                .await?;
157
158            // Enable the backup and start the upload of room keys.
159            self.enable(olm_machine, backup_key, version).await?;
160
161            Ok(())
162        };
163
164        let result = future.await;
165
166        if result.is_err() {
167            self.set_state(BackupState::Unknown);
168        }
169
170        result
171    }
172
173    /// Disable and delete the currently active backup only if previously
174    /// enabled before, otherwise an error will be returned.
175    ///
176    /// For a more aggressive variant see [`Backups::disable_and_delete`] which
177    /// will delete the remote backup without checking the local state.
178    ///
179    /// # Examples
180    ///
181    /// ```no_run
182    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
183    /// # use url::Url;
184    /// # async {
185    /// # let homeserver = Url::parse("http://example.com")?;
186    /// # let client = Client::new(homeserver).await?;
187    /// let backups = client.encryption().backups();
188    /// backups.disable().await?;
189    ///
190    /// assert_eq!(backups.state(), BackupState::Unknown);
191    /// # anyhow::Ok(()) };
192    /// ```
193    #[instrument(skip_all, fields(version))]
194    pub async fn disable(&self) -> Result<(), Error> {
195        let _guard = self.client.locks().backup_modify_lock.lock().await;
196
197        self.set_state(BackupState::Disabling);
198
199        // Create a future so we can catch errors and go back to the `Unknown` state.
200        let future = async {
201            let olm_machine = self.client.olm_machine().await;
202            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
203
204            let backup_keys = olm_machine.backup_machine().get_backup_keys().await?;
205
206            if let Some(version) = backup_keys.backup_version {
207                Span::current().record("version", &version);
208                info!("Deleting and disabling backup");
209
210                self.delete_backup_from_server(version).await?;
211                info!("Backup successfully deleted");
212
213                olm_machine.backup_machine().disable_backup().await?;
214
215                info!("Backup successfully disabled and deleted");
216
217                Ok(())
218            } else {
219                info!("Backup is not enabled, can't disable it");
220                Err(Error::BackupNotEnabled)
221            }
222        };
223
224        let result = future.await;
225
226        self.set_state(BackupState::Unknown);
227
228        result
229    }
230
231    /// Completely disable and delete the active backup both locally
232    /// and from the backend no matter if previously setup locally
233    /// or not.
234    ///
235    /// ⚠️ This method is mainly used when resetting the crypto identity
236    /// and for most other use cases its safer [`Backups::disable`] counterpart
237    /// should be used.
238    ///
239    /// It will fetch the current backup version from the backend and delete it
240    /// before proceeding to disabling local backups as well
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
246    /// # use url::Url;
247    /// # async {
248    /// # let homeserver = Url::parse("http://example.com")?;
249    /// # let client = Client::new(homeserver).await?;
250    /// let backups = client.encryption().backups();
251    /// backups.disable_and_delete().await?;
252    ///
253    /// assert_eq!(backups.state(), BackupState::Unknown);
254    /// # anyhow::Ok(()) };
255    /// ```
256    pub async fn disable_and_delete(&self) -> Result<(), Error> {
257        let _guard = self.client.locks().backup_modify_lock.lock().await;
258
259        self.set_state(BackupState::Disabling);
260
261        // Create a future so we can catch errors and go back to the `Unknown` state.
262        let future = async {
263            let response = self.get_current_version().await?;
264
265            if let Some(response) = response {
266                self.delete_backup_from_server(response.version).await?;
267            }
268
269            let olm_machine = self.client.olm_machine().await;
270            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
271
272            olm_machine.backup_machine().disable_backup().await?;
273
274            Ok(())
275        };
276
277        let result = future.await;
278
279        self.set_state(BackupState::Unknown);
280
281        result
282    }
283
284    /// Returns a future to wait for room keys to be uploaded.
285    ///
286    /// Awaiting the future will wake up a task to upload room keys which have
287    /// not yet been uploaded to the homeserver. It will then wait for the task
288    /// to finish uploading.
289    ///
290    /// # Examples
291    ///
292    /// ```no_run
293    /// # use matrix_sdk::{Client, encryption::backups::UploadState};
294    /// # use url::Url;
295    /// # async {
296    /// # let homeserver = Url::parse("http://example.com")?;
297    /// # let client = Client::new(homeserver).await?;
298    /// use futures_util::StreamExt;
299    ///
300    /// let backups = client.encryption().backups();
301    /// let wait_for_steady_state = backups.wait_for_steady_state();
302    ///
303    /// let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
304    ///
305    /// tokio::spawn(async move {
306    ///     while let Some(update) = progress_stream.next().await {
307    ///         let Ok(update) = update else { break };
308    ///
309    ///         match update {
310    ///             UploadState::Uploading(counts) => {
311    ///                 println!(
312    ///                     "Uploaded {} out of {} room keys.",
313    ///                     counts.backed_up, counts.total
314    ///                 );
315    ///             }
316    ///             UploadState::Error => break,
317    ///             UploadState::Done => break,
318    ///             _ => (),
319    ///         }
320    ///     }
321    /// });
322    ///
323    /// wait_for_steady_state.await?;
324    ///
325    /// # anyhow::Ok(()) };
326    /// ```
327    pub fn wait_for_steady_state(&self) -> WaitForSteadyState<'_> {
328        WaitForSteadyState {
329            backups: self,
330            progress: self.client.inner.e2ee.backup_state.upload_progress.clone(),
331            timeout: None,
332        }
333    }
334
335    /// Get a stream of updates to the [`BackupState`].
336    ///
337    /// This method will send out the current state as the first update.
338    ///
339    /// # Examples
340    ///
341    /// ```no_run
342    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
343    /// # use url::Url;
344    /// # async {
345    /// # let homeserver = Url::parse("http://example.com")?;
346    /// # let client = Client::new(homeserver).await?;
347    /// use futures_util::StreamExt;
348    ///
349    /// let backups = client.encryption().backups();
350    ///
351    /// let mut state_stream = backups.state_stream();
352    ///
353    /// while let Some(update) = state_stream.next().await {
354    ///     let Ok(update) = update else { break };
355    ///
356    ///     match update {
357    ///         BackupState::Enabled => {
358    ///             println!("Backups have been enabled");
359    ///         }
360    ///         _ => (),
361    ///     }
362    /// }
363    /// # anyhow::Ok(()) };
364    /// ```
365    pub fn state_stream(
366        &self,
367    ) -> impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>> + use<> {
368        self.client.inner.e2ee.backup_state.global_state.subscribe()
369    }
370
371    /// Get the current [`BackupState`] for this [`Client`].
372    pub fn state(&self) -> BackupState {
373        self.client.inner.e2ee.backup_state.global_state.get()
374    }
375
376    /// Are backups enabled for the current [`Client`]?
377    ///
378    /// This method will check if we locally have an active backup key and
379    /// backup version and are ready to upload room keys to a backup.
380    pub async fn are_enabled(&self) -> bool {
381        let olm_machine = self.client.olm_machine().await;
382
383        if let Some(machine) = olm_machine.as_ref() {
384            machine.backup_machine().enabled().await
385        } else {
386            false
387        }
388    }
389
390    /// Does a backup exist on the server?
391    ///
392    /// This method will request info about the current backup from the
393    /// homeserver and if a backup exists return `true`, otherwise `false`.
394    pub async fn fetch_exists_on_server(&self) -> Result<bool, Error> {
395        let exists_on_server = self.get_current_version().await?.is_some();
396        self.client.inner.e2ee.backup_state.set_backup_exists_on_server(exists_on_server);
397        Ok(exists_on_server)
398    }
399
400    /// Does a backup exist on the server?
401    ///
402    /// This method is identical to [`Self::fetch_exists_on_server`] except that
403    /// we cache the latest answer in memory and only empty the cache if the
404    /// local device adds or deletes a backup itself.
405    ///
406    /// Do not use this method if you need an accurate answer about whether a
407    /// backup exists - instead use [`Self::fetch_exists_on_server`]. This
408    /// method is useful when performance is more important than guaranteed
409    /// accuracy, such as when classifying UTDs.
410    pub async fn exists_on_server(&self) -> Result<bool, Error> {
411        // If we have an answer cached, return it immediately
412        if let Some(cached_value) = self.client.inner.e2ee.backup_state.backup_exists_on_server() {
413            return Ok(cached_value);
414        }
415
416        // Otherwise, delegate to fetch_exists_on_server. (It will update the cached
417        // value for us.)
418        self.fetch_exists_on_server().await
419    }
420
421    /// Subscribe to a stream that notifies when a room key for the specified
422    /// room is downloaded from the key backup.
423    pub fn room_keys_for_room_stream(
424        &self,
425        room_id: &RoomId,
426    ) -> impl Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>> + use<>
427    {
428        let room_id = room_id.to_owned();
429
430        // TODO: This is a bit crap to say the least. The type is
431        // non-descriptive and doesn't even contain all the important data. It
432        // should be a stream of `RoomKeyInfo` like the OlmMachine has... But on
433        // the other hand we should just be able to use the corresponding
434        // OlmMachine stream and remove this. Currently we can't do this because
435        // the OlmMachine gets destroyed and recreated all the time to be able
436        // to support the notifications-related multiprocessing on iOS.
437        self.room_keys_stream().filter_map(move |import_result| {
438            let room_id = room_id.to_owned();
439
440            async move {
441                match import_result {
442                    Ok(mut import_result) => import_result.keys.remove(&room_id).map(Ok),
443                    Err(e) => Some(Err(e)),
444                }
445            }
446        })
447    }
448
449    /// Download all room keys for a certain room from the server-side key
450    /// backup.
451    pub async fn download_room_keys_for_room(&self, room_id: &RoomId) -> Result<(), Error> {
452        let olm_machine = self.client.olm_machine().await;
453        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
454
455        let backup_keys = olm_machine.store().load_backup_keys().await?;
456
457        if let Some(decryption_key) = backup_keys.decryption_key
458            && let Some(version) = backup_keys.backup_version
459        {
460            let request =
461                get_backup_keys_for_room::v3::Request::new(version.clone(), room_id.to_owned());
462            let response = self.client.send(request).await?;
463
464            // Transform response to standard format (map of room ID -> room key).
465            let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
466                room_id.to_owned(),
467                RoomKeyBackup::new(response.sessions),
468            )]));
469
470            self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
471                .await?;
472        }
473
474        Ok(())
475    }
476
477    /// Download a single room key from the server-side key backup.
478    ///
479    /// Returns `true` if we managed to download a room key, `false` or an error
480    /// if we failed to download it. `false` indicates that there was no
481    /// error, we just don't have backups enabled so we can't download a
482    /// room key.
483    pub async fn download_room_key(
484        &self,
485        room_id: &RoomId,
486        session_id: &str,
487    ) -> Result<bool, Error> {
488        let olm_machine = self.client.olm_machine().await;
489        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
490
491        let backup_keys = olm_machine.store().load_backup_keys().await?;
492
493        if let Some(decryption_key) = backup_keys.decryption_key {
494            if let Some(version) = backup_keys.backup_version {
495                let request = get_backup_keys_for_session::v3::Request::new(
496                    version.clone(),
497                    room_id.to_owned(),
498                    session_id.to_owned(),
499                );
500                let response = self.client.send(request).await?;
501
502                // Transform response to standard format (map of room ID -> room key).
503                let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
504                    room_id.to_owned(),
505                    RoomKeyBackup::new(BTreeMap::from([(
506                        session_id.to_owned(),
507                        response.key_data,
508                    )])),
509                )]));
510
511                self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
512                    .await?;
513
514                Ok(true)
515            } else {
516                Ok(false)
517            }
518        } else {
519            Ok(false)
520        }
521    }
522
523    /// Set the state of the backup.
524    fn set_state(&self, new_state: BackupState) {
525        let old_state = self.client.inner.e2ee.backup_state.global_state.set(new_state);
526
527        if old_state != new_state {
528            info!("Backup state changed from {old_state:?} to {new_state:?}");
529        }
530    }
531
532    /// Set the backup state to the `Enabled` variant and insert the backup key
533    /// and version into the [`OlmMachine`].
534    async fn enable(
535        &self,
536        olm_machine: &OlmMachine,
537        backup_key: MegolmV1BackupKey,
538        version: String,
539    ) -> Result<(), Error> {
540        backup_key.set_version(version);
541        olm_machine.backup_machine().enable_backup_v1(backup_key).await?;
542
543        self.set_state(BackupState::Enabled);
544
545        Ok(())
546    }
547
548    /// Decrypt and forward a response containing backed up room keys to the
549    /// [`OlmMachine`].
550    async fn handle_downloaded_room_keys(
551        &self,
552        backed_up_keys: get_backup_keys::v3::Response,
553        backup_decryption_key: BackupDecryptionKey,
554        backup_version: &str,
555        olm_machine: &OlmMachine,
556    ) -> Result<(), Error> {
557        let mut decrypted_room_keys: Vec<_> = Vec::new();
558
559        for (room_id, room_keys) in backed_up_keys.rooms {
560            for (session_id, room_key) in room_keys.sessions {
561                let room_key = match room_key.deserialize() {
562                    Ok(k) => k,
563                    Err(e) => {
564                        warn!(
565                            "Couldn't deserialize a room key we downloaded from backups, session \
566                             ID: {session_id}, error: {e:?}"
567                        );
568                        continue;
569                    }
570                };
571
572                let room_key =
573                    match backup_decryption_key.decrypt_session_data(room_key.session_data) {
574                        Ok(k) => k,
575                        Err(e) => {
576                            warn!(
577                                "Couldn't decrypt a room key we downloaded from backups, session \
578                                 ID: {session_id}, error: {e:?}"
579                            );
580                            continue;
581                        }
582                    };
583
584                decrypted_room_keys.push(ExportedRoomKey::from_backed_up_room_key(
585                    room_id.to_owned(),
586                    session_id,
587                    room_key,
588                ));
589            }
590        }
591
592        let result = olm_machine
593            .store()
594            .import_room_keys(decrypted_room_keys, Some(backup_version), |_, _| {})
595            .await?;
596
597        // Since we can't use the usual room keys stream from the `OlmMachine`
598        // we're going to send things out in our own custom broadcaster.
599        let _ = self.client.inner.e2ee.backup_state.room_keys_broadcaster.send(result);
600
601        Ok(())
602    }
603
604    /// Download all room keys from the backup on the homeserver.
605    async fn download_all_room_keys(
606        &self,
607        decryption_key: BackupDecryptionKey,
608        version: String,
609    ) -> Result<(), Error> {
610        let request = get_backup_keys::v3::Request::new(version.clone());
611        let response = self.client.send(request).await?;
612
613        let olm_machine = self.client.olm_machine().await;
614        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
615
616        self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine).await?;
617
618        Ok(())
619    }
620
621    fn room_keys_stream(
622        &self,
623    ) -> impl Stream<Item = Result<RoomKeyImportResult, BroadcastStreamRecvError>> + use<> {
624        BroadcastStream::new(self.client.inner.e2ee.backup_state.room_keys_broadcaster.subscribe())
625    }
626
627    /// Get info about the currently active backup from the server.
628    async fn get_current_version(
629        &self,
630    ) -> Result<Option<get_latest_backup_info::v3::Response>, Error> {
631        let request = get_latest_backup_info::v3::Request::new();
632
633        match self.client.send(request).await {
634            Ok(r) => Ok(Some(r)),
635            Err(e) => {
636                if let Some(kind) = e.client_api_error_kind() {
637                    if kind == &ErrorKind::NotFound { Ok(None) } else { Err(e.into()) }
638                } else {
639                    Err(e.into())
640                }
641            }
642        }
643    }
644
645    async fn delete_backup_from_server(&self, version: String) -> Result<(), Error> {
646        let request = ruma::api::client::backup::delete_backup_version::v3::Request::new(version);
647
648        let ret = match self.client.send(request).await {
649            Ok(_) => Ok(()),
650            Err(e) => {
651                if let Some(kind) = e.client_api_error_kind() {
652                    if kind == &ErrorKind::NotFound { Ok(()) } else { Err(e.into()) }
653                } else {
654                    Err(e.into())
655                }
656            }
657        };
658
659        // If the request succeeded, the backup is gone. If it failed, we are not really
660        // sure what the backup state is. Either way, clear the cache so we check next
661        // time we need to know.
662        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
663
664        ret
665    }
666
667    #[instrument(skip(self, olm_machine, request))]
668    async fn send_backup_request(
669        &self,
670        olm_machine: &OlmMachine,
671        request_id: &TransactionId,
672        request: KeysBackupRequest,
673    ) -> Result<(), Error> {
674        trace!("Uploading some room keys");
675
676        let add_backup_keys = add_backup_keys::v3::Request::new(request.version, request.rooms);
677
678        match self.client.send(add_backup_keys).await {
679            Ok(response) => {
680                olm_machine.mark_request_as_sent(request_id, &response).await?;
681
682                let new_counts = olm_machine.backup_machine().room_key_counts().await?;
683
684                self.client
685                    .inner
686                    .e2ee
687                    .backup_state
688                    .upload_progress
689                    .set(UploadState::Uploading(new_counts));
690
691                let delay =
692                    self.client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned();
693                crate::sleep::sleep(delay).await;
694
695                Ok(())
696            }
697            Err(error) => {
698                if let Some(kind) = error.client_api_error_kind() {
699                    match kind {
700                        ErrorKind::NotFound => {
701                            warn!(
702                                "No backup found on the server, the backup likely got deleted, \
703                                 disabling backups."
704                            );
705
706                            self.handle_deleted_backup_version(olm_machine).await?;
707                        }
708                        ErrorKind::WrongRoomKeysVersion { current_version } => {
709                            warn!(
710                                new_version = current_version,
711                                "A new backup version was found on the server, disabling backups."
712                            );
713
714                            // TODO: If we're verified and there are other devices besides us,
715                            // request the new backup key over `m.secret.send`.
716
717                            self.handle_deleted_backup_version(olm_machine).await?;
718                        }
719
720                        _ => (),
721                    }
722                }
723
724                Err(error.into())
725            }
726        }
727    }
728
729    /// Poll the [`OlmMachine`] for room keys which need to be backed up and
730    /// send out the request to the homeserver.
731    ///
732    /// This should only be called by the [`BackupUploadingTask`].
733    ///
734    /// [`BackupUploadingTask`]: crate::client::tasks::BackupUploadingTask
735    pub(crate) async fn backup_room_keys(&self) -> Result<(), Error> {
736        let _guard = self.client.locks().backup_upload_lock.lock().await;
737
738        let olm_machine = self.client.olm_machine().await;
739        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
740
741        while let Some((request_id, request)) = olm_machine.backup_machine().backup().await? {
742            self.send_backup_request(olm_machine, &request_id, request).await?;
743        }
744
745        self.client.inner.e2ee.backup_state.upload_progress.set(UploadState::Done);
746
747        Ok(())
748    }
749
750    /// Set up a `m.secret.send` listener and re-enable backups if we have a
751    /// backup recovery key stored.
752    pub(crate) async fn setup_and_resume(&self) -> Result<(), Error> {
753        info!("Setting up secret listeners and trying to resume backups");
754
755        self.client.add_event_handler(Self::secret_send_event_handler);
756
757        if self.client.inner.e2ee.encryption_settings.backup_download_strategy
758            == BackupDownloadStrategy::AfterDecryptionFailure
759        {
760            self.client.add_event_handler(Self::utd_event_handler);
761        }
762
763        self.maybe_resume_backups().await?;
764
765        Ok(())
766    }
767
768    /// Try to enable backups with the given backup recovery key.
769    ///
770    /// This should be called if we receive a backup recovery, either:
771    ///
772    /// * As an `m.secret.send` to-device message from a trusted device.
773    /// * From 4S (i.e. from the `m.megolm_backup.v1` event global account
774    ///   data).
775    ///
776    /// In both cases the method will compare the currently active backup
777    /// version to the backup recovery key's version and, if there is a match,
778    /// activate backups on this device and start uploading room keys to the
779    /// backup.
780    ///
781    /// Returns true if backups were just enabled or were already enabled,
782    /// otherwise false.
783    #[instrument(skip_all)]
784    pub(crate) async fn maybe_enable_backups(
785        &self,
786        maybe_recovery_key: &str,
787    ) -> Result<bool, Error> {
788        let _guard = self.client.locks().backup_modify_lock.lock().await;
789
790        // Create a future here which allows us to catch any failure that might happen
791        // so we can later on fall back to the correct `BackupState`.
792        let future = async {
793            self.set_state(BackupState::Enabling);
794
795            let olm_machine = self.client.olm_machine().await;
796            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
797            let backup_machine = olm_machine.backup_machine();
798
799            let decryption_key =
800                BackupDecryptionKey::from_base64(maybe_recovery_key).map_err(|e| {
801                    <serde_json::Error as serde::de::Error>::custom(format!(
802                        "Couldn't deserialize the backup recovery key: {e:?}"
803                    ))
804                })?;
805
806            // Let's try to see if there's a backup on the homeserver.
807            let current_version = self.get_current_version().await?;
808
809            let Some(current_version) = current_version else {
810                warn!("Tried to enable backups, but no backup version was found on the server.");
811                return Ok(false);
812            };
813
814            Span::current().record("backup_version", &current_version.version);
815
816            let backup_info: RoomKeyBackupInfo = current_version.algorithm.deserialize_as()?;
817            let stored_keys = backup_machine.get_backup_keys().await?;
818
819            if stored_keys.backup_version.as_ref() == Some(&current_version.version)
820                && self.are_enabled().await
821            {
822                // If we already have a backup enabled which is using the currently active
823                // backup version, do nothing but tell the caller using the return value that
824                // backups are enabled.
825                Ok(true)
826            } else if decryption_key.backup_key_matches(&backup_info) {
827                info!(
828                    "We have found the correct backup recovery key. Storing the backup recovery \
829                     key and enabling backups."
830                );
831
832                // We're enabling a new backup, reset the `backed_up` flags on the room keys and
833                // remove any key/version we might have.
834                backup_machine.disable_backup().await?;
835
836                let backup_key = decryption_key.megolm_v1_public_key();
837                backup_key.set_version(current_version.version.to_owned());
838
839                // Persist the new keys and enable the backup.
840                backup_machine
841                    .save_decryption_key(
842                        Some(decryption_key.to_owned()),
843                        Some(current_version.version.to_owned()),
844                    )
845                    .await?;
846                backup_machine.enable_backup_v1(backup_key).await?;
847
848                // If the user has set up the client to download any room keys, do so now. This
849                // is not really useful in a real scenario since the API to
850                // download room keys is not paginated.
851                //
852                // You need to download all room keys at once, parse a potentially huge JSON
853                // response and decrypt all the room keys found in the backup.
854                //
855                // This doesn't work for any sizeable account.
856                if self.client.inner.e2ee.encryption_settings.backup_download_strategy
857                    == BackupDownloadStrategy::OneShot
858                {
859                    self.set_state(BackupState::Downloading);
860
861                    if let Err(e) =
862                        self.download_all_room_keys(decryption_key, current_version.version).await
863                    {
864                        warn!("Couldn't automatically download all room keys from backup: {e:?}");
865                    }
866                }
867
868                // Trigger the upload of any room keys we might need to upload.
869                self.maybe_trigger_backup();
870
871                Ok(true)
872            } else {
873                let derived_key = decryption_key.megolm_v1_public_key();
874                let downloaded_key = current_version.algorithm;
875
876                warn!(
877                    ?derived_key,
878                    ?downloaded_key,
879                    "Found an active backup but the recovery key we received isn't the one used for \
880                     this backup version"
881                );
882
883                Ok(false)
884            }
885        };
886
887        match future.await {
888            Ok(enabled) => {
889                if enabled {
890                    self.set_state(BackupState::Enabled);
891                } else {
892                    self.set_state(BackupState::Unknown);
893                }
894
895                Ok(enabled)
896            }
897            Err(e) => {
898                self.set_state(BackupState::Unknown);
899
900                Err(e)
901            }
902        }
903    }
904
905    /// Try to resume backups from a backup recovery key we have found in the
906    /// crypto store.
907    ///
908    /// Returns true if backups have been resumed, false otherwise.
909    async fn resume_backup_from_stored_backup_key(
910        &self,
911        olm_machine: &OlmMachine,
912    ) -> Result<bool, Error> {
913        let backup_keys = olm_machine.store().load_backup_keys().await?;
914
915        if let Some(decryption_key) = backup_keys.decryption_key {
916            if let Some(version) = backup_keys.backup_version {
917                let backup_key = decryption_key.megolm_v1_public_key();
918
919                self.enable(olm_machine, backup_key, version).await?;
920
921                Ok(true)
922            } else {
923                Ok(false)
924            }
925        } else {
926            Ok(false)
927        }
928    }
929
930    /// Try to resume backups by iterating through the `m.secret.send` to-device
931    /// messages the [`OlmMachine`] has received and stored in the secret inbox.
932    async fn maybe_resume_from_secret_inbox(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
933        let secrets = olm_machine.store().get_secrets_from_inbox(&SecretName::RecoveryKey).await?;
934
935        for secret in secrets {
936            if self.maybe_enable_backups(&secret.event.content.secret).await? {
937                break;
938            }
939        }
940
941        olm_machine.store().delete_secrets_from_inbox(&SecretName::RecoveryKey).await?;
942
943        Ok(())
944    }
945
946    /// Check and re-enable a backup if we have a backup recovery key locally.
947    async fn maybe_resume_backups(&self) -> Result<(), Error> {
948        let olm_machine = self.client.olm_machine().await;
949        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
950
951        // Let us first check if we have a stored backup recovery key and a backup
952        // version.
953        if !self.resume_backup_from_stored_backup_key(olm_machine).await? {
954            // We didn't manage to enable backups from a stored backup recovery key, let us
955            // check our secret inbox. Perhaps we can find a valid key there.
956            self.maybe_resume_from_secret_inbox(olm_machine).await?;
957        }
958
959        Ok(())
960    }
961
962    /// Listen for `m.secret.send` to-device messages and check the secret inbox
963    /// if we do receive one.
964    #[instrument(skip_all)]
965    pub(crate) async fn secret_send_event_handler(_: ToDeviceSecretSendEvent, client: Client) {
966        let olm_machine = client.olm_machine().await;
967
968        // TODO: Because of our crude multi-process support, which reloads the whole
969        // [`OlmMachine`] the `secrets_stream` might stop giving you updates. Once
970        // that's fixed, stop listening to individual secret send events and
971        // listen to the secrets stream.
972        if let Some(olm_machine) = olm_machine.as_ref() {
973            if let Err(e) =
974                client.encryption().backups().maybe_resume_from_secret_inbox(olm_machine).await
975            {
976                error!("Could not handle `m.secret.send` event: {e:?}");
977            }
978        } else {
979            error!("Tried to handle a `m.secret.send` event but no OlmMachine was initialized");
980        }
981    }
982
983    /// Handle UTD events by triggering download from key backup.
984    ///
985    /// This function is registered as an event handler; it exists to deal
986    /// with cases where [`Room::decrypt_event`] is not called and instead the
987    /// event should be decrypted by the time this crate sees the event, such as
988    /// for events received via `/sync` (as opposed to via `/messages`,
989    /// `/context`, etc.)
990    #[allow(clippy::unused_async)] // Because it's used as an event handler, which must be async.
991    pub(crate) async fn utd_event_handler(
992        event: Raw<OriginalSyncRoomEncryptedEvent>,
993        room: Room,
994        client: Client,
995    ) {
996        client.encryption().backups().maybe_download_room_key(room.room_id().to_owned(), event);
997    }
998
999    /// Send a notification to the task responsible for key backup downloads
1000    /// that it should attempt to download the keys for the given event.
1001    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1002    pub(crate) fn maybe_download_room_key(
1003        &self,
1004        room_id: OwnedRoomId,
1005        event: Raw<OriginalSyncRoomEncryptedEvent>,
1006    ) {
1007        let tasks = self.client.inner.e2ee.tasks.lock();
1008        if let Some(task) = tasks.download_room_keys.as_ref() {
1009            task.trigger_download_for_utd_event(room_id, event);
1010        }
1011    }
1012
1013    /// Send a notification to the task responsible for key backup downloads
1014    /// that it should attempt to download the keys for the given event.
1015    #[cfg(feature = "experimental-encrypted-state-events")]
1016    pub(crate) fn maybe_download_room_key<T: JsonCastable<EncryptedEvent>>(
1017        &self,
1018        room_id: OwnedRoomId,
1019        event: Raw<T>,
1020    ) {
1021        let tasks = self.client.inner.e2ee.tasks.lock();
1022        if let Some(task) = tasks.download_room_keys.as_ref() {
1023            task.trigger_download_for_utd_event(room_id, event);
1024        }
1025    }
1026
1027    /// Send a notification to the task which is responsible for uploading room
1028    /// keys to the backup that it might have new room keys to back up.
1029    pub(crate) fn maybe_trigger_backup(&self) {
1030        let tasks = self.client.inner.e2ee.tasks.lock();
1031
1032        if let Some(tasks) = tasks.upload_room_keys.as_ref() {
1033            tasks.trigger_upload();
1034        }
1035    }
1036
1037    /// Disable our backups locally if we notice that the backup has been
1038    /// removed on the homeserver.
1039    async fn handle_deleted_backup_version(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
1040        olm_machine.backup_machine().disable_backup().await?;
1041        self.set_state(BackupState::Unknown);
1042
1043        Ok(())
1044    }
1045}
1046
1047#[cfg(all(test, not(target_family = "wasm")))]
1048mod test {
1049    use std::time::Duration;
1050
1051    use matrix_sdk_test::async_test;
1052    use serde_json::json;
1053    use wiremock::{
1054        Mock, MockServer, ResponseTemplate,
1055        matchers::{header, method, path},
1056    };
1057
1058    use super::*;
1059    use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};
1060
1061    fn room_key() -> ExportedRoomKey {
1062        let json = json!({
1063            "algorithm": "m.megolm.v1.aes-sha2",
1064            "room_id": "!DovneieKSTkdHKpIXy:morpheus.localhost",
1065            "sender_key": "DeHIg4gwhClxzFYcmNntPNF9YtsdZbmMy8+3kzCMXHA",
1066            "session_id": "gM8i47Xhu0q52xLfgUXzanCMpLinoyVyH7R58cBuVBU",
1067            "session_key": "AQAAAABvWMNZjKFtebYIePKieQguozuoLgzeY6wKcyJjLJcJtQgy1dPqTBD12U+XrYLrRHn\
1068                            lKmxoozlhFqJl456+9hlHCL+yq+6ScFuBHtJepnY1l2bdLb4T0JMDkNsNErkiLiLnD6yp3J\
1069                            DSjIhkdHxmup/huygrmroq6/L5TaThEoqvW4DPIuO14btKudsS34FF82pwjKS4p6Mlch+0e\
1070                            fHAblQV",
1071            "sender_claimed_keys":{},
1072            "forwarding_curve25519_key_chain":[]
1073        });
1074
1075        serde_json::from_value(json)
1076            .expect("We should be able to deserialize our exported room key")
1077    }
1078
1079    async fn backup_disabling_test_body(
1080        client: &Client,
1081        server: &MockServer,
1082        put_response: ResponseTemplate,
1083    ) {
1084        let _post_scope = Mock::given(method("POST"))
1085            .and(path("_matrix/client/unstable/room_keys/version"))
1086            .and(header("authorization", "Bearer 1234"))
1087            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1088              "version": "1"
1089            })))
1090            .expect(1)
1091            .named("POST for the backup creation")
1092            .mount_as_scoped(server)
1093            .await;
1094
1095        let _put_scope = Mock::given(method("PUT"))
1096            .and(path("_matrix/client/unstable/room_keys/keys"))
1097            .and(header("authorization", "Bearer 1234"))
1098            .respond_with(put_response)
1099            .expect(1)
1100            .named("POST for the backup creation")
1101            .mount_as_scoped(server)
1102            .await;
1103
1104        client
1105            .encryption()
1106            .backups()
1107            .create()
1108            .await
1109            .expect("We should be able to create a new backup");
1110
1111        assert_eq!(client.encryption().backups().state(), BackupState::Enabled);
1112
1113        client
1114            .encryption()
1115            .backups()
1116            .backup_room_keys()
1117            .await
1118            .expect_err("Backups should be disabled");
1119
1120        assert_eq!(client.encryption().backups().state(), BackupState::Unknown);
1121    }
1122
1123    #[async_test]
1124    async fn test_backup_disabling_after_remote_deletion() {
1125        let server = MockServer::start().await;
1126        let client = logged_in_client(Some(server.uri())).await;
1127
1128        {
1129            let machine = client.olm_machine().await;
1130            machine
1131                .as_ref()
1132                .unwrap()
1133                .store()
1134                .import_exported_room_keys(vec![room_key()], |_, _| {})
1135                .await
1136                .expect("We should be able to import a room key");
1137        }
1138
1139        backup_disabling_test_body(
1140            &client,
1141            &server,
1142            ResponseTemplate::new(404).set_body_json(json!({
1143                "errcode": "M_NOT_FOUND",
1144                "error": "Unknown backup version"
1145            })),
1146        )
1147        .await;
1148
1149        backup_disabling_test_body(
1150            &client,
1151            &server,
1152            ResponseTemplate::new(403).set_body_json(json!({
1153                "current_version": "42",
1154                "errcode": "M_WRONG_ROOM_KEYS_VERSION",
1155                "error": "Wrong backup version."
1156            })),
1157        )
1158        .await;
1159
1160        server.verify().await;
1161    }
1162
1163    #[async_test]
1164    async fn test_when_a_backup_exists_then_fetch_exists_on_server_returns_true() {
1165        let server = MatrixMockServer::new().await;
1166        let client = server.client_builder().build().await;
1167
1168        server.mock_room_keys_version().exists().expect(1).mount().await;
1169
1170        let exists = client
1171            .encryption()
1172            .backups()
1173            .fetch_exists_on_server()
1174            .await
1175            .expect("We should be able to check if backups exist on the server");
1176
1177        assert!(exists, "We should deduce that a backup exists on the server");
1178    }
1179
1180    #[async_test]
1181    async fn test_repeated_calls_to_fetch_exists_on_server_makes_repeated_requests() {
1182        let server = MatrixMockServer::new().await;
1183        let client = server.client_builder().build().await;
1184
1185        // Expect 2 requests to the server
1186        server.mock_room_keys_version().exists().expect(2).mount().await;
1187
1188        let backups = client.encryption().backups();
1189
1190        // Call fetch_exists_on_server twice
1191        backups.fetch_exists_on_server().await.unwrap();
1192        let exists = backups.fetch_exists_on_server().await.unwrap();
1193
1194        assert!(exists, "We should deduce that a backup exists on the server");
1195    }
1196
1197    #[async_test]
1198    async fn test_when_no_backup_exists_then_fetch_exists_on_server_returns_false() {
1199        let server = MatrixMockServer::new().await;
1200        let client = server.client_builder().build().await;
1201
1202        server.mock_room_keys_version().none().expect(1).mount().await;
1203
1204        let exists = client
1205            .encryption()
1206            .backups()
1207            .fetch_exists_on_server()
1208            .await
1209            .expect("We should be able to check if backups exist on the server");
1210
1211        assert!(!exists, "We should deduce that no backup exists on the server");
1212    }
1213
1214    #[async_test]
1215    async fn test_when_server_returns_an_error_then_fetch_exists_on_server_returns_an_error() {
1216        let server = MatrixMockServer::new().await;
1217        let client = server.client_builder().build().await;
1218
1219        {
1220            let _scope =
1221                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1222
1223            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1224                "If the /version endpoint returns a non 404 error we should throw an error",
1225            );
1226        }
1227
1228        {
1229            let _scope =
1230                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1231
1232            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1233                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1234            );
1235        }
1236    }
1237
1238    #[async_test]
1239    async fn test_when_a_backup_exists_then_exists_on_server_returns_true() {
1240        let server = MatrixMockServer::new().await;
1241        let client = server.client_builder().build().await;
1242
1243        server.mock_room_keys_version().exists().expect(1).mount().await;
1244
1245        let exists = client
1246            .encryption()
1247            .backups()
1248            .exists_on_server()
1249            .await
1250            .expect("We should be able to check if backups exist on the server");
1251
1252        assert!(exists, "We should deduce that a backup exists on the server");
1253    }
1254
1255    #[async_test]
1256    async fn test_when_no_backup_exists_then_exists_on_server_returns_false() {
1257        let server = MatrixMockServer::new().await;
1258        let client = server.client_builder().build().await;
1259
1260        server.mock_room_keys_version().none().expect(1).mount().await;
1261
1262        let exists = client
1263            .encryption()
1264            .backups()
1265            .exists_on_server()
1266            .await
1267            .expect("We should be able to check if backups exist on the server");
1268
1269        assert!(!exists, "We should deduce that no backup exists on the server");
1270    }
1271
1272    #[async_test]
1273    async fn test_when_server_returns_an_error_then_exists_on_server_returns_an_error() {
1274        let server = MatrixMockServer::new().await;
1275        let client = server.client_builder().build().await;
1276
1277        {
1278            let _scope =
1279                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1280
1281            client.encryption().backups().exists_on_server().await.expect_err(
1282                "If the /version endpoint returns a non 404 error we should throw an error",
1283            );
1284        }
1285
1286        {
1287            let _scope =
1288                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1289
1290            client.encryption().backups().exists_on_server().await.expect_err(
1291                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1292            );
1293        }
1294    }
1295
1296    #[async_test]
1297    async fn test_repeated_calls_to_exists_on_server_do_not_make_additional_requests() {
1298        let server = MatrixMockServer::new().await;
1299        let client = server.client_builder().build().await;
1300
1301        // Create a mock stating that the request should only be made once
1302        server.mock_room_keys_version().exists().expect(1).mount().await;
1303
1304        let backups = client.encryption().backups();
1305
1306        // Call exists_on_server several times
1307        backups.exists_on_server().await.unwrap();
1308        backups.exists_on_server().await.unwrap();
1309        backups.exists_on_server().await.unwrap();
1310
1311        let exists = backups
1312            .exists_on_server()
1313            .await
1314            .expect("We should be able to check if backups exist on the server");
1315
1316        assert!(exists, "We should deduce that a backup exists on the server");
1317
1318        // We check expectations here, confirming that only one call was made
1319    }
1320
1321    #[async_test]
1322    async fn test_adding_a_backup_invalidates_exists_on_server_cache() {
1323        let server = MatrixMockServer::new().await;
1324        let client = server.client_builder().build().await;
1325        let backups = client.encryption().backups();
1326
1327        {
1328            let _scope = server.mock_room_keys_version().none().expect(1).mount_as_scoped().await;
1329
1330            // Call exists_on_server to fill the cache
1331            let exists = backups.exists_on_server().await.unwrap();
1332            assert!(!exists, "No backup exists at this point");
1333        }
1334
1335        // Create a new backup. Should invalidate the cache
1336        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1337        backups.create().await.expect("Failed to create a backup");
1338
1339        server.mock_room_keys_version().exists().expect(1).mount().await;
1340        let exists = backups
1341            .exists_on_server()
1342            .await
1343            .expect("We should be able to check if backups exist on the server");
1344
1345        assert!(exists, "But now a backup does exist");
1346    }
1347
1348    #[async_test]
1349    async fn test_removing_a_backup_invalidates_exists_on_server_cache() {
1350        let server = MatrixMockServer::new().await;
1351        let client = server.client_builder().build().await;
1352        let backups = client.encryption().backups();
1353
1354        {
1355            let _scope = server.mock_room_keys_version().exists().expect(1).mount_as_scoped().await;
1356
1357            // Call exists_on_server to fill the cache
1358            let exists = backups.exists_on_server().await.unwrap();
1359            assert!(exists, "A backup exists at this point");
1360        }
1361
1362        // Delete the backup. Should invalidate the cache
1363        server.mock_delete_room_keys_version().ok().expect(1).mount().await;
1364        backups.delete_backup_from_server("1".to_owned()).await.expect("Failed to delete a backup");
1365
1366        server.mock_room_keys_version().none().expect(1).mount().await;
1367        let exists = backups
1368            .exists_on_server()
1369            .await
1370            .expect("We should be able to check if backups exist on the server");
1371
1372        assert!(!exists, "But now there is no backup");
1373    }
1374
1375    #[async_test]
1376    async fn test_waiting_for_steady_state_resets_the_delay() {
1377        let server = MatrixMockServer::new().await;
1378        let client = server.client_builder().build().await;
1379
1380        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1381
1382        client
1383            .encryption()
1384            .backups()
1385            .create()
1386            .await
1387            .expect("We should be able to create a new backup");
1388
1389        let backups = client.encryption().backups();
1390
1391        let old_duration =
1392            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1393
1394        let wait_for_steady_state =
1395            backups.wait_for_steady_state().with_delay(Duration::from_nanos(100));
1396
1397        let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
1398
1399        let task = matrix_sdk_common::executor::spawn({
1400            let client = client.to_owned();
1401            async move {
1402                while let Some(state) = progress_stream.next().await {
1403                    let Ok(state) = state else {
1404                        panic!("Error while waiting for the upload state")
1405                    };
1406
1407                    match state {
1408                        UploadState::Idle => (),
1409                        UploadState::Done => {
1410                            let current_delay = {
1411                                client
1412                                    .inner
1413                                    .e2ee
1414                                    .backup_state
1415                                    .upload_delay
1416                                    .read()
1417                                    .unwrap()
1418                                    .to_owned()
1419                            };
1420
1421                            assert_ne!(current_delay, old_duration);
1422                            break;
1423                        }
1424                        _ => panic!("We should not have entered any other state"),
1425                    }
1426                }
1427            }
1428        });
1429
1430        wait_for_steady_state.await.expect("We should be able to wait for the steady state");
1431        task.await.unwrap();
1432
1433        let current_duration =
1434            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1435
1436        assert_eq!(old_duration, current_duration);
1437    }
1438}