matrix_sdk_ui/encryption_sync_service.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Encryption Sync API.
16//!
17//! The encryption sync API is a high-level helper that is designed to take care
18//! of handling the synchronization of encryption and to-device events (required
19//! for encryption), be they received within the app or within a dedicated
20//! extension process (e.g. the [NSE] process on iOS devices).
21//!
22//! Under the hood, this uses a sliding sync instance configured with no lists,
23//! but that enables the e2ee and to-device extensions, so that it can both
24//! handle encryption and manage encryption keys; that's sufficient to decrypt
25//! messages received in the notification processes.
26//!
27//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
28
29use std::{pin::Pin, time::Duration};
30
31use async_stream::stream;
32use futures_core::stream::Stream;
33use futures_util::{StreamExt, pin_mut};
34use matrix_sdk::{Client, LEASE_DURATION_MS, SlidingSync, sleep::sleep};
35use ruma::{api::client::sync::sync_events::v5 as http, assign};
36use tokio::sync::OwnedMutexGuard;
37use tracing::{Span, debug, instrument, trace};
38
/// Zero-sized token representing the right to *use* an
/// [`EncryptionSyncService`].
///
/// A single permit must be created for the whole application's lifetime and
/// stored behind a mutex. Anyone who wants to use an `EncryptionSyncService`
/// then has to lock that mutex in an owned way, which guarantees that at most
/// one `EncryptionSyncService` is running at any time in the entire app.
pub struct EncryptionSyncPermit(());

impl EncryptionSyncPermit {
    /// Creates the permit. Only available inside this crate.
    pub(crate) fn new() -> Self {
        Self(())
    }

    /// Test-only.
    #[doc(hidden)]
    pub fn new_for_testing() -> Self {
        Self(())
    }
}
60
/// Should the `EncryptionSyncService` make use of locking?
///
/// This is a self-describing replacement for a bare `bool` argument; use the
/// [`From<bool>`] conversion when the choice comes from a boolean setting.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WithLocking {
    /// Enable and take the cross-process store lock around the sync.
    Yes,
    /// Run the sync without touching the cross-process store lock.
    No,
}

