DataModule.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. MongooseDefaultQueryMiddleware,
  6. MongooseDistinctQueryMiddleware,
  7. MongooseQueryOrDocumentMiddleware
  8. } from "mongoose";
  9. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  10. import { readdir } from "fs/promises";
  11. import path from "path";
  12. import JobContext from "../JobContext";
  13. import BaseModule, { ModuleStatus } from "../BaseModule";
  14. import { UniqueMethods } from "../types/Modules";
  15. import { Models } from "../types/Models";
  16. import { Schemas } from "../types/Schemas";
  17. import documentVersionPlugin from "../schemas/plugins/documentVersion";
  18. import getDataPlugin from "../schemas/plugins/getData";
  19. import Migration from "../Migration";
  20. /**
  21. * Experimental: function to get all nested keys from a MongoDB query object
  22. */
  23. function getAllKeys(obj: object) {
  24. const keys: string[] = [];
  25. function processObject(obj: object, parentKey = "") {
  26. let returnChanged = false;
  27. // eslint-disable-next-line
  28. for (let key in obj) {
  29. // eslint-disable-next-line
  30. if (obj.hasOwnProperty(key)) {
  31. if (key.startsWith("$")) {
  32. // eslint-disable-next-line
  33. // @ts-ignore
  34. // eslint-disable-next-line
  35. processNestedObject(obj[key], parentKey); // Process nested keys without including the current key
  36. // eslint-disable-next-line
  37. continue; // Skip the current key
  38. }
  39. const currentKey = parentKey ? `${parentKey}.${key}` : key;
  40. // eslint-disable-next-line
  41. // @ts-ignore
  42. if (typeof obj[key] === "object" && obj[key] !== null) {
  43. // eslint-disable-next-line
  44. // @ts-ignore
  45. if (Array.isArray(obj[key])) {
  46. // eslint-disable-next-line
  47. // @ts-ignore
  48. // eslint-disable-next-line
  49. if (processArray(obj[key], currentKey)) {
  50. returnChanged = true;
  51. // eslint-disable-next-line
  52. continue;
  53. }
  54. }
  55. // eslint-disable-next-line
  56. // @ts-ignore
  57. else if (processObject(obj[key], currentKey)) {
  58. returnChanged = true;
  59. // eslint-disable-next-line
  60. continue;
  61. }
  62. }
  63. keys.push(currentKey);
  64. returnChanged = true;
  65. }
  66. }
  67. return returnChanged;
  68. }
  69. function processArray(arr: Array<any>, parentKey: string) {
  70. let returnChanged = false;
  71. for (let i = 0; i < arr.length; i += 1) {
  72. const currentKey = parentKey;
  73. if (typeof arr[i] === "object" && arr[i] !== null) {
  74. if (Array.isArray(arr[i])) {
  75. if (processArray(arr[i], currentKey)) returnChanged = true;
  76. } else if (processObject(arr[i], currentKey))
  77. returnChanged = true;
  78. }
  79. }
  80. return returnChanged;
  81. }
  82. function processNestedObject(obj: object, parentKey: string) {
  83. if (typeof obj === "object" && obj !== null) {
  84. if (Array.isArray(obj)) {
  85. processArray(obj, parentKey);
  86. } else {
  87. processObject(obj, parentKey);
  88. }
  89. }
  90. }
  91. processObject(obj);
  92. return keys;
  93. }
  94. export default class DataModule extends BaseModule {
  95. private models?: Models;
  96. private mongoConnection?: Connection;
  97. // private redisClient?: RedisClientType;
  98. /**
  99. * Data Module
  100. */
  101. public constructor() {
  102. super("data");
  103. this.dependentModules = ["events"];
  104. }
  105. /**
  106. * startup - Startup data module
  107. */
  108. public override async startup() {
  109. await super.startup();
  110. await this.createMongoConnection();
  111. await this.runMigrations();
  112. await this.loadModels();
  113. await this.syncModelIndexes();
  114. await this.defineModelJobs();
  115. // @ts-ignore
  116. // this.redisClient = createClient({ ...config.get("redis") });
  117. //
  118. // await this.redisClient.connect();
  119. //
  120. // const redisConfigResponse = await this.redisClient.sendCommand([
  121. // "CONFIG",
  122. // "GET",
  123. // "notify-keyspace-events"
  124. // ]);
  125. //
  126. // if (
  127. // !(
  128. // Array.isArray(redisConfigResponse) &&
  129. // redisConfigResponse[1] === "xE"
  130. // )
  131. // )
  132. // throw new Error(
  133. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  134. // (Array.isArray(redisConfigResponse) &&
  135. // redisConfigResponse[1]) ||
  136. // "unknown"
  137. // }`
  138. // );
  139. await super.started();
  140. }
  141. /**
  142. * shutdown - Shutdown data module
  143. */
  144. public override async shutdown() {
  145. await super.shutdown();
  146. // if (this.redisClient) await this.redisClient.quit();
  147. patchEventEmitter.removeAllListeners();
  148. if (this.mongoConnection) await this.mongoConnection.close();
  149. await this.stopped();
  150. }
  151. /**
  152. * createMongoConnection - Create mongo connection
  153. */
  154. private async createMongoConnection() {
  155. const { user, password, host, port, database } = config.get<{
  156. user: string;
  157. password: string;
  158. host: string;
  159. port: number;
  160. database: string;
  161. }>("mongo");
  162. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  163. this.mongoConnection = await mongoose
  164. .createConnection(mongoUrl)
  165. .asPromise();
  166. this.mongoConnection.set("runValidators", true);
  167. this.mongoConnection.set("sanitizeFilter", true);
  168. this.mongoConnection.set("strict", "throw");
  169. this.mongoConnection.set("strictQuery", "throw");
  170. }
  171. /**
  172. * registerEvents - Register events for schema with event module
  173. */
  174. private async registerEvents<
  175. ModelName extends keyof Models,
  176. SchemaType extends Schemas[keyof ModelName]
  177. >(modelName: ModelName, schema: SchemaType) {
  178. const methods: string[] = [
  179. "aggregate",
  180. "count",
  181. "countDocuments",
  182. "deleteOne",
  183. "deleteMany",
  184. "estimatedDocumentCount",
  185. "find",
  186. "findOne",
  187. "findOneAndDelete",
  188. "findOneAndRemove",
  189. "findOneAndReplace",
  190. "findOneAndUpdate",
  191. // "init",
  192. "insertMany",
  193. "remove",
  194. "replaceOne",
  195. "save",
  196. "update",
  197. "updateOne",
  198. "updateMany"
  199. // "validate"
  200. ];
  201. methods.forEach(method => {
  202. // NOTE: some Mongo selectors may also search through linked documents. Prevent that
  203. schema.pre(method, async function () {
  204. console.log(`Pre-${method}! START`);
  205. if (
  206. this.options?.userContext &&
  207. ["find", "update", "deleteOne", "save"].indexOf(method) ===
  208. -1
  209. )
  210. throw new Error("Method not allowed");
  211. console.log(`Pre-${method}!`, this.options?.userContext);
  212. if (["find", "update", "deleteOne"].indexOf(method) !== -1) {
  213. const filter = this.getFilter();
  214. const filterKeys = getAllKeys(filter);
  215. filterKeys.forEach(filterKey => {
  216. const splitFilterKeys = filterKey
  217. .split(".")
  218. .reduce(
  219. (keys: string[], key: string) =>
  220. keys.length > 0
  221. ? [
  222. ...keys,
  223. `${
  224. keys[keys.length - 1]
  225. }.${key}`
  226. ]
  227. : [key],
  228. []
  229. );
  230. splitFilterKeys.forEach(splitFilterKey => {
  231. const path = this.schema.path(splitFilterKey);
  232. if (!path) {
  233. throw new Error(
  234. "Attempted to query with non-existant property"
  235. );
  236. }
  237. if (path.options.restricted) {
  238. throw new Error(
  239. "Attempted to query with restricted property"
  240. );
  241. }
  242. });
  243. });
  244. console.log(`Pre-${method}!`, filterKeys);
  245. // Here we want to always exclude some properties depending on the model, like passwords/tokens
  246. this.projection({ restrictedName: 0 });
  247. }
  248. console.log(`Pre-${method}! END`);
  249. });
  250. schema.post(method, async function (docOrDocs) {
  251. console.log(`Post-${method} START!`);
  252. console.log(`Post-${method}!`, docOrDocs);
  253. console.log(`Post-${method}!`, this);
  254. console.log(`Post-${method} END!`);
  255. });
  256. });
  257. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  258. schema.get("patchHistory") ?? {};
  259. if (!enabled) return;
  260. Object.entries({
  261. created: eventCreated,
  262. updated: eventUpdated,
  263. deleted: eventDeleted
  264. })
  265. .filter(([, event]) => !!event)
  266. .forEach(([action, event]) => {
  267. patchEventEmitter.on(event, async ({ doc }) => {
  268. await this.jobQueue.runJob("events", "publish", {
  269. channel: `model.${modelName}.${doc._id}.${action}`,
  270. value: doc
  271. });
  272. });
  273. });
  274. }
  275. /**
  276. * loadModel - Import and load model schema
  277. *
  278. * @param modelName - Name of the model
  279. * @returns Model
  280. */
  281. private async loadModel<ModelName extends keyof Models>(
  282. modelName: ModelName
  283. ): Promise<Models[ModelName]> {
  284. if (!this.mongoConnection) throw new Error("Mongo is not available");
  285. const { schema }: { schema: Schemas[ModelName] } = await import(
  286. `../schemas/${modelName.toString()}`
  287. );
  288. schema.plugin(documentVersionPlugin);
  289. schema.set("timestamps", schema.get("timestamps") ?? true);
  290. const patchHistoryConfig = {
  291. enabled: true,
  292. patchHistoryDisabled: true,
  293. eventCreated: `${modelName}.created`,
  294. eventUpdated: `${modelName}.updated`,
  295. eventDeleted: `${modelName}.deleted`,
  296. ...(schema.get("patchHistory") ?? {})
  297. };
  298. schema.set("patchHistory", patchHistoryConfig);
  299. if (patchHistoryConfig.enabled) {
  300. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  301. }
  302. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  303. if (getDataEnabled) schema.plugin(getDataPlugin);
  304. await this.registerEvents(modelName, schema);
  305. return this.mongoConnection.model(modelName.toString(), schema);
  306. }
  307. /**
  308. * loadModels - Load and initialize all models
  309. *
  310. * @returns Promise
  311. */
  312. private async loadModels() {
  313. mongoose.SchemaTypes.String.set("trim", true);
  314. this.models = {
  315. abc: await this.loadModel("abc"),
  316. news: await this.loadModel("news"),
  317. session: await this.loadModel("session"),
  318. station: await this.loadModel("station"),
  319. user: await this.loadModel("user")
  320. };
  321. }
  322. /**
  323. * syncModelIndexes - Sync indexes for all models
  324. */
  325. private async syncModelIndexes() {
  326. if (!this.models) throw new Error("Models not loaded");
  327. await Promise.all(
  328. Object.values(this.models).map(model => model.syncIndexes())
  329. );
  330. }
  331. /**
  332. * getModel - Get model
  333. *
  334. * @returns Model
  335. */
  336. public async getModel<ModelName extends keyof Models>(
  337. jobContext: JobContext,
  338. payload: ModelName | { name: ModelName }
  339. ) {
  340. if (!this.models) throw new Error("Models not loaded");
  341. if (this.getStatus() !== ModuleStatus.STARTED)
  342. throw new Error("Module not started");
  343. const name = typeof payload === "object" ? payload.name : payload;
  344. return this.models[name];
  345. }
  346. private async loadMigrations() {
  347. if (!this.mongoConnection) throw new Error("Mongo is not available");
  348. const migrations = await readdir(
  349. path.resolve(__dirname, "../schemas/migrations/")
  350. );
  351. return Promise.all(
  352. migrations.map(async migrationFile => {
  353. const { default: Migrate }: { default: typeof Migration } =
  354. await import(`../schemas/migrations/${migrationFile}`);
  355. return new Migrate(this.mongoConnection as Connection);
  356. })
  357. );
  358. }
  359. private async runMigrations() {
  360. const migrations = await this.loadMigrations();
  361. for (let i = 0; i < migrations.length; i += 1) {
  362. const migration = migrations[i];
  363. // eslint-disable-next-line no-await-in-loop
  364. await migration.up();
  365. }
  366. }
  367. private async defineModelJobs() {
  368. if (!this.models) throw new Error("Models not loaded");
  369. await Promise.all(
  370. Object.entries(this.models).map(async ([modelName, model]) => {
  371. await Promise.all(
  372. ["findById"].map(async method => {
  373. this.jobConfig[`${modelName}.${method}`] = {
  374. method: async (context, payload) =>
  375. Object.getPrototypeOf(this)[method](context, {
  376. ...payload,
  377. model: modelName
  378. })
  379. };
  380. })
  381. );
  382. await Promise.all(
  383. Object.keys(model.schema.statics).map(async name => {
  384. this.jobConfig[`${modelName}.${name}`] = {
  385. method: async (...args) => model[name](...args)
  386. };
  387. })
  388. );
  389. })
  390. );
  391. }
  392. private async findById(
  393. context: JobContext,
  394. payload: { model: keyof Models; _id: Types.ObjectId }
  395. ) {
  396. // await context.assertPermission(
  397. // `data.${payload.model}.findById.${payload._id}`
  398. // );
  399. const model = await context.getModel(payload.model);
  400. const query = model.findById(payload._id);
  401. return query.exec();
  402. }
  403. }
  404. export type DataModuleJobs = {
  405. [Property in keyof UniqueMethods<DataModule>]: {
  406. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  407. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  408. };
  409. };