// ##Temporary Storage
//
// Temporary storage is used for chunked uploads until all chunks are received
// and all copies have been made or given up. In some cases, the original file
// is stored only in temporary storage (for example, if all copies do some
// manipulation in beforeSave). This is why we use the temporary file as the
// basis for each saved copy, and then remove it after all copies are saved.
//
// Every chunk is saved as an individual temporary file. This is safer than
// attempting to write multiple incoming chunks to different positions in a
// single temporary file, which can lead to write conflicts.
//
// Using temp files also allows us to easily resume uploads, even if the server
// restarts, and to keep the working memory clear.

// The FS.TempStore emits events that others are able to listen to
var EventEmitter = Npm.require('events').EventEmitter;

// We have a special stream concatenating all chunk files into one readable stream
var CombinedStream = Npm.require('combined-stream');

/** @namespace FS.TempStore
 * @property FS.TempStore
 * @type {object}
 * @public
 * @summary An event emitter
 */
FS.TempStore = new EventEmitter();

// Create a tracker collection for keeping track of all chunks for any files
// that are currently in the temp store
var tracker = FS.TempStore.Tracker = new Mongo.Collection('cfs._tempstore.chunks');

/**
 * @property FS.TempStore.Storage
 * @type {StorageAdapter}
 * @namespace FS.TempStore
 * @private
 * @summary This property is set to either `FS.Store.FileSystem` or `FS.Store.GridFS`
 *
 * __When and why:__
 * The adapter is chosen lazily by `mountStorage()` the first time it is needed:
 *
 * * If `wekan-cfs-gridfs` is installed and either `wekan-cfs-worker` is
 *   installed or `wekan-cfs-filesystem` is missing, we use `FS.Store.GridFS`.
 * * Otherwise, if `wekan-cfs-filesystem` is installed, we use
 *   `FS.Store.FileSystem`.
 * * If neither storage package is installed, an error is thrown.
 *
 * The user can set `FS.TempStore.Storage` themselves, e.g.:
 * ```js
 *   // It's important to set `internal: true`; this lets the SA know that we
 *   // are using this internally and it will give us direct SA api
 *   FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
 * ```
 *
 * > Note: This is considered `advanced` use; it's not a common pattern.
 */
FS.TempStore.Storage = null;

/**
 * @method mountStorage
 * @private
 * @summary Lazily picks and instantiates the temp store's storage adapter.
 *
 * We will not mount a storage adapter until needed. This allows us to check
 * for the existence of FS.FileWorker, which is loaded after this package
 * because it depends on this package.
 *
 * Does nothing if `FS.TempStore.Storage` is already set. Throws an `Error`
 * if no supported storage package is installed.
 */
function mountStorage() {

  if (FS.TempStore.Storage) return;

  // XXX: We could replace this test, testing the FS scope for gridFS etc.
  // This is on the todo later when we get "stable"
  if (Package["wekan-cfs-gridfs"] && (Package["wekan-cfs-worker"] || !Package["wekan-cfs-filesystem"])) {
    // If the file worker is installed we would prefer to use the gridfs sa
    // for scalability. We also default to gridfs if filesystem is not found

    // Use the gridfs
    FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
  } else if (Package["wekan-cfs-filesystem"]) {
    // use the Filesystem
    FS.TempStore.Storage = new FS.Store.FileSystem('_tempstore', { internal: true });
  } else {
    throw new Error('FS.TempStore.Storage is not set: Install wekan-cfs-filesystem or wekan-cfs-gridfs or set it manually');
  }

  FS.debug && console.log('TempStore is mounted on', FS.TempStore.Storage.typeName);
}

/**
 * @method mountFile
 * @private
 * @param {FS.File} fileObj File that must be mounted
 * @param {String} name Name of the calling API, used as error message prefix
 * @summary Guard: throws an `Error` if `fileObj.isMounted()` is falsy.
 */
function mountFile(fileObj, name) {
  if (!fileObj.isMounted()) {
    throw new Error(name + ' cannot work with unmounted file');
  }
}

// On progress we currently only log (when FS.debug is set)
FS.TempStore.on('progress', function(fileObj, chunkNum, count, total, result) {
  FS.debug && console.log('TempStore progress: Received ' + count + ' of ' + total + ' chunks for ' + fileObj.name());
});

// XXX: TODO
// FS.TempStore.on('stored', function(fileObj, chunkCount, result) {
//   // This should work if we pass on result from the SA on stored event...
//   fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } });
// });

// Stream implementation

// NOTE: `_chunkPath` and `_fileReference` are deliberately assigned without
// `var`: in a Meteor package that makes them package-scoped rather than
// file-scoped, and other files may rely on that.

/**
 * @method _chunkPath
 * @private
 * @param {Number} [n] Chunk number, defaults to 0
 * @returns {String} Chunk naming convention, `<n>.chunk`
 */
_chunkPath = function(n) {
  return (n || 0) + '.chunk';
};

