WebSocketModule.ts 5.5 KB


  1. import config from "config";
  2. import express from "express";
  3. import http, { Server, IncomingMessage } from "node:http";
  4. import { RawData, WebSocketServer } from "ws";
  5. import { Types } from "mongoose";
  6. import BaseModule from "@/BaseModule";
  7. import { UniqueMethods } from "@/types/Modules";
  8. import WebSocket from "@/WebSocket";
  9. import JobContext from "@/JobContext";
  10. import Job from "@/Job";
  11. import ModuleManager from "@/ModuleManager";
  12. import JobQueue from "@/JobQueue";
  13. export class WebSocketModule extends BaseModule {
  14. private _httpServer?: Server;
  15. private _wsServer?: WebSocketServer;
  16. private _keepAliveInterval?: NodeJS.Timer;
  17. /**
  18. * WebSocket Module
  19. */
  20. public constructor() {
  21. super("websocket");
  22. this._jobConfigDefault = false;
  23. this._jobConfig = {
  24. getSocket: "disabled",
  25. getSockets: "disabled"
  26. };
  27. }
  28. /**
  29. * startup - Startup websocket module
  30. */
  31. public override async startup() {
  32. await super.startup();
  33. this._httpServer = http
  34. .createServer(express())
  35. .listen(config.get("port"));
  36. this._wsServer = new WebSocketServer({
  37. server: this._httpServer,
  38. path: "/ws",
  39. WebSocket
  40. });
  41. this._wsServer.on(
  42. "connection",
  43. (socket: WebSocket, request: IncomingMessage) =>
  44. this._handleConnection(socket, request)
  45. );
  46. this._keepAliveInterval = setInterval(() => this._keepAlive(), 45000);
  47. this._wsServer.on("close", async () =>
  48. clearInterval(this._keepAliveInterval)
  49. );
  50. await super._started();
  51. }
  52. /**
  53. * keepAlive - Ping open clients and terminate closed
  54. */
  55. private async _keepAlive() {
  56. if (!this._wsServer) return;
  57. for await (const clients of this._wsServer.clients.entries()) {
  58. await Promise.all(
  59. clients.map(async socket => {
  60. switch (socket.readyState) {
  61. case socket.OPEN:
  62. socket.ping();
  63. break;
  64. case socket.CLOSED:
  65. socket.terminate();
  66. break;
  67. default:
  68. break;
  69. }
  70. })
  71. );
  72. }
  73. }
  74. /**
  75. * handleConnection - Handle websocket connection
  76. */
  77. private async _handleConnection(
  78. socket: WebSocket,
  79. request: IncomingMessage
  80. ) {
  81. if (JobQueue.getStatus().isPaused) {
  82. socket.close();
  83. return;
  84. }
  85. const readyData = await new Job("prepareWebsocket", "api", {
  86. socket,
  87. request
  88. }).execute();
  89. socket.log({
  90. type: "debug",
  91. message: `WebSocket opened #${socket.getSocketId()}`
  92. });
  93. socket.on("error", error =>
  94. socket.log({
  95. type: "error",
  96. message: error.message,
  97. data: { error }
  98. })
  99. );
  100. socket.on("close", async () => {
  101. socket.log({
  102. type: "debug",
  103. message: `WebSocket closed #${socket.getSocketId()}`
  104. });
  105. });
  106. socket.dispatch("ready", readyData);
  107. socket.on("message", message => this._handleMessage(socket, message));
  108. }
  109. /**
  110. * handleMessage - Handle websocket message
  111. */
  112. private async _handleMessage(socket: WebSocket, message: RawData) {
  113. if (JobQueue.getStatus().isPaused) {
  114. socket.close();
  115. return;
  116. }
  117. let callbackRef;
  118. try {
  119. const data = JSON.parse(message.toString());
  120. if (!Array.isArray(data) || data.length < 1)
  121. throw new Error("Invalid request");
  122. const [moduleJob, payload, options] = data;
  123. const [moduleName, ...jobNameParts] = moduleJob.split(".");
  124. const jobName = jobNameParts.join(".");
  125. const { callbackRef } = options ?? payload ?? {};
  126. if (!callbackRef)
  127. throw new Error(
  128. `No callback reference provided for job ${moduleJob}`
  129. );
  130. const module = ModuleManager.getModule(moduleName);
  131. if (!module) throw new Error(`Module "${moduleName}" not found`);
  132. const job = module.getJob(jobName);
  133. if (!job.api) throw new Error(`Job "${jobName}" not found.`);
  134. const res = await JobQueue.runJob("api", "runJob", {
  135. moduleName,
  136. jobName,
  137. payload,
  138. sessionId: socket.getSessionId(),
  139. socketId: socket.getSocketId()
  140. });
  141. socket.dispatch("jobCallback", callbackRef, {
  142. status: "success",
  143. data: res
  144. });
  145. } catch (error) {
  146. const message = error?.message ?? error;
  147. if (callbackRef)
  148. socket.dispatch("jobCallback", callbackRef, {
  149. status: "error",
  150. message
  151. });
  152. else socket.dispatch("error", message);
  153. }
  154. }
  155. /**
  156. * getSockets - Get websocket clients
  157. */
  158. public async getSockets() {
  159. return this._wsServer?.clients;
  160. }
  161. /**
  162. * getSocket - Get websocket client
  163. */
  164. public async getSocket(socketId?: string, sessionId?: Types.ObjectId) {
  165. if (!this._wsServer) return null;
  166. for (const clients of this._wsServer.clients.entries() as IterableIterator<
  167. [WebSocket, WebSocket]
  168. >) {
  169. const socket = clients.find(socket => {
  170. if (socket.getSocketId() === socketId) return true;
  171. if (socket.getSessionId() === sessionId) return true;
  172. return false;
  173. });
  174. if (socket) return socket;
  175. }
  176. return null;
  177. }
  178. /**
  179. * dispatch - Dispatch message to socket
  180. */
  181. public async dispatch(
  182. context: JobContext,
  183. {
  184. socketId,
  185. channel,
  186. value
  187. }: { socketId: string; channel: string; value?: any }
  188. ) {
  189. const socket = await this.getSocket(socketId);
  190. if (!socket) return;
  191. const values = Array.isArray(value) ? value : [value];
  192. socket.dispatch(channel, ...values);
  193. }
  194. /**
  195. * shutdown - Shutdown websocket module
  196. */
  197. public override async shutdown() {
  198. await super.shutdown();
  199. if (this._httpServer) this._httpServer.close();
  200. if (this._wsServer) this._wsServer.close();
  201. await this._stopped();
  202. }
  203. }
  204. export type WebSocketModuleJobs = {
  205. [Property in keyof UniqueMethods<WebSocketModule>]: {
  206. payload: Parameters<UniqueMethods<WebSocketModule>[Property]>[1];
  207. returns: Awaited<ReturnType<UniqueMethods<WebSocketModule>[Property]>>;
  208. };
  209. };
  210. export default new WebSocketModule();