matrix_sdk_ui/encryption_sync_service.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.

//! Encryption Sync API.
//!
//! The encryption sync API is a high-level helper that is designed to take care
//! of handling the synchronization of encryption and to-device events (required
//! for encryption), be they received within the app or within a dedicated
//! extension process (e.g. the [NSE] process on iOS devices).
//!
//! Under the hood, this uses a sliding sync instance configured with no lists,
//! but that enables the e2ee and to-device extensions, so that it can both
//! handle encryption and manage encryption keys; that's sufficient to decrypt
//! messages received in the notification processes.
//!
//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension

29use std::{pin::Pin, time::Duration};
30
31use async_stream::stream;
32use futures_core::stream::Stream;
33use futures_util::{StreamExt, pin_mut};
34use matrix_sdk::{Client, LEASE_DURATION_MS, SlidingSync, sleep::sleep};
35use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
36use ruma::{api::client::sync::sync_events::v5 as http, assign};
37use tokio::sync::OwnedMutexGuard;
38use tracing::{Span, debug, instrument, trace};
39
/// Zero-sized token granting the right to *use* an [`EncryptionSyncService`].
///
/// Exactly one permit is meant to exist for the whole lifetime of the
/// application, and it is expected to live inside a mutex. Anything that wants
/// to drive an `EncryptionSyncService` first takes an owned lock on that
/// mutex; holding the resulting guard is what guarantees that no more than one
/// `EncryptionSyncService` runs at any given time in the entire app.
pub struct EncryptionSyncPermit(());
47
48impl EncryptionSyncPermit {
49 pub(crate) fn new() -> Self {
50 Self(())
51 }
52}
53
54impl EncryptionSyncPermit {
55 /// Test-only.
56 #[doc(hidden)]
57 pub fn new_for_testing() -> Self {
58 Self::new()
59 }
60}
61
62/// High-level helper for synchronizing encryption events using sliding sync.
63///
64/// See the module's documentation for more details.
65pub struct EncryptionSyncService {
66 client: Client,
67 sliding_sync: SlidingSync,
68}
69
70impl EncryptionSyncService {
71 /// Creates a new instance of a `EncryptionSyncService`.
72 ///
73 /// This will create and manage an instance of [`matrix_sdk::SlidingSync`].
74 pub async fn new(
75 client: Client,
76 poll_and_network_timeouts: Option<(Duration, Duration)>,
77 ) -> Result<Self, Error> {
78 // Make sure to use the same `conn_id` and caching store identifier, whichever
79 // process is running this sliding sync. There must be at most one
80 // sliding sync instance that enables the e2ee and to-device extensions.
81 let mut builder = client
82 .sliding_sync("encryption")
83 .map_err(Error::SlidingSync)?
84 //.share_pos() // TODO(bnjbvr) This is racy, needs cross-process lock :')
85 .with_to_device_extension(
86 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
87 )
88 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}));
89
90 if let Some((poll_timeout, network_timeout)) = poll_and_network_timeouts {
91 builder = builder.poll_timeout(poll_timeout).network_timeout(network_timeout);
92 }
93
94 let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
95
96 if let CrossProcessLockConfig::MultiProcess { holder_name } =
97 client.cross_process_lock_config()
98 {
99 // Gently try to enable the cross-process lock on behalf of the user.
100 match client.encryption().enable_cross_process_store_lock(holder_name.clone()).await {
101 Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
102 // Ignore; we've already set the crypto store lock to
103 // something, and that's sufficient as
104 // long as it uniquely identifies the process.
105 }
106 Err(err) => {
107 // Any other error is fatal
108 return Err(Error::ClientError(err));
109 }
110 }
111 }
112
113 Ok(Self { client, sliding_sync })
114 }
115
116 /// Runs an `EncryptionSyncService` loop for a fixed number of iterations.
117 ///
118 /// This runs for the given number of iterations, or less than that, if it
119 /// stops earlier or could not acquire a cross-process lock (if configured
120 /// with it).
121 ///
122 /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
123 /// most one encryption sync running at any time. See its documentation
124 /// for more details.
125 #[instrument(skip_all, fields(store_generation))]
126 pub async fn run_fixed_iterations(
127 self,
128 num_iterations: u8,
129 _permit: OwnedMutexGuard<EncryptionSyncPermit>,
130 ) -> Result<(), Error> {
131 let sync = self.sliding_sync.sync();
132
133 pin_mut!(sync);
134
135 let lock_guard = if let CrossProcessLockConfig::MultiProcess { .. } =
136 self.client.cross_process_lock_config()
137 {
138 let mut lock_guard =
139 self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
140
141 // Try to take the lock at the beginning; if it's busy, that means that another
142 // process already holds onto it, and as such we won't try to run the
143 // encryption sync loop at all (because we expect the other process to
144 // do so).
145
146 if lock_guard.is_none() {
147 // If we can't acquire the cross-process lock on the first attempt,
148 // that means the main process is running, or its lease hasn't expired
149 // yet. In case it's the latter, wait a bit and retry.
150 tracing::debug!(
151 "Lock was already taken, and we're not the main loop; retrying in {}ms...",
152 LEASE_DURATION_MS
153 );
154
155 sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
156
157 lock_guard = self
158 .client
159 .encryption()
160 .try_lock_store_once()
161 .await
162 .map_err(Error::LockError)?;
163
164 if lock_guard.is_none() {
165 tracing::debug!(
166 "Second attempt at locking outside the main app failed, aborting."
167 );
168 return Ok(());
169 }
170 }
171
172 lock_guard
173 } else {
174 None
175 };
176
177 Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
178
179 for _ in 0..num_iterations {
180 match sync.next().await {
181 Some(Ok(update_summary)) => {
182 // This API is only concerned with the e2ee and to-device extensions.
183 // Warn if anything weird has been received from the homeserver.
184 if !update_summary.lists.is_empty() {
185 debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
186 }
187 if !update_summary.rooms.is_empty() {
188 debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
189 }
190
191 // Cool cool, let's do it again.
192 trace!("Encryption sync received an update!");
193 }
194
195 Some(Err(err)) => {
196 trace!("Encryption sync stopped because of an error: {err:#}");
197 return Err(Error::SlidingSync(err));
198 }
199
200 None => {
201 trace!("Encryption sync properly terminated.");
202 break;
203 }
204 }
205 }
206
207 Ok(())
208 }
209
210 /// Start synchronization.
211 ///
212 /// This should be regularly polled.
213 ///
214 /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
215 /// most one encryption sync running at any time. See its documentation
216 /// for more details.
217 #[doc(hidden)] // Only public for testing purposes.
218 pub fn sync(
219 &self,
220 _permit: OwnedMutexGuard<EncryptionSyncPermit>,
221 ) -> impl Stream<Item = Result<(), Error>> + '_ {
222 stream!({
223 let sync = self.sliding_sync.sync();
224
225 pin_mut!(sync);
226
227 loop {
228 match self.next_sync_with_lock(&mut sync).await? {
229 Some(Ok(update_summary)) => {
230 // This API is only concerned with the e2ee and to-device extensions.
231 // Warn if anything weird has been received from the homeserver.
232 if !update_summary.lists.is_empty() {
233 debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
234 }
235 if !update_summary.rooms.is_empty() {
236 debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
237 }
238
239 // Cool cool, let's do it again.
240 trace!("Encryption sync received an update!");
241 yield Ok(());
242 continue;
243 }
244
245 Some(Err(err)) => {
246 trace!("Encryption sync stopped because of an error: {err:#}");
247 yield Err(Error::SlidingSync(err));
248 break;
249 }
250
251 None => {
252 trace!("Encryption sync properly terminated.");
253 break;
254 }
255 }
256 }
257 })
258 }
259
260 /// Helper function for `sync`. Take the cross-process store lock, and call
261 /// `sync.next()`
262 #[instrument(skip_all, fields(store_generation))]
263 async fn next_sync_with_lock<Item>(
264 &self,
265 sync: &mut Pin<&mut impl Stream<Item = Item>>,
266 ) -> Result<Option<Item>, Error> {
267 let guard = if let CrossProcessLockConfig::MultiProcess { .. } =
268 self.client.cross_process_lock_config()
269 {
270 self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
271 } else {
272 None
273 };
274
275 Span::current().record("store_generation", guard.map(|guard| guard.generation()));
276
277 Ok(sync.next().await)
278 }
279
280 /// Requests that the underlying sliding sync be stopped.
281 ///
282 /// This will unlock the cross-process lock, if taken.
283 pub(crate) fn stop_sync(&self) -> Result<(), Error> {
284 // Stopping the sync loop will cause the next `next()` call to return `None`, so
285 // this will also release the cross-process lock automatically.
286 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)?;
287
288 Ok(())
289 }
290
291 pub(crate) async fn expire_sync_session(&self) {
292 self.sliding_sync.expire_session().await;
293 }
294}
295
296/// Errors for the [`EncryptionSyncService`].
297#[derive(Debug, thiserror::Error)]
298pub enum Error {
299 #[error("Something wrong happened in sliding sync: {0:#}")]
300 SlidingSync(matrix_sdk::Error),
301
302 #[error("Locking failed: {0:#}")]
303 LockError(matrix_sdk::Error),
304
305 #[error(transparent)]
306 ClientError(matrix_sdk::Error),
307}