autopulse_service/settings/triggers/
notify.rs

1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5    event::{ModifyKind, RenameMode},
6    Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::{Deserialize, Serialize};
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
14pub enum NotifyBackendType {
15    #[serde(rename = "recommended")]
16    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
17    #[default]
18    Recommended,
19    #[serde(rename = "polling")]
20    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
21    Polling,
22}
23
24#[doc(hidden)]
25pub enum NotifyBackend {
26    Recommended(RecommendedWatcher),
27    Polling(PollWatcher),
28}
29
30impl NotifyBackend {
31    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
32        match self {
33            Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34            Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
35        }
36    }
37}
38
39#[derive(Serialize, Deserialize, Clone)]
40pub struct Notify {
41    /// Paths to monitor
42    pub paths: Vec<String>,
43    /// Rewrite path
44    pub rewrite: Option<Rewrite>,
45    /// Recursive monitoring (default: true)
46    pub recursive: Option<bool>,
47    /// Backend to use
48    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
49    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
50    #[serde(default)]
51    pub backend: NotifyBackendType,
52    /// Filter by regex
53    pub filters: Option<Vec<String>>,
54
55    /// Targets to exclude
56    #[serde(default)]
57    pub excludes: Vec<String>,
58    /// Timer
59    pub timer: Option<Timer>,
60}
61
62impl Notify {
63    pub fn send_event(
64        &self,
65        tx: UnboundedSender<(String, EventKind)>,
66        path: Option<&PathBuf>,
67        reason: EventKind,
68    ) -> anyhow::Result<()> {
69        if path.is_none() {
70            return Ok(());
71        }
72
73        let mut path = path.unwrap().to_string_lossy().to_string();
74
75        if let Some(ref filters) = self.filters {
76            let mut matched = false;
77
78            for regex in filters {
79                if Regex::new(regex)?.is_match(&path) {
80                    matched = true;
81                    break;
82                }
83            }
84
85            if !matched {
86                return Ok(());
87            }
88        }
89
90        if let Some(rewrite) = &self.rewrite {
91            path = rewrite.rewrite_path(path);
92        }
93
94        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
95    }
96
97    pub fn async_watcher(
98        &self,
99    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
100        let (tx, rx) = unbounded_channel();
101
102        let event_handler = move |res| {
103            if let Err(e) = tx.send(res) {
104                error!("failed to process notify event: {e}");
105            }
106        };
107
108        if self.backend == NotifyBackendType::Recommended {
109            let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
110
111            Ok((NotifyBackend::Recommended(watcher), rx))
112        } else {
113            let watcher = PollWatcher::new(
114                event_handler,
115                Config::default().with_poll_interval(Duration::from_secs(10)),
116            )?;
117
118            // watcher.poll()?;
119
120            Ok((NotifyBackend::Polling(watcher), rx))
121        }
122    }
123
124    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
125        let (mut watcher, mut rx) = self.async_watcher()?;
126
127        for path in &self.paths {
128            let start = std::time::Instant::now();
129
130            let recursive_mode = self.recursive.unwrap_or(true);
131
132            watcher.watch(
133                path.clone(),
134                if recursive_mode {
135                    RecursiveMode::Recursive
136                } else {
137                    RecursiveMode::NonRecursive
138                },
139            )?;
140
141            if let NotifyBackend::Polling(_) = watcher {
142                trace!("watching '{}' took: {:?}", path, start.elapsed());
143            }
144        }
145
146        while let Some(res) = rx.recv().await {
147            match res {
148                Ok(event) => match event.kind {
149                    EventKind::Modify(
150                        ModifyKind::Data(_)
151                        | ModifyKind::Metadata(_)
152                        | ModifyKind::Name(RenameMode::Both),
153                    )
154                    | EventKind::Create(_)
155                    | EventKind::Remove(_) => {
156                        for path in event.paths {
157                            self.send_event(tx.clone(), Some(&path), event.kind)?;
158                        }
159                    }
160                    _ => {}
161                },
162                Err(e) => error!("failed to process notify event: {e}"),
163            }
164        }
165
166        Ok(())
167    }
168}