matrix_sdk_common/task_monitor.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a [`TaskMonitor`] for spawning and monitoring
16//! long-running background tasks. Tasks spawned through the monitor are
17//! monitored for panics, errors, and unexpected termination.
18//!
19//! ```no_run
20//! use matrix_sdk_common::task_monitor::TaskMonitor;
21//!
22//! let monitor = TaskMonitor::new();
23//!
24//! // Subscribe to failure notifications
25//! let mut failures = monitor.subscribe();
26//!
27//! // Spawn a monitored background task
28//! let handle = monitor.spawn_infinite_task("my_task", async {
29//! loop {
30//! // Do background work...
31//! matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
32//! .await;
33//! }
34//! });
35//!
36//! // It's also possible to have the task be aborted safely (and without a report)
37//! // when the handle is dropped.
38//! let _handle = handle.abort_on_drop();
39//!
40//! // Listen for failures in another task
41//! // while let Ok(failure) = failures.recv().await {
42//! // eprintln!("Task {} failed: {:?}", failure.task.name, failure.reason);
43//! // }
44//! ```
45//!
46//! ## A word about unwind safety
47//!
48//! This assumes that all the code running inside the monitored tasks is [unwind
49//! safe](https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html). The assumption is that these
50//! are long-running tasks that:
51//!
52//! - should not panic under normal operation,
//! - will not be automatically restarted with previously shared state (they
//!   can be restarted, but only with a clean state).
56//!
57//! In general, observers of the task monitor should consider any reported
58//! failure as fatal, and they may decide to report the error one way or another
59//! (e.g., logging, metrics) and subsequently crash the process to avoid running
60//! in a potentially corrupted state.
61//!
62//! ## WebAssembly (WASM) support
63//!
//! Unfortunately, as of 2026-01-28, safe unwinding isn't supported on most
//! WASM targets, so panics in monitored tasks cannot be caught and reported.
//! Instead, a panic in a monitored task may throw a JS exception. The rest of
//! the monitoring features (error reporting, early termination) are still
//! functional, though.
69
70use std::{
71 any::Any,
72 collections::HashMap,
73 future::Future,
74 panic::AssertUnwindSafe,
75 sync::{
76 Arc,
77 atomic::{AtomicBool, AtomicU64, Ordering},
78 },
79};
80
81use futures_util::FutureExt;
82use tokio::sync::broadcast;
83
84use crate::{
85 SendOutsideWasm,
86 executor::{AbortHandle, spawn},
87 locks::RwLock,
88};
89
/// Unique identifier for a background task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
    /// Allocate a fresh task ID, taken from a process-wide counter that is
    /// bumped on every call.
    fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let raw = COUNTER.fetch_add(1, Ordering::SeqCst);
        TaskId(raw)
    }
}

