matrix_sdk/encryption/backups/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Named futures for the backup support.
16
17use std::{future::IntoFuture, time::Duration};
18
19use futures_core::Stream;
20use futures_util::StreamExt;
21use matrix_sdk_common::boxed_into_future;
22use thiserror::Error;
23use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
24use tracing::trace;
25
26use super::{Backups, UploadState};
27use crate::utils::ChannelObservable;
28
29/// Error describing the ways that waiting for the backup upload to settle down
30/// can fail.
31#[derive(Clone, Copy, Debug, Error)]
32pub enum SteadyStateError {
33    /// The currently active backup got either deleted or a new one was created.
34    ///
35    /// No further room keys will be uploaded to the currently active
36    /// backup.
37    #[error("The backup got disabled while waiting for the room keys to be uploaded.")]
38    BackupDisabled,
39    /// Uploading the room keys to the homeserver failed due to a network error.
40    ///
41    /// Uploading will be retried again at a later point in time, or
42    /// immediately if you wait for the steady state again.
43    #[error("There was a network connection error.")]
44    Connection,
45    /// We missed some updates to the [`UploadState`] from the upload task.
46    ///
47    /// This error doesn't imply that there was an error with the uploading of
48    /// room keys, it just means that we didn't receive all the transitions
49    /// in the [`UploadState`]. You might want to retry waiting for the
50    /// steady state.
51    #[error("We couldn't read status updates from the upload task quickly enough.")]
52    Lagged,
53}
54
55/// Named future for the [`Backups::wait_for_steady_state()`] method.
56#[derive(Debug)]
57pub struct WaitForSteadyState<'a> {
58    pub(super) backups: &'a Backups,
59    pub(super) progress: ChannelObservable<UploadState>,
60    pub(super) timeout: Option<Duration>,
61}
62
63impl WaitForSteadyState<'_> {
64    /// Subscribe to the progress of the backup upload step while waiting for it
65    /// to settle down.
66    pub fn subscribe_to_progress(
67        &self,
68    ) -> impl Stream<Item = Result<UploadState, BroadcastStreamRecvError>> {
69        self.progress.subscribe()
70    }
71
72    /// Set the delay between each upload request.
73    ///
74    /// Uploading room keys might require multiple requests to be sent out. The
75    /// [`Client`] waits for a while before it sends the next request out.
76    ///
77    /// This method allows you to override how long the [`Client`] will wait.
78    /// The default value is 100 ms.
79    ///
80    /// [`Client`]: crate::Client
81    pub fn with_delay(mut self, delay: Duration) -> Self {
82        self.timeout = Some(delay);
83
84        self
85    }
86}
87
88impl<'a> IntoFuture for WaitForSteadyState<'a> {
89    type Output = Result<(), SteadyStateError>;
90    boxed_into_future!(extra_bounds: 'a);
91
92    fn into_future(self) -> Self::IntoFuture {
93        Box::pin(async move {
94            let Self { backups, timeout, progress } = self;
95
96            trace!("Creating a stream to wait for the steady state");
97
98            let mut progress_stream = progress.subscribe();
99
100            let old_delay = if let Some(delay) = timeout {
101                let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
102                let old_delay = Some(lock.to_owned());
103
104                *lock = delay;
105
106                old_delay
107            } else {
108                None
109            };
110
111            trace!("Waiting for the upload steady state");
112
113            let ret = if backups.are_enabled().await {
114                backups.maybe_trigger_backup();
115
116                let mut ret = Ok(());
117
118                // TODO: Do we want to be smart here and remember the count when we started
119                // waiting and prevent the total from increasing, in case new room
120                // keys arrive after we started waiting.
121                while let Some(state) = progress_stream.next().await {
122                    trace!(?state, "Update state while waiting for the backup steady state");
123
124                    match state {
125                        Ok(UploadState::Done) => {
126                            ret = Ok(());
127                            break;
128                        }
129                        Ok(UploadState::Error) => {
130                            if backups.are_enabled().await {
131                                ret = Err(SteadyStateError::Connection);
132                            } else {
133                                ret = Err(SteadyStateError::BackupDisabled);
134                            }
135
136                            break;
137                        }
138                        Err(_) => {
139                            ret = Err(SteadyStateError::Lagged);
140                            break;
141                        }
142                        _ => (),
143                    }
144                }
145
146                ret
147            } else {
148                Err(SteadyStateError::BackupDisabled)
149            };
150
151            if let Some(old_delay) = old_delay {
152                let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
153                *lock = old_delay;
154            }
155
156            ret
157        })
158    }
159}