Skip to main content

autopulse_service/settings/triggers/
notify.rs

1use crate::settings::path_filter::PathFilter;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::timer::Timer;
4use crate::settings::triggers::TriggerConfig;
5use autopulse_utils::regex::Regex;
6use notify_debouncer_full::{
7    new_debouncer, new_debouncer_opt,
8    notify::{
9        event::{AccessKind, AccessMode, ModifyKind, RenameMode},
10        Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
11    },
12    DebounceEventResult, Debouncer, NoCache, RecommendedCache,
13};
14use serde::{Deserialize, Serialize};
15use std::{path::PathBuf, time::Duration};
16use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
17use tracing::{error, trace};
18
19#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
20pub enum NotifyBackendType {
21    #[serde(rename = "recommended")]
22    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
23    #[default]
24    Recommended,
25    #[serde(rename = "polling")]
26    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
27    Polling,
28}
29
30#[doc(hidden)]
31pub enum NotifyBackend {
32    Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
33    Polling(Debouncer<PollWatcher, NoCache>),
34}
35
36impl NotifyBackend {
37    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
38        let path = std::path::Path::new(&path);
39
40        match self {
41            Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
42            Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
43        }
44    }
45}
46
47#[derive(Serialize, Deserialize, Clone)]
48pub struct Notify {
49    /// Paths to monitor
50    pub paths: Vec<String>,
51    /// Rewrite path
52    pub rewrite: Option<Rewrite>,
53    /// Recursive monitoring (default: true)
54    pub recursive: Option<bool>,
55    /// Backend to use
56    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
57    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
58    #[serde(default)]
59    pub backend: NotifyBackendType,
60    /// Filter by regex
61    pub filters: Option<Vec<String>>,
62
63    /// Targets to exclude
64    #[serde(default)]
65    pub excludes: Vec<String>,
66    /// Path filter matched against the rewritten file path.
67    #[serde(default)]
68    pub filter: PathFilter,
69    /// Timer
70    pub timer: Option<Timer>,
71    /// Debounce timeout in seconds (default: 2)
72    pub debounce: Option<u64>,
73}
74
75impl TriggerConfig for Notify {
76    fn rewrite(&self) -> Option<&Rewrite> {
77        self.rewrite.as_ref()
78    }
79
80    fn timer(&self) -> Option<&Timer> {
81        self.timer.as_ref()
82    }
83
84    fn excludes(&self) -> &Vec<String> {
85        &self.excludes
86    }
87
88    fn filter(&self) -> &PathFilter {
89        &self.filter
90    }
91}
92
93impl Notify {
94    pub fn send_event(
95        &self,
96        tx: UnboundedSender<(String, EventKind)>,
97        path: Option<&PathBuf>,
98        reason: EventKind,
99    ) -> anyhow::Result<()> {
100        if path.is_none() {
101            return Ok(());
102        }
103
104        let mut path = path.unwrap().to_string_lossy().to_string();
105
106        if let Some(ref filters) = self.filters {
107            let mut matched = false;
108
109            for regex in filters {
110                if Regex::new(regex)?.is_match(&path) {
111                    matched = true;
112                    break;
113                }
114            }
115
116            if !matched {
117                return Ok(());
118            }
119        }
120
121        if let Some(rewrite) = &self.rewrite {
122            path = rewrite.rewrite_path(path);
123        }
124
125        if !self.filter.allows(&path) {
126            trace!("notify trigger filtered path '{path}'");
127            return Ok(());
128        }
129
130        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
131    }
132
133    pub fn async_watcher(
134        &self,
135    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
136        let (tx, rx) = unbounded_channel();
137
138        let event_handler = move |result: DebounceEventResult| {
139            if let Err(e) = tx.send(result) {
140                error!("failed to process notify event: {e}");
141            }
142        };
143
144        let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
145
146        if self.backend == NotifyBackendType::Recommended {
147            let debouncer = new_debouncer(timeout, None, event_handler)?;
148
149            Ok((NotifyBackend::Recommended(debouncer), rx))
150        } else {
151            let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
152                timeout,
153                None,
154                event_handler,
155                NoCache,
156                Config::default().with_poll_interval(Duration::from_secs(10)),
157            )?;
158
159            Ok((NotifyBackend::Polling(debouncer), rx))
160        }
161    }
162
163    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
164        let (mut watcher, mut rx) = self.async_watcher()?;
165
166        for path in &self.paths {
167            let start = std::time::Instant::now();
168
169            let recursive_mode = self.recursive.unwrap_or(true);
170
171            watcher.watch(
172                path.clone(),
173                if recursive_mode {
174                    RecursiveMode::Recursive
175                } else {
176                    RecursiveMode::NonRecursive
177                },
178            )?;
179
180            if let NotifyBackend::Polling(_) = watcher {
181                trace!("watching '{}' took: {:?}", path, start.elapsed());
182            }
183        }
184
185        while let Some(result) = rx.recv().await {
186            match result {
187                Ok(events) => {
188                    for debounced_event in events {
189                        let kind = debounced_event.event.kind;
190
191                        match kind {
192                            EventKind::Access(AccessKind::Close(AccessMode::Write))
193                            | EventKind::Modify(
194                                ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
195                            )
196                            | EventKind::Create(_)
197                            | EventKind::Remove(_) => {
198                                for path in debounced_event.event.paths {
199                                    self.send_event(tx.clone(), Some(&path), kind)?;
200                                }
201                            }
202                            _ => {}
203                        }
204                    }
205                }
206                Err(errors) => {
207                    for error in errors {
208                        error!("failed to process notify event: {error}");
209                    }
210                }
211            }
212        }
213
214        Ok(())
215    }
216}