Pārlūkot izejas kodu

refactor: use Channels as queueing mechanism for periodic websocket messages (#11092)

Claus Vium 1 gadu atpakaļ
vecāks
revīzija
eae031ae5a

+ 2 - 5
Emby.Server.Implementations/Session/SessionManager.cs

@@ -456,8 +456,8 @@ namespace Emby.Server.Implementations.Session
 
             if (!_activeConnections.TryGetValue(key, out var sessionInfo))
             {
-                _activeConnections[key] = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false);
-                sessionInfo = _activeConnections[key];
+                sessionInfo = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false);
+                _activeConnections[key] = sessionInfo;
             }
 
             sessionInfo.UserId = user?.Id ?? Guid.Empty;
@@ -614,9 +614,6 @@ namespace Emby.Server.Implementations.Session
                         _logger.LogDebug(ex, "Error calling OnPlaybackStopped");
                     }
                 }
-
-                playingSessions = Sessions.Where(i => i.NowPlayingItem is not null)
-                    .ToList();
             }
             else
             {

+ 8 - 5
Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs

@@ -20,6 +20,8 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener<Activi
     /// </summary>
     private readonly IActivityManager _activityManager;
 
+    private bool _disposed;
+
     /// <summary>
     /// Initializes a new instance of the <see cref="ActivityLogWebSocketListener"/> class.
     /// </summary>
@@ -51,14 +53,15 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener<Activi
     }
 
     /// <inheritdoc />
-    protected override void Dispose(bool dispose)
+    protected override async ValueTask DisposeAsyncCore()
     {
-        if (dispose)
+        if (!_disposed)
         {
             _activityManager.EntryCreated -= OnEntryCreated;
+            _disposed = true;
         }
 
-        base.Dispose(dispose);
+        await base.DisposeAsyncCore().ConfigureAwait(false);
     }
 
     /// <summary>
@@ -75,8 +78,8 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener<Activi
         base.Start(message);
     }
 
-    private async void OnEntryCreated(object? sender, GenericEventArgs<ActivityLogEntry> e)
+    private void OnEntryCreated(object? sender, GenericEventArgs<ActivityLogEntry> e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 }

+ 12 - 9
Jellyfin.Api/WebSocketListeners/ScheduledTasksWebSocketListener.cs

@@ -20,6 +20,8 @@ public class ScheduledTasksWebSocketListener : BasePeriodicWebSocketListener<IEn
     /// <value>The task manager.</value>
     private readonly ITaskManager _taskManager;
 
+    private bool _disposed;
+
     /// <summary>
     /// Initializes a new instance of the <see cref="ScheduledTasksWebSocketListener"/> class.
     /// </summary>
@@ -56,31 +58,32 @@ public class ScheduledTasksWebSocketListener : BasePeriodicWebSocketListener<IEn
     }
 
     /// <inheritdoc />
-    protected override void Dispose(bool dispose)
+    protected override async ValueTask DisposeAsyncCore()
     {
-        if (dispose)
+        if (!_disposed)
         {
             _taskManager.TaskExecuting -= OnTaskExecuting;
             _taskManager.TaskCompleted -= OnTaskCompleted;
+            _disposed = true;
         }
 
-        base.Dispose(dispose);
+        await base.DisposeAsyncCore().ConfigureAwait(false);
     }
 
-    private async void OnTaskCompleted(object? sender, TaskCompletionEventArgs e)
+    private void OnTaskCompleted(object? sender, TaskCompletionEventArgs e)
     {
         e.Task.TaskProgress -= OnTaskProgress;
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 
-    private async void OnTaskExecuting(object? sender, GenericEventArgs<IScheduledTaskWorker> e)
+    private void OnTaskExecuting(object? sender, GenericEventArgs<IScheduledTaskWorker> e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
         e.Argument.TaskProgress += OnTaskProgress;
     }
 
-    private async void OnTaskProgress(object? sender, GenericEventArgs<double> e)
+    private void OnTaskProgress(object? sender, GenericEventArgs<double> e)
     {
-        await SendData(false).ConfigureAwait(false);
+        SendData(false);
     }
 }

+ 19 - 17
Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs

@@ -16,6 +16,7 @@ namespace Jellyfin.Api.WebSocketListeners;
 public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener<IEnumerable<SessionInfo>, WebSocketListenerState>
 {
     private readonly ISessionManager _sessionManager;
+    private bool _disposed;
 
     /// <summary>
     /// Initializes a new instance of the <see cref="SessionInfoWebSocketListener"/> class.
@@ -55,9 +56,9 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener<IEnume
     }
 
     /// <inheritdoc />
