ModuleManager.ts 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. import async from "async";
  2. import BaseModule from "./BaseModule";
  3. import Job from "./Job";
  4. import JobContext from "./JobContext";
  5. import JobQueue from "./JobQueue";
  6. import LogBook from "./LogBook";
  7. import { JobOptions } from "./types/JobOptions";
  8. import { Jobs, Modules, ModuleStatus, ModuleClass } from "./types/Modules";
  9. export default class ModuleManager {
  10. private modules?: Modules;
  11. public logBook: LogBook;
  12. private jobQueue: JobQueue;
  13. /**
  14. * Module Manager
  15. *
  16. * @param logBook - Logbook
  17. */
  18. public constructor(logBook: LogBook) {
  19. this.logBook = logBook;
  20. this.jobQueue = new JobQueue();
  21. }
  22. /**
  23. * getStatus - Get status of modules
  24. *
  25. * @returns Module statuses
  26. */
  27. public getStatus() {
  28. const status: Record<string, ModuleStatus> = {};
  29. Object.entries(this.modules || {}).forEach(([name, module]) => {
  30. status[name] = module.getStatus();
  31. });
  32. return status;
  33. }
  34. /**
  35. * getJobsStats - Get statistics of job queue
  36. *
  37. * @returns Job queue statistics
  38. */
  39. public getJobsStats() {
  40. return this.jobQueue.getStats();
  41. }
  42. /**
  43. * getJobsStatus - Get status of job queue
  44. *
  45. * @returns Job queue status
  46. */
  47. public getJobsStatus() {
  48. return this.jobQueue.getStatus();
  49. }
  50. /**
  51. * getQueueStatus - Get status of queued jobs
  52. *
  53. * @returns Job statuses
  54. */
  55. public getQueueStatus() {
  56. return this.jobQueue.getQueueStatus();
  57. }
  58. /**
  59. * loadModule - Load and initialize module
  60. *
  61. * @param moduleName - Name of the module
  62. * @returns Module
  63. */
  64. private loadModule<T extends keyof Modules>(
  65. moduleName: T
  66. ): Promise<Modules[T]> {
  67. return new Promise(resolve => {
  68. const mapper = {
  69. stations: "StationModule",
  70. others: "OtherModule",
  71. data: "DataModule"
  72. };
  73. import(`./modules/${mapper[moduleName]}`).then(
  74. ({ default: Module }: { default: ModuleClass<Modules[T]> }) => {
  75. const module = new Module(this);
  76. resolve(module);
  77. }
  78. );
  79. });
  80. }
  81. /**
  82. * loadModules - Load and initialize all modules
  83. *
  84. * @returns Promise
  85. */
  86. private loadModules(): Promise<void> {
  87. return new Promise((resolve, reject) => {
  88. const fetchModules = async () => ({
  89. data: await this.loadModule("data"),
  90. others: await this.loadModule("others"),
  91. stations: await this.loadModule("stations")
  92. });
  93. fetchModules()
  94. .then(modules => {
  95. this.modules = modules;
  96. resolve();
  97. })
  98. .catch(err => {
  99. reject(new Error(err));
  100. });
  101. });
  102. }
  103. /**
  104. * startup - Handle startup
  105. */
  106. public async startup(): Promise<void> {
  107. await this.loadModules().catch(async err => {
  108. await this.shutdown();
  109. throw err;
  110. });
  111. if (!this.modules) throw new Error("No modules were loaded");
  112. await async
  113. .each(Object.values(this.modules), async module => {
  114. await module.startup().catch(async err => {
  115. module.setStatus("ERROR");
  116. throw err;
  117. });
  118. })
  119. .catch(async err => {
  120. await this.shutdown();
  121. throw err;
  122. });
  123. this.jobQueue.resume();
  124. }
  125. /**
  126. * shutdown - Handle shutdown
  127. */
  128. public async shutdown(): Promise<void> {
  129. // TODO: await jobQueue completion/handle shutdown
  130. if (this.modules)
  131. await async.each(Object.values(this.modules), async module => {
  132. if (
  133. module.getStatus() === "STARTED" ||
  134. module.getStatus() === "STARTING" || // TODO: Handle better
  135. module.getStatus() === "ERROR"
  136. )
  137. await module.shutdown();
  138. });
  139. }
  140. /**
  141. * runJob - Run a job
  142. *
  143. * @param moduleName - Module name
  144. * @param jobName - Job name
  145. * @param params - Params
  146. */
  147. public runJob<
  148. ModuleNameType extends keyof Jobs & keyof Modules,
  149. JobNameType extends keyof Jobs[ModuleNameType] &
  150. keyof Omit<Modules[ModuleNameType], keyof BaseModule>,
  151. PayloadType extends "payload" extends keyof Jobs[ModuleNameType][JobNameType]
  152. ? Jobs[ModuleNameType][JobNameType]["payload"] extends undefined
  153. ? void
  154. : Jobs[ModuleNameType][JobNameType]["payload"]
  155. : void,
  156. ReturnType = "returns" extends keyof Jobs[ModuleNameType][JobNameType]
  157. ? Jobs[ModuleNameType][JobNameType]["returns"]
  158. : never
  159. >(
  160. moduleName: ModuleNameType,
  161. jobName: JobNameType,
  162. payload: PayloadType,
  163. options?: JobOptions
  164. ): Promise<ReturnType> {
  165. return new Promise<ReturnType>((resolve, reject) => {
  166. const module = this.modules && this.modules[moduleName];
  167. if (!module) reject(new Error("Module not found."));
  168. else {
  169. const jobFunction = module[jobName];
  170. if (!jobFunction || typeof jobFunction !== "function")
  171. reject(new Error("Job not found."));
  172. else if (
  173. Object.prototype.hasOwnProperty.call(BaseModule, jobName)
  174. )
  175. reject(new Error("Illegal job function."));
  176. else {
  177. const job = new Job(
  178. jobName.toString(),
  179. module,
  180. (job, resolveJob, rejectJob) => {
  181. const jobContext = new JobContext(
  182. this,
  183. this.logBook,
  184. job
  185. );
  186. jobFunction
  187. .apply(jobContext, [payload])
  188. .then((response: ReturnType) => {
  189. this.logBook.log({
  190. message: "Job completed successfully",
  191. type: "success",
  192. category: "jobs",
  193. data: {
  194. jobName: job.getName(),
  195. jobId: job.getUuid()
  196. }
  197. });
  198. resolveJob();
  199. resolve(response);
  200. })
  201. .catch((err: any) => {
  202. this.logBook.log({
  203. message: `Job failed with error "${err}"`,
  204. type: "error",
  205. category: "jobs",
  206. data: {
  207. jobName: job.getName(),
  208. jobId: job.getUuid()
  209. }
  210. });
  211. rejectJob();
  212. reject(err);
  213. });
  214. },
  215. {
  216. priority: (options && options.priority) || 10
  217. }
  218. );
  219. // If a job options.runDirectly is set to true, skip the queue and run a job directly
  220. if (options && options.runDirectly)
  221. this.jobQueue.runJob(job);
  222. else this.jobQueue.add(job);
  223. }
  224. }
  225. });
  226. }
  227. }