autopulse_service/settings/targets/
emby.rs1use super::{Request, RequestBuilderPerform};
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11 json_path,
12 reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
17#[doc(hidden)]
18const fn default_true() -> bool {
19 true
20}
21
22#[derive(Serialize, Clone, Deserialize)]
23pub struct Emby {
24 pub url: String,
26 pub token: String,
28 #[serde(default)]
30 pub metadata_refresh_mode: EmbyMetadataRefreshMode,
31 #[serde(default = "default_true")]
33 pub refresh_metadata: bool,
34 pub rewrite: Option<Rewrite>,
36 #[serde(default)]
38 pub request: Request,
39}
40
41#[derive(Serialize, Clone, Deserialize)]
43#[serde(rename_all = "snake_case")]
44#[derive(Default)]
45pub enum EmbyMetadataRefreshMode {
46 None,
48 ValidationOnly,
50 Default,
52 #[default]
54 FullRefresh,
55}
56
57impl Display for EmbyMetadataRefreshMode {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 let mode = match self {
60 Self::None => "None",
61 Self::ValidationOnly => "ValidationOnly",
62 Self::Default => "Default",
63 Self::FullRefresh => "FullRefresh",
64 };
65
66 write!(f, "{mode}")
67 }
68}
69
70#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
71#[serde(rename_all = "PascalCase")]
72#[doc(hidden)]
73struct Library {
74 #[allow(dead_code)]
75 name: String,
76 locations: Vec<String>,
77 item_id: String,
78 collection_type: Option<String>,
79}
80
81#[derive(Serialize, Clone)]
82#[serde(rename_all = "PascalCase")]
83#[doc(hidden)]
84struct UpdateRequest {
85 path: String,
86 update_type: String,
87}
88
89#[derive(Serialize, Clone)]
90#[serde(rename_all = "PascalCase")]
91#[doc(hidden)]
92struct ScanPayload {
93 updates: Vec<UpdateRequest>,
94}
95
96#[derive(Deserialize, Clone)]
97#[serde(rename_all = "PascalCase")]
98#[doc(hidden)]
99struct Item {
100 id: String,
101 path: Option<String>,
102}
103
104impl Emby {
105 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
106 let mut headers = header::HeaderMap::new();
107
108 headers.insert("X-Emby-Token", self.token.parse()?);
109 headers.insert(
110 "Authorization",
111 format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
112 );
113 headers.insert("Accept", "application/json".parse()?);
114
115 self.request
116 .client_builder(headers)
117 .build()
118 .map_err(Into::into)
119 }
120
121 async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
122 let client = self.get_client()?;
123 let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
124
125 let res = client.get(url).perform().await?;
126
127 Ok(res.json().await?)
128 }
129
130 fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
131 let ev_path = Path::new(path);
132 let mut matched: Vec<Library> = vec![];
133
134 for library in libraries {
135 for location in &library.locations {
136 let path = Path::new(location);
137
138 if ev_path.starts_with(path) {
139 matched.push(library.clone());
140 }
141 }
142 }
143
144 matched
145 }
146
147 async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
148 let client = self.get_client()?;
149 let mut url = get_url(&self.url)?.join("Items")?;
150
151 url.query_pairs_mut().append_pair("Recursive", "true");
152 url.query_pairs_mut().append_pair("Fields", "Path");
153 url.query_pairs_mut().append_pair("EnableImages", "false");
154 if let Some(collection_type) = &library.collection_type {
155 url.query_pairs_mut().append_pair(
156 "IncludeItemTypes",
157 match collection_type.as_str() {
158 "tvshows" => "Episode",
159 "books" => "Book",
160 "music" => "Audio",
161 "movie" => "VideoFile,Movie",
162 _ => "",
163 },
164 );
165 }
166 url.query_pairs_mut()
167 .append_pair("ParentId", &library.item_id);
168 url.query_pairs_mut()
169 .append_pair("EnableTotalRecordCount", "false");
170
171 let res = client.get(url).perform().await?;
172
173 let bytes = res.bytes().await?;
175
176 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
177
178 json_reader.seek_to(&json_path!["Items"])?;
179 json_reader.begin_array()?;
180
181 while json_reader.has_next()? {
182 let item: Item = json_reader.deserialize_next()?;
183
184 if item.path == Some(path.to_owned()) {
185 return Ok(Some(item));
186 }
187 }
188
189 Ok(None)
190 }
191
192 fn fetch_items(
193 &self,
194 library: &Library,
195 ) -> anyhow::Result<(
196 UnboundedReceiver<Item>,
197 tokio::task::JoinHandle<anyhow::Result<()>>,
198 )> {
199 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
200 let limit = 1000;
201
202 let client = self.get_client()?;
203 let mut url = get_url(&self.url)?.join("Items")?;
204
205 url.query_pairs_mut().append_pair("Recursive", "true");
206 url.query_pairs_mut().append_pair("Fields", "Path");
207 url.query_pairs_mut().append_pair("EnableImages", "false");
208 url.query_pairs_mut()
209 .append_pair("ParentId", &library.item_id);
210 url.query_pairs_mut()
211 .append_pair("EnableTotalRecordCount", "false");
212 url.query_pairs_mut()
213 .append_pair("Limit", &limit.to_string());
214 if let Some(collection_type) = &library.collection_type {
215 url.query_pairs_mut().append_pair(
216 "IncludeItemTypes",
217 match collection_type.as_str() {
218 "tvshows" => "Episode",
219 "books" => "Book",
220 "music" => "Audio",
221 "movie" => "VideoFile,Movie",
222 _ => "",
223 },
224 );
225 }
226
227 let handle = tokio::spawn(async move {
228 let mut page = 0;
229
230 loop {
231 let mut page_url = url.clone();
232 page_url
233 .query_pairs_mut()
234 .append_pair("StartIndex", &(page * limit).to_string());
235
236 let res = client.get(page_url).perform().await?;
237
238 let bytes = res.bytes().await?;
239
240 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
241
242 json_reader.seek_to(&json_path!["Items"])?;
243 json_reader.begin_array()?;
244
245 let mut found_items_count = 0;
246
247 while json_reader.has_next()? {
248 let item: Item = json_reader.deserialize_next()?;
249
250 tx.send(item)?;
251
252 found_items_count += 1;
253 }
254
255 if found_items_count < limit {
256 break;
257 }
258
259 page += 1;
260 }
261
262 drop(tx);
263
264 Ok(())
265 });
266
267 Ok((rx, handle))
268 }
269
270 async fn get_items<'a>(
271 &self,
272 library: &Library,
273 events: Vec<&'a ScanEvent>,
274 ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
275 let (mut rx, handle) = self.fetch_items(library)?;
276
277 let mut found_in_library = Vec::new();
278 let mut not_found_in_library = events.clone();
279
280 while let Some(item) = rx.recv().await {
281 if let Some(ev) = events
282 .iter()
283 .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
284 {
285 found_in_library.push((*ev, item.clone()));
286 not_found_in_library.retain(|&e| e.id != ev.id);
287
288 if not_found_in_library.is_empty() {
289 break;
290 }
291 }
292 }
293
294 handle.abort();
295
296 Ok((found_in_library, not_found_in_library))
297 }
298
299 async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
301 let client = self.get_client()?;
302 let url = get_url(&self.url)?.join("Library/Media/Updated")?;
303
304 let updates = ev
305 .iter()
306 .map(|ev| UpdateRequest {
307 path: ev.get_path(&self.rewrite),
308 update_type: "Modified".to_string(),
309 })
310 .collect();
311
312 let body = ScanPayload { updates };
313
314 client
315 .post(url)
316 .header("Content-Type", "application/json")
317 .json(&body)
318 .perform()
319 .await
320 .map(|_| ())
321 }
322
323 async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
324 let client = self.get_client()?;
325 let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
326
327 url.query_pairs_mut().append_pair(
328 "MetadataRefreshMode",
329 &self.metadata_refresh_mode.to_string(),
330 );
331 url.query_pairs_mut()
332 .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
333 url.query_pairs_mut()
334 .append_pair("ReplaceAllMetadata", "true");
335 url.query_pairs_mut().append_pair("Recursive", "true");
336
337 url.query_pairs_mut()
339 .append_pair("ReplaceAllImages", "false");
340 url.query_pairs_mut()
341 .append_pair("RegenerateTrickplay", "false");
342
343 client.post(url).perform().await.map(|_| ())
344 }
345}
346
347impl TargetProcess for Emby {
348 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
349 let libraries = self
350 .libraries()
351 .await
352 .context("failed to fetch libraries")?;
353
354 let mut succeeded: HashMap<String, bool> = HashMap::new();
355
356 let mut to_find = HashMap::new();
357 let mut to_refresh = Vec::new();
358 let mut to_scan = Vec::new();
359
360 if self.refresh_metadata {
361 for ev in evs {
362 let ev_path = ev.get_path(&self.rewrite);
363
364 let matched_libraries = self.get_libraries(&libraries, &ev_path);
365
366 if matched_libraries.is_empty() {
367 error!("failed to find library for file: {}", ev_path);
368 continue;
369 }
370
371 for library in matched_libraries {
372 to_find.entry(library).or_insert_with(Vec::new).push(*ev);
373 }
374 }
375
376 for (library, library_events) in to_find {
377 let (found_in_library, not_found_in_library) = self
378 .get_items(&library, library_events.clone())
379 .await
380 .with_context(|| {
381 format!(
382 "failed to fetch items for library: {}",
383 library.name.clone()
384 )
385 })?;
386
387 to_refresh.extend(found_in_library);
388 to_scan.extend(not_found_in_library);
389 }
390
391 for (ev, item) in to_refresh {
392 match self.refresh_item(&item).await {
393 Ok(()) => {
394 debug!("refreshed item: {}", item.id);
395 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
396 }
397 Err(e) => {
398 error!("failed to refresh item: {}", e);
399 succeeded.insert(ev.id.clone(), false);
400 }
401 }
402 }
403 } else {
404 to_scan.extend(evs.iter().copied());
405 }
406
407 if !to_scan.is_empty() {
408 match self.scan(&to_scan).await {
409 Ok(()) => {
410 for ev in &to_scan {
411 debug!("scanned file: {}", ev.file_path);
412
413 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
414 }
415 }
416 Err(e) => {
417 error!("failed to scan items: {}", e);
418
419 for ev in &to_scan {
420 succeeded.insert(ev.id.clone(), false);
421 }
422 }
423 }
424 }
425
426 Ok(succeeded
427 .iter()
428 .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
429 .collect())
430 }
431}