matrix_sdk_ui/sync_service.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unified API for both the Room List API and the Encryption Sync API, that
16//! takes care of all the underlying details.
17//!
//! This is an opinionated way to run both APIs, with high-level callbacks that
19//! should be called in reaction to user actions and/or system events.
20//!
21//! The sync service will signal errors via its [`state`](SyncService::state)
22//! that the user MUST observe. Whenever an error/termination is observed, the
//! user should call [`SyncService::start()`] again to restart the room list
//! sync. If that is not desirable, the offline support for the [`SyncService`]
//! may be enabled using the [`SyncServiceBuilder::with_offline_mode`] setting.
26
27use std::{sync::Arc, time::Duration};
28
29use eyeball::{SharedObservable, Subscriber};
30use futures_util::{
31 StreamExt as _,
32 future::{Either, select},
33 pin_mut,
34};
35use matrix_sdk::{
36 Client,
37 config::RequestConfig,
38 executor::{JoinHandle, spawn},
39 sleep::sleep,
40};
41use thiserror::Error;
42use tokio::sync::{
43 Mutex as AsyncMutex, OwnedMutexGuard,
44 mpsc::{Receiver, Sender},
45};
46use tracing::{Instrument, Level, Span, error, info, instrument, trace, warn};
47
48use crate::{
49 encryption_sync_service::{self, EncryptionSyncPermit, EncryptionSyncService},
50 room_list_service::{
51 self, DEFAULT_CONNECTION_ID, DEFAULT_LIST_TIMELINE_LIMIT, RoomListService,
52 },
53};
54
/// Current state of the application.
///
/// This is a high-level state indicating what's the status of the underlying
/// syncs. The application starts in [`State::Running`] mode, and then hits a
/// terminal state [`State::Terminated`] (if it gracefully exited) or
/// [`State::Error`] (in case any of the underlying syncs ran into an error).
///
/// This can be observed with [`SyncService::state`].
#[derive(Clone, Debug)]
pub enum State {
    /// The service hasn't ever been started yet, or has been stopped.
    Idle,

    /// The underlying syncs are properly running in the background.
    Running,

    /// Any of the underlying syncs has terminated gracefully (i.e. been
    /// stopped).
    Terminated,

    /// Any of the underlying syncs has run into an error.
    ///
    /// The associated [`enum@Error`] is inside an [`Arc`] to (i) make [`State`]
    /// cloneable, and to (ii) not make it heavier.
    Error(Arc<Error>),

