Browse Source

Implement KeepAlive for WebSockets

gion 5 years ago
parent
commit
aad5058d25

+ 21 - 6
Emby.Server.Implementations/HttpServer/WebSocketConnection.cs

@@ -94,6 +94,9 @@ namespace Emby.Server.Implementations.HttpServer
         /// <value>The last activity date.</value>
         public DateTime LastActivityDate { get; private set; }
 
+        /// <inheritdoc />
+        public DateTime LastKeepAliveDate { get; set; }
+
         /// <summary>
         /// Gets the id.
         /// </summary>
@@ -158,11 +161,6 @@ namespace Emby.Server.Implementations.HttpServer
                 return;
             }
 
-            if (OnReceive == null)
-            {
-                return;
-            }
-
             try
             {
                 var stub = (WebSocketMessage<object>)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage<object>));
@@ -174,7 +172,15 @@ namespace Emby.Server.Implementations.HttpServer
                     Connection = this
                 };
 
-                OnReceive(info);
+                if (info.MessageType.Equals("KeepAlive", StringComparison.Ordinal))
+                {
+                    SendKeepAliveResponse();
+                }
+
+                if (OnReceive != null)
+                {
+                    OnReceive(info);
+                }
             }
             catch (Exception ex)
             {
@@ -233,6 +239,15 @@ namespace Emby.Server.Implementations.HttpServer
             return _socket.SendAsync(text, true, cancellationToken);
         }
 
