Job.ts 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. import { getErrorMessage } from "@common/utils/getErrorMessage";
  2. import { generateUuid } from "@common/utils/generateUuid";
  3. import Joi from "joi";
  4. import * as inflection from "inflection";
  5. import JobContext from "@/JobContext";
  6. import JobStatistics, { JobStatisticsType } from "@/JobStatistics";
  7. import LogBook, { Log } from "@/LogBook";
  8. import BaseModule from "./BaseModule";
  9. import EventsModule from "./modules/EventsModule";
  10. import JobCompletedEvent from "./modules/EventsModule/events/JobCompletedEvent";
  11. import User from "./modules/DataModule/models/User";
  12. import Session from "./modules/DataModule/models/Session";
  /**
   * Lifecycle states of a job.
   *
   * A job is QUEUED when constructed, becomes ACTIVE when execute() starts and
   * ends up COMPLETED once execute() finishes, whether it succeeded or failed.
   * PAUSED is not assigned anywhere in this file.
   */
  export enum JobStatus {
    /** Constructed, not yet executed */
    QUEUED = "QUEUED",
    /** Currently executing */
    ACTIVE = "ACTIVE",
    /** Execution suspended */
    PAUSED = "PAUSED",
    /** Execution finished, successfully or not */
    COMPLETED = "COMPLETED",
  }
  /**
   * Optional settings accepted by the Job constructor.
   */
  export type JobOptions = {
    /** Job priority; when omitted (or falsy) the job keeps its default priority of 1 */
    priority?: number;

    /** Title of the long job; when set, the job is tracked as a long job with this title */
    longJob?: string;

    /** Session the job runs on behalf of, forwarded to the job context */
    session?: Session;

    /** Socket the job was requested from, forwarded to the job context */
    socketId?: string;

    /**
     * Reference included in the JobCompletedEvent published when the job finishes.
     * Only forwarded to the job context when a session or socketId is also given.
     */
    callbackRef?: string;
  };
  /**
   * Base class for all jobs.
   *
   * A job belongs to a module, optionally carries a payload, and is run at most
   * once through execute(), which validates the payload, authorizes the caller,
   * runs the subclass' _execute() and reports the outcome (event, log, statistics).
   */
  export default abstract class Job {
    // NOTE(review): presumably controls whether the job can be called through the API -
    // in this file it is only exposed through isApiEnabled(); confirm against callers
    protected static _apiEnabled = true;

    // Generic permission rule(s) evaluated by hasPermission(): a boolean, a function
    // receiving the user, or a list of both where any truthy entry grants permission
    protected static _hasPermission:
      | boolean
      | CallableFunction
      | (boolean | CallableFunction)[] = false;

    // If a job expects a payload, it must override this
    protected static _payloadSchema: Joi.ObjectSchema<any> | null = null;

    protected _module: InstanceType<typeof BaseModule>;

    protected _payload: any;

    protected _context: JobContext;

    protected _priority: number;

    // Only set when the job was constructed with a longJob title
    protected _longJob?: {
      title: string;
      progress?: {
        data: unknown;
        time: Date;
        timeout?: NodeJS.Timeout;
      };
    };

    protected _uuid: string;

    protected _status: JobStatus;

    // Timestamps are performance.now() readings, not wall-clock times
    protected _createdAt: number;

    protected _startedAt?: number;

    protected _completedAt?: number;

    // Whether this class' _validate has been called. May not be modified by classes that extend Job
    protected _validated = false;

    // Optional hook applied to the result of _execute() before it is published and returned
    protected _transformResponse: null | ((response: unknown) => unknown) =
      null;

    /**
     * Create a job in the QUEUED state with a newly generated UUID, and record
     * its construction in the job statistics.
     *
     * @param module - Module the job belongs to
     * @param payload - Job input, checked against the payload schema on execute()
     * @param options - Optional priority, long job title and session/socket context
     */
    public constructor(
      module: InstanceType<typeof BaseModule>,
      payload: unknown,
      options?: JobOptions
    ) {
      this._createdAt = performance.now();
      this._module = module;
      this._payload = payload;
      this._priority = 1;
      this._status = JobStatus.QUEUED;
      this._uuid = generateUuid();

      let contextOptions;

      if (options) {
        const { priority, longJob, session, socketId, callbackRef } = options;

        // Context options are only passed on when the job is tied to a session or socket
        if (session || socketId)
          contextOptions = { session, socketId, callbackRef };

        // A falsy priority (including 0) keeps the default priority of 1
        if (priority) this._priority = priority;

        if (longJob)
          this._longJob = {
            title: longJob
          };
      }

      this._context = new JobContext(this, contextOptions);

      JobStatistics.updateStats(
        this.getPath(),
        JobStatisticsType.CONSTRUCTED
      );
    }

    /**
     * getName - Get the job name, derived from the class name with a lowercase first letter
     *
     * @returns name
     */
    public static getName() {
      return inflection.camelize(this.name, true);
    }

    /**
     * getName - Get the job name of this instance's class
     *
     * @returns name
     */
    public getName() {
      return (this.constructor as typeof Job).getName();
    }

    /**
     * getPath - Get module and job name in a dot format, e.g. module.jobName
     *
     * @returns path
     */
    public getPath() {
      return `${this._module.getName()}.${this.getName()}`;
    }

    /**
     * getPriority - Get job priority
     *
     * @returns priority
     */
    public getPriority() {
      return this._priority;
    }

    /**
     * getUuid - Get job UUID
     *
     * @returns UUID
     */
    public getUuid() {
      return this._uuid;
    }

    /**
     * getStatus - Get job status
     *
     * @returns status
     */
    public getStatus() {
      return this._status;
    }

    /**
     * setStatus - Set job status
     *
     * @param status - Job status
     */
    protected _setStatus(status: JobStatus) {
      this._status = status;
    }

    /**
     * getModule - Get module
     *
     * @returns module
     */
    public getModule() {
      return this._module;
    }

    /**
     * isApiEnabled - Get the class' _apiEnabled flag
     *
     * @returns flag value
     */
    public static isApiEnabled() {
      return this._apiEnabled;
    }

    /**
     * isApiEnabled - Get the _apiEnabled flag of this instance's class
     *
     * @returns flag value
     */
    public isApiEnabled() {
      return (this.constructor as typeof Job)._apiEnabled;
    }

    /**
     * getPayloadSchema - Get the payload schema of this instance's class
     *
     * @returns schema, or null if the job does not expect a payload
     */
    public getPayloadSchema() {
      return (this.constructor as typeof Job)._payloadSchema;
    }

    /**
     * hasPermission - Check if a given user has generic permission to execute a job, using _hasPermission
     *
     * Options are evaluated in order, one at a time, and evaluation stops at the
     * first one that grants permission. Function options are called with the user
     * and may return a promise.
     *
     * @param user - User to check, or null when there is no user
     * @returns true if any option grants permission, otherwise false
     */
    public static async hasPermission(user: User | null) {
      const options = Array.isArray(this._hasPermission)
        ? this._hasPermission
        : [this._hasPermission];

      for (const option of options) {
        if (typeof option === "boolean") {
          if (option) return true;
        } else if (typeof option === "function") {
          // eslint-disable-next-line no-await-in-loop
          if (await option(user)) return true;
        }
      }

      return false;
    }

    /**
     * validate - Validate the payload against the payload schema
     *
     * If a class that extends Job overrides _validate, it must still call super._validate, so this always gets called
     *
     * Throws if a payload was provided without a schema, or if the schema rejects the payload.
     */
    protected async _validate() {
      const payloadSchema = this.getPayloadSchema();

      if (payloadSchema)
        await payloadSchema.validateAsync(this._payload, {
          presence: "required"
        });
      else if (this._payload !== undefined)
        throw new Error(
          "Payload provided, but no payload schema specified."
        );

      this._validated = true;
    }

    /**
     * authorize - Assert that the job context has permission for this job's path
     */
    protected async _authorize() {
      await this._context.assertPermission(this.getPath());
    }

    /**
     * execute - The job's actual work, implemented by each subclass
     *
     * @returns Promise resolving to the job's response
     */
    protected abstract _execute(): Promise<unknown>;

    /**
     * execute - Execute job
     *
     * Validates and authorizes the job, runs it and, if the context has a callback
     * reference, publishes a JobCompletedEvent with the outcome. The outcome is
     * also logged and recorded in the job statistics. A job can only be executed once.
     *
     * Throws if the job was already executed, if the module can not run jobs, or
     * with whatever error validation, authorization or the job itself raised.
     *
     * @returns Promise resolving to the (optionally transformed) job response
     */
    public async execute() {
      if (this._startedAt) throw new Error("Job has already been executed.");

      if (!this.getModule().canRunJobs())
        throw new Error("Module can not currently run jobs.");

      this._setStatus(JobStatus.ACTIVE);
      this._startedAt = performance.now();

      try {
        await this._validate();

        // Safety check, to make sure this class' _validate function was called
        if (!this._validated) {
          throw new Error(
            "Validate function was fine, but validated was false. Warning. Make sure to call super when you override _validate."
          );
        }

        await this._authorize();

        let response = await this._execute();

        if (this._transformResponse)
          response = this._transformResponse(response);

        const socketId = this._context.getSocketId();
        const callbackRef = this._context.getCallbackRef();

        if (callbackRef) {
          await EventsModule.publish(
            new JobCompletedEvent(
              {
                socketId,
                callbackRef,
                status: "success",
                data: response
              },
              this.getUuid()
            )
          );
        }

        this.log({
          message: "Job completed successfully",
          type: "success"
        });

        JobStatistics.updateStats(
          this.getPath(),
          JobStatisticsType.SUCCESSFUL
        );

        return response;
      } catch (error: unknown) {
        const message = getErrorMessage(error);

        // Notifying the caller is best-effort: a failure to publish must not replace
        // the job's own error, nor skip the failure log and statistics below
        try {
          const socketId = this._context.getSocketId();
          const callbackRef = this._context.getCallbackRef();

          if (callbackRef) {
            await EventsModule.publish(
              new JobCompletedEvent(
                {
                  socketId,
                  callbackRef,
                  status: "error",
                  message
                },
                this.getUuid()
              )
            );
          }
        } catch (publishError: unknown) {
          this.log({
            message: `Failed to publish job failure event with error "${getErrorMessage(
              publishError
            )}"`,
            type: "error",
            data: { error: publishError }
          });
        }

        this.log({
          message: `Job failed with error "${message}"`,
          type: "error",
          data: { error }
        });

        JobStatistics.updateStats(this.getPath(), JobStatisticsType.FAILED);

        throw error;
      } finally {
        // Runs for both outcomes: record totals and duration, then mark the job completed
        this._completedAt = performance.now();

        JobStatistics.updateStats(this.getPath(), JobStatisticsType.TOTAL);

        if (this._startedAt)
          JobStatistics.updateStats(
            this.getPath(),
            JobStatisticsType.DURATION,
            this._completedAt - this._startedAt
          );

        this._setStatus(JobStatus.COMPLETED);
      }
    }

    /**
     * Log a message in the context of the current job, which automatically sets the category and data
     *
     * The category is the job path, and the serialized job info is merged into the
     * log data, with explicitly provided data taking precedence.
     *
     * @param log - Log message or object
     */
    public log(log: string | Omit<Log, "timestamp" | "category">) {
      const {
        message,
        type = undefined,
        data = {}
      } = {
        ...(typeof log === "string" ? { message: log } : log)
      };

      LogBook.log({
        message,
        type,
        category: this.getPath(),
        data: {
          ...this.toJSON(),
          ...data
        }
      });
    }

    /**
     * Serialize job info
     *
     * The payload is included as a JSON string. A payload that can not be
     * stringified (e.g. a circular structure or a BigInt) is replaced with a
     * placeholder rather than throwing, as this is called from log() while a job
     * outcome is being reported.
     *
     * @returns json
     */
    public toJSON() {
      let payload: string;

      try {
        payload = JSON.stringify(this._payload);
      } catch {
        payload = "[unserializable payload]";
      }

      return {
        uuid: this.getUuid(),
        priority: this.getPriority(),
        name: this.getPath(),
        status: this.getStatus(),
        moduleStatus: this._module.getStatus(),
        createdAt: this._createdAt,
        startedAt: this._startedAt,
        completedAt: this._completedAt,
        payload
      };
    }
  }