autopulse_service/settings/targets/
emby.rs

1use super::RequestBuilderPerform;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11    json_path,
12    reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
17#[doc(hidden)]
18const fn default_true() -> bool {
19    true
20}
21
22#[derive(Clone, Deserialize)]
23pub struct Emby {
24    /// URL to the Jellyfin/Emby server
25    pub url: String,
26    /// API token for the Jellyfin/Emby server
27    pub token: String,
28    /// Metadata refresh mode (default: `FullRefresh`)
29    #[serde(default)]
30    pub metadata_refresh_mode: EmbyMetadataRefreshMode,
31    /// Whether to try to refresh metadata for the item instead of scan (default: true)
32    #[serde(default = "default_true")]
33    pub refresh_metadata: bool,
34    /// Rewrite path for the file
35    pub rewrite: Option<Rewrite>,
36}
37
38/// Metadata refresh mode for Jellyfin/Emby
39#[derive(Clone, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum EmbyMetadataRefreshMode {
42    /// `none`
43    None,
44    /// `validation_only`
45    ValidationOnly,
46    /// `default`
47    Default,
48    /// `full_refresh`
49    FullRefresh,
50}
51
52impl Display for EmbyMetadataRefreshMode {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        let mode = match self {
55            Self::None => "None",
56            Self::ValidationOnly => "ValidationOnly",
57            Self::Default => "Default",
58            Self::FullRefresh => "FullRefresh",
59        };
60
61        write!(f, "{mode}")
62    }
63}
64
65impl Default for EmbyMetadataRefreshMode {
66    fn default() -> Self {
67        Self::FullRefresh
68    }
69}
70
71#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
72#[serde(rename_all = "PascalCase")]
73#[doc(hidden)]
74struct Library {
75    #[allow(dead_code)]
76    name: String,
77    locations: Vec<String>,
78    item_id: String,
79    collection_type: Option<String>,
80}
81
82#[derive(Serialize, Clone)]
83#[serde(rename_all = "PascalCase")]
84#[doc(hidden)]
85struct UpdateRequest {
86    path: String,
87    update_type: String,
88}
89
90#[derive(Serialize, Clone)]
91#[serde(rename_all = "PascalCase")]
92#[doc(hidden)]
93struct ScanPayload {
94    updates: Vec<UpdateRequest>,
95}
96
97#[derive(Deserialize, Clone)]
98#[serde(rename_all = "PascalCase")]
99#[doc(hidden)]
100struct Item {
101    id: String,
102    path: Option<String>,
103}
104
105impl Emby {
106    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
107        let mut headers = header::HeaderMap::new();
108
109        headers.insert("X-Emby-Token", self.token.parse()?);
110        headers.insert(
111            "Authorization",
112            format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
113        );
114        headers.insert("Accept", "application/json".parse()?);
115
116        reqwest::Client::builder()
117            .timeout(std::time::Duration::from_secs(10))
118            .default_headers(headers)
119            .build()
120            .map_err(Into::into)
121    }
122
123    async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
124        let client = self.get_client()?;
125        let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
126
127        let res = client.get(url).perform().await?;
128
129        Ok(res.json().await?)
130    }
131
132    fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
133        let ev_path = Path::new(path);
134        let mut matched: Vec<Library> = vec![];
135
136        for library in libraries {
137            for location in &library.locations {
138                let path = Path::new(location);
139
140                if ev_path.starts_with(path) {
141                    matched.push(library.clone());
142                }
143            }
144        }
145
146        matched
147    }
148
149    async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
150        let client = self.get_client()?;
151        let mut url = get_url(&self.url)?.join("Items")?;
152
153        url.query_pairs_mut().append_pair("Recursive", "true");
154        url.query_pairs_mut().append_pair("Fields", "Path");
155        url.query_pairs_mut().append_pair("EnableImages", "false");
156        if let Some(collection_type) = &library.collection_type {
157            url.query_pairs_mut().append_pair(
158                "IncludeItemTypes",
159                match collection_type.as_str() {
160                    "tvshows" => "Episode",
161                    "books" => "Book",
162                    "music" => "Audio",
163                    "movie" => "VideoFile,Movie",
164                    _ => "",
165                },
166            );
167        }
168        url.query_pairs_mut()
169            .append_pair("ParentId", &library.item_id);
170        url.query_pairs_mut()
171            .append_pair("EnableTotalRecordCount", "false");
172
173        let res = client.get(url).perform().await?;
174
175        // Possibly unneeded unless we can use streams
176        let bytes = res.bytes().await?;
177
178        let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
179
180        json_reader.seek_to(&json_path!["Items"])?;
181        json_reader.begin_array()?;
182
183        while json_reader.has_next()? {
184            let item: Item = json_reader.deserialize_next()?;
185
186            if item.path == Some(path.to_owned()) {
187                return Ok(Some(item));
188            }
189        }
190
191        Ok(None)
192    }
193
194    fn fetch_items(
195        &self,
196        library: &Library,
197    ) -> anyhow::Result<(
198        UnboundedReceiver<Item>,
199        tokio::task::JoinHandle<anyhow::Result<()>>,
200    )> {
201        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
202        let limit = 1000;
203
204        let client = self.get_client()?;
205        let mut url = get_url(&self.url)?.join("Items")?;
206
207        url.query_pairs_mut().append_pair("Recursive", "true");
208        url.query_pairs_mut().append_pair("Fields", "Path");
209        url.query_pairs_mut().append_pair("EnableImages", "false");
210        url.query_pairs_mut()
211            .append_pair("ParentId", &library.item_id);
212        url.query_pairs_mut()
213            .append_pair("EnableTotalRecordCount", "false");
214        url.query_pairs_mut()
215            .append_pair("Limit", &limit.to_string());
216        if let Some(collection_type) = &library.collection_type {
217            url.query_pairs_mut().append_pair(
218                "IncludeItemTypes",
219                match collection_type.as_str() {
220                    "tvshows" => "Episode",
221                    "books" => "Book",
222                    "music" => "Audio",
223                    "movie" => "VideoFile,Movie",
224                    _ => "",
225                },
226            );
227        }
228
229        let handle = tokio::spawn(async move {
230            let mut page = 0;
231
232            loop {
233                let mut page_url = url.clone();
234                page_url
235                    .query_pairs_mut()
236                    .append_pair("StartIndex", &(page * limit).to_string());
237
238                let res = client.get(page_url).perform().await?;
239
240                let bytes = res.bytes().await?;
241
242                let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
243
244                json_reader.seek_to(&json_path!["Items"])?;
245                json_reader.begin_array()?;
246
247                let mut found_items_count = 0;
248
249                while json_reader.has_next()? {
250                    let item: Item = json_reader.deserialize_next()?;
251
252                    tx.send(item)?;
253
254                    found_items_count += 1;
255                }
256
257                if found_items_count < limit {
258                    break;
259                }
260
261                page += 1;
262            }
263
264            drop(tx);
265
266            Ok(())
267        });
268
269        Ok((rx, handle))
270    }
271
272    async fn get_items<'a>(
273        &self,
274        library: &Library,
275        events: Vec<&'a ScanEvent>,
276    ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
277        let (mut rx, handle) = self.fetch_items(library)?;
278
279        let mut found_in_library = Vec::new();
280        let mut not_found_in_library = events.clone();
281
282        while let Some(item) = rx.recv().await {
283            if let Some(ev) = events
284                .iter()
285                .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
286            {
287                found_in_library.push((*ev, item.clone()));
288                not_found_in_library.retain(|&e| e.id != ev.id);
289
290                if not_found_in_library.is_empty() {
291                    break;
292                }
293            }
294        }
295
296        handle.abort();
297
298        Ok((found_in_library, not_found_in_library))
299    }
300
301    // not as effective as refreshing the item, but good enough
302    async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
303        let client = self.get_client()?;
304        let url = get_url(&self.url)?.join("Library/Media/Updated")?;
305
306        let updates = ev
307            .iter()
308            .map(|ev| UpdateRequest {
309                path: ev.get_path(&self.rewrite),
310                update_type: "Modified".to_string(),
311            })
312            .collect();
313
314        let body = ScanPayload { updates };
315
316        client
317            .post(url)
318            .header("Content-Type", "application/json")
319            .json(&body)
320            .perform()
321            .await
322            .map(|_| ())
323    }
324
325    async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
326        let client = self.get_client()?;
327        let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
328
329        url.query_pairs_mut().append_pair(
330            "MetadataRefreshMode",
331            &self.metadata_refresh_mode.to_string(),
332        );
333        url.query_pairs_mut()
334            .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
335        url.query_pairs_mut()
336            .append_pair("ReplaceAllMetadata", "true");
337        url.query_pairs_mut().append_pair("Recursive", "true");
338
339        // TODO: Possible options in future?
340        url.query_pairs_mut()
341            .append_pair("ReplaceAllImages", "false");
342        url.query_pairs_mut()
343            .append_pair("RegenerateTrickplay", "false");
344
345        client.post(url).perform().await.map(|_| ())
346    }
347}
348
349impl TargetProcess for Emby {
350    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
351        let libraries = self
352            .libraries()
353            .await
354            .context("failed to fetch libraries")?;
355
356        let mut succeeded: HashMap<String, bool> = HashMap::new();
357
358        let mut to_find = HashMap::new();
359        let mut to_refresh = Vec::new();
360        let mut to_scan = Vec::new();
361
362        if self.refresh_metadata {
363            for ev in evs {
364                let ev_path = ev.get_path(&self.rewrite);
365
366                let matched_libraries = self.get_libraries(&libraries, &ev_path);
367
368                if matched_libraries.is_empty() {
369                    error!("failed to find library for file: {}", ev_path);
370                    continue;
371                }
372
373                for library in matched_libraries {
374                    to_find.entry(library).or_insert_with(Vec::new).push(*ev);
375                }
376            }
377
378            for (library, library_events) in to_find {
379                let (found_in_library, not_found_in_library) = self
380                    .get_items(&library, library_events.clone())
381                    .await
382                    .with_context(|| {
383                        format!(
384                            "failed to fetch items for library: {}",
385                            library.name.clone()
386                        )
387                    })?;
388
389                to_refresh.extend(found_in_library);
390                to_scan.extend(not_found_in_library);
391            }
392
393            for (ev, item) in to_refresh {
394                match self.refresh_item(&item).await {
395                    Ok(()) => {
396                        debug!("refreshed item: {}", item.id);
397                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
398                    }
399                    Err(e) => {
400                        error!("failed to refresh item: {}", e);
401                        succeeded.insert(ev.id.clone(), false);
402                    }
403                }
404            }
405        } else {
406            to_scan.extend(evs.iter().copied());
407        }
408
409        if !to_scan.is_empty() {
410            match self.scan(&to_scan).await {
411                Ok(()) => {
412                    for ev in &to_scan {
413                        debug!("scanned file: {}", ev.file_path);
414
415                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
416                    }
417                }
418                Err(e) => {
419                    error!("failed to scan items: {}", e);
420
421                    for ev in &to_scan {
422                        succeeded.insert(ev.id.clone(), false);
423                    }
424                }
425            }
426        }
427
428        Ok(succeeded
429            .iter()
430            .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
431            .collect())
432    }
433}