| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 | 
							- /*
 
-  * HTTP Upload Transfer Queue
 
-  */
 
// Fallback upload chunk size in bytes (2 MB).
// It is only used when neither the FS.Collection options (`chunkSize`) nor
// the global `FS.config.uploadChunkSize` provide a value.
var defaultChunkSize = 2 * 1024 * 1024;
 
/**
 * Task handler for one chunk upload: asks the file's data object for the
 * byte range `task.start`..`task.end` and sends it to `task.url` with an
 * HTTP PUT, then reports the outcome through `next`.
 *
 * @private
 * @param {Object} task Chunk description with `chunk`, `start`, `end`, `url`,
 *   `urlParams` and `fileObj` properties.
 * @param {Function} next Called with no arguments on success, or with a
 *   `Meteor.Error` built from the failing error's `error` and `message`.
 * @return {undefined}
 */
var _taskHandler = function(task, next) {
  if (FS.debug) {
    console.log(
      "uploading chunk " + task.chunk +
      ", bytes " + task.start +
      " to " + Math.min(task.end, task.fileObj.size()) +
      " of " + task.fileObj.size()
    );
  }

  task.fileObj.data.getBinary(task.start, task.end, function gotBinaryCallback(readError, binary) {
    // Reading the chunk failed: hand the failure to the queue and stop here.
    if (readError) {
      next(new Meteor.Error(readError.error, readError.message));
      return;
    }

    if (FS.debug) {
      console.log('PUT to URL', task.url, task.urlParams);
    }

    // The chunk index travels as a query parameter next to the shared URL params.
    var requestOptions = {
      params: FS.Utility.extend({chunk: task.chunk}, task.urlParams),
      content: binary,
      headers: {
        'Content-Type': task.fileObj.type()
      }
    };

    HTTP.call("PUT", task.url, requestOptions, function putCallback(putError, result) {
      // Drop the reference to the task once the request has completed.
      task = null;

      if (putError) {
        next(new Meteor.Error(putError.error, putError.message));
        return;
      }
      next();
    });
  });
};
 
/**
 * Error handler assigned to each per-file chunk queue. It is currently a
 * deliberate no-op: a failed chunk is neither retried here nor reported on
 * the file object.
 *
 * @private
 * @param {Object} data The failed task (presumably the chunk task object - TODO confirm against PowerQueue)
 * @param {Function} addTask Presumably re-queues a task - TODO confirm against PowerQueue
 * @param {*} failures Presumably the failure count for the task - TODO confirm against PowerQueue
 * @return {undefined}
 */
var _errorHandler = function(data, addTask, failures) {
  // If file upload fails
  // TODO We should retry a few times and then emit error?
  // data.fileObj.emit("error", error);
};
 
/** @method UploadTransferQueue
 * @namespace UploadTransferQueue
 * @constructor
 * @param {Object} [options] Accepted for API compatibility; no option is read at present.
 * @returns {PowerQueue} A PowerQueue instance extended with `files`, `cancel`,
 *   `isUploadingFile`, `resumeUploadingFile` and `uploadFile`.
 *
 * Each uploaded file gets its own chunk sub queue, which is added as a single
 * task to this main queue.
 */