    /// The service has entered offline mode. This state will only be entered if
    /// the [`SyncService`] has been built with the
    /// [`SyncServiceBuilder::with_offline_mode`] setting.
    ///
    /// The [`SyncService`] will enter the offline mode if syncing with the
    /// server fails; it will then periodically check if the server is
    /// available using the `/_matrix/client/versions` endpoint.
    ///
    /// Once the [`SyncService`] receives a 200 response from the
    /// `/_matrix/client/versions` endpoint, it will go back into the
    /// [`State::Running`] mode and attempt to sync again.
    ///
    /// Calling [`SyncService::start()`] while in this state will abort the
    /// `/_matrix/client/versions` checks and attempt to sync immediately.
    ///
    /// Calling [`SyncService::stop()`] will abort the offline mode and the
    /// [`SyncService`] will go into the [`State::Idle`] mode.
    Offline,
}
99
/// An encryption sync permit that is either already held, or that still needs
/// to be acquired from the shared lock.
///
/// This allows the supervisor to acquire the permit eagerly on the first
/// start (so it is held by the time the spawning future resolves), while
/// subsequent restarts acquire it lazily inside the child task.
enum MaybeAcquiredPermit {
    /// The permit has already been acquired; the owned guard is held here.
    Acquired(OwnedMutexGuard<EncryptionSyncPermit>),
    /// The permit hasn't been acquired yet; the shared lock will be locked on
    /// demand in [`MaybeAcquiredPermit::acquire`].
    Unacquired(Arc<AsyncMutex<EncryptionSyncPermit>>),
}
104
105impl MaybeAcquiredPermit {
106 async fn acquire(self) -> OwnedMutexGuard<EncryptionSyncPermit> {
107 match self {
108 MaybeAcquiredPermit::Acquired(owned_mutex_guard) => owned_mutex_guard,
109 MaybeAcquiredPermit::Unacquired(lock) => lock.lock_owned().await,
110 }
111 }
112}
113
/// A supervisor responsible for managing two sync tasks: one for handling the
/// room list and another for supporting end-to-end encryption.
///
/// The two sync tasks are spawned as child tasks and are contained within the
/// supervising task, which is stored in the [`SyncTaskSupervisor::task`] field.
///
/// The supervisor ensures the two child tasks are managed as a single unit,
/// allowing for them to be shut down in unison.
struct SyncTaskSupervisor {
    /// The supervising task that manages and contains the two sync child tasks.
    task: JoinHandle<()>,
    /// [`TerminationReport`] sender for the [`SyncTaskSupervisor::shutdown()`]
    /// function.
    termination_sender: Sender<TerminationReport>,
}
129
impl SyncTaskSupervisor {
    /// Create a new supervisor: spawn the supervising task (which in turn
    /// spawns the two child sync tasks) and keep both its handle and the
    /// sender used to request its termination.
    async fn new(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> Self {
        let (task, termination_sender) =
            Self::spawn_supervisor_task(inner, room_list_service, encryption_sync_permit).await;

        Self { task, termination_sender }
    }

    /// Check if a homeserver is reachable.
    ///
    /// This function handles the offline mode by waiting for either a
    /// termination report or a successful `/_matrix/client/versions` response.
    ///
    /// This function waits for two conditions:
    ///
    /// 1. Waiting for a termination report: This ensures that the user can exit
    ///    offline mode and attempt to restart the [`SyncService`] manually.
    ///
    /// 2. Waiting to come back online: This continuously checks server
    ///    availability.
    ///
    /// If the `/_matrix/client/versions` request succeeds, the function exits
    /// without a termination report. If we receive a [`TerminationReport`] from
    /// the user, we exit immediately and return the termination report.
    async fn offline_check(
        client: &Client,
        receiver: &mut Receiver<TerminationReport>,
    ) -> Option<TerminationReport> {
        info!("Entering the offline mode");

        let wait_for_termination_report = async {
            loop {
                // We didn't empty the channel when entering the offline mode, for fear that
                // we might miss a report with the `TerminationOrigin::Supervisor` origin.
                // The channel might thus contain stale reports from one of the sync
                // services (in case both of them have sent a report), so let's ignore all
                // reports we receive from the sync services here.
                let report =
                    receiver.recv().await.unwrap_or_else(TerminationReport::supervisor_error);

                match report.origin {
                    TerminationOrigin::EncryptionSync | TerminationOrigin::RoomList => {}
                    // Since the sync services aren't running anymore, we can only receive a
                    // report from the supervisor. It would have probably made sense to have
                    // separate channels for reports the sync services send and the ones the
                    // user can send using the `SyncService::stop()` method.
                    TerminationOrigin::Supervisor => break report,
                }
            }
        };

        let wait_to_be_online = async move {
            loop {
                // Network failures encountered when sending a request which has no retry
                // limit set in the `RequestConfig` are treated as permanent failures and our
                // exponential backoff doesn't kick in.
                //
                // Let's set a retry limit so network failures are retried as well.
                let request_config = RequestConfig::default().retry_limit(5);

                // We're in an infinite loop, but our request sending already has an exponential
                // backoff set up. This will kick in for any request errors that we consider to
                // be transient. Common network errors (timeouts, DNS failures) or any server
                // error in the 5xx range of HTTP errors are considered to be transient.
                //
                // Still, as a precaution, we're going to sleep here for a while in the Error
                // case.
                match client.fetch_server_versions(Some(request_config)).await {
                    Ok(_) => break,
                    Err(_) => sleep(Duration::from_millis(100)).await,
                }
            }
        };

        pin_mut!(wait_for_termination_report);
        pin_mut!(wait_to_be_online);

        // Race the two futures; whichever resolves first decides whether we
        // leave offline mode due to a user request or because we're back online.
        let maybe_termination_report = select(wait_for_termination_report, wait_to_be_online).await;

        let report = match maybe_termination_report {
            Either::Left((termination_report, _)) => Some(termination_report),
            Either::Right((_, _)) => None,
        };

        info!("Exiting offline mode: {report:?}");

        report
    }

    /// The role of the supervisor task is to wait for a termination message
    /// ([`TerminationReport`]), sent either because we wanted to stop both
    /// syncs, or because one of the syncs failed (in which case we'll stop the
    /// other one too).
    async fn spawn_supervisor_task(
        inner: &SyncServiceInner,
        room_list_service: Arc<RoomListService>,
        encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
    ) -> (JoinHandle<()>, Sender<TerminationReport>) {
        let (sender, mut receiver) = tokio::sync::mpsc::channel(16);

        let encryption_sync = inner.encryption_sync_service.clone();
        let state = inner.state.clone();
        let termination_sender = sender.clone();

        // When we first start, and don't use offline mode, we want to acquire the sync
        // permit before we enter a future that might be polled at a later time;
        // this means that the permit will be acquired as soon as this future,
        // the one the `spawn_supervisor_task` function creates, is awaited.
        //
        // In other words, once `sync_service.start().await` is finished, the permit
        // will be in the acquired state.
        let mut sync_permit_guard =
            MaybeAcquiredPermit::Acquired(encryption_sync_permit.clone().lock_owned().await);

        let offline_mode = inner.with_offline_mode;
        let parent_span = inner.parent_span.clone();

        let future = async move {
            loop {
                let (room_list_task, encryption_sync_task) = Self::spawn_child_tasks(
                    room_list_service.clone(),
                    encryption_sync.clone(),
                    sync_permit_guard,
                    sender.clone(),
                    parent_span.clone(),
                )
                .await;

                // Any later iteration of this loop (i.e. a restart after exiting
                // offline mode) must re-acquire the permit lazily.
                sync_permit_guard = MaybeAcquiredPermit::Unacquired(encryption_sync_permit.clone());

                let report = if let Some(report) = receiver.recv().await {
                    report
                } else {
                    info!("internal channel has been closed?");
                    // We should still stop the child tasks in the unlikely scenario that our
                    // receiver died.
                    TerminationReport::supervisor_error()
                };

                // If one service failed, make sure to request stopping the other one.
                let (stop_room_list, stop_encryption) = match &report.origin {
                    TerminationOrigin::EncryptionSync => (true, false),
                    TerminationOrigin::RoomList => (false, true),
                    TerminationOrigin::Supervisor => (true, true),
                };

                // Stop both services, and wait for the streams to properly finish: at some
                // point they'll return `None` and will exit their infinite loops, and their
                // tasks will gracefully terminate.

                if stop_room_list {
                    if let Err(err) = room_list_service.stop_sync() {
                        warn!(?report, "unable to stop room list service: {err:#}");
                    }

                    if report.has_expired() {
                        room_list_service.expire_sync_session().await;
                    }
                }

                if let Err(err) = room_list_task.await {
                    error!("when awaiting room list service: {err:#}");
                }

                if stop_encryption {
                    if let Err(err) = encryption_sync.stop_sync() {
                        warn!(?report, "unable to stop encryption sync: {err:#}");
                    }

                    if report.has_expired() {
                        encryption_sync.expire_sync_session().await;
                    }
                }

                if let Err(err) = encryption_sync_task.await {
                    error!("when awaiting encryption sync: {err:#}");
                }

                if let Some(error) = report.error {
                    if offline_mode {
                        // Offline mode: probe the server until it answers, or until
                        // the user explicitly stops/restarts the service.
                        state.set(State::Offline);

                        let client = room_list_service.client();

                        if let Some(report) = Self::offline_check(client, &mut receiver).await {
                            if let Some(error) = report.error {
                                state.set(State::Error(Arc::new(error)));
                            } else {
                                state.set(State::Idle);
                            }
                            break;
                        }

                        // Back online: loop around and respawn the child tasks.
                        state.set(State::Running);
                    } else {
                        state.set(State::Error(Arc::new(error)));
                        break;
                    }
                } else if matches!(report.origin, TerminationOrigin::Supervisor) {
                    state.set(State::Idle);
                    break;
                } else {
                    state.set(State::Terminated);
                    break;
                }
            }
        }
        .instrument(tracing::span!(Level::WARN, "supervisor task"));

        let task = spawn(future);

        (task, termination_sender)
    }

    /// Spawn the two child sync tasks: the room list sync first, then the
    /// encryption sync.
    ///
    /// The encryption sync task is only spawned once the sync permit has been
    /// acquired (see [`MaybeAcquiredPermit::acquire`]).
    async fn spawn_child_tasks(
        room_list_service: Arc<RoomListService>,
        encryption_sync_service: Arc<EncryptionSyncService>,
        sync_permit_guard: MaybeAcquiredPermit,
        sender: Sender<TerminationReport>,
        parent_span: Span,
    ) -> (JoinHandle<()>, JoinHandle<()>) {
        // First, take care of the room list.
        let room_list_task = spawn(
            Self::room_list_sync_task(room_list_service, sender.clone())
                .instrument(parent_span.clone()),
        );

        // Then, take care of the encryption sync.
        let encryption_sync_task = spawn(
            Self::encryption_sync_task(
                encryption_sync_service,
                sender.clone(),
                sync_permit_guard.acquire().await,
            )
            .instrument(parent_span),
        );

        (room_list_task, encryption_sync_task)
    }

    /// Drive the encryption sync stream until it errors out or ends, then send
    /// a [`TerminationReport`] to the supervisor.
    async fn encryption_sync_task(
        encryption_sync: Arc<EncryptionSyncService>,
        sender: Sender<TerminationReport>,
        sync_permit_guard: OwnedMutexGuard<EncryptionSyncPermit>,
    ) {
        let encryption_sync_stream = encryption_sync.sync(sync_permit_guard);
        pin_mut!(encryption_sync_stream);

        let termination_report = loop {
            match encryption_sync_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(error)) => {
                    let termination_report = TerminationReport::encryption_sync(Some(error));

                    // An expired session is handled by the supervisor (which will expire
                    // the sync sessions), so don't log it as an error here.
                    if !termination_report.has_expired() {
                        error!(
                            "Error while processing encryption in sync service: {:#?}",
                            termination_report.error
                        );
                    }

                    break termination_report;
                }
                None => {
                    // The stream has ended.
                    break TerminationReport::encryption_sync(None);
                }
            }
        };

