|
@@ -1,7 +1,7 @@
|
|
|
import async from "async";
|
|
|
import config from "config";
|
|
|
import { Db, MongoClient, ObjectId } from "mongodb";
|
|
|
-import hash from "object-hash";
|
|
|
+import { createHash } from "node:crypto";
|
|
|
import { createClient, RedisClientType } from "redis";
|
|
|
import JobContext from "../JobContext";
|
|
|
import BaseModule from "../BaseModule";
|
|
@@ -10,6 +10,7 @@ import Schema, { Types } from "../Schema";
|
|
|
import { Collections } from "../types/Collections";
|
|
|
import { Document as SchemaDocument } from "../types/Document";
|
|
|
import { UniqueMethods } from "../types/Modules";
|
|
|
+import { AttributeValue } from "../types/AttributeValue";
|
|
|
|
|
|
interface ProjectionObject {
|
|
|
[property: string]: boolean | string[] | ProjectionObject;
|
|
@@ -22,13 +23,8 @@ type NormalizedProjection = {
|
|
|
mode: "includeAllBut" | "excludeAllBut";
|
|
|
};
|
|
|
|
|
|
-type SchemaDocumentSimpleValue = string | number | boolean | Date | ObjectId;
|
|
|
-
|
|
|
interface MongoFilter {
|
|
|
- [property: string]:
|
|
|
- | SchemaDocumentSimpleValue
|
|
|
- | SchemaDocumentSimpleValue[]
|
|
|
- | MongoFilter;
|
|
|
+ [property: string]: AttributeValue | AttributeValue[] | MongoFilter;
|
|
|
}
|
|
|
|
|
|
// WIP
|
|
@@ -57,87 +53,71 @@ export default class DataModule extends BaseModule {
|
|
|
/**
|
|
|
* startup - Startup data module
|
|
|
*/
|
|
|
- public override startup(): Promise<void> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- async.waterfall(
|
|
|
- [
|
|
|
- async () => super.startup(),
|
|
|
-
|
|
|
- async () => {
|
|
|
- const mongoUrl = config.get<string>("mongo.url");
|
|
|
-
|
|
|
- this.mongoClient = new MongoClient(mongoUrl);
|
|
|
- await this.mongoClient.connect();
|
|
|
- this.mongoDb = this.mongoClient.db();
|
|
|
- },
|
|
|
-
|
|
|
- async () => this.loadCollections(),
|
|
|
-
|
|
|
- async () => {
|
|
|
- const { url, password } = config.get<{
|
|
|
- url: string;
|
|
|
- password: string;
|
|
|
- }>("redis");
|
|
|
-
|
|
|
- this.redisClient = createClient({
|
|
|
- url,
|
|
|
- password
|
|
|
- });
|
|
|
-
|
|
|
- return this.redisClient.connect();
|
|
|
- },
|
|
|
-
|
|
|
- async () => {
|
|
|
- if (!this.redisClient)
|
|
|
- throw new Error("Redis connection not established");
|
|
|
-
|
|
|
- return this.redisClient.sendCommand([
|
|
|
- "CONFIG",
|
|
|
- "GET",
|
|
|
- "notify-keyspace-events"
|
|
|
- ]);
|
|
|
- },
|
|
|
-
|
|
|
- async (redisConfigResponse: string[]) => {
|
|
|
- if (
|
|
|
- !(
|
|
|
- Array.isArray(redisConfigResponse) &&
|
|
|
- redisConfigResponse[1] === "xE"
|
|
|
- )
|
|
|
- )
|
|
|
- throw new Error(
|
|
|
- `notify-keyspace-events is NOT configured correctly! It is set to: ${
|
|
|
- (Array.isArray(redisConfigResponse) &&
|
|
|
- redisConfigResponse[1]) ||
|
|
|
- "unknown"
|
|
|
- }`
|
|
|
- );
|
|
|
- },
|
|
|
+ public override async startup() {
|
|
|
+ return async.waterfall<void>([
|
|
|
+ async () => super.startup(),
|
|
|
|
|
|
- async () => super.started()
|
|
|
- ],
|
|
|
- err => {
|
|
|
- if (err) reject(err);
|
|
|
- else resolve();
|
|
|
- }
|
|
|
- );
|
|
|
- });
|
|
|
+ async () => {
|
|
|
+ const mongoUrl = config.get<string>("mongo.url");
|
|
|
+
|
|
|
+ this.mongoClient = new MongoClient(mongoUrl);
|
|
|
+ await this.mongoClient.connect();
|
|
|
+ this.mongoDb = this.mongoClient.db();
|
|
|
+ },
|
|
|
+
|
|
|
+ async () => this.loadCollections(),
|
|
|
+
|
|
|
+ async () => {
|
|
|
+ const { url } = config.get<{
|
|
|
+ url: string;
|
|
|
+ }>("redis");
|
|
|
+
|
|
|
+ this.redisClient = createClient({
|
|
|
+ url
|
|
|
+ });
|
|
|
+
|
|
|
+ return this.redisClient.connect();
|
|
|
+ },
|
|
|
+
|
|
|
+ async () => {
|
|
|
+ if (!this.redisClient)
|
|
|
+ throw new Error("Redis connection not established");
|
|
|
+
|
|
|
+ return this.redisClient.sendCommand([
|
|
|
+ "CONFIG",
|
|
|
+ "GET",
|
|
|
+ "notify-keyspace-events"
|
|
|
+ ]);
|
|
|
+ },
|
|
|
+
|
|
|
+ async (redisConfigResponse: string[]) => {
|
|
|
+ if (
|
|
|
+ !(
|
|
|
+ Array.isArray(redisConfigResponse) &&
|
|
|
+ redisConfigResponse[1] === "xE"
|
|
|
+ )
|
|
|
+ )
|
|
|
+ throw new Error(
|
|
|
+ `notify-keyspace-events is NOT configured correctly! It is set to: ${
|
|
|
+ (Array.isArray(redisConfigResponse) &&
|
|
|
+ redisConfigResponse[1]) ||
|
|
|
+ "unknown"
|
|
|
+ }`
|
|
|
+ );
|
|
|
+ },
|
|
|
+
|
|
|
+ async () => super.started()
|
|
|
+ ]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* shutdown - Shutdown data module
|
|
|
*/
|
|
|
- public override shutdown(): Promise<void> {
|
|
|
- return new Promise(resolve => {
|
|
|
- super
|
|
|
- .shutdown()
|
|
|
- .then(async () => {
|
|
|
- // TODO: Ensure the following shutdown correctly
|
|
|
- if (this.redisClient) await this.redisClient.quit();
|
|
|
- if (this.mongoClient) await this.mongoClient.close(false);
|
|
|
- })
|
|
|
- .finally(() => resolve());
|
|
|
- });
|
|
|
+ public override async shutdown() {
|
|
|
+ await super.shutdown();
|
|
|
+ // TODO: Ensure the following shutdown correctly
|
|
|
+ if (this.redisClient) await this.redisClient.quit();
|
|
|
+ if (this.mongoClient) await this.mongoClient.close(false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -146,24 +126,16 @@ export default class DataModule extends BaseModule {
|
|
|
* @param collectionName - Name of the collection
|
|
|
* @returns Collection
|
|
|
*/
|
|
|
- private loadCollection<T extends keyof Collections>(
|
|
|
+ private async loadCollection<T extends keyof Collections>(
|
|
|
collectionName: T
|
|
|
- ): Promise<{
|
|
|
- schema: Collections[T]["schema"];
|
|
|
- collection: Collections[T]["collection"];
|
|
|
- }> {
|
|
|
- return new Promise(resolve => {
|
|
|
- import(`../collections/${collectionName.toString()}`).then(
|
|
|
- ({ default: schema }: { default: Schema }) => {
|
|
|
- resolve({
|
|
|
- schema,
|
|
|
- collection: this.mongoDb!.collection(
|
|
|
- collectionName.toString()
|
|
|
- )
|
|
|
- });
|
|
|
- }
|
|
|
- );
|
|
|
- });
|
|
|
+ ) {
|
|
|
+ const { default: schema }: { default: Schema } = await import(
|
|
|
+ `../collections/${collectionName.toString()}`
|
|
|
+ );
|
|
|
+ return {
|
|
|
+ schema,
|
|
|
+ collection: this.mongoDb!.collection(collectionName.toString())
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -171,21 +143,11 @@ export default class DataModule extends BaseModule {
|
|
|
*
|
|
|
* @returns Promise
|
|
|
*/
|
|
|
- private loadCollections(): Promise<void> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- const fetchCollections = async () => ({
|
|
|
- abc: await this.loadCollection("abc"),
|
|
|
- station: await this.loadCollection("station")
|
|
|
- });
|
|
|
- fetchCollections()
|
|
|
- .then(collections => {
|
|
|
- this.collections = collections;
|
|
|
- resolve();
|
|
|
- })
|
|
|
- .catch(err => {
|
|
|
- reject(new Error(err));
|
|
|
- });
|
|
|
- });
|
|
|
+ private async loadCollections() {
|
|
|
+ this.collections = {
|
|
|
+ abc: await this.loadCollection("abc"),
|
|
|
+ station: await this.loadCollection("station")
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -519,10 +481,7 @@ export default class DataModule extends BaseModule {
|
|
|
return newAllowedRestricted;
|
|
|
}
|
|
|
|
|
|
- private getCastedValue(
|
|
|
- value: SchemaDocumentSimpleValue,
|
|
|
- schemaType: Types
|
|
|
- ) {
|
|
|
+ private getCastedValue(value: AttributeValue, schemaType: Types) {
|
|
|
if (schemaType === Types.String) {
|
|
|
// Check if value is a string, and if not, convert the value to a string
|
|
|
const castedValue =
|
|
@@ -548,7 +507,7 @@ export default class DataModule extends BaseModule {
|
|
|
const castedValue =
|
|
|
Object.prototype.toString.call(value) === "[object Date]"
|
|
|
? (value as Date)
|
|
|
- : new Date(value.toString());
|
|
|
+ : new Date(value);
|
|
|
// TODO possibly allow this via a validate boolean option?
|
|
|
// We don't allow invalid dates, so throw an error
|
|
|
if (new Date(castedValue).toString() === "Invalid Date")
|
|
@@ -1081,7 +1040,7 @@ export default class DataModule extends BaseModule {
|
|
|
* @param payload - Payload
|
|
|
* @returns Returned object
|
|
|
*/
|
|
|
- public find<CollectionNameType extends keyof Collections>(
|
|
|
+ public async find<CollectionNameType extends keyof Collections>(
|
|
|
context: JobContext,
|
|
|
{
|
|
|
collection, // Collection name
|
|
@@ -1101,174 +1060,161 @@ export default class DataModule extends BaseModule {
|
|
|
page?: number;
|
|
|
useCache?: boolean;
|
|
|
}
|
|
|
- ): Promise<Document | Document[] | null> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let queryHash: string | null = null;
|
|
|
- let cacheable = useCache !== false;
|
|
|
-
|
|
|
- let schema: Schema;
|
|
|
-
|
|
|
- let normalizedProjection: NormalizedProjection;
|
|
|
-
|
|
|
- let mongoFilter: MongoFilter;
|
|
|
- let mongoProjection: ProjectionObject;
|
|
|
-
|
|
|
- async.waterfall(
|
|
|
- [
|
|
|
- // Verify page and limit parameters
|
|
|
- async () => {
|
|
|
- if (page < 1)
|
|
|
- throw new Error("Page must be at least 1");
|
|
|
- if (limit < 1)
|
|
|
- throw new Error("Limit must be at least 1");
|
|
|
- if (limit > 100)
|
|
|
- throw new Error(
|
|
|
- "Limit must not be greater than 100"
|
|
|
- );
|
|
|
- },
|
|
|
-
|
|
|
- // Verify whether the collection exists, and get the schema
|
|
|
- async () => {
|
|
|
- if (!collection)
|
|
|
- throw new Error("No collection specified");
|
|
|
- if (this.collections && !this.collections[collection])
|
|
|
- throw new Error("Collection not found");
|
|
|
-
|
|
|
- schema = this.collections![collection].schema;
|
|
|
- },
|
|
|
-
|
|
|
- // Normalize the projection into something we understand, and which throws an error if we have any path collisions
|
|
|
- async () => {
|
|
|
- normalizedProjection =
|
|
|
- this.normalizeProjection(projection);
|
|
|
- },
|
|
|
-
|
|
|
- // TOOD validate the projection based on the schema here
|
|
|
-
|
|
|
- // Parse the projection into a mongo projection, and returns whether this query can be cached or not
|
|
|
- async () => {
|
|
|
- const parsedProjection = await this.parseFindProjection(
|
|
|
- normalizedProjection,
|
|
|
- schema.getDocument(),
|
|
|
- allowedRestricted
|
|
|
- );
|
|
|
+ ) {
|
|
|
+ let queryHash: string | null = null;
|
|
|
+ let cacheable = useCache !== false;
|
|
|
|
|
|
- cacheable = cacheable && parsedProjection.canCache;
|
|
|
- mongoProjection = parsedProjection.mongoProjection;
|
|
|
- },
|
|
|
+ let schema: Schema;
|
|
|
|
|
|
- // Parse the filter into a mongo filter, which also validates whether the filter is legal or not, and returns whether this query can be cached or not
|
|
|
- async () => {
|
|
|
- const parsedFilter = await this.parseFindFilter(
|
|
|
- filter,
|
|
|
- schema.getDocument(),
|
|
|
- allowedRestricted
|
|
|
- );
|
|
|
+ let normalizedProjection: NormalizedProjection;
|
|
|
|
|
|
- cacheable = cacheable && parsedFilter.canCache;
|
|
|
- mongoFilter = parsedFilter.mongoFilter;
|
|
|
- },
|
|
|
-
|
|
|
- // If we can use cache, get from the cache, and if we get results return those
|
|
|
- async () => {
|
|
|
- // If we're allowed to cache, and the filter doesn't reference any restricted fields, try to cache the query and its response
|
|
|
- if (cacheable) {
|
|
|
- // Turn the query object into a sha1 hash that can be used as a Redis key
|
|
|
- queryHash = hash(
|
|
|
- {
|
|
|
- collection,
|
|
|
- mongoFilter,
|
|
|
- limit,
|
|
|
- page
|
|
|
- },
|
|
|
- {
|
|
|
- algorithm: "sha1"
|
|
|
- }
|
|
|
- );
|
|
|
+ let mongoFilter: MongoFilter;
|
|
|
+ let mongoProjection: ProjectionObject;
|
|
|
|
|
|
- // Check if the query hash already exists in Redis, and get it if it is
|
|
|
- const cachedQuery = await this.redisClient?.GET(
|
|
|
- `query.find.${queryHash}`
|
|
|
- );
|
|
|
+ return async.waterfall<Document | Document[] | null>([
|
|
|
+ // Verify page and limit parameters
|
|
|
+ async () => {
|
|
|
+ if (page < 1) throw new Error("Page must be at least 1");
|
|
|
+ if (limit < 1) throw new Error("Limit must be at least 1");
|
|
|
+ if (limit > 100)
|
|
|
+ throw new Error("Limit must not be greater than 100");
|
|
|
+ },
|
|
|
|
|
|
- // Return the mongoFilter along with the cachedDocuments, if any
|
|
|
- return {
|
|
|
- cachedDocuments: cachedQuery
|
|
|
- ? JSON.parse(cachedQuery)
|
|
|
- : null
|
|
|
- };
|
|
|
- }
|
|
|
+ // Verify whether the collection exists, and get the schema
|
|
|
+ async () => {
|
|
|
+ if (!collection) throw new Error("No collection specified");
|
|
|
+ if (this.collections && !this.collections[collection])
|
|
|
+ throw new Error("Collection not found");
|
|
|
|
|
|
- // We can't use the cache, so just continue with no cached documents
|
|
|
- return { cachedDocuments: null };
|
|
|
- },
|
|
|
-
|
|
|
- // Get documents from Mongo if we got no cached documents
|
|
|
- async ({
|
|
|
- cachedDocuments
|
|
|
- }: {
|
|
|
- cachedDocuments: Document[] | null;
|
|
|
- }) => {
|
|
|
- // We got cached documents, so continue with those
|
|
|
- if (cachedDocuments) {
|
|
|
- cacheable = false;
|
|
|
- return cachedDocuments;
|
|
|
- }
|
|
|
+ schema = this.collections![collection].schema;
|
|
|
+ },
|
|
|
|
|
|
- // TODO, add mongo projection. Make sure to keep in mind caching with queryHash.
|
|
|
+ // Normalize the projection into something we understand, and which throws an error if we have any path collisions
|
|
|
+ async () => {
|
|
|
+ normalizedProjection = this.normalizeProjection(projection);
|
|
|
+ },
|
|
|
|
|
|
- const totalCount = await this.collections?.[
|
|
|
- collection
|
|
|
- ].collection.countDocuments({ $expr: mongoFilter });
|
|
|
- if (totalCount === 0) return [];
|
|
|
- const lastPage = Math.ceil(totalCount / limit);
|
|
|
- if (lastPage < page)
|
|
|
- throw new Error(
|
|
|
- `The last page available is ${lastPage}`
|
|
|
- );
|
|
|
+ // TOOD validate the projection based on the schema here
|
|
|
|
|
|
- // Create the Mongo cursor and then return the promise that gets the array of documents
|
|
|
- return this.collections?.[collection].collection
|
|
|
- .find(mongoFilter, mongoProjection)
|
|
|
- .limit(limit)
|
|
|
- .skip((page - 1) * limit)
|
|
|
- .toArray();
|
|
|
- },
|
|
|
-
|
|
|
- // Add documents to the cache
|
|
|
- async (documents: Document[]) => {
|
|
|
- // Adds query results to cache but doesnt await
|
|
|
- if (cacheable && queryHash) {
|
|
|
- this.redisClient!.SET(
|
|
|
- `query.find.${queryHash}`,
|
|
|
- JSON.stringify(documents),
|
|
|
- {
|
|
|
- EX: 60
|
|
|
- }
|
|
|
- );
|
|
|
- }
|
|
|
- return documents;
|
|
|
- },
|
|
|
-
|
|
|
- // Strips the document of any unneeded properties or properties that are restricted
|
|
|
- async (documents: Document[]) =>
|
|
|
- async.map(documents, async (document: Document) =>
|
|
|
- this.stripDocument(
|
|
|
- document,
|
|
|
- schema.getDocument(),
|
|
|
- normalizedProjection,
|
|
|
- allowedRestricted
|
|
|
- )
|
|
|
+ // Parse the projection into a mongo projection, and returns whether this query can be cached or not
|
|
|
+ async () => {
|
|
|
+ const parsedProjection = await this.parseFindProjection(
|
|
|
+ normalizedProjection,
|
|
|
+ schema.getDocument(),
|
|
|
+ allowedRestricted
|
|
|
+ );
|
|
|
+
|
|
|
+ cacheable = cacheable && parsedProjection.canCache;
|
|
|
+ mongoProjection = parsedProjection.mongoProjection;
|
|
|
+ },
|
|
|
+
|
|
|
+ // Parse the filter into a mongo filter, which also validates whether the filter is legal or not, and returns whether this query can be cached or not
|
|
|
+ async () => {
|
|
|
+ const parsedFilter = await this.parseFindFilter(
|
|
|
+ filter,
|
|
|
+ schema.getDocument(),
|
|
|
+ allowedRestricted
|
|
|
+ );
|
|
|
+
|
|
|
+ cacheable = cacheable && parsedFilter.canCache;
|
|
|
+ mongoFilter = parsedFilter.mongoFilter;
|
|
|
+ },
|
|
|
+
|
|
|
+ // If we can use cache, get from the cache, and if we get results return those
|
|
|
+ async () => {
|
|
|
+ // If we're allowed to cache, and the filter doesn't reference any restricted fields, try to cache the query and its response
|
|
|
+ if (cacheable) {
|
|
|
+ // Turn the query object into a md5 hash that can be used as a Redis key
|
|
|
+ queryHash = createHash("md5")
|
|
|
+ .update(
|
|
|
+ JSON.stringify({
|
|
|
+ collection,
|
|
|
+ mongoFilter,
|
|
|
+ limit,
|
|
|
+ page
|
|
|
+ })
|
|
|
)
|
|
|
- ],
|
|
|
- (err, documents?: any[]) => {
|
|
|
- if (err) reject(err);
|
|
|
- else if (!documents || documents!.length === 0)
|
|
|
- resolve(limit === 1 ? null : []);
|
|
|
- else resolve(limit === 1 ? documents![0] : documents);
|
|
|
+ .digest("hex");
|
|
|
+
|
|
|
+ // Check if the query hash already exists in Redis, and get it if it is
|
|
|
+ const cachedQuery = await this.redisClient?.GET(
|
|
|
+ `query.find.${queryHash}`
|
|
|
+ );
|
|
|
+
|
|
|
+ // Return the mongoFilter along with the cachedDocuments, if any
|
|
|
+ return {
|
|
|
+ cachedDocuments: cachedQuery
|
|
|
+ ? JSON.parse(cachedQuery)
|
|
|
+ : null
|
|
|
+ };
|
|
|
}
|
|
|
- );
|
|
|
- });
|
|
|
+
|
|
|
+ // We can't use the cache, so just continue with no cached documents
|
|
|
+ return { cachedDocuments: null };
|
|
|
+ },
|
|
|
+
|
|
|
+ // Get documents from Mongo if we got no cached documents
|
|
|
+ async ({
|
|
|
+ cachedDocuments
|
|
|
+ }: {
|
|
|
+ cachedDocuments: Document[] | null;
|
|
|
+ }) => {
|
|
|
+ // We got cached documents, so continue with those
|
|
|
+ if (cachedDocuments) {
|
|
|
+ cacheable = false;
|
|
|
+ return cachedDocuments;
|
|
|
+ }
|
|
|
+
|
|
|
+ // TODO, add mongo projection. Make sure to keep in mind caching with queryHash.
|
|
|
+
|
|
|
+ const totalCount = await this.collections?.[
|
|
|
+ collection
|
|
|
+ ].collection.countDocuments({ $expr: mongoFilter });
|
|
|
+ if (totalCount === 0 || totalCount === undefined) return [];
|
|
|
+ const lastPage = Math.ceil(totalCount / limit);
|
|
|
+ if (lastPage < page)
|
|
|
+ throw new Error(`The last page available is ${lastPage}`);
|
|
|
+
|
|
|
+ // Create the Mongo cursor and then return the promise that gets the array of documents
|
|
|
+ return this.collections?.[collection].collection
|
|
|
+ .find(mongoFilter, mongoProjection)
|
|
|
+ .limit(limit)
|
|
|
+ .skip((page - 1) * limit)
|
|
|
+ .toArray();
|
|
|
+ },
|
|
|
+
|
|
|
+ // Add documents to the cache
|
|
|
+ async (documents: Document[]) => {
|
|
|
+ // Adds query results to cache but doesnt await
|
|
|
+ if (cacheable && queryHash) {
|
|
|
+ this.redisClient!.SET(
|
|
|
+ `query.find.${queryHash}`,
|
|
|
+ JSON.stringify(documents),
|
|
|
+ {
|
|
|
+ EX: 60
|
|
|
+ }
|
|
|
+ );
|
|
|
+ }
|
|
|
+ return documents;
|
|
|
+ },
|
|
|
+
|
|
|
+ // Strips the document of any unneeded properties or properties that are restricted
|
|
|
+ async (documents: Document[]) =>
|
|
|
+ async.map(documents, async (document: Document) =>
|
|
|
+ this.stripDocument(
|
|
|
+ document,
|
|
|
+ schema.getDocument(),
|
|
|
+ normalizedProjection,
|
|
|
+ allowedRestricted
|
|
|
+ )
|
|
|
+ ),
|
|
|
+
|
|
|
+ async (documents: Document[]) => {
|
|
|
+ if (!documents || documents!.length === 0)
|
|
|
+ return limit === 1 ? null : [];
|
|
|
+ return limit === 1 ? documents![0] : documents;
|
|
|
+ }
|
|
|
+ ]);
|
|
|
}
|
|
|
}
|
|
|
|