Processing S3 Inventory Data in Node.js

January 21st, 2017 Permalink

If you want better data on the contents of your S3 buckets than is provided by AWS metrics, then you should be using S3 Inventory, a service that provides CSV manifests of bucket contents, file by file and version by version. It compares very favorably with the only alternative, which is to list the bucket contents yourself, a task that becomes ever less simple and ever more costly as the buckets grow in size. Following on from a recent post on automation to set up S3 Inventory configurations for all of your S3 buckets, here I'll provide example code for processing CSV manifest files.

Processing CSV Data via Streams

Stream programming is a very satisfying paradigm, especially in an ecosystem like the one that exists for Node.js, in which a great many stream packages exist. Quite complex tasks can be rendered down to a few require statements, a little setup of stream instances, and then finally a.pipe(b).pipe(c).pipe(d)... and you are done. At its core, processing S3 inventory manifests is one of these tasks: we need to stream the gzipped file contents, unpack the data, parse it into an object per CSV row, and then process the objects into some form of a useful summary of contents. All of the necessary tools exist, it is just a matter of joining them together as follows:

// Core.
var zlib = require('zlib');

// NPM.
var AWS = require('aws-sdk');
var csv = require('csv-parser');
var s3ObjectStreams = require('s3-object-streams');

/**
 * Download and process the inventory file.
 *
 * The gzipped CSV object is streamed from S3, decompressed, parsed into one
 * object per row, and summarized by the usage stream. The callback is invoked
 * at most once: either with the first error emitted by any stream in the
 * pipeline, or with the last totals emitted by the usage stream when it ends.
 *
 * @param {String} bucket The bucket containing the inventory file.
 * @param {String} csvFileKey The key for the inventory file.
 * @param {String[]} headers The column headers.
 * @param {Function} callback Of the form function (error, data).
 */
function processInventoryFile (bucket, csvFileKey, headers, callback) {
  // Use default credentials from the environment.
  var client = new AWS.S3();

  // -----------------------------------------------------------------------
  // Create stream objects.
  // -----------------------------------------------------------------------

  // Unpacking the compressed stream is straightforward and uncomplicated.
  var gunzip = zlib.createGunzip();

  // With the headers configuration this will emit objects rather than arrays
  // of values.
  var csvParser = csv({
    headers: headers
  });

  // This utility counts files and size by common prefixes.
  var s3InventoryUsageStream = new s3ObjectStreams.S3InventoryUsageStream({
    // Group two levels deep into the folders past the provided prefix.
    depth: 2,
    // Only send a running total once every 1,000,000 objects. Essentially we
    // only want it at the end, but this is fine.
    outputFactor: 1000000
  });

  // This is how to obtain a stream of file contents from S3.
  var params = {
    Bucket: bucket,
    Key: csvFileKey
  };
  var dataStream = client.getObject(params).createReadStream();

  // -----------------------------------------------------------------------
  // Add stream event handlers.
  // -----------------------------------------------------------------------

  var runningTotals;
  var finished = false;

  // Errors are not forwarded along a pipe chain, so every stream needs its
  // own handler. That means several handlers can fire for one failed run;
  // this guard ensures the caller only ever hears about the first outcome.
  function done (error, data) {
    if (finished) {
      return;
    }

    finished = true;
    callback(error, data);
  }

  // Set up the inventory stream to call back with the totals once done.
  s3InventoryUsageStream.on('data', function (totals) {
    runningTotals = totals;
  });
  s3InventoryUsageStream.on('end', function () {
    done(null, runningTotals);
  });

  // Set all of the needed error handlers.
  dataStream.on('error', done);
  gunzip.on('error', done);
  csvParser.on('error', done);
  s3InventoryUsageStream.on('error', done);

  // -----------------------------------------------------------------------
  // Set things in motion.
  // -----------------------------------------------------------------------

  dataStream.pipe(gunzip).pipe(csvParser).pipe(s3InventoryUsageStream);
}
// We have to know the location of the CSV file in order to download it.
var exampleBucket = 'inventory-bucket';
var exampleCsvFileKey = '_inventory/inventoried-bucket/inventory-id/data/e99a3da6-1a7b-443b-8076-2727ab52b00a.csv.gz';
// The file does not contain a header row, so we must know the column names
// in order to construct an object with appropriate parameters.
var exampleColumns = [
  'Bucket',
  'Key',
  'VersionId',
  'IsLatest',
  'IsDeleteMarker',
  'Size',
  'LastModifiedDate',
  'ETag',
  'StorageClass',
  'ReplicationStatus'
];

