| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474 | 
							- import async from "async";
 
- import fs from "fs";
 
- import path from "path";
 
- import { fileURLToPath } from "url";
 
- import CoreClass from "../core";
 
- import Timer from "../classes/Timer.class";
 
- const __dirname = path.dirname(fileURLToPath(import.meta.url)); // directory containing this module file, derived from import.meta.url
 
- let TasksModule; // the _TasksModule instance; assigned in the constructor
 
- let CacheModule; // assigned in initialize() from moduleManager.modules.cache
 
- let StationsModule; // assigned in initialize() from moduleManager.modules.stations
 
- let UtilsModule; // assigned in initialize() from moduleManager.modules.utils
 
- let WSModule; // assigned in initialize() from moduleManager.modules.ws
 
- let DBModule; // assigned in initialize() from moduleManager.modules.db
 
/**
 * Extracts a printable message from a value received by an error handler
 *
 * @param {*} err - an Error, a string, or any other rejection reason
 * @returns {string} - the value's message when it has one, otherwise the value converted to a string
 */
const getTaskErrorMessage = err => (err && err.message ? err.message : String(err));

/**
 * Module that registers named tasks and re-runs each of them on its own timer.
 * Every task is stored in this.tasks under its name as { name, fn, timeout, lastRan, timer }.
 */
class _TasksModule extends CoreClass {
	// eslint-disable-next-line require-jsdoc
	constructor() {
		super("tasks");

		this.tasks = {};

		TasksModule = this;
	}

	/**
	 * Initialises the tasks module: stores references to the other modules and registers the default tasks.
	 * The promise resolves straight away, it does not wait for the first run of the tasks.
	 *
	 * @returns {Promise} - returns promise (reject, resolve)
	 */
	initialize() {
		return new Promise(resolve => {
			CacheModule = this.moduleManager.modules.cache;
			StationsModule = this.moduleManager.modules.stations;
			UtilsModule = this.moduleManager.modules.utils;
			WSModule = this.moduleManager.modules.ws;
			DBModule = this.moduleManager.modules.db;

			// logFileSizeCheckTask is defined on this class but is not registered here.
			const defaultTasks = [
				{
					name: "stationSkipTask",
					fn: TasksModule.checkStationSkipTask,
					timeout: 1000 * 60 * 30
				},
				{
					name: "sessionClearTask",
					fn: TasksModule.sessionClearingTask,
					timeout: 1000 * 60 * 60 * 6
				},
				{
					name: "collectStationUsersTask",
					fn: TasksModule.collectStationUsersTask,
					timeout: 1000 * 3
				}
			];

			defaultTasks.forEach(task => {
				// Not awaited on purpose, but a rejection is logged instead of being left unhandled.
				TasksModule.runJob("CREATE_TASK", task).catch(err => {
					TasksModule.log(
						"ERROR",
						"TASK_CREATE",
						`Creating task "${task.name}" failed. "${getTaskErrorMessage(err)}"`
					);
				});
			});

			resolve();
		});
	}

	/**
	 * Creates a new task and, unless it is created paused, runs it immediately
	 *
	 * @param {object} payload - object that contains the payload
	 * @param {string} payload.name - the name of the task
	 * @param {Function} payload.fn - the function the task will run, it has to return a promise
	 * @param {boolean} payload.paused - if true the task is only registered and not run
	 * @param {number} payload.timeout - the delay handed to the Timer that triggers the next run
	 * @returns {Promise} - returns promise (reject, resolve)
	 */
	CREATE_TASK(payload) {
		return new Promise((resolve, reject) => {
			TasksModule.tasks[payload.name] = {
				name: payload.name,
				fn: payload.fn,
				timeout: payload.timeout,
				lastRan: 0,
				timer: null
			};

			if (!payload.paused) {
				TasksModule.runJob("RUN_TASK", { name: payload.name }, this)
					.then(() => resolve())
					.catch(err => reject(err));
			} else resolve();
		});
	}

	/**
	 * Pauses a task
	 *
	 * @param {object} payload - object that contains the payload
	 * @param {string} payload.taskName - the name of the task to pause
	 * @param {string} payload.name - alternative to payload.taskName, matching RESUME_TASK and RUN_TASK
	 * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist
	 */
	PAUSE_TASK(payload) {
		return new Promise((resolve, reject) => {
			const taskName = payload.taskName || payload.name;
			const task = TasksModule.tasks[taskName];

			if (!task) {
				reject(new Error(`Task "${taskName}" does not exist.`));
				return;
			}

			// A task has no timer until its first run has finished
			if (task.timer) task.timer.pause();

			resolve();
		});
	}

