Browse Source

Fix consumer count off by one when closing a browser tab with a livestream that is transcoding (#13220)

Rework Implementation
Fix review issues
Add missing nullorempty check
Fix closely related #13721
timminator 2 months ago
parent
commit
181a37a8cd

+ 78 - 9
Emby.Server.Implementations/Session/SessionManager.cs

@@ -64,6 +64,9 @@ namespace Emby.Server.Implementations.Session
         private readonly ConcurrentDictionary<string, SessionInfo> _activeConnections
             = new(StringComparer.OrdinalIgnoreCase);
 
+        private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, string>> _activeLiveStreamSessions
+            = new(StringComparer.OrdinalIgnoreCase);
+
         private Timer _idleTimer;
         private Timer _inactiveTimer;
 
@@ -311,13 +314,49 @@ namespace Emby.Server.Implementations.Session
                 _activeConnections.TryRemove(key, out _);
                 if (!string.IsNullOrEmpty(session.PlayState?.LiveStreamId))
                 {
-                    await _mediaSourceManager.CloseLiveStream(session.PlayState.LiveStreamId).ConfigureAwait(false);
+                    await CloseLiveStreamIfNeededAsync(session.PlayState.LiveStreamId, session.Id).ConfigureAwait(false);
                 }
 
                 await OnSessionEnded(session).ConfigureAwait(false);
             }
         }
 
+        /// <inheritdoc />
+        public async Task CloseLiveStreamIfNeededAsync(string liveStreamId, string sessionIdOrPlaySessionId)
+        {
+            bool liveStreamNeedsToBeClosed = false;
+
+            if (_activeLiveStreamSessions.TryGetValue(liveStreamId, out var activeSessionMappings))
+            {
+                if (activeSessionMappings.TryRemove(sessionIdOrPlaySessionId, out var correspondingId))
+                {
+                    if (!string.IsNullOrEmpty(correspondingId))
+                    {
+                        activeSessionMappings.TryRemove(correspondingId, out _);
+                    }
+
+                    liveStreamNeedsToBeClosed = true;
+                }
+
+                if (activeSessionMappings.IsEmpty)
+                {
+                    _activeLiveStreamSessions.TryRemove(liveStreamId, out _);
+                }
+            }
+
+            if (liveStreamNeedsToBeClosed)
+            {
+                try
+                {
+                    await _mediaSourceManager.CloseLiveStream(liveStreamId).ConfigureAwait(false);
+                }
+                catch (Exception ex)
+                {
+                    _logger.LogError(ex, "Error closing live stream");
+                }
+            }
+        }
+
         /// <inheritdoc />
         public async ValueTask ReportSessionEnded(string sessionId)
         {
@@ -737,6 +776,11 @@ namespace Emby.Server.Implementations.Session
                 }
             }
 
+            if (!string.IsNullOrEmpty(info.LiveStreamId))
+            {
+                UpdateLiveStreamActiveSessionMappings(info.LiveStreamId, info.SessionId, info.PlaySessionId);
+            }
+
             var eventArgs = new PlaybackStartEventArgs
             {
                 Item = libraryItem,
@@ -794,6 +838,32 @@ namespace Emby.Server.Implementations.Session
             return OnPlaybackProgress(info, false);
         }
 
+        private void UpdateLiveStreamActiveSessionMappings(string liveStreamId, string sessionId, string playSessionId)
+        {
+            var activeSessionMappings = _activeLiveStreamSessions.GetOrAdd(liveStreamId, _ => new ConcurrentDictionary<string, string>());
+
+            if (!string.IsNullOrEmpty(playSessionId))
+            {
+                if (!activeSessionMappings.TryGetValue(sessionId, out var currentPlaySessionId) || currentPlaySessionId != playSessionId)
+                {
+                    if (!string.IsNullOrEmpty(currentPlaySessionId))
+                    {
+                        activeSessionMappings.TryRemove(currentPlaySessionId, out _);
+                    }
+
+                    activeSessionMappings[sessionId] = playSessionId;
+                    activeSessionMappings[playSessionId] = sessionId;
+                }
+            }
+            else
+            {
+                if (!activeSessionMappings.TryGetValue(sessionId, out _))
+                {
+                    activeSessionMappings[sessionId] = string.Empty;
+                }
+            }
+        }
+
         /// <summary>
         /// Used to report playback progress for an item.
         /// </summary>
@@ -834,6 +904,11 @@ namespace Emby.Server.Implementations.Session
                 }
             }
 
+            if (!string.IsNullOrEmpty(info.LiveStreamId))
+            {
+                UpdateLiveStreamActiveSessionMappings(info.LiveStreamId, info.SessionId, info.PlaySessionId);
+            }
+
             var eventArgs = new PlaybackProgressEventArgs
             {
                 Item = libraryItem,
@@ -1016,14 +1091,7 @@ namespace Emby.Server.Implementations.Session
 
             if (!string.IsNullOrEmpty(info.LiveStreamId))
             {
-                try
-                {
-                    await _mediaSourceManager.CloseLiveStream(info.LiveStreamId).ConfigureAwait(false);
-                }
-                catch (Exception ex)
-                {
-                    _logger.LogError(ex, "Error closing live stream");
-                }
+                await CloseLiveStreamIfNeededAsync(info.LiveStreamId, session.Id).ConfigureAwait(false);
             }
 
             var eventArgs = new PlaybackStopEventArgs
@@ -2071,6 +2139,7 @@ namespace Emby.Server.Implementations.Session
             }
 
             _activeConnections.Clear();
+            _activeLiveStreamSessions.Clear();
         }
     }
 }

+ 8 - 0
MediaBrowser.Controller/Session/ISessionManager.cs

@@ -342,5 +342,13 @@ namespace MediaBrowser.Controller.Session
         Task RevokeUserTokens(Guid userId, string currentAccessToken);
 
         Task CloseIfNeededAsync(SessionInfo session);
+
+        /// <summary>
+        /// Used to close the livestream if needed.
+        /// </summary>
+        /// <param name="liveStreamId">The livestream id.</param>
+        /// <param name="sessionIdOrPlaySessionId">The session id or playsession id.</param>
+        /// <returns>Task.</returns>
+        Task CloseLiveStreamIfNeededAsync(string liveStreamId, string sessionIdOrPlaySessionId);
     }
 }

+ 1 - 8
MediaBrowser.MediaEncoding/Transcoding/TranscodeManager.cs

@@ -242,14 +242,7 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
 
         if (closeLiveStream && !string.IsNullOrWhiteSpace(job.LiveStreamId))
         {
-            try
-            {
-                await _mediaSourceManager.CloseLiveStream(job.LiveStreamId).ConfigureAwait(false);
-            }
-            catch (Exception ex)
-            {
-                _logger.LogError(ex, "Error closing live stream for {Path}", job.Path);
-            }
+            await _sessionManager.CloseLiveStreamIfNeededAsync(job.LiveStreamId, job.PlaySessionId).ConfigureAwait(false);
         }
     }