123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- const { DynamicThreadPool } = require('poolifier')
- const os = require('node:os')
- const autoload = require('auto-load')
- const path = require('node:path')
- const cronparser = require('cron-parser')
- const { DateTime } = require('luxon')
- module.exports = {
- workerPool: null,
- maxWorkers: 1,
- activeWorkers: 0,
- pollingRef: null,
- scheduledRef: null,
- tasks: null,
- async init () {
- this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers
- WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
- this.workerPool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
- errorHandler: (err) => WIKI.logger.warn(err),
- exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
- onlineHandler: () => WIKI.logger.debug('New worker is online.')
- })
- this.tasks = autoload(path.join(WIKI.SERVERPATH, 'tasks/simple'))
- return this
- },
- async start () {
- WIKI.logger.info('Starting Scheduler...')
- // -> Add PostgreSQL Sub Channel
- WIKI.db.listener.addChannel('scheduler', async payload => {
- switch (payload.event) {
- case 'newJob': {
- if (this.activeWorkers < this.maxWorkers) {
- this.activeWorkers++
- await this.processJob()
- this.activeWorkers--
- }
- break
- }
- }
- })
- // -> Start scheduled jobs check
- this.scheduledRef = setInterval(async () => {
- this.addScheduled()
- }, WIKI.config.scheduler.scheduledCheck * 1000)
- // -> Add scheduled jobs on init
- await this.addScheduled()
- // -> Start job polling
- this.pollingRef = setInterval(async () => {
- this.processJob()
- }, WIKI.config.scheduler.pollingCheck * 1000)
- WIKI.logger.info('Scheduler: [ STARTED ]')
- },
- async addJob ({ task, payload, waitUntil, maxRetries, isScheduled = false, notify = true }) {
- try {
- await WIKI.db.knex('jobs').insert({
- task,
- useWorker: !(typeof this.tasks[task] === 'function'),
- payload,
- maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
- isScheduled,
- waitUntil,
- createdBy: WIKI.INSTANCE_ID
- })
- if (notify) {
- WIKI.db.listener.publish('scheduler', {
- source: WIKI.INSTANCE_ID,
- event: 'newJob'
- })
- }
- } catch (err) {
- WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
- }
- },
- async processJob () {
- let jobId = null
- try {
- await WIKI.db.knex.transaction(async trx => {
- const jobs = await trx('jobs')
- .where('id', WIKI.db.knex.raw('(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1)'))
- .returning('*')
- .del()
- if (jobs && jobs.length === 1) {
- const job = jobs[0]
- WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
- jobId = job.id
- // -> Add to Job History
- await WIKI.db.knex('jobHistory').insert({
- id: job.id,
- task: job.task,
- state: 'active',
- useWorker: job.useWorker,
- wasScheduled: job.isScheduled,
- payload: job.payload,
- attempt: job.retries + 1,
- maxRetries: job.maxRetries,
- createdAt: job.createdAt
- }).onConflict('id').merge({
- startedAt: new Date()
- })
- // -> Start working on it
- try {
- if (job.useWorker) {
- await this.workerPool.execute({
- id: job.id,
- name: job.task,
- data: job.payload
- })
- } else {
- await this.tasks[job.task](job.payload)
- }
- // -> Update job history (success)
- await WIKI.db.knex('jobHistory').where({
- id: job.id
- }).update({
- state: 'completed',
- completedAt: new Date()
- })
- WIKI.logger.info(`Completed job ${job.id}: ${job.task} [ SUCCESS ]`)
- } catch (err) {
- WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
- WIKI.logger.warn(err)
- // -> Update job history (fail)
- await WIKI.db.knex('jobHistory').where({
- id: job.id
- }).update({
- state: 'failed',
- lastErrorMessage: err.message
- })
- // -> Reschedule for retry
- if (job.retries < job.maxRetries) {
- const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
- await trx('jobs').insert({
- ...job,
- retries: job.retries + 1,
- waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
- updatedAt: new Date()
- })
- WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
- }
- }
- }
- })
- } catch (err) {
- WIKI.logger.warn(err)
- if (jobId) {
- WIKI.db.knex('jobHistory').where({
- id: jobId
- }).update({
- state: 'interrupted',
- lastErrorMessage: err.message
- })
- }
- }
- },
- async addScheduled () {
- try {
- await WIKI.db.knex.transaction(async trx => {
- // -> Acquire lock
- const jobLock = await trx('jobLock')
- .where(
- 'key',
- WIKI.db.knex('jobLock')
- .select('key')
- .where('key', 'cron')
- .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
- .forUpdate()
- .skipLocked()
- .limit(1)
- ).update({
- lastCheckedBy: WIKI.INSTANCE_ID,
- lastCheckedAt: DateTime.utc().toISO()
- })
- if (jobLock > 0) {
- WIKI.logger.info(`Scheduling future planned jobs...`)
- const scheduledJobs = await WIKI.db.knex('jobSchedule')
- if (scheduledJobs?.length > 0) {
- // -> Get existing scheduled jobs
- const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
- for (const job of scheduledJobs) {
- // -> Get next planned iterations
- const plannedIterations = cronparser.parseExpression(job.cron, {
- startDate: DateTime.utc().toJSDate(),
- endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
- iterator: true,
- tz: 'UTC'
- })
- // -> Add a maximum of 10 future iterations for a single task
- let addedFutureJobs = 0
- while (true) {
- try {
- const next = plannedIterations.next()
- // -> Ensure this iteration isn't already scheduled
- if (!existingJobs.some(j => j.task === job.task && j.waitUntil.getTime() === next.value.getTime())) {
- this.addJob({
- task: job.task,
- useWorker: !(typeof this.tasks[job.task] === 'function'),
- payload: job.payload,
- isScheduled: true,
- waitUntil: next.value.toISOString(),
- notify: false
- })
- addedFutureJobs++
- }
- // -> No more iterations for this period or max iterations count reached
- if (next.done || addedFutureJobs >= 10) { break }
- } catch (err) {
- break
- }
- }
- }
- }
- }
- })
- } catch (err) {
- WIKI.logger.warn(err)
- }
- },
- async stop () {
- WIKI.logger.info('Stopping Scheduler...')
- clearInterval(this.scheduledRef)
- clearInterval(this.pollingRef)
- await this.workerPool.destroy()
- WIKI.logger.info('Scheduler: [ STOPPED ]')
- }
- }
|