| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 | 
							- // ##Temporary Storage
 
- //
 
- // Temporary storage is used for chunked uploads until all chunks are received
 
- // and all copies have been made or given up. In some cases, the original file
 
- // is stored only in temporary storage (for example, if all copies do some
 
- // manipulation in beforeSave). This is why we use the temporary file as the
 
- // basis for each saved copy, and then remove it after all copies are saved.
 
- //
 
- // Every chunk is saved as an individual temporary file. This is safer than
 
- // attempting to write multiple incoming chunks to different positions in a
 
- // single temporary file, which can lead to write conflicts.
 
- //
 
- // Using temp files also allows us to easily resume uploads, even if the server
 
- // restarts, and to keep the working memory clear.
 
- // The FS.TempStore emits events that others are able to listen to
 
- var EventEmitter = Npm.require('events').EventEmitter;
 
- // We have a special stream concating all chunk files into one readable stream
 
- var CombinedStream = Npm.require('combined-stream');
 
- /** @namespace FS.TempStore
 
-  * @property FS.TempStore
 
-  * @type {object}
 
-  * @public
 
-  * @summary An event emitter
 
-  */
 
- FS.TempStore = new EventEmitter();
 
- // Create a tracker collection for keeping track of all chunks for any files that are currently in the temp store
 
- var tracker = FS.TempStore.Tracker = new Mongo.Collection('cfs._tempstore.chunks');
 
- /**
 
-  * @property FS.TempStore.Storage
 
-  * @type {StorageAdapter}
 
-  * @namespace FS.TempStore
 
-  * @private
 
-  * @summary This property is set to either `FS.Store.FileSystem` or `FS.Store.GridFS`
 
-  *
 
-  * __When and why:__
 
-  * We normally default to `cfs-filesystem` unless its not installed. *(we default to gridfs if installed)*
 
-  * But if `cfs-gridfs` and `cfs-worker` is installed we default to `cfs-gridfs`
 
-  *
 
-  * If `cfs-gridfs` and `cfs-filesystem` is not installed we log a warning.
 
-  * the user can set `FS.TempStore.Storage` them selfs eg.:
 
-  * ```js
 
-  *   // Its important to set `internal: true` this lets the SA know that we
 
-  *   // are using this internally and it will give us direct SA api
 
-  *   FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
 
-  * ```
 
-  *
 
-  * > Note: This is considered as `advanced` use, its not a common pattern.
 
-  */
 
- FS.TempStore.Storage = null;
 
- // We will not mount a storage adapter until needed. This allows us to check for the
 
- // existance of FS.FileWorker, which is loaded after this package because it
 
- // depends on this package.
 
- function mountStorage() {
 
-   if (FS.TempStore.Storage) return;
 
-   // XXX: We could replace this test, testing the FS scope for grifFS etc.
 
-   // This is on the todo later when we get "stable"
 
-   if (Package["wekan-cfs-gridfs"] && (Package["wekan-cfs-worker"] || !Package["wekan-cfs-filesystem"])) {
 
-     // If the file worker is installed we would prefer to use the gridfs sa
 
-     // for scalability. We also default to gridfs if filesystem is not found
 
-     // Use the gridfs
 
-     FS.TempStore.Storage = new FS.Store.GridFS('_tempstore', { internal: true });
 
-   } else if (Package["wekan-cfs-filesystem"]) {
 
-     // use the Filesystem
 
-     FS.TempStore.Storage = new FS.Store.FileSystem('_tempstore', { internal: true });
 
-   } else {
 
-     throw new Error('FS.TempStore.Storage is not set: Install wekan-cfs-filesystem or wekan-cfs-gridfs or set it manually');
 
-   }
 
-   FS.debug && console.log('TempStore is mounted on', FS.TempStore.Storage.typeName);
 
- }
 
- function mountFile(fileObj, name) {
 
-   if (!fileObj.isMounted()) {
 
-     throw new Error(name + ' cannot work with unmounted file');
 
-   }
 
- }
 
