matrix_sdk/encryption/backups/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Room key backup support
16//!
17//! This module implements support for server-side key backups[[1]]. The module
18//! allows you to connect to an existing backup, create or delete backups from
19//! the homeserver, and download room keys from a backup.
20//!
21//! [1]: https://spec.matrix.org/unstable/client-server-api/#server-side-key-backups
22
23use std::collections::{BTreeMap, BTreeSet};
24
25use futures_core::Stream;
26use futures_util::StreamExt;
27#[cfg(feature = "experimental-encrypted-state-events")]
28use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
29use matrix_sdk_base::crypto::{
30    backups::MegolmV1BackupKey,
31    store::types::BackupDecryptionKey,
32    types::{requests::KeysBackupRequest, RoomKeyBackupInfo},
33    OlmMachine, RoomKeyImportResult,
34};
35#[cfg(feature = "experimental-encrypted-state-events")]
36use ruma::serde::JsonCastable;
37use ruma::{
38    api::client::{
39        backup::{
40            add_backup_keys, create_backup_version, get_backup_keys, get_backup_keys_for_room,
41            get_backup_keys_for_session, get_latest_backup_info, RoomKeyBackup,
42        },
43        error::ErrorKind,
44    },
45    events::{
46        room::encrypted::OriginalSyncRoomEncryptedEvent,
47        secret::{request::SecretName, send::ToDeviceSecretSendEvent},
48    },
49    serde::Raw,
50    OwnedRoomId, RoomId, TransactionId,
51};
52use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
53use tracing::{error, info, instrument, trace, warn, Span};
54
55pub mod futures;
56pub(crate) mod types;
57
58pub use types::{BackupState, UploadState};
59
60use self::futures::WaitForSteadyState;
61use crate::{
62    crypto::olm::ExportedRoomKey, encryption::BackupDownloadStrategy, Client, Error, Room,
63};
64
65/// The backups manager for the [`Client`].
66#[derive(Debug, Clone)]
67pub struct Backups {
68    pub(super) client: Client,
69}
70
71impl Backups {
72    /// Create a new backup version, encrypted with a new backup recovery key.
73    ///
74    /// The backup recovery key will be persisted locally and shared with
75    /// trusted devices as `m.secret.send` to-device messages.
76    ///
77    /// After the backup has been created, all room keys will be uploaded to the
78    /// homeserver.
79    ///
80    /// *Warning*: This will overwrite any existing backup.
81    ///
82    /// # Examples
83    ///
84    /// ```no_run
85    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
86    /// # use url::Url;
87    /// # async {
88    /// # let homeserver = Url::parse("http://example.com")?;
89    /// # let client = Client::new(homeserver).await?;
90    /// let backups = client.encryption().backups();
91    /// backups.create().await?;
92    ///
93    /// assert_eq!(backups.state(), BackupState::Enabled);
94    /// # anyhow::Ok(()) };
95    /// ```
96    pub async fn create(&self) -> Result<(), Error> {
97        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
98        let _guard = self.client.locks().backup_modify_lock.lock().await;
99
100        self.set_state(BackupState::Creating);
101
102        // Create a future so we can catch errors and go back to the `Unknown`
103        // state. This is a hack to get around the lack of `try` blocks in Rust.
104        let future = async {
105            let olm_machine = self.client.olm_machine().await;
106            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
107
108            // Create a new backup recovery key.
109            let decryption_key = BackupDecryptionKey::new().expect(
110                "We should be able to generate enough randomness to create a new backup recovery \
111                 key",
112            );
113
114            // Get the info about the new backup key, this needs to be uploaded to the
115            // homeserver[1].
116            //
117            // We need to sign the `RoomKeyBackupInfo` so other clients which might want
118            // to start using the backup without having access to the
119            // `BackupDecryptionKey` can do so, as per [spec]:
120            //
121            // Clients must only store keys in backups after they have ensured that the
122            // `auth_data` has not been tampered with. This can be done either by:
123            //
124            //  * checking that it is signed by the user's master cross-signing key or by a
125            //    verified device belonging to the same user, or
126            //  * by deriving the public key from a private key that it obtained from a
127            //    trusted source. Trusted sources for the private key include the user
128            //    entering the key, retrieving the key stored in secret storage, or
129            //    obtaining the key via secret sharing from a verified device belonging to
130            //    the same user.
131            //
132            //
133            // [1]: https://spec.matrix.org/v1.8/client-server-api/#post_matrixclientv3room_keysversion
134            // [spec]: https://spec.matrix.org/v1.8/client-server-api/#server-side-key-backups
135            let mut backup_info = decryption_key.to_backup_info();
136
137            if let Err(e) = olm_machine.backup_machine().sign_backup(&mut backup_info).await {
138                warn!("Unable to sign the newly created backup version: {e:?}");
139            }
140
141            let algorithm = Raw::new(&backup_info)?.cast();
142            let request = create_backup_version::v3::Request::new(algorithm);
143            let response = self.client.send(request).await?;
144            let version = response.version;
145
146            // Reset any state we might have had before the new backup was created.
147            // TODO: This should remove the old stored key and version.
148            olm_machine.backup_machine().disable_backup().await?;
149
150            let backup_key = decryption_key.megolm_v1_public_key();
151
152            // Save the newly created keys and the version we received from the server.
153            olm_machine
154                .backup_machine()
155                .save_decryption_key(Some(decryption_key), Some(version.to_owned()))
156                .await?;
157
158            // Enable the backup and start the upload of room keys.
159            self.enable(olm_machine, backup_key, version).await?;
160
161            Ok(())
162        };
163
164        let result = future.await;
165
166        if result.is_err() {
167            self.set_state(BackupState::Unknown);
168        }
169
170        result
171    }
172
173    /// Disable and delete the currently active backup only if previously
174    /// enabled before, otherwise an error will be returned.
175    ///
176    /// For a more aggressive variant see [`Backups::disable_and_delete`] which
177    /// will delete the remote backup without checking the local state.
178    ///
179    /// # Examples
180    ///
181    /// ```no_run
182    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
183    /// # use url::Url;
184    /// # async {
185    /// # let homeserver = Url::parse("http://example.com")?;
186    /// # let client = Client::new(homeserver).await?;
187    /// let backups = client.encryption().backups();
188    /// backups.disable().await?;
189    ///
190    /// assert_eq!(backups.state(), BackupState::Unknown);
191    /// # anyhow::Ok(()) };
192    /// ```
193    #[instrument(skip_all, fields(version))]
194    pub async fn disable(&self) -> Result<(), Error> {
195        let _guard = self.client.locks().backup_modify_lock.lock().await;
196
197        self.set_state(BackupState::Disabling);
198
199        // Create a future so we can catch errors and go back to the `Unknown` state.
200        let future = async {
201            let olm_machine = self.client.olm_machine().await;
202            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
203
204            let backup_keys = olm_machine.backup_machine().get_backup_keys().await?;
205
206            if let Some(version) = backup_keys.backup_version {
207                Span::current().record("version", &version);
208                info!("Deleting and disabling backup");
209
210                self.delete_backup_from_server(version).await?;
211                info!("Backup successfully deleted");
212
213                olm_machine.backup_machine().disable_backup().await?;
214
215                info!("Backup successfully disabled and deleted");
216
217                Ok(())
218            } else {
219                info!("Backup is not enabled, can't disable it");
220                Err(Error::BackupNotEnabled)
221            }
222        };
223
224        let result = future.await;
225
226        self.set_state(BackupState::Unknown);
227
228        result
229    }
230
231    /// Completely disable and delete the active backup both locally
232    /// and from the backend no matter if previously setup locally
233    /// or not.
234    ///
235    /// ⚠️ This method is mainly used when resetting the crypto identity
236    /// and for most other use cases its safer [`Backups::disable`] counterpart
237    /// should be used.
238    ///
239    /// It will fetch the current backup version from the backend and delete it
240    /// before proceeding to disabling local backups as well
241    ///
242    /// # Examples
243    ///
244    /// ```no_run
245    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
246    /// # use url::Url;
247    /// # async {
248    /// # let homeserver = Url::parse("http://example.com")?;
249    /// # let client = Client::new(homeserver).await?;
250    /// let backups = client.encryption().backups();
251    /// backups.disable_and_delete().await?;
252    ///
253    /// assert_eq!(backups.state(), BackupState::Unknown);
254    /// # anyhow::Ok(()) };
255    /// ```
256    pub async fn disable_and_delete(&self) -> Result<(), Error> {
257        let _guard = self.client.locks().backup_modify_lock.lock().await;
258
259        self.set_state(BackupState::Disabling);
260
261        // Create a future so we can catch errors and go back to the `Unknown` state.
262        let future = async {
263            let response = self.get_current_version().await?;
264
265            if let Some(response) = response {
266                self.delete_backup_from_server(response.version).await?;
267            }
268
269            let olm_machine = self.client.olm_machine().await;
270            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
271
272            olm_machine.backup_machine().disable_backup().await?;
273
274            Ok(())
275        };
276
277        let result = future.await;
278
279        self.set_state(BackupState::Unknown);
280
281        result
282    }
283
284    /// Returns a future to wait for room keys to be uploaded.
285    ///
286    /// Awaiting the future will wake up a task to upload room keys which have
287    /// not yet been uploaded to the homeserver. It will then wait for the task
288    /// to finish uploading.
289    ///
290    /// # Examples
291    ///
292    /// ```no_run
293    /// # use matrix_sdk::{Client, encryption::backups::UploadState};
294    /// # use url::Url;
295    /// # async {
296    /// # let homeserver = Url::parse("http://example.com")?;
297    /// # let client = Client::new(homeserver).await?;
298    /// use futures_util::StreamExt;
299    ///
300    /// let backups = client.encryption().backups();
301    /// let wait_for_steady_state = backups.wait_for_steady_state();
302    ///
303    /// let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
304    ///
305    /// tokio::spawn(async move {
306    ///     while let Some(update) = progress_stream.next().await {
307    ///         let Ok(update) = update else { break };
308    ///
309    ///         match update {
310    ///             UploadState::Uploading(counts) => {
311    ///                 println!(
312    ///                     "Uploaded {} out of {} room keys.",
313    ///                     counts.backed_up, counts.total
314    ///                 );
315    ///             }
316    ///             UploadState::Error => break,
317    ///             UploadState::Done => break,
318    ///             _ => (),
319    ///         }
320    ///     }
321    /// });
322    ///
323    /// wait_for_steady_state.await?;
324    ///
325    /// # anyhow::Ok(()) };
326    /// ```
327    pub fn wait_for_steady_state(&self) -> WaitForSteadyState<'_> {
328        WaitForSteadyState {
329            backups: self,
330            progress: self.client.inner.e2ee.backup_state.upload_progress.clone(),
331            timeout: None,
332        }
333    }
334
335    /// Get a stream of updates to the [`BackupState`].
336    ///
337    /// This method will send out the current state as the first update.
338    ///
339    /// # Examples
340    ///
341    /// ```no_run
342    /// # use matrix_sdk::{Client, encryption::backups::BackupState};
343    /// # use url::Url;
344    /// # async {
345    /// # let homeserver = Url::parse("http://example.com")?;
346    /// # let client = Client::new(homeserver).await?;
347    /// use futures_util::StreamExt;
348    ///
349    /// let backups = client.encryption().backups();
350    ///
351    /// let mut state_stream = backups.state_stream();
352    ///
353    /// while let Some(update) = state_stream.next().await {
354    ///     let Ok(update) = update else { break };
355    ///
356    ///     match update {
357    ///         BackupState::Enabled => {
358    ///             println!("Backups have been enabled");
359    ///         }
360    ///         _ => (),
361    ///     }
362    /// }
363    /// # anyhow::Ok(()) };
364    /// ```
365    pub fn state_stream(
366        &self,
367    ) -> impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>> {
368        self.client.inner.e2ee.backup_state.global_state.subscribe()
369    }
370
371    /// Get the current [`BackupState`] for this [`Client`].
372    pub fn state(&self) -> BackupState {
373        self.client.inner.e2ee.backup_state.global_state.get()
374    }
375
376    /// Are backups enabled for the current [`Client`]?
377    ///
378    /// This method will check if we locally have an active backup key and
379    /// backup version and are ready to upload room keys to a backup.
380    pub async fn are_enabled(&self) -> bool {
381        let olm_machine = self.client.olm_machine().await;
382
383        if let Some(machine) = olm_machine.as_ref() {
384            machine.backup_machine().enabled().await
385        } else {
386            false
387        }
388    }
389
390    /// Does a backup exist on the server?
391    ///
392    /// This method will request info about the current backup from the
393    /// homeserver and if a backup exists return `true`, otherwise `false`.
394    pub async fn fetch_exists_on_server(&self) -> Result<bool, Error> {
395        let exists_on_server = self.get_current_version().await?.is_some();
396        self.client.inner.e2ee.backup_state.set_backup_exists_on_server(exists_on_server);
397        Ok(exists_on_server)
398    }
399
400    /// Does a backup exist on the server?
401    ///
402    /// This method is identical to [`Self::fetch_exists_on_server`] except that
403    /// we cache the latest answer in memory and only empty the cache if the
404    /// local device adds or deletes a backup itself.
405    ///
406    /// Do not use this method if you need an accurate answer about whether a
407    /// backup exists - instead use [`Self::fetch_exists_on_server`]. This
408    /// method is useful when performance is more important than guaranteed
409    /// accuracy, such as when classifying UTDs.
410    pub async fn exists_on_server(&self) -> Result<bool, Error> {
411        // If we have an answer cached, return it immediately
412        if let Some(cached_value) = self.client.inner.e2ee.backup_state.backup_exists_on_server() {
413            return Ok(cached_value);
414        }
415
416        // Otherwise, delegate to fetch_exists_on_server. (It will update the cached
417        // value for us.)
418        self.fetch_exists_on_server().await
419    }
420
421    /// Subscribe to a stream that notifies when a room key for the specified
422    /// room is downloaded from the key backup.
423    pub fn room_keys_for_room_stream(
424        &self,
425        room_id: &RoomId,
426    ) -> impl Stream<Item = Result<BTreeMap<String, BTreeSet<String>>, BroadcastStreamRecvError>>
427    {
428        let room_id = room_id.to_owned();
429
430        // TODO: This is a bit crap to say the least. The type is
431        // non-descriptive and doesn't even contain all the important data. It
432        // should be a stream of `RoomKeyInfo` like the OlmMachine has... But on
433        // the other hand we should just be able to use the corresponding
434        // OlmMachine stream and remove this. Currently we can't do this because
435        // the OlmMachine gets destroyed and recreated all the time to be able
436        // to support the notifications-related multiprocessing on iOS.
437        self.room_keys_stream().filter_map(move |import_result| {
438            let room_id = room_id.to_owned();
439
440            async move {
441                match import_result {
442                    Ok(mut import_result) => import_result.keys.remove(&room_id).map(Ok),
443                    Err(e) => Some(Err(e)),
444                }
445            }
446        })
447    }
448
449    /// Download all room keys for a certain room from the server-side key
450    /// backup.
451    pub async fn download_room_keys_for_room(&self, room_id: &RoomId) -> Result<(), Error> {
452        let olm_machine = self.client.olm_machine().await;
453        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
454
455        let backup_keys = olm_machine.store().load_backup_keys().await?;
456
457        if let Some(decryption_key) = backup_keys.decryption_key {
458            if let Some(version) = backup_keys.backup_version {
459                let request =
460                    get_backup_keys_for_room::v3::Request::new(version.clone(), room_id.to_owned());
461                let response = self.client.send(request).await?;
462
463                // Transform response to standard format (map of room ID -> room key).
464                let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
465                    room_id.to_owned(),
466                    RoomKeyBackup::new(response.sessions),
467                )]));
468
469                self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
470                    .await?;
471            }
472        }
473
474        Ok(())
475    }
476
477    /// Download a single room key from the server-side key backup.
478    ///
479    /// Returns `true` if we managed to download a room key, `false` or an error
480    /// if we failed to download it. `false` indicates that there was no
481    /// error, we just don't have backups enabled so we can't download a
482    /// room key.
483    pub async fn download_room_key(
484        &self,
485        room_id: &RoomId,
486        session_id: &str,
487    ) -> Result<bool, Error> {
488        let olm_machine = self.client.olm_machine().await;
489        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
490
491        let backup_keys = olm_machine.store().load_backup_keys().await?;
492
493        if let Some(decryption_key) = backup_keys.decryption_key {
494            if let Some(version) = backup_keys.backup_version {
495                let request = get_backup_keys_for_session::v3::Request::new(
496                    version.clone(),
497                    room_id.to_owned(),
498                    session_id.to_owned(),
499                );
500                let response = self.client.send(request).await?;
501
502                // Transform response to standard format (map of room ID -> room key).
503                let response = get_backup_keys::v3::Response::new(BTreeMap::from([(
504                    room_id.to_owned(),
505                    RoomKeyBackup::new(BTreeMap::from([(
506                        session_id.to_owned(),
507                        response.key_data,
508                    )])),
509                )]));
510
511                self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine)
512                    .await?;
513
514                Ok(true)
515            } else {
516                Ok(false)
517            }
518        } else {
519            Ok(false)
520        }
521    }
522
523    /// Set the state of the backup.
524    fn set_state(&self, new_state: BackupState) {
525        let old_state = self.client.inner.e2ee.backup_state.global_state.set(new_state);
526
527        if old_state != new_state {
528            info!("Backup state changed from {old_state:?} to {new_state:?}");
529        }
530    }
531
532    /// Set the backup state to the `Enabled` variant and insert the backup key
533    /// and version into the [`OlmMachine`].
534    async fn enable(
535        &self,
536        olm_machine: &OlmMachine,
537        backup_key: MegolmV1BackupKey,
538        version: String,
539    ) -> Result<(), Error> {
540        backup_key.set_version(version);
541        olm_machine.backup_machine().enable_backup_v1(backup_key).await?;
542
543        self.set_state(BackupState::Enabled);
544
545        Ok(())
546    }
547
548    /// Decrypt and forward a response containing backed up room keys to the
549    /// [`OlmMachine`].
550    async fn handle_downloaded_room_keys(
551        &self,
552        backed_up_keys: get_backup_keys::v3::Response,
553        backup_decryption_key: BackupDecryptionKey,
554        backup_version: &str,
555        olm_machine: &OlmMachine,
556    ) -> Result<(), Error> {
557        let mut decrypted_room_keys: Vec<_> = Vec::new();
558
559        for (room_id, room_keys) in backed_up_keys.rooms {
560            for (session_id, room_key) in room_keys.sessions {
561                let room_key = match room_key.deserialize() {
562                    Ok(k) => k,
563                    Err(e) => {
564                        warn!(
565                            "Couldn't deserialize a room key we downloaded from backups, session \
566                             ID: {session_id}, error: {e:?}"
567                        );
568                        continue;
569                    }
570                };
571
572                let room_key =
573                    match backup_decryption_key.decrypt_session_data(room_key.session_data) {
574                        Ok(k) => k,
575                        Err(e) => {
576                            warn!(
577                                "Couldn't decrypt a room key we downloaded from backups, session \
578                                 ID: {session_id}, error: {e:?}"
579                            );
580                            continue;
581                        }
582                    };
583
584                decrypted_room_keys.push(ExportedRoomKey::from_backed_up_room_key(
585                    room_id.to_owned(),
586                    session_id,
587                    room_key,
588                ));
589            }
590        }
591
592        let result = olm_machine
593            .store()
594            .import_room_keys(decrypted_room_keys, Some(backup_version), |_, _| {})
595            .await?;
596
597        // Since we can't use the usual room keys stream from the `OlmMachine`
598        // we're going to send things out in our own custom broadcaster.
599        let _ = self.client.inner.e2ee.backup_state.room_keys_broadcaster.send(result);
600
601        Ok(())
602    }
603
604    /// Download all room keys from the backup on the homeserver.
605    async fn download_all_room_keys(
606        &self,
607        decryption_key: BackupDecryptionKey,
608        version: String,
609    ) -> Result<(), Error> {
610        let request = get_backup_keys::v3::Request::new(version.clone());
611        let response = self.client.send(request).await?;
612
613        let olm_machine = self.client.olm_machine().await;
614        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
615
616        self.handle_downloaded_room_keys(response, decryption_key, &version, olm_machine).await?;
617
618        Ok(())
619    }
620
621    fn room_keys_stream(
622        &self,
623    ) -> impl Stream<Item = Result<RoomKeyImportResult, BroadcastStreamRecvError>> {
624        BroadcastStream::new(self.client.inner.e2ee.backup_state.room_keys_broadcaster.subscribe())
625    }
626
627    /// Get info about the currently active backup from the server.
628    async fn get_current_version(
629        &self,
630    ) -> Result<Option<get_latest_backup_info::v3::Response>, Error> {
631        let request = get_latest_backup_info::v3::Request::new();
632
633        match self.client.send(request).await {
634            Ok(r) => Ok(Some(r)),
635            Err(e) => {
636                if let Some(kind) = e.client_api_error_kind() {
637                    if kind == &ErrorKind::NotFound {
638                        Ok(None)
639                    } else {
640                        Err(e.into())
641                    }
642                } else {
643                    Err(e.into())
644                }
645            }
646        }
647    }
648
649    async fn delete_backup_from_server(&self, version: String) -> Result<(), Error> {
650        let request = ruma::api::client::backup::delete_backup_version::v3::Request::new(version);
651
652        let ret = match self.client.send(request).await {
653            Ok(_) => Ok(()),
654            Err(e) => {
655                if let Some(kind) = e.client_api_error_kind() {
656                    if kind == &ErrorKind::NotFound {
657                        Ok(())
658                    } else {
659                        Err(e.into())
660                    }
661                } else {
662                    Err(e.into())
663                }
664            }
665        };
666
667        // If the request succeeded, the backup is gone. If it failed, we are not really
668        // sure what the backup state is. Either way, clear the cache so we check next
669        // time we need to know.
670        self.client.inner.e2ee.backup_state.clear_backup_exists_on_server();
671
672        ret
673    }
674
675    #[instrument(skip(self, olm_machine, request))]
676    async fn send_backup_request(
677        &self,
678        olm_machine: &OlmMachine,
679        request_id: &TransactionId,
680        request: KeysBackupRequest,
681    ) -> Result<(), Error> {
682        trace!("Uploading some room keys");
683
684        let add_backup_keys = add_backup_keys::v3::Request::new(request.version, request.rooms);
685
686        match self.client.send(add_backup_keys).await {
687            Ok(response) => {
688                olm_machine.mark_request_as_sent(request_id, &response).await?;
689
690                let new_counts = olm_machine.backup_machine().room_key_counts().await?;
691
692                self.client
693                    .inner
694                    .e2ee
695                    .backup_state
696                    .upload_progress
697                    .set(UploadState::Uploading(new_counts));
698
699                let delay =
700                    self.client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned();
701                crate::sleep::sleep(delay).await;
702
703                Ok(())
704            }
705            Err(error) => {
706                if let Some(kind) = error.client_api_error_kind() {
707                    match kind {
708                        ErrorKind::NotFound => {
709                            warn!(
710                                "No backup found on the server, the backup likely got deleted, \
711                                 disabling backups."
712                            );
713
714                            self.handle_deleted_backup_version(olm_machine).await?;
715                        }
716                        ErrorKind::WrongRoomKeysVersion { current_version } => {
717                            warn!(
718                                new_version = current_version,
719                                "A new backup version was found on the server, disabling backups."
720                            );
721
722                            // TODO: If we're verified and there are other devices besides us,
723                            // request the new backup key over `m.secret.send`.
724
725                            self.handle_deleted_backup_version(olm_machine).await?;
726                        }
727
728                        _ => (),
729                    }
730                }
731
732                Err(error.into())
733            }
734        }
735    }
736
737    /// Poll the [`OlmMachine`] for room keys which need to be backed up and
738    /// send out the request to the homeserver.
739    ///
740    /// This should only be called by the [`BackupUploadingTask`].
741    ///
742    /// [`BackupUploadingTask`]: crate::client::tasks::BackupUploadingTask
743    pub(crate) async fn backup_room_keys(&self) -> Result<(), Error> {
744        let _guard = self.client.locks().backup_upload_lock.lock().await;
745
746        let olm_machine = self.client.olm_machine().await;
747        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
748
749        while let Some((request_id, request)) = olm_machine.backup_machine().backup().await? {
750            self.send_backup_request(olm_machine, &request_id, request).await?;
751        }
752
753        self.client.inner.e2ee.backup_state.upload_progress.set(UploadState::Done);
754
755        Ok(())
756    }
757
758    /// Set up a `m.secret.send` listener and re-enable backups if we have a
759    /// backup recovery key stored.
760    pub(crate) async fn setup_and_resume(&self) -> Result<(), Error> {
761        info!("Setting up secret listeners and trying to resume backups");
762
763        self.client.add_event_handler(Self::secret_send_event_handler);
764
765        if self.client.inner.e2ee.encryption_settings.backup_download_strategy
766            == BackupDownloadStrategy::AfterDecryptionFailure
767        {
768            self.client.add_event_handler(Self::utd_event_handler);
769        }
770
771        self.maybe_resume_backups().await?;
772
773        Ok(())
774    }
775
776    /// Try to enable backups with the given backup recovery key.
777    ///
778    /// This should be called if we receive a backup recovery, either:
779    ///
780    /// * As an `m.secret.send` to-device message from a trusted device.
781    /// * From 4S (i.e. from the `m.megolm_backup.v1` event global account
782    ///   data).
783    ///
784    /// In both cases the method will compare the currently active backup
785    /// version to the backup recovery key's version and, if there is a match,
786    /// activate backups on this device and start uploading room keys to the
787    /// backup.
788    ///
789    /// Returns true if backups were just enabled or were already enabled,
790    /// otherwise false.
791    #[instrument(skip_all)]
792    pub(crate) async fn maybe_enable_backups(
793        &self,
794        maybe_recovery_key: &str,
795    ) -> Result<bool, Error> {
796        let _guard = self.client.locks().backup_modify_lock.lock().await;
797
798        // Create a future here which allows us to catch any failure that might happen
799        // so we can later on fall back to the correct `BackupState`.
800        let future = async {
801            self.set_state(BackupState::Enabling);
802
803            let olm_machine = self.client.olm_machine().await;
804            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
805            let backup_machine = olm_machine.backup_machine();
806
807            let decryption_key =
808                BackupDecryptionKey::from_base64(maybe_recovery_key).map_err(|e| {
809                    <serde_json::Error as serde::de::Error>::custom(format!(
810                        "Couldn't deserialize the backup recovery key: {e:?}"
811                    ))
812                })?;
813
814            // Let's try to see if there's a backup on the homeserver.
815            let current_version = self.get_current_version().await?;
816
817            let Some(current_version) = current_version else {
818                warn!("Tried to enable backups, but no backup version was found on the server.");
819                return Ok(false);
820            };
821
822            Span::current().record("backup_version", &current_version.version);
823
824            let backup_info: RoomKeyBackupInfo = current_version.algorithm.deserialize_as()?;
825            let stored_keys = backup_machine.get_backup_keys().await?;
826
827            if stored_keys.backup_version.as_ref() == Some(&current_version.version)
828                && self.are_enabled().await
829            {
830                // If we already have a backup enabled which is using the currently active
831                // backup version, do nothing but tell the caller using the return value that
832                // backups are enabled.
833                Ok(true)
834            } else if decryption_key.backup_key_matches(&backup_info) {
835                info!(
836                    "We have found the correct backup recovery key. Storing the backup recovery \
837                     key and enabling backups."
838                );
839
840                // We're enabling a new backup, reset the `backed_up` flags on the room keys and
841                // remove any key/version we might have.
842                backup_machine.disable_backup().await?;
843
844                let backup_key = decryption_key.megolm_v1_public_key();
845                backup_key.set_version(current_version.version.to_owned());
846
847                // Persist the new keys and enable the backup.
848                backup_machine
849                    .save_decryption_key(
850                        Some(decryption_key.to_owned()),
851                        Some(current_version.version.to_owned()),
852                    )
853                    .await?;
854                backup_machine.enable_backup_v1(backup_key).await?;
855
856                // If the user has set up the client to download any room keys, do so now. This
857                // is not really useful in a real scenario since the API to
858                // download room keys is not paginated.
859                //
860                // You need to download all room keys at once, parse a potentially huge JSON
861                // response and decrypt all the room keys found in the backup.
862                //
863                // This doesn't work for any sizeable account.
864                if self.client.inner.e2ee.encryption_settings.backup_download_strategy
865                    == BackupDownloadStrategy::OneShot
866                {
867                    self.set_state(BackupState::Downloading);
868
869                    if let Err(e) =
870                        self.download_all_room_keys(decryption_key, current_version.version).await
871                    {
872                        warn!("Couldn't automatically download all room keys from backup: {e:?}");
873                    }
874                }
875
876                // Trigger the upload of any room keys we might need to upload.
877                self.maybe_trigger_backup();
878
879                Ok(true)
880            } else {
881                let derived_key = decryption_key.megolm_v1_public_key();
882                let downloaded_key = current_version.algorithm;
883
884                warn!(
885                    ?derived_key,
886                    ?downloaded_key,
887                    "Found an active backup but the recovery key we received isn't the one used for \
888                     this backup version"
889                );
890
891                Ok(false)
892            }
893        };
894
895        match future.await {
896            Ok(enabled) => {
897                if enabled {
898                    self.set_state(BackupState::Enabled);
899                } else {
900                    self.set_state(BackupState::Unknown);
901                }
902
903                Ok(enabled)
904            }
905            Err(e) => {
906                self.set_state(BackupState::Unknown);
907
908                Err(e)
909            }
910        }
911    }
912
913    /// Try to resume backups from a backup recovery key we have found in the
914    /// crypto store.
915    ///
916    /// Returns true if backups have been resumed, false otherwise.
917    async fn resume_backup_from_stored_backup_key(
918        &self,
919        olm_machine: &OlmMachine,
920    ) -> Result<bool, Error> {
921        let backup_keys = olm_machine.store().load_backup_keys().await?;
922
923        if let Some(decryption_key) = backup_keys.decryption_key {
924            if let Some(version) = backup_keys.backup_version {
925                let backup_key = decryption_key.megolm_v1_public_key();
926
927                self.enable(olm_machine, backup_key, version).await?;
928
929                Ok(true)
930            } else {
931                Ok(false)
932            }
933        } else {
934            Ok(false)
935        }
936    }
937
938    /// Try to resume backups by iterating through the `m.secret.send` to-device
939    /// messages the [`OlmMachine`] has received and stored in the secret inbox.
940    async fn maybe_resume_from_secret_inbox(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
941        let secrets = olm_machine.store().get_secrets_from_inbox(&SecretName::RecoveryKey).await?;
942
943        for secret in secrets {
944            if self.maybe_enable_backups(&secret.event.content.secret).await? {
945                break;
946            }
947        }
948
949        olm_machine.store().delete_secrets_from_inbox(&SecretName::RecoveryKey).await?;
950
951        Ok(())
952    }
953
954    /// Check and re-enable a backup if we have a backup recovery key locally.
955    async fn maybe_resume_backups(&self) -> Result<(), Error> {
956        let olm_machine = self.client.olm_machine().await;
957        let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
958
959        // Let us first check if we have a stored backup recovery key and a backup
960        // version.
961        if !self.resume_backup_from_stored_backup_key(olm_machine).await? {
962            // We didn't manage to enable backups from a stored backup recovery key, let us
963            // check our secret inbox. Perhaps we can find a valid key there.
964            self.maybe_resume_from_secret_inbox(olm_machine).await?;
965        }
966
967        Ok(())
968    }
969
970    /// Listen for `m.secret.send` to-device messages and check the secret inbox
971    /// if we do receive one.
972    #[instrument(skip_all)]
973    pub(crate) async fn secret_send_event_handler(_: ToDeviceSecretSendEvent, client: Client) {
974        let olm_machine = client.olm_machine().await;
975
976        // TODO: Because of our crude multi-process support, which reloads the whole
977        // [`OlmMachine`] the `secrets_stream` might stop giving you updates. Once
978        // that's fixed, stop listening to individual secret send events and
979        // listen to the secrets stream.
980        if let Some(olm_machine) = olm_machine.as_ref() {
981            if let Err(e) =
982                client.encryption().backups().maybe_resume_from_secret_inbox(olm_machine).await
983            {
984                error!("Could not handle `m.secret.send` event: {e:?}");
985            }
986        } else {
987            error!("Tried to handle a `m.secret.send` event but no OlmMachine was initialized");
988        }
989    }
990
991    /// Handle UTD events by triggering download from key backup.
992    ///
993    /// This function is registered as an event handler; it exists to deal
994    /// with cases where [`Room::decrypt_event`] is not called and instead the
995    /// event should be decrypted by the time this crate sees the event, such as
996    /// for events received via `/sync` (as opposed to via `/messages`,
997    /// `/context`, etc.)
998    #[allow(clippy::unused_async)] // Because it's used as an event handler, which must be async.
999    pub(crate) async fn utd_event_handler(
1000        event: Raw<OriginalSyncRoomEncryptedEvent>,
1001        room: Room,
1002        client: Client,
1003    ) {
1004        client.encryption().backups().maybe_download_room_key(room.room_id().to_owned(), event);
1005    }
1006
1007    /// Send a notification to the task responsible for key backup downloads
1008    /// that it should attempt to download the keys for the given event.
1009    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1010    pub(crate) fn maybe_download_room_key(
1011        &self,
1012        room_id: OwnedRoomId,
1013        event: Raw<OriginalSyncRoomEncryptedEvent>,
1014    ) {
1015        let tasks = self.client.inner.e2ee.tasks.lock();
1016        if let Some(task) = tasks.download_room_keys.as_ref() {
1017            task.trigger_download_for_utd_event(room_id, event);
1018        }
1019    }
1020
1021    /// Send a notification to the task responsible for key backup downloads
1022    /// that it should attempt to download the keys for the given event.
1023    #[cfg(feature = "experimental-encrypted-state-events")]
1024    pub(crate) fn maybe_download_room_key<T: JsonCastable<EncryptedEvent>>(
1025        &self,
1026        room_id: OwnedRoomId,
1027        event: Raw<T>,
1028    ) {
1029        let tasks = self.client.inner.e2ee.tasks.lock();
1030        if let Some(task) = tasks.download_room_keys.as_ref() {
1031            task.trigger_download_for_utd_event(room_id, event);
1032        }
1033    }
1034
1035    /// Send a notification to the task which is responsible for uploading room
1036    /// keys to the backup that it might have new room keys to back up.
1037    pub(crate) fn maybe_trigger_backup(&self) {
1038        let tasks = self.client.inner.e2ee.tasks.lock();
1039
1040        if let Some(tasks) = tasks.upload_room_keys.as_ref() {
1041            tasks.trigger_upload();
1042        }
1043    }
1044
1045    /// Disable our backups locally if we notice that the backup has been
1046    /// removed on the homeserver.
1047    async fn handle_deleted_backup_version(&self, olm_machine: &OlmMachine) -> Result<(), Error> {
1048        olm_machine.backup_machine().disable_backup().await?;
1049        self.set_state(BackupState::Unknown);
1050
1051        Ok(())
1052    }
1053}
1054
1055#[cfg(all(test, not(target_family = "wasm")))]
1056mod test {
1057    use std::time::Duration;
1058
1059    use matrix_sdk_test::async_test;
1060    use serde_json::json;
1061    use wiremock::{
1062        matchers::{header, method, path},
1063        Mock, MockServer, ResponseTemplate,
1064    };
1065
1066    use super::*;
1067    use crate::test_utils::{logged_in_client, mocks::MatrixMockServer};
1068
1069    fn room_key() -> ExportedRoomKey {
1070        let json = json!({
1071            "algorithm": "m.megolm.v1.aes-sha2",
1072            "room_id": "!DovneieKSTkdHKpIXy:morpheus.localhost",
1073            "sender_key": "DeHIg4gwhClxzFYcmNntPNF9YtsdZbmMy8+3kzCMXHA",
1074            "session_id": "gM8i47Xhu0q52xLfgUXzanCMpLinoyVyH7R58cBuVBU",
1075            "session_key": "AQAAAABvWMNZjKFtebYIePKieQguozuoLgzeY6wKcyJjLJcJtQgy1dPqTBD12U+XrYLrRHn\
1076                            lKmxoozlhFqJl456+9hlHCL+yq+6ScFuBHtJepnY1l2bdLb4T0JMDkNsNErkiLiLnD6yp3J\
1077                            DSjIhkdHxmup/huygrmroq6/L5TaThEoqvW4DPIuO14btKudsS34FF82pwjKS4p6Mlch+0e\
1078                            fHAblQV",
1079            "sender_claimed_keys":{},
1080            "forwarding_curve25519_key_chain":[]
1081        });
1082
1083        serde_json::from_value(json)
1084            .expect("We should be able to deserialize our exported room key")
1085    }
1086
1087    async fn backup_disabling_test_body(
1088        client: &Client,
1089        server: &MockServer,
1090        put_response: ResponseTemplate,
1091    ) {
1092        let _post_scope = Mock::given(method("POST"))
1093            .and(path("_matrix/client/unstable/room_keys/version"))
1094            .and(header("authorization", "Bearer 1234"))
1095            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1096              "version": "1"
1097            })))
1098            .expect(1)
1099            .named("POST for the backup creation")
1100            .mount_as_scoped(server)
1101            .await;
1102
1103        let _put_scope = Mock::given(method("PUT"))
1104            .and(path("_matrix/client/unstable/room_keys/keys"))
1105            .and(header("authorization", "Bearer 1234"))
1106            .respond_with(put_response)
1107            .expect(1)
1108            .named("POST for the backup creation")
1109            .mount_as_scoped(server)
1110            .await;
1111
1112        client
1113            .encryption()
1114            .backups()
1115            .create()
1116            .await
1117            .expect("We should be able to create a new backup");
1118
1119        assert_eq!(client.encryption().backups().state(), BackupState::Enabled);
1120
1121        client
1122            .encryption()
1123            .backups()
1124            .backup_room_keys()
1125            .await
1126            .expect_err("Backups should be disabled");
1127
1128        assert_eq!(client.encryption().backups().state(), BackupState::Unknown);
1129    }
1130
1131    #[async_test]
1132    async fn test_backup_disabling_after_remote_deletion() {
1133        let server = MockServer::start().await;
1134        let client = logged_in_client(Some(server.uri())).await;
1135
1136        {
1137            let machine = client.olm_machine().await;
1138            machine
1139                .as_ref()
1140                .unwrap()
1141                .store()
1142                .import_exported_room_keys(vec![room_key()], |_, _| {})
1143                .await
1144                .expect("We should be able to import a room key");
1145        }
1146
1147        backup_disabling_test_body(
1148            &client,
1149            &server,
1150            ResponseTemplate::new(404).set_body_json(json!({
1151                "errcode": "M_NOT_FOUND",
1152                "error": "Unknown backup version"
1153            })),
1154        )
1155        .await;
1156
1157        backup_disabling_test_body(
1158            &client,
1159            &server,
1160            ResponseTemplate::new(403).set_body_json(json!({
1161                "current_version": "42",
1162                "errcode": "M_WRONG_ROOM_KEYS_VERSION",
1163                "error": "Wrong backup version."
1164            })),
1165        )
1166        .await;
1167
1168        server.verify().await;
1169    }
1170
1171    #[async_test]
1172    async fn test_when_a_backup_exists_then_fetch_exists_on_server_returns_true() {
1173        let server = MatrixMockServer::new().await;
1174        let client = server.client_builder().build().await;
1175
1176        server.mock_room_keys_version().exists().expect(1).mount().await;
1177
1178        let exists = client
1179            .encryption()
1180            .backups()
1181            .fetch_exists_on_server()
1182            .await
1183            .expect("We should be able to check if backups exist on the server");
1184
1185        assert!(exists, "We should deduce that a backup exists on the server");
1186    }
1187
1188    #[async_test]
1189    async fn test_repeated_calls_to_fetch_exists_on_server_makes_repeated_requests() {
1190        let server = MatrixMockServer::new().await;
1191        let client = server.client_builder().build().await;
1192
1193        // Expect 2 requests to the server
1194        server.mock_room_keys_version().exists().expect(2).mount().await;
1195
1196        let backups = client.encryption().backups();
1197
1198        // Call fetch_exists_on_server twice
1199        backups.fetch_exists_on_server().await.unwrap();
1200        let exists = backups.fetch_exists_on_server().await.unwrap();
1201
1202        assert!(exists, "We should deduce that a backup exists on the server");
1203    }
1204
1205    #[async_test]
1206    async fn test_when_no_backup_exists_then_fetch_exists_on_server_returns_false() {
1207        let server = MatrixMockServer::new().await;
1208        let client = server.client_builder().build().await;
1209
1210        server.mock_room_keys_version().none().expect(1).mount().await;
1211
1212        let exists = client
1213            .encryption()
1214            .backups()
1215            .fetch_exists_on_server()
1216            .await
1217            .expect("We should be able to check if backups exist on the server");
1218
1219        assert!(!exists, "We should deduce that no backup exists on the server");
1220    }
1221
1222    #[async_test]
1223    async fn test_when_server_returns_an_error_then_fetch_exists_on_server_returns_an_error() {
1224        let server = MatrixMockServer::new().await;
1225        let client = server.client_builder().build().await;
1226
1227        {
1228            let _scope =
1229                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1230
1231            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1232                "If the /version endpoint returns a non 404 error we should throw an error",
1233            );
1234        }
1235
1236        {
1237            let _scope =
1238                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1239
1240            client.encryption().backups().fetch_exists_on_server().await.expect_err(
1241                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1242            );
1243        }
1244    }
1245
1246    #[async_test]
1247    async fn test_when_a_backup_exists_then_exists_on_server_returns_true() {
1248        let server = MatrixMockServer::new().await;
1249        let client = server.client_builder().build().await;
1250
1251        server.mock_room_keys_version().exists().expect(1).mount().await;
1252
1253        let exists = client
1254            .encryption()
1255            .backups()
1256            .exists_on_server()
1257            .await
1258            .expect("We should be able to check if backups exist on the server");
1259
1260        assert!(exists, "We should deduce that a backup exists on the server");
1261    }
1262
1263    #[async_test]
1264    async fn test_when_no_backup_exists_then_exists_on_server_returns_false() {
1265        let server = MatrixMockServer::new().await;
1266        let client = server.client_builder().build().await;
1267
1268        server.mock_room_keys_version().none().expect(1).mount().await;
1269
1270        let exists = client
1271            .encryption()
1272            .backups()
1273            .exists_on_server()
1274            .await
1275            .expect("We should be able to check if backups exist on the server");
1276
1277        assert!(!exists, "We should deduce that no backup exists on the server");
1278    }
1279
1280    #[async_test]
1281    async fn test_when_server_returns_an_error_then_exists_on_server_returns_an_error() {
1282        let server = MatrixMockServer::new().await;
1283        let client = server.client_builder().build().await;
1284
1285        {
1286            let _scope =
1287                server.mock_room_keys_version().error429().expect(1).mount_as_scoped().await;
1288
1289            client.encryption().backups().exists_on_server().await.expect_err(
1290                "If the /version endpoint returns a non 404 error we should throw an error",
1291            );
1292        }
1293
1294        {
1295            let _scope =
1296                server.mock_room_keys_version().error404().expect(1).mount_as_scoped().await;
1297
1298            client.encryption().backups().exists_on_server().await.expect_err(
1299                "If the /version endpoint returns a non-Matrix 404 error we should throw an error",
1300            );
1301        }
1302    }
1303
1304    #[async_test]
1305    async fn test_repeated_calls_to_exists_on_server_do_not_make_additional_requests() {
1306        let server = MatrixMockServer::new().await;
1307        let client = server.client_builder().build().await;
1308
1309        // Create a mock stating that the request should only be made once
1310        server.mock_room_keys_version().exists().expect(1).mount().await;
1311
1312        let backups = client.encryption().backups();
1313
1314        // Call exists_on_server several times
1315        backups.exists_on_server().await.unwrap();
1316        backups.exists_on_server().await.unwrap();
1317        backups.exists_on_server().await.unwrap();
1318
1319        let exists = backups
1320            .exists_on_server()
1321            .await
1322            .expect("We should be able to check if backups exist on the server");
1323
1324        assert!(exists, "We should deduce that a backup exists on the server");
1325
1326        // We check expectations here, confirming that only one call was made
1327    }
1328
1329    #[async_test]
1330    async fn test_adding_a_backup_invalidates_exists_on_server_cache() {
1331        let server = MatrixMockServer::new().await;
1332        let client = server.client_builder().build().await;
1333        let backups = client.encryption().backups();
1334
1335        {
1336            let _scope = server.mock_room_keys_version().none().expect(1).mount_as_scoped().await;
1337
1338            // Call exists_on_server to fill the cache
1339            let exists = backups.exists_on_server().await.unwrap();
1340            assert!(!exists, "No backup exists at this point");
1341        }
1342
1343        // Create a new backup. Should invalidate the cache
1344        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1345        backups.create().await.expect("Failed to create a backup");
1346
1347        server.mock_room_keys_version().exists().expect(1).mount().await;
1348        let exists = backups
1349            .exists_on_server()
1350            .await
1351            .expect("We should be able to check if backups exist on the server");
1352
1353        assert!(exists, "But now a backup does exist");
1354    }
1355
1356    #[async_test]
1357    async fn test_removing_a_backup_invalidates_exists_on_server_cache() {
1358        let server = MatrixMockServer::new().await;
1359        let client = server.client_builder().build().await;
1360        let backups = client.encryption().backups();
1361
1362        {
1363            let _scope = server.mock_room_keys_version().exists().expect(1).mount_as_scoped().await;
1364
1365            // Call exists_on_server to fill the cache
1366            let exists = backups.exists_on_server().await.unwrap();
1367            assert!(exists, "A backup exists at this point");
1368        }
1369
1370        // Delete the backup. Should invalidate the cache
1371        server.mock_delete_room_keys_version().ok().expect(1).mount().await;
1372        backups.delete_backup_from_server("1".to_owned()).await.expect("Failed to delete a backup");
1373
1374        server.mock_room_keys_version().none().expect(1).mount().await;
1375        let exists = backups
1376            .exists_on_server()
1377            .await
1378            .expect("We should be able to check if backups exist on the server");
1379
1380        assert!(!exists, "But now there is no backup");
1381    }
1382
1383    #[async_test]
1384    async fn test_waiting_for_steady_state_resets_the_delay() {
1385        let server = MatrixMockServer::new().await;
1386        let client = server.client_builder().build().await;
1387
1388        server.mock_add_room_keys_version().ok().expect(1).mount().await;
1389
1390        client
1391            .encryption()
1392            .backups()
1393            .create()
1394            .await
1395            .expect("We should be able to create a new backup");
1396
1397        let backups = client.encryption().backups();
1398
1399        let old_duration =
1400            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1401
1402        let wait_for_steady_state =
1403            backups.wait_for_steady_state().with_delay(Duration::from_nanos(100));
1404
1405        let mut progress_stream = wait_for_steady_state.subscribe_to_progress();
1406
1407        let task = matrix_sdk_common::executor::spawn({
1408            let client = client.to_owned();
1409            async move {
1410                while let Some(state) = progress_stream.next().await {
1411                    let Ok(state) = state else {
1412                        panic!("Error while waiting for the upload state")
1413                    };
1414
1415                    match state {
1416                        UploadState::Idle => (),
1417                        UploadState::Done => {
1418                            let current_delay = {
1419                                client
1420                                    .inner
1421                                    .e2ee
1422                                    .backup_state
1423                                    .upload_delay
1424                                    .read()
1425                                    .unwrap()
1426                                    .to_owned()
1427                            };
1428
1429                            assert_ne!(current_delay, old_duration);
1430                            break;
1431                        }
1432                        _ => panic!("We should not have entered any other state"),
1433                    }
1434                }
1435            }
1436        });
1437
1438        wait_for_steady_state.await.expect("We should be able to wait for the steady state");
1439        task.await.unwrap();
1440
1441        let current_duration =
1442            { client.inner.e2ee.backup_state.upload_delay.read().unwrap().to_owned() };
1443
1444        assert_eq!(old_duration, current_duration);
1445    }
1446}