matrix_sdk_common/
task_monitor.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a [`TaskMonitor`] for spawning and monitoring
16//! long-running background tasks. Tasks spawned through the monitor are
17//! monitored for panics, errors, and unexpected termination.
18//!
19//! ```no_run
20//! use matrix_sdk_common::task_monitor::TaskMonitor;
21//!
22//! let monitor = TaskMonitor::new();
23//!
24//! // Subscribe to failure notifications
25//! let mut failures = monitor.subscribe();
26//!
27//! // Spawn a monitored background task
28//! let handle = monitor.spawn_background_task("my_task", async {
29//!     loop {
30//!         // Do background work...
31//!         matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
32//!             .await;
33//!     }
34//! });
35//!
36//! // It's also possible to have the task be aborted safely (and without a report)
37//! // when the handle is dropped.
38//! let _handle = handle.abort_on_drop();
39//!
40//! // Listen for failures in another task
41//! // while let Ok(failure) = failures.recv().await {
42//! //     eprintln!("Task {} failed: {:?}", failure.task.name, failure.reason);
43//! // }
44//! ```
45//!
46//! ## A word about unwind safety
47//!
48//! This assumes that all the code running inside the monitored tasks is [unwind
49//! safe](https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html). The assumption is that these
50//! are long-running tasks that:
51//!
//! - should not panic under normal operation,
//! - are not automatically restarted with the state they held before failing
//!   (they can be restarted, but only with a clean state).
56//!
57//! In general, observers of the task monitor should consider any reported
58//! failure as fatal, and they may decide to report the error one way or another
59//! (e.g., logging, metrics) and subsequently crash the process to avoid running
60//! in a potentially corrupted state.
61//!
62//! ## WebAssembly (WASM) support
63//!
64//! Unfortunately, safe unwinding isn't supported on most WASM targets, as of
65//! 2026-01-28, so panics in monitored tasks cannot be caught and reported.
//! Instead, a panic in a monitored task may throw a JS exception. The other
//! monitoring features (error reporting, early termination detection) still
//! work, though.
69
70use std::{
71    any::Any,
72    collections::HashMap,
73    future::Future,
74    panic::AssertUnwindSafe,
75    sync::{
76        Arc,
77        atomic::{AtomicBool, AtomicU64, Ordering},
78    },
79};
80
81use futures_util::FutureExt;
82use tokio::sync::broadcast;
83
84use crate::{
85    SendOutsideWasm,
86    executor::{AbortHandle, spawn},
87    locks::RwLock,
88};
89
/// Unique identifier for a background task.
///
/// Identifiers are drawn from a single global counter, so every call to
/// [`TaskId::new`] yields a distinct value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
    /// Allocate a fresh identifier by bumping the global counter.
    fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let raw = COUNTER.fetch_add(1, Ordering::SeqCst);
        TaskId(raw)
    }
}