        if let Err(err) = sender.send(termination_report).await {
            error!("Error while sending termination report: {err:#}");
        }
    }

    /// Drive the room list sync stream until it errors out or ends, then send
    /// a [`TerminationReport`] to the supervisor.
    async fn room_list_sync_task(
        room_list_service: Arc<RoomListService>,
        sender: Sender<TerminationReport>,
    ) {
        let room_list_stream = room_list_service.sync();
        pin_mut!(room_list_stream);

        let termination_report = loop {
            match room_list_stream.next().await {
                Some(Ok(())) => {
                    // Carry on.
                }
                Some(Err(error)) => {
                    let termination_report = TerminationReport::room_list(Some(error));

                    // An expired session is handled by the supervisor (which will expire
                    // the sync sessions), so don't log it as an error here.
                    if !termination_report.has_expired() {
                        error!(
                            "Error while processing room list in sync service: {:#?}",
                            termination_report.error
                        );
                    }

                    break termination_report;
                }
                None => {
                    // The stream has ended.
                    break TerminationReport::room_list(None);
                }
            }
        };

        if let Err(err) = sender.send(termination_report).await {
            error!("Error while sending termination report: {err:#}");
        }
    }

    /// Ask the supervisor task to stop both syncs, and wait for it to finish.
    async fn shutdown(self) {
        match self.termination_sender.send(TerminationReport::supervisor()).await {
            Ok(_) => {
                let _ = self.task.await.inspect_err(|err| {
                    // A `JoinError` indicates that the task was already dead, either because it got
                    // cancelled or because it panicked. We only cancel the task in the Err branch
                    // below and the task shouldn't be able to panic.
                    //
                    // So let's log an error and return.
                    error!("The supervisor task has stopped unexpectedly: {err:?}");
                });
            }
            Err(err) => {
                error!("Couldn't send the termination report to the supervisor task: {err}");
                // Let's abort the task if it won't shut down properly, otherwise we would have
                // left it as a detached task.
                self.task.abort();
            }
        }
    }
}
469
struct SyncServiceInner {
    /// The underlying encryption sync service, shared with the supervisor's
    /// child tasks.
    encryption_sync_service: Arc<EncryptionSyncService>,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    /// What's the state of this sync service?
    state: SharedObservable<State>,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,

