autopulse_utils/
task_manager.rs

1use std::sync::Arc;
2use tokio::{
3    sync::{Mutex, Notify},
4    task::JoinHandle,
5};
6
7pub struct TaskManager {
8    tasks: Arc<Mutex<Vec<Arc<JoinHandle<()>>>>>,
9    shutdown: Arc<Notify>,
10}
11
12impl Default for TaskManager {
13    fn default() -> Self {
14        Self::new()
15    }
16}
17
18impl TaskManager {
19    pub fn new() -> Self {
20        Self {
21            tasks: Arc::new(Mutex::new(Vec::new())),
22            shutdown: Arc::new(Notify::new()),
23        }
24    }
25
26    /// Spawns a new task and tracks it
27    pub async fn spawn<F, T>(&self, fut: F) -> Arc<JoinHandle<()>>
28    where
29        F: std::future::Future<Output = T> + Send + 'static,
30        T: Send + 'static,
31    {
32        let shutdown = self.shutdown.clone();
33        let fut = async move {
34            tokio::select! {
35                _ = fut => {},
36                () = shutdown.notified() => {
37                    // Task was asked to shut down
38                },
39            }
40        };
41
42        let handle: JoinHandle<()> = tokio::spawn(fut);
43        let handle = Arc::new(handle);
44
45        self.tasks.lock().await.push(handle.clone());
46
47        handle
48    }
49
50    /// Signals all tasks to shut down
51    pub async fn shutdown(&self) -> anyhow::Result<()> {
52        self.shutdown.notify_waiters();
53
54        let tasks = self.tasks.lock().await.drain(..).collect::<Vec<_>>();
55        for handle in tasks {
56            handle.abort();
57        }
58
59        Ok(())
60    }
61}