fileWorker.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. //// TODO: Use power queue to handle throttling etc.
  2. //// Use observe to monitor changes and have it create tasks for the power queue
  3. //// to perform.
  4. /**
  5. * @public
  6. * @type Object
  7. */
  8. FS.FileWorker = {};
  9. /**
  10. * @method FS.FileWorker.observe
  11. * @public
  12. * @param {FS.Collection} fsCollection
  13. * @returns {undefined}
  14. *
  15. * Sets up observes on the fsCollection to store file copies and delete
  16. * temp files at the appropriate times.
  17. */
  18. FS.FileWorker.observe = function(fsCollection) {
  19. // Initiate observe for finding newly uploaded/added files that need to be stored
  20. // per store.
  21. FS.Utility.each(fsCollection.options.stores, function(store) {
  22. var storeName = store.name;
  23. fsCollection.files.find(getReadyQuery(storeName), {
  24. fields: {
  25. copies: 0
  26. }
  27. }).observe({
  28. added: function(fsFile) {
  29. // added will catch fresh files
  30. FS.debug && console.log("FileWorker ADDED - calling saveCopy", storeName, "for", fsFile._id);
  31. saveCopy(fsFile, storeName);
  32. },
  33. changed: function(fsFile) {
  34. // changed will catch failures and retry them
  35. FS.debug && console.log("FileWorker CHANGED - calling saveCopy", storeName, "for", fsFile._id);
  36. saveCopy(fsFile, storeName);
  37. }
  38. });
  39. });
  40. // Initiate observe for finding files that have been stored so we can delete
  41. // any temp files
  42. fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({
  43. added: function(fsFile) {
  44. FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id);
  45. try {
  46. FS.TempStore.removeFile(fsFile);
  47. } catch(err) {
  48. console.error(err);
  49. }
  50. }
  51. });
  52. // Initiate observe for catching files that have been removed and
  53. // removing the data from all stores as well
  54. fsCollection.files.find().observe({
  55. removed: function(fsFile) {
  56. FS.debug && console.log('FileWorker REMOVED - removing all stored data for', fsFile._id);
  57. //remove from temp store
  58. FS.TempStore.removeFile(fsFile);
  59. //delete from all stores
  60. FS.Utility.each(fsCollection.options.stores, function(storage) {
  61. storage.adapter.remove(fsFile);
  62. });
  63. }
  64. });
  65. };
  66. /**
  67. * @method getReadyQuery
  68. * @private
  69. * @param {string} storeName - The name of the store to observe
  70. *
  71. * Returns a selector that will be used to identify files that
  72. * have been uploaded but have not yet been stored to the
  73. * specified store.
  74. *
  75. * {
  76. * uploadedAt: {$exists: true},
  77. * 'copies.storeName`: null,
  78. * 'failures.copies.storeName.doneTrying': {$ne: true}
  79. * }
  80. */
  81. function getReadyQuery(storeName) {
  82. var selector = {uploadedAt: {$exists: true}};
  83. selector['copies.' + storeName] = null;
  84. selector['failures.copies.' + storeName + '.doneTrying'] = {$ne: true};
  85. return selector;
  86. }
  87. /**
  88. * @method getDoneQuery
  89. * @private
  90. * @param {Array} stores - The stores array from the FS.Collection options
  91. *
  92. * Returns a selector that will be used to identify files where all
  93. * stores have successfully save or have failed the
  94. * max number of times but still have chunks. The resulting selector
  95. * should be something like this:
  96. *
  97. * {
  98. * $and: [
  99. * {chunks: {$exists: true}},
  100. * {
  101. * $or: [
  102. * {
  103. * $and: [
  104. * {
  105. * 'copies.storeName': {$ne: null}
  106. * },
  107. * {
  108. * 'copies.storeName': {$ne: false}
  109. * }
  110. * ]
  111. * },
  112. * {
  113. * 'failures.copies.storeName.doneTrying': true
  114. * }
  115. * ]
  116. * },
  117. * REPEATED FOR EACH STORE
  118. * ]
  119. * }
  120. *
  121. */
  122. function getDoneQuery(stores) {
  123. var selector = {
  124. $and: [{chunks: {$exists: true}}]
  125. };
  126. // Add conditions for all defined stores
  127. FS.Utility.each(stores, function(store) {
  128. var storeName = store.name;
  129. var copyCond = {$or: [{$and: []}]};
  130. var tempCond = {};
  131. tempCond["copies." + storeName] = {$ne: null};
  132. copyCond.$or[0].$and.push(tempCond);
  133. tempCond = {};
  134. tempCond["copies." + storeName] = {$ne: false};
  135. copyCond.$or[0].$and.push(tempCond);
  136. tempCond = {};
  137. tempCond['failures.copies.' + storeName + '.doneTrying'] = true;
  138. copyCond.$or.push(tempCond);
  139. selector.$and.push(copyCond);
  140. })
  141. return selector;
  142. }
  143. /**
  144. * @method saveCopy
  145. * @private
  146. * @param {FS.File} fsFile
  147. * @param {string} storeName
  148. * @param {Object} options
  149. * @param {Boolean} [options.overwrite=false] - Force save to the specified store?
  150. * @returns {undefined}
  151. *
  152. * Saves to the specified store. If the
  153. * `overwrite` option is `true`, will save to the store even if we already
  154. * have, potentially overwriting any previously saved data. Synchronous.
  155. */
  156. function saveCopy(fsFile, storeName, options) {
  157. options = options || {};
  158. var storage = FS.StorageAdapter(storeName);
  159. if (!storage) {
  160. throw new Error('No store named "' + storeName + '" exists');
  161. }
  162. FS.debug && console.log('saving to store ' + storeName);
  163. try {
  164. var writeStream = storage.adapter.createWriteStream(fsFile);
  165. var readStream = FS.TempStore.createReadStream(fsFile);
  166. // Pipe the temp data into the storage adapter
  167. readStream.pipe(writeStream);
  168. } catch(err) {
  169. console.error(err);
  170. }
  171. }