- // We update the fileObj on progress
 
- FS.TempStore.on('progress', function(fileObj, chunkNum, count, total, result) {
 
-   FS.debug && console.log('TempStore progress: Received ' + count + ' of ' + total + ' chunks for ' + fileObj.name());
 
- });
 
- // XXX: TODO
 
- // FS.TempStore.on('stored', function(fileObj, chunkCount, result) {
 
- //   // This should work if we pass on result from the SA on stored event...
 
- //   fileObj.update({ $set: { chunkSum: 1, chunkCount: chunkCount, size: result.size } });
 
- // });
 
- // Stream implementation
 
- /**
 
-  * @method _chunkPath
 
-  * @private
 
-  * @param {Number} [n] Chunk number
 
-  * @returns {String} Chunk naming convention
 
-  */
 
- _chunkPath = function(n) {
 
-   return (n || 0) + '.chunk';
 
- };
 
- /**
 
-  * @method _fileReference
 
-  * @param {FS.File} fileObj
 
-  * @param {Number} chunk
 
-  * @private
 
-  * @returns {String} Generated SA specific fileKey for the chunk
 
-  *
 
-  * Note: Calling function should call mountStorage() first, and
 
-  * make sure that fileObj is mounted.
 
-  */
 
- _fileReference = function(fileObj, chunk, existing) {
 
-   // Maybe it's a chunk we've already saved
 
-   existing = existing || tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});
 
-   // Make a temporary fileObj just for fileKey generation
 
-   var tempFileObj = new FS.File({
 
-     collectionName: fileObj.collectionName,
 
-     _id: fileObj._id,
 
-     original: {
 
-       name: _chunkPath(chunk)
 
-     },
 
-     copies: {
 
-       _tempstore: {
 
-         key: existing && existing.keys[chunk]
 
-       }
 
-     }
 
-   });
 
-   // Return a fitting fileKey SA specific
 
-   return FS.TempStore.Storage.adapter.fileKey(tempFileObj);
 
- };
 
- /**
 
-  * @method FS.TempStore.exists
 
-  * @param {FS.File} File object
 
-  * @returns {Boolean} Is this file, or parts of it, currently stored in the TempStore
 
-  */
 
- FS.TempStore.exists = function(fileObj) {
 
-   var existing = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName});
 
-   return !!existing;
 
- };
 
- /**
 
-  * @method FS.TempStore.listParts
 
-  * @param {FS.File} fileObj
 
-  * @returns {Object} of parts already stored
 
-  * @todo This is not yet implemented, milestone 1.1.0
 
-  */
 
- FS.TempStore.listParts = function fsTempStoreListParts(fileObj) {
 
-   var self = this;
 
-   console.warn('This function is not correctly implemented using SA in TempStore');
 
-   //XXX This function might be necessary for resume. Not currently supported.
 
- };
 
- /**
 
-  * @method FS.TempStore.removeFile
 
-  * @public
 
-  * @param {FS.File} fileObj
 
-  * This function removes the file from tempstorage - it cares not if file is
 
-  * already removed or not found, goal is reached anyway.
 
-  */
 
- FS.TempStore.removeFile = function fsTempStoreRemoveFile(fileObj) {
 
-   var self = this;
 
-   // Ensure that we have a storage adapter mounted; if not, throw an error.
 
-   mountStorage();
 
-   // If fileObj is not mounted or can't be, throw an error
 
-   mountFile(fileObj, 'FS.TempStore.removeFile');
 
-   // Emit event
 
-   self.emit('remove', fileObj);
 
-   var chunkInfo = tracker.findOne({
 
-     fileId: fileObj._id,
 
-     collectionName: fileObj.collectionName
 
-   });
 
-   if (chunkInfo) {
 
-     // Unlink each file
 
-     FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
 
-       var fileKey = _fileReference(fileObj, chunk, chunkInfo);
 
-       FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
 
-     });
 
-     // Remove fileObj from tracker collection, too
 
-     tracker.remove({_id: chunkInfo._id});
 
-   }
 
