Browse Source

update hdhomerun udp stream

Luke Pulverenti 8 years ago
parent
commit
55bfb71baa

+ 47 - 8
Emby.Common.Implementations/Net/UdpSocket.cs

@@ -16,12 +16,15 @@ namespace Emby.Common.Implementations.Net
 
 
     internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket
     internal sealed class UdpSocket : DisposableManagedObjectBase, ISocket
     {
     {
-
-        #region Fields
-
         private Socket _Socket;
         private Socket _Socket;
         private int _LocalPort;
         private int _LocalPort;
-        #endregion
+
+        private SocketAsyncEventArgs _receiveSocketAsyncEventArgs = new SocketAsyncEventArgs()
+        {
+            SocketFlags = SocketFlags.None
+        };
+
+        private TaskCompletionSource<SocketReceiveResult> _currentReceiveTaskCompletionSource;
 
 
         public UdpSocket(Socket socket, int localPort, IPAddress ip)
         public UdpSocket(Socket socket, int localPort, IPAddress ip)
         {
         {
@@ -32,6 +35,32 @@ namespace Emby.Common.Implementations.Net
             LocalIPAddress = NetworkManager.ToIpAddressInfo(ip);
             LocalIPAddress = NetworkManager.ToIpAddressInfo(ip);
 
 
             _Socket.Bind(new IPEndPoint(ip, _LocalPort));
             _Socket.Bind(new IPEndPoint(ip, _LocalPort));
+
+            InitReceiveSocketAsyncEventArgs();
+        }
+
+        private void InitReceiveSocketAsyncEventArgs()
+        {
+            var buffer = new byte[8192];
+            _receiveSocketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
+            _receiveSocketAsyncEventArgs.Completed += _receiveSocketAsyncEventArgs_Completed;
+        }
+
+        private void _receiveSocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
+        {
+            var tcs = _currentReceiveTaskCompletionSource;
+            if (tcs != null)
+            {
+                _currentReceiveTaskCompletionSource = null;
+
+                tcs.TrySetResult(new SocketReceiveResult
+                {
+                    Buffer = e.Buffer,
+                    ReceivedBytes = e.BytesTransferred,
+                    RemoteEndPoint = ToIpEndPointInfo(e.RemoteEndPoint as IPEndPoint),
+                    LocalIPAddress = LocalIPAddress
+                });
+            }
         }
         }
 
 
         public UdpSocket(Socket socket, IpEndPointInfo endPoint)
         public UdpSocket(Socket socket, IpEndPointInfo endPoint)
@@ -40,6 +69,8 @@ namespace Emby.Common.Implementations.Net
 
 
             _Socket = socket;
             _Socket = socket;
             _Socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
             _Socket.Connect(NetworkManager.ToIPEndPoint(endPoint));
+
+            InitReceiveSocketAsyncEventArgs();
         }
         }
 
 
         public IpAddressInfo LocalIPAddress
         public IpAddressInfo LocalIPAddress
@@ -57,12 +88,12 @@ namespace Emby.Common.Implementations.Net
             var tcs = new TaskCompletionSource<SocketReceiveResult>();
             var tcs = new TaskCompletionSource<SocketReceiveResult>();
 
 
             EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
             EndPoint receivedFromEndPoint = new IPEndPoint(IPAddress.Any, 0);
-            var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
-            state.TaskCompletionSource = tcs;
-
             cancellationToken.Register(() => tcs.TrySetCanceled());
             cancellationToken.Register(() => tcs.TrySetCanceled());
 
 
 #if NETSTANDARD1_6
 #if NETSTANDARD1_6
+            var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
+            state.TaskCompletionSource = tcs;
+
             _Socket.ReceiveFromAsync(new ArraySegment<Byte>(state.Buffer), SocketFlags.None, state.RemoteEndPoint)
             _Socket.ReceiveFromAsync(new ArraySegment<Byte>(state.Buffer), SocketFlags.None, state.RemoteEndPoint)
                 .ContinueWith((task, asyncState) =>
                 .ContinueWith((task, asyncState) =>
                 {
                 {
@@ -74,7 +105,15 @@ namespace Emby.Common.Implementations.Net
                     }
                     }
                 }, state);
                 }, state);
 #else
 #else
