|
@@ -1,22 +1,17 @@
|
|
|
import config from "config";
|
|
|
-import mongoose, { Connection, Model, Schema, SchemaTypes } from "mongoose";
|
|
|
-import { patchHistoryPlugin, patchEventEmitter } from "ts-patch-mongoose";
|
|
|
import { readdir } from "fs/promises";
|
|
|
import path from "path";
|
|
|
-import updateVersioningPlugin from "mongoose-update-versioning";
|
|
|
import { forEachIn } from "@common/utils/forEachIn";
|
|
|
-import Migration from "@/modules/DataModule/Migration";
|
|
|
-import documentVersionPlugin from "@/modules/DataModule/plugins/documentVersion";
|
|
|
-import getDataPlugin from "@/modules/DataModule/plugins/getData";
|
|
|
+import { Sequelize, Model as SequelizeModel, ModelStatic } from "sequelize";
|
|
|
+import { Dirent } from "fs";
|
|
|
+import * as inflection from "inflection";
|
|
|
import BaseModule, { ModuleStatus } from "@/BaseModule";
|
|
|
import EventsModule from "./EventsModule";
|
|
|
import DataModuleJob from "./DataModule/DataModuleJob";
|
|
|
import Job from "@/Job";
|
|
|
|
|
|
export class DataModule extends BaseModule {
|
|
|
- private _models?: Record<string, Model<any>>;
|
|
|
-
|
|
|
- private _mongoConnection?: Connection;
|
|
|
+ private _sequelize?: Sequelize;
|
|
|
|
|
|
declare _jobs: Record<string, typeof Job | typeof DataModuleJob>;
|
|
|
|
|
@@ -35,17 +30,9 @@ export class DataModule extends BaseModule {
|
|
|
public override async startup() {
|
|
|
await super.startup();
|
|
|
|
|
|
- await this._createMongoConnection();
|
|
|
-
|
|
|
- await this._runMigrations();
|
|
|
-
|
|
|
- await this._loadModels();
|
|
|
-
|
|
|
- await this._syncModelIndexes();
|
|
|
+ await this._setupSequelize();
|
|
|
|
|
|
- await this._loadModelJobs();
|
|
|
-
|
|
|
- await this._loadModelEvents();
|
|
|
+ // await this._runMigrations();
|
|
|
|
|
|
await super._started();
|
|
|
}
|
|
@@ -55,246 +42,148 @@ export class DataModule extends BaseModule {
|
|
|
*/
|
|
|
public override async shutdown() {
|
|
|
await super.shutdown();
|
|
|
- patchEventEmitter.removeAllListeners();
|
|
|
- if (this._mongoConnection) await this._mongoConnection.close();
|
|
|
+ await this._sequelize?.close();
|
|
|
await this._stopped();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * createMongoConnection - Create mongo connection
|
|
|
+ * setupSequelize - Setup sequelize instance
|
|
|
*/
|
|
|
- private async _createMongoConnection() {
|
|
|
- mongoose.set({
|
|
|
- runValidators: true,
|
|
|
- sanitizeFilter: true,
|
|
|
- strict: "throw",
|
|
|
- strictQuery: "throw"
|
|
|
+ private async _setupSequelize() {
|
|
|
+ const { username, password, host, port, database } = config.get<any>("postgres");
|
|
|
+ this._sequelize = new Sequelize(database, username, password, {
|
|
|
+ host,
|
|
|
+ port,
|
|
|
+ dialect: "postgres",
|
|
|
+ logging: message => this.log(message)
|
|
|
});
|
|
|
|
|
|
- const { user, password, host, port, database } = config.get<{
|
|
|
- user: string;
|
|
|
- password: string;
|
|
|
- host: string;
|
|
|
- port: number;
|
|
|
- database: string;
|
|
|
- }>("mongo");
|
|
|
- const mongoUrl = `mongodb://${user}:${password}@${host}:${port}/${database}`;
|
|
|
-
|
|
|
- this._mongoConnection = await mongoose
|
|
|
- .createConnection(mongoUrl)
|
|
|
- .asPromise();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * registerEvents - Register events for schema with event module
|
|
|
- */
|
|
|
- private async _registerEvents(modelName: string, schema: Schema<any>) {
|
|
|
- const { enabled, eventCreated, eventUpdated, eventDeleted } =
|
|
|
- schema.get("patchHistory") ?? {};
|
|
|
-
|
|
|
- if (!enabled) return;
|
|
|
-
|
|
|
- Object.entries({
|
|
|
- created: eventCreated,
|
|
|
- updated: eventUpdated,
|
|
|
- deleted: eventDeleted
|
|
|
- })
|
|
|
- .filter(([, event]) => !!event)
|
|
|
- .forEach(([action, event]) => {
|
|
|
- patchEventEmitter.on(event!, async ({ doc, oldDoc }) => {
|
|
|
- const modelId = doc?._id ?? oldDoc?._id;
|
|
|
-
|
|
|
- const Model = await this.getModel(modelName);
|
|
|
-
|
|
|
- if (doc) doc = Model.hydrate(doc);
|
|
|
-
|
|
|
- if (oldDoc) oldDoc = Model.hydrate(oldDoc);
|
|
|
-
|
|
|
- if (!modelId && action !== "created")
|
|
|
- throw new Error(`Model Id not found for "${event}"`);
|
|
|
-
|
|
|
- const EventClass = this.getEvent(`${modelName}.${action}`);
|
|
|
-
|
|
|
- await EventsModule.publish(
|
|
|
- new EventClass({ doc, oldDoc }, modelId)
|
|
|
- );
|
|
|
- });
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * registerEvents - Register events for schema with event module
|
|
|
- */
|
|
|
- private async _registerEventListeners(schema: Schema<any>) {
|
|
|
- const eventListeners = schema.get("eventListeners");
|
|
|
-
|
|
|
- if (
|
|
|
- typeof eventListeners !== "object" ||
|
|
|
- Object.keys(eventListeners).length === 0
|
|
|
- )
|
|
|
- return;
|
|
|
+ await this._sequelize.authenticate();
|
|
|
|
|
|
await forEachIn(
|
|
|
- Object.entries(eventListeners),
|
|
|
- async ([event, callback]) =>
|
|
|
- EventsModule.pSubscribe(event, callback)
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * loadModel - Import and load model schema
|
|
|
- *
|
|
|
- * @param modelName - Name of the model
|
|
|
- * @returns Model
|
|
|
- */
|
|
|
- private async _loadModel(modelName: string): Promise<Model<any>> {
|
|
|
- if (!this._mongoConnection) throw new Error("Mongo is not available");
|
|
|
-
|
|
|
- const { schema }: { schema: Schema<any> } = await import(
|
|
|
- `./DataModule/models/${modelName.toString()}/schema`
|
|
|
- );
|
|
|
-
|
|
|
- schema.plugin(documentVersionPlugin);
|
|
|
-
|
|
|
- schema.set("timestamps", schema.get("timestamps") ?? true);
|
|
|
-
|
|
|
- const patchHistoryConfig = {
|
|
|
- enabled: true,
|
|
|
- patchHistoryDisabled: true,
|
|
|
- eventCreated: `${modelName}.created`,
|
|
|
- eventUpdated: `${modelName}.updated`,
|
|
|
- eventDeleted: `${modelName}.deleted`,
|
|
|
- ...(schema.get("patchHistory") ?? {})
|
|
|
- };
|
|
|
- schema.set("patchHistory", patchHistoryConfig);
|
|
|
-
|
|
|
- if (patchHistoryConfig.enabled) {
|
|
|
- schema.plugin(patchHistoryPlugin, patchHistoryConfig);
|
|
|
- }
|
|
|
-
|
|
|
- const { enabled: getDataEnabled = false } = schema.get("getData") ?? {};
|
|
|
-
|
|
|
- if (getDataEnabled) schema.plugin(getDataPlugin);
|
|
|
-
|
|
|
- schema.static("getModelName", () => modelName);
|
|
|
+ await readdir(
|
|
|
+ path.resolve(__dirname, `./${this.constructor.name}/models`),
|
|
|
+ {
|
|
|
+ withFileTypes: true
|
|
|
+ }
|
|
|
+ ),
|
|
|
+ async modelFile => {
|
|
|
+ if (!modelFile.isFile() || modelFile.name.includes(".spec."))
|
|
|
+ return;
|
|
|
|
|
|
- await this._registerEvents(modelName, schema);
|
|
|
+ const {
|
|
|
+ default: ModelClass,
|
|
|
+ schema,
|
|
|
+ options = {},
|
|
|
+ setup
|
|
|
+ } = await import(`${modelFile.path}/${modelFile.name}`);
|
|
|
|
|
|
- await this._registerEventListeners(schema);
|
|
|
+ const tableName = inflection.camelize(
|
|
|
+ inflection.pluralize(ModelClass.name),
|
|
|
+ true
|
|
|
+ );
|
|
|
|
|
|
- schema.set("toObject", { getters: true, virtuals: true });
|
|
|
- schema.set("toJSON", { getters: true, virtuals: true });
|
|
|
+ ModelClass.init(schema, {
|
|
|
+ tableName,
|
|
|
+ ...options,
|
|
|
+ sequelize: this._sequelize
|
|
|
+ });
|
|
|
|
|
|
- schema.virtual("_name").get(() => modelName);
|
|
|
+ if (typeof setup === "function") await setup();
|
|
|
|
|
|
- schema.plugin(updateVersioningPlugin);
|
|
|
+ await this._loadModelEvents(ModelClass.name);
|
|
|
|
|
|
- await forEachIn(
|
|
|
- Object.entries(schema.paths).filter(
|
|
|
- ([, type]) =>
|
|
|
- type instanceof SchemaTypes.ObjectId ||
|
|
|
- (type instanceof SchemaTypes.Array &&
|
|
|
- type.caster instanceof SchemaTypes.ObjectId)
|
|
|
- ),
|
|
|
- async ([key, type]) => {
|
|
|
- const { ref } =
|
|
|
- (type instanceof SchemaTypes.Array
|
|
|
- ? type.caster?.options
|
|
|
- : type?.options) ?? {};
|
|
|
-
|
|
|
- if (ref)
|
|
|
- schema.path(key).get((value: any) => {
|
|
|
- if (
|
|
|
- typeof value === "object" &&
|
|
|
- type instanceof SchemaTypes.ObjectId
|
|
|
- )
|
|
|
- return {
|
|
|
- _id: value,
|
|
|
- _name: ref
|
|
|
- };
|
|
|
-
|
|
|
- if (
|
|
|
- Array.isArray(value) &&
|
|
|
- type instanceof SchemaTypes.Array
|
|
|
- )
|
|
|
- return value.map(item =>
|
|
|
- item === null
|
|
|
- ? null
|
|
|
- : {
|
|
|
- _id: item,
|
|
|
- _name: ref
|
|
|
- }
|
|
|
- );
|
|
|
-
|
|
|
- return value;
|
|
|
- });
|
|
|
+ await this._loadModelJobs(ModelClass.name);
|
|
|
}
|
|
|
);
|
|
|
|
|
|
- return this._mongoConnection.model(modelName.toString(), schema);
|
|
|
+ this._sequelize.sync();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * loadModels - Load and initialize all models
|
|
|
- *
|
|
|
- * @returns Promise
|
|
|
- */
|
|
|
- private async _loadModels() {
|
|
|
- mongoose.SchemaTypes.String.set("trim", true);
|
|
|
-
|
|
|
- this._models = {
|
|
|
- abc: await this._loadModel("abc"),
|
|
|
- minifiedUsers: await this._loadModel("minifiedUsers"),
|
|
|
- news: await this._loadModel("news"),
|
|
|
- sessions: await this._loadModel("sessions"),
|
|
|
- stations: await this._loadModel("stations"),
|
|
|
- users: await this._loadModel("users")
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * syncModelIndexes - Sync indexes for all models
|
|
|
- */
|
|
|
- private async _syncModelIndexes() {
|
|
|
- if (!this._models) throw new Error("Models not loaded");
|
|
|
-
|
|
|
- await forEachIn(
|
|
|
- Object.values(this._models).filter(
|
|
|
- model => model.schema.get("autoIndex") !== false
|
|
|
- ),
|
|
|
- model => model.syncIndexes()
|
|
|
- );
|
|
|
- }
|
|
|
+ // /**
|
|
|
+ // * registerEvents - Register events for schema with event module
|
|
|
+ // */
|
|
|
+ // private async _registerEvents(modelName: string, schema: Schema<any>) {
|
|
|
+ // const { enabled, eventCreated, eventUpdated, eventDeleted } =
|
|
|
+ // schema.get("patchHistory") ?? {};
|
|
|
+
|
|
|
+ // if (!enabled) return;
|
|
|
+
|
|
|
+ // Object.entries({
|
|
|
+ // created: eventCreated,
|
|
|
+ // updated: eventUpdated,
|
|
|
+ // deleted: eventDeleted
|
|
|
+ // })
|
|
|
+ // .filter(([, event]) => !!event)
|
|
|
+ // .forEach(([action, event]) => {
|
|
|
+ // patchEventEmitter.on(event!, async ({ doc, oldDoc }) => {
|
|
|
+ // const modelId = doc?._id ?? oldDoc?._id;
|
|
|
+
|
|
|
+ // const Model = await this.getModel(modelName);
|
|
|
+
|
|
|
+ // if (doc) doc = Model.hydrate(doc);
|
|
|
+
|
|
|
+ // if (oldDoc) oldDoc = Model.hydrate(oldDoc);
|
|
|
+
|
|
|
+ // if (!modelId && action !== "created")
|
|
|
+ // throw new Error(`Model Id not found for "${event}"`);
|
|
|
+
|
|
|
+ // const EventClass = this.getEvent(`${modelName}.${action}`);
|
|
|
+
|
|
|
+ // await EventsModule.publish(
|
|
|
+ // new EventClass({ doc, oldDoc }, modelId)
|
|
|
+ // );
|
|
|
+ // });
|
|
|
+ // });
|
|
|
+ // }
|
|
|
+
|
|
|
+ // /**
|
|
|
+ // * registerEvents - Register events for schema with event module
|
|
|
+ // */
|
|
|
+ // private async _registerEventListeners(schema: Schema<any>) {
|
|
|
+ // const eventListeners = schema.get("eventListeners");
|
|
|
+
|
|
|
+ // if (
|
|
|
+ // typeof eventListeners !== "object" ||
|
|
|
+ // Object.keys(eventListeners).length === 0
|
|
|
+ // )
|
|
|
+ // return;
|
|
|
+
|
|
|
+ // await forEachIn(
|
|
|
+ // Object.entries(eventListeners),
|
|
|
+ // async ([event, callback]) =>
|
|
|
+ // EventsModule.pSubscribe(event, callback)
|
|
|
+ // );
|
|
|
+ // }
|
|
|
|
|
|
/**
|
|
|
* getModel - Get model
|
|
|
*
|
|
|
* @returns Model
|
|
|
*/
|
|
|
- public async getModel<ModelType extends Model<any>>(
|
|
|
+ public async getModel<ModelType extends SequelizeModel<any>>(
|
|
|
name: string
|
|
|
- ): Promise<ModelType> {
|
|
|
- if (!this._models) throw new Error("Models not loaded");
|
|
|
+ ): Promise<ModelStatic<ModelType>> {
|
|
|
+ if (!this._sequelize?.models) throw new Error("Models not loaded");
|
|
|
|
|
|
if (this.getStatus() !== ModuleStatus.STARTED)
|
|
|
throw new Error("Module not started");
|
|
|
|
|
|
- if (!this._models[name]) throw new Error("Model not found");
|
|
|
-
|
|
|
- return this._models[name] as ModelType;
|
|
|
+ return this._sequelize.model(name) as ModelStatic<ModelType>;
|
|
|
}
|
|
|
|
|
|
- private async _loadModelMigrations(modelName: string) {
|
|
|
- if (!this._mongoConnection) throw new Error("Mongo is not available");
|
|
|
-
|
|
|
- let migrations;
|
|
|
+ private async _loadModelJobs(modelClassName: string) {
|
|
|
+ let jobs: Dirent[];
|
|
|
|
|
|
try {
|
|
|
- migrations = await readdir(
|
|
|
+ jobs = await readdir(
|
|
|
path.resolve(
|
|
|
__dirname,
|
|
|
- `./DataModule/models/${modelName}/migrations/`
|
|
|
- )
|
|
|
+ `./${this.constructor.name}/models/${modelClassName}/jobs/`
|
|
|
+ ),
|
|
|
+ {
|
|
|
+ withFileTypes: true
|
|
|
+ }
|
|
|
);
|
|
|
} catch (error) {
|
|
|
if (
|
|
@@ -302,107 +191,55 @@ export class DataModule extends BaseModule {
|
|
|
"code" in error &&
|
|
|
error.code === "ENOENT"
|
|
|
)
|
|
|
- return [];
|
|
|
+ return;
|
|
|
|
|
|
throw error;
|
|
|
}
|
|
|
|
|
|
- return forEachIn(migrations, async migrationFile => {
|
|
|
- const { default: Migrate }: { default: typeof Migration } =
|
|
|
- await import(
|
|
|
- `./DataModule/models/${modelName}/migrations/${migrationFile}`
|
|
|
- );
|
|
|
- return new Migrate(this._mongoConnection as Connection);
|
|
|
- });
|
|
|
- }
|
|
|
+ await forEachIn(jobs, async jobFile => {
|
|
|
+ if (!jobFile.isFile() || jobFile.name.includes(".spec.")) return;
|
|
|
|
|
|
- private async _loadMigrations() {
|
|
|
- const models = await readdir(
|
|
|
- path.resolve(__dirname, "./DataModule/models/")
|
|
|
- );
|
|
|
-
|
|
|
- return forEachIn(models, async modelName =>
|
|
|
- this._loadModelMigrations(modelName)
|
|
|
- );
|
|
|
- }
|
|
|
-
|
|
|
- private async _runMigrations() {
|
|
|
- const migrations = (await this._loadMigrations()).flat();
|
|
|
-
|
|
|
- for (let i = 0; i < migrations.length; i += 1) {
|
|
|
- const migration = migrations[i];
|
|
|
- // eslint-disable-next-line no-await-in-loop
|
|
|
- await migration.up();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private async _loadModelJobs() {
|
|
|
- if (!this._models) throw new Error("Models not loaded");
|
|
|
-
|
|
|
- await forEachIn(Object.keys(this._models), async modelName => {
|
|
|
- let jobs;
|
|
|
-
|
|
|
- try {
|
|
|
- jobs = await readdir(
|
|
|
- path.resolve(
|
|
|
- __dirname,
|
|
|
- `./${this.constructor.name}/models/${modelName}/jobs/`
|
|
|
- )
|
|
|
- );
|
|
|
- } catch (error) {
|
|
|
- if (
|
|
|
- error instanceof Error &&
|
|
|
- "code" in error &&
|
|
|
- error.code === "ENOENT"
|
|
|
- )
|
|
|
- return;
|
|
|
-
|
|
|
- throw error;
|
|
|
- }
|
|
|
-
|
|
|
- await forEachIn(jobs, async jobFile => {
|
|
|
- if (jobFile.includes(".spec.")) return;
|
|
|
-
|
|
|
- const { default: Job } = await import(
|
|
|
- `./${this.constructor.name}/models/${modelName}/jobs/${jobFile}`
|
|
|
- );
|
|
|
+ const { default: JobClass } = await import(
|
|
|
+ `${jobFile.path}/${jobFile.name}`
|
|
|
+ );
|
|
|
|
|
|
- this._jobs[Job.getName()] = Job;
|
|
|
- });
|
|
|
+ this._jobs[JobClass.getName()] = JobClass;
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private async _loadModelEvents() {
|
|
|
- if (!this._models) throw new Error("Models not loaded");
|
|
|
+ private async _loadModelEvents(modelClassName: string) {
|
|
|
+ let events: Dirent[];
|
|
|
|
|
|
- await forEachIn(Object.keys(this._models), async modelName => {
|
|
|
- let events;
|
|
|
+ try {
|
|
|
+ events = await readdir(
|
|
|
+ path.resolve(
|
|
|
+ __dirname,
|
|
|
+ `./${this.constructor.name}/models/${modelClassName}/events/`
|
|
|
+ ),
|
|
|
+ {
|
|
|
+ withFileTypes: true
|
|
|
+ }
|
|
|
+ );
|
|
|
+ } catch (error) {
|
|
|
+ if (
|
|
|
+ error instanceof Error &&
|
|
|
+ "code" in error &&
|
|
|
+ error.code === "ENOENT"
|
|
|
+ )
|
|
|
+ return;
|
|
|
|
|
|
- try {
|
|
|
- events = await readdir(
|
|
|
- path.resolve(
|
|
|
- __dirname,
|
|
|
- `./${this.constructor.name}/models/${modelName}/events/`
|
|
|
- )
|
|
|
- );
|
|
|
- } catch (error) {
|
|
|
- if (
|
|
|
- error instanceof Error &&
|
|
|
- "code" in error &&
|
|
|
- error.code === "ENOENT"
|
|
|
- )
|
|
|
- return;
|
|
|
+ throw error;
|
|
|
+ }
|
|
|
|
|
|
- throw error;
|
|
|
- }
|
|
|
+ await forEachIn(events, async eventFile => {
|
|
|
+ if (!eventFile.isFile() || eventFile.name.includes(".spec."))
|
|
|
+ return;
|
|
|
|
|
|
- await forEachIn(events, async eventFile => {
|
|
|
- const { default: EventClass } = await import(
|
|
|
- `./${this.constructor.name}/models/${modelName}/events/${eventFile}`
|
|
|
- );
|
|
|
+ const { default: EventClass } = await import(
|
|
|
+ `${eventFile.path}/${eventFile.name}`
|
|
|
+ );
|
|
|
|
|
|
- this._events[EventClass.getName()] = EventClass;
|
|
|
- });
|
|
|
+ this._events[EventClass.getName()] = EventClass;
|
|
|
});
|
|
|
}
|
|
|
}
|