+        private Task SendKeepAliveResponse()
+        {
+            LastKeepAliveDate = DateTime.UtcNow;
+            return SendAsync(new WebSocketMessage<string>
+            {
+                MessageType = "KeepAlive"
+            }, CancellationToken.None);
+        }
+
         /// <inheritdoc />
         public void Dispose()
         {

+ 156 - 0
Emby.Server.Implementations/Session/SessionWebSocketListener.cs

@@ -1,8 +1,13 @@
 using System;
+using System.Collections.Concurrent;
+using System.Linq;
+using System.Net.WebSockets;
+using System.Threading;
 using System.Threading.Tasks;
 using MediaBrowser.Controller.Net;
 using MediaBrowser.Controller.Session;
 using MediaBrowser.Model.Events;
+using MediaBrowser.Model.Net;
 using MediaBrowser.Model.Serialization;
 using Microsoft.AspNetCore.Http;
 using Microsoft.Extensions.Logging;
@@ -14,6 +19,21 @@ namespace Emby.Server.Implementations.Session
     /// </summary>
     public class SessionWebSocketListener : IWebSocketListener, IDisposable
     {
+        /// <summary>
+        /// The timeout in seconds after which a WebSocket is considered to be lost.
+        /// </summary>
+        public readonly int WebSocketLostTimeout = 60;
+
+        /// <summary>
+        /// The timer factor; controls the frequency of the timer.
+        /// </summary>
+        public readonly double TimerFactor = 0.2;
+
+        /// <summary>
+        /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
+        /// </summary>
+        public readonly double ForceKeepAliveFactor = 0.75;
+
         /// <summary>
         /// The _session manager
         /// </summary>
@@ -31,6 +51,15 @@ namespace Emby.Server.Implementations.Session
 
         private readonly IHttpServer _httpServer;
 
+        /// <summary>
+        /// The KeepAlive timer.
+        /// </summary>
+        private Timer _keepAliveTimer;
+
+        /// <summary>
+        /// The WebSocket watchlist.
+        /// </summary>
+        private readonly ConcurrentDictionary<IWebSocketConnection, byte> _webSockets = new ConcurrentDictionary<IWebSocketConnection, byte>();
 
         /// <summary>
         /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
@@ -55,6 +84,7 @@ namespace Emby.Server.Implementations.Session
             if (session != null)
             {
                 EnsureController(session, e.Argument);
+                KeepAliveWebSocket(e.Argument);
             }
             else
             {
@@ -82,6 +112,7 @@ namespace Emby.Server.Implementations.Session
         public void Dispose()
         {
             _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected;
+            StopKeepAliveTimer();
         }
 
         /// <summary>
@@ -99,5 +130,130 @@ namespace Emby.Server.Implementations.Session
             var controller = (WebSocketController)controllerInfo.Item1;
             controller.AddWebSocket(connection);
         }
+
+        /// <summary>
+        /// Called when a WebSocket is closed.
+        /// </summary>
+        /// <param name="sender">The WebSocket.</param>
+        /// <param name="e">The event arguments.</param>
+        private void _webSocket_Closed(object sender, EventArgs e)
+        {
+            var webSocket = (IWebSocketConnection) sender;
+            webSocket.Closed -= _webSocket_Closed;
+            _webSockets.TryRemove(webSocket, out _);
+        }
+
+        /// <summary>
+        /// Adds a WebSocket to the KeepAlive watchlist.
+        /// </summary>
+        /// <param name="webSocket">The WebSocket to monitor.</param>
+        private async void KeepAliveWebSocket(IWebSocketConnection webSocket)
+        {
+            _webSockets.TryAdd(webSocket, 0);
+            webSocket.Closed += _webSocket_Closed;
+            webSocket.LastKeepAliveDate = DateTime.UtcNow;
+
+            // Notify WebSocket about timeout
+            try
+            {
+                await SendForceKeepAlive(webSocket);
+            }
+            catch (WebSocketException exception)
+            {
+                _logger.LogDebug(exception, "Error sending ForceKeepAlive message to WebSocket.");
+            }
+
+            StartKeepAliveTimer();
+        }
+
+        /// <summary>
+        /// Starts the KeepAlive timer.
+        /// </summary>
+        private void StartKeepAliveTimer()
+        {
+            if (_keepAliveTimer == null)
+            {
+                _keepAliveTimer = new Timer(
+                    KeepAliveSockets,
+                    null,
+                    TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor),
+                    TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor)
+                );
+            }
+        }
+
+        /// <summary>
+        /// Stops the KeepAlive timer.
+        /// </summary>
+        private void StopKeepAliveTimer()
+        {
+            if (_keepAliveTimer != null)
+            {
+                _keepAliveTimer.Dispose();
+                _keepAliveTimer = null;
+            }
+
+            foreach (var pair in _webSockets)
+            {
+                pair.Key.Closed -= _webSocket_Closed;
+            }
+        }
+
+        /// <summary>
+        /// Checks status of KeepAlive of WebSockets.
+        /// </summary>
+        /// <param name="state">The state.</param>
+        private async void KeepAliveSockets(object state)
+        {
+            var inactive = _webSockets.Keys.Where(i =>
+            {
+                var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
+                return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
+            });
+            var lost = _webSockets.Keys.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout);
+
+            if (inactive.Any())
+            {
+                _logger.LogDebug("Sending ForceKeepAlive message to {0} WebSockets.", inactive.Count());
+            }
+
+            foreach (var webSocket in inactive)
+            {
+                try
+                {
+                    await SendForceKeepAlive(webSocket);
+                }
+                catch (WebSocketException exception)
+                {
+                    _logger.LogDebug(exception, "Error sending ForceKeepAlive message to WebSocket.");
+                    lost.Append(webSocket);
+                }
+            }
+
+            if (lost.Any())
+            {
+                // TODO: handle lost webSockets
+                _logger.LogDebug("Lost {0} WebSockets.", lost.Count());
+            }
+
+            if (!_webSockets.Any())
+            {
+                StopKeepAliveTimer();
+            }
+        }
+
+        /// <summary>
+        /// Sends a ForceKeepAlive message to a WebSocket.
+        /// </summary>
+        /// <param name="webSocket">The WebSocket.</param>
+        /// <returns>Task.</returns>
+        private Task SendForceKeepAlive(IWebSocketConnection webSocket)
+        {
+            return webSocket.SendAsync(new WebSocketMessage<int>
+            {
+                MessageType = "ForceKeepAlive",
+                Data = WebSocketLostTimeout
+            }, CancellationToken.None);
+        }
     }
 }

+ 6 - 0
MediaBrowser.Controller/Net/IWebSocketConnection.cs

@@ -26,6 +26,12 @@ namespace MediaBrowser.Controller.Net
         /// <value>The last activity date.</value>
         DateTime LastActivityDate { get; }
 
+        /// <summary>
+        /// Gets or sets the date of last Keeplive received.
+        /// </summary>
+        /// <value>The date of last Keeplive received.</value>
+        public DateTime LastKeepAliveDate { get; set; }
+
         /// <summary>
         /// Gets or sets the URL.
         /// </summary>