-            _Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state);
+            //var state = new AsyncReceiveState(_Socket, receivedFromEndPoint);
+            //state.TaskCompletionSource = tcs;
+
+            //_Socket.BeginReceiveFrom(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ref state.RemoteEndPoint, ProcessResponse, state);
+
+            _receiveSocketAsyncEventArgs.RemoteEndPoint = receivedFromEndPoint;
+            _currentReceiveTaskCompletionSource = tcs;
+
+            var isPending = _Socket.ReceiveFromAsync(_receiveSocketAsyncEventArgs);
 #endif
 #endif
 
 
             return tcs.Task;
             return tcs.Task;

+ 17 - 103
Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs

@@ -275,6 +275,16 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
             public int HD { get; set; }
             public int HD { get; set; }
         }
         }
 
 
+        protected EncodingOptions GetEncodingOptions()
+        {
+            return Config.GetConfiguration<EncodingOptions>("encoding");
+        }
+
+        private string GetHdHrIdFromChannelId(string channelId)
+        {
+            return channelId.Split('_')[1];
+        }
+
         private MediaSourceInfo GetMediaSource(TunerHostInfo info, string channelId, ChannelInfo channelInfo, string profile)
         private MediaSourceInfo GetMediaSource(TunerHostInfo info, string channelId, ChannelInfo channelInfo, string profile)
         {
         {
             int? width = null;
             int? width = null;
@@ -362,14 +372,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
                 nal = "0";
                 nal = "0";
             }
             }
 
 
-            var url = GetApiUrl(info, true) + "/auto/v" + channelId;
-
-            // If raw was used, the tuner doesn't support params
-            if (!string.IsNullOrWhiteSpace(profile)
-                && !string.Equals(profile, "native", StringComparison.OrdinalIgnoreCase))
-            {
-                url += "?transcode=" + profile;
-            }
+            var url = GetApiUrl(info, false);
 
 
             var id = profile;
             var id = profile;
             if (string.IsNullOrWhiteSpace(id))
             if (string.IsNullOrWhiteSpace(id))
@@ -378,92 +381,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
             }
             }
             id += "_" + url.GetMD5().ToString("N");
             id += "_" + url.GetMD5().ToString("N");
 
 
