autopulse_service/settings/targets/
emby.rs

1use super::{Request, RequestBuilderPerform};
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11    json_path,
12    reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
17#[doc(hidden)]
/// Serde default helper so `refresh_metadata` defaults to `true`
/// when omitted from the configuration.
#[doc(hidden)]
const fn default_true() -> bool {
    true
}
21
/// Target configuration for a Jellyfin/Emby server.
///
/// Depending on `refresh_metadata`, processed events either trigger a
/// per-item metadata refresh (`Items/{id}/Refresh`) or a plain library
/// scan (`Library/Media/Updated`).
#[derive(Serialize, Clone, Deserialize)]
pub struct Emby {
    /// URL to the Jellyfin/Emby server
    pub url: String,
    /// API token for the Jellyfin/Emby server
    pub token: String,
    /// Metadata refresh mode (default: `FullRefresh`)
    #[serde(default)]
    pub metadata_refresh_mode: EmbyMetadataRefreshMode,
    /// Whether to try to refresh metadata for the item instead of scan (default: true)
    #[serde(default = "default_true")]
    pub refresh_metadata: bool,
    /// Rewrite path for the file
    pub rewrite: Option<Rewrite>,
    /// HTTP request options
    #[serde(default)]
    pub request: Request,
}
40
41/// Metadata refresh mode for Jellyfin/Emby
42#[derive(Serialize, Clone, Deserialize)]
43#[serde(rename_all = "snake_case")]
44#[derive(Default)]
45pub enum EmbyMetadataRefreshMode {
46    /// `none`
47    None,
48    /// `validation_only`
49    ValidationOnly,
50    /// `default`
51    Default,
52    /// `full_refresh`
53    #[default]
54    FullRefresh,
55}
56
57impl Display for EmbyMetadataRefreshMode {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        let mode = match self {
60            Self::None => "None",
61            Self::ValidationOnly => "ValidationOnly",
62            Self::Default => "Default",
63            Self::FullRefresh => "FullRefresh",
64        };
65
66        write!(f, "{mode}")
67    }
68}
69
/// Subset of the `Library/VirtualFolders` response consumed by this target.
#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "PascalCase")]
#[doc(hidden)]
struct Library {
    // Display name; only read when building error context messages.
    #[allow(dead_code)]
    name: String,
    // Filesystem roots configured for this library; events are matched
    // against these by path prefix.
    locations: Vec<String>,
    // Server-side id, passed as `ParentId` when querying `/Items`.
    item_id: String,
    // Collection type reported by the server (e.g. "tvshows", "music").
    collection_type: Option<String>,
}
80
/// One entry of the `Library/Media/Updated` payload.
#[derive(Serialize, Clone)]
#[serde(rename_all = "PascalCase")]
#[doc(hidden)]
struct UpdateRequest {
    // Media path (after any rewrite) the server should re-check.
    path: String,
    // Change kind; this target always sends "Modified".
    update_type: String,
}
88
/// Request body for `POST Library/Media/Updated`.
#[derive(Serialize, Clone)]
#[serde(rename_all = "PascalCase")]
#[doc(hidden)]
struct ScanPayload {
    updates: Vec<UpdateRequest>,
}
95
/// Minimal projection of an `/Items` entry.
#[derive(Deserialize, Clone)]
#[serde(rename_all = "PascalCase")]
#[doc(hidden)]
struct Item {
    // Server item id, used for `Items/{id}/Refresh`.
    id: String,
    // Filesystem path; present because queries request `Fields=Path`,
    // but still optional in the schema.
    path: Option<String>,
}
103
104impl Emby {
105    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
106        let mut headers = header::HeaderMap::new();
107
108        headers.insert("X-Emby-Token", self.token.parse()?);
109        headers.insert(
110            "Authorization",
111            format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
112        );
113        headers.insert("Accept", "application/json".parse()?);
114
115        self.request
116            .client_builder(headers)
117            .build()
118            .map_err(Into::into)
119    }
120
121    async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
122        let client = self.get_client()?;
123        let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
124
125        let res = client.get(url).perform().await?;
126
127        Ok(res.json().await?)
128    }
129
130    fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
131        let ev_path = Path::new(path);
132        let mut matched: Vec<Library> = vec![];
133
134        for library in libraries {
135            for location in &library.locations {
136                let path = Path::new(location);
137
138                if ev_path.starts_with(path) {
139                    matched.push(library.clone());
140                }
141            }
142        }
143
144        matched
145    }
146
147    async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
148        let client = self.get_client()?;
149        let mut url = get_url(&self.url)?.join("Items")?;
150
151        url.query_pairs_mut().append_pair("Recursive", "true");
152        url.query_pairs_mut().append_pair("Fields", "Path");
153        url.query_pairs_mut().append_pair("EnableImages", "false");
154        if let Some(collection_type) = &library.collection_type {
155            url.query_pairs_mut().append_pair(
156                "IncludeItemTypes",
157                match collection_type.as_str() {
158                    "tvshows" => "Episode",
159                    "books" => "Book",
160                    "music" => "Audio",
161                    "movie" => "VideoFile,Movie",
162                    _ => "",
163                },
164            );
165        }
166        url.query_pairs_mut()
167            .append_pair("ParentId", &library.item_id);
168        url.query_pairs_mut()
169            .append_pair("EnableTotalRecordCount", "false");
170
171        let res = client.get(url).perform().await?;
172
173        // Possibly unneeded unless we can use streams
174        let bytes = res.bytes().await?;
175
176        let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
177
178        json_reader.seek_to(&json_path!["Items"])?;
179        json_reader.begin_array()?;
180
181        while json_reader.has_next()? {
182            let item: Item = json_reader.deserialize_next()?;
183
184            if item.path == Some(path.to_owned()) {
185                return Ok(Some(item));
186            }
187        }
188
189        Ok(None)
190    }
191
    /// Streams every item of `library` through an unbounded channel.
    ///
    /// Spawns a task that pages through `/Items` (1000 items per request)
    /// and sends each deserialized [`Item`] as soon as it is parsed.
    /// Returns the receiver plus the task's join handle so the caller can
    /// stop the producer early (see `get_items`).
    fn fetch_items(
        &self,
        library: &Library,
    ) -> anyhow::Result<(
        UnboundedReceiver<Item>,
        tokio::task::JoinHandle<anyhow::Result<()>>,
    )> {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        // Page size for the paginated /Items query.
        let limit = 1000;

        let client = self.get_client()?;
        let mut url = get_url(&self.url)?.join("Items")?;

        url.query_pairs_mut().append_pair("Recursive", "true");
        url.query_pairs_mut().append_pair("Fields", "Path");
        url.query_pairs_mut().append_pair("EnableImages", "false");
        url.query_pairs_mut()
            .append_pair("ParentId", &library.item_id);
        url.query_pairs_mut()
            .append_pair("EnableTotalRecordCount", "false");
        url.query_pairs_mut()
            .append_pair("Limit", &limit.to_string());
        if let Some(collection_type) = &library.collection_type {
            // Narrow the query to the item types this collection can hold.
            // NOTE(review): Jellyfin/Emby report movie libraries with
            // collection type "movies" (plural) — the "movie" arm below may
            // never match, leaving the filter empty; verify against a server.
            url.query_pairs_mut().append_pair(
                "IncludeItemTypes",
                match collection_type.as_str() {
                    "tvshows" => "Episode",
                    "books" => "Book",
                    "music" => "Audio",
                    "movie" => "VideoFile,Movie",
                    _ => "",
                },
            );
        }

        let handle = tokio::spawn(async move {
            let mut page = 0;

            loop {
                // Same base query each iteration, offset by page * limit.
                let mut page_url = url.clone();
                page_url
                    .query_pairs_mut()
                    .append_pair("StartIndex", &(page * limit).to_string());

                let res = client.get(page_url).perform().await?;

                let bytes = res.bytes().await?;

                // Stream-parse the "Items" array so each element is
                // forwarded without materializing the whole page.
                let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));

                json_reader.seek_to(&json_path!["Items"])?;
                json_reader.begin_array()?;

                let mut found_items_count = 0;

                while json_reader.has_next()? {
                    let item: Item = json_reader.deserialize_next()?;

                    // Errors (receiver dropped) end the task via `?`.
                    tx.send(item)?;

                    found_items_count += 1;
                }

                // A short page means the listing is exhausted.
                if found_items_count < limit {
                    break;
                }

                page += 1;
            }

            // Close the channel so the receiver observes end-of-stream.
            drop(tx);

            Ok(())
        });

        Ok((rx, handle))
    }
