/*
 * HTTP Upload Transfer Queue
 *
 * Client-side queue that splits FS.File data into chunks and PUTs each chunk
 * to the HTTP upload endpoint (FS.HTTP.uploadUrl).
 */

// 2MB default upload chunk size
// Can be overridden by the user with FS.config.uploadChunkSize, or per
// FS.Collection with the `chunkSize` collection option.
var defaultChunkSize = 2 * 1024 * 1024;

/**
 * Task handler for a per-file chunk queue: reads one chunk of binary data
 * from the file and sends it to the server with an HTTP PUT.
 *
 * @private
 * @param {Object} task Chunk task as created by `uploadFile`. Reads
 *   `task.chunk`, `task.start`, `task.end`, `task.url`, `task.urlParams`
 *   and `task.fileObj`.
 * @param {Function} next Completion callback. Called with no arguments on
 *   success, or with a `Meteor.Error` when reading or sending the chunk fails.
 * @return {undefined}
 */
var _taskHandler = function(task, next) {
  if (FS.debug) {
    var totalSize = task.fileObj.size();
    console.log("uploading chunk " + task.chunk + ", bytes " + task.start + " to " + Math.min(task.end, totalSize) + " of " + totalSize);
  }

  task.fileObj.data.getBinary(task.start, task.end, function gotBinaryCallback(err, data) {
    if (err) {
      next(new Meteor.Error(err.error, err.message));
      return;
    }

    FS.debug && console.log('PUT to URL', task.url, task.urlParams);

    HTTP.call("PUT", task.url, {
      // The chunk index is sent along with the shared query parameters
      params: FS.Utility.extend({chunk: task.chunk}, task.urlParams),
      content: data,
      headers: {
        'Content-Type': task.fileObj.type()
      }
    }, function(error, result) {
      // Drop our reference to the task (and thereby its fileObj) now that the
      // request has completed; nothing below reads it.
      task = null;

      if (error) {
        next(new Meteor.Error(error.error, error.message));
      } else {
        next();
      }
    });
  });
};

/**
 * Error handler rigged onto each per-file chunk queue. Currently a no-op.
 *
 * @private
 * @param {Object} data
 * @param {Function} addTask
 * @param {Number} failures NOTE(review): presumably the failure count passed
 *   by PowerQueue - confirm against the PowerQueue API.
 * @return {undefined}
 */
var _errorHandler = function(data, addTask, failures) {
  // If file upload fails
  // TODO We should retry a few times and then emit error?
  // data.fileObj.emit("error", error);
};

/**
 * Creates the upload transfer queue: a PowerQueue instance extended with
 * file-tracking state and upload helpers.
 *
 * Note: this is intentionally assigned without `var`, so the name is not
 * local to this file.
 *
 * @method UploadTransferQueue
 * @namespace UploadTransferQueue
 * @constructor
 * @param {Object} [options] Currently not read by the constructor.
 * @returns {PowerQueue} The configured queue (returned explicitly, so the
 *   result of `new UploadTransferQueue()` is the PowerQueue instance).
 */
