QueueStream.cs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using MediaBrowser.Model.Logging;
  10. namespace Emby.Server.Implementations.LiveTv.TunerHosts
  11. {
  12. public class QueueStream
  13. {
  14. private readonly Stream _outputStream;
  15. private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
  16. private CancellationToken _cancellationToken;
  17. public TaskCompletionSource<bool> TaskCompletion { get; private set; }
  18. public Action<QueueStream> OnFinished { get; set; }
  19. private readonly ILogger _logger;
  20. public Guid Id = Guid.NewGuid();
  21. public QueueStream(Stream outputStream, ILogger logger)
  22. {
  23. _outputStream = outputStream;
  24. _logger = logger;
  25. TaskCompletion = new TaskCompletionSource<bool>();
  26. }
  27. public void Queue(byte[] bytes, int offset, int count)
  28. {
  29. _queue.Enqueue(new Tuple<byte[], int, int>(bytes, offset, count));
  30. }
  31. public void Start(CancellationToken cancellationToken)
  32. {
  33. _cancellationToken = cancellationToken;
  34. Task.Run(() => StartInternal());
  35. }
  36. private Tuple<byte[], int, int> Dequeue()
  37. {
  38. Tuple<byte[], int, int> result;
  39. if (_queue.TryDequeue(out result))
  40. {
  41. return result;
  42. }
  43. return null;
  44. }
  45. private void OnClosed()
  46. {
  47. GC.Collect();
  48. if (OnFinished != null)
  49. {
  50. OnFinished(this);
  51. }
  52. }
  53. public async Task WriteAsync(byte[] bytes, int offset, int count)
  54. {
  55. //return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
  56. var cancellationToken = _cancellationToken;
  57. try
  58. {
  59. await _outputStream.WriteAsync(bytes, offset, count, cancellationToken).ConfigureAwait(false);
  60. }
  61. catch (OperationCanceledException)
  62. {
  63. _logger.Debug("QueueStream cancelled");
  64. TaskCompletion.TrySetCanceled();
  65. OnClosed();
  66. }
  67. catch (Exception ex)
  68. {
  69. _logger.ErrorException("Error in QueueStream", ex);
  70. TaskCompletion.TrySetException(ex);
  71. OnClosed();
  72. }
  73. }
  74. private async Task StartInternal()
  75. {
  76. var cancellationToken = _cancellationToken;
  77. try
  78. {
  79. while (true)
  80. {
  81. var result = Dequeue();
  82. if (result != null)
  83. {
  84. await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false);
  85. }
  86. else
  87. {
  88. await Task.Delay(50, cancellationToken).ConfigureAwait(false);
  89. }
  90. }
  91. }
  92. catch (OperationCanceledException)
  93. {
  94. _logger.Debug("QueueStream cancelled");
  95. TaskCompletion.TrySetCanceled();
  96. }
  97. catch (Exception ex)
  98. {
  99. _logger.ErrorException("Error in QueueStream", ex);
  100. TaskCompletion.TrySetException(ex);
  101. }
  102. finally
  103. {
  104. OnClosed();
  105. }
  106. }
  107. }
  108. }