| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 | using System;using System.Collections.Concurrent;using System.Collections.Generic;using System.IO;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using MediaBrowser.Model.Logging;namespace Emby.Server.Implementations.LiveTv.TunerHosts{    public class QueueStream    {        private readonly Stream _outputStream;        private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();        private CancellationToken _cancellationToken;        public TaskCompletionSource<bool> TaskCompletion { get; private set; }        public Action<QueueStream> OnFinished { get; set; }        private readonly ILogger _logger;        public Guid Id = Guid.NewGuid();        public QueueStream(Stream outputStream, ILogger logger)        {            _outputStream = outputStream;            _logger = logger;            TaskCompletion = new TaskCompletionSource<bool>();        }        public void Queue(byte[] bytes)        {            _queue.Enqueue(bytes);        }        public void Start(CancellationToken cancellationToken)        {            _cancellationToken = cancellationToken;            Task.Run(() => StartInternal());        }        private byte[] Dequeue()        {            byte[] bytes;            if (_queue.TryDequeue(out bytes))            {                return bytes;            }            return null;        }        private async Task StartInternal()        {            var cancellationToken = _cancellationToken;            try            {                while (true)                {                    var bytes = Dequeue();                    if (bytes != null)                    {                        await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);                    }                    else                    {                        await Task.Delay(50, cancellationToken).ConfigureAwait(false);                    }                }            }            catch (OperationCanceledException)            {                _logger.Debug("QueueStream cancelled");                TaskCompletion.TrySetCanceled();            }            catch (Exception ex)            {                _logger.ErrorException("Error in QueueStream", ex);                TaskCompletion.TrySetException(ex);            }            finally            {                if (OnFinished != null)                {                    OnFinished(this);                }            }        }    }}
 |