2
0

TranscodingJobHelper.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using Jellyfin.Api.Models.PlaybackDtos;
  9. using MediaBrowser.Controller.Library;
  10. using MediaBrowser.Controller.MediaEncoding;
  11. using MediaBrowser.Model.IO;
  12. using Microsoft.Extensions.Logging;
  13. namespace Jellyfin.Api.Helpers
  14. {
  15. /// <summary>
  16. /// Transcoding job helpers.
  17. /// </summary>
  18. public class TranscodingJobHelper
  19. {
  /// <summary>
  /// Transcoding jobs currently tracked as active. The list is static, so it is shared by
  /// every helper instance; it is only touched inside <c>lock (_activeTranscodingJobs)</c> blocks.
  /// </summary>
  private static readonly List<TranscodingJobDto> _activeTranscodingJobs = new List<TranscodingJobDto>();

  /// <summary>
  /// Semaphores keyed by string. Within this class the entry stored under a job's output path
  /// is removed when that job is killed; access is guarded by <c>lock (_transcodingLocks)</c>.
  /// </summary>
  private static readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new Dictionary<string, SemaphoreSlim>();

  /// <summary>
  /// Logger used for diagnostic and error output.
  /// </summary>
  private readonly ILogger<TranscodingJobHelper> _logger;

  /// <summary>
  /// Media source manager, used to close live streams that belong to killed jobs.
  /// </summary>
  private readonly IMediaSourceManager _mediaSourceManager;

  /// <summary>
  /// File system abstraction used to list and delete partial transcode output.
  /// </summary>
  private readonly IFileSystem _fileSystem;

  /// <summary>
  /// Initializes a new instance of the <see cref="TranscodingJobHelper"/> class.
  /// </summary>
  /// <param name="logger">Instance of the <see cref="ILogger{TranscodingJobHelper}"/> interface.</param>
  /// <param name="mediaSourceManager">Instance of the <see cref="IMediaSourceManager"/> interface.</param>
  /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param>
  public TranscodingJobHelper(
      ILogger<TranscodingJobHelper> logger,
      IMediaSourceManager mediaSourceManager,
      IFileSystem fileSystem)
  {
      (_logger, _mediaSourceManager, _fileSystem) = (logger, mediaSourceManager, fileSystem);
  }
  /// <summary>
  /// Looks up the first active transcoding job that belongs to the given play session.
  /// </summary>
  /// <param name="playSessionId">Playback session id, compared case-insensitively (ordinal).</param>
  /// <returns>The first matching job, or <c>null</c> when no active job has that play session id.</returns>
  public TranscodingJobDto GetTranscodingJob(string playSessionId)
  {
      bool BelongsToSession(TranscodingJobDto candidate)
      {
          return string.Equals(candidate.PlaySessionId, playSessionId, StringComparison.OrdinalIgnoreCase);
      }

      lock (_activeTranscodingJobs)
      {
          return _activeTranscodingJobs.Find(BelongsToSession);
      }
  }
  /// <summary>
  /// Registers a keep-alive ping for every active transcoding job of a play session.
  /// </summary>
  /// <param name="playSessionId">Identifier of the play session whose jobs are pinged.</param>
  /// <param name="isUserPaused">
  /// When it has a value, that value is copied to <c>IsUserPaused</c> of each matching job;
  /// when <c>null</c>, the flag is left untouched.
  /// </param>
  /// <exception cref="ArgumentNullException"><paramref name="playSessionId"/> is <c>null</c> or empty.</exception>
  public void PingTranscodingJob(string playSessionId, bool? isUserPaused)
  {
      if (string.IsNullOrEmpty(playSessionId))
      {
          throw new ArgumentNullException(nameof(playSessionId));
      }

      _logger.LogDebug("PingTranscodingJob PlaySessionId={0} isUsedPaused: {1}", playSessionId, isUserPaused);

      // Take a snapshot of the matching jobs under the lock so the pinging below runs without holding it.
      // Pinging mainly matters for HLS; progressive streams can stop on their own reliably.
      List<TranscodingJobDto> sessionJobs;
      lock (_activeTranscodingJobs)
      {
          sessionJobs = _activeTranscodingJobs.FindAll(
              candidate => string.Equals(playSessionId, candidate.PlaySessionId, StringComparison.OrdinalIgnoreCase));
      }

      foreach (var sessionJob in sessionJobs)
      {
          if (isUserPaused.HasValue)
          {
              _logger.LogDebug("Setting job.IsUserPaused to {0}. jobId: {1}", isUserPaused, sessionJob.Id);
              sessionJob.IsUserPaused = isUserPaused.Value;
          }

          PingTimer(sessionJob, true);
      }
  }
  /// <summary>
  /// Records a ping on the job and (re)arms its kill timer.
  /// </summary>
  /// <param name="job">The job being pinged. If it has already exited, its kill timer is stopped and nothing else happens.</param>
  /// <param name="isProgressCheckIn">
  /// <c>true</c> when the ping comes from a playback progress check-in. For progressive jobs a
  /// check-in only adjusts an already started kill timer instead of starting one.
  /// </param>
  private void PingTimer(TranscodingJobDto job, bool isProgressCheckIn)
  {
      if (job.HasExited)
      {
          job.StopKillTimer();
          return;
      }

      // Ping timeouts in milliseconds: progressive jobs use the short one, all other job types the long one.
      const int ProgressiveTimeoutMs = 10000;
      const int DefaultTimeoutMs = 60000;

      var isProgressive = job.Type == TranscodingJobType.Progressive;

      job.PingTimeout = isProgressive ? ProgressiveTimeoutMs : DefaultTimeoutMs;
      job.LastPingDate = DateTime.UtcNow;

      if (isProgressive && isProgressCheckIn)
      {
          // Don't start the timer for playback check-ins with progressive streaming.
          job.ChangeKillTimerIfStarted();
      }
      else
      {
          job.StartKillTimer(OnTranscodeKillTimerStopped);
      }
  }
  /// <summary>
  /// Timer callback invoked when a job's kill timer elapses. Re-arms the timer when a
  /// non-progressive job was pinged recently enough; otherwise kills the job, deletes its
  /// output files and closes its live stream.
  /// </summary>
  /// <remarks>
  /// The method has to be <c>async void</c> to be usable as a timer callback, which means an
  /// exception escaping it cannot be observed by any caller and would be rethrown on the thread
  /// pool. Everything is therefore wrapped in a catch-all that logs the failure.
  /// </remarks>
  /// <param name="state">The <see cref="TranscodingJobDto"/> the timer was started for.</param>
  private async void OnTranscodeKillTimerStopped(object state)
  {
      var job = (TranscodingJobDto)state;

      try
      {
          if (!job.HasExited && job.Type != TranscodingJobType.Progressive)
          {
              var timeSinceLastPing = (DateTime.UtcNow - job.LastPingDate).TotalMilliseconds;

              if (timeSinceLastPing < job.PingTimeout)
              {
                  // The client pinged within the timeout window: keep the job alive and check again later.
                  job.StartKillTimer(OnTranscodeKillTimerStopped, job.PingTimeout);
                  return;
              }
          }

          _logger.LogInformation("Transcoding kill timer stopped for JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

          await KillTranscodingJob(job, true, path => true).ConfigureAwait(false);
      }
      catch (Exception ex)
      {
          // Last line of defence: never let an exception escape an async void callback.
          _logger.LogError(ex, "Error handling transcoding kill timer for JobId {0} PlaySessionId {1}", job.Id, job.PlaySessionId);
      }
  }
  /// <summary>
  /// Kills the active transcoding jobs of a play session, or of a device when no play session is given.
  /// </summary>
  /// <param name="deviceId">The device id; only used for matching when <paramref name="playSessionId"/> is null, empty or whitespace.</param>
  /// <param name="playSessionId">The play session identifier; takes precedence over <paramref name="deviceId"/> when it is not blank.</param>
  /// <param name="deleteFiles">Called with a job's output path; returning <c>true</c> causes the job's partial files to be deleted.</param>
  /// <returns>A task that completes when all matching jobs have been killed.</returns>
  public Task KillTranscodingJobs(string deviceId, string playSessionId, Func<string, bool> deleteFiles)
  {
      Func<TranscodingJobDto, bool> shouldKill;

      if (string.IsNullOrWhiteSpace(playSessionId))
      {
          shouldKill = candidate => string.Equals(deviceId, candidate.DeviceId, StringComparison.OrdinalIgnoreCase);
      }
      else
      {
          shouldKill = candidate => string.Equals(playSessionId, candidate.PlaySessionId, StringComparison.OrdinalIgnoreCase);
      }

      return KillTranscodingJobs(shouldKill, deleteFiles);
  }
  /// <summary>
  /// Kills every active transcoding job selected by a predicate.
  /// </summary>
  /// <param name="killJob">Predicate evaluated against each active job; jobs for which it returns <c>true</c> are killed.</param>
  /// <param name="deleteFiles">Called with a job's output path; returning <c>true</c> causes the job's partial files to be deleted.</param>
  /// <returns>A completed task when nothing matched, otherwise a task that completes when all matching jobs have been killed.</returns>
  private Task KillTranscodingJobs(Func<TranscodingJobDto, bool> killJob, Func<string, bool> deleteFiles)
  {
      // Select the victims under the lock; the kill itself runs outside of it.
      List<TranscodingJobDto> matchingJobs;
      lock (_activeTranscodingJobs)
      {
          matchingJobs = _activeTranscodingJobs.Where(killJob).ToList();
      }

      return matchingJobs.Count == 0
          ? Task.CompletedTask
          : Task.WhenAll(matchingJobs.Select(job => KillTranscodingJob(job, false, deleteFiles)));
  }
  /// <summary>
  /// Kills a single transcoding job: unregisters it, cancels it, stops its ffmpeg process,
  /// optionally deletes its partial output and optionally closes its live stream.
  /// </summary>
  /// <remarks>
  /// Stopping the process is best effort. Failures to signal or kill the process are logged and
  /// do not prevent the subsequent file clean-up and live stream close from running.
  /// </remarks>
  /// <param name="job">The job.</param>
  /// <param name="closeLiveStream">if set to <c>true</c>, the job's live stream (when it has one) is closed.</param>
  /// <param name="delete">Called with the job's output path; returning <c>true</c> causes the partial files to be deleted.</param>
  /// <returns>A task that completes when the job has been killed and cleaned up.</returns>
  private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStream, Func<string, bool> delete)
  {
      job.DisposeKillTimer();

      _logger.LogDebug("KillTranscodingJob - JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

      lock (_activeTranscodingJobs)
      {
          _activeTranscodingJobs.Remove(job);

          if (!job.CancellationTokenSource!.IsCancellationRequested)
          {
              job.CancellationTokenSource.Cancel();
          }
      }

      lock (_transcodingLocks)
      {
          _transcodingLocks.Remove(job.Path!);
      }

      lock (job.ProcessLock!)
      {
          // await is not allowed inside a lock statement, hence the blocking wait on the throttler.
          job.TranscodingThrottler?.Stop().GetAwaiter().GetResult();

          var process = job.Process;

          if (!job.HasExited)
          {
              try
              {
                  _logger.LogInformation("Stopping ffmpeg process with q command for {Path}", job.Path);

                  try
                  {
                      // Ask ffmpeg to quit gracefully.
                      process!.StandardInput.WriteLine("q");
                  }
                  catch (IOException ex)
                  {
                      // The stdin pipe is already closed (the process is most likely exiting).
                      // Fall through to the wait/kill below instead of aborting the clean-up.
                      _logger.LogDebug(ex, "Unable to send q command to ffmpeg process for {Path}", job.Path);
                  }

                  // Need to wait because killing is asynchronous
                  if (!process.WaitForExit(5000))
                  {
                      _logger.LogInformation("Killing ffmpeg process for {Path}", job.Path);
                      process.Kill();
                  }
              }
              catch (InvalidOperationException ex)
              {
                  // Thrown when the process has already exited or was never started; nothing left to stop.
                  _logger.LogDebug(ex, "ffmpeg process for {Path} was not running while stopping it", job.Path);
              }
              catch (System.ComponentModel.Win32Exception ex)
              {
                  // Process.Kill could not terminate the process (for example because it is already terminating).
                  _logger.LogWarning(ex, "Error killing ffmpeg process for {Path}", job.Path);
              }
          }
      }

      if (delete(job.Path!))
      {
          await DeletePartialStreamFiles(job.Path!, job.Type, 0, 1500).ConfigureAwait(false);
      }

      if (closeLiveStream && !string.IsNullOrWhiteSpace(job.LiveStreamId))
      {
          try
          {
              await _mediaSourceManager.CloseLiveStream(job.LiveStreamId).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error closing live stream for {Path}", job.Path);
          }
      }
  }
  /// <summary>
  /// Deletes the partial output of a transcoding job, retrying while the files are still in use.
  /// </summary>
  /// <remarks>
  /// Each attempt waits before deleting. An attempt is retried when it fails with an
  /// <see cref="IOException"/>, or with an <see cref="AggregateException"/> that only wraps
  /// <see cref="IOException"/>s (which is how the HLS deletion reports per-file failures).
  /// Any other failure is logged and ends the operation. Nothing is ever thrown to the caller.
  /// </remarks>
  /// <param name="path">The job's output path.</param>
  /// <param name="jobType">The job type; progressive jobs have a single output file, other types are handled as HLS.</param>
  /// <param name="retryCount">Number of attempts already made; no attempt is made once this reaches 10.</param>
  /// <param name="delayMs">Delay in milliseconds before the first attempt; later attempts wait 500 ms.</param>
  /// <returns>A task that completes when the files are deleted or the operation has given up.</returns>
  private async Task DeletePartialStreamFiles(string path, TranscodingJobType jobType, int retryCount, int delayMs)
  {
      const int MaxAttempts = 10;
      const int RetryDelayMs = 500;

      for (var attempt = retryCount; attempt < MaxAttempts; attempt++)
      {
          _logger.LogInformation("Deleting partial stream file(s) {Path}", path);

          // Give the encoder process time to release its file handles.
          await Task.Delay(delayMs).ConfigureAwait(false);

          try
          {
              if (jobType == TranscodingJobType.Progressive)
              {
                  DeleteProgressivePartialStreamFiles(path);
              }
              else
              {
                  DeleteHlsPartialStreamFiles(path);
              }

              return;
          }
          catch (Exception ex) when (ex is IOException
              || (ex is AggregateException aggregate && aggregate.InnerExceptions.All(inner => inner is IOException)))
          {
              // Most likely a file that is still locked; try again after a short delay.
              _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);
              delayMs = RetryDelayMs;
          }
          catch (Exception ex)
          {
              // Not a transient I/O problem; retrying will not help.
              _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);
              return;
          }
      }
  }
  /// <summary>
  /// Removes the single output file of a progressive transcode, if it is present on disk.
  /// </summary>
  /// <param name="outputFilePath">The output file path.</param>
  private void DeleteProgressivePartialStreamFiles(string outputFilePath)
  {
      if (!File.Exists(outputFilePath))
      {
          // Nothing was written (or it is already gone), so there is nothing to delete.
          return;
      }

      _fileSystem.DeleteFile(outputFilePath);
  }
  /// <summary>
  /// Deletes the HLS partial stream files: every file in the output file's directory whose
  /// file name contains the output file's base name (case-insensitive).
  /// </summary>
  /// <remarks>
  /// Only the file name is matched, not the full path; otherwise a directory segment that happens
  /// to contain the base name would make every file in the directory a match. Deletion continues
  /// past files that fail with an <see cref="IOException"/>; those failures are reported together
  /// once all files have been tried.
  /// </remarks>
  /// <param name="outputFilePath">The output file path.</param>
  /// <exception cref="ArgumentException"><paramref name="outputFilePath"/> has no directory or no file name.</exception>
  /// <exception cref="AggregateException">One or more files could not be deleted.</exception>
  private void DeleteHlsPartialStreamFiles(string outputFilePath)
  {
      var directory = Path.GetDirectoryName(outputFilePath)
          ?? throw new ArgumentException("Path can't be a root directory.", nameof(outputFilePath));

      var name = Path.GetFileNameWithoutExtension(outputFilePath);
      if (string.IsNullOrEmpty(name))
      {
          // An empty name would match (and delete) every file in the directory.
          throw new ArgumentException("Path must contain a file name.", nameof(outputFilePath));
      }

      // Materialize the list up front so the directory is not being enumerated while files are removed from it.
      var filesToDelete = _fileSystem.GetFilePaths(directory)
          .Where(f => Path.GetFileName(f).IndexOf(name, StringComparison.OrdinalIgnoreCase) != -1)
          .ToList();

      List<Exception>? exs = null;
      foreach (var file in filesToDelete)
      {
          try
          {
              _logger.LogDebug("Deleting HLS file {0}", file);
              _fileSystem.DeleteFile(file);
          }
          catch (IOException ex)
          {
              (exs ??= new List<Exception>(4)).Add(ex);
              _logger.LogError(ex, "Error deleting HLS file {Path}", file);
          }
      }

      if (exs != null)
      {
          throw new AggregateException("Error deleting HLS files", exs);
      }
  }
  303. }
  304. }