Job.ts 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. import JobContext from "@/JobContext";
  2. import JobStatistics from "@/JobStatistics";
  3. import LogBook, { Log } from "@/LogBook";
  4. import { JobOptions } from "@/types/JobOptions";
  5. import { Modules } from "@/types/Modules";
  6. import WebSocketModule from "./modules/WebSocketModule";
  7. export enum JobStatus {
  8. QUEUED = "QUEUED",
  9. ACTIVE = "ACTIVE",
  10. PAUSED = "PAUSED",
  11. COMPLETED = "COMPLETED"
  12. }
  13. export default abstract class Job {
  14. protected static _apiEnabled = true;
  15. protected _module: Modules[keyof Modules];
  16. protected _payload: any;
  17. protected _context: JobContext;
  18. protected _priority: number;
  19. protected _longJob?: {
  20. title: string;
  21. progress?: {
  22. data: unknown;
  23. time: Date;
  24. timeout?: NodeJS.Timeout;
  25. };
  26. };
  27. protected _uuid: string;
  28. protected _status: JobStatus;
  29. protected _createdAt: number;
  30. protected _startedAt?: number;
  31. protected _completedAt?: number;
  32. /**
  33. * Job
  34. *
  35. * @param name - Job name
  36. * @param module - Job module
  37. * @param callback - Job callback
  38. * @param options - Job options
  39. */
  40. public constructor(
  41. module: Modules[keyof Modules],
  42. payload: any,
  43. options?: Omit<JobOptions, "runDirectly">
  44. ) {
  45. this._module = module;
  46. this._payload = payload;
  47. this._priority = 1;
  48. let contextOptions;
  49. if (options) {
  50. const { priority, longJob, session, socketId, callbackRef } =
  51. options;
  52. if (session || socketId)
  53. contextOptions = { session, socketId, callbackRef };
  54. if (priority) this._priority = priority;
  55. if (longJob)
  56. this._longJob = {
  57. title: longJob
  58. };
  59. }
  60. this._context = new JobContext(this, contextOptions);
  61. /* eslint-disable no-bitwise, eqeqeq */
  62. this._uuid = "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(
  63. /[xy]/g,
  64. c => {
  65. const r = (Math.random() * 16) | 0;
  66. const v = c == "x" ? r : (r & 0x3) | 0x8;
  67. return v.toString(16);
  68. }
  69. );
  70. this._status = JobStatus.QUEUED;
  71. this._createdAt = performance.now();
  72. }
  73. /**
  74. * getName - Get job name
  75. */
  76. public static getName() {
  77. return this.name.substring(0, 1).toLowerCase() + this.name.substring(1);
  78. }
  79. /**
  80. * getName - Get job name
  81. */
  82. public getName() {
  83. return (
  84. this.constructor.name.substring(0, 1).toLowerCase() +
  85. this.constructor.name.substring(1)
  86. );
  87. }
  88. /**
  89. * getPath - Get module and job name in a dot format, e.g. module.jobName
  90. */
  91. public getPath() {
  92. return `${this._module.getName()}.${this.getName()}`;
  93. }
  94. /**
  95. * getPriority - Get job priority
  96. *
  97. * @returns priority
  98. */
  99. public getPriority() {
  100. return this._priority;
  101. }
  102. /**
  103. * getUuid - Get job UUID
  104. *
  105. * @returns UUID
  106. */
  107. public getUuid() {
  108. return this._uuid;
  109. }
  110. /**
  111. * getStatus - Get job status
  112. *
  113. * @returns status
  114. */
  115. public getStatus() {
  116. return this._status;
  117. }
  118. /**
  119. * setStatus - Set job status
  120. *
  121. * @param status - Job status
  122. */
  123. protected _setStatus(status: JobStatus) {
  124. this._status = status;
  125. }
  126. /**
  127. * getModule - Get module
  128. *
  129. * @returns module
  130. */
  131. public getModule() {
  132. return this._module;
  133. }
  134. public static isApiEnabled() {
  135. return this._apiEnabled;
  136. }
  137. public isApiEnabled() {
  138. return this.constructor._apiEnabled;
  139. }
  140. protected async _authorize() {
  141. await this._context.assertPermission(this.getPath());
  142. }
  143. protected abstract _execute();
  144. /**
  145. * execute - Execute job
  146. *
  147. * @returns Promise
  148. */
  149. public async execute() {
  150. if (this._startedAt) throw new Error("Job has already been executed.");
  151. if (!this.getModule().canRunJobs())
  152. throw new Error("Module can not currently run jobs.");
  153. this._setStatus(JobStatus.ACTIVE);
  154. this._startedAt = performance.now();
  155. try {
  156. await this._authorize();
  157. const data = await this._execute(this._payload);
  158. if (this._context.getSocketId() && this._context.getCallbackRef()) {
  159. await WebSocketModule.dispatch(
  160. this._context.getSocketId(),
  161. "jobCallback",
  162. this._context.getCallbackRef(),
  163. {
  164. status: "success",
  165. data
  166. }
  167. );
  168. }
  169. this.log({
  170. message: "Job completed successfully",
  171. type: "success"
  172. });
  173. JobStatistics.updateStats(this.getPath(), "successful");
  174. return data;
  175. } catch (error: any) {
  176. const message = error?.message ?? error;
  177. if (this._context.getSocketId() && this._context.getCallbackRef()) {
  178. await WebSocketModule.dispatch(
  179. this._context.getSocketId(),
  180. "jobCallback",
  181. this._context.getCallbackRef(),
  182. {
  183. status: "error",
  184. message
  185. }
  186. );
  187. }
  188. this.log({
  189. message: `Job failed with error "${message}"`,
  190. type: "error",
  191. data: { error }
  192. });
  193. JobStatistics.updateStats(this.getPath(), "failed");
  194. throw error;
  195. } finally {
  196. this._completedAt = performance.now();
  197. JobStatistics.updateStats(this.getPath(), "total");
  198. if (this._startedAt)
  199. JobStatistics.updateStats(
  200. this.getPath(),
  201. "duration",
  202. this._completedAt - this._startedAt
  203. );
  204. this._setStatus(JobStatus.COMPLETED);
  205. }
  206. }
  207. /**
  208. * Log a message in the context of the current job, which automatically sets the category and data
  209. *
  210. * @param log - Log message or object
  211. */
  212. public log(log: string | Omit<Log, "timestamp" | "category">) {
  213. const {
  214. message,
  215. type = undefined,
  216. data = {}
  217. } = {
  218. ...(typeof log === "string" ? { message: log } : log)
  219. };
  220. LogBook.log({
  221. message,
  222. type,
  223. category: this.getPath(),
  224. data: {
  225. ...this.toJSON(),
  226. ...data
  227. }
  228. });
  229. }
  230. /**
  231. * Serialize job info
  232. *
  233. * @returns json
  234. */
  235. public toJSON() {
  236. return {
  237. uuid: this.getUuid(),
  238. priority: this.getPriority(),
  239. name: this.getPath(),
  240. status: this.getStatus(),
  241. moduleStatus: this._module.getStatus(),
  242. createdAt: this._createdAt,
  243. startedAt: this._startedAt,
  244. completedAt: this._completedAt,
  245. payload: JSON.stringify(this._payload)
  246. };
  247. }
  248. }