Skip to main content

autopulse_service/settings/targets/
fileflows.rs

1use super::{Request, RequestBuilderPerform};
2use crate::settings::path_filter::PathFilter;
3use crate::settings::rewrite::Rewrite;
4use crate::settings::targets::TargetProcess;
5use anyhow::Context;
6use autopulse_database::models::ScanEvent;
7use autopulse_utils::{get_url, what_is, PathType};
8use reqwest::header;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::{collections::HashMap, path::Path};
12use tracing::{debug, error, trace};
13
14#[derive(Serialize, Deserialize, Clone)]
15pub struct FileFlows {
16    /// URL to the `FileFlows` server
17    pub url: String,
18    /// Rewrite path for the file
19    pub rewrite: Option<Rewrite>,
20    /// Path filter matched against the target-rewritten path.
21    #[serde(default)]
22    pub filter: PathFilter,
23    /// HTTP request options
24    #[serde(default)]
25    pub request: Request,
26}
27
28#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
29#[doc(hidden)]
30#[serde(rename_all = "PascalCase")]
31struct FileFlowsFlow {
32    uid: String,
33}
34
35#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
36#[doc(hidden)]
37#[serde(rename_all = "PascalCase")]
38struct FileFlowsLibrary {
39    uid: String,
40    enabled: bool,
41    path: Option<String>,
42    flow: Option<FileFlowsFlow>,
43}
44
45// #[derive(Serialize)]
46// #[doc(hidden)]
47// #[serde(rename_all = "PascalCase")]
48// struct FileFlowsRescanLibraryRequest {
49//     uids: Vec<String>,
50// }
51
52#[derive(Serialize, Debug)]
53#[doc(hidden)]
54#[serde(rename_all = "PascalCase")]
55struct FileFlowsManuallyAddRequest {
56    flow_uid: String,
57    files: Vec<String>,
58    #[serde(default)]
59    custom_variables: HashMap<String, String>,
60}
61
62#[derive(Serialize)]
63#[doc(hidden)]
64#[serde(rename_all = "PascalCase")]
65struct FileFlowsSearchRequest {
66    path: String,
67    limit: u32, // set to 1
68}
69
70#[derive(Serialize, Default, Debug)]
71#[doc(hidden)]
72#[serde(rename_all = "PascalCase")]
73struct FileFlowsReprocessRequest {
74    uids: Vec<String>,
75    custom_variables: HashMap<String, String>,
76    mode: u8,
77    flow: Option<Value>,
78    node: Option<Value>,
79    bottom_of_queue: bool,
80}
81
82#[derive(Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
83#[doc(hidden)]
84#[serde(rename_all = "PascalCase")]
85struct FileFlowsLibraryFile {
86    uid: String,
87    flow_uid: String,
88    name: String, // filename, maybe use output_path later..
89}
90
// How a file is "scanned" in FileFlows:
// 1. Fetch the libraries.
// 2. Group the files by the library whose path contains them.
// 3. If a library is disabled, log an error and skip its files.
// 4. Look up each remaining file to see whether FileFlows already knows it.
// 5. Files that are already known are sent in one reprocess request per library.
// 6. The rest are sent in a manual-add request, again grouped by library.
98
99impl FileFlows {
100    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
101        self.request
102            .client_builder(header::HeaderMap::new())
103            .build()
104            .map_err(Into::into)
105    }
106
107    async fn get_libraries(&self) -> anyhow::Result<Vec<FileFlowsLibrary>> {
108        let client = self.get_client()?;
109        let url = get_url(&self.url)?.join("api/library")?;
110
111        let res = client.get(url).perform().await?;
112
113        let libraries: Vec<FileFlowsLibrary> = res.json().await?;
114
115        Ok(libraries)
116    }
117
118    async fn get_library_file(
119        &self,
120        ev: &ScanEvent,
121    ) -> anyhow::Result<Option<FileFlowsLibraryFile>> {
122        let client = self.get_client()?;
123
124        let url = get_url(&self.url)?.join("api/library-file/search")?;
125        let req = FileFlowsSearchRequest {
126            path: ev.get_path(&self.rewrite),
127            limit: 1,
128        };
129
130        let res = client.post(url).json(&req).perform().await?;
131
132        let files: Vec<FileFlowsLibraryFile> = res.json().await?;
133
134        Ok(files.first().cloned())
135    }
136
137    async fn reprocess_library_file(&self, evs: Vec<&FileFlowsLibraryFile>) -> anyhow::Result<()> {
138        let client = self.get_client()?;
139
140        let url = get_url(&self.url)?.join("api/library-file/reprocess")?;
141
142        let req = FileFlowsReprocessRequest {
143            uids: evs.iter().map(|ev| ev.uid.clone()).collect(),
144            ..Default::default()
145        };
146
147        client.post(url).json(&req).perform().await.map(|_| ())
148    }
149
150    async fn manually_add_files(
151        &self,
152        library: &FileFlowsLibrary,
153        files: Vec<&ScanEvent>,
154    ) -> anyhow::Result<()> {
155        let client = self.get_client()?;
156
157        let url = get_url(&self.url)?.join("api/library-file/manually-add")?;
158
159        let flow = library
160            .flow
161            .as_ref()
162            .context("library has no flow configured")?;
163
164        let req = FileFlowsManuallyAddRequest {
165            flow_uid: flow.uid.clone(),
166            files: files.iter().map(|ev| ev.get_path(&self.rewrite)).collect(),
167            custom_variables: HashMap::new(),
168        };
169
170        client.post(url).json(&req).perform().await.map(|_| ())
171    }
172
173    // async fn rescan_library(&self, libraries: &FileFlowsLibrary) -> anyhow::Result<()> {
174    //     let client = self.get_client()?;
175
176    //     let url = get_url(&self.url)?.join("/api/library/rescan")?;
177
178    //     let req = FileFlowsRescanLibraryRequest {
179    //         uids: vec![libraries.uid.clone()],
180    //     };
181
182    //     let res = client.put(url.to_string()).json(&req).send().await?;
183
184    //     if res.status().is_success() {
185    //         Ok(())
186    //     } else {
187    //         let body = res.text().await?;
188    //         Err(anyhow::anyhow!("failed to send rescan: {}", body))
189    //     }
190    // }
191
192    // No longer in fileflows..
193    // async fn scan(&self, ev: &ScanEvent, library: &FileFlowsLibrary) -> anyhow::Result<()> {
194    //     let client = self.get_client()?;
195
196    //     let mut url = get_url(&self.url)?.join("/api/library-file/process-file")?;
197
198    //     url.query_pairs_mut().append_pair("filename", &ev.file_path);
199
200    //     let res = client.post(url.to_string()).send().await?;
201
202    //     if res.status().is_success() {
203    //         Ok(())
204    //     } else {
205    //         let body = res.text().await?;
206    //         Err(anyhow::anyhow!("failed to send scan: {}", body))
207    //     }
208    // }
209}
210
211impl TargetProcess for FileFlows {
212    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
213        let mut succeeded = Vec::new();
214        let libraries = self
215            .get_libraries()
216            .await
217            .context("failed to get libraries")?;
218
219        let mut to_scan: HashMap<FileFlowsLibrary, Vec<&ScanEvent>> = HashMap::new();
220
221        for library in libraries {
222            let files = evs
223                .iter()
224                .filter_map(|ev| {
225                    let ev_path = ev.get_path(&self.rewrite);
226                    let ev_path = Path::new(&ev_path);
227                    let lib_path = Path::new(library.path.as_deref()?);
228
229                    if ev_path.starts_with(lib_path) {
230                        Some(*ev)
231                    } else {
232                        None
233                    }
234                })
235                .collect::<Vec<_>>();
236
237            if files.is_empty() {
238                continue;
239            }
240
241            if !library.enabled {
242                error!(
243                    "library '{}' is disabled but {} files will fail to scan",
244                    library.uid,
245                    files.len()
246                );
247                continue;
248            }
249
250            to_scan.insert(library, files);
251        }
252
253        for (library, evs) in to_scan {
254            let mut library_files = HashMap::new();
255
256            for ev in evs {
257                let what_is_path = what_is(ev.get_path(&self.rewrite));
258                if matches!(what_is_path, PathType::Directory) {
259                    succeeded.push(ev.id.clone());
260                    continue;
261                }
262
263                match self.get_library_file(ev).await {
264                    Ok(file) => {
265                        library_files.insert(ev, file);
266                    }
267                    Err(e) => {
268                        error!("failed to get library file: {}", e);
269                        library_files.insert(ev, None);
270                    }
271                }
272            }
273
274            let (processed, not_processed): (Vec<_>, Vec<_>) =
275                library_files.iter().partition(|(_, file)| file.is_some());
276
277            trace!(
278                "library {} has {} processed and {} not processed files",
279                library.uid,
280                processed.len(),
281                not_processed.len()
282            );
283
284            if !processed.is_empty() {
285                match self
286                    .reprocess_library_file(
287                        processed
288                            .iter()
289                            .filter_map(|(_, file)| file.as_ref())
290                            .collect(),
291                    )
292                    .await
293                {
294                    Ok(()) => {
295                        for (ev, _) in &processed {
296                            debug!("reprocessed file: {}", ev.get_path(&self.rewrite));
297                        }
298                        succeeded.extend(processed.iter().map(|(ev, _)| ev.id.clone()));
299                    }
300                    Err(e) => error!("failed to reprocess files: {}", e),
301                }
302            }
303
304            if !not_processed.is_empty() {
305                match self
306                    .manually_add_files(
307                        &library,
308                        not_processed.iter().map(|(ev, _)| **ev).collect(),
309                    )
310                    .await
311                {
312                    Ok(()) => {
313                        for (ev, _) in &not_processed {
314                            debug!("manually added file: {}", ev.get_path(&self.rewrite));
315                        }
316                        succeeded.extend(not_processed.iter().map(|(ev, _)| ev.id.clone()));
317                    }
318                    Err(e) => error!("failed to manually add files: {}", e),
319                }
320            }
321        }
322
323        Ok(succeeded)
324    }
325}