1use super::{Request, RequestBuilderPerform};
2use crate::settings::path_filter::PathFilter;
3use crate::settings::rewrite::Rewrite;
4use crate::settings::targets::TargetProcess;
5use anyhow::Context;
6use autopulse_database::models::ScanEvent;
7use autopulse_utils::get_url;
8use reqwest::header;
9use serde::{Deserialize, Serialize};
10use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
11use struson::{
12 json_path,
13 reader::{JsonReader, JsonStreamReader},
14};
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, error};
17
// Serde default provider: referenced by name from
// `#[serde(default = "default_true")]` so that a missing key deserializes
// to `true` instead of `bool::default()`.
#[doc(hidden)]
const fn default_true() -> bool {
    true
}
22
23#[derive(Serialize, Clone, Deserialize)]
24pub struct Emby {
25 pub url: String,
27 pub token: String,
29 #[serde(default)]
31 pub metadata_refresh_mode: EmbyMetadataRefreshMode,
32 #[serde(default = "default_true")]
34 pub refresh_metadata: bool,
35 pub rewrite: Option<Rewrite>,
37 #[serde(default)]
39 pub filter: PathFilter,
40 #[serde(default)]
42 pub request: Request,
43 #[serde(default)]
45 pub path_match: PathMatch,
46}
47
48#[derive(Serialize, Clone, Deserialize, Default)]
50#[serde(rename_all = "snake_case")]
51pub enum PathMatch {
52 #[default]
54 CaseSensitive,
55 CaseInsensitive,
57}
58
59#[derive(Serialize, Clone, Deserialize)]
61#[serde(rename_all = "snake_case")]
62#[derive(Default)]
63pub enum EmbyMetadataRefreshMode {
64 None,
66 ValidationOnly,
68 Default,
70 #[default]
72 FullRefresh,
73}
74
75impl Display for EmbyMetadataRefreshMode {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let mode = match self {
78 Self::None => "None",
79 Self::ValidationOnly => "ValidationOnly",
80 Self::Default => "Default",
81 Self::FullRefresh => "FullRefresh",
82 };
83
84 write!(f, "{mode}")
85 }
86}
87
88#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
89#[serde(rename_all = "PascalCase")]
90#[doc(hidden)]
91struct Library {
92 #[allow(dead_code)]
93 name: String,
94 locations: Vec<String>,
95 item_id: String,
96 collection_type: Option<String>,
97}
98
99#[derive(Serialize, Clone)]
100#[serde(rename_all = "PascalCase")]
101#[doc(hidden)]
102struct UpdateRequest {
103 path: String,
104 update_type: String,
105}
106
107#[derive(Serialize, Clone)]
108#[serde(rename_all = "PascalCase")]
109#[doc(hidden)]
110struct ScanPayload {
111 updates: Vec<UpdateRequest>,
112}
113
114#[derive(Deserialize, Clone)]
115#[serde(rename_all = "PascalCase")]
116#[doc(hidden)]
117struct Item {
118 id: String,
119 path: Option<String>,
120}
121
122impl Emby {
123 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
124 let mut headers = header::HeaderMap::new();
125
126 headers.insert("X-Emby-Token", self.token.parse()?);
127 headers.insert(
128 "Authorization",
129 format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
130 );
131 headers.insert("Accept", "application/json".parse()?);
132
133 self.request
134 .client_builder(headers)
135 .build()
136 .map_err(Into::into)
137 }
138
139 async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
140 let client = self.get_client()?;
141 let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
142
143 let res = client.get(url).perform().await?;
144
145 Ok(res.json().await?)
146 }
147
148 fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
149 let mut matched: Vec<Library> = vec![];
150
151 for library in libraries {
152 for location in &library.locations {
153 if self.path_prefix_matches(location, path) {
154 matched.push(library.clone());
155 break; }
157 }
158 }
159
160 matched
161 }
162
163 fn path_prefix_matches(&self, location: &str, ev_path: &str) -> bool {
164 let trimmed = location.trim_end_matches(['/', '\\']);
166
167 match self.path_match {
168 PathMatch::CaseSensitive => Path::new(ev_path).starts_with(trimmed),
169 PathMatch::CaseInsensitive => {
170 let lhs = ev_path.to_ascii_lowercase();
171 let rhs = trimmed.to_ascii_lowercase();
172 Path::new(&lhs).starts_with(&rhs)
173 }
174 }
175 }
176
177 async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
178 let client = self.get_client()?;
179 let mut url = get_url(&self.url)?.join("Items")?;
180
181 url.query_pairs_mut().append_pair("Recursive", "true");
182 url.query_pairs_mut().append_pair("Fields", "Path");
183 url.query_pairs_mut().append_pair("EnableImages", "false");
184 if let Some(collection_type) = &library.collection_type {
185 url.query_pairs_mut().append_pair(
186 "IncludeItemTypes",
187 match collection_type.as_str() {
188 "tvshows" => "Episode",
189 "books" => "Book",
190 "music" => "Audio",
191 "movie" => "VideoFile,Movie",
192 _ => "",
193 },
194 );
195 }
196 url.query_pairs_mut()
197 .append_pair("ParentId", &library.item_id);
198 url.query_pairs_mut()
199 .append_pair("EnableTotalRecordCount", "false");
200
201 let res = client.get(url).perform().await?;
202
203 let bytes = res.bytes().await?;
205
206 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
207
208 json_reader.seek_to(&json_path!["Items"])?;
209 json_reader.begin_array()?;
210
211 while json_reader.has_next()? {
212 let item: Item = json_reader.deserialize_next()?;
213
214 if item.path == Some(path.to_owned()) {
215 return Ok(Some(item));
216 }
217 }
218
219 Ok(None)
220 }
221
222 fn fetch_items(
223 &self,
224 library: &Library,
225 ) -> anyhow::Result<(
226 UnboundedReceiver<Item>,
227 tokio::task::JoinHandle<anyhow::Result<()>>,
228 )> {
229 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
230 let limit = 1000;
231
232 let client = self.get_client()?;
233 let mut url = get_url(&self.url)?.join("Items")?;
234
235 url.query_pairs_mut().append_pair("Recursive", "true");
236 url.query_pairs_mut().append_pair("Fields", "Path");
237 url.query_pairs_mut().append_pair("EnableImages", "false");
238 url.query_pairs_mut()
239 .append_pair("ParentId", &library.item_id);
240 url.query_pairs_mut()
241 .append_pair("EnableTotalRecordCount", "false");
242 url.query_pairs_mut()
243 .append_pair("Limit", &limit.to_string());
244 if let Some(collection_type) = &library.collection_type {
245 url.query_pairs_mut().append_pair(
246 "IncludeItemTypes",
247 match collection_type.as_str() {
248 "tvshows" => "Episode",
249 "books" => "Book",
250 "music" => "Audio",
251 "movie" => "VideoFile,Movie",
252 _ => "",
253 },
254 );
255 }
256
257 let handle = tokio::spawn(async move {
258 let mut page = 0;
259
260 loop {
261 let mut page_url = url.clone();
262 page_url
263 .query_pairs_mut()
264 .append_pair("StartIndex", &(page * limit).to_string());
265
266 let res = client.get(page_url).perform().await?;
267
268 let bytes = res.bytes().await?;
269
270 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
271
272 json_reader.seek_to(&json_path!["Items"])?;
273 json_reader.begin_array()?;
274
275 let mut found_items_count = 0;
276
277 while json_reader.has_next()? {
278 let item: Item = json_reader.deserialize_next()?;
279
280 tx.send(item)?;
281
282 found_items_count += 1;
283 }
284
285 if found_items_count < limit {
286 break;
287 }
288
289 page += 1;
290 }
291
292 drop(tx);
293
294 Ok(())
295 });
296
297 Ok((rx, handle))
298 }
299
300 async fn get_items<'a>(
301 &self,
302 library: &Library,
303 events: Vec<&'a ScanEvent>,
304 ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
305 let (mut rx, handle) = self.fetch_items(library)?;
306
307 let mut found_in_library = Vec::new();
308 let mut not_found_in_library = events.clone();
309
310 while let Some(item) = rx.recv().await {
311 if let Some(ev) = events
312 .iter()
313 .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
314 {
315 found_in_library.push((*ev, item.clone()));
316 not_found_in_library.retain(|&e| e.id != ev.id);
317
318 if not_found_in_library.is_empty() {
319 break;
320 }
321 }
322 }
323
324 handle.abort();
325
326 Ok((found_in_library, not_found_in_library))
327 }
328
329 async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
331 let client = self.get_client()?;
332 let url = get_url(&self.url)?.join("Library/Media/Updated")?;
333
334 let updates = ev
335 .iter()
336 .map(|ev| UpdateRequest {
337 path: ev.get_path(&self.rewrite),
338 update_type: "Modified".to_string(),
339 })
340 .collect();
341
342 let body = ScanPayload { updates };
343
344 client
345 .post(url)
346 .header("Content-Type", "application/json")
347 .json(&body)
348 .perform()
349 .await
350 .map(|_| ())
351 }
352
353 async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
354 let client = self.get_client()?;
355 let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
356
357 url.query_pairs_mut().append_pair(
358 "MetadataRefreshMode",
359 &self.metadata_refresh_mode.to_string(),
360 );
361 url.query_pairs_mut()
362 .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
363 url.query_pairs_mut()
364 .append_pair("ReplaceAllMetadata", "true");
365 url.query_pairs_mut().append_pair("Recursive", "true");
366
367 url.query_pairs_mut()
369 .append_pair("ReplaceAllImages", "false");
370 url.query_pairs_mut()
371 .append_pair("RegenerateTrickplay", "false");
372
373 client.post(url).perform().await.map(|_| ())
374 }
375}
376
377impl TargetProcess for Emby {
378 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
379 let libraries = self
380 .libraries()
381 .await
382 .context("failed to fetch libraries")?;
383
384 let mut succeeded: HashMap<String, bool> = HashMap::new();
385
386 let mut to_find = HashMap::new();
387 let mut to_refresh = Vec::new();
388 let mut to_scan = Vec::new();
389
390 if self.refresh_metadata {
391 for ev in evs {
392 let ev_path = ev.get_path(&self.rewrite);
393
394 let matched_libraries = self.get_libraries(&libraries, &ev_path);
395
396 if matched_libraries.is_empty() {
397 let known: Vec<&str> = libraries
398 .iter()
399 .flat_map(|l| l.locations.iter().map(String::as_str))
400 .collect();
401 error!(
402 "failed to find library for file '{ev_path}'. Known locations: {known:?}"
403 );
404 continue;
405 }
406
407 for library in matched_libraries {
408 to_find.entry(library).or_insert_with(Vec::new).push(*ev);
409 }
410 }
411
412 for (library, library_events) in to_find {
413 let (found_in_library, not_found_in_library) = self
414 .get_items(&library, library_events.clone())
415 .await
416 .with_context(|| {
417 format!(
418 "failed to fetch items for library: {}",
419 library.name.clone()
420 )
421 })?;
422
423 to_refresh.extend(found_in_library);
424 to_scan.extend(not_found_in_library);
425 }
426
427 for (ev, item) in to_refresh {
428 match self.refresh_item(&item).await {
429 Ok(()) => {
430 debug!("refreshed item: {}", item.id);
431 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
432 }
433 Err(e) => {
434 error!("failed to refresh item: {}", e);
435 succeeded.insert(ev.id.clone(), false);
436 }
437 }
438 }
439 } else {
440 to_scan.extend(evs.iter().copied());
441 }
442
443 if !to_scan.is_empty() {
444 match self.scan(&to_scan).await {
445 Ok(()) => {
446 for ev in &to_scan {
447 debug!("scanned file: {}", ev.file_path);
448
449 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
450 }
451 }
452 Err(e) => {
453 error!("failed to scan items: {}", e);
454
455 for ev in &to_scan {
456 succeeded.insert(ev.id.clone(), false);
457 }
458 }
459 }
460 }
461
462 Ok(succeeded
463 .iter()
464 .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
465 .collect())
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    /// Event path shared by the tests that exercise a `/media/TV` library.
    const EPISODE: &str = "/media/TV/Show/S01E01.mkv";

    /// Builds a library fixture named `name` with the given locations.
    fn lib(name: &str, paths: &[&str]) -> Library {
        let locations = paths.iter().copied().map(String::from).collect();

        Library {
            name: name.to_owned(),
            locations,
            item_id: format!("id-{name}"),
            collection_type: None,
        }
    }

    /// Builds a target with default matching behaviour; no request is ever
    /// sent by these tests.
    fn target() -> Emby {
        Emby {
            url: String::from("http://x"),
            token: String::from("t"),
            metadata_refresh_mode: EmbyMetadataRefreshMode::default(),
            refresh_metadata: true,
            rewrite: None,
            filter: PathFilter::default(),
            request: Request::default(),
            path_match: PathMatch::default(),
        }
    }

    #[test]
    fn matches_when_library_location_is_exact_prefix() {
        let libraries = [lib("TV", &["/media/TV"])];

        let matched = target().get_libraries(&libraries, EPISODE);

        assert_eq!(matched.len(), 1);
    }

    #[test]
    fn matches_when_library_location_has_trailing_slash() {
        let libraries = [lib("TV", &["/media/TV/"])];

        let matched = target().get_libraries(&libraries, EPISODE);

        assert_eq!(
            matched.len(),
            1,
            "trailing slash on library location must not break match"
        );
    }

    #[test]
    fn no_match_when_path_outside_library() {
        let libraries = [lib("TV", &["/data/TV"])];

        let matched = target().get_libraries(&libraries, "/media/TV/Show.mkv");

        assert!(matched.is_empty());
    }

    #[test]
    fn case_insensitive_matching_when_opted_in() {
        let emby = Emby {
            path_match: PathMatch::CaseInsensitive,
            ..target()
        };
        let libraries = [lib("TV", &["/Media/TV"])];

        let matched = emby.get_libraries(&libraries, "/media/tv/Show.mkv");

        assert_eq!(
            matched.len(),
            1,
            "case-insensitive must match across case differences"
        );
    }

    #[test]
    fn case_sensitive_matching_by_default() {
        let libraries = [lib("TV", &["/Media/TV"])];

        let matched = target().get_libraries(&libraries, "/media/tv/Show.mkv");

        assert!(matched.is_empty());
    }
}