TaskManager.cs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. #pragma warning disable CS1591
  2. using System;
  3. using System.Collections.Concurrent;
  4. using System.Collections.Generic;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. using Jellyfin.Data.Events;
  8. using MediaBrowser.Common.Configuration;
  9. using MediaBrowser.Model.Serialization;
  10. using MediaBrowser.Model.Tasks;
  11. using Microsoft.Extensions.Logging;
  12. namespace Emby.Server.Implementations.ScheduledTasks
  13. {
  14. /// <summary>
  15. /// Class TaskManager.
  16. /// </summary>
  17. public class TaskManager : ITaskManager
  18. {
  19. public event EventHandler<GenericEventArgs<IScheduledTaskWorker>> TaskExecuting;
  20. public event EventHandler<TaskCompletionEventArgs> TaskCompleted;
  21. /// <summary>
  22. /// Gets the list of Scheduled Tasks.
  23. /// </summary>
  24. /// <value>The scheduled tasks.</value>
  25. public IScheduledTaskWorker[] ScheduledTasks { get; private set; }
  26. /// <summary>
  27. /// The _task queue.
  28. /// </summary>
  29. private readonly ConcurrentQueue<Tuple<Type, TaskOptions>> _taskQueue =
  30. new ConcurrentQueue<Tuple<Type, TaskOptions>>();
  31. private readonly IJsonSerializer _jsonSerializer;
  32. private readonly IApplicationPaths _applicationPaths;
  33. private readonly ILogger<TaskManager> _logger;
  34. /// <summary>
  35. /// Initializes a new instance of the <see cref="TaskManager" /> class.
  36. /// </summary>
  37. /// <param name="applicationPaths">The application paths.</param>
  38. /// <param name="jsonSerializer">The json serializer.</param>
  39. /// <param name="logger">The logger.</param>
  40. public TaskManager(
  41. IApplicationPaths applicationPaths,
  42. IJsonSerializer jsonSerializer,
  43. ILogger<TaskManager> logger)
  44. {
  45. _applicationPaths = applicationPaths;
  46. _jsonSerializer = jsonSerializer;
  47. _logger = logger;
  48. ScheduledTasks = Array.Empty<IScheduledTaskWorker>();
  49. }
  50. /// <summary>
  51. /// Cancels if running and queue.
  52. /// </summary>
  53. /// <typeparam name="T"></typeparam>
  54. /// <param name="options">Task options.</param>
  55. public void CancelIfRunningAndQueue<T>(TaskOptions options)
  56. where T : IScheduledTask
  57. {
  58. var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));
  59. ((ScheduledTaskWorker)task).CancelIfRunning();
  60. QueueScheduledTask<T>(options);
  61. }
  62. public void CancelIfRunningAndQueue<T>()
  63. where T : IScheduledTask
  64. {
  65. CancelIfRunningAndQueue<T>(new TaskOptions());
  66. }
  67. /// <summary>
  68. /// Cancels if running.
  69. /// </summary>
  70. /// <typeparam name="T"></typeparam>
  71. public void CancelIfRunning<T>()
  72. where T : IScheduledTask
  73. {
  74. var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));
  75. ((ScheduledTaskWorker)task).CancelIfRunning();
  76. }
  77. /// <summary>
  78. /// Queues the scheduled task.
  79. /// </summary>
  80. /// <typeparam name="T"></typeparam>
  81. /// <param name="options">Task options.</param>
  82. public void QueueScheduledTask<T>(TaskOptions options)
  83. where T : IScheduledTask
  84. {
  85. var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T));
  86. if (scheduledTask == null)
  87. {
  88. _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", typeof(T).Name);
  89. }
  90. else
  91. {
  92. QueueScheduledTask(scheduledTask, options);
  93. }
  94. }
  95. public void QueueScheduledTask<T>()
  96. where T : IScheduledTask
  97. {
  98. QueueScheduledTask<T>(new TaskOptions());
  99. }
  100. public void QueueIfNotRunning<T>()
  101. where T : IScheduledTask
  102. {
  103. var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));
  104. if (task.State != TaskState.Running)
  105. {
  106. QueueScheduledTask<T>(new TaskOptions());
  107. }
  108. }
  109. public void Execute<T>()
  110. where T : IScheduledTask
  111. {
  112. var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T));
  113. if (scheduledTask == null)
  114. {
  115. _logger.LogError("Unable to find scheduled task of type {0} in Execute.", typeof(T).Name);
  116. }
  117. else
  118. {
  119. var type = scheduledTask.ScheduledTask.GetType();
  120. _logger.LogInformation("Queueing task {0}", type.Name);
  121. lock (_taskQueue)
  122. {
  123. if (scheduledTask.State == TaskState.Idle)
  124. {
  125. Execute(scheduledTask, new TaskOptions());
  126. }
  127. }
  128. }
  129. }
  130. /// <summary>
  131. /// Queues the scheduled task.
  132. /// </summary>
  133. /// <param name="task">The task.</param>
  134. /// <param name="options">The task options.</param>
  135. public void QueueScheduledTask(IScheduledTask task, TaskOptions options)
  136. {
  137. var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == task.GetType());
  138. if (scheduledTask == null)
  139. {
  140. _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", task.GetType().Name);
  141. }
  142. else
  143. {
  144. QueueScheduledTask(scheduledTask, options);
  145. }
  146. }
  147. /// <summary>
  148. /// Queues the scheduled task.
  149. /// </summary>
  150. /// <param name="task">The task.</param>
  151. /// <param name="options">The task options.</param>
  152. private void QueueScheduledTask(IScheduledTaskWorker task, TaskOptions options)
  153. {
  154. var type = task.ScheduledTask.GetType();
  155. _logger.LogInformation("Queueing task {0}", type.Name);
  156. lock (_taskQueue)
  157. {
  158. if (task.State == TaskState.Idle)
  159. {
  160. Execute(task, options);
  161. return;
  162. }
  163. _taskQueue.Enqueue(new Tuple<Type, TaskOptions>(type, options));
  164. }
  165. }
  166. /// <summary>
  167. /// Adds the tasks.
  168. /// </summary>
  169. /// <param name="tasks">The tasks.</param>
  170. public void AddTasks(IEnumerable<IScheduledTask> tasks)
  171. {
  172. var list = tasks.Select(t => new ScheduledTaskWorker(t, _applicationPaths, this, _jsonSerializer, _logger));
  173. ScheduledTasks = ScheduledTasks.Concat(list).ToArray();
  174. }
  175. /// <summary>
  176. /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
  177. /// </summary>
  178. public void Dispose()
  179. {
  180. Dispose(true);
  181. }
  182. /// <summary>
  183. /// Releases unmanaged and - optionally - managed resources.
  184. /// </summary>
  185. /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
  186. protected virtual void Dispose(bool dispose)
  187. {
  188. foreach (var task in ScheduledTasks)
  189. {
  190. task.Dispose();
  191. }
  192. }
  193. public void Cancel(IScheduledTaskWorker task)
  194. {
  195. ((ScheduledTaskWorker)task).Cancel();
  196. }
  197. public Task Execute(IScheduledTaskWorker task, TaskOptions options)
  198. {
  199. return ((ScheduledTaskWorker)task).Execute(options);
  200. }
  201. /// <summary>
  202. /// Called when [task executing].
  203. /// </summary>
  204. /// <param name="task">The task.</param>
  205. internal void OnTaskExecuting(IScheduledTaskWorker task)
  206. {
  207. TaskExecuting?.Invoke(this, new GenericEventArgs<IScheduledTaskWorker>(task));
  208. }
  209. /// <summary>
  210. /// Called when [task completed].
  211. /// </summary>
  212. /// <param name="task">The task.</param>
  213. /// <param name="result">The result.</param>
  214. internal void OnTaskCompleted(IScheduledTaskWorker task, TaskResult result)
  215. {
  216. TaskCompleted?.Invoke(task, new TaskCompletionEventArgs(task, result));
  217. ExecuteQueuedTasks();
  218. }
  219. /// <summary>
  220. /// Executes the queued tasks.
  221. /// </summary>
  222. private void ExecuteQueuedTasks()
  223. {
  224. _logger.LogInformation("ExecuteQueuedTasks");
  225. // Execute queued tasks
  226. lock (_taskQueue)
  227. {
  228. var list = new List<Tuple<Type, TaskOptions>>();
  229. while (_taskQueue.TryDequeue(out var item))
  230. {
  231. if (list.All(i => i.Item1 != item.Item1))
  232. {
  233. list.Add(item);
  234. }
  235. }
  236. foreach (var enqueuedType in list)
  237. {
  238. var scheduledTask = ScheduledTasks.First(t => t.ScheduledTask.GetType() == enqueuedType.Item1);
  239. if (scheduledTask.State == TaskState.Idle)
  240. {
  241. Execute(scheduledTask, enqueuedType.Item2);
  242. }
  243. }
  244. }
  245. }
  246. }
  247. }