-    protected override void Dispose(bool dispose)
+    protected override async ValueTask DisposeAsyncCore()
     {
-        if (dispose)
+        if (!_disposed)
         {
             _sessionManager.SessionStarted -= OnSessionManagerSessionStarted;
             _sessionManager.SessionEnded -= OnSessionManagerSessionEnded;
@@ -66,9 +67,10 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener<IEnume
             _sessionManager.PlaybackProgress -= OnSessionManagerPlaybackProgress;
             _sessionManager.CapabilitiesChanged -= OnSessionManagerCapabilitiesChanged;
             _sessionManager.SessionActivity -= OnSessionManagerSessionActivity;
+            _disposed = true;
         }
 
-        base.Dispose(dispose);
+        await base.DisposeAsyncCore().ConfigureAwait(false);
     }
 
     /// <summary>
@@ -85,38 +87,38 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener<IEnume
         base.Start(message);
     }
 
-    private async void OnSessionManagerSessionActivity(object? sender, SessionEventArgs e)
+    private void OnSessionManagerSessionActivity(object? sender, SessionEventArgs e)
     {
-        await SendData(false).ConfigureAwait(false);
+        SendData(false);
     }
 
-    private async void OnSessionManagerCapabilitiesChanged(object? sender, SessionEventArgs e)
+    private void OnSessionManagerCapabilitiesChanged(object? sender, SessionEventArgs e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 
-    private async void OnSessionManagerPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
+    private void OnSessionManagerPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
     {
-        await SendData(!e.IsAutomated).ConfigureAwait(false);
+        SendData(!e.IsAutomated);
     }
 
-    private async void OnSessionManagerPlaybackStopped(object? sender, PlaybackStopEventArgs e)
+    private void OnSessionManagerPlaybackStopped(object? sender, PlaybackStopEventArgs e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 
-    private async void OnSessionManagerPlaybackStart(object? sender, PlaybackProgressEventArgs e)
+    private void OnSessionManagerPlaybackStart(object? sender, PlaybackProgressEventArgs e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 
-    private async void OnSessionManagerSessionEnded(object? sender, SessionEventArgs e)
+    private void OnSessionManagerSessionEnded(object? sender, SessionEventArgs e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 
-    private async void OnSessionManagerSessionStarted(object? sender, SessionEventArgs e)
+    private void OnSessionManagerSessionStarted(object? sender, SessionEventArgs e)
     {
-        await SendData(true).ConfigureAwait(false);
+        SendData(true);
     }
 }

+ 128 - 71
MediaBrowser.Controller/Net/BasePeriodicWebSocketListener.cs

@@ -8,6 +8,7 @@ using System.Globalization;
 using System.Linq;
 using System.Net.WebSockets;
 using System.Threading;
+using System.Threading.Channels;
 using System.Threading.Tasks;
 using MediaBrowser.Controller.Net.WebSocketMessages;
 using MediaBrowser.Model.Session;
@@ -21,26 +22,38 @@ namespace MediaBrowser.Controller.Net
     /// </summary>
     /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
     /// <typeparam name="TStateType">The type of the T state type.</typeparam>
-    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable
+    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposable
         where TStateType : WebSocketListenerState, new()
         where TReturnDataType : class
     {
+        private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions
+        {
+            AllowSynchronousContinuations = false,
+            SingleReader = true,
+            SingleWriter = false
+        });
+
+        private readonly SemaphoreSlim _lock = new(1, 1);
+
         /// <summary>
         /// The _active connections.
         /// </summary>
-        private readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> _activeConnections =
-            new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>();
+        private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)> _activeConnections = new();
 
         /// <summary>
         /// The logger.
         /// </summary>
         protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;
 
+        private readonly Task _messageConsumerTask;
+
         protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger)
         {
             ArgumentNullException.ThrowIfNull(logger);
 
             Logger = logger;
+
+            _messageConsumerTask = HandleMessages();
         }
 
         /// <summary>
@@ -113,75 +126,103 @@ namespace MediaBrowser.Controller.Net
                 InitialDelayMs = dueTimeMs
             };
 
-            lock (_activeConnections)
+            _lock.Wait();
+            try
+            {
+                _activeConnections.Add((message.Connection, cancellationTokenSource, state));
+            }
+            finally
             {
-                _activeConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(message.Connection, cancellationTokenSource, state));
+                _lock.Release();
             }
         }
 
-        protected async Task SendData(bool force)
+        protected void SendData(bool force)
         {
-            Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples;
+            _channel.Writer.TryWrite(force);
+        }
 
-            lock (_activeConnections)
+        private async Task HandleMessages()
+        {
+            while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
             {
-                tuples = _activeConnections
-                    .Where(c =>
+                while (_channel.Reader.TryRead(out var force))
+                {
+                    try
                     {
-                        if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested)
-                        {
-                            var state = c.Item3;
+                        (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)[] tuples;
 
-                            if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs)
+                        var now = DateTime.UtcNow;
+                        await _lock.WaitAsync().ConfigureAwait(false);
+                        try
+                        {
+                            if (_activeConnections.Count == 0)
                             {
-                                return true;
+                                continue;
                             }
+
+                            tuples = _activeConnections
+                                .Where(c =>
+                                {
+                                    if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancellationRequested)
+                                    {
+                                        return false;
+                                    }
+
+                                    var state = c.State;
+                                    return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;
+                                })
+                                .ToArray();
+                        }
+                        finally
+                        {
+                            _lock.Release();
                         }
 
-                        return false;
-                    })
-                    .ToArray();
-            }
+                        if (tuples.Length == 0)
+                        {
+                            continue;
+                        }
 
