SessionWebSocketListener.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MediaBrowser.Controller.Net;
  8. using MediaBrowser.Controller.Session;
  9. using MediaBrowser.Model.Events;
  10. using MediaBrowser.Model.Net;
  11. using Microsoft.AspNetCore.Http;
  12. using Microsoft.Extensions.Logging;
  13. namespace Emby.Server.Implementations.Session
  14. {
  15. /// <summary>
  16. /// Class SessionWebSocketListener
  17. /// </summary>
  18. public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable
  19. {
  20. /// <summary>
  21. /// The timeout in seconds after which a WebSocket is considered to be lost.
  22. /// </summary>
  23. public const int WebSocketLostTimeout = 60;
  24. /// <summary>
  25. /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets.
  26. /// </summary>
  27. public const float IntervalFactor = 0.2f;
  28. /// <summary>
  29. /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
  30. /// </summary>
  31. public const float ForceKeepAliveFactor = 0.75f;
  32. /// <summary>
  33. /// The _session manager
  34. /// </summary>
  35. private readonly ISessionManager _sessionManager;
  36. /// <summary>
  37. /// The _logger
  38. /// </summary>
  39. private readonly ILogger<SessionWebSocketListener> _logger;
  40. private readonly ILoggerFactory _loggerFactory;
  41. private readonly IHttpServer _httpServer;
  42. /// <summary>
  43. /// The KeepAlive cancellation token.
  44. /// </summary>
  45. private CancellationTokenSource _keepAliveCancellationToken;
  46. /// <summary>
  47. /// Lock used for accesing the KeepAlive cancellation token.
  48. /// </summary>
  49. private readonly object _keepAliveLock = new object();
  50. /// <summary>
  51. /// The WebSocket watchlist.
  52. /// </summary>
  53. private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();
  54. /// <summary>
  55. /// Lock used for accesing the WebSockets watchlist.
  56. /// </summary>
  57. private readonly object _webSocketsLock = new object();
  58. /// <summary>
  59. /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
  60. /// </summary>
  61. /// <param name="logger">The logger.</param>
  62. /// <param name="sessionManager">The session manager.</param>
  63. /// <param name="loggerFactory">The logger factory.</param>
  64. /// <param name="httpServer">The HTTP server.</param>
  65. public SessionWebSocketListener(
  66. ILogger<SessionWebSocketListener> logger,
  67. ISessionManager sessionManager,
  68. ILoggerFactory loggerFactory,
  69. IHttpServer httpServer)
  70. {
  71. _logger = logger;
  72. _sessionManager = sessionManager;
  73. _loggerFactory = loggerFactory;
  74. _httpServer = httpServer;
  75. httpServer.WebSocketConnected += OnServerManagerWebSocketConnected;
  76. }
  77. private async void OnServerManagerWebSocketConnected(object sender, GenericEventArgs<IWebSocketConnection> e)
  78. {
  79. var session = GetSession(e.Argument.QueryString, e.Argument.RemoteEndPoint.ToString());
  80. if (session != null)
  81. {
  82. EnsureController(session, e.Argument);
  83. await KeepAliveWebSocket(e.Argument);
  84. }
  85. else
  86. {
  87. _logger.LogWarning("Unable to determine session based on query string: {0}", e.Argument.QueryString);
  88. }
  89. }
  90. private SessionInfo GetSession(IQueryCollection queryString, string remoteEndpoint)
  91. {
  92. if (queryString == null)
  93. {
  94. return null;
  95. }
  96. var token = queryString["api_key"];
  97. if (string.IsNullOrWhiteSpace(token))
  98. {
  99. return null;
  100. }
  101. var deviceId = queryString["deviceId"];
  102. return _sessionManager.GetSessionByAuthenticationToken(token, deviceId, remoteEndpoint);
  103. }
  104. /// <inheritdoc />
  105. public void Dispose()
  106. {
  107. _httpServer.WebSocketConnected -= OnServerManagerWebSocketConnected;
  108. StopKeepAlive();
  109. }
  110. /// <summary>
  111. /// Processes the message.
  112. /// </summary>
  113. /// <param name="message">The message.</param>
  114. /// <returns>Task.</returns>
  115. public Task ProcessMessageAsync(WebSocketMessageInfo message)
  116. => Task.CompletedTask;
  117. private void EnsureController(SessionInfo session, IWebSocketConnection connection)
  118. {
  119. var controllerInfo = session.EnsureController<WebSocketController>(
  120. s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager));
  121. var controller = (WebSocketController)controllerInfo.Item1;
  122. controller.AddWebSocket(connection);
  123. }
  124. /// <summary>
  125. /// Called when a WebSocket is closed.
  126. /// </summary>
  127. /// <param name="sender">The WebSocket.</param>
  128. /// <param name="e">The event arguments.</param>
  129. private void OnWebSocketClosed(object sender, EventArgs e)
  130. {
  131. var webSocket = (IWebSocketConnection)sender;
  132. _logger.LogDebug("WebSocket {0} is closed.", webSocket);
  133. RemoveWebSocket(webSocket);
  134. }
  135. /// <summary>
  136. /// Adds a WebSocket to the KeepAlive watchlist.
  137. /// </summary>
  138. /// <param name="webSocket">The WebSocket to monitor.</param>
  139. private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
  140. {
  141. lock (_webSocketsLock)
  142. {
  143. if (!_webSockets.Add(webSocket))
  144. {
  145. _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
  146. return;
  147. }
  148. webSocket.Closed += OnWebSocketClosed;
  149. webSocket.LastKeepAliveDate = DateTime.UtcNow;
  150. StartKeepAlive();
  151. }
  152. // Notify WebSocket about timeout
  153. try
  154. {
  155. await SendForceKeepAlive(webSocket);
  156. }
  157. catch (WebSocketException exception)
  158. {
  159. _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
  160. }
  161. }
  162. /// <summary>
  163. /// Removes a WebSocket from the KeepAlive watchlist.
  164. /// </summary>
  165. /// <param name="webSocket">The WebSocket to remove.</param>
  166. private void RemoveWebSocket(IWebSocketConnection webSocket)
  167. {
  168. lock (_webSocketsLock)
  169. {
  170. if (!_webSockets.Remove(webSocket))
  171. {
  172. _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
  173. }
  174. else
  175. {
  176. webSocket.Closed -= OnWebSocketClosed;
  177. }
  178. }
  179. }
  180. /// <summary>
  181. /// Starts the KeepAlive watcher.
  182. /// </summary>
  183. private void StartKeepAlive()
  184. {
  185. lock (_keepAliveLock)
  186. {
  187. if (_keepAliveCancellationToken == null)
  188. {
  189. _keepAliveCancellationToken = new CancellationTokenSource();
  190. // Start KeepAlive watcher
  191. _ = RepeatAsyncCallbackEvery(
  192. KeepAliveSockets,
  193. TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
  194. _keepAliveCancellationToken.Token);
  195. }
  196. }
  197. }
  198. /// <summary>
  199. /// Stops the KeepAlive watcher.
  200. /// </summary>
  201. private void StopKeepAlive()
  202. {
  203. lock (_keepAliveLock)
  204. {
  205. if (_keepAliveCancellationToken != null)
  206. {
  207. _keepAliveCancellationToken.Cancel();
  208. _keepAliveCancellationToken = null;
  209. }
  210. }
  211. lock (_webSocketsLock)
  212. {
  213. foreach (var webSocket in _webSockets)
  214. {
  215. webSocket.Closed -= OnWebSocketClosed;
  216. }
  217. _webSockets.Clear();
  218. }
  219. }
  220. /// <summary>
  221. /// Checks status of KeepAlive of WebSockets.
  222. /// </summary>
  223. private async Task KeepAliveSockets()
  224. {
  225. List<IWebSocketConnection> inactive;
  226. List<IWebSocketConnection> lost;
  227. lock (_webSocketsLock)
  228. {
  229. _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
  230. inactive = _webSockets.Where(i =>
  231. {
  232. var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
  233. return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
  234. }).ToList();
  235. lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout).ToList();
  236. }
  237. if (inactive.Any())
  238. {
  239. _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
  240. }
  241. foreach (var webSocket in inactive)
  242. {
  243. try
  244. {
  245. await SendForceKeepAlive(webSocket);
  246. }
  247. catch (WebSocketException exception)
  248. {
  249. _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
  250. lost.Add(webSocket);
  251. }
  252. }
  253. lock (_webSocketsLock)
  254. {
  255. if (lost.Any())
  256. {
  257. _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
  258. foreach (var webSocket in lost)
  259. {
  260. // TODO: handle session relative to the lost webSocket
  261. RemoveWebSocket(webSocket);
  262. }
  263. }
  264. if (!_webSockets.Any())
  265. {
  266. StopKeepAlive();
  267. }
  268. }
  269. }
  270. /// <summary>
  271. /// Sends a ForceKeepAlive message to a WebSocket.
  272. /// </summary>
  273. /// <param name="webSocket">The WebSocket.</param>
  274. /// <returns>Task.</returns>
  275. private Task SendForceKeepAlive(IWebSocketConnection webSocket)
  276. {
  277. return webSocket.SendAsync(new WebSocketMessage<int>
  278. {
  279. MessageType = "ForceKeepAlive",
  280. Data = WebSocketLostTimeout
  281. }, CancellationToken.None);
  282. }
  283. /// <summary>
  284. /// Runs a given async callback once every specified interval time, until cancelled.
  285. /// </summary>
  286. /// <param name="callback">The async callback.</param>
  287. /// <param name="interval">The interval time.</param>
  288. /// <param name="cancellationToken">The cancellation token.</param>
  289. /// <returns>Task.</returns>
  290. private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
  291. {
  292. while (!cancellationToken.IsCancellationRequested)
  293. {
  294. await callback();
  295. Task task = Task.Delay(interval, cancellationToken);
  296. try
  297. {
  298. await task;
  299. }
  300. catch (TaskCanceledException)
  301. {
  302. return;
  303. }
  304. }
  305. }
  306. }
  307. }