autopulse_service/settings/targets/
emby.rs1use super::RequestBuilderPerform;
2use crate::settings::rewrite::Rewrite;
3use crate::settings::targets::TargetProcess;
4use anyhow::Context;
5use autopulse_database::models::ScanEvent;
6use autopulse_utils::get_url;
7use reqwest::header;
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, io::Cursor, path::Path};
10use struson::{
11 json_path,
12 reader::{JsonReader, JsonStreamReader},
13};
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, error};
16
/// Serde default provider for boolean settings that are switched on unless
/// the configuration explicitly turns them off.
#[doc(hidden)]
const fn default_true() -> bool {
    true
}
21
22#[derive(Serialize, Clone, Deserialize)]
23pub struct Emby {
24 pub url: String,
26 pub token: String,
28 #[serde(default)]
30 pub metadata_refresh_mode: EmbyMetadataRefreshMode,
31 #[serde(default = "default_true")]
33 pub refresh_metadata: bool,
34 pub rewrite: Option<Rewrite>,
36}
37
38#[derive(Serialize, Clone, Deserialize)]
40#[serde(rename_all = "snake_case")]
41#[derive(Default)]
42pub enum EmbyMetadataRefreshMode {
43 None,
45 ValidationOnly,
47 Default,
49 #[default]
51 FullRefresh,
52}
53
54impl Display for EmbyMetadataRefreshMode {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 let mode = match self {
57 Self::None => "None",
58 Self::ValidationOnly => "ValidationOnly",
59 Self::Default => "Default",
60 Self::FullRefresh => "FullRefresh",
61 };
62
63 write!(f, "{mode}")
64 }
65}
66
67#[derive(Deserialize, Clone, Eq, PartialEq, Hash)]
68#[serde(rename_all = "PascalCase")]
69#[doc(hidden)]
70struct Library {
71 #[allow(dead_code)]
72 name: String,
73 locations: Vec<String>,
74 item_id: String,
75 collection_type: Option<String>,
76}
77
78#[derive(Serialize, Clone)]
79#[serde(rename_all = "PascalCase")]
80#[doc(hidden)]
81struct UpdateRequest {
82 path: String,
83 update_type: String,
84}
85
86#[derive(Serialize, Clone)]
87#[serde(rename_all = "PascalCase")]
88#[doc(hidden)]
89struct ScanPayload {
90 updates: Vec<UpdateRequest>,
91}
92
93#[derive(Deserialize, Clone)]
94#[serde(rename_all = "PascalCase")]
95#[doc(hidden)]
96struct Item {
97 id: String,
98 path: Option<String>,
99}
100
101impl Emby {
102 fn get_client(&self) -> anyhow::Result<reqwest::Client> {
103 let mut headers = header::HeaderMap::new();
104
105 headers.insert("X-Emby-Token", self.token.parse()?);
106 headers.insert(
107 "Authorization",
108 format!("MediaBrowser Token=\"{}\"", self.token).parse()?,
109 );
110 headers.insert("Accept", "application/json".parse()?);
111
112 reqwest::Client::builder()
113 .timeout(std::time::Duration::from_secs(10))
114 .default_headers(headers)
115 .build()
116 .map_err(Into::into)
117 }
118
119 async fn libraries(&self) -> anyhow::Result<Vec<Library>> {
120 let client = self.get_client()?;
121 let url = get_url(&self.url)?.join("Library/VirtualFolders")?;
122
123 let res = client.get(url).perform().await?;
124
125 Ok(res.json().await?)
126 }
127
128 fn get_libraries(&self, libraries: &[Library], path: &str) -> Vec<Library> {
129 let ev_path = Path::new(path);
130 let mut matched: Vec<Library> = vec![];
131
132 for library in libraries {
133 for location in &library.locations {
134 let path = Path::new(location);
135
136 if ev_path.starts_with(path) {
137 matched.push(library.clone());
138 }
139 }
140 }
141
142 matched
143 }
144
145 async fn _get_item(&self, library: &Library, path: &str) -> anyhow::Result<Option<Item>> {
146 let client = self.get_client()?;
147 let mut url = get_url(&self.url)?.join("Items")?;
148
149 url.query_pairs_mut().append_pair("Recursive", "true");
150 url.query_pairs_mut().append_pair("Fields", "Path");
151 url.query_pairs_mut().append_pair("EnableImages", "false");
152 if let Some(collection_type) = &library.collection_type {
153 url.query_pairs_mut().append_pair(
154 "IncludeItemTypes",
155 match collection_type.as_str() {
156 "tvshows" => "Episode",
157 "books" => "Book",
158 "music" => "Audio",
159 "movie" => "VideoFile,Movie",
160 _ => "",
161 },
162 );
163 }
164 url.query_pairs_mut()
165 .append_pair("ParentId", &library.item_id);
166 url.query_pairs_mut()
167 .append_pair("EnableTotalRecordCount", "false");
168
169 let res = client.get(url).perform().await?;
170
171 let bytes = res.bytes().await?;
173
174 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
175
176 json_reader.seek_to(&json_path!["Items"])?;
177 json_reader.begin_array()?;
178
179 while json_reader.has_next()? {
180 let item: Item = json_reader.deserialize_next()?;
181
182 if item.path == Some(path.to_owned()) {
183 return Ok(Some(item));
184 }
185 }
186
187 Ok(None)
188 }
189
190 fn fetch_items(
191 &self,
192 library: &Library,
193 ) -> anyhow::Result<(
194 UnboundedReceiver<Item>,
195 tokio::task::JoinHandle<anyhow::Result<()>>,
196 )> {
197 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
198 let limit = 1000;
199
200 let client = self.get_client()?;
201 let mut url = get_url(&self.url)?.join("Items")?;
202
203 url.query_pairs_mut().append_pair("Recursive", "true");
204 url.query_pairs_mut().append_pair("Fields", "Path");
205 url.query_pairs_mut().append_pair("EnableImages", "false");
206 url.query_pairs_mut()
207 .append_pair("ParentId", &library.item_id);
208 url.query_pairs_mut()
209 .append_pair("EnableTotalRecordCount", "false");
210 url.query_pairs_mut()
211 .append_pair("Limit", &limit.to_string());
212 if let Some(collection_type) = &library.collection_type {
213 url.query_pairs_mut().append_pair(
214 "IncludeItemTypes",
215 match collection_type.as_str() {
216 "tvshows" => "Episode",
217 "books" => "Book",
218 "music" => "Audio",
219 "movie" => "VideoFile,Movie",
220 _ => "",
221 },
222 );
223 }
224
225 let handle = tokio::spawn(async move {
226 let mut page = 0;
227
228 loop {
229 let mut page_url = url.clone();
230 page_url
231 .query_pairs_mut()
232 .append_pair("StartIndex", &(page * limit).to_string());
233
234 let res = client.get(page_url).perform().await?;
235
236 let bytes = res.bytes().await?;
237
238 let mut json_reader = JsonStreamReader::new(Cursor::new(bytes));
239
240 json_reader.seek_to(&json_path!["Items"])?;
241 json_reader.begin_array()?;
242
243 let mut found_items_count = 0;
244
245 while json_reader.has_next()? {
246 let item: Item = json_reader.deserialize_next()?;
247
248 tx.send(item)?;
249
250 found_items_count += 1;
251 }
252
253 if found_items_count < limit {
254 break;
255 }
256
257 page += 1;
258 }
259
260 drop(tx);
261
262 Ok(())
263 });
264
265 Ok((rx, handle))
266 }
267
268 async fn get_items<'a>(
269 &self,
270 library: &Library,
271 events: Vec<&'a ScanEvent>,
272 ) -> anyhow::Result<(Vec<(&'a ScanEvent, Item)>, Vec<&'a ScanEvent>)> {
273 let (mut rx, handle) = self.fetch_items(library)?;
274
275 let mut found_in_library = Vec::new();
276 let mut not_found_in_library = events.clone();
277
278 while let Some(item) = rx.recv().await {
279 if let Some(ev) = events
280 .iter()
281 .find(|ev| item.path == Some(ev.get_path(&self.rewrite)))
282 {
283 found_in_library.push((*ev, item.clone()));
284 not_found_in_library.retain(|&e| e.id != ev.id);
285
286 if not_found_in_library.is_empty() {
287 break;
288 }
289 }
290 }
291
292 handle.abort();
293
294 Ok((found_in_library, not_found_in_library))
295 }
296
297 async fn scan(&self, ev: &[&ScanEvent]) -> anyhow::Result<()> {
299 let client = self.get_client()?;
300 let url = get_url(&self.url)?.join("Library/Media/Updated")?;
301
302 let updates = ev
303 .iter()
304 .map(|ev| UpdateRequest {
305 path: ev.get_path(&self.rewrite),
306 update_type: "Modified".to_string(),
307 })
308 .collect();
309
310 let body = ScanPayload { updates };
311
312 client
313 .post(url)
314 .header("Content-Type", "application/json")
315 .json(&body)
316 .perform()
317 .await
318 .map(|_| ())
319 }
320
321 async fn refresh_item(&self, item: &Item) -> anyhow::Result<()> {
322 let client = self.get_client()?;
323 let mut url = get_url(&self.url)?.join(&format!("Items/{}/Refresh", item.id))?;
324
325 url.query_pairs_mut().append_pair(
326 "MetadataRefreshMode",
327 &self.metadata_refresh_mode.to_string(),
328 );
329 url.query_pairs_mut()
330 .append_pair("ImageRefreshMode", &self.metadata_refresh_mode.to_string());
331 url.query_pairs_mut()
332 .append_pair("ReplaceAllMetadata", "true");
333 url.query_pairs_mut().append_pair("Recursive", "true");
334
335 url.query_pairs_mut()
337 .append_pair("ReplaceAllImages", "false");
338 url.query_pairs_mut()
339 .append_pair("RegenerateTrickplay", "false");
340
341 client.post(url).perform().await.map(|_| ())
342 }
343}
344
345impl TargetProcess for Emby {
346 async fn process(&self, evs: &[&ScanEvent]) -> anyhow::Result<Vec<String>> {
347 let libraries = self
348 .libraries()
349 .await
350 .context("failed to fetch libraries")?;
351
352 let mut succeeded: HashMap<String, bool> = HashMap::new();
353
354 let mut to_find = HashMap::new();
355 let mut to_refresh = Vec::new();
356 let mut to_scan = Vec::new();
357
358 if self.refresh_metadata {
359 for ev in evs {
360 let ev_path = ev.get_path(&self.rewrite);
361
362 let matched_libraries = self.get_libraries(&libraries, &ev_path);
363
364 if matched_libraries.is_empty() {
365 error!("failed to find library for file: {}", ev_path);
366 continue;
367 }
368
369 for library in matched_libraries {
370 to_find.entry(library).or_insert_with(Vec::new).push(*ev);
371 }
372 }
373
374 for (library, library_events) in to_find {
375 let (found_in_library, not_found_in_library) = self
376 .get_items(&library, library_events.clone())
377 .await
378 .with_context(|| {
379 format!(
380 "failed to fetch items for library: {}",
381 library.name.clone()
382 )
383 })?;
384
385 to_refresh.extend(found_in_library);
386 to_scan.extend(not_found_in_library);
387 }
388
389 for (ev, item) in to_refresh {
390 match self.refresh_item(&item).await {
391 Ok(()) => {
392 debug!("refreshed item: {}", item.id);
393 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
394 }
395 Err(e) => {
396 error!("failed to refresh item: {}", e);
397 succeeded.insert(ev.id.clone(), false);
398 }
399 }
400 }
401 } else {
402 to_scan.extend(evs.iter().copied());
403 }
404
405 if !to_scan.is_empty() {
406 match self.scan(&to_scan).await {
407 Ok(()) => {
408 for ev in &to_scan {
409 debug!("scanned file: {}", ev.file_path);
410
411 *succeeded.entry(ev.id.clone()).or_insert(true) &= true;
412 }
413 }
414 Err(e) => {
415 error!("failed to scan items: {}", e);
416
417 for ev in &to_scan {
418 succeeded.insert(ev.id.clone(), false);
419 }
420 }
421 }
422 }
423
424 Ok(succeeded
425 .iter()
426 .filter_map(|(k, v)| if *v { Some(k.clone()) } else { None })
427 .collect())
428 }
429}