matrix_sdk_ui/
sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Unified API for both the Room List API and the Encryption Sync API, that
16//! takes care of all the underlying details.
17//!
18//! This is an opiniated way to run both APIs, with high-level callbacks that
19//! should be called in reaction to user actions and/or system events.
20//!
21//! The sync service will signal errors via its [`state`](SyncService::state)
22//! that the user MUST observe. Whenever an error/termination is observed, the
23//! user should call [`SyncService::start()`] again to restart the room list
24//! sync, if that is not desirable, the offline support for the [`SyncService`]
25//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
26
27use std::{sync::Arc, time::Duration};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_util::{
31    StreamExt as _,
32    future::{Either, select},
33    pin_mut,
34};
35use matrix_sdk::{
36    Client,
37    config::RequestConfig,
38    executor::{JoinHandle, spawn},
39    sleep::sleep,
40};
41use thiserror::Error;
42use tokio::sync::{
43    Mutex as AsyncMutex, OwnedMutexGuard,
44    mpsc::{Receiver, Sender},
45};
46use tracing::{Instrument, Level, Span, error, info, instrument, trace, warn};
47
48use crate::{
49    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService, WithLocking},
50    room_list_service::{self, RoomListService},
51};
52
53/// Current state of the application.
54///
55/// This is a high-level state indicating what's the status of the underlying
56/// syncs. The application starts in [`State::Running`] mode, and then hits a
57/// terminal state [`State::Terminated`] (if it gracefully exited) or
58/// [`State::Error`] (in case any of the underlying syncs ran into an error).
59///
60/// This can be observed with [`SyncService::state`].
61#[derive(Clone, Debug, PartialEq)]
62pub enum State {
63    /// The service hasn't ever been started yet, or has been stopped.
64    Idle,
65    /// The underlying syncs are properly running in the background.
66    Running,
67    /// Any of the underlying syncs has terminated gracefully (i.e. be stopped).
68    Terminated,
69    /// Any of the underlying syncs has ran into an error.
70    Error,
71    /// The service has entered offline mode. This state will only be entered if
72    /// the [`SyncService`] has been built with the
73    /// [`SyncServiceBuilder::with_offline_mode`] setting.
74    ///
75    /// The [`SyncService`] will enter the offline mode if syncing with the
76    /// server fails, it will then periodically check if the server is
77    /// available using the `/_matrix/client/versions` endpoint.
78    ///
79    /// Once the [`SyncService`] receives a 200 response from the
80    /// `/_matrix/client/versions` endpoint, it will go back into the
81    /// [`State::Running`] mode and attempt to sync again.
82    ///
83    /// Calling [`SyncService::start()`] while in this state will abort the
84    /// `/_matrix/client/versions` checks and attempt to sync immediately.
85    ///
86    /// Calling [`SyncService::stop()`] will abort the offline mode and the
87    /// [`SyncService`] will go into the [`State::Idle`] mode.
88    Offline,
89}
90
91enum MaybeAcquiredPermit {
92    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
93    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
94}
95
96impl MaybeAcquiredPermit {
97    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
98        match self {
99            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
100            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
101        }
102    }
103}
104
105/// A supervisor responsible for managing two sync tasks: one for handling the
106/// room list and another for supporting end-to-end encryption.
107///
108/// The two sync tasks are spawned as child tasks and are contained within the
109/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
110///
111/// The supervisor ensures the two child tasks are managed as a single unit,
112/// allowing for them to be shutdown in unison.
113struct SyncTaskSupervisor {
114    /// The supervising task that manages and contains the two sync child tasks.
115    task: JoinHandle<()>,
116    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
117    /// function.
118    termination_sender: Sender<TerminationReport>,
119}
120
121impl SyncTaskSupervisor {
122    async fn new(
123        inner: &SyncServiceInner,
124        room_list_service: Arc<RoomListService>,
125        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
126    ) -> Self {
127        let (task, termination_sender) =
128            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;
129
130        Self { task, termination_sender }
131    }
132
133    /// Check if a homeserver is reachable.
134    ///
135    /// This function handles the offline mode by waiting for either a
136    /// termination report or a successful `/_matrix/client/versions` response.
137    ///
138    /// This function waits for two conditions:
139    ///
140    /// 1. Waiting for a termination report: This ensures that the user can exit
141    ///    offline mode and attempt to restart the [`SyncService`] manually.
142    ///
143    /// 2. Waiting to come back online: This continuously checks server
144    ///    availability.
145    ///
146    /// If the `/_matrix/client/versions` request succeeds, the function exits
147    /// without a termination report. If we receive a [`TerminationReport`] from
148    /// the user, we exit immediately and return the termination report.
149    async fn offline_check(
150        client: &Client,
151        receiver: &mut Receiver<TerminationReport>,
152    ) -> Option<TerminationReport> {
153        info!("Entering the offline mode");
154
155        let wait_for_termination_report = async {
156            loop {
157                // Since we didn't empty the channel when entering the offline mode in fear that
158                // we might miss a report with the
159                // `TerminationOrigin::Supervisor` origin and the channel might contain stale
160                // reports from one of the sync services, in case both of them have sent a
161                // report, let's ignore all reports we receive from the sync
162                // services.
163                let report =
164                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);
165
166                match report.origin {
167                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
168                    // Since the sync service aren't running anymore, we can only receive a report
169                    // from the supervisor. It would have probably made sense to have separate
170                    // channels for reports the sync services send and the user can send using the
171                    // `SyncService::stop()` method.
172                    TerminationOrigin::Supervisor => break report,
173                }
174            }
175        };
176
177        let wait_to_be_online = async move {
178            loop {
179                // Encountering network failures when sending a request which has with no retry
180                // limit set in the `RequestConfig` are treated as permanent failures and our
181                // exponential backoff doesn't kick in.
182                //
183                // Let's set a retry limit so network failures are retried as well.
184                let request_config = RequestConfig::default().retry_limit(5);
185
186                // We're in an infinite loop, but our request sending already has an exponential
187                // backoff set up. This will kick in for any request errors that we consider to
188                // be transient. Common network errors (timeouts, DNS failures) or any server
189                // error in the 5xx range of HTTP errors are considered to be transient.
190                //
191                // Still, as a precaution, we're going to sleep here for a while in the Error
192                // case.
193                match client.fetch_server_versions(Some(request_config)).await {
194                    Ok(_) => break,
195                    Err(_) => sleep(Duration::from_millis(100)).await,
196                }
197            }
198        };
199
200        pin_mut!(wait_for_termination_report);
201        pin_mut!(wait_to_be_online);
202
203        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;
204
205        let report = match maybe_termination_report {
206            Either::Left((termination_report, _)) => Some(termination_report),
207            Either::Right((_, _)) => None,
208        };
209
210        info!("Exiting offline mode: {report:?}");
211
212        report
213    }
214
215    /// The role of the supervisor task is to wait for a termination message
216    /// ([`TerminationReport`]), sent either because we wanted to stop both
217    /// syncs, or because one of the syncs failed (in which case we'll stop the
218    /// other one too).
219    async fn spawn_supervisor_task(
220        inner: &SyncServiceInner,
221        room_list_service: Arc<RoomListService>,
222        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
223    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
224        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);
225
226        let encryption_sync = inner.encryption_sync_service.clone();
227        let state = inner.state.clone();
228        let termination_sender = sender.clone();
229
230        // When we first start, and don't use offline mode, we want to acquire the sync
231        // permit before we enter a future that might be polled at a later time,
232        // this means that the permit will be acquired as soon as this future,
233        // the one the `spawn_supervisor_task` function creates, is awaited.
234        //
235        // In other words, once `sync_service.start().await` is finished, the permit
236        // will be in the acquired state.
237        let mut sync_permit_guard =
238            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);
239
240        let offline_mode = inner.with_offline_mode;
241        let parent_span = inner.parent_span.clone();
242
243        let future = async move {
244            loop {
245                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
246                    room_list_service.clone(),
247                    encryption_sync.clone(),
248                    sync_permit_guard,
249                    sender.clone(),
250                    parent_span.clone(),
251                )
252                .await;
253
254                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());
255
256                let report = if let Some(report) = receiver.recv().await {
257                    report
258                } else {
259                    info!("internal channel has been closed?");
260                    // We should still stop the child tasks in the unlikely scenario that our
261                    // receiver died.
262                    TerminationReport::supervisor_error()
263                };
264
265                // If one service failed, make sure to request stopping the other one.
266                let (stop_room_list, stop_encryption) = match &report.origin {
267                    TerminationOrigin::EncryptionSync => (true, false),
268                    TerminationOrigin::RoomList => (false, true),
269                    TerminationOrigin::Supervisor => (true, true),
270                };
271
272                // Stop both services, and wait for the streams to properly finish: at some
273                // point they'll return `None` and will exit their infinite loops, and their
274                // tasks will gracefully terminate.
275
276                if stop_room_list {
277                    if let Err(err) = room_list_service.stop_sync() {
278                        warn!(?report, "unable to stop room list service: {err:#}");
279                    }
280
281                    if report.has_expired {
282                        room_list_service.expire_sync_session().await;
283                    }
284                }
285
286                if let Err(err) = room_list_task.await {
287                    error!("when awaiting room list service: {err:#}");
288                }
289
290                if stop_encryption {
291                    if let Err(err) = encryption_sync.stop_sync() {
292                        warn!(?report, "unable to stop encryption sync: {err:#}");
293                    }
294
295                    if report.has_expired {
296                        encryption_sync.expire_sync_session().await;
297                    }
298                }
299
300                if let Err(err) = encryption_sync_task.await {
301                    error!("when awaiting encryption sync: {err:#}");
302                }
303
304                if report.is_error {
305                    if offline_mode {
306                        state.set(State::Offline);
307
308                        let client = room_list_service.client();
309
310                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
311                            if report.is_error {
312                                state.set(State::Error);
313                            } else {
314                                state.set(State::Idle);
315                            }
316                            break;
317                        }
318
319                        state.set(State::Running);
320                    } else {
321                        state.set(State::Error);
322                        break;
323                    }
324                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
325                    state.set(State::Idle);
326                    break;
327                } else {
328                    state.set(State::Terminated);
329                    break;
330                }
331            }
332        }
333        .instrument(tracing::span!(Level::WARN, "supervisor task"));
334
335        let task = spawn(future);
336
337        (task, termination_sender)
338    }
339
340    async fn spawn_child_tasks(
341        room_list_service: Arc<RoomListService>,
342        encryption_sync_service: Arc<EncryptionSyncService>,
343        sync_permit_guard: MaybeAcquiredPermit,
344        sender: Sender<TerminationReport>,
345        parent_span: Span,
346    ) -> (JoinHandle<()>, JoinHandle<()>) {
347        // First, take care of the room list.
348        let room_list_task = spawn(
349            Self::room_list_sync_task(room_list_service, sender.clone())
350                .instrument(parent_span.clone()),
351        );
352
353        // Then, take care of the encryption sync.
354        let encryption_sync_task = spawn(
355            Self::encryption_sync_task(
356                encryption_sync_service,
357                sender.clone(),
358                sync_permit_guard.acquire().await,
359            )
360            .instrument(parent_span),
361        );
362
363        (room_list_task, encryption_sync_task)
364    }
365
366    fn check_if_expired(err: &matrix_sdk::Error) -> bool {
367        err.client_api_error_kind() == Some(&ruma::api::client::error::ErrorKind::UnknownPos)
368    }
369
370    async fn encryption_sync_task(
371        encryption_sync: Arc<EncryptionSyncService>,
372        sender: Sender<TerminationReport>,
373        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
374    ) {
375        use encryption_sync_service::Error;
376
377        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
378        pin_mut!(encryption_sync_stream);
379
380        let (is_error, has_expired) = loop {
381            match encryption_sync_stream.next().await {
382                Some(Ok(())) => {
383                    // Carry on.
384                }
385                Some(Err(err)) => {
386                    // If the encryption sync error was an expired session, also expire the
387                    // room list sync.
388                    let has_expired = if let Error::SlidingSync(err) = &err {
389                        Self::check_if_expired(err)
390                    } else {
391                        false
392                    };
393
394                    if !has_expired {
395                        error!("Error while processing encryption in sync service: {err:#}");
396                    }
397
398                    break (true, has_expired);
399                }
400                None => {
401                    // The stream has ended.
402                    break (false, false);
403                }
404            }
405        };
406
407        if let Err(err) = sender
408            .send(TerminationReport {
409                is_error,
410                has_expired,
411                origin: TerminationOrigin::EncryptionSync,
412            })
413            .await
414        {
415            error!("Error while sending termination report: {err:#}");
416        }
417    }
418
419    async fn room_list_sync_task(
420        room_list_service: Arc<RoomListService>,
421        sender: Sender<TerminationReport>,
422    ) {
423        use room_list_service::Error;
424
425        let room_list_stream = room_list_service.sync();
426        pin_mut!(room_list_stream);
427
428        let (is_error, has_expired) = loop {
429            match room_list_stream.next().await {
430                Some(Ok(())) => {
431                    // Carry on.
432                }
433                Some(Err(err)) => {
434                    // If the room list error was an expired session, also expire the
435                    // encryption sync.
436                    let has_expired = if let Error::SlidingSync(err) = &err {
437                        Self::check_if_expired(err)
438                    } else {
439                        false
440                    };
441
442                    if !has_expired {
443                        error!("Error while processing room list in sync service: {err:#}");
444                    }
445
446                    break (true, has_expired);
447                }
448                None => {
449                    // The stream has ended.
450                    break (false, false);
451                }
452            }
453        };
454
455        if let Err(err) = sender
456            .send(TerminationReport { is_error, has_expired, origin: TerminationOrigin::RoomList })
457            .await
458        {
459            error!("Error while sending termination report: {err:#}");
460        }
461    }
462
463    async fn shutdown(self) {
464        match self
465            .termination_sender
466            .send(TerminationReport {
467                is_error: false,
468                has_expired: false,
469                origin: TerminationOrigin::Supervisor,
470            })
471            .await
472        {
473            Ok(_) => {
474                let _ = self.task.await.inspect_err(|err| {
475                    // A `JoinError` indicates that the task was already dead, either because it got
476                    // cancelled or because it panicked. We only cancel the task in the Err branch
477                    // below and the task shouldn't be able to panic.
478                    //
479                    // So let's log an error and return.
480                    error!("The supervisor task has stopped unexpectedly: {err:?}");
481                });
482            }
483            Err(err) => {
484                error!("Couldn't send the termination report to the supervisor task: {err}");
485                // Let's abort the task if it won't shut down properly, otherwise we would have
486                // left it as a detached task.
487                self.task.abort();
488            }
489        }
490    }
491}
492
493struct SyncServiceInner {
494    encryption_sync_service: Arc<EncryptionSyncService>,
495
496    /// Is the offline mode for the [`SyncService`] enabled?
497    ///
498    /// The offline mode is described in the [`State::Offline`] enum variant.
499    with_offline_mode: bool,
500
501    state: SharedObservable<State>,
502
503    /// The parent tracing span to use for the tasks within this service.
504    ///
505    /// Normally this will be [`Span::none`], but it may be useful to assign a
506    /// defined span, for example if there is more than one active sync
507    /// service.
508    parent_span: Span,
509
510    /// Supervisor task ensuring proper termination.
511    ///
512    /// This task is waiting for a [`TerminationReport`] from any of the other
513    /// two tasks, or from a user request via [`SyncService::stop()`]. It
514    /// makes sure that the two services are properly shut up and just
515    /// interrupted.
516    ///
517    /// This is set at the same time as the other two tasks.
518    supervisor: Option<SyncTaskSupervisor>,
519}
520
521impl SyncServiceInner {
522    async fn start(
523        &mut self,
524        room_list_service: Arc<RoomListService>,
525        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
526    ) {
527        trace!("starting sync service");
528
529        self.supervisor =
530            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
531        self.state.set(State::Running);
532    }
533
534    async fn stop(&mut self) {
535        trace!("pausing sync service");
536
537        // Remove the supervisor from our state and request the tasks to be shutdown.
538        if let Some(supervisor) = self.supervisor.take() {
539            supervisor.shutdown().await;
540        } else {
541            error!("The sync service was not properly started, the supervisor task doesn't exist");
542        }
543    }
544
545    async fn restart(
546        &mut self,
547        room_list_service: Arc<RoomListService>,
548        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
549    ) {
550        self.stop().await;
551        self.start(room_list_service, encryption_sync_permit).await;
552    }
553}
554
555/// A high level manager for your Matrix syncing needs.
556///
557/// The [`SyncService`] is responsible for managing real-time synchronization
558/// with a Matrix server. It can initiate and maintain the necessary
559/// synchronization tasks for you.
560///
561/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
562/// otherwise it will fail with an 404 `M_UNRECOGNIZED` request error.
563///
564/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
565///
566/// # Example
567///
568/// ```no_run
569/// use matrix_sdk::Client;
570/// use matrix_sdk_ui::sync_service::{State, SyncService};
571/// # use url::Url;
572/// # async {
573/// let homeserver = Url::parse("http://example.com")?;
574/// let client = Client::new(homeserver).await?;
575///
576/// client
577///     .matrix_auth()
578///     .login_username("example", "wordpass")
579///     .initial_device_display_name("My bot")
580///     .await?;
581///
582/// let sync_service = SyncService::builder(client).build().await?;
583/// let mut state = sync_service.state();
584///
585/// while let Some(state) = state.next().await {
586///     match state {
587///         State::Idle => eprintln!("The sync service is idle."),
588///         State::Running => eprintln!("The sync has started to run."),
589///         State::Offline => eprintln!(
590///             "We have entered the offline mode, the server seems to be
591///              unavailable"
592///         ),
593///         State::Terminated => {
594///             eprintln!("The sync service has been gracefully terminated");
595///             break;
596///         }
597///         State::Error => {
598///             eprintln!("The sync service has run into an error");
599///             break;
600///         }
601///     }
602/// }
603/// # anyhow::Ok(()) };
604/// ```
605pub struct SyncService {
606    inner: Arc<AsyncMutex<SyncServiceInner>>,
607
608    /// Room list service used to synchronize the rooms state.
609    room_list_service: Arc<RoomListService>,
610
611    /// What's the state of this sync service? This field is replicated from the
612    /// [`SyncServiceInner`] struct, but it should not be modified in this
613    /// struct. It's re-exposed here so we can subscribe to the state without
614    /// taking the lock on the `inner` field.
615    state: SharedObservable<State>,
616
617    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
618    /// times.
619    ///
620    /// This ensures that there's only one ever existing in the application's
621    /// lifetime (under the assumption that there is at most one [`SyncService`]
622    /// per application).
623    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
624}
625
626impl SyncService {
627    /// Create a new builder for configuring an `SyncService`.
628    pub fn builder(client: Client) -> SyncServiceBuilder {
629        SyncServiceBuilder::new(client)
630    }
631
632    /// Get the underlying `RoomListService` instance for easier access to its
633    /// methods.
634    pub fn room_list_service(&self) -> Arc<RoomListService> {
635        self.room_list_service.clone()
636    }
637
638    /// Returns the state of the sync service.
639    pub fn state(&self) -> Subscriber<State> {
640        self.state.subscribe()
641    }
642
643    /// Start (or restart) the underlying sliding syncs.
644    ///
645    /// This can be called multiple times safely:
646    /// - if the stream is still properly running, it won't be restarted.
647    /// - if the [`SyncService`] is in the offline mode we will exit the offline
648    ///   mode and immediately attempt to sync again.
649    /// - if the stream has been aborted before, it will be properly cleaned up
650    ///   and restarted.
651    pub async fn start(&self) {
652        let mut inner = self.inner.lock().await;
653
654        // Only (re)start the tasks if it's stopped or if we're in the offline mode.
655        match inner.state.get() {
656            // If we're already running, there's nothing to do.
657            State::Running => {}
658            // If we're in the offline mode, first stop the service and then start it again.
659            State::Offline => {
660                inner
661                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
662                    .await
663            }
664            // Otherwise just start.
665            State::Idle | State::Terminated | State::Error => {
666                inner
667                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
668                    .await
669            }
670        }
671    }
672
673    /// Stop the underlying sliding syncs.
674    ///
675    /// This must be called when the app goes into the background. It's better
676    /// to call this API when the application exits, although not strictly
677    /// necessary.
678    #[instrument(skip_all)]
679    pub async fn stop(&self) {
680        let mut inner = self.inner.lock().await;
681
682        match inner.state.get() {
683            State::Idle | State::Terminated | State::Error => {
684                // No need to stop if we were not running.
685                return;
686            }
687            State::Running | State::Offline => {}
688        }
689
690        inner.stop().await;
691    }
692
693    /// Force expiring both sessions.
694    ///
695    /// This ensures that the sync service is stopped before expiring both
696    /// sessions. It should be used sparingly, as it will cause a restart of
697    /// the sessions on the server as well.
698    #[instrument(skip_all)]
699    pub async fn expire_sessions(&self) {
700        // First, stop the sync service if it was running; it's a no-op if it was
701        // already stopped.
702        self.stop().await;
703
704        // Expire the room list sync session.
705        self.room_list_service.expire_sync_session().await;
706
707        // Expire the encryption sync session.
708        self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
709    }
710
711    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
712    /// time.
713    ///
714    /// This ensures there is at most one [`EncryptionSyncService`] active at
715    /// any time, per application.
716    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
717        self.encryption_sync_permit.clone().try_lock_owned().ok()
718    }
719}
720
721#[derive(Debug)]
722enum TerminationOrigin {
723    EncryptionSync,
724    RoomList,
725    Supervisor,
726}
727
728#[derive(Debug)]
729struct TerminationReport {
730    is_error: bool,
731    has_expired: bool,
732    origin: TerminationOrigin,
733}
734
735impl TerminationReport {
736    fn supervisor_error() -> Self {
737        TerminationReport {
738            is_error: true,
739            has_expired: false,
740            origin: TerminationOrigin::Supervisor,
741        }
742    }
743}
744
745// Testing helpers, mostly.
746#[doc(hidden)]
747impl SyncService {
748    /// Is the task supervisor running?
749    pub async fn is_supervisor_running(&self) -> bool {
750        self.inner.lock().await.supervisor.is_some()
751    }
752}
753
754#[derive(Clone)]
755pub struct SyncServiceBuilder {
756    /// SDK client.
757    client: Client,
758
759    /// Is the cross-process lock for the crypto store enabled?
760    with_cross_process_lock: bool,
761
762    /// Is the offline mode for the [`SyncService`] enabled?
763    ///
764    /// The offline mode is described in the [`State::Offline`] enum variant.
765    with_offline_mode: bool,
766
767    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
768    ///
769    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
770    with_share_pos: bool,
771
772    /// The parent tracing span to use for the tasks within this service.
773    ///
774    /// Normally this will be [`Span::none`], but it may be useful to assign a
775    /// defined span, for example if there is more than one active sync
776    /// service.
777    parent_span: Span,
778}
779
780impl SyncServiceBuilder {
781    fn new(client: Client) -> Self {
782        Self {
783            client,
784            with_cross_process_lock: false,
785            with_offline_mode: false,
786            with_share_pos: true,
787            parent_span: Span::none(),
788        }
789    }
790
791    /// Enables the cross-process lock, if the sync service is being built in a
792    /// multi-process setup.
793    ///
794    /// It's a prerequisite if another process can *also* process encryption
795    /// events. This is only applicable to very specific use cases, like an
796    /// external process attempting to decrypt notifications. In general,
797    /// `with_cross_process_lock` should not be called.
798    ///
799    /// Be sure to have configured
800    /// [`Client::cross_process_store_locks_holder_name`] accordingly.
801    pub fn with_cross_process_lock(mut self) -> Self {
802        self.with_cross_process_lock = true;
803        self
804    }
805
806    /// Enable the "offline" mode for the [`SyncService`].
807    ///
808    /// To learn more about the "offline" mode read the documentation for the
809    /// [`State::Offline`] enum variant.
810    pub fn with_offline_mode(mut self) -> Self {
811        self.with_offline_mode = true;
812        self
813    }
814
815    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
816    ///
817    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
818    pub fn with_share_pos(mut self, enable: bool) -> Self {
819        self.with_share_pos = enable;
820        self
821    }
822
823    /// Set the parent tracing span to be used for the tasks within this
824    /// service.
825    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
826        self.parent_span = parent_span;
827        self
828    }
829
830    /// Finish setting up the [`SyncService`].
831    ///
832    /// This creates the underlying sliding syncs, and will *not* start them in
833    /// the background. The resulting [`SyncService`] must be kept alive as long
834    /// as the sliding syncs are supposed to run.
835    pub async fn build(self) -> Result<SyncService, Error> {
836        let Self {
837            client,
838            with_cross_process_lock,
839            with_offline_mode,
840            with_share_pos,
841            parent_span,
842        } = self;
843
844        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
845
846        let room_list = RoomListService::new_with_share_pos(client.clone(), with_share_pos).await?;
847
848        let encryption_sync = Arc::new(
849            EncryptionSyncService::new(client, None, WithLocking::from(with_cross_process_lock))
850                .await?,
851        );
852
853        let room_list_service = Arc::new(room_list);
854        let state = SharedObservable::new(State::Idle);
855
856        Ok(SyncService {
857            state: state.clone(),
858            room_list_service,
859            encryption_sync_permit,
860            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
861                supervisor: None,
862                encryption_sync_service: encryption_sync,
863                state,
864                with_offline_mode,
865                parent_span,
866            })),
867        })
868    }
869}
870
871/// Errors for the [`SyncService`] API.
872#[derive(Debug, Error)]
873pub enum Error {
874    /// An error received from the `RoomListService` API.
875    #[error(transparent)]
876    RoomList(#[from] room_list_service::Error),
877
878    /// An error received from the `EncryptionSyncService` API.
879    #[error(transparent)]
880    EncryptionSync(#[from] encryption_sync_service::Error),
881
882    /// An error had occurred in the sync task supervisor, likely due to a bug.
883    #[error("the supervisor channel has run into an unexpected error")]
884    InternalSupervisorError,
885}