matrix_sdk_ui/
sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Unified API for both the Room List API and the Encryption Sync API, that
16//! takes care of all the underlying details.
17//!
18//! This is an opiniated way to run both APIs, with high-level callbacks that
19//! should be called in reaction to user actions and/or system events.
20//!
21//! The sync service will signal errors via its [`state`](SyncService::state)
22//! that the user MUST observe. Whenever an error/termination is observed, the
23//! user should call [`SyncService::start()`] again to restart the room list
24//! sync, if that is not desirable, the offline support for the [`SyncService`]
25//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
26
27use std::{sync::Arc, time::Duration};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_util::{
31    future::{select, Either},
32    pin_mut, StreamExt as _,
33};
34use matrix_sdk::{
35    config::RequestConfig,
36    executor::{spawn, JoinHandle},
37    sleep::sleep,
38    Client,
39};
40use thiserror::Error;
41use tokio::sync::{
42    mpsc::{Receiver, Sender},
43    Mutex as AsyncMutex, OwnedMutexGuard,
44};
45use tracing::{error, info, instrument, trace, warn, Instrument, Level, Span};
46
47use crate::{
48    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService, WithLocking},
49    room_list_service::{self, RoomListService},
50};
51
52/// Current state of the application.
53///
54/// This is a high-level state indicating what's the status of the underlying
55/// syncs. The application starts in [`State::Running`] mode, and then hits a
56/// terminal state [`State::Terminated`] (if it gracefully exited) or
57/// [`State::Error`] (in case any of the underlying syncs ran into an error).
58///
59/// This can be observed with [`SyncService::state`].
60#[derive(Clone, Debug, PartialEq)]
61pub enum State {
62    /// The service hasn't ever been started yet, or has been stopped.
63    Idle,
64    /// The underlying syncs are properly running in the background.
65    Running,
66    /// Any of the underlying syncs has terminated gracefully (i.e. be stopped).
67    Terminated,
68    /// Any of the underlying syncs has ran into an error.
69    Error,
70    /// The service has entered offline mode. This state will only be entered if
71    /// the [`SyncService`] has been built with the
72    /// [`SyncServiceBuilder::with_offline_mode`] setting.
73    ///
74    /// The [`SyncService`] will enter the offline mode if syncing with the
75    /// server fails, it will then periodically check if the server is
76    /// available using the `/_matrix/client/versions` endpoint.
77    ///
78    /// Once the [`SyncService`] receives a 200 response from the
79    /// `/_matrix/client/versions` endpoint, it will go back into the
80    /// [`State::Running`] mode and attempt to sync again.
81    ///
82    /// Calling [`SyncService::start()`] while in this state will abort the
83    /// `/_matrix/client/versions` checks and attempt to sync immediately.
84    ///
85    /// Calling [`SyncService::stop()`] will abort the offline mode and the
86    /// [`SyncService`] will go into the [`State::Idle`] mode.
87    Offline,
88}
89
90enum MaybeAcquiredPermit {
91    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
92    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
93}
94
95impl MaybeAcquiredPermit {
96    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
97        match self {
98            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
99            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
100        }
101    }
102}
103
104/// A supervisor responsible for managing two sync tasks: one for handling the
105/// room list and another for supporting end-to-end encryption.
106///
107/// The two sync tasks are spawned as child tasks and are contained within the
108/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
109///
110/// The supervisor ensures the two child tasks are managed as a single unit,
111/// allowing for them to be shutdown in unison.
112struct SyncTaskSupervisor {
113    /// The supervising task that manages and contains the two sync child tasks.
114    task: JoinHandle<()>,
115    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
116    /// function.
117    termination_sender: Sender<TerminationReport>,
118}
119
120impl SyncTaskSupervisor {
121    async fn new(
122        inner: &SyncServiceInner,
123        room_list_service: Arc<RoomListService>,
124        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
125    ) -> Self {
126        let (task, termination_sender) =
127            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;
128
129        Self { task, termination_sender }
130    }
131
132    /// Check if a homeserver is reachable.
133    ///
134    /// This function handles the offline mode by waiting for either a
135    /// termination report or a successful `/_matrix/client/versions` response.
136    ///
137    /// This function waits for two conditions:
138    ///
139    /// 1. Waiting for a termination report: This ensures that the user can exit
140    ///    offline mode and attempt to restart the [`SyncService`] manually.
141    ///
142    /// 2. Waiting to come back online: This continuously checks server
143    ///    availability.
144    ///
145    /// If the `/_matrix/client/versions` request succeeds, the function exits
146    /// without a termination report. If we receive a [`TerminationReport`] from
147    /// the user, we exit immediately and return the termination report.
148    async fn offline_check(
149        client: &Client,
150        receiver: &mut Receiver<TerminationReport>,
151    ) -> Option<TerminationReport> {
152        info!("Entering the offline mode");
153
154        let wait_for_termination_report = async {
155            loop {
156                // Since we didn't empty the channel when entering the offline mode in fear that
157                // we might miss a report with the
158                // `TerminationOrigin::Supervisor` origin and the channel might contain stale
159                // reports from one of the sync services, in case both of them have sent a
160                // report, let's ignore all reports we receive from the sync
161                // services.
162                let report =
163                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);
164
165                match report.origin {
166                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
167                    // Since the sync service aren't running anymore, we can only receive a report
168                    // from the supervisor. It would have probably made sense to have separate
169                    // channels for reports the sync services send and the user can send using the
170                    // `SyncService::stop()` method.
171                    TerminationOrigin::Supervisor => break report,
172                }
173            }
174        };
175
176        let wait_to_be_online = async move {
177            loop {
178                // Encountering network failures when sending a request which has with no retry
179                // limit set in the `RequestConfig` are treated as permanent failures and our
180                // exponential backoff doesn't kick in.
181                //
182                // Let's set a retry limit so network failures are retried as well.
183                let request_config = RequestConfig::default().retry_limit(5);
184
185                // We're in an infinite loop, but our request sending already has an exponential
186                // backoff set up. This will kick in for any request errors that we consider to
187                // be transient. Common network errors (timeouts, DNS failures) or any server
188                // error in the 5xx range of HTTP errors are considered to be transient.
189                //
190                // Still, as a precaution, we're going to sleep here for a while in the Error
191                // case.
192                match client.fetch_server_capabilities(Some(request_config)).await {
193                    Ok(_) => break,
194                    Err(_) => sleep(Duration::from_millis(100)).await,
195                }
196            }
197        };
198
199        pin_mut!(wait_for_termination_report);
200        pin_mut!(wait_to_be_online);
201
202        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;
203
204        let report = match maybe_termination_report {
205            Either::Left((termination_report, _)) => Some(termination_report),
206            Either::Right((_, _)) => None,
207        };
208
209        info!("Exiting offline mode: {report:?}");
210
211        report
212    }
213
214    /// The role of the supervisor task is to wait for a termination message
215    /// ([`TerminationReport`]), sent either because we wanted to stop both
216    /// syncs, or because one of the syncs failed (in which case we'll stop the
217    /// other one too).
218    async fn spawn_supervisor_task(
219        inner: &SyncServiceInner,
220        room_list_service: Arc<RoomListService>,
221        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
222    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
223        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);
224
225        let encryption_sync = inner.encryption_sync_service.clone();
226        let state = inner.state.clone();
227        let termination_sender = sender.clone();
228
229        // When we first start, and don't use offline mode, we want to acquire the sync
230        // permit before we enter a future that might be polled at a later time,
231        // this means that the permit will be acquired as soon as this future,
232        // the one the `spawn_supervisor_task` function creates, is awaited.
233        //
234        // In other words, once `sync_service.start().await` is finished, the permit
235        // will be in the acquired state.
236        let mut sync_permit_guard =
237            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);
238
239        let offline_mode = inner.with_offline_mode;
240        let parent_span = inner.parent_span.clone();
241
242        let future = async move {
243            loop {
244                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
245                    room_list_service.clone(),
246                    encryption_sync.clone(),
247                    sync_permit_guard,
248                    sender.clone(),
249                    parent_span.clone(),
250                )
251                .await;
252
253                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());
254
255                let report = if let Some(report) = receiver.recv().await {
256                    report
257                } else {
258                    info!("internal channel has been closed?");
259                    // We should still stop the child tasks in the unlikely scenario that our
260                    // receiver died.
261                    TerminationReport::supervisor_error()
262                };
263
264                // If one service failed, make sure to request stopping the other one.
265                let (stop_room_list, stop_encryption) = match &report.origin {
266                    TerminationOrigin::EncryptionSync => (true, false),
267                    TerminationOrigin::RoomList => (false, true),
268                    TerminationOrigin::Supervisor => (true, true),
269                };
270
271                // Stop both services, and wait for the streams to properly finish: at some
272                // point they'll return `None` and will exit their infinite loops, and their
273                // tasks will gracefully terminate.
274
275                if stop_room_list {
276                    if let Err(err) = room_list_service.stop_sync() {
277                        warn!(?report, "unable to stop room list service: {err:#}");
278                    }
279
280                    if report.has_expired {
281                        room_list_service.expire_sync_session().await;
282                    }
283                }
284
285                if let Err(err) = room_list_task.await {
286                    error!("when awaiting room list service: {err:#}");
287                }
288
289                if stop_encryption {
290                    if let Err(err) = encryption_sync.stop_sync() {
291                        warn!(?report, "unable to stop encryption sync: {err:#}");
292                    }
293
294                    if report.has_expired {
295                        encryption_sync.expire_sync_session().await;
296                    }
297                }
298
299                if let Err(err) = encryption_sync_task.await {
300                    error!("when awaiting encryption sync: {err:#}");
301                }
302
303                if report.is_error {
304                    if offline_mode {
305                        state.set(State::Offline);
306
307                        let client = room_list_service.client();
308
309                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
310                            if report.is_error {
311                                state.set(State::Error);
312                            } else {
313                                state.set(State::Idle);
314                            }
315                            break;
316                        }
317
318                        state.set(State::Running);
319                    } else {
320                        state.set(State::Error);
321                        break;
322                    }
323                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
324                    state.set(State::Idle);
325                    break;
326                } else {
327                    state.set(State::Terminated);
328                    break;
329                }
330            }
331        }
332        .instrument(tracing::span!(Level::WARN, "supervisor task"));
333
334        let task = spawn(future);
335
336        (task, termination_sender)
337    }
338
339    async fn spawn_child_tasks(
340        room_list_service: Arc<RoomListService>,
341        encryption_sync_service: Arc<EncryptionSyncService>,
342        sync_permit_guard: MaybeAcquiredPermit,
343        sender: Sender<TerminationReport>,
344        parent_span: Span,
345    ) -> (JoinHandle<()>, JoinHandle<()>) {
346        // First, take care of the room list.
347        let room_list_task = spawn(
348            Self::room_list_sync_task(room_list_service, sender.clone())
349                .instrument(parent_span.clone()),
350        );
351
352        // Then, take care of the encryption sync.
353        let encryption_sync_task = spawn(
354            Self::encryption_sync_task(
355                encryption_sync_service,
356                sender.clone(),
357                sync_permit_guard.acquire().await,
358            )
359            .instrument(parent_span),
360        );
361
362        (room_list_task, encryption_sync_task)
363    }
364
365    fn check_if_expired(err: &matrix_sdk::Error) -> bool {
366        err.client_api_error_kind() == Some(&ruma::api::client::error::ErrorKind::UnknownPos)
367    }
368
369    async fn encryption_sync_task(
370        encryption_sync: Arc<EncryptionSyncService>,
371        sender: Sender<TerminationReport>,
372        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
373    ) {
374        use encryption_sync_service::Error;
375
376        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
377        pin_mut!(encryption_sync_stream);
378
379        let (is_error, has_expired) = loop {
380            match encryption_sync_stream.next().await {
381                Some(Ok(())) => {
382                    // Carry on.
383                }
384                Some(Err(err)) => {
385                    // If the encryption sync error was an expired session, also expire the
386                    // room list sync.
387                    let has_expired = if let Error::SlidingSync(err) = &err {
388                        Self::check_if_expired(err)
389                    } else {
390                        false
391                    };
392
393                    if !has_expired {
394                        error!("Error while processing encryption in sync service: {err:#}");
395                    }
396
397                    break (true, has_expired);
398                }
399                None => {
400                    // The stream has ended.
401                    break (false, false);
402                }
403            }
404        };
405
406        if let Err(err) = sender
407            .send(TerminationReport {
408                is_error,
409                has_expired,
410                origin: TerminationOrigin::EncryptionSync,
411            })
412            .await
413        {
414            error!("Error while sending termination report: {err:#}");
415        }
416    }
417
418    async fn room_list_sync_task(
419        room_list_service: Arc<RoomListService>,
420        sender: Sender<TerminationReport>,
421    ) {
422        use room_list_service::Error;
423
424        let room_list_stream = room_list_service.sync();
425        pin_mut!(room_list_stream);
426
427        let (is_error, has_expired) = loop {
428            match room_list_stream.next().await {
429                Some(Ok(())) => {
430                    // Carry on.
431                }
432                Some(Err(err)) => {
433                    // If the room list error was an expired session, also expire the
434                    // encryption sync.
435                    let has_expired = if let Error::SlidingSync(err) = &err {
436                        Self::check_if_expired(err)
437                    } else {
438                        false
439                    };
440
441                    if !has_expired {
442                        error!("Error while processing room list in sync service: {err:#}");
443                    }
444
445                    break (true, has_expired);
446                }
447                None => {
448                    // The stream has ended.
449                    break (false, false);
450                }
451            }
452        };
453
454        if let Err(err) = sender
455            .send(TerminationReport { is_error, has_expired, origin: TerminationOrigin::RoomList })
456            .await
457        {
458            error!("Error while sending termination report: {err:#}");
459        }
460    }
461
462    async fn shutdown(self) {
463        match self
464            .termination_sender
465            .send(TerminationReport {
466                is_error: false,
467                has_expired: false,
468                origin: TerminationOrigin::Supervisor,
469            })
470            .await
471        {
472            Ok(_) => {
473                let _ = self.task.await.inspect_err(|err| {
474                    // A `JoinError` indicates that the task was already dead, either because it got
475                    // cancelled or because it panicked. We only cancel the task in the Err branch
476                    // below and the task shouldn't be able to panic.
477                    //
478                    // So let's log an error and return.
479                    error!("The supervisor task has stopped unexpectedly: {err:?}");
480                });
481            }
482            Err(err) => {
483                error!("Couldn't send the termination report to the supervisor task: {err}");
484                // Let's abort the task if it won't shut down properly, otherwise we would have
485                // left it as a detached task.
486                self.task.abort();
487            }
488        }
489    }
490}
491
492struct SyncServiceInner {
493    encryption_sync_service: Arc<EncryptionSyncService>,
494    /// Is the offline mode for the [`SyncService`] enabled?
495    ///
496    /// The offline mode is described in the [`State::Offline`] enum variant.
497    with_offline_mode: bool,
498    state: SharedObservable<State>,
499
500    /// The parent tracing span to use for the tasks within this service.
501    ///
502    /// Normally this will be [`Span::none`], but it may be useful to assign a
503    /// defined span, for example if there is more than one active sync
504    /// service.
505    parent_span: Span,
506
507    /// Supervisor task ensuring proper termination.
508    ///
509    /// This task is waiting for a [`TerminationReport`] from any of the other
510    /// two tasks, or from a user request via [`SyncService::stop()`]. It
511    /// makes sure that the two services are properly shut up and just
512    /// interrupted.
513    ///
514    /// This is set at the same time as the other two tasks.
515    supervisor: Option<SyncTaskSupervisor>,
516}
517
518impl SyncServiceInner {
519    async fn start(
520        &mut self,
521        room_list_service: Arc<RoomListService>,
522        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
523    ) {
524        trace!("starting sync service");
525
526        self.supervisor =
527            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
528        self.state.set(State::Running);
529    }
530
531    async fn stop(&mut self) {
532        trace!("pausing sync service");
533
534        // Remove the supervisor from our state and request the tasks to be shutdown.
535        if let Some(supervisor) = self.supervisor.take() {
536            supervisor.shutdown().await;
537        } else {
538            error!("The sync service was not properly started, the supervisor task doesn't exist");
539        }
540    }
541
542    async fn restart(
543        &mut self,
544        room_list_service: Arc<RoomListService>,
545        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
546    ) {
547        self.stop().await;
548        self.start(room_list_service, encryption_sync_permit).await;
549    }
550}
551
552/// A high level manager for your Matrix syncing needs.
553///
554/// The [`SyncService`] is responsible for managing real-time synchronization
555/// with a Matrix server. It can initiate and maintain the necessary
556/// synchronization tasks for you.
557///
558/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
559/// otherwise it will fail with an 404 `M_UNRECOGNIZED` request error.
560///
561/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
562///
563/// # Example
564///
565/// ```no_run
566/// use matrix_sdk::Client;
567/// use matrix_sdk_ui::sync_service::{State, SyncService};
568/// # use url::Url;
569/// # async {
570/// let homeserver = Url::parse("http://example.com")?;
571/// let client = Client::new(homeserver).await?;
572///
573/// client
574///     .matrix_auth()
575///     .login_username("example", "wordpass")
576///     .initial_device_display_name("My bot")
577///     .await?;
578///
579/// let sync_service = SyncService::builder(client).build().await?;
580/// let mut state = sync_service.state();
581///
582/// while let Some(state) = state.next().await {
583///     match state {
584///         State::Idle => eprintln!("The sync service is idle."),
585///         State::Running => eprintln!("The sync has started to run."),
586///         State::Offline => eprintln!(
587///             "We have entered the offline mode, the server seems to be
588///              unavailable"
589///         ),
590///         State::Terminated => {
591///             eprintln!("The sync service has been gracefully terminated");
592///             break;
593///         }
594///         State::Error => {
595///             eprintln!("The sync service has run into an error");
596///             break;
597///         }
598///     }
599/// }
600/// # anyhow::Ok(()) };
601/// ```
602pub struct SyncService {
603    inner: Arc<AsyncMutex<SyncServiceInner>>,
604
605    /// Room list service used to synchronize the rooms state.
606    room_list_service: Arc<RoomListService>,
607
608    /// What's the state of this sync service? This field is replicated from the
609    /// [`SyncServiceInner`] struct, but it should not be modified in this
610    /// struct. It's re-exposed here so we can subscribe to the state without
611    /// taking the lock on the `inner` field.
612    state: SharedObservable<State>,
613
614    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
615    /// times.
616    ///
617    /// This ensures that there's only one ever existing in the application's
618    /// lifetime (under the assumption that there is at most one [`SyncService`]
619    /// per application).
620    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
621}
622
623impl SyncService {
624    /// Create a new builder for configuring an `SyncService`.
625    pub fn builder(client: Client) -> SyncServiceBuilder {
626        SyncServiceBuilder::new(client)
627    }
628
629    /// Get the underlying `RoomListService` instance for easier access to its
630    /// methods.
631    pub fn room_list_service(&self) -> Arc<RoomListService> {
632        self.room_list_service.clone()
633    }
634
635    /// Returns the state of the sync service.
636    pub fn state(&self) -> Subscriber<State> {
637        self.state.subscribe()
638    }
639
640    /// Start (or restart) the underlying sliding syncs.
641    ///
642    /// This can be called multiple times safely:
643    /// - if the stream is still properly running, it won't be restarted.
644    /// - if the [`SyncService`] is in the offline mode we will exit the offline
645    ///   mode and immediately attempt to sync again.
646    /// - if the stream has been aborted before, it will be properly cleaned up
647    ///   and restarted.
648    pub async fn start(&self) {
649        let mut inner = self.inner.lock().await;
650
651        // Only (re)start the tasks if it's stopped or if we're in the offline mode.
652        match inner.state.get() {
653            // If we're already running, there's nothing to do.
654            State::Running => {}
655            // If we're in the offline mode, first stop the service and then start it again.
656            State::Offline => {
657                inner
658                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
659                    .await
660            }
661            // Otherwise just start.
662            State::Idle | State::Terminated | State::Error => {
663                inner
664                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
665                    .await
666            }
667        }
668    }
669
670    /// Stop the underlying sliding syncs.
671    ///
672    /// This must be called when the app goes into the background. It's better
673    /// to call this API when the application exits, although not strictly
674    /// necessary.
675    #[instrument(skip_all)]
676    pub async fn stop(&self) {
677        let mut inner = self.inner.lock().await;
678
679        match inner.state.get() {
680            State::Idle | State::Terminated | State::Error => {
681                // No need to stop if we were not running.
682                return;
683            }
684            State::Running | State::Offline => {}
685        }
686
687        inner.stop().await
688    }
689
690    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
691    /// time.
692    ///
693    /// This ensures there is at most one [`EncryptionSyncService`] active at
694    /// any time, per application.
695    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
696        self.encryption_sync_permit.clone().try_lock_owned().ok()
697    }
698}
699
700#[derive(Debug)]
701enum TerminationOrigin {
702    EncryptionSync,
703    RoomList,
704    Supervisor,
705}
706
707#[derive(Debug)]
708struct TerminationReport {
709    is_error: bool,
710    has_expired: bool,
711    origin: TerminationOrigin,
712}
713
714impl TerminationReport {
715    fn supervisor_error() -> Self {
716        TerminationReport {
717            is_error: true,
718            has_expired: false,
719            origin: TerminationOrigin::Supervisor,
720        }
721    }
722}
723
724// Testing helpers, mostly.
725#[doc(hidden)]
726impl SyncService {
727    /// Is the task supervisor running?
728    pub async fn is_supervisor_running(&self) -> bool {
729        self.inner.lock().await.supervisor.is_some()
730    }
731}
732
733#[derive(Clone)]
734pub struct SyncServiceBuilder {
735    /// SDK client.
736    client: Client,
737
738    /// Is the cross-process lock for the crypto store enabled?
739    with_cross_process_lock: bool,
740
741    /// Is the offline mode for the [`SyncService`] enabled?
742    ///
743    /// The offline mode is described in the [`State::Offline`] enum variant.
744    with_offline_mode: bool,
745
746    /// The parent tracing span to use for the tasks within this service.
747    ///
748    /// Normally this will be [`Span::none`], but it may be useful to assign a
749    /// defined span, for example if there is more than one active sync
750    /// service.
751    parent_span: Span,
752}
753
754impl SyncServiceBuilder {
755    fn new(client: Client) -> Self {
756        Self {
757            client,
758            with_cross_process_lock: false,
759            with_offline_mode: false,
760            parent_span: Span::none(),
761        }
762    }
763
764    /// Enables the cross-process lock, if the sync service is being built in a
765    /// multi-process setup.
766    ///
767    /// It's a prerequisite if another process can *also* process encryption
768    /// events. This is only applicable to very specific use cases, like an
769    /// external process attempting to decrypt notifications. In general,
770    /// `with_cross_process_lock` should not be called.
771    ///
772    /// Be sure to have configured
773    /// [`Client::cross_process_store_locks_holder_name`] accordingly.
774    pub fn with_cross_process_lock(mut self) -> Self {
775        self.with_cross_process_lock = true;
776        self
777    }
778
779    /// Enable the "offline" mode for the [`SyncService`].
780    ///
781    /// To learn more about the "offline" mode read the documentation for the
782    /// [`State::Offline`] enum variant.
783    pub fn with_offline_mode(mut self) -> Self {
784        self.with_offline_mode = true;
785        self
786    }
787
788    /// Set the parent tracing span to be used for the tasks within this
789    /// service.
790    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
791        self.parent_span = parent_span;
792        self
793    }
794
795    /// Finish setting up the [`SyncService`].
796    ///
797    /// This creates the underlying sliding syncs, and will *not* start them in
798    /// the background. The resulting [`SyncService`] must be kept alive as long
799    /// as the sliding syncs are supposed to run.
800    pub async fn build(self) -> Result<SyncService, Error> {
801        let Self { client, with_cross_process_lock, with_offline_mode, parent_span } = self;
802
803        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
804
805        let room_list = RoomListService::new(client.clone()).await?;
806
807        let encryption_sync = Arc::new(
808            EncryptionSyncService::new(client, None, WithLocking::from(with_cross_process_lock))
809                .await?,
810        );
811
812        let room_list_service = Arc::new(room_list);
813        let state = SharedObservable::new(State::Idle);
814
815        Ok(SyncService {
816            state: state.clone(),
817            room_list_service,
818            encryption_sync_permit,
819            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
820                supervisor: None,
821                encryption_sync_service: encryption_sync,
822                state,
823                with_offline_mode,
824                parent_span,
825            })),
826        })
827    }
828}
829
830/// Errors for the [`SyncService`] API.
831#[derive(Debug, Error)]
832pub enum Error {
833    /// An error received from the `RoomListService` API.
834    #[error(transparent)]
835    RoomList(#[from] room_list_service::Error),
836
837    /// An error received from the `EncryptionSyncService` API.
838    #[error(transparent)]
839    EncryptionSync(#[from] encryption_sync_service::Error),
840
841    /// An error had occurred in the sync task supervisor, likely due to a bug.
842    #[error("the supervisor channel has run into an unexpected error")]
843    InternalSupervisorError,
844}