scheduler.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. const { DynamicThreadPool } = require('poolifier')
  2. const os = require('node:os')
  3. const autoload = require('auto-load')
  4. const path = require('node:path')
  5. const cronparser = require('cron-parser')
  6. const { DateTime } = require('luxon')
// Background job scheduler: distributes jobs across instances via PostgreSQL
// LISTEN/NOTIFY and executes them either in-process or in a worker thread pool.
module.exports = {
  // poolifier DynamicThreadPool instance, created in init()
  workerPool: null,
  // Upper bound of concurrent workers ('auto' config resolves to CPU count in init())
  maxWorkers: 1,
  // Number of jobs currently being processed by this instance
  activeWorkers: 0,
  // setInterval handle for the job polling loop (started in start())
  pollingRef: null,
  // setInterval handle for the scheduled-jobs check (started in start())
  scheduledRef: null,
  // In-process (simple) task handlers, autoloaded from tasks/simple in init()
  tasks: null,
  14. async init () {
  15. this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? os.cpus().length : WIKI.config.scheduler.workers
  16. WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
  17. this.workerPool = new DynamicThreadPool(1, this.maxWorkers, './server/worker.js', {
  18. errorHandler: (err) => WIKI.logger.warn(err),
  19. exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
  20. onlineHandler: () => WIKI.logger.debug('New worker is online.')
  21. })
  22. this.tasks = autoload(path.join(WIKI.SERVERPATH, 'tasks/simple'))
  23. return this
  24. },
  25. async start () {
  26. WIKI.logger.info('Starting Scheduler...')
  27. // -> Add PostgreSQL Sub Channel
  28. WIKI.db.listener.addChannel('scheduler', async payload => {
  29. switch (payload.event) {
  30. case 'newJob': {
  31. if (this.activeWorkers < this.maxWorkers) {
  32. this.activeWorkers++
  33. await this.processJob()
  34. this.activeWorkers--
  35. }
  36. break
  37. }
  38. }
  39. })
  40. // -> Start scheduled jobs check
  41. this.scheduledRef = setInterval(async () => {
  42. this.addScheduled()
  43. }, WIKI.config.scheduler.scheduledCheck * 1000)
  44. // -> Add scheduled jobs on init
  45. await this.addScheduled()
  46. // -> Start job polling
  47. this.pollingRef = setInterval(async () => {
  48. this.processJob()
  49. }, WIKI.config.scheduler.pollingCheck * 1000)
  50. WIKI.logger.info('Scheduler: [ STARTED ]')
  51. },
  52. async addJob ({ task, payload, waitUntil, maxRetries, isScheduled = false, notify = true }) {
  53. try {
  54. await WIKI.db.knex('jobs').insert({
  55. task,
  56. useWorker: !(typeof this.tasks[task] === 'function'),
  57. payload,
  58. maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
  59. isScheduled,
  60. waitUntil,
  61. createdBy: WIKI.INSTANCE_ID
  62. })
  63. if (notify) {
  64. WIKI.db.listener.publish('scheduler', {
  65. source: WIKI.INSTANCE_ID,
  66. event: 'newJob'
  67. })
  68. }
  69. } catch (err) {
  70. WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
  71. }
  72. },
  73. async processJob () {
  74. let jobId = null
  75. try {
  76. await WIKI.db.knex.transaction(async trx => {
  77. const jobs = await trx('jobs')
  78. .where('id', WIKI.db.knex.raw('(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1)'))
  79. .returning('*')
  80. .del()
  81. if (jobs && jobs.length === 1) {
  82. const job = jobs[0]
  83. WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
  84. jobId = job.id
  85. // -> Add to Job History
  86. await WIKI.db.knex('jobHistory').insert({
  87. id: job.id,
  88. task: job.task,
  89. state: 'active',
  90. useWorker: job.useWorker,
  91. wasScheduled: job.isScheduled,
  92. payload: job.payload,
  93. attempt: job.retries + 1,
  94. maxRetries: job.maxRetries,
  95. createdAt: job.createdAt
  96. }).onConflict('id').merge({
  97. startedAt: new Date()
  98. })
  99. // -> Start working on it
  100. try {
  101. if (job.useWorker) {
  102. await this.workerPool.execute({
  103. id: job.id,
  104. name: job.task,
  105. data: job.payload
  106. })
  107. } else {
  108. await this.tasks[job.task](job.payload)
  109. }
  110. // -> Update job history (success)
  111. await WIKI.db.knex('jobHistory').where({
  112. id: job.id
  113. }).update({
  114. state: 'completed',
  115. completedAt: new Date()
  116. })
  117. WIKI.logger.info(`Completed job ${job.id}: ${job.task} [ SUCCESS ]`)
  118. } catch (err) {
  119. WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
  120. WIKI.logger.warn(err)
  121. // -> Update job history (fail)
  122. await WIKI.db.knex('jobHistory').where({
  123. id: job.id
  124. }).update({
  125. state: 'failed',
  126. lastErrorMessage: err.message
  127. })
  128. // -> Reschedule for retry
  129. if (job.retries < job.maxRetries) {
  130. const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
  131. await trx('jobs').insert({
  132. ...job,
  133. retries: job.retries + 1,
  134. waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
  135. updatedAt: new Date()
  136. })
  137. WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
  138. }
  139. }
  140. }
  141. })
  142. } catch (err) {
  143. WIKI.logger.warn(err)
  144. if (jobId) {
  145. WIKI.db.knex('jobHistory').where({
  146. id: jobId
  147. }).update({
  148. state: 'interrupted',
  149. lastErrorMessage: err.message
  150. })
  151. }
  152. }
  153. },
  154. async addScheduled () {
  155. try {
  156. await WIKI.db.knex.transaction(async trx => {
  157. // -> Acquire lock
  158. const jobLock = await trx('jobLock')
  159. .where(
  160. 'key',
  161. WIKI.db.knex('jobLock')
  162. .select('key')
  163. .where('key', 'cron')
  164. .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
  165. .forUpdate()
  166. .skipLocked()
  167. .limit(1)
  168. ).update({
  169. lastCheckedBy: WIKI.INSTANCE_ID,
  170. lastCheckedAt: DateTime.utc().toISO()
  171. })
  172. if (jobLock > 0) {
  173. WIKI.logger.info(`Scheduling future planned jobs...`)
  174. const scheduledJobs = await WIKI.db.knex('jobSchedule')
  175. if (scheduledJobs?.length > 0) {
  176. // -> Get existing scheduled jobs
  177. const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
  178. for (const job of scheduledJobs) {
  179. // -> Get next planned iterations
  180. const plannedIterations = cronparser.parseExpression(job.cron, {
  181. startDate: DateTime.utc().toJSDate(),
  182. endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
  183. iterator: true,
  184. tz: 'UTC'
  185. })
  186. // -> Add a maximum of 10 future iterations for a single task
  187. let addedFutureJobs = 0
  188. while (true) {
  189. try {
  190. const next = plannedIterations.next()
  191. // -> Ensure this iteration isn't already scheduled
  192. if (!existingJobs.some(j => j.task === job.task && j.waitUntil.getTime() === next.value.getTime())) {
  193. this.addJob({
  194. task: job.task,
  195. useWorker: !(typeof this.tasks[job.task] === 'function'),
  196. payload: job.payload,
  197. isScheduled: true,
  198. waitUntil: next.value.toISOString(),
  199. notify: false
  200. })
  201. addedFutureJobs++
  202. }
  203. // -> No more iterations for this period or max iterations count reached
  204. if (next.done || addedFutureJobs >= 10) { break }
  205. } catch (err) {
  206. break
  207. }
  208. }
  209. }
  210. }
  211. }
  212. })
  213. } catch (err) {
  214. WIKI.logger.warn(err)
  215. }
  216. },
  217. async stop () {
  218. WIKI.logger.info('Stopping Scheduler...')
  219. clearInterval(this.scheduledRef)
  220. clearInterval(this.pollingRef)
  221. await this.workerPool.destroy()
  222. WIKI.logger.info('Scheduler: [ STOPPED ]')
  223. }
  224. }