using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Session;
using MediaBrowser.Controller.SyncPlay;
using MediaBrowser.Model.Session;
using MediaBrowser.Model.SyncPlay;

namespace Emby.Server.Implementations.SyncPlay
{
    /// <summary>
    /// Class SyncPlayController.
    /// </summary>
    /// <remarks>
    /// Class is not thread-safe, external locking is required when accessing methods.
    /// Messages to sessions are dispatched without being awaited; the tasks returned by
    /// the session manager are not observed by this class.
    /// </remarks>
    public class SyncPlayController : ISyncPlayController
    {
        /// <summary>
        /// Used to filter the sessions of a group.
        /// </summary>
        private enum BroadcastType
        {
            /// <summary>
            /// All sessions will receive the message.
            /// </summary>
            AllGroup = 0,

            /// <summary>
            /// Only the specified session will receive the message.
            /// </summary>
            CurrentSession = 1,

            /// <summary>
            /// All sessions, except the current one, will receive the message.
            /// </summary>
            AllExceptCurrentSession = 2,

            /// <summary>
            /// Only sessions that are not buffering will receive the message.
            /// </summary>
            AllReady = 3
        }

        /// <summary>
        /// The session manager.
        /// </summary>
        private readonly ISessionManager _sessionManager;

        /// <summary>
        /// The SyncPlay manager.
        /// </summary>
        private readonly ISyncPlayManager _syncPlayManager;

        /// <summary>
        /// The group to manage.
        /// </summary>
        private readonly GroupInfo _group = new GroupInfo();

        /// <summary>
        /// Initializes a new instance of the <see cref="SyncPlayController" /> class.
        /// </summary>
        /// <param name="sessionManager">The session manager.</param>
        /// <param name="syncPlayManager">The SyncPlay manager.</param>
        public SyncPlayController(
            ISessionManager sessionManager,
            ISyncPlayManager syncPlayManager)
        {
            _sessionManager = sessionManager;
            _syncPlayManager = syncPlayManager;
        }

        /// <inheritdoc />
        public Guid GetGroupId() => _group.GroupId;

        /// <inheritdoc />
        public Guid GetPlayingItemId() => _group.PlayingItem.Id;

        /// <inheritdoc />
        public bool IsGroupEmpty() => _group.IsEmpty();

        /// <summary>
        /// Converts DateTime to UTC string.
        /// </summary>
        /// <param name="date">The date to convert.</param>
        /// <returns>The date converted to UTC, formatted with the round-trip ("o") specifier.</returns>
        private static string DateToUTCString(DateTime date)
        {
            return date.ToUniversalTime().ToString("o", CultureInfo.InvariantCulture);
        }

        /// <summary>
        /// Filters sessions of this group.
        /// </summary>
        /// <param name="from">The current session.</param>
        /// <param name="type">The filtering type.</param>
        /// <returns>The sessions matching the filter; empty for an unknown filtering type.</returns>
        private IEnumerable<SessionInfo> FilterSessions(SessionInfo from, BroadcastType type)
        {
            switch (type)
            {
                case BroadcastType.CurrentSession:
                    return new SessionInfo[] { from };
                case BroadcastType.AllGroup:
                    return _group.Participants.Values
                        .Select(session => session.Session);
                case BroadcastType.AllExceptCurrentSession:
                    return _group.Participants.Values
                        .Select(session => session.Session)
                        .Where(session => !session.Id.Equals(from.Id, StringComparison.Ordinal));
                case BroadcastType.AllReady:
                    return _group.Participants.Values
                        .Where(session => !session.IsBuffering)
                        .Select(session => session.Session);
                default:
                    return Array.Empty<SessionInfo>();
            }
        }

        /// <summary>
        /// Sends a GroupUpdate message to the interested sessions.
        /// </summary>
        /// <typeparam name="T">The type of the data carried by the update.</typeparam>
        /// <param name="from">The current session.</param>
        /// <param name="type">The filtering type.</param>
        /// <param name="message">The message to send.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>A task that completes when every individual send has completed.</returns>
        private Task SendGroupUpdate<T>(SessionInfo from, BroadcastType type, GroupUpdate<T> message, CancellationToken cancellationToken)
        {
            IEnumerable<Task> GetTasks()
            {
                foreach (var session in FilterSessions(from, type))
                {
                    yield return _sessionManager.SendSyncPlayGroupUpdate(session.Id, message, cancellationToken);
                }
            }

            return Task.WhenAll(GetTasks());
        }

