| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470 | import async from "async";import fs from "fs";import path from "path";import { fileURLToPath } from "url";import CoreClass from "../core";import Timer from "../classes/Timer.class";const __dirname = path.dirname(fileURLToPath(import.meta.url));let TasksModule;let CacheModule;let StationsModule;let UtilsModule;let WSModule;let DBModule;class _TasksModule extends CoreClass {	// eslint-disable-next-line require-jsdoc	constructor() {		super("tasks");		this.tasks = {};		TasksModule = this;	}	/**	 * Initialises the tasks module	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	initialize() {		return new Promise(resolve => {			CacheModule = this.moduleManager.modules.cache;			StationsModule = this.moduleManager.modules.stations;			UtilsModule = this.moduleManager.modules.utils;			WSModule = this.moduleManager.modules.ws;			DBModule = this.moduleManager.modules.db;			TasksModule.runJob("CREATE_TASK", {				name: "stationSkipTask",				fn: TasksModule.checkStationSkipTask,				timeout: 1000 * 60 * 30			});			TasksModule.runJob("CREATE_TASK", {				name: "sessionClearTask",				fn: TasksModule.sessionClearingTask,				timeout: 1000 * 60 * 60 * 6			});			// TasksModule.runJob("CREATE_TASK", {			// 	name: "logFileSizeCheckTask",			// 	fn: TasksModule.logFileSizeCheckTask,			// 	timeout: 1000 * 60 * 60			// });			TasksModule.runJob("CREATE_TASK", {				name: "collectStationUsersTask",				fn: TasksModule.collectStationUsersTask,				timeout: 1000 * 3			});			resolve();		});	}	/**	 * Creates a new task	 *	 * @param {object} payload - object that contains the payload	 * @param {string} payload.name - the name of the task	 * @param {string} payload.fn - the function the task will run	 * @param {string} payload.paused - if the task is currently paused	 * @param {boolean} payload.timeout - how often to run the task	 * @returns {Promise} - returns promise (reject, resolve)	 */	CREATE_TASK(payload) {		return new Promise((resolve, reject) => {			TasksModule.tasks[payload.name] = {				name: payload.name,				fn: payload.fn,				timeout: payload.timeout,				lastRan: 0,				timer: null			};			if (!payload.paused) {				TasksModule.runJob("RUN_TASK", { name: payload.name }, this)					.then(() => resolve())					.catch(err => reject(err));			} else resolve();		});	}	/**	 * Pauses a task	 *	 * @param {object} payload - object that contains the payload	 * @param {string} payload.taskName - the name of the task to pause	 * @returns {Promise} - returns promise (reject, resolve)	 */	PAUSE_TASK(payload) {		const taskName = { payload };		return new Promise(resolve => {			if (TasksModule.tasks[taskName].timer) TasksModule.tasks[taskName].timer.pause();			resolve();		});	}	/**	 * Resumes a task	 *	 * @param {object} payload - object that contains the payload	 * @param {string} payload.name - the name of the task to resume	 * @returns {Promise} - returns promise (reject, resolve)	 */	RESUME_TASK(payload) {		return new Promise(resolve => {			TasksModule.tasks[payload.name].timer.resume();			resolve();		});	}	/**	 * Runs a task's function and restarts the timer	 *	 * @param {object} payload - object that contains the payload	 * @param {string} payload.name - the name of the task to run	 * @returns {Promise} - returns promise (reject, resolve)	 */	RUN_TASK(payload) {		return new Promise(resolve => {			const task = TasksModule.tasks[payload.name];			if (task.timer) task.timer.pause();			task.fn.apply(this).then(() => {				task.lastRan = Date.now();				task.timer = new Timer(					() => TasksModule.runJob("RUN_TASK", { name: payload.name }),					task.timeout,					false				);				resolve();			});		});	}	/**	 * Periodically checks if any stations need to be skipped	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	checkStationSkipTask() {		return new Promise(resolve => {			TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);			async.waterfall(				[					next => {						CacheModule.runJob("HGETALL", { table: "stations" })							.then(response => next(null, response))							.catch(next);					},					(stations, next) => {						async.each(							stations,							(station, next2) => {								if (station.paused || !station.currentSong || !station.currentSong.title)									return next2();								const timeElapsed = Date.now() - station.startedAt - station.timePaused;								if (timeElapsed <= station.currentSong.duration) return next2();								TasksModule.log(									"ERROR",									"TASK_STATIONS_SKIP_CHECK",									`Skipping ${station._id} as it should have skipped already.`								);								return StationsModule.runJob("INITIALIZE_STATION", {									stationId: station._id								}).then(() => next2());							},							() => next()						);					}				],				() => resolve()			);		});	}	/**	 * Periodically checks if any sessions are out of date and need to be cleared	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	sessionClearingTask() {		return new Promise(resolve => {			TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);			async.waterfall(				[					next => {						CacheModule.runJob("HGETALL", { table: "sessions" })							.then(sessions => next(null, sessions))							.catch(next);					},					(sessions, next) => {						if (!sessions) return next();						const keys = Object.keys(sessions);						return async.each(							keys,							(sessionId, next2) => {								const session = sessions[sessionId];								if (									session &&									session.refreshDate &&									Date.now() - session.refreshDate < 60 * 60 * 24 * 30 * 1000								)									return next2();								if (!session) {									TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");									return CacheModule.runJob("HDEL", {										table: "sessions",										key: sessionId									}).finally(() => {										next2();									});								}								if (!session.refreshDate) {									session.refreshDate = Date.now();									return CacheModule.runJob("HSET", {										table: "sessions",										key: sessionId,										value: session									}).finally(() => next2());								}								if (Date.now() - session.refreshDate > 60 * 60 * 24 * 30 * 1000) {									return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {										sessionId: session.sessionId									}).then(sockets => {										if (sockets.length > 0) {											session.refreshDate = Date.now();											CacheModule.runJob("HSET", {												table: "sessions",												key: sessionId,												value: session											}).finally(() => {												next2();											});										} else {											TasksModule.log(												"INFO",												"TASK_SESSION_CLEAR",												`Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`											);											CacheModule.runJob("HDEL", {												table: "sessions",												key: session.sessionId											}).finally(() => next2());										}									});								}								TasksModule.log("ERROR", "TASK_SESSION_CLEAR", "This should never log.");								return next2();							},							() => next()						);					}				],				() => resolve()			);		});	}	/**	 * Periodically warns about the size of any log files	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	logFileSizeCheckTask() {		return new Promise((resolve, reject) => {			TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);			async.each(				["all.log", "debugStation.log", "error.log", "info.log", "success.log"],				(fileName, next) => {					try {						const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));						const mb = stats.size / 1000000;						if (mb > 25) return next(true);						return next();					} catch (err) {						return next(err);					}				},				async err => {					if (err && err !== true) {						err = await UtilsModule.runJob("GET_ERROR", { error: err });						return reject(new Error(err));					}					if (err === true) {						TasksModule.log(							"ERROR",							"LOGGER_FILE_SIZE_WARNING",							"************************************WARNING*************************************"						);						TasksModule.log(							"ERROR",							"LOGGER_FILE_SIZE_WARNING",							"***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************"						);						TasksModule.log(							"ERROR",							"LOGGER_FILE_SIZE_WARNING",							"****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****"						);						TasksModule.log(							"ERROR",							"LOGGER_FILE_SIZE_WARNING",							"********************************************************************************"						);					}					return resolve();				}			);		});	}	/**	 * Periodically collect users in stations	 *	 * @returns {Promise} - returns promise (reject, resolve)	 */	async collectStationUsersTask() {		const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });		return new Promise(resolve => {			TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);			const stationsCountUpdated = [];			const stationsUpdated = [];			const oldUsersPerStation = StationsModule.usersPerStation;			const usersPerStation = { loggedIn: [], loggedOut: [] };			const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));			const usersPerStationCount = {};			async.each(				Object.keys(StationsModule.userList),				(socketId, next) => {					WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }).then(async socket => {						const stationId = StationsModule.userList[socketId];						const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {							room: `station.${stationId}`						});						if (!socket || !room.includes(socketId)) {							if (stationsCountUpdated.indexOf(stationId) === -1) stationsCountUpdated.push(stationId);							if (stationsUpdated.indexOf(stationId) === -1) stationsUpdated.push(String(stationId));							delete StationsModule.userList[socketId];							return next();						}						if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station						if (!usersPerStation[stationId]) usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };						return async.waterfall(							[								next => {									if (!socket.session || !socket.session.sessionId) {										return next("No session found.", { ip: socket.ip });									}									return CacheModule.runJob("HGET", {										table: "sessions",										key: socket.session.sessionId									})										.then(session => next(null, session))										.catch(next);								},								(session, next) => {									if (!session) return next("Session not found.");									return userModel.findOne({ _id: session.userId }, next);								},								(user, next) => {									if (!user) return next("User not found.");									if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))										return next("User already in the list.");									usersPerStationCount[stationId] += 1; // increment user count for station									return next(null, {										username: user.username,										name: user.name,										avatar: user.avatar									});								}							],							(err, user) => {								if (!err) usersPerStation[stationId].loggedIn.push(user);								// if user is logged out (an ip can only be counted once)								if (									err === "No session found." &&									!usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)								) {									usersPerStationCount[stationId] += 1; // increment user count for station									usersPerStation[stationId].loggedOut.push(user);								}								next();							}						);					});				},				() => {					Object.keys(usersPerStationCount).forEach(stationId => {						if (							oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&							stationsCountUpdated.indexOf(stationId) === -1						) {							this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);							CacheModule.runJob("PUB", {								channel: "station.updateUserCount",								value: { stationId, usersPerStationCount: usersPerStationCount[stationId] }							});						}					});					Object.keys(usersPerStation).forEach(stationId => {						if (							!oldUsersPerStation[stationId] ||							JSON.stringify(oldUsersPerStation[stationId]) !==								JSON.stringify(usersPerStation[stationId]) ||							oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]						) {							this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);							CacheModule.runJob("PUB", {								channel: "station.updateUsers",								value: { stationId, usersPerStation: usersPerStation[stationId] }							});						}					});					StationsModule.usersPerStationCount = usersPerStationCount;					StationsModule.usersPerStation = usersPerStation;				}			);			resolve();		});	}}export default new _TasksModule();
 |