Skip to main content

autopulse_service/settings/triggers/
notify.rs

1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use crate::settings::triggers::TriggerConfig;
4use autopulse_utils::regex::Regex;
5use notify_debouncer_full::{
6    new_debouncer, new_debouncer_opt,
7    notify::{
8        event::{AccessKind, AccessMode, ModifyKind, RenameMode},
9        Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
10    },
11    DebounceEventResult, Debouncer, NoCache, RecommendedCache,
12};
13use serde::{Deserialize, Serialize};
14use std::{path::PathBuf, time::Duration};
15use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
16use tracing::{error, trace};
17
18#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
19pub enum NotifyBackendType {
20    #[serde(rename = "recommended")]
21    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
22    #[default]
23    Recommended,
24    #[serde(rename = "polling")]
25    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
26    Polling,
27}
28
29#[doc(hidden)]
30pub enum NotifyBackend {
31    Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
32    Polling(Debouncer<PollWatcher, NoCache>),
33}
34
35impl NotifyBackend {
36    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
37        let path = std::path::Path::new(&path);
38
39        match self {
40            Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
41            Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
42        }
43    }
44}
45
46#[derive(Serialize, Deserialize, Clone)]
47pub struct Notify {
48    /// Paths to monitor
49    pub paths: Vec<String>,
50    /// Rewrite path
51    pub rewrite: Option<Rewrite>,
52    /// Recursive monitoring (default: true)
53    pub recursive: Option<bool>,
54    /// Backend to use
55    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
56    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
57    #[serde(default)]
58    pub backend: NotifyBackendType,
59    /// Filter by regex
60    pub filters: Option<Vec<String>>,
61
62    /// Targets to exclude
63    #[serde(default)]
64    pub excludes: Vec<String>,
65    /// Timer
66    pub timer: Option<Timer>,
67    /// Debounce timeout in seconds (default: 2)
68    pub debounce: Option<u64>,
69}
70
71impl TriggerConfig for Notify {
72    fn rewrite(&self) -> Option<&Rewrite> {
73        self.rewrite.as_ref()
74    }
75
76    fn timer(&self) -> Option<&Timer> {
77        self.timer.as_ref()
78    }
79
80    fn excludes(&self) -> &Vec<String> {
81        &self.excludes
82    }
83}
84
85impl Notify {
86    pub fn send_event(
87        &self,
88        tx: UnboundedSender<(String, EventKind)>,
89        path: Option<&PathBuf>,
90        reason: EventKind,
91    ) -> anyhow::Result<()> {
92        if path.is_none() {
93            return Ok(());
94        }
95
96        let mut path = path.unwrap().to_string_lossy().to_string();
97
98        if let Some(ref filters) = self.filters {
99            let mut matched = false;
100
101            for regex in filters {
102                if Regex::new(regex)?.is_match(&path) {
103                    matched = true;
104                    break;
105                }
106            }
107
108            if !matched {
109                return Ok(());
110            }
111        }
112
113        if let Some(rewrite) = &self.rewrite {
114            path = rewrite.rewrite_path(path);
115        }
116
117        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
118    }
119
120    pub fn async_watcher(
121        &self,
122    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
123        let (tx, rx) = unbounded_channel();
124
125        let event_handler = move |result: DebounceEventResult| {
126            if let Err(e) = tx.send(result) {
127                error!("failed to process notify event: {e}");
128            }
129        };
130
131        let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
132
133        if self.backend == NotifyBackendType::Recommended {
134            let debouncer = new_debouncer(timeout, None, event_handler)?;
135
136            Ok((NotifyBackend::Recommended(debouncer), rx))
137        } else {
138            let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
139                timeout,
140                None,
141                event_handler,
142                NoCache,
143                Config::default().with_poll_interval(Duration::from_secs(10)),
144            )?;
145
146            Ok((NotifyBackend::Polling(debouncer), rx))
147        }
148    }
149
150    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
151        let (mut watcher, mut rx) = self.async_watcher()?;
152
153        for path in &self.paths {
154            let start = std::time::Instant::now();
155
156            let recursive_mode = self.recursive.unwrap_or(true);
157
158            watcher.watch(
159                path.clone(),
160                if recursive_mode {
161                    RecursiveMode::Recursive
162                } else {
163                    RecursiveMode::NonRecursive
164                },
165            )?;
166
167            if let NotifyBackend::Polling(_) = watcher {
168                trace!("watching '{}' took: {:?}", path, start.elapsed());
169            }
170        }
171
172        while let Some(result) = rx.recv().await {
173            match result {
174                Ok(events) => {
175                    for debounced_event in events {
176                        let kind = debounced_event.event.kind;
177
178                        match kind {
179                            EventKind::Access(AccessKind::Close(AccessMode::Write))
180                            | EventKind::Modify(
181                                ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
182                            )
183                            | EventKind::Create(_)
184                            | EventKind::Remove(_) => {
185                                for path in debounced_event.event.paths {
186                                    self.send_event(tx.clone(), Some(&path), kind)?;
187                                }
188                            }
189                            _ => {}
190                        }
191                    }
192                }
193                Err(errors) => {
194                    for error in errors {
195                        error!("failed to process notify event: {error}");
196                    }
197                }
198            }
199        }
200
201        Ok(())
202    }
203}