- /* global FS */
- var PassThrough = Npm.require('stream').PassThrough;
- var lengthStream = Npm.require('length-stream');
- FS.Transform = function(options) {
- var self = this;
- options = options || {};
- if (!(self instanceof FS.Transform))
- throw new Error('FS.Transform must be called with the "new" keyword');
- if (!options.adapter)
- throw new Error('Transform expects options.adapter to be a storage adapter');
- self.storage = options.adapter;
- // Fetch the transformation functions if any
- self.transformWrite = options.transformWrite;
- self.transformRead = options.transformRead;
- };
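- // Hypothetical usage sketch (not part of this module): a store would normally
- // construct this internally, passing its storage adapter plus optional
- // transform functions. `myAdapter` and the `gm` image handle are assumptions
- // used only for illustration.
- //
- //   var transform = new FS.Transform({
- //     adapter: myAdapter,
- //     transformWrite: function (fileObj, readStream, writeStream) {
- //       // e.g. resize images on the way into storage
- //       gm(readStream, fileObj.name()).resize('250', '250').stream().pipe(writeStream);
- //     },
- //     transformRead: function (fileObj, readStream, writeStream) {
- //       readStream.pipe(writeStream); // pass through unchanged on the way out
- //     }
- //   });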
- // Allow packages to add scope
- FS.Transform.scope = {};
- // The transformation write stream emits a "stored" event once the data has
- // been stored by the storage adapter
- FS.Transform.prototype.createWriteStream = function(fileObj) {
- var self = this;
- // Get the file key
- var fileKey = self.storage.fileKey(fileObj);
- // Rig write stream
- var destinationStream = self.storage.createWriteStreamForFileKey(fileKey, {
- // Not all storage adapters support these options, and CollectionFS does not
- // depend on them being set, but it is nice to make some of this data
- // available to other systems that access the SA directly
- aliases: [fileObj.name()],
- contentType: fileObj.type(),
- metadata: fileObj.metadata
- });
- // Pass through transformWrite function if provided
- if (typeof self.transformWrite === 'function') {
- destinationStream = addPassThrough(destinationStream, function (ptStream, originalStream) {
- // Rig transform
- try {
- self.transformWrite.call(FS.Transform.scope, fileObj, ptStream, originalStream);
- // XXX: If the transform function returns a buffer should we stream that?
- } catch(err) {
- // Log the failure, then rethrow so the caller can handle it
- console.warn('FS.Transform.createWriteStream transform function failed:', err);
- throw err;
- }
- });
- }
- // If the original file doesn't have a size, add another PassThrough to measure
- // and set it. This also runs when size is 0, which is fine.
- // NOTE: This must come AFTER the transformWrite code block above. This might seem
- // confusing, but by coming after it, this will actually be executed BEFORE the user's
- // transform, which is what we need in order to be sure we get the original file
- // size and not the transformed file size.
- if (!fileObj.size()) {
- destinationStream = addPassThrough(destinationStream, function (ptStream, originalStream) {
- var lstream = lengthStream(function (fileSize) {
- fileObj.size(fileSize, {save: false});
- });
- ptStream.pipe(lstream).pipe(originalStream);
- });
- }
- return destinationStream;
- };
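- // Hypothetical usage sketch (names `transform`, `fileObj`, and `sourceStream`
- // are assumptions): pipe data into the returned stream and listen for the
- // "stored" event forwarded from the storage adapter.
- //
- //   var writeStream = transform.createWriteStream(fileObj);
- //   writeStream.on('stored', function (result) {
- //     // result is whatever the storage adapter reports once data is persisted
- //   });
- //   sourceStream.pipe(writeStream);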
- FS.Transform.prototype.createReadStream = function(fileObj, options) {
- var self = this;
- // Get the file key
- var fileKey = self.storage.fileKey(fileObj);
- // Rig read stream
- var sourceStream = self.storage.createReadStreamForFileKey(fileKey, options);
- // Pass through transformRead function if provided
- if (typeof self.transformRead === 'function') {
- sourceStream = addPassThrough(sourceStream, function (ptStream, originalStream) {
- // Rig transform
- try {
- self.transformRead.call(FS.Transform.scope, fileObj, originalStream, ptStream);
- } catch(err) {
- // Emit a proper Error on the stream returned to the caller
- ptStream.emit('error', new Error('FS.Transform.createReadStream transform function failed: ' + err.message));
- }
- });
- }
- // No transformRead provided; return the storage adapter's stream unchanged
- return sourceStream;
- };
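- // Hypothetical usage sketch (names are assumptions): the returned stream can
- // be piped anywhere a normal readable stream is expected.
- //
- //   var readStream = transform.createReadStream(fileObj);
- //   readStream.on('error', function (err) { /* handle read/transform errors */ });
- //   readStream.pipe(destinationStream);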
- // Utility function to simplify adding layers of passthrough
- function addPassThrough(stream, func) {
- var pts = new PassThrough();
- // Forward the special "stored" event so listeners on the returned PassThrough still receive it
- stream.on('stored', function(result) {
- pts.emit('stored', result);
- });
- func(pts, stream);
- return pts;
- }