Browse Source

AsyncKeyedLock migration

Mark Cilia Vincenti 1 year ago
parent
commit
d1677dc680

+ 1 - 0
Directory.Packages.props

@@ -4,6 +4,7 @@
   </PropertyGroup>
   <!-- Run "dotnet list package (dash,dash)outdated" to see the latest versions of each package.-->
   <ItemGroup Label="Package Dependencies">
+    <PackageVersion Include="AsyncKeyedLock" Version="6.2.6" />
     <PackageVersion Include="AutoFixture.AutoMoq" Version="4.18.1" />
     <PackageVersion Include="AutoFixture.Xunit2" Version="4.18.1" />
     <PackageVersion Include="AutoFixture" Version="4.18.1" />

+ 56 - 77
Jellyfin.Api/Controllers/DynamicHlsController.cs

@@ -294,9 +294,7 @@ public class DynamicHlsController : BaseJellyfinApiController
 
         if (!System.IO.File.Exists(playlistPath))
         {
-            var transcodingLock = _transcodeManager.GetTranscodingLock(playlistPath);
-            await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
-            try
+            using (await _transcodeManager.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!System.IO.File.Exists(playlistPath))
                 {
@@ -326,10 +324,6 @@ public class DynamicHlsController : BaseJellyfinApiController
                     }
                 }
             }
-            finally
-            {
-                transcodingLock.Release();
-            }
         }
 
         job ??= _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
@@ -1442,95 +1436,80 @@ public class DynamicHlsController : BaseJellyfinApiController
             return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
         }
 
