matrix_sdk/encryption/backups/futures.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Named futures for the backup support.
16
17use std::{future::IntoFuture, time::Duration};
18
19use futures_core::Stream;
20use futures_util::StreamExt;
21use matrix_sdk_common::boxed_into_future;
22use thiserror::Error;
23use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
24use tracing::trace;
25
26use super::{Backups, UploadState};
27use crate::utils::ChannelObservable;
28
29/// Error describing the ways that waiting for the backup upload to settle down
30/// can fail.
31#[derive(Clone, Copy, Debug, Error)]
32pub enum SteadyStateError {
33 /// The currently active backup got either deleted or a new one was created.
34 ///
35 /// No further room keys will be uploaded to the currently active
36 /// backup.
37 #[error("The backup got disabled while waiting for the room keys to be uploaded.")]
38 BackupDisabled,
39 /// Uploading the room keys to the homeserver failed due to a network error.
40 ///
41 /// Uploading will be retried again at a later point in time, or
42 /// immediately if you wait for the steady state again.
43 #[error("There was a network connection error.")]
44 Connection,
45 /// We missed some updates to the [`UploadState`] from the upload task.
46 ///
47 /// This error doesn't imply that there was an error with the uploading of
48 /// room keys, it just means that we didn't receive all the transitions
49 /// in the [`UploadState`]. You might want to retry waiting for the
50 /// steady state.
51 #[error("We couldn't read status updates from the upload task quickly enough.")]
52 Lagged,
53}
54
55/// Named future for the [`Backups::wait_for_steady_state()`] method.
56#[derive(Debug)]
57pub struct WaitForSteadyState<'a> {
58 pub(super) backups: &'a Backups,
59 pub(super) progress: ChannelObservable<UploadState>,
60 pub(super) timeout: Option<Duration>,
61}
62
63impl WaitForSteadyState<'_> {
64 /// Subscribe to the progress of the backup upload step while waiting for it
65 /// to settle down.
66 pub fn subscribe_to_progress(
67 &self,
68 ) -> impl Stream<Item = Result<UploadState, BroadcastStreamRecvError>> {
69 self.progress.subscribe()
70 }
71
72 /// Set the delay between each upload request.
73 ///
74 /// Uploading room keys might require multiple requests to be sent out. The
75 /// [`Client`] waits for a while before it sends the next request out.
76 ///
77 /// This method allows you to override how long the [`Client`] will wait.
78 /// The default value is 100 ms.
79 ///
80 /// [`Client`]: crate::Client
81 pub fn with_delay(mut self, delay: Duration) -> Self {
82 self.timeout = Some(delay);
83
84 self
85 }
86}
87
88impl<'a> IntoFuture for WaitForSteadyState<'a> {
89 type Output = Result<(), SteadyStateError>;
90 boxed_into_future!(extra_bounds: 'a);
91
92 fn into_future(self) -> Self::IntoFuture {
93 Box::pin(async move {
94 let Self { backups, timeout, progress } = self;
95
96 trace!("Creating a stream to wait for the steady state");
97
98 let mut progress_stream = progress.subscribe();
99
100 let old_delay = if let Some(delay) = timeout {
101 let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
102 let old_delay = Some(lock.to_owned());
103
104 *lock = delay;
105
106 old_delay
107 } else {
108 None
109 };
110
111 trace!("Waiting for the upload steady state");
112
113 let ret = if backups.are_enabled().await {
114 backups.maybe_trigger_backup();
115
116 let mut ret = Ok(());
117
118 // TODO: Do we want to be smart here and remember the count when we started
119 // waiting and prevent the total from increasing, in case new room
120 // keys arrive after we started waiting.
121 while let Some(state) = progress_stream.next().await {
122 trace!(?state, "Update state while waiting for the backup steady state");
123
124 match state {
125 Ok(UploadState::Done) => {
126 ret = Ok(());
127 break;
128 }
129 Ok(UploadState::Error) => {
130 if backups.are_enabled().await {
131 ret = Err(SteadyStateError::Connection);
132 } else {
133 ret = Err(SteadyStateError::BackupDisabled);
134 }
135
136 break;
137 }
138 Err(_) => {
139 ret = Err(SteadyStateError::Lagged);
140 break;
141 }
142 _ => (),
143 }
144 }
145
146 ret
147 } else {
148 Err(SteadyStateError::BackupDisabled)
149 };
150
151 if let Some(old_delay) = old_delay {
152 let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
153 *lock = old_delay;
154 }
155
156 ret
157 })
158 }
159}