	/**
	 * Resumes a task
	 *
	 * @param {object} payload - object that contains the payload
	 * @param {string} payload.name - the name of the task to resume
	 * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist or has no timer yet
	 */
	RESUME_TASK(payload) {
		return new Promise((resolve, reject) => {
			const task = TasksModule.tasks[payload.name];

			if (!task) {
				reject(new Error(`Task "${payload.name}" does not exist.`));
				return;
			}

			if (!task.timer) {
				reject(new Error(`Task "${payload.name}" has no timer to resume.`));
				return;
			}

			task.timer.resume();

			resolve();
		});
	}

	/**
	 * Runs a task's function and restarts the timer.
	 * A failing task function is logged and the timer is still restarted, so one failed run does not stop the task.
	 *
	 * @param {object} payload - object that contains the payload
	 * @param {string} payload.name - the name of the task to run
	 * @returns {Promise} - returns promise (reject, resolve), rejects if the task does not exist
	 */
	RUN_TASK(payload) {
		return new Promise((resolve, reject) => {
			const task = TasksModule.tasks[payload.name];

			if (!task) {
				reject(new Error(`Task "${payload.name}" does not exist.`));
				return;
			}

			if (task.timer) task.timer.pause();

			// Calling task.fn inside an executor turns a synchronous throw into a rejection as well
			new Promise(settle => {
				settle(task.fn.apply(this));
			})
				.catch(err => {
					TasksModule.log(
						"ERROR",
						"TASK_RUN",
						`Task "${payload.name}" failed. "${getTaskErrorMessage(err)}"`
					);
				})
				.then(() => {
					task.lastRan = Date.now();
					// NOTE(review): the third Timer argument is passed as false, presumably "paused" - confirm in Timer.class
					task.timer = new Timer(
						() =>
							TasksModule.runJob("RUN_TASK", { name: payload.name }).catch(err => {
								TasksModule.log(
									"ERROR",
									"TASK_RUN",
									`Re-running task "${payload.name}" failed. "${getTaskErrorMessage(err)}"`
								);
							}),
						task.timeout,
						false
					);

					resolve();
				})
				.catch(reject);
		});
	}

	/**
	 * Periodically checks if any stations need to be skipped.
	 * A station is re-initialised when it is not paused, has a current song with a title, and more time has
	 * elapsed since startedAt (minus timePaused) than the duration of that song.
	 *
	 * @returns {Promise} - returns promise (reject, resolve), always resolves
	 */
	checkStationSkipTask() {
		return new Promise(resolve => {
			TasksModule.log("INFO", "TASK_STATIONS_SKIP_CHECK", `Checking for stations to be skipped.`, false);

			async.waterfall(
				[
					next => {
						CacheModule.runJob("HGETALL", { table: "stations" })
							.then(response => next(null, response))
							.catch(next);
					},

					(stations, next) => {
						if (!stations) return next();

						return async.each(
							stations,
							(station, next2) => {
								if (!station || station.paused || !station.currentSong || !station.currentSong.title)
									return next2();

								const timeElapsed = Date.now() - station.startedAt - station.timePaused;

								if (timeElapsed <= station.currentSong.duration) return next2();

								TasksModule.log(
									"ERROR",
									"TASK_STATIONS_SKIP_CHECK",
									`Skipping ${station._id} as it should have skipped already.`
								);

								// Continue with the other stations even when this one fails to initialise
								return StationsModule.runJob("INITIALIZE_STATION", {
									stationId: station._id
								})
									.catch(err => {
										TasksModule.log(
											"ERROR",
											"TASK_STATIONS_SKIP_CHECK",
											`Initializing station ${station._id} failed. "${getTaskErrorMessage(err)}"`
										);
									})
									.then(() => next2());
							},
							() => next()
						);
					}
				],
				err => {
					if (err)
						TasksModule.log(
							"ERROR",
							"TASK_STATIONS_SKIP_CHECK",
							`Checking for stations to be skipped failed. "${getTaskErrorMessage(err)}"`
						);

					resolve();
				}
			);
		});
	}