-        var transcodingLock = _transcodeManager.GetTranscodingLock(playlistPath);
-        await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
-        var released = false;
-        var startTranscoding = false;
-
-        try
+        using (await _transcodeManager.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
         {
+            var startTranscoding = false;
             if (System.IO.File.Exists(segmentPath))
             {
                 job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
-                transcodingLock.Release();
-                released = true;
                 _logger.LogDebug("returning {0} [it exists, try 2]", segmentPath);
                 return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
             }
-            else
-            {
-                var currentTranscodingIndex = GetCurrentTranscodingIndex(playlistPath, segmentExtension);
-                var segmentGapRequiringTranscodingChange = 24 / state.SegmentLength;
-
-                if (segmentId == -1)
-                {
-                    _logger.LogDebug("Starting transcoding because fmp4 init file is being requested");
-                    startTranscoding = true;
-                    segmentId = 0;
-                }
-                else if (currentTranscodingIndex is null)
-                {
-                    _logger.LogDebug("Starting transcoding because currentTranscodingIndex=null");
-                    startTranscoding = true;
-                }
-                else if (segmentId < currentTranscodingIndex.Value)
-                {
-                    _logger.LogDebug("Starting transcoding because requestedIndex={0} and currentTranscodingIndex={1}", segmentId, currentTranscodingIndex);
-                    startTranscoding = true;
-                }
-                else if (segmentId - currentTranscodingIndex.Value > segmentGapRequiringTranscodingChange)
-                {
-                    _logger.LogDebug("Starting transcoding because segmentGap is {0} and max allowed gap is {1}. requestedIndex={2}", segmentId - currentTranscodingIndex.Value, segmentGapRequiringTranscodingChange, segmentId);
-                    startTranscoding = true;
-                }
 
-                if (startTranscoding)
-                {
-                    // If the playlist doesn't already exist, startup ffmpeg
-                    try
-                    {
-                        await _transcodeManager.KillTranscodingJobs(streamingRequest.DeviceId, streamingRequest.PlaySessionId, p => false)
-                            .ConfigureAwait(false);
+            var currentTranscodingIndex = GetCurrentTranscodingIndex(playlistPath, segmentExtension);
+            var segmentGapRequiringTranscodingChange = 24 / state.SegmentLength;
 
-                        if (currentTranscodingIndex.HasValue)
-                        {
-                            DeleteLastFile(playlistPath, segmentExtension, 0);
-                        }
+            if (segmentId == -1)
+            {
+                _logger.LogDebug("Starting transcoding because fmp4 init file is being requested");
+                startTranscoding = true;
+                segmentId = 0;
+            }
+            else if (currentTranscodingIndex is null)
+            {
+                _logger.LogDebug("Starting transcoding because currentTranscodingIndex=null");
+                startTranscoding = true;
+            }
+            else if (segmentId < currentTranscodingIndex.Value)
+            {
+                _logger.LogDebug("Starting transcoding because requestedIndex={0} and currentTranscodingIndex={1}", segmentId, currentTranscodingIndex);
+                startTranscoding = true;
+            }
+            else if (segmentId - currentTranscodingIndex.Value > segmentGapRequiringTranscodingChange)
+            {
+                _logger.LogDebug("Starting transcoding because segmentGap is {0} and max allowed gap is {1}. requestedIndex={2}", segmentId - currentTranscodingIndex.Value, segmentGapRequiringTranscodingChange, segmentId);
+                startTranscoding = true;
+            }
 
-                        streamingRequest.StartTimeTicks = streamingRequest.CurrentRuntimeTicks;
+            if (startTranscoding)
+            {
+                // If the playlist doesn't already exist, startup ffmpeg
+                try
+                {
+                    await _transcodeManager.KillTranscodingJobs(streamingRequest.DeviceId, streamingRequest.PlaySessionId, p => false)
+                        .ConfigureAwait(false);
 
-                        state.WaitForPath = segmentPath;
-                        job = await _transcodeManager.StartFfMpeg(
-                            state,
-                            playlistPath,
-                            GetCommandLineArguments(playlistPath, state, false, segmentId),
-                            Request.HttpContext.User.GetUserId(),
-                            TranscodingJobType,
-                            cancellationTokenSource).ConfigureAwait(false);
-                    }
-                    catch
+                    if (currentTranscodingIndex.HasValue)
                     {
-                        state.Dispose();
-                        throw;
+                        DeleteLastFile(playlistPath, segmentExtension, 0);
                     }
 
-                    // await WaitForMinimumSegmentCount(playlistPath, 1, cancellationTokenSource.Token).ConfigureAwait(false);
+                    streamingRequest.StartTimeTicks = streamingRequest.CurrentRuntimeTicks;
+
+                    state.WaitForPath = segmentPath;
+                    job = await _transcodeManager.StartFfMpeg(
+                        state,
+                        playlistPath,
+                        GetCommandLineArguments(playlistPath, state, false, segmentId),
+                        Request.HttpContext.User.GetUserId(),
+                        TranscodingJobType,
+                        cancellationTokenSource).ConfigureAwait(false);
                 }
-                else
+                catch
                 {
-                    job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
-                    if (job?.TranscodingThrottler is not null)
-                    {
-                        await job.TranscodingThrottler.UnpauseTranscoding().ConfigureAwait(false);
-                    }
+                    state.Dispose();
+                    throw;
                 }
+
+                // await WaitForMinimumSegmentCount(playlistPath, 1, cancellationTokenSource.Token).ConfigureAwait(false);
             }
-        }
-        finally
-        {
-            if (!released)
+            else
             {
-                transcodingLock.Release();
+                job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
+                if (job?.TranscodingThrottler is not null)
+                {
+                    await job.TranscodingThrottler.UnpauseTranscoding().ConfigureAwait(false);
+                }
             }
         }
 

+ 1 - 7
Jellyfin.Api/Helpers/FileStreamResponseHelpers.cs

@@ -93,9 +93,7 @@ public static class FileStreamResponseHelpers
             return new OkResult();
         }
 
-        var transcodingLock = transcodeManager.GetTranscodingLock(outputPath);
-        await transcodingLock.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
-        try
+        using (await transcodeManager.LockAsync(outputPath, cancellationTokenSource.Token).ConfigureAwait(false))
         {
             TranscodingJob? job;
             if (!File.Exists(outputPath))
@@ -117,9 +115,5 @@ public static class FileStreamResponseHelpers
             var stream = new ProgressiveFileStream(outputPath, job, transcodeManager);
             return new FileStreamResult(stream, contentType);
         }
-        finally
-        {
-            transcodingLock.Release();
-        }
     }
 }

