ScheduledTaskWorker.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. #nullable disable
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Globalization;
  5. using System.IO;
  6. using System.Linq;
  7. using System.Text.Json;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using Emby.Server.Implementations.ScheduledTasks.Triggers;
  11. using Jellyfin.Data.Events;
  12. using Jellyfin.Extensions.Json;
  13. using MediaBrowser.Common.Configuration;
  14. using MediaBrowser.Common.Extensions;
  15. using MediaBrowser.Model.Tasks;
  16. using Microsoft.Extensions.Logging;
  17. namespace Emby.Server.Implementations.ScheduledTasks;
  /// <summary>
  /// Runs a single <see cref="IScheduledTask" />: owns its triggers, executes it with cancellation and
  /// progress reporting, and persists the trigger configuration and the last execution result as JSON files.
  /// </summary>
  public class ScheduledTaskWorker : IScheduledTaskWorker
  {
      private readonly JsonSerializerOptions _jsonOptions = JsonDefaults.Options;
      private readonly IApplicationPaths _applicationPaths;
      private readonly ILogger _logger;
      private readonly ITaskManager _taskManager;
      private readonly Lock _lastExecutionResultSyncLock = new();
      private bool _readFromFile;
      private TaskResult _lastExecutionResult;
      private Task _currentTask;
      private Tuple<TaskTriggerInfo, ITaskTrigger>[] _triggers;
      private string _id;

      // Read from the trigger callback, which resumes on another thread after its delay.
      private volatile bool _disposed;

      /// <summary>
      /// Initializes a new instance of the <see cref="ScheduledTaskWorker" /> class.
      /// Loads the task's triggers (saved configuration or defaults) and starts them.
      /// </summary>
      /// <param name="scheduledTask">The scheduled task.</param>
      /// <param name="applicationPaths">The application paths.</param>
      /// <param name="taskManager">The task manager.</param>
      /// <param name="logger">The logger.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="scheduledTask" />, <paramref name="applicationPaths" />,
      /// <paramref name="taskManager" /> or <paramref name="logger" /> is <c>null</c>.
      /// </exception>
      public ScheduledTaskWorker(IScheduledTask scheduledTask, IApplicationPaths applicationPaths, ITaskManager taskManager, ILogger logger)
      {
          ArgumentNullException.ThrowIfNull(scheduledTask);
          ArgumentNullException.ThrowIfNull(applicationPaths);
          ArgumentNullException.ThrowIfNull(taskManager);
          ArgumentNullException.ThrowIfNull(logger);

          ScheduledTask = scheduledTask;
          _applicationPaths = applicationPaths;
          _taskManager = taskManager;
          _logger = logger;

          InitTriggerEvents();
      }

      /// <inheritdoc />
      public event EventHandler<GenericEventArgs<double>> TaskProgress;

      /// <inheritdoc />
      public IScheduledTask ScheduledTask { get; private set; }

      /// <inheritdoc />
      public TaskResult LastExecutionResult
      {
          get
          {
              var path = GetHistoryFilePath();

              lock (_lastExecutionResultSyncLock)
              {
                  // Read the history file at most once; afterwards only the setter changes the value.
                  if (_lastExecutionResult is null && !_readFromFile)
                  {
                      _lastExecutionResult = ReadHistoryFile(path);
                      _readFromFile = true;
                  }

                  return _lastExecutionResult;
              }
          }

          private set
          {
              var path = GetHistoryFilePath();

              lock (_lastExecutionResultSyncLock)
              {
                  // Assigned under the same lock the getter uses, so a concurrent lazy load cannot interleave with it.
                  _lastExecutionResult = value;

                  Directory.CreateDirectory(Path.GetDirectoryName(path));

                  using FileStream createStream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
                  using Utf8JsonWriter jsonStream = new Utf8JsonWriter(createStream);
                  JsonSerializer.Serialize(jsonStream, value, _jsonOptions);
              }
          }
      }

      /// <inheritdoc />
      public string Name => ScheduledTask.Name;

      /// <inheritdoc />
      public string Description => ScheduledTask.Description;

      /// <inheritdoc />
      public string Category => ScheduledTask.Category;

      /// <summary>
      /// Gets or sets the cancellation token source of the execution in progress.
      /// </summary>
      /// <value>The current cancellation token source, or <c>null</c> when no execution is in progress.</value>
      private CancellationTokenSource CurrentCancellationTokenSource { get; set; }

      /// <summary>
      /// Gets or sets the current execution start time.
      /// </summary>
      /// <value>The UTC time at which the most recent execution started.</value>
      private DateTime CurrentExecutionStartTime { get; set; }

      /// <inheritdoc />
      public TaskState State
      {
          get
          {
              // Snapshot the property: the executing thread may reset it to null between two reads.
              var cancellationTokenSource = CurrentCancellationTokenSource;
              if (cancellationTokenSource is null)
              {
                  return TaskState.Idle;
              }

              return cancellationTokenSource.IsCancellationRequested
                  ? TaskState.Cancelling
                  : TaskState.Running;
          }
      }

      /// <inheritdoc />
      public double? CurrentProgress { get; private set; }

      /// <summary>
      /// Gets or sets the triggers that define when the task will run.
      /// Setting the value disposes the previous triggers and starts the new ones.
      /// </summary>
      /// <value>The trigger settings paired with their trigger instances.</value>
      private Tuple<TaskTriggerInfo, ITaskTrigger>[] InternalTriggers
      {
          get => _triggers;
          set
          {
              ArgumentNullException.ThrowIfNull(value);

              // Cleanup current triggers
              if (_triggers is not null)
              {
                  DisposeTriggers();
              }

              _triggers = value.ToArray();

              ReloadTriggerEvents(false);
          }
      }

      /// <inheritdoc />
      public IReadOnlyList<TaskTriggerInfo> Triggers
      {
          get
          {
              return Array.ConvertAll(InternalTriggers, i => i.Item1);
          }

          set
          {
              ArgumentNullException.ThrowIfNull(value);

              // This null check is not great, but is needed to handle bad user input, or user mucking with the config file incorrectly
              var triggerList = value.Where(i => i is not null).ToArray();

              // Build the trigger instances first: GetTrigger validates every entry and throws on invalid input,
              // so a rejected list is never written to the configuration file.
              var triggers = Array.ConvertAll(triggerList, i => new Tuple<TaskTriggerInfo, ITaskTrigger>(i, GetTrigger(i)));

              SaveTriggers(triggerList);

              InternalTriggers = triggers;
          }
      }

      /// <inheritdoc />
      public string Id
      {
          get
          {
              // Derived from the task's type name, so it stays the same for a given task type.
              return _id ??= ScheduledTask.GetType().FullName.GetMD5().ToString("N", CultureInfo.InvariantCulture);
          }
      }

      /// <summary>
      /// Loads the triggers and starts them, flagging the start as application startup.
      /// </summary>
      private void InitTriggerEvents()
      {
          _triggers = LoadTriggers();
          ReloadTriggerEvents(true);
      }

      /// <inheritdoc />
      public void ReloadTriggerEvents()
      {
          ReloadTriggerEvents(false);
      }

      /// <summary>
      /// Stops every trigger, re-attaches the trigger handler exactly once and starts the trigger again.
      /// </summary>
      /// <param name="isApplicationStartup">if set to <c>true</c> [is application startup].</param>
      private void ReloadTriggerEvents(bool isApplicationStartup)
      {
          foreach (var triggerInfo in InternalTriggers)
          {
              var trigger = triggerInfo.Item2;

              trigger.Stop();

              // Remove before adding so the handler is never subscribed twice.
              trigger.Triggered -= OnTriggerTriggered;
              trigger.Triggered += OnTriggerTriggered;
              trigger.Start(LastExecutionResult, _logger, Name, isApplicationStartup);
          }
      }

      /// <summary>
      /// Handles the Triggered event of a trigger: queues the task and restarts the trigger one second later.
      /// </summary>
      /// <param name="sender">The trigger that fired.</param>
      /// <param name="e">The <see cref="EventArgs" /> instance containing the event data.</param>
      private async void OnTriggerTriggered(object sender, EventArgs e)
      {
          // This is an async void event handler: no caller can observe an exception that escapes it,
          // so failures are caught and logged here.
          try
          {
              var trigger = (ITaskTrigger)sender;

              if (ScheduledTask is IConfigurableScheduledTask configurableTask && !configurableTask.IsEnabled)
              {
                  return;
              }

              _logger.LogDebug("{0} fired for task: {1}", trigger.GetType().Name, Name);

              trigger.Stop();

              _taskManager.QueueScheduledTask(ScheduledTask, trigger.TaskOptions);

              await Task.Delay(1000).ConfigureAwait(false);

              // The worker may have been disposed, or its triggers replaced, during the delay.
              // Restarting such a trigger would revive an object that has already been cleaned up.
              if (_disposed || !IsCurrentTrigger(trigger))
              {
                  return;
              }

              trigger.Start(LastExecutionResult, _logger, Name, false);
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error handling trigger for Scheduled Task {Name}", Name);
          }
      }

      /// <summary>
      /// Determines whether the given trigger instance is still one of this worker's triggers.
      /// </summary>
      /// <param name="trigger">The trigger instance to look for.</param>
      /// <returns><c>true</c> if the instance is in the current trigger set; otherwise <c>false</c>.</returns>
      private bool IsCurrentTrigger(ITaskTrigger trigger)
      {
          var current = _triggers;
          return current is not null && Array.Exists(current, pair => ReferenceEquals(pair.Item2, trigger));
      }

      /// <summary>
      /// Executes the task.
      /// </summary>
      /// <param name="options">Task options.</param>
      /// <returns>Task.</returns>
      /// <exception cref="InvalidOperationException">Cannot execute a Task that is already running.</exception>
      public async Task Execute(TaskOptions options)
      {
          var task = Task.Run(() => ExecuteInternal(options));

          // Kept so Dispose can wait for the run to finish.
          _currentTask = task;

          try
          {
              await task.ConfigureAwait(false);
          }
          finally
          {
              _currentTask = null;

              // NOTE(review): forced collection after every run; presumably meant to reclaim memory
              // held by large tasks. Confirm it is still needed before removing.
              GC.Collect();
          }
      }

      /// <summary>
      /// Runs the scheduled task once, translates its outcome into a <see cref="TaskCompletionStatus" />
      /// and reports the completion.
      /// </summary>
      /// <param name="options">Task options; a <c>MaxRuntimeTicks</c> value schedules an automatic cancellation.</param>
      /// <returns>Task.</returns>
      /// <exception cref="InvalidOperationException">Cannot execute a Task that is already running.</exception>
      private async Task ExecuteInternal(TaskOptions options)
      {
          // Refuse to start while another execution is in progress.
          if (CurrentCancellationTokenSource is not null)
          {
              throw new InvalidOperationException("Cannot execute a Task that is already running");
          }

          var progress = new Progress<double>();
          var cancellationTokenSource = new CancellationTokenSource();
          CurrentCancellationTokenSource = cancellationTokenSource;

          TaskCompletionStatus status;
          Exception failureException = null;
          DateTime endTime;

          try
          {
              _logger.LogDebug("Executing {0}", Name);

              ((TaskManager)_taskManager).OnTaskExecuting(this);

              progress.ProgressChanged += OnProgressChanged;

              CurrentExecutionStartTime = DateTime.UtcNow;

              try
              {
                  if (options is not null && options.MaxRuntimeTicks.HasValue)
                  {
                      cancellationTokenSource.CancelAfter(TimeSpan.FromTicks(options.MaxRuntimeTicks.Value));
                  }

                  await ScheduledTask.ExecuteAsync(progress, cancellationTokenSource.Token).ConfigureAwait(false);

                  status = TaskCompletionStatus.Completed;
              }
              catch (OperationCanceledException)
              {
                  status = TaskCompletionStatus.Cancelled;
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error executing Scheduled Task");

                  failureException = ex;

                  status = TaskCompletionStatus.Failed;
              }

              endTime = DateTime.UtcNow;
          }
          finally
          {
              // Always return to the idle state, even when the set-up above throws.
              // Otherwise the worker would report Running forever and refuse every later execution.
              progress.ProgressChanged -= OnProgressChanged;
              cancellationTokenSource.Dispose();
              CurrentCancellationTokenSource = null;
              CurrentProgress = null;
          }

          OnTaskCompleted(CurrentExecutionStartTime, endTime, status, failureException);
      }

      /// <summary>
      /// Records progress reported by the running task and raises <see cref="TaskProgress" />.
      /// </summary>
      /// <param name="sender">The sender.</param>
      /// <param name="e">The reported progress; values above 100 are capped at 100.</param>
      private void OnProgressChanged(object sender, double e)
      {
          e = Math.Min(e, 100);

          CurrentProgress = e;

          TaskProgress?.Invoke(this, new GenericEventArgs<double>(e));
      }

      /// <summary>
      /// Stops the task if it is currently executing.
      /// </summary>
      /// <exception cref="InvalidOperationException">Cannot cancel a Task unless it is in the Running state.</exception>
      public void Cancel()
      {
          if (State != TaskState.Running)
          {
              throw new InvalidOperationException("Cannot cancel a Task unless it is in the Running state.");
          }

          CancelIfRunning();
      }

      /// <summary>
      /// Requests cancellation if the task is running; does nothing when it is idle or already cancelling.
      /// </summary>
      public void CancelIfRunning()
      {
          // Work on a snapshot: the executing thread may dispose the source and reset the property at any time.
          var cancellationTokenSource = CurrentCancellationTokenSource;
          if (cancellationTokenSource is null || cancellationTokenSource.IsCancellationRequested)
          {
              return;
          }

          _logger.LogInformation("Attempting to cancel Scheduled Task {0}", Name);

          try
          {
              cancellationTokenSource.Cancel();
          }
          catch (ObjectDisposedException)
          {
              // The run finished and disposed its source after the check above; nothing is left to cancel.
          }
      }

      /// <summary>
      /// Gets the scheduled tasks configuration directory.
      /// </summary>
      /// <returns>The "ScheduledTasks" folder below the configuration directory.</returns>
      private string GetScheduledTasksConfigurationDirectory()
      {
          return Path.Combine(_applicationPaths.ConfigurationDirectoryPath, "ScheduledTasks");
      }

      /// <summary>
      /// Gets the scheduled tasks data directory.
      /// </summary>
      /// <returns>The "ScheduledTasks" folder below the data path.</returns>
      private string GetScheduledTasksDataDirectory()
      {
          return Path.Combine(_applicationPaths.DataPath, "ScheduledTasks");
      }

      /// <summary>
      /// Gets the history file path.
      /// </summary>
      /// <returns>The path of the file holding the last execution result, named after the task id.</returns>
      private string GetHistoryFilePath()
      {
          return Path.Combine(GetScheduledTasksDataDirectory(), new Guid(Id) + ".js");
      }

      /// <summary>
      /// Gets the configuration file path.
      /// </summary>
      /// <returns>The path of the file holding the trigger settings, named after the task id.</returns>
      private string GetConfigurationFilePath()
      {
          return Path.Combine(GetScheduledTasksConfigurationDirectory(), new Guid(Id) + ".js");
      }

      /// <summary>
      /// Reads the last execution result from the history file.
      /// </summary>
      /// <param name="path">The history file path.</param>
      /// <returns>The stored result, or <c>null</c> when the file is missing, empty or cannot be deserialized.</returns>
      private TaskResult ReadHistoryFile(string path)
      {
          if (!File.Exists(path))
          {
              return null;
          }

          var bytes = File.ReadAllBytes(path);
          if (bytes.Length == 0)
          {
              _logger.LogDebug("Scheduled Task history file {Path} is empty. Skipping deserialization.", path);
              return null;
          }

          try
          {
              return JsonSerializer.Deserialize<TaskResult>(bytes, _jsonOptions);
          }
          catch (JsonException ex)
          {
              _logger.LogError(ex, "Error deserializing {File}", path);
              return null;
          }
      }

      /// <summary>
      /// Loads the trigger settings and creates a trigger instance for each of them.
      /// </summary>
      /// <returns>The trigger settings paired with their trigger instances.</returns>
      private Tuple<TaskTriggerInfo, ITaskTrigger>[] LoadTriggers()
      {
          // This null check is not great, but is needed to handle bad user input, or user mucking with the config file incorrectly
          var settings = LoadTriggerSettings().Where(i => i is not null);

          return settings.Select(i => new Tuple<TaskTriggerInfo, ITaskTrigger>(i, GetTrigger(i))).ToArray();
      }

      /// <summary>
      /// Reads the trigger settings from the configuration file.
      /// </summary>
      /// <returns>The saved settings, or the default triggers when the file is missing, unreadable as JSON or contains <c>null</c>.</returns>
      private TaskTriggerInfo[] LoadTriggerSettings()
      {
          string path = GetConfigurationFilePath();
          TaskTriggerInfo[] list = null;

          if (File.Exists(path))
          {
              var bytes = File.ReadAllBytes(path);

              try
              {
                  list = JsonSerializer.Deserialize<TaskTriggerInfo[]>(bytes, _jsonOptions);
              }
              catch (JsonException ex)
              {
                  // Same handling as the history file: a damaged file must not prevent the worker from being created.
                  _logger.LogError(ex, "Error deserializing {File}", path);
              }
          }

          // Return defaults if the file doesn't exist or could not be used.
          return list ?? GetDefaultTriggers();
      }

      /// <summary>
      /// Gets the task's own default triggers.
      /// </summary>
      /// <returns>The task's default triggers, or a single one-day interval trigger if the task fails to provide them.</returns>
      private TaskTriggerInfo[] GetDefaultTriggers()
      {
          try
          {
              return ScheduledTask.GetDefaultTriggers().ToArray();
          }
          catch (Exception ex)
          {
              // Best effort: keep the task schedulable, but leave a trace of why the fallback was used.
              _logger.LogWarning(ex, "Error getting default triggers for Scheduled Task {Name}. Falling back to a daily interval trigger.", Name);

              return
              [
                  new()
                  {
                      IntervalTicks = TimeSpan.FromDays(1).Ticks,
                      Type = TaskTriggerInfoType.IntervalTrigger
                  }
              ];
          }
      }

      /// <summary>
      /// Writes the trigger settings to the configuration file, replacing any existing file.
      /// </summary>
      /// <param name="triggers">The triggers.</param>
      private void SaveTriggers(TaskTriggerInfo[] triggers)
      {
          var path = GetConfigurationFilePath();

          Directory.CreateDirectory(Path.GetDirectoryName(path));

          using FileStream createStream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None);
          using Utf8JsonWriter jsonWriter = new Utf8JsonWriter(createStream);
          JsonSerializer.Serialize(jsonWriter, triggers, _jsonOptions);
      }

      /// <summary>
      /// Logs the outcome of an execution, stores it as the last execution result and notifies the task manager.
      /// </summary>
      /// <param name="startTime">The start time.</param>
      /// <param name="endTime">The end time.</param>
      /// <param name="status">The status.</param>
      /// <param name="ex">The exception that made the task fail, or <c>null</c>.</param>
      private void OnTaskCompleted(DateTime startTime, DateTime endTime, TaskCompletionStatus status, Exception ex)
      {
          var elapsedTime = endTime - startTime;

          _logger.LogInformation("{0} {1} after {2} minute(s) and {3} seconds", Name, status, Math.Truncate(elapsedTime.TotalMinutes), elapsedTime.Seconds);

          var result = new TaskResult
          {
              StartTimeUtc = startTime,
              EndTimeUtc = endTime,
              Status = status,
              Name = Name,
              Id = Id
          };

          result.Key = ScheduledTask.Key;

          if (ex is not null)
          {
              result.ErrorMessage = ex.Message;
              result.LongErrorMessage = ex.StackTrace;
          }

          LastExecutionResult = result;

          ((TaskManager)_taskManager).OnTaskCompleted(this, result);
      }

      /// <inheritdoc />
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Releases unmanaged and - optionally - managed resources.
      /// Disposes the triggers, cancels a running execution, waits up to two seconds for it to stop and,
      /// if the task was running, reports it as aborted. Repeated calls do nothing.
      /// </summary>
      /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
      protected virtual void Dispose(bool dispose)
      {
          if (!dispose || _disposed)
          {
              return;
          }

          _disposed = true;

          DisposeTriggers();

          // Captured before cancelling: once cancellation is requested the state reads as Cancelling.
          var wasRunning = State == TaskState.Running;
          var startTime = CurrentExecutionStartTime;

          var token = CurrentCancellationTokenSource;
          if (token is not null)
          {
              try
              {
                  _logger.LogInformation("{Name}: Cancelling", Name);
                  token.Cancel();
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error calling CancellationToken.Cancel();");
              }
          }

          var task = _currentTask;
          if (task is not null)
          {
              try
              {
                  _logger.LogInformation("{Name}: Waiting on Task", Name);
                  var exited = task.Wait(2000);

                  if (exited)
                  {
                      _logger.LogInformation("{Name}: Task exited", Name);
                  }
                  else
                  {
                      _logger.LogInformation("{Name}: Timed out waiting for task to stop", Name);
                  }
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error calling Task.WaitAll();");
              }
          }

          if (token is not null)
          {
              try
              {
                  _logger.LogDebug("{Name}: Disposing CancellationToken", Name);
                  token.Dispose();
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error calling CancellationToken.Dispose();");
              }
          }

          if (wasRunning)
          {
              OnTaskCompleted(startTime, DateTime.UtcNow, TaskCompletionStatus.Aborted, null);
          }
      }

      /// <summary>
      /// Converts a <see cref="TaskTriggerInfo" /> into a concrete trigger instance.
      /// </summary>
      /// <param name="info">The info.</param>
      /// <returns>A daily, weekly, interval or startup trigger, depending on the trigger type.</returns>
      /// <exception cref="ArgumentException">The trigger type is not recognized, or a value required by that type is missing.</exception>
      private ITaskTrigger GetTrigger(TaskTriggerInfo info)
      {
          var options = new TaskOptions
          {
              MaxRuntimeTicks = info.MaxRuntimeTicks
          };

          if (info.Type == TaskTriggerInfoType.DailyTrigger)
          {
              if (!info.TimeOfDayTicks.HasValue)
              {
                  throw new ArgumentException("Info did not contain a TimeOfDayTicks.", nameof(info));
              }

              return new DailyTrigger(TimeSpan.FromTicks(info.TimeOfDayTicks.Value), options);
          }

          if (info.Type == TaskTriggerInfoType.WeeklyTrigger)
          {
              if (!info.TimeOfDayTicks.HasValue)
              {
                  throw new ArgumentException("Info did not contain a TimeOfDayTicks.", nameof(info));
              }

              if (!info.DayOfWeek.HasValue)
              {
                  throw new ArgumentException("Info did not contain a DayOfWeek.", nameof(info));
              }

              return new WeeklyTrigger(TimeSpan.FromTicks(info.TimeOfDayTicks.Value), info.DayOfWeek.Value, options);
          }

          if (info.Type == TaskTriggerInfoType.IntervalTrigger)
          {
              if (!info.IntervalTicks.HasValue)
              {
                  throw new ArgumentException("Info did not contain a IntervalTicks.", nameof(info));
              }

              return new IntervalTrigger(TimeSpan.FromTicks(info.IntervalTicks.Value), options);
          }

          if (info.Type == TaskTriggerInfoType.StartupTrigger)
          {
              return new StartupTrigger(options);
          }

          throw new ArgumentException("Unrecognized trigger type: " + info.Type, nameof(info));
      }

      /// <summary>
      /// Detaches the trigger handler from each trigger, stops it and disposes it when it is disposable.
      /// </summary>
      private void DisposeTriggers()
      {
          foreach (var triggerInfo in InternalTriggers)
          {
              var trigger = triggerInfo.Item2;
              trigger.Triggered -= OnTriggerTriggered;
              trigger.Stop();
              if (trigger is IDisposable disposable)
              {
                  disposable.Dispose();
              }
          }
      }
  }