autopulse_service/settings/triggers/
notify.rs

1use crate::settings::rewrite::Rewrite;
2use crate::settings::timer::Timer;
3use autopulse_utils::regex::Regex;
4use notify::{
5    event::{ModifyKind, RenameMode},
6    Config, Event, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
7};
8use serde::{Deserialize, Serialize};
9use std::{path::PathBuf, time::Duration};
10use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
11use tracing::{error, trace};
12
13#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
14pub enum NotifyBackendType {
15    #[serde(rename = "recommended")]
16    /// Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
17    Recommended,
18    #[serde(rename = "polling")]
19    /// Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
20    Polling,
21}
22
23#[doc(hidden)]
24pub enum NotifyBackend {
25    Recommended(RecommendedWatcher),
26    Polling(PollWatcher),
27}
28
29impl NotifyBackend {
30    pub fn watch(&mut self, path: String, mode: RecursiveMode) -> anyhow::Result<()> {
31        match self {
32            Self::Recommended(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
33            Self::Polling(watcher) => watcher.watch(path.as_ref(), mode).map_err(Into::into),
34        }
35    }
36}
37
38impl Default for NotifyBackendType {
39    fn default() -> Self {
40        Self::Recommended
41    }
42}
43
44#[derive(Serialize, Deserialize, Clone)]
45pub struct Notify {
46    /// Paths to monitor
47    pub paths: Vec<String>,
48    /// Rewrite path
49    pub rewrite: Option<Rewrite>,
50    /// Recursive monitoring (default: true)
51    pub recursive: Option<bool>,
52    /// Backend to use
53    /// - `recommended`: Uses the recommended backend such as `inotify` on Linux, `FSEvents` on macOS, and `ReadDirectoryChangesW` on Windows
54    /// - `polling`: Uses a polling backend (useful for rclone/nfs/etc mounts), which will be extremely inefficient with a high number of files
55    #[serde(default)]
56    pub backend: NotifyBackendType,
57    /// Filter by regex
58    pub filters: Option<Vec<String>>,
59
60    /// Targets to exclude
61    #[serde(default)]
62    pub excludes: Vec<String>,
63    /// Timer
64    pub timer: Option<Timer>,
65}
66
67impl Notify {
68    pub fn send_event(
69        &self,
70        tx: UnboundedSender<(String, EventKind)>,
71        path: Option<&PathBuf>,
72        reason: EventKind,
73    ) -> anyhow::Result<()> {
74        if path.is_none() {
75            return Ok(());
76        }
77
78        let mut path = path.unwrap().to_string_lossy().to_string();
79
80        if let Some(ref filters) = self.filters {
81            let mut matched = false;
82
83            for regex in filters {
84                if Regex::new(regex)?.is_match(&path) {
85                    matched = true;
86                    break;
87                }
88            }
89
90            if !matched {
91                return Ok(());
92            }
93        }
94
95        if let Some(rewrite) = &self.rewrite {
96            path = rewrite.rewrite_path(path);
97        }
98
99        tx.send((path, reason)).map_err(|e| anyhow::anyhow!(e))
100    }
101
102    pub fn async_watcher(
103        &self,
104    ) -> anyhow::Result<(NotifyBackend, UnboundedReceiver<notify::Result<Event>>)> {
105        let (tx, rx) = unbounded_channel();
106
107        let event_handler = move |res| {
108            if let Err(e) = tx.send(res) {
109                error!("failed to process notify event: {e}");
110            }
111        };
112
113        if self.backend == NotifyBackendType::Recommended {
114            let watcher = RecommendedWatcher::new(event_handler, Config::default())?;
115
116            Ok((NotifyBackend::Recommended(watcher), rx))
117        } else {
118            let watcher = PollWatcher::new(
119                event_handler,
120                Config::default().with_poll_interval(Duration::from_secs(10)),
121            )?;
122
123            // watcher.poll()?;
124
125            Ok((NotifyBackend::Polling(watcher), rx))
126        }
127    }
128
129    pub async fn watcher(&self, tx: UnboundedSender<(String, EventKind)>) -> anyhow::Result<()> {
130        let (mut watcher, mut rx) = self.async_watcher()?;
131
132        for path in &self.paths {
133            let start = std::time::Instant::now();
134
135            let recursive_mode = self.recursive.unwrap_or(true);
136
137            watcher.watch(
138                path.clone(),
139                if recursive_mode {
140                    RecursiveMode::Recursive
141                } else {
142                    RecursiveMode::NonRecursive
143                },
144            )?;
145
146            if let NotifyBackend::Polling(_) = watcher {
147                trace!("watching '{}' took: {:?}", path, start.elapsed());
148            }
149        }
150
151        while let Some(res) = rx.recv().await {
152            match res {
153                Ok(event) => match event.kind {
154                    EventKind::Modify(
155                        ModifyKind::Data(_)
156                        | ModifyKind::Metadata(_)
157                        | ModifyKind::Name(RenameMode::Both),
158                    )
159                    | EventKind::Create(_)
160                    | EventKind::Remove(_) => {
161                        for path in event.paths {
162                            self.send_event(tx.clone(), Some(&path), event.kind)?;
163                        }
164                    }
165                    _ => {}
166                },
167                Err(e) => error!("failed to process notify event: {e}"),
168            }
169        }
170
171        Ok(())
172    }
173}