autopulse_service/settings/triggers/
notify.rs

1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5    event::{ModifyKind, RenameMode},
6    Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::Deserialize;
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Deserialize, Clone, Eq, PartialEq, Debug)]
14pub enum NotifyBackendType {
15    #[serde(rename = "recommended")]
16    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
17    Recommended,
18    #[serde(rename = "polling")]
19    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
20    Polling,
21}
22
23#[doc(hidden)]
24pub enum NotifyBackend {
25    Recommended(RecommendedWatcher),
26    Polling(PollWatcher),
27}
28
29impl NotifyBackend {
30    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
31        match self {
32            Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
33            Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34        }
35    }
36}
37
38impl Default for NotifyBackendType {
39    fn default() -> Self {
40        Self::Recommended
41    }
42}
43
44#[derive(Deserialize, Clone)]
45pub struct Notify {
46    /// Paths to monitor
47    pub paths: Vec<String>,
48    /// Rewrite path
49    pub rewrite: Option<Rewrite>,
50    /// Recursive monitoring (default: true)
51    pub recursive: Option<bool>,
52    /// Backend to use
53    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
54    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
55    #[serde(default)]
56    pub backend: NotifyBackendType,
57    /// Filter by regex
58    pub filters: Option<Vec<String>>,
59
60    /// Targets to exclude
61    #[serde(default)]
62    pub excludes: Vec<String>,
63    /// Timer
64    #[serde(default)]
65    pub timer: Timer,
66}
67
68impl Notify {
69    pub fn send_event(
70        &self,
71        tx: UnboundedSender<(String, EventKind)>,
72        path: Option<&PathBuf>,
73        reason: EventKind,
74    ) -> anyhow::Result<()> {
75        if path.is_none() {
76            return Ok(());
77        }
78
79        let mut path = path.unwrap().to_string_lossy().to_string();
80
81        if let Some(ref filters) = self.filters {
82            let mut matched = false;
83
84            for regex in filters {
85                if Regex::new(regex)?.is_match(&path) {
86                    matched = true;
87                    break;
88                }
89            }
90
91            if !matched {
92                return Ok(());
93            }
94        }
95
96        if let Some(rewrite) = &self.rewrite {
97            path = rewrite.rewrite_path(path);
98        }
99
100        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
101    }
102
103    pub fn async_watcher(
104        &self,
105    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
106        let (tx, rx) = unbounded_channel();
107
108        let event_handler = move |res| {
109            if let Err(e) = tx.send(res) {
110                error!("failed to process notify event: {e}");
111            }
112        };
113
114        if self.backend == NotifyBackendType::Recommended {
115            let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
116
117            Ok((NotifyBackend::Recommended(watcher), rx))
118        } else {
119            let watcher = PollWatcher::new(
120                event_handler,
121                Config::default().with_poll_interval(Duration::from_secs(10)),
122            )?;
123
124            // watcher.poll()?;
125
126            Ok((NotifyBackend::Polling(watcher), rx))
127        }
128    }
129
130    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
131        let (mut watcher, mut rx) = self.async_watcher()?;
132
133        for path in &self.paths {
134            let start = std::time::Instant::now();
135
136            let recursive_mode = self.recursive.unwrap_or(true);
137
138            watcher.watch(
139                path.clone(),
140                if recursive_mode {
141                    RecursiveMode::Recursive
142                } else {
143                    RecursiveMode::NonRecursive
144                },
145            )?;
146
147            if let NotifyBackend::Polling(_) = watcher {
148                trace!("watching '{}' took: {:?}", path, start.elapsed());
149            }
150        }
151
152        while let Some(res) = rx.recv().await {
153            match res {
154                Ok(event) => match event.kind {
155                    EventKind::Modify(
156                        ModifyKind::Data(_)
157                        | ModifyKind::Metadata(_)
158                        | ModifyKind::Name(RenameMode::Both),
159                    )
160                    | EventKind::Create(_)
161                    | EventKind::Remove(_) => {
162                        for path in event.paths {
163                            self.send_event(tx.clone(), Some(&path), event.kind)?;
164                        }
165                    }
166                    _ => {}
167                },
168                Err(e) => error!("failed to process notify event: {e}"),
169            }
170        }
171
172        Ok(())
173    }
174}