LimitedConcurrencyLibraryScheduler.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace MediaBrowser.Controller.LibraryTaskScheduler;

/// <summary>
/// Provides a parallel action interface to process tasks with a set concurrency level.
/// </summary>
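/// <example>
/// Illustrative usage sketch only. The scheduler is normally obtained through dependency injection as
/// <see cref="ILimitedConcurrencyLibraryScheduler"/>; the <c>scheduler</c>, <c>logger</c> and
/// <c>ScanFolderAsync</c> identifiers below are placeholders, not part of this class.
/// <code>
/// string[] folders = ["/media/movies", "/media/shows"];
/// await scheduler.Enqueue(
///     folders,
///     async (folder, itemProgress) =>
///     {
///         await ScanFolderAsync(folder).ConfigureAwait(false); // placeholder worker body
///         itemProgress.Report(100);
///     },
///     new Progress&lt;double&gt;(percent => logger.LogInformation("Scan progress: {Percent}%", percent)),
///     CancellationToken.None);
/// </code>
/// </example>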
public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
{
    private const int CleanupGracePeriod = 60;
    private readonly IHostApplicationLifetime _hostApplicationLifetime;
    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
    private readonly IServerConfigurationManager _serverConfigurationManager;
    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();

    /// <summary>
    /// Used to lock all operations on the task queue and the creation of workers.
    /// </summary>
    private readonly Lock _taskLock = new();

    private readonly Channel<TaskQueueItem> _tasks = Channel.CreateUnbounded<TaskQueueItem>();
    private volatile int _workCounter;
    private Task? _cleanupTask;
    private bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
    /// </summary>
    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
    /// <param name="logger">The logger.</param>
    /// <param name="serverConfigurationManager">The server configuration manager.</param>
    public LimitedConcurrencyLibraryScheduler(
        IHostApplicationLifetime hostApplicationLifetime,
        ILogger<LimitedConcurrencyLibraryScheduler> logger,
        IServerConfigurationManager serverConfigurationManager)
    {
        _hostApplicationLifetime = hostApplicationLifetime;
        _logger = logger;
        _serverConfigurationManager = serverConfigurationManager;
    }

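    /// <summary>
    /// Schedules a delayed cleanup that cancels and removes the runner tasks once the queue has been
    /// idle for <see cref="CleanupGracePeriod"/> seconds.
    /// </summary>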
    private void ScheduleTaskCleanup()
    {
        lock (_taskLock)
        {
            if (_cleanupTask is not null)
            {
                _logger.LogDebug("Cleanup task already scheduled.");
                // A cleanup task is already scheduled.
                return;
            }

            _cleanupTask = RunCleanupTask();
        }

        async Task RunCleanupTask()
        {
            _logger.LogDebug("Schedule cleanup task in {CleanupGracePeriod} sec.", CleanupGracePeriod);
            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
            if (_disposed)
            {
                _logger.LogDebug("Abort cleaning up, already disposed.");
                return;
            }

            lock (_taskLock)
            {
                if (_tasks.Reader.Count > 0 || _workCounter > 0)
                {
                    _logger.LogDebug("Delay cleanup task, operations still running.");
                    // Tasks are still queued or running, so the scheduler is still in use. Reschedule the cleanup;
                    // we cannot just exit here and rely on the other invoker, because there is a considerable
                    // timeframe in which that invocation could already have ended.
                    _cleanupTask = RunCleanupTask();
                    return;
                }
            }

            _logger.LogDebug("Cleanup runners.");
            foreach (var item in _taskRunners.ToArray())
            {
                await item.Key.CancelAsync().ConfigureAwait(false);
                _taskRunners.Remove(item.Key);
            }
        }
    }

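    /// <summary>
    /// Determines whether the configured fan-out and the available core count require items to be
    /// processed sequentially instead of being fanned out to parallel runners.
    /// </summary>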
    private bool ShouldForceSequentialOperation()
    {
        // If the user explicitly set the concurrency to 1, or left it unset on a machine with fewer than 4 cores,
        // it is better to run sequentially.
        var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
    }

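    /// <summary>
    /// Calculates the number of parallel runners to use, either from the configured fan-out or from the core count.
    /// </summary>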
    private int CalculateScanConcurrencyLimit()
    {
        // By the time this is invoked, ShouldForceSequentialOperation has already ruled out the sequential case.
        var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        if (fanoutConcurrency <= 0)
        {
            // The user did not set a limit manually. ShouldForceSequentialOperation guarantees at least
            // 4 cores at this point, so reserve 3 of them for other work.
            return Environment.ProcessorCount - 3;
        }

        return fanoutConcurrency;
    }

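    /// <summary>
    /// Spawns new runner tasks until the number of runners matches the calculated concurrency limit.
    /// </summary>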
    private void Worker()
    {
        lock (_taskLock)
        {
            var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
            _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
            for (int i = 0; i < operationFanout; i++)
            {
                var stopToken = new CancellationTokenSource();
                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
                _taskRunners.Add(
                    combinedSource,
                    Task.Factory.StartNew(
                        ItemWorker,
                        (combinedSource, stopToken),
                        combinedSource.Token,
                        TaskCreationOptions.PreferFairness,
                        TaskScheduler.Default));
            }
        }
    }

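    /// <summary>
    /// Runner loop that keeps reading items from the queue and processing them until the runner is stopped.
    /// </summary>
    /// <param name="obj">The tuple of cancellation token sources for this runner, as passed by <see cref="Worker"/>.</param>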
    private async Task ItemWorker(object? obj)
    {
        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
        _deadlockDetector.Value = stopToken.TaskStop;
        try
        {
            while (!stopToken.GlobalStop.Token.IsCancellationRequested)
            {
                var item = await _tasks.Reader.ReadAsync(stopToken.GlobalStop.Token).ConfigureAwait(false);
                try
                {
                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
                    await ProcessItem(item).ConfigureAwait(false);
                }
                finally
                {
                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
                    Debug.Assert(newWorkerLimit, "_workCounter >= 0");
                }
            }
        }
        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
        {
            // The runner was deliberately interrupted while waiting for new work; there is nothing to do here.
        }
        finally
        {
            _logger.LogDebug("Cleanup runner.");
            _deadlockDetector.Value = default!;
            _taskRunners.Remove(stopToken.TaskStop);
            stopToken.GlobalStop.Dispose();
            stopToken.TaskStop.Dispose();
        }
    }

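    /// <summary>
    /// Processes a single queued item, reports completion and logs (without propagating) any error thrown by the worker.
    /// </summary>
    /// <param name="item">The queued item to process.</param>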
    private async Task ProcessItem(TaskQueueItem item)
    {
        try
        {
            if (item.CancellationToken.IsCancellationRequested)
            {
                // If the item is already cancelled, just skip it.
                return;
            }

            await item.Worker(item.Data).ConfigureAwait(true);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error while performing a library operation");
        }
        finally
        {
            item.Progress.Report(100);
            item.Done.SetResult();
        }
    }

    /// <inheritdoc/>
    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
    {
        if (_disposed)
        {
            return;
        }

        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
        {
            progress.Report(100);
            return;
        }

        _logger.LogDebug("Enqueue new work set of {NoItems} items.", data.Length);

        TaskQueueItem[] workItems = null!;

        void UpdateProgress()
        {
            progress.Report(workItems.Select(e => e.ProgressValue).Average());
        }

        workItems = data.Select(item =>
        {
            TaskQueueItem queueItem = null!;
            return queueItem = new TaskQueueItem()
            {
                Data = item!,
                Progress = new Progress<double>(innerPercent =>
                {
                    // Round the percentage and only update progress when it changes, to prevent excessive UpdateProgress calls.
                    var innerPercentRounded = Math.Round(innerPercent);
                    if (queueItem.ProgressValue != innerPercentRounded)
                    {
                        queueItem.ProgressValue = innerPercentRounded;
                        UpdateProgress();
                    }
                }),
                Worker = (val) => worker((T)val, queueItem.Progress),
                CancellationToken = cancellationToken
            };
        }).ToArray();

        if (ShouldForceSequentialOperation())
        {
            _logger.LogDebug("Process sequentially.");
            try
            {
                foreach (var item in workItems)
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The operation was cancelled. Do nothing.
            }

            _logger.LogDebug("Process sequentially done.");
            return;
        }

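        // Queue every item on the shared channel, then either drain them in-place (nested invocation)
        // or fan out to the runner tasks and wait for completion.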
        for (var i = 0; i < workItems.Length; i++)
        {
            var item = workItems[i]!;
            await _tasks.Writer.WriteAsync(item, CancellationToken.None).ConfigureAwait(false);
        }

        if (_deadlockDetector.Value is not null)
        {
            _logger.LogDebug("Nested invocation detected, process in-place.");
            try
            {
                // We are in a nested invocation. There is no reason to spawn a task here, as that would just
                // lead to deadlocks without gaining additional concurrency. Instead, drain the items that are
                // currently available on the channel on this runner.
                while (workItems.Any(e => !e.Done.Task.IsCompleted)
                       && !_deadlockDetector.Value.IsCancellationRequested
                       && _tasks.Reader.TryRead(out var item))
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
            {
                // The operation was cancelled. Do nothing.
            }

            _logger.LogDebug("Process in-place done.");
        }
        else
        {
            Worker();
            _logger.LogDebug("Wait for {NoWorkers} items to complete.", workItems.Length);
            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
            _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
            ScheduleTaskCleanup();
        }
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _tasks.Writer.Complete();
        foreach (var item in _taskRunners)
        {
            await item.Key.CancelAsync().ConfigureAwait(false);
        }

        if (_cleanupTask is not null)
        {
            await _cleanupTask.ConfigureAwait(false);
            _cleanupTask?.Dispose();
        }
    }

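    /// <summary>
    /// Represents a queued unit of work together with its progress reporting and completion state.
    /// </summary>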
    private class TaskQueueItem
    {
        public required object Data { get; init; }

        public double ProgressValue { get; set; }

        public required Func<object, Task> Worker { get; init; }

        public required IProgress<double> Progress { get; init; }

        public TaskCompletionSource Done { get; } = new();

        public CancellationToken CancellationToken { get; init; }
    }
}