matrix_sdk/encryption/recovery/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Named futures for the recovery support.
16
17use std::future::IntoFuture;
18
19use futures_core::Stream;
20use futures_util::{pin_mut, StreamExt};
21use matrix_sdk_common::boxed_into_future;
22use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
23use tracing::{warn, Instrument, Span};
24
25use super::{EnableProgress, Recovery, RecoveryError, Result};
26use crate::{
27    encryption::{backups::UploadState, secret_storage::SecretStore},
28    utils::ChannelObservable,
29};
30
31/// Named future for the [`Recovery::enable()`] method.
32#[derive(Debug)]
33pub struct Enable<'a> {
34    pub(super) recovery: &'a Recovery,
35    pub(super) progress: ChannelObservable<EnableProgress>,
36    pub(super) wait_for_backups_upload: bool,
37    pub(super) passphrase: Option<&'a str>,
38    tracing_span: Span,
39}
40
41impl<'a> Enable<'a> {
42    pub(super) fn new(recovery: &'a Recovery) -> Self {
43        Self {
44            recovery,
45            progress: Default::default(),
46            wait_for_backups_upload: false,
47            passphrase: None,
48            tracing_span: Span::current(),
49        }
50    }
51
52    /// Subscribe to updates to the recovery enabling progress.
53    pub fn subscribe_to_progress(
54        &self,
55    ) -> impl Stream<Item = Result<EnableProgress, BroadcastStreamRecvError>> {
56        self.progress.subscribe()
57    }
58
59    /// Should the enabling of the recovery also wait for *all* room keys to be
60    /// uploaded to the server-side key backup?
61    ///
62    /// This is useful if the user is enabling recovery and the room key backup
63    /// just before logging out. Otherwise the logout might finish before
64    /// all room keys have been backed up and thus historic messages will
65    /// fail to decrypt once the user logs back in again.
66    pub fn wait_for_backups_to_upload(mut self) -> Self {
67        self.wait_for_backups_upload = true;
68
69        self
70    }
71
72    /// In addition to the recovery key the [`Recovery::enable()`] method
73    /// returns, allow this passphrase to be used for the
74    /// [`Recovery::recover()`] method.
75    pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
76        self.passphrase = Some(passphrase);
77
78        self
79    }
80}
81
82impl<'a> IntoFuture for Enable<'a> {
83    type Output = Result<String>;
84    boxed_into_future!(extra_bounds: 'a);
85
86    fn into_future(self) -> Self::IntoFuture {
87        let Self { recovery, progress, wait_for_backups_upload, passphrase, tracing_span } = self;
88
89        let future = async move {
90            if !recovery.client.encryption().backups().are_enabled().await {
91                if recovery.client.encryption().backups().fetch_exists_on_server().await? {
92                    return Err(RecoveryError::BackupExistsOnServer);
93                } else {
94                    progress.set(EnableProgress::CreatingBackup);
95                    recovery.mark_backup_as_enabled().await?;
96                    recovery.client.encryption().backups().create().await?;
97                }
98            }
99
100            progress.set(EnableProgress::CreatingRecoveryKey);
101
102            let secret_storage = recovery.client.encryption().secret_storage();
103
104            let create_store = if let Some(passphrase) = passphrase {
105                secret_storage.create_secret_store().with_passphrase(passphrase)
106            } else {
107                secret_storage.create_secret_store()
108            };
109
110            let store: SecretStore = create_store.await?;
111
112            if wait_for_backups_upload {
113                let backups = recovery.client.encryption().backups();
114                let upload_future = backups.wait_for_steady_state();
115                let upload_progress = upload_future.subscribe_to_progress();
116
117                #[allow(unused_variables)]
118                let progress_task = matrix_sdk_common::executor::spawn({
119                    let progress = progress.clone();
120                    async move {
121                        pin_mut!(upload_progress);
122
123                        while let Some(update) = upload_progress.next().await {
124                            match update {
125                                Ok(UploadState::Uploading(count)) => {
126                                    progress.set(EnableProgress::BackingUp(count));
127                                }
128                                Ok(UploadState::Done | UploadState::Error) | Err(_) => break,
129                                _ => (),
130                            }
131                        }
132                    }
133                });
134
135                if let Err(e) = upload_future.await {
136                    warn!("Couldn't upload all the room keys to the backup: {e:?}");
137                    progress.set(EnableProgress::RoomKeyUploadError);
138                }
139
140                #[cfg(not(target_arch = "wasm32"))]
141                progress_task.abort();
142            } else {
143                recovery.client.encryption().backups().maybe_trigger_backup();
144            }
145
146            let key = store.secret_storage_key();
147
148            progress.set(EnableProgress::Done { recovery_key: key });
149            recovery.update_recovery_state().await?;
150
151            Ok(store.secret_storage_key())
152        };
153
154        Box::pin(future.instrument(tracing_span))
155    }
156}
157
158/// Named future for the [`Recovery::reset_key()`] method.
159#[derive(Debug)]
160pub struct Reset<'a> {
161    pub(super) recovery: &'a Recovery,
162    pub(super) passphrase: Option<&'a str>,
163    tracing_span: Span,
164}
165
166impl<'a> Reset<'a> {
167    pub(super) fn new(recovery: &'a Recovery) -> Self {
168        Self { recovery, passphrase: None, tracing_span: Span::current() }
169    }
170
171    /// In addition to the recovery key the [`Recovery::reset_key()`] method
172    /// returns, allow this passphrase to be used for the
173    /// [`Recovery::recover()`] method.
174    pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
175        self.passphrase = Some(passphrase);
176
177        self
178    }
179}
180
181impl<'a> IntoFuture for Reset<'a> {
182    type Output = Result<String>;
183    boxed_into_future!(extra_bounds: 'a);
184
185    fn into_future(self) -> Self::IntoFuture {
186        let Self { recovery, passphrase, tracing_span } = self;
187
188        let future = async move {
189            let secret_storage = recovery.client.encryption().secret_storage();
190
191            let create_store = if let Some(passphrase) = passphrase {
192                secret_storage.create_secret_store().with_passphrase(passphrase)
193            } else {
194                secret_storage.create_secret_store()
195            };
196
197            let store: SecretStore = create_store.await?;
198            recovery.update_recovery_state().await?;
199
200            Ok(store.secret_storage_key())
201        };
202
203        Box::pin(future.instrument(tracing_span))
204    }
205}
206
207/// Named future for the [`Recovery::recover_and_reset()`] method.
208#[derive(Debug)]
209pub struct RecoverAndReset<'a> {
210    pub(super) recovery: &'a Recovery,
211    pub(super) old_recovery_key: &'a str,
212    pub(super) passphrase: Option<&'a str>,
213    tracing_span: Span,
214}
215
216impl<'a> RecoverAndReset<'a> {
217    pub(super) fn new(recovery: &'a Recovery, old_recovery_key: &'a str) -> Self {
218        Self { recovery, old_recovery_key, passphrase: None, tracing_span: Span::current() }
219    }
220
221    /// In addition to the new recovery key the
222    /// [`Recovery::recover_and_reset()`] method returns, allow this
223    /// passphrase to be used for the [`Recovery::recover()`] method.
224    pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
225        self.passphrase = Some(passphrase);
226
227        self
228    }
229}
230
231impl<'a> IntoFuture for RecoverAndReset<'a> {
232    type Output = Result<String>;
233    boxed_into_future!(extra_bounds: 'a);
234
235    fn into_future(self) -> Self::IntoFuture {
236        let Self { recovery, old_recovery_key, passphrase, tracing_span } = self;
237
238        let future = async move {
239            recovery.recover(old_recovery_key).await?;
240
241            let reset = if let Some(passphrase) = passphrase {
242                recovery.reset_key().with_passphrase(passphrase)
243            } else {
244                recovery.reset_key()
245            };
246
247            reset.await
248        };
249
250        Box::pin(future.instrument(tracing_span))
251    }
252}