1
0

JobQueue.ts 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import Job from "./Job";
  2. import { JobStatus } from "./types/JobStatus";
  /**
   * Counters kept per job name.
   */
  type JobStatistics = {
    successful: number;
    failed: number;
    total: number;
    added: number;
  };

  /**
   * Job Queue
   *
   * Holds jobs waiting to run and jobs currently running, starts queued jobs
   * in priority order up to a concurrency limit, and records per-job-name
   * statistics.
   */
  export default class JobQueue {
    private concurrency: number;

    private isPaused: boolean;

    private readonly queue: Job[];

    private readonly active: Job[];

    private readonly stats: Record<string, JobStatistics>;

    /**
     * Job Queue
     *
     * The queue starts paused with a concurrency of 10; call resume() to
     * begin processing.
     */
    public constructor() {
      this.concurrency = 10;
      this.isPaused = true;
      this.queue = [];
      this.active = [];
      // Null-prototype dictionary: job names such as "constructor" or
      // "toString" must not resolve to inherited Object members.
      this.stats = Object.create(null);
    }

    /**
     * add - Add job to queue
     *
     * The job is appended to the queue, counted as "added", and processing is
     * scheduled asynchronously.
     *
     * @param {Job} job Job
     */
    public add(job: Job): void {
      this.queue.push(job);
      this.updateStats(job.getName(), "added");
      this.scheduleProcess();
    }

    /**
     * getJob - Fetch job
     *
     * Queued jobs are searched first, then active jobs.
     *
     * @param {string} jobId Job UUID
     * @returns {Job|undefined} Job if found
     */
    public getJob(jobId: string): Job | undefined {
      return (
        this.queue.find(job => job.getUuid() === jobId) ||
        this.active.find(job => job.getUuid() === jobId)
      );
    }

    /**
     * pause - Pause queue
     *
     * Pause processing of jobs in queue, active jobs will not be paused.
     */
    public pause(): void {
      this.isPaused = true;
    }

    /**
     * resume - Resume queue
     *
     * Clears the paused flag and schedules processing so that jobs queued
     * while paused are started without waiting for another add().
     */
    public resume(): void {
      this.isPaused = false;
      this.scheduleProcess();
    }

    /**
     * scheduleProcess - Run process() on a later timer tick
     */
    private scheduleProcess(): void {
      setTimeout(() => {
        this.process();
      }, 0);
    }

    /**
     * process - Process queue
     *
     * Starts queued jobs until the queue is paused, the concurrency limit is
     * reached, or the queue is empty. The job with the lowest priority value
     * is started first; on a tie the job earlier in the queue wins.
     */
    private process(): void {
      while (
        !this.isPaused &&
        this.active.length < this.concurrency &&
        this.queue.length > 0
      ) {
        const job = this.queue.reduce((a, b) =>
          a.getPriority() <= b.getPriority() ? a : b
        );

        // NOTE(review): a selected job with priority -1 stops processing
        // entirely, so it also holds back every other queued job. -1 looks
        // like a "do not run" sentinel - confirm this blocking is intended.
        if (job.getPriority() === -1) return;

        this.queue.splice(this.queue.indexOf(job), 1);
        this.active.push(job);
        this.run(job);
      }
    }

    /**
     * run - Execute a job that has already been moved to the active list
     *
     * Records the outcome, removes the job from the active list once it
     * settles, and schedules another processing pass.
     *
     * @param {Job} job Job to execute
     */
    private run(job: Job): void {
      job.execute()
        .then(() => {
          this.updateStats(job.getName(), "successful");
        })
        .catch(() => {
          this.updateStats(job.getName(), "failed");
        })
        .finally(() => {
          this.updateStats(job.getName(), "total");
          // Guard the index: splice(-1, 1) would drop an unrelated job.
          const index = this.active.indexOf(job);
          if (index !== -1) this.active.splice(index, 1);
          this.scheduleProcess();
        });
    }

    /**
     * getStatus - Get status of job queue
     *
     * @returns {object} Job queue status
     */
    public getStatus() {
      return {
        isPaused: this.isPaused,
        queueLength: this.queue.length,
        activeLength: this.active.length,
        concurrency: this.concurrency
      };
    }

    /**
     * getStats - Get statistics of job queue
     *
     * Returns the counters for every job name plus a "total" entry holding
     * the sum over all job names. The "total" entry is all zeros when no job
     * has been recorded yet.
     *
     * @returns {object} Job queue statistics
     */
    public getStats() {
      const zero: JobStatistics = {
        successful: 0,
        failed: 0,
        total: 0,
        added: 0
      };
      return {
        ...this.stats,
        // The initial value keeps reduce from throwing on an empty record.
        total: Object.values(this.stats).reduce(
          (a, b) => ({
            successful: a.successful + b.successful,
            failed: a.failed + b.failed,
            total: a.total + b.total,
            added: a.added + b.added
          }),
          zero
        )
      };
    }

    /**
     * getQueueStatus - Get statistics of queued or active jobs
     *
     * With no filter both the "active" and "queue" lists are returned;
     * "ACTIVE" or "QUEUED" restricts the result to that list.
     *
     * @param {JobStatus|undefined} type Job type filter
     * @returns {object} Job queue statistics
     */
    public getQueueStatus(type?: JobStatus) {
      const status: Record<
        string,
        {
          uuid: string;
          priority: number;
          name: string;
          status: JobStatus;
        }[]
      > = {};
      const format = (job: Job) => ({
        uuid: job.getUuid(),
        priority: job.getPriority(),
        name: job.getName(),
        status: job.getStatus()
      });
      if (!type || type === "ACTIVE") status.active = this.active.map(format);
      if (!type || type === "QUEUED") status.queue = this.queue.map(format);
      return status;
    }

    /**
     * updateStats - Update job statistics
     *
     * Creates a zeroed entry for the job name on first use, then increments
     * the requested counter by one.
     *
     * @param {string} jobName Job name
     * @param {"successful"|"failed"|"total"|"added"} type Stats type
     */
    private updateStats(
      jobName: string,
      type: "successful" | "failed" | "total" | "added"
    ) {
      let entry = this.stats[jobName];
      if (!entry) {
        entry = {
          successful: 0,
          failed: 0,
          total: 0,
          added: 0
        };
        this.stats[jobName] = entry;
      }
      entry[type] += 1;
    }
  }