Skip to main content

matrix_sdk_ui/
encryption_sync_service.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.
14
//! Encryption Sync API.
//!
//! The encryption sync API is a high-level helper that is designed to take care
//! of handling the synchronization of encryption and to-device events (required
//! for encryption), be they received within the app or within a dedicated
//! extension process (e.g. the [NSE] process on iOS devices).
//!
//! Under the hood, this uses a sliding sync instance configured with no lists,
//! but that enables the e2ee and to-device extensions, so that it can both
//! handle encryption and manage encryption keys; that's sufficient to decrypt
//! messages received in the notification processes.
//!
//! [NSE]: https://developer.apple.com/documentation/usernotifications/unnotificationserviceextension
28
29use std::{pin::Pin, time::Duration};
30
31use async_stream::stream;
32use futures_core::stream::Stream;
33use futures_util::{StreamExt, pin_mut};
34use matrix_sdk::{Client, LEASE_DURATION_MS, SlidingSync, sleep::sleep};
35use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
36use ruma::{api::client::sync::sync_events::v5 as http, assign};
37use tokio::sync::OwnedMutexGuard;
38use tracing::{Span, debug, instrument, trace};
39
/// Unit type representing a permit to *use* an [`EncryptionSyncService`].
///
/// This must be created once in the whole application's lifetime, wrapped in a
/// mutex. Using an `EncryptionSyncService` must then lock that mutex in an
/// owned way, so that there's at most a single `EncryptionSyncService` running
/// at any time in the entire app.
pub struct EncryptionSyncPermit(());

impl EncryptionSyncPermit {
    /// Creates the permit.
    ///
    /// Only visible inside this crate; the private unit field prevents code
    /// outside this module from building a permit with a struct literal.
    pub(crate) fn new() -> Self {
        Self(())
    }

