autopulse_service/settings/triggers/
notify.rs1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use crate::settings::triggers::TriggerConfig;
4use autopulse_utils::regex::Regex;
5use notify_debouncer_full::{
6 new_debouncer, new_debouncer_opt,
7 notify::{
8 event::{AccessKind, AccessMode, ModifyKind, RenameMode},
9 Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
10 },
11 DebounceEventResult, Debouncer, NoCache, RecommendedCache,
12};
13use serde::{Deserialize, Serialize};
14use std::{path::PathBuf, time::Duration};
15use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
16use tracing::{error, trace};
17
18#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
19pub enum NotifyBackendType {
20 #[serde(rename = "recommended")]
21 #[default]
23 Recommended,
24 #[serde(rename = "polling")]
25 Polling,
27}
28
29#[doc(hidden)]
30pub enum NotifyBackend {
31 Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
32 Polling(Debouncer<PollWatcher, NoCache>),
33}
34
35impl NotifyBackend {
36 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
37 let path = std::path::Path::new(&path);
38
39 match self {
40 Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
41 Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
42 }
43 }
44}
45
46#[derive(Serialize, Deserialize, Clone)]
47pub struct Notify {
48 pub paths: Vec<String>,
50 pub rewrite: Option<Rewrite>,
52 pub recursive: Option<bool>,
54 #[serde(default)]
58 pub backend: NotifyBackendType,
59 pub filters: Option<Vec<String>>,
61
62 #[serde(default)]
64 pub excludes: Vec<String>,
65 pub timer: Option<Timer>,
67 pub debounce: Option<u64>,
69}
70
71impl TriggerConfig for Notify {
72 fn rewrite(&self) -> Option<&Rewrite> {
73 self.rewrite.as_ref()
74 }
75
76 fn timer(&self) -> Option<&Timer> {
77 self.timer.as_ref()
78 }
79
80 fn excludes(&self) -> &Vec<String> {
81 &self.excludes
82 }
83}
84
85impl Notify {
86 pub fn send_event(
87 &self,
88 tx: UnboundedSender<(String, EventKind)>,
89 path: Option<&PathBuf>,
90 reason: EventKind,
91 ) -> anyhow::Result<()> {
92 if path.is_none() {
93 return Ok(());
94 }
95
96 let mut path = path.unwrap().to_string_lossy().to_string();
97
98 if let Some(ref filters) = self.filters {
99 let mut matched = false;
100
101 for regex in filters {
102 if Regex::new(regex)?.is_match(&path) {
103 matched = true;
104 break;
105 }
106 }
107
108 if !matched {
109 return Ok(());
110 }
111 }
112
113 if let Some(rewrite) = &self.rewrite {
114 path = rewrite.rewrite_path(path);
115 }
116
117 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
118 }
119
120 pub fn async_watcher(
121 &self,
122 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
123 let (tx, rx) = unbounded_channel();
124
125 let event_handler = move |result: DebounceEventResult| {
126 if let Err(e) = tx.send(result) {
127 error!("failed to process notify event: {e}");
128 }
129 };
130
131 let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
132
133 if self.backend == NotifyBackendType::Recommended {
134 let debouncer = new_debouncer(timeout, None, event_handler)?;
135
136 Ok((NotifyBackend::Recommended(debouncer), rx))
137 } else {
138 let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
139 timeout,
140 None,
141 event_handler,
142 NoCache,
143 Config::default().with_poll_interval(Duration::from_secs(10)),
144 )?;
145
146 Ok((NotifyBackend::Polling(debouncer), rx))
147 }
148 }
149
150 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
151 let (mut watcher, mut rx) = self.async_watcher()?;
152
153 for path in &self.paths {
154 let start = std::time::Instant::now();
155
156 let recursive_mode = self.recursive.unwrap_or(true);
157
158 watcher.watch(
159 path.clone(),
160 if recursive_mode {
161 RecursiveMode::Recursive
162 } else {
163 RecursiveMode::NonRecursive
164 },
165 )?;
166
167 if let NotifyBackend::Polling(_) = watcher {
168 trace!("watching '{}' took: {:?}", path, start.elapsed());
169 }
170 }
171
172 while let Some(result) = rx.recv().await {
173 match result {
174 Ok(events) => {
175 for debounced_event in events {
176 let kind = debounced_event.event.kind;
177
178 match kind {
179 EventKind::Access(AccessKind::Close(AccessMode::Write))
180 | EventKind::Modify(
181 ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
182 )
183 | EventKind::Create(_)
184 | EventKind::Remove(_) => {
185 for path in debounced_event.event.paths {
186 self.send_event(tx.clone(), Some(&path), kind)?;
187 }
188 }
189 _ => {}
190 }
191 }
192 }
193 Err(errors) => {
194 for error in errors {
195 error!("failed to process notify event: {error}");
196 }
197 }
198 }
199 }
200
201 Ok(())
202 }
203}