// Summarize the example file and print either the failure or the totals.
processInventoryFile(
  exampleBucket,
  exampleCsvFileKey,
  exampleColumns,
  function (error, data) {
    if (error) {
      // console.error performs printf-style substitution itself, so this
      // snippet does not need to require the util module.
      console.error(
        'Error in processing S3 inventory file s3://%s/%s: %s',
        exampleBucket,
        exampleCsvFileKey,
        error.stack
      );
    }
    else {
      console.info(JSON.stringify(data, null, '  '));
    }
  }
);

A Note on CSV Parsing Speed

The speed of CSV parsers varies quite considerably, even for those with very similar APIs and capabilities. When processing what may well amount to gigabytes of CSV data for a sizable AWS account, it matters whether the parser is twice as fast or twice as slow. For most Node.js projects, the csv-parser package is the best choice, and is much faster than the parser used by the csv package.

Obtaining the CSV Files and their Format

With the core method of parsing and processing out of the way, how do we obtain the list of files to process? Inventory data for the inventoried-bucket, with a configuration ID of inventory-id and a prefix of _inventory, would be written to files with keys representing the following directory layout. The manifest.checksum file is written last of all, so its presence serves as a flag for completion, and manifest.json contains information about the data files. Each inventory, daily or weekly, has a manifest written to its own directory named for the timestamp of the inventory, but the data files for all inventories for this bucket are written to the same data prefix.

_inventory
  inventoried-bucket
    inventory-id
      2016-12-10T18-01Z
        manifest.checksum
        manifest.json
      2016-12-09T15-01Z
        manifest.checksum
        manifest.json
      ...
      data
        e99a3da6-1a7b-443b-8076-2727ab52b00a.csv.gz
        ...

The manifest.json file for each inventory has the following format. Note that it specifies the columns for the CSV files as well as the location of those files; the CSV files themselves have no leading column row.

{
  "sourceBucket" : "inventoried-bucket",
  "destinationBucket" : "arn:aws:s3:::destination-bucket",
  "version" : "2016-11-30",
  "fileFormat" : "CSV",
  "fileSchema" : "Bucket, Key, VersionId, IsLatest, IsDeleteMarker, Size, LastModifiedDate, ETag, StorageClass, ReplicationStatus",
  "files" : [ 
    {
      "key" : "_inventory/inventoried-bucket/inventory-id/data/e99a3da6-1a7b-443b-8076-2727ab52b00a.csv.gz",
      "size" : 13453068,
      "MD5checksum" : "519afba5c656d2496b29747384b837ee"
    },
    ...
  ]
} 

For the sake of argument, let us say that the task here is to process the latest completed inventory for the contents of inventoried-bucket. The steps might look something like the following:

  • Figure out which of the manifest files is the most recent among completed manifests.
  • Download the manifest file.
  • Stream and process all of the CSV files listed in the manifest.
  • Merge the resulting totals for each file into one overall result.

Pulling it all Together in an Example Implementation

The implementation below uses the processInventoryFile function defined above. For the sake of simplicity it is somewhat inefficient; it runs through the CSV files for a given inventory one at a time, in a single thread. Large buckets can have scores of these files, but CSV parsing is CPU-intensive, so more than one concurrent stream per thread is usually a bad idea. If parallelizing this code, the best way to go about it is to spawn additional Node.js processes, one per CSV file, to the limit of the number of cores on the machine.

// Core.
var util = require('util');

// NPM.
var async = require('async');
var AWS = require('aws-sdk');
var _ = require('lodash');

