DataModule.ts 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. import config from "config";
  2. import mongoose, { Connection, Model, Schema, SchemaTypes } from "mongoose";
  3. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  4. import { readdir } from "fs/promises";
  5. import path from "path";
  6. import updateVersioningPlugin from "mongoose-update-versioning";
  7. import { forEachIn } from "@common/utils/forEachIn";
  8. import Migration from "@/modules/DataModule/Migration";
  9. import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
  10. import getDataPlugin from "@/modules/DataModule/plugins/getData";
  11. import BaseModule, { ModuleStatus } from "@/BaseModule";
  12. import EventsModule from "./EventsModule";
  13. import DataModuleJob from "./DataModule/DataModuleJob";
  14. import Job from "@/Job";
  15. export class DataModule extends BaseModule {
  16. private _models?: Record<string, Model<any>>;
  17. private _mongoConnection?: Connection;
  18. declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;
  19. /**
  20. * Data Module
  21. */
  22. public constructor() {
  23. super("data");
  24. this._dependentModules = ["events"];
  25. }
  26. /**
  27. * startup - Startup data module
  28. */
  29. public override async startup() {
  30. await super.startup();
  31. await this._createMongoConnection();
  32. await this._runMigrations();
  33. await this._loadModels();
  34. await this._syncModelIndexes();
  35. await this._loadModelJobs();
  36. await this._loadModelEvents();
  37. await super._started();
  38. }
  39. /**
  40. * shutdown - Shutdown data module
  41. */
  42. public override async shutdown() {
  43. await super.shutdown();
  44. patchEventEmitter.removeAllListeners();
  45. if (this._mongoConnection) await this._mongoConnection.close();
  46. await this._stopped();
  47. }
  48. /**
  49. * createMongoConnection - Create mongo connection
  50. */
  51. private async _createMongoConnection() {
  52. mongoose.set({
  53. runValidators: true,
  54. sanitizeFilter: true,
  55. strict: "throw",
  56. strictQuery: "throw"
  57. });
  58. const { user, password, host, port, database } = config.get<{
  59. user: string;
  60. password: string;
  61. host: string;
  62. port: number;
  63. database: string;
  64. }>("mongo");
  65. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  66. this._mongoConnection = await mongoose
  67. .createConnection(mongoUrl)
  68. .asPromise();
  69. }
  70. /**
  71. * registerEvents - Register events for schema with event module
  72. */
  73. private async _registerEvents(modelName: string, schema: Schema<any>) {
  74. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  75. schema.get("patchHistory") ?? {};
  76. if (!enabled) return;
  77. Object.entries({
  78. created: eventCreated,
  79. updated: eventUpdated,
  80. deleted: eventDeleted
  81. })
  82. .filter(([, event]) => !!event)
  83. .forEach(([action, event]) => {
  84. patchEventEmitter.on(event!, async ({ doc, oldDoc }) => {
  85. const modelId = doc?._id ?? oldDoc?._id;
  86. const Model = await this.getModel(modelName);
  87. if (doc) doc = Model.hydrate(doc);
  88. if (oldDoc) oldDoc = Model.hydrate(oldDoc);
  89. if (!modelId && action !== "created")
  90. throw new Error(`Model Id not found for "${event}"`);
  91. const EventClass = this.getEvent(`${modelName}.${action}`);
  92. await EventsModule.publish(
  93. new EventClass({ doc, oldDoc }, modelId)
  94. );
  95. });
  96. });
  97. }
  98. /**
  99. * registerEvents - Register events for schema with event module
  100. */
  101. private async _registerEventListeners(schema: Schema<any>) {
  102. const eventListeners = schema.get("eventListeners");
  103. if (
  104. typeof eventListeners !== "object" ||
  105. Object.keys(eventListeners).length === 0
  106. )
  107. return;
  108. await forEachIn(
  109. Object.entries(eventListeners),
  110. async ([event, callback]) =>
  111. EventsModule.pSubscribe(event, callback)
  112. );
  113. }
  114. /**
  115. * loadModel - Import and load model schema
  116. *
  117. * @param modelName - Name of the model
  118. * @returns Model
  119. */
  120. private async _loadModel(modelName: string): Promise<Model<any>> {
  121. if (!this._mongoConnection) throw new Error("Mongo is not available");
  122. const { schema }: { schema: Schema<any> } = await import(
  123. `./DataModule/models/${modelName.toString()}/schema`
  124. );
  125. schema.plugin(documentVersionPlugin);
  126. schema.set("timestamps", schema.get("timestamps") ?? true);
  127. const patchHistoryConfig = {
  128. enabled: true,
  129. patchHistoryDisabled: true,
  130. eventCreated: `${modelName}.created`,
  131. eventUpdated: `${modelName}.updated`,
  132. eventDeleted: `${modelName}.deleted`,
  133. ...(schema.get("patchHistory") ?? {})
  134. };
  135. schema.set("patchHistory", patchHistoryConfig);
  136. if (patchHistoryConfig.enabled) {
  137. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  138. }
  139. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  140. if (getDataEnabled) schema.plugin(getDataPlugin);
  141. schema.static("getModelName", () => modelName);
  142. await this._registerEvents(modelName, schema);
  143. await this._registerEventListeners(schema);
  144. schema.set("toObject", { getters: true, virtuals: true });
  145. schema.set("toJSON", { getters: true, virtuals: true });
  146. schema.virtual("_name").get(() => modelName);
  147. schema.plugin(updateVersioningPlugin);
  148. await forEachIn(
  149. Object.entries(schema.paths).filter(
  150. ([, type]) =>
  151. type instanceof SchemaTypes.ObjectId ||
  152. (type instanceof SchemaTypes.Array &&
  153. type.caster instanceof SchemaTypes.ObjectId)
  154. ),
  155. async ([key, type]) => {
  156. const { ref } =
  157. (type instanceof SchemaTypes.Array
  158. ? type.caster?.options
  159. : type?.options) ?? {};
  160. if (ref)
  161. schema.path(key).get((value: any) => {
  162. if (
  163. typeof value === "object" &&
  164. type instanceof SchemaTypes.ObjectId
  165. )
  166. return {
  167. _id: value,
  168. _name: ref
  169. };
  170. if (
  171. Array.isArray(value) &&
  172. type instanceof SchemaTypes.Array
  173. )
  174. return value.map(item =>
  175. item === null
  176. ? null
  177. : {
  178. _id: item,
  179. _name: ref
  180. }
  181. );
  182. return value;
  183. });
  184. }
  185. );
  186. return this._mongoConnection.model(modelName.toString(), schema);
  187. }
  188. /**
  189. * loadModels - Load and initialize all models
  190. *
  191. * @returns Promise
  192. */
  193. private async _loadModels() {
  194. mongoose.SchemaTypes.String.set("trim", true);
  195. this._models = {
  196. abc: await this._loadModel("abc"),
  197. minifiedUsers: await this._loadModel("minifiedUsers"),
  198. news: await this._loadModel("news"),
  199. sessions: await this._loadModel("sessions"),
  200. stations: await this._loadModel("stations"),
  201. users: await this._loadModel("users")
  202. };
  203. }
  204. /**
  205. * syncModelIndexes - Sync indexes for all models
  206. */
  207. private async _syncModelIndexes() {
  208. if (!this._models) throw new Error("Models not loaded");
  209. await forEachIn(
  210. Object.values(this._models).filter(
  211. model => model.schema.get("autoIndex") !== false
  212. ),
  213. model => model.syncIndexes()
  214. );
  215. }
  216. /**
  217. * getModel - Get model
  218. *
  219. * @returns Model
  220. */
  221. public async getModel<ModelType extends Model<any>>(
  222. name: string
  223. ): Promise<ModelType> {
  224. if (!this._models) throw new Error("Models not loaded");
  225. if (this.getStatus() !== ModuleStatus.STARTED)
  226. throw new Error("Module not started");
  227. if (!this._models[name]) throw new Error("Model not found");
  228. return this._models[name] as ModelType;
  229. }
  230. private async _loadModelMigrations(modelName: string) {
  231. if (!this._mongoConnection) throw new Error("Mongo is not available");
  232. let migrations;
  233. try {
  234. migrations = await readdir(
  235. path.resolve(
  236. __dirname,
  237. `./DataModule/models/${modelName}/migrations/`
  238. )
  239. );
  240. } catch (error) {
  241. if (
  242. error instanceof Error &&
  243. "code" in error &&
  244. error.code === "ENOENT"
  245. )
  246. return [];
  247. throw error;
  248. }
  249. return forEachIn(migrations, async migrationFile => {
  250. const { default: Migrate }: { default: typeof Migration } =
  251. await import(
  252. `./DataModule/models/${modelName}/migrations/${migrationFile}`
  253. );
  254. return new Migrate(this._mongoConnection as Connection);
  255. });
  256. }
  257. private async _loadMigrations() {
  258. const models = await readdir(
  259. path.resolve(__dirname, "./DataModule/models/")
  260. );
  261. return forEachIn(models, async modelName =>
  262. this._loadModelMigrations(modelName)
  263. );
  264. }
  265. private async _runMigrations() {
  266. const migrations = (await this._loadMigrations()).flat();
  267. for (let i = 0; i < migrations.length; i += 1) {
  268. const migration = migrations[i];
  269. // eslint-disable-next-line no-await-in-loop
  270. await migration.up();
  271. }
  272. }
  273. private async _loadModelJobs() {
  274. if (!this._models) throw new Error("Models not loaded");
  275. await forEachIn(Object.keys(this._models), async modelName => {
  276. let jobs;
  277. try {
  278. jobs = await readdir(
  279. path.resolve(
  280. __dirname,
  281. `./${this.constructor.name}/models/${modelName}/jobs/`
  282. )
  283. );
  284. } catch (error) {
  285. if (
  286. error instanceof Error &&
  287. "code" in error &&
  288. error.code === "ENOENT"
  289. )
  290. return;
  291. throw error;
  292. }
  293. await forEachIn(jobs, async jobFile => {
  294. const { default: Job } = await import(
  295. `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
  296. );
  297. this._jobs[Job.getName()] = Job;
  298. });
  299. });
  300. }
  301. private async _loadModelEvents() {
  302. if (!this._models) throw new Error("Models not loaded");
  303. await forEachIn(Object.keys(this._models), async modelName => {
  304. let events;
  305. try {
  306. events = await readdir(
  307. path.resolve(
  308. __dirname,
  309. `./${this.constructor.name}/models/${modelName}/events/`
  310. )
  311. );
  312. } catch (error) {
  313. if (
  314. error instanceof Error &&
  315. "code" in error &&
  316. error.code === "ENOENT"
  317. )
  318. return;
  319. throw error;
  320. }
  321. await forEachIn(events, async eventFile => {
  322. const { default: EventClass } = await import(
  323. `./${this.constructor.name}/models/${modelName}/events/${eventFile}`
  324. );
  325. this._events[EventClass.getName()] = EventClass;
  326. });
  327. });
  328. }
  329. }
  330. export default new DataModule();