scheduler.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. const moment = require('moment')
  2. const childProcess = require('child_process')
  3. const _ = require('lodash')
  4. const configHelper = require('../helpers/config')
  5. /* global WIKI */
  6. class Job {
  7. constructor({
  8. name,
  9. immediate = false,
  10. schedule = 'P1D',
  11. repeat = false,
  12. worker = false
  13. }) {
  14. this.finished = Promise.resolve()
  15. this.name = name
  16. this.immediate = immediate
  17. this.schedule = moment.duration(schedule)
  18. this.repeat = repeat
  19. this.worker = worker
  20. }
  21. /**
  22. * Start Job
  23. *
  24. * @param {Object} data Job Data
  25. */
  26. start(data) {
  27. if (this.immediate) {
  28. this.invoke(data)
  29. } else {
  30. this.queue(data)
  31. }
  32. }
  33. /**
  34. * Queue the next job run according to the wait duration
  35. *
  36. * @param {Object} data Job Data
  37. */
  38. queue(data) {
  39. this.timeout = setTimeout(this.invoke.bind(this), this.schedule.asMilliseconds(), data)
  40. }
  41. /**
  42. * Run the actual job
  43. *
  44. * @param {Object} data Job Data
  45. */
  46. async invoke(data) {
  47. try {
  48. if (this.worker) {
  49. const proc = childProcess.fork(`server/core/worker.js`, [
  50. `--job=${this.name}`,
  51. `--data=${data}`
  52. ], {
  53. cwd: WIKI.ROOTPATH,
  54. stdio: ['inherit', 'inherit', 'pipe', 'ipc']
  55. })
  56. const stderr = [];
  57. proc.stderr.on('data', chunk => stderr.push(chunk))
  58. this.finished = new Promise((resolve, reject) => {
  59. proc.on('exit', (code, signal) => {
  60. const data = Buffer.concat(stderr).toString()
  61. if (code === 0) {
  62. resolve(data)
  63. } else {
  64. const err = new Error(`Error when running job ${this.name}: ${data}`)
  65. err.exitSignal = signal
  66. err.exitCode = code
  67. err.stderr = data
  68. reject(err)
  69. }
  70. proc.kill()
  71. })
  72. })
  73. } else {
  74. this.finished = require(`../jobs/${this.name}`)(data)
  75. }
  76. await this.finished
  77. } catch (err) {
  78. WIKI.logger.warn(err)
  79. }
  80. if (this.repeat) {
  81. this.queue(data)
  82. }
  83. }
  84. /**
  85. * Stop any future job invocation from occuring
  86. */
  87. stop() {
  88. clearTimeout(this.timeout)
  89. }
  90. }
  91. module.exports = {
  92. jobs: [],
  93. init() {
  94. return this
  95. },
  96. start() {
  97. _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
  98. if (WIKI.config.offline && queueParams.offlineSkip) {
  99. WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
  100. return
  101. }
  102. const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
  103. this.registerJob({
  104. name: _.kebabCase(queueName),
  105. immediate: _.get(queueParams, 'onInit', false),
  106. schedule: schedule,
  107. repeat: _.get(queueParams, 'repeat', false),
  108. worker: _.get(queueParams, 'worker', false)
  109. })
  110. })
  111. },
  112. registerJob(opts, data) {
  113. const job = new Job(opts)
  114. job.start(data)
  115. if (job.repeat) {
  116. this.jobs.push(job)
  117. }
  118. return job
  119. },
  120. stop() {
  121. this.jobs.forEach(job => {
  122. job.stop()
  123. })
  124. }
  125. }