autopulse_service/settings/triggers/
notify.rs1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5 event::{ModifyKind, RenameMode},
6 Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::Deserialize;
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Deserialize, Clone, Eq, PartialEq, Debug)]
14pub enum NotifyBackendType {
15 #[serde(rename = "recommended")]
16 Recommended,
18 #[serde(rename = "polling")]
19 Polling,
21}
22
23#[doc(hidden)]
24pub enum NotifyBackend {
25 Recommended(RecommendedWatcher),
26 Polling(PollWatcher),
27}
28
29impl NotifyBackend {
30 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
31 match self {
32 Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
33 Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34 }
35 }
36}
37
38impl Default for NotifyBackendType {
39 fn default() -> Self {
40 Self::Recommended
41 }
42}
43
44#[derive(Deserialize, Clone)]
45pub struct Notify {
46 pub paths: Vec<String>,
48 pub rewrite: Option<Rewrite>,
50 pub recursive: Option<bool>,
52 #[serde(default)]
56 pub backend: NotifyBackendType,
57 pub filters: Option<Vec<String>>,
59
60 #[serde(default)]
62 pub excludes: Vec<String>,
63 #[serde(default)]
65 pub timer: Timer,
66}
67
68impl Notify {
69 pub fn send_event(
70 &self,
71 tx: UnboundedSender<(String, EventKind)>,
72 path: Option<&PathBuf>,
73 reason: EventKind,
74 ) -> anyhow::Result<()> {
75 if path.is_none() {
76 return Ok(());
77 }
78
79 let mut path = path.unwrap().to_string_lossy().to_string();
80
81 if let Some(ref filters) = self.filters {
82 let mut matched = false;
83
84 for regex in filters {
85 if Regex::new(regex)?.is_match(&path) {
86 matched = true;
87 break;
88 }
89 }
90
91 if !matched {
92 return Ok(());
93 }
94 }
95
96 if let Some(rewrite) = &self.rewrite {
97 path = rewrite.rewrite_path(path);
98 }
99
100 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
101 }
102
103 pub fn async_watcher(
104 &self,
105 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
106 let (tx, rx) = unbounded_channel();
107
108 let event_handler = move |res| {
109 if let Err(e) = tx.send(res) {
110 error!("failed to process notify event: {e}");
111 }
112 };
113
114 if self.backend == NotifyBackendType::Recommended {
115 let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
116
117 Ok((NotifyBackend::Recommended(watcher), rx))
118 } else {
119 let watcher = PollWatcher::new(
120 event_handler,
121 Config::default().with_poll_interval(Duration::from_secs(10)),
122 )?;
123
124 Ok((NotifyBackend::Polling(watcher), rx))
127 }
128 }
129
130 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
131 let (mut watcher, mut rx) = self.async_watcher()?;
132
133 for path in &self.paths {
134 let start = std::time::Instant::now();
135
136 let recursive_mode = self.recursive.unwrap_or(true);
137
138 watcher.watch(
139 path.clone(),
140 if recursive_mode {
141 RecursiveMode::Recursive
142 } else {
143 RecursiveMode::NonRecursive
144 },
145 )?;
146
147 if let NotifyBackend::Polling(_) = watcher {
148 trace!("watching '{}' took: {:?}", path, start.elapsed());
149 }
150 }
151
152 while let Some(res) = rx.recv().await {
153 match res {
154 Ok(event) => match event.kind {
155 EventKind::Modify(
156 ModifyKind::Data(_)
157 | ModifyKind::Metadata(_)
158 | ModifyKind::Name(RenameMode::Both),
159 )
160 | EventKind::Create(_)
161 | EventKind::Remove(_) => {
162 for path in event.paths {
163 self.send_event(tx.clone(), Some(&path), event.kind)?;
164 }
165 }
166 _ => {}
167 },
168 Err(e) => error!("failed to process notify event: {e}"),
169 }
170 }
171
172 Ok(())
173 }
174}