impl std::fmt::Display for TaskId {
    /// Renders as `TaskId(<n>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        write!(f, "TaskId({raw})")
    }
}
107
108/// Metadata about a spawned background task.
109#[derive(Debug, Clone)]
110pub struct BackgroundTaskInfo {
111    /// Unique identifier for this task.
112    pub id: TaskId,
113
114    /// Human-readable name for the task, as defined when spawning it.
115    pub name: String,
116}
117
/// Why a monitored background task was reported as failed.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackgroundTaskFailureReason {
    /// The task panicked.
    Panic {
        /// The panic message, when the payload was a string.
        message: Option<String>,
        /// Backtrace captured by the monitor once the panic had been caught
        /// (when backtraces are enabled). It reflects where the task was being
        /// polled, not the exact location of the panic.
        panic_backtrace: Option<String>,
    },

    /// The task returned an error.
    Error {
        /// The error, rendered through its `Display` implementation.
        // TODO(bnjbvr): consider storing a boxed error instead?
        error: String,
    },

    /// A task that was meant to run forever completed.
    EarlyTermination,
}
140
141/// A report of a background task failure.
142///
143/// This is sent through the broadcast channel when a monitored task fails.
144#[derive(Debug, Clone)]
145pub struct BackgroundTaskFailure {
146    /// Information about the task that failed.
147    pub task: BackgroundTaskInfo,
148
149    /// Why the task failed.
150    pub reason: BackgroundTaskFailureReason,
151}
152
153/// Internal entry for tracking an active task.
154#[derive(Debug)]
155struct ActiveTask {
156    /// The tokio's handle to preemptively abort the task.
157    // TODO: might be useful to abort on drop?
158    _abort_handle: AbortHandle,
159}
160
/// Capacity of the broadcast channel that carries failure reports.
///
/// A small buffer suffices: failures should be rare, and whoever consumes a
/// report is expected to stop the SDK or take corrective action right away
/// rather than let reports pile up.
const FAILURE_CHANNEL_CAPACITY: usize = 8;
167
168/// A monitor for spawning and monitoring background tasks.
169///
170/// The [`TaskMonitor`] allows you to spawn background tasks that are
171/// automatically monitored for panics, errors, and unexpected termination.
172/// In such cases, a [`BackgroundTaskFailure`] is sent through a broadcast
173/// channel that subscribers can listen to.
174///
175/// # Example
176///
177/// ```no_run
178/// use matrix_sdk_common::task_monitor::TaskMonitor;
179///
180/// let monitor = TaskMonitor::new();
181///
182/// // Subscribe to failures
183/// let mut failures = monitor.subscribe();
184///
185/// // Spawn a task that runs indefinitely
186/// let _handle = monitor.spawn_background_task("worker", async {
187///     loop {
188///         // Do work...
189///         matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
190///             .await;
191///     }
192/// });
193/// ```
194#[derive(Debug)]
195pub struct TaskMonitor {
196    /// Sender for failure notifications.
197    failure_sender: broadcast::Sender<BackgroundTaskFailure>,
198
199    /// Map of active tasks by ID.
200    active_task_handles: Arc<RwLock<HashMap<TaskId, ActiveTask>>>,
201}
202
203impl Default for TaskMonitor {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
209impl TaskMonitor {
210    /// Create a new task monitor.
211    pub fn new() -> Self {
212        let (failure_sender, _) = broadcast::channel(FAILURE_CHANNEL_CAPACITY);
213        Self { failure_sender, active_task_handles: Default::default() }
214    }
215
216    /// Subscribe to failure notifications.
217    ///
218    /// Returns a broadcast receiver that will receive [`BackgroundTaskFailure`]
219    /// messages whenever a monitored task fails.
220    ///
221    /// Note: If the receiver falls behind, older messages may be dropped.
222    pub fn subscribe(&self) -> broadcast::Receiver<BackgroundTaskFailure> {
223        self.failure_sender.subscribe()
224    }
225
226    /// Spawn a background task that is expected to run indefinitely.
227    ///
228    /// If the task completes (whether successfully or by panicking), it will be
229    /// reported as a [`BackgroundTaskFailure`] report through the broadcast
230    /// channel.
231    ///
232    /// Use this for long-running tasks like event loops, sync tasks, or
233    /// background workers that should never complete under normal
234    /// operation.
235    ///
236    /// # Arguments
237    ///
238    /// * `name` - A human-readable name for the task (for debugging purposes).
239    /// * `future` - The async task to run.
240    ///
241    /// # Returns
242    ///
243    /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
244    /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
245    pub fn spawn_background_task<F>(
246        &self,
247        name: impl Into<String>,
248        future: F,
249    ) -> BackgroundTaskHandle
250    where
251        F: Future<Output = ()> + SendOutsideWasm + 'static,
252    {
253        let name = name.into();
254        let task_id = TaskId::new();
255        let task_info = BackgroundTaskInfo { id: task_id, name };
256
257        let intentionally_aborted = Arc::new(AtomicBool::new(false));
258
259        let active_tasks = self.active_task_handles.clone();
260        let failure_sender = self.failure_sender.clone();
261        let aborted_flag = intentionally_aborted.clone();
262
263        let wrapped = async move {
264            // SAFETY: see module-level documentation about unwind safety.
265            let result = AssertUnwindSafe(future).catch_unwind().await;
266
267            // Remove the task from the list of active ones.
268            active_tasks.write().remove(&task_id);
269
270            // Don't report if intentionally aborted.
271            if aborted_flag.load(Ordering::Acquire) {
272                return;
273            }
274
275            let failure_reason = match result {
276                Ok(()) => {
277                    // The task ended, this is considered an early termination.
278                    BackgroundTaskFailureReason::EarlyTermination
279                }
280
281                Err(panic_payload) => BackgroundTaskFailureReason::Panic {
282                    message: extract_panic_message(&panic_payload),
283                    panic_backtrace: capture_backtrace(),
284                },
285            };
286
287            let failure = BackgroundTaskFailure { task: task_info, reason: failure_reason };
288
289            // Forward failure to observers (ignore if there's none).
290            let _ = failure_sender.send(failure);
291        };
292
293        let join_handle = spawn(wrapped);
294        let abort_handle = join_handle.abort_handle();
295
296        // Register the task.
297        self.active_task_handles
298            .write()
299            .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
300
301        BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
302    }
303
304    /// Spawn a background task that returns a `Result`.
305    ///
306    /// The task is monitored for panics and errors; see also
307    /// [`BackgroundTaskFailure`].
308    ///
309    /// If the task returns `Ok(())`, it is considered successful and no failure
310    /// is reported.
311    ///
312    /// # Arguments
313    ///
314    /// * `name` - A human-readable name for the task (for debugging purposes).
315    /// * `future` - The async task to run.
316    ///
317    /// # Returns
318    ///
319    /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
320    /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
321    pub fn spawn_fallible_task<F, E>(
322        &self,
323        name: impl Into<String>,
324        future: F,
325    ) -> BackgroundTaskHandle
326    where
327        F: Future<Output = Result<(), E>> + SendOutsideWasm + 'static,
328        E: std::error::Error + SendOutsideWasm + 'static,
329    {
330        let name = name.into();
331        let task_id = TaskId::new();
332        let task_info = BackgroundTaskInfo { id: task_id, name };
333
334        let intentionally_aborted = Arc::new(AtomicBool::new(false));
335
336        let active_tasks = self.active_task_handles.clone();
337        let failure_sender = self.failure_sender.clone();
338        let aborted_flag = intentionally_aborted.clone();
339
340        let wrapped = async move {
341            let result = AssertUnwindSafe(future).catch_unwind().await;
342
343            active_tasks.write().remove(&task_id);
344
345            // Don't report if intentionally aborted.
346            if aborted_flag.load(Ordering::Acquire) {
347                return;
348            }
349
350            let failure_reason = match result {
351                Ok(Ok(())) => {
352                    // The task ended successfully, no failure to report.
353                    return;
354                }
355
356                Ok(Err(e)) => BackgroundTaskFailureReason::Error { error: e.to_string() },
357
358                Err(panic_payload) => BackgroundTaskFailureReason::Panic {
359                    message: extract_panic_message(&panic_payload),
360                    panic_backtrace: capture_backtrace(),
361                },
362            };
363
364            // Send failure (ignore if no receivers).
365            let _ = failure_sender
366                .send(BackgroundTaskFailure { task: task_info, reason: failure_reason });
367        };
368
369        let join_handle = spawn(wrapped);
370        let abort_handle = join_handle.abort_handle();
371
372        // Register the task.
373        self.active_task_handles
374            .write()
375            .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
376
377        BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
378    }
379}
380
381/// A handle to a spawned background task.
382///
383/// This handle can be used to abort the task or check if it has finished.
384/// When aborted through this handle, the task will NOT be reported as a
385/// failure.
386#[derive(Debug)]
387pub struct BackgroundTaskHandle {
388    /// The underlying tokio's [`AbortHandle`].
389    abort_handle: AbortHandle,
390
391    /// Should the task be safely aborted on drop?
392    ///
393    /// This won't result in a failure report, as it's an intentional abort.
394    abort_on_drop: bool,
395
396    /// An additional flag to indicate if the task was intentionally aborted, so
397    /// we don't report it as a failure when that happens.
398    intentionally_aborted: Arc<AtomicBool>,
399}
400
401impl Drop for BackgroundTaskHandle {
402    fn drop(&mut self) {
403        if self.abort_on_drop {
404            self.abort();
405        }
406    }
407}
408
409impl BackgroundTaskHandle {
410    /// Configure the handle to abort the task when dropped.
411    ///
412    /// The task will be stopped and will NOT be reported as a failure
413    /// (this is considered intentional termination).
414    pub fn abort_on_drop(mut self) -> Self {
415        self.abort_on_drop = true;
416        self
417    }
418
419    /// Abort the task.
420    ///
421    /// The task will be stopped and will NOT be reported as a failure
422    /// (this is considered intentional termination).
423    pub fn abort(&self) {
424        // Note: ordering matters here, we set the flag before aborting otherwise
425        // there's a possible race condition where the abort() is observed
426        // before the flag is set, and the task monitor would consider this an
427        // unexpected termination.
428        self.intentionally_aborted.store(true, Ordering::Release);
429        self.abort_handle.abort();
430    }
431
432    /// Check if the task has finished.
433    ///
434    /// Returns `true` if the task completed, panicked, or was aborted on
435    /// non-wasm; on wasm, returns whether the task has been aborted only
436    /// (due to lack of better APIs).
437    pub fn is_finished(&self) -> bool {
438        #[cfg(not(target_family = "wasm"))]
439        {
440            self.abort_handle.is_finished()
441        }
442        #[cfg(target_family = "wasm")]
443        {
444            self.abort_handle.is_aborted()
445        }
446    }
447}
448
/// Capture a backtrace of the current call stack, rendered as a string.
///
/// Yields `None` when backtraces are disabled (through the environment
/// variables read by `std::backtrace::Backtrace::capture`) or unsupported on
/// this platform.
#[cfg(not(target_family = "wasm"))]
fn capture_backtrace() -> Option<String> {
    use std::backtrace::{Backtrace, BacktraceStatus};

    let backtrace = Backtrace::capture();
    match backtrace.status() {
        BacktraceStatus::Captured => Some(backtrace.to_string()),
        _ => None,
    }
}

/// Capture a backtrace: WASM flavor, which never yields one since backtraces
/// aren't typically available on these targets.
#[cfg(target_family = "wasm")]
fn capture_backtrace() -> Option<String> {
    None
}
465
/// Extract a human-readable message from a panic payload.
///
/// Payloads holding a `&str` or a `String` (what `panic!` typically produces)
/// yield their text; any other payload type yields `None`.
///
/// Keep the `&Box<..>` parameter: with `&(dyn Any + Send)`, a caller passing
/// `&boxed_payload` would unsize-coerce the `Box` itself into the trait
/// object, and every downcast below would then fail.
fn extract_panic_message(payload: &Box<dyn Any + Send>) -> Option<String> {
    payload
        .downcast_ref::<&str>()
        .map(|text| String::from(*text))
        .or_else(|| payload.downcast_ref::<String>().cloned())
}
474
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::sync::broadcast::Receiver;

    use super::{BackgroundTaskFailure, BackgroundTaskFailureReason, TaskMonitor};
    use crate::{sleep::sleep, timeout::timeout};

    /// Wait up to one second for the next failure report.
    ///
    /// Panics if no report arrives in time or if the channel is closed.
    async fn next_failure(failures: &mut Receiver<BackgroundTaskFailure>) -> BackgroundTaskFailure {
        timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed")
    }

    /// Assert that no failure report arrives within 100 milliseconds.
    async fn assert_no_failure(failures: &mut Receiver<BackgroundTaskFailure>, reason: &str) {
        let received = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(received.is_err(), "{reason}");
    }

    /// A task body that never completes on its own.
    async fn run_forever() {
        loop {
            sleep(Duration::from_secs(10)).await;
        }
    }

    #[async_test]
    async fn test_early_termination_is_reported() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // A "forever" task that returns right away has terminated early.
        let _handle = monitor.spawn_background_task("test_task", async {});

        let failure = next_failure(&mut failures).await;
        assert_eq!(failure.task.name, "test_task");
        assert_matches!(failure.reason, BackgroundTaskFailureReason::EarlyTermination);
    }

    #[async_test]
    #[cfg(not(target_family = "wasm"))] // Unfortunately, safe unwinding doesn't work on wasm.
    async fn test_panic_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let _handle = monitor.spawn_background_task("panicking_task", async {
            panic!("test panic message");
        });

        let failure = next_failure(&mut failures).await;
        assert_eq!(failure.task.name, "panicking_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Panic { message, .. } => {
                assert_eq!(message.as_deref(), Some("test panic message"));
            }
        );
    }

    #[async_test]
    async fn test_error_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let _handle = monitor.spawn_fallible_task("fallible_task", async {
            Err::<(), _>(std::io::Error::other("test error message"))
        });

        let failure = next_failure(&mut failures).await;
        assert_eq!(failure.task.name, "fallible_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Error { error } => {
                assert!(error.contains("test error message"));
            }
        );
    }

    #[async_test]
    async fn test_successful_fallible_task_no_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let _handle =
            monitor.spawn_fallible_task("success_task", async { Ok::<(), std::io::Error>(()) });

        assert_no_failure(&mut failures, "should timeout, no failure expected").await;
    }

    #[async_test]
    async fn test_abort_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let handle = monitor.spawn_background_task("aborted_task", run_forever());

        // Let the task start before aborting it.
        sleep(Duration::from_millis(10)).await;
        handle.abort();

        assert_no_failure(&mut failures, "should timeout, no failure expected for abort").await;
        assert!(handle.is_finished(), "task should be finished after abort");
    }

    #[async_test]
    async fn test_abort_on_drop_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let handle =
            monitor.spawn_background_task("aborted_task", run_forever()).abort_on_drop();

        // Let the task start, then abort it by dropping its handle.
        sleep(Duration::from_millis(10)).await;
        drop(handle);

        assert_no_failure(&mut failures, "should timeout, no failure expected for abort").await;
    }
}