DataModule.ts 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517
  1. import async from "async";
  2. import config from "config";
  3. import mongoose, { Schema } from "mongoose";
  4. import hash from "object-hash";
  5. import { createClient, RedisClientType } from "redis";
  6. import JobContext from "src/JobContext";
  7. import BaseModule from "../BaseModule";
  8. import ModuleManager from "../ModuleManager";
  9. import { UniqueMethods } from "../types/Modules";
  10. import { Collections } from "../types/Collections";
  /**
   * DataModule - Module that connects to MongoDB (through mongoose) and Redis,
   * loads the collection schemas/models, and exposes a cached `find` job.
   */
  export default class DataModule extends BaseModule {
    /** Loaded collections (schema + mongoose model); set by loadCollections() during startup */
    collections?: Collections;

    /** Redis client; created and connected during startup */
    redis?: RedisClientType;

    /**
     * Data Module
     *
     * @param moduleManager - Module manager class
     */
    public constructor(moduleManager: ModuleManager) {
      super(moduleManager, "data");
    }

    /**
     * startup - Startup data module
     *
     * Runs, in order: the base module startup, the mongoose connection, collection
     * loading, index syncing for every collection, the Redis connection and a check
     * of the Redis `notify-keyspace-events` setting, then marks the module as started.
     *
     * @returns Promise that resolves once every step succeeded, and rejects with the
     * error of the first step that failed
     */
    public override async startup(): Promise<void> {
      await super.startup();

      await mongoose.connect(config.get<string>("mongo.url"));

      await this.loadCollections();
      if (!this.collections)
        throw new Error("Collections have not been loaded");

      // Sync the indexes of all collections in parallel
      await async.each(
        Object.values(this.collections),
        async collection => {
          await collection.model.syncIndexes();
        }
      );

      const { url, password } = config.get<{
        url: string;
        password: string;
      }>("redis");
      this.redis = createClient({ url, password });
      const redis = this.redis;
      await redis.connect();

      // The reply is expected to be an array whose second element is the configured value
      const redisConfigResponse = await redis.sendCommand([
        "CONFIG",
        "GET",
        "notify-keyspace-events"
      ]);
      const keyspaceEvents = Array.isArray(redisConfigResponse)
        ? redisConfigResponse[1]
        : undefined;
      if (keyspaceEvents !== "xE")
        throw new Error(
          `notify-keyspace-events is NOT configured correctly! It is set to: ${
            keyspaceEvents || "unknown"
          }`
        );

      await super.started();
    }

    /**
     * shutdown - Shutdown data module
     *
     * Best-effort: the returned promise always resolves. Failures are logged
     * rather than left as unhandled promise rejections, and the mongoose
     * connection is closed even when quitting Redis fails.
     *
     * @returns Promise that resolves once shutdown has been attempted
     */
    public override async shutdown(): Promise<void> {
      try {
        await super.shutdown();
        // TODO: Ensure the following shutdown correctly
        try {
          if (this.redis) await this.redis.quit();
        } finally {
          await mongoose.connection.close(false);
        }
      } catch (err) {
        // Shutdown never rejects, so report the failure here instead of dropping it
        console.error("Data module did not shut down cleanly:", err);
      }
    }

    /**
     * loadCollection - Import and load collection schema
     *
     * Dynamically imports `../collections/<collectionName>`, builds a mongoose
     * schema from its exported `schema`, and registers a mongoose model under
     * the collection name.
     *
     * @param collectionName - Name of the collection
     * @returns Collection (its schema and mongoose model); rejects if the import
     * or the schema/model creation fails
     */
    private async loadCollection<T extends keyof Collections>(
      collectionName: T
    ): Promise<Collections[T]> {
      const name = collectionName.toString();

      const { schema }: { schema: Collections[T]["schema"] } = await import(
        `../collections/${name}`
      );

      const mongoSchema = new Schema<Collections[T]["schema"]["document"]>(
        schema.document,
        { timestamps: schema.timestamps }
      );
      const model = mongoose.model(name, mongoSchema);

      // The generic collection type cannot be inferred from the dynamic import
      // @ts-ignore
      return { schema, model };
    }

    /**
     * loadCollections - Load and initialize all collections
     *
     * @returns Promise that resolves once `this.collections` has been populated;
     * rejects with the original error if a collection fails to load
     */
    private async loadCollections(): Promise<void> {
      this.collections = {
        abc: await this.loadCollection("abc")
      };
    }

    // TODO split core into parseDocument(document, schema, { partial: boolean; })
    /**
     * parseFindFilter - Ensure validity of filter and return a mongo filter
     *
     * Supported operators: `$or` / `$and` at key level (array of sub-filters, each
     * parsed recursively against the same schema) and `$in` at value level.
     * A schema property without a `type` is treated as a nested object and the
     * value is parsed recursively against it.
     *
     * @param filter - Filter
     * @param schema - Schema of collection document
     * @param options - Parser options; `operators: false` disables operator parsing
     * @returns Promise returning the mongo filter, whether the filter references
     * restricted properties, and whether the query may be cached (it may not when
     * restricted properties are referenced)
     * @throws Error if the filter is not a non-empty object, references a key that
     * is not in the schema, uses an unsupported operator, or passes an operator a
     * value of the wrong shape
     */
    private async parseFindFilter(
      filter: any,
      schema: any,
      options?: {
        operators?: boolean;
      }
    ): Promise<{
      mongoFilter: any;
      containsRestrictedProperties: boolean;
      canCache: boolean;
    }> {
      if (!filter || typeof filter !== "object")
        throw new Error("Invalid filter provided. Filter must be an object.");

      const entries = Object.entries(filter);
      if (entries.length === 0)
        throw new Error("Invalid filter provided. Filter must contain keys.");

      // Whether to parse operators or not
      const operators = options?.operators !== false;

      // The MongoDB filter we're building
      const mongoFilter: any = {};

      // If the filter references any properties that are restricted, this will be true, so that find knows not to cache the query object
      let containsRestrictedProperties = false;

      // Operators at the key level that we support right now
      const allowedKeyOperators = ["$or", "$and"];

      // Operators at the value level that we support right now
      const allowedValueOperators = ["$in"];

      // Entries are handled one after another so the resulting filter is built deterministically
      for (const [key, value] of entries as [string, any][]) {
        // Key must be at least 1 character
        if (!key)
          throw new Error(
            `Invalid filter provided. Key must be at least 1 character.`
          );

        // Handle key operators, which always start with a $
        if (operators && key[0] === "$") {
          // Operator isn't found, so throw an error
          if (!allowedKeyOperators.includes(key))
            throw new Error(
              `Invalid filter provided. Operator "${key}" is not allowed.`
            );

          // We currently only support $or and $and, but here we can have different logic for different operators
          if (key === "$or" || key === "$and") {
            // $or and $and should always be a non-empty array
            if (!Array.isArray(value) || value.length === 0)
              throw new Error(`Key "${key}" must contain array of queries.`);

            // Parse every child filter; Promise.all keeps the children in input order,
            // which matters because the mongo filter is hashed into the cache key
            const children = await Promise.all(
              value.map(child => this.parseFindFilter(child, schema, options))
            );
            mongoFilter[key] = children.map(child => child.mongoFilter);
            if (children.some(child => child.containsRestrictedProperties))
              containsRestrictedProperties = true;
          } else
            throw new Error(
              `Unhandled operator "${key}", this should never happen!`
            );

          continue;
        }

        // Here we handle any normal keys in the query object

        // If the key doesn't exist in the schema, throw an error
        if (!Object.hasOwn(schema, key))
          throw new Error(`Key "${key}" does not exist in the schema.`);

        const property = schema[key];

        // If the key in the schema is marked as restricted, containsRestrictedProperties will be true
        if (property.restricted) containsRestrictedProperties = true;

        // Type will be undefined if it's a nested object
        if (property.type === undefined) {
          // Run parseFindFilter on the nested schema object
          const nested = await this.parseFindFilter(value, property, options);
          mongoFilter[key] = nested.mongoFilter;
          if (nested.containsRestrictedProperties)
            containsRestrictedProperties = true;
          continue;
        }

        // A value operator is an object with exactly one key, and that key starts with a $
        const valueKeys =
          value && typeof value === "object" ? Object.keys(value) : [];
        const [operator] = valueKeys;

        if (operators && valueKeys.length === 1 && operator?.startsWith("$")) {
          // Operator isn't found, so throw an error
          if (!allowedValueOperators.includes(operator))
            throw new Error(
              `Invalid filter provided. Operator "${operator}" is not allowed.`
            );

          // Handle the $in value operator
          if (operator === "$in") {
            if (!Array.isArray(value.$in))
              throw new Error(
                `Key "${key}" operator "$in" must contain an array of values.`
              );

            // The schema type is only checked when there are values to check it against
            if (value.$in.length > 0 && typeof property.type !== "function")
              throw new Error(`Invalid schema type for ${key}`);

            // TODO cast every value to the schema type and run the schema validator, if any
            mongoFilter[key] = { $in: [...value.$in] };
          } else
            throw new Error(
              `Unhandled operator "${operator}", this should never happen!`
            );
        } else if (typeof property.type === "function") {
          // TODO do type checking/casting here and run the schema validator, if any
          mongoFilter[key] = value;
        } else throw new Error(`Invalid schema type for ${key}`);
      }

      return {
        mongoFilter,
        containsRestrictedProperties,
        canCache: !containsRestrictedProperties
      };
    }

    // TODO hide sensitive fields
    // TODO improve caching
    // TODO add option to only request certain fields
    // TODO add support for computed fields
    // TODO parse query - validation
    // TODO add proper typescript support
    // TODO add proper jsdoc
    // TODO add support for enum document attributes
    // TODO add support for array document attributes
    // TODO add support for reference document attributes
    // TODO prevent caching if requiring restricted values
    // TODO fix 2nd layer of schema
    /**
     * find - Get one or more document(s) from a single collection
     *
     * The filter is validated with parseFindFilter. When caching is allowed (both
     * by `useCache` and by the filter not referencing restricted properties), the
     * result is looked up in Redis under `query.find.<sha1 of the query>` and, on
     * a miss, stored there for 60 seconds after being read from MongoDB.
     *
     * @param context - Job context (currently unused)
     * @param payload - Payload: `collection` name, `filter` (similar to a MongoDB
     * filter), `values` (not supported yet), `limit` (0 applies no limit),
     * `page` (1-based) and `useCache`
     * @returns With `limit === 1`: the first document, or null when nothing matched.
     * Otherwise: an array of documents (empty when nothing matched)
     * @throws Error if no collection is specified, the collection is not loaded,
     * or the filter is invalid
     */
    public async find<CollectionNameType extends keyof Collections>(
      context: JobContext,
      {
        collection, // Collection name
        filter, // Similar to MongoDB filter
        values, // TODO: Add support
        limit = 0, // TODO have limit off by default?
        page = 1,
        useCache = true
      }: {
        collection: CollectionNameType;
        filter: Record<string, any>;
        values?: Record<string, any>;
        limit?: number;
        page?: number;
        useCache?: boolean;
      }
    ): Promise<any | null> {
      // Verify whether the collection exists (also covers collections not being loaded yet)
      if (!collection) throw new Error("No collection specified");
      const target = this.collections?.[collection];
      if (!target) throw new Error("Collection not found");

      // Verify whether the query is valid-enough to continue
      const { mongoFilter, canCache } = await this.parseFindFilter(
        filter,
        target.schema.document
      );

      // If we're allowed to cache, and the filter doesn't reference any restricted fields,
      // turn the query object into a sha1 hash that can be used as a Redis key
      const cacheKey =
        useCache !== false && canCache
          ? `query.find.${hash(
              { collection, mongoFilter, values, limit, page },
              { algorithm: "sha1" }
            )}`
          : null;

      // Check if the query already exists in Redis, and use it if it does
      let cachedDocuments: any[] | null = null;
      if (cacheKey) {
        const cachedQuery = await this.redis?.GET(cacheKey);
        if (cachedQuery) cachedDocuments = JSON.parse(cachedQuery);
      }

      // TODO build a projection from the schema that leaves out restricted properties
      const mongoProjection = null;

      // If we didn't get documents from the cache, get them from mongo
      const rawDocuments: any[] =
        cachedDocuments ||
        (await target.model
          .find(mongoFilter, mongoProjection)
          .limit(limit)
          .skip((page - 1) * limit));

      // Convert documents from Mongoose model to regular objects
      // TODO run documents through the schema parser instead of reading _doc
      const documents = rawDocuments.map((document: any) =>
        document._doc ? document._doc : document
      );

      // Add documents to the cache without waiting for it, unless they came from the cache
      if (cacheKey && !cachedDocuments && this.redis) {
        this.redis
          .SET(cacheKey, JSON.stringify(documents), { EX: 60 })
          .catch((err: unknown) => {
            // Caching is best-effort; a failed write must not fail or crash the query
            console.error(`Failed to cache ${cacheKey}:`, err);
          });
      }

      if (documents.length === 0) return limit === 1 ? null : [];
      return limit === 1 ? documents[0] : documents;
    }
  }
  /**
   * DataModuleJobs - Job definitions derived from the methods of DataModule
   * that UniqueMethods exposes.
   *
   * For every such method, `payload` is the type of the method's second
   * parameter and `returns` is the awaited type of the method's return value.
   */
  export type DataModuleJobs = {
    [JobName in keyof UniqueMethods<DataModule>]: {
      // Second parameter of the method
      payload: Parameters<UniqueMethods<DataModule>[JobName]>[1];
      // Value the method's returned promise resolves with
      returns: Awaited<ReturnType<UniqueMethods<DataModule>[JobName]>>;
    };
  };