scheduler.js 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. const { run, parseCronItems, Logger } = require('graphile-worker')
  2. const { Pool } = require('pg')
  3. const { DynamicThreadPool } = require('poolifier')
  4. const { v4: uuid } = require('uuid')
  5. const os = require('node:os')
  6. const path = require('node:path')
  7. module.exports = {
  8. pool: null,
  9. runner: null,
  10. maxWorkers: 1,
  11. async init () {
  12. this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers
  13. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  14. this.pool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  15. errorHandler: (err) => WIKI.logger.warn(err),
  16. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  17. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  18. })
  19. return this
  20. },
  21. async start () {
  22. WIKI.logger.info('Starting Scheduler...')
  23. this.runner = await run({
  24. pgPool: new Pool({
  25. ...(typeof WIKI.models.config === 'string') ? {
  26. connectionString: WIKI.models.config
  27. } : WIKI.models.config,
  28. max: this.maxWorkers + 2
  29. }),
  30. schema: WIKI.config.db.schemas.scheduler,
  31. concurrency: this.maxWorkers,
  32. noHandleSignals: true,
  33. logger: new Logger(scope => {
  34. return (level, message, meta) => {
  35. const prefix = (scope?.workerId) ? `[${scope.workerId}] ` : ''
  36. WIKI.logger[level](`${prefix}${message}`, meta)
  37. }
  38. }),
  39. parsedCronItems: parseCronItems(WIKI.data.jobs.map(j => ({
  40. ...j,
  41. identifier: uuid()
  42. }))),
  43. taskList: {
  44. simple: async (payload, helpers) => {
  45. // TODO: Handle task
  46. },
  47. background: async (payload, helpers) => {
  48. try {
  49. await this.pool.execute({
  50. id: helpers.job.id,
  51. name: payload.name,
  52. data: payload.data
  53. })
  54. } catch (err) {
  55. helpers.logger.warn(`Failed job: ${err.message}`)
  56. throw err
  57. }
  58. }
  59. }
  60. })
  61. WIKI.logger.info('Scheduler: [ STARTED ]')
  62. },
  63. async stop () {
  64. WIKI.logger.info('Stopping Scheduler...')
  65. await this.runner.stop()
  66. WIKI.logger.info('Scheduler: [ STOPPED ]')
  67. }
  68. }