	/**
	 * Periodically checks if any sessions are out of date and need to be cleared.
	 * Empty sessions are removed, sessions without a refreshDate get one, and sessions that have not been
	 * refreshed for 30 days are removed unless a socket is still using them (those are refreshed instead).
	 *
	 * @returns {Promise} - returns promise (reject, resolve), always resolves
	 */
	sessionClearingTask() {
		return new Promise(resolve => {
			TasksModule.log("INFO", "TASK_SESSION_CLEAR", `Checking for sessions to be cleared.`);

			const maxSessionAge = 60 * 60 * 24 * 30 * 1000;

			// Runs a cache job; a failure is logged so that the returned promise never rejects
			const runCacheJob = (jobName, jobPayload) =>
				CacheModule.runJob(jobName, jobPayload).catch(err => {
					TasksModule.log(
						"ERROR",
						"TASK_SESSION_CLEAR",
						`${jobName} failed for session ${jobPayload.key}. "${getTaskErrorMessage(err)}"`
					);
				});

			async.waterfall(
				[
					next => {
						CacheModule.runJob("HGETALL", { table: "sessions" })
							.then(sessions => next(null, sessions))
							.catch(next);
					},

					(sessions, next) => {
						if (!sessions) return next();

						return async.each(
							Object.keys(sessions),
							(sessionId, next2) => {
								const session = sessions[sessionId];

								if (!session) {
									TasksModule.log("INFO", "TASK_SESSION_CLEAR", "Removing an empty session.");
									return runCacheJob("HDEL", {
										table: "sessions",
										key: sessionId
									}).then(() => next2());
								}

								if (!session.refreshDate) {
									session.refreshDate = Date.now();
									return runCacheJob("HSET", {
										table: "sessions",
										key: sessionId,
										value: session
									}).then(() => next2());
								}

								// Still within the 30 days, nothing to do
								if (Date.now() - session.refreshDate < maxSessionAge) return next2();

								return WSModule.runJob("SOCKETS_FROM_SESSION_ID", {
									sessionId: session.sessionId
								})
									.then(response => {
										if (response.sockets.length > 0) {
											session.refreshDate = Date.now();
											return runCacheJob("HSET", {
												table: "sessions",
												key: sessionId,
												value: session
											});
										}

										TasksModule.log(
											"INFO",
											"TASK_SESSION_CLEAR",
											`Removing session ${sessionId} for user ${session.userId} since inactive for 30 days and not currently in use.`
										);

										// Delete by the key the session was read from
										return runCacheJob("HDEL", {
											table: "sessions",
											key: sessionId
										});
									})
									.catch(err => {
										TasksModule.log(
											"ERROR",
											"TASK_SESSION_CLEAR",
											`Checking session ${sessionId} failed. "${getTaskErrorMessage(err)}"`
										);
									})
									.then(() => next2());
							},
							() => next()
						);
					}
				],
				err => {
					if (err)
						TasksModule.log(
							"ERROR",
							"TASK_SESSION_CLEAR",
							`Checking for sessions to be cleared failed. "${getTaskErrorMessage(err)}"`
						);

					resolve();
				}
			);
		});
	}

	/**
	 * Periodically warns about the size of any log files.
	 * Logs a warning banner when at least one of the log files is larger than 25MB (size / 1000000).
	 *
	 * @returns {Promise} - returns promise (reject, resolve), rejects when a log file can't be read
	 */
	logFileSizeCheckTask() {
		return new Promise((resolve, reject) => {
			TasksModule.log("INFO", "TASK_LOG_FILE_SIZE_CHECK", `Checking the size for the log files.`);

			async.each(
				["all.log", "debugStation.log", "error.log", "info.log", "success.log"],
				(fileName, next) => {
					try {
						const stats = fs.statSync(path.resolve(__dirname, "../../log/", fileName));
						const mb = stats.size / 1000000;
						// true is passed as the "error" to stop at the first file that is too large
						if (mb > 25) return next(true);

						return next();
					} catch (err) {
						return next(err);
					}
				},
				err => {
					if (err && err !== true) {
						UtilsModule.runJob("GET_ERROR", { error: err })
							.then(message => reject(new Error(message)))
							.catch(reject);
						return;
					}

					if (err === true) {
						[
							"************************************WARNING*************************************",
							"***************ONE OR MORE LOG FILES APPEAR TO BE MORE THAN 25MB****************",
							"****MAKE SURE TO REGULARLY CLEAR UP THE LOG FILES, MANUALLY OR AUTOMATICALLY****",
							"********************************************************************************"
						].forEach(line => TasksModule.log("ERROR", "LOGGER_FILE_SIZE_WARNING", line));
					}

					resolve();
				}
			);
		});
	}

