ProgressiveFileStream.cs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using System;
  2. using System.Diagnostics;
  3. using System.IO;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Jellyfin.Api.Models.PlaybackDtos;
  7. using MediaBrowser.Model.IO;
  8. namespace Jellyfin.Api.Helpers;
  9. /// <summary>
  10. /// A progressive file stream for transferring transcoded files as they are written to.
  11. /// </summary>
  12. public class ProgressiveFileStream : Stream
  13. {
  14. private readonly Stream _stream;
  15. private readonly TranscodingJobDto? _job;
  16. private readonly TranscodingJobHelper? _transcodingJobHelper;
  17. private readonly int _timeoutMs;
  18. private bool _disposed;
  19. /// <summary>
  20. /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class.
  21. /// </summary>
  22. /// <param name="filePath">The path to the transcoded file.</param>
  23. /// <param name="job">The transcoding job information.</param>
  24. /// <param name="transcodingJobHelper">The transcoding job helper.</param>
  25. /// <param name="timeoutMs">The timeout duration in milliseconds.</param>
  26. public ProgressiveFileStream(string filePath, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, int timeoutMs = 30000)
  27. {
  28. _job = job;
  29. _transcodingJobHelper = transcodingJobHelper;
  30. _timeoutMs = timeoutMs;
  31. _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, FileOptions.Asynchronous | FileOptions.SequentialScan);
  32. }
  33. /// <summary>
  34. /// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class.
  35. /// </summary>
  36. /// <param name="stream">The stream to progressively copy.</param>
  37. /// <param name="timeoutMs">The timeout duration in milliseconds.</param>
  38. public ProgressiveFileStream(Stream stream, int timeoutMs = 30000)
  39. {
  40. _job = null;
  41. _transcodingJobHelper = null;
  42. _timeoutMs = timeoutMs;
  43. _stream = stream;
  44. }
  45. /// <inheritdoc />
  46. public override bool CanRead => _stream.CanRead;
  47. /// <inheritdoc />
  48. public override bool CanSeek => false;
  49. /// <inheritdoc />
  50. public override bool CanWrite => false;
  51. /// <inheritdoc />
  52. public override long Length => throw new NotSupportedException();
  53. /// <inheritdoc />
  54. public override long Position
  55. {
  56. get => throw new NotSupportedException();
  57. set => throw new NotSupportedException();
  58. }
  59. /// <inheritdoc />
  60. public override void Flush()
  61. {
  62. // Not supported
  63. }
  64. /// <inheritdoc />
  65. public override int Read(byte[] buffer, int offset, int count)
  66. => Read(buffer.AsSpan(offset, count));
  67. /// <inheritdoc />
  68. public override int Read(Span<byte> buffer)
  69. {
  70. int totalBytesRead = 0;
  71. var stopwatch = Stopwatch.StartNew();
  72. while (true)
  73. {
  74. totalBytesRead += _stream.Read(buffer);
  75. if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds))
  76. {
  77. break;
  78. }
  79. Thread.Sleep(50);
  80. }
  81. UpdateBytesWritten(totalBytesRead);
  82. return totalBytesRead;
  83. }
  84. /// <inheritdoc />
  85. public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  86. => await ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);
  87. /// <inheritdoc />
  88. public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
  89. {
  90. int totalBytesRead = 0;
  91. var stopwatch = Stopwatch.StartNew();
  92. while (true)
  93. {
  94. totalBytesRead += await _stream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
  95. if (StopReading(totalBytesRead, stopwatch.ElapsedMilliseconds))
  96. {
  97. break;
  98. }
  99. await Task.Delay(50, cancellationToken).ConfigureAwait(false);
  100. }
  101. UpdateBytesWritten(totalBytesRead);
  102. return totalBytesRead;
  103. }
  104. /// <inheritdoc />
  105. public override long Seek(long offset, SeekOrigin origin)
  106. => throw new NotSupportedException();
  107. /// <inheritdoc />
  108. public override void SetLength(long value)
  109. => throw new NotSupportedException();
  110. /// <inheritdoc />
  111. public override void Write(byte[] buffer, int offset, int count)
  112. => throw new NotSupportedException();
  113. /// <inheritdoc />
  114. protected override void Dispose(bool disposing)
  115. {
  116. if (_disposed)
  117. {
  118. return;
  119. }
  120. try
  121. {
  122. if (disposing)
  123. {
  124. _stream.Dispose();
  125. if (_job is not null)
  126. {
  127. _transcodingJobHelper?.OnTranscodeEndRequest(_job);
  128. }
  129. }
  130. }
  131. finally
  132. {
  133. _disposed = true;
  134. base.Dispose(disposing);
  135. }
  136. }
  137. private void UpdateBytesWritten(int totalBytesRead)
  138. {
  139. if (_job is not null)
  140. {
  141. _job.BytesDownloaded += totalBytesRead;
  142. }
  143. }
  144. private bool StopReading(int bytesRead, long elapsed)
  145. {
  146. // It should stop reading when anything has been successfully read or if the job has exited
  147. // If the job is null, however, it's a live stream and will require user action to close,
  148. // but don't keep it open indefinitely if it isn't reading anything
  149. return bytesRead > 0 || (_job?.HasExited ?? elapsed >= _timeoutMs);
  150. }
  151. }