        /// <summary>
        /// Sends a playback command to the interested sessions.
        /// </summary>
        /// <param name="from">The current session.</param>
        /// <param name="type">The filtering type.</param>
        /// <param name="message">The message to send.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>A task that completes when every individual send has completed.</returns>
        private Task SendCommand(SessionInfo from, BroadcastType type, SendCommand message, CancellationToken cancellationToken)
        {
            IEnumerable<Task> GetTasks()
            {
                foreach (var session in FilterSessions(from, type))
                {
                    yield return _sessionManager.SendSyncPlayCommand(session.Id, message, cancellationToken);
                }
            }

            return Task.WhenAll(GetTasks());
        }

        /// <summary>
        /// Builds a new playback command with some default values.
        /// </summary>
        /// <param name="type">The command type.</param>
        /// <returns>A command carrying the group's id, position and last activity time.</returns>
        private SendCommand NewSyncPlayCommand(SendCommandType type)
        {
            return new SendCommand()
            {
                GroupId = _group.GroupId.ToString(),
                Command = type,
                PositionTicks = _group.PositionTicks,
                When = DateToUTCString(_group.LastActivity),
                EmittedAt = DateToUTCString(DateTime.UtcNow)
            };
        }

        /// <summary>
        /// Builds a new group update message.
        /// </summary>
        /// <typeparam name="T">The type of the data carried by the update.</typeparam>
        /// <param name="type">The update type.</param>
        /// <param name="data">The data to send.</param>
        /// <returns>The group update addressed to this group.</returns>
        private GroupUpdate<T> NewSyncPlayGroupUpdate<T>(GroupUpdateType type, T data)
        {
            return new GroupUpdate<T>()
            {
                GroupId = _group.GroupId.ToString(),
                Type = type,
                Data = data
            };
        }

        /// <summary>
        /// Pauses the group and advances the stored playback position by the time elapsed
        /// since the last activity.
        /// </summary>
        private void PauseGroupAtCurrentPosition()
        {
            _group.IsPaused = true;

            var currentTime = DateTime.UtcNow;
            var elapsedTime = currentTime - _group.LastActivity;
            _group.LastActivity = currentTime;

            // Advance only if playback actually started: LastActivity may still lie in the
            // future when the request arrives during the delay added to account for latency.
            _group.PositionTicks += elapsedTime.Ticks > 0 ? elapsedTime.Ticks : 0;
        }