	/**
	 * Periodically collect users in stations.
	 * Walks over StationsModule.userList (socket id -> station id), drops sockets that no longer exist or are no
	 * longer in the station's room, rebuilds the per station user lists and counts, publishes the changes and
	 * stores the result on StationsModule. The promise resolves once all of that has finished.
	 *
	 * @returns {Promise} - returns promise (reject, resolve), rejects when the user model can't be loaded
	 */
	async collectStationUsersTask() {
		const userModel = await DBModule.runJob("GET_MODEL", { modelName: "user" });

		return new Promise(resolve => {
			TasksModule.log("INFO", "TASK_COLLECT_STATION_USERS_TASK", `Checking for users in stations.`, false);

			const stationsCountUpdated = [];

			const oldUsersPerStation = StationsModule.usersPerStation;
			// NOTE(review): this object is keyed by station id below, yet it starts with loggedIn/loggedOut keys
			// which the publish loop then treats as station ids - kept as is, confirm whether anything relies on it
			const usersPerStation = { loggedIn: [], loggedOut: [] };

			const oldUsersPerStationCount = JSON.parse(JSON.stringify(StationsModule.usersPerStationCount));
			const usersPerStationCount = {};

			// Publishes to a cache channel without leaving a rejection unhandled
			const publish = (channel, value) => {
				CacheModule.runJob("PUB", { channel, value }).catch(err => {
					TasksModule.log(
						"ERROR",
						"TASK_COLLECT_STATION_USERS_TASK",
						`Publishing to ${channel} failed. "${getTaskErrorMessage(err)}"`
					);
				});
			};

			async.each(
				Object.keys(StationsModule.userList),
				(socketId, next) => {
					// Makes sure next is called exactly once, whichever path finishes this socket
					let finished = false;
					const done = () => {
						if (finished) return;
						finished = true;
						next();
					};

					WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId })
						.then(async socket => {
							const stationId = StationsModule.userList[socketId];

							const room = await WSModule.runJob("GET_SOCKETS_FOR_ROOM", {
								room: `station.${stationId}`
							});

							if (!socket || !room.includes(socketId)) {
								if (!stationsCountUpdated.includes(stationId)) stationsCountUpdated.push(stationId);

								delete StationsModule.userList[socketId];

								return done();
							}

							if (!usersPerStationCount[stationId]) usersPerStationCount[stationId] = 0; // start count for station
							if (!usersPerStation[stationId])
								usersPerStation[stationId] = { loggedIn: [], loggedOut: [] };

							return async.waterfall(
								[
									nextStep => {
										if (!socket.session || !socket.session.sessionId) {
											return nextStep("No session found.", { ip: socket.ip });
										}

										return CacheModule.runJob("HGET", {
											table: "sessions",
											key: socket.session.sessionId
										})
											.then(session => nextStep(null, session))
											.catch(nextStep);
									},

									(session, nextStep) => {
										if (!session) return nextStep("Session not found.");

										return userModel.findOne({ _id: session.userId }, nextStep);
									},

									(user, nextStep) => {
										if (!user) return nextStep("User not found.");

										if (usersPerStation[stationId].loggedIn.some(u => user.username === u.username))
											return nextStep("User already in the list.");

										usersPerStationCount[stationId] += 1; // increment user count for station

										return nextStep(null, {
											username: user.username,
											name: user.name,
											avatar: user.avatar
										});
									}
								],
								(err, user) => {
									if (!err) usersPerStation[stationId].loggedIn.push(user);

									// if user is logged out (an ip can only be counted once)
									if (
										err === "No session found." &&
										!usersPerStation[stationId].loggedOut.some(u => user.ip === u.ip)
									) {
										usersPerStationCount[stationId] += 1; // increment user count for station
										usersPerStation[stationId].loggedOut.push(user);
									}

									done();
								}
							);
						})
						.catch(err => {
							TasksModule.log(
								"ERROR",
								"TASK_COLLECT_STATION_USERS_TASK",
								`Checking socket ${socketId} failed. "${getTaskErrorMessage(err)}"`
							);

							done();
						});
				},
				() => {
					try {
						Object.keys(usersPerStationCount).forEach(stationId => {
							if (
								oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId] &&
								!stationsCountUpdated.includes(stationId)
							) {
								this.log("INFO", "UPDATE_STATION_USER_COUNT", `Updating user count of ${stationId}.`);

								publish("station.updateUserCount", {
									stationId,
									usersPerStationCount: usersPerStationCount[stationId]
								});
							}
						});

						Object.keys(usersPerStation).forEach(stationId => {
							if (
								!oldUsersPerStation[stationId] ||
								JSON.stringify(oldUsersPerStation[stationId]) !==
									JSON.stringify(usersPerStation[stationId]) ||
								oldUsersPerStationCount[stationId] !== usersPerStationCount[stationId]
							) {
								this.log("INFO", "UPDATE_STATION_USER_LIST", `Updating user list of ${stationId}.`);

								publish("station.updateUsers", {
									stationId,
									usersPerStation: usersPerStation[stationId]
								});
							}
						});

						StationsModule.usersPerStationCount = usersPerStationCount;
						StationsModule.usersPerStation = usersPerStation;
					} catch (err) {
						TasksModule.log(
							"ERROR",
							"TASK_COLLECT_STATION_USERS_TASK",
							`Updating the station users failed. "${getTaskErrorMessage(err)}"`
						);
					}

					// Resolve only now, so the next run is not scheduled while this one is still working
					resolve();
				}
			);
		});
	}
}

export default new _TasksModule();
 
 
  |