autopulse_service/settings/triggers/
notify.rs1use crate::settings::path_filter::PathFilter;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::timer::Timer;
4use crate::settings::triggers::TriggerConfig;
5use autopulse_utils::regex::Regex;
6use notify_debouncer_full::{
7 new_debouncer, new_debouncer_opt,
8 notify::{
9 event::{AccessKind, AccessMode, ModifyKind, RenameMode},
10 Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
11 },
12 DebounceEventResult, Debouncer, NoCache, RecommendedCache,
13};
14use serde::{Deserialize, Serialize};
15use std::{path::PathBuf, time::Duration};
16use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
17use tracing::{error, trace};
18
19#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
20pub enum NotifyBackendType {
21 #[serde(rename = "recommended")]
22 #[default]
24 Recommended,
25 #[serde(rename = "polling")]
26 Polling,
28}
29
/// Debouncer handle for whichever watcher backend was selected.
///
/// One variant per [`NotifyBackendType`]; the two debouncer types differ in
/// both their watcher and their cache parameter, hence the enum wrapper.
#[doc(hidden)]
pub enum NotifyBackend {
    /// Debouncer over `RecommendedWatcher` using `RecommendedCache`.
    Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
    /// Debouncer over `PollWatcher` using `NoCache`.
    Polling(Debouncer<PollWatcher, NoCache>),
}
35
36impl NotifyBackend {
37 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
38 let path = std::path::Path::new(&path);
39
40 match self {
41 Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
42 Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
43 }
44 }
45}
46
/// Configuration for a filesystem-notification trigger.
#[derive(Serialize, Deserialize, Clone)]
pub struct Notify {
    /// Paths registered with the watcher when the trigger starts.
    pub paths: Vec<String>,
    /// Optional rewrite applied to an event path before it is checked against
    /// `filter` and sent on.
    pub rewrite: Option<Rewrite>,
    /// Whether `paths` are watched recursively; treated as `true` when unset.
    pub recursive: Option<bool>,
    /// Watcher backend; falls back to `NotifyBackendType::default()` when absent.
    #[serde(default)]
    pub backend: NotifyBackendType,
    /// Optional regex patterns. When set, an event's path (before rewriting)
    /// must match at least one pattern or the event is dropped.
    pub filters: Option<Vec<String>>,

    /// Exclusion entries exposed through `TriggerConfig::excludes`; empty when absent.
    #[serde(default)]
    pub excludes: Vec<String>,
    /// Path filter consulted (via `allows`) on the rewritten path; default when absent.
    #[serde(default)]
    pub filter: PathFilter,
    /// Optional timer exposed through `TriggerConfig::timer`.
    pub timer: Option<Timer>,
    /// Debounce timeout in seconds; 2 seconds is used when unset.
    pub debounce: Option<u64>,
}
74
/// Exposes the shared trigger settings stored on [`Notify`].
impl TriggerConfig for Notify {
    /// Returns the configured rewrite, if any.
    fn rewrite(&self) -> Option<&Rewrite> {
        self.rewrite.as_ref()
    }

    /// Returns the configured timer, if any.
    fn timer(&self) -> Option<&Timer> {
        self.timer.as_ref()
    }

    /// Returns the configured exclusion entries.
    fn excludes(&self) -> &Vec<String> {
        &self.excludes
    }

    /// Returns the configured path filter.
    fn filter(&self) -> &PathFilter {
        &self.filter
    }
}
92
93impl Notify {
94 pub fn send_event(
95 &self,
96 tx: UnboundedSender<(String, EventKind)>,
97 path: Option<&PathBuf>,
98 reason: EventKind,
99 ) -> anyhow::Result<()> {
100 if path.is_none() {
101 return Ok(());
102 }
103
104 let mut path = path.unwrap().to_string_lossy().to_string();
105
106 if let Some(ref filters) = self.filters {
107 let mut matched = false;
108
109 for regex in filters {
110 if Regex::new(regex)?.is_match(&path) {
111 matched = true;
112 break;
113 }
114 }
115
116 if !matched {
117 return Ok(());
118 }
119 }
120
121 if let Some(rewrite) = &self.rewrite {
122 path = rewrite.rewrite_path(path);
123 }
124
125 if !self.filter.allows(&path) {
126 trace!("notify trigger filtered path '{path}'");
127 return Ok(());
128 }
129
130 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
131 }
132
133 pub fn async_watcher(
134 &self,
135 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
136 let (tx, rx) = unbounded_channel();
137
138 let event_handler = move |result: DebounceEventResult| {
139 if let Err(e) = tx.send(result) {
140 error!("failed to process notify event: {e}");
141 }
142 };
143
144 let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
145
146 if self.backend == NotifyBackendType::Recommended {
147 let debouncer = new_debouncer(timeout, None, event_handler)?;
148
149 Ok((NotifyBackend::Recommended(debouncer), rx))
150 } else {
151 let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
152 timeout,
153 None,
154 event_handler,
155 NoCache,
156 Config::default().with_poll_interval(Duration::from_secs(10)),
157 )?;
158
159 Ok((NotifyBackend::Polling(debouncer), rx))
160 }
161 }
162
163 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
164 let (mut watcher, mut rx) = self.async_watcher()?;
165
166 for path in &self.paths {
167 let start = std::time::Instant::now();
168
169 let recursive_mode = self.recursive.unwrap_or(true);
170
171 watcher.watch(
172 path.clone(),
173 if recursive_mode {
174 RecursiveMode::Recursive
175 } else {
176 RecursiveMode::NonRecursive
177 },
178 )?;
179
180 if let NotifyBackend::Polling(_) = watcher {
181 trace!("watching '{}' took: {:?}", path, start.elapsed());
182 }
183 }
184
185 while let Some(result) = rx.recv().await {
186 match result {
187 Ok(events) => {
188 for debounced_event in events {
189 let kind = debounced_event.event.kind;
190
191 match kind {
192 EventKind::Access(AccessKind::Close(AccessMode::Write))
193 | EventKind::Modify(
194 ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
195 )
196 | EventKind::Create(_)
197 | EventKind::Remove(_) => {
198 for path in debounced_event.event.paths {
199 self.send_event(tx.clone(), Some(&path), kind)?;
200 }
201 }
202 _ => {}
203 }
204 }
205 }
206 Err(errors) => {
207 for error in errors {
208 error!("failed to process notify event: {error}");
209 }
210 }
211 }
212 }
213
214 Ok(())
215 }
216}