autopulse_service/settings/triggers/
notify.rs1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5 event::{ModifyKind, RenameMode},
6 Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::{Deserialize, Serialize};
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
14pub enum NotifyBackendType {
15 #[serde(rename = "recommended")]
16 Recommended,
18 #[serde(rename = "polling")]
19 Polling,
21}
22
23#[doc(hidden)]
24pub enum NotifyBackend {
25 Recommended(RecommendedWatcher),
26 Polling(PollWatcher),
27}
28
29impl NotifyBackend {
30 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
31 match self {
32 Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
33 Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34 }
35 }
36}
37
38impl Default for NotifyBackendType {
39 fn default() -> Self {
40 Self::Recommended
41 }
42}
43
44#[derive(Serialize, Deserialize, Clone)]
45pub struct Notify {
46 pub paths: Vec<String>,
48 pub rewrite: Option<Rewrite>,
50 pub recursive: Option<bool>,
52 #[serde(default)]
56 pub backend: NotifyBackendType,
57 pub filters: Option<Vec<String>>,
59
60 #[serde(default)]
62 pub excludes: Vec<String>,
63 pub timer: Option<Timer>,
65}
66
67impl Notify {
68 pub fn send_event(
69 &self,
70 tx: UnboundedSender<(String, EventKind)>,
71 path: Option<&PathBuf>,
72 reason: EventKind,
73 ) -> anyhow::Result<()> {
74 if path.is_none() {
75 return Ok(());
76 }
77
78 let mut path = path.unwrap().to_string_lossy().to_string();
79
80 if let Some(ref filters) = self.filters {
81 let mut matched = false;
82
83 for regex in filters {
84 if Regex::new(regex)?.is_match(&path) {
85 matched = true;
86 break;
87 }
88 }
89
90 if !matched {
91 return Ok(());
92 }
93 }
94
95 if let Some(rewrite) = &self.rewrite {
96 path = rewrite.rewrite_path(path);
97 }
98
99 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
100 }
101
102 pub fn async_watcher(
103 &self,
104 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
105 let (tx, rx) = unbounded_channel();
106
107 let event_handler = move |res| {
108 if let Err(e) = tx.send(res) {
109 error!("failed to process notify event: {e}");
110 }
111 };
112
113 if self.backend == NotifyBackendType::Recommended {
114 let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
115
116 Ok((NotifyBackend::Recommended(watcher), rx))
117 } else {
118 let watcher = PollWatcher::new(
119 event_handler,
120 Config::default().with_poll_interval(Duration::from_secs(10)),
121 )?;
122
123 Ok((NotifyBackend::Polling(watcher), rx))
126 }
127 }
128
129 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
130 let (mut watcher, mut rx) = self.async_watcher()?;
131
132 for path in &self.paths {
133 let start = std::time::Instant::now();
134
135 let recursive_mode = self.recursive.unwrap_or(true);
136
137 watcher.watch(
138 path.clone(),
139 if recursive_mode {
140 RecursiveMode::Recursive
141 } else {
142 RecursiveMode::NonRecursive
143 },
144 )?;
145
146 if let NotifyBackend::Polling(_) = watcher {
147 trace!("watching '{}' took: {:?}", path, start.elapsed());
148 }
149 }
150
151 while let Some(res) = rx.recv().await {
152 match res {
153 Ok(event) => match event.kind {
154 EventKind::Modify(
155 ModifyKind::Data(_)
156 | ModifyKind::Metadata(_)
157 | ModifyKind::Name(RenameMode::Both),
158 )
159 | EventKind::Create(_)
160 | EventKind::Remove(_) => {
161 for path in event.paths {
162 self.send_event(tx.clone(), Some(&path), event.kind)?;
163 }
164 }
165 _ => {}
166 },
167 Err(e) => error!("failed to process notify event: {e}"),
168 }
169 }
170
171 Ok(())
172 }
173}