/**
 * @method _fileReference
 * @param {FS.File} fileObj
 * @param {Number} chunk
 * @param {Object} [existing] Tracker document for the file, if the caller
 * already has it; otherwise it is looked up in the tracker collection
 * @private
 * @returns {String} Generated SA specific fileKey for the chunk
 *
 * Note: Calling function should call mountStorage() first, and
 * make sure that fileObj is mounted.
 */
_fileReference = function(fileObj, chunk, existing) {
  // Maybe it's a chunk we've already saved
  existing = existing || tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});

  // A tracker document is not guaranteed to carry `keys` (removeFile and
  // removeAll guard against that as well), so don't dereference it blindly.
  var savedKey = existing && existing.keys && existing.keys[chunk];

  // Make a temporary fileObj just for fileKey generation
  var tempFileObj = new FS.File({
    collectionName: fileObj.collectionName,
    _id: fileObj._id,
    original: {
      name: _chunkPath(chunk)
    },
    copies: {
      _tempstore: {
        key: savedKey
      }
    }
  });

  // Return a fitting fileKey SA specific
  return FS.TempStore.Storage.adapter.fileKey(tempFileObj);
};

/**
 * @method FS.TempStore.exists
 * @param {FS.File} fileObj File object
 * @returns {Boolean} Is this file, or parts of it, currently stored in the TempStore
 */
FS.TempStore.exists = function(fileObj) {
  var existing = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});
  return !!existing;
};

/**
 * @method FS.TempStore.listParts
 * @param {FS.File} fileObj
 * @returns {undefined} Nothing yet; only logs a warning
 * @todo This is not yet implemented, milestone 1.1.0
 */
FS.TempStore.listParts = function fsTempStoreListParts(fileObj) {
  console.warn('This function is not correctly implemented using SA in TempStore');
  //XXX This function might be necessary for resume. Not currently supported.
};

/**
 * @method FS.TempStore.removeFile
 * @public
 * @param {FS.File} fileObj
 * This function removes the file from tempstorage - it cares not if file is
 * already removed or not found, goal is reached anyway.
 *
 * Emits `remove` with the fileObj before anything is deleted.
 */
FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) {
  var self = this;

  // Ensure that we have a storage adapter mounted; if not, throw an error.
  mountStorage();

  // If fileObj is not mounted or can't be, throw an error
  mountFile(fileObj, 'FS.TempStore.removeFile');

  // Emit event
  self.emit('remove', fileObj);

  var chunkInfo = tracker.findOne({
    fileId: fileObj._id,
    collectionName: fileObj.collectionName
  });

  if (chunkInfo) {

    // Unlink each file
    FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
      var fileKey = _fileReference(fileObj, chunk, chunkInfo);
      FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
    });

    // Remove fileObj from tracker collection, too
    tracker.remove({_id: chunkInfo._id});

  }
};

/**
 * @method FS.TempStore.removeAll
 * @public
 * @summary This function removes all files from tempstorage - it cares not if file is
 * already removed or not found, goal is reached anyway.
 */
FS.TempStore.removeAll = function fsTempStoreRemoveAll() {
  // Ensure that we have a storage adapter mounted; if not, throw an error.
  mountStorage();

  tracker.find().forEach(function (chunkInfo) {
    // Unlink each file
    FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
      var fileKey = _fileReference({_id: chunkInfo.fileId, collectionName: chunkInfo.collectionName}, chunk, chunkInfo);
      FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
    });

    // Remove from tracker collection, too
    tracker.remove({_id: chunkInfo._id});
  });
};

/**
 * @method FS.TempStore.createWriteStream
 * @public
 * @param {FS.File} fileObj File to store in temporary storage
 * @param {Number | String} [options]
 * @returns {Stream} Writeable stream
 *
 * `options` of different types mean different things:
 * * `undefined` We store the file in one part
 * *(Normal server-side api usage)*
 * * `Number` the number of the chunk being written
 * *(multipart uploads will use this api)*
 * * `String` the string is the name of the `store` that wants to store file data
 * *(stores that want to sync their data to the rest of the files stores will use this)*
 *
 * When the adapter reports a chunk as `stored`, the chunk's key is saved in
 * the tracker and `progress` is emitted. Once the number of tracked chunks
 * equals `fileObj.chunkSum` (or 1 if unset), the fileObj is updated and
 * `stored` (or `synchronized` for the `String` form) plus `ready` are emitted.
 * Write stream errors are re-emitted as `error` on FS.TempStore.
 *
 * > Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise
 */