-            var mediaSource = new MediaSourceInfo
-            {
-                Path = url,
-                Protocol = MediaProtocol.Http,
-                MediaStreams = new List<MediaStream>
-                        {
-                            new MediaStream
-                            {
-                                Type = MediaStreamType.Video,
-                                // Set the index to -1 because we don't know the exact index of the video stream within the container
-                                Index = -1,
-                                IsInterlaced = isInterlaced,
-                                Codec = videoCodec,
-                                Width = width,
-                                Height = height,
-                                BitRate = videoBitrate,
-                                NalLengthSize = nal
-
-                            },
-                            new MediaStream
-                            {
-                                Type = MediaStreamType.Audio,
-                                // Set the index to -1 because we don't know the exact index of the audio stream within the container
-                                Index = -1,
-                                Codec = audioCodec,
-                                BitRate = audioBitrate
-                            }
-                        },
-                RequiresOpening = true,
-                RequiresClosing = false,
-                BufferMs = 0,
-                Container = "ts",
-                Id = id,
-                SupportsDirectPlay = false,
-                SupportsDirectStream = true,
-                SupportsTranscoding = true,
-                IsInfiniteStream = true
-            };
-
-            mediaSource.InferTotalBitrate();
-
-            return mediaSource;
-        }
-
-        protected EncodingOptions GetEncodingOptions()
-        {
-            return Config.GetConfiguration<EncodingOptions>("encoding");
-        }
-
-        private string GetHdHrIdFromChannelId(string channelId)
-        {
-            return channelId.Split('_')[1];
-        }
-
-        private MediaSourceInfo GetLegacyMediaSource(TunerHostInfo info, string channelId, ChannelInfo channel)
-        {
-            int? width = null;
-            int? height = null;
-            bool isInterlaced = true;
-            string videoCodec = null;
-            string audioCodec = null;
-
-            int? videoBitrate = null;
-            int? audioBitrate = null;
-
-            if (channel != null)
-            {
-                if (string.IsNullOrWhiteSpace(videoCodec))
-                {
-                    videoCodec = channel.VideoCodec;
-                }
-                audioCodec = channel.AudioCodec;
-            }
-
-            // normalize
-            if (string.Equals(videoCodec, "mpeg2", StringComparison.OrdinalIgnoreCase))
-            {
-                videoCodec = "mpeg2video";
-            }
-
-            string nal = null;
-
-            var url = GetApiUrl(info, false);
-            var id = channelId;
-            id += "_" + url.GetMD5().ToString("N");
-
             var mediaSource = new MediaSourceInfo
             var mediaSource = new MediaSourceInfo
             {
             {
                 Path = url,
                 Path = url,
@@ -527,7 +444,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
 
 
             if (isLegacyTuner)
             if (isLegacyTuner)
             {
             {
-                list.Add(GetLegacyMediaSource(info, hdhrId, channelInfo));
+                list.Add(GetMediaSource(info, hdhrId, channelInfo, "native"));
             }
             }
             else
             else
             {
             {
@@ -579,20 +496,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
 
 
             var hdhomerunChannel = channelInfo as HdHomerunChannelInfo;
             var hdhomerunChannel = channelInfo as HdHomerunChannelInfo;
 
 
+            var mediaSource = GetMediaSource(info, hdhrId, channelInfo, profile);
+            var modelInfo = await GetModelInfo(info, false, cancellationToken).ConfigureAwait(false);
+
             if (hdhomerunChannel != null && hdhomerunChannel.IsLegacyTuner)
             if (hdhomerunChannel != null && hdhomerunChannel.IsLegacyTuner)
             {
             {
-                var mediaSource = GetLegacyMediaSource(info, hdhrId, channelInfo);
-                var modelInfo = await GetModelInfo(info, false, cancellationToken).ConfigureAwait(false);
-
                 return new HdHomerunUdpStream(mediaSource, streamId, new LegacyHdHomerunChannelCommands(hdhomerunChannel.Url), modelInfo.TunerCount, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost, _socketFactory, _networkManager);
                 return new HdHomerunUdpStream(mediaSource, streamId, new LegacyHdHomerunChannelCommands(hdhomerunChannel.Url), modelInfo.TunerCount, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost, _socketFactory, _networkManager);
             }
             }
             else
             else
             {
             {
-                var mediaSource = GetMediaSource(info, hdhrId, channelInfo, profile);
-                //var modelInfo = await GetModelInfo(info, false, cancellationToken).ConfigureAwait(false);
-
-                return new HdHomerunHttpStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
-                //return new HdHomerunUdpStream(mediaSource, streamId, new HdHomerunChannelCommands(hdhomerunChannel.Number), modelInfo.TunerCount, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost, _socketFactory, _networkManager);
+                //return new HdHomerunHttpStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
+                return new HdHomerunUdpStream(mediaSource, streamId, new HdHomerunChannelCommands(hdhomerunChannel.Number), modelInfo.TunerCount, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost, _socketFactory, _networkManager);
             }
             }
         }
         }
 
 

+ 1 - 128
Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs

@@ -120,8 +120,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
                                 // send url to start streaming
                                 // send url to start streaming
                                 await hdHomerunManager.StartStreaming(remoteAddress, localAddress, localPort, _channelCommands, _numTuners, cancellationToken).ConfigureAwait(false);
                                 await hdHomerunManager.StartStreaming(remoteAddress, localAddress, localPort, _channelCommands, _numTuners, cancellationToken).ConfigureAwait(false);
 
 
-                                var timeoutToken = CancellationTokenSource.CreateLinkedTokenSource(new CancellationTokenSource(5000).Token, cancellationToken).Token;
-                                var response = await udpClient.ReceiveAsync(timeoutToken).ConfigureAwait(false);
                                 _logger.Info("Opened HDHR UDP stream from {0}", remoteAddress);
                                 _logger.Info("Opened HDHR UDP stream from {0}", remoteAddress);
 
 
                                 if (!cancellationToken.IsCancellationRequested)
                                 if (!cancellationToken.IsCancellationRequested)
@@ -132,8 +130,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
                                         onStarted = () => openTaskCompletionSource.TrySetResult(true);
                                         onStarted = () => openTaskCompletionSource.TrySetResult(true);
                                     }
                                     }
 
 
-                                    var stream = new UdpClientStream(udpClient);
-                                    await _multicastStream.CopyUntilCancelled(stream, onStarted, cancellationToken).ConfigureAwait(false);
+                                    await _multicastStream.CopyUntilCancelled(udpClient, onStarted, cancellationToken).ConfigureAwait(false);
                                 }
                                 }
                             }
                             }
                             catch (OperationCanceledException ex)
                             catch (OperationCanceledException ex)
