// scheduler.js

  1. const { DynamicThreadPool } = require('poolifier')
  2. const os = require('node:os')
  3. const { setTimeout } = require('node:timers/promises')
  4. module.exports = {
  5. pool: null,
  6. maxWorkers: 1,
  7. activeWorkers: 0,
  8. async init () {
  9. this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers
  10. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  11. this.pool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  12. errorHandler: (err) => WIKI.logger.warn(err),
  13. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  14. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  15. })
  16. return this
  17. },
  18. async start () {
  19. WIKI.logger.info('Starting Scheduler...')
  20. WIKI.db.listener.addChannel('scheduler', payload => {
  21. switch (payload.event) {
  22. case 'newJob': {
  23. if (this.activeWorkers < this.maxWorkers) {
  24. this.activeWorkers++
  25. this.processJob()
  26. }
  27. break
  28. }
  29. }
  30. })
  31. // await WIKI.db.knex('jobs').insert({
  32. // task: 'test',
  33. // payload: { foo: 'bar' }
  34. // })
  35. // WIKI.db.listener.publish('scheduler', {
  36. // source: WIKI.INSTANCE_ID,
  37. // event: 'newJob'
  38. // })
  39. WIKI.logger.info('Scheduler: [ STARTED ]')
  40. },
  41. async processJob () {
  42. try {
  43. await WIKI.db.knex.transaction(async trx => {
  44. const jobs = await trx('jobs')
  45. .where('id', WIKI.db.knex.raw('(SELECT id FROM jobs ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1)'))
  46. .returning('*')
  47. .del()
  48. if (jobs && jobs.length === 1) {
  49. const job = jobs[0]
  50. WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
  51. if (job.useWorker) {
  52. await this.pool.execute({
  53. id: job.id,
  54. name: job.task,
  55. data: job.payload
  56. })
  57. } else {
  58. }
  59. }
  60. })
  61. } catch (err) {
  62. WIKI.logger.warn(err)
  63. }
  64. },
  65. async stop () {
  66. WIKI.logger.info('Stopping Scheduler...')
  67. await this.pool.stop()
  68. WIKI.logger.info('Scheduler: [ STOPPED ]')
  69. }
  70. }