upload-http-client.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. /*
  2. * HTTP Upload Transfer Queue
  3. */
  4. // 2MB default upload chunk size
  5. // Can be overridden by user with FS.config.uploadChunkSize or per FS.Collection in collection options
  6. var defaultChunkSize = 2 * 1024 * 1024;
  7. /**
  8. * @private
  9. * @param {Object} task
  10. * @param {Function} next
  11. * @return {undefined}
  12. */
  13. var _taskHandler = function(task, next) {
  14. FS.debug && console.log("uploading chunk " + task.chunk + ", bytes " + task.start + " to " + Math.min(task.end, task.fileObj.size()) + " of " + task.fileObj.size());
  15. task.fileObj.data.getBinary(task.start, task.end, function gotBinaryCallback(err, data) {
  16. if (err) {
  17. next(new Meteor.Error(err.error, err.message));
  18. } else {
  19. FS.debug && console.log('PUT to URL', task.url, task.urlParams);
  20. HTTP.call("PUT", task.url, {
  21. params: FS.Utility.extend({chunk: task.chunk}, task.urlParams),
  22. content: data,
  23. headers: {
  24. 'Content-Type': task.fileObj.type()
  25. }
  26. }, function(error, result) {
  27. task = null;
  28. if (error) {
  29. next(new Meteor.Error(error.error, error.message));
  30. } else {
  31. next();
  32. }
  33. });
  34. }
  35. });
  36. };
  37. /**
  38. * @private
  39. * @param {Object} data
  40. * @param {Function} addTask
  41. * @return {undefined}
  42. */
  43. var _errorHandler = function(data, addTask, failures) {
  44. // If file upload fails
  45. // TODO We should retry a few times and then emit error?
  46. // data.fileObj.emit("error", error);
  47. };
  48. /** @method UploadTransferQueue
  49. * @namespace UploadTransferQueue
  50. * @constructor
  51. * @param {Object} [options]
  52. */
  53. UploadTransferQueue = function(options) {
  54. // Rig options
  55. options = options || {};
  56. // Init the power queue
  57. var self = new PowerQueue({
  58. name: 'HTTPUploadTransferQueue',
  59. // spinalQueue: ReactiveList,
  60. maxProcessing: 1,
  61. maxFailures: 5,
  62. jumpOnFailure: true,
  63. autostart: true,
  64. isPaused: false,
  65. filo: false,
  66. debug: FS.debug
  67. });
  68. // Keep track of uploaded files via this queue
  69. self.files = {};
  70. // cancel maps onto queue reset
  71. self.cancel = self.reset;
  72. /**
  73. * @method UploadTransferQueue.isUploadingFile
  74. * @param {FS.File} fileObj File to check if uploading
  75. * @returns {Boolean} True if the file is uploading
  76. *
  77. * @todo Maybe have a similar function for accessing the file upload queue?
  78. */
  79. self.isUploadingFile = function(fileObj) {
  80. // Check if file is already in queue
  81. return !!(fileObj && fileObj._id && fileObj.collectionName && (self.files[fileObj.collectionName] || {})[fileObj._id]);
  82. };
  83. /** @method UploadTransferQueue.resumeUploadingFile
  84. * @param {FS.File} File to resume uploading
  85. * @todo Not sure if this is the best way to handle resumes
  86. */
  87. self.resumeUploadingFile = function(fileObj) {
  88. // Make sure we are handed a FS.File
  89. if (!(fileObj instanceof FS.File)) {
  90. throw new Error('Transfer queue expects a FS.File');
  91. }
  92. if (fileObj.isMounted()) {
  93. // This might still be true, preventing upload, if
  94. // there was a server restart without client restart.
  95. self.files[fileObj.collectionName] = self.files[fileObj.collectionName] || {};
  96. self.files[fileObj.collectionName][fileObj._id] = false;
  97. // Kick off normal upload
  98. self.uploadFile(fileObj);
  99. }
  100. };
  101. /** @method UploadTransferQueue.uploadFile
  102. * @param {FS.File} File to upload
  103. * @todo Check that a file can only be added once - maybe a visual helper on the FS.File?
  104. * @todo Have an initial request to the server getting uploaded chunks for resume
  105. */
  106. self.uploadFile = function(fileObj) {
  107. FS.debug && console.log("HTTP uploadFile");
  108. // Make sure we are handed a FS.File
  109. if (!(fileObj instanceof FS.File)) {
  110. throw new Error('Transfer queue expects a FS.File');
  111. }
  112. // Make sure that we have size as number
  113. if (typeof fileObj.size() !== 'number') {
  114. throw new Error('TransferQueue upload failed: fileObj size not set');
  115. }
  116. // We don't add the file if it's already in transfer or if already uploaded
  117. if (self.isUploadingFile(fileObj) || fileObj.isUploaded()) {
  118. return;
  119. }
  120. // Make sure the file object is mounted on a collection
  121. if (fileObj.isMounted()) {
  122. var collectionName = fileObj.collectionName;
  123. var id = fileObj._id;
  124. // Set the chunkSize to match the collection options, or global config, or default
  125. fileObj.chunkSize = fileObj.collection.options.chunkSize || FS.config.uploadChunkSize || defaultChunkSize;
  126. // Set counter for uploaded chunks
  127. fileObj.chunkCount = 0;
  128. // Calc the number of chunks
  129. fileObj.chunkSum = Math.ceil(fileObj.size() / fileObj.chunkSize);
  130. if (fileObj.chunkSum === 0)
  131. return;
  132. // Update the filerecord
  133. // TODO eventually we should be able to do this without storing any chunk info in the filerecord
  134. fileObj.update({$set: {chunkSize: fileObj.chunkSize, chunkCount: fileObj.chunkCount, chunkSum: fileObj.chunkSum}});
  135. // Create a sub queue
  136. var chunkQueue = new PowerQueue({
  137. onEnded: function oneChunkQueueEnded() {
  138. // Remove from list of files being uploaded
  139. self.files[collectionName][id] = false;
  140. // XXX It might be possible for this to be called even though there were errors uploading?
  141. fileObj.emit("uploaded");
  142. },
  143. spinalQueue: ReactiveList,
  144. maxProcessing: 1,
  145. maxFailures: 5,
  146. jumpOnFailure: true,
  147. autostart: false,
  148. isPaused: false,
  149. filo: false
  150. });
  151. // Rig the custom task handler
  152. chunkQueue.taskHandler = _taskHandler;
  153. // Rig the error handler
  154. chunkQueue.errorHandler = _errorHandler;
  155. // Set flag that this file is being transfered
  156. self.files[collectionName] = self.files[collectionName] || {};
  157. self.files[collectionName][id] = true;
  158. // Construct URL
  159. var url = FS.HTTP.uploadUrl + '/' + collectionName;
  160. if (id) {
  161. url += '/' + id;
  162. }
  163. // TODO: Could we somehow figure out if the collection requires login?
  164. var authToken = '';
  165. if (typeof Accounts !== "undefined") {
  166. var authObject = {
  167. authToken: Accounts._storedLoginToken() || '',
  168. };
  169. // Set the authToken
  170. var authString = JSON.stringify(authObject);
  171. authToken = FS.Utility.btoa(authString);
  172. }
  173. // Construct query string
  174. var urlParams = {
  175. filename: fileObj.name()
  176. };
  177. if (authToken !== '') {
  178. urlParams.token = authToken;
  179. }
  180. // Add chunk upload tasks
  181. for (var chunk = 0, start; chunk < fileObj.chunkSum; chunk++) {
  182. start = chunk * fileObj.chunkSize;
  183. // Create and add the task
  184. // XXX should we somehow make sure we haven't uploaded this chunk already, in
  185. // case we are resuming?
  186. chunkQueue.add({
  187. chunk: chunk,
  188. name: fileObj.name(),
  189. url: url,
  190. urlParams: urlParams,
  191. fileObj: fileObj,
  192. start: start,
  193. end: (chunk + 1) * fileObj.chunkSize
  194. });
  195. }
  196. // Add the queue to the main upload queue
  197. self.add(chunkQueue);
  198. }
  199. };
  200. return self;
  201. };
  202. /**
  203. * @namespace FS
  204. * @type UploadTransferQueue
  205. *
  206. * There is a single uploads transfer queue per client (not per CFS)
  207. */
  208. FS.HTTP.uploadQueue = new UploadTransferQueue();
  209. /*
  210. * FS.File extensions
  211. */
  212. /**
  213. * @method FS.File.prototype.resume
  214. * @public
  215. * @param {File|Blob|Buffer} ref
  216. * @todo WIP, Not yet implemented for server
  217. *
  218. * > This function is not yet implemented for server
  219. */
  220. FS.File.prototype.resume = function(ref) {
  221. var self = this;
  222. FS.uploadQueue.resumeUploadingFile(self);
  223. };