2
0

transform.server.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. /* global FS */
  2. var PassThrough = Npm.require('stream').PassThrough;
  3. var lengthStream = Npm.require('length-stream');
  4. FS.Transform = function(options) {
  5. var self = this;
  6. options = options || {};
  7. if (!(self instanceof FS.Transform))
  8. throw new Error('FS.Transform must be called with the "new" keyword');
  9. if (!options.adapter)
  10. throw new Error('Transform expects option.adapter to be a storage adapter');
  11. self.storage = options.adapter;
  12. // Fetch the transformation functions if any
  13. self.transformWrite = options.transformWrite;
  14. self.transformRead = options.transformRead;
  15. };
  16. // Allow packages to add scope
  17. FS.Transform.scope = {};
  18. // The transformation stream triggers an "stored" event when data is stored into
  19. // the storage adapter
  20. FS.Transform.prototype.createWriteStream = function(fileObj) {
  21. var self = this;
  22. // Get the file key
  23. var fileKey = self.storage.fileKey(fileObj);
  24. // Rig write stream
  25. var destinationStream = self.storage.createWriteStreamForFileKey(fileKey, {
  26. // Not all SA's can set these options and cfs dont depend on setting these
  27. // but its nice if other systems are accessing the SA that some of the data
  28. // is also available to those
  29. aliases: [fileObj.name()],
  30. contentType: fileObj.type(),
  31. metadata: fileObj.metadata
  32. });
  33. // Pass through transformWrite function if provided
  34. if (typeof self.transformWrite === 'function') {
  35. destinationStream = addPassThrough(destinationStream, function (ptStream, originalStream) {
  36. // Rig transform
  37. try {
  38. self.transformWrite.call(FS.Transform.scope, fileObj, ptStream, originalStream);
  39. // XXX: If the transform function returns a buffer should we stream that?
  40. } catch(err) {
  41. // We emit an error - should we throw an error?
  42. console.warn('FS.Transform.createWriteStream transform function failed, Error: ');
  43. throw err;
  44. }
  45. });
  46. }
  47. // If original doesn't have size, add another PassThrough to get and set the size.
  48. // This will run on size=0, too, which is OK.
  49. // NOTE: This must come AFTER the transformWrite code block above. This might seem
  50. // confusing, but by coming after it, this will actually be executed BEFORE the user's
  51. // transform, which is what we need in order to be sure we get the original file
  52. // size and not the transformed file size.
  53. if (!fileObj.size()) {
  54. destinationStream = addPassThrough(destinationStream, function (ptStream, originalStream) {
  55. var lstream = lengthStream(function (fileSize) {
  56. fileObj.size(fileSize, {save: false});
  57. });
  58. ptStream.pipe(lstream).pipe(originalStream);
  59. });
  60. }
  61. return destinationStream;
  62. };
  63. FS.Transform.prototype.createReadStream = function(fileObj, options) {
  64. var self = this;
  65. // Get the file key
  66. var fileKey = self.storage.fileKey(fileObj);
  67. // Rig read stream
  68. var sourceStream = self.storage.createReadStreamForFileKey(fileKey, options);
  69. // Pass through transformRead function if provided
  70. if (typeof self.transformRead === 'function') {
  71. sourceStream = addPassThrough(sourceStream, function (ptStream, originalStream) {
  72. // Rig transform
  73. try {
  74. self.transformRead.call(FS.Transform.scope, fileObj, originalStream, ptStream);
  75. } catch(err) {
  76. //throw new Error(err);
  77. // We emit an error - should we throw an error?
  78. sourceStream.emit('error', 'FS.Transform.createReadStream transform function failed');
  79. }
  80. });
  81. }
  82. // We dont transform just normal SA interface
  83. return sourceStream;
  84. };
  85. // Utility function to simplify adding layers of passthrough
  86. function addPassThrough(stream, func) {
  87. var pts = new PassThrough();
  88. // We pass on the special "stored" event for those listening
  89. stream.on('stored', function(result) {
  90. pts.emit('stored', result);
  91. });
  92. func(pts, stream);
  93. return pts;
  94. }