impl From<bool> for WithLocking {
    /// Maps `true` to [`WithLocking::Yes`] and `false` to [`WithLocking::No`].
    fn from(value: bool) -> Self {
        if value { Self::Yes } else { Self::No }
    }
}
72
/// High-level helper for synchronizing encryption events using sliding sync.
///
/// See the module's documentation for more details.
pub struct EncryptionSyncService {
    /// Client the service was created from; its encryption API is used to
    /// enable and take the cross-process store lock.
    client: Client,
    /// Sliding sync instance built in `new`, with the to-device and e2ee
    /// extensions enabled.
    sliding_sync: SlidingSync,
    /// `true` when the service was created with [`WithLocking::Yes`]; the
    /// cross-process store lock is only taken in that case.
    with_locking: bool,
}
81
82impl EncryptionSyncService {
83 /// Creates a new instance of a `EncryptionSyncService`.
84 ///
85 /// This will create and manage an instance of [`matrix_sdk::SlidingSync`].
86 pub async fn new(
87 client: Client,
88 poll_and_network_timeouts: Option<(Duration, Duration)>,
89 with_locking: WithLocking,
90 ) -> Result<Self, Error> {
91 // Make sure to use the same `conn_id` and caching store identifier, whichever
92 // process is running this sliding sync. There must be at most one
93 // sliding sync instance that enables the e2ee and to-device extensions.
94 let mut builder = client
95 .sliding_sync("encryption")
96 .map_err(Error::SlidingSync)?
97 //.share_pos() // TODO(bnjbvr) This is racy, needs cross-process lock :')
98 .with_to_device_extension(
99 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
100 )
101 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}));
102
103 if let Some((poll_timeout, network_timeout)) = poll_and_network_timeouts {
104 builder = builder.poll_timeout(poll_timeout).network_timeout(network_timeout);
105 }
106
107 let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
108
109 let with_locking = matches!(with_locking, WithLocking::Yes);
110
111 if with_locking {
112 // Gently try to enable the cross-process lock on behalf of the user.
113 match client
114 .encryption()
115 .enable_cross_process_store_lock(
116 client.cross_process_store_locks_holder_name().to_owned(),
117 )
118 .await
119 {
120 Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
121 // Ignore; we've already set the crypto store lock to
122 // something, and that's sufficient as
123 // long as it uniquely identifies the process.
124 }
125 Err(err) => {
126 // Any other error is fatal
127 return Err(Error::ClientError(err));
128 }
129 }
130 }
131
132 Ok(Self { client, sliding_sync, with_locking })
133 }
134
135 /// Runs an `EncryptionSyncService` loop for a fixed number of iterations.
136 ///
137 /// This runs for the given number of iterations, or less than that, if it
138 /// stops earlier or could not acquire a cross-process lock (if configured
139 /// with it).
140 ///
141 /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
142 /// most one encryption sync running at any time. See its documentation
143 /// for more details.
144 #[instrument(skip_all, fields(store_generation))]
145 pub async fn run_fixed_iterations(
146 self,
147 num_iterations: u8,
148 _permit: OwnedMutexGuard<EncryptionSyncPermit>,
149 ) -> Result<(), Error> {
150 let sync = self.sliding_sync.sync();
151
152 pin_mut!(sync);
153
154 let lock_guard = if self.with_locking {
155 let mut lock_guard =
156 self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
157
158 // Try to take the lock at the beginning; if it's busy, that means that another
159 // process already holds onto it, and as such we won't try to run the
160 // encryption sync loop at all (because we expect the other process to
161 // do so).
162
163 if lock_guard.is_none() {
164 // If we can't acquire the cross-process lock on the first attempt,
165 // that means the main process is running, or its lease hasn't expired
166 // yet. In case it's the latter, wait a bit and retry.
167 tracing::debug!(
168 "Lock was already taken, and we're not the main loop; retrying in {}ms...",
169 LEASE_DURATION_MS
170 );
171
172 sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
173
174 lock_guard = self
175 .client
176 .encryption()
177 .try_lock_store_once()
178 .await
179 .map_err(Error::LockError)?;
180
181 if lock_guard.is_none() {
182 tracing::debug!(
183 "Second attempt at locking outside the main app failed, aborting."
184 );
185 return Ok(());
186 }
187 }
188
189 lock_guard
190 } else {
191 None
192 };
193
194 Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
195
196 for _ in 0..num_iterations {
197 match sync.next().await {
198 Some(Ok(update_summary)) => {
199 // This API is only concerned with the e2ee and to-device extensions.
200 // Warn if anything weird has been received from the homeserver.
201 if !update_summary.lists.is_empty() {
202 debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
203 }
204 if !update_summary.rooms.is_empty() {
205 debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
206 }
207
208 // Cool cool, let's do it again.
209 trace!("Encryption sync received an update!");
210 }
211
212 Some(Err(err)) => {
213 trace!("Encryption sync stopped because of an error: {err:#}");
214 return Err(Error::SlidingSync(err));
215 }
216
217 None => {
218 trace!("Encryption sync properly terminated.");
219 break;
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 /// Start synchronization.
228 ///
229 /// This should be regularly polled.
230 ///
231 /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
232 /// most one encryption sync running at any time. See its documentation
233 /// for more details.
234 #[doc(hidden)] // Only public for testing purposes.
235 pub fn sync(
236 &self,
237 _permit: OwnedMutexGuard<EncryptionSyncPermit>,
238 ) -> impl Stream<Item = Result<(), Error>> + '_ {
239 stream!({
240 let sync = self.sliding_sync.sync();
241
242 pin_mut!(sync);
243
244 loop {
245 match self.next_sync_with_lock(&mut sync).await? {
246 Some(Ok(update_summary)) => {
247 // This API is only concerned with the e2ee and to-device extensions.
248 // Warn if anything weird has been received from the homeserver.
249 if !update_summary.lists.is_empty() {
250 debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
251 }
252 if !update_summary.rooms.is_empty() {
253 debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
254 }
255
256 // Cool cool, let's do it again.
257 trace!("Encryption sync received an update!");
258 yield Ok(());
259 continue;
260 }
261
262 Some(Err(err)) => {
263 trace!("Encryption sync stopped because of an error: {err:#}");
264 yield Err(Error::SlidingSync(err));
265 break;
266 }
267
268 None => {
269 trace!("Encryption sync properly terminated.");
270 break;
271 }
272 }
273 }
274 })
275 }
276
277 /// Helper function for `sync`. Take the cross-process store lock, and call
278 /// `sync.next()`
279 #[instrument(skip_all, fields(store_generation))]
280 async fn next_sync_with_lock<Item>(
281 &self,
282 sync: &mut Pin<&mut impl Stream<Item = Item>>,
283 ) -> Result<Option<Item>, Error> {
284 let guard = if self.with_locking {
285 self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
286 } else {
287 None
288 };
289
290 Span::current().record("store_generation", guard.map(|guard| guard.generation()));
291
292 Ok(sync.next().await)
293 }
294
295 /// Requests that the underlying sliding sync be stopped.
296 ///
297 /// This will unlock the cross-process lock, if taken.
298 pub(crate) fn stop_sync(&self) -> Result<(), Error> {
299 // Stopping the sync loop will cause the next `next()` call to return `None`, so
300 // this will also release the cross-process lock automatically.
301 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)?;
302
303 Ok(())
304 }
305
/// Expires the session of the underlying sliding sync.
///
/// This only forwards to `expire_session` on the sliding sync instance owned
/// by this service, and waits for it to complete.
pub(crate) async fn expire_sync_session(&self) {
    self.sliding_sync.expire_session().await;
}
309}
310
/// Errors for the [`EncryptionSyncService`].
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The underlying sliding sync failed: while being created or built,
    /// during a sync iteration, or when requested to stop.
    #[error("Something wrong happened in sliding sync: {0:#}")]
    SlidingSync(matrix_sdk::Error),

    /// Taking the cross-process store lock failed.
    #[error("Locking failed: {0:#}")]
    LockError(matrix_sdk::Error),

    /// Enabling the cross-process store lock on the client failed; displayed
    /// exactly as the wrapped error.
    #[error(transparent)]
    ClientError(matrix_sdk::Error),
}