| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 | 
							- using System;
 
- using System.Collections.Concurrent;
 
- using System.Collections.Generic;
 
- using System.Diagnostics;
 
- using System.Linq;
 
- using System.Threading;
 
- using System.Threading.Tasks;
 
- using MediaBrowser.Controller.Configuration;
 
- using Microsoft.Extensions.Hosting;
 
- using Microsoft.Extensions.Logging;
 
- namespace MediaBrowser.Controller.LibraryTaskScheduler;
 
- /// <summary>
 
- /// Provides Parallel action interface to process tasks with a set concurrency level.
 
- /// </summary>
 
- public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
 
- {
 
-     private const int CleanupGracePeriod = 60;
 
-     private readonly IHostApplicationLifetime _hostApplicationLifetime;
 
-     private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
 
-     private readonly IServerConfigurationManager _serverConfigurationManager;
 
-     private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
 
-     private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
 
-     /// <summary>
 
-     /// Gets used to lock all operations on the Tasks queue and creating workers.
 
-     /// </summary>
 
-     private readonly Lock _taskLock = new();
 
-     private readonly BlockingCollection<TaskQueueItem> _tasks = new();
 
-     private volatile int _workCounter;
 
-     private Task? _cleanupTask;
 
-     private bool _disposed;
 
-     /// <summary>
 
-     /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
 
-     /// </summary>
 
-     /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
 
-     /// <param name="logger">The logger.</param>
 
-     /// <param name="serverConfigurationManager">The server configuration manager.</param>
 
-     public LimitedConcurrencyLibraryScheduler(
 
-         IHostApplicationLifetime hostApplicationLifetime,
 
-         ILogger<LimitedConcurrencyLibraryScheduler> logger,
 
-         IServerConfigurationManager serverConfigurationManager)
 
-     {
 
-         _hostApplicationLifetime = hostApplicationLifetime;
 
-         _logger = logger;
 
-         _serverConfigurationManager = serverConfigurationManager;
 
-     }
 
-     private void ScheduleTaskCleanup()
 
-     {
 
-         lock (_taskLock)
 
-         {
 
-             if (_cleanupTask is not null)
 
-             {
 
-                 _logger.LogDebug("Cleanup task already scheduled.");
 
-                 // cleanup task is already running.
 
-                 return;
 
-             }
 
-             _cleanupTask = RunCleanupTask();
 
-         }
 
-         async Task RunCleanupTask()
 
-         {
 
-             _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
 
-             await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
 
-             if (_disposed)
 
-             {
 
-                 _logger.LogDebug("Abort cleaning up, already disposed.");
 
-                 return;
 
-             }
 
-             lock (_taskLock)
 
-             {
 
-                 if (_tasks.Count > 0 || _workCounter > 0)
 
-                 {
 
-                     _logger.LogDebug("Delay cleanup task, operations still running.");
 
-                     // tasks are still there so its still in use. Reschedule cleanup task.
 
-                     // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
 
-                     _cleanupTask = RunCleanupTask();
 
-                     return;
 
-                 }
 
-             }
 
-             _logger.LogDebug("Cleanup runners.");
 
-             foreach (var item in _taskRunners.ToArray())
 
-             {
 
-                 await item.Key.CancelAsync().ConfigureAwait(false);
 
-                 _taskRunners.Remove(item.Key);
 
-             }
 
-         }
 
-     }
 
-     private bool ShouldForceSequentialOperation()
 
-     {
 
-         // if the user either set the setting to 1 or it's unset and we have fewer than 4 cores it's better to run sequentially.
 
-         var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
 
-         return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
 
-     }
 
-     private int CalculateScanConcurrencyLimit()
 
-     {
 
-         // when this is invoked, we already checked ShouldForceSequentialOperation for the sequential check.
 
-         var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
 
-         if (fanoutConcurrency <= 0)
 
-         {
 
-             // in case the user did not set a limit manually, we can assume he has 3 or more cores as already checked by ShouldForceSequentialOperation.
 
-             return Environment.ProcessorCount - 3;
 
-         }
 
-         return fanoutConcurrency;
 
-     }
 
-     private void Worker()
 
-     {
 
-         lock (_taskLock)
 
-         {
 
-             var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
 
-             _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
 
-             for (int i = 0; i < operationFanout; i++)
 
-             {
 
-                 var stopToken = new CancellationTokenSource();
 
-                 var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
 
-                 _taskRunners.Add(
 
-                     combinedSource,
 
-                     Task.Factory.StartNew(
 
-                         ItemWorker,
 
-                         (combinedSource, stopToken),
 
-                         combinedSource.Token,
 
-                         TaskCreationOptions.PreferFairness,
 
-                         TaskScheduler.Default));
 
-             }
 
-         }
 
-     }
 
-     private async Task ItemWorker(object? obj)
 
-     {
 
-         var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
 
-         _deadlockDetector.Value = stopToken.TaskStop;
 
-         try
 
-         {
 
-             foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
 
-             {
 
-                 stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
 
-                 try
 
-                 {
 
-                     var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
 
-                     Debug.Assert(newWorkerLimit, "_workCounter > 0");
 
-                     _logger.LogDebug("Process new item '{Data}'.", item.Data);
 
-                     await ProcessItem(item).ConfigureAwait(false);
 
-                 }
 
-                 finally
 
-                 {
 
-                     var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
 
-                     Debug.Assert(newWorkerLimit, "_workCounter > 0");
 
-                 }
 
-             }
 
-         }
 
-         catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
 
-         {
 
-             // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
 
-         }
 
-         finally
 
-         {
 
-             _logger.LogDebug("Cleanup Runner'.");
 
-             _deadlockDetector.Value = default!;
 
-             _taskRunners.Remove(stopToken.TaskStop);
 
-             stopToken.GlobalStop.Dispose();
 
-             stopToken.TaskStop.Dispose();
 
-         }
 
-     }
 
-     private async Task ProcessItem(TaskQueueItem item)
 
-     {
 
-         try
 
-         {
 
-             if (item.CancellationToken.IsCancellationRequested)
 
-             {
 
-                 // if item is cancelled, just skip it
 
-                 return;
 
-             }
 
-             await item.Worker(item.Data).ConfigureAwait(true);
 
-         }
 
-         catch (System.Exception ex)
 
-         {
 
-             _logger.LogError(ex, "Error while performing a library operation");
 
-         }
 
-         finally
 
-         {
 
-             item.Progress.Report(100);
 
-             item.Done.SetResult();
 
-         }
 
-     }
 
-     /// <inheritdoc/>
 
-     public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
 
-     {
 
-         if (_disposed)
 
-         {
 
-             return;
 
-         }
 
-         if (data.Length == 0 || cancellationToken.IsCancellationRequested)
 
-         {
 
-             progress.Report(100);
 
-             return;
 
-         }
 
-         _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
 
-         TaskQueueItem[] workItems = null!;
 
-         void UpdateProgress()
 
-         {
 
-             progress.Report(workItems.Select(e => e.ProgressValue).Average());
 
-         }
 
-         workItems = data.Select(item =>
 
-         {
 
-             TaskQueueItem queueItem = null!;
 
-             return queueItem = new TaskQueueItem()
 
-             {
 
-                 Data = item!,
 
-                 Progress = new Progress<double>(innerPercent =>
 
-                     {
 
-                         // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
 
-                         var innerPercentRounded = Math.Round(innerPercent);
 
-                         if (queueItem.ProgressValue != innerPercentRounded)
 
-                         {
 
-                             queueItem.ProgressValue = innerPercentRounded;
 
-                             UpdateProgress();
 
-                         }
 
-                     }),
 
-                 Worker = (val) => worker((T)val, queueItem.Progress),
 
-                 CancellationToken = cancellationToken
 
-             };
 
-         }).ToArray();
 
-         if (ShouldForceSequentialOperation())
 
-         {
 
-             _logger.LogDebug("Process sequentially.");
 
-             try
 
-             {
 
-                 foreach (var item in workItems)
 
-                 {
 
-                     await ProcessItem(item).ConfigureAwait(false);
 
-                 }
 
-             }
 
-             catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 
-             {
 
-                 // operation is cancelled. Do nothing.
 
-             }
 
-             _logger.LogDebug("Process sequentially done.");
 
-             return;
 
-         }
 
-         for (var i = 0; i < workItems.Length; i++)
 
-         {
 
-             var item = workItems[i]!;
 
-             _tasks.Add(item, CancellationToken.None);
 
-         }
 
-         if (_deadlockDetector.Value is not null)
 
-         {
 
-             _logger.LogDebug("Nested invocation detected, process in-place.");
 
-             try
 
-             {
 
-                 // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
 
-                 while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
 
-                 {
 
-                     await ProcessItem(item).ConfigureAwait(false);
 
-                 }
 
-             }
 
-             catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
 
-             {
 
-                 // operation is cancelled. Do nothing.
 
-             }
 
-             _logger.LogDebug("process in-place done.");
 
-         }
 
-         else
 
-         {
 
-             Worker();
 
-             _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
 
-             await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
 
-             _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
 
-             ScheduleTaskCleanup();
 
-         }
 
-     }
 
-     /// <inheritdoc/>
 
-     public async ValueTask DisposeAsync()
 
-     {
 
-         if (_disposed)
 
-         {
 
-             return;
 
-         }
 
-         _disposed = true;
 
-         _tasks.CompleteAdding();
 
-         foreach (var item in _taskRunners)
 
-         {
 
-             await item.Key.CancelAsync().ConfigureAwait(false);
 
-         }
 
-         _tasks.Dispose();
 
-         if (_cleanupTask is not null)
 
-         {
 
-             await _cleanupTask.ConfigureAwait(false);
 
-             _cleanupTask?.Dispose();
 
-         }
 
-     }
 
-     private class TaskQueueItem
 
-     {
 
-         public required object Data { get; init; }
 
-         public double ProgressValue { get; set; }
 
-         public required Func<object, Task> Worker { get; init; }
 
-         public required IProgress<double> Progress { get; init; }
 
-         public TaskCompletionSource Done { get; } = new();
 
-         public CancellationToken CancellationToken { get; init; }
 
-     }
 
- }
 
 
  |