+ 3 - 2
MediaBrowser.Controller/MediaEncoding/ITranscodeManager.cs

@@ -96,9 +96,10 @@ public interface ITranscodeManager
     public void OnTranscodeEndRequest(TranscodingJob job);
 
     /// <summary>
-    /// Gets the transcoding lock.
+    /// Transcoding lock.
     /// </summary>
     /// <param name="outputPath">The output path of the transcoded file.</param>
+    /// <param name="cancellationToken">The cancellation token.</param>
     /// <returns>A <see cref="SemaphoreSlim"/>.</returns>
-    public SemaphoreSlim GetTranscodingLock(string outputPath);
+    ValueTask<IDisposable> LockAsync(string outputPath, CancellationToken cancellationToken);
 }

+ 16 - 30
MediaBrowser.MediaEncoding/Attachments/AttachmentExtractor.cs

@@ -8,6 +8,7 @@ using System.IO;
 using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
+using AsyncKeyedLock;
 using MediaBrowser.Common.Configuration;
 using MediaBrowser.Common.Extensions;
 using MediaBrowser.Controller.Entities;
@@ -22,7 +23,7 @@ using Microsoft.Extensions.Logging;
 
 namespace MediaBrowser.MediaEncoding.Attachments
 {
-    public sealed class AttachmentExtractor : IAttachmentExtractor
+    public sealed class AttachmentExtractor : IAttachmentExtractor, IDisposable
     {
         private readonly ILogger<AttachmentExtractor> _logger;
         private readonly IApplicationPaths _appPaths;
@@ -30,8 +31,11 @@ namespace MediaBrowser.MediaEncoding.Attachments
         private readonly IMediaEncoder _mediaEncoder;
         private readonly IMediaSourceManager _mediaSourceManager;
 
-        private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
-            new ConcurrentDictionary<string, SemaphoreSlim>();
+        private readonly AsyncKeyedLocker<string> _semaphoreLocks = new(o =>
+        {
+            o.PoolSize = 20;
+            o.PoolInitialFill = 1;
+        });
 
         public AttachmentExtractor(
             ILogger<AttachmentExtractor> logger,
@@ -84,11 +88,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
             string outputPath,
             CancellationToken cancellationToken)
         {
-            var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
-
-            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
-            try
+            using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!Directory.Exists(outputPath))
                 {
@@ -99,10 +99,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
                         cancellationToken).ConfigureAwait(false);
                 }
             }
-            finally
-            {
-                semaphore.Release();
-            }
         }
 
         public async Task ExtractAllAttachmentsExternal(
@@ -111,11 +107,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
             string outputPath,
             CancellationToken cancellationToken)
         {
-            var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
-
-            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
-            try
+            using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!File.Exists(Path.Join(outputPath, id)))
                 {
@@ -131,10 +123,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
                     }
                 }
             }
-            finally
-            {
-                semaphore.Release();
-            }
         }
 
         private async Task ExtractAllAttachmentsInternal(
@@ -256,11 +244,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
             string outputPath,
             CancellationToken cancellationToken)
         {
-            var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
-
-            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
-            try
+            using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!File.Exists(outputPath))
                 {
@@ -271,10 +255,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
                         cancellationToken).ConfigureAwait(false);
                 }
             }
-            finally
-            {
-                semaphore.Release();
-            }
         }
 
         private async Task ExtractAttachmentInternal(
@@ -379,5 +359,11 @@ namespace MediaBrowser.MediaEncoding.Attachments
             var prefix = filename.AsSpan(0, 1);
             return Path.Join(_appPaths.DataPath, "attachments", prefix, filename);
         }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            _semaphoreLocks.Dispose();
+        }
     }
 }

+ 2 - 1
MediaBrowser.MediaEncoding/MediaBrowser.MediaEncoding.csproj

