// scheduler.js
  1. const { DynamicThreadPool } = require('poolifier')
  2. const os = require('node:os')
  3. const autoload = require('auto-load')
  4. const path = require('node:path')
  5. const cronparser = require('cron-parser')
  6. const { DateTime } = require('luxon')
  7. module.exports = {
  8. workerPool: null,
  9. maxWorkers: 1,
  10. activeWorkers: 0,
  11. pollingRef: null,
  12. scheduledRef: null,
  13. tasks: null,
  14. async init () {
  15. this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? (os.cpus().length - 1) : WIKI.config.scheduler.workers
  16. if (this.maxWorkers < 1) { this.maxWorkers = 1 }
  17. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  18. this.workerPool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  19. errorHandler: (err) => WIKI.logger.warn(err),
  20. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  21. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  22. })
  23. this.tasks = autoload(path.join(WIKI.SERVERPATH, 'tasks/simple'))
  24. return this
  25. },
  26. async start () {
  27. WIKI.logger.info('Starting Scheduler...')
  28. // -> Add PostgreSQL Sub Channel
  29. WIKI.db.listener.addChannel('scheduler', async payload => {
  30. switch (payload.event) {
  31. case 'newJob': {
  32. if (this.activeWorkers < this.maxWorkers) {
  33. this.activeWorkers++
  34. await this.processJob()
  35. this.activeWorkers--
  36. }
  37. break
  38. }
  39. }
  40. })
  41. // -> Start scheduled jobs check
  42. this.scheduledRef = setInterval(async () => {
  43. this.addScheduled()
  44. }, WIKI.config.scheduler.scheduledCheck * 1000)
  45. // -> Add scheduled jobs on init
  46. await this.addScheduled()
  47. // -> Start job polling
  48. this.pollingRef = setInterval(async () => {
  49. this.processJob()
  50. }, WIKI.config.scheduler.pollingCheck * 1000)
  51. WIKI.logger.info('Scheduler: [ STARTED ]')
  52. },
  53. async addJob ({ task, payload, waitUntil, maxRetries, isScheduled = false, notify = true }) {
  54. try {
  55. await WIKI.db.knex('jobs').insert({
  56. task,
  57. useWorker: !(typeof this.tasks[task] === 'function'),
  58. payload,
  59. maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
  60. isScheduled,
  61. waitUntil,
  62. createdBy: WIKI.INSTANCE_ID
  63. })
  64. if (notify) {
  65. WIKI.db.listener.publish('scheduler', {
  66. source: WIKI.INSTANCE_ID,
  67. event: 'newJob'
  68. })
  69. }
  70. } catch (err) {
  71. WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
  72. }
  73. },
  74. async processJob () {
  75. let jobIds = []
  76. try {
  77. const availableWorkers = this.maxWorkers - this.activeWorkers
  78. if (availableWorkers < 1) {
  79. WIKI.logger.debug('All workers are busy. Cannot process more jobs at the moment.')
  80. return
  81. }
  82. await WIKI.db.knex.transaction(async trx => {
  83. const jobs = await trx('jobs')
  84. .whereIn('id', WIKI.db.knex.raw(`(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT ${availableWorkers})`))
  85. .returning('*')
  86. .del()
  87. if (jobs && jobs.length > 0) {
  88. for (const job of jobs) {
  89. WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
  90. // -> Add to Job History
  91. await WIKI.db.knex('jobHistory').insert({
  92. id: job.id,
  93. task: job.task,
  94. state: 'active',
  95. useWorker: job.useWorker,
  96. wasScheduled: job.isScheduled,
  97. payload: job.payload,
  98. attempt: job.retries + 1,
  99. maxRetries: job.maxRetries,
  100. executedBy: WIKI.INSTANCE_ID,
  101. createdAt: job.createdAt
  102. }).onConflict('id').merge({
  103. executedBy: WIKI.INSTANCE_ID,
  104. startedAt: new Date()
  105. })
  106. jobIds.push(job.id)
  107. // -> Start working on it
  108. try {
  109. if (job.useWorker) {
  110. await this.workerPool.execute({
  111. ...job,
  112. INSTANCE_ID: `${WIKI.INSTANCE_ID}:WKR`
  113. })
  114. } else {
  115. await this.tasks[job.task](job.payload)
  116. }
  117. // -> Update job history (success)
  118. await WIKI.db.knex('jobHistory').where({
  119. id: job.id
  120. }).update({
  121. state: 'completed',
  122. completedAt: new Date()
  123. })
  124. WIKI.logger.info(`Completed job ${job.id}: ${job.task}`)
  125. } catch (err) {
  126. WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
  127. WIKI.logger.warn(err)
  128. // -> Update job history (fail)
  129. await WIKI.db.knex('jobHistory').where({
  130. id: job.id
  131. }).update({
  132. state: 'failed',
  133. lastErrorMessage: err.message
  134. })
  135. // -> Reschedule for retry
  136. if (job.retries < job.maxRetries) {
  137. const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
  138. await trx('jobs').insert({
  139. ...job,
  140. retries: job.retries + 1,
  141. waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
  142. updatedAt: new Date()
  143. })
  144. WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
  145. }
  146. }
  147. }
  148. }
  149. })
  150. } catch (err) {
  151. WIKI.logger.warn(err)
  152. if (jobIds && jobIds.length > 0) {
  153. WIKI.db.knex('jobHistory').whereIn('id', jobIds).update({
  154. state: 'interrupted',
  155. lastErrorMessage: err.message
  156. })
  157. }
  158. }
  159. },
  160. async addScheduled () {
  161. try {
  162. await WIKI.db.knex.transaction(async trx => {
  163. // -> Acquire lock
  164. const jobLock = await trx('jobLock')
  165. .where(
  166. 'key',
  167. WIKI.db.knex('jobLock')
  168. .select('key')
  169. .where('key', 'cron')
  170. .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
  171. .forUpdate()
  172. .skipLocked()
  173. .limit(1)
  174. ).update({
  175. lastCheckedBy: WIKI.INSTANCE_ID,
  176. lastCheckedAt: DateTime.utc().toISO()
  177. })
  178. if (jobLock > 0) {
  179. WIKI.logger.info(`Scheduling future planned jobs...`)
  180. const scheduledJobs = await WIKI.db.knex('jobSchedule')
  181. if (scheduledJobs?.length > 0) {
  182. // -> Get existing scheduled jobs
  183. const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
  184. let totalAdded = 0
  185. for (const job of scheduledJobs) {
  186. // -> Get next planned iterations
  187. const plannedIterations = cronparser.parseExpression(job.cron, {
  188. startDate: DateTime.utc().toJSDate(),
  189. endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
  190. iterator: true,
  191. tz: 'UTC'
  192. })
  193. // -> Add a maximum of 10 future iterations for a single task
  194. let addedFutureJobs = 0
  195. while (true) {
  196. try {
  197. const next = plannedIterations.next()
  198. // -> Ensure this iteration isn't already scheduled
  199. if (!existingJobs.some(j => j.task === job.task && j.waitUntil.getTime() === next.value.getTime())) {
  200. this.addJob({
  201. task: job.task,
  202. useWorker: !(typeof this.tasks[job.task] === 'function'),
  203. payload: job.payload,
  204. isScheduled: true,
  205. waitUntil: next.value.toISOString(),
  206. notify: false
  207. })
  208. addedFutureJobs++
  209. totalAdded++
  210. }
  211. // -> No more iterations for this period or max iterations count reached
  212. if (next.done || addedFutureJobs >= 10) { break }
  213. } catch (err) {
  214. break
  215. }
  216. }
  217. }
  218. if (totalAdded > 0) {
  219. WIKI.logger.info(`Scheduled ${totalAdded} new future planned jobs: [ OK ]`)
  220. } else {
  221. WIKI.logger.info(`No new future planned jobs to schedule: [ OK ]`)
  222. }
  223. }
  224. }
  225. })
  226. } catch (err) {
  227. WIKI.logger.warn(err)
  228. }
  229. },
  230. async stop () {
  231. WIKI.logger.info('Stopping Scheduler...')
  232. clearInterval(this.scheduledRef)
  233. clearInterval(this.pollingRef)
  234. await this.workerPool.destroy()
  235. WIKI.logger.info('Scheduler: [ STOPPED ]')
  236. }
  237. }