123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727 |
- // Rig weak dependencies
- if (typeof MicroQueue === 'undefined' && Package['micro-queue']) {
- MicroQueue = Package['micro-queue'].MicroQueue;
- }
- if (typeof ReactiveList === 'undefined' && Package['reactive-list']) {
- ReactiveList = Package['reactive-list'].ReactiveList;
- }
- // Rig weak dependencies in +0.9.1
- if (typeof MicroQueue === 'undefined' && Package['wekan-cfs-micro-queue']) {
- MicroQueue = Package['wekan-cfs-micro-queue'].MicroQueue;
- }
- if (typeof ReactiveList === 'undefined' && Package['wekan-cfs-reactive-list']) {
- ReactiveList = Package['wekan-cfs-reactive-list'].ReactiveList;
- }
- /**
- * Creates an instance of a power queue // Testing inline comment
- * [Check out demo](http://power-queue-test.meteor.com/)
- *
- * @constructor
- * @self powerqueue
- * @param {object} [options] Settings
- * @param {boolean} [options.filo=false] Make it a first in last out queue
- * @param {boolean} [options.isPaused=false] Set queue paused
- * @param {boolean} [options.autostart=true] May adding a task start the queue
- * @param {string} [options.name="Queue"] Name of the queue
- * @param {number} [options.maxProcessing=1] Limit of simultanous running tasks
- * @param {number} [options.maxFailures = 5] Limit retries of failed tasks, if 0 or below we allow infinite failures
- * @param {number} [options.jumpOnFailure = true] Jump to next task and retry failed task later
- * @param {boolean} [options.debug=false] Log verbose messages to the console
- * @param {boolean} [options.reactive=true] Set whether or not this queue should be reactive
- * @param {boolean} [options.onAutostart] Callback for the queue autostart event
- * @param {boolean} [options.onPaused] Callback for the queue paused event
- * @param {boolean} [options.onReleased] Callback for the queue release event
- * @param {boolean} [options.onEnded] Callback for the queue end event
- * @param {[SpinalQueue](spinal-queue.spec.md)} [options.spinalQueue] Set spinal queue uses pr. default `MicroQueue` or `ReactiveList` if added to the project
- */
- PowerQueue = function(options) {
- var self = this;
- var test = 5;
- self.reactive = (options && options.reactive === false) ? false : true;
- // Allow user to use another micro-queue #3
- // We try setting the ActiveQueue to MicroQueue if installed in the app
- var ActiveQueue = (typeof MicroQueue !== 'undefined') && MicroQueue || undefined;
- // If ReactiveList is added to the project we use this over MicroQueue
- ActiveQueue = (typeof ReactiveList !== 'undefined') && ReactiveList || ActiveQueue;
- // We allow user to overrule and set a custom spinal-queue spec complient queue
- if (options && typeof options.spinalQueue !== 'undefined') {
- ActiveQueue = options.spinalQueue;
- }
- if (typeof ActiveQueue === 'undefined') {
- console.log('Error: You need to add a spinal queue to the project');
- console.log('Please add "micro-queue", "reactive-list" to the project');
- throw new Error('Please add "micro-queue", "reactive-list" or other spinalQueue compatible packages');
- }
- // Default is fifo lilo
- self.invocations = new ActiveQueue({
- //
- sort: (options && (options.filo || options.lifo)),
- reactive: self.reactive
- });
- //var self.invocations = new ReactiveList(queueOrder);
- // List of current tasks being processed
- self._processList = new ActiveQueue({
- reactive: self.reactive
- }); //ReactiveList();
- // Max number of simultanious tasks being processed
- self._maxProcessing = new ReactiveProperty(options && options.maxProcessing || 1, self.reactive);
- // Reactive number of tasks being processed
- self._isProcessing = new ReactiveProperty(0, self.reactive);
- // Boolean indicating if queue is paused or not
- self._paused = new ReactiveProperty((options && options.isPaused || false), self.reactive);
- // Boolean indicator for queue status active / running (can still be paused)
- self._running = new ReactiveProperty(false, self.reactive);
- // Counter for errors, errors are triggered if maxFailures is exeeded
- self._errors = new ReactiveProperty(0, self.reactive);
- // Counter for task failures, contains error count
- self._failures = new ReactiveProperty(0, self.reactive);
- // On failure jump to new task - if false the current task is rerun until error
- self._jumpOnFailure = (options && options.jumpOnFailure === false) ? false : true;
- // Count of all added tasks
- self._maxLength = new ReactiveProperty(0, self.reactive);
- // Boolean indicate whether or not a "add" task is allowed to start the queue
- self._autostart = new ReactiveProperty( ((options && options.autostart === false) ? false : true), self.reactive);
- // Limit times a task is allowed to fail and be rerun later before triggering an error
- self._maxFailures = new ReactiveProperty( (options && options.maxFailures || 5), self.reactive);
- // Name / title of this queue - Not used - should deprecate
- self.title = options && options.name || 'Queue';
- // debug - will print error / failures passed to next
- self.debug = !!(options && options.debug);
- /** @method PowerQueue.total
- * @reactive
- * @returns {number} The total number of tasks added to this queue
- */
- self.total = self._maxLength.get;
- /** @method PowerQueue.isPaused
- * @reactive
- * @returns {boolean} Status of the paused state of the queue
- */
- self.isPaused = self._paused.get;
- /** @method PowerQueue.processing
- * @reactive
- * @returns {number} Number of tasks currently being processed
- */
- self.processing = self._isProcessing.get;
- /** @method PowerQueue.errors
- * @reactive
- * @returns {number} The total number of errors
- * Errors are triggered when [maxFailures](PowerQueue.maxFailures) are exeeded
- */
- self.errors = self._errors.get;
- /** @method PowerQueue.failures
- * @reactive
- * @returns {number} The total number of failed tasks
- */
- self.failures = self._failures.get;
- /** @method PowerQueue.isRunning
- * @reactive
- * @returns {boolean} True if the queue is running
- * > NOTE: The task can be paused but marked as running
- */
- self.isRunning = self._running.get;
- /** @method PowerQueue.maxProcessing Get setter for maxProcessing
- * @param {number} [max] If not used this function works as a getter
- * @reactive
- * @returns {number} Maximum number of simultaneous processing tasks
- *
- * Example:
- * ```js
- * foo.maxProcessing(); // Works as a getter and returns the current value
- * foo.maxProcessing(20); // This sets the value to 20
- * ```
- */
- self.maxProcessing = self._maxProcessing.getset;
- self._maxProcessing.onChange = function() {
- // The user can change the max allowed processing tasks up or down here...
- // Update the throttle up
- self.updateThrottleUp();
- // Update the throttle down
- self.updateThrottleDown();
- };
- /** @method PowerQueue.autostart Get setter for autostart
- * @param {boolean} [autorun] If not used this function works as a getter
- * @reactive
- * @returns {boolean} If adding a task may trigger the queue to start
- *
- * Example:
- * ```js
- * foo.autostart(); // Works as a getter and returns the current value
- * foo.autostart(true); // This sets the value to true
- * ```
- */
- self.autostart = self._autostart.getset;
- /** @method PowerQueue.maxFailures Get setter for maxFailures
- * @param {number} [max] If not used this function works as a getter
- * @reactive
- * @returns {number} The maximum for failures pr. task before triggering an error
- *
- * Example:
- * ```js
- * foo.maxFailures(); // Works as a getter and returns the current value
- * foo.maxFailures(10); // This sets the value to 10
- * ```
- */
- self.maxFailures = self._maxFailures.getset;
- /** @callback PowerQueue.onPaused
- * Is called when queue is ended
- */
- self.onPaused = options && options.onPaused || function() {
- self.debug && console.log(self.title + ' ENDED');
- };
- /** @callback PowerQueue.onEnded
- * Is called when queue is ended
- */
- self.onEnded = options && options.onEnded || function() {
- self.debug && console.log(self.title + ' ENDED');
- };
- /** @callback PowerQueue.onRelease
- * Is called when queue is released
- */
- self.onRelease = options && options.onRelease || function() {
- self.debug && console.log(self.title + ' RELEASED');
- };
- /** @callback PowerQueue.onAutostart
- * Is called when queue is auto started
- */
- self.onAutostart = options && options.onAutostart || function() {
- self.debug && console.log(self.title + ' Autostart');
- };
- };
- /** @method PowerQueue.prototype.processList
- * @reactive
- * @returns {array} List of tasks currently being processed
- */
- PowerQueue.prototype.processingList = function() {
- var self = this;
- return self._processList.fetch();
- };
- /** @method PowerQueue.prototype.isHalted
- * @reactive
- * @returns {boolean} True if the queue is not running or paused
- */
- PowerQueue.prototype.isHalted = function() {
- var self = this;
- return (!self._running.get() || self._paused.get());
- };
- /** @method PowerQueue.prototype.length
- * @reactive
- * @returns {number} Number of tasks left in queue to be processed
- */
- PowerQueue.prototype.length = function() {
- var self = this;
- return self.invocations.length();
- };
- /** @method PowerQueue.prototype.progress
- * @reactive
- * @returns {number} 0 .. 100 % Indicates the status of the queue
- */
- PowerQueue.prototype.progress = function() {
- var self = this;
- var progress = self._maxLength.get() - self.invocations.length() - self._isProcessing.get();
- if (self._maxLength.value > 0) {
- return Math.round(progress / self._maxLength.value * 100);
- }
- return 0;
- };
- /** @method PowerQueue.prototype.usage
- * @reactive
- * @returns {number} 0 .. 100 % Indicates resource usage of the queue
- */
- PowerQueue.prototype.usage = function() {
- var self = this;
- return Math.round(self._isProcessing.get() / self._maxProcessing.get() * 100);
- };
- /** @method PowerQueue.prototype.reset Reset the queue
- * Calling this will:
- * * stop the queue
- * * paused to false
- * * Discart all queue data
- *
- * > NOTE: At the moment if the queue has processing tasks they can change
- * > the `errors` and `failures` counters. This could change in the future or
- * > be prevented by creating a whole new instance of the `PowerQueue`
- */
- PowerQueue.prototype.reset = function() {
- var self = this;
- self.debug && console.log(self.title + ' RESET');
- self._running.set(false);
- self._paused.set(false);
- self.invocations.reset();
- self._processList.reset();
- // // Loop through the processing tasks and reset these
- // self._processList.forEach(function(data) {
- // if (data.queue instanceof PowerQueue) {
- // data.queue.reset();
- // }
- // }, true);
- self._maxLength.set(0);
- self._failures.set(0);
- self._errors.set(0);
- };
- /** @method PowerQueue._autoStartTasks
- * @private
- *
- * This method defines the autostart algorithm that allows add task to trigger
- * a start of the queue if queue is not paused.
- */
- PowerQueue.prototype._autoStartTasks = function() {
- var self = this;
- // We dont start anything by ourselfs if queue is paused
- if (!self._paused.value) {
- // Queue is not running and we are set to autostart so we start the queue
- if (!self._running.value && self._autostart.value) {
- // Trigger callback / event
- self.onAutostart();
- // Set queue as running
- self._running.set(true);
- }
- // Make sure that we use all available resources
- if (self._running.value) {
- // Call next to start up the queue
- self.next(null);
- }
- }
- };
- /** @method PowerQueue.prototype.add
- * @param {any} data The task to be handled
- * @param {number} [failures] Used internally to Pass on number of failures.
- */
- PowerQueue.prototype.add = function(data, failures, id) {
- var self = this;
- // Assign new id to task
- var assignNewId = self._jumpOnFailure || typeof id === 'undefined';
- // Set the task id
- var taskId = (assignNewId) ? self._maxLength.value + 1 : id;
- // self.invocations.add({ _id: currentId, data: data, failures: failures || 0 }, reversed);
- self.invocations.insert(taskId, { _id: taskId, data: data, failures: failures || 0 });
- // If we assigned new id then increase length
- if (assignNewId) self._maxLength.inc();
- self._autoStartTasks();
- };
- /** @method PowerQueue.prototype.updateThrottleUp
- * @private
- *
- * Calling this method will update the throttle on the queue adding tasks.
- *
- * > Note: Currently we only support the PowerQueue - but we could support
- * > a more general interface for pauseable tasks or other usecases.
- */
- PowerQueue.prototype.updateThrottleUp = function() {
- var self = this;
- // How many additional tasks can we handle?
- var availableSlots = self._maxProcessing.value - self._isProcessing.value;
- // If we can handle more, we have more, we're running, and we're not paused
- if (!self._paused.value && self._running.value && availableSlots > 0 && self.invocations._length > 0) {
- // Increase counter of current number of tasks being processed
- self._isProcessing.inc();
- // Run task
- self.runTask(self.invocations.getFirstItem());
- // Repeat recursively; this is better than a for loop to avoid blocking the UI
- self.updateThrottleUp();
- }
- };
- /** @method PowerQueue.prototype.updateThrottleDown
- * @private
- *
- * Calling this method will update the throttle on the queue pause tasks.
- *
- * > Note: Currently we only support the PowerQueue - but we could support
- * > a more general interface for pauseable tasks or other usecases.
- */
- PowerQueue.prototype.updateThrottleDown = function() {
- var self = this;
- // Calculate the differece between acutuall processing tasks and target
- var diff = self._isProcessing.value - self._maxProcessing.value;
- // If the diff is more than 0 then we have many tasks processing.
- if (diff > 0) {
- // We pause the latest added tasks
- self._processList.forEachReverse(function(data) {
- if (diff > 0 && data.queue instanceof PowerQueue) {
- diff--;
- // We dont mind calling pause on multiple times on each task
- // theres a simple check going on preventing any duplicate actions
- data.queue.pause();
- }
- }, true);
- }
- };
- /** @method PowerQueue.prototype.next
- * @param {string} [err] Error message if task failed
- * > * Can pass in `null` to start the queue
- * > * Passing in a string to `next` will trigger a failure
- * > * Passing nothing will simply let the next task run
- * `next` is handed into the [taskHandler](PowerQueue.taskHandler) as a
- * callback to mark an error or end of current task
- */
- PowerQueue.prototype.next = function(err) {
- var self = this;
- // Primary concern is to throttle up because we are either:
- // 1. Starting the queue
- // 2. Starting next task
- //
- // This function does not shut down running tasks
- self.updateThrottleUp();
- // We are running, no tasks are being processed even we just updated the
- // throttle up and we got no errors.
- // 1. We are paused and releasing tasks
- // 2. We are done
- if (self._running.value && self._isProcessing.value === 0 && err !== null) {
- // We have no tasks processing so this queue is now releasing resources
- // this could be that the queue is paused or stopped, in that case the
- // self.invocations._length would be > 0
- // If on the other hand the self.invocations._length is 0 then we have no more
- // tasks in the queue so the queue has ended
- self.onRelease(self.invocations._length);
- if (!self.invocations._length) { // !self._paused.value &&
- // Check if queue is done working
- // Stop the queue
- self._running.set(false);
- // self.invocations.reset(); // This should be implicit
- self.onEnded();
- }
- }
- };
- /** @callback done
- * @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
- *
- * Explaination of `feedback`
- * * `Meteor.Error` This means that the task failed in a controlled manner and is allowed to rerun
- * * `Error` This will throw the passed error - as its an unitended error
- * * `null` The task is not done yet, rerun later
- * * `String` The task can perform certain commands on the queue
- * * "pause" - pause the queue
- * * "stop" - stop the queue
- * * "reset" - reset the queue
- * * "cancel" - cancel the queue
- *
- */
- /** @method PowerQueue.prototype.runTaskDone
- * @private
- * @param {Meteor.Error | Error | String | null} [feedback] This allows the task to communicate with the queue
- * @param {object} invocation
- *
- * > Note: `feedback` is explained in [Done callback](#done)
- *
- */
- // Rig the callback function
- PowerQueue.prototype.runTaskDone = function(feedback, invocation) {
- var self = this;
- // If the task handler throws an error then add it to the queue again
- // we allow this for a max of self._maxFailures
- // If the error is null then we add the task silently back into the
- // microQueue in reverse... This could be due to pause or throttling
- if (feedback instanceof Meteor.Error) {
- // We only count failures if maxFailures are above 0
- if (self._maxFailures.value > 0) invocation.failures++;
- self._failures.inc();
- // If the user has set the debug flag we print out failures/errors
- self.debug && console.error('Error: "' + self.title + '" ' + feedback.message + ', ' + feedback.stack);
- if (invocation.failures < self._maxFailures.value) {
- // Add the task again with the increased failures
- self.add(invocation.data, invocation.failures, invocation._id);
- } else {
- self._errors.inc();
- self.errorHandler(invocation.data, self.add, invocation.failures);
- }
- // If a error is thrown we assume its not intended
- } else if (feedback instanceof Error) throw feedback;
- if (feedback)
- // We use null to throttle pauseable tasks
- if (feedback === null) {
- // We add this task into the queue, no questions asked
- self.invocations.insert(invocation._id, { data: invocation.data, failures: invocation.failures, _id: invocation._id });
- }
- // If the user returns a string we got a command
- if (feedback === ''+feedback) {
- var command = {
- 'pause': function() { self.pause(); },
- 'stop': function() { self.stop(); },
- 'reset': function() { self.reset(); },
- 'cancel': function() { self.cancel(); },
- };
- if (typeof command[feedback] === 'function') {
- // Run the command on this queue
- command[feedback]();
- } else {
- // We dont recognize this command, throw an error
- throw new Error('Unknown queue command "' + feedback + '"');
- }
- }
- // Decrease the number of tasks being processed
- // make sure we dont go below 0
- if (self._isProcessing.value > 0) self._isProcessing.dec();
- // Task has ended we remove the task from the process list
- self._processList.remove(invocation._id);
- invocation.data = null;
- invocation.failures = null;
- invocation._id = null;
- invocation = null;
- delete invocation;
- // Next task
- Meteor.setTimeout(function() {
- self.next();
- }, 0);
- };
- /** @method PowerQueue.prototype.runTask
- * @private // This is not part of the open api
- * @param {object} invocation The object stored in the micro-queue
- */
- PowerQueue.prototype.runTask = function(invocation) {
- var self = this;
- // We start the fitting task handler
- // Currently we only support the PowerQueue but we could have a more general
- // interface for tasks that allow throttling
- try {
- if (invocation.data instanceof PowerQueue) {
- // Insert PowerQueue into process list
- self._processList.insert(invocation._id, { id: invocation._id, queue: invocation.data });
- // Handle task
- self.queueTaskHandler(invocation.data, function subQueueCallbackDone(feedback) {
- self.runTaskDone(feedback, invocation);
- }, invocation.failures);
- } else {
- // Insert task into process list
- self._processList.insert(invocation._id, invocation.data);
- // Handle task
- self.taskHandler(invocation.data, function taskCallbackDone(feedback) {
- self.runTaskDone(feedback, invocation);
- }, invocation.failures);
- }
- } catch(err) {
- throw new Error('Error while running taskHandler for queue, Error: ' + err.message);
- }
- };
- /** @method PowerQueue.prototype.queueTaskHandler
- * This method handles tasks that are sub queues
- */
- PowerQueue.prototype.queueTaskHandler = function(subQueue, next, failures) {
- var self = this;
- // Monitor sub queue task releases
- subQueue.onRelease = function(remaining) {
- // Ok, we were paused - this could be throttling so we respect this
- // So when the queue is halted we add it back into the main queue
- if (remaining > 0) {
- // We get out of the queue but dont repport error and add to run later
- next(null);
- } else {
- // Queue has ended
- // We simply trigger next task when the sub queue is complete
- next();
- // When running subqueues it doesnt make sense to track failures and retry
- // the sub queue - this is sub queue domain
- }
- };
- // Start the queue
- subQueue.run();
- };
- /** @callback PowerQueue.prototype.taskHandler
- * @param {any} data This can be data or functions
- * @param {function} next Function `next` call this to end task
- * @param {number} failures Number of failures on this task
- *
- * Default task handler expects functions as data:
- * ```js
- * self.taskHandler = function(data, next, failures) {
- * // This default task handler expects invocation to be a function to run
- * if (typeof data !== 'function') {
- * throw new Error('Default task handler expects a function');
- * }
- * try {
- * // Have the function call next
- * data(next, failures);
- * } catch(err) {
- * // Throw to fail this task
- * next(err);
- * }
- * };
- * ```
- */
- // Can be overwrittin by the user
- PowerQueue.prototype.taskHandler = function(data, next, failures) {
- var self = this;
- // This default task handler expects invocation to be a function to run
- if (typeof data !== 'function') {
- throw new Error('Default task handler expects a function');
- }
- try {
- // Have the function call next
- data(next, failures);
- } catch(err) {
- // Throw to fail this task
- next(err);
- }
- };
- /** @callback PowerQueue.prototype.errorHandler
- * @param {any} data This can be data or functions
- * @param {function} addTask Use this function to insert the data into the queue again
- * @param {number} failures Number of failures on this task
- *
- * The default callback:
- * ```js
- * var foo = new PowerQueue();
- *
- * // Overwrite the default action
- * foo.errorHandler = function(data, addTask, failures) {
- * // This could be overwritten the data contains the task data and addTask
- * // is a helper for adding the task to the queue
- * // try again: addTask(data);
- * // console.log('Terminate at ' + failures + ' failures');
- * };
- * ```
- */
- PowerQueue.prototype.errorHandler = function(data, addTask, failures) {
- var self = this;
- // This could be overwritten the data contains the task data and addTask
- // is a helper for adding the task to the queue
- // try again: addTask(data);
- self.debug && console.log('Terminate at ' + failures + ' failures');
- };
- /** @method PowerQueue.prototype.pause Pause the queue
- * @todo We should have it pause all processing tasks
- */
- PowerQueue.prototype.pause = function() {
- var self = this;
- if (!self._paused.value) {
- self._paused.set(true);
- // Loop through the processing tasks and pause these
- self._processList.forEach(function(data) {
- if (data.queue instanceof PowerQueue) {
- // Pause the sub queue
- data.queue.pause();
- }
- }, true);
- // Trigger callback
- self.onPaused();
- }
- };
- /** @method PowerQueue.prototype.resume Start a paused queue
- * @todo We should have it resume all processing tasks
- *
- * > This will not start a stopped queue
- */
- PowerQueue.prototype.resume = function() {
- var self = this;
- self.run();
- };
- /** @method PowerQueue.prototype.run Starts the queue
- * > Using this command will resume a paused queue and will
- * > start a stopped queue.
- */
- PowerQueue.prototype.run = function() {
- var self = this;
- //not paused and already running or queue empty or paused subqueues
- if (!self._paused.value && self._running.value || !self.invocations._length) {
- return;
- }
- self._paused.set(false);
- self._running.set(true);
- self.next(null);
- };
- /** @method PowerQueue.prototype.stop Stops the queue
- */
- PowerQueue.prototype.stop = function() {
- var self = this;
- self._running.set(false);
- };
- /** @method PowerQueue.prototype.cancel Cancel the queue
- */
- PowerQueue.prototype.cancel = function() {
- var self = this;
- self.reset();
- };
|