| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 | #nullable disable#pragma warning disable CS1591, SA1306, SA1401using System;using System.Collections.Generic;using System.Globalization;using System.Linq;using System.Net.WebSockets;using System.Threading;using System.Threading.Channels;using System.Threading.Tasks;using MediaBrowser.Controller.Net.WebSocketMessages;using MediaBrowser.Model.Session;using Microsoft.AspNetCore.Http;using Microsoft.Extensions.Logging;namespace MediaBrowser.Controller.Net{    /// <summary>    /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received.    /// </summary>    /// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>    /// <typeparam name="TStateType">The type of the T state type.</typeparam>    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IAsyncDisposable        where TStateType : WebSocketListenerState, new()        where TReturnDataType : class    {        private readonly Channel<bool> _channel = Channel.CreateUnbounded<bool>(new UnboundedChannelOptions        {            AllowSynchronousContinuations = false,            SingleReader = true,            SingleWriter = false        });        private readonly Lock _activeConnectionsLock = new();        /// <summary>        /// The _active connections.        /// </summary>        private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)> _activeConnections = new();        /// <summary>        /// The logger.        /// </summary>        protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;        private readonly Task _messageConsumerTask;        protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger)        {            ArgumentNullException.ThrowIfNull(logger);            Logger = logger;            _messageConsumerTask = HandleMessages();        }        /// <summary>        /// Gets the type used for the messages sent to the client.        /// </summary>        /// <value>The type.</value>        protected abstract SessionMessageType Type { get; }        /// <summary>        /// Gets the message type received from the client to start sending messages.        /// </summary>        /// <value>The type.</value>        protected abstract SessionMessageType StartType { get; }        /// <summary>        /// Gets the message type received from the client to stop sending messages.        /// </summary>        /// <value>The type.</value>        protected abstract SessionMessageType StopType { get; }        /// <summary>        /// Gets the data to send.        /// </summary>        /// <returns>Task{`1}.</returns>        protected abstract Task<TReturnDataType> GetDataToSend();        /// <summary>        /// Gets the data to send for a specific connection.        /// </summary>        /// <param name="connection">The connection.</param>        /// <returns>Task{`1}.</returns>        protected virtual Task<TReturnDataType> GetDataToSendForConnection(IWebSocketConnection connection)        {            return GetDataToSend();        }        /// <summary>        /// Processes the message.        /// </summary>        /// <param name="message">The message.</param>        /// <returns>Task.</returns>        public Task ProcessMessageAsync(WebSocketMessageInfo message)        {            ArgumentNullException.ThrowIfNull(message);            if (message.MessageType == StartType)            {                Start(message);            }            if (message.MessageType == StopType)            {                Stop(message);            }            return Task.CompletedTask;        }        /// <inheritdoc />        public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.CompletedTask;        /// <summary>        /// Starts sending messages over a web socket.        /// </summary>        /// <param name="message">The message.</param>        protected virtual void Start(WebSocketMessageInfo message)        {            var vals = message.Data.Split(',');            var dueTimeMs = long.Parse(vals[0], CultureInfo.InvariantCulture);            var periodMs = long.Parse(vals[1], CultureInfo.InvariantCulture);            var cancellationTokenSource = new CancellationTokenSource();            Logger.LogDebug("WS {1} begin transmitting to {0}", message.Connection.RemoteEndPoint, GetType().Name);            var state = new TStateType            {                IntervalMs = periodMs,                InitialDelayMs = dueTimeMs            };            lock (_activeConnectionsLock)            {                _activeConnections.Add((message.Connection, cancellationTokenSource, state));            }        }        protected void SendData(bool force)        {            _channel.Writer.TryWrite(force);        }        private async Task HandleMessages()        {            while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))            {                while (_channel.Reader.TryRead(out var force))                {                    try                    {                        (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)[] tuples;                        var now = DateTime.UtcNow;                        lock (_activeConnectionsLock)                        {                            if (_activeConnections.Count == 0)                            {                                continue;                            }                            tuples = _activeConnections                                .Where(c =>                                {                                    if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancellationRequested)                                    {                                        return false;                                    }                                    var state = c.State;                                    return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs;                                })                                .ToArray();                        }                        if (tuples.Length == 0)                        {                            continue;                        }                        IEnumerable<Task> GetTasks()                        {                            foreach (var tuple in tuples)                            {                                yield return SendDataForConnectionAsync(tuple);                            }                        }                        await Task.WhenAll(GetTasks()).ConfigureAwait(false);                    }                    catch (Exception ex)                    {                        Logger.LogError(ex, "Failed to send updates to websockets");                    }                }            }        }        private async Task SendDataForConnectionAsync((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) tuple)        {            try            {                var (connection, cts, state) = tuple;                var cancellationToken = cts.Token;                var data = await GetDataToSendForConnection(connection).ConfigureAwait(false);                if (data is null)                {                    return;                }                await connection.SendAsync(                    new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },                    cancellationToken).ConfigureAwait(false);                state.DateLastSendUtc = DateTime.UtcNow;            }            catch (OperationCanceledException)            {                if (tuple.CancellationTokenSource.IsCancellationRequested)                {                    DisposeConnection(tuple);                }            }            catch (Exception ex)            {                Logger.LogError(ex, "Error sending web socket message {Name}", Type);                DisposeConnection(tuple);            }        }        /// <summary>        /// Stops sending messages over a web socket.        /// </summary>        /// <param name="message">The message.</param>        private void Stop(WebSocketMessageInfo message)        {            lock (_activeConnectionsLock)            {                var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection);                if (connection != default)                {                    DisposeConnection(connection);                }            }        }        /// <summary>        /// Disposes the connection.        /// </summary>        /// <param name="connection">The connection.</param>        private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) connection)        {            Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name);            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really...            // connection.Item1.Dispose();            try            {                connection.CancellationTokenSource.Cancel();                connection.CancellationTokenSource.Dispose();            }            catch (ObjectDisposedException ex)            {                // TODO Investigate and properly fix.                Logger.LogError(ex, "Object Disposed");            }            catch (Exception ex)            {                // TODO Investigate and properly fix.                Logger.LogError(ex, "Error disposing websocket");            }            lock (_activeConnectionsLock)            {                _activeConnections.Remove(connection);            }        }        protected virtual async ValueTask DisposeAsyncCore()        {            try            {                _channel.Writer.TryComplete();                await _messageConsumerTask.ConfigureAwait(false);            }            catch (Exception ex)            {                Logger.LogError(ex, "Disposing the message consumer failed");            }            lock (_activeConnectionsLock)            {                foreach (var connection in _activeConnections.ToList())                {                    DisposeConnection(connection);                }            }        }        /// <inheritdoc />        public async ValueTask DisposeAsync()        {            await DisposeAsyncCore().ConfigureAwait(false);            GC.SuppressFinalize(this);        }    }}
 |