user1513388 user1513388 - 2 months ago 24
Node.js Question

processing highland stream chunks using async

I'm using

highland.js
to process a file using a stream to read the contents between two delimiters. I'm also using
async.js
to run a series of http requests in sequence.

Ideally I would like to pass the output
x
from highland as the first function to the
async
series (chain) so that the HTTP requests get executed for each chunk extracted from the stream.

Is this possible? If so, how can this be achieved?

var async = require('async');
var _ = require('highland');


_(fs.createReadStream(files[0], { encoding: 'utf8' }))
.splitBy('-----BEGIN-----\n')
.splitBy('\n-----END-----\n')
.filter(chunk => chunk !== '')
.each(function (x) {
}).done(function () {

async.series([
function(callback) {
setTimeout(function() {
console.log('Task 1');
callback(null, 1);
}, 300);
},
function(callback) {
setTimeout(function() {
console.log('Task 2');
callback(null, 2);
}, 200);
},
], function(error, results) {
console.log(results);
});

});;

Answer

You can get rid of the calls to each and done. After filtering, you can follow it up with .toArray(callback). The callback is passed an array that contains the results from highland. You might refactor like this

var Q = require('q');
var _ = require('highland');


_(fs.createReadStream(files[0], { encoding: 'utf8' }))
        .splitBy('-----BEGIN-----\n')
        .splitBy('\n-----END-----\n')
        .filter(chunk => chunk !== '')
        .each(asyncTasks);

function asyncTasks(x) { // here, x will be each of the results from highland
    async.series([
      // do something with x results
        function(callback) {
          console.log('Task 1');
          callback(null, 1);
        },
        // do something else with x results
        function(callback) {
          console.log('Task 2');
          callback(null, 2);
        },
    ], function(error, results) {
        console.log(results);
    });
}

here is a link to the documentation for toArray. toArray consumes the stream, just as done also does. Let me know if you have any questions.

Although honestly, I think you would be better off using promises instead. Although part of it is just personal preferrence, partly because it makes the code more readable. From what I've read, async is more performant than promises, but the nice part about promises is that you can pass the results from one function to the next. So in your example, you could do some stuff to x in the first part, and then pass your modified result to the next function, and the next function, and so on. Where when you're using async.series, you finish each function by calling callback(null, result), and you don't get the results until you finish at the very end of the series when you get all the results from all of the calls to callback. Now, you could always save your results to some variable outside of async.series, but that would make your code messier. If you wanted to re-write it with promises, it would look as follows. I am using q here, but it is just one of many promise libraries you could use.

    var async = require('async');
    var _ = require('highland');


    _(fs.createReadStream(files[0], { encoding: 'utf8' }))
            .splitBy('-----BEGIN-----\n')
            .splitBy('\n-----END-----\n')
            .filter(chunk => chunk !== '')
            .each(asyncTasks);

    function asyncTasks(x) { // here, x will be an array of the results from highland
      return asyncTask1(x)
              .then(asyncTask2)
              .then(asyncTask3)
    }

    function asyncTask1(x) {
      var deferred = Q.defer();

      // do some stuff

      if (// some error condition) {
        deferred.reject();
      } else {
        deferred.resolve(x); // or pass along some modified version of x
      }

      return deferred.promise;
    }

    function asyncTask2(x) {
      // same structure as above
    }

    function asyncTask3(x) {
      // same structure as above
    }

Some asynchronous APIs these days have started to return promises, in addition to accepting callback, or sometimes in place of. So it would be a good thing to get comfortable with. Promises are super useful. You can read more about them here and here.

Comments