// TranscodingJobHelper.cs
  using System;
  using System.Collections.Generic;
  using System.Diagnostics;
  using System.IO;
  using System.Linq;
  using System.Threading;
  using System.Threading.Tasks;
  using Jellyfin.Api.Models;
  using Jellyfin.Api.Models.PlaybackDtos;
  using MediaBrowser.Controller.Library;
  using MediaBrowser.Controller.MediaEncoding;
  using MediaBrowser.Controller.Session;
  using MediaBrowser.Model.IO;
  using MediaBrowser.Model.Session;
  using Microsoft.Extensions.Logging;
  namespace Jellyfin.Api.Helpers
  {
      /// <summary>
      /// Transcoding job helpers.
      /// </summary>
      public class TranscodingJobHelper
      {
          /// <summary>
          /// Ping timeout, in milliseconds, applied to progressive jobs.
          /// </summary>
          private const int ProgressivePingTimeoutMs = 10000;

          /// <summary>
          /// Ping timeout, in milliseconds, applied to every non-progressive job.
          /// </summary>
          private const int DefaultPingTimeoutMs = 60000;

          /// <summary>
          /// Number of delete attempts after which partial stream file deletion gives up.
          /// </summary>
          private const int MaxDeleteAttempts = 10;

          /// <summary>
          /// Delay, in milliseconds, before the first attempt to delete partial stream files.
          /// </summary>
          private const int InitialDeleteDelayMs = 1500;

          /// <summary>
          /// Delay, in milliseconds, before each retry of a failed delete.
          /// </summary>
          private const int RetryDeleteDelayMs = 500;

          /// <summary>
          /// The active transcoding jobs. Every access in this class is made while holding a lock on the list itself.
          /// </summary>
          private static readonly List<TranscodingJobDto> _activeTranscodingJobs = new List<TranscodingJobDto>();

          /// <summary>
          /// The transcoding locks. In this class, entries are only removed (by job path) when a job is killed.
          /// </summary>
          private static readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new Dictionary<string, SemaphoreSlim>();

          private readonly ILogger<TranscodingJobHelper> _logger;
          private readonly IMediaSourceManager _mediaSourceManager;
          private readonly IFileSystem _fileSystem;
          private readonly ISessionManager _sessionManager;

          /// <summary>
          /// Initializes a new instance of the <see cref="TranscodingJobHelper"/> class.
          /// </summary>
          /// <param name="logger">Instance of the <see cref="ILogger{TranscodingJobHelper}"/> interface.</param>
          /// <param name="mediaSourceManager">Instance of the <see cref="IMediaSourceManager"/> interface.</param>
          /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param>
          /// <param name="sessionManager">Instance of the <see cref="ISessionManager"/> interface, used to report transcoding info.</param>
          public TranscodingJobHelper(
              ILogger<TranscodingJobHelper> logger,
              IMediaSourceManager mediaSourceManager,
              IFileSystem fileSystem,
              ISessionManager sessionManager)
          {
              _logger = logger;
              _mediaSourceManager = mediaSourceManager;
              _fileSystem = fileSystem;
              _sessionManager = sessionManager;
          }

          /// <summary>
          /// Get transcoding job.
          /// </summary>
          /// <param name="playSessionId">Playback session id.</param>
          /// <returns>The first active job whose play session id matches (case-insensitive), or <c>null</c> when there is none.</returns>
          public TranscodingJobDto GetTranscodingJob(string playSessionId)
          {
              lock (_activeTranscodingJobs)
              {
                  return _activeTranscodingJobs.FirstOrDefault(j => string.Equals(j.PlaySessionId, playSessionId, StringComparison.OrdinalIgnoreCase));
              }
          }

          /// <summary>
          /// Get transcoding job by output path and job type.
          /// </summary>
          /// <param name="path">The job path, compared case-insensitively.</param>
          /// <param name="type">The job type.</param>
          /// <returns>The first active job matching both values, or <c>null</c> when there is none.</returns>
          public static TranscodingJobDto GetTranscodingJob(string path, TranscodingJobType type)
          {
              lock (_activeTranscodingJobs)
              {
                  return _activeTranscodingJobs.FirstOrDefault(j => j.Type == type && string.Equals(j.Path, path, StringComparison.OrdinalIgnoreCase));
              }
          }

          /// <summary>
          /// Ping transcoding job.
          /// </summary>
          /// <param name="playSessionId">Play session id.</param>
          /// <param name="isUserPaused">Is user paused. When <c>null</c> the paused state of the jobs is left untouched.</param>
          /// <exception cref="ArgumentNullException">Play session id is null or empty.</exception>
          public void PingTranscodingJob(string playSessionId, bool? isUserPaused)
          {
              if (string.IsNullOrEmpty(playSessionId))
              {
                  throw new ArgumentNullException(nameof(playSessionId));
              }

              _logger.LogDebug("PingTranscodingJob PlaySessionId={0} isUsedPaused: {1}", playSessionId, isUserPaused);

              List<TranscodingJobDto> jobs;
              lock (_activeTranscodingJobs)
              {
                  // This is really only needed for HLS.
                  // Progressive streams can stop on their own reliably
                  jobs = _activeTranscodingJobs.Where(j => string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase)).ToList();
              }

              // Work on the snapshot so the shared list lock is not held while timers are touched.
              foreach (var job in jobs)
              {
                  if (isUserPaused.HasValue)
                  {
                      _logger.LogDebug("Setting job.IsUserPaused to {0}. jobId: {1}", isUserPaused, job.Id);
                      job.IsUserPaused = isUserPaused.Value;
                  }

                  PingTimer(job, true);
              }
          }

          /// <summary>
          /// Records a ping on the job and starts or adjusts its kill timer.
          /// </summary>
          /// <param name="job">The job being pinged.</param>
          /// <param name="isProgressCheckIn">Whether the ping originates from a playback progress check-in.</param>
          private void PingTimer(TranscodingJobDto job, bool isProgressCheckIn)
          {
              if (job.HasExited)
              {
                  job.StopKillTimer();
                  return;
              }

              var isProgressive = job.Type == TranscodingJobType.Progressive;

              job.PingTimeout = isProgressive ? ProgressivePingTimeoutMs : DefaultPingTimeoutMs;
              job.LastPingDate = DateTime.UtcNow;

              // Don't start the timer for playback checkins with progressive streaming
              if (!isProgressive || !isProgressCheckIn)
              {
                  job.StartKillTimer(OnTranscodeKillTimerStopped);
              }
              else
              {
                  job.ChangeKillTimerIfStarted();
              }
          }

          /// <summary>
          /// Called when [transcode kill timer stopped].
          /// </summary>
          /// <param name="state">The state; expected to be the <see cref="TranscodingJobDto"/> the timer belongs to.</param>
          private async void OnTranscodeKillTimerStopped(object state)
          {
              var job = (TranscodingJobDto)state;

              // This is an async void callback: an exception escaping it cannot be observed by any caller,
              // so everything is caught and logged here.
              try
              {
                  if (!job.HasExited && job.Type != TranscodingJobType.Progressive)
                  {
                      var timeSinceLastPing = (DateTime.UtcNow - job.LastPingDate).TotalMilliseconds;

                      // The job was pinged within its timeout: re-arm the timer instead of killing.
                      if (timeSinceLastPing < job.PingTimeout)
                      {
                          job.StartKillTimer(OnTranscodeKillTimerStopped, job.PingTimeout);
                          return;
                      }
                  }

                  _logger.LogInformation("Transcoding kill timer stopped for JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

                  await KillTranscodingJob(job, true, path => true).ConfigureAwait(false);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error killing transcoding job after kill timer stopped. JobId {0} PlaySessionId {1}", job.Id, job.PlaySessionId);
              }
          }

          /// <summary>
          /// Kills the transcoding jobs of a play session or, when no play session id is given, of a device.
          /// </summary>
          /// <param name="deviceId">The device id. Only used for matching when <paramref name="playSessionId"/> is null or whitespace.</param>
          /// <param name="playSessionId">The play session identifier.</param>
          /// <param name="deleteFiles">Predicate receiving a job path and deciding whether its partial stream files are deleted.</param>
          /// <returns>Task.</returns>
          public Task KillTranscodingJobs(string deviceId, string playSessionId, Func<string, bool> deleteFiles)
          {
              return KillTranscodingJobs(
                  j => string.IsNullOrWhiteSpace(playSessionId)
                      ? string.Equals(deviceId, j.DeviceId, StringComparison.OrdinalIgnoreCase)
                      : string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase),
                  deleteFiles);
          }

          /// <summary>
          /// Kills the transcoding jobs.
          /// </summary>
          /// <param name="killJob">Predicate selecting the active jobs to kill.</param>
          /// <param name="deleteFiles">Predicate receiving a job path and deciding whether its partial stream files are deleted.</param>
          /// <returns>Task that completes when every selected job has been killed.</returns>
          private Task KillTranscodingJobs(Func<TranscodingJobDto, bool> killJob, Func<string, bool> deleteFiles)
          {
              List<TranscodingJobDto> jobs;
              lock (_activeTranscodingJobs)
              {
                  // This is really only needed for HLS.
                  // Progressive streams can stop on their own reliably
                  jobs = _activeTranscodingJobs.Where(killJob).ToList();
              }

              if (jobs.Count == 0)
              {
                  return Task.CompletedTask;
              }

              return Task.WhenAll(jobs.Select(job => KillTranscodingJob(job, false, deleteFiles)));
          }

          /// <summary>
          /// Kills the transcoding job.
          /// </summary>
          /// <param name="job">The job.</param>
          /// <param name="closeLiveStream">if set to <c>true</c> [close live stream].</param>
          /// <param name="delete">Predicate receiving the job path and deciding whether its partial stream files are deleted.</param>
          private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStream, Func<string, bool> delete)
          {
              job.DisposeKillTimer();

              _logger.LogDebug("KillTranscodingJob - JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

              lock (_activeTranscodingJobs)
              {
                  _activeTranscodingJobs.Remove(job);

                  if (!job.CancellationTokenSource!.IsCancellationRequested)
                  {
                      job.CancellationTokenSource.Cancel();
                  }
              }

              lock (_transcodingLocks)
              {
                  _transcodingLocks.Remove(job.Path!);
              }

              lock (job.ProcessLock!)
              {
                  // Blocking wait: await is not permitted inside a lock statement.
                  job.TranscodingThrottler?.Stop().GetAwaiter().GetResult();

                  var process = job.Process;
                  var hasExited = job.HasExited;

                  if (!hasExited)
                  {
                      try
                      {
                          _logger.LogInformation("Stopping ffmpeg process with q command for {Path}", job.Path);

                          process!.StandardInput.WriteLine("q");

                          // Need to wait because killing is asynchronous
                          if (!process.WaitForExit(5000))
                          {
                              _logger.LogInformation("Killing ffmpeg process for {Path}", job.Path);
                              process.Kill();
                          }
                      }
                      catch (InvalidOperationException ex)
                      {
                          // Best effort: keep going with the cleanup, but leave a trace instead of swallowing silently.
                          _logger.LogDebug(ex, "Unable to stop ffmpeg process for {Path}; it may have already exited", job.Path);
                      }
                  }
              }

              if (delete(job.Path!))
              {
                  await DeletePartialStreamFiles(job.Path!, job.Type, 0, InitialDeleteDelayMs).ConfigureAwait(false);
              }

              if (closeLiveStream && !string.IsNullOrWhiteSpace(job.LiveStreamId))
              {
                  try
                  {
                      await _mediaSourceManager.CloseLiveStream(job.LiveStreamId).ConfigureAwait(false);
                  }
                  catch (Exception ex)
                  {
                      _logger.LogError(ex, "Error closing live stream for {Path}", job.Path);
                  }
              }
          }

          /// <summary>
          /// Deletes the partial stream files of a job, retrying on I/O failures.
          /// </summary>
          /// <param name="path">The job output path.</param>
          /// <param name="jobType">The job type; selects progressive or HLS cleanup.</param>
          /// <param name="retryCount">The number of attempts already made; nothing is done once it reaches the maximum.</param>
          /// <param name="delayMs">The delay, in milliseconds, to wait before this attempt.</param>
          private async Task DeletePartialStreamFiles(string path, TranscodingJobType jobType, int retryCount, int delayMs)
          {
              if (retryCount >= MaxDeleteAttempts)
              {
                  return;
              }

              _logger.LogInformation("Deleting partial stream file(s) {Path}", path);

              await Task.Delay(delayMs).ConfigureAwait(false);

              try
              {
                  if (jobType == TranscodingJobType.Progressive)
                  {
                      DeleteProgressivePartialStreamFiles(path);
                  }
                  else
                  {
                      DeleteHlsPartialStreamFiles(path);
                  }
              }
              catch (Exception ex) when (IsRetryableDeleteFailure(ex))
              {
                  _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);

                  await DeletePartialStreamFiles(path, jobType, retryCount + 1, RetryDeleteDelayMs).ConfigureAwait(false);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);
              }
          }

          /// <summary>
          /// Determines whether a failed delete should be retried.
          /// </summary>
          /// <param name="ex">The exception raised by the delete attempt.</param>
          /// <returns><c>true</c> for an <see cref="IOException"/>, or an <see cref="AggregateException"/> made up only of them.</returns>
          private static bool IsRetryableDeleteFailure(Exception ex)
          {
              // The HLS cleanup reports its per-file IOExceptions wrapped in an AggregateException.
              return ex is IOException
                  || (ex is AggregateException aggregate && aggregate.InnerExceptions.All(inner => inner is IOException));
          }

          /// <summary>
          /// Deletes the progressive partial stream files.
          /// </summary>
          /// <param name="outputFilePath">The output file path.</param>
          private void DeleteProgressivePartialStreamFiles(string outputFilePath)
          {
              if (File.Exists(outputFilePath))
              {
                  _fileSystem.DeleteFile(outputFilePath);
              }
          }

          /// <summary>
          /// Deletes the HLS partial stream files.
          /// </summary>
          /// <param name="outputFilePath">The output file path.</param>
          /// <exception cref="ArgumentException">No directory can be derived from <paramref name="outputFilePath"/>.</exception>
          /// <exception cref="AggregateException">One or more files could not be deleted because of an <see cref="IOException"/>.</exception>
          private void DeleteHlsPartialStreamFiles(string outputFilePath)
          {
              var directory = Path.GetDirectoryName(outputFilePath);
              if (directory == null)
              {
                  throw new ArgumentException("Path can't be a root directory.", nameof(outputFilePath));
              }

              var name = Path.GetFileNameWithoutExtension(outputFilePath);

              // Every file in the directory whose full path contains the output file name is removed.
              var filesToDelete = _fileSystem.GetFilePaths(directory)
                  .Where(f => f.IndexOf(name, StringComparison.OrdinalIgnoreCase) != -1);

              List<Exception>? exs = null;
              foreach (var file in filesToDelete)
              {
                  try
                  {
                      _logger.LogDebug("Deleting HLS file {0}", file);
                      _fileSystem.DeleteFile(file);
                  }
                  catch (IOException ex)
                  {
                      // Keep deleting the remaining files; failures are reported together afterwards.
                      (exs ??= new List<Exception>(4)).Add(ex);
                      _logger.LogError(ex, "Error deleting HLS file {Path}", file);
                  }
              }

              if (exs != null)
              {
                  throw new AggregateException("Error deleting HLS files", exs);
              }
          }

          /// <summary>
          /// Stores the latest transcoding progress on the job and reports it to the session manager.
          /// </summary>
          /// <remarks>
          /// NOTE(review): the job parameter is typed <c>TranscodingJob</c> while the rest of this class works with
          /// <see cref="TranscodingJobDto"/>; confirm this is intended.
          /// </remarks>
          /// <param name="job">The job to update; may be <c>null</c>, in which case only the session report is made.</param>
          /// <param name="state">The stream state describing the request and output.</param>
          /// <param name="transcodingPosition">The current transcoding position.</param>
          /// <param name="framerate">The current framerate.</param>
          /// <param name="percentComplete">The completion percentage.</param>
          /// <param name="bytesTranscoded">The number of bytes transcoded.</param>
          /// <param name="bitRate">The current bitrate; the state's total output bitrate is reported when <c>null</c>.</param>
          public void ReportTranscodingProgress(
              TranscodingJob job,
              StreamState state,
              TimeSpan? transcodingPosition,
              float? framerate,
              double? percentComplete,
              long? bytesTranscoded,
              int? bitRate)
          {
              var ticks = transcodingPosition?.Ticks;

              if (job != null)
              {
                  job.Framerate = framerate;
                  job.CompletionPercentage = percentComplete;
                  job.TranscodingPositionTicks = ticks;
                  job.BytesTranscoded = bytesTranscoded;
                  job.BitRate = bitRate;
              }

              var deviceId = state.Request.DeviceId;

              // Nothing is reported to the session manager when the request carries no device id.
              if (!string.IsNullOrWhiteSpace(deviceId))
              {
                  var audioCodec = state.ActualOutputAudioCodec;
                  var videoCodec = state.ActualOutputVideoCodec;

                  _sessionManager.ReportTranscodingInfo(deviceId, new TranscodingInfo
                  {
                      Bitrate = bitRate ?? state.TotalOutputBitrate,
                      AudioCodec = audioCodec,
                      VideoCodec = videoCodec,
                      Container = state.OutputContainer,
                      Framerate = framerate,
                      CompletionPercentage = percentComplete,
                      Width = state.OutputWidth,
                      Height = state.OutputHeight,
                      AudioChannels = state.OutputAudioChannels,
                      IsAudioDirect = EncodingHelper.IsCopyCodec(state.OutputAudioCodec),
                      IsVideoDirect = EncodingHelper.IsCopyCodec(state.OutputVideoCodec),
                      TranscodeReasons = state.TranscodeReasons
                  });
              }
          }
      }
  }