// autopulse_service/settings/targets/fileflows.rs
use super::RequestBuilderPerform;
use crate::settings::rewrite::Rewrite;
use crate::settings::targets::TargetProcess;
use anyhow::Context;
use autopulse_database::models::ScanEvent;
use autopulse_utils::{get_url, what_is, PathType};
use reqwest::header;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{collections::HashMap, path::Path};
use tracing::{debug, error, trace};

13#[derive(Deserialize, Clone)]
14pub struct FileFlows {
15 pub url: String,
17 pub rewrite: Option<Rewrite>,
19}
20
21#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
22#[doc(hidden)]
23#[serde(rename_all = "PascalCase")]
24struct FileFlowsFlow {
25 uid: String,
26}
27
28#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
29#[doc(hidden)]
30#[serde(rename_all = "PascalCase")]
31struct FileFlowsLibrary {
32 uid: String,
33 enabled: bool,
34 path: Option<String>,
35 flow: Option<FileFlowsFlow>,
36}
37
38#[derive(Serialize, Debug)]
46#[doc(hidden)]
47#[serde(rename_all = "PascalCase")]
48struct FileFlowsManuallyAddRequest {
49 flow_uid: String,
50 files: Vec<String>,
51 #[serde(default)]
52 custom_variables: HashMap<String, String>,
53}
54
55#[derive(Serialize)]
56#[doc(hidden)]
57#[serde(rename_all = "PascalCase")]
58struct FileFlowsSearchRequest {
59 path: String,
60 limit: u32, }
62
63#[derive(Serialize, Default, Debug)]
64#[doc(hidden)]
65#[serde(rename_all = "PascalCase")]
66struct FileFlowsReprocessRequest {
67 uids: Vec<String>,
68 custom_variables: HashMap<String, String>,
69 mode: u8,
70 flow: Option<Value>,
71 node: Option<Value>,
72 bottom_of_queue: bool,
73}
74
75#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
76#[doc(hidden)]
77#[serde(rename_all = "PascalCase")]
78struct FileFlowsLibraryFile {
79 uid: String,
80 flow_uid: String,
81 name: String, }
83
84impl FileFlows {
93 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
94 let headers = header::HeaderMap::new();
95
96 reqwest::Client::builder()
97 .timeout(std::time::Duration::from_secs(10))
98 .default_headers(headers)
99 .build()
100 .map_err(Into::into)
101 }
102
103 async fn get_libraries(&self) -> anyhow::Result<Vec<FileFlowsLibrary>> {
104 let client = self.get_client()?;
105 let url = get_url(&self.url)?.join("api/library")?;
106
107 let res = client.get(url).perform().await?;
108
109 let libraries: Vec<FileFlowsLibrary> = res.json().await?;
110
111 Ok(libraries)
112 }
113
114 async fn get_library_file(
115 &self,
116 ev: &ScanEvent,
117 ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
118 let client = self.get_client()?;
119
120 let url = get_url(&self.url)?.join("api/library-file/search")?;
121 let req = FileFlowsSearchRequest {
122 path: ev.get_path(&self.rewrite),
123 limit: 1,
124 };
125
126 let res = client.post(url).json(&req).perform().await?;
127
128 let files: Vec<FileFlowsLibraryFile> = res.json().await?;
129
130 Ok(files.first().cloned())
131 }
132
133 async fn reprocess_library_file(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
134 let client = self.get_client()?;
135
136 let url = get_url(&self.url)?.join("api/library-file/reprocess")?;
137
138 let req = FileFlowsReprocessRequest {
139 uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
140 ..Default::default()
141 };
142
143 client.post(url).json(&req).perform().await.map(|_| ())
144 }
145
146 async fn manually_add_files(
147 &self,
148 library: &FileFlowsLibrary,
149 files: Vec<&ScanEvent>,
150 ) -> anyhow::Result<()> {
151 let client = self.get_client()?;
152
153 let url = get_url(&self.url)?.join("api/library-file/manually-add")?;
154
155 let req = FileFlowsManuallyAddRequest {
156 flow_uid: library.flow.as_ref().unwrap().uid.clone(),
157 files: files.iter().map(|ev| ev.get_path(&self.rewrite)).collect(),
158 custom_variables: HashMap::new(),
159 };
160
161 client.post(url).json(&req).perform().await.map(|_| ())
162 }
163
164 }
201
202impl TargetProcess for FileFlows {
203 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
204 let mut succeeded = Vec::new();
205 let libraries = self
206 .get_libraries()
207 .await
208 .context("failed to get libraries")?;
209
210 let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
211
212 for library in libraries {
213 let files = evs
214 .iter()
215 .filter_map(|ev| {
216 let ev_path = ev.get_path(&self.rewrite);
217 let ev_path = Path::new(&ev_path);
218 let lib_path = Path::new(library.path.as_deref()?);
219
220 if ev_path.starts_with(lib_path) {
221 Some(*ev)
222 } else {
223 None
224 }
225 })
226 .collect::<Vec<_>>();
227
228 if files.is_empty() {
229 continue;
230 }
231
232 if !library.enabled {
233 error!(
234 "library '{}' is disabled but {} files will fail to scan",
235 library.uid,
236 files.len()
237 );
238 continue;
239 }
240
241 to_scan.insert(library, files);
242 }
243
244 for (library, evs) in to_scan {
245 let mut library_files = HashMap::new();
246
247 for ev in evs {
248 let what_is_path = what_is(ev.get_path(&self.rewrite));
249 if matches!(what_is_path, PathType::Directory) {
250 succeeded.push(ev.id.clone());
251 continue;
252 }
253
254 match self.get_library_file(ev).await {
255 Ok(file) => {
256 library_files.insert(ev, file);
257 }
258 Err(e) => {
259 error!("failed to get library file: {}", e);
260 library_files.insert(ev, None);
261 }
262 }
263 }
264
265 let (processed, not_processed): (Vec<_>, Vec<_>) =
266 library_files.iter().partition(|(_, file)| file.is_some());
267
268 trace!(
269 "library {} has {} processed and {} not processed files",
270 library.uid,
271 processed.len(),
272 not_processed.len()
273 );
274
275 if !processed.is_empty() {
276 match self
277 .reprocess_library_file(
278 processed
279 .iter()
280 .filter_map(|(_, file)| file.as_ref())
281 .collect(),
282 )
283 .await
284 {
285 Ok(()) => {
286 for (ev, _) in &processed {
287 debug!("reprocessed file: {}", ev.get_path(&self.rewrite));
288 }
289 succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
290 }
291 Err(e) => error!("failed to reprocess files: {}", e),
292 }
293 }
294
295 if !not_processed.is_empty() {
296 match self
297 .manually_add_files(
298 &library,
299 not_processed.iter().map(|(ev, _)| **ev).collect(),
300 )
301 .await
302 {
303 Ok(()) => {
304 for (ev, _) in ¬_processed {
305 debug!("manually added file: {}", ev.get_path(&self.rewrite));
306 }
307 succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
308 }
309 Err(e) => error!("failed to manually add files: {}", e),
310 }
311 }
312 }
313
314 Ok(succeeded)
315 }
316}