WebSocketModule.ts 7.4 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types, isObjectIdOrHexString } from "mongoose";
  6. import BaseModule from "@/BaseModule";
  7. import { UniqueMethods } from "@/types/Modules";
  8. import WebSocket from "@/WebSocket";
  9. import JobContext from "@/JobContext";
  10. import Job from "@/Job";
  11. import ModuleManager from "@/ModuleManager";
  12. import JobQueue from "@/JobQueue";
  13. import DataModule from "./DataModule";
  14. export class WebSocketModule extends BaseModule {
  15. private _httpServer?: Server;
  16. private _wsServer?: WebSocketServer;
  17. private _keepAliveInterval?: NodeJS.Timer;
  18. /**
  19. * WebSocket Module
  20. */
  21. public constructor() {
  22. super("websocket");
  23. this._jobConfigDefault = "disabled";
  24. }
  25. /**
  26. * startup - Startup websocket module
  27. */
  28. public override async startup() {
  29. await super.startup();
  30. this._httpServer = http
  31. .createServer(express())
  32. .listen(config.get("port"));
  33. this._wsServer = new WebSocketServer({
  34. server: this._httpServer,
  35. path: "/ws",
  36. WebSocket
  37. });
  38. this._wsServer.on(
  39. "connection",
  40. (socket: WebSocket, request: IncomingMessage) =>
  41. this._handleConnection(socket, request)
  42. );
  43. this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);
  44. this._wsServer.on("close", async () =>
  45. clearInterval(this._keepAliveInterval)
  46. );
  47. await super._started();
  48. }
  49. /**
  50. * keepAlive - Ping open clients and terminate closed
  51. */
  52. private async _keepAlive() {
  53. if (!this._wsServer) return;
  54. for await (const clients of this._wsServer.clients.entries()) {
  55. await Promise.all(
  56. clients.map(async socket => {
  57. switch (socket.readyState) {
  58. case socket.OPEN:
  59. socket.ping();
  60. break;
  61. case socket.CLOSED:
  62. socket.terminate();
  63. break;
  64. default:
  65. break;
  66. }
  67. })
  68. );
  69. }
  70. }
  71. /**
  72. * handleConnection - Handle websocket connection
  73. */
  74. private async _handleConnection(
  75. socket: WebSocket,
  76. request: IncomingMessage
  77. ) {
  78. if (JobQueue.getStatus().isPaused) {
  79. socket.close();
  80. return;
  81. }
  82. socket.setSocketId(request.headers["sec-websocket-key"]);
  83. let sessionId;
  84. let user;
  85. if (request.headers.cookie) {
  86. sessionId = request.headers.cookie
  87. .split("; ")
  88. .find(
  89. cookie =>
  90. cookie.substring(0, cookie.indexOf("=")) ===
  91. config.get<string>("cookie")
  92. );
  93. sessionId = sessionId?.substring(
  94. sessionId.indexOf("=") + 1,
  95. sessionId.length
  96. );
  97. }
  98. if (sessionId && isObjectIdOrHexString(sessionId)) {
  99. socket.setSessionId(sessionId);
  100. const Session = await DataModule.getModel("sessions");
  101. const session = await Session.findByIdAndUpdate(sessionId, {
  102. updatedAt: Date.now()
  103. });
  104. if (session) {
  105. const User = await DataModule.getModel("users");
  106. user = await User.findById(session.userId);
  107. }
  108. }
  109. const readyData = {
  110. config: {
  111. cookie: config.get("cookie"),
  112. sitename: config.get("sitename"),
  113. recaptcha: {
  114. enabled: config.get("apis.recaptcha.enabled"),
  115. key: config.get("apis.recaptcha.key")
  116. },
  117. githubAuthentication: config.get("apis.github.enabled"),
  118. messages: config.get("messages"),
  119. christmas: config.get("christmas"),
  120. footerLinks: config.get("footerLinks"),
  121. shortcutOverrides: config.get("shortcutOverrides"),
  122. registrationDisabled: config.get("registrationDisabled"),
  123. mailEnabled: config.get("mail.enabled"),
  124. discogsEnabled: config.get("apis.discogs.enabled"),
  125. experimental: {
  126. changable_listen_mode: config.get(
  127. "experimental.changable_listen_mode"
  128. ),
  129. media_session: config.get("experimental.media_session"),
  130. disable_youtube_search: config.get(
  131. "experimental.disable_youtube_search"
  132. ),
  133. station_history: config.get("experimental.station_history"),
  134. soundcloud: config.get("experimental.soundcloud"),
  135. spotify: config.get("experimental.spotify")
  136. }
  137. },
  138. user
  139. };
  140. socket.log({
  141. type: "debug",
  142. message: `WebSocket opened #${socket.getSocketId()}`
  143. });
  144. socket.on("error", error =>
  145. socket.log({
  146. type: "error",
  147. message: error.message,
  148. data: { error }
  149. })
  150. );
  151. socket.on("close", async () => {
  152. await JobQueue.runJob(
  153. "api",
  154. "unsubscribeAll",
  155. {},
  156. {
  157. socketId: socket.getSocketId()
  158. }
  159. );
  160. socket.log({
  161. type: "debug",
  162. message: `WebSocket closed #${socket.getSocketId()}`
  163. });
  164. });
  165. socket.dispatch("ready", readyData);
  166. socket.on("message", message => this._handleMessage(socket, message));
  167. }
  168. /**
  169. * handleMessage - Handle websocket message
  170. */
  171. private async _handleMessage(socket: WebSocket, message: RawData) {
  172. if (JobQueue.getStatus().isPaused) {
  173. socket.close();
  174. return;
  175. }
  176. let callbackRef;
  177. try {
  178. const data = JSON.parse(message.toString());
  179. if (!Array.isArray(data) || data.length < 1)
  180. throw new Error("Invalid request");
  181. const [moduleJob, payload, options] = data;
  182. const [moduleName, ...jobNameParts] = moduleJob.split(".");
  183. const jobName = jobNameParts.join(".");
  184. const { callbackRef } = options ?? payload ?? {};
  185. if (!callbackRef)
  186. throw new Error(
  187. `No callback reference provided for job ${moduleJob}`
  188. );
  189. const module = ModuleManager.getModule(moduleName);
  190. if (!module) throw new Error(`Module "${moduleName}" not found`);
  191. const job = module.getJob(jobName);
  192. if (!job.api) throw new Error(`Job "${jobName}" not found.`);
  193. let session;
  194. if (socket.getSessionId()) {
  195. const Session = await DataModule.getModel("sessions");
  196. session = await Session.findByIdAndUpdate(
  197. socket.getSessionId(),
  198. {
  199. updatedAt: Date.now()
  200. }
  201. );
  202. }
  203. await JobQueue.queueJob(
  204. moduleName,
  205. jobName,
  206. payload,
  207. {},
  208. {
  209. session,
  210. socketId: socket.getSocketId(),
  211. callbackRef
  212. }
  213. );
  214. } catch (error) {
  215. const message = error?.message ?? error;
  216. if (callbackRef)
  217. socket.dispatch("jobCallback", callbackRef, {
  218. status: "error",
  219. message
  220. });
  221. else socket.dispatch("error", message);
  222. }
  223. }
  224. /**
  225. * getSockets - Get websocket clients
  226. */
  227. public async getSockets() {
  228. return this._wsServer?.clients;
  229. }
  230. /**
  231. * getSocket - Get websocket client
  232. */
  233. public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
  234. if (!this._wsServer) return null;
  235. for (const clients of this._wsServer.clients.entries() as IterableIterator<
  236. [WebSocket, WebSocket]
  237. >) {
  238. const socket = clients.find(socket => {
  239. if (socket.getSocketId() === socketId) return true;
  240. if (socket.getSessionId() === sessionId) return true;
  241. return false;
  242. });
  243. if (socket) return socket;
  244. }
  245. return null;
  246. }
  247. /**
  248. * dispatch - Dispatch message to socket
  249. */
  250. public async dispatch(socketId: string, channel: string, ...values) {
  251. const socket = await this.getSocket(socketId);
  252. if (!socket) return;
  253. socket.dispatch(channel, ...values);
  254. }
  255. /**
  256. * shutdown - Shutdown websocket module
  257. */
  258. public override async shutdown() {
  259. await super.shutdown();
  260. if (this._httpServer) this._httpServer.close();
  261. if (this._wsServer) this._wsServer.close();
  262. await this._stopped();
  263. }
  264. }
  265. export type WebSocketModuleJobs = {
  266. [Property in keyof UniqueMethods<WebSocketModule>]: {
  267. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  268. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  269. };
  270. };
  271. export default new WebSocketModule();