WebSocketController.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. #pragma warning disable CS1591
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net.WebSockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using MediaBrowser.Controller.Net;
  9. using MediaBrowser.Controller.Net.WebSocketMessages;
  10. using MediaBrowser.Controller.Session;
  11. using MediaBrowser.Model.Session;
  12. using Microsoft.Extensions.Logging;
  namespace Emby.Server.Implementations.Session
  {
      /// <summary>
      /// An <see cref="ISessionController"/> that sends session messages over the web socket
      /// connections registered for a single session.
      /// </summary>
      public sealed class WebSocketController : ISessionController, IAsyncDisposable, IDisposable
      {
          private readonly ILogger<WebSocketController> _logger;
          private readonly ISessionManager _sessionManager;
          private readonly SessionInfo _session;

          // Registered connections; every access is guarded by _socketsLock.
          private readonly List<IWebSocketConnection> _sockets;
          private readonly ReaderWriterLockSlim _socketsLock;
          private bool _disposed = false;

          /// <summary>
          /// Initializes a new instance of the <see cref="WebSocketController"/> class.
          /// </summary>
          /// <param name="logger">The logger used for diagnostics.</param>
          /// <param name="session">The session whose web sockets this controller tracks.</param>
          /// <param name="sessionManager">The session manager that is asked to close the session, if needed, when a socket closes.</param>
          public WebSocketController(
              ILogger<WebSocketController> logger,
              SessionInfo session,
              ISessionManager sessionManager)
          {
              _logger = logger;
              _session = session;
              _sessionManager = sessionManager;
              _sockets = new();
              _socketsLock = new();
          }

          /// <summary>
          /// Gets a value indicating whether at least one registered socket is in the <see cref="WebSocketState.Open"/> state.
          /// </summary>
          /// <exception cref="ObjectDisposedException">The controller has been disposed.</exception>
          private bool HasOpenSockets
          {
              get
              {
                  ObjectDisposedException.ThrowIf(_disposed, this);

                  // Acquire before the try block so the finally never releases a lock that was not taken.
                  _socketsLock.EnterReadLock();
                  try
                  {
                      return _sockets.Any(i => i.State == WebSocketState.Open);
                  }
                  finally
                  {
                      _socketsLock.ExitReadLock();
                  }
              }
          }

          /// <inheritdoc />
          public bool SupportsMediaControl => HasOpenSockets;

          /// <inheritdoc />
          public bool IsSessionActive => HasOpenSockets;

          /// <summary>
          /// Registers a web socket connection with this controller and subscribes to its <c>Closed</c> event.
          /// </summary>
          /// <param name="connection">The connection to register.</param>
          /// <exception cref="ArgumentNullException"><paramref name="connection"/> is <c>null</c>.</exception>
          /// <exception cref="ObjectDisposedException">The controller has been disposed.</exception>
          public void AddWebSocket(IWebSocketConnection connection)
          {
              // Reject null up front; otherwise it would be stored in the list before the
              // event subscription fails, breaking every later enumeration of the list.
              ArgumentNullException.ThrowIfNull(connection);

              _logger.LogDebug("Adding websocket to session {Session}", _session.Id);
              ObjectDisposedException.ThrowIf(_disposed, this);

              _socketsLock.EnterWriteLock();
              try
              {
                  _sockets.Add(connection);
                  connection.Closed += OnConnectionClosed;
              }
              finally
              {
                  _socketsLock.ExitWriteLock();
              }
          }

          /// <summary>
          /// Handles a connection's <c>Closed</c> event: unregisters the connection and asks the
          /// session manager to close the session if needed.
          /// </summary>
          /// <param name="sender">The connection that raised the event.</param>
          /// <param name="e">The event arguments (unused).</param>
          /// <remarks>
          /// This is an <c>async void</c> event handler, so an exception escaping it cannot be observed
          /// by the event raiser. Failures are therefore logged here instead of being thrown.
          /// </remarks>
          private async void OnConnectionClosed(object? sender, EventArgs e)
          {
              try
              {
                  var connection = sender as IWebSocketConnection ?? throw new ArgumentException($"{nameof(sender)} is not of type {nameof(IWebSocketConnection)}", nameof(sender));

                  if (_disposed)
                  {
                      // Disposal already detached every socket and disposed the lock; nothing left to do.
                      return;
                  }

                  _logger.LogDebug("Removing websocket from session {Session}", _session.Id);

                  _socketsLock.EnterWriteLock();
                  try
                  {
                      _sockets.Remove(connection);
                      connection.Closed -= OnConnectionClosed;
                  }
                  finally
                  {
                      _socketsLock.ExitWriteLock();
                  }

                  await _sessionManager.CloseIfNeededAsync(_session).ConfigureAwait(false);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error handling closed websocket for session {Session}", _session.Id);
              }
          }

          /// <inheritdoc />
          /// <remarks>
          /// The message is sent on the open socket with the most recent <c>LastActivityDate</c>.
          /// When no socket is open, nothing is sent and a completed task is returned.
          /// </remarks>
          public Task SendMessage<T>(
              SessionMessageType name,
              Guid messageId,
              T data,
              CancellationToken cancellationToken)
          {
              ObjectDisposedException.ThrowIf(_disposed, this);

              IWebSocketConnection? socket;

              // Acquire before the try block so the finally never releases a lock that was not taken.
              _socketsLock.EnterReadLock();
              try
              {
                  socket = _sockets.Where(i => i.State == WebSocketState.Open).MaxBy(i => i.LastActivityDate);
              }
              finally
              {
                  _socketsLock.ExitReadLock();
              }

              if (socket is null)
              {
                  return Task.CompletedTask;
              }

              // The send happens outside the lock; only the socket selection needs protection.
              return socket.SendAsync(
                  new OutboundWebSocketMessage<T>
                  {
                      Data = data,
                      MessageType = name,
                      MessageId = messageId
                  },
                  cancellationToken);
          }

          /// <inheritdoc />
          public void Dispose()
          {
              if (_disposed)
              {
                  return;
              }

              var detached = DetachAllSockets();
              _socketsLock.Dispose();

              foreach (var socket in detached)
              {
                  socket.Dispose();
              }
          }

          /// <summary>
          /// Asynchronously disposes every registered socket and releases the controller's resources.
          /// </summary>
          /// <returns>A task that completes when all sockets have been disposed.</returns>
          public async ValueTask DisposeAsync()
          {
              if (_disposed)
              {
                  return;
              }

              // ReaderWriterLockSlim must be released on the thread that acquired it, so the lock
              // is taken and released synchronously inside DetachAllSockets and never held across an await.
              var detached = DetachAllSockets();
              _socketsLock.Dispose();

              foreach (var socket in detached)
              {
                  await socket.DisposeAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Under the write lock: unsubscribes from every registered socket, empties the socket list
          /// and marks the controller as disposed.
          /// </summary>
          /// <returns>The sockets that were registered; the caller is responsible for disposing them.</returns>
          private IWebSocketConnection[] DetachAllSockets()
          {
              _socketsLock.EnterWriteLock();
              try
              {
                  var detached = _sockets.ToArray();
                  foreach (var socket in detached)
                  {
                      socket.Closed -= OnConnectionClosed;
                  }

                  _sockets.Clear();
                  _disposed = true;
                  return detached;
              }
              finally
              {
                  _socketsLock.ExitWriteLock();
              }
          }
      }
  }