LimitedConcurrencyLibraryScheduler.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace MediaBrowser.Controller.LibraryTaskScheduler;

/// <summary>
/// Provides a parallel action interface to process tasks with a set concurrency level.
/// </summary>
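/// <remarks>
/// Illustrative usage sketch, assuming the scheduler is resolved through dependency injection as an
/// <c>ILimitedConcurrencyLibraryScheduler</c> and that <c>RefreshPathAsync</c>, <c>overallProgress</c>
/// and <c>cancellationToken</c> are defined by the caller.
/// <example>
/// <code>
/// string[] paths = ["/media/movies", "/media/shows"];
/// await scheduler.Enqueue(
///     paths,
///     async (path, itemProgress) =>
///     {
///         await RefreshPathAsync(path).ConfigureAwait(false); // hypothetical per-item worker
///         itemProgress.Report(100);
///     },
///     overallProgress,
///     cancellationToken).ConfigureAwait(false);
/// </code>
/// </example>
/// </remarks>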
public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
{
    private const int CleanupGracePeriod = 60;
    private readonly IHostApplicationLifetime _hostApplicationLifetime;
    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
    private readonly IServerConfigurationManager _serverConfigurationManager;
    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();

    /// <summary>
    /// Used to lock all operations on the task queue and on worker creation.
    /// </summary>
    private readonly Lock _taskLock = new();

    private readonly BlockingCollection<TaskQueueItem> _tasks = new();
    private volatile int _workCounter;
    private Task? _cleanupTask;
    private bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
    /// </summary>
    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
    /// <param name="logger">The logger.</param>
    /// <param name="serverConfigurationManager">The server configuration manager.</param>
    public LimitedConcurrencyLibraryScheduler(
        IHostApplicationLifetime hostApplicationLifetime,
        ILogger<LimitedConcurrencyLibraryScheduler> logger,
        IServerConfigurationManager serverConfigurationManager)
    {
        _hostApplicationLifetime = hostApplicationLifetime;
        _logger = logger;
        _serverConfigurationManager = serverConfigurationManager;
    }

    private void ScheduleTaskCleanup()
    {
        lock (_taskLock)
        {
            if (_cleanupTask is not null)
            {
                _logger.LogDebug("Cleanup task already scheduled.");
                // Cleanup task is already running.
                return;
            }

            _cleanupTask = RunCleanupTask();
        }

        async Task RunCleanupTask()
        {
            _logger.LogDebug("Schedule cleanup task in {CleanupGracePeriod} sec.", CleanupGracePeriod);
            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
            if (_disposed)
            {
                _logger.LogDebug("Abort cleaning up, already disposed.");
                return;
            }

            lock (_taskLock)
            {
                if (_tasks.Count > 0 || _workCounter > 0)
                {
                    _logger.LogDebug("Delay cleanup task, operations still running.");
                    // Tasks are still queued or running, so the scheduler is still in use. Reschedule the cleanup task;
                    // we cannot just exit here and rely on the other invoker, because there is a considerable timeframe in which it could have already ended.
                    _cleanupTask = RunCleanupTask();
                    return;
                }
            }

            _logger.LogDebug("Cleanup runners.");
            foreach (var item in _taskRunners.ToArray())
            {
                await item.Key.CancelAsync().ConfigureAwait(false);
                _taskRunners.Remove(item.Key);
            }
        }
    }

    private bool ShouldForceSequentialOperation()
    {
        // If the user set the concurrency setting to 1, or left it unset on a machine with fewer than 4 cores, it is better to run sequentially.
        var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
    }

    private int CalculateScanConcurrencyLimit()
    {
        // When this is invoked, ShouldForceSequentialOperation has already handled the sequential case.
        var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        if (fanoutConcurrency <= 0)
        {
            // In case the user did not set a limit manually, we can assume at least 4 cores, as already checked by ShouldForceSequentialOperation.
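            // For example, on a machine with 8 logical cores and no configured limit, this yields 8 - 3 = 5 concurrent runners.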
            return Environment.ProcessorCount - 3;
        }

        return fanoutConcurrency;
    }

    private void Worker()
    {
        lock (_taskLock)
        {
            var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
            _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
            for (int i = 0; i < operationFanout; i++)
            {
                var stopToken = new CancellationTokenSource();
                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
                _taskRunners.Add(
                    combinedSource,
                    Task.Factory.StartNew(
                        ItemWorker,
                        (combinedSource, stopToken),
                        combinedSource.Token,
                        TaskCreationOptions.PreferFairness,
                        TaskScheduler.Default));
            }
        }
    }

    private async Task ItemWorker(object? obj)
    {
        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
        _deadlockDetector.Value = stopToken.TaskStop;
        try
        {
            foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
            {
                stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
                try
                {
                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
                    await ProcessItem(item).ConfigureAwait(false);
                }
                finally
                {
                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
                    Debug.Assert(newWorkerLimit, "_workCounter >= 0");
                }
            }
        }
        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
        {
            // The runner was stopped on purpose by interrupting the waiting thread; there is nothing to do here.
        }
        finally
        {
            _logger.LogDebug("Cleanup runner.");
            _deadlockDetector.Value = default!;
            _taskRunners.Remove(stopToken.TaskStop);
            stopToken.GlobalStop.Dispose();
            stopToken.TaskStop.Dispose();
        }
    }

    private async Task ProcessItem(TaskQueueItem item)
    {
        try
        {
            if (item.CancellationToken.IsCancellationRequested)
            {
                // If the item is cancelled, just skip it.
                return;
            }

            await item.Worker(item.Data).ConfigureAwait(true);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while performing a library operation");
        }
        finally
        {
            item.Progress.Report(100);
            item.Done.SetResult();
        }
    }

    /// <inheritdoc/>
    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
    {
        if (_disposed)
        {
            return;
        }

        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
        {
            progress.Report(100);
            return;
        }

        _logger.LogDebug("Enqueue new workset of {NoItems} items.", data.Length);

        TaskQueueItem[] workItems = null!;

        void UpdateProgress()
        {
            progress.Report(workItems.Select(e => e.ProgressValue).Average());
        }

        workItems = data.Select(item =>
        {
            TaskQueueItem queueItem = null!;
            return queueItem = new TaskQueueItem()
            {
                Data = item!,
                Progress = new Progress<double>(innerPercent =>
                {
                    // Round the percentage and only update progress if it changed, to prevent excessive UpdateProgress calls.
                    var innerPercentRounded = Math.Round(innerPercent);
                    if (queueItem.ProgressValue != innerPercentRounded)
                    {
                        queueItem.ProgressValue = innerPercentRounded;
                        UpdateProgress();
                    }
                }),
                Worker = (val) => worker((T)val, queueItem.Progress),
                CancellationToken = cancellationToken
            };
        }).ToArray();

        if (ShouldForceSequentialOperation())
        {
            _logger.LogDebug("Process sequentially.");
            try
            {
                foreach (var item in workItems)
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The operation was cancelled. Do nothing.
            }

            _logger.LogDebug("Process sequentially done.");
            return;
        }

        for (var i = 0; i < workItems.Length; i++)
        {
            var item = workItems[i]!;
            _tasks.Add(item, CancellationToken.None);
        }

        if (_deadlockDetector.Value is not null)
        {
            _logger.LogDebug("Nested invocation detected, process in-place.");
            try
            {
                // We are in a nested loop. There is no reason to spawn a task here, as that would just lead to deadlocks and no additional concurrency would be achieved.
                while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
            {
                // The operation was cancelled. Do nothing.
            }

            _logger.LogDebug("Process in-place done.");
        }
        else
        {
            Worker();
            _logger.LogDebug("Wait for {NoWorkers} workers to complete.", workItems.Length);
            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
            _logger.LogDebug("{NoWorkers} workers completed.", workItems.Length);
            ScheduleTaskCleanup();
        }
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _tasks.CompleteAdding();
        foreach (var item in _taskRunners)
        {
            await item.Key.CancelAsync().ConfigureAwait(false);
        }

        _tasks.Dispose();
        if (_cleanupTask is not null)
        {
            await _cleanupTask.ConfigureAwait(false);
            _cleanupTask.Dispose();
        }
    }

    private class TaskQueueItem
    {
        public required object Data { get; init; }

        public double ProgressValue { get; set; }

        public required Func<object, Task> Worker { get; init; }

        public required IProgress<double> Progress { get; init; }

        public TaskCompletionSource Done { get; } = new();

        public CancellationToken CancellationToken { get; init; }
    }
}