WebSocketModule.ts 8.3 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { isObjectIdOrHexString } from "mongoose";
  6. import { forEachIn } from "@common/utils/forEachIn";
  7. import { getErrorMessage } from "@common/utils/getErrorMessage";
  8. import BaseModule from "@/BaseModule";
  9. import WebSocket from "@/WebSocket";
  10. import ModuleManager from "@/ModuleManager";
  11. import JobQueue from "@/JobQueue";
  12. import DataModule from "./DataModule";
  13. import EventsModule from "./EventsModule";
  14. import User from "./DataModule/models/User";
  15. import Session from "./DataModule/models/Session";
  16. // import assertEventDerived from "@/utils/assertEventDerived";
  /**
   * WebSocketModule - Hosts the HTTP server and the "/ws" WebSocket endpoint,
   * turns incoming socket messages into queued jobs and relays job results and
   * session deletions back to the connected sockets.
   */
  export class WebSocketModule extends BaseModule {
    /** Delay in milliseconds between two keep-alive sweeps over all clients */
    private static readonly _keepAliveIntervalMs = 45000;

    private _httpServer?: Server;

    private _wsServer?: WebSocketServer;

    private _keepAliveInterval?: NodeJS.Timeout;

    /**
     * WebSocket Module
     */
    public constructor() {
      super("websocket");

      this._dependentModules = ["data", "events"];
    }

    /**
     * startup - Startup websocket module
     *
     * Creates the HTTP and WebSocket servers, starts the keep-alive timer and
     * subscribes to the job-completed and session-deleted events.
     */
    public override async startup() {
      await super.startup();

      this._httpServer = http
        .createServer(express())
        .listen(config.get("port"));

      this._wsServer = new WebSocketServer({
        server: this._httpServer,
        path: "/ws",
        WebSocket
      });

      this._wsServer.on(
        "connection",
        (socket: WebSocket, request: IncomingMessage) => {
          // The handler is async; without a catch a failing lookup would
          // surface as an unhandled promise rejection
          this._handleConnection(socket, request).catch(error => {
            socket.log({
              type: "error",
              message: getErrorMessage(error),
              data: { error }
            });
            socket.close(1011, "Internal server error");
          });
        }
      );

      this._keepAliveInterval = setInterval(
        () => this._keepAlive(),
        WebSocketModule._keepAliveIntervalMs
      );

      this._wsServer.on("close", () =>
        clearInterval(this._keepAliveInterval)
      );

      await EventsModule.pSubscribe("events.job.completed:*", async event => {
        // assertEventDerived(event);

        const data = event.getData();

        const { socketId, callbackRef } = data;

        // Only jobs that were queued from a socket carry routing information
        if (!socketId || !callbackRef) return;

        // Routing fields are not part of the result sent to the client
        delete data.socketId;
        delete data.callbackRef;

        await this.dispatch(socketId, "jobCallback", callbackRef, data);
      });

      await EventsModule.pSubscribe(
        "data.sessions.deleted:*",
        async event => {
          // assertEventDerived(event);

          const { oldDoc } = event.getData();

          if (!this._wsServer) return;

          // Close every socket that was opened with the deleted session.
          // NOTE(review): strict equality assumes oldDoc._id and the stored
          // session id have the same primitive type - confirm
          for (const socket of this._wsServer.clients as Set<WebSocket>) {
            if (socket.getSessionId() === oldDoc._id)
              socket.close(1000, "logout");
          }
        }
      );

      await super._started();
    }

    /**
     * keepAlive - Ping open clients and terminate closed
     */
    private async _keepAlive() {
      if (!this._wsServer) return;

      // Iterate the client set itself: Set.entries() yields [value, value]
      // pairs, which would visit every socket twice
      await forEachIn([...this._wsServer.clients], async socket => {
        switch (socket.readyState) {
          case socket.OPEN:
            socket.ping();
            break;
          case socket.CLOSED:
            socket.terminate();
            break;
          default:
            break;
        }
      });
    }

    /**
     * getCookieValue - Read a single cookie from a Cookie request header
     *
     * @param cookieHeader - Raw Cookie header, if the request had one
     * @param name - Name of the cookie to look up
     * @returns The cookie value, or undefined if the cookie is not present
     */
    private _getCookieValue(cookieHeader: string | undefined, name: string) {
      if (!cookieHeader) return undefined;

      // Split on ";" and trim so that both "a=1; b=2" and "a=1;b=2" work
      for (const cookie of cookieHeader.split(";")) {
        const separatorIndex = cookie.indexOf("=");

        if (separatorIndex === -1) continue;

        if (cookie.substring(0, separatorIndex).trim() === name)
          return cookie.substring(separatorIndex + 1).trim();
      }

      return undefined;
    }

    /**
     * handleConnection - Handle websocket connection
     *
     * Closes the socket while the job queue is paused. Otherwise identifies
     * the socket, resolves the session and user from the session cookie and
     * sends the "ready" message with the client configuration.
     *
     * @param socket - The newly connected socket
     * @param request - The HTTP upgrade request of the connection
     */
    private async _handleConnection(
      socket: WebSocket,
      request: IncomingMessage
    ) {
      if (JobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      socket.setSocketId(request.headers["sec-websocket-key"]);

      socket.log({
        type: "debug",
        message: `WebSocket opened #${socket.getSocketId()}`
      });

      // Listeners are attached before any await: an "error" event without a
      // listener is thrown by the EventEmitter, and a "close" that happens
      // during the session lookup must still run the cleanup job
      socket.on("error", error =>
        socket.log({
          type: "error",
          message: error.message,
          data: { error }
        })
      );

      socket.on("close", async () => {
        const socketId = socket.getSocketId();

        try {
          const Job = EventsModule.getJob("unsubscribeAll");

          await JobQueue.runJob(Job, undefined, {
            socketId
          });
        } catch (error) {
          // An event listener has no caller that could handle the rejection
          socket.log({
            type: "error",
            message: getErrorMessage(error),
            data: { error }
          });
        }

        socket.log({
          type: "debug",
          message: `WebSocket closed #${socketId}`
        });
      });

      const sessionId = this._getCookieValue(
        request.headers.cookie,
        config.get<string>("cookie")
      );

      let user;

      if (sessionId && isObjectIdOrHexString(sessionId)) {
        socket.setSessionId(sessionId);

        const SessionModel = await DataModule.getModel<Session>("sessions");

        // Mark the session as recently used
        await SessionModel.update(
          {
            updatedAt: new Date()
          },
          {
            where: {
              _id: sessionId
            }
          }
        );

        const session = await SessionModel.findByPk(sessionId); // pk = primary key

        if (session) {
          const UserModel = await DataModule.getModel<User>("users");

          user = await UserModel.findByPk(session.userId);
        }
      }

      // The socket may have been closed while the lookups were pending
      if (socket.readyState !== socket.OPEN) return;

      const readyData = {
        config: {
          cookie: config.get("cookie"),
          sitename: config.get("sitename"),
          githubAuthentication: config.get("apis.github.enabled"),
          messages: config.get("messages"),
          christmas: config.get("christmas"),
          footerLinks: config.get("footerLinks"),
          shortcutOverrides: config.get("shortcutOverrides"),
          registrationDisabled: config.get("registrationDisabled"),
          mailEnabled: config.get("mail.enabled"),
          discogsEnabled: config.get("apis.discogs.enabled"),
          experimental: {
            changable_listen_mode: config.get(
              "experimental.changable_listen_mode"
            ),
            media_session: config.get("experimental.media_session"),
            disable_youtube_search: config.get(
              "experimental.disable_youtube_search"
            ),
            station_history: config.get("experimental.station_history"),
            soundcloud: config.get("experimental.soundcloud"),
            spotify: config.get("experimental.spotify")
          }
        },
        user
      };

      socket.dispatch("ready", readyData);

      socket.on("message", message => this._handleMessage(socket, message));
    }

    /**
     * handleMessage - Handle websocket message
     *
     * Expects a JSON array of the form [ "module.job", payload, options ] and
     * queues the referenced job. Failures are reported on the "jobCallback"
     * channel when a callback reference is known, otherwise on "error".
     *
     * @param socket - The socket the message was received on
     * @param message - The raw message data
     */
    private async _handleMessage(socket: WebSocket, message: RawData) {
      if (JobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      // Declared outside the try block so the catch block can reply to it
      let callbackRef;

      try {
        const data = JSON.parse(message.toString());

        if (!Array.isArray(data) || data.length < 1)
          throw new Error("Invalid request");

        const [moduleJob, _payload, options] = data;

        if (typeof moduleJob !== "string")
          throw new Error("Invalid request");

        const separatorIndex = moduleJob.indexOf(".");
        const moduleName = moduleJob.substring(0, separatorIndex);
        const jobName = moduleJob.substring(separatorIndex + 1);

        // Assign to the outer variable; a new declaration here would shadow
        // it and leave the catch block without a callback reference
        ({ callbackRef } = options ?? _payload ?? {});

        if (!callbackRef)
          throw new Error(
            `No callback reference provided for job ${moduleJob}`
          );

        const module = ModuleManager.getModule(moduleName);

        if (!module) throw new Error(`Module "${moduleName}" not found`);

        const Job = module.getJob(jobName);

        if (!Job?.isApiEnabled())
          throw new Error(`Job "${jobName}" not found.`);

        let session;

        if (socket.getSessionId()) {
          const SessionModel =
            await DataModule.getModel<Session>("sessions");

          // Mark the session as recently used
          await SessionModel.update(
            {
              updatedAt: new Date()
            },
            {
              where: {
                _id: socket.getSessionId()
              }
            }
          );

          session = await SessionModel.findByPk(socket.getSessionId());

          if (!session) throw new Error("Session not found.");
        }

        // Transform null to undefined, as JSON doesn't support undefined
        const payload = _payload === null ? undefined : _payload;

        await JobQueue.queueJob(Job, payload, {
          session,
          socketId: socket.getSocketId(),
          callbackRef
        });
      } catch (error) {
        const errorMessage = getErrorMessage(error);

        if (callbackRef)
          socket.dispatch("jobCallback", callbackRef, {
            status: "error",
            message: errorMessage
          });
        else socket.dispatch("error", errorMessage);
      }
    }

    /**
     * getSockets - Get websocket clients
     *
     * @returns The client set of the WebSocket server, or undefined if the
     * server has not been created
     */
    public async getSockets() {
      return this._wsServer?.clients;
    }

    /**
     * getSocketById - Get websocket client by id
     *
     * @param socketId - Socket id to look for
     * @returns The matching socket, or null if there is none
     */
    public async getSocketById(socketId: string) {
      if (!this._wsServer) return null;

      for (const socket of this._wsServer.clients as Set<WebSocket>) {
        if (socket.getSocketId() === socketId) return socket;
      }

      return null;
    }

    /**
     * dispatch - Dispatch message to socket
     *
     * Does nothing if no socket with the given id is connected.
     *
     * @param socketId - Id of the socket to send to
     * @param channel - Channel name of the message
     * @param values - Values sent along with the message
     */
    public async dispatch(
      socketId: string,
      channel: string,
      ...values: unknown[]
    ) {
      const socket = await this.getSocketById(socketId);

      if (!socket) return;

      socket.dispatch(channel, ...values);
    }

    /**
     * shutdown - Shutdown websocket module
     */
    public override async shutdown() {
      await super.shutdown();

      // Stop the keep-alive timer directly instead of relying only on the
      // "close" event of the WebSocket server
      clearInterval(this._keepAliveInterval);

      if (this._httpServer) this._httpServer.close();

      if (this._wsServer) this._wsServer.close();

      await this._stopped();
    }
  }

  // Shared module instance used by the rest of the application
  export default new WebSocketModule();