autopulse_service/settings/targets/
emby.rs

1use super::RequestBuilderPerform;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11    json_path,
12    reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
/// Always returns `true`.
///
/// Exists so it can be referenced by name from a
/// `#[serde(default = "default_true")]` attribute, making a boolean field
/// default to `true` when it is absent from the input.
#[doc(hidden)]
const fn default_true() -> bool {
    true
}
21
22#[derive(Serialize, Clone, Deserialize)]
23pub struct Emby {
24    /// URL to the Jellyfin/Emby server
25    pub url: String,
26    /// API token for the Jellyfin/Emby server
27    pub token: String,
28    /// Metadata refresh mode (default: `FullRefresh`)
29    #[serde(default)]
30    pub metadata_refresh_mode: EmbyMetadataRefreshMode,
31    /// Whether to try to refresh metadata for the item instead of scan (default: true)
32    #[serde(default = "default_true")]
33    pub refresh_metadata: bool,
34    /// Rewrite path for the file
35    pub rewrite: Option<Rewrite>,
36}
37
38/// Metadata refresh mode for Jellyfin/Emby
39#[derive(Serialize, Clone, Deserialize)]
40#[serde(rename_all = "snake_case")]
41#[derive(Default)]
42pub enum EmbyMetadataRefreshMode {
43    /// `none`
44    None,
45    /// `validation_only`
46    ValidationOnly,
47    /// `default`
48    Default,
49    /// `full_refresh`
50    #[default]
51    FullRefresh,
52}
53
54impl Display for EmbyMetadataRefreshMode {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        let mode = match self {
57            Self::None => "None",
58            Self::ValidationOnly => "ValidationOnly",
59            Self::Default => "Default",
60            Self::FullRefresh => "FullRefresh",
61        };
62
63        write!(f, "{mode}")
64    }
65}
66
67#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
68#[serde(rename_all = "PascalCase")]
69#[doc(hidden)]
70struct Library {
71    #[allow(dead_code)]
72    name: String,
73    locations: Vec<String>,
74    item_id: String,
75    collection_type: Option<String>,
76}
77
78#[derive(Serialize, Clone)]
79#[serde(rename_all = "PascalCase")]
80#[doc(hidden)]
81struct UpdateRequest {
82    path: String,
83    update_type: String,
84}
85
86#[derive(Serialize, Clone)]
87#[serde(rename_all = "PascalCase")]
88#[doc(hidden)]
89struct ScanPayload {
90    updates: Vec<UpdateRequest>,
91}
92
93#[derive(Deserialize, Clone)]
94#[serde(rename_all = "PascalCase")]
95#[doc(hidden)]
96struct Item {
97    id: String,
98    path: Option<String>,
99}
100
101impl Emby {
102    fn get_client(&self) -> anyhow::Result<reqwest::Client> {
103        let mut headers = header::HeaderMap::new();
104
105        headers.insert("X-Emby-Token", self.token.parse()?);
106        headers.insert(
107            "Authorization",
108            format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
109        );
110        headers.insert("Accept", "application/json".parse()?);
111
112        reqwest::Client::builder()
113            .timeout(std::time::Duration::from_secs(10))
114            .default_headers(headers)
115            .build()
116            .map_err(Into::into)
117    }
118
119    async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
120        let client = self.get_client()?;
121        let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
122
123        let res = client.get(url).perform().await?;
124
125        Ok(res.json().await?)
126    }
127
128    fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
129        let ev_path = Path::new(path);
130        let mut matched: Vec<Library> = vec![];
131
132        for library in libraries {
133            for location in &library.locations {
134                let path = Path::new(location);
135
136                if ev_path.starts_with(path) {
137                    matched.push(library.clone());
138                }
139            }
140        }
141
142        matched
143    }
144
145    async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
146        let client = self.get_client()?;
147        let mut url = get_url(&self.url)?.join("Items")?;
148
149        url.query_pairs_mut().append_pair("Recursive", "true");
150        url.query_pairs_mut().append_pair("Fields", "Path");
151        url.query_pairs_mut().append_pair("EnableImages", "false");
152        if let Some(collection_type) = &library.collection_type {
153            url.query_pairs_mut().append_pair(
154                "IncludeItemTypes",
155                match collection_type.as_str() {
156                    "tvshows" => "Episode",
157                    "books" => "Book",
158                    "music" => "Audio",
159                    "movie" => "VideoFile,Movie",
160                    _ => "",
161                },
162            );
163        }
164        url.query_pairs_mut()
165            .append_pair("ParentId", &library.item_id);
166        url.query_pairs_mut()
167            .append_pair("EnableTotalRecordCount", "false");
168
169        let res = client.get(url).perform().await?;
170
171        // Possibly unneeded unless we can use streams
172        let bytes = res.bytes().await?;
173
174        let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
175
176        json_reader.seek_to(&json_path!["Items"])?;
177        json_reader.begin_array()?;
178
179        while json_reader.has_next()? {
180            let item: Item = json_reader.deserialize_next()?;
181
182            if item.path == Some(path.to_owned()) {
183                return Ok(Some(item));
184            }
185        }
186
187        Ok(None)
188    }
189
190    fn fetch_items(
191        &self,
192        library: &Library,
193    ) -> anyhow::Result<(
194        UnboundedReceiver<Item>,
195        tokio::task::JoinHandle<anyhow::Result<()>>,
196    )> {
197        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
198        let limit = 1000;
199
200        let client = self.get_client()?;
201        let mut url = get_url(&self.url)?.join("Items")?;
202
203        url.query_pairs_mut().append_pair("Recursive", "true");
204        url.query_pairs_mut().append_pair("Fields", "Path");
205        url.query_pairs_mut().append_pair("EnableImages", "false");
206        url.query_pairs_mut()
207            .append_pair("ParentId", &library.item_id);
208        url.query_pairs_mut()
209            .append_pair("EnableTotalRecordCount", "false");
210        url.query_pairs_mut()
211            .append_pair("Limit", &limit.to_string());
212        if let Some(collection_type) = &library.collection_type {
213            url.query_pairs_mut().append_pair(
214                "IncludeItemTypes",
215                match collection_type.as_str() {
216                    "tvshows" => "Episode",
217                    "books" => "Book",
218                    "music" => "Audio",
219                    "movie" => "VideoFile,Movie",
220                    _ => "",
221                },
222            );
223        }
224
225        let handle = tokio::spawn(async move {
226            let mut page = 0;
227
228            loop {
229                let mut page_url = url.clone();
230                page_url
231                    .query_pairs_mut()
232                    .append_pair("StartIndex", &(page * limit).to_string());
233
234                let res = client.get(page_url).perform().await?;
235
236                let bytes = res.bytes().await?;
237
238                let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
239
240                json_reader.seek_to(&json_path!["Items"])?;
241                json_reader.begin_array()?;
242
243                let mut found_items_count = 0;
244
245                while json_reader.has_next()? {
246                    let item: Item = json_reader.deserialize_next()?;
247
248                    tx.send(item)?;
249
250                    found_items_count += 1;
251                }
252
253                if found_items_count < limit {
254                    break;
255                }
256
257                page += 1;
258            }
259
260            drop(tx);
261
262            Ok(())
263        });
264
265        Ok((rx, handle))
266    }
267
268    async fn get_items<'a>(
269        &self,
270        library: &Library,
271        events: Vec<&'a ScanEvent>,
272    ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
273        let (mut rx, handle) = self.fetch_items(library)?;
274
275        let mut found_in_library = Vec::new();
276        let mut not_found_in_library = events.clone();
277
278        while let Some(item) = rx.recv().await {
279            if let Some(ev) = events
280                .iter()
281                .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
282            {
283                found_in_library.push((*ev, item.clone()));
284                not_found_in_library.retain(|&e| e.id != ev.id);
285
286                if not_found_in_library.is_empty() {
287                    break;
288                }
289            }
290        }
291
292        handle.abort();
293
294        Ok((found_in_library, not_found_in_library))
295    }
296
297    // not as effective as refreshing the item, but good enough
298    async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
299        let client = self.get_client()?;
300        let url = get_url(&self.url)?.join("Library/Media/Updated")?;
301
302        let updates = ev
303            .iter()
304            .map(|ev| UpdateRequest {
305                path: ev.get_path(&self.rewrite),
306                update_type: "Modified".to_string(),
307            })
308            .collect();
309
310        let body = ScanPayload { updates };
311
312        client
313            .post(url)
314            .header("Content-Type", "application/json")
315            .json(&body)
316            .perform()
317            .await
318            .map(|_| ())
319    }
320
321    async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
322        let client = self.get_client()?;
323        let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
324
325        url.query_pairs_mut().append_pair(
326            "MetadataRefreshMode",
327            &self.metadata_refresh_mode.to_string(),
328        );
329        url.query_pairs_mut()
330            .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
331        url.query_pairs_mut()
332            .append_pair("ReplaceAllMetadata", "true");
333        url.query_pairs_mut().append_pair("Recursive", "true");
334
335        // TODO: Possible options in future?
336        url.query_pairs_mut()
337            .append_pair("ReplaceAllImages", "false");
338        url.query_pairs_mut()
339            .append_pair("RegenerateTrickplay", "false");
340
341        client.post(url).perform().await.map(|_| ())
342    }
343}
344
345impl TargetProcess for Emby {
346    async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
347        let libraries = self
348            .libraries()
349            .await
350            .context("failed to fetch libraries")?;
351
352        let mut succeeded: HashMap<String, bool> = HashMap::new();
353
354        let mut to_find = HashMap::new();
355        let mut to_refresh = Vec::new();
356        let mut to_scan = Vec::new();
357
358        if self.refresh_metadata {
359            for ev in evs {
360                let ev_path = ev.get_path(&self.rewrite);
361
362                let matched_libraries = self.get_libraries(&libraries, &ev_path);
363
364                if matched_libraries.is_empty() {
365                    error!("failed to find library for file: {}", ev_path);
366                    continue;
367                }
368
369                for library in matched_libraries {
370                    to_find.entry(library).or_insert_with(Vec::new).push(*ev);
371                }
372            }
373
374            for (library, library_events) in to_find {
375                let (found_in_library, not_found_in_library) = self
376                    .get_items(&library, library_events.clone())
377                    .await
378                    .with_context(|| {
379                        format!(
380                            "failed to fetch items for library: {}",
381                            library.name.clone()
382                        )
383                    })?;
384
385                to_refresh.extend(found_in_library);
386                to_scan.extend(not_found_in_library);
387            }
388
389            for (ev, item) in to_refresh {
390                match self.refresh_item(&item).await {
391                    Ok(()) => {
392                        debug!("refreshed item: {}", item.id);
393                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
394                    }
395                    Err(e) => {
396                        error!("failed to refresh item: {}", e);
397                        succeeded.insert(ev.id.clone(), false);
398                    }
399                }
400            }
401        } else {
402            to_scan.extend(evs.iter().copied());
403        }
404
405        if !to_scan.is_empty() {
406            match self.scan(&to_scan).await {
407                Ok(()) => {
408                    for ev in &to_scan {
409                        debug!("scanned file: {}", ev.file_path);
410
411                        *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
412                    }
413                }
414                Err(e) => {
415                    error!("failed to scan items: {}", e);
416
417                    for ev in &to_scan {
418                        succeeded.insert(ev.id.clone(), false);
419                    }
420                }
421            }
422        }
423
424        Ok(succeeded
425            .iter()
426            .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
427            .collect())
428    }
429}