WebSocketManager.cs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net.WebSockets;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using MediaBrowser.Controller.Net;
  10. using MediaBrowser.Model.Net;
  11. using MediaBrowser.Model.Serialization;
  12. using Microsoft.Extensions.Logging;
  13. using UtfUnknown;
  14. namespace Emby.Server.Implementations.WebSockets
  15. {
  16. public class WebSocketManager
  17. {
  18. private readonly IWebSocketHandler[] _webSocketHandlers;
  19. private readonly IJsonSerializer _jsonSerializer;
  20. private readonly ILogger<WebSocketManager> _logger;
  21. private const int BufferSize = 4096;
  22. public WebSocketManager(IWebSocketHandler[] webSocketHandlers, IJsonSerializer jsonSerializer, ILogger<WebSocketManager> logger)
  23. {
  24. _webSocketHandlers = webSocketHandlers;
  25. _jsonSerializer = jsonSerializer;
  26. _logger = logger;
  27. }
  28. public async Task OnWebSocketConnected(WebSocket webSocket)
  29. {
  30. var taskCompletionSource = new TaskCompletionSource<bool>();
  31. var cancellationToken = new CancellationTokenSource().Token;
  32. WebSocketReceiveResult result;
  33. var message = new List<byte>();
  34. // Keep listening for incoming messages, otherwise the socket closes automatically
  35. do
  36. {
  37. var buffer = WebSocket.CreateServerBuffer(BufferSize);
  38. result = await webSocket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
  39. message.AddRange(buffer.Array.Take(result.Count));
  40. if (result.EndOfMessage)
  41. {
  42. await ProcessMessage(message.ToArray(), taskCompletionSource).ConfigureAwait(false);
  43. message.Clear();
  44. }
  45. } while (!taskCompletionSource.Task.IsCompleted &&
  46. webSocket.State == WebSocketState.Open &&
  47. result.MessageType != WebSocketMessageType.Close);
  48. if (webSocket.State == WebSocketState.Open)
  49. {
  50. await webSocket.CloseAsync(
  51. result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
  52. result.CloseStatusDescription,
  53. cancellationToken).ConfigureAwait(false);
  54. }
  55. }
  56. private async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource<bool> taskCompletionSource)
  57. {
  58. var charset = CharsetDetector.DetectFromBytes(messageBytes).Detected?.EncodingName;
  59. var message = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)
  60. ? Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length)
  61. : Encoding.ASCII.GetString(messageBytes, 0, messageBytes.Length);
  62. // All messages are expected to be valid JSON objects
  63. if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase))
  64. {
  65. _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message);
  66. return;
  67. }
  68. try
  69. {
  70. var info = _jsonSerializer.DeserializeFromString<WebSocketMessage<object>>(message);
  71. _logger.LogDebug("Websocket message received: {0}", info.MessageType);
  72. var tasks = _webSocketHandlers.Select(handler => Task.Run(() =>
  73. {
  74. try
  75. {
  76. handler.ProcessMessage(info, taskCompletionSource).ConfigureAwait(false);
  77. }
  78. catch (Exception ex)
  79. {
  80. _logger.LogError(ex, "{HandlerType} failed processing WebSocket message {MessageType}",
  81. handler.GetType().Name, info.MessageType ?? string.Empty);
  82. }
  83. }));
  84. await Task.WhenAll(tasks);
  85. }
  86. catch (Exception ex)
  87. {
  88. _logger.LogError(ex, "Error processing web socket message");
  89. }
  90. }
  91. }
  92. }