autopulse_service/settings/triggers/
notify.rs1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5 event::{ModifyKind, RenameMode},
6 Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::{Deserialize, Serialize};
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug, Default)]
14pub enum NotifyBackendType {
15 #[serde(rename = "recommended")]
16 #[default]
18 Recommended,
19 #[serde(rename = "polling")]
20 Polling,
22}
23
24#[doc(hidden)]
25pub enum NotifyBackend {
26 Recommended(RecommendedWatcher),
27 Polling(PollWatcher),
28}
29
30impl NotifyBackend {
31 pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
32 match self {
33 Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34 Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
35 }
36 }
37}
38
39#[derive(Serialize, Deserialize, Clone)]
40pub struct Notify {
41 pub paths: Vec<String>,
43 pub rewrite: Option<Rewrite>,
45 pub recursive: Option<bool>,
47 #[serde(default)]
51 pub backend: NotifyBackendType,
52 pub filters: Option<Vec<String>>,
54
55 #[serde(default)]
57 pub excludes: Vec<String>,
58 pub timer: Option<Timer>,
60}
61
62impl Notify {
63 pub fn send_event(
64 &self,
65 tx: UnboundedSender<(String, EventKind)>,
66 path: Option<&PathBuf>,
67 reason: EventKind,
68 ) -> anyhow::Result<()> {
69 if path.is_none() {
70 return Ok(());
71 }
72
73 let mut path = path.unwrap().to_string_lossy().to_string();
74
75 if let Some(ref filters) = self.filters {
76 let mut matched = false;
77
78 for regex in filters {
79 if Regex::new(regex)?.is_match(&path) {
80 matched = true;
81 break;
82 }
83 }
84
85 if !matched {
86 return Ok(());
87 }
88 }
89
90 if let Some(rewrite) = &self.rewrite {
91 path = rewrite.rewrite_path(path);
92 }
93
94 tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
95 }
96
97 pub fn async_watcher(
98 &self,
99 ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
100 let (tx, rx) = unbounded_channel();
101
102 let event_handler = move |res| {
103 if let Err(e) = tx.send(res) {
104 error!("failed to process notify event: {e}");
105 }
106 };
107
108 if self.backend == NotifyBackendType::Recommended {
109 let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
110
111 Ok((NotifyBackend::Recommended(watcher), rx))
112 } else {
113 let watcher = PollWatcher::new(
114 event_handler,
115 Config::default().with_poll_interval(Duration::from_secs(10)),
116 )?;
117
118 Ok((NotifyBackend::Polling(watcher), rx))
121 }
122 }
123
124 pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
125 let (mut watcher, mut rx) = self.async_watcher()?;
126
127 for path in &self.paths {
128 let start = std::time::Instant::now();
129
130 let recursive_mode = self.recursive.unwrap_or(true);
131
132 watcher.watch(
133 path.clone(),
134 if recursive_mode {
135 RecursiveMode::Recursive
136 } else {
137 RecursiveMode::NonRecursive
138 },
139 )?;
140
141 if let NotifyBackend::Polling(_) = watcher {
142 trace!("watching '{}' took: {:?}", path, start.elapsed());
143 }
144 }
145
146 while let Some(res) = rx.recv().await {
147 match res {
148 Ok(event) => match event.kind {
149 EventKind::Modify(
150 ModifyKind::Data(_)
151 | ModifyKind::Metadata(_)
152 | ModifyKind::Name(RenameMode::Both),
153 )
154 | EventKind::Create(_)
155 | EventKind::Remove(_) => {
156 for path in event.paths {
157 self.send_event(tx.clone(), Some(&path), event.kind)?;
158 }
159 }
160 _ => {}
161 },
162 Err(e) => error!("failed to process notify event: {e}"),
163 }
164 }
165
166 Ok(())
167 }
168}