matrix_sdk_common/task_monitor.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a [`TaskMonitor`] for spawning and monitoring
16//! long-running background tasks. Tasks spawned through the monitor are
17//! monitored for panics, errors, and unexpected termination.
18//!
19//! ```no_run
20//! use matrix_sdk_common::task_monitor::TaskMonitor;
21//!
22//! let monitor = TaskMonitor::new();
23//!
24//! // Subscribe to failure notifications
25//! let mut failures = monitor.subscribe();
26//!
27//! // Spawn a monitored background task
28//! let handle = monitor.spawn_background_task("my_task", async {
29//! loop {
30//! // Do background work...
31//! matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
32//! .await;
33//! }
34//! });
35//!
36//! // It's also possible to have the task be aborted safely (and without a report)
37//! // when the handle is dropped.
38//! let _handle = handle.abort_on_drop();
39//!
40//! // Listen for failures in another task
41//! // while let Ok(failure) = failures.recv().await {
42//! // eprintln!("Task {} failed: {:?}", failure.task.name, failure.reason);
43//! // }
44//! ```
45//!
46//! ## A word about unwind safety
47//!
48//! This assumes that all the code running inside the monitored tasks is [unwind
49//! safe](https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html). The assumption is that these
50//! are long-running tasks that:
51//!
52//! - should not panic under normal operation,
//! - will not be automatically restarted with previously shared state (they
//!   can be restarted, but only with a clean state).
56//!
57//! In general, observers of the task monitor should consider any reported
58//! failure as fatal, and they may decide to report the error one way or another
59//! (e.g., logging, metrics) and subsequently crash the process to avoid running
60//! in a potentially corrupted state.
61//!
62//! ## WebAssembly (WASM) support
63//!
64//! Unfortunately, safe unwinding isn't supported on most WASM targets, as of
65//! 2026-01-28, so panics in monitored tasks cannot be caught and reported.
//! Instead, a panic in a monitored task may throw a JS exception. The rest of
//! the monitoring features (error reporting, early termination) remain
//! functional, though.
69
70use std::{
71 any::Any,
72 collections::HashMap,
73 future::Future,
74 panic::AssertUnwindSafe,
75 sync::{
76 Arc,
77 atomic::{AtomicBool, AtomicU64, Ordering},
78 },
79};
80
81use futures_util::FutureExt;
82use tokio::sync::broadcast;
83
84use crate::{
85 SendOutsideWasm,
86 executor::{AbortHandle, spawn},
87 locks::RwLock,
88};
89
/// Unique identifier for a background task.
///
/// Identifiers come from a single process-wide counter, so no two spawned
/// tasks ever share a [`TaskId`], even when spawned by different monitors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskId(u64);

impl TaskId {
    /// Allocate a fresh identifier by bumping the global counter.
    fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(0);

        let raw = COUNTER.fetch_add(1, Ordering::SeqCst);
        TaskId(raw)
    }
}