    /// Supervisor task ensuring proper termination.
    ///
    /// This task is waiting for a [`TerminationReport`] from any of the other
    /// two tasks, or from a user request via [`SyncService::stop()`]. It
    /// makes sure that the two services are properly shut down, not just
    /// interrupted.
    ///
    /// This is set at the same time as the other two tasks.
    supervisor: Option<SyncTaskSupervisor>,
}
498
499impl SyncServiceInner {
500 async fn start(
501 &mut self,
502 room_list_service: Arc<RoomListService>,
503 encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
504 ) {
505 trace!("starting sync service");
506
507 self.supervisor =
508 Some(SyncTaskSupervisor::new(self, room_list_service, encryption_sync_permit).await);
509 self.state.set(State::Running);
510 }
511
512 async fn stop(&mut self) {
513 trace!("pausing sync service");
514
515 // Remove the supervisor from our state and request the tasks to be shutdown.
516 if let Some(supervisor) = self.supervisor.take() {
517 supervisor.shutdown().await;
518 } else {
519 error!("The sync service was not properly started, the supervisor task doesn't exist");
520 }
521 }
522
523 async fn restart(
524 &mut self,
525 room_list_service: Arc<RoomListService>,
526 encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
527 ) {
528 self.stop().await;
529 self.start(room_list_service, encryption_sync_permit).await;
530 }
531}
532
/// A high level manager for your Matrix syncing needs.
///
/// The [`SyncService`] is responsible for managing real-time synchronization
/// with a Matrix server. It can initiate and maintain the necessary
/// synchronization tasks for you.
///
/// **Note**: The [`SyncService`] requires a server with support for [MSC4186],
/// otherwise it will fail with a 404 `M_UNRECOGNIZED` request error.
///
/// [MSC4186]: https://github.com/matrix-org/matrix-spec-proposals/pull/4186/
///
/// # Example
///
/// ```no_run
/// use matrix_sdk::Client;
/// use matrix_sdk_ui::sync_service::{State, SyncService};
/// # use url::Url;
/// # async {
/// let homeserver = Url::parse("http://example.com")?;
/// let client = Client::new(homeserver).await?;
///
/// client
///     .matrix_auth()
///     .login_username("example", "wordpass")
///     .initial_device_display_name("My bot")
///     .await?;
///
/// let sync_service = SyncService::builder(client).build().await?;
/// let mut state = sync_service.state();
///
/// while let Some(state) = state.next().await {
///     match state {
///         State::Idle => eprintln!("The sync service is idle."),
///         State::Running => eprintln!("The sync has started to run."),
///         State::Offline => eprintln!(
///             "We have entered the offline mode, the server seems to be
///              unavailable"
///         ),
///         State::Terminated => {
///             eprintln!("The sync service has been gracefully terminated");
///             break;
///         }
///         State::Error(_) => {
///             eprintln!("The sync service has run into an error");
///             break;
///         }
///     }
/// }
/// # anyhow::Ok(()) };
/// ```
pub struct SyncService {
    inner: Arc<AsyncMutex<SyncServiceInner>>,

