1
0

DataModule.ts 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. import config from "config";
  2. import mongoose, { Connection, SchemaTypes } from "mongoose";
  3. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  4. import { readdir } from "fs/promises";
  5. import path from "path";
  6. import updateVersioningPlugin from "mongoose-update-versioning";
  7. import Migration from "@/modules/DataModule/Migration";
  8. import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
  9. import getDataPlugin from "@/modules/DataModule/plugins/getData";
  10. import BaseModule, { ModuleStatus } from "@/BaseModule";
  11. import { UniqueMethods } from "@/types/Modules";
  12. import { Models } from "@/types/Models";
  13. import { Schemas } from "@/types/Schemas";
  14. import EventsModule from "./EventsModule";
  15. export class DataModule extends BaseModule {
  16. private _models?: Models;
  17. private _mongoConnection?: Connection;
  18. /**
  19. * Data Module
  20. */
  21. public constructor() {
  22. super("data");
  23. this._dependentModules = ["events"];
  24. }
  25. /**
  26. * startup - Startup data module
  27. */
  28. public override async startup() {
  29. await super.startup();
  30. await this._createMongoConnection();
  31. await this._runMigrations();
  32. await this._loadModels();
  33. await this._syncModelIndexes();
  34. await this._loadModelJobs();
  35. await super._started();
  36. }
  37. /**
  38. * shutdown - Shutdown data module
  39. */
  40. public override async shutdown() {
  41. await super.shutdown();
  42. patchEventEmitter.removeAllListeners();
  43. if (this._mongoConnection) await this._mongoConnection.close();
  44. await this._stopped();
  45. }
  46. /**
  47. * createMongoConnection - Create mongo connection
  48. */
  49. private async _createMongoConnection() {
  50. mongoose.set({
  51. runValidators: true,
  52. sanitizeFilter: true,
  53. strict: "throw",
  54. strictQuery: "throw"
  55. });
  56. const { user, password, host, port, database } = config.get<{
  57. user: string;
  58. password: string;
  59. host: string;
  60. port: number;
  61. database: string;
  62. }>("mongo");
  63. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  64. this._mongoConnection = await mongoose
  65. .createConnection(mongoUrl)
  66. .asPromise();
  67. }
  68. /**
  69. * registerEvents - Register events for schema with event module
  70. */
  71. private async _registerEvents<
  72. ModelName extends keyof Models,
  73. SchemaType extends Schemas[keyof ModelName]
  74. >(modelName: ModelName, schema: SchemaType) {
  75. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  76. schema.get("patchHistory") ?? {};
  77. if (!enabled) return;
  78. Object.entries({
  79. created: eventCreated,
  80. updated: eventUpdated,
  81. deleted: eventDeleted
  82. })
  83. .filter(([, event]) => !!event)
  84. .forEach(([action, event]) => {
  85. patchEventEmitter.on(event, async ({ doc, oldDoc }) => {
  86. const modelId = doc?._id ?? oldDoc?._id;
  87. const Model = await this.getModel(modelName);
  88. if (doc) doc = Model.hydrate(doc);
  89. if (oldDoc) oldDoc = Model.hydrate(oldDoc);
  90. if (!modelId && action !== "created")
  91. throw new Error(`Model Id not found for "${event}"`);
  92. const channel = `model.${modelName}.${action}`;
  93. await EventsModule.publish(channel, { doc, oldDoc });
  94. if (action !== "created")
  95. await EventsModule.publish(`${channel}.${modelId}`, {
  96. doc,
  97. oldDoc
  98. });
  99. });
  100. });
  101. }
  102. /**
  103. * registerEvents - Register events for schema with event module
  104. */
  105. private async _registerEventListeners<
  106. ModelName extends keyof Models,
  107. SchemaType extends Schemas[keyof ModelName]
  108. >(schema: SchemaType) {
  109. const eventListeners = schema.get("eventListeners");
  110. if (
  111. typeof eventListeners !== "object" ||
  112. Object.keys(eventListeners).length === 0
  113. )
  114. return;
  115. await Promise.all(
  116. Object.entries(eventListeners).map(async ([event, callback]) =>
  117. EventsModule.subscribe("event", event, callback)
  118. )
  119. );
  120. }
  121. /**
  122. * loadModel - Import and load model schema
  123. *
  124. * @param modelName - Name of the model
  125. * @returns Model
  126. */
  127. private async _loadModel<ModelName extends keyof Models>(
  128. modelName: ModelName
  129. ): Promise<Models[ModelName]> {
  130. if (!this._mongoConnection) throw new Error("Mongo is not available");
  131. const { schema }: { schema: Schemas[ModelName] } = await import(
  132. `./DataModule/models/${modelName.toString()}/schema`
  133. );
  134. schema.plugin(documentVersionPlugin);
  135. schema.set("timestamps", schema.get("timestamps") ?? true);
  136. const patchHistoryConfig = {
  137. enabled: true,
  138. patchHistoryDisabled: true,
  139. eventCreated: `${modelName}.created`,
  140. eventUpdated: `${modelName}.updated`,
  141. eventDeleted: `${modelName}.deleted`,
  142. ...(schema.get("patchHistory") ?? {})
  143. };
  144. schema.set("patchHistory", patchHistoryConfig);
  145. if (patchHistoryConfig.enabled) {
  146. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  147. }
  148. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  149. if (getDataEnabled) schema.plugin(getDataPlugin);
  150. await this._registerEvents(modelName, schema);
  151. await this._registerEventListeners(schema);
  152. schema.set("toObject", { getters: true, virtuals: true });
  153. schema.set("toJSON", { getters: true, virtuals: true });
  154. schema.virtual("_name").get(() => modelName);
  155. schema.plugin(updateVersioningPlugin);
  156. await Promise.all(
  157. Object.entries(schema.paths)
  158. .filter(
  159. ([, type]) =>
  160. type instanceof SchemaTypes.ObjectId ||
  161. (type instanceof SchemaTypes.Array &&
  162. type.caster instanceof SchemaTypes.ObjectId)
  163. )
  164. .map(async ([key, type]) => {
  165. const { ref } =
  166. (type instanceof SchemaTypes.ObjectId
  167. ? type?.options
  168. : type.caster?.options) ?? {};
  169. if (ref)
  170. schema.path(key).get(value => {
  171. if (
  172. typeof value === "object" &&
  173. type instanceof SchemaTypes.ObjectId
  174. )
  175. return {
  176. _id: value,
  177. _name: ref
  178. };
  179. if (
  180. Array.isArray(value) &&
  181. type instanceof SchemaTypes.Array
  182. )
  183. return value.map(item =>
  184. item === null
  185. ? null
  186. : {
  187. _id: item,
  188. _name: ref
  189. }
  190. );
  191. return value;
  192. });
  193. })
  194. );
  195. return this._mongoConnection.model(modelName.toString(), schema);
  196. }
  197. /**
  198. * loadModels - Load and initialize all models
  199. *
  200. * @returns Promise
  201. */
  202. private async _loadModels() {
  203. mongoose.SchemaTypes.String.set("trim", true);
  204. this._models = {
  205. abc: await this._loadModel("abc"),
  206. news: await this._loadModel("news"),
  207. sessions: await this._loadModel("sessions"),
  208. stations: await this._loadModel("stations"),
  209. users: await this._loadModel("users")
  210. };
  211. }
  212. /**
  213. * syncModelIndexes - Sync indexes for all models
  214. */
  215. private async _syncModelIndexes() {
  216. if (!this._models) throw new Error("Models not loaded");
  217. await Promise.all(
  218. Object.values(this._models).map(model => model.syncIndexes())
  219. );
  220. }
  221. /**
  222. * getModel - Get model
  223. *
  224. * @returns Model
  225. */
  226. public async getModel<ModelName extends keyof Models>(name: ModelName) {
  227. if (!this._models) throw new Error("Models not loaded");
  228. if (this.getStatus() !== ModuleStatus.STARTED)
  229. throw new Error("Module not started");
  230. return this._models[name];
  231. }
  232. private async _loadModelMigrations(modelName: string) {
  233. if (!this._mongoConnection) throw new Error("Mongo is not available");
  234. let migrations;
  235. try {
  236. migrations = await readdir(
  237. path.resolve(
  238. __dirname,
  239. `./DataModule/models/${modelName}/migrations/`
  240. )
  241. );
  242. } catch (error) {
  243. if (error.code === "ENOENT") return [];
  244. throw error;
  245. }
  246. return Promise.all(
  247. migrations.map(async migrationFile => {
  248. const { default: Migrate }: { default: typeof Migration } =
  249. await import(
  250. `./DataModule/models/${modelName}/migrations/${migrationFile}`
  251. );
  252. return new Migrate(this._mongoConnection as Connection);
  253. })
  254. );
  255. }
  256. private async _loadMigrations() {
  257. const models = await readdir(
  258. path.resolve(__dirname, "./DataModule/models/")
  259. );
  260. return Promise.all(
  261. models.map(async modelName => this._loadModelMigrations(modelName))
  262. );
  263. }
  264. private async _runMigrations() {
  265. const migrations = (await this._loadMigrations()).flat();
  266. for (let i = 0; i < migrations.length; i += 1) {
  267. const migration = migrations[i];
  268. // eslint-disable-next-line no-await-in-loop
  269. await migration.up();
  270. }
  271. }
  272. private async _loadModelJobs() {
  273. if (!this._models) throw new Error("Models not loaded");
  274. await Promise.all(
  275. Object.keys(this._models).map(async modelName => {
  276. let jobs;
  277. try {
  278. jobs = await readdir(
  279. path.resolve(
  280. __dirname,
  281. `./${this.constructor.name}/models/${modelName}/jobs/`
  282. )
  283. );
  284. } catch (error) {
  285. if (error.code === "ENOENT") return;
  286. throw error;
  287. }
  288. await Promise.all(
  289. jobs.map(async jobFile => {
  290. const { default: Job } = await import(
  291. `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
  292. );
  293. this._jobs[Job.getName()] = Job;
  294. })
  295. );
  296. })
  297. );
  298. }
  299. }
  300. export type DataModuleJobs = {
  301. [Property in keyof UniqueMethods<DataModule>]: {
  302. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  303. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  304. };
  305. };
  306. export default new DataModule();