matrix_sdk_ui/
encryption_sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
//! Encryption Sync API.
//!
//! The encryption sync API is a high-level helper that is designed to take care
//! of handling the synchronization of encryption and to-device events (required
//! for encryption), be they received within the app or within a dedicated
//! extension process (e.g. the [NSE] process on iOS devices).
//!
//! Under the hood, this uses a sliding sync instance configured with no lists,
//! but that enables the e2ee and to-device extensions, so that it can both
//! handle encryption and manage encryption keys; that's sufficient to decrypt
//! messages received in the notification processes.
//!
//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
28
29use std::{pin::Pin, time::Duration};
30
31use async_stream::stream;
32use futures_core::stream::Stream;
33use futures_util::{pin_mut, StreamExt};
34use matrix_sdk::{sleep::sleep, Client, SlidingSync, LEASE_DURATION_MS};
35use ruma::{api::client::sync::sync_events::v5 as http, assign};
36use tokio::sync::OwnedMutexGuard;
37use tracing::{debug, instrument, trace, Span};
38
/// Unit type representing a permit to *use* an [`EncryptionSyncService`].
///
/// This must be created once in the whole application's lifetime, wrapped in a
/// mutex. Using an `EncryptionSyncService` must then lock that mutex in an
/// owned way, so that there's at most a single `EncryptionSyncService` running
/// at any time in the entire app.
///
/// The type intentionally implements neither `Clone` nor `Default`: being able
/// to mint additional permits would defeat the "at most one" guarantee.
#[derive(Debug)]
pub struct EncryptionSyncPermit(());

impl EncryptionSyncPermit {
    /// Creates the permit.
    ///
    /// Crate-private, so that code outside of this crate cannot create extra
    /// permits (except through the test-only constructor below).
    pub(crate) fn new() -> Self {
        Self(())
    }

    /// Test-only.
    #[doc(hidden)]
    pub fn new_for_testing() -> Self {
        Self::new()
    }
}
60
/// Should the `EncryptionSyncService` make use of locking?
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WithLocking {
    /// The service enables the cross-process store lock when it is created,
    /// and takes that lock when it syncs.
    Yes,
    /// The service never touches the cross-process store lock.
    No,
}