@@ -158,7 +155,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
                         }
                         }
 
 
                         await hdHomerunManager.StopStreaming().ConfigureAwait(false);
                         await hdHomerunManager.StopStreaming().ConfigureAwait(false);
-                        udpClient.Dispose();
                         _liveStreamTaskCompletionSource.TrySetResult(true);
                         _liveStreamTaskCompletionSource.TrySetResult(true);
                     }
                     }
                 }
                 }
@@ -171,127 +167,4 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
             return _multicastStream.CopyToAsync(stream);
             return _multicastStream.CopyToAsync(stream);
         }
         }
     }
     }
-
-    // This handles the ReadAsync function only of a Stream object
-    // This is used to wrap a UDP socket into a stream for MulticastStream which only uses ReadAsync
-    public class UdpClientStream : Stream
-    {
-        private static int RtpHeaderBytes = 12;
-        private static int PacketSize = 1316;
-        private readonly ISocket _udpClient;
-        bool disposed;
-
-        public UdpClientStream(ISocket udpClient) : base()
-        {
-            _udpClient = udpClient;
-        }
-
-        public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            if (buffer == null)
-                throw new ArgumentNullException("buffer");
-
-            if (offset + count < 0)
-                throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count");
-
-            if (offset + count > buffer.Length)
-                throw new ArgumentException("offset + count must not be greater than the length of buffer", "offset+count");
-
-            if (disposed)
-                throw new ObjectDisposedException(typeof(UdpClientStream).ToString());
-
-            // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
-            // The RTP header will be stripped so see how many reads we need to make to fill the buffer.
-            int numReads = count / PacketSize;
-            int totalBytesRead = 0;
-
-            for (int i = 0; i < numReads; ++i)
-            {
-                var data = await _udpClient.ReceiveAsync(cancellationToken).ConfigureAwait(false);
-
-                var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
-
-                // remove rtp header
-                Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead);
-                offset += bytesRead;
-                totalBytesRead += bytesRead;
-            }
-            return totalBytesRead;
-        }
-
-        protected override void Dispose(bool disposing)
-        {
-            disposed = true;
-        }
-
-        public override bool CanRead
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public override bool CanSeek
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public override bool CanWrite
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public override long Length
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public override long Position
-        {
-            get
-            {
-                throw new NotImplementedException();
-            }
-
-            set
-            {
-                throw new NotImplementedException();
-            }
-        }
-
-        public override void Flush()
-        {
-            throw new NotImplementedException();
-        }
-
-        public override int Read(byte[] buffer, int offset, int count)
-        {
-            throw new NotImplementedException();
-        }
-
-        public override long Seek(long offset, SeekOrigin origin)
-        {
-            throw new NotImplementedException();
-        }
-
-        public override void SetLength(long value)
-        {
-            throw new NotImplementedException();
-        }
-
-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            throw new NotImplementedException();
-        }
-    }
 }
 }

+ 47 - 1
Emby.Server.Implementations/LiveTv/TunerHosts/MulticastStream.cs

@@ -7,6 +7,7 @@ using System.Text;
 using System.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks;
 using MediaBrowser.Model.Logging;
 using MediaBrowser.Model.Logging;
