WebSocketModule.ts 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types } from "mongoose";
  6. import BaseModule from "../BaseModule";
  7. import { UniqueMethods } from "../types/Modules";
  8. import WebSocket from "../WebSocket";
  9. import JobContext from "../JobContext";
  10. import Job from "../Job";
  /**
   * WebSocketModule - Accepts WebSocket connections on the "/ws" path of an
   * HTTP server and forwards incoming messages to the job queue
   */
  export default class WebSocketModule extends BaseModule {
    private httpServer?: Server;

    private wsServer?: WebSocketServer;

    // Handle of the keep-alive timer created in startup()
    private keepAliveInterval?: ReturnType<typeof setInterval>;

    /**
     * WebSocket Module
     */
    public constructor() {
      super("websocket");
    }

    /**
     * errorDetails - Extract what should be reported for a thrown value
     *
     * @param error - Any thrown value
     * @returns The value's `message` property when it has a non-nullish one,
     * otherwise the thrown value itself
     */
    private static errorDetails(error: unknown): unknown {
      const message =
        typeof error === "object" && error !== null && "message" in error
          ? (error as { message?: unknown }).message
          : undefined;

      return message ?? error;
    }

    /**
     * startup - Startup websocket module
     *
     * Creates the HTTP server on the configured "port", attaches the
     * WebSocket server to it and starts the 45 second keep-alive timer.
     */
    public override async startup() {
      await super.startup();

      this.httpServer = http
        .createServer(express())
        .listen(config.get("port"));

      this.wsServer = new WebSocketServer({
        server: this.httpServer,
        path: "/ws",
        WebSocket
      });

      this.wsServer.on(
        "connection",
        (socket: WebSocket, request: IncomingMessage) =>
          this.handleConnection(socket, request)
      );

      this.keepAliveInterval = setInterval(() => this.keepAlive(), 45000);

      this.wsServer.on("close", () => clearInterval(this.keepAliveInterval));

      await super.started();
    }

    /**
     * keepAlive - Ping open clients and terminate closed
     */
    private keepAlive() {
      if (!this.wsServer) return;

      // Iterate the Set itself: Set.prototype.entries() yields [value, value]
      // pairs, which would visit every socket twice.
      for (const socket of this.wsServer.clients) {
        switch (socket.readyState) {
          case socket.OPEN:
            socket.ping();
            break;
          case socket.CLOSED:
            // NOTE(review): the stock ws terminate() appears to return early
            // for sockets that are already CLOSED - confirm this branch is useful
            socket.terminate();
            break;
          default:
            break;
        }
      }
    }

    /**
     * handleConnection - Handle websocket connection
     *
     * Runs the "prepareWebsocket" job for the new socket, registers the
     * error/close/message listeners and dispatches "ready" with the job's
     * result. The socket is closed straight away while the job queue is
     * paused, and also when any step of the setup throws.
     *
     * @param socket - The newly connected socket
     * @param request - The HTTP request that initiated the connection
     */
    private async handleConnection(
      socket: WebSocket,
      request: IncomingMessage
    ) {
      if (this.jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      try {
        const readyData = await new Job("prepareWebsocket", "api", {
          socket,
          request
        }).execute();

        socket.log({
          type: "debug",
          message: `WebSocket opened #${socket.getSocketId()}`
        });

        socket.on("error", error =>
          socket.log({
            type: "error",
            message: error.message,
            data: { error }
          })
        );

        socket.on("close", () => {
          socket.log({
            type: "debug",
            message: `WebSocket closed #${socket.getSocketId()}`
          });
        });

        socket.dispatch("ready", readyData);

        socket.on("message", message => this.handleMessage(socket, message));
      } catch (error: unknown) {
        // This method is called from an event listener that ignores the
        // returned promise, so a rejection here would otherwise go unhandled
        // and leave a socket open without any listeners attached.
        this.log({
          type: "error",
          message: String(WebSocketModule.errorDetails(error))
        });
        socket.close();
      }
    }

    /**
     * handleMessage - Handle websocket message
     *
     * Expects a JSON array of the form ["module.job", payload, options].
     * The job is run through the "api" module's "runJob" job and the result
     * is dispatched back as "CB_REF" together with the CB_REF value read
     * from options (or from payload when options is missing). Any failure
     * is logged and dispatched to the socket as "ERROR".
     *
     * @param socket - The socket the message was received on
     * @param message - The raw message data
     */
    private async handleMessage(socket: WebSocket, message: RawData) {
      if (this.jobQueue.getStatus().isPaused) {
        socket.close();
        return;
      }

      try {
        const data: unknown = JSON.parse(message.toString());

        if (!Array.isArray(data) || data.length < 1)
          throw new Error("Invalid request");

        const [moduleJob, payload, options] = data;

        // Reject non-string job identifiers explicitly instead of failing
        // with a TypeError on split()
        if (typeof moduleJob !== "string") throw new Error("Invalid request");

        const [moduleName, jobName] = moduleJob.split(".");

        const { CB_REF } = options ?? payload ?? {};

        const res = await this.jobQueue.runJob("api", "runJob", {
          moduleName,
          jobName,
          payload,
          sessionId: socket.getSessionId(),
          socketId: socket.getSocketId()
        });

        socket.dispatch("CB_REF", CB_REF, res);
      } catch (error: unknown) {
        const details = WebSocketModule.errorDetails(error);

        this.log({ type: "error", message: String(details) });

        socket.dispatch("ERROR", details);
      }
    }

    /**
     * getSockets - Get websocket clients
     *
     * @param context - Job context (not used by this job)
     * @returns The server's set of clients, or undefined when the server has
     * not been created
     */
    public async getSockets(context: JobContext) {
      return this.wsServer?.clients;
    }

    /**
     * getSocket - Get websocket client
     *
     * Returns the first client whose socket id or session id matches one of
     * the given ids. An id that is not supplied is never used for matching.
     *
     * @param context - Job context (not used by this job)
     * @param payload - The socketId and/or sessionId to look for
     * @returns The matching socket, or null when there is none or the server
     * has not been created
     */
    public async getSocket(
      context: JobContext,
      {
        socketId,
        sessionId
      }: { socketId?: string; sessionId?: Types.ObjectId }
    ) {
      if (!this.wsServer) return null;

      for (const socket of this.wsServer.clients as Set<WebSocket>) {
        if (socketId != null && socket.getSocketId() === socketId)
          return socket;

        if (sessionId != null) {
          const socketSessionId = socket.getSessionId();

          // Compare by string value: two ObjectId instances holding the same
          // id are not strictly equal to each other.
          if (
            socketSessionId != null &&
            String(socketSessionId) === String(sessionId)
          )
            return socket;
        }
      }

      return null;
    }

    /**
     * shutdown - Shutdown websocket module
     *
     * Stops the keep-alive timer and closes the HTTP and WebSocket servers.
     */
    public override async shutdown() {
      await super.shutdown();

      // Stop the timer here as well rather than relying solely on the
      // WebSocket server's "close" event to do it.
      clearInterval(this.keepAliveInterval);
      this.keepAliveInterval = undefined;

      if (this.httpServer) this.httpServer.close();

      if (this.wsServer) this.wsServer.close();

      await this.stopped();
    }
  }
  /**
   * WebSocketModuleJobs - For every key of UniqueMethods<WebSocketModule>,
   * the payload type (the method's second parameter) and the awaited type
   * of the method's return value
   */
  export type WebSocketModuleJobs = {
    [JobName in keyof UniqueMethods<WebSocketModule>]: {
      payload: Parameters<UniqueMethods<WebSocketModule>[JobName]>[1];
      returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[JobName]>>;
    };
  };