    /// Room list service used to synchronize the rooms state.
    room_list_service: Arc<RoomListService>,

    /// What's the state of this sync service? This field is replicated from the
    /// [`SyncServiceInner`] struct, but it should not be modified in this
    /// struct. It's re-exposed here so we can subscribe to the state without
    /// taking the lock on the `inner` field.
    state: SharedObservable<State>,

    /// Global lock to allow using at most one [`EncryptionSyncService`] at all
    /// times.
    ///
    /// This ensures that there's only one ever existing in the application's
    /// lifetime (under the assumption that there is at most one [`SyncService`]
    /// per application).
    encryption_sync_permit: Arc<AsyncMutex<EncryptionSyncPermit>>,
}
603
604impl SyncService {
605 /// Create a new builder for configuring an `SyncService`.
606 pub fn builder(client: Client) -> SyncServiceBuilder {
607 SyncServiceBuilder::new(client)
608 }
609
610 /// Get the underlying `RoomListService` instance for easier access to its
611 /// methods.
612 pub fn room_list_service(&self) -> Arc<RoomListService> {
613 self.room_list_service.clone()
614 }
615
616 /// Returns the state of the sync service.
617 pub fn state(&self) -> Subscriber<State> {
618 self.state.subscribe()
619 }
620
621 /// Start (or restart) the underlying sliding syncs.
622 ///
623 /// This can be called multiple times safely:
624 /// - if the stream is still properly running, it won't be restarted.
625 /// - if the [`SyncService`] is in the offline mode we will exit the offline
626 /// mode and immediately attempt to sync again.
627 /// - if the stream has been aborted before, it will be properly cleaned up
628 /// and restarted.
629 pub async fn start(&self) {
630 let mut inner = self.inner.lock().await;
631
632 // Only (re)start the tasks if it's stopped or if we're in the offline mode.
633 match inner.state.get() {
634 // If we're already running, there's nothing to do.
635 State::Running => {}
636 // If we're in the offline mode, first stop the service and then start it again.
637 State::Offline => {
638 inner
639 .restart(self.room_list_service.clone(), self.encryption_sync_permit.clone())
640 .await
641 }
642 // Otherwise just start.
643 State::Idle | State::Terminated | State::Error(_) => {
644 inner
645 .start(self.room_list_service.clone(), self.encryption_sync_permit.clone())
646 .await
647 }
648 }
649 }
650
651 /// Stop the underlying sliding syncs.
652 ///
653 /// This must be called when the app goes into the background. It's better
654 /// to call this API when the application exits, although not strictly
655 /// necessary.
656 #[instrument(skip_all)]
657 pub async fn stop(&self) {
658 let mut inner = self.inner.lock().await;
659
660 match inner.state.get() {
661 State::Idle | State::Terminated | State::Error(_) => {
662 // No need to stop if we were not running.
663 return;
664 }
665 State::Running | State::Offline => {}
666 }
667
668 inner.stop().await;
669 }
670
671 /// Force expiring both sessions.
672 ///
673 /// This ensures that the sync service is stopped before expiring both
674 /// sessions. It should be used sparingly, as it will cause a restart of
675 /// the sessions on the server as well.
676 #[instrument(skip_all)]
677 pub async fn expire_sessions(&self) {
678 // First, stop the sync service if it was running; it's a no-op if it was
679 // already stopped.
680 self.stop().await;
681
682 // Expire the room list sync session.
683 self.room_list_service.expire_sync_session().await;
684
685 // Expire the encryption sync session.
686 self.inner.lock().await.encryption_sync_service.expire_sync_session().await;
687 }
688
689 /// Attempt to get a permit to use an `EncryptionSyncService` at a given
690 /// time.
691 ///
692 /// This ensures there is at most one [`EncryptionSyncService`] active at
693 /// any time, per application.
694 pub fn try_get_encryption_sync_permit(&self) -> Option<OwnedMutexGuard<EncryptionSyncPermit>> {
695 self.encryption_sync_permit.clone().try_lock_owned().ok()
696 }
697}
698
/// Which component produced a [`TerminationReport`].
#[derive(Debug)]
enum TerminationOrigin {
    /// The report was sent by the encryption sync task.
    EncryptionSync,
    /// The report was sent by the room list sync task.
    RoomList,
    /// The report was sent by the supervisor itself, e.g. when
    /// [`SyncService::stop()`] is called.
    Supervisor,
}
705
/// A message sent over the supervisor's internal channel, indicating why one
/// of the sync tasks (or the supervisor itself) has stopped.
#[derive(Debug)]
struct TerminationReport {
    /// The origin of the termination.
    origin: TerminationOrigin,