- };
 
- /**
 
-  * @method FS.TempStore.removeAll
 
-  * @public
 
-  * @summary This function removes all files from tempstorage - it cares not if file is
 
-  * already removed or not found, goal is reached anyway.
 
-  */
 
- FS.TempStore.removeAll = function fsTempStoreRemoveAll() {
 
-   var self = this;
 
-   // Ensure that we have a storage adapter mounted; if not, throw an error.
 
-   mountStorage();
 
-   tracker.find().forEach(function (chunkInfo) {
 
-     // Unlink each file
 
-     FS.Utility.each(chunkInfo.keys || {}, function (key, chunk) {
 
-       var fileKey = _fileReference({_id: chunkInfo.fileId, collectionName: chunkInfo.collectionName}, chunk, chunkInfo);
 
-       FS.TempStore.Storage.adapter.remove(fileKey, FS.Utility.noop);
 
-     });
 
-     // Remove from tracker collection, too
 
-     tracker.remove({_id: chunkInfo._id});
 
-   });
 
- };
 
- /**
 
-  * @method FS.TempStore.createWriteStream
 
-  * @public
 
-  * @param {FS.File} fileObj File to store in temporary storage
 
-  * @param {Number | String} [options]
 
-  * @returns {Stream} Writeable stream
 
-  *
 
-  * `options` of different types mean differnt things:
 
-  * * `undefined` We store the file in one part
 
-  * *(Normal server-side api usage)*
 
-  * * `Number` the number is the part number total
 
-  * *(multipart uploads will use this api)*
 
-  * * `String` the string is the name of the `store` that wants to store file data
 
-  * *(stores that want to sync their data to the rest of the files stores will use this)*
 
-  *
 
-  * > Note: fileObj must be mounted on a `FS.Collection`, it makes no sense to store otherwise
 
-  */
 