+using MediaBrowser.Model.Net;
 
 
 namespace Emby.Server.Implementations.LiveTv.TunerHosts
 namespace Emby.Server.Implementations.LiveTv.TunerHosts
 {
 {
@@ -40,7 +41,52 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
                     var allStreams = _outputStreams.ToList();
                     var allStreams = _outputStreams.ToList();
                     foreach (var stream in allStreams)
                     foreach (var stream in allStreams)
                     {
                     {
-                        stream.Value.Queue(copy);
+                        stream.Value.Queue(copy, 0, copy.Length);
+                    }
+
+                    if (onStarted != null)
+                    {
+                        var onStartedCopy = onStarted;
+                        onStarted = null;
+                        Task.Run(onStartedCopy);
+                    }
+                }
+
+                else
+                {
+                    await Task.Delay(100).ConfigureAwait(false);
+                }
+            }
+        }
+
+        private static int RtpHeaderBytes = 12;
+        public async Task CopyUntilCancelled(ISocket udpClient, Action onStarted, CancellationToken cancellationToken)
+        {
+            _cancellationToken = cancellationToken;
+
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var receiveToken = cancellationToken;
+
+                // On the first connection attempt, put a timeout to avoid being stuck indefinitely in the event of failure
+                if (onStarted != null)
+                {
+                    receiveToken = CancellationTokenSource.CreateLinkedTokenSource(new CancellationTokenSource(5000).Token, cancellationToken).Token;
+                }
+
+                var data = await udpClient.ReceiveAsync(receiveToken).ConfigureAwait(false);
+                var bytesRead = data.ReceivedBytes - RtpHeaderBytes;
+
+                if (bytesRead > 0)
+                {
+                    byte[] copy = new byte[bytesRead];
+                    Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, copy, 0, bytesRead);
+
+                    var allStreams = _outputStreams.ToList();
+                    foreach (var stream in allStreams)
+                    {
+                        //stream.Value.Queue(data.Buffer, RtpHeaderBytes, bytesRead);
+                        stream.Value.Queue(copy, 0, copy.Length);
                     }
                     }
 
 
                     if (onStarted != null)
                     if (onStarted != null)

+ 10 - 10
Emby.Server.Implementations/LiveTv/TunerHosts/QueueStream.cs

@@ -13,7 +13,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
     public class QueueStream
     public class QueueStream
     {
     {
         private readonly Stream _outputStream;
         private readonly Stream _outputStream;
-        private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();
+        private readonly ConcurrentQueue<Tuple<byte[],int,int>> _queue = new ConcurrentQueue<Tuple<byte[], int, int>>();
         private CancellationToken _cancellationToken;
         private CancellationToken _cancellationToken;
         public TaskCompletionSource<bool> TaskCompletion { get; private set; }
         public TaskCompletionSource<bool> TaskCompletion { get; private set; }
 
 
@@ -28,9 +28,9 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
             TaskCompletion = new TaskCompletionSource<bool>();
             TaskCompletion = new TaskCompletionSource<bool>();
         }
         }
 
 
-        public void Queue(byte[] bytes)
+        public void Queue(byte[] bytes, int offset, int count)
         {
         {
-            _queue.Enqueue(bytes);
+            _queue.Enqueue(new Tuple<byte[], int, int>(bytes, offset, count));
         }
         }
 
 
         public void Start(CancellationToken cancellationToken)
         public void Start(CancellationToken cancellationToken)
@@ -39,12 +39,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
             Task.Run(() => StartInternal());
             Task.Run(() => StartInternal());
         }
         }
 
 
-        private byte[] Dequeue()
+        private Tuple<byte[], int, int> Dequeue()
         {
         {
-            byte[] bytes;
-            if (_queue.TryDequeue(out bytes))
+            Tuple<byte[], int, int> result;
+            if (_queue.TryDequeue(out result))
             {
             {
-                return bytes;
+                return result;
             }
             }
 
 
             return null;
             return null;
@@ -58,10 +58,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
             {
             {
                 while (true)
                 while (true)
                 {
                 {
-                    var bytes = Dequeue();
-                    if (bytes != null)
+                    var result = Dequeue();
+                    if (result != null)
                     {
                     {
-                        await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
+                        await _outputStream.WriteAsync(result.Item1, result.Item2, result.Item3, cancellationToken).ConfigureAwait(false);
                     }
                     }
                     else
                     else
                     {
                     {