// DataModule.ts (29 KB)
  1. // @ts-nocheck
  2. import async from "async";
  3. import config from "config";
  4. import { Db, MongoClient, ObjectId } from "mongodb";
  5. import hash from "object-hash";
  6. import { createClient, RedisClientType } from "redis";
  7. import JobContext from "src/JobContext";
  8. import BaseModule from "../BaseModule";
  9. import ModuleManager from "../ModuleManager";
  10. import { UniqueMethods } from "../types/Modules";
  11. import { Collections, Types } from "../types/Collections";
  12. export default class DataModule extends BaseModule {
  13. private collections?: Collections;
  14. private mongoClient?: MongoClient;
  15. private mongoDb?: Db;
  16. private redisClient?: RedisClientType;
  /**
   * Data Module
   *
   * Forwards the module manager and the module name "data" to the
   * BaseModule constructor.
   *
   * @param moduleManager - Module manager class
   */
  public constructor(moduleManager: ModuleManager) {
    super(moduleManager, "data");
  }
  /**
   * startup - Startup data module
   *
   * Steps, in order: base-module startup, MongoDB connection, collection
   * loading, Redis connection, verification of the Redis
   * "notify-keyspace-events" setting, and finally the base-module
   * `started()` hook. The first failing step rejects the returned promise.
   *
   * @returns Promise that resolves once every step has completed
   */
  public override async startup(): Promise<void> {
    await super.startup();

    // MongoDB: connect using the configured URL and keep a handle to the database
    const mongoUrl = config.get<string>("mongo.url");
    this.mongoClient = new MongoClient(mongoUrl);
    await this.mongoClient.connect();
    this.mongoDb = this.mongoClient.db();

    await this.loadCollections();

    // Redis: connect using the configured url/password
    const redisSettings = config.get<{
      url: string;
      password: string;
    }>("redis");
    this.redisClient = createClient({
      url: redisSettings.url,
      password: redisSettings.password
    });
    await this.redisClient.connect();

    if (!this.redisClient)
      throw new Error("Redis connection not established");

    // The module requires keyspace notifications to be set to exactly "xE"
    const keyspaceConfig = await this.redisClient.sendCommand([
      "CONFIG",
      "GET",
      "notify-keyspace-events"
    ]);
    const configuredValue = Array.isArray(keyspaceConfig)
      ? keyspaceConfig[1]
      : undefined;
    if (configuredValue !== "xE")
      throw new Error(
        `notify-keyspace-events is NOT configured correctly! It is set to: ${
          configuredValue || "unknown"
        }`
      );

    await super.started();
  }
  /**
   * shutdown - Shutdown data module
   *
   * Best-effort teardown: the returned promise always resolves, as before.
   * Failures are logged instead of being left as unhandled promise
   * rejections, and a failure while closing Redis no longer prevents the
   * MongoDB client from being closed.
   *
   * @returns Promise that resolves once teardown has been attempted
   */
  public override async shutdown(): Promise<void> {
    try {
      await super.shutdown();
    } catch (err) {
      // Same as before: when the base shutdown fails, the connections are left alone
      console.error("Data module: base shutdown failed", err);
      return;
    }

    // TODO: Ensure the following shutdown correctly
    if (this.redisClient) {
      try {
        await this.redisClient.quit();
      } catch (err) {
        console.error("Data module: failed to close Redis connection", err);
      }
    }

    if (this.mongoClient) {
      try {
        await this.mongoClient.close(false);
      } catch (err) {
        console.error("Data module: failed to close MongoDB connection", err);
      }
    }
  }
  /**
   * Convert a schema in our own format into a Mongoose-compatible schema
   * definition object.
   *
   * Nested schemas are converted recursively, arrays become single-element
   * arrays describing their item, and nested arrays are rejected.
   *
   * @param schema Our own schema format
   * @returns A Mongoose-compatible schema format
   * @throws Error when an array of arrays is encountered
   */
  private convertSchemaToMongooseSchema(schema: any) {
    // Convert basic types from our own schema types to Mongoose schema types
    const typeToMongooseType = (type: Types) => {
      switch (type) {
        case Types.String:
          return String;
        case Types.Number:
          return Number;
        case Types.Date:
          return Date;
        case Types.Boolean:
          return Boolean;
        case Types.ObjectId:
          // The previous `MongooseTypes.ObjectId` referenced a name that is not
          // imported in this file and threw a ReferenceError.
          // NOTE(review): this assumes Mongoose accepts the driver's ObjectId
          // class as a type — confirm if this helper is put back into use.
          return ObjectId;
        default:
          return null;
      }
    };

    // Convert a single schema property into its Mongoose counterpart
    const convertEntry = (value: any) => {
      // Handle schemas
      if (value.type === Types.Schema)
        return this.convertSchemaToMongooseSchema(value.schema);

      // Handle regular types
      if (value.type !== Types.Array)
        return { type: typeToMongooseType(value.type) };

      // Handle schemas in arrays
      if (value.item.type === Types.Schema)
        return [this.convertSchemaToMongooseSchema(value.item.schema)];

      // We don't support nested arrays
      if (value.item.type === Types.Array)
        throw new Error("Nested arrays are not supported.");

      // Handle regular types in array
      return { type: [typeToMongooseType(value.item.type)] };
    };

    return Object.fromEntries(
      Object.entries(schema).map(([key, value]) => [key, convertEntry(value)])
    );
  }
  /**
   * loadCollection - Import and load collection schema
   *
   * Dynamically imports `../collections/<collectionName>` and pairs its
   * exported `schema` with the MongoDB collection of the same name.
   *
   * @param collectionName - Name of the collection
   * @returns Collection
   * @throws Error when the MongoDB connection has not been established, or
   * when the collection module cannot be imported (previously the promise
   * never settled in that case)
   */
  private async loadCollection<T extends keyof Collections>(
    collectionName: T
  ): Promise<Collections[T]> {
    if (!this.mongoDb)
      throw new Error("MongoDB connection not established");

    const name = collectionName.toString();

    // A failed import now rejects this promise instead of being lost
    const { schema }: { schema: Collections[T]["schema"] } = await import(
      `../collections/${name}`
    );

    return {
      schema,
      collection: this.mongoDb.collection(name)
    };
  }
  /**
   * loadCollections - Load and initialize all collections
   *
   * Loads every known collection and stores the result on
   * `this.collections`. Errors are propagated unchanged; the previous
   * `new Error(err)` wrapper turned an Error into the message
   * "Error: ..." and discarded its stack.
   *
   * @returns Promise
   * @throws Error when any collection fails to load
   */
  private async loadCollections(): Promise<void> {
    try {
      this.collections = {
        abc: await this.loadCollection("abc")
      };
    } catch (err) {
      // Keep real errors intact; only wrap non-Error rejections
      throw err instanceof Error ? err : new Error(String(err));
    }
  }
  /**
   * Returns the projection array/object that is one level deeper based on the property key
   *
   * - Array projection: keeps the paths starting with `<key>.` and removes
   *   that prefix, e.g. (["a.b", "c"], "a") gives ["b"].
   * - Object projection: returns `projection[key]` when it is not
   *   null/undefined, otherwise an object built from the `<key>.`-prefixed
   *   paths with the prefix removed.
   * - Anything else (including null, which used to throw): undefined.
   *
   * @param projection The projection object/array
   * @param key The property key
   * @returns Array or Object, or undefined when there is no usable projection
   */
  private getDeeperProjection(projection: any, key: string) {
    const prefix = `${key}.`;

    if (Array.isArray(projection))
      return projection
        .filter(
          property =>
            typeof property === "string" && property.startsWith(prefix)
        )
        .map(property => property.slice(prefix.length));

    // typeof null === "object", so null has to be excluded explicitly
    if (projection === null || typeof projection !== "object")
      return undefined;

    const deeperProjection = {};
    for (const [property, value] of Object.entries(projection))
      if (property.startsWith(prefix))
        deeperProjection[property.slice(prefix.length)] = value;

    return projection[key] ?? deeperProjection;
  }
  /**
   * Whether a property is allowed in a projection array/object
   *
   * Every projection path is reduced to its top-level key
   * ("propertyNameTwo.nestedProperty" becomes "propertyNameTwo"). The
   * property is allowed when at least one path with that top-level key has
   * a truthy value; entries of an array projection always count as truthy.
   *
   * Previously the de-duplication compared a string against [key, value]
   * pairs and never matched, so the last path for a key decided the result,
   * and the final lookup could also match inherited keys such as
   * "constructor".
   *
   * @param projection The projection array/object
   * @param property The top-level property name to check
   * @returns true if the property is allowed; false otherwise, including
   * when the projection is neither an array nor an object
   */
  private allowedByProjection(projection: any, property: string) {
    let entries = [];
    if (Array.isArray(projection))
      entries = projection.map(path => [path, true]);
    else if (projection !== null && typeof projection === "object")
      entries = Object.entries(projection);

    return entries.some(([path, value]) => {
      if (!value || typeof path !== "string") return false;

      const dotIndex = path.indexOf(".");
      const topLevelKey = dotIndex === -1 ? path : path.slice(0, dotIndex);

      return topLevelKey === property;
    });
  }
  /**
   * Strip a document object of any property that is not in the schema or,
   * when a projection is given, not allowed by that projection. Remaining
   * values are cast to their schema type.
   *
   * - Nested schemas are stripped recursively and omitted when nothing is left.
   * - Arrays are processed item by item; empty or null items become null.
   * - Restricted properties of a plain type are always dropped.
   * - Null/undefined values of a plain type are returned as null (they used
   *   to be cast, turning null into "null", 0, false or the epoch date).
   *
   * NOTE(review): the restricted check is only reached for plain types, so
   * restricted nested schemas and arrays are not dropped here, while
   * restricted plain properties are dropped even when the projection names
   * them. Behaviour is kept as-is; confirm the intended rule.
   *
   * @param document The document object
   * @param schema The schema object
   * @param projection The projection, which can be null
   * @returns A new object with the allowed, casted properties
   * @throws Error on nested arrays or when a value cannot be cast
   */
  private async stripDocument(document: any, schema: any, projection: any) {
    // TODO add support for nested objects in arrays
    // TODO possibly do different things with required properties?
    // TODO possibly do different things with properties with default?
    // TODO handle projection excluding properties, rather than assume it's only including properties
    const strippedEntries = [];

    for (const [key, value] of Object.entries(document)) {
      // Skip properties that are not the schema's own (also ignores inherited keys like "constructor")
      if (!Object.hasOwn(schema, key) || !schema[key]) continue;

      // If we have a projection, skip properties it does not allow
      if (projection && !this.allowedByProjection(projection, key)) continue;

      const property = schema[key];

      // Handle nested object
      if (property.type === Types.Schema) {
        // TODO possibly return nothing, or an empty object here instead?
        // A falsy value can't be an object, so return null for the key
        if (!value) {
          strippedEntries.push([key, null]);
          continue;
        }

        // Only ask for a deeper projection when there is one; a null projection used to throw here
        const deeperProjection = projection
          ? this.getDeeperProjection(projection, key)
          : undefined;

        const strippedDocument = await this.stripDocument(
          value,
          property.schema,
          deeperProjection
        );

        // TODO possibly return null or an object here for the key instead?
        // Only keep the key when the stripped object still has properties
        if (Object.keys(strippedDocument).length > 0)
          strippedEntries.push([key, strippedDocument]);
        continue;
      }

      // Handle array type
      if (property.type === Types.Array) {
        // TODO possibly return nothing, or an empty array here instead?
        // Falsy or non-array values are returned as null
        if (!value || !Array.isArray(value)) {
          strippedEntries.push([key, null]);
          continue;
        }

        // The type of the array items
        const itemType = property.item.type;

        const items = await Promise.all(
          value.map(async item => {
            // Handle schema objects inside an array
            if (itemType === Types.Schema) {
              // TODO possibly return nothing, or an empty object here instead?
              if (!item) return null;

              const deeperProjection = projection
                ? this.getDeeperProjection(projection, key)
                : undefined;

              const strippedDocument = await this.stripDocument(
                item,
                property.item.schema,
                deeperProjection
              );

              // TODO possibly return object here instead?
              // Items with nothing left to return become null
              return Object.keys(strippedDocument).length > 0
                ? strippedDocument
                : null;
            }

            // Nested arrays are not supported
            if (itemType === Types.Array)
              throw new Error("Nested arrays not supported");

            // Handle normal types
            if (item === null || item === undefined) return null;

            // TODO possibly don't validate casted?
            return this.getCastedValue(item, itemType);
          })
        );

        strippedEntries.push([key, items]);
        continue;
      }

      // If the property is restricted, don't return it
      if (property.restricted) continue;

      // Null/undefined must not be cast, otherwise null becomes "null", 0, false, ...
      if (value === null || value === undefined) {
        strippedEntries.push([key, null]);
        continue;
      }

      // TODO possible don't validate casted?
      strippedEntries.push([key, this.getCastedValue(value, property.type)]);
    }

    return Object.fromEntries(strippedEntries);
  }
  /**
   * Parse a projection based on the schema and any given projection
   * If no projection is given, it will exclude any restricted properties
   * If a projection is given, it will exclude restricted properties that are not explicitly allowed in a projection
   * It will return a projection used in Mongo, and if any restricted property is explicitly allowed, return that we can't use the cache
   *
   * Nested schemas are parsed recursively using the nested `schema` object
   * of the property. Previously the schema entry itself was passed, so the
   * recursion iterated over the entry's "type"/"schema" keys instead of the
   * nested properties.
   *
   * @param projection The projection, which can be null
   * @param schema The schema object
   * @returns Object with `canCache` and the exclusion-only `mongoProjection`
   */
  private async parseFindProjection(projection: any, schema: any) {
    // The mongo projection object we're going to build
    const mongoProjection = {};
    // This will be false if we let Mongo return any restricted properties
    let canCache = true;

    // TODO add support for nested objects in arrays
    for (const [key, value] of Object.entries(schema)) {
      const { restricted } = value;

      if (restricted) {
        // Restricted and explicitly allowed by the projection: Mongo returns it, so find can't use the cache
        if (projection && this.allowedByProjection(projection, key))
          canCache = false;
        // Restricted and not explicitly allowed: have Mongo exclude it, caching is not affected
        else mongoProjection[key] = false;
        continue;
      }

      // Only nested schemas need further work
      if (value.type !== Types.Schema) continue;

      // Get the projection for the next layer, if we have a projection at all
      const deeperProjection = projection
        ? this.getDeeperProjection(projection, key)
        : null;

      // Parse the nested schema, so one level deeper
      const parsedProjection = await this.parseFindProjection(
        deeperProjection,
        value.schema
      );

      // If the nested mongo projection contains anything, include it in our own
      if (Object.keys(parsedProjection.mongoProjection).length > 0)
        mongoProjection[key] = parsedProjection.mongoProjection;

      // If the nested projection can't be cached, neither can this one
      canCache = canCache && parsedProjection.canCache;
    }

    return {
      canCache,
      mongoProjection
    };
  }
  /**
   * Cast a value to the given schema type.
   *
   * Callers are expected to handle null/undefined themselves; this method
   * casts whatever it is given.
   *
   * Fixes: the error messages referenced a variable `key` that does not
   * exist in this method (throwing a ReferenceError instead of the cast
   * error), and ObjectId was called without `new`.
   *
   * @param value The value to cast
   * @param schemaType The schema type to cast to
   * @param key Optional property name, only used in error messages
   * @returns The casted value
   * @throws Error when a number casts to NaN, a date is invalid, or the schema type is unsupported
   */
  private getCastedValue(value: any, schemaType: Types, key?: string) {
    const location = key === undefined ? "" : ` at key ${key}`;

    switch (schemaType) {
      case Types.String:
        // Strings are kept, everything else is converted to a string
        return typeof value === "string" ? value : String(value);

      case Types.Number: {
        const castedValue = typeof value === "number" ? value : Number(value);
        // TODO possibly allow this via a validate boolean option?
        // We don't allow NaN for numbers, so throw an error
        if (Number.isNaN(castedValue))
          throw new Error(
            `Cast error, number cannot be NaN,${location} with value ${value}`
          );
        return castedValue;
      }

      case Types.Date: {
        // Date instances are kept, everything else goes through the Date constructor
        const castedValue =
          Object.prototype.toString.call(value) === "[object Date]"
            ? value
            : new Date(value);
        // TODO possibly allow this via a validate boolean option?
        // We don't allow invalid dates, so throw an error
        if (Number.isNaN(castedValue.getTime()))
          throw new Error(
            `Cast error, date cannot be invalid,${location} with value ${value}`
          );
        return castedValue;
      }

      case Types.Boolean:
        // Booleans are kept, everything else is converted by truthiness
        return typeof value === "boolean" ? value : Boolean(value);

      case Types.ObjectId:
        // ObjectId is a class and must be constructed with `new`
        return new ObjectId(value);

      default:
        throw new Error(
          `Unsupported schema type found with type ${Types[schemaType]}. This should never happen.`
        );
    }
  }
  /**
   * parseFindFilter - Ensure validity of filter and return a mongo filter
   *
   * Every key must either be a supported key operator ("$or"/"$and", whose
   * value is a non-empty array of filters parsed recursively) or a property
   * of the schema. Nested schemas are parsed recursively, array properties
   * are not supported yet, and plain values are cast to their schema type.
   *
   * Entries are processed sequentially, so the order of "$or"/"$and"
   * branches in the result always matches the input order.
   *
   * @param filter - Filter
   * @param schema - Schema of collection document
   * @param options - Parser options; `operators: false` disables key operators
   * @returns Promise returning object with query values cast to schema types,
   * whether the filter references restricted properties, and whether it can be cached
   * @throws Error when the filter is invalid
   */
  private async parseFindFilter(
    filter: any,
    schema: any,
    options?: {
      operators?: boolean;
    }
  ): Promise<{
    mongoFilter: any;
    containsRestrictedProperties: boolean;
    canCache: boolean;
  }> {
    if (!filter || typeof filter !== "object")
      throw new Error("Invalid filter provided. Filter must be an object.");

    const entries = Object.entries(filter);
    if (entries.length === 0)
      throw new Error("Invalid filter provided. Filter must contain keys.");

    // Whether to parse operators or not
    const operators = options?.operators !== false;

    // The MongoDB filter we're building
    const mongoFilter: any = {};

    // True as soon as the filter references a restricted property, so that find knows not to cache the query
    let containsRestrictedProperties = false;

    // Operators at the key level that we support right now
    const allowedKeyOperators = ["$or", "$and"];

    // TODO value-level operators (such as $in) are not parsed yet; values are cast like any other value

    for (const [key, value] of entries) {
      // Key must be at least 1 character
      if (!key || key.length === 0)
        throw new Error(
          `Invalid filter provided. Key must be at least 1 character.`
        );

      // Handle key operators, which always start with a $
      if (operators && key[0] === "$") {
        if (!allowedKeyOperators.includes(key))
          throw new Error(
            `Invalid filter provided. Operator "${key}" is not allowed.`
          );

        // $or and $and must always be a non-empty array of filters
        if (!Array.isArray(value) || value.length === 0)
          throw new Error(`Key "${key}" must contain array of filters.`);

        // Parse every child filter and collect the results in input order
        const childFilters = [];
        for (const childFilter of value) {
          const parsedChild = await this.parseFindFilter(
            childFilter,
            schema,
            options
          );
          childFilters.push(parsedChild.mongoFilter);
          if (parsedChild.containsRestrictedProperties)
            containsRestrictedProperties = true;
        }
        mongoFilter[key] = childFilters;
        continue;
      }

      // Here we handle any normal keys in the query object
      if (!Object.hasOwn(schema, key))
        throw new Error(`Key "${key}" does not exist in the schema.`);

      const property = schema[key];

      if (property.restricted) containsRestrictedProperties = true;

      // Handle schema type
      if (property.type === Types.Schema) {
        const parsedNested = await this.parseFindFilter(
          value,
          property.schema,
          options
        );
        mongoFilter[key] = parsedNested.mongoFilter;
        if (parsedNested.containsRestrictedProperties)
          containsRestrictedProperties = true;
        continue;
      }

      // Handle array type
      if (property.type === Types.Array) throw new Error("NOT SUPPORTED YET.");

      // Handle normal types
      const isNullOrUndefined = value === null || value === undefined;
      if (isNullOrUndefined && property.required)
        throw new Error(
          `Value for key ${key} is required, so it cannot be null/undefined.`
        );

      // Null/undefined is stored as null, everything else is cast and validated
      mongoFilter[key] = isNullOrUndefined
        ? null
        : this.getCastedValue(value, property.type);
    }

    // A filter that references restricted properties must not be cached
    const canCache = !containsRestrictedProperties;

    return { mongoFilter, containsRestrictedProperties, canCache };
  }
  693. // TODO improve caching
  694. // TODO add support for computed fields
  695. // TODO parse query - validation
  696. // TODO add proper typescript support
  697. // TODO add proper jsdoc
  698. // TODO add support for enum document attributes
  699. // TODO add support for array document attributes
  700. // TODO add support for reference document attributes
  701. // TODO fix 2nd layer of schema
  /**
   * find - Get one or more document(s) from a single collection
   *
   * Parses the filter and projection against the collection schema, serves
   * the result from the Redis cache when the query is cacheable, otherwise
   * queries MongoDB, caches the raw result for 60 seconds (without waiting
   * for the write) and finally strips every document.
   *
   * Fixes: the Mongo projection is now passed as `{ projection }` (it was
   * passed as the options object itself and therefore ignored), a missing
   * `this.collections` is reported as "Collection not found" instead of a
   * TypeError, the cursor is materialised with `toArray()`, and a failing
   * cache write is logged instead of becoming an unhandled rejection.
   *
   * @param context - Job context (not used by this method)
   * @param payload - Payload: collection name, filter, optional projection,
   * limit (0 means no limit), page (1-based) and whether the cache may be used
   * @returns With limit 1: the document or null. Otherwise an array of documents.
   * @throws Error when the collection is unknown or the filter is invalid
   */
  public async find<CollectionNameType extends keyof Collections>(
    context: JobContext,
    {
      collection, // Collection name
      filter, // Similar to MongoDB filter
      projection,
      limit = 0, // TODO have limit off by default?
      page = 1,
      useCache = true
    }: {
      collection: CollectionNameType;
      filter: Record<string, any>;
      projection?: Record<string, any> | string[];
      values?: Record<string, any>;
      limit?: number;
      page?: number;
      useCache?: boolean;
    }
  ): Promise<any | null> {
    // Verify whether the collection exists
    if (!collection) throw new Error("No collection specified");
    const loadedCollection = this.collections?.[collection];
    if (!loadedCollection) throw new Error("Collection not found");
    const documentSchema = loadedCollection.schema.document;

    // Verify whether the filter and projection are valid-enough to continue
    const parsedFilter = await this.parseFindFilter(filter, documentSchema);
    const parsedProjection = await this.parseFindProjection(
      projection,
      documentSchema
    );
    const { mongoFilter } = parsedFilter;
    const { mongoProjection } = parsedProjection;

    // Only cache when allowed and when neither the filter nor the projection touches restricted properties
    const cacheable =
      useCache !== false && parsedFilter.canCache && parsedProjection.canCache;

    let documents: any[] | null = null;
    let cacheKey: string | null = null;

    if (cacheable) {
      // Turn the query object into a sha1 hash that can be used as a Redis key
      const queryHash = hash(
        {
          collection,
          mongoFilter,
          limit,
          page
        },
        {
          algorithm: "sha1"
        }
      );
      cacheKey = `query.find.${queryHash}`;

      // Use the cached result if the query hash already exists in Redis
      const cachedQuery = await this.redisClient?.GET(cacheKey);
      if (cachedQuery) documents = JSON.parse(cachedQuery);
    }

    const servedFromCache = !!documents;

    // If we didn't get documents from the cache, get them from mongo
    if (!documents)
      documents = await loadedCollection.collection
        .find(mongoFilter, { projection: mongoProjection })
        .limit(limit)
        .skip((page - 1) * limit)
        .toArray();

    // Convert documents from MongoDB model to regular objects
    const plainDocuments = documents.map((document: any) =>
      document._doc ? document._doc : document
    );

    // Add fresh results to the cache, deliberately without awaiting the write
    if (!servedFromCache && cacheKey && this.redisClient)
      this.redisClient
        .SET(cacheKey, JSON.stringify(plainDocuments), {
          EX: 60
        })
        .catch(err => {
          // Caching is best-effort; the query result is still returned
          console.error("Data module: failed to cache find result", err);
        });

    // Strips the documents of any unneeded properties or properties that are restricted
    const strippedDocuments = await Promise.all(
      plainDocuments.map((document: any) =>
        this.stripDocument(document, documentSchema, projection)
      )
    );

    if (strippedDocuments.length === 0) return limit === 1 ? null : [];
    return limit === 1 ? strippedDocuments[0] : strippedDocuments;
  }
  861. }
  // The methods of DataModule as selected by the UniqueMethods helper type
  type DataModuleMethods = UniqueMethods<DataModule>;

  /**
   * For every method selected above: `payload` is the type of the method's
   * second parameter, `returns` is the awaited type of its return value.
   */
  export type DataModuleJobs = {
    [Method in keyof DataModuleMethods]: {
      payload: Parameters<DataModuleMethods[Method]>[1];
      returns: Awaited<ReturnType<DataModuleMethods[Method]>>;
    };
  };