SessionWebSocketListener.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. #nullable disable
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net.WebSockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Jellyfin.Api.Extensions;
  9. using MediaBrowser.Controller.Net;
  10. using MediaBrowser.Controller.Session;
  11. using MediaBrowser.Model.Net;
  12. using MediaBrowser.Model.Session;
  13. using Microsoft.AspNetCore.Http;
  14. using Microsoft.Extensions.Logging;
  15. namespace Emby.Server.Implementations.Session
  16. {
  17. /// <summary>
  18. /// Class SessionWebSocketListener.
  19. /// </summary>
  20. public sealed class SessionWebSocketListener : IWebSocketListener, IDisposable
  21. {
  22. /// <summary>
  23. /// The timeout in seconds after which a WebSocket is considered to be lost.
  24. /// </summary>
  25. private const int WebSocketLostTimeout = 60;
  26. /// <summary>
  27. /// The keep-alive interval factor; controls how often the watcher will check on the status of the WebSockets.
  28. /// </summary>
  29. private const float IntervalFactor = 0.2f;
  30. /// <summary>
  31. /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent.
  32. /// </summary>
  33. private const float ForceKeepAliveFactor = 0.75f;
  34. /// <summary>
  35. /// Lock used for accessing the KeepAlive cancellation token.
  36. /// </summary>
  37. private readonly object _keepAliveLock = new object();
  38. /// <summary>
  39. /// The WebSocket watchlist.
  40. /// </summary>
  41. private readonly HashSet<IWebSocketConnection> _webSockets = new HashSet<IWebSocketConnection>();
  42. /// <summary>
  43. /// Lock used for accessing the WebSockets watchlist.
  44. /// </summary>
  45. private readonly object _webSocketsLock = new object();
  46. private readonly ISessionManager _sessionManager;
  47. private readonly ILogger<SessionWebSocketListener> _logger;
  48. private readonly ILoggerFactory _loggerFactory;
  49. /// <summary>
  50. /// The KeepAlive cancellation token.
  51. /// </summary>
  52. private CancellationTokenSource _keepAliveCancellationToken;
  53. /// <summary>
  54. /// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
  55. /// </summary>
  56. /// <param name="logger">The logger.</param>
  57. /// <param name="sessionManager">The session manager.</param>
  58. /// <param name="loggerFactory">The logger factory.</param>
  59. public SessionWebSocketListener(
  60. ILogger<SessionWebSocketListener> logger,
  61. ISessionManager sessionManager,
  62. ILoggerFactory loggerFactory)
  63. {
  64. _logger = logger;
  65. _sessionManager = sessionManager;
  66. _loggerFactory = loggerFactory;
  67. }
  68. /// <inheritdoc />
  69. public void Dispose()
  70. {
  71. StopKeepAlive();
  72. }
  73. /// <summary>
  74. /// Processes the message.
  75. /// </summary>
  76. /// <param name="message">The message.</param>
  77. /// <returns>Task.</returns>
  78. public Task ProcessMessageAsync(WebSocketMessageInfo message)
  79. => Task.CompletedTask;
  80. /// <inheritdoc />
  81. public async Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext)
  82. {
  83. var session = await GetSession(httpContext, connection.RemoteEndPoint?.ToString()).ConfigureAwait(false);
  84. if (session is not null)
  85. {
  86. EnsureController(session, connection);
  87. await KeepAliveWebSocket(connection).ConfigureAwait(false);
  88. }
  89. else
  90. {
  91. _logger.LogWarning("Unable to determine session based on query string: {0}", httpContext.Request.QueryString);
  92. }
  93. }
  94. private async Task<SessionInfo> GetSession(HttpContext httpContext, string remoteEndpoint)
  95. {
  96. if (!httpContext.User.Identity?.IsAuthenticated ?? false)
  97. {
  98. return null;
  99. }
  100. var deviceId = httpContext.User.GetDeviceId();
  101. if (httpContext.Request.Query.TryGetValue("deviceId", out var queryDeviceId))
  102. {
  103. deviceId = queryDeviceId;
  104. }
  105. return await _sessionManager.GetSessionByAuthenticationToken(httpContext.User.GetToken(), deviceId, remoteEndpoint)
  106. .ConfigureAwait(false);
  107. }
  108. private void EnsureController(SessionInfo session, IWebSocketConnection connection)
  109. {
  110. var controllerInfo = session.EnsureController<WebSocketController>(
  111. s => new WebSocketController(_loggerFactory.CreateLogger<WebSocketController>(), s, _sessionManager));
  112. var controller = (WebSocketController)controllerInfo.Item1;
  113. controller.AddWebSocket(connection);
  114. _sessionManager.OnSessionControllerConnected(session);
  115. }
  116. /// <summary>
  117. /// Called when a WebSocket is closed.
  118. /// </summary>
  119. /// <param name="sender">The WebSocket.</param>
  120. /// <param name="e">The event arguments.</param>
  121. private void OnWebSocketClosed(object sender, EventArgs e)
  122. {
  123. var webSocket = (IWebSocketConnection)sender;
  124. _logger.LogDebug("WebSocket {0} is closed.", webSocket);
  125. RemoveWebSocket(webSocket);
  126. }
  127. /// <summary>
  128. /// Adds a WebSocket to the KeepAlive watchlist.
  129. /// </summary>
  130. /// <param name="webSocket">The WebSocket to monitor.</param>
  131. private async Task KeepAliveWebSocket(IWebSocketConnection webSocket)
  132. {
  133. lock (_webSocketsLock)
  134. {
  135. if (!_webSockets.Add(webSocket))
  136. {
  137. _logger.LogWarning("Multiple attempts to keep alive single WebSocket {0}", webSocket);
  138. return;
  139. }
  140. webSocket.Closed += OnWebSocketClosed;
  141. webSocket.LastKeepAliveDate = DateTime.UtcNow;
  142. StartKeepAlive();
  143. }
  144. // Notify WebSocket about timeout
  145. try
  146. {
  147. await SendForceKeepAlive(webSocket).ConfigureAwait(false);
  148. }
  149. catch (WebSocketException exception)
  150. {
  151. _logger.LogWarning(exception, "Cannot send ForceKeepAlive message to WebSocket {0}.", webSocket);
  152. }
  153. }
  154. /// <summary>
  155. /// Removes a WebSocket from the KeepAlive watchlist.
  156. /// </summary>
  157. /// <param name="webSocket">The WebSocket to remove.</param>
  158. private void RemoveWebSocket(IWebSocketConnection webSocket)
  159. {
  160. lock (_webSocketsLock)
  161. {
  162. if (!_webSockets.Remove(webSocket))
  163. {
  164. _logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
  165. }
  166. else
  167. {
  168. webSocket.Closed -= OnWebSocketClosed;
  169. }
  170. }
  171. }
  172. /// <summary>
  173. /// Starts the KeepAlive watcher.
  174. /// </summary>
  175. private void StartKeepAlive()
  176. {
  177. lock (_keepAliveLock)
  178. {
  179. if (_keepAliveCancellationToken is null)
  180. {
  181. _keepAliveCancellationToken = new CancellationTokenSource();
  182. // Start KeepAlive watcher
  183. _ = RepeatAsyncCallbackEvery(
  184. KeepAliveSockets,
  185. TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
  186. _keepAliveCancellationToken.Token);
  187. }
  188. }
  189. }
  190. /// <summary>
  191. /// Stops the KeepAlive watcher.
  192. /// </summary>
  193. private void StopKeepAlive()
  194. {
  195. lock (_keepAliveLock)
  196. {
  197. if (_keepAliveCancellationToken is not null)
  198. {
  199. _keepAliveCancellationToken.Cancel();
  200. _keepAliveCancellationToken.Dispose();
  201. _keepAliveCancellationToken = null;
  202. }
  203. }
  204. lock (_webSocketsLock)
  205. {
  206. foreach (var webSocket in _webSockets)
  207. {
  208. webSocket.Closed -= OnWebSocketClosed;
  209. }
  210. _webSockets.Clear();
  211. }
  212. }
  213. /// <summary>
  214. /// Checks status of KeepAlive of WebSockets.
  215. /// </summary>
  216. private async Task KeepAliveSockets()
  217. {
  218. List<IWebSocketConnection> inactive;
  219. List<IWebSocketConnection> lost;
  220. lock (_webSocketsLock)
  221. {
  222. _logger.LogDebug("Watching {0} WebSockets.", _webSockets.Count);
  223. inactive = _webSockets.Where(i =>
  224. {
  225. var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds;
  226. return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout);
  227. }).ToList();
  228. lost = _webSockets.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout).ToList();
  229. }
  230. if (inactive.Count > 0)
  231. {
  232. _logger.LogInformation("Sending ForceKeepAlive message to {0} inactive WebSockets.", inactive.Count);
  233. }
  234. foreach (var webSocket in inactive)
  235. {
  236. try
  237. {
  238. await SendForceKeepAlive(webSocket).ConfigureAwait(false);
  239. }
  240. catch (WebSocketException exception)
  241. {
  242. _logger.LogInformation(exception, "Error sending ForceKeepAlive message to WebSocket.");
  243. lost.Add(webSocket);
  244. }
  245. }
  246. lock (_webSocketsLock)
  247. {
  248. if (lost.Count > 0)
  249. {
  250. _logger.LogInformation("Lost {0} WebSockets.", lost.Count);
  251. foreach (var webSocket in lost)
  252. {
  253. // TODO: handle session relative to the lost webSocket
  254. RemoveWebSocket(webSocket);
  255. }
  256. }
  257. if (_webSockets.Count == 0)
  258. {
  259. StopKeepAlive();
  260. }
  261. }
  262. }
  263. /// <summary>
  264. /// Sends a ForceKeepAlive message to a WebSocket.
  265. /// </summary>
  266. /// <param name="webSocket">The WebSocket.</param>
  267. /// <returns>Task.</returns>
  268. private Task SendForceKeepAlive(IWebSocketConnection webSocket)
  269. {
  270. return webSocket.SendAsync(
  271. new WebSocketMessage<int>
  272. {
  273. MessageType = SessionMessageType.ForceKeepAlive,
  274. Data = WebSocketLostTimeout
  275. },
  276. CancellationToken.None);
  277. }
  278. /// <summary>
  279. /// Runs a given async callback once every specified interval time, until cancelled.
  280. /// </summary>
  281. /// <param name="callback">The async callback.</param>
  282. /// <param name="interval">The interval time.</param>
  283. /// <param name="cancellationToken">The cancellation token.</param>
  284. /// <returns>Task.</returns>
  285. private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
  286. {
  287. while (!cancellationToken.IsCancellationRequested)
  288. {
  289. await callback().ConfigureAwait(false);
  290. try
  291. {
  292. await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
  293. }
  294. catch (TaskCanceledException)
  295. {
  296. return;
  297. }
  298. }
  299. }
  300. }
  301. }