// JobQueue.ts
  1. import Job, { JobStatus, JobOptions } from "@/Job";
  2. import JobStatistics, { JobStatisticsType } from "./JobStatistics";
  3. import { JobDerived } from "./types/JobDerived";
  4. import assertJobDerived from "./utils/assertJobDerived";
  /**
   * Job Queue
   *
   * Holds queued jobs, starts them in ascending priority order while the queue
   * is not paused, and keeps at most `_concurrency` jobs active at once.
   * A newly constructed queue is paused until `resume()` is called.
   */
  export class JobQueue {
    /** Maximum number of jobs allowed in `_active` at the same time. */
    private _concurrency: number;

    /** While true, `_process` does not start any queued job. */
    private _isPaused: boolean;

    /** Jobs waiting to be started. */
    private _queue: Job[];

    /** Jobs that have been started and have not settled yet. */
    private _active: Job[];

    /** Re-entrancy guard for `_process`. */
    private _processLock: boolean;

    /** Settle functions of pending `runJob` promises, keyed by job uuid. */
    private _callbacks: Record<
      string,
      {
        resolve: (value: unknown) => void;
        reject: (reason?: unknown) => void;
      }
    >;

    /**
     * Job Queue
     */
    public constructor() {
      this._concurrency = 50;
      this._isPaused = true;
      this._queue = [];
      this._active = [];
      this._callbacks = {};
      this._processLock = false;
    }

    /**
     * pause - Pause queue
     *
     * Pause processing of jobs in queue, active jobs will not be paused.
     */
    public pause() {
      this._isPaused = true;
    }

    /**
     * resume - Resume queue
     *
     * Clears the paused flag and immediately tries to start queued jobs.
     */
    public resume() {
      this._isPaused = false;
      void this._process();
    }

    /**
     * runJob - Run a job
     *
     * Queues a job and returns a promise that settles with the outcome of the
     * job's `execute()` call.
     *
     * @param JobClass - Job constructor, validated with `assertJobDerived`
     * @param payload - Payload passed to the job constructor
     * @param options - Options passed to the job constructor
     * @returns Promise resolved with the job result, or rejected with its error
     */
    public async runJob(
      JobClass: Function,
      payload?: unknown,
      options?: JobOptions
    ): Promise<unknown> {
      assertJobDerived(JobClass);

      const job = new (JobClass as JobDerived)(payload, options);
      const uuid = job.getUuid();

      return new Promise<unknown>((resolve, reject) => {
        // The callbacks must be registered BEFORE the job is enqueued:
        // enqueueing triggers _process(), which may start the job right away
        // and looks the callbacks up at that moment.
        this._callbacks[uuid] = { resolve, reject };

        try {
          this._enqueue(job);
        } catch (error) {
          // The job never made it into the queue, so nothing will ever
          // settle this promise through the callbacks.
          delete this._callbacks[uuid];
          reject(error);
        }
      });
    }

    /**
     * queueJob - Queue a job
     *
     * Queues a job without waiting for its result.
     *
     * @param JobClass - Job constructor, validated with `assertJobDerived`
     * @param payload - Payload passed to the job constructor
     * @param options - Options passed to the job constructor
     * @returns Uuid of the queued job
     */
    public async queueJob(
      JobClass: Function,
      payload?: unknown,
      options?: JobOptions
    ): Promise<string> {
      assertJobDerived(JobClass);

      const job = new (JobClass as JobDerived)(payload, options);

      this._enqueue(job);

      return job.getUuid();
    }

    /**
     * enqueue - Record the QUEUED statistic, add the job to the queue and trigger processing
     */
    private _enqueue(job: Job) {
      JobStatistics.updateStats(job.getPath(), JobStatisticsType.QUEUED);
      this._queue.push(job);
      void this._process();
    }

    /**
     * process - Process queue
     *
     * Starts as many queued jobs as the concurrency limit allows, lowest
     * priority value first. Jobs whose module cannot run jobs stay queued.
     */
    private async _process() {
      // If the process is locked, don't continue. This prevents running process at the same time which could lead to issues
      if (this._processLock) return;

      // If the queue is paused, we've reached the maximum number of active jobs, or there are no jobs in the queue, don't continue
      if (
        this._isPaused ||
        this._active.length >= this._concurrency ||
        this._queue.length === 0
      )
        return;

      // Lock the process function
      this._processLock = true;

      try {
        // Sort jobs based on priority, with a lower priority being preferred.
        // The queue itself is kept sorted, but we iterate over a snapshot:
        // jobs are spliced out of the queue below, and iterating the live
        // array while removing from it would skip every other job.
        this._queue.sort((a, b) => a.getPriority() - b.getPriority());
        const candidates = [...this._queue];

        for (const job of candidates) {
          // Stop once the concurrency limit is reached
          if (this._active.length >= this._concurrency) break;

          // If the module of the job is not started, we can't run the job, so go to the next job in the queue
          // eslint-disable-next-line no-continue
          if (!job.getModule().canRunJobs()) continue;

          // Remove the job from the queue and add it to the active jobs array.
          // A missing job must not reach splice(-1, 1), which would drop an unrelated job.
          const queueIndex = this._queue.indexOf(job);
          // eslint-disable-next-line no-continue
          if (queueIndex === -1) continue;
          this._queue.splice(queueIndex, 1);
          this._active.push(job);

          this._start(job);
        }
      } finally {
        // Unlock the process even if something above threw, so a single
        // failure can't block the queue forever
        this._processLock = false;
      }
    }

    /**
     * start - Execute a job and clean up once it has settled
     */
    private _start(job: Job) {
      const uuid = job.getUuid();
      // Only jobs queued through runJob have callbacks
      const callback = this._callbacks[uuid];

      job.execute()
        .then(callback?.resolve)
        .catch(callback?.reject)
        .catch(() => {}) // Ignore errors, any handling required is in job or callback
        .finally(() => {
          delete this._callbacks[uuid];

          // If the current job is in the active jobs array, remove it, and then run the process function to run another job
          const activeJobIndex = this._active.indexOf(job);
          if (activeJobIndex > -1) {
            this._active.splice(activeJobIndex, 1);
          }

          void this._process();
        });
    }

    /**
     * getStatus - Get status of job queue
     *
     * @returns Job queue status
     */
    public getStatus() {
      return {
        isPaused: this._isPaused,
        queueLength: this._queue.length,
        activeLength: this._active.length,
        concurrency: this._concurrency
      };
    }

    /**
     * getQueueStatus - Get statistics of queued or active jobs
     *
     * @param type - Job type filter, both lists are returned when omitted
     * @returns Job queue statistics
     */
    public getQueueStatus(type?: JobStatus) {
      const status: Record<string, ReturnType<Job["toJSON"]>[]> = {};

      if (!type || type === JobStatus.ACTIVE)
        status.active = this._active.map(job => job.toJSON());

      if (!type || type === JobStatus.QUEUED)
        status.queue = this._queue.map(job => job.toJSON());

      return status;
    }
  }
  /** Module-level JobQueue instance, exposed as this module's default export. */
  const jobQueue = new JobQueue();

  export default jobQueue;