@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk">
+<Project Sdk="Microsoft.NET.Sdk">
 
   <!-- ProjectGuid is only included as a requirement for SonarQube analysis -->
   <PropertyGroup>
@@ -22,6 +22,7 @@
   </ItemGroup>
 
   <ItemGroup>
+    <PackageReference Include="AsyncKeyedLock" />
     <PackageReference Include="BDInfo" />
     <PackageReference Include="libse" />
     <PackageReference Include="Microsoft.Extensions.Http" />

+ 18 - 34
MediaBrowser.MediaEncoding/Subtitles/SubtitleEncoder.cs

@@ -1,7 +1,6 @@
 #pragma warning disable CS1591
 
 using System;
-using System.Collections.Concurrent;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
 using System.Globalization;
@@ -11,6 +10,7 @@ using System.Net.Http;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
+using AsyncKeyedLock;
 using MediaBrowser.Common;
 using MediaBrowser.Common.Configuration;
 using MediaBrowser.Common.Extensions;
@@ -18,6 +18,7 @@ using MediaBrowser.Common.Net;
 using MediaBrowser.Controller.Entities;
 using MediaBrowser.Controller.Library;
 using MediaBrowser.Controller.MediaEncoding;
+using MediaBrowser.Controller.Session;
 using MediaBrowser.Model.Dto;
 using MediaBrowser.Model.Entities;
 using MediaBrowser.Model.IO;
@@ -27,7 +28,7 @@ using UtfUnknown;
 
 namespace MediaBrowser.MediaEncoding.Subtitles
 {
-    public sealed class SubtitleEncoder : ISubtitleEncoder
+    public sealed class SubtitleEncoder : ISubtitleEncoder, IDisposable
     {
         private readonly ILogger<SubtitleEncoder> _logger;
         private readonly IApplicationPaths _appPaths;
@@ -40,8 +41,11 @@ namespace MediaBrowser.MediaEncoding.Subtitles
         /// <summary>
         /// The _semaphoreLocks.
         /// </summary>
-        private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
-            new ConcurrentDictionary<string, SemaphoreSlim>();
+        private readonly AsyncKeyedLocker<string> _semaphoreLocks = new(o =>
+        {
+            o.PoolSize = 20;
+            o.PoolInitialFill = 1;
+        });
 
         public SubtitleEncoder(
             ILogger<SubtitleEncoder> logger,
@@ -317,16 +321,6 @@ namespace MediaBrowser.MediaEncoding.Subtitles
             throw new ArgumentException("Unsupported format: " + format);
         }
 
-        /// <summary>
-        /// Gets the lock.
-        /// </summary>
-        /// <param name="filename">The filename.</param>
-        /// <returns>System.Object.</returns>
-        private SemaphoreSlim GetLock(string filename)
-        {
-            return _semaphoreLocks.GetOrAdd(filename, _ => new SemaphoreSlim(1, 1));
-        }
-
         /// <summary>
         /// Converts the text subtitle to SRT.
         /// </summary>
@@ -337,21 +331,13 @@ namespace MediaBrowser.MediaEncoding.Subtitles
         /// <returns>Task.</returns>
         private async Task ConvertTextSubtitleToSrt(MediaStream subtitleStream, MediaSourceInfo mediaSource, string outputPath, CancellationToken cancellationToken)
         {
-            var semaphore = GetLock(outputPath);
-
-            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
-            try
+            using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!File.Exists(outputPath))
                 {
                     await ConvertTextSubtitleToSrtInternal(subtitleStream, mediaSource, outputPath, cancellationToken).ConfigureAwait(false);
                 }
             }
-            finally
-            {
-                semaphore.Release();
-            }
         }
 
         /// <summary>
