QueueStream.cs 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using MediaBrowser.Model.Logging;
  10. namespace Emby.Server.Implementations.LiveTv.TunerHosts
  11. {
  12. public class QueueStream
  13. {
  14. private readonly Stream _outputStream;
  15. private readonly ConcurrentQueue<Tuple<byte[], int, int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
  16. public TaskCompletionSource<bool> TaskCompletion { get; private set; }
  17. public Action<QueueStream> OnFinished { get; set; }
  18. private readonly ILogger _logger;
  19. public Guid Id = Guid.NewGuid();
  20. public QueueStream(Stream outputStream, ILogger logger)
  21. {
  22. _outputStream = outputStream;
  23. _logger = logger;
  24. TaskCompletion = new TaskCompletionSource<bool>();
  25. }
  26. public void Queue(byte[] bytes, int offset, int count)
  27. {
  28. _queue.Enqueue(new Tuple<byte[], int, int>(bytes, offset, count));
  29. }
  30. public void Start(CancellationToken cancellationToken)
  31. {
  32. Task.Run(() => StartInternal(cancellationToken));
  33. }
  34. private Tuple<byte[], int, int> Dequeue()
  35. {
  36. Tuple<byte[], int, int> result;
  37. if (_queue.TryDequeue(out result))
  38. {
  39. return result;
  40. }
  41. return null;
  42. }
  43. private void OnClosed()
  44. {
  45. GC.Collect();
  46. if (OnFinished != null)
  47. {
  48. OnFinished(this);
  49. }
  50. }
  51. public void Write(byte[] bytes, int offset, int count)
  52. {
  53. //return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
  54. try
  55. {
  56. _outputStream.Write(bytes, offset, count);
  57. }
  58. catch (OperationCanceledException)
  59. {
  60. _logger.Debug("QueueStream cancelled");
  61. TaskCompletion.TrySetCanceled();
  62. OnClosed();
  63. }
  64. catch (Exception ex)
  65. {
  66. _logger.ErrorException("Error in QueueStream", ex);
  67. TaskCompletion.TrySetException(ex);
  68. OnClosed();
  69. }
  70. }
  71. private async Task StartInternal(CancellationToken cancellationToken)
  72. {
  73. try
  74. {
  75. while (true)
  76. {
  77. cancellationToken.ThrowIfCancellationRequested();
  78. var result = Dequeue();
  79. if (result != null)
  80. {
  81. _outputStream.Write(result.Item1, result.Item2, result.Item3);
  82. }
  83. else
  84. {
  85. await Task.Delay(50, cancellationToken).ConfigureAwait(false);
  86. }
  87. }
  88. }
  89. catch (OperationCanceledException)
  90. {
  91. _logger.Debug("QueueStream cancelled");
  92. TaskCompletion.TrySetCanceled();
  93. }
  94. catch (Exception ex)
  95. {
  96. _logger.ErrorException("Error in QueueStream", ex);
  97. TaskCompletion.TrySetException(ex);
  98. }
  99. finally
  100. {
  101. OnClosed();
  102. }
  103. }
  104. }
  105. }