|
@@ -4,27 +4,28 @@
|
|
|
|
|
|
import config from "config";
|
|
|
import async from "async";
|
|
|
-import socketio from "socket.io";
|
|
|
+import WebSocket from "ws";
|
|
|
+import { EventEmitter } from "events";
|
|
|
|
|
|
import CoreClass from "../core";
|
|
|
|
|
|
-let IOModule;
|
|
|
+let WSModule;
|
|
|
let AppModule;
|
|
|
let CacheModule;
|
|
|
let UtilsModule;
|
|
|
let DBModule;
|
|
|
let PunishmentsModule;
|
|
|
|
|
|
-class _IOModule extends CoreClass {
|
|
|
+class _WSModule extends CoreClass {
|
|
|
// eslint-disable-next-line require-jsdoc
|
|
|
constructor() {
|
|
|
- super("io");
|
|
|
+ super("ws");
|
|
|
|
|
|
- IOModule = this;
|
|
|
+ WSModule = this;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Initialises the io module
|
|
|
+ * Initialises the ws module
|
|
|
*
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
@@ -48,55 +49,61 @@ class _IOModule extends CoreClass {
|
|
|
// TODO: Check every 30s/, for all sockets, if they are still allowed to be in the rooms they are in, and on socket at all (permission changing/banning)
|
|
|
const server = await AppModule.runJob("SERVER");
|
|
|
|
|
|
- this._io = socketio(server);
|
|
|
// this._io.origins(config.get("cors.origin"));
|
|
|
|
|
|
+ this._io = new WebSocket.Server({ server, path: "/ws" });
|
|
|
+
|
|
|
+ this.rooms = {};
|
|
|
+
|
|
|
return new Promise(resolve => {
|
|
|
this.setStage(3);
|
|
|
|
|
|
- this._io.use(async (socket, cb) => {
|
|
|
- IOModule.runJob("HANDLE_IO_USE", { socket, cb });
|
|
|
- });
|
|
|
+ this._io.on("connection", async (socket, req) => {
|
|
|
+ socket.dispatch = (...args) => socket.send(JSON.stringify(args));
|
|
|
|
|
|
- this.setStage(4);
|
|
|
+ socket.actions = new EventEmitter();
|
|
|
+ socket.actions.setMaxListeners(0);
|
|
|
+ socket.listen = (target, cb) => socket.actions.addListener(target, args => cb(args));
|
|
|
|
|
|
- this._io.on("connection", async socket => {
|
|
|
- IOModule.runJob("HANDLE_IO_CONNECTION", { socket });
|
|
|
+ WSModule.runJob("HANDLE_WS_USE", { socket, req }).then(socket =>
|
|
|
+ WSModule.runJob("HANDLE_WS_CONNECTION", { socket })
|
|
|
+ );
|
|
|
});
|
|
|
|
|
|
- this.setStage(5);
|
|
|
+ this.setStage(4);
|
|
|
|
|
|
return resolve();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns the socket io variable
|
|
|
+ * Returns the websockets variable
|
|
|
*
|
|
|
* @returns {Promise} - returns a promise (resolve, reject)
|
|
|
*/
|
|
|
- IO() {
|
|
|
- return new Promise(resolve => {
|
|
|
- resolve(IOModule._io);
|
|
|
- });
|
|
|
+ WS() {
|
|
|
+ return new Promise(resolve => resolve(WSModule._io));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns whether there is a socket for a session id or not
|
|
|
+ * Obtains socket object for a specified socket id
|
|
|
*
|
|
|
* @param {object} payload - object containing the payload
|
|
|
- * @param {string} payload.sessionId - user session id
|
|
|
+ * @param {string} payload.socketId - the id of the socket
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
- async SOCKET_FROM_SESSION(payload) {
|
|
|
- // socketId
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- const ns = IOModule._io.of("/");
|
|
|
- if (ns) {
|
|
|
- return resolve(ns.connected[payload.socketId]);
|
|
|
- }
|
|
|
+ async SOCKET_FROM_SOCKET_ID(payload) {
|
|
|
+ return new Promise(resolve => {
|
|
|
+ const { clients } = WSModule._io;
|
|
|
+
|
|
|
+ if (clients)
|
|
|
+ // eslint-disable-next-line consistent-return
|
|
|
+ clients.forEach(socket => {
|
|
|
+ if (socket.session.socketId === payload.socketId) return resolve(socket);
|
|
|
+ });
|
|
|
|
|
|
- return reject();
|
|
|
+ // socket doesn't exist
|
|
|
+ return resolve();
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -109,20 +116,18 @@ class _IOModule extends CoreClass {
|
|
|
*/
|
|
|
async SOCKETS_FROM_SESSION_ID(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const ns = IOModule._io.of("/");
|
|
|
+ const { clients } = WSModule._io;
|
|
|
const sockets = [];
|
|
|
|
|
|
- if (ns) {
|
|
|
+ if (clients) {
|
|
|
return async.each(
|
|
|
- Object.keys(ns.connected),
|
|
|
+ Object.keys(clients),
|
|
|
(id, next) => {
|
|
|
- const { session } = ns.connected[id];
|
|
|
+ const { session } = clients[id];
|
|
|
if (session.sessionId === payload.sessionId) sockets.push(session.sessionId);
|
|
|
next();
|
|
|
},
|
|
|
- () => {
|
|
|
- resolve({ sockets });
|
|
|
- }
|
|
|
+ () => resolve(sockets)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -139,33 +144,30 @@ class _IOModule extends CoreClass {
|
|
|
*/
|
|
|
async SOCKETS_FROM_USER(payload) {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
- const ns = IOModule._io.of("/");
|
|
|
const sockets = [];
|
|
|
|
|
|
- if (ns) {
|
|
|
- return async.eachLimit(
|
|
|
- Object.keys(ns.connected),
|
|
|
- 1,
|
|
|
- (id, next) => {
|
|
|
- const { session } = ns.connected[id];
|
|
|
-
|
|
|
- if (session.sessionId) {
|
|
|
- CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
|
|
|
- .then(session => {
|
|
|
- if (session && session.userId === payload.userId) sockets.push(ns.connected[id]);
|
|
|
- next();
|
|
|
- })
|
|
|
- .catch(err => next(err));
|
|
|
- } else next();
|
|
|
- },
|
|
|
- err => {
|
|
|
- if (err) return reject(err);
|
|
|
- return resolve({ sockets });
|
|
|
+ return async.eachLimit(
|
|
|
+ WSModule._io.clients,
|
|
|
+ 1,
|
|
|
+ (socket, next) => {
|
|
|
+ const { sessionId } = socket.session;
|
|
|
+
|
|
|
+ if (sessionId) {
|
|
|
+ return CacheModule.runJob("HGET", { table: "sessions", key: sessionId }, this)
|
|
|
+ .then(session => {
|
|
|
+ if (session && session.userId === payload.userId) sockets.push(socket);
|
|
|
+ next();
|
|
|
+ })
|
|
|
+ .catch(err => next(err));
|
|
|
}
|
|
|
- );
|
|
|
- }
|
|
|
|
|
|
- return resolve();
|
|
|
+ return next();
|
|
|
+ },
|
|
|
+ err => {
|
|
|
+ if (err) return reject(err);
|
|
|
+ return resolve(sockets);
|
|
|
+ }
|
|
|
+ );
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -178,35 +180,24 @@ class _IOModule extends CoreClass {
|
|
|
*/
|
|
|
async SOCKETS_FROM_IP(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const ns = IOModule._io.of("/");
|
|
|
+ const { clients } = WSModule._io;
|
|
|
+
|
|
|
const sockets = [];
|
|
|
- if (ns) {
|
|
|
- return async.each(
|
|
|
- Object.keys(ns.connected),
|
|
|
- (id, next) => {
|
|
|
- const { session } = ns.connected[id];
|
|
|
-
|
|
|
- CacheModule.runJob(
|
|
|
- "HGET",
|
|
|
- {
|
|
|
- table: "sessions",
|
|
|
- key: session.sessionId
|
|
|
- },
|
|
|
- this
|
|
|
- )
|
|
|
- .then(session => {
|
|
|
- if (session && ns.connected[id].ip === payload.ip) sockets.push(ns.connected[id]);
|
|
|
- next();
|
|
|
- })
|
|
|
- .catch(() => next());
|
|
|
- },
|
|
|
- () => {
|
|
|
- resolve(sockets);
|
|
|
- }
|
|
|
- );
|
|
|
- }
|
|
|
|
|
|
- return resolve();
|
|
|
+ return async.each(
|
|
|
+ Object.keys(clients),
|
|
|
+ (id, next) => {
|
|
|
+ const { session } = clients[id];
|
|
|
+
|
|
|
+ CacheModule.runJob("HGET", { table: "sessions", key: session.sessionId }, this)
|
|
|
+ .then(session => {
|
|
|
+ if (session && clients[id].ip === payload.ip) sockets.push(clients[id]);
|
|
|
+ next();
|
|
|
+ })
|
|
|
+ .catch(() => next());
|
|
|
+ },
|
|
|
+ () => resolve(sockets)
|
|
|
+ );
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -219,20 +210,18 @@ class _IOModule extends CoreClass {
|
|
|
*/
|
|
|
async SOCKETS_FROM_USER_WITHOUT_CACHE(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const ns = IOModule._io.of("/");
|
|
|
+ const { clients } = WSModule._io;
|
|
|
const sockets = [];
|
|
|
|
|
|
- if (ns) {
|
|
|
+ if (clients) {
|
|
|
return async.each(
|
|
|
- Object.keys(ns.connected),
|
|
|
+ Object.keys(clients),
|
|
|
(id, next) => {
|
|
|
- const { session } = ns.connected[id];
|
|
|
- if (session.userId === payload.userId) sockets.push(ns.connected[id]);
|
|
|
+ const { session } = clients[id];
|
|
|
+ if (session.userId === payload.userId) sockets.push(clients[id]);
|
|
|
next();
|
|
|
},
|
|
|
- () => {
|
|
|
- resolve({ sockets });
|
|
|
- }
|
|
|
+ () => resolve(sockets)
|
|
|
);
|
|
|
}
|
|
|
|
|
@@ -248,20 +237,10 @@ class _IOModule extends CoreClass {
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
async SOCKET_LEAVE_ROOMS(payload) {
|
|
|
- const socket = await IOModule.runJob(
|
|
|
- "SOCKET_FROM_SESSION",
|
|
|
- {
|
|
|
- socketId: payload.socketId
|
|
|
- },
|
|
|
- this
|
|
|
- );
|
|
|
-
|
|
|
return new Promise(resolve => {
|
|
|
- const { rooms } = socket;
|
|
|
-
|
|
|
- Object.keys(rooms).forEach(roomKey => {
|
|
|
- const room = rooms[roomKey];
|
|
|
- socket.leave(room);
|
|
|
+ // filter out rooms that the user is in
|
|
|
+ Object.keys(WSModule.rooms).forEach(room => {
|
|
|
+ WSModule.rooms[room] = WSModule.rooms[room].filter(participant => participant !== payload.socketId);
|
|
|
});
|
|
|
|
|
|
return resolve();
|
|
@@ -269,118 +248,114 @@ class _IOModule extends CoreClass {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Allows a socket to join a specified room
|
|
|
+ * Allows a socket to join a specified room (this will remove them from any rooms they are currently in)
|
|
|
*
|
|
|
* @param {object} payload - object that contains the payload
|
|
|
* @param {string} payload.socketId - the id of the socket which should join the room
|
|
|
- * @param {object} payload.room - the object representing the room the socket should join
|
|
|
+ * @param {string} payload.room - the name of the room
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
async SOCKET_JOIN_ROOM(payload) {
|
|
|
- const socket = await IOModule.runJob(
|
|
|
- "SOCKET_FROM_SESSION",
|
|
|
- {
|
|
|
- socketId: payload.socketId
|
|
|
- },
|
|
|
- this
|
|
|
- );
|
|
|
+ const { room, socketId } = payload;
|
|
|
|
|
|
- return new Promise(resolve => {
|
|
|
- const { rooms } = socket;
|
|
|
- Object.keys(rooms).forEach(roomKey => {
|
|
|
- const room = rooms[roomKey];
|
|
|
- socket.leave(room);
|
|
|
- });
|
|
|
+ // leave all other rooms
|
|
|
+ await WSModule.runJob("SOCKET_LEAVE_ROOMS", { socketId }, this);
|
|
|
|
|
|
- socket.join(payload.room);
|
|
|
+ return new Promise(resolve => {
|
|
|
+ // create room if it doesn't exist, and add socketId to array
|
|
|
+ if (WSModule.rooms[room]) WSModule.rooms[room].push(socketId);
|
|
|
+ else WSModule.rooms[room] = [socketId];
|
|
|
|
|
|
return resolve();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // UNKNOWN
|
|
|
- // eslint-disable-next-line require-jsdoc
|
|
|
- async SOCKET_JOIN_SONG_ROOM(payload) {
|
|
|
- // socketId, room
|
|
|
- const socket = await IOModule.runJob(
|
|
|
- "SOCKET_FROM_SESSION",
|
|
|
- {
|
|
|
- socketId: payload.socketId
|
|
|
- },
|
|
|
- this
|
|
|
- );
|
|
|
-
|
|
|
+ /**
|
|
|
+ * Emits arguments to any sockets that are in a specified a room
|
|
|
+ *
|
|
|
+ * @param {object} payload - object that contains the payload
|
|
|
+ * @param {string} payload.room - the name of the room to emit arguments
|
|
|
+ * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
|
|
|
+ * @returns {Promise} - returns promise (reject, resolve)
|
|
|
+ */
|
|
|
+ async EMIT_TO_ROOM(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const { rooms } = socket;
|
|
|
- Object.keys(rooms).forEach(roomKey => {
|
|
|
- const room = rooms[roomKey];
|
|
|
- if (room.indexOf("song.") !== -1) socket.leave(room);
|
|
|
- });
|
|
|
-
|
|
|
- socket.join(payload.room);
|
|
|
+ // if the room exists
|
|
|
+ if (WSModule.rooms[payload.room])
|
|
|
+ return WSModule.rooms[payload.room].forEach(async socketId => {
|
|
|
+ // get every socketId (and thus every socket) in the room, and dispatch to each
|
|
|
+ const socket = await WSModule.runJob("SOCKET_FROM_SOCKET_ID", { socketId }, this);
|
|
|
+ socket.dispatch(...payload.args);
|
|
|
+ return resolve();
|
|
|
+ });
|
|
|
|
|
|
return resolve();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // UNKNOWN
|
|
|
- // eslint-disable-next-line require-jsdoc
|
|
|
- SOCKETS_JOIN_SONG_ROOM(payload) {
|
|
|
- // sockets, room
|
|
|
- return new Promise(resolve => {
|
|
|
- Object.keys(payload.sockets).forEach(socketKey => {
|
|
|
- const socket = payload.sockets[socketKey];
|
|
|
+ /**
|
|
|
+ * Allows a socket to join a 'song' room
|
|
|
+ *
|
|
|
+ * @param {object} payload - object that contains the payload
|
|
|
+ * @param {string} payload.socketId - the id of the socket which should join the room
|
|
|
+ * @param {string} payload.room - the name of the room
|
|
|
+ * @returns {Promise} - returns promise (reject, resolve)
|
|
|
+ */
|
|
|
+ async SOCKET_JOIN_SONG_ROOM(payload) {
|
|
|
+ const { room, socketId } = payload;
|
|
|
|
|
|
- const { rooms } = socket;
|
|
|
- Object.keys(rooms).forEach(roomKey => {
|
|
|
- const room = rooms[roomKey];
|
|
|
- if (room.indexOf("song.") !== -1) socket.leave(room);
|
|
|
- });
|
|
|
+ // leave any other song rooms the user is in
|
|
|
+ await WSModule.runJob("SOCKETS_LEAVE_SONG_ROOMS", { sockets: [socketId] }, this);
|
|
|
|
|
|
- socket.join(payload.room);
|
|
|
- });
|
|
|
+ return new Promise(resolve => {
|
|
|
+ // join the room
|
|
|
+ if (WSModule.rooms[room]) WSModule.rooms[room].push(socketId);
|
|
|
+ else WSModule.rooms[room] = [socketId];
|
|
|
|
|
|
return resolve();
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- // UNKNOWN
|
|
|
- // eslint-disable-next-line require-jsdoc
|
|
|
- SOCKETS_LEAVE_SONG_ROOMS(payload) {
|
|
|
- // sockets
|
|
|
+ /**
|
|
|
+ * Allows multiple sockets to join a 'song' room
|
|
|
+ *
|
|
|
+ * @param {object} payload - object that contains the payload
|
|
|
+ * @param {Array} payload.sockets - array of socketIds
|
|
|
+ * @param {object} payload.room - the name of the room
|
|
|
+ * @returns {Promise} - returns promise (reject, resolve)
|
|
|
+ */
|
|
|
+ SOCKETS_JOIN_SONG_ROOM(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- Object.keys(payload.sockets).forEach(socketKey => {
|
|
|
- const socket = payload.sockets[socketKey];
|
|
|
- const { rooms } = socket;
|
|
|
- Object.keys(rooms).forEach(roomKey => {
|
|
|
- const room = rooms[roomKey];
|
|
|
- if (room.indexOf("song.") !== -1) socket.leave(room);
|
|
|
- });
|
|
|
- });
|
|
|
- resolve();
|
|
|
+ Promise.allSettled(
|
|
|
+ payload.sockets.map(async socketId => {
|
|
|
+ await WSModule.runJob("SOCKET_JOIN_SONG_ROOM", { socketId }, this);
|
|
|
+ })
|
|
|
+ ).then(() => resolve());
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Emits arguments to any sockets that are in a specified a room
|
|
|
+ * Allows multiple sockets to leave any 'song' rooms they are in
|
|
|
*
|
|
|
* @param {object} payload - object that contains the payload
|
|
|
- * @param {string} payload.room - the name of the room to emit arguments
|
|
|
- * @param {object} payload.args - any arguments to be emitted to the sockets in the specific room
|
|
|
+ * @param {Array} payload.sockets - array of socketIds
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
- async EMIT_TO_ROOM(payload) {
|
|
|
- return new Promise(resolve => {
|
|
|
- const { sockets } = IOModule._io.sockets;
|
|
|
- Object.keys(sockets).forEach(socketKey => {
|
|
|
- const socket = sockets[socketKey];
|
|
|
- if (socket.rooms[payload.room]) {
|
|
|
- socket.emit(...payload.args);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- return resolve();
|
|
|
- });
|
|
|
+ SOCKETS_LEAVE_SONG_ROOMS(payload) {
|
|
|
+ return new Promise(resolve =>
|
|
|
+ Promise.allSettled(
|
|
|
+ payload.sockets.map(async socketId => {
|
|
|
+ const rooms = await WSModule.runJob("GET_ROOMS_FOR_SOCKET", { socketId }, this);
|
|
|
+
|
|
|
+ rooms.forEach(room => {
|
|
|
+ if (room.indexOf("song.") !== -1)
|
|
|
+ WSModule.rooms[room] = WSModule.rooms[room].filter(
|
|
|
+ participant => participant !== payload.socketId
|
|
|
+ );
|
|
|
+ });
|
|
|
+ })
|
|
|
+ ).then(() => resolve())
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -390,47 +365,55 @@ class _IOModule extends CoreClass {
|
|
|
* @param {string} payload.room - the name of the room
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
- async GET_ROOM_SOCKETS(payload) {
|
|
|
+ async GET_SOCKETS_FOR_ROOM(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const { sockets } = IOModule._io.sockets;
|
|
|
- const roomSockets = [];
|
|
|
- Object.keys(sockets).forEach(socketKey => {
|
|
|
- const socket = sockets[socketKey];
|
|
|
- if (socket.rooms[payload.room]) roomSockets.push(socket);
|
|
|
+ if (WSModule.rooms[payload.room]) return resolve(WSModule.rooms[payload.room]);
|
|
|
+ return resolve([]);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Gets any rooms a socket is connected to
|
|
|
+ *
|
|
|
+ * @param {object} payload - object that contains the payload
|
|
|
+ * @param {string} payload.socketId - the id of the socket to check the rooms for
|
|
|
+ * @returns {Promise} - returns promise (reject, resolve)
|
|
|
+ */
|
|
|
+ async GET_ROOMS_FOR_SOCKET(payload) {
|
|
|
+ return new Promise(resolve => {
|
|
|
+ const rooms = [];
|
|
|
+
|
|
|
+ Object.keys(WSModule.rooms).forEach(room => {
|
|
|
+ if (WSModule.rooms[room].includes(payload.socketId)) rooms.push(room);
|
|
|
});
|
|
|
|
|
|
- return resolve(roomSockets);
|
|
|
+ return resolve(rooms);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Handles io.use
|
|
|
+ * Handles use of websockets
|
|
|
*
|
|
|
* @param {object} payload - object that contains the payload
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
- async HANDLE_IO_USE(payload) {
|
|
|
+ async HANDLE_WS_USE(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
- const { socket, cb } = payload;
|
|
|
+ const { socket, req } = payload;
|
|
|
+ let SID = "";
|
|
|
|
|
|
- let SID;
|
|
|
-
|
|
|
- socket.ip = socket.request.headers["x-forwarded-for"] || "0.0.0.0";
|
|
|
+ socket.ip = req.headers["x-forwarded-for"] || "0..0.0";
|
|
|
|
|
|
return async.waterfall(
|
|
|
[
|
|
|
next => {
|
|
|
- if (!socket.request.headers.cookie) return next("No cookie exists yet.");
|
|
|
- return UtilsModule.runJob(
|
|
|
- "PARSE_COOKIES",
|
|
|
- {
|
|
|
- cookieString: socket.request.headers.cookie
|
|
|
- },
|
|
|
- this
|
|
|
- ).then(res => {
|
|
|
- SID = res[IOModule.SIDname];
|
|
|
- next(null);
|
|
|
- });
|
|
|
+ if (!req.headers.cookie) return next("No cookie exists yet.");
|
|
|
+ return UtilsModule.runJob("PARSE_COOKIES", { cookieString: req.headers.cookie }, this).then(
|
|
|
+ res => {
|
|
|
+ SID = res[WSModule.SIDname];
|
|
|
+ next(null);
|
|
|
+ }
|
|
|
+ );
|
|
|
},
|
|
|
|
|
|
next => {
|
|
@@ -438,11 +421,10 @@ class _IOModule extends CoreClass {
|
|
|
return next();
|
|
|
},
|
|
|
|
|
|
+ // see if session exists for cookie
|
|
|
next => {
|
|
|
CacheModule.runJob("HGET", { table: "sessions", key: SID }, this)
|
|
|
- .then(session => {
|
|
|
- next(null, session);
|
|
|
- })
|
|
|
+ .then(session => next(null, session))
|
|
|
.catch(next);
|
|
|
},
|
|
|
|
|
@@ -455,15 +437,9 @@ class _IOModule extends CoreClass {
|
|
|
|
|
|
return CacheModule.runJob(
|
|
|
"HSET",
|
|
|
- {
|
|
|
- table: "sessions",
|
|
|
- key: SID,
|
|
|
- value: session
|
|
|
- },
|
|
|
+ { table: "sessions", key: SID, value: session },
|
|
|
this
|
|
|
- ).then(session => {
|
|
|
- next(null, session);
|
|
|
- });
|
|
|
+ ).then(session => next(null, session));
|
|
|
},
|
|
|
|
|
|
(res, next) => {
|
|
@@ -490,81 +466,61 @@ class _IOModule extends CoreClass {
|
|
|
|
|
|
next();
|
|
|
})
|
|
|
- .catch(() => {
|
|
|
- next();
|
|
|
- });
|
|
|
+ .catch(() => next());
|
|
|
}
|
|
|
],
|
|
|
() => {
|
|
|
- if (!socket.session) socket.session = { socketId: socket.id };
|
|
|
- else socket.session.socketId = socket.id;
|
|
|
+ if (!socket.session) socket.session = { socketId: req.headers["sec-websocket-key"] };
|
|
|
+ else socket.session.socketId = req.headers["sec-websocket-key"];
|
|
|
|
|
|
- cb();
|
|
|
- resolve();
|
|
|
+ resolve(socket);
|
|
|
}
|
|
|
);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Handles io.connection
|
|
|
+ * Handles a websocket connection
|
|
|
*
|
|
|
* @param {object} payload - object that contains the payload
|
|
|
+ * @param {object} payload.socket - socket itself
|
|
|
* @returns {Promise} - returns promise (reject, resolve)
|
|
|
*/
|
|
|
- async HANDLE_IO_CONNECTION(payload) {
|
|
|
+ async HANDLE_WS_CONNECTION(payload) {
|
|
|
return new Promise(resolve => {
|
|
|
const { socket } = payload;
|
|
|
|
|
|
let sessionInfo = "";
|
|
|
-
|
|
|
if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
|
|
|
|
|
|
// if session is banned
|
|
|
if (socket.banishment && socket.banishment.banned) {
|
|
|
- IOModule.log(
|
|
|
+ WSModule.log(
|
|
|
"INFO",
|
|
|
"IO_BANNED_CONNECTION",
|
|
|
`A user tried to connect, but is currently banned. IP: ${socket.ip}.${sessionInfo}`
|
|
|
);
|
|
|
|
|
|
- socket.emit("keep.event:banned", socket.banishment.ban);
|
|
|
+ socket.dispatch("keep.event:banned", socket.banishment.ban);
|
|
|
|
|
|
- return socket.disconnect(true);
|
|
|
+ return socket.close(); // close socket connection
|
|
|
}
|
|
|
|
|
|
- IOModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
|
|
|
+ WSModule.log("INFO", "IO_CONNECTION", `User connected. IP: ${socket.ip}.${sessionInfo}`);
|
|
|
|
|
|
// catch when the socket has been disconnected
|
|
|
- socket.on("disconnect", () => {
|
|
|
+ socket.on("close", async () => {
|
|
|
if (socket.session.sessionId) sessionInfo = ` UserID: ${socket.session.userId}.`;
|
|
|
- IOModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
|
|
|
- });
|
|
|
-
|
|
|
- socket.use((data, next) => {
|
|
|
- if (data.length === 0) return next(new Error("Not enough arguments specified."));
|
|
|
- if (typeof data[0] !== "string") return next(new Error("First argument must be a string."));
|
|
|
-
|
|
|
- const namespaceAction = data[0];
|
|
|
- if (
|
|
|
- !namespaceAction ||
|
|
|
- namespaceAction.indexOf(".") === -1 ||
|
|
|
- namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
|
|
|
- )
|
|
|
- return next(new Error("Invalid first argument"));
|
|
|
- const namespace = data[0].split(".")[0];
|
|
|
- const action = data[0].split(".")[1];
|
|
|
+ WSModule.log("INFO", "IO_DISCONNECTION", `User disconnected. IP: ${socket.ip}.${sessionInfo}`);
|
|
|
|
|
|
- if (!namespace) return next(new Error("Invalid namespace."));
|
|
|
- if (!action) return next(new Error("Invalid action."));
|
|
|
- if (!IOModule.actions[namespace]) return next(new Error("Namespace not found."));
|
|
|
- if (!IOModule.actions[namespace][action]) return next(new Error("Action not found."));
|
|
|
-
|
|
|
- return next();
|
|
|
+ // leave all rooms when a socket connection is closed (to prevent rooms object building up)
|
|
|
+ await WSModule.runJob("SOCKET_LEAVE_ROOMS", { socketId: socket.session.socketId });
|
|
|
});
|
|
|
|
|
|
- // catch errors on the socket (internal to socket.io)
|
|
|
- socket.on("error", console.error);
|
|
|
+ // catch errors on the socket
|
|
|
+ socket.onerror = error => {
|
|
|
+ console.error("SOCKET ERROR: ", error);
|
|
|
+ };
|
|
|
|
|
|
if (socket.session.sessionId) {
|
|
|
CacheModule.runJob("HGET", {
|
|
@@ -573,112 +529,68 @@ class _IOModule extends CoreClass {
|
|
|
})
|
|
|
.then(session => {
|
|
|
if (session && session.userId) {
|
|
|
- IOModule.userModel.findOne({ _id: session.userId }, (err, user) => {
|
|
|
- if (err || !user) return socket.emit("ready", false);
|
|
|
+ WSModule.userModel.findOne({ _id: session.userId }, (err, user) => {
|
|
|
+ if (err || !user) return socket.dispatch("ready", false);
|
|
|
|
|
|
let role = "";
|
|
|
let username = "";
|
|
|
let userId = "";
|
|
|
+
|
|
|
if (user) {
|
|
|
role = user.role;
|
|
|
username = user.username;
|
|
|
userId = session.userId;
|
|
|
}
|
|
|
|
|
|
- return socket.emit("ready", true, role, username, userId);
|
|
|
+ return socket.dispatch("ready", true, role, username, userId);
|
|
|
});
|
|
|
- } else socket.emit("ready", false);
|
|
|
+ } else socket.dispatch("ready", false);
|
|
|
})
|
|
|
- .catch(() => {
|
|
|
- socket.emit("ready", false);
|
|
|
- });
|
|
|
- } else socket.emit("ready", false);
|
|
|
+ .catch(() => socket.dispatch("ready", false));
|
|
|
+ } else socket.dispatch("ready", false);
|
|
|
+
|
|
|
+ socket.onmessage = message => {
|
|
|
+ const data = JSON.parse(message.data);
|
|
|
+
|
|
|
+ if (data.length === 0) return socket.dispatch("ERROR", "Not enough arguments specified.");
|
|
|
+ if (typeof data[0] !== "string") return socket.dispatch("ERROR", "First argument must be a string.");
|
|
|
+
|
|
|
+ const namespaceAction = data[0];
|
|
|
+ if (
|
|
|
+ !namespaceAction ||
|
|
|
+ namespaceAction.indexOf(".") === -1 ||
|
|
|
+ namespaceAction.indexOf(".") !== namespaceAction.lastIndexOf(".")
|
|
|
+ )
|
|
|
+ return socket.dispatch("ERROR", "Invalid first argument");
|
|
|
+
|
|
|
+ const namespace = data[0].split(".")[0];
|
|
|
+ const action = data[0].split(".")[1];
|
|
|
+
|
|
|
+ if (!namespace) return socket.dispatch("ERROR", "Invalid namespace.");
|
|
|
+ if (!action) return socket.dispatch("ERROR", "Invalid action.");
|
|
|
+ if (!WSModule.actions[namespace]) return socket.dispatch("ERROR", "Namespace not found.");
|
|
|
+ if (!WSModule.actions[namespace][action]) return socket.dispatch("ERROR", "Action not found.");
|
|
|
+
|
|
|
+ if (data[data.length - 1].CB_REF) {
|
|
|
+ const { CB_REF } = data[data.length - 1];
|
|
|
+ data.pop();
|
|
|
+
|
|
|
+ return socket.actions.emit(data.shift(0), [...data, res => socket.dispatch("CB_REF", CB_REF, res)]);
|
|
|
+ }
|
|
|
+
|
|
|
+ return socket.actions.emit(data.shift(0), data);
|
|
|
+ };
|
|
|
|
|
|
// have the socket listen for each action
|
|
|
- Object.keys(IOModule.actions).forEach(namespace => {
|
|
|
- Object.keys(IOModule.actions[namespace]).forEach(action => {
|
|
|
+ Object.keys(WSModule.actions).forEach(namespace => {
|
|
|
+ Object.keys(WSModule.actions[namespace]).forEach(action => {
|
|
|
// the full name of the action
|
|
|
const name = `${namespace}.${action}`;
|
|
|
|
|
|
// listen for this action to be called
|
|
|
- socket.on(name, async (...args) => {
|
|
|
- IOModule.runJob("RUN_ACTION", { socket, namespace, action, args });
|
|
|
-
|
|
|
- /* let cb = args[args.length - 1];
|
|
|
-
|
|
|
- if (typeof cb !== "function")
|
|
|
- cb = () => {
|
|
|
- IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
|
|
|
- };
|
|
|
- else args.pop();
|
|
|
-
|
|
|
- if (this.getStatus() !== "READY") {
|
|
|
- IOModule.log(
|
|
|
- "INFO",
|
|
|
- "IO_REJECTED_ACTION",
|
|
|
- `A user tried to execute an action, but the IO module is currently not ready. Action: ${namespace}.${action}.`
|
|
|
- );
|
|
|
- return;
|
|
|
- }
|
|
|
- IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
|
|
|
-
|
|
|
- let failedGettingSession = false;
|
|
|
- // load the session from the cache
|
|
|
- if (socket.session.sessionId)
|
|
|
- await CacheModule.runJob("HGET", {
|
|
|
- table: "sessions",
|
|
|
- key: socket.session.sessionId
|
|
|
- })
|
|
|
- .then(session => {
|
|
|
- // make sure the sockets sessionId isn't set if there is no session
|
|
|
- if (socket.session.sessionId && session === null) delete socket.session.sessionId;
|
|
|
- })
|
|
|
- .catch(() => {
|
|
|
- failedGettingSession = true;
|
|
|
- if (typeof cb === "function")
|
|
|
- cb({
|
|
|
- status: "error",
|
|
|
- message: "An error occurred while obtaining your session"
|
|
|
- });
|
|
|
- });
|
|
|
- if (!failedGettingSession)
|
|
|
- try {
|
|
|
- // call the action, passing it the session, and the arguments socket.io passed us
|
|
|
- this.runJob("RUN_ACTION", { namespace, action, session: socket.session, args })
|
|
|
- .then(response => {
|
|
|
- if (typeof cb === "function") cb(response);
|
|
|
- })
|
|
|
- .catch(err => {
|
|
|
- if (typeof cb === "function") cb(err);
|
|
|
- });
|
|
|
- // actions[namespace][action].apply(
|
|
|
- // null,
|
|
|
- // [socket.session].concat(args).concat([
|
|
|
- // result => {
|
|
|
- // IOModule.log(
|
|
|
- // "INFO",
|
|
|
- // "IO_ACTION",
|
|
|
- // `Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
|
|
|
- // );
|
|
|
- // // respond to the socket with our message
|
|
|
- // if (typeof cb === "function") cb(result);
|
|
|
- // }
|
|
|
- // ])
|
|
|
- // );
|
|
|
- } catch (err) {
|
|
|
- if (typeof cb === "function")
|
|
|
- cb({
|
|
|
- status: "error",
|
|
|
- message: "An error occurred while executing the specified action."
|
|
|
- });
|
|
|
-
|
|
|
- IOModule.log(
|
|
|
- "ERROR",
|
|
|
- "IO_ACTION_ERROR",
|
|
|
- `Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
|
|
|
- );
|
|
|
- } */
|
|
|
- });
|
|
|
+ socket.listen(name, async args =>
|
|
|
+ WSModule.runJob("RUN_ACTION", { socket, namespace, action, args })
|
|
|
+ );
|
|
|
});
|
|
|
});
|
|
|
|
|
@@ -700,13 +612,14 @@ class _IOModule extends CoreClass {
|
|
|
const name = `${namespace}.${action}`;
|
|
|
|
|
|
let cb = args[args.length - 1];
|
|
|
+
|
|
|
if (typeof cb !== "function")
|
|
|
cb = () => {
|
|
|
- IOModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
|
|
|
+ WSModule.log("INFO", "IO_MODULE", `There was no callback provided for ${name}.`);
|
|
|
};
|
|
|
else args.pop();
|
|
|
|
|
|
- IOModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
|
|
|
+ WSModule.log("INFO", "IO_ACTION", `A user executed an action. Action: ${namespace}.${action}.`);
|
|
|
|
|
|
// load the session from the cache
|
|
|
new Promise(resolve => {
|
|
@@ -731,9 +644,9 @@ class _IOModule extends CoreClass {
|
|
|
else resolve();
|
|
|
})
|
|
|
.then(() => {
|
|
|
- // call the job that calls the action, passing it the session, and the arguments socket.io passed us
|
|
|
+ // call the job that calls the action, passing it the session, and the arguments the websocket passed us
|
|
|
|
|
|
- IOModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
|
|
|
+ WSModule.runJob("RUN_ACTION2", { session: socket.session, namespace, action, args }, this)
|
|
|
.then(response => {
|
|
|
cb(response);
|
|
|
resolve();
|
|
@@ -744,9 +657,10 @@ class _IOModule extends CoreClass {
|
|
|
status: "error",
|
|
|
message: "An error occurred while executing the specified action."
|
|
|
});
|
|
|
+
|
|
|
reject(err);
|
|
|
|
|
|
- IOModule.log(
|
|
|
+ WSModule.log(
|
|
|
"ERROR",
|
|
|
"IO_ACTION_ERROR",
|
|
|
`Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
|
|
@@ -768,16 +682,17 @@ class _IOModule extends CoreClass {
|
|
|
const { session, namespace, action, args } = payload;
|
|
|
|
|
|
try {
|
|
|
- // call the the action, passing it the session, and the arguments socket.io passed us
|
|
|
- IOModule.actions[namespace][action].apply(
|
|
|
+ // call the the action, passing it the session, and the arguments the websocket passed us
|
|
|
+ WSModule.actions[namespace][action].apply(
|
|
|
this,
|
|
|
[session].concat(args).concat([
|
|
|
result => {
|
|
|
- IOModule.log(
|
|
|
+ WSModule.log(
|
|
|
"INFO",
|
|
|
"RUN_ACTION2",
|
|
|
`Response to action. Action: ${namespace}.${action}. Response status: ${result.status}`
|
|
|
);
|
|
|
+
|
|
|
resolve(result);
|
|
|
}
|
|
|
])
|
|
@@ -785,7 +700,7 @@ class _IOModule extends CoreClass {
|
|
|
} catch (err) {
|
|
|
reject(err);
|
|
|
|
|
|
- IOModule.log(
|
|
|
+ WSModule.log(
|
|
|
"ERROR",
|
|
|
"IO_ACTION_ERROR",
|
|
|
`Some type of exception occurred in the action ${namespace}.${action}. Error message: ${err.message}`
|
|
@@ -795,4 +710,4 @@ class _IOModule extends CoreClass {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-export default new _IOModule();
|
|
|
+export default new _WSModule();
|