matrix_sdk_ui/sync_service.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Unified API for both the Room List API and the Encryption Sync API, which
//! takes care of all the underlying details.
//!
//! This is an opinionated way to run both APIs, with high-level callbacks that
//! should be called in reaction to user actions and/or system events.
//!
//! The sync service will signal errors via its [`state`](SyncService::state),
//! which the user MUST observe. Whenever an error/termination is observed, the
//! user should call [`SyncService::start()`] again to restart the room list
//! sync. If that is not desirable, offline support for the [`SyncService`]
//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
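//!
//! A minimal sketch of that observe-and-restart loop (assuming an
//! already-authenticated `client`; restarting on every error, as done here, is
//! an example policy, not a requirement):
//!
//! ```no_run
//! # use matrix_sdk::Client;
//! # use matrix_sdk_ui::sync_service::{State, SyncService};
//! # use url::Url;
//! # async {
//! # let client = Client::new(Url::parse("http://example.com")?).await?;
//! let sync_service = SyncService::builder(client).build().await?;
//! sync_service.start().await;
//!
//! let mut state = sync_service.state();
//! while let Some(state) = state.next().await {
//!     match state {
//!         // Restart the sync whenever an error is observed.
//!         State::Error(_) => sync_service.start().await,
//!         // Stop observing once the service terminated gracefully.
//!         State::Terminated => break,
//!         _ => {}
//!     }
//! }
//! # anyhow::Ok(()) };
//! ```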

use std::{sync::Arc, time::Duration};

use eyeball::{SharedObservable, Subscriber};
use futures_util::{
    StreamExt as _,
    future::{Either, select},
    pin_mut,
};
use matrix_sdk::{
    Client,
    config::RequestConfig,
    executor::{JoinHandle, spawn},
    sleep::sleep,
};
use thiserror::Error;
use tokio::sync::{
    Mutex as AsyncMutex, OwnedMutexGuard,
    mpsc::{Receiver, Sender},
};
use tracing::{Instrument, Level, Span, error, info, instrument, trace, warn};

use crate::{
    encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService, WithLocking},
    room_list_service::{self, RoomListService},
};

/// Current state of the application.
///
/// This is a high-level state indicating the status of the underlying syncs.
/// The application starts in the [`State::Idle`] state, moves to
/// [`State::Running`] once the syncs are started, and then hits a terminal
/// state [`State::Terminated`] (if it gracefully exited) or [`State::Error`]
/// (in case any of the underlying syncs ran into an error).
///
/// This can be observed with [`SyncService::state`].
#[derive(Clone, Debug)]
pub enum State {
    /// The service hasn't ever been started yet, or has been stopped.
    Idle,

    /// The underlying syncs are properly running in the background.
    Running,

    /// Any of the underlying syncs has terminated gracefully (i.e. it has
    /// been stopped).
    Terminated,

    /// Any of the underlying syncs has run into an error.
    ///
    /// The associated [`enum@Error`] is inside an [`Arc`] to (i) make
    /// [`State`] cloneable, and (ii) keep it lightweight.
    Error(Arc<Error>),

    /// The service has entered offline mode. This state will only be entered
    /// if the [`SyncService`] has been built with the
    /// [`SyncServiceBuilder::with_offline_mode`] setting.
    ///
    /// The [`SyncService`] will enter the offline mode if syncing with the
    /// server fails; it will then periodically check whether the server is
    /// available using the `/_matrix/client/versions` endpoint.
    ///
    /// Once the [`SyncService`] receives a 200 response from the
    /// `/_matrix/client/versions` endpoint, it will go back into the
    /// [`State::Running`] mode and attempt to sync again.
    ///
    /// Calling [`SyncService::start()`] while in this state will abort the
    /// `/_matrix/client/versions` checks and attempt to sync immediately.
    ///
    /// Calling [`SyncService::stop()`] will abort the offline mode and the
    /// [`SyncService`] will go into the [`State::Idle`] mode.
    Offline,
}

/// An encryption sync permit that is either already acquired, or that still
/// needs to be acquired by locking the shared mutex.
enum MaybeAcquiredPermit {
    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
}

impl MaybeAcquiredPermit {
    async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
        match self {
            MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
            MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
        }
    }
}

/// A supervisor responsible for managing two sync tasks: one for handling the
/// room list and another for supporting end-to-end encryption.
///
/// The two sync tasks are spawned as child tasks and are contained within the
/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
///
/// The supervisor ensures the two child tasks are managed as a single unit,
/// allowing them to be shut down in unison.
struct SyncTaskSupervisor {
    /// The supervising task that manages and contains the two sync child
    /// tasks.
    task: JoinHandle<()>,
    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
    /// function.
    termination_sender: Sender<TerminationReport>,
}

impl SyncTaskSupervisor {
    async fn new(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> Self {
        let (task, termination_sender) =
            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;

        Self { task, termination_sender }
    }

    /// Check whether the homeserver is reachable, and wait until it is.
    ///
    /// This function handles the offline mode by waiting for either a
    /// termination report or a successful `/_matrix/client/versions`
    /// response, whichever comes first:
    ///
    /// 1. Waiting for a termination report ensures that the user can exit
    ///    offline mode and attempt to restart the [`SyncService`] manually.
    ///
    /// 2. Waiting to come back online continuously checks server
    ///    availability.
    ///
    /// If the `/_matrix/client/versions` request succeeds, the function exits
    /// without a termination report. If we receive a [`TerminationReport`]
    /// from the user, we exit immediately and return the termination report.
    async fn offline_check(
        client: &Client,
        receiver: &mut Receiver<TerminationReport>,
    ) -> Option<TerminationReport> {
        info!("Entering the offline mode");

        let wait_for_termination_report = async {
            loop {
                // We didn't empty the channel when entering the offline mode, for fear that
                // we might miss a report with the `TerminationOrigin::Supervisor` origin. The
                // channel might therefore still contain stale reports from one or both of the
                // sync services, so let's ignore all reports coming from them.
                let report =
                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);

                match report.origin {
                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
                    // Since the sync services aren't running anymore, we can only receive a report
                    // from the supervisor. It would have probably made sense to have separate
                    // channels for the reports the sync services send and the ones the user can
                    // send using the `SyncService::stop()` method.
                    TerminationOrigin::Supervisor => break report,
                }
            }
        };

        let wait_to_be_online = async move {
            loop {
                // Network failures encountered while sending a request with no retry limit set
                // in the `RequestConfig` are treated as permanent failures, and our exponential
                // backoff doesn't kick in.
                //
                // Let's set a retry limit so network failures are retried as well.
                let request_config = RequestConfig::default().retry_limit(5);

                // We're in an infinite loop, but our request sending already has an exponential
                // backoff set up. This will kick in for any request errors that we consider to
                // be transient. Common network errors (timeouts, DNS failures) or any server
                // error in the 5xx range of HTTP errors are considered to be transient.
                //
                // Still, as a precaution, we're going to sleep here for a while in the `Err`
                // case.
                match client.fetch_server_versions(Some(request_config)).await {
                    Ok(_) => break,
                    Err(_) => sleep(Duration::from_millis(100)).await,
                }
            }
        };

        pin_mut!(wait_for_termination_report);
        pin_mut!(wait_to_be_online);

        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;

        let report = match maybe_termination_report {
            Either::Left((termination_report, _)) => Some(termination_report),
            Either::Right((_, _)) => None,
        };

        info!("Exiting offline mode: {report:?}");

        report
    }

    /// The role of the supervisor task is to wait for a termination message
    /// ([`TerminationReport`]), sent either because we wanted to stop both
    /// syncs, or because one of the syncs failed (in which case we'll stop the
    /// other one too).
    async fn spawn_supervisor_task(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);

        let encryption_sync = inner.encryption_sync_service.clone();
        let state = inner.state.clone();
        let termination_sender = sender.clone();

        // When we first start, and don't use offline mode, we want to acquire the sync
        // permit before we enter a future that might be polled at a later time. This
        // means that the permit is acquired as soon as the future created by this
        // `spawn_supervisor_task` function is awaited.
        //
        // In other words, once `sync_service.start().await` is finished, the permit
        // will be in the acquired state.
        let mut sync_permit_guard =
            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);

        let offline_mode = inner.with_offline_mode;
        let parent_span = inner.parent_span.clone();

        let future = async move {
            loop {
                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
                    room_list_service.clone(),
                    encryption_sync.clone(),
                    sync_permit_guard,
                    sender.clone(),
                    parent_span.clone(),
                )
                .await;

                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());

                let report = if let Some(report) = receiver.recv().await {
                    report
                } else {
                    info!("internal channel has been closed?");
                    // We should still stop the child tasks in the unlikely scenario that our
                    // receiver died.
                    TerminationReport::supervisor_error()
                };

                // If one service failed, make sure to request stopping the other one.
                let (stop_room_list, stop_encryption) = match &report.origin {
                    TerminationOrigin::EncryptionSync => (true, false),
                    TerminationOrigin::RoomList => (false, true),
                    TerminationOrigin::Supervisor => (true, true),
                };

                // Stop both services, and wait for the streams to properly finish: at some
                // point they'll return `None` and will exit their infinite loops, and their
                // tasks will gracefully terminate.

                if stop_room_list {
                    if let Err(err) = room_list_service.stop_sync() {
                        warn!(?report, "unable to stop room list service: {err:#}");
                    }

                    if report.has_expired() {
                        room_list_service.expire_sync_session().await;
                    }
                }

                if let Err(err) = room_list_task.await {
                    error!("when awaiting room list service: {err:#}");
                }

                if stop_encryption {
                    if let Err(err) = encryption_sync.stop_sync() {
                        warn!(?report, "unable to stop encryption sync: {err:#}");
                    }

                    if report.has_expired() {
                        encryption_sync.expire_sync_session().await;
                    }
                }

                if let Err(err) = encryption_sync_task.await {
                    error!("when awaiting encryption sync: {err:#}");
                }

                if let Some(error) = report.error {
                    if offline_mode {
                        state.set(State::Offline);

                        let client = room_list_service.client();

                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
                            if let Some(error) = report.error {
                                state.set(State::Error(Arc::new(error)));
                            } else {
                                state.set(State::Idle);
                            }
                            break;
                        }

                        state.set(State::Running);
                    } else {
                        state.set(State::Error(Arc::new(error)));
                        break;
                    }
                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
                    state.set(State::Idle);
                    break;
                } else {
                    state.set(State::Terminated);
                    break;
                }
            }
        }
        .instrument(tracing::span!(Level::WARN, "supervisor task"));

        let task = spawn(future);

        (task, termination_sender)
    }

    async fn spawn_child_tasks(
        room_list_service: Arc<RoomListService>,
        encryption_sync_service: Arc<EncryptionSyncService>,
        sync_permit_guard: MaybeAcquiredPermit,
        sender: Sender<TerminationReport>,
        parent_span: Span,
    ) -> (JoinHandle<()>, JoinHandle<()>) {
        // First, take care of the room list.
        let room_list_task = spawn(
            Self::room_list_sync_task(room_list_service, sender.clone())
                .instrument(parent_span.clone()),
        );

        // Then, take care of the encryption sync.
        let encryption_sync_task = spawn(
            Self::encryption_sync_task(
                encryption_sync_service,
                sender.clone(),
                sync_permit_guard.acquire().await,
            )
            .instrument(parent_span),
        );

        (room_list_task, encryption_sync_task)
    }

    async fn encryption_sync_task(
        encryption_sync: Arc<EncryptionSyncService>,
        sender: Sender<TerminationReport>,
        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
    ) {
        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
        pin_mut!(encryption_sync_stream);

        let termination_report = loop {
            match encryption_sync_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(error)) => {
                    let termination_report = TerminationReport::encryption_sync(Some(error));

                    if !termination_report.has_expired() {
                        error!(
                            "Error while processing encryption in sync service: {:#?}",
                            termination_report.error
                        );
                    }

                    break termination_report;
                }
                None => {
                    // The stream has ended.
                    break TerminationReport::encryption_sync(None);
                }
            }
        };

        if let Err(err) = sender.send(termination_report).await {
            error!("Error while sending termination report: {err:#}");
        }
    }

    async fn room_list_sync_task(
        room_list_service: Arc<RoomListService>,
        sender: Sender<TerminationReport>,
    ) {
        let room_list_stream = room_list_service.sync();
        pin_mut!(room_list_stream);

        let termination_report = loop {
            match room_list_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(error)) => {
                    let termination_report = TerminationReport::room_list(Some(error));

                    if !termination_report.has_expired() {
                        error!(
                            "Error while processing room list in sync service: {:#?}",
                            termination_report.error
                        );
                    }

                    break termination_report;
                }
                None => {
                    // The stream has ended.
                    break TerminationReport::room_list(None);
                }
            }
        };

        if let Err(err) = sender.send(termination_report).await {
            error!("Error while sending termination report: {err:#}");
        }
    }

    async fn shutdown(self) {
        match self.termination_sender.send(TerminationReport::supervisor()).await {
            Ok(_) => {
                let _ = self.task.await.inspect_err(|err| {
                    // A `JoinError` indicates that the task was already dead, either because it
                    // got cancelled or because it panicked. We only cancel the task in the `Err`
                    // branch below, and the task shouldn't be able to panic.
                    //
                    // So let's log an error and return.
                    error!("The supervisor task has stopped unexpectedly: {err:?}");
                });
            }
            Err(err) => {
                error!("Couldn't send the termination report to the supervisor task: {err}");
                // Let's abort the task if it won't shut down properly, otherwise we would have
                // left it as a detached task.
                self.task.abort();
            }
        }
    }
}

struct SyncServiceInner {
    encryption_sync_service: Arc<EncryptionSyncService>,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    /// What's the state of this sync service?
    state: SharedObservable<State>,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,

    /// Supervisor task ensuring proper termination.
    ///
    /// This task is waiting for a [`TerminationReport`] from any of the other
    /// two tasks, or from a user request via [`SyncService::stop()`]. It
    /// makes sure that the two services are properly shut down, and not just
    /// interrupted.
    ///
    /// This is set at the same time as the other two tasks.
    supervisor: Option<SyncTaskSupervisor>,
}

impl SyncServiceInner {
    async fn start(
        &mut self,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) {
        trace!("starting sync service");

        self.supervisor =
            Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
        self.state.set(State::Running);
    }

    async fn stop(&mut self) {
        trace!("pausing sync service");

        // Remove the supervisor from our state and request the tasks to be shut down.
        if let Some(supervisor) = self.supervisor.take() {
            supervisor.shutdown().await;
        } else {
            error!("The sync service was not properly started, the supervisor task doesn't exist");
        }
    }

    async fn restart(
        &mut self,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) {
        self.stop().await;
        self.start(room_list_service, encryption_sync_permit).await;
    }
}

/// A high level manager for your Matrix syncing needs.
///
/// The [`SyncService`] is responsible for managing real-time synchronization
/// with a Matrix server. It can initiate and maintain the necessary
/// synchronization tasks for you.
///
/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
/// otherwise it will fail with a 404 `M_UNRECOGNIZED` request error.
///
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
///
/// # Example
///
/// ```no_run
/// use matrix_sdk::Client;
/// use matrix_sdk_ui::sync_service::{State, SyncService};
/// # use url::Url;
/// # async {
/// let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
///
/// client
///     .matrix_auth()
///     .login_username("example", "wordpass")
///     .initial_device_display_name("My bot")
///     .await?;
///
/// let sync_service = SyncService::builder(client).build().await?;
/// let mut state = sync_service.state();
///
/// while let Some(state) = state.next().await {
///     match state {
///         State::Idle => eprintln!("The sync service is idle."),
///         State::Running => eprintln!("The sync has started to run."),
///         State::Offline => eprintln!(
///             "We have entered the offline mode, the server seems to be unavailable"
///         ),
///         State::Terminated => {
///             eprintln!("The sync service has been gracefully terminated");
///             break;
///         }
///         State::Error(_) => {
///             eprintln!("The sync service has run into an error");
///             break;
///         }
///     }
/// }
/// # anyhow::Ok(()) };
/// ```
pub struct SyncService {
    inner: Arc<AsyncMutex<SyncServiceInner>>,

    /// Room list service used to synchronize the rooms state.
    room_list_service: Arc<RoomListService>,

    /// What's the state of this sync service? This field is replicated from
    /// the [`SyncServiceInner`] struct, but it should not be modified in this
    /// struct. It's re-exposed here so we can subscribe to the state without
    /// taking the lock on the `inner` field.
    state: SharedObservable<State>,

    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
    /// times.
    ///
    /// This ensures that there's only one ever existing in the application's
    /// lifetime (under the assumption that there is at most one
    /// [`SyncService`] per application).
    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
}

impl SyncService {
    /// Create a new builder for configuring a `SyncService`.
    pub fn builder(client: Client) -> SyncServiceBuilder {
        SyncServiceBuilder::new(client)
    }

    /// Get the underlying `RoomListService` instance for easier access to its
    /// methods.
    pub fn room_list_service(&self) -> Arc<RoomListService> {
        self.room_list_service.clone()
    }

    /// Returns the state of the sync service.
    pub fn state(&self) -> Subscriber<State> {
        self.state.subscribe()
    }

    /// Start (or restart) the underlying sliding syncs.
    ///
    /// This can be called multiple times safely:
    ///
    /// - if the stream is still properly running, it won't be restarted;
    /// - if the [`SyncService`] is in the offline mode, we will exit the
    ///   offline mode and immediately attempt to sync again;
    /// - if the stream has been aborted before, it will be properly cleaned up
    ///   and restarted.
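    ///
    /// # Example
    ///
    /// A minimal sketch of those semantics (assuming an already-authenticated
    /// `client`); the second `start()` call is a no-op while the service is
    /// running:
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use matrix_sdk_ui::sync_service::SyncService;
    /// # use url::Url;
    /// # async {
    /// # let client = Client::new(Url::parse("http://example.com")?).await?;
    /// let sync_service = SyncService::builder(client).build().await?;
    ///
    /// sync_service.start().await;
    /// // Calling `start()` again while the service is running does nothing.
    /// sync_service.start().await;
    ///
    /// // After a `stop()`, `start()` properly restarts the syncs.
    /// sync_service.stop().await;
    /// sync_service.start().await;
    /// # anyhow::Ok(()) };
    /// ```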
    pub async fn start(&self) {
        let mut inner = self.inner.lock().await;

        // Only (re)start the tasks if it's stopped or if we're in the offline mode.
        match inner.state.get() {
            // If we're already running, there's nothing to do.
            State::Running => {}
            // If we're in the offline mode, first stop the service and then start it again.
            State::Offline => {
                inner
                    .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
                    .await
            }
            // Otherwise just start.
            State::Idle | State::Terminated | State::Error(_) => {
                inner
                    .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
                    .await
            }
        }
    }

    /// Stop the underlying sliding syncs.
    ///
    /// This must be called when the app goes into the background. It's better
    /// to call this API when the application exits, although not strictly
    /// necessary.
    #[instrument(skip_all)]
    pub async fn stop(&self) {
        let mut inner = self.inner.lock().await;

        match inner.state.get() {
            State::Idle | State::Terminated | State::Error(_) => {
                // No need to stop if we were not running.
                return;
            }
            State::Running | State::Offline => {}
        }

        inner.stop().await;
    }

    /// Force expiring both sessions.
    ///
    /// This ensures that the sync service is stopped before expiring both
    /// sessions. It should be used sparingly, as it will cause a restart of
    /// the sessions on the server as well.
    #[instrument(skip_all)]
    pub async fn expire_sessions(&self) {
        // First, stop the sync service if it was running; it's a no-op if it was
        // already stopped.
        self.stop().await;

        // Expire the room list sync session.
        self.room_list_service.expire_sync_session().await;

        // Expire the encryption sync session.
        self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
    }

    /// Attempt to get a permit to use an `EncryptionSyncService` at a given
    /// time.
    ///
    /// This ensures there is at most one [`EncryptionSyncService`] active at
    /// any time, per application.
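    ///
    /// # Example
    ///
    /// A minimal sketch of the permit semantics, assuming a freshly built (and
    /// not yet started) service, so that the permit isn't already held by a
    /// running encryption sync:
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use matrix_sdk_ui::sync_service::SyncService;
    /// # use url::Url;
    /// # async {
    /// # let client = Client::new(Url::parse("http://example.com")?).await?;
    /// let sync_service = SyncService::builder(client).build().await?;
    ///
    /// // The permit is free, so we can grab it.
    /// let permit = sync_service.try_get_encryption_sync_permit();
    /// assert!(permit.is_some());
    ///
    /// // While the permit guard is alive, nobody else can acquire it.
    /// assert!(sync_service.try_get_encryption_sync_permit().is_none());
    ///
    /// // Dropping the guard releases the permit again.
    /// drop(permit);
    /// assert!(sync_service.try_get_encryption_sync_permit().is_some());
    /// # anyhow::Ok(()) };
    /// ```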
    pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
        self.encryption_sync_permit.clone().try_lock_owned().ok()
    }
}

#[derive(Debug)]
enum TerminationOrigin {
    EncryptionSync,
    RoomList,
    Supervisor,
}

#[derive(Debug)]
struct TerminationReport {
    /// The origin of the termination.
    origin: TerminationOrigin,

    /// If the termination is due to an error, this is the cause.
    error: Option<Error>,
}

impl TerminationReport {
    /// Create a new [`TerminationReport`] with `origin` set to
    /// [`TerminationOrigin::EncryptionSync`] and `error` set to
    /// [`Error::EncryptionSync`].
    fn encryption_sync(error: Option<encryption_sync_service::Error>) -> Self {
        Self { origin: TerminationOrigin::EncryptionSync, error: error.map(Error::EncryptionSync) }
    }

    /// Create a new [`TerminationReport`] with `origin` set to
    /// [`TerminationOrigin::RoomList`] and `error` set to [`Error::RoomList`].
    fn room_list(error: Option<room_list_service::Error>) -> Self {
        Self { origin: TerminationOrigin::RoomList, error: error.map(Error::RoomList) }
    }

    /// Create a new [`TerminationReport`] with `origin` set to
    /// [`TerminationOrigin::Supervisor`] and `error` set to
    /// [`Error::Supervisor`].
    fn supervisor_error() -> Self {
        Self { origin: TerminationOrigin::Supervisor, error: Some(Error::Supervisor) }
    }

    /// Create a new [`TerminationReport`] with `origin` set to
    /// [`TerminationOrigin::Supervisor`] and `error` set to `None`.
    fn supervisor() -> Self {
        Self { origin: TerminationOrigin::Supervisor, error: None }
    }

    /// Check whether the termination is due to an expired sliding sync
    /// session.
    fn has_expired(&self) -> bool {
        match &self.error {
            Some(Error::RoomList(room_list_service::Error::SlidingSync(error)))
            | Some(Error::EncryptionSync(encryption_sync_service::Error::SlidingSync(error))) => {
                error.client_api_error_kind()
                    == Some(&ruma::api::client::error::ErrorKind::UnknownPos)
            }
            _ => false,
        }
    }
}

// Testing helpers, mostly.
#[doc(hidden)]
impl SyncService {
    /// Is the task supervisor running?
    pub async fn is_supervisor_running(&self) -> bool {
        self.inner.lock().await.supervisor.is_some()
    }
}

/// Builder for the [`SyncService`].
#[derive(Clone)]
pub struct SyncServiceBuilder {
    /// SDK client.
    client: Client,

    /// Is the cross-process lock for the crypto store enabled?
    with_cross_process_lock: bool,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
    ///
    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
    with_share_pos: bool,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,
}

impl SyncServiceBuilder {
    fn new(client: Client) -> Self {
        Self {
            client,
            with_cross_process_lock: false,
            with_offline_mode: false,
            with_share_pos: true,
            parent_span: Span::none(),
        }
    }

    /// Enables the cross-process lock, if the sync service is being built in a
    /// multi-process setup.
    ///
    /// It's a prerequisite if another process can *also* process encryption
    /// events. This is only applicable to very specific use cases, like an
    /// external process attempting to decrypt notifications. In general,
    /// `with_cross_process_lock` should not be called.
    ///
    /// Be sure to have configured
    /// [`Client::cross_process_store_locks_holder_name`] accordingly.
    pub fn with_cross_process_lock(mut self) -> Self {
        self.with_cross_process_lock = true;
        self
    }

    /// Enable the "offline" mode for the [`SyncService`].
    ///
    /// To learn more about the "offline" mode, read the documentation for the
    /// [`State::Offline`] enum variant.
    pub fn with_offline_mode(mut self) -> Self {
        self.with_offline_mode = true;
        self
    }

    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
    ///
    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
    pub fn with_share_pos(mut self, enable: bool) -> Self {
        self.with_share_pos = enable;
        self
    }

    /// Set the parent tracing span to be used for the tasks within this
    /// service.
    pub fn with_parent_span(mut self, parent_span: Span) -> Self {
        self.parent_span = parent_span;
        self
    }

    /// Finish setting up the [`SyncService`].
    ///
    /// This creates the underlying sliding syncs, and will *not* start them in
    /// the background. The resulting [`SyncService`] must be kept alive as
    /// long as the sliding syncs are supposed to run.
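    ///
    /// # Example
    ///
    /// A minimal sketch of a typical builder chain (enabling offline mode here
    /// is just an example, not a requirement):
    ///
    /// ```no_run
    /// # use matrix_sdk::Client;
    /// # use matrix_sdk_ui::sync_service::SyncService;
    /// # use url::Url;
    /// # async {
    /// # let client = Client::new(Url::parse("http://example.com")?).await?;
    /// let sync_service = SyncService::builder(client)
    ///     // Fall back to the `State::Offline` mode when the server is
    ///     // unreachable, instead of reporting an error.
    ///     .with_offline_mode()
    ///     .build()
    ///     .await?;
    ///
    /// // The syncs are not started yet; do that explicitly.
    /// sync_service.start().await;
    /// # anyhow::Ok(()) };
    /// ```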
    pub async fn build(self) -> Result<SyncService, Error> {
        let Self {
            client,
            with_cross_process_lock,
            with_offline_mode,
            with_share_pos,
            parent_span,
        } = self;

        let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));

        let room_list = RoomListService::new_with_share_pos(client.clone(), with_share_pos).await?;

        let encryption_sync = Arc::new(
            EncryptionSyncService::new(client, None, WithLocking::from(with_cross_process_lock))
                .await?,
        );

        let room_list_service = Arc::new(room_list);
        let state = SharedObservable::new(State::Idle);

        Ok(SyncService {
            state: state.clone(),
            room_list_service,
            encryption_sync_permit,
            inner: Arc::new(AsyncMutex::new(SyncServiceInner {
                supervisor: None,
                encryption_sync_service: encryption_sync,
                state,
                with_offline_mode,
                parent_span,
            })),
        })
    }
}

/// Errors for the [`SyncService`] API.
#[derive(Debug, Error)]
pub enum Error {
    /// An error received from the `RoomListService` API.
    #[error(transparent)]
    RoomList(#[from] room_list_service::Error),

    /// An error received from the `EncryptionSyncService` API.
    #[error(transparent)]
    EncryptionSync(#[from] encryption_sync_service::Error),

    /// An error occurred in the sync task supervisor, likely due to a bug.
    #[error("the supervisor channel has run into an unexpected error")]
    Supervisor,
}
893}