An Object Stream Wrapper for the Clarinet JSON Parser

April 1st, 2014

Streams are a powerful tool in any language that supports them well. Once you wrap your head around how to create your own implementations you start to see that a majority of tasks can be broken down into streams that pipe to one another in sequence: each stream accepts the output of the prior as input, performs its processing, and passes on modified data as input to the next stream in sequence.

Why use streams rather than some other paradigm? For me, it is that the stream infrastructure provides the throttling and buffering needed to pipe the output of a fast process into the input of a slow process with a single line of code. Throttling and buffering are a pain to manage yourself, and an application may consist of a dozen sequential operations that each proceed at their own variable speed.

Using Node.js streams, for example, once you have written a slow data transformation stream it is trivial to hook it up to a fast read and fast write, such as the stdin and stdout of the process. The underlying stream implementation does all the hard work for you:

var slowTransformer = new SlowTransformer();
process.stdin.pipe(slowTransformer).pipe(process.stdout);
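SlowTransformer is not defined in the snippet above. As a minimal sketch of what such a class might look like, the following hypothetical implementation upper-cases each chunk after an artificial delay. The delay option and the upper-casing are assumptions made purely for illustration.

```javascript
var Transform = require('stream').Transform;
var util = require('util');

// Hypothetical slow transform: upper-cases each chunk after a delay.
function SlowTransformer(options) {
  options = options || {};
  Transform.call(this, options);
  // Artificial per-chunk delay in milliseconds (illustrative only).
  this.delay = options.delay || 100;
}
util.inherits(SlowTransformer, Transform);

SlowTransformer.prototype._transform = function (chunk, encoding, callback) {
  var self = this;
  setTimeout(function () {
    // Pass the processed chunk on to the readable side.
    self.push(chunk.toString().toUpperCase());
    // Invoking the callback tells the stream machinery that this
    // transform is ready to accept the next chunk.
    callback();
  }, this.delay);
};
```

Because the callback is only invoked after the delay, a fast source piped into this stream is held back to the pace of the slow transform without any extra code.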

Clarinet for Streaming JSON Parsing

JSON parsing and processing is another good example of a task that lends itself to implementation as a sequence of piped streams. In Node.js a stream can be constructed to accept or output binary data, encoded strings, or arbitrary objects. So when processing JSON, it is often the case that you will want to read in strings, transform them to objects, act on the objects, and then output the results as strings again. These operations can each be implemented as one or more piped streams.
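As a rough illustration of that strings-to-objects-to-strings shape, here is a sketch of three piped transforms. It assumes, unrealistically, that every incoming chunk is exactly one complete JSON document, which is the limitation a streaming parser removes. The names parse, act, and stringify, and the added seen property, are hypothetical.

```javascript
var Transform = require('stream').Transform;

// Strings in, objects out. Assumes one complete JSON document per chunk.
var parse = new Transform({
  readableObjectMode: true,
  transform: function (chunk, encoding, callback) {
    var obj;
    try {
      obj = JSON.parse(chunk.toString());
    } catch (e) {
      return callback(e);
    }
    callback(null, obj);
  }
});

// Objects in, objects out: act on each object in some way.
var act = new Transform({
  objectMode: true,
  transform: function (obj, encoding, callback) {
    obj.seen = true;
    callback(null, obj);
  }
});

// Objects in, strings out, one JSON document per line.
var stringify = new Transform({
  writableObjectMode: true,
  transform: function (obj, encoding, callback) {
    callback(null, JSON.stringify(obj) + '\n');
  }
});

parse.pipe(act).pipe(stringify);
```

Real input rarely arrives one document per chunk, so the parse stage here is a placeholder for a proper streaming parser.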

Clarinet is a robust streaming JSON parser package for Node.js. It consumes incoming JSON and emits parse events: open object, property in object, close object, and so forth.

var fs = require('fs');
var parser = require('clarinet').createStream();

parser.on('value', function (v) {
  // Found a value, which can be string, double, bool, or null.
});
parser.on('openobject', function (key) {
  // Opened an object, where key is the first key.
});
parser.on('key', function (key) {
  // Found a key in an object.
});
parser.on('closeobject', function () {
  // Closed an object.
});

fs.createReadStream('file.json').pipe(parser);

This is only streaming on the input side, however. The parse events are emitted in the standard EventEmitter fashion, so if you want a stream that reads JSON and writes parse events you'll have to build it yourself.

A Full Clarinet Object Stream

Fortunately it isn't all that challenging to build an object stream that works like Clarinet but writes parse events rather than emitting them. What is needed is an implementation of the Transform base stream class that wraps a Clarinet stream instance. An instance of this class must pass its input into the Clarinet parser and then monitor the emitted events so as to correctly manage the callbacks passed in to the _transform() method. These callbacks are how streams manage throttling from a faster stream to a slower stream.
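To make the role of the _transform() callback concrete, here is a small sketch, independent of Clarinet, in which the callback is deliberately withheld. The names gate and pending are illustrative, and the highWaterMark of 2 is an arbitrary choice for the demonstration.

```javascript
var Transform = require('stream').Transform;

// Callbacks that the transform has received but not yet invoked.
var pending = [];

var gate = new Transform({
  objectMode: true,
  highWaterMark: 2,
  transform: function (chunk, encoding, callback) {
    // Withhold the callback to simulate processing that has not finished.
    pending.push(function () {
      callback(null, chunk);
    });
  }
});

// The first chunk is handed to transform() immediately and stays in flight.
var first = gate.write({ n: 1 });
// The second chunk is buffered behind it. With two chunks now held, the
// buffer has reached highWaterMark, so write() reports that the writer
// should pause.
var second = gate.write({ n: 2 });
```

A well-behaved source, including anything connected with pipe(), stops writing when write() returns false and resumes on the "drain" event, which can only occur once the held callbacks have been invoked. That is the throttling the wrapper has to preserve.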

Here then is a Clarinet object stream that reads JSON and writes parse event objects:

/**
 * @fileOverview
 *
 * Class definition for the ClarinetObjectStream.
 */

var Transform = require('stream').Transform;
var util = require('util');
var clarinet = require('clarinet');

//---------------------------------------------------------------------------
// Class constructor.
//---------------------------------------------------------------------------

/**
 * @class
 * A wrapper for the Clarinet JSON parser that transforms streaming JSON
 * input to streaming Clarinet parse events as output.
 *
 * See the Clarinet documentation for the list of events.
 *
 * @see BaseImporter
 */
function ClarinetObjectStream (options) {
  options = options || {};
  // Ensure that object mode is set regardless of options passed in.
  options.objectMode = true;
  ClarinetObjectStream.super_.call(this, options);

  this.parseStream = clarinet.createStream();
  // We keep track of callbacks passed to this._transform(), and call them
  // when this.parseStream emits a "data" event, which it should only do when
  // it has finished chewing over a data chunk.
  this.transformCallbackQueue = [];

  var self = this;

  // ------------------------------------------------------------------------
  // Clarinet parse event names.
  // ------------------------------------------------------------------------

  this.CLOSEARRAY = 'closearray';
  this.CLOSEOBJECT = 'closeobject';
  this.KEY = 'key';
  this.OPENARRAY = 'openarray';
  this.OPENOBJECT = 'openobject';
  this.VALUE = 'value';

  // Clarinet only emits an "end" event after parseStream.close() is called and
  // any remaining processing takes place. This is not the standard usage of
  // "end" in a stream, and we can ignore it. Since we never invoke
  // parseStream.close() here, this event will never be emitted.
  //
  // Nonetheless, an instance of ClarinetObjectStream will correctly emit an
  // "end" event in the right circumstances, such as when the input from a piped
  // ReadStream instance has ended.
  //
  // this.END = 'end';

  // Clarinet emits an error event on JSON syntax errors. It can work its way
  // through to syntactically correct JSON on the other side of the error, but
  // will likely emit many error events first.
  this.ERROR = 'error';

  // ------------------------------------------------------------------------
  // Manage write streaming to the Clarinet parseStream.
  // ------------------------------------------------------------------------

  this.parseStream.on('data', function () {
    // Invoke the oldest queued callback passed in to self._transform(), which
    // corresponds to the chunk Clarinet has just finished with.
    var callback = self.transformCallbackQueue.shift();
    // Should always exist, as Clarinet only emits the "data" event once done
    // with the data passed in to self.parseStream.write().
    callback();
  });

  // ------------------------------------------------------------------------
  // React to parser events by passing them on as objects.
  // ------------------------------------------------------------------------

  // Done with the array.
  this.parseStream.on(this.CLOSEARRAY, function () {
    self.push({
      type: self.CLOSEARRAY
    });
  });

  // Done with an object.
  this.parseStream.on(this.CLOSEOBJECT, function () {
    self.push({
      type: self.CLOSEOBJECT
    });
  });

  // See the inline notes on this.END as to why this is not needed.
  //
  // this.parseStream.on(this.END, function () {
  //   self.push({
  //     type: self.END
  //   });
  // });

  // Found a key in the current object.
  this.parseStream.on(this.KEY, function (key) {
    self.push({
      type: self.KEY,
      data: key
    });
  });

  // A new array is opened.
  this.parseStream.on(this.OPENARRAY, function () {
    self.push({
      type: self.OPENARRAY
    });
  });

  // Opened a new object. The key argument is the first key in the object,
  // not the key of the parent object, if it exists.
  this.parseStream.on(this.OPENOBJECT, function (key) {
    self.push({
      type: self.OPENOBJECT,
      data: key
    });
  });

  // Found a value: could be in an array or an object.
  this.parseStream.on(this.VALUE, function (value) {
    self.push({
      type: self.VALUE,
      data: value
    });
  });

  // ------------------------------------------------------------------------
  // React to an error emitted by the Clarinet parser.
  // ------------------------------------------------------------------------

  // Treat an error the same way as any other event, and push it to the
  // queue. Any syntax errors in JSON will generally result in a lot of error
  // events before it works its way back to any following correct JSON.
  this.parseStream.on(this.ERROR, function (error) {
    self.push({
      type: self.ERROR,
      data: error
    });
  });

}
util.inherits(ClarinetObjectStream, Transform);

//---------------------------------------------------------------------------
// Methods
//---------------------------------------------------------------------------

/**
 * Feed written data directly into the clarinet stream.
 *
 * @see Transform#_transform
 */
ClarinetObjectStream.prototype._transform = function(chunk, encoding, callback) {
  // Set the callback so that it can be invoked when parseStream says that it
  // is done.
  this.transformCallbackQueue.push(callback);
  this.parseStream.write(chunk);
};

//---------------------------------------------------------------------------
// Export class constructor.
//---------------------------------------------------------------------------

module.exports = ClarinetObjectStream;

Here is how this would be used in practice, piping Clarinet parse event objects to another stream that processes them:

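var fs = require('fs');

// ClarinetObjectStream is the class defined above. ProcessingObjectStream
// stands in for your own object mode stream that consumes parse events.
var clarinetObjectStream = new ClarinetObjectStream();
var processingObjectStream = new ProcessingObjectStream();
var readStream = fs.createReadStream('path/to/my.json');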

readStream
  .pipe(clarinetObjectStream)
  .pipe(processingObjectStream);
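ProcessingObjectStream is left to the reader. As one hypothetical sketch, the following object mode Writable tallies parse events by type and records every key it sees. The counts and keys properties are assumptions for illustration, not part of any package.

```javascript
var Writable = require('stream').Writable;
var util = require('util');

// Hypothetical consumer of parse event objects of the form
// { type: ..., data: ... } as written by the wrapper above.
function ProcessingObjectStream(options) {
  options = options || {};
  // Parse events are objects, so object mode is required.
  options.objectMode = true;
  Writable.call(this, options);
  this.counts = {};
  this.keys = [];
}
util.inherits(ProcessingObjectStream, Writable);

ProcessingObjectStream.prototype._write = function (event, encoding, callback) {
  this.counts[event.type] = (this.counts[event.type] || 0) + 1;
  // The first key of an object arrives with "openobject", later keys
  // arrive with "key". An empty object carries no key at all.
  var carriesKey = event.type === 'openobject' || event.type === 'key';
  if (carriesKey && event.data !== undefined) {
    this.keys.push(event.data);
  }
  // Signal that this event has been handled and the next may be sent.
  callback();
};
```

Once the "finish" event fires, the counts and keys properties hold a summary of the document that was parsed.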

The Clarinet Object Stream Package

You'll find the Clarinet object stream class packaged up on GitHub, or you can install it via npm:

npm install clarinet-object-stream