LimitedConcurrencyLibraryScheduler.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using System.Linq;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MediaBrowser.Controller.Configuration;
  9. using Microsoft.Extensions.Hosting;
  10. using Microsoft.Extensions.Logging;
  11. namespace MediaBrowser.Controller.LibraryTaskScheduler;
  12. /// <summary>
  13. /// Provides Parallel action interface to process tasks with a set concurrency level.
  14. /// </summary>
  15. public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
  16. {
  17. private const int CleanupGracePeriod = 60;
  18. private readonly IHostApplicationLifetime _hostApplicationLifetime;
  19. private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
  20. private readonly IServerConfigurationManager _serverConfigurationManager;
  21. private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
  22. private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
  23. /// <summary>
  24. /// Gets used to lock all operations on the Tasks queue and creating workers.
  25. /// </summary>
  26. private readonly Lock _taskLock = new();
  27. private readonly BlockingCollection<TaskQueueItem> _tasks = new();
  28. private volatile int _workCounter;
  29. private Task? _cleanupTask;
  30. private bool _disposed;
  31. /// <summary>
  32. /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
  33. /// </summary>
  34. /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
  35. /// <param name="logger">The logger.</param>
  36. /// <param name="serverConfigurationManager">The server configuration manager.</param>
  37. public LimitedConcurrencyLibraryScheduler(
  38. IHostApplicationLifetime hostApplicationLifetime,
  39. ILogger<LimitedConcurrencyLibraryScheduler> logger,
  40. IServerConfigurationManager serverConfigurationManager)
  41. {
  42. _hostApplicationLifetime = hostApplicationLifetime;
  43. _logger = logger;
  44. _serverConfigurationManager = serverConfigurationManager;
  45. }
  46. private void ScheduleTaskCleanup()
  47. {
  48. lock (_taskLock)
  49. {
  50. if (_cleanupTask is not null)
  51. {
  52. _logger.LogDebug("Cleanup task already scheduled.");
  53. // cleanup task is already running.
  54. return;
  55. }
  56. _cleanupTask = RunCleanupTask();
  57. }
  58. async Task RunCleanupTask()
  59. {
  60. _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
  61. await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
  62. if (_disposed)
  63. {
  64. _logger.LogDebug("Abort cleaning up, already disposed.");
  65. return;
  66. }
  67. lock (_taskLock)
  68. {
  69. if (_tasks.Count > 0 || _workCounter > 0)
  70. {
  71. _logger.LogDebug("Delay cleanup task, operations still running.");
  72. // tasks are still there so its still in use. Reschedule cleanup task.
  73. // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
  74. _cleanupTask = RunCleanupTask();
  75. return;
  76. }
  77. }
  78. _logger.LogDebug("Cleanup runners.");
  79. foreach (var item in _taskRunners.ToArray())
  80. {
  81. await item.Key.CancelAsync().ConfigureAwait(false);
  82. _taskRunners.Remove(item.Key);
  83. }
  84. }
  85. }
  86. private void Worker()
  87. {
  88. lock (_taskLock)
  89. {
  90. var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
  91. var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
  92. _logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism);
  93. for (int i = 0; i < parallelism; i++)
  94. {
  95. var stopToken = new CancellationTokenSource();
  96. var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
  97. _taskRunners.Add(
  98. combinedSource,
  99. Task.Factory.StartNew(
  100. ItemWorker,
  101. (combinedSource, stopToken),
  102. combinedSource.Token,
  103. TaskCreationOptions.PreferFairness,
  104. TaskScheduler.Default));
  105. }
  106. }
  107. }
  108. private async Task ItemWorker(object? obj)
  109. {
  110. var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
  111. _deadlockDetector.Value = stopToken.TaskStop;
  112. try
  113. {
  114. foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
  115. {
  116. stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
  117. try
  118. {
  119. var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
  120. Debug.Assert(newWorkerLimit, "_workCounter > 0");
  121. _logger.LogDebug("Process new item '{Data}'.", item.Data);
  122. await ProcessItem(item).ConfigureAwait(false);
  123. }
  124. finally
  125. {
  126. var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
  127. Debug.Assert(newWorkerLimit, "_workCounter > 0");
  128. }
  129. }
  130. }
  131. catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
  132. {
  133. // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
  134. }
  135. finally
  136. {
  137. _logger.LogDebug("Cleanup Runner'.");
  138. _deadlockDetector.Value = default!;
  139. _taskRunners.Remove(stopToken.TaskStop);
  140. stopToken.GlobalStop.Dispose();
  141. stopToken.TaskStop.Dispose();
  142. }
  143. }
  144. private async Task ProcessItem(TaskQueueItem item)
  145. {
  146. try
  147. {
  148. if (item.CancellationToken.IsCancellationRequested)
  149. {
  150. // if item is cancelled, just skip it
  151. return;
  152. }
  153. await item.Worker(item.Data).ConfigureAwait(true);
  154. }
  155. catch (System.Exception ex)
  156. {
  157. _logger.LogError(ex, "Error while performing a library operation");
  158. }
  159. finally
  160. {
  161. item.Progress.Report(100);
  162. item.Done.SetResult();
  163. }
  164. }
  165. /// <inheritdoc/>
  166. public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
  167. {
  168. if (_disposed)
  169. {
  170. return;
  171. }
  172. if (data.Length == 0 || cancellationToken.IsCancellationRequested)
  173. {
  174. progress.Report(100);
  175. return;
  176. }
  177. _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
  178. TaskQueueItem[] workItems = null!;
  179. void UpdateProgress()
  180. {
  181. progress.Report(workItems.Select(e => e.ProgressValue).Average());
  182. }
  183. workItems = data.Select(item =>
  184. {
  185. TaskQueueItem queueItem = null!;
  186. return queueItem = new TaskQueueItem()
  187. {
  188. Data = item!,
  189. Progress = new Progress<double>(innerPercent =>
  190. {
  191. // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
  192. var innerPercentRounded = Math.Round(innerPercent);
  193. if (queueItem.ProgressValue != innerPercentRounded)
  194. {
  195. queueItem.ProgressValue = innerPercentRounded;
  196. UpdateProgress();
  197. }
  198. }),
  199. Worker = (val) => worker((T)val, queueItem.Progress),
  200. CancellationToken = cancellationToken
  201. };
  202. }).ToArray();
  203. if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
  204. {
  205. _logger.LogDebug("Process sequentially.");
  206. try
  207. {
  208. foreach (var item in workItems)
  209. {
  210. await ProcessItem(item).ConfigureAwait(false);
  211. }
  212. }
  213. catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
  214. {
  215. // operation is cancelled. Do nothing.
  216. }
  217. _logger.LogDebug("Process sequentially done.");
  218. return;
  219. }
  220. for (var i = 0; i < workItems.Length; i++)
  221. {
  222. var item = workItems[i]!;
  223. _tasks.Add(item, CancellationToken.None);
  224. }
  225. if (_deadlockDetector.Value is not null)
  226. {
  227. _logger.LogDebug("Nested invocation detected, process in-place.");
  228. try
  229. {
  230. // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
  231. while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
  232. {
  233. await ProcessItem(item).ConfigureAwait(false);
  234. }
  235. }
  236. catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
  237. {
  238. // operation is cancelled. Do nothing.
  239. }
  240. _logger.LogDebug("process in-place done.");
  241. }
  242. else
  243. {
  244. Worker();
  245. _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
  246. await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
  247. _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
  248. ScheduleTaskCleanup();
  249. }
  250. }
  251. /// <inheritdoc/>
  252. public async ValueTask DisposeAsync()
  253. {
  254. if (_disposed)
  255. {
  256. return;
  257. }
  258. _disposed = true;
  259. _tasks.CompleteAdding();
  260. foreach (var item in _taskRunners)
  261. {
  262. await item.Key.CancelAsync().ConfigureAwait(false);
  263. }
  264. _tasks.Dispose();
  265. if (_cleanupTask is not null)
  266. {
  267. await _cleanupTask.ConfigureAwait(false);
  268. _cleanupTask?.Dispose();
  269. }
  270. }
  271. private class TaskQueueItem
  272. {
  273. public required object Data { get; init; }
  274. public double ProgressValue { get; set; }
  275. public required Func<object, Task> Worker { get; init; }
  276. public required IProgress<double> Progress { get; init; }
  277. public TaskCompletionSource Done { get; } = new();
  278. public CancellationToken CancellationToken { get; init; }
  279. }
  280. }