// Job.ts

  import { randomUUID } from "node:crypto";
  import JobContext from "@/JobContext";
  import JobStatistics from "@/JobStatistics";
  import LogBook, { Log } from "@/LogBook";
  import { JobOptions } from "@/types/JobOptions";
  import WebSocketModule from "./modules/WebSocketModule";
  import BaseModule from "./BaseModule";
  import EventsModule from "./modules/EventsModule";
  /**
   * JobStatus - Lifecycle states a job can report through `Job.getStatus()`.
   */
  export enum JobStatus {
    /** Initial status assigned by the `Job` constructor. */
    QUEUED = "QUEUED",
    /** Set by `Job.execute()` right before the job starts running. */
    ACTIVE = "ACTIVE",
    /** Declared for consumers; never assigned by the `Job` class in this file. */
    PAUSED = "PAUSED",
    /** Set when `Job.execute()` finishes, whether it succeeded or failed. */
    COMPLETED = "COMPLETED"
  }
  /**
   * Job - Abstract base class for a unit of work that belongs to a module.
   *
   * Subclasses implement `_execute()` and may override `_validate()` and
   * `_authorize()`. The job name is the subclass name with its first character
   * lower-cased, and the job path is `<moduleName>.<jobName>`.
   */
  export default abstract class Job {
    /** Flag returned by `isApiEnabled()`; defaults to `true`. */
    protected static _apiEnabled = true;

    /** Module that owns this job. */
    protected _module: InstanceType<typeof BaseModule>;

    /** Payload handed to the constructor; the base class does not validate it. */
    protected _payload: any;

    /** Context created for this job in the constructor. */
    protected _context: JobContext;

    /** Job priority; defaults to 1. */
    protected _priority: number;

    /**
     * Long-job metadata. Only `title` is populated by this class (from
     * `options.longJob`); `progress` is never written here.
     */
    protected _longJob?: {
      title: string;
      progress?: {
        data: unknown;
        time: Date;
        timeout?: NodeJS.Timeout;
      };
    };

    /** Version 4 UUID generated in the constructor. */
    protected _uuid: string;

    /** Current lifecycle status. */
    protected _status: JobStatus;

    /** `performance.now()` reading taken when the job was constructed. */
    protected _createdAt: number;

    /** `performance.now()` reading taken when `execute()` started running. */
    protected _startedAt?: number;

    /** `performance.now()` reading taken when `execute()` finished. */
    protected _completedAt?: number;

    /**
     * Job
     *
     * @param module - Module the job belongs to
     * @param payload - Job payload
     * @param options - Job options (priority, longJob, session, socketId, callbackRef)
     */
    public constructor(
      module: InstanceType<typeof BaseModule>,
      payload: unknown,
      options?: JobOptions
    ) {
      this._createdAt = performance.now();
      this._module = module;
      this._payload = payload;
      this._priority = 1;
      this._status = JobStatus.QUEUED;
      this._uuid = randomUUID();

      let contextOptions;

      if (options) {
        const { priority, longJob, session, socketId, callbackRef } = options;

        // Context options are only forwarded when a session or socket id is
        // present; a callbackRef on its own is dropped.
        if (session || socketId)
          contextOptions = { session, socketId, callbackRef };

        // Truthiness check: an omitted priority, or a priority of 0, keeps
        // the default of 1.
        if (priority) this._priority = priority;

        if (longJob) this._longJob = { title: longJob };
      }

      this._context = new JobContext(this, contextOptions);

      JobStatistics.updateStats(this.getPath(), "added");
    }

    /**
     * getName - Get the job name from the class it is called on
     *
     * @returns class name with its first character lower-cased
     */
    public static getName() {
      return `${this.name.charAt(0).toLowerCase()}${this.name.slice(1)}`;
    }

    /**
     * getName - Get the job name of this instance
     *
     * @returns constructor name with its first character lower-cased
     */
    public getName() {
      const className = this.constructor.name;

      return `${className.charAt(0).toLowerCase()}${className.slice(1)}`;
    }

    /**
     * getPath - Get module and job name in a dot format, e.g. module.jobName
     */
    public getPath() {
      return `${this._module.getName()}.${this.getName()}`;
    }

    /**
     * getPriority - Get job priority
     *
     * @returns priority
     */
    public getPriority() {
      return this._priority;
    }

    /**
     * getUuid - Get job UUID
     *
     * @returns UUID
     */
    public getUuid() {
      return this._uuid;
    }

    /**
     * getStatus - Get job status
     *
     * @returns status
     */
    public getStatus() {
      return this._status;
    }

    /**
     * _setStatus - Set job status
     *
     * @param status - Job status
     */
    protected _setStatus(status: JobStatus) {
      this._status = status;
    }

    /**
     * getModule - Get module
     *
     * @returns module
     */
    public getModule() {
      return this._module;
    }

    /**
     * isApiEnabled - Read the static `_apiEnabled` flag of the class it is called on
     */
    public static isApiEnabled() {
      return this._apiEnabled;
    }

    /**
     * isApiEnabled - Read the static `_apiEnabled` flag of this instance's class
     */
    public isApiEnabled() {
      return (this.constructor as typeof Job)._apiEnabled;
    }

    /**
     * _validate - Validation hook run first by `execute()`; no-op in the base class
     */
    protected async _validate() {}

    /**
     * _authorize - Authorization hook run by `execute()` after validation;
     * asks the job context to assert permission for the job path
     */
    protected async _authorize() {
      await this._context.assertPermission(this.getPath());
    }

    /**
     * _execute - Job implementation, provided by subclasses
     *
     * @returns Promise resolving to the job result
     */
    protected abstract _execute(): Promise<unknown>;

    /**
     * _getErrorMessage - Pick the value reported for a caught error
     *
     * Returns the `message` property when the thrown value has a non-nullish
     * one, otherwise the thrown value itself (e.g. a thrown string).
     *
     * @param error - Value caught from a failed job
     * @returns message, or the thrown value when it carries none
     */
    private static _getErrorMessage(error: unknown): unknown {
      if (typeof error === "object" && error !== null) {
        // `unknown` must be narrowed before a property can be read from it.
        const { message } = error as { message?: unknown };

        return message ?? error;
      }

      return error;
    }

    /**
     * execute - Execute job
     *
     * Runs `_validate()`, `_authorize()` and `_execute()` in that order. When
     * the context has a callbackRef, the outcome is published to
     * `job.<uuid>`. Statistics and the log are updated for both outcomes, and
     * the status ends as COMPLETED whether the job succeeded or failed.
     *
     * The two guard errors below are thrown before any status, timestamp or
     * statistics update takes place.
     *
     * @returns Promise resolving to the value returned by `_execute()`
     * @throws Error if the job was already executed or the module can not run jobs
     * @throws whatever `_validate()`, `_authorize()` or `_execute()` throws
     */
    public async execute(): Promise<unknown> {
      if (this._startedAt !== undefined)
        throw new Error("Job has already been executed.");

      if (!this.getModule().canRunJobs())
        throw new Error("Module can not currently run jobs.");

      this._setStatus(JobStatus.ACTIVE);
      this._startedAt = performance.now();

      try {
        await this._validate();
        await this._authorize();

        const data = await this._execute();

        const socketId = this._context.getSocketId();
        const callbackRef = this._context.getCallbackRef();

        if (callbackRef) {
          await EventsModule.publish(`job.${this.getUuid()}`, {
            socketId,
            callbackRef,
            status: "success",
            data
          });
        }

        this.log({
          message: "Job completed successfully",
          type: "success"
        });

        JobStatistics.updateStats(this.getPath(), "successful");

        return data;
      } catch (error: unknown) {
        const message = Job._getErrorMessage(error);

        const socketId = this._context.getSocketId();
        const callbackRef = this._context.getCallbackRef();

        if (callbackRef) {
          await EventsModule.publish(`job.${this.getUuid()}`, {
            socketId,
            callbackRef,
            status: "error",
            message
          });
        }

        this.log({
          message: `Job failed with error "${message}"`,
          type: "error",
          data: { error }
        });

        JobStatistics.updateStats(this.getPath(), "failed");

        throw error;
      } finally {
        this._completedAt = performance.now();

        JobStatistics.updateStats(this.getPath(), "total");

        if (this._startedAt)
          JobStatistics.updateStats(
            this.getPath(),
            "duration",
            this._completedAt - this._startedAt
          );

        this._setStatus(JobStatus.COMPLETED);
      }
    }

    /**
     * Log a message in the context of the current job, which automatically sets the category and data
     *
     * The category is the job path, and the logged data is the serialized
     * job info merged with any data supplied by the caller (caller wins on
     * key clashes).
     *
     * @param log - Log message or object
     */
    public log(log: string | Omit<Log, "timestamp" | "category">) {
      const {
        message,
        type = undefined,
        data = {}
      } = {
        ...(typeof log === "string" ? { message: log } : log)
      };

      LogBook.log({
        message,
        type,
        category: this.getPath(),
        data: {
          ...this.toJSON(),
          ...data
        }
      });
    }

    /**
     * Serialize job info
     *
     * The payload is included as a JSON string produced by `JSON.stringify`.
     *
     * @returns json
     */
    public toJSON() {
      return {
        uuid: this.getUuid(),
        priority: this.getPriority(),
        name: this.getPath(),
        status: this.getStatus(),
        moduleStatus: this._module.getStatus(),
        createdAt: this._createdAt,
        startedAt: this._startedAt,
        completedAt: this._completedAt,
        payload: JSON.stringify(this._payload)
      };
    }
  }