소스 검색

Use proper scheduler that honors the parallel task limit (#14281)

JPVenson 22 시간 전
부모
커밋
0e1be6ce30

+ 3 - 0
Emby.Server.Implementations/ApplicationHost.cs

@@ -62,6 +62,7 @@ using MediaBrowser.Controller.Entities;
 using MediaBrowser.Controller.Entities.TV;
 using MediaBrowser.Controller.IO;
 using MediaBrowser.Controller.Library;
+using MediaBrowser.Controller.LibraryTaskScheduler;
 using MediaBrowser.Controller.LiveTv;
 using MediaBrowser.Controller.Lyrics;
 using MediaBrowser.Controller.MediaEncoding;
@@ -552,6 +553,7 @@ namespace Emby.Server.Implementations
             serviceCollection.AddSingleton<ISessionManager, SessionManager>();
 
             serviceCollection.AddSingleton<ICollectionManager, CollectionManager>();
+            serviceCollection.AddSingleton<ILimitedConcurrencyLibraryScheduler, LimitedConcurrencyLibraryScheduler>();
 
             serviceCollection.AddSingleton<IPlaylistManager, PlaylistManager>();
 
@@ -650,6 +652,7 @@ namespace Emby.Server.Implementations
             CollectionFolder.ApplicationHost = this;
             Folder.UserViewManager = Resolve<IUserViewManager>();
             Folder.CollectionManager = Resolve<ICollectionManager>();
+            Folder.LimitedConcurrencyLibraryScheduler = Resolve<ILimitedConcurrencyLibraryScheduler>();
             Episode.MediaEncoder = Resolve<IMediaEncoder>();
             UserView.TVSeriesManager = Resolve<ITVSeriesManager>();
             Video.RecordingsManager = Resolve<IRecordingsManager>();

+ 2 - 2
Emby.Server.Implementations/ScheduledTasks/Tasks/RefreshMediaLibraryTask.cs

@@ -54,12 +54,12 @@ public class RefreshMediaLibraryTask : IScheduledTask
     }
 
     /// <inheritdoc />
-    public Task ExecuteAsync(IProgress<double> progress, CancellationToken cancellationToken)
+    public async Task ExecuteAsync(IProgress<double> progress, CancellationToken cancellationToken)
     {
         cancellationToken.ThrowIfCancellationRequested();
 
         progress.Report(0);
 
-        return ((LibraryManager)_libraryManager).ValidateMediaLibraryInternal(progress, cancellationToken);
+        await ((LibraryManager)_libraryManager).ValidateMediaLibraryInternal(progress, cancellationToken).ConfigureAwait(false);
     }
 }

+ 12 - 48
MediaBrowser.Controller/Entities/Folder.cs

@@ -11,7 +11,6 @@ using System.Security;
 using System.Text.Json.Serialization;
 using System.Threading;
 using System.Threading.Tasks;
-using System.Threading.Tasks.Dataflow;
 using J2N.Collections.Generic.Extensions;
 using Jellyfin.Data;
 using Jellyfin.Data.Enums;
@@ -25,6 +24,7 @@ using MediaBrowser.Controller.Dto;
 using MediaBrowser.Controller.Entities.Audio;
 using MediaBrowser.Controller.Entities.Movies;
 using MediaBrowser.Controller.Library;
+using MediaBrowser.Controller.LibraryTaskScheduler;
 using MediaBrowser.Controller.Providers;
 using MediaBrowser.Model.Dto;
 using MediaBrowser.Model.IO;
@@ -49,6 +49,8 @@ namespace MediaBrowser.Controller.Entities
 
         public static IUserViewManager UserViewManager { get; set; }
 
+        public static ILimitedConcurrencyLibraryScheduler LimitedConcurrencyLibraryScheduler { get; set; }
+
         /// <summary>
         /// Gets or sets a value indicating whether this instance is root.
         /// </summary>