269
    /// Splits `events` into those with a matching item in `library`
    /// (paired with the item) and those without one.
    ///
    /// Consumes the streamed item listing from `fetch_items`, matching
    /// items to events by (rewritten) path, and stops — aborting the
    /// producer task — as soon as every event has been matched.
    ///
    /// NOTE(review): if the producer task fails (e.g. a network error),
    /// the channel simply closes and all remaining events are reported as
    /// not-found; the task's `Err` is never inspected. Confirm that this
    /// silent fallback to a plain scan is intended.
    async fn get_items<'a>(
        &self,
        library: &Library,
        events: Vec<&'a ScanEvent>,
    ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
        let (mut rx, handle) = self.fetch_items(library)?;

        let mut found_in_library = Vec::new();
        let mut not_found_in_library = events.clone();

        while let Some(item) = rx.recv().await {
            if let Some(ev) = events
                .iter()
                .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
            {
                found_in_library.push((*ev, item.clone()));
                not_found_in_library.retain(|&e| e.id != ev.id);

                // Everything matched — no need to stream further pages.
                if not_found_in_library.is_empty() {
                    break;
                }
            }
        }

        // Stop the producer; any unfetched pages are not needed.
        handle.abort();

        Ok((found_in_library, not_found_in_library))
    }
298
299    // not as effective as refreshing the item, but good enough
300    async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
301        let client = self.get_client()?;
302        let url = get_url(&self.url)?.join("Library/Media/Updated")?;
303
304        let updates = ev
305            .iter()
306            .map(|ev| UpdateRequest {
307                path: ev.get_path(&self.rewrite),
308                update_type: "Modified".to_string(),
309            })
310            .collect();
311
312        let body = ScanPayload { updates };
313
314        client
315            .post(url)
316            .header("Content-Type", "application/json")
317            .json(&body)
318            .perform()
319            .await
320            .map(|_| ())
321    }
322
323    async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
324        let client = self.get_client()?;
325        let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
326
327        url.query_pairs_mut().append_pair(
328            "MetadataRefreshMode",
329            &self.metadata_refresh_mode.to_string(),
330        );
331        url.query_pairs_mut()
332            .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
333        url.query_pairs_mut()
334            .append_pair("ReplaceAllMetadata", "true");
335        url.query_pairs_mut().append_pair("Recursive", "true");
336
337        // TODO: Possible options in future?
338        url.query_pairs_mut()
339            .append_pair("ReplaceAllImages", "false");
340        url.query_pairs_mut()
341            .append_pair("RegenerateTrickplay", "false");
342
343        client.post(url).perform().await.map(|_| ())
344    }
345}
346
347impl TargetProcess for Emby {
348    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
349        let libraries = self
350            .libraries()
351            .await
352            .context("failed to fetch libraries")?;
353
354        let mut succeeded: HashMap<String, bool> = HashMap::new();
355
356        let mut to_find = HashMap::new();
357        let mut to_refresh = Vec::new();
358        let mut to_scan = Vec::new();
359
360        if self.refresh_metadata {
361            for ev in evs {
362                let ev_path = ev.get_path(&self.rewrite);
363
364                let matched_libraries = self.get_libraries(&libraries, &ev_path);
365
366                if matched_libraries.is_empty() {
367                    error!("failed to find library for file: {}", ev_path);
368                    continue;
369                }
370
371                for library in matched_libraries {
372                    to_find.entry(library).or_insert_with(Vec::new).push(*ev);
373                }
374            }
375
376            for (library, library_events) in to_find {
377                let (found_in_library, not_found_in_library) = self
378                    .get_items(&library, library_events.clone())
379                    .await
380                    .with_context(|| {
381                        format!(
382                            "failed to fetch items for library: {}",
383                            library.name.clone()
384                        )
385                    })?;
386
387                to_refresh.extend(found_in_library);
388                to_scan.extend(not_found_in_library);
389            }
390
391            for (ev, item) in to_refresh {
392                match self.refresh_item(&item).await {
393                    Ok(()) => {
394                        debug!("refreshed item: {}", item.id);
395                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
396                    }
397                    Err(e) => {
398                        error!("failed to refresh item: {}", e);
399                        succeeded.insert(ev.id.clone(), false);
400                    }
401                }
402            }
403        } else {
404            to_scan.extend(evs.iter().copied());
405        }
406
407        if !to_scan.is_empty() {
408            match self.scan(&to_scan).await {
409                Ok(()) => {
410                    for ev in &to_scan {
411                        debug!("scanned file: {}", ev.file_path);
412
413                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
414                    }
415                }
416                Err(e) => {
417                    error!("failed to scan items: {}", e);
418
419                    for ev in &to_scan {
420                        succeeded.insert(ev.id.clone(), false);
421                    }
422                }
423            }
424        }
425
426        Ok(succeeded
427            .iter()
428            .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
429            .collect())
430    }
431}