impl From<bool> for WithLocking {
    /// Maps `true` to [`WithLocking::Yes`] and `false` to [`WithLocking::No`].
    fn from(value: bool) -> Self {
        if value {
            Self::Yes
        } else {
            Self::No
        }
    }
}
76
77/// High-level helper for synchronizing encryption events using sliding sync.
78///
79/// See the module's documentation for more details.
80pub struct EncryptionSyncService {
81    client: Client,
82    sliding_sync: SlidingSync,
83    with_locking: bool,
84}
85
86impl EncryptionSyncService {
87    /// Creates a new instance of a `EncryptionSyncService`.
88    ///
89    /// This will create and manage an instance of [`matrix_sdk::SlidingSync`].
90    pub async fn new(
91        client: Client,
92        poll_and_network_timeouts: Option<(Duration, Duration)>,
93        with_locking: WithLocking,
94    ) -> Result<Self, Error> {
95        // Make sure to use the same `conn_id` and caching store identifier, whichever
96        // process is running this sliding sync. There must be at most one
97        // sliding sync instance that enables the e2ee and to-device extensions.
98        let mut builder = client
99            .sliding_sync("encryption")
100            .map_err(Error::SlidingSync)?
101            //.share_pos() // TODO(bnjbvr) This is racy, needs cross-process lock :')
102            .with_to_device_extension(
103                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
104            )
105            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}));
106
107        if let Some((poll_timeout, network_timeout)) = poll_and_network_timeouts {
108            builder = builder.poll_timeout(poll_timeout).network_timeout(network_timeout);
109        }
110
111        let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
112
113        let with_locking = matches!(with_locking, WithLocking::Yes);
114
115        if with_locking {
116            // Gently try to enable the cross-process lock on behalf of the user.
117            match client
118                .encryption()
119                .enable_cross_process_store_lock(
120                    client.cross_process_store_locks_holder_name().to_owned(),
121                )
122                .await
123            {
124                Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
125                    // Ignore; we've already set the crypto store lock to
126                    // something, and that's sufficient as
127                    // long as it uniquely identifies the process.
128                }
129                Err(err) => {
130                    // Any other error is fatal
131                    return Err(Error::ClientError(err));
132                }
133            };
134        }
135
136        Ok(Self { client, sliding_sync, with_locking })
137    }
138
139    /// Runs an `EncryptionSyncService` loop for a fixed number of iterations.
140    ///
141    /// This runs for the given number of iterations, or less than that, if it
142    /// stops earlier or could not acquire a cross-process lock (if configured
143    /// with it).
144    ///
145    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
146    /// most one encryption sync running at any time. See its documentation
147    /// for more details.
148    #[instrument(skip_all, fields(store_generation))]
149    pub async fn run_fixed_iterations(
150        self,
151        num_iterations: u8,
152        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
153    ) -> Result<(), Error> {
154        let sync = self.sliding_sync.sync();
155
156        pin_mut!(sync);
157
158        let lock_guard = if self.with_locking {
159            let mut lock_guard =
160                self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
161
162            // Try to take the lock at the beginning; if it's busy, that means that another
163            // process already holds onto it, and as such we won't try to run the
164            // encryption sync loop at all (because we expect the other process to
165            // do so).
166
167            if lock_guard.is_none() {
168                // If we can't acquire the cross-process lock on the first attempt,
169                // that means the main process is running, or its lease hasn't expired
170                // yet. In case it's the latter, wait a bit and retry.
171                tracing::debug!(
172                    "Lock was already taken, and we're not the main loop; retrying in {}ms...",
173                    LEASE_DURATION_MS
174                );
175
176                sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
177
178                lock_guard = self
179                    .client
180                    .encryption()
181                    .try_lock_store_once()
182                    .await
183                    .map_err(Error::LockError)?;
184
185                if lock_guard.is_none() {
186                    tracing::debug!(
187                        "Second attempt at locking outside the main app failed, aborting."
188                    );
189                    return Ok(());
190                }
191            }
192
193            lock_guard
194        } else {
195            None
196        };
197
198        Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
199
200        for _ in 0..num_iterations {
201            match sync.next().await {
202                Some(Ok(update_summary)) => {
203                    // This API is only concerned with the e2ee and to-device extensions.
204                    // Warn if anything weird has been received from the homeserver.
205                    if !update_summary.lists.is_empty() {
206                        debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
207                    }
208                    if !update_summary.rooms.is_empty() {
209                        debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
210                    }
211
212                    // Cool cool, let's do it again.
213                    trace!("Encryption sync received an update!");
214                }
215
216                Some(Err(err)) => {
217                    trace!("Encryption sync stopped because of an error: {err:#}");
218                    return Err(Error::SlidingSync(err));
219                }
220
221                None => {
222                    trace!("Encryption sync properly terminated.");
223                    break;
224                }
225            }
226        }
227
228        Ok(())
229    }
230
231    /// Start synchronization.
232    ///
233    /// This should be regularly polled.
234    ///
235    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
236    /// most one encryption sync running at any time. See its documentation
237    /// for more details.
238    #[doc(hidden)] // Only public for testing purposes.
239    pub fn sync(
240        &self,
241        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
242    ) -> impl Stream<Item = Result<(), Error>> + '_ {
243        stream!({
244            let sync = self.sliding_sync.sync();
245
246            pin_mut!(sync);
247
248            loop {
249                match self.next_sync_with_lock(&mut sync).await? {
250                    Some(Ok(update_summary)) => {
251                        // This API is only concerned with the e2ee and to-device extensions.
252                        // Warn if anything weird has been received from the homeserver.
253                        if !update_summary.lists.is_empty() {
254                            debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
255                        }
256                        if !update_summary.rooms.is_empty() {
257                            debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
258                        }
259
260                        // Cool cool, let's do it again.
261                        trace!("Encryption sync received an update!");
262                        yield Ok(());
263                        continue;
264                    }
265
266                    Some(Err(err)) => {
267                        trace!("Encryption sync stopped because of an error: {err:#}");
268                        yield Err(Error::SlidingSync(err));
269                        break;
270                    }
271
272                    None => {
273                        trace!("Encryption sync properly terminated.");
274                        break;
275                    }
276                }
277            }
278        })
279    }
280
281    /// Helper function for `sync`. Take the cross-process store lock, and call
282    /// `sync.next()`
283    #[instrument(skip_all, fields(store_generation))]
284    async fn next_sync_with_lock<Item>(
285        &self,
286        sync: &mut Pin<&mut impl Stream<Item = Item>>,
287    ) -> Result<Option<Item>, Error> {
288        let guard = if self.with_locking {
289            self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
290        } else {
291            None
292        };
293
294        Span::current().record("store_generation", guard.map(|guard| guard.generation()));
295
296        Ok(sync.next().await)
297    }
298
299    /// Requests that the underlying sliding sync be stopped.
300    ///
301    /// This will unlock the cross-process lock, if taken.
302    pub(crate) fn stop_sync(&self) -> Result<(), Error> {
303        // Stopping the sync loop will cause the next `next()` call to return `None`, so
304        // this will also release the cross-process lock automatically.
305        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)?;
306
307        Ok(())
308    }
309
310    pub(crate) async fn expire_sync_session(&self) {
311        self.sliding_sync.expire_session().await;
312    }
313}
314
315/// Errors for the [`EncryptionSyncService`].
316#[derive(Debug, thiserror::Error)]
317pub enum Error {
318    #[error("Something wrong happened in sliding sync: {0:#}")]
319    SlidingSync(matrix_sdk::Error),
320
321    #[error("Locking failed: {0:#}")]
322    LockError(matrix_sdk::Error),
323
324    #[error(transparent)]
325    ClientError(matrix_sdk::Error),
326}