@@ -484,16 +470,12 @@ namespace MediaBrowser.MediaEncoding.Subtitles
             string outputPath,
             CancellationToken cancellationToken)
         {
-            var semaphore = GetLock(outputPath);
-
-            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
-
-            var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);
-
-            try
+            using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
             {
                 if (!File.Exists(outputPath))
                 {
+                    var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);
+
                     var args = _mediaEncoder.GetInputArgument(mediaSource.Path, mediaSource);
 
                     if (subtitleStream.IsExternal)
@@ -509,10 +491,6 @@ namespace MediaBrowser.MediaEncoding.Subtitles
                         cancellationToken).ConfigureAwait(false);
                 }
             }
-            finally
-            {
-                semaphore.Release();
-            }
         }
 
         private async Task ExtractTextSubtitleInternal(
@@ -728,6 +706,12 @@ namespace MediaBrowser.MediaEncoding.Subtitles
             }
         }
 
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            _semaphoreLocks.Dispose();
+        }
+
 #pragma warning disable CA1034 // Nested types should not be visible
         // Only public for the unit tests
         public readonly record struct SubtitleInfo

+ 20 - 26
MediaBrowser.MediaEncoding/Transcoding/TranscodeManager.cs

@@ -4,10 +4,12 @@ using System.Diagnostics;
 using System.Globalization;
 using System.IO;
 using System.Linq;
+using System.Runtime.CompilerServices;
 using System.Text;
 using System.Text.Json;
 using System.Threading;
 using System.Threading.Tasks;
+using AsyncKeyedLock;
 using Jellyfin.Data.Enums;
 using MediaBrowser.Common;
 using MediaBrowser.Common.Configuration;
@@ -42,7 +44,11 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
     private readonly IAttachmentExtractor _attachmentExtractor;
 
     private readonly List<TranscodingJob> _activeTranscodingJobs = new();
-    private readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new();
+    private readonly AsyncKeyedLocker<string> _transcodingLocks = new(o =>
+    {
+        o.PoolSize = 20;
+        o.PoolInitialFill = 1;
+    });
 
     /// <summary>
     /// Initializes a new instance of the <see cref="TranscodeManager"/> class.
@@ -223,11 +229,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
             }
         }
 
-        lock (_transcodingLocks)
-        {
-            _transcodingLocks.Remove(job.Path!);
-        }
-
         job.Stop();
 
         if (delete(job.Path!))
@@ -624,11 +625,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
             }
         }
 
-        lock (_transcodingLocks)
-        {
-            _transcodingLocks.Remove(path);
-        }
-
         if (!string.IsNullOrWhiteSpace(state.Request.DeviceId))
         {
             _sessionManager.ClearTranscodingInfo(state.Request.DeviceId);
@@ -704,21 +700,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
         }
     }
 
-    /// <inheritdoc />
-    public SemaphoreSlim GetTranscodingLock(string outputPath)
-    {
-        lock (_transcodingLocks)
-        {
-            if (!_transcodingLocks.TryGetValue(outputPath, out SemaphoreSlim? result))
-            {
-                result = new SemaphoreSlim(1, 1);
-                _transcodingLocks[outputPath] = result;
-            }
-
-            return result;
-        }
-    }
-
     private void OnPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
     {
         if (!string.IsNullOrWhiteSpace(e.PlaySessionId))
@@ -741,10 +722,23 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
         }
     }
 
+    /// <summary>
+    /// Transcoding lock.
+    /// </summary>
+    /// <param name="outputPath">The output path of the transcoded file.</param>
+    /// <param name="cancellationToken">The cancellation token.</param>
+    /// <returns>A <see cref="SemaphoreSlim"/>.</returns>
+    [MethodImpl(MethodImplOptions.AggressiveInlining)]
+    public ValueTask<IDisposable> LockAsync(string outputPath, CancellationToken cancellationToken)
+    {
+        return _transcodingLocks.LockAsync(outputPath, cancellationToken);
+    }
+
     /// <inheritdoc />
     public void Dispose()
     {
         _sessionManager.PlaybackProgress -= OnPlaybackProgress;
         _sessionManager.PlaybackStart -= OnPlaybackProgress;
+        _transcodingLocks.Dispose();
     }
 }