DataModule.ts 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. import config from "config";
  2. import { readdir } from "fs/promises";
  3. import path from "path";
  4. import { forEachIn } from "@common/utils/forEachIn";
  5. import {
  6. Sequelize,
  7. Model as SequelizeModel,
  8. ModelStatic,
  9. DataTypes,
  10. Utils,
  11. Options
  12. } from "sequelize";
  13. import { Dirent } from "fs";
  14. import * as inflection from "inflection";
  15. import { SequelizeStorage, Umzug } from "umzug";
  16. import ObjectID from "bson-objectid";
  17. import BaseModule, { ModuleStatus } from "@/BaseModule";
  18. import DataModuleJob from "./DataModule/DataModuleJob";
  19. import Job from "@/Job";
  20. import EventsModule from "./EventsModule";
  21. import { EventClass } from "./EventsModule/Event";
  22. export type ObjectIdType = string;
  23. // TODO fix TS
  24. // TODO implement actual checking of ObjectId's
  25. // TODO move to a better spot
  26. // Strange behavior would result if we extended DataTypes.ABSTRACT because
  27. // it's a class wrapped in a Proxy by Utils.classToInvokable.
  28. export class OBJECTID extends DataTypes.ABSTRACT.prototype.constructor {
  29. // Mandatory: set the type key
  30. static key = "OBJECTID";
  31. key = OBJECTID.key;
  32. // Mandatory: complete definition of the new type in the database
  33. toSql() {
  34. return "VARCHAR(24)";
  35. }
  36. // Optional: validator function
  37. // eslint-disable-next-line
  38. // @ts-ignore
  39. validate() {
  40. // value, options
  41. return true;
  42. // return (typeof value === 'number') && (!Number.isNaN(value));
  43. }
  44. // Optional: sanitizer
  45. // eslint-disable-next-line
  46. // @ts-ignore
  47. _sanitize(value) {
  48. return value;
  49. // Force all numbers to be positive
  50. // return value < 0 ? 0 : Math.round(value);
  51. }
  52. // Optional: value stringifier before sending to database
  53. // eslint-disable-next-line
  54. // @ts-ignore
  55. _stringify(value) {
  56. if (value instanceof ObjectID) return value.toHexString();
  57. return value.toString();
  58. }
  59. // Optional: parser for values received from the database
  60. // eslint-disable-next-line
  61. // @ts-ignore
  62. static parse(value) {
  63. return value;
  64. // return Number.parseInt(value);
  65. }
  66. }
  67. // Optional: add the new type to DataTypes. Optionally wrap it on `Utils.classToInvokable` to
  68. // be able to use this datatype directly without having to call `new` on it.
  69. DataTypes.OBJECTID = Utils.classToInvokable(OBJECTID);
  70. export class DataModule extends BaseModule {
  71. private _sequelize?: Sequelize;
  72. declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;
  73. /**
  74. * Data Module
  75. */
  76. public constructor() {
  77. super("data");
  78. this._dependentModules = ["events"];
  79. }
  80. /**
  81. * startup - Startup data module
  82. */
  83. public override async startup() {
  84. await super.startup();
  85. await this._runMigrations();
  86. await this._setupSequelize();
  87. await super._started();
  88. }
  89. /**
  90. * shutdown - Shutdown data module
  91. */
  92. public override async shutdown() {
  93. await super.shutdown();
  94. await this._sequelize?.close();
  95. await this._stopped();
  96. }
  97. private async _createSequelizeInstance(options: Options = {}) {
  98. const { username, password, host, port, database } =
  99. config.get<any>("postgres");
  100. const sequelize = new Sequelize(database, username, password, {
  101. host,
  102. port,
  103. dialect: "postgres",
  104. logging: message =>
  105. this.log({
  106. type: "debug",
  107. category: "sql",
  108. message
  109. }),
  110. ...options
  111. });
  112. await sequelize.authenticate();
  113. return sequelize;
  114. }
  115. private _setupSequelizeHooks() {
  116. const getEventFromModel = (
  117. model: SequelizeModel<any, any>,
  118. suffix: string
  119. ): EventClass | null => {
  120. const modelName = (
  121. model.constructor as ModelStatic<any>
  122. ).getTableName();
  123. let EventClass;
  124. try {
  125. EventClass = this.getEvent(`${modelName}.${suffix}`);
  126. } catch (error) {
  127. // TODO: Catch and ignore only event not found
  128. return null;
  129. }
  130. return EventClass;
  131. };
  132. this._sequelize!.addHook("afterCreate", async model => {
  133. const EventClass = getEventFromModel(model, "created");
  134. if (!EventClass) return;
  135. await EventsModule.publish(
  136. new EventClass({
  137. doc: model
  138. })
  139. );
  140. });
  141. this._sequelize!.addHook("afterUpdate", async model => {
  142. const EventClass = getEventFromModel(model, "updated");
  143. if (!EventClass) return;
  144. await EventsModule.publish(
  145. new EventClass(
  146. {
  147. doc: model,
  148. oldDoc: {
  149. _id: model.get("_id")
  150. }
  151. },
  152. model.get("_id")!.toString()
  153. )
  154. );
  155. });
  156. this._sequelize!.addHook("afterDestroy", async model => {
  157. const EventClass = getEventFromModel(model, "deleted");
  158. if (!EventClass) return;
  159. await EventsModule.publish(
  160. new EventClass(
  161. {
  162. oldDoc: {
  163. _id: model.get("_id")
  164. }
  165. },
  166. model.get("_id")!.toString()
  167. )
  168. );
  169. });
  170. // Make sure every update/destroy has individualhooks
  171. this._sequelize!.addHook("beforeValidate", async model => {
  172. // TODO review
  173. if (model.isNewRecord) {
  174. const key = (model.constructor as ModelStatic<any>)
  175. .primaryKeyAttribute;
  176. model.dataValues[key] ??= ObjectID();
  177. }
  178. });
  179. }
  180. /**
  181. * setupSequelize - Setup sequelize instance
  182. */
  183. private async _setupSequelize() {
  184. this._sequelize = await this._createSequelizeInstance({});
  185. await this._sequelize.authenticate();
  186. this._setupSequelizeHooks();
  187. const setupFunctions: (() => Promise<void>)[] = [];
  188. await forEachIn(
  189. await readdir(
  190. path.resolve(__dirname, `./${this.constructor.name}/models`),
  191. {
  192. withFileTypes: true
  193. }
  194. ),
  195. async modelFile => {
  196. if (!modelFile.isFile() || modelFile.name.includes(".spec."))
  197. return;
  198. const {
  199. default: ModelClass,
  200. schema,
  201. options = {},
  202. setup
  203. } = await import(`${modelFile.path}/${modelFile.name}`);
  204. const tableName = inflection.camelize(
  205. inflection.pluralize(ModelClass.name),
  206. true
  207. );
  208. ModelClass.init(schema, {
  209. tableName,
  210. ...options,
  211. sequelize: this._sequelize
  212. });
  213. if (typeof setup === "function") setupFunctions.push(setup);
  214. await this._loadModelEvents(ModelClass.name);
  215. await this._loadModelJobs(ModelClass.name);
  216. }
  217. );
  218. await forEachIn(setupFunctions, setup => setup());
  219. }
  220. /**
  221. * getModel - Get model
  222. *
  223. * @returns Model
  224. */
  225. public async getModel<ModelType extends SequelizeModel<any>>(
  226. name: string
  227. ): Promise<ModelStatic<ModelType>> {
  228. if (!this._sequelize?.models) throw new Error("Models not loaded");
  229. if (this.getStatus() !== ModuleStatus.STARTED)
  230. throw new Error("Module not started");
  231. // TODO check if we want to do it via singularize&camelize, or another way
  232. const camelizedName = inflection.singularize(inflection.camelize(name));
  233. return this._sequelize.model(camelizedName) as ModelStatic<ModelType>; // This fails - news has not been defined
  234. }
  235. private async _loadModelJobs(modelClassName: string) {
  236. let jobs: Dirent[];
  237. try {
  238. jobs = await readdir(
  239. path.resolve(
  240. __dirname,
  241. `./${this.constructor.name}/models/${modelClassName}/jobs/`
  242. ),
  243. {
  244. withFileTypes: true
  245. }
  246. );
  247. } catch (error) {
  248. if (
  249. error instanceof Error &&
  250. "code" in error &&
  251. error.code === "ENOENT"
  252. ) {
  253. this.log(
  254. `Loading ${modelClassName} jobs failed - folder doesn't exist`
  255. );
  256. return;
  257. }
  258. throw error;
  259. }
  260. await forEachIn(jobs, async jobFile => {
  261. if (!jobFile.isFile() || jobFile.name.includes(".spec.")) return;
  262. const { default: JobClass } = await import(
  263. `${jobFile.path}/${jobFile.name}`
  264. );
  265. this._jobs[JobClass.getName()] = JobClass;
  266. });
  267. }
  268. private async _loadModelEvents(modelClassName: string) {
  269. let events: Dirent[];
  270. try {
  271. events = await readdir(
  272. path.resolve(
  273. __dirname,
  274. `./${this.constructor.name}/models/${modelClassName}/events/`
  275. ),
  276. {
  277. withFileTypes: true
  278. }
  279. );
  280. } catch (error) {
  281. if (
  282. error instanceof Error &&
  283. "code" in error &&
  284. error.code === "ENOENT"
  285. )
  286. return;
  287. throw error;
  288. }
  289. await forEachIn(events, async eventFile => {
  290. if (!eventFile.isFile() || eventFile.name.includes(".spec."))
  291. return;
  292. const { default: EventClass } = await import(
  293. `${eventFile.path}/${eventFile.name}`
  294. );
  295. this._events[EventClass.getName()] = EventClass;
  296. });
  297. }
  298. private async _runMigrations() {
  299. const sequelize = await this._createSequelizeInstance({
  300. logging: message =>
  301. this.log({
  302. type: "debug",
  303. category: "migrations.sql",
  304. message
  305. })
  306. });
  307. const migrator = new Umzug({
  308. migrations: {
  309. glob: [
  310. `${this.constructor.name}/migrations/*.ts`,
  311. { cwd: __dirname }
  312. ]
  313. },
  314. context: sequelize,
  315. storage: new SequelizeStorage({
  316. sequelize: sequelize!
  317. }),
  318. logger: console
  319. });
  320. await migrator.up();
  321. await sequelize.close();
  322. }
  323. }
  324. export default new DataModule();