    /// If the termination is due to an error, this is the cause.
    error: Option<Error>,
}
714
715impl TerminationReport {
716 /// Create a new [`TerminationReport`] with `origin` set to
717 /// [`TerminationOrigin::EncryptionSync`] and `error` set to
718 /// [`Error::EncryptionSync`].
719 fn encryption_sync(error: Option<encryption_sync_service::Error>) -> Self {
720 Self { origin: TerminationOrigin::EncryptionSync, error: error.map(Error::EncryptionSync) }
721 }
722
723 /// Create a new [`TerminationReport`] with `origin` set to
724 /// [`TerminationOrigin::RoomList`] and `error` set to [`Error::RoomList`].
725 fn room_list(error: Option<room_list_service::Error>) -> Self {
726 Self { origin: TerminationOrigin::RoomList, error: error.map(Error::RoomList) }
727 }
728
729 /// Create a new [`TerminationReport`] with `origin` set to
730 /// [`TerminationOrigin::Supervisor`] and `error` set to
731 /// [`Error::Supervisor`].
732 fn supervisor_error() -> Self {
733 Self { origin: TerminationOrigin::Supervisor, error: Some(Error::Supervisor) }
734 }
735
736 /// Create a new [`TerminationReport`] with `origin` set to
737 /// [`TerminationOrigin::Supervisor`] and `error` set to `None`.
738 fn supervisor() -> Self {
739 Self { origin: TerminationOrigin::Supervisor, error: None }
740 }
741
742 /// Check whether the termination is due to an expired sliding sync session.
743 fn has_expired(&self) -> bool {
744 match &self.error {
745 Some(Error::RoomList(room_list_service::Error::SlidingSync(error)))
746 | Some(Error::EncryptionSync(encryption_sync_service::Error::SlidingSync(error))) => {
747 error.client_api_error_kind() == Some(&ruma::api::error::ErrorKind::UnknownPos)
748 }
749 _ => false,
750 }
751 }
752}
753
// Testing helpers, mostly.
#[doc(hidden)]
impl SyncService {
    /// Is the task supervisor running?
    ///
    /// Returns `true` when a supervisor task has been spawned (via `start()`)
    /// and not yet torn down (via `stop()`).
    pub async fn is_supervisor_running(&self) -> bool {
        self.inner.lock().await.supervisor.is_some()
    }
}
762
/// Builder for a [`SyncService`].
///
/// Created with [`SyncService::builder`], finalized with
/// [`SyncServiceBuilder::build`].
#[derive(Clone)]
pub struct SyncServiceBuilder {
    /// SDK client.
    client: Client,

