DataModule.ts 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. MongooseDefaultQueryMiddleware,
  6. MongooseDistinctQueryMiddleware,
  7. MongooseQueryOrDocumentMiddleware
  8. } from "mongoose";
  9. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  10. import { readdir } from "fs/promises";
  11. import path from "path";
  12. import JobContext from "../JobContext";
  13. import BaseModule, { ModuleStatus } from "../BaseModule";
  14. import { UniqueMethods } from "../types/Modules";
  15. import { Models } from "../types/Models";
  16. import { Schemas } from "../types/Schemas";
  17. import documentVersionPlugin from "../schemas/plugins/documentVersion";
  18. import getDataPlugin from "../schemas/plugins/getData";
  19. import Migration from "../Migration";
  20. import JobQueue from "../JobQueue";
  21. export default class DataModule extends BaseModule {
  22. private models?: Models;
  23. private mongoConnection?: Connection;
  24. // private redisClient?: RedisClientType;
  25. private jobQueue: JobQueue;
  26. /**
  27. * Data Module
  28. */
  29. public constructor() {
  30. super("data");
  31. this.dependentModules = ["events"];
  32. this.jobQueue = JobQueue.getPrimaryInstance();
  33. }
  34. /**
  35. * startup - Startup data module
  36. */
  37. public override async startup() {
  38. await super.startup();
  39. await this.createMongoConnection();
  40. await this.runMigrations();
  41. await this.loadModels();
  42. await this.syncModelIndexes();
  43. // @ts-ignore
  44. // this.redisClient = createClient({ ...config.get("redis") });
  45. //
  46. // await this.redisClient.connect();
  47. //
  48. // const redisConfigResponse = await this.redisClient.sendCommand([
  49. // "CONFIG",
  50. // "GET",
  51. // "notify-keyspace-events"
  52. // ]);
  53. //
  54. // if (
  55. // !(
  56. // Array.isArray(redisConfigResponse) &&
  57. // redisConfigResponse[1] === "xE"
  58. // )
  59. // )
  60. // throw new Error(
  61. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  62. // (Array.isArray(redisConfigResponse) &&
  63. // redisConfigResponse[1]) ||
  64. // "unknown"
  65. // }`
  66. // );
  67. await super.started();
  68. }
  69. /**
  70. * shutdown - Shutdown data module
  71. */
  72. public override async shutdown() {
  73. await super.shutdown();
  74. // if (this.redisClient) await this.redisClient.quit();
  75. patchEventEmitter.removeAllListeners();
  76. if (this.mongoConnection) await this.mongoConnection.close();
  77. }
  78. /**
  79. * createMongoConnection - Create mongo connection
  80. */
  81. private async createMongoConnection() {
  82. const { user, password, host, port, database } = config.get<{
  83. user: string;
  84. password: string;
  85. host: string;
  86. port: number;
  87. database: string;
  88. }>("mongo");
  89. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  90. this.mongoConnection = await mongoose
  91. .createConnection(mongoUrl)
  92. .asPromise();
  93. this.mongoConnection.set("runValidators", true);
  94. this.mongoConnection.set("sanitizeFilter", true);
  95. this.mongoConnection.set("strict", "throw");
  96. this.mongoConnection.set("strictQuery", "throw");
  97. }
  98. /**
  99. * registerEvents - Register events for schema with event module
  100. */
  101. private async registerEvents<
  102. ModelName extends keyof Models,
  103. SchemaType extends Schemas[keyof ModelName]
  104. >(modelName: ModelName, schema: SchemaType) {
  105. // const preMethods: string[] = [
  106. // "aggregate",
  107. // "count",
  108. // "countDocuments",
  109. // "deleteOne",
  110. // "deleteMany",
  111. // "estimatedDocumentCount",
  112. // "find",
  113. // "findOne",
  114. // "findOneAndDelete",
  115. // "findOneAndRemove",
  116. // "findOneAndReplace",
  117. // "findOneAndUpdate",
  118. // "init",
  119. // "insertMany",
  120. // "remove",
  121. // "replaceOne",
  122. // "save",
  123. // "update",
  124. // "updateOne",
  125. // "updateMany",
  126. // "validate"
  127. // ];
  128. // preMethods.forEach(preMethod => {
  129. // // @ts-ignore
  130. // schema.pre(preMethods, () => {
  131. // console.log(`Pre-${preMethod}!`);
  132. // });
  133. // });
  134. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  135. schema.get("patchHistory") ?? {};
  136. if (!enabled) return;
  137. Object.entries({
  138. created: eventCreated,
  139. updated: eventUpdated,
  140. deleted: eventDeleted
  141. })
  142. .filter(([, event]) => !!event)
  143. .forEach(([action, event]) => {
  144. patchEventEmitter.on(event, async ({ doc }) => {
  145. await this.jobQueue.runJob("events", "publish", {
  146. channel: `model.${modelName}.${doc._id}.${action}`,
  147. value: doc
  148. });
  149. });
  150. });
  151. }
  152. /**
  153. * loadModel - Import and load model schema
  154. *
  155. * @param modelName - Name of the model
  156. * @returns Model
  157. */
  158. private async loadModel<ModelName extends keyof Models>(
  159. modelName: ModelName
  160. ): Promise<Models[ModelName]> {
  161. if (!this.mongoConnection) throw new Error("Mongo is not available");
  162. const { schema }: { schema: Schemas[ModelName] } = await import(
  163. `../schemas/${modelName.toString()}`
  164. );
  165. schema.plugin(documentVersionPlugin);
  166. schema.set("timestamps", schema.get("timestamps") ?? true);
  167. const patchHistoryConfig = {
  168. enabled: true,
  169. patchHistoryDisabled: true,
  170. eventCreated: `${modelName}.created`,
  171. eventUpdated: `${modelName}.updated`,
  172. eventDeleted: `${modelName}.deleted`,
  173. ...(schema.get("patchHistory") ?? {})
  174. };
  175. schema.set("patchHistory", patchHistoryConfig);
  176. if (patchHistoryConfig.enabled) {
  177. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  178. }
  179. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  180. if (getDataEnabled) schema.plugin(getDataPlugin);
  181. await this.registerEvents(modelName, schema);
  182. return this.mongoConnection.model(modelName.toString(), schema);
  183. }
  184. /**
  185. * loadModels - Load and initialize all models
  186. *
  187. * @returns Promise
  188. */
  189. private async loadModels() {
  190. mongoose.SchemaTypes.String.set("trim", true);
  191. this.models = {
  192. abc: await this.loadModel("abc"),
  193. news: await this.loadModel("news"),
  194. station: await this.loadModel("station")
  195. };
  196. }
  197. /**
  198. * syncModelIndexes - Sync indexes for all models
  199. */
  200. private async syncModelIndexes() {
  201. if (!this.models) throw new Error("Models not loaded");
  202. await Promise.all(
  203. Object.values(this.models).map(model => model.syncIndexes())
  204. );
  205. }
  206. /**
  207. * getModel - Get model
  208. *
  209. * @returns Model
  210. */
  211. public async getModel<ModelName extends keyof Models>(
  212. jobContext: JobContext,
  213. payload: ModelName | { name: ModelName }
  214. ) {
  215. if (!this.models) throw new Error("Models not loaded");
  216. if (this.getStatus() !== ModuleStatus.STARTED)
  217. throw new Error("Module not started");
  218. const name = typeof payload === "object" ? payload.name : payload;
  219. return this.models[name];
  220. }
  221. private async loadMigrations() {
  222. if (!this.mongoConnection) throw new Error("Mongo is not available");
  223. const migrations = await readdir(
  224. path.resolve(__dirname, "../schemas/migrations/")
  225. );
  226. return Promise.all(
  227. migrations.map(async migrationFile => {
  228. const { default: Migrate }: { default: typeof Migration } =
  229. await import(`../schemas/migrations/${migrationFile}`);
  230. return new Migrate(this.mongoConnection as Connection);
  231. })
  232. );
  233. }
  234. private async runMigrations() {
  235. const migrations = await this.loadMigrations();
  236. for (let i = 0; i < migrations.length; i += 1) {
  237. const migration = migrations[i];
  238. // eslint-disable-next-line no-await-in-loop
  239. await migration.up();
  240. }
  241. }
  242. }
  243. export type DataModuleJobs = {
  244. [Property in keyof UniqueMethods<DataModule>]: {
  245. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  246. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  247. };
  248. };