TaskManager.cs 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. #pragma warning disable CS1591
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. using Jellyfin.Data.Events;
  8. using MediaBrowser.Common.Configuration;
  9. using MediaBrowser.Model.Tasks;
  10. using Microsoft.Extensions.Logging;
  11. namespace Emby.Server.Implementations.ScheduledTasks
  12. {
  /// <summary>
  /// Keeps track of the registered scheduled task workers and starts, queues or cancels them on request.
  /// </summary>
  public class TaskManager : ITaskManager
  {
      /// <summary>
      /// Requests for tasks whose worker was not idle when the request was made.
      /// The queue instance also serves as the lock object guarding the "check state, then start" sequences.
      /// </summary>
      private readonly ConcurrentQueue<Tuple<Type, TaskOptions>> _taskQueue =
          new ConcurrentQueue<Tuple<Type, TaskOptions>>();

      private readonly IApplicationPaths _applicationPaths;
      private readonly ILogger<TaskManager> _logger;

      /// <summary>
      /// Initializes a new instance of the <see cref="TaskManager" /> class.
      /// </summary>
      /// <param name="applicationPaths">The application paths.</param>
      /// <param name="logger">The logger.</param>
      public TaskManager(
          IApplicationPaths applicationPaths,
          ILogger<TaskManager> logger)
      {
          _applicationPaths = applicationPaths;
          _logger = logger;
          ScheduledTasks = Array.Empty<IScheduledTaskWorker>();
      }

      /// <summary>
      /// Raised from <see cref="OnTaskExecuting" />; the sender is this manager.
      /// </summary>
      public event EventHandler<GenericEventArgs<IScheduledTaskWorker>> TaskExecuting;

      /// <summary>
      /// Raised from <see cref="OnTaskCompleted" />; the sender is the worker that completed, not this manager.
      /// </summary>
      public event EventHandler<TaskCompletionEventArgs> TaskCompleted;

      /// <summary>
      /// Gets the list of Scheduled Tasks.
      /// </summary>
      /// <value>The scheduled tasks.</value>
      public IScheduledTaskWorker[] ScheduledTasks { get; private set; }

      /// <summary>
      /// Cancels the task of type <typeparamref name="T" /> if it is running and then queues it again.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      /// <param name="options">Task options.</param>
      /// <exception cref="InvalidOperationException">No worker for <typeparamref name="T" /> is registered.</exception>
      public void CancelIfRunningAndQueue<T>(TaskOptions options)
          where T : IScheduledTask
      {
          var worker = GetWorker(typeof(T));
          ((ScheduledTaskWorker)worker).CancelIfRunning();
          QueueScheduledTask<T>(options);
      }

      /// <summary>
      /// Cancels the task of type <typeparamref name="T" /> if it is running and then queues it again with default options.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      /// <exception cref="InvalidOperationException">No worker for <typeparamref name="T" /> is registered.</exception>
      public void CancelIfRunningAndQueue<T>()
          where T : IScheduledTask
      {
          CancelIfRunningAndQueue<T>(new TaskOptions());
      }

      /// <summary>
      /// Cancels the task of type <typeparamref name="T" /> if it is running.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      /// <exception cref="InvalidOperationException">No worker for <typeparamref name="T" /> is registered.</exception>
      public void CancelIfRunning<T>()
          where T : IScheduledTask
      {
          var worker = GetWorker(typeof(T));
          ((ScheduledTaskWorker)worker).CancelIfRunning();
      }

      /// <summary>
      /// Queues the scheduled task of type <typeparamref name="T" />.
      /// An error is logged and nothing is queued when no worker for that type is registered.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      /// <param name="options">Task options.</param>
      public void QueueScheduledTask<T>(TaskOptions options)
          where T : IScheduledTask
      {
          var scheduledTask = FindWorker(typeof(T));
          if (scheduledTask == null)
          {
              _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", typeof(T).Name);
              return;
          }

          QueueScheduledTask(scheduledTask, options);
      }

      /// <summary>
      /// Queues the scheduled task of type <typeparamref name="T" /> with default options.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      public void QueueScheduledTask<T>()
          where T : IScheduledTask
      {
          QueueScheduledTask<T>(new TaskOptions());
      }

      /// <summary>
      /// Queues the task of type <typeparamref name="T" /> with default options unless its worker reports <see cref="TaskState.Running" />.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      /// <exception cref="InvalidOperationException">No worker for <typeparamref name="T" /> is registered.</exception>
      public void QueueIfNotRunning<T>()
          where T : IScheduledTask
      {
          var worker = GetWorker(typeof(T));
          if (worker.State != TaskState.Running)
          {
              QueueScheduledTask<T>(new TaskOptions());
          }
      }

      /// <summary>
      /// Starts the task of type <typeparamref name="T" /> with default options if its worker is idle.
      /// Unlike the queueing methods, nothing is enqueued when the worker is not idle.
      /// An error is logged when no worker for that type is registered.
      /// </summary>
      /// <typeparam name="T">The scheduled task type.</typeparam>
      public void Execute<T>()
          where T : IScheduledTask
      {
          var scheduledTask = FindWorker(typeof(T));
          if (scheduledTask == null)
          {
              _logger.LogError("Unable to find scheduled task of type {0} in Execute.", typeof(T).Name);
              return;
          }

          var type = scheduledTask.ScheduledTask.GetType();

          // The message text says "Queuing" although this path never enqueues; it is kept unchanged.
          _logger.LogInformation("Queuing task {0}", type.Name);

          lock (_taskQueue)
          {
              if (scheduledTask.State == TaskState.Idle)
              {
                  Execute(scheduledTask, new TaskOptions());
              }
          }
      }

      /// <summary>
      /// Queues the scheduled task whose runtime type matches that of <paramref name="task" />.
      /// An error is logged and nothing is queued when no worker for that type is registered.
      /// </summary>
      /// <param name="task">The task.</param>
      /// <param name="options">The task options.</param>
      /// <exception cref="ArgumentNullException"><paramref name="task" /> is <c>null</c>.</exception>
      public void QueueScheduledTask(IScheduledTask task, TaskOptions options)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          var taskType = task.GetType();
          var scheduledTask = FindWorker(taskType);
          if (scheduledTask == null)
          {
              _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", taskType.Name);
              return;
          }

          QueueScheduledTask(scheduledTask, options);
      }

      /// <summary>
      /// Wraps each task in a <see cref="ScheduledTaskWorker" /> and appends the workers to <see cref="ScheduledTasks" />.
      /// </summary>
      /// <param name="tasks">The tasks.</param>
      /// <exception cref="ArgumentNullException"><paramref name="tasks" /> is <c>null</c>.</exception>
      public void AddTasks(IEnumerable<IScheduledTask> tasks)
      {
          if (tasks == null)
          {
              throw new ArgumentNullException(nameof(tasks));
          }

          var newWorkers = tasks.Select(t => new ScheduledTaskWorker(t, _applicationPaths, this, _logger));

          // A new array is built and assigned rather than mutating the existing one.
          ScheduledTasks = ScheduledTasks.Concat(newWorkers).ToArray();
      }

      /// <summary>
      /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
      /// </summary>
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>
      /// Cancels the given worker.
      /// </summary>
      /// <param name="task">The worker to cancel; it is cast to <see cref="ScheduledTaskWorker" />.</param>
      /// <exception cref="ArgumentNullException"><paramref name="task" /> is <c>null</c>.</exception>
      public void Cancel(IScheduledTaskWorker task)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          ((ScheduledTaskWorker)task).Cancel();
      }

      /// <summary>
      /// Executes the given worker with the supplied options.
      /// </summary>
      /// <param name="task">The worker to execute; it is cast to <see cref="ScheduledTaskWorker" />.</param>
      /// <param name="options">The task options.</param>
      /// <returns>The task returned by the worker's <c>Execute</c> call.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="task" /> is <c>null</c>.</exception>
      public Task Execute(IScheduledTaskWorker task, TaskOptions options)
      {
          if (task == null)
          {
              throw new ArgumentNullException(nameof(task));
          }

          return ((ScheduledTaskWorker)task).Execute(options);
      }

      /// <summary>
      /// Called when [task executing].
      /// </summary>
      /// <param name="task">The task.</param>
      internal void OnTaskExecuting(IScheduledTaskWorker task)
      {
          TaskExecuting?.Invoke(this, new GenericEventArgs<IScheduledTaskWorker>(task));
      }

      /// <summary>
      /// Called when [task completed]. Raises <see cref="TaskCompleted" /> and then processes the pending queue.
      /// </summary>
      /// <param name="task">The task.</param>
      /// <param name="result">The result.</param>
      internal void OnTaskCompleted(IScheduledTaskWorker task, TaskResult result)
      {
          TaskCompleted?.Invoke(task, new TaskCompletionEventArgs(task, result));
          ExecuteQueuedTasks();
      }

      /// <summary>
      /// Releases unmanaged and - optionally - managed resources.
      /// </summary>
      /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
      protected virtual void Dispose(bool dispose)
      {
          // The workers are managed objects, so they are only touched on the explicit Dispose() path.
          if (!dispose)
          {
              return;
          }

          foreach (var worker in ScheduledTasks)
          {
              worker.Dispose();
          }
      }

      /// <summary>
      /// Looks up the worker whose task has exactly the given runtime type.
      /// </summary>
      /// <param name="taskType">The runtime type of the scheduled task.</param>
      /// <returns>The matching worker, or <c>null</c> when none is registered.</returns>
      private IScheduledTaskWorker FindWorker(Type taskType)
      {
          return ScheduledTasks.FirstOrDefault(worker => worker.ScheduledTask.GetType() == taskType);
      }

      /// <summary>
      /// Looks up the worker whose task has exactly the given runtime type, failing when there is none.
      /// </summary>
      /// <param name="taskType">The runtime type of the scheduled task.</param>
      /// <returns>The matching worker.</returns>
      /// <exception cref="InvalidOperationException">No matching worker is registered.</exception>
      private IScheduledTaskWorker GetWorker(Type taskType)
      {
          return ScheduledTasks.First(worker => worker.ScheduledTask.GetType() == taskType);
      }

      /// <summary>
      /// Starts the worker immediately when it is idle; otherwise records the request in the queue
      /// so it is reconsidered the next time a task completes.
      /// </summary>
      /// <param name="task">The task.</param>
      /// <param name="options">The task options.</param>
      private void QueueScheduledTask(IScheduledTaskWorker task, TaskOptions options)
      {
          var type = task.ScheduledTask.GetType();

          _logger.LogInformation("Queuing task {0}", type.Name);

          lock (_taskQueue)
          {
              if (task.State == TaskState.Idle)
              {
                  // The returned Task is not awaited here.
                  Execute(task, options);
                  return;
              }

              _taskQueue.Enqueue(new Tuple<Type, TaskOptions>(type, options));
          }
      }

      /// <summary>
      /// Drains the queue, keeps the first request seen for each task type, and starts every
      /// request whose worker is idle. Requests whose worker is not idle are not re-enqueued.
      /// </summary>
      private void ExecuteQueuedTasks()
      {
          _logger.LogInformation("ExecuteQueuedTasks");

          lock (_taskQueue)
          {
              var pending = new List<Tuple<Type, TaskOptions>>();
              var seenTypes = new HashSet<Type>();

              while (_taskQueue.TryDequeue(out var item))
              {
                  // HashSet.Add returns false for duplicates, so only the first request per type is kept.
                  if (seenTypes.Add(item.Item1))
                  {
                      pending.Add(item);
                  }
              }

              foreach (var request in pending)
              {
                  var scheduledTask = GetWorker(request.Item1);
                  if (scheduledTask.State == TaskState.Idle)
                  {
                      Execute(scheduledTask, request.Item2);
                  }
              }
          }
      }
  }
  244. }