WebSocketManager.cs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net.WebSockets;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using MediaBrowser.Controller.Net;
  10. using MediaBrowser.Model.Net;
  11. using MediaBrowser.Model.Serialization;
  12. using Microsoft.Extensions.Logging;
  13. using UtfUnknown;
  14. namespace Emby.Server.Implementations.WebSockets
  15. {
  16. public class WebSocketManager
  17. {
  18. private readonly IWebSocketHandler[] _webSocketHandlers;
  19. private readonly IJsonSerializer _jsonSerializer;
  20. private readonly ILogger<WebSocketManager> _logger;
  21. private const int BufferSize = 4096;
  22. public WebSocketManager(IWebSocketHandler[] webSocketHandlers, IJsonSerializer jsonSerializer, ILogger<WebSocketManager> logger)
  23. {
  24. _webSocketHandlers = webSocketHandlers;
  25. _jsonSerializer = jsonSerializer;
  26. _logger = logger;
  27. }
  28. public async Task OnWebSocketConnected(WebSocket webSocket)
  29. {
  30. var taskCompletionSource = new TaskCompletionSource<bool>();
  31. var cancellationToken = new CancellationTokenSource().Token;
  32. WebSocketReceiveResult result;
  33. var message = new List<byte>();
  34. // Keep listening for incoming messages, otherwise the socket closes automatically
  35. do
  36. {
  37. var buffer = WebSocket.CreateServerBuffer(BufferSize);
  38. result = await webSocket.ReceiveAsync(buffer, cancellationToken);
  39. message.AddRange(buffer.Array.Take(result.Count));
  40. if (result.EndOfMessage)
  41. {
  42. await ProcessMessage(message.ToArray(), taskCompletionSource);
  43. message.Clear();
  44. }
  45. } while (!taskCompletionSource.Task.IsCompleted &&
  46. webSocket.State == WebSocketState.Open &&
  47. result.MessageType != WebSocketMessageType.Close);
  48. if (webSocket.State == WebSocketState.Open)
  49. {
  50. await webSocket.CloseAsync(result.CloseStatus ?? WebSocketCloseStatus.NormalClosure,
  51. result.CloseStatusDescription, cancellationToken);
  52. }
  53. }
  54. private async Task ProcessMessage(byte[] messageBytes, TaskCompletionSource<bool> taskCompletionSource)
  55. {
  56. var charset = CharsetDetector.DetectFromBytes(messageBytes).Detected?.EncodingName;
  57. var message = string.Equals(charset, "utf-8", StringComparison.OrdinalIgnoreCase)
  58. ? Encoding.UTF8.GetString(messageBytes, 0, messageBytes.Length)
  59. : Encoding.ASCII.GetString(messageBytes, 0, messageBytes.Length);
  60. // All messages are expected to be valid JSON objects
  61. if (!message.StartsWith("{", StringComparison.OrdinalIgnoreCase))
  62. {
  63. _logger.LogDebug("Received web socket message that is not a json structure: {Message}", message);
  64. return;
  65. }
  66. try
  67. {
  68. var info = _jsonSerializer.DeserializeFromString<WebSocketMessage<object>>(message);
  69. _logger.LogDebug("Websocket message received: {0}", info.MessageType);
  70. var tasks = _webSocketHandlers.Select(handler => Task.Run(() =>
  71. {
  72. try
  73. {
  74. handler.ProcessMessage(info, taskCompletionSource).ConfigureAwait(false);
  75. }
  76. catch (Exception ex)
  77. {
  78. _logger.LogError(ex, "{HandlerType} failed processing WebSocket message {MessageType}",
  79. handler.GetType().Name, info.MessageType ?? string.Empty);
  80. }
  81. }));
  82. await Task.WhenAll(tasks);
  83. }
  84. catch (Exception ex)
  85. {
  86. _logger.LogError(ex, "Error processing web socket message");
  87. }
  88. }
  89. }
  90. }