using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace MediaBrowser.Controller.LibraryTaskScheduler;

/// <summary>
/// Provides Parallel action interface to process tasks with a set concurrency level.
/// </summary>
/// <remarks>
/// Work items are put on a shared queue that is drained by a pool of runner tasks. Runners are created on demand
/// by <see cref="Enqueue{T}"/> and are stopped again once the scheduler has been idle for
/// <see cref="CleanupGracePeriod"/> seconds.
/// </remarks>
public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
{
    /// <summary>
    /// Time in seconds the scheduler has to be idle before its runners are stopped.
    /// </summary>
    private const int CleanupGracePeriod = 60;

    private readonly IHostApplicationLifetime _hostApplicationLifetime;
    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
    private readonly IServerConfigurationManager _serverConfigurationManager;

    /// <summary>
    /// All runners that are currently alive, keyed by the linked token source that stops them.
    /// Must only be accessed while holding <see cref="_taskLock"/>.
    /// </summary>
    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();

    /// <summary>
    /// Holds the token source of the runner the current async flow is executed by.
    /// A non-null value means <see cref="Enqueue{T}"/> has been invoked from within a work item.
    /// </summary>
    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();

    /// <summary>
    /// Gets used to lock all operations on the Tasks queue and creating workers.
    /// </summary>
    private readonly Lock _taskLock = new();

    private readonly BlockingCollection<TaskQueueItem> _tasks = new();

    /// <summary>
    /// Cancelled on disposal so a pending cleanup delay does not hold up <see cref="DisposeAsync"/>.
    /// </summary>
    private readonly CancellationTokenSource _disposalSource = new();

    /// <summary>
    /// Token of <see cref="_disposalSource"/>, captured once so it stays readable after the source is disposed.
    /// </summary>
    private readonly CancellationToken _disposalToken;

    /// <summary>
    /// Number of work items that are currently being processed by runners.
    /// </summary>
    private volatile int _workCounter;

    private Task? _cleanupTask;

    // read without the lock from several threads, therefore volatile.
    private volatile bool _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
    /// </summary>
    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
    /// <param name="logger">The logger.</param>
    /// <param name="serverConfigurationManager">The server configuration manager.</param>
    public LimitedConcurrencyLibraryScheduler(
        IHostApplicationLifetime hostApplicationLifetime,
        ILogger<LimitedConcurrencyLibraryScheduler> logger,
        IServerConfigurationManager serverConfigurationManager)
    {
        _hostApplicationLifetime = hostApplicationLifetime;
        _logger = logger;
        _serverConfigurationManager = serverConfigurationManager;
        _disposalToken = _disposalSource.Token;
    }

    /// <summary>
    /// Requests cancellation of the given runners, ignoring runners that already shut down on their own.
    /// </summary>
    /// <param name="runners">The token sources of the runners to stop.</param>
    /// <returns>A task that completes once cancellation has been requested for every runner.</returns>
    private static async Task CancelRunnersAsync(IEnumerable<CancellationTokenSource> runners)
    {
        foreach (var runner in runners)
        {
            try
            {
                await runner.CancelAsync().ConfigureAwait(false);
            }
            catch (ObjectDisposedException)
            {
                // the runner already exited and disposed its token sources. Nothing left to stop.
            }
        }
    }

    /// <summary>
    /// Schedules the cleanup of idle runners unless a cleanup is already pending.
    /// </summary>
    private void ScheduleTaskCleanup()
    {
        lock (_taskLock)
        {
            if (_cleanupTask is not null)
            {
                _logger.LogDebug("Cleanup task already scheduled.");
                // cleanup task is already running.
                return;
            }

            _cleanupTask = RunCleanupTask();
        }

        async Task RunCleanupTask()
        {
            CancellationTokenSource[] idleRunners;
            while (true)
            {
                _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
                try
                {
                    await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod), _disposalToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    _logger.LogDebug("Abort cleaning up, already disposed.");
                    return;
                }

                lock (_taskLock)
                {
                    if (_disposed)
                    {
                        _logger.LogDebug("Abort cleaning up, already disposed.");
                        return;
                    }

                    if (_tasks.Count == 0 && _workCounter == 0)
                    {
                        // Detach the runners while still holding the lock. A concurrent Worker() call then sees an empty
                        // pool and spawns fresh runners instead of counting on the ones that are about to be stopped.
                        idleRunners = [.. _taskRunners.Keys];
                        _taskRunners.Clear();

                        // allow the next finished workset to schedule a new cleanup.
                        _cleanupTask = null;
                        break;
                    }

                    // tasks are still there so its still in use. Wait for another grace period.
                    // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
                    _logger.LogDebug("Delay cleanup task, operations still running.");
                }
            }

            _logger.LogDebug("Cleanup runners.");
            await CancelRunnersAsync(idleRunners).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Checks whether work should be processed one item after another instead of being fanned out to runners.
    /// </summary>
    /// <returns><c>true</c> if the items have to be processed sequentially.</returns>
    private bool ShouldForceSequentialOperation()
    {
        // if the user either set the setting to 1 or it's unset and we have fewer than 4 cores it's better to run sequentially.
        var fanoutSetting = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        return fanoutSetting == 1 || (fanoutSetting <= 0 && Environment.ProcessorCount <= 3);
    }

    /// <summary>
    /// Calculates how many runners may exist at the same time.
    /// </summary>
    /// <returns>The maximum number of runners, always at least 1.</returns>
    private int CalculateScanConcurrencyLimit()
    {
        // when this is invoked, we already checked ShouldForceSequentialOperation for the sequential check.
        var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
        if (fanoutConcurrency <= 0)
        {
            // in case the user did not set a limit manually, ShouldForceSequentialOperation already made sure there are more than 3 cores.
            // The configuration is read a second time here and may have changed in between, so never report less than one runner
            // as otherwise enqueued items would never be picked up.
            return Math.Max(1, Environment.ProcessorCount - 3);
        }

        return fanoutConcurrency;
    }

    /// <summary>
    /// Spawns runners until the configured concurrency limit is reached.
    /// </summary>
    private void Worker()
    {
        lock (_taskLock)
        {
            var operationFanout = Math.Max(0, CalculateScanConcurrencyLimit() - _taskRunners.Count);
            _logger.LogDebug("Spawn {NumberRunners} new runners.", operationFanout);
            for (int i = 0; i < operationFanout; i++)
            {
                // the linked source stops the runner either on request (cleanup/dispose) or when the application shuts down.
                var stopToken = new CancellationTokenSource();
                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
                _taskRunners.Add(
                    combinedSource,
                    Task.Factory.StartNew(
                        ItemWorker,
                        (combinedSource, stopToken),
                        combinedSource.Token,
                        TaskCreationOptions.PreferFairness,
                        TaskScheduler.Default));
            }
        }
    }

    /// <summary>
    /// Body of a single runner. Takes items from the queue and processes them until the runner is stopped or the queue is completed.
    /// </summary>
    /// <param name="obj">A tuple of the linked token source that stops this runner and the inner source it is linked to.</param>
    /// <returns>A task that completes when the runner has shut down.</returns>
    private async Task ItemWorker(object? obj)
    {
        // The element order has to match the tuple created in Worker(): the linked source first, the inner source second.
        var (runnerSource, innerSource) = ((CancellationTokenSource Combined, CancellationTokenSource Inner))obj!;
        _deadlockDetector.Value = runnerSource;
        try
        {
            // Wait on the linked token. It is the one cancelled by the cleanup task, by DisposeAsync and by application shutdown.
            // The inner source is never cancelled by anyone, waiting on it would keep the runner blocked forever.
            foreach (var item in _tasks.GetConsumingEnumerable(runnerSource.Token))
            {
                // An item that has been taken from the queue is always processed, even if the runner was stopped in the meantime.
                // Dropping it would leave its Done task incomplete and block the caller that enqueued it.
                try
                {
                    var activeItems = Interlocked.Increment(ref _workCounter);
                    Debug.Assert(activeItems > 0, "_workCounter > 0");
                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
                    await ProcessItem(item).ConfigureAwait(false);
                }
                finally
                {
                    var activeItems = Interlocked.Decrement(ref _workCounter);
                    Debug.Assert(activeItems >= 0, "_workCounter >= 0");
                }
            }
        }
        catch (OperationCanceledException) when (runnerSource.IsCancellationRequested)
        {
            // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
        }
        catch (ObjectDisposedException) when (_disposed)
        {
            // the queue has been disposed while this runner was still busy. Nothing left to take.
        }
        finally
        {
            _logger.LogDebug("Cleanup Runner'.");
            _deadlockDetector.Value = default!;
            lock (_taskLock)
            {
                _taskRunners.Remove(runnerSource);
            }

            runnerSource.Dispose();
            innerSource.Dispose();
        }
    }

    /// <summary>
    /// Runs the worker of a single item and marks the item as done, regardless of the outcome.
    /// </summary>
    /// <param name="item">The item to process.</param>
    /// <returns>A task that completes when the item has been processed, skipped or has failed.</returns>
    private async Task ProcessItem(TaskQueueItem item)
    {
        try
        {
            if (item.CancellationToken.IsCancellationRequested)
            {
                // if item is cancelled, just skip it
                return;
            }

            await item.Worker(item.Data).ConfigureAwait(true);
        }
        catch (Exception ex)
        {
            // a failing item must not take down the runner or the remaining items of the workset.
            _logger.LogError(ex, "Error while performing a library operation");
        }
        finally
        {
            item.Progress.Report(100);
            item.Done.SetResult();
        }
    }

    /// <inheritdoc/>
    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
    {
        if (_disposed)
        {
            return;
        }

        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
        {
            progress.Report(100);
            return;
        }

        _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);

        TaskQueueItem[] workItems = null!;

        // reports the average progress of all items of this workset to the caller.
        void UpdateProgress()
        {
            progress.Report(workItems.Select(e => e.ProgressValue).Average());
        }

        workItems = data.Select(item =>
        {
            TaskQueueItem queueItem = null!;
            return queueItem = new TaskQueueItem()
            {
                Data = item!,
                Progress = new Progress<double>(innerPercent =>
                    {
                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
                        var innerPercentRounded = Math.Round(innerPercent);
                        if (queueItem.ProgressValue != innerPercentRounded)
                        {
                            queueItem.ProgressValue = innerPercentRounded;
                            UpdateProgress();
                        }
                    }),
                Worker = (val) => worker((T)val, queueItem.Progress),
                CancellationToken = cancellationToken
            };
        }).ToArray();

        if (ShouldForceSequentialOperation())
        {
            // sequential mode bypasses the queue and the runners entirely.
            _logger.LogDebug("Process sequentially.");
            try
            {
                foreach (var item in workItems)
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // operation is cancelled. Do nothing.
            }

            _logger.LogDebug("Process sequentially done.");
            return;
        }

        for (var i = 0; i < workItems.Length; i++)
        {
            _tasks.Add(workItems[i], CancellationToken.None);
        }

        var nestedRunner = _deadlockDetector.Value;
        if (nestedRunner is not null)
        {
            _logger.LogDebug("Nested invocation detected, process in-place.");
            try
            {
                // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
                // NOTE(review): the loop also ends when TryTake times out after 200ms, i.e. when the queue is empty while other runners
                // are still processing items of this workset. In that case this method returns before those items are done - confirm this is intended.
                while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, nestedRunner.Token))
                {
                    await ProcessItem(item).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (nestedRunner.IsCancellationRequested)
            {
                // operation is cancelled. Do nothing.
            }

            _logger.LogDebug("process in-place done.");
        }
        else
        {
            Worker();
            _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
            _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
            ScheduleTaskCleanup();
        }
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        CancellationTokenSource[] runners;
        Task? pendingCleanup;
        lock (_taskLock)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // runners remove themselves from the dictionary concurrently, so only ever work on a snapshot.
            runners = [.. _taskRunners.Keys];
            pendingCleanup = _cleanupTask;
        }

        _tasks.CompleteAdding();

        // abort the grace period of a pending cleanup instead of waiting for it to elapse.
        await _disposalSource.CancelAsync().ConfigureAwait(false);
        await CancelRunnersAsync(runners).ConfigureAwait(false);

        if (pendingCleanup is not null)
        {
            // disposing must not throw, whatever the outcome of the cleanup was.
            await pendingCleanup.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
            pendingCleanup.Dispose();
        }

        // the queue is disposed last as the cleanup task reads its count.
        _tasks.Dispose();
        _disposalSource.Dispose();
    }

    /// <summary>
    /// A single unit of work on the queue.
    /// </summary>
    private sealed class TaskQueueItem
    {
        /// <summary>
        /// Gets the data that is passed to <see cref="Worker"/>.
        /// </summary>
        public required object Data { get; init; }

        /// <summary>
        /// Gets or sets the last progress value reported for this item, rounded to whole percent.
        /// </summary>
        public double ProgressValue { get; set; }

        /// <summary>
        /// Gets the operation to run for <see cref="Data"/>.
        /// </summary>
        public required Func<object, Task> Worker { get; init; }

        /// <summary>
        /// Gets the progress reporter of this item.
        /// </summary>
        public required IProgress<double> Progress { get; init; }

        /// <summary>
        /// Gets the completion source that is completed once the item has been processed, skipped or has failed.
        /// Continuations run asynchronously so a waiting caller does not continue on the thread of the runner that completed the item.
        /// </summary>
        public TaskCompletionSource Done { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>
        /// Gets the token of the caller that enqueued the item. Cancelled items are skipped.
        /// </summary>
        public CancellationToken CancellationToken { get; init; }
    }
}