DataModule.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. import async from "async";
  2. import config from "config";
  3. import mongoose, { Schema } from "mongoose";
  4. import hash from "object-hash";
  5. import { createClient, RedisClientType } from "redis";
  6. import JobContext from "src/JobContext";
  7. import BaseModule from "../BaseModule";
  8. import ModuleManager from "../ModuleManager";
  9. import { UniqueMethods } from "../types/Modules";
  10. import { Collections } from "../types/Collections";
  11. export default class DataModule extends BaseModule {
  12. collections?: Collections;
  13. redis?: RedisClientType;
  14. /**
  15. * Data Module
  16. *
  17. * @param moduleManager - Module manager class
  18. */
  19. public constructor(moduleManager: ModuleManager) {
  20. super(moduleManager, "data");
  21. }
  22. /**
  23. * startup - Startup data module
  24. */
  25. public override startup(): Promise<void> {
  26. return new Promise((resolve, reject) => {
  27. async.waterfall(
  28. [
  29. async () => super.startup(),
  30. async () => {
  31. const mongoUrl = config.get<string>("mongo.url");
  32. return mongoose.connect(mongoUrl);
  33. },
  34. async () => this.loadCollections(),
  35. async () => {
  36. if (this.collections) {
  37. await async.each(
  38. Object.values(this.collections),
  39. async collection =>
  40. collection.model.syncIndexes()
  41. );
  42. } else
  43. throw new Error("Collections have not been loaded");
  44. },
  45. async () => {
  46. const { url, password } = config.get<{
  47. url: string;
  48. password: string;
  49. }>("redis");
  50. this.redis = createClient({
  51. url,
  52. password
  53. });
  54. return this.redis.connect();
  55. },
  56. async () => {
  57. if (!this.redis)
  58. throw new Error("Redis connection not established");
  59. return this.redis.sendCommand([
  60. "CONFIG",
  61. "GET",
  62. "notify-keyspace-events"
  63. ]);
  64. },
  65. async (redisConfigResponse: string[]) => {
  66. if (
  67. !(
  68. Array.isArray(redisConfigResponse) &&
  69. redisConfigResponse[1] === "xE"
  70. )
  71. )
  72. throw new Error(
  73. `notify-keyspace-events is NOT configured correctly! It is set to: ${
  74. (Array.isArray(redisConfigResponse) &&
  75. redisConfigResponse[1]) ||
  76. "unknown"
  77. }`
  78. );
  79. },
  80. async () => super.started()
  81. ],
  82. err => {
  83. if (err) reject(err);
  84. else resolve();
  85. }
  86. );
  87. });
  88. }
  89. /**
  90. * shutdown - Shutdown data module
  91. */
  92. public override shutdown(): Promise<void> {
  93. return new Promise(resolve => {
  94. super
  95. .shutdown()
  96. .then(async () => {
  97. // TODO: Ensure the following shutdown correctly
  98. if (this.redis) await this.redis.quit();
  99. await mongoose.connection.close(false);
  100. })
  101. .finally(() => resolve());
  102. });
  103. }
  104. /**
  105. * loadColllection - Import and load collection schema
  106. *
  107. * @param collectionName - Name of the collection
  108. * @returns Collection
  109. */
  110. private loadCollection<T extends keyof Collections>(
  111. collectionName: T
  112. ): Promise<Collections[T]> {
  113. return new Promise(resolve => {
  114. import(`../collections/${collectionName.toString()}`).then(
  115. ({ schema }: { schema: Collections[T]["schema"] }) => {
  116. const mongoSchema = new Schema<
  117. Collections[T]["schema"]["document"]
  118. >(schema.document, {
  119. timestamps: schema.timestamps
  120. });
  121. const model = mongoose.model(
  122. collectionName.toString(),
  123. mongoSchema
  124. );
  125. // @ts-ignore
  126. resolve({
  127. // @ts-ignore
  128. schema,
  129. // @ts-ignore
  130. model
  131. });
  132. }
  133. );
  134. });
  135. }
  136. /**
  137. * loadCollections - Load and initialize all collections
  138. *
  139. * @returns Promise
  140. */
  141. private loadCollections(): Promise<void> {
  142. return new Promise((resolve, reject) => {
  143. const fetchCollections = async () => ({
  144. abc: await this.loadCollection("abc")
  145. });
  146. fetchCollections()
  147. .then(collections => {
  148. this.collections = collections;
  149. resolve();
  150. })
  151. .catch(err => {
  152. reject(new Error(err));
  153. });
  154. });
  155. }
  156. /**
  157. * Strip a document object from any unneeded properties, or of any restricted properties
  158. *
  159. * @param document The document object
  160. * @param schema The schema object
  161. * @param projection The project, which can be null
  162. * @returns
  163. */
  164. private async stripDocument(
  165. document: any,
  166. schema: any,
  167. projection: any
  168. ) {
  169. const allowedByProjection = (property: string) => {
  170. if (Array.isArray(projection)) return projection.indexOf(property) !== -1;
  171. else if (typeof property === "object") !!projection[property];
  172. else return false;
  173. }
  174. const unfilteredEntries = Object.entries(document);
  175. const filteredEntries = await async.filter(unfilteredEntries, async ([key, value]) => {
  176. if (!schema[key]) return false;
  177. if (projection) return allowedByProjection(key);
  178. else {
  179. if (schema[key].restricted) return false;
  180. return true;
  181. }
  182. });
  183. return Object.fromEntries(filteredEntries);
  184. }
  185. /**
  186. * parseFindFilter - Ensure validity of filter and return a mongo filter ---, or the document itself re-constructed
  187. *
  188. * @param filter - Filter
  189. * @param schema - Schema of collection document
  190. * @param options - Parser options
  191. * @returns Promise returning object with query values cast to schema types
  192. * and whether query includes restricted attributes
  193. */
  194. private async parseFindFilter(
  195. filter: any,
  196. schema: any,
  197. options?: {
  198. operators?: boolean;
  199. }
  200. ): Promise<{ mongoFilter: any; containsRestrictedProperties: boolean, canCache: boolean }> {
  201. if (!filter || typeof filter !== "object")
  202. throw new Error("Invalid filter provided. Filter must be an object.");
  203. const keys = Object.keys(filter);
  204. if (keys.length === 0)
  205. throw new Error("Invalid filter provided. Filter must contain keys.");
  206. // Whether to parse operators or not
  207. const operators = !(options && options.operators === false);
  208. // The MongoDB filter we're building
  209. const mongoFilter: any = {};
  210. // If the filter references any properties that are restricted, this will be true, so that find knows not to cache the query object
  211. let containsRestrictedProperties = false;
  212. // Whether this filter is cachable or not
  213. let canCache = true;
  214. // Operators at the key level that we support right now
  215. const allowedKeyOperators = ["$or", "$and"];
  216. // Operators at the value level that we support right now
  217. const allowedValueOperators = ["$in"];
  218. // Loop through all key/value properties
  219. await async.each(Object.entries(filter), async ([key, value]) => {
  220. // Key must be 1 character and exist
  221. if (!key || key.length === 0)
  222. throw new Error(
  223. `Invalid filter provided. Key must be at least 1 character.`
  224. );
  225. // Handle key operators, which always start with a $
  226. if (operators && key[0] === "$") {
  227. // Operator isn't found, so throw an error
  228. if (allowedKeyOperators.indexOf(key) === -1)
  229. throw new Error(
  230. `Invalid filter provided. Operator "${key}" is not allowed.`
  231. );
  232. // We currently only support $or and $and, but here we can have different logic for different operators
  233. if (key === "$or" || key === "$and") {
  234. // $or and $and should always be an array, so check if it is
  235. if (!Array.isArray(value) || value.length === 0)
  236. throw new Error(
  237. `Key "${key}" must contain array of queries.`
  238. );
  239. // Add the operator to the mongo filter object as an empty array
  240. mongoFilter[key] = [];
  241. // Run parseFindQuery again for child objects and add them to the mongo query operator array
  242. await async.each(value, async _value => {
  243. const {
  244. mongoFilter: _mongoFilter,
  245. containsRestrictedProperties: _containsRestrictedProperties
  246. } = await this.parseFindFilter(_value, schema, options);
  247. // Actually add the returned filter object to the mongo query we're building
  248. mongoFilter[key].push(_mongoFilter);
  249. if (_containsRestrictedProperties) containsRestrictedProperties = true;
  250. });
  251. } else
  252. throw new Error(
  253. `Unhandled operator "${key}", this should never happen!`
  254. );
  255. } else {
  256. // Here we handle any normal keys in the query object
  257. // If the key doesn't exist in the schema, throw an error
  258. if (!Object.hasOwn(schema, key))
  259. throw new Error(
  260. `Key "${key} does not exist in the schema."`
  261. );
  262. // If the key in the schema is marked as restricted, containsRestrictedProperties will be true
  263. if (schema[key].restricted) containsRestrictedProperties = true;
  264. // Type will be undefined if it's a nested object
  265. if (schema[key].type === undefined) {
  266. // Run parseFindFilter on the nested schema object
  267. const { mongoFilter: _mongoFilter, containsRestrictedProperties: _containsRestrictedProperties } =
  268. await this.parseFindFilter(value, schema[key], options);
  269. mongoFilter[key] = _mongoFilter;
  270. if (_containsRestrictedProperties) containsRestrictedProperties = true;
  271. } else if (
  272. operators &&
  273. typeof value === "object" &&
  274. value &&
  275. Object.keys(value).length === 1 &&
  276. Object.keys(value)[0] &&
  277. Object.keys(value)[0][0] === "$"
  278. ) {
  279. // This entire if statement is for handling value operators
  280. const operator = Object.keys(value)[0];
  281. // Operator isn't found, so throw an error
  282. if (allowedValueOperators.indexOf(operator) === -1)
  283. throw new Error(
  284. `Invalid filter provided. Operator "${key}" is not allowed.`
  285. );
  286. // Handle the $in value operator
  287. if (operator === "$in") {
  288. mongoFilter[key] = {
  289. $in: []
  290. };
  291. if (value.$in.length > 0)
  292. mongoFilter[key].$in = await async.map(
  293. value.$in,
  294. async (_value: any) => {
  295. if (
  296. typeof schema[key].type === "function"
  297. ) {
  298. //
  299. // const Type = schema[key].type;
  300. // const castValue = new Type(_value);
  301. // if (schema[key].validate)
  302. // await schema[key]
  303. // .validate(castValue)
  304. // .catch(err => {
  305. // throw new Error(
  306. // `Invalid value for ${key}, ${err}`
  307. // );
  308. // });
  309. return _value;
  310. }
  311. throw new Error(
  312. `Invalid schema type for ${key}`
  313. );
  314. }
  315. );
  316. } else
  317. throw new Error(
  318. `Unhandled operator "${operator}", this should never happen!`
  319. );
  320. } else if (typeof schema[key].type === "function") {
  321. // Do type checking/casting here
  322. // const Type = schema[key].type;
  323. // // const castValue = new Type(value);
  324. // if (schema[key].validate)
  325. // await schema[key].validate(castValue).catch(err => {
  326. // throw new Error(`Invalid value for ${key}, ${err}`);
  327. // });
  328. mongoFilter[key] = value;
  329. } else throw new Error(`Invalid schema type for ${key}`);
  330. }
  331. });
  332. if (containsRestrictedProperties) canCache = false;
  333. return { mongoFilter, containsRestrictedProperties, canCache };
  334. }
  335. // TODO improve caching
  336. // TODO add support for computed fields
  337. // TODO parse query - validation
  338. // TODO add proper typescript support
  339. // TODO add proper jsdoc
  340. // TODO add support for enum document attributes
  341. // TODO add support for array document attributes
  342. // TODO add support for reference document attributes
  343. // TODO fix 2nd layer of schema
  344. /**
  345. * find - Get one or more document(s) from a single collection
  346. *
  347. * @param payload - Payload
  348. * @returns Returned object
  349. */
  350. public find<CollectionNameType extends keyof Collections>(
  351. context: JobContext,
  352. {
  353. collection, // Collection name
  354. filter, // Similar to MongoDB filter
  355. projection,
  356. values, // TODO: Add support
  357. limit = 0, // TODO have limit off by default?
  358. page = 1,
  359. useCache = true
  360. }: {
  361. collection: CollectionNameType;
  362. filter: Record<string, any>;
  363. projection?: Record<string, any> | string[],
  364. values?: Record<string, any>;
  365. limit?: number;
  366. page?: number;
  367. useCache?: boolean;
  368. }
  369. ): Promise<any | null> {
  370. return new Promise((resolve, reject) => {
  371. let queryHash: string | null = null;
  372. let cacheable = useCache !== false;
  373. async.waterfall(
  374. [
  375. // Verify whether the collection exists
  376. async () => {
  377. if (!collection)
  378. throw new Error("No collection specified");
  379. if (this.collections && !this.collections[collection])
  380. throw new Error("Collection not found");
  381. },
  382. // Verify whether the query is valid-enough to continue
  383. async () =>
  384. this.parseFindFilter(
  385. filter,
  386. this.collections![collection].schema.document
  387. ),
  388. // If we can use cache, get from the cache, and if we get results return those
  389. async ({ mongoFilter, canCache }: any) => {
  390. // console.log(111, mongoFilter, canCache);
  391. // If we're allowed to cache, and the filter doesn't reference any restricted fields, try to cache the query and its response
  392. if (cacheable && canCache) {
  393. // Turn the query object into a sha1 hash that can be used as a Redis key
  394. queryHash = hash(
  395. { collection, mongoFilter, values, limit, page },
  396. {
  397. algorithm: "sha1"
  398. }
  399. );
  400. // Check if the query hash already exists in Redis, and get it if it is
  401. const cachedQuery = await this.redis?.GET(
  402. `query.find.${queryHash}`
  403. );
  404. // Return the mongoFilter along with the cachedDocuments, if any
  405. return {
  406. mongoFilter,
  407. cachedDocuments: cachedQuery
  408. ? JSON.parse(cachedQuery)
  409. : null
  410. };
  411. }
  412. return { mongoFilter, cachedDocuments: null };
  413. },
  414. // If we didn't get documents from the cache, get them from mongo
  415. async ({ mongoFilter, cachedDocuments }: any) => {
  416. if (cachedDocuments) {
  417. cacheable = false;
  418. return cachedDocuments;
  419. }
  420. // const getFindValues = async (object: any) => {
  421. // const find: any = {};
  422. // await async.each(
  423. // Object.entries(object),
  424. // async ([key, value]) => {
  425. // if (
  426. // value.type === undefined &&
  427. // Object.keys(value).length > 0
  428. // ) {
  429. // const _find = await getFindValues(
  430. // value
  431. // );
  432. // if (Object.keys(_find).length > 0)
  433. // find[key] = _find;
  434. // } else if (!value.restricted)
  435. // find[key] = true;
  436. // }
  437. // );
  438. // return find;
  439. // };
  440. // const find: any = await getFindValues(
  441. // this.collections![collection].schema.document
  442. // );
  443. // TODO, add mongo projection. Make sure to keep in mind caching with queryHash.
  444. const mongoProjection = null;
  445. return this.collections?.[collection].model
  446. .find(mongoFilter, mongoProjection)
  447. .limit(limit)
  448. .skip((page - 1) * limit);
  449. },
  450. // Convert documents from Mongoose model to regular objects
  451. async (documents: any[]) =>
  452. async.map(documents, async (document: any) => document._doc ? document._doc : document),
  453. // Add documents to the cache
  454. async (documents: any[]) => {
  455. // Adds query results to cache but doesnt await
  456. if (cacheable && queryHash) {
  457. this.redis!.SET(
  458. `query.find.${queryHash}`,
  459. JSON.stringify(documents),
  460. {
  461. EX: 60
  462. }
  463. );
  464. }
  465. return documents;
  466. },
  467. // Strips the document of any unneeded properties or properties that are restricted
  468. async (documents: any[]) => async.map(documents, async (document: any) => {
  469. return await this.stripDocument(document, this.collections![collection].schema.document, projection);
  470. })
  471. ],
  472. (err, documents?: any[]) => {
  473. if (err) reject(err);
  474. else if (!documents || documents!.length === 0)
  475. resolve(limit === 1 ? null : []);
  476. else resolve(limit === 1 ? documents![0] : documents);
  477. }
  478. );
  479. });
  480. }
  481. }
  482. export type DataModuleJobs = {
  483. [Property in keyof UniqueMethods<DataModule>]: {
  484. payload: Parameters<UniqueMethods<DataModule>[Property]>[1];
  485. returns: Awaited<ReturnType<UniqueMethods<DataModule>[Property]>>;
  486. };
  487. };