Skip to main content

matrix_sdk_ui/
sync_service.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! Unified API for both the Room List API and the Encryption Sync API, that
16//! takes care of all the underlying details.
17//!
18//! This is an opiniated way to run both APIs, with high-level callbacks that
19//! should be called in reaction to user actions and/or system events.
20//!
21//! The sync service will signal errors via its [`state`](SyncService::state)
22//! that the user MUST observe. Whenever an error/termination is observed, the
23//! user should call [`SyncService::start()`] again to restart the room list
24//! sync, if that is not desirable, the offline support for the [`SyncService`]
25//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
26
27use std::{sync::Arc, time::Duration};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_util::{
31    StreamExt as _,
32    future::{Either, select},
33    pin_mut,
34};
35use matrix_sdk::{
36    Client,
37    config::RequestConfig,
38    executor::{JoinHandle, spawn},
39    sleep::sleep,
40};
41use thiserror::Error;
42use tokio::sync::{
43    Mutex as AsyncMutex, OwnedMutexGuard,
44    mpsc::{Receiver, Sender},
45};
46use tracing::{Instrument, Level, Span, error, info, instrument, trace, warn};
47
48use crate::{
49    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService},
50    room_list_service::{
51        self, DEFAULT_CONNECTION_ID, DEFAULT_LIST_TIMELINE_LIMIT, RoomListService,
52    },
53};
54
55/// Current state of the application.
56///
57/// This is a high-level state indicating what's the status of the underlying
58/// syncs. The application starts in [`State::Running`] mode, and then hits a
59/// terminal state [`State::Terminated`] (if it gracefully exited) or
60/// [`State::Error`] (in case any of the underlying syncs ran into an error).
61///
62/// This can be observed with [`SyncService::state`].
63#[derive(Clone, Debug)]
64pub enum State {
65    /// The service hasn't ever been started yet, or has been stopped.
66    Idle,
67
68    /// The underlying syncs are properly running in the background.
69    Running,
70
71    /// Any of the underlying syncs has terminated gracefully (i.e. be stopped).
72    Terminated,
73
74    /// Any of the underlying syncs has ran into an error.
75    ///
76    /// The associated [`enum@Error`] is inside an [`Arc`] to (i) make [`State`]
77    /// cloneable, and to (ii) not make it heavier.
78    Error(Arc<Error>),
79
80    /// The service has entered offline mode. This state will only be entered if
81    /// the [`SyncService`] has been built with the
82    /// [`SyncServiceBuilder::with_offline_mode`] setting.
83    ///
84    /// The [`SyncService`] will enter the offline mode if syncing with the
85    /// server fails, it will then periodically check if the server is
86    /// available using the `/_matrix/client/versions` endpoint.
87    ///
88    /// Once the [`SyncService`] receives a 200 response from the
89    /// `/_matrix/client/versions` endpoint, it will go back into the
90    /// [`State::Running`] mode and attempt to sync again.
91    ///
92    /// Calling [`SyncService::start()`] while in this state will abort the
93    /// `/_matrix/client/versions` checks and attempt to sync immediately.
94    ///
95    /// Calling [`SyncService::stop()`] will abort the offline mode and the
96    /// [`SyncService`] will go into the [`State::Idle`] mode.
97    Offline,
98}
99
100enum MaybeAcquiredPermit {
101    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
102    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
103}
104
105impl MaybeAcquiredPermit {
106    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
107        match self {
108            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
109            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
110        }
111    }
112}
113
114/// A supervisor responsible for managing two sync tasks: one for handling the
115/// room list and another for supporting end-to-end encryption.
116///
117/// The two sync tasks are spawned as child tasks and are contained within the
118/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
119///
120/// The supervisor ensures the two child tasks are managed as a single unit,
121/// allowing for them to be shutdown in unison.
122struct SyncTaskSupervisor {
123    /// The supervising task that manages and contains the two sync child tasks.
124    task: JoinHandle<()>,
125    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
126    /// function.
127    termination_sender: Sender<TerminationReport>,
128}
129
130impl SyncTaskSupervisor {
131    async fn new(
132        inner: &SyncServiceInner,
133        room_list_service: Arc<RoomListService>,
134        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
135    ) -> Self {
136        let (task, termination_sender) =
137            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;
138
139        Self { task, termination_sender }
140    }
141
142    /// Check if a homeserver is reachable.
143    ///
144    /// This function handles the offline mode by waiting for either a
145    /// termination report or a successful `/_matrix/client/versions` response.
146    ///
147    /// This function waits for two conditions:
148    ///
149    /// 1. Waiting for a termination report: This ensures that the user can exit
150    ///    offline mode and attempt to restart the [`SyncService`] manually.
151    ///
152    /// 2. Waiting to come back online: This continuously checks server
153    ///    availability.
154    ///
155    /// If the `/_matrix/client/versions` request succeeds, the function exits
156    /// without a termination report. If we receive a [`TerminationReport`] from
157    /// the user, we exit immediately and return the termination report.
158    async fn offline_check(
159        client: &Client,
160        receiver: &mut Receiver<TerminationReport>,
161    ) -> Option<TerminationReport> {
162        info!("Entering the offline mode");
163
164        let wait_for_termination_report = async {
165            loop {
166                // Since we didn't empty the channel when entering the offline mode in fear that
167                // we might miss a report with the
168                // `TerminationOrigin::Supervisor` origin and the channel might contain stale
169                // reports from one of the sync services, in case both of them have sent a
170                // report, let's ignore all reports we receive from the sync
171                // services.
172                let report =
173                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);
174
175                match report.origin {
176                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
177                    // Since the sync service aren't running anymore, we can only receive a report
178                    // from the supervisor. It would have probably made sense to have separate
179                    // channels for reports the sync services send and the user can send using the
180                    // `SyncService::stop()` method.
181                    TerminationOrigin::Supervisor => break report,
182                }
183            }
184        };
185
186        let wait_to_be_online = async move {
187            loop {
188                // Encountering network failures when sending a request which has with no retry
189                // limit set in the `RequestConfig` are treated as permanent failures and our
190                // exponential backoff doesn't kick in.
191                //
192                // Let's set a retry limit so network failures are retried as well.
193                let request_config = RequestConfig::default().retry_limit(5);
194
195                // We're in an infinite loop, but our request sending already has an exponential
196                // backoff set up. This will kick in for any request errors that we consider to
197                // be transient. Common network errors (timeouts, DNS failures) or any server
198                // error in the 5xx range of HTTP errors are considered to be transient.
199                //
200                // Still, as a precaution, we're going to sleep here for a while in the Error
201                // case.
202                match client.fetch_server_versions(Some(request_config)).await {
203                    Ok(_) => break,
204                    Err(_) => sleep(Duration::from_millis(100)).await,
205                }
206            }
207        };
208
209        pin_mut!(wait_for_termination_report);
210        pin_mut!(wait_to_be_online);
211
212        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;
213
214        let report = match maybe_termination_report {
215            Either::Left((termination_report, _)) => Some(termination_report),
216            Either::Right((_, _)) => None,
217        };
218
219        info!("Exiting offline mode: {report:?}");
220
221        report
222    }
223
224    /// The role of the supervisor task is to wait for a termination message
225    /// ([`TerminationReport`]), sent either because we wanted to stop both
226    /// syncs, or because one of the syncs failed (in which case we'll stop the
227    /// other one too).
228    async fn spawn_supervisor_task(
229        inner: &SyncServiceInner,
230        room_list_service: Arc<RoomListService>,
231        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
232    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
233        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);
234
235        let encryption_sync = inner.encryption_sync_service.clone();
236        let state = inner.state.clone();
237        let termination_sender = sender.clone();
238
239        // When we first start, and don't use offline mode, we want to acquire the sync
240        // permit before we enter a future that might be polled at a later time,
241        // this means that the permit will be acquired as soon as this future,
242        // the one the `spawn_supervisor_task` function creates, is awaited.
243        //
244        // In other words, once `sync_service.start().await` is finished, the permit
245        // will be in the acquired state.
246        let mut sync_permit_guard =
247            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);
248
249        let offline_mode = inner.with_offline_mode;
250        let parent_span = inner.parent_span.clone();
251
252        let future = async move {
253            loop {
254                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
255                    room_list_service.clone(),
256                    encryption_sync.clone(),
257                    sync_permit_guard,
258                    sender.clone(),
259                    parent_span.clone(),
260                )
261                .await;
262
263                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());
264
265                let report = if let Some(report) = receiver.recv().await {
266                    report
267                } else {
268                    info!("internal channel has been closed?");
269                    // We should still stop the child tasks in the unlikely scenario that our
270                    // receiver died.
271                    TerminationReport::supervisor_error()
272                };
273
274                // If one service failed, make sure to request stopping the other one.
275                let (stop_room_list, stop_encryption) = match &report.origin {
276                    TerminationOrigin::EncryptionSync => (true, false),
277                    TerminationOrigin::RoomList => (false, true),
278                    TerminationOrigin::Supervisor => (true, true),
279                };
280
281                // Stop both services, and wait for the streams to properly finish: at some
282                // point they'll return `None` and will exit their infinite loops, and their
283                // tasks will gracefully terminate.
284
285                if stop_room_list {
286                    if let Err(err) = room_list_service.stop_sync() {
287                        warn!(?report, "unable to stop room list service: {err:#}");
288                    }
289
290                    if report.has_expired() {
291                        room_list_service.expire_sync_session().await;
292                    }
293                }
294
295                if let Err(err) = room_list_task.await {
296                    error!("when awaiting room list service: {err:#}");
297                }
298
299                if stop_encryption {
300                    if let Err(err) = encryption_sync.stop_sync() {
301                        warn!(?report, "unable to stop encryption sync: {err:#}");
302                    }
303
304                    if report.has_expired() {
305                        encryption_sync.expire_sync_session().await;
306                    }
307                }
308
309                if let Err(err) = encryption_sync_task.await {
310                    error!("when awaiting encryption sync: {err:#}");
311                }
312
313                if let Some(error) = report.error {
314                    if offline_mode {
315                        state.set(State::Offline);
316
317                        let client = room_list_service.client();
318
319                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
320                            if let Some(error) = report.error {
321                                state.set(State::Error(Arc::new(error)));
322                            } else {
323                                state.set(State::Idle);
324                            }
325                            break;
326                        }
327
328                        state.set(State::Running);
329                    } else {
330                        state.set(State::Error(Arc::new(error)));
331                        break;
332                    }
333                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
334                    state.set(State::Idle);
335                    break;
336                } else {
337                    state.set(State::Terminated);
338                    break;
339                }
340            }
341        }
342        .instrument(tracing::span!(Level::WARN, "supervisor task"));
343
344        let task = spawn(future);
345
346        (task, termination_sender)
347    }
348
349    async fn spawn_child_tasks(
350        room_list_service: Arc<RoomListService>,
351        encryption_sync_service: Arc<EncryptionSyncService>,
352        sync_permit_guard: MaybeAcquiredPermit,
353        sender: Sender<TerminationReport>,
354        parent_span: Span,
355    ) -> (JoinHandle<()>, JoinHandle<()>) {
356        // First, take care of the room list.
357        let room_list_task = spawn(
358            Self::room_list_sync_task(room_list_service, sender.clone())
359                .instrument(parent_span.clone()),
360        );
361
362        // Then, take care of the encryption sync.
363        let encryption_sync_task = spawn(
364            Self::encryption_sync_task(
365                encryption_sync_service,
366                sender.clone(),
367                sync_permit_guard.acquire().await,
368            )
369            .instrument(parent_span),
370        );
371
372        (room_list_task, encryption_sync_task)
373    }
374
375    async fn encryption_sync_task(
376        encryption_sync: Arc<EncryptionSyncService>,
377        sender: Sender<TerminationReport>,
378        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
379    ) {
380        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
381        pin_mut!(encryption_sync_stream);
382
383        let termination_report = loop {
384            match encryption_sync_stream.next().await {
385                Some(Ok(())) => {
386                    // Carry on.
387                }
388                Some(Err(error)) => {
389                    let termination_report = TerminationReport::encryption_sync(Some(error));
390
391                    if !termination_report.has_expired() {
392                        error!(
393                            "Error while processing encryption in sync service: {:#?}",
394                            termination_report.error
395                        );
396                    }
397
398                    break termination_report;
399                }
400                None => {
401                    // The stream has ended.
402                    break TerminationReport::encryption_sync(None);
403                }
404            }
405        };
406
407        if let Err(err) = sender.send(termination_report).await {
408            error!("Error while sending termination report: {err:#}");
409        }
410    }
411
412    async fn room_list_sync_task(
413        room_list_service: Arc<RoomListService>,
414        sender: Sender<TerminationReport>,
415    ) {
416        let room_list_stream = room_list_service.sync();
417        pin_mut!(room_list_stream);
418
419        let termination_report = loop {
420            match room_list_stream.next().await {
421                Some(Ok(())) => {
422                    // Carry on.
423                }
424                Some(Err(error)) => {
425                    let termination_report = TerminationReport::room_list(Some(error));
426
427                    if !termination_report.has_expired() {
428                        error!(
429                            "Error while processing room list in sync service: {:#?}",
430                            termination_report.error
431                        );
432                    }
433
434                    break termination_report;
435                }
436                None => {
437                    // The stream has ended.
438                    break TerminationReport::room_list(None);
439                }
440            }
441        };
442
443        if let Err(err) = sender.send(termination_report).await {
444            error!("Error while sending termination report: {err:#}");
445        }
446    }
447
448    async fn shutdown(self) {
449        match self.termination_sender.send(TerminationReport::supervisor()).await {
450            Ok(_) => {
451                let _ = self.task.await.inspect_err(|err| {
452                    // A `JoinError` indicates that the task was already dead, either because it got
453                    // cancelled or because it panicked. We only cancel the task in the Err branch
454                    // below and the task shouldn't be able to panic.
455                    //
456                    // So let's log an error and return.
457                    error!("The supervisor task has stopped unexpectedly: {err:?}");
458                });
459            }
460            Err(err) => {
461                error!("Couldn't send the termination report to the supervisor task: {err}");
462                // Let's abort the task if it won't shut down properly, otherwise we would have
463                // left it as a detached task.
464                self.task.abort();
465            }
466        }
467    }
468}
469
470struct SyncServiceInner {
471    encryption_sync_service: Arc<EncryptionSyncService>,
472
473    /// Is the offline mode for the [`SyncService`] enabled?
474    ///
475    /// The offline mode is described in the [`State::Offline`] enum variant.
476    with_offline_mode: bool,
477
478    /// What's the state of this sync service?
479    state: SharedObservable<State>,
480
481    /// The parent tracing span to use for the tasks within this service.
482    ///
483    /// Normally this will be [`Span::none`], but it may be useful to assign a
484    /// defined span, for example if there is more than one active sync
485    /// service.
486    parent_span: Span,
487
488    /// Supervisor task ensuring proper termination.
489    ///
490    /// This task is waiting for a [`TerminationReport`] from any of the other
491    /// two tasks, or from a user request via [`SyncService::stop()`]. It
492    /// makes sure that the two services are properly shut up and just
493    /// interrupted.
494    ///
495    /// This is set at the same time as the other two tasks.
496    supervisor: Option<SyncTaskSupervisor>,
497}
498
499impl SyncServiceInner {
500    async fn start(
501        &mut self,
502        room_list_service: Arc<RoomListService>,
503        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
504    ) {
505        trace!("starting sync service");
506
507        self.supervisor =
508            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
509        self.state.set(State::Running);
510    }
511
512    async fn stop(&mut self) {
513        trace!("pausing sync service");
514
515        // Remove the supervisor from our state and request the tasks to be shutdown.
516        if let Some(supervisor) = self.supervisor.take() {
517            supervisor.shutdown().await;
518        } else {
519            error!("The sync service was not properly started, the supervisor task doesn't exist");
520        }
521    }
522
523    async fn restart(
524        &mut self,
525        room_list_service: Arc<RoomListService>,
526        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
527    ) {
528        self.stop().await;
529        self.start(room_list_service, encryption_sync_permit).await;
530    }
531}
532
533/// A high level manager for your Matrix syncing needs.
534///
535/// The [`SyncService`] is responsible for managing real-time synchronization
536/// with a Matrix server. It can initiate and maintain the necessary
537/// synchronization tasks for you.
538///
539/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
540/// otherwise it will fail with an 404 `M_UNRECOGNIZED` request error.
541///
542/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
543///
544/// # Example
545///
546/// ```no_run
547/// use matrix_sdk::Client;
548/// use matrix_sdk_ui::sync_service::{State, SyncService};
549/// # use url::Url;
550/// # async {
551/// let homeserver = Url::parse("http://example.com")?;
552/// let client = Client::new(homeserver).await?;
553///
554/// client
555///     .matrix_auth()
556///     .login_username("example", "wordpass")
557///     .initial_device_display_name("My bot")
558///     .await?;
559///
560/// let sync_service = SyncService::builder(client).build().await?;
561/// let mut state = sync_service.state();
562///
563/// while let Some(state) = state.next().await {
564///     match state {
565///         State::Idle => eprintln!("The sync service is idle."),
566///         State::Running => eprintln!("The sync has started to run."),
567///         State::Offline => eprintln!(
568///             "We have entered the offline mode, the server seems to be
569///              unavailable"
570///         ),
571///         State::Terminated => {
572///             eprintln!("The sync service has been gracefully terminated");
573///             break;
574///         }
575///         State::Error(_) => {
576///             eprintln!("The sync service has run into an error");
577///             break;
578///         }
579///     }
580/// }
581/// # anyhow::Ok(()) };
582/// ```
583pub struct SyncService {
584    inner: Arc<AsyncMutex<SyncServiceInner>>,
585
586    /// Room list service used to synchronize the rooms state.
587    room_list_service: Arc<RoomListService>,
588
589    /// What's the state of this sync service? This field is replicated from the
590    /// [`SyncServiceInner`] struct, but it should not be modified in this
591    /// struct. It's re-exposed here so we can subscribe to the state without
592    /// taking the lock on the `inner` field.
593    state: SharedObservable<State>,
594
595    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
596    /// times.
597    ///
598    /// This ensures that there's only one ever existing in the application's
599    /// lifetime (under the assumption that there is at most one [`SyncService`]
600    /// per application).
601    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
602}
603
604impl SyncService {
605    /// Create a new builder for configuring an `SyncService`.
606    pub fn builder(client: Client) -> SyncServiceBuilder {
607        SyncServiceBuilder::new(client)
608    }
609
610    /// Get the underlying `RoomListService` instance for easier access to its
611    /// methods.
612    pub fn room_list_service(&self) -> Arc<RoomListService> {
613        self.room_list_service.clone()
614    }
615
616    /// Returns the state of the sync service.
617    pub fn state(&self) -> Subscriber<State> {
618        self.state.subscribe()
619    }
620
621    /// Start (or restart) the underlying sliding syncs.
622    ///
623    /// This can be called multiple times safely:
624    /// - if the stream is still properly running, it won't be restarted.
625    /// - if the [`SyncService`] is in the offline mode we will exit the offline
626    ///   mode and immediately attempt to sync again.
627    /// - if the stream has been aborted before, it will be properly cleaned up
628    ///   and restarted.
629    pub async fn start(&self) {
630        let mut inner = self.inner.lock().await;
631
632        // Only (re)start the tasks if it's stopped or if we're in the offline mode.
633        match inner.state.get() {
634            // If we're already running, there's nothing to do.
635            State::Running => {}
636            // If we're in the offline mode, first stop the service and then start it again.
637            State::Offline => {
638                inner
639                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
640                    .await
641            }
642            // Otherwise just start.
643            State::Idle | State::Terminated | State::Error(_) => {
644                inner
645                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
646                    .await
647            }
648        }
649    }
650
651    /// Stop the underlying sliding syncs.
652    ///
653    /// This must be called when the app goes into the background. It's better
654    /// to call this API when the application exits, although not strictly
655    /// necessary.
656    #[instrument(skip_all)]
657    pub async fn stop(&self) {
658        let mut inner = self.inner.lock().await;
659
660        match inner.state.get() {
661            State::Idle | State::Terminated | State::Error(_) => {
662                // No need to stop if we were not running.
663                return;
664            }
665            State::Running | State::Offline => {}
666        }
667
668        inner.stop().await;
669    }
670
671    /// Force expiring both sessions.
672    ///
673    /// This ensures that the sync service is stopped before expiring both
674    /// sessions. It should be used sparingly, as it will cause a restart of
675    /// the sessions on the server as well.
676    #[instrument(skip_all)]
677    pub async fn expire_sessions(&self) {
678        // First, stop the sync service if it was running; it's a no-op if it was
679        // already stopped.
680        self.stop().await;
681
682        // Expire the room list sync session.
683        self.room_list_service.expire_sync_session().await;
684
685        // Expire the encryption sync session.
686        self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
687    }
688
689    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
690    /// time.
691    ///
692    /// This ensures there is at most one [`EncryptionSyncService`] active at
693    /// any time, per application.
694    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
695        self.encryption_sync_permit.clone().try_lock_owned().ok()
696    }
697}
698
699#[derive(Debug)]
700enum TerminationOrigin {
701    EncryptionSync,
702    RoomList,
703    Supervisor,
704}
705
706#[derive(Debug)]
707struct TerminationReport {
708    /// The origin of the termination.
709    origin: TerminationOrigin,
710
711    /// If the termination is due to an error, this is the cause.
712    error: Option<Error>,
713}
714
715impl TerminationReport {
716    /// Create a new [`TerminationReport`] with `origin` set to
717    /// [`TerminationOrigin::EncryptionSync`] and `error` set to
718    /// [`Error::EncryptionSync`].
719    fn encryption_sync(error: Option<encryption_sync_service::Error>) -> Self {
720        Self { origin: TerminationOrigin::EncryptionSync, error: error.map(Error::EncryptionSync) }
721    }
722
723    /// Create a new [`TerminationReport`] with `origin` set to
724    /// [`TerminationOrigin::RoomList`] and `error` set to [`Error::RoomList`].
725    fn room_list(error: Option<room_list_service::Error>) -> Self {
726        Self { origin: TerminationOrigin::RoomList, error: error.map(Error::RoomList) }
727    }
728
729    /// Create a new [`TerminationReport`] with `origin` set to
730    /// [`TerminationOrigin::Supervisor`] and `error` set to
731    /// [`Error::Supervisor`].
732    fn supervisor_error() -> Self {
733        Self { origin: TerminationOrigin::Supervisor, error: Some(Error::Supervisor) }
734    }
735
736    /// Create a new [`TerminationReport`] with `origin` set to
737    /// [`TerminationOrigin::Supervisor`] and `error` set to `None`.
738    fn supervisor() -> Self {
739        Self { origin: TerminationOrigin::Supervisor, error: None }
740    }
741
742    /// Check whether the termination is due to an expired sliding sync session.
743    fn has_expired(&self) -> bool {
744        match &self.error {
745            Some(Error::RoomList(room_list_service::Error::SlidingSync(error)))
746            | Some(Error::EncryptionSync(encryption_sync_service::Error::SlidingSync(error))) => {
747                error.client_api_error_kind() == Some(&ruma::api::error::ErrorKind::UnknownPos)
748            }
749            _ => false,
750        }
751    }
752}
753
754// Testing helpers, mostly.
755#[doc(hidden)]
756impl SyncService {
757    /// Is the task supervisor running?
758    pub async fn is_supervisor_running(&self) -> bool {
759        self.inner.lock().await.supervisor.is_some()
760    }
761}
762
763#[derive(Clone)]
764pub struct SyncServiceBuilder {
765    /// SDK client.
766    client: Client,
767
768    /// Is the offline mode for the [`SyncService`] enabled?
769    ///
770    /// The offline mode is described in the [`State::Offline`] enum variant.
771    with_offline_mode: bool,
772
773    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
774    ///
775    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
776    with_share_pos: bool,
777
778    /// Custom connection ID for the room list service.
779    /// Defaults to [`room_list_service::DEFAULT_CONNECTION_ID`]. Use a
780    /// different value for secondary processes such as iOS share extensions
781    /// that are not meant to reuse the main app's connection.
782    room_list_conn_id: String,
783
784    /// Custom timeline limit for the room list service. Defaults to
785    /// [`room_list_service::DEFAULT_LIST_TIMELINE_LIMIT`].
786    room_list_timeline_limit: u32,
787
788    /// The parent tracing span to use for the tasks within this service.
789    ///
790    /// Normally this will be [`Span::none`], but it may be useful to assign a
791    /// defined span, for example if there is more than one active sync
792    /// service.
793    parent_span: Span,
794}
795
796impl SyncServiceBuilder {
797    fn new(client: Client) -> Self {
798        Self {
799            client,
800            with_offline_mode: false,
801            with_share_pos: true,
802            room_list_conn_id: DEFAULT_CONNECTION_ID.to_owned(),
803            room_list_timeline_limit: DEFAULT_LIST_TIMELINE_LIMIT,
804            parent_span: Span::none(),
805        }
806    }
807
808    /// Enable the "offline" mode for the [`SyncService`].
809    ///
810    /// To learn more about the "offline" mode read the documentation for the
811    /// [`State::Offline`] enum variant.
812    pub fn with_offline_mode(mut self) -> Self {
813        self.with_offline_mode = true;
814        self
815    }
816
817    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
818    ///
819    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
820    pub fn with_share_pos(mut self, enable: bool) -> Self {
821        self.with_share_pos = enable;
822        self
823    }
824
825    /// Set a custom conn_id for the room list sliding sync connection.
826    pub fn with_room_list_conn_id(mut self, conn_id: String) -> Self {
827        self.room_list_conn_id = conn_id;
828        self
829    }
830
831    /// Set a custom timeline limit for the room list service.
832    pub fn with_room_list_timeline_limit(mut self, limit: u32) -> Self {
833        self.room_list_timeline_limit = limit;
834        self
835    }
836
837    /// Set the parent tracing span to be used for the tasks within this
838    /// service.
839    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
840        self.parent_span = parent_span;
841        self
842    }
843
844    /// Finish setting up the [`SyncService`].
845    ///
846    /// This creates the underlying sliding syncs, and will *not* start them in
847    /// the background. The resulting [`SyncService`] must be kept alive as long
848    /// as the sliding syncs are supposed to run.
849    pub async fn build(self) -> Result<SyncService, Error> {
850        let Self {
851            client,
852            with_offline_mode,
853            with_share_pos,
854            room_list_conn_id,
855            room_list_timeline_limit,
856            parent_span,
857        } = self;
858
859        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
860
861        let room_list = RoomListService::new_with(
862            client.clone(),
863            with_share_pos,
864            &room_list_conn_id,
865            room_list_timeline_limit,
866        )
867        .await?;
868
869        let encryption_sync = Arc::new(EncryptionSyncService::new(client, None).await?);
870
871        let room_list_service = Arc::new(room_list);
872        let state = SharedObservable::new(State::Idle);
873
874        Ok(SyncService {
875            state: state.clone(),
876            room_list_service,
877            encryption_sync_permit,
878            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
879                supervisor: None,
880                encryption_sync_service: encryption_sync,
881                state,
882                with_offline_mode,
883                parent_span,
884            })),
885        })
886    }
887}
888
889/// Errors for the [`SyncService`] API.
890#[derive(Debug, Error)]
891pub enum Error {
892    /// An error received from the `RoomListService` API.
893    #[error(transparent)]
894    RoomList(#[from] room_list_service::Error),
895
896    /// An error received from the `EncryptionSyncService` API.
897    #[error(transparent)]
898    EncryptionSync(#[from] encryption_sync_service::Error),
899
900    /// An error had occurred in the sync task supervisor, likely due to a bug.
901    #[error("the supervisor channel has run into an unexpected error")]
902    Supervisor,
903}