TranscodingJobHelper.cs 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Globalization;
  5. using System.IO;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Text.Json;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Jellyfin.Api.Models.PlaybackDtos;
  12. using Jellyfin.Api.Models.StreamingDtos;
  13. using Jellyfin.Data.Enums;
  14. using MediaBrowser.Common.Configuration;
  15. using MediaBrowser.Controller.Configuration;
  16. using MediaBrowser.Controller.Library;
  17. using MediaBrowser.Controller.MediaEncoding;
  18. using MediaBrowser.Controller.Net;
  19. using MediaBrowser.Controller.Session;
  20. using MediaBrowser.Model.Entities;
  21. using MediaBrowser.Model.IO;
  22. using MediaBrowser.Model.MediaInfo;
  23. using MediaBrowser.Model.Session;
  24. using Microsoft.AspNetCore.Http;
  25. using Microsoft.Extensions.Configuration;
  26. using Microsoft.Extensions.Logging;
  27. namespace Jellyfin.Api.Helpers
  28. {
  29. /// <summary>
  30. /// Transcoding job helpers.
  31. /// </summary>
  32. public class TranscodingJobHelper : IDisposable
  33. {
  /// <summary>
  /// Per-output-path semaphores handed out by <see cref="GetTranscodingLock"/>.
  /// The dictionary instance itself is used as the lock object guarding access to it.
  /// </summary>
  private static readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new Dictionary<string, SemaphoreSlim>();

  /// <summary>
  /// The transcoding jobs currently tracked by the helper.
  /// The list instance itself is used as the lock object guarding access to it.
  /// </summary>
  private static readonly List<TranscodingJobDto> _activeTranscodingJobs = new List<TranscodingJobDto>();

  private readonly ILogger<TranscodingJobHelper> _logger;
  private readonly ILoggerFactory _loggerFactory;
  private readonly IServerConfigurationManager _serverConfigurationManager;
  private readonly ISessionManager _sessionManager;
  private readonly IMediaSourceManager _mediaSourceManager;
  private readonly IMediaEncoder _mediaEncoder;
  private readonly IFileSystem _fileSystem;
  private readonly IAuthorizationContext _authorizationContext;
  private readonly EncodingHelper _encodingHelper;
  /// <summary>
  /// Initializes a new instance of the <see cref="TranscodingJobHelper"/> class.
  /// Construction also deletes every file found under the configured transcode path and
  /// subscribes to the session manager's playback start and progress events.
  /// </summary>
  /// <param name="logger">Instance of the <see cref="ILogger{TranscodingJobHelper}"/> interface.</param>
  /// <param name="mediaSourceManager">Instance of the <see cref="IMediaSourceManager"/> interface.</param>
  /// <param name="fileSystem">Instance of the <see cref="IFileSystem"/> interface.</param>
  /// <param name="mediaEncoder">Instance of the <see cref="IMediaEncoder"/> interface.</param>
  /// <param name="serverConfigurationManager">Instance of the <see cref="IServerConfigurationManager"/> interface.</param>
  /// <param name="sessionManager">Instance of the <see cref="ISessionManager"/> interface.</param>
  /// <param name="authorizationContext">Instance of the <see cref="IAuthorizationContext"/> interface.</param>
  /// <param name="subtitleEncoder">Instance of the <see cref="ISubtitleEncoder"/> interface.</param>
  /// <param name="configuration">Instance of the <see cref="IConfiguration"/> interface.</param>
  /// <param name="loggerFactory">Instance of the <see cref="ILoggerFactory"/> interface.</param>
  public TranscodingJobHelper(
      ILogger<TranscodingJobHelper> logger,
      IMediaSourceManager mediaSourceManager,
      IFileSystem fileSystem,
      IMediaEncoder mediaEncoder,
      IServerConfigurationManager serverConfigurationManager,
      ISessionManager sessionManager,
      IAuthorizationContext authorizationContext,
      ISubtitleEncoder subtitleEncoder,
      IConfiguration configuration,
      ILoggerFactory loggerFactory)
  {
      _loggerFactory = loggerFactory;
      _logger = logger;
      _authorizationContext = authorizationContext;
      _sessionManager = sessionManager;
      _serverConfigurationManager = serverConfigurationManager;
      _mediaSourceManager = mediaSourceManager;
      _mediaEncoder = mediaEncoder;
      _fileSystem = fileSystem;
      _encodingHelper = new EncodingHelper(mediaEncoder, fileSystem, subtitleEncoder, configuration);

      // Needs _serverConfigurationManager and _fileSystem, so it must run after the assignments above.
      DeleteEncodedMediaCache();

      // Playback start and playback progress are both routed to the same handler.
      sessionManager.PlaybackProgress += OnPlaybackProgress;
      sessionManager.PlaybackStart += OnPlaybackProgress;
  }
  /// <summary>
  /// Finds the first active transcoding job belonging to a play session.
  /// </summary>
  /// <param name="playSessionId">Playback session id, compared case-insensitively.</param>
  /// <returns>The matching transcoding job, or <c>null</c> when none is active for the session.</returns>
  public TranscodingJobDto? GetTranscodingJob(string playSessionId)
  {
      lock (_activeTranscodingJobs)
      {
          foreach (var candidate in _activeTranscodingJobs)
          {
              if (string.Equals(candidate.PlaySessionId, playSessionId, StringComparison.OrdinalIgnoreCase))
              {
                  return candidate;
              }
          }

          return null;
      }
  }
  /// <summary>
  /// Finds the first active transcoding job with the given output path and job type.
  /// </summary>
  /// <param name="path">Path to the transcoding file, compared case-insensitively.</param>
  /// <param name="type">The <see cref="TranscodingJobType"/> the job must have.</param>
  /// <returns>The matching transcoding job, or <c>null</c> when none is found.</returns>
  public TranscodingJobDto? GetTranscodingJob(string path, TranscodingJobType type)
  {
      lock (_activeTranscodingJobs)
      {
          foreach (var candidate in _activeTranscodingJobs)
          {
              if (candidate.Type == type && string.Equals(candidate.Path, path, StringComparison.OrdinalIgnoreCase))
              {
                  return candidate;
              }
          }

          return null;
      }
  }
  /// <summary>
  /// Pings every active transcoding job of a play session, optionally updating its paused flag.
  /// </summary>
  /// <param name="playSessionId">Play session id, compared case-insensitively.</param>
  /// <param name="isUserPaused">When it has a value, it is stored on each matching job; <c>null</c> leaves the flag untouched.</param>
  /// <exception cref="ArgumentNullException">Play session id is null or empty.</exception>
  public void PingTranscodingJob(string playSessionId, bool? isUserPaused)
  {
      if (string.IsNullOrEmpty(playSessionId))
      {
          throw new ArgumentNullException(nameof(playSessionId));
      }

      _logger.LogDebug("PingTranscodingJob PlaySessionId={0} isUserPaused: {1}", playSessionId, isUserPaused);

      List<TranscodingJobDto> jobs;
      lock (_activeTranscodingJobs)
      {
          // This is really only needed for HLS.
          // Progressive streams can stop on their own reliably.
          // The matches are copied so the timers can be pinged without holding the list lock.
          jobs = _activeTranscodingJobs
              .Where(j => string.Equals(playSessionId, j.PlaySessionId, StringComparison.OrdinalIgnoreCase))
              .ToList();
      }

      foreach (var job in jobs)
      {
          if (isUserPaused.HasValue)
          {
              _logger.LogDebug("Setting job.IsUserPaused to {0}. jobId: {1}", isUserPaused, job.Id);
              job.IsUserPaused = isUserPaused.Value;
          }

          PingTimer(job, true);
      }
  }
  /// <summary>
  /// Records a ping on the job and starts or adjusts its kill timer.
  /// </summary>
  /// <param name="job">The job being pinged.</param>
  /// <param name="isProgressCheckIn"><c>true</c> when the ping comes from a playback progress check-in.</param>
  private void PingTimer(TranscodingJobDto job, bool isProgressCheckIn)
  {
      if (job.HasExited)
      {
          job.StopKillTimer();
          return;
      }

      var isProgressive = job.Type == TranscodingJobType.Progressive;

      // Timeout in milliseconds: 10 seconds for progressive jobs, 60 seconds for everything else.
      job.PingTimeout = isProgressive ? 10000 : 60000;
      job.LastPingDate = DateTime.UtcNow;

      if (isProgressive && isProgressCheckIn)
      {
          // Don't start the timer for playback checkins with progressive streaming
          job.ChangeKillTimerIfStarted();
      }
      else
      {
          job.StartKillTimer(OnTranscodeKillTimerStopped);
      }
  }
  /// <summary>
  /// Callback handed to <c>StartKillTimer</c>: kills the job unless it was pinged recently enough.
  /// </summary>
  /// <param name="state">The callback state; must be the <see cref="TranscodingJobDto"/> the timer belongs to.</param>
  /// <exception cref="ArgumentException"><paramref name="state"/> is not a <see cref="TranscodingJobDto"/>.</exception>
  private async void OnTranscodeKillTimerStopped(object? state)
  {
      var job = state as TranscodingJobDto ?? throw new ArgumentException($"{nameof(state)} is not of type {nameof(TranscodingJobDto)}", nameof(state));

      if (!job.HasExited && job.Type != TranscodingJobType.Progressive)
      {
          var timeSinceLastPing = (DateTime.UtcNow - job.LastPingDate).TotalMilliseconds;
          if (timeSinceLastPing < job.PingTimeout)
          {
              // A ping arrived within the timeout window: re-arm the timer instead of killing the job.
              job.StartKillTimer(OnTranscodeKillTimerStopped, job.PingTimeout);
              return;
          }
      }

      _logger.LogInformation("Transcoding kill timer stopped for JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

      try
      {
          await KillTranscodingJob(job, true, path => true).ConfigureAwait(false);
      }
      catch (Exception ex)
      {
          // This method is async void, so no caller can observe a failure of the awaited kill.
          // Log it here rather than letting it escape the callback unhandled.
          _logger.LogError(ex, "Error killing transcoding job {JobId} for PlaySessionId {PlaySessionId}", job.Id, job.PlaySessionId);
      }
  }
  /// <summary>
  /// Kills the active transcoding jobs of a play session, or of a device when no play session id is given.
  /// </summary>
  /// <param name="deviceId">The device id; only used for matching when <paramref name="playSessionId"/> is null or whitespace.</param>
  /// <param name="playSessionId">The play session identifier; when not blank, jobs are matched on it alone.</param>
  /// <param name="deleteFiles">Predicate receiving a job's output path; returns whether its partial files should be deleted.</param>
  /// <returns>A task that completes when all matching jobs have been killed.</returns>
  public Task KillTranscodingJobs(string deviceId, string? playSessionId, Func<string, bool> deleteFiles)
  {
      bool ShouldKill(TranscodingJobDto candidate)
      {
          if (string.IsNullOrWhiteSpace(playSessionId))
          {
              return string.Equals(deviceId, candidate.DeviceId, StringComparison.OrdinalIgnoreCase);
          }

          return string.Equals(playSessionId, candidate.PlaySessionId, StringComparison.OrdinalIgnoreCase);
      }

      return KillTranscodingJobs(ShouldKill, deleteFiles);
  }
  /// <summary>
  /// Kills every active transcoding job selected by a predicate.
  /// </summary>
  /// <param name="killJob">Predicate selecting the jobs to kill.</param>
  /// <param name="deleteFiles">Predicate receiving a job's output path; returns whether its partial files should be deleted.</param>
  /// <returns>A task that completes when all selected jobs have been killed; already completed when nothing matched.</returns>
  private Task KillTranscodingJobs(Func<TranscodingJobDto, bool> killJob, Func<string, bool> deleteFiles)
  {
      List<TranscodingJobDto> selected;
      lock (_activeTranscodingJobs)
      {
          // Copy the matches so the kills run without holding the list lock.
          selected = _activeTranscodingJobs.Where(killJob).ToList();
      }

      if (selected.Count == 0)
      {
          return Task.CompletedTask;
      }

      // Live streams are not closed on this path (closeLiveStream is false).
      return Task.WhenAll(selected.Select(job => KillTranscodingJob(job, false, deleteFiles)));
  }
  /// <summary>
  /// Kills a single transcoding job: unregisters it, cancels its token, stops the FFmpeg process,
  /// and optionally deletes its partial output and closes its live stream.
  /// </summary>
  /// <param name="job">The job.</param>
  /// <param name="closeLiveStream">if set to <c>true</c>, the job's live stream (when it has one) is closed.</param>
  /// <param name="delete">Predicate receiving the job's output path; when it returns <c>true</c> the partial files are deleted.</param>
  /// <returns>A task representing the kill operation.</returns>
  private async Task KillTranscodingJob(TranscodingJobDto job, bool closeLiveStream, Func<string, bool> delete)
  {
      job.DisposeKillTimer();

      _logger.LogDebug("KillTranscodingJob - JobId {0} PlaySessionId {1}. Killing transcoding", job.Id, job.PlaySessionId);

      lock (_activeTranscodingJobs)
      {
          _activeTranscodingJobs.Remove(job);

          var cancellation = job.CancellationTokenSource!;
          if (!cancellation.IsCancellationRequested)
          {
              cancellation.Cancel();
          }
      }

      lock (_transcodingLocks)
      {
          _transcodingLocks.Remove(job.Path!);
      }

      lock (job.ProcessLock!)
      {
          // Blocks synchronously on the throttler stop: await is not permitted inside a lock statement.
          job.TranscodingThrottler?.Stop().GetAwaiter().GetResult();

          var ffmpeg = job.Process;
          if (!job.HasExited)
          {
              try
              {
                  _logger.LogInformation("Stopping ffmpeg process with q command for {Path}", job.Path);
                  ffmpeg!.StandardInput.WriteLine("q");

                  // Need to wait because killing is asynchronous.
                  var exitedInTime = ffmpeg.WaitForExit(5000);
                  if (!exitedInTime)
                  {
                      _logger.LogInformation("Killing FFmpeg process for {Path}", job.Path);
                      ffmpeg.Kill();
                  }
              }
              catch (InvalidOperationException)
              {
                  // Deliberately ignored (best effort).
              }
          }
      }

      var outputPath = job.Path!;
      if (delete(outputPath))
      {
          await DeletePartialStreamFiles(outputPath, job.Type, 0, 1500).ConfigureAwait(false);
      }

      var liveStreamId = job.LiveStreamId;
      if (closeLiveStream && !string.IsNullOrWhiteSpace(liveStreamId))
      {
          try
          {
              await _mediaSourceManager.CloseLiveStream(liveStreamId).ConfigureAwait(false);
          }
          catch (Exception ex)
          {
              _logger.LogError(ex, "Error closing live stream for {Path}", job.Path);
          }
      }
  }
  /// <summary>
  /// Deletes the partial output of a transcoding job, retrying on I/O failures.
  /// </summary>
  /// <param name="path">The job's output path.</param>
  /// <param name="jobType">The job type; progressive jobs delete a single file, all others go through the HLS deletion.</param>
  /// <param name="retryCount">Number of attempts already made; nothing happens once it reaches 10.</param>
  /// <param name="delayMs">Delay in milliseconds before the first attempt made by this call; later attempts wait 500 ms.</param>
  /// <returns>A task representing the deletion.</returns>
  private async Task DeletePartialStreamFiles(string path, TranscodingJobType jobType, int retryCount, int delayMs)
  {
      const int MaxAttempts = 10;
      const int RetryDelayMs = 500;

      var nextDelayMs = delayMs;
      for (var attempt = retryCount; attempt < MaxAttempts; attempt++)
      {
          _logger.LogInformation("Deleting partial stream file(s) {Path}", path);

          await Task.Delay(nextDelayMs).ConfigureAwait(false);

          try
          {
              if (jobType == TranscodingJobType.Progressive)
              {
                  DeleteProgressivePartialStreamFiles(path);
              }
              else
              {
                  DeleteHlsPartialStreamFiles(path);
              }

              return;
          }
          catch (Exception ex) when (IsRetryableDeleteFailure(ex))
          {
              _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);
          }
          catch (Exception ex)
          {
              // Anything that is not an I/O failure is logged once and not retried.
              _logger.LogError(ex, "Error deleting partial stream file(s) {Path}", path);
              return;
          }

          nextDelayMs = RetryDelayMs;
      }

      // The HLS deletion reports its per-file IOExceptions wrapped in an AggregateException,
      // so that wrapper has to be treated as retryable as well as a bare IOException.
      static bool IsRetryableDeleteFailure(Exception ex)
      {
          return ex is IOException
              || (ex is AggregateException aggregate && aggregate.InnerExceptions.All(inner => inner is IOException));
      }
  }
  /// <summary>
  /// Deletes the output file of a progressive transcode, if it exists.
  /// </summary>
  /// <param name="outputFilePath">The output file path.</param>
  private void DeleteProgressivePartialStreamFiles(string outputFilePath)
  {
      if (!File.Exists(outputFilePath))
      {
          return;
      }

      _fileSystem.DeleteFile(outputFilePath);
  }
  /// <summary>
  /// Deletes every file in the output file's directory whose path contains the output file's
  /// extension-less name (case-insensitive).
  /// </summary>
  /// <param name="outputFilePath">The output file path.</param>
  /// <exception cref="AggregateException">One or more files could not be deleted because of an <see cref="IOException"/>.</exception>
  private void DeleteHlsPartialStreamFiles(string outputFilePath)
  {
      var folder = Path.GetDirectoryName(outputFilePath);
      var baseName = Path.GetFileNameWithoutExtension(outputFilePath);

      List<Exception>? failures = null;
      foreach (var candidate in _fileSystem.GetFilePaths(folder))
      {
          if (candidate.IndexOf(baseName, StringComparison.OrdinalIgnoreCase) < 0)
          {
              continue;
          }

          try
          {
              _logger.LogDebug("Deleting HLS file {0}", candidate);
              _fileSystem.DeleteFile(candidate);
          }
          catch (IOException ex)
          {
              // Keep going so one failing file does not stop the remaining deletions.
              failures ??= new List<Exception>(4);
              failures.Add(ex);
              _logger.LogError(ex, "Error deleting HLS file {Path}", candidate);
          }
      }

      if (failures != null)
      {
          throw new AggregateException("Error deleting HLS files", failures);
      }
  }
  /// <summary>
  /// Stores the latest progress figures on the job and, when the request carries a device id,
  /// forwards them to the session manager.
  /// </summary>
  /// <param name="job">The <see cref="TranscodingJobDto"/> of which the progress will be reported; skipped when <c>null</c>.</param>
  /// <param name="state">The <see cref="StreamState"/> of the current transcoding job.</param>
  /// <param name="transcodingPosition">The current transcoding position.</param>
  /// <param name="framerate">The framerate of the transcoding job.</param>
  /// <param name="percentComplete">The completion percentage of the transcode.</param>
  /// <param name="bytesTranscoded">The number of bytes transcoded.</param>
  /// <param name="bitRate">The bitrate of the transcoding job; the state's total output bitrate is reported when this is <c>null</c>.</param>
  public void ReportTranscodingProgress(
      TranscodingJobDto job,
      StreamState state,
      TimeSpan? transcodingPosition,
      float? framerate,
      double? percentComplete,
      long? bytesTranscoded,
      int? bitRate)
  {
      if (job != null)
      {
          job.Framerate = framerate;
          job.CompletionPercentage = percentComplete;
          job.TranscodingPositionTicks = transcodingPosition?.Ticks;
          job.BytesTranscoded = bytesTranscoded;
          job.BitRate = bitRate;
      }

      var deviceId = state.Request.DeviceId;
      if (string.IsNullOrWhiteSpace(deviceId))
      {
          return;
      }

      var info = new TranscodingInfo
      {
          AudioCodec = state.ActualOutputAudioCodec,
          VideoCodec = state.ActualOutputVideoCodec,
          Bitrate = bitRate ?? state.TotalOutputBitrate,
          Container = state.OutputContainer,
          Framerate = framerate,
          CompletionPercentage = percentComplete,
          Width = state.OutputWidth,
          Height = state.OutputHeight,
          AudioChannels = state.OutputAudioChannels,
          IsAudioDirect = EncodingHelper.IsCopyCodec(state.OutputAudioCodec),
          IsVideoDirect = EncodingHelper.IsCopyCodec(state.OutputVideoCodec),
          TranscodeReasons = state.TranscodeReasons
      };

      _sessionManager.ReportTranscodingInfo(deviceId, info);
  }
  /// <summary>
  /// Starts an FFmpeg process for the given stream state, registers it as an active transcoding job
  /// and waits until the target file exists or the process has exited.
  /// </summary>
  /// <param name="state">The state.</param>
  /// <param name="outputPath">The output path.</param>
  /// <param name="commandLineArguments">The command line arguments for FFmpeg.</param>
  /// <param name="request">The <see cref="HttpRequest"/>.</param>
  /// <param name="transcodingJobType">The <see cref="TranscodingJobType"/>.</param>
  /// <param name="cancellationTokenSource">The cancellation token source.</param>
  /// <param name="workingDirectory">The working directory; blank or <c>null</c> leaves the process working directory unset.</param>
  /// <returns>The <see cref="TranscodingJobDto"/> registered for the started process.</returns>
  /// <exception cref="ArgumentException">
  /// The output path has no directory part, the user lacks the video transcoding permission,
  /// or the FFmpeg path is not set.
  /// </exception>
  public async Task<TranscodingJobDto> StartFfMpeg(
      StreamState state,
      string outputPath,
      string commandLineArguments,
      HttpRequest request,
      TranscodingJobType transcodingJobType,
      CancellationTokenSource cancellationTokenSource,
      string? workingDirectory = null)
  {
      var directory = Path.GetDirectoryName(outputPath) ?? throw new ArgumentException($"Provided path ({outputPath}) is not valid.", nameof(outputPath));
      Directory.CreateDirectory(directory);

      await AcquireResources(state, cancellationTokenSource).ConfigureAwait(false);

      if (state.VideoRequest != null && !EncodingHelper.IsCopyCodec(state.OutputVideoCodec))
      {
          var auth = _authorizationContext.GetAuthorizationInfo(request);
          if (auth.User != null && !auth.User.HasPermission(PermissionKind.EnableVideoPlaybackTranscoding))
          {
              this.OnTranscodeFailedToStart(outputPath, transcodingJobType, state);

              throw new ArgumentException("User does not have access to video transcoding.");
          }
      }

      if (string.IsNullOrEmpty(_mediaEncoder.EncoderPath))
      {
          throw new ArgumentException("FFmpeg path not set.");
      }

      var process = new Process
      {
          StartInfo = new ProcessStartInfo
          {
              WindowStyle = ProcessWindowStyle.Hidden,
              CreateNoWindow = true,
              UseShellExecute = false,

              // Must consume both stdout and stderr or deadlocks may occur
              // RedirectStandardOutput = true,
              RedirectStandardError = true,
              RedirectStandardInput = true,
              FileName = _mediaEncoder.EncoderPath,
              Arguments = commandLineArguments,
              WorkingDirectory = string.IsNullOrWhiteSpace(workingDirectory) ? string.Empty : workingDirectory,
              ErrorDialog = false
          },
          EnableRaisingEvents = true
      };

      var transcodingJob = this.OnTranscodeBeginning(
          outputPath,
          state.Request.PlaySessionId,
          state.MediaSource.LiveStreamId,
          Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture),
          transcodingJobType,
          process,
          state.Request.DeviceId,
          state,
          cancellationTokenSource);

      // The command line is arbitrary text, so it is logged as arguments of a constant template;
      // used as the template itself, any braces in it would be parsed as placeholders.
      var commandLineLogMessage = process.StartInfo.FileName + " " + process.StartInfo.Arguments;
      _logger.LogInformation("{Filename} {Arguments}", process.StartInfo.FileName, process.StartInfo.Arguments);

      var logFilePrefix = "FFmpeg.Transcode-";
      if (state.VideoRequest != null
          && EncodingHelper.IsCopyCodec(state.OutputVideoCodec))
      {
          logFilePrefix = EncodingHelper.IsCopyCodec(state.OutputAudioCodec)
              ? "FFmpeg.Remux-"
              : "FFmpeg.DirectStream-";
      }

      var logFilePath = Path.Combine(
          _serverConfigurationManager.ApplicationPaths.LogDirectoryPath,
          $"{logFilePrefix}{DateTime.Now:yyyy-MM-dd_HH-mm-ss}_{state.Request.MediaSourceId}_{Guid.NewGuid().ToString()[..8]}.log");

      // FFmpeg writes debug/error info to stderr. This is useful when debugging so let's put it in the log directory.
      // use FileShare.None as this bypasses dotnet bug dotnet/runtime#42790 .
      Stream logStream = new FileStream(logFilePath, FileMode.Create, FileAccess.Write, FileShare.None, IODefaults.FileStreamBufferSize, true);

      var commandLineLogMessageBytes = Encoding.UTF8.GetBytes(request.Path + Environment.NewLine + Environment.NewLine + JsonSerializer.Serialize(state.MediaSource) + Environment.NewLine + Environment.NewLine + commandLineLogMessage + Environment.NewLine + Environment.NewLine);
      await logStream.WriteAsync(commandLineLogMessageBytes, 0, commandLineLogMessageBytes.Length, cancellationTokenSource.Token).ConfigureAwait(false);

      process.Exited += (sender, args) => OnFfMpegProcessExited(process, transcodingJob, state);

      try
      {
          process.Start();
      }
      catch (Exception ex)
      {
          _logger.LogError(ex, "Error starting FFmpeg");

          this.OnTranscodeFailedToStart(outputPath, transcodingJobType, state);

          // The stream is only handed to the job logger after a successful start,
          // so on this path it has to be released here.
          await logStream.DisposeAsync().ConfigureAwait(false);

          throw;
      }

      _logger.LogDebug("Launched FFmpeg process");
      state.TranscodingJob = transcodingJob;

      // Important - don't await the log task or we won't be able to kill FFmpeg when the user stops playback
      _ = new JobLogger(_logger).StartStreamingLog(state, process.StandardError.BaseStream, logStream);

      // Wait for the file to exist before proceeding
      var ffmpegTargetFile = state.WaitForPath ?? outputPath;
      _logger.LogDebug("Waiting for the creation of {0}", ffmpegTargetFile);
      while (!File.Exists(ffmpegTargetFile) && !transcodingJob.HasExited)
      {
          await Task.Delay(100, cancellationTokenSource.Token).ConfigureAwait(false);
      }

      _logger.LogDebug("File {0} created or transcoding has finished", ffmpegTargetFile);

      if (state.IsInputVideo && transcodingJob.Type == TranscodingJobType.Progressive && !transcodingJob.HasExited)
      {
          await Task.Delay(1000, cancellationTokenSource.Token).ConfigureAwait(false);

          if (state.ReadInputAtNativeFramerate && !transcodingJob.HasExited)
          {
              await Task.Delay(1500, cancellationTokenSource.Token).ConfigureAwait(false);
          }
      }

      if (!transcodingJob.HasExited)
      {
          StartThrottler(state, transcodingJob);
      }

      _logger.LogDebug("StartFfMpeg() finished successfully");

      return transcodingJob;
  }
  /// <summary>
  /// Creates and starts a transcoding throttler for the job when throttling is enabled for the stream.
  /// </summary>
  /// <param name="state">The stream state; receives the created throttler.</param>
  /// <param name="transcodingJob">The job to throttle; receives the created throttler.</param>
  private void StartThrottler(StreamState state, TranscodingJobDto transcodingJob)
  {
      if (!EnableThrottling(state))
      {
          return;
      }

      // Use the injected factory (as OnTranscodeBeginning does) instead of a throwaway
      // LoggerFactory that has no providers configured and is never disposed.
      var throttlerLogger = _loggerFactory.CreateLogger<TranscodingThrottler>();

      transcodingJob.TranscodingThrottler = state.TranscodingThrottler = new TranscodingThrottler(transcodingJob, throttlerLogger, _serverConfigurationManager, _fileSystem);
      state.TranscodingThrottler.Start();
  }
  /// <summary>
  /// Decides whether the transcode described by the state should be throttled.
  /// </summary>
  /// <param name="state">The stream state.</param>
  /// <returns>
  /// <c>true</c> when no hardware acceleration type is configured and the input is a video file of at
  /// least five minutes, read over the file protocol, whose video is not being stream-copied.
  /// </returns>
  private bool EnableThrottling(StreamState state)
  {
      var options = _serverConfigurationManager.GetEncodingOptions();

      // enable throttling when NOT using hardware acceleration
      if (!string.IsNullOrEmpty(options.HardwareAccelerationType))
      {
          return false;
      }

      var minimumRunTimeTicks = TimeSpan.FromMinutes(5).Ticks;

      // A null RunTimeTicks makes the lifted comparison false, same as an explicit HasValue check.
      return state.InputProtocol == MediaProtocol.File
          && state.RunTimeTicks >= minimumRunTimeTicks
          && state.IsInputVideo
          && state.VideoType == VideoType.VideoFile
          && !EncodingHelper.IsCopyCodec(state.OutputVideoCodec);
  }
  /// <summary>
  /// Creates the job record for a transcode that is about to start, adds it to the active jobs
  /// and reports an initial (empty) progress for it.
  /// </summary>
  /// <param name="path">The output path.</param>
  /// <param name="playSessionId">The play session identifier.</param>
  /// <param name="liveStreamId">The live stream identifier.</param>
  /// <param name="transcodingJobId">The transcoding job identifier.</param>
  /// <param name="type">The job type.</param>
  /// <param name="process">The FFmpeg process backing the job.</param>
  /// <param name="deviceId">The device id.</param>
  /// <param name="state">The stream state; its media source is stored on the job.</param>
  /// <param name="cancellationTokenSource">The cancellation token source.</param>
  /// <returns>The newly registered <see cref="TranscodingJobDto"/>.</returns>
  public TranscodingJobDto OnTranscodeBeginning(
      string path,
      string? playSessionId,
      string? liveStreamId,
      string transcodingJobId,
      TranscodingJobType type,
      Process process,
      string? deviceId,
      StreamState state,
      CancellationTokenSource cancellationTokenSource)
  {
      lock (_activeTranscodingJobs)
      {
          var jobLogger = _loggerFactory.CreateLogger<TranscodingJobDto>();
          var newJob = new TranscodingJobDto(jobLogger)
          {
              Type = type,
              Path = path,
              Process = process,

              // The request that triggered the transcode counts as the first active one.
              ActiveRequestCount = 1,
              DeviceId = deviceId,
              CancellationTokenSource = cancellationTokenSource,
              Id = transcodingJobId,
              PlaySessionId = playSessionId,
              LiveStreamId = liveStreamId,
              MediaSource = state.MediaSource
          };

          _activeTranscodingJobs.Add(newJob);

          ReportTranscodingProgress(newJob, state, null, null, null, null, null);

          return newJob;
      }
  }
  /// <summary>
  /// Called when a request using the job ends: decrements the job's active request count and,
  /// once no request is left, pings the job's kill timer.
  /// </summary>
  /// <param name="job">The transcode job.</param>
  public void OnTranscodeEndRequest(TranscodingJobDto job)
  {
      job.ActiveRequestCount -= 1;
      _logger.LogDebug("OnTranscodeEndRequest job.ActiveRequestCount={ActiveRequestCount}", job.ActiveRequestCount);

      if (job.ActiveRequestCount > 0)
      {
          return;
      }

      PingTimer(job, false);
  }
  /// <summary>
  /// Called when a transcode failed to start: removes the job and its transcoding lock,
  /// and clears the transcoding info reported for the requesting device.
  /// </summary>
  /// <param name="path">The output path of the job.</param>
  /// <param name="type">The job type.</param>
  /// <param name="state">The stream state; its request's device id selects the session info to clear.</param>
  public void OnTranscodeFailedToStart(string path, TranscodingJobType type, StreamState state)
  {
      lock (_activeTranscodingJobs)
      {
          // Only the first job matching both type and path is removed.
          var index = _activeTranscodingJobs.FindIndex(j => j.Type == type && string.Equals(j.Path, path, StringComparison.OrdinalIgnoreCase));
          if (index >= 0)
          {
              _activeTranscodingJobs.RemoveAt(index);
          }
      }

      lock (_transcodingLocks)
      {
          _transcodingLocks.Remove(path);
      }

      var deviceId = state.Request.DeviceId;
      if (!string.IsNullOrWhiteSpace(deviceId))
      {
          _sessionManager.ClearTranscodingInfo(deviceId);
      }
  }
  /// <summary>
  /// Handles the exit of an FFmpeg process: flags the job as exited, disposes the stream state,
  /// logs the exit code and disposes the process.
  /// </summary>
  /// <param name="process">The process that exited.</param>
  /// <param name="job">The job backed by the process.</param>
  /// <param name="state">The stream state of the job.</param>
  private void OnFfMpegProcessExited(Process process, TranscodingJobDto job, StreamState state)
  {
      job.HasExited = true;

      _logger.LogDebug("Disposing stream resources");
      state.Dispose();

      var exitCode = process.ExitCode;
      if (exitCode != 0)
      {
          _logger.LogError("FFmpeg exited with code {0}", exitCode);
      }
      else
      {
          _logger.LogInformation("FFmpeg exited with code 0");
      }

      process.Dispose();
  }
  /// <summary>
  /// Opens the live stream when the media source requires opening and the request carries no
  /// live stream id, then waits for the media source's buffer time if it has one.
  /// </summary>
  /// <param name="state">The stream state.</param>
  /// <param name="cancellationTokenSource">Source of the token used for opening the stream and for the buffer delay.</param>
  /// <returns>A task representing the acquisition.</returns>
  private async Task AcquireResources(StreamState state, CancellationTokenSource cancellationTokenSource)
  {
      var mustOpenStream = state.MediaSource.RequiresOpening && string.IsNullOrWhiteSpace(state.Request.LiveStreamId);
      if (mustOpenStream)
      {
          var openRequest = new LiveStreamRequest { OpenToken = state.MediaSource.OpenToken };
          var opened = await _mediaSourceManager
              .OpenLiveStream(openRequest, cancellationTokenSource.Token)
              .ConfigureAwait(false);

          var options = _serverConfigurationManager.GetEncodingOptions();
          _encodingHelper.AttachMediaSourceInfo(state, options, opened.MediaSource, state.RequestedUrl);

          if (state.VideoRequest != null)
          {
              _encodingHelper.TryStreamCopy(state);
          }
      }

      // state.MediaSource is read afresh here: AttachMediaSourceInfo receives the state and
      // presumably replaces its media source (not visible from this file).
      var bufferMs = state.MediaSource.BufferMs;
      if (bufferMs.HasValue)
      {
          await Task.Delay(bufferMs.Value, cancellationTokenSource.Token).ConfigureAwait(false);
      }
  }
  /// <summary>
  /// Called when a request starts using an existing transcode: looks the job up by path and type
  /// and registers the request on it.
  /// </summary>
  /// <param name="path">The output path, compared case-insensitively.</param>
  /// <param name="type">The job type.</param>
  /// <returns>The matching <see cref="TranscodingJobDto"/>, or <c>null</c> when no such job is active.</returns>
  public TranscodingJobDto? OnTranscodeBeginRequest(string path, TranscodingJobType type)
  {
      lock (_activeTranscodingJobs)
      {
          foreach (var candidate in _activeTranscodingJobs)
          {
              if (candidate.Type == type && string.Equals(candidate.Path, path, StringComparison.OrdinalIgnoreCase))
              {
                  OnTranscodeBeginRequest(candidate);
                  return candidate;
              }
          }

          return null;
      }
  }
  /// <summary>
  /// Registers one more active request on the job and stops its kill timer when the job has
  /// no play session id or is progressive.
  /// </summary>
  /// <param name="job">The job the request is using.</param>
  private void OnTranscodeBeginRequest(TranscodingJobDto job)
  {
      job.ActiveRequestCount += 1;

      var hasNoPlaySession = string.IsNullOrWhiteSpace(job.PlaySessionId);
      if (!hasNoPlaySession && job.Type != TranscodingJobType.Progressive)
      {
          return;
      }

      job.StopKillTimer();
  }
  /// <summary>
  /// Gets the transcoding lock for an output path, creating it on first use.
  /// </summary>
  /// <param name="outputPath">The output path of the transcoded file.</param>
  /// <returns>A <see cref="SemaphoreSlim"/> with an initial and maximum count of one.</returns>
  public SemaphoreSlim GetTranscodingLock(string outputPath)
  {
      lock (_transcodingLocks)
      {
          if (_transcodingLocks.TryGetValue(outputPath, out var existing))
          {
              return existing;
          }

          var created = new SemaphoreSlim(1, 1);
          _transcodingLocks.Add(outputPath, created);
          return created;
      }
  }
  /// <summary>
  /// Handles the playback start and progress events by pinging the jobs of the event's play session.
  /// </summary>
  /// <param name="sender">The event sender (unused).</param>
  /// <param name="e">The playback event data; ignored when it carries no play session id.</param>
  private void OnPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
  {
      var playSessionId = e.PlaySessionId;
      if (string.IsNullOrWhiteSpace(playSessionId))
      {
          return;
      }

      PingTranscodingJob(playSessionId, e.IsPaused);
  }
  /// <summary>
  /// Deletes every file under the configured transcode path, including files in subdirectories.
  /// Does nothing when the directory does not exist.
  /// </summary>
  private void DeleteEncodedMediaCache()
  {
      var transcodePath = _serverConfigurationManager.GetTranscodePath();
      if (Directory.Exists(transcodePath))
      {
          foreach (var cachedFile in _fileSystem.GetFilePaths(transcodePath, true))
          {
              _fileSystem.DeleteFile(cachedFile);
          }
      }
  }
  /// <summary>
  /// Dispose transcoding job helper.
  /// </summary>
  public void Dispose()
  {
      Dispose(true);
      GC.SuppressFinalize(this);
  }

  /// <summary>
  /// Unsubscribes the helper from the session manager's playback events.
  /// </summary>
  /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>; nothing is released otherwise.</param>
  protected virtual void Dispose(bool disposing)
  {
      if (!disposing)
      {
          return;
      }

      // The logger factory is handed in through the constructor and not created by this class,
      // so it is not disposed here; only the subscriptions made in the constructor are undone.
      _sessionManager.PlaybackProgress -= OnPlaybackProgress;
      _sessionManager.PlaybackStart -= OnPlaybackProgress;
  }
  765. }
  766. }