/**
 * Find the key for the most recently completed manifest for this bucket, given
 * the various settings.
 *
 * Completion is detected via manifest.checksum objects; the key passed to the
 * callback is the manifest.json key that shares the same prefix. Only a single
 * page of listing results is examined.
 *
 * @param {String} bucket The bucket containing the inventory files.
 * @param {String} inventoriedBucket The bucket that was inventoried.
 * @param {String} inventoryPrefix The prefix given in the inventory
 *   configuration for the inventoried bucket. May be empty.
 * @param {String} inventoryId The ID of the inventory configuration.
 * @param {Function} callback Callback of the form function (error, string).
 */
function getMostRecentInventoryManifestKey (
  bucket,
  inventoriedBucket,
  inventoryPrefix,
  inventoryId,
  callback
) {
  // Use default credentials from the environment.
  var client = new AWS.S3();

  // The manifest files have key prefixes of the following form:
  //
  // inventoryPrefix/inventoriedBucket/inventoryId/2017-01-17T05-03Z/
  //
  // We want to avoid scanning the much larger number of files under:
  //
  // inventoryPrefix/inventoriedBucket/inventoryId/data/
  //
  // So use a prefix of the form:
  //
  // inventoryPrefix/inventoriedBucket/inventoryId/2
  //
  // The configured prefix is optional and may carry trailing slashes, so
  // normalize it before joining to avoid empty path segments.
  var prefixParts = [inventoriedBucket, inventoryId, '2'];
  var normalizedPrefix = String(inventoryPrefix || '').replace(/\/+$/, '');

  if (normalizedPrefix) {
    prefixParts.unshift(normalizedPrefix);
  }

  // The first step is to list the manifest objects. Here we are assuming that
  // there will be fewer than 1000 of them, as we are not paging the results.
  var params = {
    Bucket: bucket,
    Prefix: prefixParts.join('/')
  };

  client.listObjects(params, function (error, results) {
    if (error) {
      return callback(error);
    }

    // Look for the most recent manifest.checksum. This file is only written
    // when inventory is complete and other files are all created.
    var mostRecent;

    _.each(results.Contents, function (s3Object) {
      if (s3Object.Key.match(/checksum$/)) {
        if (
          !mostRecent ||
          mostRecent.LastModified.getTime() < s3Object.LastModified.getTime()
        ) {
          mostRecent = s3Object;
        }
      }
    });

    // This should be fairly unusual, but it is still an edge case to be
    // accounted for.
    if (!mostRecent) {
      return callback(new Error(util.format(
        'No completed inventory manifest found for %s in %s',
        inventoriedBucket,
        bucket
      )));
    }

    // A manifest.checksum implies that there is a manifest.json sharing the
    // same prefix. So we can return that key with a simple substitution:
    callback(null, mostRecent.Key.replace(/checksum$/, 'json'));
  });
}

/**
 * Parse and process the CSV files for the given manifest.
 *
 * Every file listed in the manifest is summarized via processInventoryFile,
 * and the per-file totals are then merged by common prefix. The callback
 * receives an array of merged usage objects sorted by their path.
 *
 * @param {String} bucket The bucket containing the inventory files.
 * @param {Object} manifest The parsed manifest.json contents. The files and
 *   fileSchema properties are read.
 * @param {Function} callback Callback of the form function (error, object[]).
 */
