matrix_sdk/encryption/recovery/
futures.rs1use std::future::IntoFuture;
18
19use futures_core::Stream;
20use futures_util::{pin_mut, StreamExt};
21use matrix_sdk_common::boxed_into_future;
22use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
23use tracing::{warn, Instrument, Span};
24
25use super::{EnableProgress, Recovery, RecoveryError, Result};
26use crate::{
27 encryption::{backups::UploadState, secret_storage::SecretStore},
28 utils::ChannelObservable,
29};
30
31#[derive(Debug)]
33pub struct Enable<'a> {
34 pub(super) recovery: &'a Recovery,
35 pub(super) progress: ChannelObservable<EnableProgress>,
36 pub(super) wait_for_backups_upload: bool,
37 pub(super) passphrase: Option<&'a str>,
38 tracing_span: Span,
39}
40
41impl<'a> Enable<'a> {
42 pub(super) fn new(recovery: &'a Recovery) -> Self {
43 Self {
44 recovery,
45 progress: Default::default(),
46 wait_for_backups_upload: false,
47 passphrase: None,
48 tracing_span: Span::current(),
49 }
50 }
51
52 pub fn subscribe_to_progress(
54 &self,
55 ) -> impl Stream<Item = Result<EnableProgress, BroadcastStreamRecvError>> {
56 self.progress.subscribe()
57 }
58
59 pub fn wait_for_backups_to_upload(mut self) -> Self {
67 self.wait_for_backups_upload = true;
68
69 self
70 }
71
72 pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
76 self.passphrase = Some(passphrase);
77
78 self
79 }
80}
81
82impl<'a> IntoFuture for Enable<'a> {
83 type Output = Result<String>;
84 boxed_into_future!(extra_bounds: 'a);
85
86 fn into_future(self) -> Self::IntoFuture {
87 let Self { recovery, progress, wait_for_backups_upload, passphrase, tracing_span } = self;
88
89 let future = async move {
90 if !recovery.client.encryption().backups().are_enabled().await {
91 if recovery.client.encryption().backups().fetch_exists_on_server().await? {
92 return Err(RecoveryError::BackupExistsOnServer);
93 } else {
94 progress.set(EnableProgress::CreatingBackup);
95 recovery.mark_backup_as_enabled().await?;
96 recovery.client.encryption().backups().create().await?;
97 }
98 }
99
100 progress.set(EnableProgress::CreatingRecoveryKey);
101
102 let secret_storage = recovery.client.encryption().secret_storage();
103
104 let create_store = if let Some(passphrase) = passphrase {
105 secret_storage.create_secret_store().with_passphrase(passphrase)
106 } else {
107 secret_storage.create_secret_store()
108 };
109
110 let store: SecretStore = create_store.await?;
111
112 if wait_for_backups_upload {
113 let backups = recovery.client.encryption().backups();
114 let upload_future = backups.wait_for_steady_state();
115 let upload_progress = upload_future.subscribe_to_progress();
116
117 #[allow(unused_variables)]
118 let progress_task = matrix_sdk_common::executor::spawn({
119 let progress = progress.clone();
120 async move {
121 pin_mut!(upload_progress);
122
123 while let Some(update) = upload_progress.next().await {
124 match update {
125 Ok(UploadState::Uploading(count)) => {
126 progress.set(EnableProgress::BackingUp(count));
127 }
128 Ok(UploadState::Done | UploadState::Error) | Err(_) => break,
129 _ => (),
130 }
131 }
132 }
133 });
134
135 if let Err(e) = upload_future.await {
136 warn!("Couldn't upload all the room keys to the backup: {e:?}");
137 progress.set(EnableProgress::RoomKeyUploadError);
138 }
139
140 #[cfg(not(target_arch = "wasm32"))]
141 progress_task.abort();
142 } else {
143 recovery.client.encryption().backups().maybe_trigger_backup();
144 }
145
146 let key = store.secret_storage_key();
147
148 progress.set(EnableProgress::Done { recovery_key: key });
149 recovery.update_recovery_state().await?;
150
151 Ok(store.secret_storage_key())
152 };
153
154 Box::pin(future.instrument(tracing_span))
155 }
156}
157
158#[derive(Debug)]
160pub struct Reset<'a> {
161 pub(super) recovery: &'a Recovery,
162 pub(super) passphrase: Option<&'a str>,
163 tracing_span: Span,
164}
165
166impl<'a> Reset<'a> {
167 pub(super) fn new(recovery: &'a Recovery) -> Self {
168 Self { recovery, passphrase: None, tracing_span: Span::current() }
169 }
170
171 pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
175 self.passphrase = Some(passphrase);
176
177 self
178 }
179}
180
181impl<'a> IntoFuture for Reset<'a> {
182 type Output = Result<String>;
183 boxed_into_future!(extra_bounds: 'a);
184
185 fn into_future(self) -> Self::IntoFuture {
186 let Self { recovery, passphrase, tracing_span } = self;
187
188 let future = async move {
189 let secret_storage = recovery.client.encryption().secret_storage();
190
191 let create_store = if let Some(passphrase) = passphrase {
192 secret_storage.create_secret_store().with_passphrase(passphrase)
193 } else {
194 secret_storage.create_secret_store()
195 };
196
197 let store: SecretStore = create_store.await?;
198 recovery.update_recovery_state().await?;
199
200 Ok(store.secret_storage_key())
201 };
202
203 Box::pin(future.instrument(tracing_span))
204 }
205}
206
207#[derive(Debug)]
209pub struct RecoverAndReset<'a> {
210 pub(super) recovery: &'a Recovery,
211 pub(super) old_recovery_key: &'a str,
212 pub(super) passphrase: Option<&'a str>,
213 tracing_span: Span,
214}
215
216impl<'a> RecoverAndReset<'a> {
217 pub(super) fn new(recovery: &'a Recovery, old_recovery_key: &'a str) -> Self {
218 Self { recovery, old_recovery_key, passphrase: None, tracing_span: Span::current() }
219 }
220
221 pub fn with_passphrase(mut self, passphrase: &'a str) -> Self {
225 self.passphrase = Some(passphrase);
226
227 self
228 }
229}
230
231impl<'a> IntoFuture for RecoverAndReset<'a> {
232 type Output = Result<String>;
233 boxed_into_future!(extra_bounds: 'a);
234
235 fn into_future(self) -> Self::IntoFuture {
236 let Self { recovery, old_recovery_key, passphrase, tracing_span } = self;
237
238 let future = async move {
239 recovery.recover(old_recovery_key).await?;
240
241 let reset = if let Some(passphrase) = passphrase {
242 recovery.reset_key().with_passphrase(passphrase)
243 } else {
244 recovery.reset_key()
245 };
246
247 reset.await
248 };
249
250 Box::pin(future.instrument(tracing_span))
251 }
252}