scheduler.js 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. const PgBoss = require('pg-boss')
  2. const { DynamicThreadPool } = require('poolifier')
  3. const os = require('node:os')
  4. /* global WIKI */
  5. module.exports = {
  6. pool: null,
  7. boss: null,
  8. maxWorkers: 1,
  9. async init () {
  10. WIKI.logger.info('Initializing Scheduler...')
  11. this.boss = new PgBoss({
  12. db: {
  13. close: () => Promise.resolve('ok'),
  14. executeSql: async (text, values) => {
  15. try {
  16. const resource = await WIKI.models.knex.client.pool.acquire().promise
  17. const res = await resource.query(text, values)
  18. WIKI.models.knex.client.pool.release(resource)
  19. return res
  20. } catch (err) {
  21. WIKI.logger.error('Failed to acquire DB connection during scheduler query execution.')
  22. WIKI.logger.error(err)
  23. }
  24. }
  25. },
  26. // ...WIKI.models.knex.client.connectionSettings,
  27. application_name: 'Wiki.js Scheduler',
  28. schema: WIKI.config.db.schemas.scheduler,
  29. uuid: 'v4',
  30. archiveCompletedAfterSeconds: 120,
  31. deleteAfterHours: 24
  32. })
  33. this.maxWorkers = WIKI.config.workers === 'auto' ? os.cpus().length : WIKI.config.workers
  34. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  35. this.pool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  36. errorHandler: (err) => WIKI.logger.warn(err),
  37. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  38. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  39. })
  40. return this
  41. },
  42. async start () {
  43. WIKI.logger.info('Starting Scheduler...')
  44. await this.boss.start()
  45. this.boss.work('wk-*', {
  46. teamSize: this.maxWorkers,
  47. teamConcurrency: this.maxWorkers
  48. }, async job => {
  49. WIKI.logger.debug(`Starting job ${job.name}:${job.id}...`)
  50. try {
  51. const result = await this.pool.execute({
  52. id: job.id,
  53. name: job.name,
  54. data: job.data
  55. })
  56. WIKI.logger.debug(`Completed job ${job.name}:${job.id}.`)
  57. job.done(null, result)
  58. } catch (err) {
  59. WIKI.logger.warn(`Failed job ${job.name}:${job.id}): ${err.message}`)
  60. job.done(err)
  61. }
  62. this.boss.complete(job.id)
  63. })
  64. WIKI.logger.info('Scheduler: [ STARTED ]')
  65. },
  66. async stop () {
  67. WIKI.logger.info('Stopping Scheduler...')
  68. await this.boss.stop({ timeout: 5000 })
  69. await this.pool.destroy()
  70. WIKI.logger.info('Scheduler: [ STOPPED ]')
  71. },
  72. async registerScheduledJobs () {
  73. for (const [key, job] of Object.entries(WIKI.data.jobs)) {
  74. if (job.schedule) {
  75. WIKI.logger.debug(`Scheduling regular job ${key}...`)
  76. await this.boss.schedule(`wk-${key}`, job.schedule)
  77. }
  78. }
  79. }
  80. }