cronJobStorage.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. /**
  2. * Cron Job Persistent Storage
  3. * Manages persistent storage of cron job status and steps in MongoDB
  4. */
  import os from 'os';

  import { Meteor } from 'meteor/meteor';
  import { Mongo } from 'meteor/mongo';
  // Mongo collections that hold cron job state across server restarts
  export const CronJobStatus = new Mongo.Collection('cronJobStatus');
  export const CronJobSteps = new Mongo.Collection('cronJobSteps');
  export const CronJobQueue = new Mongo.Collection('cronJobQueue');

  // Server only: create the supporting indexes once Meteor has started
  if (Meteor.isServer) {
    Meteor.startup(() => {
      // Each entry pairs a collection with the index key specs to create on it,
      // listed in the order the createIndex calls are issued.
      const indexPlan = [
        // Job status lookups
        [CronJobStatus, [{ jobId: 1 }, { status: 1 }, { createdAt: 1 }, { updatedAt: 1 }]],
        // Job step lookups
        [CronJobSteps, [{ jobId: 1 }, { stepIndex: 1 }, { status: 1 }]],
        // Queue ordering and filtering
        [CronJobQueue, [{ priority: 1, createdAt: 1 }, { status: 1 }, { jobType: 1 }]]
      ];

      indexPlan.forEach(([collection, keySpecs]) => {
        keySpecs.forEach(keys => {
          collection._collection.createIndex(keys);
        });
      });
    });
  }
  /**
   * Storage facade for cron jobs.
   *
   * Reads and writes the CronJobStatus, CronJobSteps and CronJobQueue
   * collections and offers a simple admission check (CPU, memory and
   * concurrency limits) for deciding whether another job may start.
   */
  class CronJobStorage {
    constructor() {
      // Upper bound on queue entries that may be in the 'running' state at once
      this.maxConcurrentJobs = this.getMaxConcurrentJobs();
      this.cpuThreshold = 80; // CPU usage threshold percentage
      this.memoryThreshold = 90; // Memory usage threshold percentage
    }

    /**
     * Determine the maximum number of concurrently running jobs.
     *
     * Uses the MAX_CONCURRENT_CRON_JOBS environment variable when it parses as
     * an integer; otherwise half the number of CPU cores, clamped to 1..5.
     * Note that a configured value of 0 (or less) means canStartNewJob() will
     * never allow a job to start.
     *
     * @returns {number} the concurrency limit
     */
    getMaxConcurrentJobs() {
      const envLimit = process.env.MAX_CONCURRENT_CRON_JOBS;
      if (envLimit) {
        const parsedLimit = Number.parseInt(envLimit, 10);
        // A non-numeric value parses to NaN; `running >= NaN` is always false,
        // which would silently remove the limit. Fall back to auto-detection.
        if (!Number.isNaN(parsedLimit)) {
          return parsedLimit;
        }
      }
      // Auto-detect based on CPU cores
      const cpuCores = os.cpus().length;
      return Math.max(1, Math.min(5, Math.floor(cpuCores / 2)));
    }

    /**
     * Save job status to persistent storage.
     *
     * Updates the document matching jobId when one exists (merging jobData via
     * $set), otherwise inserts a new one. updatedAt is always refreshed;
     * createdAt is stamped on insert.
     *
     * @param {string} jobId - job identifier
     * @param {Object} jobData - fields to store on the status document
     */
    saveJobStatus(jobId, jobData) {
      const now = new Date();
      const existingJob = CronJobStatus.findOne({ jobId });
      if (existingJob) {
        CronJobStatus.update({ jobId }, { $set: { ...jobData, updatedAt: now } });
        return;
      }
      CronJobStatus.insert({
        jobId,
        ...jobData,
        createdAt: now,
        updatedAt: now
      });
    }

    /**
     * Get job status from persistent storage.
     *
     * @param {string} jobId - job identifier
     * @returns {Object|undefined} the status document, if any
     */
    getJobStatus(jobId) {
      return CronJobStatus.findOne({ jobId });
    }

    /**
     * Get all jobs whose status is 'pending', 'running' or 'paused'.
     *
     * @returns {Object[]} matching status documents
     */
    getIncompleteJobs() {
      return CronJobStatus.find({
        status: { $in: ['pending', 'running', 'paused'] }
      }).fetch();
    }

    /**
     * Save the status of one step of a job.
     *
     * Updates the document matching (jobId, stepIndex) when one exists,
     * otherwise inserts a new one.
     *
     * @param {string} jobId - job identifier
     * @param {number} stepIndex - position of the step within the job
     * @param {Object} stepData - fields to store on the step document
     */
    saveJobStep(jobId, stepIndex, stepData) {
      const now = new Date();
      const existingStep = CronJobSteps.findOne({ jobId, stepIndex });
      if (existingStep) {
        CronJobSteps.update({ jobId, stepIndex }, { $set: { ...stepData, updatedAt: now } });
        return;
      }
      CronJobSteps.insert({
        jobId,
        stepIndex,
        ...stepData,
        createdAt: now,
        updatedAt: now
      });
    }

    /**
     * Get all steps of a job, ordered by stepIndex ascending.
     *
     * @param {string} jobId - job identifier
     * @returns {Object[]} step documents
     */
    getJobSteps(jobId) {
      return CronJobSteps.find({ jobId }, { sort: { stepIndex: 1 } }).fetch();
    }

    /**
     * Get the 'pending' and 'running' steps of a job, ordered by stepIndex.
     *
     * @param {string} jobId - job identifier
     * @returns {Object[]} step documents
     */
    getIncompleteSteps(jobId) {
      return CronJobSteps.find(
        { jobId, status: { $in: ['pending', 'running'] } },
        { sort: { stepIndex: 1 } }
      ).fetch();
    }

    /**
     * Add a job to the queue unless an entry with the same jobId exists.
     *
     * @param {string} jobId - job identifier
     * @param {string} jobType - job type label
     * @param {number} [priority=5] - lower values are dequeued first (see getNextJob)
     * @param {Object} [jobData={}] - payload stored with the queue entry
     * @returns {*} the _id of the existing or newly inserted queue entry
     */
    addToQueue(jobId, jobType, priority = 5, jobData = {}) {
      const now = new Date();
      // Check if job already exists in queue
      const existingJob = CronJobQueue.findOne({ jobId });
      if (existingJob) {
        return existingJob._id;
      }
      return CronJobQueue.insert({
        jobId,
        jobType,
        priority,
        status: 'pending',
        jobData,
        createdAt: now,
        updatedAt: now
      });
    }

    /**
     * Get the next pending queue entry: lowest priority value first, then
     * oldest createdAt.
     *
     * @returns {Object|undefined} the queue entry, if any is pending
     */
    getNextJob() {
      return CronJobQueue.findOne(
        { status: 'pending' },
        { sort: { priority: 1, createdAt: 1 } }
      );
    }

    /**
     * Update the status of the queue entries matching jobId.
     *
     * @param {string} jobId - job identifier
     * @param {string} status - new queue status
     * @param {Object} [additionalData={}] - extra fields merged into the $set
     */
    updateQueueStatus(jobId, status, additionalData = {}) {
      const now = new Date();
      CronJobQueue.update(
        { jobId },
        {
          $set: {
            status,
            ...additionalData,
            updatedAt: now
          }
        }
      );
    }

    /**
     * Remove the queue entries matching jobId.
     *
     * @param {string} jobId - job identifier
     */
    removeFromQueue(jobId) {
      CronJobQueue.remove({ jobId });
    }

    /**
     * Get system resource usage.
     *
     * NOTE(review): the per-CPU counters from os.cpus() appear to be cumulative,
     * so cpuUsage looks like a long-run average rather than the current load —
     * confirm against the Node.js os documentation.
     *
     * @returns {{cpuUsage: number, memoryUsage: number, totalMem: number,
     *   freeMem: number, cpuCores: number}} usage percentages (0-100) and raw values
     */
    getSystemResources() {
      // Get CPU usage (simplified)
      const cpus = os.cpus();
      let totalIdle = 0;
      let totalTick = 0;
      cpus.forEach(cpu => {
        Object.values(cpu.times).forEach(ticks => {
          totalTick += ticks;
        });
        totalIdle += cpu.times.idle;
      });
      // Guard against 0/0 (NaN) when no CPU information is reported
      const cpuUsage = totalTick > 0
        ? 100 - Math.round(100 * totalIdle / totalTick)
        : 0;

      // Get memory usage
      const totalMem = os.totalmem();
      const freeMem = os.freemem();
      const memoryUsage = Math.round(100 * (totalMem - freeMem) / totalMem);

      return {
        cpuUsage,
        memoryUsage,
        totalMem,
        freeMem,
        cpuCores: cpus.length
      };
    }

    /**
     * Check if the system can handle more jobs.
     *
     * @returns {{canStart: boolean, reason: string}} the decision and why
     */
    canStartNewJob() {
      const resources = this.getSystemResources();
      const runningJobs = CronJobQueue.find({ status: 'running' }).count();

      // Check CPU and memory thresholds
      if (resources.cpuUsage > this.cpuThreshold) {
        return { canStart: false, reason: 'CPU usage too high' };
      }
      if (resources.memoryUsage > this.memoryThreshold) {
        return { canStart: false, reason: 'Memory usage too high' };
      }
      // Check concurrent job limit
      if (runningJobs >= this.maxConcurrentJobs) {
        return { canStart: false, reason: 'Maximum concurrent jobs reached' };
      }
      return { canStart: true, reason: 'System can handle new job' };
    }

    /**
     * Get queue statistics.
     *
     * @returns {{total: number, pending: number, running: number,
     *   completed: number, failed: number, maxConcurrent: number}} entry counts
     *   per status plus the configured concurrency limit
     */
    getQueueStats() {
      const countByStatus = status => CronJobQueue.find({ status }).count();
      const total = CronJobQueue.find().count();
      return {
        total,
        pending: countByStatus('pending'),
        running: countByStatus('running'),
        completed: countByStatus('completed'),
        failed: countByStatus('failed'),
        maxConcurrent: this.maxConcurrentJobs
      };
    }

    /**
     * Remove 'completed' queue entries, job statuses and job steps whose
     * updatedAt is older than the cutoff.
     *
     * @param {number} [daysOld=7] - age in days beyond which documents are removed
     * @returns {{removedQueue: *, removedStatus: *, removedSteps: *}} the value
     *   returned by each remove call (presumably the number of removed documents)
     */
    cleanupOldJobs(daysOld = 7) {
      const cutoffDate = new Date();
      cutoffDate.setDate(cutoffDate.getDate() - daysOld);
      const expiredCompleted = {
        status: 'completed',
        updatedAt: { $lt: cutoffDate }
      };

      // Remove old completed jobs from queue
      const removedQueue = CronJobQueue.remove(expiredCompleted);
      // Remove old job statuses
      const removedStatus = CronJobStatus.remove(expiredCompleted);
      // Remove old job steps
      const removedSteps = CronJobSteps.remove(expiredCompleted);

      return {
        removedQueue,
        removedStatus,
        removedSteps
      };
    }

    /**
     * Resume incomplete jobs on startup.
     *
     * Jobs recorded as 'running' are reset to 'pending' (both the status
     * document and any queue entry still marked 'running'), and every
     * incomplete job without a queue entry is added to the queue.
     *
     * @returns {string[]} jobIds of the jobs that were reset from 'running'
     */
    resumeIncompleteJobs() {
      const resumedJobs = [];
      this.getIncompleteJobs().forEach(job => {
        const wasRunning = job.status === 'running';

        // Reset running jobs to pending. Only the changed fields are written;
        // $set leaves the rest of the stored document untouched.
        if (wasRunning) {
          this.saveJobStatus(job.jobId, {
            status: 'pending',
            error: 'Job was interrupted during startup'
          });
          resumedJobs.push(job.jobId);
        }

        const queueJob = CronJobQueue.findOne({ jobId: job.jobId });
        if (!queueJob) {
          // Add to queue if not already there. An explicit null check keeps a
          // priority of 0 instead of replacing it with the default.
          const priority = job.priority == null ? 5 : job.priority;
          this.addToQueue(job.jobId, job.jobType || 'unknown', priority, job);
        } else if (wasRunning && queueJob.status === 'running') {
          // A queue entry left 'running' by the interrupted process would never
          // be returned by getNextJob() and would keep counting against the
          // concurrency limit, so make it eligible again.
          this.updateQueueStatus(job.jobId, 'pending');
        }
      });
      return resumedJobs;
    }

    /**
     * Get job progress percentage.
     *
     * @param {string} jobId - job identifier
     * @returns {number} rounded percentage of steps with status 'completed';
     *   0 when the job has no steps
     */
    getJobProgress(jobId) {
      const steps = this.getJobSteps(jobId);
      if (steps.length === 0) return 0;
      const completedSteps = steps.filter(step => step.status === 'completed').length;
      return Math.round((completedSteps / steps.length) * 100);
    }

    /**
     * Get detailed job information: the status document merged with its steps
     * and progress figures.
     *
     * @param {string} jobId - job identifier
     * @returns {Object} status fields plus steps, progress, totalSteps and
     *   completedSteps
     */
    getJobDetails(jobId) {
      const jobStatus = this.getJobStatus(jobId);
      // Fetch the steps once and derive every figure from that single result
      const jobSteps = this.getJobSteps(jobId);
      const completedSteps = jobSteps.filter(step => step.status === 'completed').length;
      const progress = jobSteps.length === 0
        ? 0
        : Math.round((completedSteps / jobSteps.length) * 100);

      return {
        ...jobStatus,
        steps: jobSteps,
        progress,
        totalSteps: jobSteps.length,
        completedSteps
      };
    }
  }
  // Shared singleton used by the rest of the application
  export const cronJobStorage = new CronJobStorage();

  /**
   * Startup hook: re-queue jobs left incomplete by the previous run, then
   * purge old completed records. Logs only when something was actually done.
   */
  function recoverAndPruneCronJobs() {
    // Resume incomplete jobs
    const resumedJobs = cronJobStorage.resumeIncompleteJobs();
    const resumedCount = resumedJobs.length;
    if (resumedCount > 0) {
      console.log(`Resumed ${resumedCount} incomplete cron jobs:`, resumedJobs);
    }

    // Cleanup old jobs
    const cleanup = cronJobStorage.cleanupOldJobs();
    const { removedQueue, removedStatus, removedSteps } = cleanup;
    const removedAnything = [removedQueue, removedStatus, removedSteps].some(count => count > 0);
    if (removedAnything) {
      console.log('Cleaned up old cron jobs:', cleanup);
    }
  }

  Meteor.startup(recoverAndPruneCronJobs);