        /// <inheritdoc />
        public void CreateGroup(SessionInfo session, CancellationToken cancellationToken)
        {
            _group.AddSession(session);
            _syncPlayManager.AddSessionToGroup(session, this);

            // The group starts from the playback state of the session that creates it
            _group.PlayingItem = session.FullNowPlayingItem;
            _group.IsPaused = session.PlayState.IsPaused;
            _group.PositionTicks = session.PlayState.PositionTicks ?? 0;
            _group.LastActivity = DateTime.UtcNow;

            var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, DateToUTCString(DateTime.UtcNow));
            SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);
        }

        /// <inheritdoc />
        public void SessionJoin(SessionInfo session, JoinGroupRequest request, CancellationToken cancellationToken)
        {
            if (session.NowPlayingItem?.Id == _group.PlayingItem.Id)
            {
                _group.AddSession(session);
                _syncPlayManager.AddSessionToGroup(session, this);

                var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, DateToUTCString(DateTime.UtcNow));
                SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);

                var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserJoined, session.UserName);
                SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);

                // Syncing will happen client-side: just tell the new session the current state
                var commandType = _group.IsPaused ? SendCommandType.Pause : SendCommandType.Play;
                var stateCommand = NewSyncPlayCommand(commandType);
                SendCommand(session, BroadcastType.CurrentSession, stateCommand, cancellationToken);
            }
            else
            {
                // The session is not playing the group's item: ask it to prepare that item.
                // It is not added to the group here.
                var playRequest = new PlayRequest
                {
                    ItemIds = new Guid[] { _group.PlayingItem.Id },
                    StartPositionTicks = _group.PositionTicks
                };
                var update = NewSyncPlayGroupUpdate(GroupUpdateType.PrepareSession, playRequest);
                SendGroupUpdate(session, BroadcastType.CurrentSession, update, cancellationToken);
            }
        }

        /// <inheritdoc />
        public void SessionLeave(SessionInfo session, CancellationToken cancellationToken)
        {
            _group.RemoveSession(session);
            _syncPlayManager.RemoveSessionFromGroup(session, this);

            var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupLeft, _group.PositionTicks);
            SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);

            var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserLeft, session.UserName);
            SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
        }

        /// <inheritdoc />
        public void HandleRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            // The server's job is to maintain a consistent state for clients to reference
            // and notify clients of state changes. The actual syncing of media playback
            // happens client side. Clients are aware of the server's time and use it to sync.
            switch (request.Type)
            {
                case PlaybackRequestType.Play:
                    HandlePlayRequest(session, request, cancellationToken);
                    break;
                case PlaybackRequestType.Pause:
                    HandlePauseRequest(session, request, cancellationToken);
                    break;
                case PlaybackRequestType.Seek:
                    HandleSeekRequest(session, request, cancellationToken);
                    break;
                case PlaybackRequestType.Buffer:
                    HandleBufferingRequest(session, request, cancellationToken);
                    break;
                case PlaybackRequestType.Ready:
                    HandleBufferingDoneRequest(session, request, cancellationToken);
                    break;
                case PlaybackRequestType.Ping:
                    HandlePingUpdateRequest(session, request);
                    break;
            }
        }

        /// <summary>
        /// Handles a play action requested by a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The play action.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        private void HandlePlayRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            if (_group.IsPaused)
            {
                // Pick a suitable time that accounts for latency
                var delay = _group.GetHighestPing() * 2;
                delay = delay < _group.DefaultPing ? _group.DefaultPing : delay;

                // Unpause group and set starting point in future
                // Clients will start playback at LastActivity (datetime) from PositionTicks (playback position)
                // The added delay does not guarantee, of course, that the command will be received in time
                // Playback synchronization will mainly happen client side
                _group.IsPaused = false;
                _group.LastActivity = DateTime.UtcNow.AddMilliseconds(delay);

                var command = NewSyncPlayCommand(SendCommandType.Play);
                SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
            }
            else
            {
                // Client got lost, sending current state
                var command = NewSyncPlayCommand(SendCommandType.Play);
                SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
            }
        }

        /// <summary>
        /// Handles a pause action requested by a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The pause action.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        private void HandlePauseRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            if (!_group.IsPaused)
            {
                // Pause group and compute the media playback position
                PauseGroupAtCurrentPosition();

                var command = NewSyncPlayCommand(SendCommandType.Pause);
                SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
            }
            else
            {
                // Client got lost, sending current state
                var command = NewSyncPlayCommand(SendCommandType.Pause);
                SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
            }
        }

        /// <summary>
        /// Handles a seek action requested by a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The seek action.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        private void HandleSeekRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            // Sanitize PositionTicks
            var ticks = SanitizePositionTicks(request.PositionTicks);

            // Pause and seek
            _group.IsPaused = true;
            _group.PositionTicks = ticks;
            _group.LastActivity = DateTime.UtcNow;

            var command = NewSyncPlayCommand(SendCommandType.Seek);
            SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
        }

        /// <summary>
        /// Handles a buffering action requested by a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The buffering action.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        private void HandleBufferingRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            if (!_group.IsPaused)
            {
                // Pause group and compute the media playback position
                PauseGroupAtCurrentPosition();

                _group.SetBuffering(session, true);

                // Send pause command to all non-buffering sessions
                var command = NewSyncPlayCommand(SendCommandType.Pause);
                SendCommand(session, BroadcastType.AllReady, command, cancellationToken);

                var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.GroupWait, session.UserName);
                SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
            }
            else
            {
                // Client got lost, sending current state
                var command = NewSyncPlayCommand(SendCommandType.Pause);
                SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
            }
        }

        /// <summary>
        /// Handles a buffering-done action requested by a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The buffering-done action.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        private void HandleBufferingDoneRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
        {
            if (_group.IsPaused)
            {
                _group.SetBuffering(session, false);

                var requestTicks = SanitizePositionTicks(request.PositionTicks);

                var when = request.When ?? DateTime.UtcNow;
                var currentTime = DateTime.UtcNow;
                var elapsedTime = currentTime - when;
                var clientPosition = TimeSpan.FromTicks(requestTicks) + elapsedTime;

                // Distance between the group's position and the client's position.
                // This is a tick count, so it is carried as a TimeSpan and never handed
                // to a millisecond-based API directly.
                var delayTicks = _group.PositionTicks - clientPosition.Ticks;
                var delay = TimeSpan.FromTicks(delayTicks);

                if (_group.IsBuffering())
                {
                    // Others are still buffering, tell this client to pause when ready
                    var command = NewSyncPlayCommand(SendCommandType.Pause);
                    var pauseAtTime = currentTime.Add(delay);
                    command.When = DateToUTCString(pauseAtTime);
                    SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
                }
                else
                {
                    // Let other clients resume as soon as the buffering client catches up
                    _group.IsPaused = false;

                    // Pings are used as milliseconds elsewhere in this class, compare in that unit
                    if (delay.TotalMilliseconds > _group.GetHighestPing() * 2)
                    {
                        // Client that was buffering is recovering, notifying others to resume
                        _group.LastActivity = currentTime.Add(delay);
                        var command = NewSyncPlayCommand(SendCommandType.Play);
                        SendCommand(session, BroadcastType.AllExceptCurrentSession, command, cancellationToken);
                    }
                    else
                    {
                        // Client, that was buffering, resumed playback but did not update others in time
                        var latencyDelay = _group.GetHighestPing() * 2;
                        latencyDelay = latencyDelay < _group.DefaultPing ? _group.DefaultPing : latencyDelay;

                        _group.LastActivity = currentTime.AddMilliseconds(latencyDelay);

                        var command = NewSyncPlayCommand(SendCommandType.Play);
                        SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
                    }
                }
            }
            else
            {
                // Group was not waiting, make sure client has latest state
                var command = NewSyncPlayCommand(SendCommandType.Play);
                SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
            }
        }

        /// <summary>
        /// Sanitizes the PositionTicks, considers the current playing item when available.
        /// </summary>
        /// <param name="positionTicks">The PositionTicks.</param>
        /// <returns>
        /// The position clamped to be non-negative and, when the playing item's runtime
        /// is known, not greater than that runtime.
        /// </returns>
        private long SanitizePositionTicks(long? positionTicks)
        {
            var ticks = positionTicks ?? 0;
            ticks = ticks >= 0 ? ticks : 0;

            // Only apply the upper bound when the runtime is actually known; treating an
            // unknown runtime as zero would force every position back to the start.
            var runTimeTicks = _group.PlayingItem?.RunTimeTicks;
            if (runTimeTicks.HasValue && ticks > runTimeTicks.Value)
            {
                ticks = runTimeTicks.Value;
            }

            return ticks;
        }

        /// <summary>
        /// Updates ping of a session.
        /// </summary>
        /// <param name="session">The session.</param>
        /// <param name="request">The update.</param>
        private void HandlePingUpdateRequest(SessionInfo session, PlaybackRequest request)
        {
            // Collected pings are used to account for network latency when unpausing playback
            _group.UpdatePing(session, request.Ping ?? _group.DefaultPing);
        }

        /// <inheritdoc />
        public GroupInfoView GetInfo()
        {
            return new GroupInfoView()
            {
                GroupId = GetGroupId().ToString(),
                PlayingItemName = _group.PlayingItem.Name,
                PlayingItemId = _group.PlayingItem.Id.ToString(),
                PositionTicks = _group.PositionTicks,
                Participants = _group.Participants.Values.Select(session => session.Session.UserName).Distinct().ToList()
            };
        }
    }
}