DataModule.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. MongooseDefaultQueryMiddleware,
  6. MongooseDistinctQueryMiddleware,
  7. MongooseQueryOrDocumentMiddleware
  8. } from "mongoose";
  9. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  10. import { readdir } from "fs/promises";
  11. import path from "path";
  12. import JobContext from "../JobContext";
  13. import BaseModule, { ModuleStatus } from "../BaseModule";
  14. import { UniqueMethods } from "../types/Modules";
  15. import { Models } from "../types/Models";
  16. import { Schemas } from "../types/Schemas";
  17. import documentVersionPlugin from "../schemas/plugins/documentVersion";
  18. import getDataPlugin from "../schemas/plugins/getData";
  19. import Migration from "../Migration";
  20. /**
  21. * Experimental: function to get all nested keys from a MongoDB query object
  22. */
  23. function getAllKeys(obj: object) {
  24. const keys: string[] = [];
  25. function processObject(obj: object, parentKey = "") {
  26. let returnChanged = false;
  27. // eslint-disable-next-line
  28. for (let key in obj) {
  29. // eslint-disable-next-line
  30. if (obj.hasOwnProperty(key)) {
  31. if (key.startsWith("$")) {
  32. // eslint-disable-next-line
  33. // @ts-ignore
  34. // eslint-disable-next-line
  35. processNestedObject(obj[key], parentKey); // Process nested keys without including the current key
  36. // eslint-disable-next-line
  37. continue; // Skip the current key
  38. }
  39. const currentKey = parentKey ? `${parentKey}.${key}` : key;
  40. // eslint-disable-next-line
  41. // @ts-ignore
  42. if (typeof obj[key] === "object" && obj[key] !== null) {
  43. // eslint-disable-next-line
  44. // @ts-ignore
  45. if (Array.isArray(obj[key])) {
  46. // eslint-disable-next-line
  47. // @ts-ignore
  48. // eslint-disable-next-line
  49. if (processArray(obj[key], currentKey)) {
  50. returnChanged = true;
  51. // eslint-disable-next-line
  52. continue;
  53. }
  54. }
  55. // eslint-disable-next-line
  56. // @ts-ignore
  57. else if (processObject(obj[key], currentKey)) {
  58. returnChanged = true;
  59. // eslint-disable-next-line
  60. continue;
  61. }
  62. }
  63. keys.push(currentKey);
  64. returnChanged = true;
  65. }
  66. }
  67. return returnChanged;
  68. }
  69. function processArray(arr: Array<any>, parentKey: string) {
  70. let returnChanged = false;
  71. for (let i = 0; i < arr.length; i += 1) {
  72. const currentKey = parentKey;
  73. if (typeof arr[i] === "object" && arr[i] !== null) {
  74. if (Array.isArray(arr[i])) {
  75. if (processArray(arr[i], currentKey)) returnChanged = true;
  76. } else if (processObject(arr[i], currentKey))
  77. returnChanged = true;
  78. }
  79. }
  80. return returnChanged;
  81. }
  82. function processNestedObject(obj: object, parentKey: string) {
  83. if (typeof obj === "object" && obj !== null) {
  84. if (Array.isArray(obj)) {
  85. processArray(obj, parentKey);
  86. } else {
  87. processObject(obj, parentKey);
  88. }
  89. }
  90. }
  91. processObject(obj);
  92. return keys;
  93. }
  94. export default class DataModule extends BaseModule {
  95. private models?: Models;
  96. private mongoConnection?: Connection;
  97. // private redisClient?: RedisClientType;
  98. /**
  99. * Data Module
  100. */
  101. public constructor() {
  102. super("data");
  103. this.dependentModules = ["events"];
  104. }
  105. /**
  106. * startup - Startup data module
  107. */
  108. public override async startup() {
  109. await super.startup();
  110. await this.createMongoConnection();
  111. await this.runMigrations();
  112. await this.loadModels();
  113. await this.syncModelIndexes();
  114. // @ts-ignore
  115. // this.redisClient = createClient({ ...config.get("redis") });
  116. //
  117. // await this.redisClient.connect();
  118. //
  119. // const redisConfigResponse = await this.redisClient.sendCommand([
  120. // "CONFIG",
  121. // "GET",
  122. // "notify-keyspace-events"
  123. // ]);
  124. //
  125. // if (
  126. // !(
  127. // Array.isArray(redisConfigResponse) &&
  128. // redisConfigResponse[1] === "xE"
  129. // )
  130. // )
  131. // throw new Error(
  132. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  133. // (Array.isArray(redisConfigResponse) &&
  134. // redisConfigResponse[1]) ||
  135. // "unknown"
  136. // }`
  137. // );
  138. await super.started();
  139. }
  140. /**
  141. * shutdown - Shutdown data module
  142. */
  143. public override async shutdown() {
  144. await super.shutdown();
  145. // if (this.redisClient) await this.redisClient.quit();
  146. patchEventEmitter.removeAllListeners();
  147. if (this.mongoConnection) await this.mongoConnection.close();
  148. await this.stopped();
  149. }
  150. /**
  151. * createMongoConnection - Create mongo connection
  152. */
  153. private async createMongoConnection() {
  154. const { user, password, host, port, database } = config.get<{
  155. user: string;
  156. password: string;
  157. host: string;
  158. port: number;
  159. database: string;
  160. }>("mongo");
  161. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  162. this.mongoConnection = await mongoose
  163. .createConnection(mongoUrl)
  164. .asPromise();
  165. this.mongoConnection.set("runValidators", true);
  166. this.mongoConnection.set("sanitizeFilter", true);
  167. this.mongoConnection.set("strict", "throw");
  168. this.mongoConnection.set("strictQuery", "throw");
  169. }
  170. /**
  171. * registerEvents - Register events for schema with event module
  172. */
  173. private async registerEvents<
  174. ModelName extends keyof Models,
  175. SchemaType extends Schemas[keyof ModelName]
  176. >(modelName: ModelName, schema: SchemaType) {
  177. const methods: string[] = [
  178. "aggregate",
  179. "count",
  180. "countDocuments",
  181. "deleteOne",
  182. "deleteMany",
  183. "estimatedDocumentCount",
  184. "find",
  185. "findOne",
  186. "findOneAndDelete",
  187. "findOneAndRemove",
  188. "findOneAndReplace",
  189. "findOneAndUpdate",
  190. // "init",
  191. "insertMany",
  192. "remove",
  193. "replaceOne",
  194. "save",
  195. "update",
  196. "updateOne",
  197. "updateMany"
  198. // "validate"
  199. ];
  200. methods.forEach(method => {
  201. // NOTE: some Mongo selectors may also search through linked documents. Prevent that
  202. schema.pre(method, async function () {
  203. console.log(`Pre-${method}! START`);
  204. if (
  205. this.options?.userContext &&
  206. ["find", "update", "deleteOne", "save"].indexOf(method) ===
  207. -1
  208. )
  209. throw new Error("Method not allowed");
  210. console.log(`Pre-${method}!`, this.options?.userContext);
  211. if (["find", "update", "deleteOne"].indexOf(method) !== -1) {
  212. const filter = this.getFilter();
  213. const filterKeys = getAllKeys(filter);
  214. filterKeys.forEach(filterKey => {
  215. const splitFilterKeys = filterKey
  216. .split(".")
  217. .reduce(
  218. (keys: string[], key: string) =>
  219. keys.length > 0
  220. ? [
  221. ...keys,
  222. `${
  223. keys[keys.length - 1]
  224. }.${key}`
  225. ]
  226. : [key],
  227. []
  228. );
  229. splitFilterKeys.forEach(splitFilterKey => {
  230. const path = this.schema.path(splitFilterKey);
  231. if (!path) {
  232. throw new Error(
  233. "Attempted to query with non-existant property"
  234. );
  235. }
  236. if (path.options.restricted) {
  237. throw new Error(
  238. "Attempted to query with restricted property"
  239. );
  240. }
  241. });
  242. });
  243. console.log(`Pre-${method}!`, filterKeys);
  244. // Here we want to always exclude some properties depending on the model, like passwords/tokens
  245. this.projection({ restrictedName: 0 });
  246. }
  247. console.log(`Pre-${method}! END`);
  248. });
  249. schema.post(method, async function (docOrDocs) {
  250. console.log(`Post-${method} START!`);
  251. console.log(`Post-${method}!`, docOrDocs);
  252. console.log(`Post-${method}!`, this);
  253. console.log(`Post-${method} END!`);
  254. });
  255. });
  256. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  257. schema.get("patchHistory") ?? {};
  258. if (!enabled) return;
  259. Object.entries({
  260. created: eventCreated,
  261. updated: eventUpdated,
  262. deleted: eventDeleted
  263. })
  264. .filter(([, event]) => !!event)
  265. .forEach(([action, event]) => {
  266. patchEventEmitter.on(event, async ({ doc }) => {
  267. await this.jobQueue.runJob("events", "publish", {
  268. channel: `model.${modelName}.${doc._id}.${action}`,
  269. value: doc
  270. });
  271. });
  272. });
  273. }
  274. /**
  275. * loadModel - Import and load model schema
  276. *
  277. * @param modelName - Name of the model
  278. * @returns Model
  279. */
  280. private async loadModel<ModelName extends keyof Models>(
  281. modelName: ModelName
  282. ): Promise<Models[ModelName]> {
  283. if (!this.mongoConnection) throw new Error("Mongo is not available");
  284. const { schema }: { schema: Schemas[ModelName] } = await import(
  285. `../schemas/${modelName.toString()}`
  286. );
  287. schema.plugin(documentVersionPlugin);
  288. schema.set("timestamps", schema.get("timestamps") ?? true);
  289. const patchHistoryConfig = {
  290. enabled: true,
  291. patchHistoryDisabled: true,
  292. eventCreated: `${modelName}.created`,
  293. eventUpdated: `${modelName}.updated`,
  294. eventDeleted: `${modelName}.deleted`,
  295. ...(schema.get("patchHistory") ?? {})
  296. };
  297. schema.set("patchHistory", patchHistoryConfig);
  298. if (patchHistoryConfig.enabled) {
  299. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  300. }
  301. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  302. if (getDataEnabled) schema.plugin(getDataPlugin);
  303. await this.registerEvents(modelName, schema);
  304. return this.mongoConnection.model(modelName.toString(), schema);
  305. }
  306. /**
  307. * loadModels - Load and initialize all models
  308. *
  309. * @returns Promise
  310. */
  311. private async loadModels() {
  312. mongoose.SchemaTypes.String.set("trim", true);
  313. this.models = {
  314. abc: await this.loadModel("abc"),
  315. news: await this.loadModel("news"),
  316. session: await this.loadModel("session"),
  317. station: await this.loadModel("station"),
  318. user: await this.loadModel("user")
  319. };
  320. }
  321. /**
  322. * syncModelIndexes - Sync indexes for all models
  323. */
  324. private async syncModelIndexes() {
  325. if (!this.models) throw new Error("Models not loaded");
  326. await Promise.all(
  327. Object.values(this.models).map(model => model.syncIndexes())
  328. );
  329. }
  330. /**
  331. * getModel - Get model
  332. *
  333. * @returns Model
  334. */
  335. public async getModel<ModelName extends keyof Models>(
  336. jobContext: JobContext,
  337. payload: ModelName | { name: ModelName }
  338. ) {
  339. if (!this.models) throw new Error("Models not loaded");
  340. if (this.getStatus() !== ModuleStatus.STARTED)
  341. throw new Error("Module not started");
  342. const name = typeof payload === "object" ? payload.name : payload;
  343. return this.models[name];
  344. }
  345. private async loadMigrations() {
  346. if (!this.mongoConnection) throw new Error("Mongo is not available");
  347. const migrations = await readdir(
  348. path.resolve(__dirname, "../schemas/migrations/")
  349. );
  350. return Promise.all(
  351. migrations.map(async migrationFile => {
  352. const { default: Migrate }: { default: typeof Migration } =
  353. await import(`../schemas/migrations/${migrationFile}`);
  354. return new Migrate(this.mongoConnection as Connection);
  355. })
  356. );
  357. }
  358. private async runMigrations() {
  359. const migrations = await this.loadMigrations();
  360. for (let i = 0; i < migrations.length; i += 1) {
  361. const migration = migrations[i];
  362. // eslint-disable-next-line no-await-in-loop
  363. await migration.up();
  364. }
  365. }
  366. }
  367. export type DataModuleJobs = {
  368. [Property in keyof UniqueMethods<DataModule>]: {
  369. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  370. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  371. };
  372. };