FS.TempStore.createWriteStream = function(fileObj, options) {
  var self = this;

  // Ensure that we have a storage adapter mounted; if not, throw an error.
  mountStorage();

  // If fileObj is not mounted or can't be, throw an error
  mountFile(fileObj, 'FS.TempStore.createWriteStream');

  // Cache the selector for use multiple times below
  var selector = {fileId: fileObj._id, collectionName: fileObj.collectionName};

  // TODO, should pass in chunkSum so we don't need to use FS.File for it
  var chunkSum = fileObj.chunkSum || 1;

  // Add fileObj to tracker collection
  tracker.upsert(selector, {$setOnInsert: {keys: {}}});

  // Determine how we're using the writeStream:
  //  - a number means multipart, and is the chunk number
  //  - a string means a store is syncing its data
  //  - anything else means the file is stored in one part (chunk 0)
  var isStoreSync = false, chunkNum = 0;

  if (options === +options) {
    chunkNum = options;
  } else if (options === ''+options) {
    isStoreSync = true;
  }

  // XXX: it should be possible for a store to sync by storing data into the
  // tempstore - this could be done nicely by setting the store name as string
  // in the chunk variable?
  // This store name could be passed on to the fileworker via the uploaded
  // event
  // So the uploaded event can return:
  // undefined - if data is stored into and should sync out to all storage adapters
  // number - if a chunk has been uploaded
  // string - if a storage adapter wants to sync its data to the other SA's

  // Find a nice location for the chunk data
  var fileKey = _fileReference(fileObj, chunkNum);

  // Create the stream as Meteor safe stream
  var writeStream = FS.TempStore.Storage.adapter.createWriteStream(fileKey);

  // When the stream closes we update the chunkCount
  writeStream.safeOn('stored', function(result) {
    // Save key in tracker document
    var setObj = {};
    setObj['keys.' + chunkNum] = result.fileKey;
    tracker.update(selector, {$set: setObj});

    var temp = tracker.findOne(selector);

    // The tracker document is gone (file removed from the temp store
    // meanwhile), so there is nothing left to report on.
    if (!temp) {
      FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)');
      return;
    }

    // Get updated chunkCount
    var chunkCount = FS.Utility.size(temp.keys);

    // Progress
    self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result);

    var modifier = { $set: {} };

    // Tag the file with an instance id if it does not have one yet. The id
    // comes from the env var named by COLLECTIONFS_ENV_NAME_UNIQUE_ID when
    // that is set, otherwise from METEOR_PARENT_PID.
    if (!fileObj.instance_id) {
      modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID;
    }

    // If upload is completed
    if (chunkCount === chunkSum) {
      // We no longer need the chunk info
      modifier.$unset = {chunkCount: 1, chunkSum: 1, chunkSize: 1};

      // Check if the file has been uploaded before
      if (typeof fileObj.uploadedAt === 'undefined') {
        // We set the uploadedAt date
        modifier.$set.uploadedAt = new Date();
      } else {
        // We have been uploaded so an event where file data is updated is
        // called synchronizing - so this must be a synchronizedAt?
        modifier.$set.synchronizedAt = new Date();
      }

      // Update the fileObject
      fileObj.update(modifier);

      // Fire ending events
      var eventName = isStoreSync ? 'synchronized' : 'stored';
      self.emit(eventName, fileObj, result);

      // XXX is emitting "ready" necessary?
      self.emit('ready', fileObj, chunkCount, result);
    } else {
      // Update the chunkCount on the fileObject
      modifier.$set.chunkCount = chunkCount;
      fileObj.update(modifier);
    }
  });

  // Emit errors
  writeStream.on('error', function (error) {
    FS.debug && console.log('TempStore writeStream error:', error);
    self.emit('error', error, fileObj);
  });

  return writeStream;
};

/**
  * @method FS.TempStore.createReadStream
  * @public
  * @param {FS.File} fileObj The file to read
  * @return {Stream} Returns readable stream
  *
  * The returned stream concatenates the read streams of chunks
  * `0 .. n-1`, where `n` is the number of keys tracked for the file when this
  * function is called. If nothing is tracked, the combined stream is empty.
  */
FS.TempStore.createReadStream = function(fileObj) {
  // Ensure that we have a storage adapter mounted; if not, throw an error.
  mountStorage();

  // If fileObj is not mounted or can't be, throw an error
  mountFile(fileObj, 'FS.TempStore.createReadStream');

  FS.debug && console.log('FS.TempStore creating read stream for ' + fileObj._id);

  // Determine how many total chunks there are from the tracker collection
  var chunkInfo = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}) || {};
  var totalChunks = FS.Utility.size(chunkInfo.keys);

  // Returns a function that opens the chunk's read stream only when it is
  // called with `next`, rather than when it is appended.
  function getNextStreamFunc(chunk) {
    return Meteor.bindEnvironment(function(next) {
      // Reuse the tracker document loaded above instead of querying the
      // tracker again for every chunk (same as removeFile/removeAll do).
      var fileKey = _fileReference(fileObj, chunk, chunkInfo);
      var chunkReadStream = FS.TempStore.Storage.adapter.createReadStream(fileKey);
      next(chunkReadStream);
    }, function (error) {
      throw error;
    });
  }

  // Make a combined stream
  var combinedStream = CombinedStream.create();

  // Add each chunk stream to the combined stream when the previous chunk stream ends
  for (var chunk = 0; chunk < totalChunks; chunk++) {
    combinedStream.append(getNextStreamFunc(chunk));
  }

  // Return the combined stream
  return combinedStream;
};