impl std::fmt::Display for TaskId {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TaskId(raw) = self;
        write!(formatter, "TaskId({raw})")
    }
}
107
108/// Metadata about a spawned background task.
109#[derive(Debug, Clone)]
110pub struct BackgroundTaskInfo {
111 /// Unique identifier for this task.
112 pub id: TaskId,
113
114 /// Human-readable name for the task, as defined when spawning it.
115 pub name: String,
116}
117
/// Why a monitored background task failed.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum BackgroundTaskFailureReason {
    /// The task panicked.
    Panic {
        /// The panic message, when the panic payload is a string.
        message: Option<String>,
        /// Backtrace captured by the monitor once the panic has been caught
        /// (if available). It is taken after unwinding, so it describes where
        /// the monitor observed the panic rather than where it was raised.
        panic_backtrace: Option<String>,
    },

    /// The task returned an error.
    Error {
        /// The error, rendered through its `Display` implementation.
        // TODO: consider storing a boxed error instead?
        error: String,
    },

    /// A task that was expected to run forever ended.
    EarlyTermination,
}
140
141/// A report of a background task failure.
142///
143/// This is sent through the broadcast channel when a monitored task fails.
144#[derive(Debug, Clone)]
145pub struct BackgroundTaskFailure {
146 /// Information about the task that failed.
147 pub task: BackgroundTaskInfo,
148
149 /// Why the task failed.
150 pub reason: BackgroundTaskFailureReason,
151}
152
153/// Internal entry for tracking an active task.
154#[derive(Debug)]
155struct ActiveTask {
156 /// The tokio's handle to preemptively abort the task.
157 // TODO: might be useful to abort on drop?
158 _abort_handle: AbortHandle,
159}
160
/// Capacity of the broadcast channel that carries failure reports.
///
/// A small value is enough. Failures are expected to be rare, and whoever
/// receives a failure report will most likely stop the SDK or take immediate
/// corrective action.
const FAILURE_CHANNEL_CAPACITY: usize = 8;
167
168/// A monitor for spawning and monitoring background tasks.
169///
170/// The [`TaskMonitor`] allows you to spawn background tasks that are
171/// automatically monitored for panics, errors, and unexpected termination.
172/// In such cases, a [`BackgroundTaskFailure`] is sent through a broadcast
173/// channel that subscribers can listen to.
174///
175/// # Example
176///
177/// ```no_run
178/// use matrix_sdk_common::task_monitor::TaskMonitor;
179///
180/// let monitor = TaskMonitor::new();
181///
182/// // Subscribe to failures
183/// let mut failures = monitor.subscribe();
184///
185/// // Spawn a task that runs indefinitely
186/// let _handle = monitor.spawn_infinite_task("worker", async {
187/// loop {
188/// // Do work...
189/// matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
190/// .await;
191/// }
192/// });
193/// ```
194#[derive(Debug)]
195pub struct TaskMonitor {
196 /// Sender for failure notifications.
197 failure_sender: broadcast::Sender<BackgroundTaskFailure>,
198
199 /// Map of active tasks by ID.
200 active_task_handles: Arc<RwLock<HashMap<TaskId, ActiveTask>>>,
201}
202
203impl Default for TaskMonitor {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209impl TaskMonitor {
210 /// Create a new task monitor.
211 pub fn new() -> Self {
212 let (failure_sender, _) = broadcast::channel(FAILURE_CHANNEL_CAPACITY);
213 Self { failure_sender, active_task_handles: Default::default() }
214 }
215
216 /// Subscribe to failure notifications.
217 ///
218 /// Returns a broadcast receiver that will receive [`BackgroundTaskFailure`]
219 /// messages whenever a monitored task fails.
220 ///
221 /// Note: If the receiver falls behind, older messages may be dropped.
222 pub fn subscribe(&self) -> broadcast::Receiver<BackgroundTaskFailure> {
223 self.failure_sender.subscribe()
224 }
225
226 /// Spawn a background task that is expected to **run forever**.
227 ///
228 /// For one-off background tasks that are expected to complete successfully,
229 /// use [`Self::spawn_finite_task`] instead.
230 ///
231 /// If the task completes (whether successfully or by panicking), it will be
232 /// reported as a [`BackgroundTaskFailure`] report through the broadcast
233 /// channel.
234 ///
235 /// Use this for long-running tasks like event loops, sync tasks, or
236 /// background workers that should never complete under normal
237 /// operation.
238 ///
239 /// # Arguments
240 ///
241 /// * `name` - A human-readable name for the task (for debugging purposes).
242 /// * `future` - The async task to run.
243 ///
244 /// # Returns
245 ///
246 /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
247 /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
248 pub fn spawn_infinite_task<F>(&self, name: impl Into<String>, future: F) -> BackgroundTaskHandle
249 where
250 F: Future<Output = ()> + SendOutsideWasm + 'static,
251 {
252 self.spawn_task_internal(name, future, true)
253 }
254
255 /// Spawn a background job that is expected to run once and complete
256 /// successfully in the background.
257 ///
258 /// For long-term background jobs that are expected to run forever, use
259 /// [`Self::spawn_infinite_task`] instead.
260 ///
261 /// If the task completes (by panicking), it will be reported as a
262 /// [`BackgroundTaskFailure`] report through the broadcast channel.
263 ///
264 /// Use this for one-shot background tasks that should complete under normal
265 /// operation.
266 ///
267 /// # Arguments
268 ///
269 /// * `name` - A human-readable name for the task (for debugging purposes).
270 /// * `future` - The async task to run.
271 ///
272 /// # Returns
273 ///
274 /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
275 /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
276 pub fn spawn_finite_task<F>(&self, name: impl Into<String>, future: F) -> BackgroundTaskHandle
277 where
278 F: Future<Output = ()> + SendOutsideWasm + 'static,
279 {
280 self.spawn_task_internal(name, future, false)
281 }
282
283 fn spawn_task_internal<F>(
284 &self,
285 name: impl Into<String>,
286 future: F,
287 runs_forever: bool,
288 ) -> BackgroundTaskHandle
289 where
290 F: Future<Output = ()> + SendOutsideWasm + 'static,
291 {
292 let name = name.into();
293 let task_id = TaskId::new();
294 let task_info = BackgroundTaskInfo { id: task_id, name };
295
296 let intentionally_aborted = Arc::new(AtomicBool::new(false));
297
298 let active_tasks = self.active_task_handles.clone();
299 let failure_sender = self.failure_sender.clone();
300 let aborted_flag = intentionally_aborted.clone();
301
302 let wrapped = async move {
303 // SAFETY: see module-level documentation about unwind safety.
304 let result = AssertUnwindSafe(future).catch_unwind().await;
305
306 // Remove the task from the list of active ones.
307 active_tasks.write().remove(&task_id);
308
309 // Don't report if intentionally aborted.
310 if aborted_flag.load(Ordering::Acquire) {
311 return;
312 }
313
314 let failure_reason = match result {
315 Ok(()) => {
316 if runs_forever {
317 // The background forever task ended, this is considered an early
318 // termination.
319 BackgroundTaskFailureReason::EarlyTermination
320 } else {
321 // The task ended successfully, no failure to report.
322 return;
323 }
324 }
325
326 Err(panic_payload) => BackgroundTaskFailureReason::Panic {
327 message: extract_panic_message(&panic_payload),
328 panic_backtrace: capture_backtrace(),
329 },
330 };
331
332 let failure = BackgroundTaskFailure { task: task_info, reason: failure_reason };
333
334 // Forward failure to observers (ignore if there's none).
335 let _ = failure_sender.send(failure);
336 };
337
338 let join_handle = spawn(wrapped);
339 let abort_handle = join_handle.abort_handle();
340
341 // Register the task.
342 self.active_task_handles
343 .write()
344 .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
345
346 BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
347 }
348
349 /// Spawn a background task that returns a `Result`.
350 ///
351 /// The task is monitored for panics and errors; see also
352 /// [`BackgroundTaskFailure`].
353 ///
354 /// If the task returns `Ok(())`, it is considered successful and no failure
355 /// is reported.
356 ///
357 /// # Arguments
358 ///
359 /// * `name` - A human-readable name for the task (for debugging purposes).
360 /// * `future` - The async task to run.
361 ///
362 /// # Returns
363 ///
364 /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
365 /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
366 pub fn spawn_fallible_task<F, E>(
367 &self,
368 name: impl Into<String>,
369 future: F,
370 ) -> BackgroundTaskHandle
371 where
372 F: Future<Output = Result<(), E>> + SendOutsideWasm + 'static,
373 E: std::error::Error + SendOutsideWasm + 'static,
374 {
375 let name = name.into();
376 let task_id = TaskId::new();
377 let task_info = BackgroundTaskInfo { id: task_id, name };
378
379 let intentionally_aborted = Arc::new(AtomicBool::new(false));
380
381 let active_tasks = self.active_task_handles.clone();
382 let failure_sender = self.failure_sender.clone();
383 let aborted_flag = intentionally_aborted.clone();
384
385 let wrapped = async move {
386 let result = AssertUnwindSafe(future).catch_unwind().await;
387
388 active_tasks.write().remove(&task_id);
389
390 // Don't report if intentionally aborted.
391 if aborted_flag.load(Ordering::Acquire) {
392 return;
393 }
394
395 let failure_reason = match result {
396 Ok(Ok(())) => {
397 // The task ended successfully, no failure to report.
398 return;
399 }
400
401 Ok(Err(e)) => BackgroundTaskFailureReason::Error { error: e.to_string() },
402
403 Err(panic_payload) => BackgroundTaskFailureReason::Panic {
404 message: extract_panic_message(&panic_payload),
405 panic_backtrace: capture_backtrace(),
406 },
407 };
408
409 // Send failure (ignore if no receivers).
410 let _ = failure_sender
411 .send(BackgroundTaskFailure { task: task_info, reason: failure_reason });
412 };
413
414 let join_handle = spawn(wrapped);
415 let abort_handle = join_handle.abort_handle();
416
417 // Register the task.
418 self.active_task_handles
419 .write()
420 .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
421
422 BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
423 }
424}
425
426/// A handle to a spawned background task.
427///
428/// This handle can be used to abort the task or check if it has finished.
429/// When aborted through this handle, the task will NOT be reported as a
430/// failure.
431#[derive(Debug)]
432pub struct BackgroundTaskHandle {
433 /// The underlying tokio's [`AbortHandle`].
434 abort_handle: AbortHandle,
435
436 /// Should the task be safely aborted on drop?
437 ///
438 /// This won't result in a failure report, as it's an intentional abort.
439 abort_on_drop: bool,
440
441 /// An additional flag to indicate if the task was intentionally aborted, so
442 /// we don't report it as a failure when that happens.
443 intentionally_aborted: Arc<AtomicBool>,
444}
445
446impl Drop for BackgroundTaskHandle {
447 fn drop(&mut self) {
448 if self.abort_on_drop {
449 self.abort();
450 }
451 }
452}
453
454impl BackgroundTaskHandle {
455 /// Configure the handle to abort the task when dropped.
456 ///
457 /// The task will be stopped and will NOT be reported as a failure
458 /// (this is considered intentional termination).
459 pub fn abort_on_drop(mut self) -> Self {
460 self.abort_on_drop = true;
461 self
462 }
463
464 /// Abort the task.
465 ///
466 /// The task will be stopped and will NOT be reported as a failure
467 /// (this is considered intentional termination).
468 pub fn abort(&self) {
469 // Note: ordering matters here, we set the flag before aborting otherwise
470 // there's a possible race condition where the abort() is observed
471 // before the flag is set, and the task monitor would consider this an
472 // unexpected termination.
473 self.intentionally_aborted.store(true, Ordering::Release);
474 self.abort_handle.abort();
475 }
476
477 /// Check if the task has finished.
478 ///
479 /// Returns `true` if the task completed, panicked, or was aborted on
480 /// non-wasm; on wasm, returns whether the task has been aborted only
481 /// (due to lack of better APIs).
482 pub fn is_finished(&self) -> bool {
483 #[cfg(not(target_family = "wasm"))]
484 {
485 self.abort_handle.is_finished()
486 }
487 #[cfg(target_family = "wasm")]
488 {
489 self.abort_handle.is_aborted()
490 }
491 }
492}
493
/// Capture a backtrace of the calling location.
///
/// Yields `None` when backtraces are disabled (see `Backtrace::capture`) or
/// unsupported on the current platform.
#[cfg(not(target_family = "wasm"))]
fn capture_backtrace() -> Option<String> {
    use std::backtrace::{Backtrace, BacktraceStatus};

    let backtrace = Backtrace::capture();
    (backtrace.status() == BacktraceStatus::Captured).then(|| backtrace.to_string())
}

/// Capture a backtrace - WASM version.
///
/// Backtraces are not typically available on WASM targets, so this always
/// yields `None`.
#[cfg(target_family = "wasm")]
fn capture_backtrace() -> Option<String> {
    Option::None
}
510
/// Extract a human-readable message from a panic payload.
///
/// `panic!` payloads are typically either a `&'static str` (literal message)
/// or a `String` (formatted message). Any other payload type yields `None`.
// The parameter intentionally stays `&Box<dyn Any + Send>`: method calls
// auto-deref to the inner `dyn Any`. A `&(dyn Any + Send)` parameter would let
// callers passing `&Box<..>` coerce the box itself into the trait object,
// which would make every downcast below fail.
fn extract_panic_message(payload: &Box<dyn Any + Send>) -> Option<String> {
    payload
        .downcast_ref::<&str>()
        .map(|literal| (*literal).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
}
519
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicBool, Ordering},
        },
        time::Duration,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;

    use super::{BackgroundTaskFailureReason, TaskMonitor};
    use crate::{sleep::sleep, timeout::timeout};

    #[async_test]
    async fn test_early_termination_is_reported() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a task that completes immediately.
        let _handle = monitor.spawn_infinite_task("test_task", async {
            // Completes immediately: this is an "early termination".
        });

        // Should receive an early termination failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "test_task");
        assert_matches!(failure.reason, BackgroundTaskFailureReason::EarlyTermination);
    }

    #[async_test]
    #[cfg(not(target_family = "wasm"))] // Unfortunately, safe unwinding doesn't work on wasm.
    async fn test_panic_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a task that panics.
        let _handle = monitor.spawn_infinite_task("panicking_task", async {
            panic!("test panic message");
        });

        // Should receive a panic failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "panicking_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Panic { message, .. } => {
                assert_eq!(message.as_deref(), Some("test panic message"));
            }
        );
    }

    #[async_test]
    #[cfg(not(target_family = "wasm"))] // Unfortunately, safe unwinding doesn't work on wasm.
    async fn test_finite_task_panic_is_reported() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a one-off job that panics.
        let _handle = monitor.spawn_finite_task("panicking_job", async {
            panic!("finite task panic");
        });

        // Should receive a panic failure, even though the task is finite.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "panicking_job");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Panic { message, .. } => {
                assert_eq!(message.as_deref(), Some("finite task panic"));
            }
        );
    }

    #[async_test]
    async fn test_error_is_captured() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a fallible task that returns an error.
        let _handle = monitor.spawn_fallible_task("fallible_task", async {
            Err::<(), _>(std::io::Error::other("test error message"))
        });

        // Should receive an error failure.
        let failure = timeout(failures.recv(), Duration::from_secs(1))
            .await
            .expect("timeout waiting for failure")
            .expect("channel closed");

        assert_eq!(failure.task.name, "fallible_task");
        assert_matches!(
            failure.reason,
            BackgroundTaskFailureReason::Error { error } => {
                assert!(error.contains("test error message"));
            }
        );
    }

    #[async_test]
    async fn test_successful_fallible_task_no_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a fallible task that succeeds.
        let _handle =
            monitor.spawn_fallible_task("success_task", async { Ok::<(), std::io::Error>(()) });

        // Should NOT receive any failure: use a short timeout.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected");
    }

    #[async_test]
    async fn test_abort_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a long-running task.
        let handle = monitor.spawn_infinite_task("aborted_task", async {
            loop {
                sleep(Duration::from_secs(10)).await;
            }
        });

        // Give the task time to start.
        sleep(Duration::from_millis(10)).await;

        // Abort it.
        handle.abort();

        // Should NOT receive a failure for intentional abort.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected for abort");

        assert!(handle.is_finished(), "task should be finished after abort");
    }

    #[async_test]
    async fn test_abort_on_drop_does_not_report_failure() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        // Spawn a long-running task.
        let handle = monitor
            .spawn_infinite_task("aborted_task", async {
                loop {
                    sleep(Duration::from_secs(10)).await;
                }
            })
            .abort_on_drop();

        // Give the task time to start.
        sleep(Duration::from_millis(10)).await;

        // Abort it.
        drop(handle);

        // Should NOT receive a failure for intentional abort.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(result.is_err(), "should timeout, no failure expected for abort");
    }

    #[async_test]
    async fn test_spawn_finite_task() {
        let monitor = TaskMonitor::new();
        let mut failures = monitor.subscribe();

        let successful_completion = Arc::new(AtomicBool::new(false));

        // Spawn a one-off background job that completes successfully.
        let successful_completion_clone = successful_completion.clone();
        let _handle = monitor.spawn_finite_task("one-shot job", async move {
            sleep(Duration::from_millis(10)).await;
            successful_completion_clone.store(true, Ordering::SeqCst);
        });

        // Give the task time to finish.
        sleep(Duration::from_millis(20)).await;

        // Should NOT receive a failure for successful completion.
        let result = timeout(failures.recv(), Duration::from_millis(100)).await;
        assert!(
            result.is_err(),
            "should timeout, no failure expected for successful completion"
        );

        assert!(
            successful_completion.load(Ordering::SeqCst),
            "background job should have completed successfully"
        );
    }
}