Skip to main content

matrix_sdk_common/
task_monitor.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a [`TaskMonitor`] for spawning and monitoring
16//! long-running background tasks. Tasks spawned through the monitor are
17//! monitored for panics, errors, and unexpected termination.
18//!
19//! ```no_run
20//! use matrix_sdk_common::task_monitor::TaskMonitor;
21//!
22//! let monitor = TaskMonitor::new();
23//!
24//! // Subscribe to failure notifications
25//! let mut failures = monitor.subscribe();
26//!
27//! // Spawn a monitored background task
28//! let handle = monitor.spawn_infinite_task("my_task", async {
29//!     loop {
30//!         // Do background work...
31//!         matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
32//!             .await;
33//!     }
34//! });
35//!
36//! // It's also possible to have the task be aborted safely (and without a report)
37//! // when the handle is dropped.
38//! let _handle = handle.abort_on_drop();
39//!
40//! // Listen for failures in another task
41//! // while let Ok(failure) = failures.recv().await {
42//! //     eprintln!("Task {} failed: {:?}", failure.task.name, failure.reason);
43//! // }
44//! ```
45//!
46//! ## A word about unwind safety
47//!
48//! This assumes that all the code running inside the monitored tasks is [unwind
49//! safe](https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html). The assumption is that these
50//! are long-running tasks that:
51//!
52//! - should not panic under normal operation,
53//! - will not be automatically restarted with state shared previously (they can
54//!   be restarted, but in this case they have to be restarted with a clean
55//!   state).
56//!
57//! In general, observers of the task monitor should consider any reported
58//! failure as fatal, and they may decide to report the error one way or another
59//! (e.g., logging, metrics) and subsequently crash the process to avoid running
60//! in a potentially corrupted state.
61//!
62//! ## WebAssembly (WASM) support
63//!
64//! Unfortunately, safe unwinding isn't supported on most WASM targets, as of
65//! 2026-01-28, so panics in monitored tasks cannot be caught and reported.
//! Instead, a panic in a monitored task may throw a JS exception. The rest of
//! the monitoring features (error reporting, early termination) are still
//! functional, though.
69
70use std::{
71    any::Any,
72    collections::HashMap,
73    future::Future,
74    panic::AssertUnwindSafe,
75    sync::{
76        Arc,
77        atomic::{AtomicBool, AtomicU64, Ordering},
78    },
79};
80
81use futures_util::FutureExt;
82use tokio::sync::broadcast;
83
84use crate::{
85    SendOutsideWasm,
86    executor::{AbortHandle, spawn},
87    locks::RwLock,
88};
89
/// Opaque, process-wide unique identifier of a monitored background task.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
    /// Hand out the next identifier.
    ///
    /// Identifiers come from a single global counter that starts at zero and
    /// is bumped by one on every call.
    fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        let raw = COUNTER.fetch_add(1, Ordering::SeqCst);
        Self(raw)
    }
}

