| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 | import async from "async";import config from "config";import redis from "redis";import mongoose from "mongoose";import CoreClass from "../../core";// Lightweight / convenience wrapper around redis module for our needsconst pubs = {};const subs = {};let CacheModule;class _CacheModule extends CoreClass {	// eslint-disable-next-line require-jsdoc	constructor() {		super("cache");		CacheModule = this;	}	/**	 * Initialises the cache/redis module	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	async initialize() {		const importSchema = schemaName =>			new Promise(resolve => {				import(`./schemas/${schemaName}`).then(schema => resolve(schema.default));			});		this.schemas = {			session: await importSchema("session"),			station: await importSchema("station"),			playlist: await importSchema("playlist"),			officialPlaylist: await importSchema("officialPlaylist"),			song: await importSchema("song"),			punishment: await importSchema("punishment"),			recentActivity: await importSchema("recentActivity"),			ratings: await importSchema("ratings")		};		return new Promise((resolve, reject) => {			this.log("INFO", "Connecting...");			this.client = redis.createClient({				...config.get("redis"),				reconnectStrategy: retries => {					if (this.getStatus() !== "LOCKDOWN") {						if (this.getStatus() !== "RECONNECTING") this.setStatus("RECONNECTING");						this.log("INFO", `Attempting to reconnect.`);						if (retries >= 10) {							this.log("ERROR", `Stopped trying to reconnect.`);							this.setStatus("FAILED");							new Error("Stopped trying to reconnect.");						} else {							Math.min(retries * 50, 500);						}					}				}			});			this.client.on("error", err => {				if (this.getStatus() === "INITIALIZING") reject(err);				if (this.getStatus() === "LOCKDOWN") return;				this.log("ERROR", `Error ${err.message}.`);			});			this.client.on("ready", () => {				this.log("INFO", "Redis is ready.");				if (this.getStatus() === "INITIALIZING") resolve();				else if (this.getStatus() === "FAILED" || this.getStatus() === "RECONNECTING") this.setStatus("READY");			});			this.client.connect().then(async () => {				this.log("INFO", "Connected succesfully.");			});			// TODO move to a better place			CacheModule.runJob("KEYS", { pattern: "longJobs.*" }).then(keys => {				async.eachLimit(keys, 1, (key, next) => {					CacheModule.runJob("DEL", { key }).finally(() => {						next();					});				});			});		});	}	/**	 * Quits redis client	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	QUIT() {		return new Promise(resolve => {			if (CacheModule.client.connected) {				CacheModule.client.quit();				Object.keys(pubs).forEach(channel => pubs[channel].quit());				Object.keys(subs).forEach(channel => subs[channel].client.quit());			}			resolve();		});	}	/**	 * Sets a single value	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the key to set	 * @param {*} payload.value - the value we want to set	 * @param {number} payload.ttl -  ttl of the key in seconds	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	SET(payload) {		return new Promise((resolve, reject) => {			let { key, value } = payload;			const { ttl } = payload;			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			// automatically stringify objects and arrays into JSON			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			let options = null;			if (ttl) {				options = {					EX: ttl				};			}			CacheModule.client				.SET(key, value, options)				.then(() => {					let parsed = value;					try {						parsed = JSON.parse(value);					} catch {						// Do nothing					}					resolve(parsed);				})				.catch(err => reject(new Error(err)));		});	}	/**	 * Sets a single value in a table	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.table - name of the table we want to set a key of (table === redis hash)	 * @param {string} payload.key -  name of the key to set	 * @param {*} payload.value - the value we want to set	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	HSET(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			let { value } = payload;			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			// automatically stringify objects and arrays into JSON			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			CacheModule.client				.HSET(payload.table, key, value)				.then(() => resolve(JSON.parse(value)))				.catch(err => reject(new Error(err)));		});	}	/**	 * Gets a single value	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key - name of the key to fetch	 * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON	 * @returns {Promise} - returns a promise (resolve, reject)	 */	GET(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			if (!key) {				reject(new Error("Invalid key!"));				return;			}			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			CacheModule.client				.GET(key, payload.value)				.then(value => {					if (value && !value.startsWith("{") && !value.startsWith("[")) return resolve(value);					let parsedValue;					try {						parsedValue = JSON.parse(value);					} catch (err) {						return reject(err);					}					return resolve(parsedValue);				})				.catch(err => reject(new Error(err)));		});	}	/**	 * Gets a single value from a table	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.table - name of the table to get the value from (table === redis hash)	 * @param {string} payload.key - name of the key to fetch	 * @param {boolean} [payload.parseJson=true] - attempt to parse returned data as JSON	 * @returns {Promise} - returns a promise (resolve, reject)	 */	HGET(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			if (!key) {				reject(new Error("Invalid key!"));				return;			}			if (!payload.table) {				reject(new Error("Invalid table!"));				return;			}			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			CacheModule.client				.HGET(payload.table, key, payload.value)				.then(value => {					let parsedValue;					try {						parsedValue = JSON.parse(value);					} catch (err) {						return reject(err);					}					return resolve(parsedValue);				})				.catch(err => reject(new Error(err)));		});	}	/**	 * Deletes a single value from a table	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.table - name of the table to delete the value from (table === redis hash)	 * @param {string} payload.key - name of the key to delete	 * @returns {Promise} - returns a promise (resolve, reject)	 */	HDEL(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			if (!payload.table) {				reject(new Error("Invalid table!"));				return;			}			if (!key) {				reject(new Error("Invalid key!"));				return;			}			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			CacheModule.client				.HDEL(payload.table, key)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Returns all the keys for a table	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.table - name of the table to get the values from (table === redis hash)	 * @param {boolean} [payload.parseJson=true] - attempts to parse all values as JSON by default	 * @returns {Promise} - returns a promise (resolve, reject)	 */	HGETALL(payload) {		return new Promise((resolve, reject) => {			if (!payload.table) {				reject(new Error("Invalid table!"));				return;			}			CacheModule.client				.HGETALL(payload.table)				.then(obj => {					if (obj)						Object.keys(obj).forEach(key => {							obj[key] = JSON.parse(obj[key]);						});					else if (!obj) obj = [];					resolve(obj);				})				.catch(err => reject(new Error(err)));		});	}	/**	 * Deletes a single value	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key - name of the key to delete	 * @returns {Promise} - returns a promise (resolve, reject)	 */	DEL(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			if (!key) {				reject(new Error("Invalid key!"));				return;			}			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			CacheModule.client				.DEL(key)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Publish a message to a channel, caches the redis client connection	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.channel - the name of the channel we want to publish a message to	 * @param {*} payload.value - the value we want to send	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	PUB(payload) {		return new Promise((resolve, reject) => {			let { value } = payload;			if (!payload.channel) {				reject(new Error("Invalid channel!"));				return;			}			if (!value) {				reject(new Error("Invalid value!"));				return;			}			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			CacheModule.client				.publish(payload.channel, value)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Subscribe to a channel, caches the redis client connection	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.channel - name of the channel to subscribe to	 * @param {boolean} [payload.parseJson=true] - parse the message as JSON	 * @returns {Promise} - returns a promise (resolve, reject)	 */	SUB(payload) {		return new Promise((resolve, reject) => {			if (!payload.channel) {				reject(new Error("Invalid channel!"));				return;			}			if (subs[payload.channel] === undefined) {				subs[payload.channel] = {					client: redis.createClient(config.get("redis")),					cbs: []				};				subs[payload.channel].client.connect().then(() => {					subs[payload.channel].client.subscribe(payload.channel, (message, channel) => {						if (message.startsWith("[") || message.startsWith("{"))							try {								message = JSON.parse(message);							} catch (err) {								console.error(err);							}						else if (message.startsWith('"') && message.endsWith('"'))							message = message.substring(1).substring(0, message.length - 2);						subs[channel].cbs.forEach(cb => cb(message));					});				});			}			subs[payload.channel].cbs.push(payload.cb);			resolve();		});	}	/**	 * Gets a full list from Redis	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key - name of the table to get the value from (table === redis hash)	 * @returns {Promise} - returns a promise (resolve, reject)	 */	LRANGE(payload) {		return new Promise((resolve, reject) => {			let { key } = payload;			if (!key) {				reject(new Error("Invalid key!"));				return;			}			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			CacheModule.client				.LRANGE(key, 0, -1)				.then(list => resolve(list))				.catch(err => reject(new Error(err)));		});	}	/**	 * Adds a value to a list in Redis	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the list	 * @param {*} payload.value - the value we want to set	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	RPUSH(payload) {		return new Promise((resolve, reject) => {			let { key, value } = payload;			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			// automatically stringify objects and arrays into JSON			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			CacheModule.client				.RPUSH(key, value)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Adds a value to a list in Redis using LPUSH	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the list	 * @param {*} payload.value - the value we want to set	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	LPUSH(payload) {		return new Promise((resolve, reject) => {			let { key, value } = payload;			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			// automatically stringify objects and arrays into JSON			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			CacheModule.client				.LPUSH(key, value)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Gets the length of a Redis list	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the list	 * @returns {Promise} - returns a promise (resolve, reject)	 */	LLEN(payload) {		return new Promise((resolve, reject) => {			const { key } = payload;			CacheModule.client				.LLEN(key)				.then(len => resolve(len))				.catch(err => reject(new Error(err)));		});	}	/**	 * Removes an item from a list using RPOP	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the list	 * @returns {Promise} - returns a promise (resolve, reject)	 */	RPOP(payload) {		return new Promise((resolve, reject) => {			const { key } = payload;			CacheModule.client				.RPOP(key)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Removes a value from a list in Redis	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.key -  name of the list	 * @param {*} payload.value - the value we want to remove	 * @param {boolean} [payload.stringifyJson=true] - stringify 'value' if it's an Object or Array	 * @returns {Promise} - returns a promise (resolve, reject)	 */	LREM(payload) {		return new Promise((resolve, reject) => {			let { key, value } = payload;			if (mongoose.Types.ObjectId.isValid(key)) key = key.toString();			// automatically stringify objects and arrays into JSON			if (["object", "array"].includes(typeof value)) value = JSON.stringify(value);			CacheModule.client				.LREM(key, 1, value)				.then(() => resolve())				.catch(err => reject(new Error(err)));		});	}	/**	 * Gets a list of keys in Redis with a matching pattern	 *	 * @param {object} payload - object containing payload	 * @param {string} payload.pattern -  pattern to search for	 * @returns {Promise} - returns a promise (resolve, reject)	 */	KEYS(payload) {		return new Promise((resolve, reject) => {			const { pattern } = payload;			CacheModule.client				.KEYS(pattern)				.then(keys => resolve(keys))				.catch(err => reject(new Error(err)));		});	}	/**	 * Returns a redis schema	 *	 * @param {object} payload - object containing the payload	 * @param {string} payload.schemaName - the name of the schema to get	 * @returns {Promise} - returns promise (reject, resolve)	 */	GET_SCHEMA(payload) {		return new Promise(resolve => {			resolve(CacheModule.schemas[payload.schemaName]);		});	}}export default new _CacheModule();
 |