DataModule.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. import config from "config";
  2. // import { createClient, RedisClientType } from "redis";
  3. import mongoose, {
  4. Connection,
  5. isObjectIdOrHexString,
  6. MongooseDefaultQueryMiddleware,
  7. MongooseDistinctQueryMiddleware,
  8. MongooseQueryOrDocumentMiddleware,
  9. Types
  10. } from "mongoose";
  11. import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
  12. import { readdir } from "fs/promises";
  13. import path from "path";
  14. import JobContext from "../JobContext";
  15. import BaseModule, { ModuleStatus } from "../BaseModule";
  16. import { UniqueMethods } from "../types/Modules";
  17. import { AnyModel, Models } from "../types/Models";
  18. import { Schemas } from "../types/Schemas";
  19. import documentVersionPlugin from "../models/plugins/documentVersion";
  20. import getDataPlugin from "../models/plugins/getData";
  21. import Migration from "../models/Migration";
  22. /**
  23. * Experimental: function to get all nested keys from a MongoDB query object
  24. */
  25. function getAllKeys(obj: object) {
  26. const keys: string[] = [];
  27. function processObject(obj: object, parentKey = "") {
  28. let returnChanged = false;
  29. // eslint-disable-next-line
  30. for (let key in obj) {
  31. // eslint-disable-next-line
  32. if (obj.hasOwnProperty(key)) {
  33. if (key.startsWith("$")) {
  34. // eslint-disable-next-line
  35. // @ts-ignore
  36. // eslint-disable-next-line
  37. processNestedObject(obj[key], parentKey); // Process nested keys without including the current key
  38. // eslint-disable-next-line
  39. continue; // Skip the current key
  40. }
  41. const currentKey = parentKey ? `${parentKey}.${key}` : key;
  42. // eslint-disable-next-line
  43. // @ts-ignore
  44. if (typeof obj[key] === "object" && obj[key] !== null) {
  45. // eslint-disable-next-line
  46. // @ts-ignore
  47. if (Array.isArray(obj[key])) {
  48. // eslint-disable-next-line
  49. // @ts-ignore
  50. // eslint-disable-next-line
  51. if (processArray(obj[key], currentKey)) {
  52. returnChanged = true;
  53. // eslint-disable-next-line
  54. continue;
  55. }
  56. }
  57. // eslint-disable-next-line
  58. // @ts-ignore
  59. else if (processObject(obj[key], currentKey)) {
  60. returnChanged = true;
  61. // eslint-disable-next-line
  62. continue;
  63. }
  64. }
  65. keys.push(currentKey);
  66. returnChanged = true;
  67. }
  68. }
  69. return returnChanged;
  70. }
  71. function processArray(arr: Array<any>, parentKey: string) {
  72. let returnChanged = false;
  73. for (let i = 0; i < arr.length; i += 1) {
  74. const currentKey = parentKey;
  75. if (typeof arr[i] === "object" && arr[i] !== null) {
  76. if (Array.isArray(arr[i])) {
  77. if (processArray(arr[i], currentKey)) returnChanged = true;
  78. } else if (processObject(arr[i], currentKey))
  79. returnChanged = true;
  80. }
  81. }
  82. return returnChanged;
  83. }
  84. function processNestedObject(obj: object, parentKey: string) {
  85. if (typeof obj === "object" && obj !== null) {
  86. if (Array.isArray(obj)) {
  87. processArray(obj, parentKey);
  88. } else {
  89. processObject(obj, parentKey);
  90. }
  91. }
  92. }
  93. processObject(obj);
  94. return keys;
  95. }
  96. export default class DataModule extends BaseModule {
  97. private _models?: Models;
  98. private _mongoConnection?: Connection;
  99. // private _redisClient?: RedisClientType;
  100. /**
  101. * Data Module
  102. */
  103. public constructor() {
  104. super("data");
  105. this._dependentModules = ["events"];
  106. this._jobConfig = {
  107. getModel: false,
  108. find: "disabled",
  109. addC: "disabled"
  110. };
  111. }
  112. /**
  113. * startup - Startup data module
  114. */
  115. public override async startup() {
  116. await super.startup();
  117. await this._createMongoConnection();
  118. await this._runMigrations();
  119. await this._loadModels();
  120. await this._syncModelIndexes();
  121. await this._defineModelJobs();
  122. // @ts-ignore
  123. // this._redisClient = createClient({ ...config.get("redis") });
  124. //
  125. // await this._redisClient.connect();
  126. //
  127. // const redisConfigResponse = await this._redisClient.sendCommand([
  128. // "CONFIG",
  129. // "GET",
  130. // "notify-keyspace-events"
  131. // ]);
  132. //
  133. // if (
  134. // !(
  135. // Array.isArray(redisConfigResponse) &&
  136. // redisConfigResponse[1] === "xE"
  137. // )
  138. // )
  139. // throw new Error(
  140. // `notify-keyspace-events is NOT configured correctly! It is set to: ${
  141. // (Array.isArray(redisConfigResponse) &&
  142. // redisConfigResponse[1]) ||
  143. // "unknown"
  144. // }`
  145. // );
  146. await super._started();
  147. }
  148. /**
  149. * shutdown - Shutdown data module
  150. */
  151. public override async shutdown() {
  152. await super.shutdown();
  153. // if (this._redisClient) await this._redisClient.quit();
  154. patchEventEmitter.removeAllListeners();
  155. if (this._mongoConnection) await this._mongoConnection.close();
  156. await this._stopped();
  157. }
  158. /**
  159. * createMongoConnection - Create mongo connection
  160. */
  161. private async _createMongoConnection() {
  162. mongoose.set({
  163. runValidators: true,
  164. sanitizeFilter: true,
  165. strict: "throw",
  166. strictQuery: "throw"
  167. });
  168. const { user, password, host, port, database } = config.get<{
  169. user: string;
  170. password: string;
  171. host: string;
  172. port: number;
  173. database: string;
  174. }>("mongo");
  175. const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
  176. this._mongoConnection = await mongoose
  177. .createConnection(mongoUrl)
  178. .asPromise();
  179. }
  180. /**
  181. * registerEvents - Register events for schema with event module
  182. */
  183. private async _registerEvents<
  184. ModelName extends keyof Models,
  185. SchemaType extends Schemas[keyof ModelName]
  186. >(modelName: ModelName, schema: SchemaType) {
  187. const methods: string[] = [
  188. "aggregate",
  189. "count",
  190. "countDocuments",
  191. "deleteOne",
  192. "deleteMany",
  193. "estimatedDocumentCount",
  194. "find",
  195. "findOne",
  196. "findOneAndDelete",
  197. "findOneAndRemove",
  198. "findOneAndReplace",
  199. "findOneAndUpdate",
  200. // "init",
  201. "insertMany",
  202. "remove",
  203. "replaceOne",
  204. "save",
  205. "update",
  206. "updateOne",
  207. "updateMany"
  208. // "validate"
  209. ];
  210. methods.forEach(method => {
  211. // NOTE: some Mongo selectors may also search through linked documents. Prevent that
  212. schema.pre(method, async function () {
  213. // console.log(`Pre-${method}! START`);
  214. if (
  215. this.options?.userContext &&
  216. ["find", "update", "deleteOne", "save"].indexOf(method) ===
  217. -1
  218. )
  219. throw new Error("Method not allowed");
  220. // console.log(`Pre-${method}!`, this.options?.userContext);
  221. if (["find", "update", "deleteOne"].indexOf(method) !== -1) {
  222. const filter = this.getFilter();
  223. const filterKeys = getAllKeys(filter);
  224. filterKeys.forEach(filterKey => {
  225. const splitFilterKeys = filterKey
  226. .split(".")
  227. .reduce(
  228. (keys: string[], key: string) =>
  229. keys.length > 0
  230. ? [
  231. ...keys,
  232. `${
  233. keys[keys.length - 1]
  234. }.${key}`
  235. ]
  236. : [key],
  237. []
  238. );
  239. splitFilterKeys.forEach(splitFilterKey => {
  240. const path = this.schema.path(splitFilterKey);
  241. if (!path) {
  242. throw new Error(
  243. "Attempted to query with non-existant property"
  244. );
  245. }
  246. if (path.options.restricted) {
  247. throw new Error(
  248. "Attempted to query with restricted property"
  249. );
  250. }
  251. });
  252. });
  253. // console.log(`Pre-${method}!`, filterKeys);
  254. // Here we want to always exclude some properties depending on the model, like passwords/tokens
  255. this.projection({ restrictedName: 0 });
  256. }
  257. // console.log(`Pre-${method}! END`);
  258. });
  259. schema.post(method, async docOrDocs => {
  260. // console.log(`Post-${method} START!`);
  261. // console.log(`Post-${method}!`, docOrDocs);
  262. // console.log(`Post-${method}!`, this);
  263. // console.log(`Post-${method} END!`);
  264. });
  265. });
  266. const { enabled, eventCreated, eventUpdated, eventDeleted } =
  267. schema.get("patchHistory") ?? {};
  268. if (!enabled) return;
  269. Object.entries({
  270. created: eventCreated,
  271. updated: eventUpdated,
  272. deleted: eventDeleted
  273. })
  274. .filter(([, event]) => !!event)
  275. .forEach(([action, event]) => {
  276. patchEventEmitter.on(event, async ({ doc, oldDoc }) => {
  277. const modelId = doc?._id ?? oldDoc?._id;
  278. if (!modelId)
  279. throw new Error(`Model Id not found for "${event}"`);
  280. await this._jobQueue.runJob("events", "publish", {
  281. channel: `model.${modelName}.${modelId}.${action}`,
  282. value: { doc, oldDoc }
  283. });
  284. });
  285. });
  286. }
  287. /**
  288. * loadModel - Import and load model schema
  289. *
  290. * @param modelName - Name of the model
  291. * @returns Model
  292. */
  293. private async _loadModel<ModelName extends keyof Models>(
  294. modelName: ModelName
  295. ): Promise<Models[ModelName]> {
  296. if (!this._mongoConnection) throw new Error("Mongo is not available");
  297. const { schema }: { schema: Schemas[ModelName] } = await import(
  298. `../models/schemas/${modelName.toString()}/schema`
  299. );
  300. schema.plugin(documentVersionPlugin);
  301. schema.set("timestamps", schema.get("timestamps") ?? true);
  302. const patchHistoryConfig = {
  303. enabled: true,
  304. patchHistoryDisabled: true,
  305. eventCreated: `${modelName}.created`,
  306. eventUpdated: `${modelName}.updated`,
  307. eventDeleted: `${modelName}.deleted`,
  308. ...(schema.get("patchHistory") ?? {})
  309. };
  310. schema.set("patchHistory", patchHistoryConfig);
  311. if (patchHistoryConfig.enabled) {
  312. schema.plugin(patchHistoryPlugin, patchHistoryConfig);
  313. }
  314. const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
  315. if (getDataEnabled) schema.plugin(getDataPlugin);
  316. await this._registerEvents(modelName, schema);
  317. return this._mongoConnection.model(modelName.toString(), schema);
  318. }
  319. /**
  320. * loadModels - Load and initialize all models
  321. *
  322. * @returns Promise
  323. */
  324. private async _loadModels() {
  325. mongoose.SchemaTypes.String.set("trim", true);
  326. this._models = {
  327. abc: await this._loadModel("abc"),
  328. news: await this._loadModel("news"),
  329. sessions: await this._loadModel("sessions"),
  330. stations: await this._loadModel("stations"),
  331. users: await this._loadModel("users")
  332. };
  333. }
  334. /**
  335. * syncModelIndexes - Sync indexes for all models
  336. */
  337. private async _syncModelIndexes() {
  338. if (!this._models) throw new Error("Models not loaded");
  339. await Promise.all(
  340. Object.values(this._models).map(model => model.syncIndexes())
  341. );
  342. }
  343. /**
  344. * getModel - Get model
  345. *
  346. * @returns Model
  347. */
  348. public async getModel<ModelName extends keyof Models>(
  349. jobContext: JobContext,
  350. payload: ModelName | { name: ModelName }
  351. ) {
  352. if (!this._models) throw new Error("Models not loaded");
  353. if (this.getStatus() !== ModuleStatus.STARTED)
  354. throw new Error("Module not started");
  355. const name = typeof payload === "object" ? payload.name : payload;
  356. return this._models[name];
  357. }
  358. private async _loadMigrations() {
  359. if (!this._mongoConnection) throw new Error("Mongo is not available");
  360. const migrations = await readdir(
  361. path.resolve(__dirname, "../models/migrations/")
  362. );
  363. return Promise.all(
  364. migrations.map(async migrationFile => {
  365. const { default: Migrate }: { default: typeof Migration } =
  366. await import(`../models/migrations/${migrationFile}`);
  367. return new Migrate(this._mongoConnection as Connection);
  368. })
  369. );
  370. }
  371. private async _runMigrations() {
  372. const migrations = await this._loadMigrations();
  373. for (let i = 0; i < migrations.length; i += 1) {
  374. const migration = migrations[i];
  375. // eslint-disable-next-line no-await-in-loop
  376. await migration.up();
  377. }
  378. }
  379. private async _defineModelJobs() {
  380. if (!this._models) throw new Error("Models not loaded");
  381. await Promise.all(
  382. Object.entries(this._models).map(async ([modelName, model]) => {
  383. await Promise.all(
  384. ["create", "findById", "updateById", "deleteById"].map(
  385. async method => {
  386. this._jobConfig[`${modelName}.${method}`] = {
  387. method: async (context, payload) =>
  388. Object.getPrototypeOf(this)[`_${method}`](
  389. context,
  390. {
  391. ...payload,
  392. modelName,
  393. model
  394. }
  395. )
  396. };
  397. }
  398. )
  399. );
  400. const jobConfig = model.schema.get("jobConfig");
  401. if (
  402. typeof jobConfig === "object" &&
  403. Object.keys(jobConfig).length > 0
  404. )
  405. await Promise.all(
  406. Object.entries(jobConfig).map(
  407. async ([name, options]) => {
  408. if (options === "disabled") {
  409. if (this._jobConfig[`${modelName}.${name}`])
  410. delete this._jobConfig[
  411. `${modelName}.${name}`
  412. ];
  413. return;
  414. }
  415. let api = this._jobApiDefault;
  416. let method;
  417. const configOptions =
  418. this._jobConfig[`${modelName}.${name}`];
  419. if (typeof configOptions === "object") {
  420. if (typeof configOptions.api === "boolean")
  421. api = configOptions.api;
  422. if (
  423. typeof configOptions.method ===
  424. "function"
  425. )
  426. method = configOptions.method;
  427. } else if (typeof configOptions === "function")
  428. method = configOptions;
  429. else if (typeof configOptions === "boolean")
  430. api = configOptions;
  431. if (
  432. typeof options === "object" &&
  433. typeof options.api === "boolean"
  434. )
  435. api = options.api;
  436. else if (typeof options === "boolean")
  437. api = options;
  438. if (
  439. typeof options === "object" &&
  440. typeof options.method === "function"
  441. )
  442. method = async (...args) =>
  443. options.method.apply(model, args);
  444. else if (typeof options === "function")
  445. method = async (...args) =>
  446. options.apply(model, args);
  447. if (typeof method !== "function")
  448. throw new Error(
  449. `Job "${name}" has no function method defined`
  450. );
  451. this._jobConfig[`${modelName}.${name}`] = {
  452. api,
  453. method
  454. };
  455. }
  456. )
  457. );
  458. })
  459. );
  460. }
  461. private async _findById(
  462. context: JobContext,
  463. payload: {
  464. modelName: keyof Models;
  465. model: AnyModel;
  466. _id: Types.ObjectId;
  467. }
  468. ) {
  469. const { modelName, model, _id } = payload ?? {};
  470. await context.assertPermission(`data.${modelName}.findById.${_id}`);
  471. const query = model.findById(_id);
  472. return query.exec();
  473. }
  474. private async _create(
  475. context: JobContext,
  476. payload: {
  477. modelName: keyof Models;
  478. model: AnyModel;
  479. query: Record<string, any[]>;
  480. }
  481. ) {
  482. const { modelName, model, query } = payload ?? {};
  483. await context.assertPermission(`data.${modelName}.create`);
  484. if (typeof query !== "object")
  485. throw new Error("Query is not an object");
  486. if (Object.keys(query).length === 0)
  487. throw new Error("Empty query object provided");
  488. if (model.schema.path("createdBy"))
  489. query.createdBy = (await context.getUser())._id;
  490. return model.create(query);
  491. }
  492. private async _updateById(
  493. context: JobContext,
  494. payload: {
  495. modelName: keyof Models;
  496. model: AnyModel;
  497. _id: Types.ObjectId;
  498. query: Record<string, any[]>;
  499. }
  500. ) {
  501. const { modelName, model, _id, query } = payload ?? {};
  502. await context.assertPermission(`data.${modelName}.updateById.${_id}`);
  503. if (!isObjectIdOrHexString(_id))
  504. throw new Error("_id is not an ObjectId");
  505. if (typeof query !== "object")
  506. throw new Error("Query is not an object");
  507. if (Object.keys(query).length === 0)
  508. throw new Error("Empty query object provided");
  509. return model.updateOne({ _id }, { $set: query });
  510. }
  511. private async _deleteById(
  512. context: JobContext,
  513. payload: {
  514. modelName: keyof Models;
  515. model: AnyModel;
  516. _id: Types.ObjectId;
  517. }
  518. ) {
  519. const { modelName, model, _id } = payload ?? {};
  520. await context.assertPermission(`data.${modelName}.deleteById.${_id}`);
  521. if (!isObjectIdOrHexString(_id))
  522. throw new Error("_id is not an ObjectId");
  523. return model.deleteOne({ _id });
  524. }
  525. }
  526. export type DataModuleJobs = {
  527. [Property in keyof UniqueMethods<DataModule>]: {
  528. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  529. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  530. };
  531. };