WebSocketManager.cs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net.WebSockets;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MediaBrowser.Model.Net;
  9. using MediaBrowser.Model.Serialization;
  10. using Microsoft.Extensions.Logging;
  11. using UtfUnknown;
  12. namespace Emby.Server.Implementations.WebSockets
  13. {
  /// <summary>
  /// Receives messages from a connected web socket and dispatches each complete
  /// message to every registered <see cref="IWebSocketHandler"/>.
  /// </summary>
  public class WebSocketManager
  {
      /// <summary>
      /// Size, in bytes, of the buffer used for a single receive call.
      /// </summary>
      private const int BufferSize = 4096;

      private readonly IWebSocketHandler[] _webSocketHandlers;
      private readonly IJsonSerializer _jsonSerializer;
      private readonly ILogger<WebSocketManager> _logger;

      /// <summary>
      /// Initializes a new instance of the <see cref="WebSocketManager"/> class.
      /// </summary>
      /// <param name="webSocketHandlers">The handlers every received message is offered to.</param>
      /// <param name="jsonSerializer">The serializer used to parse incoming messages.</param>
      /// <param name="logger">The logger.</param>
      public WebSocketManager(IWebSocketHandler[] webSocketHandlers, IJsonSerializer jsonSerializer, ILogger<WebSocketManager> logger)
      {
          _webSocketHandlers = webSocketHandlers;
          _jsonSerializer = jsonSerializer;
          _logger = logger;
      }

      /// <summary>
      /// Reads messages from <paramref name="webSocket"/> until a close frame arrives,
      /// the socket leaves the <see cref="WebSocketState.Open"/> state, or the
      /// completion source handed to the handlers has been completed.
      /// If the socket is still open afterwards, it is closed.
      /// </summary>
      /// <param name="webSocket">The connected web socket.</param>
      /// <returns>A task that completes when the receive loop has ended.</returns>
      public async Task OnWebSocketConnected(WebSocket webSocket)
      {
          // Continuations must not run inline on whichever handler thread completes the source.
          var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

          // No external cancellation signal exists for this loop; the previous
          // CancellationTokenSource was never cancelled or disposed.
          var cancellationToken = CancellationToken.None;

          // One buffer is reused for every receive call; each fragment is copied
          // into the message list before the next receive overwrites it.
          var buffer = WebSocket.CreateServerBuffer(BufferSize);
          var message = new List<byte>();
          WebSocketReceiveResult result;

          // Keep listening for incoming messages, otherwise the socket closes automatically
          do
          {
              result = await webSocket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);

              // A close frame is not an application message; do not dispatch it.
              if (result.MessageType == WebSocketMessageType.Close)
              {
                  break;
              }

              // Append exactly the received bytes, honouring the segment offset.
              message.AddRange(new ArraySegment<byte>(buffer.Array, buffer.Offset, result.Count));

              if (result.EndOfMessage)
              {
                  await ProcessMessage(message.ToArray(), taskCompletionSource).ConfigureAwait(false);
                  message.Clear();
              }
          }
          while (!taskCompletionSource.Task.IsCompleted && webSocket.State == WebSocketState.Open);

          if (webSocket.State == WebSocketState.Open)
          {
              await webSocket.CloseAsync(
                  result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
                  result.CloseStatusDescription,
                  cancellationToken).ConfigureAwait(false);
          }
      }

      /// <summary>
      /// Decodes a complete message, parses it as JSON and passes it to all handlers,
      /// waiting until every handler has finished. Failures are logged, not rethrown.
      /// </summary>
      /// <param name="messageBytes">The raw bytes of one complete message.</param>
      /// <param name="taskCompletionSource">The completion source forwarded to each handler.</param>
      /// <returns>A task that completes once all handlers have processed the message.</returns>
      private async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource<bool> taskCompletionSource)
      {
          // WebSocket text payloads are UTF-8 (RFC 6455) and ASCII is a subset of UTF-8,
          // so decoding as UTF-8 is always correct here; falling back to ASCII would
          // replace every non-ASCII character with '?'.
          var message = Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length);

          // All messages are expected to be valid JSON objects
          if (!message.StartsWith("{", StringComparison.Ordinal))
          {
              _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message);
              return;
          }

          try
          {
              var info = _jsonSerializer.DeserializeFromString<WebSocketMessage<object>>(message);

              _logger.LogDebug("Websocket message received: {0}", info.MessageType);

              // Handlers run concurrently on the thread pool; each one is awaited so that
              // the caller only continues once all of them have actually finished.
              var tasks = _webSocketHandlers.Select(
                  handler => Task.Run(() => InvokeHandlerAsync(handler, info, taskCompletionSource)));

              await Task.WhenAll(tasks).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error processing web socket message");
          }
      }

      /// <summary>
      /// Runs a single handler for a message and logs any exception it throws, so that
      /// one failing handler does not prevent the others from completing.
      /// </summary>
      /// <param name="handler">The handler to invoke.</param>
      /// <param name="info">The deserialized message.</param>
      /// <param name="taskCompletionSource">The completion source forwarded to the handler.</param>
      /// <returns>A task that completes when the handler has finished or failed.</returns>
      private async Task InvokeHandlerAsync(
          IWebSocketHandler handler,
          WebSocketMessage<object> info,
          TaskCompletionSource<bool> taskCompletionSource)
      {
          try
          {
              // Awaiting is essential: without it, asynchronous failures escape this
              // try/catch and the handler may still be running when the caller moves on.
              await handler.ProcessMessage(info, taskCompletionSource).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              _logger.LogError(
                  ex,
                  "{HandlerType} failed processing WebSocket message {MessageType}",
                  handler.GetType().Name,
                  info.MessageType ?? string.Empty);
          }
      }
  }
  90. }