123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- const path = require('path')
- const Bull = require('bull')
- const Promise = require('bluebird')
- const _ = require('lodash')
- /* global WIKI */
- module.exports = {
- job: {},
- init() {
- _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
- this.job[queueName] = new Bull(queueName, {
- prefix: `queue`,
- redis: WIKI.config.redis
- })
- if (queueParams.concurrency > 0) {
- this.job[queueName].process(queueParams.concurrency, path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
- } else {
- this.job[queueName].process(path.join(WIKI.SERVERPATH, `jobs/${_.kebabCase(queueName)}.js`))
- }
- })
- return this
- },
- start() {
- _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
- if (queueParams.onInit) {
- this.job[queueName].add({}, {
- removeOnComplete: true
- })
- }
- if (queueParams.cron) {
- this.job[queueName].add({}, {
- repeat: { cron: queueParams.cron },
- removeOnComplete: true
- })
- }
- })
- },
- async quit() {
- for (const queueName in this.job) {
- await this.job[queueName].close()
- }
- },
- async clean() {
- return Promise.each(_.keys(WIKI.data.jobs), queueName => {
- return new Promise((resolve, reject) => {
- let keyStream = WIKI.redis.scanStream({
- match: `queue:${queueName}:*`
- })
- keyStream.on('data', resultKeys => {
- if (resultKeys.length > 0) {
- WIKI.redis.del(resultKeys)
- }
- })
- keyStream.on('end', resolve)
- })
- }).then(() => {
- WIKI.logger.info('Purging old queue jobs: [ OK ]')
- }).return(true).catch(err => {
- WIKI.logger.error(err)
- })
- }
- }
|