| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | 
							- const moment = require('moment')
 
- const childProcess = require('child_process')
 
- const _ = require('lodash')
 
- const configHelper = require('../helpers/config')
 
- /* global WIKI */
 
- class Job {
 
-   constructor({
 
-     name,
 
-     immediate = false,
 
-     schedule = 'P1D',
 
-     repeat = false,
 
-     worker = false
 
-   }, queue) {
 
-     this.queue = queue
 
-     this.finished = Promise.resolve()
 
-     this.name = name
 
-     this.immediate = immediate
 
-     this.schedule = moment.duration(schedule)
 
-     this.repeat = repeat
 
-     this.worker = worker
 
-   }
 
-   /**
 
-    * Start Job
 
-    *
 
-    * @param {Object} data Job Data
 
-    */
 
-   start(data) {
 
-     this.queue.jobs.push(this)
 
-     if (this.immediate) {
 
-       this.invoke(data)
 
-     } else {
 
-       this.enqueue(data)
 
-     }
 
-   }
 
-   /**
 
-    * Queue the next job run according to the wait duration
 
-    *
 
-    * @param {Object} data Job Data
 
-    */
 
-   enqueue(data) {
 
-     this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
 
-   }
 
-   /**
 
-    * Run the actual job
 
-    *
 
-    * @param {Object} data Job Data
 
-    */
 
-   async invoke(data) {
 
-     try {
 
-       if (this.worker) {
 
-         const proc = childProcess.fork(`server/core/worker.js`, [
 
-           `--job=${this.name}`,
 
-           `--data=${data}`
 
-         ], {
 
-           cwd: WIKI.ROOTPATH,
 
-           stdio: ['inherit', 'inherit', 'pipe', 'ipc']
 
-         })
 
-         const stderr = []
 
-         proc.stderr.on('data', chunk => stderr.push(chunk))
 
-         this.finished = new Promise((resolve, reject) => {
 
-           proc.on('exit', (code, signal) => {
 
-             const data = Buffer.concat(stderr).toString()
 
-             if (code === 0) {
 
-               resolve(data)
 
-             } else {
 
-               const err = new Error(`Error when running job ${this.name}: ${data}`)
 
-               err.exitSignal = signal
 
-               err.exitCode = code
 
-               err.stderr = data
 
-               reject(err)
 
-             }
 
-             proc.kill()
 
-           })
 
-         })
 
-       } else {
 
-         this.finished = require(`../jobs/${this.name}`)(data)
 
-       }
 
-       await this.finished
 
-     } catch (err) {
 
-       WIKI.logger.warn(err)
 
-     }
 
-     if (this.repeat && this.queue.jobs.includes(this)) {
 
-       this.enqueue(data)
 
-     } else {
 
-       this.stop().catch(() => {})
 
-     }
 
-   }
 
-   /**
 
-    * Stop any future job invocation from occuring
 
-    */
 
-   async stop() {
 
-     clearTimeout(this.timeout)
 
-     this.queue.jobs = this.queue.jobs.filter(x => x !== this)
 
-     return this.finished
 
-   }
 
- }
 
- module.exports = {
 
-   jobs: [],
 
-   init() {
 
-     return this
 
-   },
 
-   start() {
 
-     _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
 
-       if (WIKI.config.offline && queueParams.offlineSkip) {
 
-         WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
 
-         return
 
-       }
 
-       const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
 
-       this.registerJob({
 
-         name: _.kebabCase(queueName),
 
-         immediate: _.get(queueParams, 'onInit', false),
 
-         schedule: schedule,
 
-         repeat: _.get(queueParams, 'repeat', false),
 
-         worker: _.get(queueParams, 'worker', false)
 
-       })
 
-     })
 
-   },
 
-   registerJob(opts, data) {
 
-     const job = new Job(opts, this)
 
-     job.start(data)
 
-     return job
 
-   },
 
-   async stop() {
 
-     return Promise.all(this.jobs.map(job => job.stop()))
 
-   }
 
- }
 
 
  |