    /// Test-only.
    #[doc(hidden)]
    pub fn new_for_testing() -> Self {
        Self::new()
    }
}
61
62/// High-level helper for synchronizing encryption events using sliding sync.
63///
64/// See the module's documentation for more details.
65pub struct EncryptionSyncService {
66    client: Client,
67    sliding_sync: SlidingSync,
68}
69
70impl EncryptionSyncService {
71    /// Creates a new instance of a `EncryptionSyncService`.
72    ///
73    /// This will create and manage an instance of [`matrix_sdk::SlidingSync`].
74    pub async fn new(
75        client: Client,
76        poll_and_network_timeouts: Option<(Duration, Duration)>,
77    ) -> Result<Self, Error> {
78        // Make sure to use the same `conn_id` and caching store identifier, whichever
79        // process is running this sliding sync. There must be at most one
80        // sliding sync instance that enables the e2ee and to-device extensions.
81        let mut builder = client
82            .sliding_sync("encryption")
83            .map_err(Error::SlidingSync)?
84            //.share_pos() // TODO(bnjbvr) This is racy, needs cross-process lock :')
85            .with_to_device_extension(
86                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
87            )
88            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}));
89
90        if let Some((poll_timeout, network_timeout)) = poll_and_network_timeouts {
91            builder = builder.poll_timeout(poll_timeout).network_timeout(network_timeout);
92        }
93
94        let sliding_sync = builder.build().await.map_err(Error::SlidingSync)?;
95
96        if let CrossProcessLockConfig::MultiProcess { holder_name } =
97            client.cross_process_lock_config()
98        {
99            // Gently try to enable the cross-process lock on behalf of the user.
100            match client.encryption().enable_cross_process_store_lock(holder_name.clone()).await {
101                Ok(()) | Err(matrix_sdk::Error::BadCryptoStoreState) => {
102                    // Ignore; we've already set the crypto store lock to
103                    // something, and that's sufficient as
104                    // long as it uniquely identifies the process.
105                }
106                Err(err) => {
107                    // Any other error is fatal
108                    return Err(Error::ClientError(err));
109                }
110            }
111        }
112
113        Ok(Self { client, sliding_sync })
114    }
115
116    /// Runs an `EncryptionSyncService` loop for a fixed number of iterations.
117    ///
118    /// This runs for the given number of iterations, or less than that, if it
119    /// stops earlier or could not acquire a cross-process lock (if configured
120    /// with it).
121    ///
122    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
123    /// most one encryption sync running at any time. See its documentation
124    /// for more details.
125    #[instrument(skip_all, fields(store_generation))]
126    pub async fn run_fixed_iterations(
127        self,
128        num_iterations: u8,
129        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
130    ) -> Result<(), Error> {
131        let sync = self.sliding_sync.sync();
132
133        pin_mut!(sync);
134
135        let lock_guard = if let CrossProcessLockConfig::MultiProcess { .. } =
136            self.client.cross_process_lock_config()
137        {
138            let mut lock_guard =
139                self.client.encryption().try_lock_store_once().await.map_err(Error::LockError)?;
140
141            // Try to take the lock at the beginning; if it's busy, that means that another
142            // process already holds onto it, and as such we won't try to run the
143            // encryption sync loop at all (because we expect the other process to
144            // do so).
145
146            if lock_guard.is_none() {
147                // If we can't acquire the cross-process lock on the first attempt,
148                // that means the main process is running, or its lease hasn't expired
149                // yet. In case it's the latter, wait a bit and retry.
150                tracing::debug!(
151                    "Lock was already taken, and we're not the main loop; retrying in {}ms...",
152                    LEASE_DURATION_MS
153                );
154
155                sleep(Duration::from_millis(LEASE_DURATION_MS.into())).await;
156
157                lock_guard = self
158                    .client
159                    .encryption()
160                    .try_lock_store_once()
161                    .await
162                    .map_err(Error::LockError)?;
163
164                if lock_guard.is_none() {
165                    tracing::debug!(
166                        "Second attempt at locking outside the main app failed, aborting."
167                    );
168                    return Ok(());
169                }
170            }
171
172            lock_guard
173        } else {
174            None
175        };
176
177        Span::current().record("store_generation", lock_guard.map(|guard| guard.generation()));
178
179        for _ in 0..num_iterations {
180            match sync.next().await {
181                Some(Ok(update_summary)) => {
182                    // This API is only concerned with the e2ee and to-device extensions.
183                    // Warn if anything weird has been received from the homeserver.
184                    if !update_summary.lists.is_empty() {
185                        debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
186                    }
187                    if !update_summary.rooms.is_empty() {
188                        debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
189                    }
190
191                    // Cool cool, let's do it again.
192                    trace!("Encryption sync received an update!");
193                }
194
195                Some(Err(err)) => {
196                    trace!("Encryption sync stopped because of an error: {err:#}");
197                    return Err(Error::SlidingSync(err));
198                }
199
200                None => {
201                    trace!("Encryption sync properly terminated.");
202                    break;
203                }
204            }
205        }
206
207        Ok(())
208    }
209
210    /// Start synchronization.
211    ///
212    /// This should be regularly polled.
213    ///
214    /// Note: the [`EncryptionSyncPermit`] parameter ensures that there's at
215    /// most one encryption sync running at any time. See its documentation
216    /// for more details.
217    #[doc(hidden)] // Only public for testing purposes.
218    pub fn sync(
219        &self,
220        _permit: OwnedMutexGuard<EncryptionSyncPermit>,
221    ) -> impl Stream<Item = Result<(), Error>> + '_ {
222        stream!({
223            let sync = self.sliding_sync.sync();
224
225            pin_mut!(sync);
226
227            loop {
228                match self.next_sync_with_lock(&mut sync).await? {
229                    Some(Ok(update_summary)) => {
230                        // This API is only concerned with the e2ee and to-device extensions.
231                        // Warn if anything weird has been received from the homeserver.
232                        if !update_summary.lists.is_empty() {
233                            debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
234                        }
235                        if !update_summary.rooms.is_empty() {
236                            debug!(?update_summary.rooms, "unexpected non-empty list of rooms in encryption sync API");
237                        }
238
239                        // Cool cool, let's do it again.
240                        trace!("Encryption sync received an update!");
241                        yield Ok(());
242                        continue;
243                    }
244
245                    Some(Err(err)) => {
246                        trace!("Encryption sync stopped because of an error: {err:#}");
247                        yield Err(Error::SlidingSync(err));
248                        break;
249                    }
250
251                    None => {
252                        trace!("Encryption sync properly terminated.");
253                        break;
254                    }
255                }
256            }
257        })
258    }
259
260    /// Helper function for `sync`. Take the cross-process store lock, and call
261    /// `sync.next()`
262    #[instrument(skip_all, fields(store_generation))]
263    async fn next_sync_with_lock<Item>(
264        &self,
265        sync: &mut Pin<&mut impl Stream<Item = Item>>,
266    ) -> Result<Option<Item>, Error> {
267        let guard = if let CrossProcessLockConfig::MultiProcess { .. } =
268            self.client.cross_process_lock_config()
269        {
270            self.client.encryption().spin_lock_store(Some(60000)).await.map_err(Error::LockError)?
271        } else {
272            None
273        };
274
275        Span::current().record("store_generation", guard.map(|guard| guard.generation()));
276
277        Ok(sync.next().await)
278    }
279
280    /// Requests that the underlying sliding sync be stopped.
281    ///
282    /// This will unlock the cross-process lock, if taken.
283    pub(crate) fn stop_sync(&self) -> Result<(), Error> {
284        // Stopping the sync loop will cause the next `next()` call to return `None`, so
285        // this will also release the cross-process lock automatically.
286        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)?;
287
288        Ok(())
289    }
290
291    pub(crate) async fn expire_sync_session(&self) {
292        self.sliding_sync.expire_session().await;
293    }
294}
295
296/// Errors for the [`EncryptionSyncService`].
297#[derive(Debug, thiserror::Error)]
298pub enum Error {
299    #[error("Something wrong happened in sliding sync: {0:#}")]
300    SlidingSync(matrix_sdk::Error),
301
302    #[error("Locking failed: {0:#}")]
303    LockError(matrix_sdk::Error),
304
305    #[error(transparent)]
306    ClientError(matrix_sdk::Error),
307}