autopulse_service/settings/targets/
fileflows.rs1use super::{Request, RequestBuilderPerform};
2use crate::settings::path_filter::PathFilter;
3use crate::settings::rewrite::Rewrite;
4use crate::settings::targets::TargetProcess;
5use anyhow::Context;
6use autopulse_database::models::ScanEvent;
7use autopulse_utils::{get_url, what_is, PathType};
8use reqwest::header;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::{collections::HashMap, path::Path};
12use tracing::{debug, error, trace};
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct FileFlows {
16 pub url: String,
18 pub rewrite: Option<Rewrite>,
20 #[serde(default)]
22 pub filter: PathFilter,
23 #[serde(default)]
25 pub request: Request,
26}
27
28#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
29#[doc(hidden)]
30#[serde(rename_all = "PascalCase")]
31struct FileFlowsFlow {
32 uid: String,
33}
34
35#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
36#[doc(hidden)]
37#[serde(rename_all = "PascalCase")]
38struct FileFlowsLibrary {
39 uid: String,
40 enabled: bool,
41 path: Option<String>,
42 flow: Option<FileFlowsFlow>,
43}
44
45#[derive(Serialize, Debug)]
53#[doc(hidden)]
54#[serde(rename_all = "PascalCase")]
55struct FileFlowsManuallyAddRequest {
56 flow_uid: String,
57 files: Vec<String>,
58 #[serde(default)]
59 custom_variables: HashMap<String, String>,
60}
61
62#[derive(Serialize)]
63#[doc(hidden)]
64#[serde(rename_all = "PascalCase")]
65struct FileFlowsSearchRequest {
66 path: String,
67 limit: u32, }
69
70#[derive(Serialize, Default, Debug)]
71#[doc(hidden)]
72#[serde(rename_all = "PascalCase")]
73struct FileFlowsReprocessRequest {
74 uids: Vec<String>,
75 custom_variables: HashMap<String, String>,
76 mode: u8,
77 flow: Option<Value>,
78 node: Option<Value>,
79 bottom_of_queue: bool,
80}
81
82#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
83#[doc(hidden)]
84#[serde(rename_all = "PascalCase")]
85struct FileFlowsLibraryFile {
86 uid: String,
87 flow_uid: String,
88 name: String, }
90
91impl FileFlows {
100 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
101 self.request
102 .client_builder(header::HeaderMap::new())
103 .build()
104 .map_err(Into::into)
105 }
106
107 async fn get_libraries(&self) -> anyhow::Result<Vec<FileFlowsLibrary>> {
108 let client = self.get_client()?;
109 let url = get_url(&self.url)?.join("api/library")?;
110
111 let res = client.get(url).perform().await?;
112
113 let libraries: Vec<FileFlowsLibrary> = res.json().await?;
114
115 Ok(libraries)
116 }
117
118 async fn get_library_file(
119 &self,
120 ev: &ScanEvent,
121 ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
122 let client = self.get_client()?;
123
124 let url = get_url(&self.url)?.join("api/library-file/search")?;
125 let req = FileFlowsSearchRequest {
126 path: ev.get_path(&self.rewrite),
127 limit: 1,
128 };
129
130 let res = client.post(url).json(&req).perform().await?;
131
132 let files: Vec<FileFlowsLibraryFile> = res.json().await?;
133
134 Ok(files.first().cloned())
135 }
136
137 async fn reprocess_library_file(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
138 let client = self.get_client()?;
139
140 let url = get_url(&self.url)?.join("api/library-file/reprocess")?;
141
142 let req = FileFlowsReprocessRequest {
143 uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
144 ..Default::default()
145 };
146
147 client.post(url).json(&req).perform().await.map(|_| ())
148 }
149
150 async fn manually_add_files(
151 &self,
152 library: &FileFlowsLibrary,
153 files: Vec<&ScanEvent>,
154 ) -> anyhow::Result<()> {
155 let client = self.get_client()?;
156
157 let url = get_url(&self.url)?.join("api/library-file/manually-add")?;
158
159 let flow = library
160 .flow
161 .as_ref()
162 .context("library has no flow configured")?;
163
164 let req = FileFlowsManuallyAddRequest {
165 flow_uid: flow.uid.clone(),
166 files: files.iter().map(|ev| ev.get_path(&self.rewrite)).collect(),
167 custom_variables: HashMap::new(),
168 };
169
170 client.post(url).json(&req).perform().await.map(|_| ())
171 }
172
173 }
210
211impl TargetProcess for FileFlows {
212 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
213 let mut succeeded = Vec::new();
214 let libraries = self
215 .get_libraries()
216 .await
217 .context("failed to get libraries")?;
218
219 let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
220
221 for library in libraries {
222 let files = evs
223 .iter()
224 .filter_map(|ev| {
225 let ev_path = ev.get_path(&self.rewrite);
226 let ev_path = Path::new(&ev_path);
227 let lib_path = Path::new(library.path.as_deref()?);
228
229 if ev_path.starts_with(lib_path) {
230 Some(*ev)
231 } else {
232 None
233 }
234 })
235 .collect::<Vec<_>>();
236
237 if files.is_empty() {
238 continue;
239 }
240
241 if !library.enabled {
242 error!(
243 "library '{}' is disabled but {} files will fail to scan",
244 library.uid,
245 files.len()
246 );
247 continue;
248 }
249
250 to_scan.insert(library, files);
251 }
252
253 for (library, evs) in to_scan {
254 let mut library_files = HashMap::new();
255
256 for ev in evs {
257 let what_is_path = what_is(ev.get_path(&self.rewrite));
258 if matches!(what_is_path, PathType::Directory) {
259 succeeded.push(ev.id.clone());
260 continue;
261 }
262
263 match self.get_library_file(ev).await {
264 Ok(file) => {
265 library_files.insert(ev, file);
266 }
267 Err(e) => {
268 error!("failed to get library file: {}", e);
269 library_files.insert(ev, None);
270 }
271 }
272 }
273
274 let (processed, not_processed): (Vec<_>, Vec<_>) =
275 library_files.iter().partition(|(_, file)| file.is_some());
276
277 trace!(
278 "library {} has {} processed and {} not processed files",
279 library.uid,
280 processed.len(),
281 not_processed.len()
282 );
283
284 if !processed.is_empty() {
285 match self
286 .reprocess_library_file(
287 processed
288 .iter()
289 .filter_map(|(_, file)| file.as_ref())
290 .collect(),
291 )
292 .await
293 {
294 Ok(()) => {
295 for (ev, _) in &processed {
296 debug!("reprocessed file: {}", ev.get_path(&self.rewrite));
297 }
298 succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
299 }
300 Err(e) => error!("failed to reprocess files: {}", e),
301 }
302 }
303
304 if !not_processed.is_empty() {
305 match self
306 .manually_add_files(
307 &library,
308 not_processed.iter().map(|(ev, _)| **ev).collect(),
309 )
310 .await
311 {
312 Ok(()) => {
313 for (ev, _) in ¬_processed {
314 debug!("manually added file: {}", ev.get_path(&self.rewrite));
315 }
316 succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
317 }
318 Err(e) => error!("failed to manually add files: {}", e),
319 }
320 }
321 }
322
323 Ok(succeeded)
324 }
325}