ProgressiveStreamWriter.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MediaBrowser.Controller.Library;
  7. using MediaBrowser.Model.IO;
  8. using MediaBrowser.Model.Services;
  9. using MediaBrowser.Model.System;
  10. using Microsoft.Extensions.Logging;
  11. namespace MediaBrowser.Api.Playback.Progressive
  12. {
  13. public class ProgressiveFileCopier : IAsyncStreamWriter, IHasHeaders
  14. {
  15. private readonly IFileSystem _fileSystem;
  16. private readonly TranscodingJob _job;
  17. private readonly ILogger _logger;
  18. private readonly string _path;
  19. private readonly CancellationToken _cancellationToken;
  20. private readonly Dictionary<string, string> _outputHeaders;
  21. const int StreamCopyToBufferSize = 81920;
  22. private long _bytesWritten = 0;
  23. public long StartPosition { get; set; }
  24. public bool AllowEndOfFile = true;
  25. private readonly IDirectStreamProvider _directStreamProvider;
  26. private readonly IEnvironmentInfo _environment;
  27. public ProgressiveFileCopier(IFileSystem fileSystem, string path, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, IEnvironmentInfo environment, CancellationToken cancellationToken)
  28. {
  29. _fileSystem = fileSystem;
  30. _path = path;
  31. _outputHeaders = outputHeaders;
  32. _job = job;
  33. _logger = logger;
  34. _cancellationToken = cancellationToken;
  35. _environment = environment;
  36. }
  37. public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, Dictionary<string, string> outputHeaders, TranscodingJob job, ILogger logger, IEnvironmentInfo environment, CancellationToken cancellationToken)
  38. {
  39. _directStreamProvider = directStreamProvider;
  40. _outputHeaders = outputHeaders;
  41. _job = job;
  42. _logger = logger;
  43. _cancellationToken = cancellationToken;
  44. _environment = environment;
  45. }
  46. public IDictionary<string, string> Headers => _outputHeaders;
  47. private Stream GetInputStream(bool allowAsyncFileRead)
  48. {
  49. var fileOpenOptions = FileOpenOptions.SequentialScan;
  50. if (allowAsyncFileRead)
  51. {
  52. fileOpenOptions |= FileOpenOptions.Asynchronous;
  53. }
  54. return _fileSystem.GetFileStream(_path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.ReadWrite, fileOpenOptions);
  55. }
  56. public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken)
  57. {
  58. cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken).Token;
  59. try
  60. {
  61. if (_directStreamProvider != null)
  62. {
  63. await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
  64. return;
  65. }
  66. var eofCount = 0;
  67. // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
  68. var allowAsyncFileRead = _environment.OperatingSystem != Model.System.OperatingSystem.Windows;
  69. using (var inputStream = GetInputStream(allowAsyncFileRead))
  70. {
  71. if (StartPosition > 0)
  72. {
  73. inputStream.Position = StartPosition;
  74. }
  75. while (eofCount < 20 || !AllowEndOfFile)
  76. {
  77. int bytesRead;
  78. if (allowAsyncFileRead)
  79. {
  80. bytesRead = await CopyToInternalAsync(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
  81. }
  82. else
  83. {
  84. bytesRead = await CopyToInternalAsyncWithSyncRead(inputStream, outputStream, cancellationToken).ConfigureAwait(false);
  85. }
  86. //var position = fs.Position;
  87. //_logger.LogDebug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
  88. if (bytesRead == 0)
  89. {
  90. if (_job == null || _job.HasExited)
  91. {
  92. eofCount++;
  93. }
  94. await Task.Delay(100, cancellationToken).ConfigureAwait(false);
  95. }
  96. else
  97. {
  98. eofCount = 0;
  99. }
  100. }
  101. }
  102. }
  103. finally
  104. {
  105. if (_job != null)
  106. {
  107. ApiEntryPoint.Instance.OnTranscodeEndRequest(_job);
  108. }
  109. }
  110. }
  111. private async Task<int> CopyToInternalAsyncWithSyncRead(Stream source, Stream destination, CancellationToken cancellationToken)
  112. {
  113. var array = new byte[StreamCopyToBufferSize];
  114. int bytesRead;
  115. int totalBytesRead = 0;
  116. while ((bytesRead = source.Read(array, 0, array.Length)) != 0)
  117. {
  118. var bytesToWrite = bytesRead;
  119. if (bytesToWrite > 0)
  120. {
  121. await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
  122. _bytesWritten += bytesRead;
  123. totalBytesRead += bytesRead;
  124. if (_job != null)
  125. {
  126. _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten);
  127. }
  128. }
  129. }
  130. return totalBytesRead;
  131. }
  132. private async Task<int> CopyToInternalAsync(Stream source, Stream destination, CancellationToken cancellationToken)
  133. {
  134. var array = new byte[StreamCopyToBufferSize];
  135. int bytesRead;
  136. int totalBytesRead = 0;
  137. while ((bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
  138. {
  139. var bytesToWrite = bytesRead;
  140. if (bytesToWrite > 0)
  141. {
  142. await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
  143. _bytesWritten += bytesRead;
  144. totalBytesRead += bytesRead;
  145. if (_job != null)
  146. {
  147. _job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten);
  148. }
  149. }
  150. }
  151. return totalBytesRead;
  152. }
  153. }
  154. }