// autopulse_service/settings/targets/fileflows.rs

1use super::RequestBuilderPerform;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::{get_url, what_is, PathType};
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::{collections::HashMap, path::Path};
11use tracing::{debug, error, trace};
12
13#[derive(Deserialize, Clone)]
14pub struct FileFlows {
15    /// URL to the `FileFlows` server
16    pub url: String,
17    /// Rewrite path for the file
18    pub rewrite: Option<Rewrite>,
19}
20
21#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
22#[doc(hidden)]
23#[serde(rename_all = "PascalCase")]
24struct FileFlowsFlow {
25    uid: String,
26}
27
28#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
29#[doc(hidden)]
30#[serde(rename_all = "PascalCase")]
31struct FileFlowsLibrary {
32    uid: String,
33    enabled: bool,
34    path: Option<String>,
35    flow: Option<FileFlowsFlow>,
36}
37
38// #[derive(Serialize)]
39// #[doc(hidden)]
40// #[serde(rename_all = "PascalCase")]
41// struct FileFlowsRescanLibraryRequest {
42//     uids: Vec<String>,
43// }
44
45#[derive(Serialize, Debug)]
46#[doc(hidden)]
47#[serde(rename_all = "PascalCase")]
48struct FileFlowsManuallyAddRequest {
49    flow_uid: String,
50    files: Vec<String>,
51    #[serde(default)]
52    custom_variables: HashMap<String, String>,
53}
54
55#[derive(Serialize)]
56#[doc(hidden)]
57#[serde(rename_all = "PascalCase")]
58struct FileFlowsSearchRequest {
59    path: String,
60    limit: u32, // set to 1
61}
62
63#[derive(Serialize, Default, Debug)]
64#[doc(hidden)]
65#[serde(rename_all = "PascalCase")]
66struct FileFlowsReprocessRequest {
67    uids: Vec<String>,
68    custom_variables: HashMap<String, String>,
69    mode: u8,
70    flow: Option<Value>,
71    node: Option<Value>,
72    bottom_of_queue: bool,
73}
74
75#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
76#[doc(hidden)]
77#[serde(rename_all = "PascalCase")]
78struct FileFlowsLibraryFile {
79    uid: String,
80    flow_uid: String,
81    name: String, // filename, maybe use output_path later..
82}
83
// How a file is "scanned" in FileFlows:
// 1. Fetch the libraries.
// 2. Group the files by the library that contains them; if that library is disabled,
//    log an error and skip its files.
// 3. Look up each file to find out whether FileFlows already has it (directories are skipped).
// 4. Files FileFlows already has are reprocessed in a single request per library.
// 5. The remaining files are sent in a manual-add request, again grouped by library.
91
92impl FileFlows {
93    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
94        let headers = header::HeaderMap::new();
95
96        reqwest::Client::builder()
97            .timeout(std::time::Duration::from_secs(10))
98            .default_headers(headers)
99            .build()
100            .map_err(Into::into)
101    }
102
103    async fn get_libraries(&self) -> anyhow::Result<Vec<FileFlowsLibrary>> {
104        let client = self.get_client()?;
105        let url = get_url(&self.url)?.join("api/library")?;
106
107        let res = client.get(url).perform().await?;
108
109        let libraries: Vec<FileFlowsLibrary> = res.json().await?;
110
111        Ok(libraries)
112    }
113
114    async fn get_library_file(
115        &self,
116        ev: &ScanEvent,
117    ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
118        let client = self.get_client()?;
119
120        let url = get_url(&self.url)?.join("api/library-file/search")?;
121        let req = FileFlowsSearchRequest {
122            path: ev.get_path(&self.rewrite),
123            limit: 1,
124        };
125
126        let res = client.post(url).json(&req).perform().await?;
127
128        let files: Vec<FileFlowsLibraryFile> = res.json().await?;
129
130        Ok(files.first().cloned())
131    }
132
133    async fn reprocess_library_file(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
134        let client = self.get_client()?;
135
136        let url = get_url(&self.url)?.join("api/library-file/reprocess")?;
137
138        let req = FileFlowsReprocessRequest {
139            uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
140            ..Default::default()
141        };
142
143        client.post(url).json(&req).perform().await.map(|_| ())
144    }
145
146    async fn manually_add_files(
147        &self,
148        library: &FileFlowsLibrary,
149        files: Vec<&ScanEvent>,
150    ) -> anyhow::Result<()> {
151        let client = self.get_client()?;
152
153        let url = get_url(&self.url)?.join("api/library-file/manually-add")?;
154
155        let req = FileFlowsManuallyAddRequest {
156            flow_uid: library.flow.as_ref().unwrap().uid.clone(),
157            files: files.iter().map(|ev| ev.get_path(&self.rewrite)).collect(),
158            custom_variables: HashMap::new(),
159        };
160
161        client.post(url).json(&req).perform().await.map(|_| ())
162    }
163
164    // async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
165    //     let client = self.get_client()?;
166
167    //     let url = get_url(&self.url)?.join("/api/library/rescan")?;
168
169    //     let req = FileFlowsRescanLibraryRequest {
170    //         uids: vec![libraries.uid.clone()],
171    //     };
172
173    //     let res = client.put(url.to_string()).json(&req).send().await?;
174
175    //     if res.status().is_success() {
176    //         Ok(())
177    //     } else {
178    //         let body = res.text().await?;
179    //         Err(anyhow::anyhow!("failed to send rescan: {}", body))
180    //     }
181    // }
182
183    // No longer in fileflows..
184    // async fn scan(&self, ev: &ScanEvent, library: &FileFlowsLibrary) -> anyhow::Result<()> {
185    //     let client = self.get_client()?;
186
187    //     let mut url = get_url(&self.url)?.join("/api/library-file/process-file")?;
188
189    //     url.query_pairs_mut().append_pair("filename", &ev.file_path);
190
191    //     let res = client.post(url.to_string()).send().await?;
192
193    //     if res.status().is_success() {
194    //         Ok(())
195    //     } else {
196    //         let body = res.text().await?;
197    //         Err(anyhow::anyhow!("failed to send scan: {}", body))
198    //     }
199    // }
200}
201
202impl TargetProcess for FileFlows {
203    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
204        let mut succeeded = Vec::new();
205        let libraries = self
206            .get_libraries()
207            .await
208            .context("failed to get libraries")?;
209
210        let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
211
212        for library in libraries {
213            let files = evs
214                .iter()
215                .filter_map(|ev| {
216                    let ev_path = ev.get_path(&self.rewrite);
217                    let ev_path = Path::new(&ev_path);
218                    let lib_path = Path::new(library.path.as_deref()?);
219
220                    if ev_path.starts_with(lib_path) {
221                        Some(*ev)
222                    } else {
223                        None
224                    }
225                })
226                .collect::<Vec<_>>();
227
228            if files.is_empty() {
229                continue;
230            }
231
232            if !library.enabled {
233                error!(
234                    "library '{}' is disabled but {} files will fail to scan",
235                    library.uid,
236                    files.len()
237                );
238                continue;
239            }
240
241            to_scan.insert(library, files);
242        }
243
244        for (library, evs) in to_scan {
245            let mut library_files = HashMap::new();
246
247            for ev in evs {
248                let what_is_path = what_is(ev.get_path(&self.rewrite));
249                if matches!(what_is_path, PathType::Directory) {
250                    succeeded.push(ev.id.clone());
251                    continue;
252                }
253
254                match self.get_library_file(ev).await {
255                    Ok(file) => {
256                        library_files.insert(ev, file);
257                    }
258                    Err(e) => {
259                        error!("failed to get library file: {}", e);
260                        library_files.insert(ev, None);
261                    }
262                }
263            }
264
265            let (processed, not_processed): (Vec<_>, Vec<_>) =
266                library_files.iter().partition(|(_, file)| file.is_some());
267
268            trace!(
269                "library {} has {} processed and {} not processed files",
270                library.uid,
271                processed.len(),
272                not_processed.len()
273            );
274
275            if !processed.is_empty() {
276                match self
277                    .reprocess_library_file(
278                        processed
279                            .iter()
280                            .filter_map(|(_, file)| file.as_ref())
281                            .collect(),
282                    )
283                    .await
284                {
285                    Ok(()) => {
286                        for (ev, _) in &processed {
287                            debug!("reprocessed file: {}", ev.get_path(&self.rewrite));
288                        }
289                        succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
290                    }
291                    Err(e) => error!("failed to reprocess files: {}", e),
292                }
293            }
294
295            if !not_processed.is_empty() {
296                match self
297                    .manually_add_files(
298                        &library,
299                        not_processed.iter().map(|(ev, _)| **ev).collect(),
300                    )
301                    .await
302                {
303                    Ok(()) => {
304                        for (ev, _) in &not_processed {
305                            debug!("manually added file: {}", ev.get_path(&self.rewrite));
306                        }
307                        succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
308                    }
309                    Err(e) => error!("failed to manually add files: {}", e),
310                }
311            }
312        }
313
314        Ok(succeeded)
315    }
316}