- FS.TempStore.createWriteStream = function(fileObj, options) {
 
-   var self = this;
 
-   // Ensure that we have a storage adapter mounted; if not, throw an error.
 
-   mountStorage();
 
-   // If fileObj is not mounted or can't be, throw an error
 
-   mountFile(fileObj, 'FS.TempStore.createWriteStream');
 
-   // Cache the selector for use multiple times below
 
-   var selector = {fileId: fileObj._id, collectionName: fileObj.collectionName};
 
-   // TODO, should pass in chunkSum so we don't need to use FS.File for it
 
-   var chunkSum = fileObj.chunkSum || 1;
 
-   // Add fileObj to tracker collection
 
-   tracker.upsert(selector, {$setOnInsert: {keys: {}}});
 
-   // Determine how we're using the writeStream
 
-   var isOnePart = false, isMultiPart = false, isStoreSync = false, chunkNum = 0;
 
-   if (options === +options) {
 
-     isMultiPart = true;
 
-     chunkNum = options;
 
-   } else if (options === ''+options) {
 
-     isStoreSync = true;
 
-   } else {
 
-     isOnePart = true;
 
-   }
 
-   // XXX: it should be possible for a store to sync by storing data into the
 
-   // tempstore - this could be done nicely by setting the store name as string
 
-   // in the chunk variable?
 
-   // This store name could be passed on the the fileworker via the uploaded
 
-   // event
 
-   // So the uploaded event can return:
 
-   // undefined - if data is stored into and should sync out to all storage adapters
 
-   // number - if a chunk has been uploaded
 
-   // string - if a storage adapter wants to sync its data to the other SA's
 
-   // Find a nice location for the chunk data
 
-   var fileKey = _fileReference(fileObj, chunkNum);
 
-   // Create the stream as Meteor safe stream
 
-   var writeStream = FS.TempStore.Storage.adapter.createWriteStream(fileKey);
 
-   // When the stream closes we update the chunkCount
 
-   writeStream.safeOn('stored', function(result) {
 
-     // Save key in tracker document
 
-     var setObj = {};
 
-     setObj['keys.' + chunkNum] = result.fileKey;
 
-     tracker.update(selector, {$set: setObj});
 
-     var temp = tracker.findOne(selector);
 
-     if (!temp) {
 
-       FS.debug && console.log('NOT FOUND FROM TEMPSTORE => EXIT (REMOVED)');
 
-       return;
 
-     }
 
-     // Get updated chunkCount
 
-     var chunkCount = FS.Utility.size(temp.keys);
 
-     // Progress
 
-     self.emit('progress', fileObj, chunkNum, chunkCount, chunkSum, result);
 
-     var modifier = { $set: {} };
 
-     if (!fileObj.instance_id) {
 
-       modifier.$set.instance_id = process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID ? process.env[process.env.COLLECTIONFS_ENV_NAME_UNIQUE_ID] : process.env.METEOR_PARENT_PID;
 
-     }
 
-     // If upload is completed
 
-     if (chunkCount === chunkSum) {
 
-       // We no longer need the chunk info
 
-       modifier.$unset = {chunkCount: 1, chunkSum: 1, chunkSize: 1};
 
-       // Check if the file has been uploaded before
 
-       if (typeof fileObj.uploadedAt === 'undefined') {
 
-         // We set the uploadedAt date
 
-         modifier.$set.uploadedAt = new Date();
 
-       } else {
 
-         // We have been uploaded so an event were file data is updated is
 
-         // called synchronizing - so this must be a synchronizedAt?
 
-         modifier.$set.synchronizedAt = new Date();
 
-       }
 
-       // Update the fileObject
 
-       fileObj.update(modifier);
 
-       // Fire ending events
 
-       var eventName = isStoreSync ? 'synchronized' : 'stored';
 
-       self.emit(eventName, fileObj, result);
 
-       // XXX is emitting "ready" necessary?
 
-       self.emit('ready', fileObj, chunkCount, result);
 
-     } else {
 
-       // Update the chunkCount on the fileObject
 
-       modifier.$set.chunkCount = chunkCount;
 
-       fileObj.update(modifier);
 
-     }
 
-   });
 
-   // Emit errors
 
-   writeStream.on('error', function (error) {
 
-     FS.debug && console.log('TempStore writeStream error:', error);
 
-     self.emit('error', error, fileObj);
 
-   });
 
-   return writeStream;
 
- };
 
- /**
 
-   * @method FS.TempStore.createReadStream
 
-   * @public
 
-   * @param {FS.File} fileObj The file to read
 
-   * @return {Stream} Returns readable stream
 
-   *
 
-   */
 
- FS.TempStore.createReadStream = function(fileObj) {
 
-   // Ensure that we have a storage adapter mounted; if not, throw an error.
 
-   mountStorage();
 
-   // If fileObj is not mounted or can't be, throw an error
 
-   mountFile(fileObj, 'FS.TempStore.createReadStream');
 
-   FS.debug && console.log('FS.TempStore creating read stream for ' + fileObj._id);
 
-   // Determine how many total chunks there are from the tracker collection
 
-   var chunkInfo = tracker.findOne({fileId: fileObj._id, collectionName: fileObj.collectionName}) || {};
 
-   var totalChunks = FS.Utility.size(chunkInfo.keys);
 
-   function getNextStreamFunc(chunk) {
 
-     return Meteor.bindEnvironment(function(next) {
 
-       var fileKey = _fileReference(fileObj, chunk);
 
-       var chunkReadStream = FS.TempStore.Storage.adapter.createReadStream(fileKey);
 
-       next(chunkReadStream);
 
-     }, function (error) {
 
-       throw error;
 
-     });
 
-   }
 
-   // Make a combined stream
 
-   var combinedStream = CombinedStream.create();
 
-   // Add each chunk stream to the combined stream when the previous chunk stream ends
 
-   var currentChunk = 0;
 
-   for (var chunk = 0; chunk < totalChunks; chunk++) {
 
-     combinedStream.append(getNextStreamFunc(chunk));
 
-   }
 
-   // Return the combined stream
 
-   return combinedStream;
 
- };
 
 
  |