function processManifest (
  bucket,
  manifest,
  callback
) {
  // How many data files to stream at once. Too many will be slow and
  // problematic. CSV parsing is a CPU-intensive activity, and I have found
  // even two concurrent parsing streams in a single Node.js process to be
  // inefficient. But try it and see.
  var concurrency = 1;

  // The manifest contains the headers as a comma-delimited string rather
  // than an array, so split it once up front.
  var headers = manifest.fileSchema.split(/\s*,\s*/);

  async.mapLimit(
    manifest.files,
    concurrency,
    function (fileObj, asyncCallback) {
      processInventoryFile(
        bucket,
        fileObj.key,
        headers,
        asyncCallback
      );
    },
    function (error, usageDataArrays) {
      if (error) {
        return callback(error);
      }

      // The usageDataArrays variable should be an array with one entry per
      // data file, each entry being an array of objects of the form:
      //
      // {
      //   path: 'bucket/folder1',
      //   storageClass: {
      //     STANDARD: {
      //       count: 55,
      //       size: 1232983
      //     },
      //     STANDARD_IA: {
      //       count: 0,
      //       size: 0
      //     },
      //     ...
      //   }
      // }
      //
      // Merge the data objects, which basically means adding the numbers for
      // the same common prefixes.
      var byCommonPrefix = {};

      _.each(usageDataArrays, function (usageDataArray) {
        _.each(usageDataArray, function (data) {
          byCommonPrefix[data.path] = _.mergeWith(
            byCommonPrefix[data.path],
            data,
            function (a, b) {
              if (typeof a === 'number' && typeof b === 'number') {
                return a + b;
              }

              // Returning undefined for other values implies that the standard
              // merge behavior is used.
              return undefined;
            }
          );
        });
      });

      callback(
        null,
        _.sortBy(_.values(byCommonPrefix), function (obj) {
          return obj.path;
        })
      );
    }
  );
}

/**
 * Run all the necessary tasks for the inventory of a single bucket.
 *
 * The steps run in series: find the most recent completed manifest key,
 * download and parse that manifest, then process the files it lists.
 *
 * Note that at module scope this function's name shadows the Node.js global
 * of the same name.
 *
 * @param {String} bucket The bucket containing the inventory files.
 * @param {String} inventoriedBucket The bucket that was inventoried.
 * @param {String} inventoryPrefix The prefix given in the inventory
 *   configuration for the inventoried bucket.
 * @param {String} inventoryId The ID of the inventory configuration.
 * @param {Function} callback Callback of the form function (error, data),
 *   where data is the usage data produced by processManifest.
 */
function process (
  bucket,
  inventoriedBucket,
  inventoryPrefix,
  inventoryId,
  callback
) {
  // Use default credentials from the environment.
  var client = new AWS.S3();
  var manifestKey;
  var manifest;
  var usageData;

  async.series({
    getMostRecentInventoryManifestKey: function (asyncCallback) {
      getMostRecentInventoryManifestKey(
        bucket,
        inventoriedBucket,
        inventoryPrefix,
        inventoryId,
        function (error, mostRecentManifestKey) {
          manifestKey = mostRecentManifestKey;
          asyncCallback(error);
        }
      );
    },

    getManifest: function (asyncCallback) {
      client.getObject(
        {
          Bucket: bucket,
          Key: manifestKey
        },
        function (error, result) {
          if (error) {
            return asyncCallback(error);
          }

          try {
            manifest = JSON.parse(result.Body.toString('utf8'));
          }
          catch (jsonError) {
            // Report where the bad manifest lives along with the parser's
            // own explanation of what was wrong with it.
            return asyncCallback(new Error(util.format(
              'JSON parsing error for manifest s3://%s/%s: %s',
              bucket,
              manifestKey,
              jsonError.message
            )));
          }

          asyncCallback();
        }
      );
    },

    processManifest: function (asyncCallback) {
      processManifest(
        bucket,
        manifest,
        function (error, processedUsageData) {
          usageData = processedUsageData;
          asyncCallback(error);
        }
      );
    }
  }, function (error) {
    callback(error, usageData);
  });
}
// Settings describing where the example inventory lives and how it was
// configured.
var exampleBucket = 'inventory-bucket';
var exampleInventoriedBucket = 'inventoried-bucket';
var exampleInventoryPrefix = '_inventory';
var exampleInventoryId = 'inventory-id';

// Process the latest completed inventory and print the failure or the merged
// usage data.
process(
  exampleBucket,
  exampleInventoriedBucket,
  exampleInventoryPrefix,
  exampleInventoryId,
  function (failure, usage) {
    if (!failure) {
      console.info(JSON.stringify(usage, null, '  '));
      return;
    }

    console.error(failure);
  }
);