@@ -598,51 +600,13 @@ namespace MediaBrowser.Controller.Entities
         /// <returns>Task.</returns>
         private async Task RunTasks<T>(Func<T, IProgress<double>, Task> task, IList<T> children, IProgress<double> progress, CancellationToken cancellationToken)
         {
-            var childrenCount = children.Count;
-            var childrenProgress = new double[childrenCount];
-
-            void UpdateProgress()
-            {
-                progress.Report(childrenProgress.Average());
-            }
-
-            var fanoutConcurrency = ConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
-            var parallelism = fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount;
-
-            var actionBlock = new ActionBlock<int>(
-                async i =>
-                {
-                    var innerProgress = new Progress<double>(innerPercent =>
-                    {
-                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
-                        var innerPercentRounded = Math.Round(innerPercent);
-                        if (childrenProgress[i] != innerPercentRounded)
-                        {
-                            childrenProgress[i] = innerPercentRounded;
-                            UpdateProgress();
-                        }
-                    });
-
-                    await task(children[i], innerProgress).ConfigureAwait(false);
-
-                    childrenProgress[i] = 100;
-
-                    UpdateProgress();
-                },
-                new ExecutionDataflowBlockOptions
-                {
-                    MaxDegreeOfParallelism = parallelism,
-                    CancellationToken = cancellationToken,
-                });
-
-            for (var i = 0; i < childrenCount; i++)
-            {
-                await actionBlock.SendAsync(i, cancellationToken).ConfigureAwait(false);
-            }
-
-            actionBlock.Complete();
-
-            await actionBlock.Completion.ConfigureAwait(false);
+            await LimitedConcurrencyLibraryScheduler
+                .Enqueue(
+                    children.ToArray(),
+                    task,
+                    progress,
+                    cancellationToken)
+                .ConfigureAwait(false);
         }
 
         /// <summary>
@@ -1008,7 +972,7 @@ namespace MediaBrowser.Controller.Entities
                 items = CollapseBoxSetItemsIfNeeded(items, query, this, user, ConfigurationManager, CollectionManager);
             }
 
-            #pragma warning disable CA1309
+#pragma warning disable CA1309
             if (!string.IsNullOrEmpty(query.NameStartsWithOrGreater))
             {
                 items = items.Where(i => string.Compare(query.NameStartsWithOrGreater, i.SortName, StringComparison.InvariantCultureIgnoreCase) < 1);
@@ -1023,7 +987,7 @@ namespace MediaBrowser.Controller.Entities
             {
                 items = items.Where(i => string.Compare(query.NameLessThan, i.SortName, StringComparison.InvariantCultureIgnoreCase) == 1);
             }
-            #pragma warning restore CA1309
+#pragma warning restore CA1309
 
             // This must be the last filter
             if (!query.AdjacentTo.IsNullOrEmpty())

+ 23 - 0
MediaBrowser.Controller/LibraryTaskScheduler/ILimitedConcurrencyLibraryScheduler.cs

@@ -0,0 +1,23 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using MediaBrowser.Model.Configuration;
+
+namespace MediaBrowser.Controller.LibraryTaskScheduler;
+
+/// <summary>
+/// Provides a shared scheduler to run library related tasks based on the <see cref="ServerConfiguration.LibraryScanFanoutConcurrency"/>.
+/// </summary>
+public interface ILimitedConcurrencyLibraryScheduler
+{
+    /// <summary>
+    /// Enqueues an action that will be invoked with the set data.
+    /// </summary>
+    /// <typeparam name="T">The data Type.</typeparam>
+    /// <param name="data">The data.</param>
+    /// <param name="worker">The callback to process the data.</param>
+    /// <param name="progress">A progress reporter.</param>
+    /// <param name="cancellationToken">Stop token.</param>
+    /// <returns>A task that finishes when all data has been processed by the worker.</returns>
+    Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken);
+}

+ 316 - 0
MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs

