SessionWebSocketListener.cs 12 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Jellyfin.Data.Events;
  8. using MediaBrowser.Controller.Net;
  9. using MediaBrowser.Controller.Session;
  10. using MediaBrowser.Model.Net;
  11. using Microsoft.AspNetCore.Http;
  12. using Microsoft.Extensions.Logging;
  13. namespace Emby.Server.Implementations.Session
  14. {
  15. /// <summary>
  16. /// Class SessionWebSocketListener.
  17. /// </summary>
  18. public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable
  19. {
  20. /// <summary>
  21. /// The timeout in seconds after which a WebSocket is considered to be lost.
  22. /// </summary>
  23. public const int WebSocketLostTimeout = 60;
  24. /// <summary>
  25. /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets.
  26. /// </summary>
  27. public const float IntervalFactor = 0.2f;
  28. /// <summary>
  29. /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
  30. /// </summary>
  31. public const float ForceKeepAliveFactor = 0.75f;
  32. /// <summary>
  33. /// The _session manager.
  34. /// </summary>
  35. private readonly ISessionManager _sessionManager;
  36. /// <summary>
  37. /// The _logger.
  38. /// </summary>
  39. private readonly ILogger<SessionWebSocketListener> _logger;
  40. private readonly ILoggerFactory _loggerFactory;
  41. private readonly IWebSocketManager _webSocketManager;
  42. /// <summary>
  43. /// The KeepAlive cancellation token.
  44. /// </summary>
  45. private CancellationTokenSource _keepAliveCancellationToken;
  46. /// <summary>
  47. /// Lock used for accesing the KeepAlive cancellation token.
  48. /// </summary>
  49. private readonly object _keepAliveLock = new object();
  50. /// <summary>
  51. /// The WebSocket watchlist.
  52. /// </summary>
  53. private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();
  54. /// <summary>
  55. /// Lock used for accesing the WebSockets watchlist.
  56. /// </summary>
  57. private readonly object _webSocketsLock = new object();
  58. /// <summary>
  59. /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
  60. /// </summary>
  61. /// <param name="logger">The logger.</param>
  62. /// <param name="sessionManager">The session manager.</param>
  63. /// <param name="loggerFactory">The logger factory.</param>
  64. /// <param name="webSocketManager">The HTTP server.</param>
  65. public SessionWebSocketListener(
  66. ILogger<SessionWebSocketListener> logger,
  67. ISessionManager sessionManager,
  68. ILoggerFactory loggerFactory,
  69. IWebSocketManager webSocketManager)
  70. {
  71. _logger = logger;
  72. _sessionManager = sessionManager;
  73. _loggerFactory = loggerFactory;
  74. _webSocketManager = webSocketManager;
  75. webSocketManager.WebSocketConnected += OnServerManagerWebSocketConnected;
  76. }
  77. private async void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
  78. {
  79. var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint.ToString());
  80. if (session != null)
  81. {
  82. EnsureController(session, e.Argument);
  83. await KeepAliveWebSocket(e.Argument).ConfigureAwait(false);
  84. }
  85. else
  86. {
  87. _logger.LogWarning("Unable to determine session based on query string: {0}", e.Argument.QueryString);
  88. }
  89. }
  90. private SessionInfo GetSession(IQueryCollection queryString, string remoteEndpoint)
  91. {
  92. if (queryString == null)
  93. {
  94. return null;
  95. }
  96. var token = queryString["api_key"];
  97. if (string.IsNullOrWhiteSpace(token))
  98. {
  99. return null;
  100. }
  101. var deviceId = queryString["deviceId"];
  102. return _sessionManager.GetSessionByAuthenticationToken(token, deviceId, remoteEndpoint);
  103. }
  104. /// <inheritdoc />
  105. public void Dispose()
  106. {
  107. _webSocketManager.WebSocketConnected -= OnServerManagerWebSocketConnected;
  108. StopKeepAlive();
  109. }
  110. /// <summary>
  111. /// Processes the message.
  112. /// </summary>
  113. /// <param name="message">The message.</param>
  114. /// <returns>Task.</returns>
  115. public Task ProcessMessageAsync(WebSocketMessageInfo message)
  116. => Task.CompletedTask;
  117. private void EnsureController(SessionInfo session, IWebSocketConnection connection)
  118. {
  119. var controllerInfo = session.EnsureController<WebSocketController>(
  120. s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager));
  121. var controller = (WebSocketController)controllerInfo.Item1;
  122. controller.AddWebSocket(connection);
  123. }
  124. /// <summary>
  125. /// Called when a WebSocket is closed.
  126. /// </summary>
  127. /// <param name="sender">The WebSocket.</param>
  128. /// <param name="e">The event arguments.</param>
  129. private void OnWebSocketClosed(object sender, EventArgs e)
  130. {
  131. var webSocket = (IWebSocketConnection)sender;
  132. _logger.LogDebug("WebSocket {0} is closed.", webSocket);
  133. RemoveWebSocket(webSocket);
  134. }
  135. /// <summary>
  136. /// Adds a WebSocket to the KeepAlive watchlist.
  137. /// </summary>
  138. /// <param name="webSocket">The WebSocket to monitor.</param>
  139. private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
  140. {
  141. lock (_webSocketsLock)
  142. {
  143. if (!_webSockets.Add(webSocket))
  144. {
  145. _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
  146. return;
  147. }
  148. webSocket.Closed += OnWebSocketClosed;
  149. webSocket.LastKeepAliveDate = DateTime.UtcNow;
  150. StartKeepAlive();
  151. }
  152. // Notify WebSocket about timeout
  153. try
  154. {
  155. await SendForceKeepAlive(webSocket).ConfigureAwait(false);
  156. }
  157. catch (WebSocketException exception)
  158. {
  159. _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
  160. }
  161. }
  162. /// <summary>
  163. /// Removes a WebSocket from the KeepAlive watchlist.
  164. /// </summary>
  165. /// <param name="webSocket">The WebSocket to remove.</param>
  166. private void RemoveWebSocket(IWebSocketConnection webSocket)
  167. {
  168. lock (_webSocketsLock)
  169. {
  170. if (!_webSockets.Remove(webSocket))
  171. {
  172. _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
  173. }
  174. else
  175. {
  176. webSocket.Closed -= OnWebSocketClosed;
  177. }
  178. }
  179. }
  180. /// <summary>
  181. /// Starts the KeepAlive watcher.
  182. /// </summary>
  183. private void StartKeepAlive()
  184. {
  185. lock (_keepAliveLock)
  186. {
  187. if (_keepAliveCancellationToken == null)
  188. {
  189. _keepAliveCancellationToken = new CancellationTokenSource();
  190. // Start KeepAlive watcher
  191. _ = RepeatAsyncCallbackEvery(
  192. KeepAliveSockets,
  193. TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
  194. _keepAliveCancellationToken.Token);
  195. }
  196. }
  197. }
  198. /// <summary>
  199. /// Stops the KeepAlive watcher.
  200. /// </summary>
  201. private void StopKeepAlive()
  202. {
  203. lock (_keepAliveLock)
  204. {
  205. if (_keepAliveCancellationToken != null)
  206. {
  207. _keepAliveCancellationToken.Cancel();
  208. _keepAliveCancellationToken.Dispose();
  209. _keepAliveCancellationToken = null;
  210. }
  211. }
  212. lock (_webSocketsLock)
  213. {
  214. foreach (var webSocket in _webSockets)
  215. {
  216. webSocket.Closed -= OnWebSocketClosed;
  217. }
  218. _webSockets.Clear();
  219. }
  220. }
  221. /// <summary>
  222. /// Checks status of KeepAlive of WebSockets.
  223. /// </summary>
  224. private async Task KeepAliveSockets()
  225. {
  226. List<IWebSocketConnection> inactive;
  227. List<IWebSocketConnection> lost;
  228. lock (_webSocketsLock)
  229. {
  230. _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
  231. inactive = _webSockets.Where(i =>
  232. {
  233. var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
  234. return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
  235. }).ToList();
  236. lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout).ToList();
  237. }
  238. if (inactive.Count > 0)
  239. {
  240. _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
  241. }
  242. foreach (var webSocket in inactive)
  243. {
  244. try
  245. {
  246. await SendForceKeepAlive(webSocket).ConfigureAwait(false);
  247. }
  248. catch (WebSocketException exception)
  249. {
  250. _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
  251. lost.Add(webSocket);
  252. }
  253. }
  254. lock (_webSocketsLock)
  255. {
  256. if (lost.Count > 0)
  257. {
  258. _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
  259. foreach (var webSocket in lost)
  260. {
  261. // TODO: handle session relative to the lost webSocket
  262. RemoveWebSocket(webSocket);
  263. }
  264. }
  265. if (_webSockets.Count == 0)
  266. {
  267. StopKeepAlive();
  268. }
  269. }
  270. }
  271. /// <summary>
  272. /// Sends a ForceKeepAlive message to a WebSocket.
  273. /// </summary>
  274. /// <param name="webSocket">The WebSocket.</param>
  275. /// <returns>Task.</returns>
  276. private Task SendForceKeepAlive(IWebSocketConnection webSocket)
  277. {
  278. return webSocket.SendAsync(
  279. new WebSocketMessage<int>
  280. {
  281. MessageType = "ForceKeepAlive",
  282. Data = WebSocketLostTimeout
  283. },
  284. CancellationToken.None);
  285. }
  286. /// <summary>
  287. /// Runs a given async callback once every specified interval time, until cancelled.
  288. /// </summary>
  289. /// <param name="callback">The async callback.</param>
  290. /// <param name="interval">The interval time.</param>
  291. /// <param name="cancellationToken">The cancellation token.</param>
  292. /// <returns>Task.</returns>
  293. private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
  294. {
  295. while (!cancellationToken.IsCancellationRequested)
  296. {
  297. await callback().ConfigureAwait(false);
  298. try
  299. {
  300. await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
  301. }
  302. catch (TaskCanceledException)
  303. {
  304. return;
  305. }
  306. }
  307. }
  308. }
  309. }