impl std::fmt::Display for TaskId {
    /// Renders the identifier as `TaskId(<number>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let TaskId(raw) = self;
        write!(f, "TaskId({raw})")
    }
}
106}
107
108/// Metadata about a spawned background task.
109#[derive(Debug, Clone)]
110pub struct BackgroundTaskInfo {
111 /// Unique identifier for this task.
112 pub id: TaskId,
113
114 /// Human-readable name for the task, as defined when spawning it.
115 pub name: String,
116}
117
118/// Reason why a background task failed.
119#[derive(Debug, Clone)]
120#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
121pub enum BackgroundTaskFailureReason {
122 /// The task panicked.
123 Panic {
124 /// The panic message, if it could be extracted.
125 message: Option<String>,
126 /// Backtrace captured after the panic (if available).
127 panic_backtrace: Option<String>,
128 },
129
130 /// The task returned an error.
131 Error {
132 /// String representation of the error.
133 // TODO(bnjbvr): consider storing a boxed error instead?
134 error: String,
135 },
136
137 /// The task ended unexpectedly (for tasks expected to run forever).
138 EarlyTermination,
139}
140
141/// A report of a background task failure.
142///
143/// This is sent through the broadcast channel when a monitored task fails.
144#[derive(Debug, Clone)]
145pub struct BackgroundTaskFailure {
146 /// Information about the task that failed.
147 pub task: BackgroundTaskInfo,
148
149 /// Why the task failed.
150 pub reason: BackgroundTaskFailureReason,
151}
152
153/// Internal entry for tracking an active task.
154#[derive(Debug)]
155struct ActiveTask {
156 /// The tokio's handle to preemptively abort the task.
157 // TODO: might be useful to abort on drop?
158 _abort_handle: AbortHandle,
159}
160
161/// Default capacity for the failure broadcast channel.
162///
163/// It doesn't have to be large, because it's expected that consumers of such a
164/// failure report would likely stop execution of the SDK or take immediate
165/// corrective action, and that failures should be rare.
166const FAILURE_CHANNEL_CAPACITY: usize = 8;
167
168/// A monitor for spawning and monitoring background tasks.
169///
170/// The [`TaskMonitor`] allows you to spawn background tasks that are
171/// automatically monitored for panics, errors, and unexpected termination.
172/// In such cases, a [`BackgroundTaskFailure`] is sent through a broadcast
173/// channel that subscribers can listen to.
174///
175/// # Example
176///
177/// ```no_run
178/// use matrix_sdk_common::task_monitor::TaskMonitor;
179///
180/// let monitor = TaskMonitor::new();
181///
182/// // Subscribe to failures
183/// let mut failures = monitor.subscribe();
184///
185/// // Spawn a task that runs indefinitely
186/// let _handle = monitor.spawn_background_task("worker", async {
187/// loop {
188/// // Do work...
189/// matrix_sdk_common::sleep::sleep(std::time::Duration::from_secs(1))
190/// .await;
191/// }
192/// });
193/// ```
194#[derive(Debug)]
195pub struct TaskMonitor {
196 /// Sender for failure notifications.
197 failure_sender: broadcast::Sender<BackgroundTaskFailure>,
198
199 /// Map of active tasks by ID.
200 active_task_handles: Arc<RwLock<HashMap<TaskId, ActiveTask>>>,
201}
202
203impl Default for TaskMonitor {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209impl TaskMonitor {
210 /// Create a new task monitor.
211 pub fn new() -> Self {
212 let (failure_sender, _) = broadcast::channel(FAILURE_CHANNEL_CAPACITY);
213 Self { failure_sender, active_task_handles: Default::default() }
214 }
215
216 /// Subscribe to failure notifications.
217 ///
218 /// Returns a broadcast receiver that will receive [`BackgroundTaskFailure`]
219 /// messages whenever a monitored task fails.
220 ///
221 /// Note: If the receiver falls behind, older messages may be dropped.
222 pub fn subscribe(&self) -> broadcast::Receiver<BackgroundTaskFailure> {
223 self.failure_sender.subscribe()
224 }
225
226 /// Spawn a background task that is expected to run indefinitely.
227 ///
228 /// If the task completes (whether successfully or by panicking), it will be
229 /// reported as a [`BackgroundTaskFailure`] report through the broadcast
230 /// channel.
231 ///
232 /// Use this for long-running tasks like event loops, sync tasks, or
233 /// background workers that should never complete under normal
234 /// operation.
235 ///
236 /// # Arguments
237 ///
238 /// * `name` - A human-readable name for the task (for debugging purposes).
239 /// * `future` - The async task to run.
240 ///
241 /// # Returns
242 ///
243 /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
244 /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
245 pub fn spawn_background_task<F>(
246 &self,
247 name: impl Into<String>,
248 future: F,
249 ) -> BackgroundTaskHandle
250 where
251 F: Future<Output = ()> + SendOutsideWasm + 'static,
252 {
253 let name = name.into();
254 let task_id = TaskId::new();
255 let task_info = BackgroundTaskInfo { id: task_id, name };
256
257 let intentionally_aborted = Arc::new(AtomicBool::new(false));
258
259 let active_tasks = self.active_task_handles.clone();
260 let failure_sender = self.failure_sender.clone();
261 let aborted_flag = intentionally_aborted.clone();
262
263 let wrapped = async move {
264 // SAFETY: see module-level documentation about unwind safety.
265 let result = AssertUnwindSafe(future).catch_unwind().await;
266
267 // Remove the task from the list of active ones.
268 active_tasks.write().remove(&task_id);
269
270 // Don't report if intentionally aborted.
271 if aborted_flag.load(Ordering::Acquire) {
272 return;
273 }
274
275 let failure_reason = match result {
276 Ok(()) => {
277 // The task ended, this is considered an early termination.
278 BackgroundTaskFailureReason::EarlyTermination
279 }
280
281 Err(panic_payload) => BackgroundTaskFailureReason::Panic {
282 message: extract_panic_message(&panic_payload),
283 panic_backtrace: capture_backtrace(),
284 },
285 };
286
287 let failure = BackgroundTaskFailure { task: task_info, reason: failure_reason };
288
289 // Forward failure to observers (ignore if there's none).
290 let _ = failure_sender.send(failure);
291 };
292
293 let join_handle = spawn(wrapped);
294 let abort_handle = join_handle.abort_handle();
295
296 // Register the task.
297 self.active_task_handles
298 .write()
299 .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
300
301 BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
302 }
303
304 /// Spawn a background task that returns a `Result`.
305 ///
306 /// The task is monitored for panics and errors; see also
307 /// [`BackgroundTaskFailure`].
308 ///
309 /// If the task returns `Ok(())`, it is considered successful and no failure
310 /// is reported.
311 ///
312 /// # Arguments
313 ///
314 /// * `name` - A human-readable name for the task (for debugging purposes).
315 /// * `future` - The async task to run.
316 ///
317 /// # Returns
318 ///
319 /// A [`BackgroundTaskHandle`] that can be used to abort the task or check
320 /// if it has finished. This is the equivalent of tokio's `JoinHandle`.
321 pub fn spawn_fallible_task<F, E>(
322 &self,
323 name: impl Into<String>,
324 future: F,
325 ) -> BackgroundTaskHandle
326 where
327 F: Future<Output = Result<(), E>> + SendOutsideWasm + 'static,
328 E: std::error::Error + SendOutsideWasm + 'static,
329 {
330 let name = name.into();
331 let task_id = TaskId::new();
332 let task_info = BackgroundTaskInfo { id: task_id, name };
333
334 let intentionally_aborted = Arc::new(AtomicBool::new(false));
335
336 let active_tasks = self.active_task_handles.clone();
337 let failure_sender = self.failure_sender.clone();
338 let aborted_flag = intentionally_aborted.clone();
339
340 let wrapped = async move {
341 let result = AssertUnwindSafe(future).catch_unwind().await;
342
343 active_tasks.write().remove(&task_id);
344
345 // Don't report if intentionally aborted.
346 if aborted_flag.load(Ordering::Acquire) {
347 return;
348 }
349
350 let failure_reason = match result {
351 Ok(Ok(())) => {
352 // The task ended successfully, no failure to report.
353 return;
354 }
355
356 Ok(Err(e)) => BackgroundTaskFailureReason::Error { error: e.to_string() },
357
358 Err(panic_payload) => BackgroundTaskFailureReason::Panic {
359 message: extract_panic_message(&panic_payload),
360 panic_backtrace: capture_backtrace(),
361 },
362 };
363
364 // Send failure (ignore if no receivers).
365 let _ = failure_sender
366 .send(BackgroundTaskFailure { task: task_info, reason: failure_reason });
367 };
368
369 let join_handle = spawn(wrapped);
370 let abort_handle = join_handle.abort_handle();
371
372 // Register the task.
373 self.active_task_handles
374 .write()
375 .insert(task_id, ActiveTask { _abort_handle: abort_handle.clone() });
376
377 BackgroundTaskHandle { abort_on_drop: false, abort_handle, intentionally_aborted }
378 }
379}
380
381/// A handle to a spawned background task.
382///
383/// This handle can be used to abort the task or check if it has finished.
384/// When aborted through this handle, the task will NOT be reported as a
385/// failure.
386#[derive(Debug)]
387pub struct BackgroundTaskHandle {
388 /// The underlying tokio's [`AbortHandle`].
389 abort_handle: AbortHandle,
390
391 /// Should the task be safely aborted on drop?
392 ///
393 /// This won't result in a failure report, as it's an intentional abort.
394 abort_on_drop: bool,
395
396 /// An additional flag to indicate if the task was intentionally aborted, so
397 /// we don't report it as a failure when that happens.
398 intentionally_aborted: Arc<AtomicBool>,
399}
400
401impl Drop for BackgroundTaskHandle {
402 fn drop(&mut self) {
403 if self.abort_on_drop {
404 self.abort();
405 }
406 }
407}
408
409impl BackgroundTaskHandle {
410 /// Configure the handle to abort the task when dropped.
411 ///
412 /// The task will be stopped and will NOT be reported as a failure
413 /// (this is considered intentional termination).
414 pub fn abort_on_drop(mut self) -> Self {
415 self.abort_on_drop = true;
416 self
417 }
418
419 /// Abort the task.
420 ///
421 /// The task will be stopped and will NOT be reported as a failure
422 /// (this is considered intentional termination).
423 pub fn abort(&self) {
424 // Note: ordering matters here, we set the flag before aborting otherwise
425 // there's a possible race condition where the abort() is observed
426 // before the flag is set, and the task monitor would consider this an
427 // unexpected termination.
428 self.intentionally_aborted.store(true, Ordering::Release);
429 self.abort_handle.abort();
430 }
431
432 /// Check if the task has finished.
433 ///
434 /// Returns `true` if the task completed, panicked, or was aborted on
435 /// non-wasm; on wasm, returns whether the task has been aborted only
436 /// (due to lack of better APIs).
437 pub fn is_finished(&self) -> bool {
438 #[cfg(not(target_family = "wasm"))]
439 {
440 self.abort_handle.is_finished()
441 }
442 #[cfg(target_family = "wasm")]
443 {
444 self.abort_handle.is_aborted()
445 }
446 }
447}
448
449/// Capture a backtrace at the current location.
450///
451/// Returns `None` if backtraces are not enabled or not available.
452#[cfg(not(target_family = "wasm"))]
453fn capture_backtrace() -> Option<String> {
454 use std::backtrace::{Backtrace, BacktraceStatus};
455
456 let bt = Backtrace::capture();
457 if bt.status() == BacktraceStatus::Captured { Some(bt.to_string()) } else { None }
458}
459
460/// Capture a backtrace - WASM version (backtraces not typically available).
461#[cfg(target_family = "wasm")]
462fn capture_backtrace() -> Option<String> {
463 None
464}
465
466/// Extract a message from a panic payload.
467fn extract_panic_message(payload: &Box<dyn Any + Send>) -> Option<String> {
468 if let Some(s) = payload.downcast_ref::<&str>() {
469 Some((*s).to_owned())
470 } else {
471 payload.downcast_ref::<String>().cloned()
472 }
473}
474
475#[cfg(test)]
476mod tests {
477 use std::time::Duration;
478
479 use assert_matches::assert_matches;
480 use matrix_sdk_test_macros::async_test;
481
482 use super::{BackgroundTaskFailureReason, TaskMonitor};
483 use crate::{sleep::sleep, timeout::timeout};
484
485 #[async_test]
486 async fn test_early_termination_is_reported() {
487 let monitor = TaskMonitor::new();
488 let mut failures = monitor.subscribe();
489
490 // Spawn a task that completes immediately.
491 let _handle = monitor.spawn_background_task("test_task", async {
492 // Completes immediately: this is an "early termination".
493 });
494
495 // Should receive an early termination failure.
496 let failure = timeout(failures.recv(), Duration::from_secs(1))
497 .await
498 .expect("timeout waiting for failure")
499 .expect("channel closed");
500
501 assert_eq!(failure.task.name, "test_task");
502 assert_matches!(failure.reason, BackgroundTaskFailureReason::EarlyTermination);
503 }
504
505 #[async_test]
506 #[cfg(not(target_family = "wasm"))] // Unfortunately, safe unwinding doesn't work on wasm.
507 async fn test_panic_is_captured() {
508 let monitor = TaskMonitor::new();
509 let mut failures = monitor.subscribe();
510
511 // Spawn a task that panics.
512 let _handle = monitor.spawn_background_task("panicking_task", async {
513 panic!("test panic message");
514 });
515
516 // Should receive a panic failure.
517 let failure = timeout(failures.recv(), Duration::from_secs(1))
518 .await
519 .expect("timeout waiting for failure")
520 .expect("channel closed");
521
522 assert_eq!(failure.task.name, "panicking_task");
523 assert_matches!(
524 failure.reason,
525 BackgroundTaskFailureReason::Panic { message, .. } => {
526 assert_eq!(message.as_deref(), Some("test panic message"));
527 }
528 );
529 }
530
531 #[async_test]
532 async fn test_error_is_captured() {
533 let monitor = TaskMonitor::new();
534 let mut failures = monitor.subscribe();
535
536 // Spawn a fallible task that returns an error.
537 let _handle = monitor.spawn_fallible_task("fallible_task", async {
538 Err::<(), _>(std::io::Error::other("test error message"))
539 });
540
541 // Should receive an error failure.
542 let failure = timeout(failures.recv(), Duration::from_secs(1))
543 .await
544 .expect("timeout waiting for failure")
545 .expect("channel closed");
546
547 assert_eq!(failure.task.name, "fallible_task");
548 assert_matches!(
549 failure.reason,
550 BackgroundTaskFailureReason::Error { error } => {
551 assert!(error.contains("test error message"));
552 }
553 );
554 }
555
556 #[async_test]
557 async fn test_successful_fallible_task_no_failure() {
558 let monitor = TaskMonitor::new();
559 let mut failures = monitor.subscribe();
560
561 // Spawn a fallible task that succeeds.
562 let _handle =
563 monitor.spawn_fallible_task("success_task", async { Ok::<(), std::io::Error>(()) });
564
565 // Should NOT receive any failure: use a short timeout.
566 let result = timeout(failures.recv(), Duration::from_millis(100)).await;
567 assert!(result.is_err(), "should timeout, no failure expected");
568 }
569
570 #[async_test]
571 async fn test_abort_does_not_report_failure() {
572 let monitor = TaskMonitor::new();
573 let mut failures = monitor.subscribe();
574
575 // Spawn a long-running task.
576 let handle = monitor.spawn_background_task("aborted_task", async {
577 loop {
578 sleep(Duration::from_secs(10)).await;
579 }
580 });
581
582 // Give the task time to start.
583 sleep(Duration::from_millis(10)).await;
584
585 // Abort it.
586 handle.abort();
587
588 // Should NOT receive a failure for intentional abort.
589 let result = timeout(failures.recv(), Duration::from_millis(100)).await;
590 assert!(result.is_err(), "should timeout, no failure expected for abort");
591
592 assert!(handle.is_finished(), "task should be finished after abort");
593 }
594
595 #[async_test]
596 async fn test_abort_on_drop_does_not_report_failure() {
597 let monitor = TaskMonitor::new();
598 let mut failures = monitor.subscribe();
599
600 // Spawn a long-running task.
601 let handle = monitor
602 .spawn_background_task("aborted_task", async {
603 loop {
604 sleep(Duration::from_secs(10)).await;
605 }
606 })
607 .abort_on_drop();
608
609 // Give the task time to start.
610 sleep(Duration::from_millis(10)).await;
611
612 // Abort it.
613 drop(handle);
614
615 // Should NOT receive a failure for intentional abort.
616 let result = timeout(failures.recv(), Duration::from_millis(100)).await;
617 assert!(result.is_err(), "should timeout, no failure expected for abort");
618 }
619}