123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
// TODO: Use a power queue to handle throttling etc.
// Use observe to monitor changes and have it create tasks for the power
// queue to perform.
/**
 * @public
 * @type Object
 *
 * Namespace object that holds the file worker API.
 */
FS.FileWorker = {};

/**
 * @method FS.FileWorker.observe
 * @public
 * @param {FS.Collection} fsCollection
 * @returns {undefined}
 *
 * Sets up observes on the fsCollection to store file copies and delete
 * temp files at the appropriate times. Three kinds of observes are created:
 *
 * 1. One per store, on files matching `getReadyQuery(storeName)`; every
 *    `added` or `changed` document triggers `saveCopy` for that store.
 * 2. One on files matching `getDoneQuery(stores)`; every `added` document
 *    has its temp data removed through `FS.TempStore.removeFile`.
 * 3. One on all files; every `removed` document has its temp data and its
 *    data in every store removed.
 *
 * Errors thrown while removing data are logged with `console.error` and do
 * not stop the remaining cleanup steps.
 */
FS.FileWorker.observe = function(fsCollection) {

  // Initiate observe for finding newly uploaded/added files that need to be
  // stored per store.
  FS.Utility.each(fsCollection.options.stores, function(store) {
    var storeName = store.name;
    fsCollection.files.find(getReadyQuery(storeName), {
      // The `copies` field is excluded from the observed documents
      fields: {
        copies: 0
      }
    }).observe({
      added: function(fsFile) {
        // added will catch fresh files
        FS.debug && console.log("FileWorker ADDED - calling saveCopy", storeName, "for", fsFile._id);
        saveCopy(fsFile, storeName);
      },
      changed: function(fsFile) {
        // changed will catch failures and retry them
        FS.debug && console.log("FileWorker CHANGED - calling saveCopy", storeName, "for", fsFile._id);
        saveCopy(fsFile, storeName);
      }
    });
  });

  // Initiate observe for finding files that have been stored so we can delete
  // any temp files
  fsCollection.files.find(getDoneQuery(fsCollection.options.stores)).observe({
    added: function(fsFile) {
      FS.debug && console.log("FileWorker ADDED - calling deleteChunks for", fsFile._id);
      try {
        FS.TempStore.removeFile(fsFile);
      } catch(err) {
        console.error(err);
      }
    }
  });

  // Initiate observe for catching files that have been removed and
  // removing the data from all stores as well
  fsCollection.files.find().observe({
    removed: function(fsFile) {
      FS.debug && console.log('FileWorker REMOVED - removing all stored data for', fsFile._id);

      // Remove from temp store. Guarded the same way as in the observe above,
      // so that a failure here cannot prevent the stored copies below from
      // being removed.
      try {
        FS.TempStore.removeFile(fsFile);
      } catch(err) {
        console.error(err);
      }

      // Delete from all stores. Each store is handled independently so that
      // one failing store does not leave data behind in the others.
      FS.Utility.each(fsCollection.options.stores, function(storage) {
        try {
          storage.adapter.remove(fsFile);
        } catch(err) {
          console.error(err);
        }
      });
    }
  });
};
/**
 * @method getReadyQuery
 * @private
 * @param {string} storeName - The name of the store to observe
 * @returns {Object} selector
 *
 * Builds the selector used to find files that have an `uploadedAt` value
 * but no copy recorded for the given store yet, and for which saving to
 * that store has not been given up on. For a store named `storeName` the
 * result is:
 *
 *  {
 *    uploadedAt: {$exists: true},
 *    'copies.storeName': null,
 *    'failures.copies.storeName.doneTrying': {$ne: true}
 *  }
 */
function getReadyQuery(storeName) {
  var copyKey = 'copies.' + storeName;
  var doneTryingKey = 'failures.copies.' + storeName + '.doneTrying';

  var query = {};
  query.uploadedAt = {$exists: true};
  query[copyKey] = null;
  query[doneTryingKey] = {$ne: true};
  return query;
}
/**
 * @method getDoneQuery
 * @private
 * @param {Array} stores - The stores array from the FS.Collection options
 * @returns {Object} selector
 *
 * Builds the selector used to find files that still have chunks and for
 * which every store has either saved successfully or has failed the
 * maximum number of times. The resulting selector looks like this, with
 * the `$or` condition repeated once per store:
 *
 *  {
 *    $and: [
 *      {chunks: {$exists: true}},
 *      {
 *        $or: [
 *          {
 *            $and: [
 *              {'copies.storeName': {$ne: null}},
 *              {'copies.storeName': {$ne: false}}
 *            ]
 *          },
 *          {'failures.copies.storeName.doneTrying': true}
 *        ]
 *      }
 *    ]
 *  }
 */
function getDoneQuery(stores) {
  // The file must still have chunks, whatever the stores say
  var conditions = [{chunks: {$exists: true}}];

  // One "saved or gave up" condition per defined store
  FS.Utility.each(stores, function(store) {
    var name = store.name;
    var copyKey = "copies." + name;

    var copyIsNotNull = {};
    copyIsNotNull[copyKey] = {$ne: null};

    var copyIsNotFalse = {};
    copyIsNotFalse[copyKey] = {$ne: false};

    var gaveUp = {};
    gaveUp['failures.copies.' + name + '.doneTrying'] = true;

    conditions.push({
      $or: [
        {$and: [copyIsNotNull, copyIsNotFalse]},
        gaveUp
      ]
    });
  });

  return {$and: conditions};
}
/**
 * @method saveCopy
 * @private
 * @param {FS.File} fsFile
 * @param {string} storeName
 * @param {Object} [options] - Accepted for compatibility; not read by the
 *   current implementation.
 * @returns {undefined}
 * @throws {Error} If no store with the given name exists
 *
 * Looks up the store by name, opens a write stream for the file on its
 * adapter and a read stream for the file on the temp store, and pipes the
 * temp data into the store. Errors thrown synchronously while creating or
 * connecting the streams are logged with `console.error` and not rethrown.
 *
 * NOTE(review): `pipe` only starts the transfer, so this function presumably
 * returns before the copy is complete - confirm how completion and stream
 * errors are reported by the adapter.
 */
function saveCopy(fsFile, storeName, options) {
  if (!options) {
    options = {};
  }

  var store = FS.StorageAdapter(storeName);
  if (!store) {
    throw new Error('No store named "' + storeName + '" exists');
  }

  if (FS.debug) {
    console.log('saving to store ' + storeName);
  }

  try {
    // The write stream is opened before the read stream
    var destination = store.adapter.createWriteStream(fsFile);
    var source = FS.TempStore.createReadStream(fsFile);

    // Pipe the temp data into the storage adapter
    source.pipe(destination);
  } catch (err) {
    console.error(err);
  }
}
|