impl std::fmt::Display for TaskId {
    /// Renders the identifier as `TaskId(<number>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "TaskId({})", raw)
    }
}
107
108/// Metadata about a spawned background task.
109#[derive(Debug, Clone)]
110pub struct BackgroundTaskInfo {
111    /// Unique identifier for this task.
112    pub id: TaskId,
113
114    /// Human-readable name for the task, as defined when spawning it.
115    pub name: String,
116}
117
/// The way in which a monitored background task failed.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackgroundTaskFailureReason {
    /// The task's future panicked while it was running.
    Panic {
        /// The panic message, when the payload was a string; `None`
        /// otherwise.
        message: Option<String>,
        /// A backtrace captured once the panic had been caught, if one is
        /// available.
        panic_backtrace: Option<String>,
    },

    /// The task completed by returning an error.
    Error {
        /// The error, rendered to a string.
        // TODO: consider storing a boxed error instead?
        error: String,
    },

    /// A task that was expected to run forever returned.
    EarlyTermination,
}
140
141/// A report of a background task failure.
142///
143/// This is sent through the broadcast channel when a monitored task fails.
144#[derive(Debug, Clone)]
145pub struct BackgroundTaskFailure {
146    /// Information about the task that failed.
147    pub task: BackgroundTaskInfo,
148
149    /// Why the task failed.
150    pub reason: BackgroundTaskFailureReason,
151}
152
153/// Internal entry for tracking an active task.
154#[derive(Debug)]
155struct ActiveTask {
156    /// The tokio's handle to preemptively abort the task.
157    // TODO: might be useful to abort on drop?
158    _abort_handle: AbortHandle,
159}
160
/// Number of failure reports the broadcast channel can buffer.
///
/// A small buffer is enough: failures are expected to be rare, and whoever
/// receives one is expected to react right away (stop the SDK, or take some
/// corrective action) rather than let reports pile up.
const FAILURE_CHANNEL_CAPACITY: usize = 8;
167
168/// A monitor for spawning and monitoring background tasks.
169///
170/// The [`TaskMonitor`] allows you to spawn background tasks that are
171/// automatically monitored for panics, errors, and unexpected termination.
172/// In such cases, a [`BackgroundTaskFailure`] is sent through a broadcast
173/// channel that subscribers can listen to.
174///
175/// # Example
176///
177/// ```no_run
178/// use matrix_sdk_common::task_monitor::TaskMonitor;
179///
180/// let monitor = TaskMonitor::new();
181///
182/// // Subscribe to failures
183/// let mut failures = monitor.subscribe();
184///
185/// // Spawn a task that runs indefinitely
186/// let _handle = monitor.spawn_infinite_task("worker", async {
187///     loop {
188///         // Do work...
189///         matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
190///             .await;
191///     }
192/// });
193/// ```
194#[derive(Debug)]
195pub struct TaskMonitor {
196    /// Sender for failure notifications.
197    failure_sender: broadcast::Sender<BackgroundTaskFailure>,
198
199    /// Map of active tasks by ID.
200    active_task_handles: Arc<RwLock<HashMap<TaskId, ActiveTask>>>,
201}
202
203impl Default for TaskMonitor {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
209impl TaskMonitor {
210    /// Create a new task monitor.
211    pub fn new() -> Self {
212        let (failure_sender, _) = broadcast::channel(FAILURE_CHANNEL_CAPACITY);
213        Self { failure_sender, active_task_handles: Default::default() }
214    }
215
216    /// Subscribe to failure notifications.
217    ///
218    /// Returns a broadcast receiver that will receive [`BackgroundTaskFailure`]
219    /// messages whenever a monitored task fails.
220    ///
221    /// Note: If the receiver falls behind, older messages may be dropped.
222    pub fn subscribe(&self) -> broadcast::Receiver<BackgroundTaskFailure> {
223        self.failure_sender.subscribe()
224    }
225
226    /// Spawn a background task that is expected to **run forever**.
227    ///
228    /// For one-off background tasks that are expected to complete successfully,
229    /// use [`Self::spawn_finite_task`] instead.
230    ///
231    /// If the task completes (whether successfully or by panicking), it will be
232    /// reported as a [`BackgroundTaskFailure`] report through the broadcast
233    /// channel.
234    ///
235    /// Use this for long-running tasks like event loops, sync tasks, or
236    /// background workers that should never complete under normal
237    /// operation.
238    ///
239    /// # Arguments
240    ///
241    /// * `name` - A human-readable name for the task (for debugging purposes).
242    /// * `future` - The async task to run.
243    ///
244    /// # Returns
245    ///
246    /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
247    /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
248    pub fn spawn_infinite_task<F>(&self, name: impl Into<String>, future: F) -> BackgroundTaskHandle
249    where
250        F: Future<Output = ()> + SendOutsideWasm + 'static,
251    {
252        self.spawn_task_internal(name, future, true)
253    }
254
255    /// Spawn a background job that is expected to run once and complete
256    /// successfully in the background.
257    ///
258    /// For long-term background jobs that are expected to run forever, use
259    /// [`Self::spawn_infinite_task`] instead.
260    ///
261    /// If the task completes (by panicking), it will be reported as a
262    /// [`BackgroundTaskFailure`] report through the broadcast channel.
263    ///
264    /// Use this for one-shot background tasks that should complete under normal
265    /// operation.
266    ///
267    /// # Arguments
268    ///
269    /// * `name` - A human-readable name for the task (for debugging purposes).
270    /// * `future` - The async task to run.
271    ///
272    /// # Returns
273    ///
274    /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
275    /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
276    pub fn spawn_finite_task<F>(&self, name: impl Into<String>, future: F) -> BackgroundTaskHandle
277    where
278        F: Future<Output = ()> + SendOutsideWasm + 'static,
279    {
280        self.spawn_task_internal(name, future, false)
281    }
282
283    fn spawn_task_internal<F>(
284        &self,
285        name: impl Into<String>,
286        future: F,
287        runs_forever: bool,
288    ) -> BackgroundTaskHandle
289    where
290        F: Future<Output = ()> + SendOutsideWasm + 'static,
291    {
292        let name = name.into();
293        let task_id = TaskId::new();
294        let task_info = BackgroundTaskInfo { id: task_id, name };
295
296        let intentionally_aborted = Arc::new(AtomicBool::new(false));
297
298        let active_tasks = self.active_task_handles.clone();
299        let failure_sender = self.failure_sender.clone();
300        let aborted_flag = intentionally_aborted.clone();
301
302        let wrapped = async move {
303            // SAFETY: see module-level documentation about unwind safety.
304            let result = AssertUnwindSafe(future).catch_unwind().await;
305
306            // Remove the task from the list of active ones.
307            active_tasks.write().remove(&task_id);
308
309            // Don't report if intentionally aborted.
310            if aborted_flag.load(Ordering::Acquire) {
311                return;
312            }
313
314            let failure_reason = match result {
315                Ok(()) => {
316                    if runs_forever {
317                        // The background forever task ended, this is considered an early
318                        // termination.
319                        BackgroundTaskFailureReason::EarlyTermination
320                    } else {
321                        // The task ended successfully, no failure to report.
322                        return;
323                    }
324                }
325
326                Err(panic_payload) => BackgroundTaskFailureReason::Panic {
327                    message: extract_panic_message(&panic_payload),
328                    panic_backtrace: capture_backtrace(),
329                },
330            };
331
332            let failure = BackgroundTaskFailure { task: task_info, reason: failure_reason };
333
334            // Forward failure to observers (ignore if there's none).
335            let _ = failure_sender.send(failure);
336        };
337
338        let join_handle = spawn(wrapped);
339        let abort_handle = join_handle.abort_handle();
340
341        // Register the task.
342        self.active_task_handles
343            .write()
344            .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
345
346        BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
347    }
348
349    /// Spawn a background task that returns a `Result`.
350    ///
351    /// The task is monitored for panics and errors; see also
352    /// [`BackgroundTaskFailure`].
353    ///
354    /// If the task returns `Ok(())`, it is considered successful and no failure
355    /// is reported.
356    ///
357    /// # Arguments
358    ///
359    /// * `name` - A human-readable name for the task (for debugging purposes).
360    /// * `future` - The async task to run.
361    ///
362    /// # Returns
363    ///
364    /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
365    /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
366    pub fn spawn_fallible_task<F, E>(
367        &self,
368        name: impl Into<String>,
369        future: F,
370    ) -> BackgroundTaskHandle
371    where
372        F: Future<Output = Result<(), E>> + SendOutsideWasm + 'static,
373        E: std::error::Error + SendOutsideWasm + 'static,
374    {
375        let name = name.into();
376        let task_id = TaskId::new();
377        let task_info = BackgroundTaskInfo { id: task_id, name };
378
379        let intentionally_aborted = Arc::new(AtomicBool::new(false));
380
381        let active_tasks = self.active_task_handles.clone();
382        let failure_sender = self.failure_sender.clone();
383        let aborted_flag = intentionally_aborted.clone();
384
385        let wrapped = async move {
386            let result = AssertUnwindSafe(future).catch_unwind().await;
387
388            active_tasks.write().remove(&task_id);
389
390            // Don't report if intentionally aborted.
391            if aborted_flag.load(Ordering::Acquire) {
392                return;
393            }
394
395            let failure_reason = match result {
396                Ok(Ok(())) => {
397                    // The task ended successfully, no failure to report.
398                    return;
399                }
400
401                Ok(Err(e)) => BackgroundTaskFailureReason::Error { error: e.to_string() },
402
403                Err(panic_payload) => BackgroundTaskFailureReason::Panic {
404                    message: extract_panic_message(&panic_payload),
405                    panic_backtrace: capture_backtrace(),
406                },
407            };
408
409            // Send failure (ignore if no receivers).
410            let _ = failure_sender
411                .send(BackgroundTaskFailure { task: task_info, reason: failure_reason });
412        };
413
414        let join_handle = spawn(wrapped);
415        let abort_handle = join_handle.abort_handle();
416
417        // Register the task.
418        self.active_task_handles
419            .write()
420            .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
421
422        BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
423    }
424}
425
426/// A handle to a spawned background task.
427///
428/// This handle can be used to abort the task or check if it has finished.
429/// When aborted through this handle, the task will NOT be reported as a
430/// failure.
431#[derive(Debug)]
432pub struct BackgroundTaskHandle {
433    /// The underlying tokio's [`AbortHandle`].
434    abort_handle: AbortHandle,
435
436    /// Should the task be safely aborted on drop?
437    ///
438    /// This won't result in a failure report, as it's an intentional abort.
439    abort_on_drop: bool,
440
441    /// An additional flag to indicate if the task was intentionally aborted, so
442    /// we don't report it as a failure when that happens.
443    intentionally_aborted: Arc<AtomicBool>,
444}
445
446impl Drop for BackgroundTaskHandle {
447    fn drop(&mut self) {
448        if self.abort_on_drop {
449            self.abort();
450        }
451    }
452}
453
454impl BackgroundTaskHandle {
455    /// Configure the handle to abort the task when dropped.
456    ///
457    /// The task will be stopped and will NOT be reported as a failure
458    /// (this is considered intentional termination).
459    pub fn abort_on_drop(mut self) -> Self {
460        self.abort_on_drop = true;
461        self
462    }
463
464    /// Abort the task.
465    ///
466    /// The task will be stopped and will NOT be reported as a failure
467    /// (this is considered intentional termination).
468    pub fn abort(&self) {
469        // Note: ordering matters here, we set the flag before aborting otherwise
470        // there's a possible race condition where the abort() is observed
471        // before the flag is set, and the task monitor would consider this an
472        // unexpected termination.
473        self.intentionally_aborted.store(true, Ordering::Release);
474        self.abort_handle.abort();
475    }
476
477    /// Check if the task has finished.
478    ///
479    /// Returns `true` if the task completed, panicked, or was aborted on
480    /// non-wasm; on wasm, returns whether the task has been aborted only
481    /// (due to lack of better APIs).
482    pub fn is_finished(&self) -> bool {
483        #[cfg(not(target_family = "wasm"))]
484        {
485            self.abort_handle.is_finished()
486        }
487        #[cfg(target_family = "wasm")]
488        {
489            self.abort_handle.is_aborted()
490        }
491    }
492}
493
/// Capture a backtrace of the current call site, rendered as a string.
///
/// Returns `None` when no backtrace was captured (backtraces disabled or
/// unsupported), and always returns `None` on WASM targets.
fn capture_backtrace() -> Option<String> {
    #[cfg(not(target_family = "wasm"))]
    {
        use std::backtrace::{Backtrace, BacktraceStatus};

        let trace = Backtrace::capture();
        matches!(trace.status(), BacktraceStatus::Captured).then(|| trace.to_string())
    }
    #[cfg(target_family = "wasm")]
    {
        None
    }
}
510
/// Try to recover a human-readable message from a panic payload.
///
/// Payloads that are a `&'static str` or a `String` are supported; anything
/// else yields `None`.
// Note: the parameter is deliberately a reference to the box. With a plain
// `&dyn Any` parameter, passing `&payload_box` would coerce the *box itself*
// into the trait object, and both downcasts below would then always fail.
fn extract_panic_message(payload: &Box<dyn Any + Send>) -> Option<String> {
    payload
        .downcast_ref::<&str>()
        .map(|message| (*message).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
}
519
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        },
        time::Duration,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;

    use super::{BackgroundTaskFailureReason, TaskMonitor};
    use crate::{sleep::sleep, timeout::timeout};

    /// An "infinite" task that returns is reported as an early termination.
    #[async_test]
    async fn test_early_termination_is_reported() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a task that completes immediately.
        let _handle = monitor.spawn_infinite_task("test_task", async {
            // Completes immediately: this is an "early termination".
        });

        // Should receive an early termination failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "test_task");
        assert_matches!(failure.reason, BackgroundTaskFailureReason::EarlyTermination);
    }

    /// A panicking task is reported, along with its panic message.
    #[async_test]
    #[cfg(not(target_family = "wasm"))] // Unfortunately, safe unwinding doesn't work on wasm.
    async fn test_panic_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a task that panics.
        let _handle = monitor.spawn_infinite_task("panicking_task", async {
            panic!("test panic message");
        });

        // Should receive a panic failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "panicking_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Panic { message, .. } => {
                assert_eq!(message.as_deref(), Some("test panic message"));
            }
        );
    }

    /// A fallible task returning `Err` is reported with the error's message.
    #[async_test]
    async fn test_error_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a fallible task that returns an error.
        let _handle = monitor.spawn_fallible_task("fallible_task", async {
            Err::<(), _>(std::io::Error::other("test error message"))
        });

        // Should receive an error failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "fallible_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Error { error } => {
                assert!(error.contains("test error message"));
            }
        );
    }

    /// A fallible task returning `Ok(())` doesn't produce any report.
    #[async_test]
    async fn test_successful_fallible_task_no_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a fallible task that succeeds.
        let _handle =
            monitor.spawn_fallible_task("success_task", async { Ok::<(), std::io::Error>(()) });

        // Should NOT receive any failure: use a short timeout.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected");
    }

    /// Aborting a task through its handle doesn't produce any report.
    #[async_test]
    async fn test_abort_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a long-running task.
        let handle = monitor.spawn_infinite_task("aborted_task", async {
            loop {
                sleep(Duration::from_secs(10)).await;
            }
        });

        // Give the task time to start.
        sleep(Duration::from_millis(10)).await;

        // Abort it.
        handle.abort();

        // Should NOT receive a failure for intentional abort.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected for abort");

        assert!(handle.is_finished(), "task should be finished after abort");
    }

    /// Dropping an abort-on-drop handle aborts the task without any report.
    #[async_test]
    async fn test_abort_on_drop_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a long-running task.
        let handle = monitor
            .spawn_infinite_task("aborted_task", async {
                loop {
                    sleep(Duration::from_secs(10)).await;
                }
            })
            .abort_on_drop();

        // Give the task time to start.
        sleep(Duration::from_millis(10)).await;

        // Abort it.
        drop(handle);

        // Should NOT receive a failure for intentional abort.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected for abort");
    }

    /// A finite task that completes normally runs to the end and doesn't
    /// produce any report.
    #[async_test]
    async fn test_spawn_finite_task() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let successful_completion = Arc::new(AtomicBool::new(false));

        // Spawn a one-off background job that completes successfully.
        let successful_completion_clone = successful_completion.clone();
        let _handle = monitor.spawn_finite_task("one-shot job", async move {
            sleep(Duration::from_millis(10)).await;
            successful_completion_clone.store(true, Ordering::SeqCst);
        });

        // Give the task time to finish.
        sleep(Duration::from_millis(20)).await;

        // Should NOT receive a failure for successful completion.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(
            result.is_err(),
            "should timeout, no failure expected for successful completion"
        );

        assert!(
            successful_completion.load(Ordering::SeqCst),
            "background job should have completed successfully"
        );
    }
}
697}