|  | @@ -2,6 +2,7 @@ using System;
 | 
	
		
			
				|  |  |  using System.IO;
 | 
	
		
			
				|  |  |  using System.Threading;
 | 
	
		
			
				|  |  |  using System.Threading.Tasks;
 | 
	
		
			
				|  |  | +using Jellyfin.Api.Models.PlaybackDtos;
 | 
	
		
			
				|  |  |  using MediaBrowser.Controller.Library;
 | 
	
		
			
				|  |  |  using MediaBrowser.Model.IO;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -12,34 +13,53 @@ namespace Jellyfin.Api.Helpers
 | 
	
		
			
				|  |  |      /// </summary>
 | 
	
		
			
				|  |  |      public class ProgressiveFileCopier
 | 
	
		
			
				|  |  |      {
 | 
	
		
			
				|  |  | +        private readonly TranscodingJobDto? _job;
 | 
	
		
			
				|  |  |          private readonly string? _path;
 | 
	
		
			
				|  |  | +        private readonly CancellationToken _cancellationToken;
 | 
	
		
			
				|  |  |          private readonly IDirectStreamProvider? _directStreamProvider;
 | 
	
		
			
				|  |  | -        private readonly IStreamHelper _streamHelper;
 | 
	
		
			
				|  |  | +        private readonly TranscodingJobHelper _transcodingJobHelper;
 | 
	
		
			
				|  |  | +        private long _bytesWritten;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class.
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
		
			
				|  |  | -        /// <param name="streamHelper">Instance of the <see cref="IStreamHelper"/> interface.</param>
 | 
	
		
			
				|  |  | -        /// <param name="path">Filepath to stream from.</param>
 | 
	
		
			
				|  |  | -        public ProgressiveFileCopier(IStreamHelper streamHelper, string path)
 | 
	
		
			
				|  |  | +        /// <param name="path">The path to copy from.</param>
 | 
	
		
			
				|  |  | +        /// <param name="job">The transcoding job.</param>
 | 
	
		
			
				|  |  | +        /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param>
 | 
	
		
			
				|  |  | +        /// <param name="cancellationToken">The cancellation token.</param>
 | 
	
		
			
				|  |  | +        public ProgressiveFileCopier(string path, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              _path = path;
 | 
	
		
			
				|  |  | -            _streamHelper = streamHelper;
 | 
	
		
			
				|  |  | -            _directStreamProvider = null;
 | 
	
		
			
				|  |  | +            _job = job;
 | 
	
		
			
				|  |  | +            _cancellationToken = cancellationToken;
 | 
	
		
			
				|  |  | +            _transcodingJobHelper = transcodingJobHelper;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class.
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
		
			
				|  |  | -        /// <param name="streamHelper">Instance of the <see cref="IStreamHelper"/> interface.</param>
 | 
	
		
			
				|  |  |          /// <param name="directStreamProvider">Instance of the <see cref="IDirectStreamProvider"/> interface.</param>
 | 
	
		
			
				|  |  | -        public ProgressiveFileCopier(IStreamHelper streamHelper, IDirectStreamProvider directStreamProvider)
 | 
	
		
			
				|  |  | +        /// <param name="job">The transcoding job.</param>
 | 
	
		
			
				|  |  | +        /// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param>
 | 
	
		
			
				|  |  | +        /// <param name="cancellationToken">The cancellation token.</param>
 | 
	
		
			
				|  |  | +        public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              _directStreamProvider = directStreamProvider;
 | 
	
		
			
				|  |  | -            _streamHelper = streamHelper;
 | 
	
		
			
				|  |  | -            _path = null;
 | 
	
		
			
				|  |  | +            _job = job;
 | 
	
		
			
				|  |  | +            _cancellationToken = cancellationToken;
 | 
	
		
			
				|  |  | +            _transcodingJobHelper = transcodingJobHelper;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        /// <summary>
 | 
	
		
			
				|  |  | +        /// Gets or sets a value indicating whether allow read end of file.
 | 
	
		
			
				|  |  | +        /// </summary>
 | 
	
		
			
				|  |  | +        public bool AllowEndOfFile { get; set; } = true;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        /// <summary>
 | 
	
		
			
				|  |  | +        /// Gets or sets copy start position.
 | 
	
		
			
				|  |  | +        /// </summary>
 | 
	
		
			
				|  |  | +        public long StartPosition { get; set; }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Write source stream to output.
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
	
		
			
				|  | @@ -48,37 +68,123 @@ namespace Jellyfin.Api.Helpers
 | 
	
		
			
				|  |  |          /// <returns>A <see cref="Task"/>.</returns>
 | 
	
		
			
				|  |  |          public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  | -            if (_directStreamProvider != null)
 | 
	
		
			
				|  |  | +            cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            try
 | 
	
		
			
				|  |  |              {
 | 
	
		
			
				|  |  | -                await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | -                return;
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | +                if (_directStreamProvider != null)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                    return;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                var fileOptions = FileOptions.SequentialScan;
 | 
	
		
			
				|  |  | +                var allowAsyncFileRead = false;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
 | 
	
		
			
				|  |  | +                if (Environment.OSVersion.Platform != PlatformID.Win32NT)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    fileOptions |= FileOptions.Asynchronous;
 | 
	
		
			
				|  |  | +                    allowAsyncFileRead = true;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                var eofCount = 0;
 | 
	
		
			
				|  |  | +                const int emptyReadLimit = 20;
 | 
	
		
			
				|  |  | +                if (StartPosition > 0)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    inputStream.Position = StartPosition;
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                while (eofCount < emptyReadLimit || !AllowEndOfFile)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    int bytesRead;
 | 
	
		
			
				|  |  | +                    if (allowAsyncFileRead)
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    else
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            var fileOptions = FileOptions.SequentialScan;
 | 
	
		
			
				|  |  | +                    if (bytesRead == 0)
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        if (_job == null || _job.HasExited)
 | 
	
		
			
				|  |  | +                        {
 | 
	
		
			
				|  |  | +                            eofCount++;
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
 | 
	
		
			
				|  |  | -            if (Environment.OSVersion.Platform != PlatformID.Win32NT)
 | 
	
		
			
				|  |  | +                        await Task.Delay(100, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    else
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        eofCount = 0;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            finally
 | 
	
		
			
				|  |  |              {
 | 
	
		
			
				|  |  | -                fileOptions |= FileOptions.Asynchronous;
 | 
	
		
			
				|  |  | +                if (_job != null)
 | 
	
		
			
				|  |  | +                {
 | 
	
		
			
				|  |  | +                    _transcodingJobHelper.OnTranscodeEndRequest(_job);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -            await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 4096, fileOptions);
 | 
	
		
			
				|  |  | -            const int emptyReadLimit = 100;
 | 
	
		
			
				|  |  | -            var eofCount = 0;
 | 
	
		
			
				|  |  | -            while (eofCount < emptyReadLimit)
 | 
	
		
			
				|  |  | +        private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var array = new byte[IODefaults.CopyToBufferSize];
 | 
	
		
			
				|  |  | +            int bytesRead;
 | 
	
		
			
				|  |  | +            int totalBytesRead = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
 | 
	
		
			
				|  |  |              {
 | 
	
		
			
				|  |  | -                var bytesRead = await _streamHelper.CopyToAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                var bytesToWrite = bytesRead;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                if (bytesRead == 0)
 | 
	
		
			
				|  |  | +                if (bytesToWrite > 0)
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  | -                    eofCount++;
 | 
	
		
			
				|  |  | -                    await Task.Delay(100, cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +                    await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    _bytesWritten += bytesRead;
 | 
	
		
			
				|  |  | +                    totalBytesRead += bytesRead;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    if (_job != null)
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                else
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            return totalBytesRead;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken)
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var array = new byte[IODefaults.CopyToBufferSize];
 | 
	
		
			
				|  |  | +            int bytesRead;
 | 
	
		
			
				|  |  | +            int totalBytesRead = 0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                var bytesToWrite = bytesRead;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if (bytesToWrite > 0)
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  | -                    eofCount = 0;
 | 
	
		
			
				|  |  | +                    await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    _bytesWritten += bytesRead;
 | 
	
		
			
				|  |  | +                    totalBytesRead += bytesRead;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                    if (_job != null)
 | 
	
		
			
				|  |  | +                    {
 | 
	
		
			
				|  |  | +                        _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            return totalBytesRead;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  }
 |