WebSocketConnection.cs 9.9 KB


  1. using System;
  2. using System.Buffers;
  3. using System.IO.Pipelines;
  4. using System.Net;
  5. using System.Net.WebSockets;
  6. using System.Text;
  7. using System.Text.Json;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using Jellyfin.Extensions.Json;
  11. using MediaBrowser.Controller.Net;
  12. using MediaBrowser.Controller.Net.WebSocketMessages;
  13. using MediaBrowser.Model.Session;
  14. using Microsoft.Extensions.Logging;
  namespace Emby.Server.Implementations.HttpServer
  {
      /// <summary>
      /// Wraps a <see cref="WebSocket"/>: serializes outgoing messages as UTF-8 JSON,
      /// runs the receive loop, and dispatches every complete incoming message.
      /// </summary>
      public class WebSocketConnection : IWebSocketConnection
      {
          /// <summary>
          /// The logger.
          /// </summary>
          private readonly ILogger<WebSocketConnection> _logger;

          /// <summary>
          /// The json serializer options.
          /// </summary>
          private readonly JsonSerializerOptions _jsonOptions;

          /// <summary>
          /// The socket.
          /// </summary>
          private readonly WebSocket _socket;

          /// <summary>
          /// Set once <see cref="Dispose(bool)"/> has run, so repeated calls are no-ops.
          /// </summary>
          private bool _disposed;

          /// <summary>
          /// Initializes a new instance of the <see cref="WebSocketConnection" /> class.
          /// </summary>
          /// <param name="logger">The logger.</param>
          /// <param name="socket">The socket.</param>
          /// <param name="remoteEndPoint">The remote end point.</param>
          public WebSocketConnection(
              ILogger<WebSocketConnection> logger,
              WebSocket socket,
              IPAddress? remoteEndPoint)
          {
              _logger = logger;
              _socket = socket;
              RemoteEndPoint = remoteEndPoint;

              _jsonOptions = JsonDefaults.Options;

              // UTC, so the initial value is comparable with the UtcNow timestamps
              // written by the receive loop and the keep-alive handler.
              LastActivityDate = DateTime.UtcNow;
          }

          /// <inheritdoc />
          public event EventHandler<EventArgs>? Closed;

          /// <summary>
          /// Gets the remote end point.
          /// </summary>
          public IPAddress? RemoteEndPoint { get; }

          /// <summary>
          /// Gets or sets the receive action.
          /// </summary>
          /// <value>The receive action.</value>
          public Func<WebSocketMessageInfo, Task>? OnReceive { get; set; }

          /// <summary>
          /// Gets the last activity date (UTC).
          /// </summary>
          /// <value>The last activity date.</value>
          public DateTime LastActivityDate { get; private set; }

          /// <inheritdoc />
          public DateTime LastKeepAliveDate { get; set; }

          /// <summary>
          /// Gets the state.
          /// </summary>
          /// <value>The state.</value>
          public WebSocketState State => _socket.State;

          /// <summary>
          /// Sends a message asynchronously.
          /// </summary>
          /// <param name="message">The message.</param>
          /// <param name="cancellationToken">The cancellation token.</param>
          /// <returns>Task.</returns>
          public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken)
          {
              var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
              return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
          }

          /// <summary>
          /// Sends a message asynchronously.
          /// </summary>
          /// <typeparam name="T">The type of the message.</typeparam>
          /// <param name="message">The message.</param>
          /// <param name="cancellationToken">The cancellation token.</param>
          /// <returns>Task.</returns>
          public Task SendAsync<T>(WebSocketMessage<T> message, CancellationToken cancellationToken)
          {
              var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);
              return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken);
          }

          /// <inheritdoc />
          public async Task ProcessAsync(CancellationToken cancellationToken = default)
          {
              var pipe = new Pipe();
              var writer = pipe.Writer;

              try
              {
                  ValueWebSocketReceiveResult receiveResult;
                  do
                  {
                      // Allocate at least 512 bytes from the PipeWriter
                      Memory<byte> memory = writer.GetMemory(512);
                      try
                      {
                          receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
                      }
                      catch (WebSocketException ex)
                      {
                          _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
                          break;
                      }

                      int bytesRead = receiveResult.Count;
                      if (bytesRead == 0)
                      {
                          // Nothing was received; stop the loop and fall through to the close handling.
                          break;
                      }

                      // Tell the PipeWriter how much was read from the Socket
                      writer.Advance(bytesRead);

                      // Make the data available to the PipeReader
                      FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
                      if (flushResult.IsCompleted)
                      {
                          // The PipeReader stopped reading
                          break;
                      }

                      LastActivityDate = DateTime.UtcNow;

                      // Fragments accumulate in the pipe; only a finished message is parsed.
                      if (receiveResult.EndOfMessage)
                      {
                          await ProcessInternal(pipe.Reader).ConfigureAwait(false);
                      }
                  }
                  while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
                      && receiveResult.MessageType != WebSocketMessageType.Close);
              }
              finally
              {
                  // Complete both ends so the pipe can release the buffers it still holds.
                  await writer.CompleteAsync().ConfigureAwait(false);
                  await pipe.Reader.CompleteAsync().ConfigureAwait(false);
              }

              Closed?.Invoke(this, EventArgs.Empty);

              if (_socket.State == WebSocketState.Open
                  || _socket.State == WebSocketState.CloseReceived
                  || _socket.State == WebSocketState.CloseSent)
              {
                  await _socket.CloseAsync(
                      WebSocketCloseStatus.NormalClosure,
                      string.Empty,
                      cancellationToken).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Reads one buffered message from the pipe, deserializes it and dispatches it:
          /// keep-alive messages are answered directly, everything else goes to <see cref="OnReceive"/>.
          /// </summary>
          /// <param name="reader">The reader side of the receive pipe.</param>
          /// <returns>Task.</returns>
          private async Task ProcessInternal(PipeReader reader)
          {
              ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
              ReadOnlySequence<byte> buffer = result.Buffer;

              // Capture once so the handler cannot be swapped out between the check and the call.
              var onReceive = OnReceive;
              if (onReceive is null)
              {
                  // Tell the PipeReader how much of the buffer we have consumed
                  reader.AdvanceTo(buffer.End);
                  return;
              }

              WebSocketMessage<object>? stub;
              long bytesConsumed;
              try
              {
                  stub = DeserializeWebSocketMessage(buffer, out bytesConsumed);
              }
              catch (JsonException ex)
              {
                  // Log first: the buffer must not be read once AdvanceTo has handed it back to the pipe.
                  _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer));

                  // Tell the PipeReader how much of the buffer we have consumed
                  reader.AdvanceTo(buffer.End);
                  return;
              }

              if (stub is null)
              {
                  // Every ReadAsync must be paired with AdvanceTo; discard the unusable message
                  // so the next read starts clean.
                  reader.AdvanceTo(buffer.End);
                  _logger.LogError("Error processing web socket message");
                  return;
              }

              // Tell the PipeReader how much of the buffer we have consumed
              reader.AdvanceTo(buffer.GetPosition(bytesConsumed));

              _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub);

              if (stub.MessageType == SessionMessageType.KeepAlive)
              {
                  await SendKeepAliveResponse().ConfigureAwait(false);
              }
              else
              {
                  await onReceive(
                      new WebSocketMessageInfo
                      {
                          MessageType = stub.MessageType,
                          Data = stub.Data?.ToString(), // Data can be null
                          Connection = this
                      }).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Deserializes a single web socket message from the given bytes.
          /// </summary>
          /// <param name="bytes">The UTF-8 encoded JSON payload.</param>
          /// <param name="bytesConsumed">The number of bytes the JSON reader consumed.</param>
          /// <returns>The deserialized message, or <c>null</c> if the serializer produced none.</returns>
          internal WebSocketMessage<object>? DeserializeWebSocketMessage(ReadOnlySequence<byte> bytes, out long bytesConsumed)
          {
              var jsonReader = new Utf8JsonReader(bytes);
              var ret = JsonSerializer.Deserialize<WebSocketMessage<object>>(ref jsonReader, _jsonOptions);
              bytesConsumed = jsonReader.BytesConsumed;
              return ret;
          }

          /// <summary>
          /// Records the keep-alive time (UTC) and replies with a keep-alive message.
          /// </summary>
          /// <returns>Task.</returns>
          private Task SendKeepAliveResponse()
          {
              LastKeepAliveDate = DateTime.UtcNow;
              return SendAsync(
                  new OutboundWebSocketMessage
                  {
                      MessageId = Guid.NewGuid(),
                      MessageType = SessionMessageType.KeepAlive
                  },
                  CancellationToken.None);
          }

          /// <inheritdoc />
          public void Dispose()
          {
              Dispose(true);
              GC.SuppressFinalize(this);
          }

          /// <summary>
          /// Releases unmanaged and - optionally - managed resources.
          /// </summary>
          /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
          protected virtual void Dispose(bool dispose)
          {
              if (_disposed)
              {
                  return;
              }

              if (dispose)
              {
                  _socket.Dispose();
              }

              _disposed = true;
          }

          /// <inheritdoc />
          public async ValueTask DisposeAsync()
          {
              await DisposeAsyncCore().ConfigureAwait(false);

              // The socket was already disposed by DisposeAsyncCore; this only marks the instance disposed.
              Dispose(false);
              GC.SuppressFinalize(this);
          }

          /// <summary>
          /// Used to perform asynchronous cleanup of managed resources or for cascading calls to <see cref="DisposeAsync"/>.
          /// </summary>
          /// <returns>A ValueTask.</returns>
          protected virtual async ValueTask DisposeAsyncCore()
          {
              if (_socket.State == WebSocketState.Open)
              {
                  await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.None).ConfigureAwait(false);
              }

              _socket.Dispose();
          }
      }
  }