// autopulse_service/settings/targets/fileflows.rs

1use super::{Request, RequestBuilderPerform};
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::{get_url, what_is, PathType};
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::{collections::HashMap, path::Path};
11use tracing::{debug, error, trace};
12
13#[derive(Serialize, Deserialize, Clone)]
14pub struct FileFlows {
15    /// URL to the `FileFlows` server
16    pub url: String,
17    /// Rewrite path for the file
18    pub rewrite: Option<Rewrite>,
19    /// HTTP request options
20    #[serde(default)]
21    pub request: Request,
22}
23
24#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
25#[doc(hidden)]
26#[serde(rename_all = "PascalCase")]
27struct FileFlowsFlow {
28    uid: String,
29}
30
31#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
32#[doc(hidden)]
33#[serde(rename_all = "PascalCase")]
34struct FileFlowsLibrary {
35    uid: String,
36    enabled: bool,
37    path: Option<String>,
38    flow: Option<FileFlowsFlow>,
39}
40
41// #[derive(Serialize)]
42// #[doc(hidden)]
43// #[serde(rename_all = "PascalCase")]
44// struct FileFlowsRescanLibraryRequest {
45//     uids: Vec<String>,
46// }
47
48#[derive(Serialize, Debug)]
49#[doc(hidden)]
50#[serde(rename_all = "PascalCase")]
51struct FileFlowsManuallyAddRequest {
52    flow_uid: String,
53    files: Vec<String>,
54    #[serde(default)]
55    custom_variables: HashMap<String, String>,
56}
57
58#[derive(Serialize)]
59#[doc(hidden)]
60#[serde(rename_all = "PascalCase")]
61struct FileFlowsSearchRequest {
62    path: String,
63    limit: u32, // set to 1
64}
65
66#[derive(Serialize, Default, Debug)]
67#[doc(hidden)]
68#[serde(rename_all = "PascalCase")]
69struct FileFlowsReprocessRequest {
70    uids: Vec<String>,
71    custom_variables: HashMap<String, String>,
72    mode: u8,
73    flow: Option<Value>,
74    node: Option<Value>,
75    bottom_of_queue: bool,
76}
77
78#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
79#[doc(hidden)]
80#[serde(rename_all = "PascalCase")]
81struct FileFlowsLibraryFile {
82    uid: String,
83    flow_uid: String,
84    name: String, // filename, maybe use output_path later..
85}
86
// How to "scan" a file in FileFlows:
// 1. Get the libraries.
// 2. Group each file with a library whose path contains it.
// 3. If that library is disabled, log an error and skip its files.
// 4. Look up each file (directories are skipped and count as succeeded) to see whether
//    FileFlows already has it, i.e. whether it was processed before.
// 5. Send the already-processed files in one reprocess request per library.
// 6. Send the rest in one manual-add request per library.
94
95impl FileFlows {
96    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
97        self.request
98            .client_builder(header::HeaderMap::new())
99            .build()
100            .map_err(Into::into)
101    }
102
103    async fn get_libraries(&self) -> anyhow::Result<Vec<FileFlowsLibrary>> {
104        let client = self.get_client()?;
105        let url = get_url(&self.url)?.join("api/library")?;
106
107        let res = client.get(url).perform().await?;
108
109        let libraries: Vec<FileFlowsLibrary> = res.json().await?;
110
111        Ok(libraries)
112    }
113
114    async fn get_library_file(
115        &self,
116        ev: &ScanEvent,
117    ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
118        let client = self.get_client()?;
119
120        let url = get_url(&self.url)?.join("api/library-file/search")?;
121        let req = FileFlowsSearchRequest {
122            path: ev.get_path(&self.rewrite),
123            limit: 1,
124        };
125
126        let res = client.post(url).json(&req).perform().await?;
127
128        let files: Vec<FileFlowsLibraryFile> = res.json().await?;
129
130        Ok(files.first().cloned())
131    }
132
133    async fn reprocess_library_file(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
134        let client = self.get_client()?;
135
136        let url = get_url(&self.url)?.join("api/library-file/reprocess")?;
137
138        let req = FileFlowsReprocessRequest {
139            uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
140            ..Default::default()
141        };
142
143        client.post(url).json(&req).perform().await.map(|_| ())
144    }
145
146    async fn manually_add_files(
147        &self,
148        library: &FileFlowsLibrary,
149        files: Vec<&ScanEvent>,
150    ) -> anyhow::Result<()> {
151        let client = self.get_client()?;
152
153        let url = get_url(&self.url)?.join("api/library-file/manually-add")?;
154
155        let req = FileFlowsManuallyAddRequest {
156            flow_uid: library.flow.as_ref().unwrap().uid.clone(),
157            files: files.iter().map(|ev| ev.get_path(&self.rewrite)).collect(),
158            custom_variables: HashMap::new(),
159        };
160
161        client.post(url).json(&req).perform().await.map(|_| ())
162    }
163
164    // async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
165    //     let client = self.get_client()?;
166
167    //     let url = get_url(&self.url)?.join("/api/library/rescan")?;
168
169    //     let req = FileFlowsRescanLibraryRequest {
170    //         uids: vec![libraries.uid.clone()],
171    //     };
172
173    //     let res = client.put(url.to_string()).json(&req).send().await?;
174
175    //     if res.status().is_success() {
176    //         Ok(())
177    //     } else {
178    //         let body = res.text().await?;
179    //         Err(anyhow::anyhow!("failed to send rescan: {}", body))
180    //     }
181    // }
182
183    // No longer in fileflows..
184    // async fn scan(&self, ev: &ScanEvent, library: &FileFlowsLibrary) -> anyhow::Result<()> {
185    //     let client = self.get_client()?;
186
187    //     let mut url = get_url(&self.url)?.join("/api/library-file/process-file")?;
188
189    //     url.query_pairs_mut().append_pair("filename", &ev.file_path);
190
191    //     let res = client.post(url.to_string()).send().await?;
192
193    //     if res.status().is_success() {
194    //         Ok(())
195    //     } else {
196    //         let body = res.text().await?;
197    //         Err(anyhow::anyhow!("failed to send scan: {}", body))
198    //     }
199    // }
200}
201
202impl TargetProcess for FileFlows {
203    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
204        let mut succeeded = Vec::new();
205        let libraries = self
206            .get_libraries()
207            .await
208            .context("failed to get libraries")?;
209
210        let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
211
212        for library in libraries {
213            let files = evs
214                .iter()
215                .filter_map(|ev| {
216                    let ev_path = ev.get_path(&self.rewrite);
217                    let ev_path = Path::new(&ev_path);
218                    let lib_path = Path::new(library.path.as_deref()?);
219
220                    if ev_path.starts_with(lib_path) {
221                        Some(*ev)
222                    } else {
223                        None
224                    }
225                })
226                .collect::<Vec<_>>();
227
228            if files.is_empty() {
229                continue;
230            }
231
232            if !library.enabled {
233                error!(
234                    "library '{}' is disabled but {} files will fail to scan",
235                    library.uid,
236                    files.len()
237                );
238                continue;
239            }
240
241            to_scan.insert(library, files);
242        }
243
244        for (library, evs) in to_scan {
245            let mut library_files = HashMap::new();
246
247            for ev in evs {
248                let what_is_path = what_is(ev.get_path(&self.rewrite));
249                if matches!(what_is_path, PathType::Directory) {
250                    succeeded.push(ev.id.clone());
251                    continue;
252                }
253
254                match self.get_library_file(ev).await {
255                    Ok(file) => {
256                        library_files.insert(ev, file);
257                    }
258                    Err(e) => {
259                        error!("failed to get library file: {}", e);
260                        library_files.insert(ev, None);
261                    }
262                }
263            }
264
265            let (processed, not_processed): (Vec<_>, Vec<_>) =
266                library_files.iter().partition(|(_, file)| file.is_some());
267
268            trace!(
269                "library {} has {} processed and {} not processed files",
270                library.uid,
271                processed.len(),
272                not_processed.len()
273            );
274
275            if !processed.is_empty() {
276                match self
277                    .reprocess_library_file(
278                        processed
279                            .iter()
280                            .filter_map(|(_, file)| file.as_ref())
281                            .collect(),
282                    )
283                    .await
284                {
285                    Ok(()) => {
286                        for (ev, _) in &processed {
287                            debug!("reprocessed file: {}", ev.get_path(&self.rewrite));
288                        }
289                        succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
290                    }
291                    Err(e) => error!("failed to reprocess files: {}", e),
292                }
293            }
294
295            if !not_processed.is_empty() {
296                match self
297                    .manually_add_files(
298                        &library,
299                        not_processed.iter().map(|(ev, _)| **ev).collect(),
300                    )
301                    .await
302                {
303                    Ok(()) => {
304                        for (ev, _) in &not_processed {
305                            debug!("manually added file: {}", ev.get_path(&self.rewrite));
306                        }
307                        succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
308                    }
309                    Err(e) => error!("failed to manually add files: {}", e),
310                }
311            }
312        }
313
314        Ok(succeeded)
315    }
316}