SessionWebSocketListener.cs 9.3 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Jellyfin.Api.Extensions;
  8. using Jellyfin.Api.Helpers;
  9. using MediaBrowser.Controller.Library;
  10. using MediaBrowser.Controller.Net;
  11. using MediaBrowser.Controller.Net.WebSocketMessages.Outbound;
  12. using MediaBrowser.Controller.Session;
  13. using Microsoft.AspNetCore.Http;
  14. using Microsoft.Extensions.Logging;
  15. namespace Emby.Server.Implementations.Session
  16. {
  /// <summary>
  /// WebSocket listener that attaches each incoming connection to its session and keeps a watchlist
  /// of the connections: idle WebSockets are sent a ForceKeepAlive message and WebSockets that stay
  /// silent for longer than <see cref="WebSocketLostTimeout" /> seconds are dropped from the watchlist.
  /// </summary>
  public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable
  {
      /// <summary>
      /// The timeout in seconds after which a WebSocket is considered to be lost.
      /// </summary>
      private const int WebSocketLostTimeout = 60;

      /// <summary>
      /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets.
      /// The check runs every <c>WebSocketLostTimeout * IntervalFactor</c> seconds.
      /// </summary>
      private const float IntervalFactor = 0.2f;

      /// <summary>
      /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
      /// A WebSocket silent for more than <c>WebSocketLostTimeout * ForceKeepAliveFactor</c> seconds is considered inactive.
      /// </summary>
      private const float ForceKeepAliveFactor = 0.75f;

      /// <summary>
      /// The WebSocket watchlist.
      /// </summary>
      private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();

      /// <summary>
      /// Lock used for accessing the WebSockets watchlist, the timer state and the disposed flag.
      /// </summary>
      private readonly Lock _webSocketsLock = new();

      private readonly ISessionManager _sessionManager;
      private readonly IUserManager _userManager;
      private readonly ILogger<SessionWebSocketListener> _logger;
      private readonly ILoggerFactory _loggerFactory;

      /// <summary>
      /// The timer that periodically triggers the KeepAlive check of the watched WebSockets.
      /// </summary>
      private readonly System.Timers.Timer _keepAlive;

      /// <summary>
      /// Set once <see cref="Dispose" /> has run; only read and written while holding <see cref="_webSocketsLock" />.
      /// </summary>
      private bool _disposed;

      /// <summary>
      /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
      /// </summary>
      /// <param name="logger">The logger.</param>
      /// <param name="sessionManager">The session manager.</param>
      /// <param name="userManager">The user manager.</param>
      /// <param name="loggerFactory">The logger factory.</param>
      public SessionWebSocketListener(
          ILogger<SessionWebSocketListener> logger,
          ISessionManager sessionManager,
          IUserManager userManager,
          ILoggerFactory loggerFactory)
      {
          _logger = logger;
          _sessionManager = sessionManager;
          _userManager = userManager;
          _loggerFactory = loggerFactory;

          // The timer stays disabled until the first WebSocket is put on the watchlist.
          _keepAlive = new System.Timers.Timer(TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor))
          {
              AutoReset = true,
              Enabled = false
          };
          _keepAlive.Elapsed += KeepAliveSockets;
      }

      /// <inheritdoc />
      public void Dispose()
      {
          lock (_webSocketsLock)
          {
              if (_disposed)
              {
                  return;
              }

              // Flag first so that concurrent connect/close/timer callbacks stop touching the timer.
              _disposed = true;

              _keepAlive.Stop();
              _keepAlive.Elapsed -= KeepAliveSockets;

              foreach (var webSocket in _webSockets)
              {
                  webSocket.Closed -= OnWebSocketClosed;
              }

              _webSockets.Clear();
          }

          _keepAlive.Dispose();
      }

      /// <summary>
      /// Processes the message. This listener does not react to incoming messages.
      /// </summary>
      /// <param name="message">The message.</param>
      /// <returns>A completed task.</returns>
      public Task ProcessMessageAsync(WebSocketMessageInfo message)
          => Task.CompletedTask;

      /// <inheritdoc />
      public async Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext)
      {
          var session = await RequestHelpers.GetSession(_sessionManager, _userManager, httpContext).ConfigureAwait(false);
          EnsureController(session, connection);
          await KeepAliveWebSocket(connection).ConfigureAwait(false);
      }

      /// <summary>
      /// Makes sure the session has a <see cref="WebSocketController" />, registers the connection with it
      /// and notifies the session manager that a controller is connected.
      /// </summary>
      /// <param name="session">The session the connection belongs to.</param>
      /// <param name="connection">The WebSocket connection to register.</param>
      private void EnsureController(SessionInfo session, IWebSocketConnection connection)
      {
          var controllerInfo = session.EnsureController<WebSocketController>(
              s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager));

          var controller = (WebSocketController)controllerInfo.Item1;
          controller.AddWebSocket(connection);

          _sessionManager.OnSessionControllerConnected(session);
      }

      /// <summary>
      /// Called when a WebSocket is closed.
      /// </summary>
      /// <param name="sender">The WebSocket.</param>
      /// <param name="e">The event arguments.</param>
      private void OnWebSocketClosed(object? sender, EventArgs e)
      {
          if (sender is not IWebSocketConnection webSocket)
          {
              return;
          }

          _logger.LogDebug("WebSocket {0} is closed.", webSocket);
          RemoveWebSocket(webSocket);
      }

      /// <summary>
      /// Adds a WebSocket to the KeepAlive watchlist and tells it about the timeout.
      /// </summary>
      /// <param name="webSocket">The WebSocket to monitor.</param>
      /// <returns>A task that completes once the initial ForceKeepAlive message has been sent (or failed).</returns>
      private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
      {
          lock (_webSocketsLock)
          {
              if (_disposed)
              {
                  // The timer is gone; tracking the socket now would only leak the Closed subscription.
                  return;
              }

              if (!_webSockets.Add(webSocket))
              {
                  _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
                  return;
              }

              webSocket.Closed += OnWebSocketClosed;
              webSocket.LastKeepAliveDate = DateTime.UtcNow;

              _keepAlive.Start();
          }

          // Notify WebSocket about timeout
          try
          {
              await SendForceKeepAlive(webSocket).ConfigureAwait(false);
          }
          catch (WebSocketException exception)
          {
              _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
          }
      }

      /// <summary>
      /// Removes a WebSocket from the KeepAlive watchlist and stops the timer once the watchlist is empty.
      /// </summary>
      /// <param name="webSocket">The WebSocket to remove.</param>
      private void RemoveWebSocket(IWebSocketConnection webSocket)
      {
          lock (_webSocketsLock)
          {
              if (_disposed)
              {
                  // Dispose already cleared the watchlist and detached every handler.
                  return;
              }

              if (_webSockets.Remove(webSocket))
              {
                  webSocket.Closed -= OnWebSocketClosed;
              }
              else
              {
                  _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
              }

              if (_webSockets.Count == 0)
              {
                  _keepAlive.Stop();
              }
          }
      }

      /// <summary>
      /// Checks status of KeepAlive of WebSockets: sends a ForceKeepAlive message to the inactive ones
      /// and removes the lost ones from the watchlist.
      /// </summary>
      /// <param name="o">The event sender (unused).</param>
      /// <param name="e">The event arguments (unused).</param>
      private async void KeepAliveSockets(object? o, EventArgs? e)
      {
          // This is an async void timer callback: an exception escaping it after the first await
          // cannot be observed by anyone, so everything is guarded and logged here.
          try
          {
              var inactive = new List<IWebSocketConnection>();
              var lost = new List<IWebSocketConnection>();

              lock (_webSocketsLock)
              {
                  if (_disposed)
                  {
                      return;
                  }

                  _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);

                  // A single timestamp keeps the two groups disjoint; a socket is either inactive or lost.
                  var now = DateTime.UtcNow;
                  foreach (var webSocket in _webSockets)
                  {
                      var elapsed = (now - webSocket.LastKeepAliveDate).TotalSeconds;
                      if (elapsed >= WebSocketLostTimeout)
                      {
                          lost.Add(webSocket);
                      }
                      else if (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor)
                      {
                          inactive.Add(webSocket);
                      }
                  }
              }

              if (inactive.Count > 0)
              {
                  _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
              }

              foreach (var webSocket in inactive)
              {
                  try
                  {
                      await SendForceKeepAlive(webSocket).ConfigureAwait(false);
                  }
                  catch (WebSocketException exception)
                  {
                      // A socket that cannot be written to is treated as lost.
                      _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
                      lost.Add(webSocket);
                  }
              }

              if (lost.Count > 0)
              {
                  _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
                  foreach (var webSocket in lost)
                  {
                      // TODO: handle session relative to the lost webSocket
                      // RemoveWebSocket takes the watchlist lock itself.
                      RemoveWebSocket(webSocket);
                  }
              }
          }
          catch (Exception exception)
          {
              _logger.LogError(exception, "Error while checking the KeepAlive status of WebSockets.");
          }
      }

      /// <summary>
      /// Sends a ForceKeepAlive message carrying <see cref="WebSocketLostTimeout" /> to a WebSocket.
      /// </summary>
      /// <param name="webSocket">The WebSocket.</param>
      /// <returns>Task.</returns>
      private async Task SendForceKeepAlive(IWebSocketConnection webSocket)
      {
          await webSocket.SendAsync(
              new ForceKeepAliveMessage(WebSocketLostTimeout),
              CancellationToken.None).ConfigureAwait(false);
      }
  }
  237. }