-            IEnumerable<Task> GetTasks()
-            {
-                foreach (var tuple in tuples)
-                {
-                    yield return SendData(tuple);
+                        var data = await GetDataToSend().ConfigureAwait(false);
+                        if (data is null)
+                        {
+                            continue;
+                        }
+
+                        IEnumerable<Task> GetTasks()
+                        {
+                            foreach (var tuple in tuples)
+                            {
+                                yield return SendDataInternal(data, tuple);
+                            }
+                        }
+
+                        await Task.WhenAll(GetTasks()).ConfigureAwait(false);
+                    }
+                    catch (Exception ex)
+                    {
+                        Logger.LogError(ex, "Failed to send updates to websockets");
+                    }
                 }
             }
-
-            await Task.WhenAll(GetTasks()).ConfigureAwait(false);
         }
 
-        private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple)
+        private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) tuple)
         {
-            var connection = tuple.Item1;
-
             try
             {
-                var state = tuple.Item3;
-
-                var cancellationToken = tuple.Item2.Token;
-
-                var data = await GetDataToSend().ConfigureAwait(false);
+                var (connection, cts, state) = tuple;
+                var cancellationToken = cts.Token;
+                await connection.SendAsync(
+                    new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
+                    cancellationToken).ConfigureAwait(false);
 
-                if (data is not null)
-                {
-                    await connection.SendAsync(
-                        new OutboundWebSocketMessage<TReturnDataType>
-                        {
-                            MessageType = Type,
-                            Data = data
-                        },
-                        cancellationToken).ConfigureAwait(false);
-
-                    state.DateLastSendUtc = DateTime.UtcNow;
-                }
+                state.DateLastSendUtc = DateTime.UtcNow;
             }
             catch (OperationCanceledException)
             {
-                if (tuple.Item2.IsCancellationRequested)
+                if (tuple.CancellationTokenSource.IsCancellationRequested)
                 {
                     DisposeConnection(tuple);
                 }
@@ -199,32 +240,37 @@ namespace MediaBrowser.Controller.Net
         /// <param name="message">The message.</param>
         private void Stop(WebSocketMessageInfo message)
         {
-            lock (_activeConnections)
+            _lock.Wait();
+            try
             {
-                var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection);
+                var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);
 
-                if (connection is not null)
+                if (connection != default)
                 {
                     DisposeConnection(connection);
                 }
             }
+            finally
+            {
+                _lock.Release();
+            }
         }
 
         /// <summary>
         /// Disposes the connection.
         /// </summary>
         /// <param name="connection">The connection.</param>
-        private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection)
+        private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) connection)
         {
-            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Item1.RemoteEndPoint, GetType().Name);
+            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);
 
             // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really...
             // connection.Item1.Dispose();
 
             try
             {
-                connection.Item2.Cancel();
-                connection.Item2.Dispose();
+                connection.CancellationTokenSource.Cancel();
+                connection.CancellationTokenSource.Dispose();
             }
             catch (ObjectDisposedException ex)
             {
@@ -237,36 +283,47 @@ namespace MediaBrowser.Controller.Net
                 Logger.LogError(ex, "Error disposing websocket");
             }
 
-            lock (_activeConnections)
+            _lock.Wait();
+            try
             {
                 _activeConnections.Remove(connection);
             }
+            finally
+            {
+                _lock.Release();
+            }
         }
 
-        /// <summary>
-        /// Releases unmanaged and - optionally - managed resources.
-        /// </summary>
-        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
-        protected virtual void Dispose(bool dispose)
+        protected virtual async ValueTask DisposeAsyncCore()
         {
-            if (dispose)
+            try
+            {
+                _channel.Writer.TryComplete();
+                await _messageConsumerTask.ConfigureAwait(false);
+            }
+            catch (Exception ex)
+            {
+                Logger.LogError(ex, "Disposing the message consumer failed");
+            }
+
+            await _lock.WaitAsync().ConfigureAwait(false);
+            try
             {
-                lock (_activeConnections)
+                foreach (var connection in _activeConnections.ToArray())
                 {
-                    foreach (var connection in _activeConnections.ToArray())
-                    {
-                        DisposeConnection(connection);
-                    }
+                    DisposeConnection(connection);
                 }
             }
+            finally
+            {
+                _lock.Release();
+            }
         }
 
-        /// <summary>
-        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
-        /// </summary>
-        public void Dispose()
+        /// <inheritdoc />
+        public async ValueTask DisposeAsync()
         {
-            Dispose(true);
+            await DisposeAsyncCore().ConfigureAwait(false);
             GC.SuppressFinalize(this);
         }
     }