@@ -0,0 +1,316 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using MediaBrowser.Controller.Configuration;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace MediaBrowser.Controller.LibraryTaskScheduler;
+
+/// <summary>
+/// Provides Parallel action interface to process tasks with a set concurrency level.
+/// </summary>
+public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable
+{
+    private const int CleanupGracePeriod = 60;
+    private readonly IHostApplicationLifetime _hostApplicationLifetime;
+    private readonly ILogger<LimitedConcurrencyLibraryScheduler> _logger;
+    private readonly IServerConfigurationManager _serverConfigurationManager;
+    private readonly Dictionary<CancellationTokenSource, Task> _taskRunners = new();
+
+    private static readonly AsyncLocal<CancellationTokenSource> _deadlockDetector = new();
+
+    /// <summary>
+    /// Gets used to lock all operations on the Tasks queue and creating workers.
+    /// </summary>
+    private readonly Lock _taskLock = new();
+
+    private readonly BlockingCollection<TaskQueueItem> _tasks = new();
+
+    private volatile int _workCounter;
+    private Task? _cleanupTask;
+    private bool _disposed;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="LimitedConcurrencyLibraryScheduler"/> class.
+    /// </summary>
+    /// <param name="hostApplicationLifetime">The hosting lifetime.</param>
+    /// <param name="logger">The logger.</param>
+    /// <param name="serverConfigurationManager">The server configuration manager.</param>
+    public LimitedConcurrencyLibraryScheduler(
+        IHostApplicationLifetime hostApplicationLifetime,
+        ILogger<LimitedConcurrencyLibraryScheduler> logger,
+        IServerConfigurationManager serverConfigurationManager)
+    {
+        _hostApplicationLifetime = hostApplicationLifetime;
+        _logger = logger;
+        _serverConfigurationManager = serverConfigurationManager;
+    }
+
+    private void ScheduleTaskCleanup()
+    {
+        lock (_taskLock)
+        {
+            if (_cleanupTask is not null)
+            {
+                _logger.LogDebug("Cleanup task already scheduled.");
+                // cleanup task is already running.
+                return;
+            }
+
+            _cleanupTask = RunCleanupTask();
+        }
+
+        async Task RunCleanupTask()
+        {
+            _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod);
+            await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false);
+            if (_disposed)
+            {
+                _logger.LogDebug("Abort cleaning up, already disposed.");
+                return;
+            }
+
+            lock (_taskLock)
+            {
+                if (_tasks.Count > 0 || _workCounter > 0)
+                {
+                    _logger.LogDebug("Delay cleanup task, operations still running.");
+                    // tasks are still there so its still in use. Reschedule cleanup task.
+                    // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended.
+                    _cleanupTask = RunCleanupTask();
+                    return;
+                }
+            }
+
+            _logger.LogDebug("Cleanup runners.");
+            foreach (var item in _taskRunners.ToArray())
+            {
+                await item.Key.CancelAsync().ConfigureAwait(false);
+                _taskRunners.Remove(item.Key);
+            }
+        }
+    }
+
+    private void Worker()
+    {
+        lock (_taskLock)
+        {
+            var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency;
+            var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count;
+            _logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism);
+            for (int i = 0; i < parallelism; i++)
+            {
+                var stopToken = new CancellationTokenSource();
+                var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping);
+                _taskRunners.Add(
+                    combinedSource,
+                    Task.Factory.StartNew(
+                        ItemWorker,
+                        (combinedSource, stopToken),
+                        combinedSource.Token,
+                        TaskCreationOptions.PreferFairness,
+                        TaskScheduler.Default));
+            }
+        }
+    }
+
+    private async Task ItemWorker(object? obj)
+    {
+        var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!;
+        _deadlockDetector.Value = stopToken.TaskStop;
+        try
+        {
+            foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token))
+            {
+                stopToken.GlobalStop.Token.ThrowIfCancellationRequested();
+                try
+                {
+                    var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0;
+                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
+                    _logger.LogDebug("Process new item '{Data}'.", item.Data);
+                    await ProcessItem(item).ConfigureAwait(false);
+                }
+                finally
+                {
+                    var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0;
+                    Debug.Assert(newWorkerLimit, "_workCounter > 0");
+                }
+            }
+        }
+        catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested)
+        {
+            // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose.
+        }
+        finally
+        {
+            _logger.LogDebug("Cleanup Runner'.");
+            _deadlockDetector.Value = default!;
+            _taskRunners.Remove(stopToken.TaskStop);
+            stopToken.GlobalStop.Dispose();
+            stopToken.TaskStop.Dispose();
+        }
+    }
+
+    private async Task ProcessItem(TaskQueueItem item)
+    {
+        try
+        {
+            if (item.CancellationToken.IsCancellationRequested)
+            {
+                // if item is cancelled, just skip it
+                return;
+            }
+
+            await item.Worker(item.Data).ConfigureAwait(true);
+        }
+        catch (System.Exception ex)
+        {
+            _logger.LogError(ex, "Error while performing a library operation");
+        }
+        finally
+        {
+            item.Progress.Report(100);
+            item.Done.SetResult();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async Task Enqueue<T>(T[] data, Func<T, IProgress<double>, Task> worker, IProgress<double> progress, CancellationToken cancellationToken)
+    {
+        if (_disposed)
+        {
+            return;
+        }
+
+        if (data.Length == 0 || cancellationToken.IsCancellationRequested)
+        {
+            progress.Report(100);
+            return;
+        }
+
+        _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length);
+
+        TaskQueueItem[] workItems = null!;
+
+        void UpdateProgress()
+        {
+            progress.Report(workItems.Select(e => e.ProgressValue).Average());
+        }
+
+        workItems = data.Select(item =>
+        {
+            TaskQueueItem queueItem = null!;
+            return queueItem = new TaskQueueItem()
+            {
+                Data = item!,
+                Progress = new Progress<double>(innerPercent =>
+                    {
+                        // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls
+                        var innerPercentRounded = Math.Round(innerPercent);
+                        if (queueItem.ProgressValue != innerPercentRounded)
+                        {
+                            queueItem.ProgressValue = innerPercentRounded;
+                            UpdateProgress();
+                        }
+                    }),
+                Worker = (val) => worker((T)val, queueItem.Progress),
+                CancellationToken = cancellationToken
+            };
+        }).ToArray();
+
+        if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1)
+        {
+            _logger.LogDebug("Process sequentially.");
+            try
+            {
+                foreach (var item in workItems)
+                {
+                    await ProcessItem(item).ConfigureAwait(false);
+                }
+            }
+            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+            {
+                // operation is cancelled. Do nothing.
+            }
+
+            _logger.LogDebug("Process sequentially done.");
+            return;
+        }
+
+        for (var i = 0; i < workItems.Length; i++)
+        {
+            var item = workItems[i]!;
+            _tasks.Add(item, CancellationToken.None);
+        }
+
+        if (_deadlockDetector.Value is not null)
+        {
+            _logger.LogDebug("Nested invocation detected, process in-place.");
+            try
+            {
+                // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved
+                while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token))
+                {
+                    await ProcessItem(item).ConfigureAwait(false);
+                }
+            }
+            catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested)
+            {
+                // operation is cancelled. Do nothing.
+            }
+
+            _logger.LogDebug("process in-place done.");
+        }
+        else
+        {
+            Worker();
+            _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length);
+            await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false);
+            _logger.LogDebug("{NoWorkers} completed.", workItems.Length);
+            ScheduleTaskCleanup();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async ValueTask DisposeAsync()
+    {
+        if (_disposed)
+        {
+            return;
+        }
+
+        _disposed = true;
+        _tasks.CompleteAdding();
+        foreach (var item in _taskRunners)
+        {
+            await item.Key.CancelAsync().ConfigureAwait(false);
+        }
+
+        _tasks.Dispose();
+        if (_cleanupTask is not null)
+        {
+            await _cleanupTask.ConfigureAwait(false);
+            _cleanupTask?.Dispose();
+        }
+    }
+
+    private class TaskQueueItem
+    {
+        public required object Data { get; init; }
+
+        public double ProgressValue { get; set; }
+
+        public required Func<object, Task> Worker { get; init; }
+
+        public required IProgress<double> Progress { get; init; }
+
+        public TaskCompletionSource Done { get; } = new();
+
+        public CancellationToken CancellationToken { get; init; }
+    }
+}