    /// Is the offline mode for the [`SyncService`] enabled?
    ///
    /// The offline mode is described in the [`State::Offline`] enum variant.
    with_offline_mode: bool,

    /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
    ///
    /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
    with_share_pos: bool,

    /// Custom connection ID for the room list service.
    /// Defaults to [`room_list_service::DEFAULT_CONNECTION_ID`]. Use a
    /// different value for secondary processes such as iOS share extensions
    /// that are not meant to reuse the main app's connection.
    room_list_conn_id: String,

    /// Custom timeline limit for the room list service. Defaults to
    /// [`room_list_service::DEFAULT_LIST_TIMELINE_LIMIT`].
    room_list_timeline_limit: u32,

    /// The parent tracing span to use for the tasks within this service.
    ///
    /// Normally this will be [`Span::none`], but it may be useful to assign a
    /// defined span, for example if there is more than one active sync
    /// service.
    parent_span: Span,
}
795
796impl SyncServiceBuilder {
797 fn new(client: Client) -> Self {
798 Self {
799 client,
800 with_offline_mode: false,
801 with_share_pos: true,
802 room_list_conn_id: DEFAULT_CONNECTION_ID.to_owned(),
803 room_list_timeline_limit: DEFAULT_LIST_TIMELINE_LIMIT,
804 parent_span: Span::none(),
805 }
806 }
807
808 /// Enable the "offline" mode for the [`SyncService`].
809 ///
810 /// To learn more about the "offline" mode read the documentation for the
811 /// [`State::Offline`] enum variant.
812 pub fn with_offline_mode(mut self) -> Self {
813 self.with_offline_mode = true;
814 self
815 }
816
817 /// Whether to turn [`SlidingSyncBuilder::share_pos`] on or off.
818 ///
819 /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
820 pub fn with_share_pos(mut self, enable: bool) -> Self {
821 self.with_share_pos = enable;
822 self
823 }
824
825 /// Set a custom conn_id for the room list sliding sync connection.
826 pub fn with_room_list_conn_id(mut self, conn_id: String) -> Self {
827 self.room_list_conn_id = conn_id;
828 self
829 }
830
831 /// Set a custom timeline limit for the room list service.
832 pub fn with_room_list_timeline_limit(mut self, limit: u32) -> Self {
833 self.room_list_timeline_limit = limit;
834 self
835 }
836
837 /// Set the parent tracing span to be used for the tasks within this
838 /// service.
839 pub fn with_parent_span(mut self, parent_span: Span) -> Self {
840 self.parent_span = parent_span;
841 self
842 }
843
844 /// Finish setting up the [`SyncService`].
845 ///
846 /// This creates the underlying sliding syncs, and will *not* start them in
847 /// the background. The resulting [`SyncService`] must be kept alive as long
848 /// as the sliding syncs are supposed to run.
849 pub async fn build(self) -> Result<SyncService, Error> {
850 let Self {
851 client,
852 with_offline_mode,
853 with_share_pos,
854 room_list_conn_id,
855 room_list_timeline_limit,
856 parent_span,
857 } = self;
858
859 let encryption_sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
860
861 let room_list = RoomListService::new_with(
862 client.clone(),
863 with_share_pos,
864 &room_list_conn_id,
865 room_list_timeline_limit,
866 )
867 .await?;
868
869 let encryption_sync = Arc::new(EncryptionSyncService::new(client, None).await?);
870
871 let room_list_service = Arc::new(room_list);
872 let state = SharedObservable::new(State::Idle);
873
874 Ok(SyncService {
875 state: state.clone(),
876 room_list_service,
877 encryption_sync_permit,
878 inner: Arc::new(AsyncMutex::new(SyncServiceInner {
879 supervisor: None,
880 encryption_sync_service: encryption_sync,
881 state,
882 with_offline_mode,
883 parent_span,
884 })),
885 })
886 }
887}
888
/// Errors for the [`SyncService`] API.
#[derive(Debug, Error)]
pub enum Error {
    /// An error received from the `RoomListService` API.
    #[error(transparent)]
    RoomList(#[from] room_list_service::Error),

    /// An error received from the `EncryptionSyncService` API.
    #[error(transparent)]
    EncryptionSync(#[from] encryption_sync_service::Error),

    /// An error occurred in the sync task supervisor, likely due to a bug.
    #[error("the supervisor channel has run into an unexpected error")]
    Supervisor,
}