Skip to main content

autopulse_service/settings/targets/
emby.rs

1use super::{Request, RequestBuilderPerform};
2use crate::settings::path_filter::PathFilter;
3use crate::settings::rewrite::Rewrite;
4use crate::settings::targets::TargetProcess;
5use anyhow::Context;
6use autopulse_database::models::ScanEvent;
7use autopulse_utils::get_url;
8use reqwest::header;
9use serde::{Deserialize, Serialize};
10use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
11use struson::{
12    json_path,
13    reader::{JsonReader, JsonStreamReader},
14};
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, error};
17
18#[doc(hidden)]
19const fn default_true() -> bool {
20    true
21}
22
23#[derive(Serialize, Clone, Deserialize)]
24pub struct Emby {
25    /// URL to the Jellyfin/Emby server
26    pub url: String,
27    /// API token for the Jellyfin/Emby server
28    pub token: String,
29    /// Metadata refresh mode (default: `FullRefresh`)
30    #[serde(default)]
31    pub metadata_refresh_mode: EmbyMetadataRefreshMode,
32    /// Whether to try to refresh metadata for the item instead of scan (default: true)
33    #[serde(default = "default_true")]
34    pub refresh_metadata: bool,
35    /// Rewrite path for the file
36    pub rewrite: Option<Rewrite>,
37    /// Path filter matched against the target-rewritten path.
38    #[serde(default)]
39    pub filter: PathFilter,
40    /// HTTP request options
41    #[serde(default)]
42    pub request: Request,
43    /// How library locations are compared to incoming paths. Default `case_sensitive`.
44    #[serde(default)]
45    pub path_match: PathMatch,
46}
47
48/// How library locations are compared to incoming paths.
49#[derive(Serialize, Clone, Deserialize, Default)]
50#[serde(rename_all = "snake_case")]
51pub enum PathMatch {
52    /// Exact byte-for-byte prefix match (default; safe on Linux).
53    #[default]
54    CaseSensitive,
55    /// Lowercase both sides before comparing. Useful on Windows / UNC paths.
56    CaseInsensitive,
57}
58
59/// Metadata refresh mode for Jellyfin/Emby
60#[derive(Serialize, Clone, Deserialize)]
61#[serde(rename_all = "snake_case")]
62#[derive(Default)]
63pub enum EmbyMetadataRefreshMode {
64    /// `none`
65    None,
66    /// `validation_only`
67    ValidationOnly,
68    /// `default`
69    Default,
70    /// `full_refresh`
71    #[default]
72    FullRefresh,
73}
74
75impl Display for EmbyMetadataRefreshMode {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let mode = match self {
78            Self::None => "None",
79            Self::ValidationOnly => "ValidationOnly",
80            Self::Default => "Default",
81            Self::FullRefresh => "FullRefresh",
82        };
83
84        write!(f, "{mode}")
85    }
86}
87
88#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
89#[serde(rename_all = "PascalCase")]
90#[doc(hidden)]
91struct Library {
92    #[allow(dead_code)]
93    name: String,
94    locations: Vec<String>,
95    item_id: String,
96    collection_type: Option<String>,
97}
98
99#[derive(Serialize, Clone)]
100#[serde(rename_all = "PascalCase")]
101#[doc(hidden)]
102struct UpdateRequest {
103    path: String,
104    update_type: String,
105}
106
107#[derive(Serialize, Clone)]
108#[serde(rename_all = "PascalCase")]
109#[doc(hidden)]
110struct ScanPayload {
111    updates: Vec<UpdateRequest>,
112}
113
114#[derive(Deserialize, Clone)]
115#[serde(rename_all = "PascalCase")]
116#[doc(hidden)]
117struct Item {
118    id: String,
119    path: Option<String>,
120}
121
122impl Emby {
123    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
124        let mut headers = header::HeaderMap::new();
125
126        headers.insert("X-Emby-Token", self.token.parse()?);
127        headers.insert(
128            "Authorization",
129            format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
130        );
131        headers.insert("Accept", "application/json".parse()?);
132
133        self.request
134            .client_builder(headers)
135            .build()
136            .map_err(Into::into)
137    }
138
139    async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
140        let client = self.get_client()?;
141        let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
142
143        let res = client.get(url).perform().await?;
144
145        Ok(res.json().await?)
146    }
147
148    fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
149        let mut matched: Vec<Library> = vec![];
150
151        for library in libraries {
152            for location in &library.locations {
153                if self.path_prefix_matches(location, path) {
154                    matched.push(library.clone());
155                    break; // one location is enough to match this library
156                }
157            }
158        }
159
160        matched
161    }
162
163    fn path_prefix_matches(&self, location: &str, ev_path: &str) -> bool {
164        // Trailing `/` or `\` on library locations shouldn't break the prefix check.
165        let trimmed = location.trim_end_matches(['/', '\\']);
166
167        match self.path_match {
168            PathMatch::CaseSensitive => Path::new(ev_path).starts_with(trimmed),
169            PathMatch::CaseInsensitive => {
170                let lhs = ev_path.to_ascii_lowercase();
171                let rhs = trimmed.to_ascii_lowercase();
172                Path::new(&lhs).starts_with(&rhs)
173            }
174        }
175    }
176
177    async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
178        let client = self.get_client()?;
179        let mut url = get_url(&self.url)?.join("Items")?;
180
181        url.query_pairs_mut().append_pair("Recursive", "true");
182        url.query_pairs_mut().append_pair("Fields", "Path");
183        url.query_pairs_mut().append_pair("EnableImages", "false");
184        if let Some(collection_type) = &library.collection_type {
185            url.query_pairs_mut().append_pair(
186                "IncludeItemTypes",
187                match collection_type.as_str() {
188                    "tvshows" => "Episode",
189                    "books" => "Book",
190                    "music" => "Audio",
191                    "movie" => "VideoFile,Movie",
192                    _ => "",
193                },
194            );
195        }
196        url.query_pairs_mut()
197            .append_pair("ParentId", &library.item_id);
198        url.query_pairs_mut()
199            .append_pair("EnableTotalRecordCount", "false");
200
201        let res = client.get(url).perform().await?;
202
203        // Possibly unneeded unless we can use streams
204        let bytes = res.bytes().await?;
205
206        let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
207
208        json_reader.seek_to(&json_path!["Items"])?;
209        json_reader.begin_array()?;
210
211        while json_reader.has_next()? {
212            let item: Item = json_reader.deserialize_next()?;
213
214            if item.path == Some(path.to_owned()) {
215                return Ok(Some(item));
216            }
217        }
218
219        Ok(None)
220    }
221
222    fn fetch_items(
223        &self,
224        library: &Library,
225    ) -> anyhow::Result<(
226        UnboundedReceiver<Item>,
227        tokio::task::JoinHandle<anyhow::Result<()>>,
228    )> {
229        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
230        let limit = 1000;
231
232        let client = self.get_client()?;
233        let mut url = get_url(&self.url)?.join("Items")?;
234
235        url.query_pairs_mut().append_pair("Recursive", "true");
236        url.query_pairs_mut().append_pair("Fields", "Path");
237        url.query_pairs_mut().append_pair("EnableImages", "false");
238        url.query_pairs_mut()
239            .append_pair("ParentId", &library.item_id);
240        url.query_pairs_mut()
241            .append_pair("EnableTotalRecordCount", "false");
242        url.query_pairs_mut()
243            .append_pair("Limit", &limit.to_string());
244        if let Some(collection_type) = &library.collection_type {
245            url.query_pairs_mut().append_pair(
246                "IncludeItemTypes",
247                match collection_type.as_str() {
248                    "tvshows" => "Episode",
249                    "books" => "Book",
250                    "music" => "Audio",
251                    "movie" => "VideoFile,Movie",
252                    _ => "",
253                },
254            );
255        }
256
257        let handle = tokio::spawn(async move {
258            let mut page = 0;
259
260            loop {
261                let mut page_url = url.clone();
262                page_url
263                    .query_pairs_mut()
264                    .append_pair("StartIndex", &(page * limit).to_string());
265
266                let res = client.get(page_url).perform().await?;
267
268                let bytes = res.bytes().await?;
269
270                let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
271
272                json_reader.seek_to(&json_path!["Items"])?;
273                json_reader.begin_array()?;
274
275                let mut found_items_count = 0;
276
277                while json_reader.has_next()? {
278                    let item: Item = json_reader.deserialize_next()?;
279
280                    tx.send(item)?;
281
282                    found_items_count += 1;
283                }
284
285                if found_items_count < limit {
286                    break;
287                }
288
289                page += 1;
290            }
291
292            drop(tx);
293
294            Ok(())
295        });
296
297        Ok((rx, handle))
298    }
299
300    async fn get_items<'a>(
301        &self,
302        library: &Library,
303        events: Vec<&'a ScanEvent>,
304    ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
305        let (mut rx, handle) = self.fetch_items(library)?;
306
307        let mut found_in_library = Vec::new();
308        let mut not_found_in_library = events.clone();
309
310        while let Some(item) = rx.recv().await {
311            if let Some(ev) = events
312                .iter()
313                .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
314            {
315                found_in_library.push((*ev, item.clone()));
316                not_found_in_library.retain(|&e| e.id != ev.id);
317
318                if not_found_in_library.is_empty() {
319                    break;
320                }
321            }
322        }
323
324        handle.abort();
325
326        Ok((found_in_library, not_found_in_library))
327    }
328
329    // not as effective as refreshing the item, but good enough
330    async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
331        let client = self.get_client()?;
332        let url = get_url(&self.url)?.join("Library/Media/Updated")?;
333
334        let updates = ev
335            .iter()
336            .map(|ev| UpdateRequest {
337                path: ev.get_path(&self.rewrite),
338                update_type: "Modified".to_string(),
339            })
340            .collect();
341
342        let body = ScanPayload { updates };
343
344        client
345            .post(url)
346            .header("Content-Type", "application/json")
347            .json(&body)
348            .perform()
349            .await
350            .map(|_| ())
351    }
352
353    async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
354        let client = self.get_client()?;
355        let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
356
357        url.query_pairs_mut().append_pair(
358            "MetadataRefreshMode",
359            &self.metadata_refresh_mode.to_string(),
360        );
361        url.query_pairs_mut()
362            .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
363        url.query_pairs_mut()
364            .append_pair("ReplaceAllMetadata", "true");
365        url.query_pairs_mut().append_pair("Recursive", "true");
366
367        // TODO: Possible options in future?
368        url.query_pairs_mut()
369            .append_pair("ReplaceAllImages", "false");
370        url.query_pairs_mut()
371            .append_pair("RegenerateTrickplay", "false");
372
373        client.post(url).perform().await.map(|_| ())
374    }
375}
376
377impl TargetProcess for Emby {
378    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
379        let libraries = self
380            .libraries()
381            .await
382            .context("failed to fetch libraries")?;
383
384        let mut succeeded: HashMap<String, bool> = HashMap::new();
385
386        let mut to_find = HashMap::new();
387        let mut to_refresh = Vec::new();
388        let mut to_scan = Vec::new();
389
390        if self.refresh_metadata {
391            for ev in evs {
392                let ev_path = ev.get_path(&self.rewrite);
393
394                let matched_libraries = self.get_libraries(&libraries, &ev_path);
395
396                if matched_libraries.is_empty() {
397                    let known: Vec<&str> = libraries
398                        .iter()
399                        .flat_map(|l| l.locations.iter().map(String::as_str))
400                        .collect();
401                    error!(
402                        "failed to find library for file '{ev_path}'. Known locations: {known:?}"
403                    );
404                    continue;
405                }
406
407                for library in matched_libraries {
408                    to_find.entry(library).or_insert_with(Vec::new).push(*ev);
409                }
410            }
411
412            for (library, library_events) in to_find {
413                let (found_in_library, not_found_in_library) = self
414                    .get_items(&library, library_events.clone())
415                    .await
416                    .with_context(|| {
417                        format!(
418                            "failed to fetch items for library: {}",
419                            library.name.clone()
420                        )
421                    })?;
422
423                to_refresh.extend(found_in_library);
424                to_scan.extend(not_found_in_library);
425            }
426
427            for (ev, item) in to_refresh {
428                match self.refresh_item(&item).await {
429                    Ok(()) => {
430                        debug!("refreshed item: {}", item.id);
431                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
432                    }
433                    Err(e) => {
434                        error!("failed to refresh item: {}", e);
435                        succeeded.insert(ev.id.clone(), false);
436                    }
437                }
438            }
439        } else {
440            to_scan.extend(evs.iter().copied());
441        }
442
443        if !to_scan.is_empty() {
444            match self.scan(&to_scan).await {
445                Ok(()) => {
446                    for ev in &to_scan {
447                        debug!("scanned file: {}", ev.file_path);
448
449                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
450                    }
451                }
452                Err(e) => {
453                    error!("failed to scan items: {}", e);
454
455                    for ev in &to_scan {
456                        succeeded.insert(ev.id.clone(), false);
457                    }
458                }
459            }
460        }
461
462        Ok(succeeded
463            .iter()
464            .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
465            .collect())
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    use super::*;
472
473    fn lib(name: &str, paths: &[&str]) -> Library {
474        Library {
475            name: name.to_string(),
476            locations: paths.iter().map(|s| (*s).to_string()).collect(),
477            item_id: format!("id-{name}"),
478            collection_type: None,
479        }
480    }
481
482    fn target() -> Emby {
483        Emby {
484            url: "http://x".to_string(),
485            token: "t".to_string(),
486            metadata_refresh_mode: EmbyMetadataRefreshMode::default(),
487            refresh_metadata: true,
488            rewrite: None,
489            filter: PathFilter::default(),
490            request: Request::default(),
491            path_match: PathMatch::default(),
492        }
493    }
494
495    #[test]
496    fn matches_when_library_location_is_exact_prefix() {
497        let libs = vec![lib("TV", &["/media/TV"])];
498        let m = target().get_libraries(&libs, "/media/TV/Show/S01E01.mkv");
499        assert_eq!(m.len(), 1);
500    }
501
502    #[test]
503    fn matches_when_library_location_has_trailing_slash() {
504        let libs = vec![lib("TV", &["/media/TV/"])];
505        let m = target().get_libraries(&libs, "/media/TV/Show/S01E01.mkv");
506        assert_eq!(
507            m.len(),
508            1,
509            "trailing slash on library location must not break match"
510        );
511    }
512
513    #[test]
514    fn no_match_when_path_outside_library() {
515        let libs = vec![lib("TV", &["/data/TV"])];
516        let m = target().get_libraries(&libs, "/media/TV/Show.mkv");
517        assert_eq!(m.len(), 0);
518    }
519
520    #[test]
521    fn case_insensitive_matching_when_opted_in() {
522        let mut t = target();
523        t.path_match = PathMatch::CaseInsensitive;
524        let libs = vec![lib("TV", &["/Media/TV"])];
525        let m = t.get_libraries(&libs, "/media/tv/Show.mkv");
526        assert_eq!(
527            m.len(),
528            1,
529            "case-insensitive must match across case differences"
530        );
531    }
532
533    #[test]
534    fn case_sensitive_matching_by_default() {
535        let libs = vec![lib("TV", &["/Media/TV"])];
536        let m = target().get_libraries(&libs, "/media/tv/Show.mkv");
537        assert_eq!(m.len(), 0);
538    }
539}