matrix_sdk_ui/
sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Unified API for both the Room List API and the Encryption Sync API, that
16//! takes care of all the underlying details.
17//!
18//! This is an opiniated way to run both APIs, with high-level callbacks that
19//! should be called in reaction to user actions and/or system events.
20//!
21//! The sync service will signal errors via its [`state`](SyncService::state)
22//! that the user MUST observe. Whenever an error/termination is observed, the
23//! user should call [`SyncService::start()`] again to restart the room list
24//! sync, if that is not desirable, the offline support for the [`SyncService`]
25//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
26
27use std::{sync::Arc, time::Duration};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_util::{
31    StreamExt as _,
32    future::{Either, select},
33    pin_mut,
34};
35use matrix_sdk::{
36    Client,
37    config::RequestConfig,
38    executor::{JoinHandle, spawn},
39    sleep::sleep,
40};
41use thiserror::Error;
42use tokio::sync::{
43    Mutex as AsyncMutex, OwnedMutexGuard,
44    mpsc::{Receiver, Sender},
45};
46use tracing::{Instrument, Level, Span, error, info, instrument, trace, warn};
47
48use crate::{
49    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService, WithLocking},
50    room_list_service::{self, RoomListService},
51};
52
53/// Current state of the application.
54///
55/// This is a high-level state indicating what's the status of the underlying
56/// syncs. The application starts in [`State::Running`] mode, and then hits a
57/// terminal state [`State::Terminated`] (if it gracefully exited) or
58/// [`State::Error`] (in case any of the underlying syncs ran into an error).
59///
60/// This can be observed with [`SyncService::state`].
61#[derive(Clone, Debug)]
62pub enum State {
63    /// The service hasn't ever been started yet, or has been stopped.
64    Idle,
65
66    /// The underlying syncs are properly running in the background.
67    Running,
68
69    /// Any of the underlying syncs has terminated gracefully (i.e. be stopped).
70    Terminated,
71
72    /// Any of the underlying syncs has ran into an error.
73    ///
74    /// The associated [`enum@Error`] is inside an [`Arc`] to (i) make [`State`]
75    /// cloneable, and to (ii) not make it heavier.
76    Error(Arc<Error>),
77
78    /// The service has entered offline mode. This state will only be entered if
79    /// the [`SyncService`] has been built with the
80    /// [`SyncServiceBuilder::with_offline_mode`] setting.
81    ///
82    /// The [`SyncService`] will enter the offline mode if syncing with the
83    /// server fails, it will then periodically check if the server is
84    /// available using the `/_matrix/client/versions` endpoint.
85    ///
86    /// Once the [`SyncService`] receives a 200 response from the
87    /// `/_matrix/client/versions` endpoint, it will go back into the
88    /// [`State::Running`] mode and attempt to sync again.
89    ///
90    /// Calling [`SyncService::start()`] while in this state will abort the
91    /// `/_matrix/client/versions` checks and attempt to sync immediately.
92    ///
93    /// Calling [`SyncService::stop()`] will abort the offline mode and the
94    /// [`SyncService`] will go into the [`State::Idle`] mode.
95    Offline,
96}
97
98enum MaybeAcquiredPermit {
99    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
100    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
101}
102
103impl MaybeAcquiredPermit {
104    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
105        match self {
106            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
107            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
108        }
109    }
110}
111
112/// A supervisor responsible for managing two sync tasks: one for handling the
113/// room list and another for supporting end-to-end encryption.
114///
115/// The two sync tasks are spawned as child tasks and are contained within the
116/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
117///
118/// The supervisor ensures the two child tasks are managed as a single unit,
119/// allowing for them to be shutdown in unison.
120struct SyncTaskSupervisor {
121    /// The supervising task that manages and contains the two sync child tasks.
122    task: JoinHandle<()>,
123    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
124    /// function.
125    termination_sender: Sender<TerminationReport>,
126}
127
128impl SyncTaskSupervisor {
129    async fn new(
130        inner: &SyncServiceInner,
131        room_list_service: Arc<RoomListService>,
132        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
133    ) -> Self {
134        let (task, termination_sender) =
135            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;
136
137        Self { task, termination_sender }
138    }
139
140    /// Check if a homeserver is reachable.
141    ///
142    /// This function handles the offline mode by waiting for either a
143    /// termination report or a successful `/_matrix/client/versions` response.
144    ///
145    /// This function waits for two conditions:
146    ///
147    /// 1. Waiting for a termination report: This ensures that the user can exit
148    ///    offline mode and attempt to restart the [`SyncService`] manually.
149    ///
150    /// 2. Waiting to come back online: This continuously checks server
151    ///    availability.
152    ///
153    /// If the `/_matrix/client/versions` request succeeds, the function exits
154    /// without a termination report. If we receive a [`TerminationReport`] from
155    /// the user, we exit immediately and return the termination report.
156    async fn offline_check(
157        client: &Client,
158        receiver: &mut Receiver<TerminationReport>,
159    ) -> Option<TerminationReport> {
160        info!("Entering the offline mode");
161
162        let wait_for_termination_report = async {
163            loop {
164                // Since we didn't empty the channel when entering the offline mode in fear that
165                // we might miss a report with the
166                // `TerminationOrigin::Supervisor` origin and the channel might contain stale
167                // reports from one of the sync services, in case both of them have sent a
168                // report, let's ignore all reports we receive from the sync
169                // services.
170                let report =
171                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);
172
173                match report.origin {
174                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
175                    // Since the sync service aren't running anymore, we can only receive a report
176                    // from the supervisor. It would have probably made sense to have separate
177                    // channels for reports the sync services send and the user can send using the
178                    // `SyncService::stop()` method.
179                    TerminationOrigin::Supervisor => break report,
180                }
181            }
182        };
183
184        let wait_to_be_online = async move {
185            loop {
186                // Encountering network failures when sending a request which has with no retry
187                // limit set in the `RequestConfig` are treated as permanent failures and our
188                // exponential backoff doesn't kick in.
189                //
190                // Let's set a retry limit so network failures are retried as well.
191                let request_config = RequestConfig::default().retry_limit(5);
192
193                // We're in an infinite loop, but our request sending already has an exponential
194                // backoff set up. This will kick in for any request errors that we consider to
195                // be transient. Common network errors (timeouts, DNS failures) or any server
196                // error in the 5xx range of HTTP errors are considered to be transient.
197                //
198                // Still, as a precaution, we're going to sleep here for a while in the Error
199                // case.
200                match client.fetch_server_versions(Some(request_config)).await {
201                    Ok(_) => break,
202                    Err(_) => sleep(Duration::from_millis(100)).await,
203                }
204            }
205        };
206
207        pin_mut!(wait_for_termination_report);
208        pin_mut!(wait_to_be_online);
209
210        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;
211
212        let report = match maybe_termination_report {
213            Either::Left((termination_report, _)) => Some(termination_report),
214            Either::Right((_, _)) => None,
215        };
216
217        info!("Exiting offline mode: {report:?}");
218
219        report
220    }
221
222    /// The role of the supervisor task is to wait for a termination message
223    /// ([`TerminationReport`]), sent either because we wanted to stop both
224    /// syncs, or because one of the syncs failed (in which case we'll stop the
225    /// other one too).
226    async fn spawn_supervisor_task(
227        inner: &SyncServiceInner,
228        room_list_service: Arc<RoomListService>,
229        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
230    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
231        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);
232
233        let encryption_sync = inner.encryption_sync_service.clone();
234        let state = inner.state.clone();
235        let termination_sender = sender.clone();
236
237        // When we first start, and don't use offline mode, we want to acquire the sync
238        // permit before we enter a future that might be polled at a later time,
239        // this means that the permit will be acquired as soon as this future,
240        // the one the `spawn_supervisor_task` function creates, is awaited.
241        //
242        // In other words, once `sync_service.start().await` is finished, the permit
243        // will be in the acquired state.
244        let mut sync_permit_guard =
245            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);
246
247        let offline_mode = inner.with_offline_mode;
248        let parent_span = inner.parent_span.clone();
249
250        let future = async move {
251            loop {
252                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
253                    room_list_service.clone(),
254                    encryption_sync.clone(),
255                    sync_permit_guard,
256                    sender.clone(),
257                    parent_span.clone(),
258                )
259                .await;
260
261                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());
262
263                let report = if let Some(report) = receiver.recv().await {
264                    report
265                } else {
266                    info!("internal channel has been closed?");
267                    // We should still stop the child tasks in the unlikely scenario that our
268                    // receiver died.
269                    TerminationReport::supervisor_error()
270                };
271
272                // If one service failed, make sure to request stopping the other one.
273                let (stop_room_list, stop_encryption) = match &report.origin {
274                    TerminationOrigin::EncryptionSync => (true, false),
275                    TerminationOrigin::RoomList => (false, true),
276                    TerminationOrigin::Supervisor => (true, true),
277                };
278
279                // Stop both services, and wait for the streams to properly finish: at some
280                // point they'll return `None` and will exit their infinite loops, and their
281                // tasks will gracefully terminate.
282
283                if stop_room_list {
284                    if let Err(err) = room_list_service.stop_sync() {
285                        warn!(?report, "unable to stop room list service: {err:#}");
286                    }
287
288                    if report.has_expired() {
289                        room_list_service.expire_sync_session().await;
290                    }
291                }
292
293                if let Err(err) = room_list_task.await {
294                    error!("when awaiting room list service: {err:#}");
295                }
296
297                if stop_encryption {
298                    if let Err(err) = encryption_sync.stop_sync() {
299                        warn!(?report, "unable to stop encryption sync: {err:#}");
300                    }
301
302                    if report.has_expired() {
303                        encryption_sync.expire_sync_session().await;
304                    }
305                }
306
307                if let Err(err) = encryption_sync_task.await {
308                    error!("when awaiting encryption sync: {err:#}");
309                }
310
311                if let Some(error) = report.error {
312                    if offline_mode {
313                        state.set(State::Offline);
314
315                        let client = room_list_service.client();
316
317                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
318                            if let Some(error) = report.error {
319                                state.set(State::Error(Arc::new(error)));
320                            } else {
321                                state.set(State::Idle);
322                            }
323                            break;
324                        }
325
326                        state.set(State::Running);
327                    } else {
328                        state.set(State::Error(Arc::new(error)));
329                        break;
330                    }
331                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
332                    state.set(State::Idle);
333                    break;
334                } else {
335                    state.set(State::Terminated);
336                    break;
337                }
338            }
339        }
340        .instrument(tracing::span!(Level::WARN, "supervisor task"));
341
342        let task = spawn(future);
343
344        (task, termination_sender)
345    }
346
347    async fn spawn_child_tasks(
348        room_list_service: Arc<RoomListService>,
349        encryption_sync_service: Arc<EncryptionSyncService>,
350        sync_permit_guard: MaybeAcquiredPermit,
351        sender: Sender<TerminationReport>,
352        parent_span: Span,
353    ) -> (JoinHandle<()>, JoinHandle<()>) {
354        // First, take care of the room list.
355        let room_list_task = spawn(
356            Self::room_list_sync_task(room_list_service, sender.clone())
357                .instrument(parent_span.clone()),
358        );
359
360        // Then, take care of the encryption sync.
361        let encryption_sync_task = spawn(
362            Self::encryption_sync_task(
363                encryption_sync_service,
364                sender.clone(),
365                sync_permit_guard.acquire().await,
366            )
367            .instrument(parent_span),
368        );
369
370        (room_list_task, encryption_sync_task)
371    }
372
373    async fn encryption_sync_task(
374        encryption_sync: Arc<EncryptionSyncService>,
375        sender: Sender<TerminationReport>,
376        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
377    ) {
378        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
379        pin_mut!(encryption_sync_stream);
380
381        let termination_report = loop {
382            match encryption_sync_stream.next().await {
383                Some(Ok(())) => {
384                    // Carry on.
385                }
386                Some(Err(error)) => {
387                    let termination_report = TerminationReport::encryption_sync(Some(error));
388
389                    if !termination_report.has_expired() {
390                        error!(
391                            "Error while processing encryption in sync service: {:#?}",
392                            termination_report.error
393                        );
394                    }
395
396                    break termination_report;
397                }
398                None => {
399                    // The stream has ended.
400                    break TerminationReport::encryption_sync(None);
401                }
402            }
403        };
404
405        if let Err(err) = sender.send(termination_report).await {
406            error!("Error while sending termination report: {err:#}");
407        }
408    }
409
410    async fn room_list_sync_task(
411        room_list_service: Arc<RoomListService>,
412        sender: Sender<TerminationReport>,
413    ) {
414        let room_list_stream = room_list_service.sync();
415        pin_mut!(room_list_stream);
416
417        let termination_report = loop {
418            match room_list_stream.next().await {
419                Some(Ok(())) => {
420                    // Carry on.
421                }
422                Some(Err(error)) => {
423                    let termination_report = TerminationReport::room_list(Some(error));
424
425                    if !termination_report.has_expired() {
426                        error!(
427                            "Error while processing room list in sync service: {:#?}",
428                            termination_report.error
429                        );
430                    }
431
432                    break termination_report;
433                }
434                None => {
435                    // The stream has ended.
436                    break TerminationReport::room_list(None);
437                }
438            }
439        };
440
441        if let Err(err) = sender.send(termination_report).await {
442            error!("Error while sending termination report: {err:#}");
443        }
444    }
445
446    async fn shutdown(self) {
447        match self.termination_sender.send(TerminationReport::supervisor()).await {
448            Ok(_) => {
449                let _ = self.task.await.inspect_err(|err| {
450                    // A `JoinError` indicates that the task was already dead, either because it got
451                    // cancelled or because it panicked. We only cancel the task in the Err branch
452                    // below and the task shouldn't be able to panic.
453                    //
454                    // So let's log an error and return.
455                    error!("The supervisor task has stopped unexpectedly: {err:?}");
456                });
457            }
458            Err(err) => {
459                error!("Couldn't send the termination report to the supervisor task: {err}");
460                // Let's abort the task if it won't shut down properly, otherwise we would have
461                // left it as a detached task.
462                self.task.abort();
463            }
464        }
465    }
466}
467
468struct SyncServiceInner {
469    encryption_sync_service: Arc<EncryptionSyncService>,
470
471    /// Is the offline mode for the [`SyncService`] enabled?
472    ///
473    /// The offline mode is described in the [`State::Offline`] enum variant.
474    with_offline_mode: bool,
475
476    /// What's the state of this sync service?
477    state: SharedObservable<State>,
478
479    /// The parent tracing span to use for the tasks within this service.
480    ///
481    /// Normally this will be [`Span::none`], but it may be useful to assign a
482    /// defined span, for example if there is more than one active sync
483    /// service.
484    parent_span: Span,
485
486    /// Supervisor task ensuring proper termination.
487    ///
488    /// This task is waiting for a [`TerminationReport`] from any of the other
489    /// two tasks, or from a user request via [`SyncService::stop()`]. It
490    /// makes sure that the two services are properly shut up and just
491    /// interrupted.
492    ///
493    /// This is set at the same time as the other two tasks.
494    supervisor: Option<SyncTaskSupervisor>,
495}
496
497impl SyncServiceInner {
498    async fn start(
499        &mut self,
500        room_list_service: Arc<RoomListService>,
501        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
502    ) {
503        trace!("starting sync service");
504
505        self.supervisor =
506            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
507        self.state.set(State::Running);
508    }
509
510    async fn stop(&mut self) {
511        trace!("pausing sync service");
512
513        // Remove the supervisor from our state and request the tasks to be shutdown.
514        if let Some(supervisor) = self.supervisor.take() {
515            supervisor.shutdown().await;
516        } else {
517            error!("The sync service was not properly started, the supervisor task doesn't exist");
518        }
519    }
520
521    async fn restart(
522        &mut self,
523        room_list_service: Arc<RoomListService>,
524        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
525    ) {
526        self.stop().await;
527        self.start(room_list_service, encryption_sync_permit).await;
528    }
529}
530
531/// A high level manager for your Matrix syncing needs.
532///
533/// The [`SyncService`] is responsible for managing real-time synchronization
534/// with a Matrix server. It can initiate and maintain the necessary
535/// synchronization tasks for you.
536///
537/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
538/// otherwise it will fail with an 404 `M_UNRECOGNIZED` request error.
539///
540/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
541///
542/// # Example
543///
544/// ```no_run
545/// use matrix_sdk::Client;
546/// use matrix_sdk_ui::sync_service::{State, SyncService};
547/// # use url::Url;
548/// # async {
549/// let homeserver = Url::parse("http://example.com")?;
550/// let client = Client::new(homeserver).await?;
551///
552/// client
553///     .matrix_auth()
554///     .login_username("example", "wordpass")
555///     .initial_device_display_name("My bot")
556///     .await?;
557///
558/// let sync_service = SyncService::builder(client).build().await?;
559/// let mut state = sync_service.state();
560///
561/// while let Some(state) = state.next().await {
562///     match state {
563///         State::Idle => eprintln!("The sync service is idle."),
564///         State::Running => eprintln!("The sync has started to run."),
565///         State::Offline => eprintln!(
566///             "We have entered the offline mode, the server seems to be
567///              unavailable"
568///         ),
569///         State::Terminated => {
570///             eprintln!("The sync service has been gracefully terminated");
571///             break;
572///         }
573///         State::Error(_) => {
574///             eprintln!("The sync service has run into an error");
575///             break;
576///         }
577///     }
578/// }
579/// # anyhow::Ok(()) };
580/// ```
581pub struct SyncService {
582    inner: Arc<AsyncMutex<SyncServiceInner>>,
583
584    /// Room list service used to synchronize the rooms state.
585    room_list_service: Arc<RoomListService>,
586
587    /// What's the state of this sync service? This field is replicated from the
588    /// [`SyncServiceInner`] struct, but it should not be modified in this
589    /// struct. It's re-exposed here so we can subscribe to the state without
590    /// taking the lock on the `inner` field.
591    state: SharedObservable<State>,
592
593    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
594    /// times.
595    ///
596    /// This ensures that there's only one ever existing in the application's
597    /// lifetime (under the assumption that there is at most one [`SyncService`]
598    /// per application).
599    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
600}
601
602impl SyncService {
603    /// Create a new builder for configuring an `SyncService`.
604    pub fn builder(client: Client) -> SyncServiceBuilder {
605        SyncServiceBuilder::new(client)
606    }
607
608    /// Get the underlying `RoomListService` instance for easier access to its
609    /// methods.
610    pub fn room_list_service(&self) -> Arc<RoomListService> {
611        self.room_list_service.clone()
612    }
613
614    /// Returns the state of the sync service.
615    pub fn state(&self) -> Subscriber<State> {
616        self.state.subscribe()
617    }
618
619    /// Start (or restart) the underlying sliding syncs.
620    ///
621    /// This can be called multiple times safely:
622    /// - if the stream is still properly running, it won't be restarted.
623    /// - if the [`SyncService`] is in the offline mode we will exit the offline
624    ///   mode and immediately attempt to sync again.
625    /// - if the stream has been aborted before, it will be properly cleaned up
626    ///   and restarted.
627    pub async fn start(&self) {
628        let mut inner = self.inner.lock().await;
629
630        // Only (re)start the tasks if it's stopped or if we're in the offline mode.
631        match inner.state.get() {
632            // If we're already running, there's nothing to do.
633            State::Running => {}
634            // If we're in the offline mode, first stop the service and then start it again.
635            State::Offline => {
636                inner
637                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
638                    .await
639            }
640            // Otherwise just start.
641            State::Idle | State::Terminated | State::Error(_) => {
642                inner
643                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
644                    .await
645            }
646        }
647    }
648
649    /// Stop the underlying sliding syncs.
650    ///
651    /// This must be called when the app goes into the background. It's better
652    /// to call this API when the application exits, although not strictly
653    /// necessary.
654    #[instrument(skip_all)]
655    pub async fn stop(&self) {
656        let mut inner = self.inner.lock().await;
657
658        match inner.state.get() {
659            State::Idle | State::Terminated | State::Error(_) => {
660                // No need to stop if we were not running.
661                return;
662            }
663            State::Running | State::Offline => {}
664        }
665
666        inner.stop().await;
667    }
668
669    /// Force expiring both sessions.
670    ///
671    /// This ensures that the sync service is stopped before expiring both
672    /// sessions. It should be used sparingly, as it will cause a restart of
673    /// the sessions on the server as well.
674    #[instrument(skip_all)]
675    pub async fn expire_sessions(&self) {
676        // First, stop the sync service if it was running; it's a no-op if it was
677        // already stopped.
678        self.stop().await;
679
680        // Expire the room list sync session.
681        self.room_list_service.expire_sync_session().await;
682
683        // Expire the encryption sync session.
684        self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
685    }
686
687    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
688    /// time.
689    ///
690    /// This ensures there is at most one [`EncryptionSyncService`] active at
691    /// any time, per application.
692    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
693        self.encryption_sync_permit.clone().try_lock_owned().ok()
694    }
695}
696
697#[derive(Debug)]
698enum TerminationOrigin {
699    EncryptionSync,
700    RoomList,
701    Supervisor,
702}
703
704#[derive(Debug)]
705struct TerminationReport {
706    /// The origin of the termination.
707    origin: TerminationOrigin,
708
709    /// If the termination is due to an error, this is the cause.
710    error: Option<Error>,
711}
712
713impl TerminationReport {
714    /// Create a new [`TerminationReport`] with `origin` set to
715    /// [`TerminationOrigin::EncryptionSync`] and `error` set to
716    /// [`Error::EncryptionSync`].
717    fn encryption_sync(error: Option<encryption_sync_service::Error>) -> Self {
718        Self { origin: TerminationOrigin::EncryptionSync, error: error.map(Error::EncryptionSync) }
719    }
720
721    /// Create a new [`TerminationReport`] with `origin` set to
722    /// [`TerminationOrigin::RoomList`] and `error` set to [`Error::RoomList`].
723    fn room_list(error: Option<room_list_service::Error>) -> Self {
724        Self { origin: TerminationOrigin::RoomList, error: error.map(Error::RoomList) }
725    }
726
727    /// Create a new [`TerminationReport`] with `origin` set to
728    /// [`TerminationOrigin::Supervisor`] and `error` set to
729    /// [`Error::Supervisor`].
730    fn supervisor_error() -> Self {
731        Self { origin: TerminationOrigin::Supervisor, error: Some(Error::Supervisor) }
732    }
733
734    /// Create a new [`TerminationReport`] with `origin` set to
735    /// [`TerminationOrigin::Supervisor`] and `error` set to `None`.
736    fn supervisor() -> Self {
737        Self { origin: TerminationOrigin::Supervisor, error: None }
738    }
739
740    /// Check whether the termination is due to an expired sliding sync session.
741    fn has_expired(&self) -> bool {
742        match &self.error {
743            Some(Error::RoomList(room_list_service::Error::SlidingSync(error)))
744            | Some(Error::EncryptionSync(encryption_sync_service::Error::SlidingSync(error))) => {
745                error.client_api_error_kind()
746                    == Some(&ruma::api::client::error::ErrorKind::UnknownPos)
747            }
748            _ => false,
749        }
750    }
751}
752
753// Testing helpers, mostly.
754#[doc(hidden)]
755impl SyncService {
756    /// Is the task supervisor running?
757    pub async fn is_supervisor_running(&self) -> bool {
758        self.inner.lock().await.supervisor.is_some()
759    }
760}
761
762#[derive(Clone)]
763pub struct SyncServiceBuilder {
764    /// SDK client.
765    client: Client,
766
767    /// Is the cross-process lock for the crypto store enabled?
768    with_cross_process_lock: bool,
769
770    /// Is the offline mode for the [`SyncService`] enabled?
771    ///
772    /// The offline mode is described in the [`State::Offline`] enum variant.
773    with_offline_mode: bool,
774
775    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
776    ///
777    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
778    with_share_pos: bool,
779
780    /// The parent tracing span to use for the tasks within this service.
781    ///
782    /// Normally this will be [`Span::none`], but it may be useful to assign a
783    /// defined span, for example if there is more than one active sync
784    /// service.
785    parent_span: Span,
786}
787
788impl SyncServiceBuilder {
789    fn new(client: Client) -> Self {
790        Self {
791            client,
792            with_cross_process_lock: false,
793            with_offline_mode: false,
794            with_share_pos: true,
795            parent_span: Span::none(),
796        }
797    }
798
799    /// Enables the cross-process lock, if the sync service is being built in a
800    /// multi-process setup.
801    ///
802    /// It's a prerequisite if another process can *also* process encryption
803    /// events. This is only applicable to very specific use cases, like an
804    /// external process attempting to decrypt notifications. In general,
805    /// `with_cross_process_lock` should not be called.
806    ///
807    /// Be sure to have configured
808    /// [`Client::cross_process_store_locks_holder_name`] accordingly.
809    pub fn with_cross_process_lock(mut self) -> Self {
810        self.with_cross_process_lock = true;
811        self
812    }
813
814    /// Enable the "offline" mode for the [`SyncService`].
815    ///
816    /// To learn more about the "offline" mode read the documentation for the
817    /// [`State::Offline`] enum variant.
818    pub fn with_offline_mode(mut self) -> Self {
819        self.with_offline_mode = true;
820        self
821    }
822
823    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
824    ///
825    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
826    pub fn with_share_pos(mut self, enable: bool) -> Self {
827        self.with_share_pos = enable;
828        self
829    }
830
831    /// Set the parent tracing span to be used for the tasks within this
832    /// service.
833    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
834        self.parent_span = parent_span;
835        self
836    }
837
838    /// Finish setting up the [`SyncService`].
839    ///
840    /// This creates the underlying sliding syncs, and will *not* start them in
841    /// the background. The resulting [`SyncService`] must be kept alive as long
842    /// as the sliding syncs are supposed to run.
843    pub async fn build(self) -> Result<SyncService, Error> {
844        let Self {
845            client,
846            with_cross_process_lock,
847            with_offline_mode,
848            with_share_pos,
849            parent_span,
850        } = self;
851
852        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
853
854        let room_list = RoomListService::new_with_share_pos(client.clone(), with_share_pos).await?;
855
856        let encryption_sync = Arc::new(
857            EncryptionSyncService::new(client, None, WithLocking::from(with_cross_process_lock))
858                .await?,
859        );
860
861        let room_list_service = Arc::new(room_list);
862        let state = SharedObservable::new(State::Idle);
863
864        Ok(SyncService {
865            state: state.clone(),
866            room_list_service,
867            encryption_sync_permit,
868            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
869                supervisor: None,
870                encryption_sync_service: encryption_sync,
871                state,
872                with_offline_mode,
873                parent_span,
874            })),
875        })
876    }
877}
878
879/// Errors for the [`SyncService`] API.
880#[derive(Debug, Error)]
881pub enum Error {
882    /// An error received from the `RoomListService` API.
883    #[error(transparent)]
884    RoomList(#[from] room_list_service::Error),
885
886    /// An error received from the `EncryptionSyncService` API.
887    #[error(transparent)]
888    EncryptionSync(#[from] encryption_sync_service::Error),
889
890    /// An error had occurred in the sync task supervisor, likely due to a bug.
891    #[error("the supervisor channel has run into an unexpected error")]
892    Supervisor,
893}