WebSocketModule.ts 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import BaseModule from "../BaseModule";
  6. import { UniqueMethods } from "../types/Modules";
  7. import WebSocket from "../WebSocket";
  8. export default class WebSocketModule extends BaseModule {
  9. private httpServer?: Server;
  10. private wsServer?: WebSocketServer;
  11. private keepAliveInterval?: NodeJS.Timer;
  12. /**
  13. * WebSocket Module
  14. */
  15. public constructor() {
  16. super("websocket");
  17. }
  18. /**
  19. * startup - Startup websocket module
  20. */
  21. public override async startup() {
  22. await super.startup();
  23. this.httpServer = http
  24. .createServer(express())
  25. .listen(config.get("port"));
  26. this.wsServer = new WebSocketServer({
  27. server: this.httpServer,
  28. path: "/ws",
  29. WebSocket
  30. });
  31. this.wsServer.on(
  32. "connection",
  33. (socket: WebSocket, request: IncomingMessage) =>
  34. this.handleConnection(socket, request)
  35. );
  36. this.keepAliveInterval = setInterval(() => this.keepAlive(), 45000);
  37. this.wsServer.on("close", async () =>
  38. clearInterval(this.keepAliveInterval)
  39. );
  40. await super.started();
  41. }
  42. /**
  43. * keepAlive - Ping open clients and terminate closed
  44. */
  45. private async keepAlive() {
  46. if (!this.wsServer) return;
  47. for await (const clients of this.wsServer.clients.entries()) {
  48. await Promise.all(
  49. clients.map(async socket => {
  50. switch (socket.readyState) {
  51. case socket.OPEN:
  52. socket.ping();
  53. break;
  54. case socket.CLOSED:
  55. socket.terminate();
  56. break;
  57. default:
  58. break;
  59. }
  60. })
  61. );
  62. }
  63. }
  64. /**
  65. * handleConnection - Handle websocket connection
  66. */
  67. private async handleConnection(
  68. socket: WebSocket,
  69. request: IncomingMessage
  70. ) {
  71. if (this.jobQueue.getStatus().isPaused) {
  72. socket.close();
  73. return;
  74. }
  75. socket.log({ type: "debug", message: "WebSocket #ID connected" });
  76. socket.on("error", error =>
  77. socket.log({
  78. type: "error",
  79. message: error.message,
  80. data: { error }
  81. })
  82. );
  83. socket.on("close", () =>
  84. socket.log({ type: "debug", message: "WebSocket #ID closed" })
  85. );
  86. const readyData = {
  87. config: {
  88. cookie: config.get("cookie"),
  89. sitename: config.get("sitename"),
  90. recaptcha: {
  91. enabled: config.get("apis.recaptcha.enabled"),
  92. key: config.get("apis.recaptcha.key")
  93. },
  94. githubAuthentication: config.get("apis.github.enabled"),
  95. messages: config.get("messages"),
  96. christmas: config.get("christmas"),
  97. footerLinks: config.get("footerLinks"),
  98. shortcutOverrides: config.get("shortcutOverrides"),
  99. registrationDisabled: config.get("registrationDisabled"),
  100. mailEnabled: config.get("mail.enabled"),
  101. discogsEnabled: config.get("apis.discogs.enabled"),
  102. experimental: {
  103. changable_listen_mode: config.get(
  104. "experimental.changable_listen_mode"
  105. ),
  106. media_session: config.get("experimental.media_session"),
  107. disable_youtube_search: config.get(
  108. "experimental.disable_youtube_search"
  109. ),
  110. station_history: config.get("experimental.station_history"),
  111. soundcloud: config.get("experimental.soundcloud"),
  112. spotify: config.get("experimental.spotify")
  113. }
  114. },
  115. user: { loggedIn: false }
  116. };
  117. socket.dispatch("ready", readyData);
  118. socket.on("message", message => this.handleMessage(socket, message));
  119. }
  120. /**
  121. * handleMessage - Handle websocket message
  122. */
  123. private async handleMessage(socket: WebSocket, message: RawData) {
  124. if (this.jobQueue.getStatus().isPaused) {
  125. socket.close();
  126. return;
  127. }
  128. try {
  129. const data = JSON.parse(message.toString());
  130. if (!Array.isArray(data) || data.length < 1)
  131. throw new Error("Invalid request");
  132. const [moduleJob, payload, options] = data;
  133. const [moduleName, jobName] = moduleJob.split(".");
  134. const { CB_REF } = options ?? payload ?? {};
  135. await this.jobQueue
  136. .runJob(moduleName, jobName, payload)
  137. .then(res => socket.dispatch("CB_REF", CB_REF, res));
  138. } catch (error) {
  139. const message = error?.message ?? error;
  140. this.log({ type: "error", message });
  141. socket.dispatch("ERROR", error?.message ?? error);
  142. }
  143. }
  144. /**
  145. * shutdown - Shutdown websocket module
  146. */
  147. public override async shutdown() {
  148. await super.shutdown();
  149. if (this.httpServer) this.httpServer.close();
  150. if (this.wsServer) this.wsServer.close();
  151. }
  152. }
  153. export type WebSocketModuleJobs = {
  154. [Property in keyof UniqueMethods<WebSocketModule>]: {
  155. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  156. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  157. };
  158. };