1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- const PgBoss = require('pg-boss')
- const { DynamicThreadPool } = require('poolifier')
- const os = require('node:os')
- /* global WIKI */
- module.exports = {
- pool: null,
- boss: null,
- maxWorkers: 1,
- async init () {
- WIKI.logger.info('Initializing Scheduler...')
- this.boss = new PgBoss({
- db: {
- close: () => Promise.resolve('ok'),
- executeSql: async (text, values) => {
- try {
- const resource = await WIKI.models.knex.client.pool.acquire().promise
- const res = await resource.query(text, values)
- WIKI.models.knex.client.pool.release(resource)
- return res
- } catch (err) {
- WIKI.logger.error('Failed to acquire DB connection during scheduler query execution.')
- WIKI.logger.error(err)
- }
- }
- },
- // ...WIKI.models.knex.client.connectionSettings,
- application_name: 'Wiki.js Scheduler',
- schema: WIKI.config.db.schemas.scheduler,
- uuid: 'v4',
- archiveCompletedAfterSeconds: 120,
- deleteAfterHours: 24
- })
- this.maxWorkers = WIKI.config.workers === 'auto' ? os.cpus().length : WIKI.config.workers
- WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
- this.pool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
- errorHandler: (err) => WIKI.logger.warn(err),
- exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
- onlineHandler: () => WIKI.logger.debug('New worker is online.')
- })
- return this
- },
- async start () {
- WIKI.logger.info('Starting Scheduler...')
- await this.boss.start()
- this.boss.work('wk-*', {
- teamSize: this.maxWorkers,
- teamConcurrency: this.maxWorkers
- }, async job => {
- WIKI.logger.debug(`Starting job ${job.name}:${job.id}...`)
- try {
- const result = await this.pool.execute({
- id: job.id,
- name: job.name,
- data: job.data
- })
- WIKI.logger.debug(`Completed job ${job.name}:${job.id}.`)
- job.done(null, result)
- } catch (err) {
- WIKI.logger.warn(`Failed job ${job.name}:${job.id}): ${err.message}`)
- job.done(err)
- }
- this.boss.complete(job.id)
- })
- WIKI.logger.info('Scheduler: [ STARTED ]')
- },
- async stop () {
- WIKI.logger.info('Stopping Scheduler...')
- await this.boss.stop({ timeout: 5000 })
- await this.pool.destroy()
- WIKI.logger.info('Scheduler: [ STOPPED ]')
- },
- async registerScheduledJobs () {
- for (const [key, job] of Object.entries(WIKI.data.jobs)) {
- if (job.schedule) {
- WIKI.logger.debug(`Scheduling regular job ${key}...`)
- await this.boss.schedule(`wk-${key}`, job.schedule)
- }
- }
- }
- }
|