123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
/**
 * Cron Job Persistent Storage
 * Manages persistent storage of cron job status, job steps, and the job queue in MongoDB.
 */
- import { Meteor } from 'meteor/meteor';
- import { Mongo } from 'meteor/mongo';
// MongoDB collections holding the persistent cron job state:
// overall job status, per-step status, and the queue of jobs waiting to run.
export const CronJobStatus = new Mongo.Collection('cronJobStatus');
export const CronJobSteps = new Mongo.Collection('cronJobSteps');
export const CronJobQueue = new Mongo.Collection('cronJobQueue');

// Create the supporting indexes once the server has started.
if (Meteor.isServer) {
  Meteor.startup(() => {
    // Each entry pairs a collection with the index key specs to create on it,
    // listed in the order they are created.
    const indexPlan = [
      // Job status queries
      [
        CronJobStatus,
        [{ jobId: 1 }, { status: 1 }, { createdAt: 1 }, { updatedAt: 1 }],
      ],
      // Job step queries
      [CronJobSteps, [{ jobId: 1 }, { stepIndex: 1 }, { status: 1 }]],
      // Job queue queries
      [
        CronJobQueue,
        [{ priority: 1, createdAt: 1 }, { status: 1 }, { jobType: 1 }],
      ],
    ];

    for (const [collection, keySpecs] of indexPlan) {
      for (const keys of keySpecs) {
        collection._collection.createIndex(keys);
      }
    }
  });
}
- class CronJobStorage {
- constructor() {
- this.maxConcurrentJobs = this.getMaxConcurrentJobs();
- this.cpuThreshold = 80; // CPU usage threshold percentage
- this.memoryThreshold = 95; // Memory usage threshold percentage (increased for better job processing)
- }
- /**
- * Get maximum concurrent jobs based on system resources
- */
- getMaxConcurrentJobs() {
- // Default to 3 concurrent jobs, but can be configured via environment
- const envLimit = process.env.MAX_CONCURRENT_CRON_JOBS;
- if (envLimit) {
- return parseInt(envLimit, 10);
- }
-
- // Auto-detect based on CPU cores
- const os = require('os');
- const cpuCores = os.cpus().length;
- return Math.max(1, Math.min(5, Math.floor(cpuCores / 2)));
- }
- /**
- * Save job status to persistent storage
- */
- saveJobStatus(jobId, jobData) {
- const now = new Date();
- const existingJob = CronJobStatus.findOne({ jobId });
-
- if (existingJob) {
- CronJobStatus.update(
- { jobId },
- {
- $set: {
- ...jobData,
- updatedAt: now
- }
- }
- );
- } else {
- CronJobStatus.insert({
- jobId,
- ...jobData,
- createdAt: now,
- updatedAt: now
- });
- }
- }
- /**
- * Get job status from persistent storage
- */
- getJobStatus(jobId) {
- return CronJobStatus.findOne({ jobId });
- }
- /**
- * Get all incomplete jobs
- */
- getIncompleteJobs() {
- return CronJobStatus.find({
- status: { $in: ['pending', 'running', 'paused'] }
- }).fetch();
- }
- /**
- * Save job step status
- */
- saveJobStep(jobId, stepIndex, stepData) {
- const now = new Date();
- const existingStep = CronJobSteps.findOne({ jobId, stepIndex });
-
- if (existingStep) {
- CronJobSteps.update(
- { jobId, stepIndex },
- {
- $set: {
- ...stepData,
- updatedAt: now
- }
- }
- );
- } else {
- CronJobSteps.insert({
- jobId,
- stepIndex,
- ...stepData,
- createdAt: now,
- updatedAt: now
- });
- }
- }
- /**
- * Get job steps
- */
- getJobSteps(jobId) {
- return CronJobSteps.find(
- { jobId },
- { sort: { stepIndex: 1 } }
- ).fetch();
- }
- /**
- * Get incomplete steps for a job
- */
- getIncompleteSteps(jobId) {
- return CronJobSteps.find({
- jobId,
- status: { $in: ['pending', 'running'] }
- }, { sort: { stepIndex: 1 } }).fetch();
- }
- /**
- * Add job to queue
- */
- addToQueue(jobId, jobType, priority = 5, jobData = {}) {
- const now = new Date();
-
- // Check if job already exists in queue
- const existingJob = CronJobQueue.findOne({ jobId });
- if (existingJob) {
- return existingJob._id;
- }
-
- return CronJobQueue.insert({
- jobId,
- jobType,
- priority,
- status: 'pending',
- jobData,
- createdAt: now,
- updatedAt: now
- });
- }
- /**
- * Get next job from queue
- */
- getNextJob() {
- return CronJobQueue.findOne({
- status: 'pending'
- }, {
- sort: { priority: 1, createdAt: 1 }
- });
- }
- /**
- * Update job queue status
- */
- updateQueueStatus(jobId, status, additionalData = {}) {
- const now = new Date();
- CronJobQueue.update(
- { jobId },
- {
- $set: {
- status,
- ...additionalData,
- updatedAt: now
- }
- }
- );
- }
- /**
- * Remove job from queue
- */
- removeFromQueue(jobId) {
- CronJobQueue.remove({ jobId });
- }
- /**
- * Get system resource usage
- */
- getSystemResources() {
- const os = require('os');
-
- // Get CPU usage (simplified)
- const cpus = os.cpus();
- let totalIdle = 0;
- let totalTick = 0;
-
- cpus.forEach(cpu => {
- for (const type in cpu.times) {
- totalTick += cpu.times[type];
- }
- totalIdle += cpu.times.idle;
- });
-
- const cpuUsage = 100 - Math.round(100 * totalIdle / totalTick);
-
- // Get memory usage
- const totalMem = os.totalmem();
- const freeMem = os.freemem();
- const memoryUsage = Math.round(100 * (totalMem - freeMem) / totalMem);
-
- return {
- cpuUsage,
- memoryUsage,
- totalMem,
- freeMem,
- cpuCores: cpus.length
- };
- }
- /**
- * Check if system can handle more jobs
- */
- canStartNewJob() {
- const resources = this.getSystemResources();
- const runningJobs = CronJobQueue.find({ status: 'running' }).count();
-
- // Check CPU and memory thresholds
- if (resources.cpuUsage > this.cpuThreshold) {
- return { canStart: false, reason: 'CPU usage too high' };
- }
-
- if (resources.memoryUsage > this.memoryThreshold) {
- return { canStart: false, reason: 'Memory usage too high' };
- }
-
- // Check concurrent job limit
- if (runningJobs >= this.maxConcurrentJobs) {
- return { canStart: false, reason: 'Maximum concurrent jobs reached' };
- }
-
- return { canStart: true, reason: 'System can handle new job' };
- }
- /**
- * Get queue statistics
- */
- getQueueStats() {
- const total = CronJobQueue.find().count();
- const pending = CronJobQueue.find({ status: 'pending' }).count();
- const running = CronJobQueue.find({ status: 'running' }).count();
- const completed = CronJobQueue.find({ status: 'completed' }).count();
- const failed = CronJobQueue.find({ status: 'failed' }).count();
-
- return {
- total,
- pending,
- running,
- completed,
- failed,
- maxConcurrent: this.maxConcurrentJobs
- };
- }
- /**
- * Clean up old completed jobs
- */
- cleanupOldJobs(daysOld = 7) {
- const cutoffDate = new Date();
- cutoffDate.setDate(cutoffDate.getDate() - daysOld);
-
- // Remove old completed jobs from queue
- const removedQueue = CronJobQueue.remove({
- status: 'completed',
- updatedAt: { $lt: cutoffDate }
- });
-
- // Remove old job statuses
- const removedStatus = CronJobStatus.remove({
- status: 'completed',
- updatedAt: { $lt: cutoffDate }
- });
-
- // Remove old job steps
- const removedSteps = CronJobSteps.remove({
- status: 'completed',
- updatedAt: { $lt: cutoffDate }
- });
-
- return {
- removedQueue,
- removedStatus,
- removedSteps
- };
- }
- /**
- * Resume incomplete jobs on startup
- */
- resumeIncompleteJobs() {
- const incompleteJobs = this.getIncompleteJobs();
- const resumedJobs = [];
-
- incompleteJobs.forEach(job => {
- // Reset running jobs to pending
- if (job.status === 'running') {
- this.saveJobStatus(job.jobId, {
- ...job,
- status: 'pending',
- error: 'Job was interrupted during startup'
- });
- resumedJobs.push(job.jobId);
- }
-
- // Add to queue if not already there
- const queueJob = CronJobQueue.findOne({ jobId: job.jobId });
- if (!queueJob) {
- this.addToQueue(job.jobId, job.jobType || 'unknown', job.priority || 5, job);
- }
- });
-
- return resumedJobs;
- }
- /**
- * Get job progress percentage
- */
- getJobProgress(jobId) {
- const steps = this.getJobSteps(jobId);
- if (steps.length === 0) return 0;
-
- const completedSteps = steps.filter(step => step.status === 'completed').length;
- return Math.round((completedSteps / steps.length) * 100);
- }
- /**
- * Get detailed job information
- */
- getJobDetails(jobId) {
- const jobStatus = this.getJobStatus(jobId);
- const jobSteps = this.getJobSteps(jobId);
- const progress = this.getJobProgress(jobId);
-
- return {
- ...jobStatus,
- steps: jobSteps,
- progress,
- totalSteps: jobSteps.length,
- completedSteps: jobSteps.filter(step => step.status === 'completed').length
- };
- }
- }
// Export singleton instance
export const cronJobStorage = new CronJobStorage();

// On server startup, make jobs interrupted by the previous shutdown eligible
// to run again, then prune old completed jobs. Guarded like the index setup
// in this file, since both operations write to the collections.
if (Meteor.isServer) {
  Meteor.startup(() => {
    // Resume incomplete jobs
    cronJobStorage.resumeIncompleteJobs();

    // Cleanup old jobs (default retention: 7 days)
    cronJobStorage.cleanupOldJobs();
  });
}
|