DataModule.ts 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. MongooseDefaultQueryMiddleware,
  6. MongooseDistinctQueryMiddleware,
  7. MongooseQueryOrDocumentMiddleware
  8. } from "mongoose";
  9. import { readdir } from "fs/promises";
  10. import path from "path";
  11. import JobContext from "../JobContext";
  12. import BaseModule, { ModuleStatus } from "../BaseModule";
  13. import { UniqueMethods } from "../types/Modules";
  14. import { Models } from "../types/Models";
  15. import { Schemas } from "../types/Schemas";
  16. import documentVersionPlugin from "../schemas/plugins/documentVersion";
  17. import getDataPlugin from "../schemas/plugins/getData";
  18. import Migration from "../Migration";
  /**
   * DataModule - Owns the MongoDB connection and the Mongoose models built on it.
   *
   * On startup it connects to Mongo, applies connection-wide options and plugins,
   * runs every migration, loads the models and syncs their indexes.
   */
  export default class DataModule extends BaseModule {
    private models?: Models;

    private mongoConnection?: Connection;

    // private redisClient?: RedisClientType;

    /**
     * Data Module
     */
    public constructor() {
      super("data");
    }

    /**
     * startup - Startup data module
     *
     * Connects to MongoDB using the "mongo" config section, configures the
     * connection, then runs migrations, loads models and syncs indexes before
     * marking the module as started.
     */
    public override async startup() {
      await super.startup();

      const { user, password, host, port, database } = config.get<{
        user: string;
        password: string;
        host: string;
        port: number;
        database: string;
      }>("mongo");

      // NOTE(review): user/password are interpolated verbatim - presumably they
      // are already URI-safe in the config; confirm before using special characters.
      const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;

      this.mongoConnection = await mongoose
        .createConnection(mongoUrl)
        .asPromise();

      // Connection-wide Mongoose options (see the Mongoose docs for semantics).
      this.mongoConnection.set("runValidators", true);
      this.mongoConnection.set("sanitizeFilter", true);
      this.mongoConnection.set("strict", "throw");
      this.mongoConnection.set("strictQuery", "throw");

      // This one is set on the global mongoose String schema type, not on the connection.
      mongoose.SchemaTypes.String.set("trim", true);

      this.mongoConnection.plugin(documentVersionPlugin);
      this.mongoConnection.plugin(getDataPlugin, {
        tags: ["useGetDataPlugin"]
      });

      // Order matters: migrations run before the models are loaded and indexed.
      await this.runMigrations();

      await this.loadModels();

      await this.syncModelIndexes();

      // NOTE(review): the @ts-ignore below was presumably meant for the disabled
      // createClient call - confirm whether it is still needed.
      // @ts-ignore
      // this.redisClient = createClient({ ...config.get("redis") });
      //
      // await this.redisClient.connect();
      //
      // const redisConfigResponse = await this.redisClient.sendCommand([
      // "CONFIG",
      // "GET",
      // "notify-keyspace-events"
      // ]);
      //
      // if (
      // !(
      // Array.isArray(redisConfigResponse) &&
      // redisConfigResponse[1] === "xE"
      // )
      // )
      // throw new Error(
      // `notify-keyspace-events is NOT configured correctly! It is set to: ${
      // (Array.isArray(redisConfigResponse) &&
      // redisConfigResponse[1]) ||
      // "unknown"
      // }`
      // );
      await super.started();
    }

    /**
     * shutdown - Shutdown data module
     *
     * Closes the Mongo connection if one was opened.
     */
    public override async shutdown() {
      await super.shutdown();
      // if (this.redisClient) await this.redisClient.quit();
      if (this.mongoConnection) await this.mongoConnection.close();
    }

    /**
     * loadModel - Import and load model schema
     *
     * Dynamically imports `../schemas/<modelName>`, attaches a logging pre-hook
     * for each supported middleware method and registers the model on the
     * Mongo connection.
     *
     * @param modelName - Name of the model
     * @returns Model
     * @throws Error if the Mongo connection has not been established
     */
    private async loadModel<ModelName extends keyof Models>(
      modelName: ModelName
    ): Promise<Models[ModelName]> {
      if (!this.mongoConnection) throw new Error("Mongo is not available");

      const { schema }: { schema: Schemas[ModelName] } = await import(
        `../schemas/${modelName.toString()}`
      );

      const preMethods: string[] = [
        "aggregate",
        "count",
        "countDocuments",
        "deleteOne",
        "deleteMany",
        "estimatedDocumentCount",
        "find",
        "findOne",
        "findOneAndDelete",
        "findOneAndRemove",
        "findOneAndReplace",
        "findOneAndUpdate",
        "init",
        "insertMany",
        "remove",
        "replaceOne",
        "save",
        "update",
        "updateOne",
        "updateMany",
        "validate"
      ];

      preMethods.forEach(preMethod => {
        // Register exactly one hook per method. Passing the whole list here
        // would attach a hook for every method on every iteration, so each
        // operation would log once per list entry under unrelated names.
        // The plain string name does not match mongoose's typed pre() overloads.
        // @ts-ignore
        schema.pre(preMethod, () => {
          console.log(`Pre-${preMethod}!`);
        });
      });

      return this.mongoConnection.model(modelName.toString(), schema);
    }

    /**
     * loadModels - Load and initialize all models
     *
     * Models are loaded one after another and stored on the module.
     *
     * @returns Promise
     */
    private async loadModels() {
      this.models = {
        abc: await this.loadModel("abc"),
        news: await this.loadModel("news"),
        station: await this.loadModel("station")
      };
    }

    /**
     * syncModelIndexes - Sync indexes for all models
     *
     * @throws Error if the models have not been loaded yet
     */
    private async syncModelIndexes() {
      if (!this.models) throw new Error("Models not loaded");

      await Promise.all(
        Object.values(this.models).map(model => model.syncIndexes())
      );
    }

    /**
     * getModel - Get model
     *
     * @param jobContext - Context of the calling job (not used by this method)
     * @param payload - Model name, or an object carrying it as `name`
     * @returns Model
     * @throws Error if the models are not loaded or the module is not started
     */
    public async getModel<ModelName extends keyof Models>(
      jobContext: JobContext,
      payload: ModelName | { name: ModelName }
    ) {
      if (!this.models) throw new Error("Models not loaded");

      if (this.getStatus() !== ModuleStatus.STARTED)
        throw new Error("Module not started");

      const name = typeof payload === "object" ? payload.name : payload;

      return this.models[name];
    }

    /**
     * loadMigrations - Import and instantiate every migration
     *
     * Reads `../schemas/migrations/`, imports each file's default export and
     * constructs it with the Mongo connection.
     *
     * @returns Migration instances, ordered by file name
     * @throws Error if the Mongo connection has not been established
     */
    private async loadMigrations() {
      const connection = this.mongoConnection;
      if (!connection) throw new Error("Mongo is not available");

      const migrationFiles = await readdir(
        path.resolve(__dirname, "../schemas/migrations/")
      );

      // readdir makes no ordering guarantee, but migrations are applied in
      // sequence, so sort by file name to get the same order on every platform.
      migrationFiles.sort();

      return Promise.all(
        migrationFiles.map(async migrationFile => {
          const { default: Migrate }: { default: typeof Migration } =
            await import(`../schemas/migrations/${migrationFile}`);
          return new Migrate(connection);
        })
      );
    }

    /**
     * runMigrations - Apply all migrations, one at a time
     */
    private async runMigrations() {
      const migrations = await this.loadMigrations();

      for (const migration of migrations) {
        // Sequential on purpose: each migration must finish before the next starts.
        // eslint-disable-next-line no-await-in-loop
        await migration.up();
      }
    }
  }
  /** The methods of DataModule selected by UniqueMethods. */
  type DataModuleMethods = UniqueMethods<DataModule>;

  /**
   * DataModuleJobs - Job signature for each DataModule method
   *
   * For every key of UniqueMethods<DataModule>, `payload` is the type of the
   * method's second parameter and `returns` is its awaited return type.
   */
  export type DataModuleJobs = {
    [JobName in keyof DataModuleMethods]: {
      payload: Parameters<DataModuleMethods[JobName]>[1];
      returns: Awaited<ReturnType<DataModuleMethods[JobName]>>;
    };
  };
  198. };