loggly.js 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. var fs = require('fs')
  2. , Stream = require('stream').Stream
  3. , clarinet = require('../clarinet')
  4. , parse_stream = clarinet.createStream()
  5. , previous = ''
  6. , buffer = {}
  7. , stack = []
  8. , new_thing = false
  9. ;
  10. function debug_log() {
  11. if(process.env.DEBUG) {
  12. console.log.apply(null, arguments);
  13. }
  14. }
  15. parse_stream.on('openobject', function(name) {
  16. if(new_thing) {
  17. console.log(JSON.stringify(buffer, null, 2));
  18. buffer = {};
  19. new_thing = false;
  20. }
  21. previous = name;
  22. stack.push(name);
  23. debug_log('=== {', name, buffer);
  24. });
  25. parse_stream.on('closeobject', function() {
  26. stack.pop();
  27. debug_log('=== }', null, buffer);
  28. });
  29. parse_stream.on('key', function(name) {
  30. previous = name;
  31. stack.pop();
  32. stack.push(name);
  33. debug_log('=== ,', name, buffer);
  34. });
  35. parse_stream.on('value', function(value) {
  36. if(previous === 'event') {
  37. value = JSON.parse(value);
  38. }
  39. var expected = stack.length-1;
  40. stack.reduce(function (ac, x, i) {
  41. if(i === expected) {
  42. ac[x] = value;
  43. }
  44. ac[x] = ac[x] || {};
  45. return ac[x];
  46. }, buffer);
  47. debug_log('=== v', value, buffer);
  48. });
  49. parse_stream.on('error', function (e) {
  50. new_thing = true;
  51. });
  52. function fixLogglyStream() {
  53. var log_stream = new Stream();
  54. log_stream.readable = true;
  55. log_stream.writable = true;
  56. log_stream.write = function (buf) {
  57. var as_string = buf.toString('utf-8').replace(/\\\\/g, '\\');
  58. this.emit('data', as_string);
  59. };
  60. log_stream.end = function (buf) {
  61. if (arguments.length) {
  62. log_stream.write(buf);
  63. }
  64. log_stream.writable = false;
  65. };
  66. log_stream.destroy = function () {
  67. log_stream.writable = false;
  68. };
  69. return log_stream;
  70. }
  71. fs.createReadStream(__dirname + '/loggly.txt')
  72. .pipe(fixLogglyStream())
  73. .pipe(parse_stream)
  74. ;