// autopulse_service/settings/triggers/notify.rs

1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify_debouncer_full::{
5    new_debouncer, new_debouncer_opt,
6    notify::{
7        event::{AccessKind, AccessMode, ModifyKind, RenameMode},
8        Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
9    },
10    DebounceEventResult, Debouncer, NoCache, RecommendedCache,
11};
12use serde::{Deserialize, Serialize};
13use std::{path::PathBuf, time::Duration};
14use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
15use tracing::{error, trace};
16
17#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
18pub enum NotifyBackendType {
19    #[serde(rename = "recommended")]
20    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
21    #[default]
22    Recommended,
23    #[serde(rename = "polling")]
24    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
25    Polling,
26}
27
28#[doc(hidden)]
29pub enum NotifyBackend {
30    Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
31    Polling(Debouncer<PollWatcher, NoCache>),
32}
33
34impl NotifyBackend {
35    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
36        let path = std::path::Path::new(&path);
37
38        match self {
39            Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
40            Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
41        }
42    }
43}
44
45#[derive(Serialize, Deserialize, Clone)]
46pub struct Notify {
47    /// Paths to monitor
48    pub paths: Vec<String>,
49    /// Rewrite path
50    pub rewrite: Option<Rewrite>,
51    /// Recursive monitoring (default: true)
52    pub recursive: Option<bool>,
53    /// Backend to use
54    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
55    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
56    #[serde(default)]
57    pub backend: NotifyBackendType,
58    /// Filter by regex
59    pub filters: Option<Vec<String>>,
60
61    /// Targets to exclude
62    #[serde(default)]
63    pub excludes: Vec<String>,
64    /// Timer
65    pub timer: Option<Timer>,
66    /// Debounce timeout in seconds (default: 2)
67    pub debounce: Option<u64>,
68}
69
70impl Notify {
71    pub fn send_event(
72        &self,
73        tx: UnboundedSender<(String, EventKind)>,
74        path: Option<&PathBuf>,
75        reason: EventKind,
76    ) -> anyhow::Result<()> {
77        if path.is_none() {
78            return Ok(());
79        }
80
81        let mut path = path.unwrap().to_string_lossy().to_string();
82
83        if let Some(ref filters) = self.filters {
84            let mut matched = false;
85
86            for regex in filters {
87                if Regex::new(regex)?.is_match(&path) {
88                    matched = true;
89                    break;
90                }
91            }
92
93            if !matched {
94                return Ok(());
95            }
96        }
97
98        if let Some(rewrite) = &self.rewrite {
99            path = rewrite.rewrite_path(path);
100        }
101
102        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
103    }
104
105    pub fn async_watcher(
106        &self,
107    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
108        let (tx, rx) = unbounded_channel();
109
110        let event_handler = move |result: DebounceEventResult| {
111            if let Err(e) = tx.send(result) {
112                error!("failed to process notify event: {e}");
113            }
114        };
115
116        let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
117
118        if self.backend == NotifyBackendType::Recommended {
119            let debouncer = new_debouncer(timeout, None, event_handler)?;
120
121            Ok((NotifyBackend::Recommended(debouncer), rx))
122        } else {
123            let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
124                timeout,
125                None,
126                event_handler,
127                NoCache,
128                Config::default().with_poll_interval(Duration::from_secs(10)),
129            )?;
130
131            Ok((NotifyBackend::Polling(debouncer), rx))
132        }
133    }
134
135    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
136        let (mut watcher, mut rx) = self.async_watcher()?;
137
138        for path in &self.paths {
139            let start = std::time::Instant::now();
140
141            let recursive_mode = self.recursive.unwrap_or(true);
142
143            watcher.watch(
144                path.clone(),
145                if recursive_mode {
146                    RecursiveMode::Recursive
147                } else {
148                    RecursiveMode::NonRecursive
149                },
150            )?;
151
152            if let NotifyBackend::Polling(_) = watcher {
153                trace!("watching '{}' took: {:?}", path, start.elapsed());
154            }
155        }
156
157        while let Some(result) = rx.recv().await {
158            match result {
159                Ok(events) => {
160                    for debounced_event in events {
161                        let kind = debounced_event.event.kind;
162
163                        match kind {
164                            EventKind::Access(AccessKind::Close(AccessMode::Write))
165                            | EventKind::Modify(
166                                ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
167                            )
168                            | EventKind::Create(_)
169                            | EventKind::Remove(_) => {
170                                for path in debounced_event.event.paths {
171                                    self.send_event(tx.clone(), Some(&path), kind)?;
172                                }
173                            }
174                            _ => {}
175                        }
176                    }
177                }
178                Err(errors) => {
179                    for error in errors {
180                        error!("failed to process notify event: {error}");
181                    }
182                }
183            }
184        }
185
186        Ok(())
187    }
188}