autopulse_service/settings/targets/
emby.rs1use super::RequestBuilderPerform;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11 json_path,
12 reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
/// Serde default provider that yields `true`.
///
/// Referenced through `#[serde(default = "default_true")]` so that a boolean
/// option is switched on when it is absent from the configuration.
#[doc(hidden)]
const fn default_true() -> bool {
    true
}
21
22#[derive(Clone, Deserialize)]
23pub struct Emby {
24 pub url: String,
26 pub token: String,
28 #[serde(default)]
30 pub metadata_refresh_mode: EmbyMetadataRefreshMode,
31 #[serde(default = "default_true")]
33 pub refresh_metadata: bool,
34 pub rewrite: Option<Rewrite>,
36}
37
38#[derive(Clone, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum EmbyMetadataRefreshMode {
42 None,
44 ValidationOnly,
46 Default,
48 FullRefresh,
50}
51
52impl Display for EmbyMetadataRefreshMode {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 let mode = match self {
55 Self::None => "None",
56 Self::ValidationOnly => "ValidationOnly",
57 Self::Default => "Default",
58 Self::FullRefresh => "FullRefresh",
59 };
60
61 write!(f, "{mode}")
62 }
63}
64
65impl Default for EmbyMetadataRefreshMode {
66 fn default() -> Self {
67 Self::FullRefresh
68 }
69}
70
71#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
72#[serde(rename_all = "PascalCase")]
73#[doc(hidden)]
74struct Library {
75 #[allow(dead_code)]
76 name: String,
77 locations: Vec<String>,
78 item_id: String,
79 collection_type: Option<String>,
80}
81
82#[derive(Serialize, Clone)]
83#[serde(rename_all = "PascalCase")]
84#[doc(hidden)]
85struct UpdateRequest {
86 path: String,
87 update_type: String,
88}
89
90#[derive(Serialize, Clone)]
91#[serde(rename_all = "PascalCase")]
92#[doc(hidden)]
93struct ScanPayload {
94 updates: Vec<UpdateRequest>,
95}
96
97#[derive(Deserialize, Clone)]
98#[serde(rename_all = "PascalCase")]
99#[doc(hidden)]
100struct Item {
101 id: String,
102 path: Option<String>,
103}
104
105impl Emby {
106 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
107 let mut headers = header::HeaderMap::new();
108
109 headers.insert("X-Emby-Token", self.token.parse()?);
110 headers.insert(
111 "Authorization",
112 format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
113 );
114 headers.insert("Accept", "application/json".parse()?);
115
116 reqwest::Client::builder()
117 .timeout(std::time::Duration::from_secs(10))
118 .default_headers(headers)
119 .build()
120 .map_err(Into::into)
121 }
122
123 async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
124 let client = self.get_client()?;
125 let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
126
127 let res = client.get(url).perform().await?;
128
129 Ok(res.json().await?)
130 }
131
132 fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
133 let ev_path = Path::new(path);
134 let mut matched: Vec<Library> = vec![];
135
136 for library in libraries {
137 for location in &library.locations {
138 let path = Path::new(location);
139
140 if ev_path.starts_with(path) {
141 matched.push(library.clone());
142 }
143 }
144 }
145
146 matched
147 }
148
149 async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
150 let client = self.get_client()?;
151 let mut url = get_url(&self.url)?.join("Items")?;
152
153 url.query_pairs_mut().append_pair("Recursive", "true");
154 url.query_pairs_mut().append_pair("Fields", "Path");
155 url.query_pairs_mut().append_pair("EnableImages", "false");
156 if let Some(collection_type) = &library.collection_type {
157 url.query_pairs_mut().append_pair(
158 "IncludeItemTypes",
159 match collection_type.as_str() {
160 "tvshows" => "Episode",
161 "books" => "Book",
162 "music" => "Audio",
163 "movie" => "VideoFile,Movie",
164 _ => "",
165 },
166 );
167 }
168 url.query_pairs_mut()
169 .append_pair("ParentId", &library.item_id);
170 url.query_pairs_mut()
171 .append_pair("EnableTotalRecordCount", "false");
172
173 let res = client.get(url).perform().await?;
174
175 let bytes = res.bytes().await?;
177
178 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
179
180 json_reader.seek_to(&json_path!["Items"])?;
181 json_reader.begin_array()?;
182
183 while json_reader.has_next()? {
184 let item: Item = json_reader.deserialize_next()?;
185
186 if item.path == Some(path.to_owned()) {
187 return Ok(Some(item));
188 }
189 }
190
191 Ok(None)
192 }
193
194 fn fetch_items(
195 &self,
196 library: &Library,
197 ) -> anyhow::Result<(
198 UnboundedReceiver<Item>,
199 tokio::task::JoinHandle<anyhow::Result<()>>,
200 )> {
201 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
202 let limit = 1000;
203
204 let client = self.get_client()?;
205 let mut url = get_url(&self.url)?.join("Items")?;
206
207 url.query_pairs_mut().append_pair("Recursive", "true");
208 url.query_pairs_mut().append_pair("Fields", "Path");
209 url.query_pairs_mut().append_pair("EnableImages", "false");
210 url.query_pairs_mut()
211 .append_pair("ParentId", &library.item_id);
212 url.query_pairs_mut()
213 .append_pair("EnableTotalRecordCount", "false");
214 url.query_pairs_mut()
215 .append_pair("Limit", &limit.to_string());
216 if let Some(collection_type) = &library.collection_type {
217 url.query_pairs_mut().append_pair(
218 "IncludeItemTypes",
219 match collection_type.as_str() {
220 "tvshows" => "Episode",
221 "books" => "Book",
222 "music" => "Audio",
223 "movie" => "VideoFile,Movie",
224 _ => "",
225 },
226 );
227 }
228
229 let handle = tokio::spawn(async move {
230 let mut page = 0;
231
232 loop {
233 let mut page_url = url.clone();
234 page_url
235 .query_pairs_mut()
236 .append_pair("StartIndex", &(page * limit).to_string());
237
238 let res = client.get(page_url).perform().await?;
239
240 let bytes = res.bytes().await?;
241
242 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
243
244 json_reader.seek_to(&json_path!["Items"])?;
245 json_reader.begin_array()?;
246
247 let mut found_items_count = 0;
248
249 while json_reader.has_next()? {
250 let item: Item = json_reader.deserialize_next()?;
251
252 tx.send(item)?;
253
254 found_items_count += 1;
255 }
256
257 if found_items_count < limit {
258 break;
259 }
260
261 page += 1;
262 }
263
264 drop(tx);
265
266 Ok(())
267 });
268
269 Ok((rx, handle))
270 }
271
272 async fn get_items<'a>(
273 &self,
274 library: &Library,
275 events: Vec<&'a ScanEvent>,
276 ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
277 let (mut rx, handle) = self.fetch_items(library)?;
278
279 let mut found_in_library = Vec::new();
280 let mut not_found_in_library = events.clone();
281
282 while let Some(item) = rx.recv().await {
283 if let Some(ev) = events
284 .iter()
285 .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
286 {
287 found_in_library.push((*ev, item.clone()));
288 not_found_in_library.retain(|&e| e.id != ev.id);
289
290 if not_found_in_library.is_empty() {
291 break;
292 }
293 }
294 }
295
296 handle.abort();
297
298 Ok((found_in_library, not_found_in_library))
299 }
300
301 async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
303 let client = self.get_client()?;
304 let url = get_url(&self.url)?.join("Library/Media/Updated")?;
305
306 let updates = ev
307 .iter()
308 .map(|ev| UpdateRequest {
309 path: ev.get_path(&self.rewrite),
310 update_type: "Modified".to_string(),
311 })
312 .collect();
313
314 let body = ScanPayload { updates };
315
316 client
317 .post(url)
318 .header("Content-Type", "application/json")
319 .json(&body)
320 .perform()
321 .await
322 .map(|_| ())
323 }
324
325 async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
326 let client = self.get_client()?;
327 let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
328
329 url.query_pairs_mut().append_pair(
330 "MetadataRefreshMode",
331 &self.metadata_refresh_mode.to_string(),
332 );
333 url.query_pairs_mut()
334 .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
335 url.query_pairs_mut()
336 .append_pair("ReplaceAllMetadata", "true");
337 url.query_pairs_mut().append_pair("Recursive", "true");
338
339 url.query_pairs_mut()
341 .append_pair("ReplaceAllImages", "false");
342 url.query_pairs_mut()
343 .append_pair("RegenerateTrickplay", "false");
344
345 client.post(url).perform().await.map(|_| ())
346 }
347}
348
349impl TargetProcess for Emby {
350 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
351 let libraries = self
352 .libraries()
353 .await
354 .context("failed to fetch libraries")?;
355
356 let mut succeeded: HashMap<String, bool> = HashMap::new();
357
358 let mut to_find = HashMap::new();
359 let mut to_refresh = Vec::new();
360 let mut to_scan = Vec::new();
361
362 if self.refresh_metadata {
363 for ev in evs {
364 let ev_path = ev.get_path(&self.rewrite);
365
366 let matched_libraries = self.get_libraries(&libraries, &ev_path);
367
368 if matched_libraries.is_empty() {
369 error!("failed to find library for file: {}", ev_path);
370 continue;
371 }
372
373 for library in matched_libraries {
374 to_find.entry(library).or_insert_with(Vec::new).push(*ev);
375 }
376 }
377
378 for (library, library_events) in to_find {
379 let (found_in_library, not_found_in_library) = self
380 .get_items(&library, library_events.clone())
381 .await
382 .with_context(|| {
383 format!(
384 "failed to fetch items for library: {}",
385 library.name.clone()
386 )
387 })?;
388
389 to_refresh.extend(found_in_library);
390 to_scan.extend(not_found_in_library);
391 }
392
393 for (ev, item) in to_refresh {
394 match self.refresh_item(&item).await {
395 Ok(()) => {
396 debug!("refreshed item: {}", item.id);
397 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
398 }
399 Err(e) => {
400 error!("failed to refresh item: {}", e);
401 succeeded.insert(ev.id.clone(), false);
402 }
403 }
404 }
405 } else {
406 to_scan.extend(evs.iter().copied());
407 }
408
409 if !to_scan.is_empty() {
410 match self.scan(&to_scan).await {
411 Ok(()) => {
412 for ev in &to_scan {
413 debug!("scanned file: {}", ev.file_path);
414
415 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
416 }
417 }
418 Err(e) => {
419 error!("failed to scan items: {}", e);
420
421 for ev in &to_scan {
422 succeeded.insert(ev.id.clone(), false);
423 }
424 }
425 }
426 }
427
428 Ok(succeeded
429 .iter()
430 .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
431 .collect())
432 }
433}