UploadTransferQueue = function(options) {
  // Rig options
  options = options || {};

  // Init the power queue
  var self = new PowerQueue({
    name: 'HTTPUploadTransferQueue',
    // spinalQueue: ReactiveList,
    maxProcessing: 1,
    maxFailures: 5,
    jumpOnFailure: true,
    autostart: true,
    isPaused: false,
    filo: false,
    debug: FS.debug
  });

  // Keep track of uploaded files via this queue:
  // self.files[collectionName][fileId] is true while the file is queued.
  self.files = {};

  /**
   * @method UploadTransferQueue.cancel
   * @returns {*} Whatever the underlying queue `reset` returns
   *
   * Cancel maps onto queue reset. The "being uploaded" flags are cleared as
   * well; otherwise a cancelled file would still be reported by
   * `isUploadingFile` and `uploadFile` would refuse to queue it again.
   */
  self.cancel = function() {
    var result = self.reset.apply(self, arguments);
    // Keep one (now empty) map per collection so a late `onEnded` callback
    // of a chunk queue can still write its flag without failing.
    for (var name in self.files) {
      if (Object.prototype.hasOwnProperty.call(self.files, name)) {
        self.files[name] = {};
      }
    }
    return result;
  };

  /**
    * @method UploadTransferQueue.isUploadingFile
    * @param {FS.File} fileObj File to check if uploading
    * @returns {Boolean} True if the file is uploading
    *
    * @todo Maybe have a similar function for accessing the file upload queue?
    */
  self.isUploadingFile = function(fileObj) {
    // Check if file is already in queue
    return !!(fileObj && fileObj._id && fileObj.collectionName && (self.files[fileObj.collectionName] || {})[fileObj._id]);
  };

  /** @method UploadTransferQueue.resumeUploadingFile
   * @param {FS.File} fileObj File to resume uploading
   * @throws {Error} If `fileObj` is not a FS.File
   * @todo Not sure if this is the best way to handle resumes
   */
  self.resumeUploadingFile = function(fileObj) {
    // Make sure we are handed a FS.File
    if (!(fileObj instanceof FS.File)) {
      throw new Error('Transfer queue expects a FS.File');
    }

    if (fileObj.isMounted()) {
      // This might still be true, preventing upload, if
      // there was a server restart without client restart.
      self.files[fileObj.collectionName] = self.files[fileObj.collectionName] || {};
      self.files[fileObj.collectionName][fileObj._id] = false;
      // Kick off normal upload
      self.uploadFile(fileObj);
    }
  };

  /** @method UploadTransferQueue.uploadFile
   * @param {FS.File} fileObj File to upload
   * @throws {Error} If `fileObj` is not a FS.File or its size is not a number
   * @todo Check that a file can only be added once - maybe a visual helper on the FS.File?
   * @todo Have an initial request to the server getting uploaded chunks for resume
   *
   * Does nothing when the file is already queued, already uploaded, not
   * mounted on a collection, or has zero chunks (size 0).
   */
  self.uploadFile = function(fileObj) {
    FS.debug && console.log("HTTP uploadFile");

    // Make sure we are handed a FS.File
    if (!(fileObj instanceof FS.File)) {
      throw new Error('Transfer queue expects a FS.File');
    }

    // Make sure that we have size as number
    if (typeof fileObj.size() !== 'number') {
      throw new Error('TransferQueue upload failed: fileObj size not set');
    }

    // We don't add the file if it's already in transfer or if already uploaded
    if (self.isUploadingFile(fileObj) || fileObj.isUploaded()) {
      return;
    }

    // Make sure the file object is mounted on a collection
    if (fileObj.isMounted()) {

      var collectionName = fileObj.collectionName;
      var id = fileObj._id;

      // Set the chunkSize to match the collection options, or global config, or default
      fileObj.chunkSize = fileObj.collection.options.chunkSize || FS.config.uploadChunkSize || defaultChunkSize;
      // Set counter for uploaded chunks
      fileObj.chunkCount = 0;
      // Calc the number of chunks
      fileObj.chunkSum = Math.ceil(fileObj.size() / fileObj.chunkSize);

      // Nothing to transfer for an empty file
      if (fileObj.chunkSum === 0) {
        return;
      }

      // Update the filerecord
      // TODO eventually we should be able to do this without storing any chunk info in the filerecord
      fileObj.update({$set: {chunkSize: fileObj.chunkSize, chunkCount: fileObj.chunkCount, chunkSum: fileObj.chunkSum}});

      // Create a sub queue
      var chunkQueue = new PowerQueue({
        onEnded: function oneChunkQueueEnded() {
          // Remove from list of files being uploaded
          self.files[collectionName][id] = false;
          // XXX It might be possible for this to be called even though there were errors uploading?
          fileObj.emit("uploaded");
        },
        spinalQueue: ReactiveList,
        maxProcessing: 1,
        maxFailures: 5,
        jumpOnFailure: true,
        autostart: false,
        isPaused: false,
        filo: false
      });

      // Rig the custom task handler
      chunkQueue.taskHandler = _taskHandler;

      // Rig the error handler
      chunkQueue.errorHandler = _errorHandler;

      // Set flag that this file is being transfered
      self.files[collectionName] = self.files[collectionName] || {};
      self.files[collectionName][id] = true;

      // Construct URL
      var url = FS.HTTP.uploadUrl + '/' + collectionName;
      if (id) {
        url += '/' + id;
      }

      // TODO: Could we somehow figure out if the collection requires login?
      var authToken = '';
      if (typeof Accounts !== "undefined") {
        var authObject = {
          authToken: Accounts._storedLoginToken() || ''
        };

        // Set the authToken
        var authString = JSON.stringify(authObject);
        authToken = FS.Utility.btoa(authString);
      }

      // Construct query string
      var urlParams = {
        filename: fileObj.name()
      };
      if (authToken !== '') {
        urlParams.token = authToken;
      }

      // Add chunk upload tasks
      for (var chunk = 0, start; chunk < fileObj.chunkSum; chunk++) {
        start = chunk * fileObj.chunkSize;
        // Create and add the task
        // XXX should we somehow make sure we haven't uploaded this chunk already, in
        // case we are resuming?
        chunkQueue.add({
          chunk: chunk,
          name: fileObj.name(),
          url: url,
          urlParams: urlParams,
          fileObj: fileObj,
          start: start,
          // May exceed the file size for the last chunk
          end: (chunk + 1) * fileObj.chunkSize
        });
      }

      // Add the queue to the main upload queue
      self.add(chunkQueue);
    }

  };

  return self;
};
 
/**
 * @namespace FS
 * @type UploadTransferQueue
 *
 * The one shared HTTP upload transfer queue for this client. It is created
 * once when this file loads and is shared by all collections
 * (a single queue per client, not per CFS).
 */
FS.HTTP.uploadQueue = new UploadTransferQueue();
 
- /*
 
-  * FS.File extensions
 
-  */
 
/**
 * @method FS.File.prototype.resume
 * @public
 * @param {File|Blob|Buffer} ref Currently unused
 * @throws {Error} If no upload queue is available
 * @todo WIP, Not yet implemented for server
 *
 * Hands this file to the upload queue's `resumeUploadingFile`.
 *
 * > This function is not yet implemented for server
 */
FS.File.prototype.resume = function(ref) {
  var self = this;

  // This file registers its queue as `FS.HTTP.uploadQueue`; `FS.uploadQueue`
  // is only kept as a fallback in case another package provides it.
  var queue = (FS.HTTP && FS.HTTP.uploadQueue) || FS.uploadQueue;
  if (!queue) {
    throw new Error('FS.File.resume failed: no upload queue available');
  }

  queue.resumeUploadingFile(self);
};
 
 
  |