DataModule.ts 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228
  1. import config from "config";
  2. import { Db, MongoClient, ObjectId } from "mongodb";
  3. import { createHash } from "node:crypto";
  4. import { createClient, RedisClientType } from "redis";
  5. import JobContext from "../JobContext";
  6. import BaseModule from "../BaseModule";
  7. import ModuleManager from "../ModuleManager";
  8. import Schema, { Types } from "../Schema";
  9. import { Collections } from "../types/Collections";
  10. import { Document as SchemaDocument } from "../types/Document";
  11. import { UniqueMethods } from "../types/Modules";
  12. import { AttributeValue } from "../types/AttributeValue";
  /**
   * Array of `[key, value]` tuples, one tuple type per property of `T`
   * (the strongly typed shape of an `Object.entries`-style result).
   */
  type Entries<T> = {
    [K in keyof T]: [K, T[K]];
  }[keyof T][];
  /**
   * Projection in object form. Each value is either a boolean, a list of
   * property names, or a nested projection object for a deeper level.
   */
  interface ProjectionObject {
    [property: string]: boolean | string[] | ProjectionObject;
  }
  /**
   * Raw projection as accepted from callers: nothing at all, a list of
   * property names, or a (possibly nested) projection object.
   */
  type Projection = null | undefined | string[] | ProjectionObject;
  /**
   * Flattened projection: a list of `[dotted.path, boolean]` pairs plus the mode
   * that tells how paths which are not listed should be treated.
   */
  type NormalizedProjection = {
    projection: [string, boolean][];
    mode: "includeAllBut" | "excludeAllBut";
  };
  /**
   * Filter object: every property maps to an attribute value, a list of
   * attribute values, a nested filter, or a list of nested filters.
   */
  interface MongoFilter {
    [property: string]:
      | AttributeValue
      | AttributeValue[]
      | MongoFilter
      | MongoFilter[];
  }
  /**
   * Generic document: every property maps to an attribute value, a list of
   * attribute values, a nested document, or a list of nested documents.
   */
  interface Document {
    [property: string]:
      | AttributeValue
      | AttributeValue[]
      | Document
      | Document[];
  }
  /**
   * Which restricted properties may be accessed: `true` allows all of them,
   * an array allows only the listed property paths, and `false`, `null` or
   * `undefined` allow none.
   */
  type AllowedRestricted = boolean | string[] | null | undefined;
  39. export default class DataModule extends BaseModule {
  // Loaded collections (schema plus Mongo collection handle), assigned by loadCollections()
  private collections?: Collections;
  // MongoDB client, created and connected in startup(), closed in shutdown()
  private mongoClient?: MongoClient;
  // Database handle obtained from the MongoDB client in startup()
  private mongoDb?: Db;
  // Redis client, created and connected in startup(), quit in shutdown()
  private redisClient?: RedisClientType;
  /**
   * Data Module
   *
   * Constructs the module by handing the module manager and the module
   * name "data" to the BaseModule constructor.
   *
   * @param moduleManager - Module manager class
   */
  public constructor(moduleManager: ModuleManager) {
    super(moduleManager, "data");
  }
  /**
   * startup - Startup data module
   *
   * Connects to MongoDB (url read from the "mongo.url" config key), loads all
   * collections, connects to Redis (url read from the "redis" config object)
   * and verifies that the Redis "notify-keyspace-events" setting is exactly "xE".
   *
   * @throws Error if "notify-keyspace-events" is not reported as "xE"
   */
  public override async startup() {
    await super.startup();

    // MongoDB: connect, grab the database handle and load the collections
    this.mongoClient = new MongoClient(config.get<string>("mongo.url"));
    await this.mongoClient.connect();
    this.mongoDb = this.mongoClient.db();
    await this.loadCollections();

    // Redis: connect using the configured url
    const redisConfig = config.get<{ url: string }>("redis");
    this.redisClient = createClient({ url: redisConfig.url });
    await this.redisClient.connect();

    // CONFIG GET is expected to reply with an array whose second element is the value
    const redisConfigResponse = await this.redisClient.sendCommand([
      "CONFIG",
      "GET",
      "notify-keyspace-events"
    ]);
    const keyspaceEvents = Array.isArray(redisConfigResponse)
      ? redisConfigResponse[1]
      : undefined;

    if (keyspaceEvents !== "xE") {
      throw new Error(
        `notify-keyspace-events is NOT configured correctly! It is set to: ${
          keyspaceEvents || "unknown"
        }`
      );
    }

    await super.started();
  }
  /**
   * shutdown - Shutdown data module
   *
   * Runs the base shutdown first, then quits the Redis client and closes the
   * MongoDB client (passing `false` to `close`), skipping whichever client
   * was never created.
   */
  public override async shutdown() {
    await super.shutdown();

    const redis = this.redisClient;
    if (redis) {
      await redis.quit();
    }

    const mongo = this.mongoClient;
    if (mongo) {
      await mongo.close(false);
    }
  }
  /**
   * loadCollection - Import and load collection schema
   *
   * Dynamically imports the default export of `../collections/<name>` as the
   * schema and pairs it with the Mongo collection of the same name.
   *
   * @param collectionName - Name of the collection
   * @returns Object holding the schema and the Mongo collection handle
   */
  private async loadCollection<T extends keyof Collections>(
    collectionName: T
  ) {
    const name = collectionName.toString();
    const schemaModule: { default: Schema } = await import(
      `../collections/${name}`
    );
    return {
      schema: schemaModule.default,
      // mongoDb is asserted non-null: this relies on startup() having assigned it
      collection: this.mongoDb!.collection(name)
    };
  }
  /**
   * loadCollections - Load and initialize all collections
   *
   * Loads the "abc" and "station" collections one after the other and stores
   * them on the module.
   *
   * @returns Promise
   */
  private async loadCollections() {
    const abc = await this.loadCollection("abc");
    const station = await this.loadCollection("station");
    this.collections = { abc, station };
  }
  /**
   * Takes a raw projection and turns it into a projection we can easily use
   *
   * The raw projection is flattened into `[dotted.path, boolean]` pairs, checked
   * for path collisions, and given a mode: "excludeAllBut" when any path other
   * than "_id" is set to true, "includeAllBut" otherwise.
   *
   * @param projection - The raw projection
   * @returns Normalized projection
   * @throws Error when a path appears twice or when a path collides with one of its parent paths
   */
  private normalizeProjection(projection: Projection): NormalizedProjection {
    // Anything that is neither an array nor a non-null object counts as an empty projection
    const isUsable =
      Array.isArray(projection) ||
      (!!projection && typeof projection === "object");
    const rawProjection: Projection = isUsable ? projection : [];

    // Flatten into key/value pairs and coerce every value to a boolean
    const normalized: [string, boolean][] = this.flattenProjection(
      rawProjection
    ).map(([key, value]) => [key, Boolean(value)]);

    // Exact duplicates are a path collision
    const keys = normalized.map(([key]) => key);
    const knownKeys = new Set(keys);
    if (knownKeys.size !== normalized.length)
      throw new Error("Path collision, non-unique key");

    // A nested path must not coexist with any of its parent paths,
    // e.g. prop1.prop2 and prop1.prop2.prop3
    for (const key of keys) {
      let parentPath = key;
      while (parentPath.includes(".")) {
        // Strip the last ".prop" segment and check the remaining parent path
        parentPath = parentPath.substring(0, parentPath.lastIndexOf("."));
        if (knownKeys.has(parentPath))
          throw new Error(
            `Path collision! ${key} collides with ${parentPath}`
          );
      }
    }

    // Any explicit true (other than for _id) switches to only including what is listed
    const hasNonIdInclusion = normalized.some(
      ([key, value]) => value && key !== "_id"
    );
    const mode: NormalizedProjection["mode"] = hasNonIdInclusion
      ? "excludeAllBut"
      : "includeAllBut";

    return { projection: normalized, mode };
  }
  /**
   * Flatten the projection we've been given (which can be an array or an object) into an array of key/value pairs
   *
   * Array entries become `[key, true]`. Object values that are themselves
   * arrays or objects are flattened recursively and their keys are prefixed
   * with the parent key and a ".".
   *
   * @param projection - Projection
   * @returns Flat list of `[dotted.path, value]` pairs, in input order
   * @throws Error when the projection (or a nested value) is null/undefined
   */
  private flattenProjection(projection: Projection): [string, boolean][] {
    if (!projection) throw new Error("Projection can't be null");

    // Turn object/array into a key/value array
    const topLevel: [string, boolean | string[] | ProjectionObject][] =
      Array.isArray(projection)
        ? projection.map((key): [string, boolean] => [key, true])
        : typeof projection === "object"
        ? Object.entries(projection)
        : [];

    // Expand every nested layer in place, keeping the original order
    return topLevel.flatMap(([key, value]): [string, boolean][] => {
      if (typeof value === "object") {
        return this.flattenProjection(value).map(
          ([nestedKey, nestedValue]): [string, boolean] => [
            `${key}.${nestedKey}`,
            nestedValue
          ]
        );
      }
      return [[key, value]];
    });
  }
  /**
   * Build the Mongo projection for a schema, based on which restricted properties are allowed
   *
   * Restricted properties that are not allowed are excluded in the Mongo projection.
   * Restricted properties that are allowed make the result non-cacheable.
   * Nested schemas are parsed recursively; the given projection is only narrowed
   * and passed down to those nested levels.
   *
   * @param projection - The normalized projection
   * @param schema - The schema object
   * @param allowedRestricted - Which restricted properties may be returned
   * @returns Object with the Mongo projection and whether the result can be cached
   * @throws Error when a property of type schema has no nested schema defined
   */
  private async parseFindProjection(
    projection: NormalizedProjection,
    schema: SchemaDocument,
    allowedRestricted: AllowedRestricted
  ) {
    // The mongo projection object we're going to build
    const mongoProjection: ProjectionObject = {};
    // Turns false as soon as Mongo is allowed to return a restricted property
    let canCache = true;

    const pending = Object.entries(schema).map(async ([key, attribute]) => {
      const isRestricted = attribute.restricted;
      const isAllowed =
        !isRestricted ||
        this.allowedByRestricted(allowedRestricted, key);

      // Restricted and not allowed: have Mongo exclude it, which keeps caching safe
      if (!isAllowed) {
        mongoProjection[key] = false;
        return;
      }

      // Restricted but allowed: the result may contain restricted data, so never cache
      if (isRestricted) {
        canCache = false;
        return;
      }

      // Only nested schemas need any further work
      if (attribute.type !== Types.Schema) return;

      // Narrow both the projection and allowedRestricted to the next layer
      const nestedProjection = this.getDeeperProjection(projection, key);
      const nestedAllowedRestricted = this.getDeeperAllowedRestricted(
        allowedRestricted,
        key
      );

      if (!attribute.schema) throw new Error("Schema is not defined");

      const nested = await this.parseFindProjection(
        nestedProjection,
        attribute.schema,
        nestedAllowedRestricted
      );

      // Only record the nested projection when it actually excludes something
      if (Object.keys(nested.mongoProjection).length > 0)
        mongoProjection[key] = nested.mongoProjection;

      // A non-cacheable nested level makes this level non-cacheable too
      if (!nested.canCache) canCache = false;
    });

    await Promise.all(pending);

    return {
      canCache,
      mongoProjection
    };
  }
  /**
   * Whether a restricted property may be accessed
   *
   * @param allowedRestricted - `true` allows everything, an array allows only the listed properties, anything else allows nothing
   * @param property - Property name
   * @returns True when the property is allowed
   */
  private allowedByRestricted(
    allowedRestricted: AllowedRestricted,
    property: string
  ) {
    // Every restricted property is allowed
    if (allowedRestricted === true) return true;

    // Otherwise only an explicit list can allow this exact property
    return (
      Array.isArray(allowedRestricted) &&
      allowedRestricted.includes(property)
    );
  }
  /**
   * Whether a property is allowed by a normalized projection
   *
   * In "excludeAllBut" mode a property is allowed when it is explicitly set to
   * true, or when any nested path below it (`property.*`) is set to true.
   * In "includeAllBut" mode a property is allowed unless it is explicitly set to false.
   *
   * @param projection - The normalized projection
   * @param property - Property name
   * @returns True when the property is allowed at this level
   */
  private allowedByProjection(
    projection: NormalizedProjection,
    property: string
  ) {
    const { projection: entries, mode } = projection;

    // Look the property up among the projection entries only. A plain object lookup
    // would also find inherited members such as "constructor" or "__proto__" and
    // wrongly treat them as explicitly included. On duplicate keys the last entry wins.
    const explicitValue = new Map(entries).get(property);

    if (mode === "excludeAllBut") {
      // Only allow if explicitly allowed
      if (explicitValue) return true;

      // If any nested path below this property is allowed, allow this level too
      const nestedPrefix = `${property}.`;
      return entries.some(
        ([key, value]) => value && key.startsWith(nestedPrefix)
      );
    }

    if (mode === "includeAllBut") {
      // Allowed unless explicitly excluded
      return explicitValue !== false;
    }

    // This should never happen
    return false;
  }
  /**
   * Returns the normalized projection that is one level deeper based on the property key
   *
   * Keeps only the paths that start with `currentKey.` and strips everything up to
   * and including the first "." from them. The mode is carried over unchanged.
   *
   * @param projection - The normalized projection
   * @param currentKey - The property key to descend into
   * @returns Normalized projection for the nested level
   */
  private getDeeperProjection(
    projection: NormalizedProjection,
    currentKey: string
  ): NormalizedProjection {
    const prefix = `${currentKey}.`;
    const deeper: [string, boolean][] = [];

    for (const [key, value] of projection.projection) {
      // Paths outside of currentKey (including paths without any ".") are irrelevant here
      if (!key.startsWith(prefix)) continue;

      // Everything after the first "." is the path at the next level
      const lowerKey = key.slice(key.indexOf(".") + 1);

      // An empty remainder should never happen, but skip it if it does
      if (lowerKey.length > 0) deeper.push([lowerKey, value]);
    }

    return { projection: deeper, mode: projection.mode };
  }
  /**
   * Returns the allowedRestricted that is one level deeper based on the property key
   *
   * Booleans are passed through as they are, anything that is not an array becomes
   * false, and an array is reduced to the paths below `currentKey.` with everything
   * up to and including the first "." stripped.
   *
   * @param allowedRestricted - The allowedRestricted of the current level
   * @param currentKey - The property key to descend into
   * @returns The allowedRestricted for the nested level
   */
  private getDeeperAllowedRestricted(
    allowedRestricted: AllowedRestricted,
    currentKey: string
  ): AllowedRestricted {
    // true/false apply to every level alike
    if (typeof allowedRestricted === "boolean") return allowedRestricted;
    // null/undefined (or anything unexpected) allows nothing
    if (!Array.isArray(allowedRestricted)) return false;

    const prefix = `${currentKey}.`;

    return (
      allowedRestricted
        // Paths outside of currentKey (including paths without any ".") are irrelevant here
        .filter(key => key.startsWith(prefix))
        // Everything after the first "." is the path at the next level
        .map(key => key.slice(key.indexOf(".") + 1))
        // An empty remainder should never happen, but drop it if it does
        .filter(lowerKey => lowerKey.length > 0)
    );
  }
  /**
   * Cast a value to the given schema type
   *
   * null and undefined are always returned as null. Strings, numbers, dates and
   * booleans are converted when the value is not already of that type. For
   * ObjectId only strings and ObjectId instances are accepted.
   *
   * @param value - The value to cast
   * @param schemaType - The schema type to cast the value to
   * @returns The casted value, or null when the value is null/undefined
   * @throws Error when a number casts to NaN, a date is invalid, an ObjectId value has the wrong type, or the schema type is not supported
   */
  private getCastedValue(value: unknown, schemaType: Types): AttributeValue {
    if (value === null || value === undefined) return null;

    switch (schemaType) {
      case Types.String:
        // Keep strings as they are, stringify everything else
        return typeof value === "string" ? value : String(value);

      case Types.Number: {
        const asNumber = typeof value === "number" ? value : Number(value);
        // We don't allow NaN for numbers
        if (Number.isNaN(asNumber))
          throw new Error(
            `Cast error, number cannot be NaN, with value ${value}`
          );
        return asNumber;
      }

      case Types.Date: {
        // Keep Date objects as they are, otherwise parse the value's string form
        const isDate =
          Object.prototype.toString.call(value) === "[object Date]";
        const asDate = isDate
          ? (value as Date)
          : new Date(value.toString());
        // We don't allow invalid dates
        if (new Date(asDate).toString() === "Invalid Date")
          throw new Error(
            `Cast error, date cannot be invalid, with value ${value}`
          );
        return asDate;
      }

      case Types.Boolean:
        // Keep booleans as they are, otherwise use the value's truthiness
        return typeof value === "boolean" ? value : Boolean(value);

      case Types.ObjectId: {
        if (typeof value !== "string" && !(value instanceof ObjectId))
          throw new Error(
            `Cast error, ObjectId invalid, with value ${value}`
          );
        // Any further validation of the value is left to the ObjectId constructor
        return new ObjectId(value);
      }

      default:
        throw new Error(
          `Unsupported schema type found with type ${Types[schemaType]}. This should never happen.`
        );
    }
  }
  470. /**
  471. * parseFindFilter - Ensure validity of filter and return a mongo filter
  472. *
  473. * @param filter - Filter
  474. * @param schema - Schema of collection document
  475. * @param options - Parser options
  476. * @returns Promise returning object with query values cast to schema types
  477. * and whether query includes restricted attributes
  478. */
  479. private async parseFindFilter(
  480. filter: MongoFilter,
  481. schema: SchemaDocument,
  482. allowedRestricted: AllowedRestricted,
  483. options?: {
  484. operators?: boolean;
  485. }
  486. ): Promise<{
  487. mongoFilter: MongoFilter;
  488. containsRestrictedProperties: boolean;
  489. canCache: boolean;
  490. }> {
  491. if (!filter || typeof filter !== "object")
  492. throw new Error(
  493. "Invalid filter provided. Filter must be an object."
  494. );
  495. const keys = Object.keys(filter);
  496. if (keys.length === 0)
  497. throw new Error(
  498. "Invalid filter provided. Filter must contain keys."
  499. );
  500. // Whether to parse operators or not
  501. const operators = !(options && options.operators === false);
  502. // The MongoDB filter we're building
  503. const mongoFilter: MongoFilter = {};
  504. // If the filter references any properties that are restricted, this will be true, so that find knows not to cache the query object
  505. let containsRestrictedProperties = false;
  506. // Whether this filter is cachable or not
  507. let canCache = true;
  508. // Operators at the key level that we support right now
  509. const allowedKeyOperators = ["$or", "$and"];
  510. // Operators at the value level that we support right now
  511. const allowedValueOperators = ["$in"];
  512. // Loop through all key/value properties
  513. await Promise.all(
  514. Object.entries(filter).map(async ([key, value]) => {
  515. // Key must be 1 character and exist
  516. if (!key || key.length === 0)
  517. throw new Error(
  518. `Invalid filter provided. Key must be at least 1 character.`
  519. );
  520. // Handle key operators, which always start with a $
  521. if (operators && key[0] === "$") {
  522. // Operator isn't found, so throw an error
  523. if (allowedKeyOperators.indexOf(key) === -1)
  524. throw new Error(
  525. `Invalid filter provided. Operator "${key}" is not allowed.`
  526. );
  527. // We currently only support $or and $and, but here we can have different logic for different operators
  528. if (key === "$or" || key === "$and") {
  529. // $or and $and should always be an array, so check if it is
  530. if (!Array.isArray(value) || value.length === 0)
  531. throw new Error(
  532. `Key "${key}" must contain array of filters.`
  533. );
  534. // Add the operator to the mongo filter object as an empty array
  535. mongoFilter[key] = [];
  536. // Run parseFindQuery again for child objects and add them to the mongo filter operator array
  537. await Promise.all(
  538. value.map(async _value => {
  539. // Value must be an actual object, so if it's not, throw an error
  540. if (
  541. !_value ||
  542. typeof _value !== "object" ||
  543. _value.constructor.name !== "Object"
  544. )
  545. throw Error("not an object");
  546. const {
  547. mongoFilter: _mongoFilter,
  548. containsRestrictedProperties:
  549. _containsRestrictedProperties
  550. } = await this.parseFindFilter(
  551. _value as MongoFilter,
  552. schema,
  553. allowedRestricted,
  554. options
  555. );
  556. // Actually add the returned filter object to the mongo filter we're building
  557. (<MongoFilter[]>mongoFilter[key]).push(
  558. _mongoFilter
  559. );
  560. if (_containsRestrictedProperties)
  561. containsRestrictedProperties = true;
  562. })
  563. );
  564. } else
  565. throw new Error(
  566. `Unhandled operator "${key}", this should never happen!`
  567. );
  568. } else {
  569. // Here we handle any normal keys in the query object
  570. let currentKey = key;
  571. // If the key doesn't exist in the schema, throw an error
  572. if (!Object.hasOwn(schema, key)) {
  573. if (key.indexOf(".") !== -1) {
  574. currentKey = key.substring(0, key.indexOf("."));
  575. if (!Object.hasOwn(schema, currentKey))
  576. throw new Error(
  577. `Key "${currentKey}" does not exist in the schema.`
  578. );
  579. if (
  580. schema[currentKey].type !== Types.Schema &&
  581. (schema[currentKey].type !== Types.Array ||
  582. (schema[currentKey].item!.type !==
  583. Types.Schema &&
  584. schema[currentKey].item!.type !==
  585. Types.Array))
  586. )
  587. throw new Error(
  588. `Key "${currentKey}" is not a schema/array.`
  589. );
  590. } else
  591. throw new Error(
  592. `Key "${key}" does not exist in the schema.`
  593. );
  594. }
  595. const { restricted } = schema[currentKey];
  596. // Check if the current property is allowed or not based on allowedRestricted
  597. const allowedByRestricted =
  598. !restricted ||
  599. this.allowedByRestricted(allowedRestricted, currentKey);
  600. if (!allowedByRestricted)
  601. throw new Error(`Key "${currentKey}" is restricted.`);
  602. // If the key in the schema is marked as restricted, containsRestrictedProperties will be true
  603. if (restricted) containsRestrictedProperties = true;
  604. // Handle value operators
  605. if (
  606. operators &&
  607. typeof value === "object" &&
  608. value &&
  609. Object.keys(value).length === 1 &&
  610. Object.keys(value)[0] &&
  611. Object.keys(value)[0][0] === "$"
  612. ) {
  613. // This entire if statement is for handling value operators like $in
  614. const operator = Object.keys(value)[0];
  615. // Operator isn't found, so throw an error
  616. if (allowedValueOperators.indexOf(operator) === -1)
  617. throw new Error(
  618. `Invalid filter provided. Operator "${operator}" is not allowed.`
  619. );
  620. // Handle the $in value operator
  621. if (operator === "$in") {
  622. // Decide what type should be for the values for $in
  623. let { type } = schema[currentKey];
  624. // We don't allow schema type for $in
  625. if (type === Types.Schema)
  626. throw new Error(
  627. `Key "${currentKey}" is of type schema, which is not allowed with $in`
  628. );
  629. // Set the type to be the array item type if it's about an array
  630. if (type === Types.Array)
  631. type = schema[key].item!.type;
  632. const value$in = (<{ $in: AttributeValue[] }>value)
  633. .$in;
  634. let filter$in: AttributeValue[] = [];
  635. if (!Array.isArray(value$in))
  636. throw new Error("$in musr be array");
  637. // Loop through all $in array items, check if they're not null/undefined, cast them, and return a new array
  638. if (value$in.length > 0)
  639. filter$in = await Promise.all(
  640. value$in.map(async _value => {
  641. const isNullOrUndefined =
  642. _value === null ||
  643. _value === undefined;
  644. if (isNullOrUndefined)
  645. throw new Error(
  646. `Value for key ${currentKey} using $in is undefuned/null, which is not allowed.`
  647. );
  648. const castedValue = this.getCastedValue(
  649. _value,
  650. type
  651. );
  652. return castedValue;
  653. })
  654. );
  655. mongoFilter[currentKey] = { $in: filter$in };
  656. } else
  657. throw new Error(
  658. `Unhandled operator "${operator}", this should never happen!`
  659. );
  660. }
  661. // Handle schema type
  662. else if (schema[currentKey].type === Types.Schema) {
  663. let subFilter;
  664. if (key.indexOf(".") !== -1) {
  665. const subKey = key.substring(
  666. key.indexOf(".") + 1,
  667. key.length
  668. );
  669. subFilter = {
  670. [subKey]: value
  671. };
  672. } else subFilter = value;
  673. // Sub-filter must be an actual object, so if it's not, throw an error
  674. if (
  675. !subFilter ||
  676. typeof subFilter !== "object" ||
  677. subFilter.constructor.name !== "Object"
  678. )
  679. throw Error("not an object");
  680. // Get the allowedRestricted for the next layer
  681. const deeperAllowedRestricted =
  682. this.getDeeperAllowedRestricted(
  683. allowedRestricted,
  684. currentKey
  685. );
  686. // Run parseFindFilter on the nested schema object
  687. const {
  688. mongoFilter: _mongoFilter,
  689. containsRestrictedProperties:
  690. _containsRestrictedProperties
  691. } = await this.parseFindFilter(
  692. subFilter as MongoFilter,
  693. schema[currentKey].schema!,
  694. deeperAllowedRestricted,
  695. options
  696. );
  697. mongoFilter[currentKey] = _mongoFilter;
  698. if (_containsRestrictedProperties)
  699. containsRestrictedProperties = true;
  700. }
  701. // Handle array type
  702. else if (schema[currentKey].type === Types.Array) {
  703. const isNullOrUndefined =
  704. value === null || value === undefined;
  705. if (isNullOrUndefined)
  706. throw new Error(
  707. `Value for key ${currentKey} is an array item, so it cannot be null/undefined.`
  708. );
  709. // The type of the array items
  710. const itemType = schema[currentKey].item!.type;
  711. // Handle nested arrays, which are not supported
  712. if (itemType === Types.Array)
  713. throw new Error("Nested arrays not supported");
  714. // Handle schema array item type
  715. else if (itemType === Types.Schema) {
  716. let subFilter;
  717. if (key.indexOf(".") !== -1) {
  718. const subKey = key.substring(
  719. key.indexOf(".") + 1,
  720. key.length
  721. );
  722. subFilter = {
  723. [subKey]: value
  724. };
  725. } else subFilter = value;
  726. // Sub-filter must be an actual object, so if it's not, throw an error
  727. if (
  728. typeof subFilter !== "object" ||
  729. subFilter.constructor.name !== "Object"
  730. )
  731. throw Error("not an object");
  732. // Get the allowedRestricted for the next layer
  733. const deeperAllowedRestricted =
  734. this.getDeeperAllowedRestricted(
  735. allowedRestricted,
  736. currentKey
  737. );
  738. const {
  739. mongoFilter: _mongoFilter,
  740. containsRestrictedProperties:
  741. _containsRestrictedProperties
  742. } = await this.parseFindFilter(
  743. subFilter as MongoFilter,
  744. schema[currentKey].item!.schema!,
  745. deeperAllowedRestricted,
  746. options
  747. );
  748. mongoFilter[currentKey] = _mongoFilter;
  749. if (_containsRestrictedProperties)
  750. containsRestrictedProperties = true;
  751. }
  752. // Normal array item type
  753. else {
  754. // Value must not be an array, so if it is, throw an error
  755. if (Array.isArray(value)) throw Error("an array");
  756. // Value must not be an actual object, so if it is, throw an error
  757. if (
  758. typeof value === "object" &&
  759. value.constructor.name === "Object"
  760. )
  761. throw Error("an object");
  762. mongoFilter[currentKey] = this.getCastedValue(
  763. value as AttributeValue,
  764. itemType
  765. );
  766. }
  767. }
  768. // Handle normal types
  769. else {
  770. const isNullOrUndefined =
  771. value === null || value === undefined;
  772. if (isNullOrUndefined && schema[key].required)
  773. throw new Error(
  774. `Value for key ${key} is required, so it cannot be null/undefined.`
  775. );
  776. // If the value is null or undefined, just set it as null
  777. if (isNullOrUndefined) mongoFilter[key] = null;
  778. // Cast and validate values
  779. else {
  780. const schemaType = schema[key].type;
  781. // Value must not be an array, so if it is, throw an error
  782. if (Array.isArray(value)) throw Error("an array");
  783. // Value must not be an actual object, so if it is, throw an error
  784. if (
  785. typeof value === "object" &&
  786. value.constructor.name === "Object"
  787. )
  788. throw Error("an object");
  789. mongoFilter[key] = this.getCastedValue(
  790. value as AttributeValue,
  791. schemaType
  792. );
  793. }
  794. }
  795. }
  796. })
  797. );
  798. if (containsRestrictedProperties) canCache = false;
  799. return { mongoFilter, containsRestrictedProperties, canCache };
  800. }
  /**
   * Strip a document down to the properties the caller is allowed to see, and cast the kept values.
   *
   * A property is dropped when it does not exist in the schema, when it is not allowed by the
   * projection, or when it is restricted and not allowed by allowedRestricted. Nested schema
   * objects and arrays of schema objects are stripped recursively, using the projection and
   * allowedRestricted of the next layer.
   *
   * @param document - The document object
   * @param schema - The schema object
   * @param projection - The normalized projection, checked through allowedByProjection
   * @param allowedRestricted - Checked through allowedByRestricted for properties marked as restricted
   * @returns A new object holding only the kept properties, in the key order of the given document
   * @throws Error "not an object" if a schema-typed value or array item is not a plain object
   * @throws Error "not an array" if an array-typed value is truthy but not an array
   * @throws Error "Nested arrays not supported" if an array holds items whose schema type is an array
   */
  private async stripDocument(
    document: Document,
    schema: SchemaDocument,
    projection: NormalizedProjection,
    allowedRestricted: AllowedRestricted
  ): Promise<Document> {
    // Each callback resolves to the entry to keep, or to undefined when the property is dropped.
    // Promise.all returns its results in input order, so the stripped document keeps the key order
    // of the original document no matter how long the nested (async) properties take to resolve
    const entries = await Promise.all(
      Object.entries(document).map(
        async ([key, value]): Promise<
          Entries<Document>[number] | undefined
        > => {
          const property = schema[key];

          // If the property does not exist in the schema, don't return the key/value at all
          if (!property) return undefined;

          // Both checks must pass for the property to be returned
          const allowedByProjection = this.allowedByProjection(
            projection,
            key
          );
          const allowedByRestricted =
            !property.restricted ||
            this.allowedByRestricted(allowedRestricted, key);
          if (!allowedByProjection) return undefined;
          if (!allowedByRestricted) return undefined;

          // Handle nested object
          if (property.type === Types.Schema) {
            // If value is falsy, it can't be an object, so just return null
            if (!value) return [key, null];

            // Value must be an actual object, so if it's not, throw an error
            if (
              typeof value !== "object" ||
              value.constructor.name !== "Object"
            )
              throw Error("not an object");

            // Strip the nested object with the projection/allowedRestricted of the next layer.
            // If nothing in it may be returned the result is simply an empty object
            const strippedDocument = await this.stripDocument(
              value as Document, // We can be sure the value is a document, so this is for TypeScript to be happy
              property.schema!,
              this.getDeeperProjection(projection, key),
              this.getDeeperAllowedRestricted(allowedRestricted, key)
            );

            return [key, strippedDocument];
          }

          // Handle array type
          if (property.type === Types.Array) {
            // If value is falsy, return null with the key instead
            if (!value) return [key, null];

            // If value isn't a valid array, throw error
            if (!Array.isArray(value)) throw Error("not an array");

            // The type of the array items
            const itemType = property.item!.type;

            // Handle schema objects inside an array
            if (itemType === Types.Schema) {
              // The schema, projection and allowedRestricted of the next layer only depend on the key,
              // so resolve them once for the whole array instead of once per item
              const itemSchema = property.item!.schema!;
              const deeperProjection = this.getDeeperProjection(
                projection,
                key
              );
              const deeperAllowedRestricted =
                this.getDeeperAllowedRestricted(allowedRestricted, key);

              const strippedItems = await Promise.all(
                value.map(async item => {
                  // Item must be an actual object, so if it's not, throw an error
                  if (
                    !item ||
                    typeof item !== "object" ||
                    item.constructor.name !== "Object"
                  )
                    throw Error("not an object");

                  // An item without any returnable values becomes an empty object
                  return this.stripDocument(
                    item as Document, // We can be sure the item is a document, so this is for TypeScript to be happy
                    itemSchema,
                    deeperProjection,
                    deeperAllowedRestricted
                  );
                })
              );

              return [key, strippedItems];
            }

            const castedItems = value.map(item => {
              // Nested arrays are not supported. The check stays per item, so an empty array
              // is still returned as an empty array rather than throwing
              if (itemType === Types.Array)
                throw new Error("Nested arrays not supported");

              // If item is null or undefined, return null
              if (item === null || item === undefined) return null;

              // Cast item
              return this.getCastedValue(item, itemType);
            }) as AttributeValue[] | Document[];

            return [key, castedItems];
          }

          // Handle normal types: cast the value to the type of the property
          return [key, this.getCastedValue(value, property.type)];
        }
      )
    );

    // Drop the placeholders of the properties that may not be returned
    const filteredEntries: Entries<Document> = entries.filter(
      (entry): entry is Entries<Document>[number] => entry !== undefined
    );

    return Object.fromEntries(filteredEntries);
  }
  /**
   * find - Get one or more document(s) from a single collection
   *
   * The projection and filter are parsed into their Mongo counterparts first. When useCache is not
   * false and both parse steps report the query as cacheable, the raw documents are read from Redis
   * and, on a miss, written back with a 60 second expiry. The documents are always passed through
   * stripDocument before being returned.
   *
   * @param context - The job context
   * @param payload - Payload
   * @param payload.collection - Collection name
   * @param payload.filter - Similar to MongoDB filter
   * @param payload.projection - The projection, normalized through normalizeProjection
   * @param payload.allowedRestricted - Passed on to the projection/filter parsing and to stripDocument
   * @param payload.limit - Amount of documents per page, from 1 up to and including 100 (default 1)
   * @param payload.page - Page to return, starting at 1 (default 1)
   * @param payload.useCache - Set to false to skip the Redis cache (default true)
   * @returns With a limit of 1: the document, or null if there is none. Otherwise: an array of documents
   * @throws Error if page or limit is out of range, if the collection is missing or unknown,
   * or if the requested page is past the last available page
   */
  public async find<CollectionNameType extends keyof Collections>(
    context: JobContext,
    {
      collection, // Collection name
      filter, // Similar to MongoDB filter
      projection,
      allowedRestricted,
      limit = 1,
      page = 1,
      useCache = true
    }: {
      collection: CollectionNameType;
      filter: MongoFilter;
      projection?: Projection;
      allowedRestricted?: boolean | string[];
      limit?: number;
      page?: number;
      useCache?: boolean;
    }
  ) {
    // Verify page and limit parameters
    if (page < 1) throw new Error("Page must be at least 1");
    if (limit < 1) throw new Error("Limit must be at least 1");
    if (limit > 100) throw new Error("Limit must not be greater than 100");

    // Verify whether the collection exists, and get the schema.
    // This also covers the collections not being loaded at all, which used to end in a TypeError
    if (!collection) throw new Error("No collection specified");
    const collectionEntry = this.collections?.[collection];
    if (!collectionEntry) throw new Error("Collection not found");
    const { schema } = collectionEntry;

    // Normalize the projection into something we understand, and which throws an error if we have any path collisions
    const normalizedProjection = this.normalizeProjection(projection);

    // Parse the projection into a mongo projection, and returns whether this query can be cached or not
    const parsedProjection = await this.parseFindProjection(
      normalizedProjection,
      schema.getDocument(),
      allowedRestricted
    );
    const { mongoProjection } = parsedProjection;

    // Parse the filter into a mongo filter, which also validates whether the filter is legal or not, and returns whether this query can be cached or not
    const parsedFilter = await this.parseFindFilter(
      filter,
      schema.getDocument(),
      allowedRestricted
    );
    const { mongoFilter } = parsedFilter;

    // Caching requires the caller, the projection and the filter to all allow it
    const cacheable =
      useCache !== false && parsedProjection.canCache && parsedFilter.canCache;

    let queryHash: string | null = null;
    let documents: Document[] | null = null;

    // If we're allowed to cache, try to get the documents from the cache first
    if (cacheable) {
      // Turn the query object into a md5 hash that can be used as a Redis key.
      // The cached documents are the raw result of the Mongo query, which is fetched with the mongo
      // projection, so the projection has to be part of the key: otherwise a query with the same
      // filter but another projection could be served documents that were fetched for a different projection
      queryHash = createHash("md5")
        .update(
          JSON.stringify({
            collection,
            mongoFilter,
            mongoProjection,
            limit,
            page
          })
        )
        .digest("hex");

      // Check if the query hash already exists in Redis, and get it if it is
      const cachedQuery = await this.redisClient?.GET(
        `query.find.${queryHash}`
      );

      documents = cachedQuery ? JSON.parse(cachedQuery) : null;
    }

    // No cached documents, so get them from the database
    if (!documents) {
      const totalCount =
        await collectionEntry.collection.countDocuments(mongoFilter);
      if (totalCount === 0) return limit === 1 ? null : [];

      const lastPage = Math.ceil(totalCount / limit);
      if (lastPage < page)
        throw new Error(`The last page available is ${lastPage}`);

      // Create the Mongo cursor and then get the array of documents
      documents = (await collectionEntry.collection
        .find(mongoFilter, mongoProjection)
        .limit(limit)
        .skip((page - 1) * limit)
        .toArray()) as Document[];

      // Adds query results to cache but doesn't await. A query hash only exists if the query is cacheable
      if (queryHash) {
        void this.redisClient
          ?.SET(`query.find.${queryHash}`, JSON.stringify(documents), {
            EX: 60
          })
          .catch(() => {
            // Writing to the cache is best-effort: a failed write must neither fail the query
            // nor surface as an unhandled promise rejection
          });
      }
    }

    // Strips the documents of any unneeded properties or properties that are restricted
    const strippedDocuments = await Promise.all(
      documents.map(async (document: Document) =>
        this.stripDocument(
          document,
          schema.getDocument(),
          normalizedProjection,
          allowedRestricted
        )
      )
    );

    if (strippedDocuments.length === 0) return limit === 1 ? null : [];

    return limit === 1 ? strippedDocuments[0] : strippedDocuments;
  }
  1068. }
  /**
   * The methods of the DataModule that UniqueMethods selects.
   */
  type DataModuleJobMethods = UniqueMethods<DataModule>;

  /**
   * Describes every job of the DataModule, keyed by method name:
   * - payload is the type of the second parameter of the method
   * - returns is the awaited return type of the method
   */
  export type DataModuleJobs = {
    [JobName in keyof DataModuleJobMethods]: {
      payload: Parameters<DataModuleJobMethods[JobName]>[1];
      returns: Awaited<ReturnType<DataModuleJobMethods[JobName]>>;
    };
  };