autopulse_service/settings/triggers/
notify.rs1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify_debouncer_full::{
5 new_debouncer, new_debouncer_opt,
6 notify::{
7 event::{AccessKind, AccessMode, ModifyKind, RenameMode},
8 Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode,
9 },
10 DebounceEventResult, Debouncer, NoCache, RecommendedCache,
11};
12use serde::{Deserialize, Serialize};
13use std::{path::PathBuf, time::Duration};
14use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
15use tracing::{error, trace};
16
17#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
18pub enum NotifyBackendType {
19 #[serde(rename = "recommended")]
20 #[default]
22 Recommended,
23 #[serde(rename = "polling")]
24 Polling,
26}
27
28#[doc(hidden)]
29pub enum NotifyBackend {
30 Recommended(Debouncer<RecommendedWatcher, RecommendedCache>),
31 Polling(Debouncer<PollWatcher, NoCache>),
32}
33
34impl NotifyBackend {
35 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
36 let path = std::path::Path::new(&path);
37
38 match self {
39 Self::Recommended(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
40 Self::Polling(debouncer) => debouncer.watch(path, mode).map_err(Into::into),
41 }
42 }
43}
44
45#[derive(Serialize, Deserialize, Clone)]
46pub struct Notify {
47 pub paths: Vec<String>,
49 pub rewrite: Option<Rewrite>,
51 pub recursive: Option<bool>,
53 #[serde(default)]
57 pub backend: NotifyBackendType,
58 pub filters: Option<Vec<String>>,
60
61 #[serde(default)]
63 pub excludes: Vec<String>,
64 pub timer: Option<Timer>,
66 pub debounce: Option<u64>,
68}
69
70impl Notify {
71 pub fn send_event(
72 &self,
73 tx: UnboundedSender<(String, EventKind)>,
74 path: Option<&PathBuf>,
75 reason: EventKind,
76 ) -> anyhow::Result<()> {
77 if path.is_none() {
78 return Ok(());
79 }
80
81 let mut path = path.unwrap().to_string_lossy().to_string();
82
83 if let Some(ref filters) = self.filters {
84 let mut matched = false;
85
86 for regex in filters {
87 if Regex::new(regex)?.is_match(&path) {
88 matched = true;
89 break;
90 }
91 }
92
93 if !matched {
94 return Ok(());
95 }
96 }
97
98 if let Some(rewrite) = &self.rewrite {
99 path = rewrite.rewrite_path(path);
100 }
101
102 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
103 }
104
105 pub fn async_watcher(
106 &self,
107 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<DebounceEventResult>)> {
108 let (tx, rx) = unbounded_channel();
109
110 let event_handler = move |result: DebounceEventResult| {
111 if let Err(e) = tx.send(result) {
112 error!("failed to process notify event: {e}");
113 }
114 };
115
116 let timeout = Duration::from_secs(self.debounce.unwrap_or(2));
117
118 if self.backend == NotifyBackendType::Recommended {
119 let debouncer = new_debouncer(timeout, None, event_handler)?;
120
121 Ok((NotifyBackend::Recommended(debouncer), rx))
122 } else {
123 let debouncer = new_debouncer_opt::<_, PollWatcher, NoCache>(
124 timeout,
125 None,
126 event_handler,
127 NoCache,
128 Config::default().with_poll_interval(Duration::from_secs(10)),
129 )?;
130
131 Ok((NotifyBackend::Polling(debouncer), rx))
132 }
133 }
134
135 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
136 let (mut watcher, mut rx) = self.async_watcher()?;
137
138 for path in &self.paths {
139 let start = std::time::Instant::now();
140
141 let recursive_mode = self.recursive.unwrap_or(true);
142
143 watcher.watch(
144 path.clone(),
145 if recursive_mode {
146 RecursiveMode::Recursive
147 } else {
148 RecursiveMode::NonRecursive
149 },
150 )?;
151
152 if let NotifyBackend::Polling(_) = watcher {
153 trace!("watching '{}' took: {:?}", path, start.elapsed());
154 }
155 }
156
157 while let Some(result) = rx.recv().await {
158 match result {
159 Ok(events) => {
160 for debounced_event in events {
161 let kind = debounced_event.event.kind;
162
163 match kind {
164 EventKind::Access(AccessKind::Close(AccessMode::Write))
165 | EventKind::Modify(
166 ModifyKind::Metadata(_) | ModifyKind::Name(RenameMode::Both),
167 )
168 | EventKind::Create(_)
169 | EventKind::Remove(_) => {
170 for path in debounced_event.event.paths {
171 self.send_event(tx.clone(), Some(&path), kind)?;
172 }
173 }
174 _ => {}
175 }
176 }
177 }
178 Err(errors) => {
179 for error in errors {
180 error!("failed to process notify event: {error}");
181 }
182 }
183 }
184 }
185
186 Ok(())
187 }
188}