UploadTransferQueue = function(options) {
  // Rig options
  options = options || {};

  // Init the power queue
  var self = new PowerQueue({
    name: 'HTTPUploadTransferQueue',
    // spinalQueue: ReactiveList,
    maxProcessing: 1,
    maxFailures: 5,
    jumpOnFailure: true,
    autostart: true,
    isPaused: false,
    filo: false,
    debug: FS.debug
  });

  // Keep track of files being uploaded via this queue.
  // Shape: self.files[collectionName][fileId] === true while in transfer.
  self.files = {};

  // cancel maps onto queue reset
  self.cancel = self.reset;

  /**
   * @method UploadTransferQueue.isUploadingFile
   * @param {FS.File} fileObj File to check if uploading
   * @returns {Boolean} True if the file is uploading
   *
   * @todo Maybe have a similar function for accessing the file upload queue?
   */
  self.isUploadingFile = function(fileObj) {
    // A file without an _id or collectionName cannot be tracked in self.files
    if (!fileObj || !fileObj._id || !fileObj.collectionName) {
      return false;
    }
    var filesInCollection = self.files[fileObj.collectionName] || {};
    return !!filesInCollection[fileObj._id];
  };

  /**
   * Clears the "uploading" flag for a mounted file and starts a normal
   * upload for it. Does nothing if the file is not mounted.
   *
   * @method UploadTransferQueue.resumeUploadingFile
   * @param {FS.File} fileObj File to resume uploading
   * @throws {Error} If `fileObj` is not an FS.File
   * @todo Not sure if this is the best way to handle resumes
   */
  self.resumeUploadingFile = function(fileObj) {
    // Make sure we are handed a FS.File
    if (!(fileObj instanceof FS.File)) {
      throw new Error('Transfer queue expects a FS.File');
    }

    if (!fileObj.isMounted()) {
      return;
    }

    // This might still be true, preventing upload, if
    // there was a server restart without client restart.
    self.files[fileObj.collectionName] = self.files[fileObj.collectionName] || {};
    self.files[fileObj.collectionName][fileObj._id] = false;

    // Kick off normal upload
    self.uploadFile(fileObj);
  };

  /**
   * Splits the file into chunks, builds a sub queue holding one upload task
   * per chunk and adds that sub queue to this queue. Does nothing if the
   * file is already in transfer, already uploaded, not mounted, or empty.
   *
   * @method UploadTransferQueue.uploadFile
   * @param {FS.File} fileObj File to upload
   * @throws {Error} If `fileObj` is not an FS.File or its size is not a number
   * @todo Check that a file can only be added once - maybe a visual helper on the FS.File?
   * @todo Have an initial request to the server getting uploaded chunks for resume
   */
  self.uploadFile = function(fileObj) {
    FS.debug && console.log("HTTP uploadFile");

    // Make sure we are handed a FS.File
    if (!(fileObj instanceof FS.File)) {
      throw new Error('Transfer queue expects a FS.File');
    }

    // Make sure that we have size as number
    if (typeof fileObj.size() !== 'number') {
      throw new Error('TransferQueue upload failed: fileObj size not set');
    }

    // We don't add the file if it's already in transfer or if already uploaded
    if (self.isUploadingFile(fileObj) || fileObj.isUploaded()) {
      return;
    }

    // Make sure the file object is mounted on a collection
    if (!fileObj.isMounted()) {
      return;
    }

    var collectionName = fileObj.collectionName;
    var id = fileObj._id;

    // Set the chunkSize to match the collection options, or global config, or default
    fileObj.chunkSize = fileObj.collection.options.chunkSize || FS.config.uploadChunkSize || defaultChunkSize;
    // Set counter for uploaded chunks
    fileObj.chunkCount = 0;
    // Calc the number of chunks
    fileObj.chunkSum = Math.ceil(fileObj.size() / fileObj.chunkSize);

    // A zero-length file yields no chunks, so there is nothing to queue
    if (fileObj.chunkSum === 0) {
      return;
    }

    // Update the filerecord
    // TODO eventually we should be able to do this without storing any chunk info in the filerecord
    fileObj.update({$set: {chunkSize: fileObj.chunkSize, chunkCount: fileObj.chunkCount, chunkSum: fileObj.chunkSum}});

    // Create a sub queue
    var chunkQueue = new PowerQueue({
      onEnded: function oneChunkQueueEnded() {
        // Remove from list of files being uploaded
        self.files[collectionName][id] = false;
        // XXX It might be possible for this to be called even though there were errors uploading?
        fileObj.emit("uploaded");
      },
      spinalQueue: ReactiveList,
      maxProcessing: 1,
      maxFailures: 5,
      jumpOnFailure: true,
      autostart: false,
      isPaused: false,
      filo: false
    });

    // Rig the custom task handler
    chunkQueue.taskHandler = _taskHandler;

    // Rig the error handler
    chunkQueue.errorHandler = _errorHandler;

    // Set flag that this file is being transferred
    self.files[collectionName] = self.files[collectionName] || {};
    self.files[collectionName][id] = true;

    // Construct URL
    var url = FS.HTTP.uploadUrl + '/' + collectionName;
    if (id) {
      url += '/' + id;
    }

    // TODO: Could we somehow figure out if the collection requires login?
    var authToken = '';
    if (typeof Accounts !== "undefined") {
      // No trailing comma in this literal: legacy browsers reject it
      var authObject = {
        authToken: Accounts._storedLoginToken() || ''
      };

      // Set the authToken: JSON-serialised, then encoded with FS.Utility.btoa
      var authString = JSON.stringify(authObject);
      authToken = FS.Utility.btoa(authString);
    }

    // The file name is the same for every chunk, so look it up once
    var fileName = fileObj.name();

    // Construct query string
    var urlParams = {
      filename: fileName
    };
    if (authToken !== '') {
      urlParams.token = authToken;
    }

    // Add chunk upload tasks
    for (var chunk = 0, start; chunk < fileObj.chunkSum; chunk++) {
      start = chunk * fileObj.chunkSize;
      // Create and add the task
      // XXX should we somehow make sure we haven't uploaded this chunk already, in
      // case we are resuming?
      chunkQueue.add({
        chunk: chunk,
        name: fileName,
        url: url,
        urlParams: urlParams,
        fileObj: fileObj,
        start: start,
        // For the last chunk this may point past the end of the file
        end: (chunk + 1) * fileObj.chunkSize
      });
    }

    // Add the queue to the main upload queue
    self.add(chunkQueue);
  };

  return self;
};

/**
 * @namespace FS
 * @type UploadTransferQueue
 *
 * There is a single uploads transfer queue per client (not per CFS)
 */
FS.HTTP.uploadQueue = new UploadTransferQueue();

/*
 * FS.File extensions
 */

/**
 * Resumes uploading this file through the HTTP upload queue.
 *
 * @method FS.File.prototype.resume
 * @public
 * @param {File|Blob|Buffer} ref Currently unused
 * @todo WIP, Not yet implemented for server
 *
 * > This function is not yet implemented for server
 */
FS.File.prototype.resume = function(ref) {
  // Use the queue created above; this file never assigns `FS.uploadQueue`.
  FS.HTTP.uploadQueue.resumeUploadingFile(this);
};