scheduler.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. const { DynamicThreadPool } = require('poolifier')
  2. const os = require('node:os')
  3. const autoload = require('auto-load')
  4. const path = require('node:path')
  5. const cronparser = require('cron-parser')
  6. const { DateTime } = require('luxon')
  7. const { v4: uuid } = require('uuid')
  8. const { createDeferred } = require('../helpers/common')
  9. const _ = require('lodash')
  10. module.exports = {
  11. workerPool: null,
  12. maxWorkers: 1,
  13. activeWorkers: 0,
  14. pollingRef: null,
  15. scheduledRef: null,
  16. tasks: null,
  17. completionPromises: [],
  18. async init () {
  19. this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? (os.cpus().length - 1) : WIKI.config.scheduler.workers
  20. if (this.maxWorkers < 1) { this.maxWorkers = 1 }
  21. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  22. this.workerPool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  23. errorHandler: (err) => WIKI.logger.warn(err),
  24. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  25. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  26. })
  27. this.tasks = autoload(path.join(WIKI.SERVERPATH, 'tasks/simple'))
  28. return this
  29. },
  30. async start () {
  31. WIKI.logger.info('Starting Scheduler...')
  32. // -> Add PostgreSQL Sub Channel
  33. WIKI.db.listener.addChannel('scheduler', async payload => {
  34. switch (payload.event) {
  35. case 'newJob': {
  36. if (this.activeWorkers < this.maxWorkers) {
  37. this.activeWorkers++
  38. await this.processJob()
  39. this.activeWorkers--
  40. }
  41. break
  42. }
  43. case 'jobCompleted': {
  44. const jobPromise = _.find(this.completionPromises, ['id', payload.id])
  45. if (jobPromise) {
  46. if (payload.state === 'success') {
  47. jobPromise.resolve()
  48. } else {
  49. jobPromise.reject(new Error(payload.errorMessage))
  50. }
  51. setTimeout(() => {
  52. _.remove(this.completionPromises, ['id', payload.id])
  53. })
  54. }
  55. break
  56. }
  57. }
  58. })
  59. // -> Start scheduled jobs check
  60. this.scheduledRef = setInterval(async () => {
  61. this.addScheduled()
  62. }, WIKI.config.scheduler.scheduledCheck * 1000)
  63. // -> Add scheduled jobs on init
  64. await this.addScheduled()
  65. // -> Start job polling
  66. this.pollingRef = setInterval(async () => {
  67. this.processJob()
  68. }, WIKI.config.scheduler.pollingCheck * 1000)
  69. WIKI.logger.info('Scheduler: [ STARTED ]')
  70. },
  71. /**
  72. * Add a job to the scheduler
  73. * @param {Object} opts - Job options
  74. * @param {string} opts.task - The task name to execute.
  75. * @param {Object} [opts.payload={}] - An optional data object to pass to the job.
  76. * @param {Date} [opts.waitUntil] - An optional datetime after which the task is allowed to run.
  77. * @param {Number} [opts.maxRetries] - The number of times this job can be restarted upon failure. Uses server defaults if not provided.
  78. * @param {Boolean} [opts.isScheduled=false] - Whether this is a scheduled job.
  79. * @param {Boolean} [opts.notify=true] - Whether to notify all instances that a new job is available.
  80. * @param {Boolean} [opts.promise=false] - Whether to return a promise property that resolves when the job completes.
  81. * @returns {Promise}
  82. */
  83. async addJob ({ task, payload = {}, waitUntil, maxRetries, isScheduled = false, notify = true, promise = false }) {
  84. try {
  85. const jobId = uuid()
  86. const jobDefer = createDeferred()
  87. if (promise) {
  88. this.completionPromises.push({
  89. id: jobId,
  90. added: DateTime.utc(),
  91. resolve: jobDefer.resolve,
  92. reject: jobDefer.reject
  93. })
  94. }
  95. await WIKI.db.knex('jobs')
  96. .insert({
  97. id: jobId,
  98. task,
  99. useWorker: !(typeof this.tasks[task] === 'function'),
  100. payload,
  101. maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
  102. isScheduled,
  103. waitUntil,
  104. createdBy: WIKI.INSTANCE_ID
  105. })
  106. if (notify) {
  107. WIKI.db.listener.publish('scheduler', {
  108. source: WIKI.INSTANCE_ID,
  109. event: 'newJob',
  110. id: jobId
  111. })
  112. }
  113. return {
  114. id: jobId,
  115. ...promise && { promise: jobDefer.promise }
  116. }
  117. } catch (err) {
  118. WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
  119. }
  120. },
  121. async processJob () {
  122. let jobIds = []
  123. try {
  124. const availableWorkers = this.maxWorkers - this.activeWorkers
  125. if (availableWorkers < 1) {
  126. WIKI.logger.debug('All workers are busy. Cannot process more jobs at the moment.')
  127. return
  128. }
  129. await WIKI.db.knex.transaction(async trx => {
  130. const jobs = await trx('jobs')
  131. .whereIn('id', WIKI.db.knex.raw(`(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT ${availableWorkers})`))
  132. .returning('*')
  133. .del()
  134. if (jobs && jobs.length > 0) {
  135. for (const job of jobs) {
  136. WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
  137. // -> Add to Job History
  138. await WIKI.db.knex('jobHistory').insert({
  139. id: job.id,
  140. task: job.task,
  141. state: 'active',
  142. useWorker: job.useWorker,
  143. wasScheduled: job.isScheduled,
  144. payload: job.payload,
  145. attempt: job.retries + 1,
  146. maxRetries: job.maxRetries,
  147. executedBy: WIKI.INSTANCE_ID,
  148. createdAt: job.createdAt
  149. }).onConflict('id').merge({
  150. executedBy: WIKI.INSTANCE_ID,
  151. startedAt: new Date()
  152. })
  153. jobIds.push(job.id)
  154. // -> Start working on it
  155. try {
  156. if (job.useWorker) {
  157. await this.workerPool.execute({
  158. ...job,
  159. INSTANCE_ID: `${WIKI.INSTANCE_ID}:WKR`
  160. })
  161. } else {
  162. await this.tasks[job.task](job.payload)
  163. }
  164. // -> Update job history (success)
  165. await WIKI.db.knex('jobHistory').where({
  166. id: job.id
  167. }).update({
  168. state: 'completed',
  169. completedAt: new Date()
  170. })
  171. WIKI.logger.info(`Completed job ${job.id}: ${job.task}`)
  172. WIKI.db.listener.publish('scheduler', {
  173. source: WIKI.INSTANCE_ID,
  174. event: 'jobCompleted',
  175. state: 'success',
  176. id: job.id
  177. })
  178. } catch (err) {
  179. WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
  180. WIKI.logger.warn(err)
  181. // -> Update job history (fail)
  182. await WIKI.db.knex('jobHistory').where({
  183. id: job.id
  184. }).update({
  185. attempt: job.retries + 1,
  186. state: 'failed',
  187. lastErrorMessage: err.message
  188. })
  189. WIKI.db.listener.publish('scheduler', {
  190. source: WIKI.INSTANCE_ID,
  191. event: 'jobCompleted',
  192. state: 'failed',
  193. id: job.id,
  194. errorMessage: err.message
  195. })
  196. // -> Reschedule for retry
  197. if (job.retries < job.maxRetries) {
  198. const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
  199. await trx('jobs').insert({
  200. ...job,
  201. retries: job.retries + 1,
  202. waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
  203. updatedAt: new Date()
  204. })
  205. WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
  206. }
  207. }
  208. }
  209. }
  210. })
  211. } catch (err) {
  212. WIKI.logger.warn(err)
  213. if (jobIds && jobIds.length > 0) {
  214. WIKI.db.knex('jobHistory').whereIn('id', jobIds).update({
  215. state: 'interrupted',
  216. lastErrorMessage: err.message
  217. })
  218. }
  219. }
  220. },
  221. async addScheduled () {
  222. try {
  223. await WIKI.db.knex.transaction(async trx => {
  224. // -> Acquire lock
  225. const jobLock = await trx('jobLock')
  226. .where(
  227. 'key',
  228. WIKI.db.knex('jobLock')
  229. .select('key')
  230. .where('key', 'cron')
  231. .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
  232. .forUpdate()
  233. .skipLocked()
  234. .limit(1)
  235. ).update({
  236. lastCheckedBy: WIKI.INSTANCE_ID,
  237. lastCheckedAt: DateTime.utc().toISO()
  238. })
  239. if (jobLock > 0) {
  240. WIKI.logger.info(`Scheduling future planned jobs...`)
  241. const scheduledJobs = await WIKI.db.knex('jobSchedule')
  242. if (scheduledJobs?.length > 0) {
  243. // -> Get existing scheduled jobs
  244. const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
  245. let totalAdded = 0
  246. for (const job of scheduledJobs) {
  247. // -> Get next planned iterations
  248. const plannedIterations = cronparser.parseExpression(job.cron, {
  249. startDate: DateTime.utc().toJSDate(),
  250. endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
  251. iterator: true,
  252. tz: 'UTC'
  253. })
  254. // -> Add a maximum of 10 future iterations for a single task
  255. let addedFutureJobs = 0
  256. while (true) {
  257. try {
  258. const next = plannedIterations.next()
  259. // -> Ensure this iteration isn't already scheduled
  260. if (!existingJobs.some(j => j.task === job.task && j.waitUntil.getTime() === next.value.getTime())) {
  261. this.addJob({
  262. task: job.task,
  263. useWorker: !(typeof this.tasks[job.task] === 'function'),
  264. payload: job.payload,
  265. isScheduled: true,
  266. waitUntil: next.value.toISOString(),
  267. notify: false
  268. })
  269. addedFutureJobs++
  270. totalAdded++
  271. }
  272. // -> No more iterations for this period or max iterations count reached
  273. if (next.done || addedFutureJobs >= 10) { break }
  274. } catch (err) {
  275. break
  276. }
  277. }
  278. }
  279. if (totalAdded > 0) {
  280. WIKI.logger.info(`Scheduled ${totalAdded} new future planned jobs: [ OK ]`)
  281. } else {
  282. WIKI.logger.info(`No new future planned jobs to schedule: [ OK ]`)
  283. }
  284. }
  285. }
  286. })
  287. } catch (err) {
  288. WIKI.logger.warn(err)
  289. }
  290. },
  291. async stop () {
  292. WIKI.logger.info('Stopping Scheduler...')
  293. clearInterval(this.scheduledRef)
  294. clearInterval(this.pollingRef)
  295. await this.workerPool.destroy()
  296. WIKI.logger.info('Scheduler: [ STOPPED ]')
  297. }
  298. }