Julian Julian - 1 year ago 74
Node.js Question

Wait for callback of async function in last stream.on('data') event

I am using fast-csv to iterate over a CSV file using a

. For each row of the CSV file, I want to create a job in redis, for which I use kue. Parsing a row is synchronous function. The whole thing looks like this:

var csvStream = fastCsv(_config.csvOptions)
.on('data', function(data) {
var stream = this;

var payload = parseRow(data, _config);
console.log(payload); // the payload is always printed to the console

var job = kue.create('csv-row', {
payload: payload
.save(function(err) {
if (!err) console.log('Enqueued at ' + job.id);
else console.log('Redis error ' + JSON.stringify(err));

.on('end', function() {
callback(); // this ends my parsing

The simple
shows for every row of my CSV file, however the job is not created. I.e., none of the outputs in the callback of
are being printed, and the job is not in my redis.

I assume, because it's the last row of the CSV file, the stream already emits
and so the last
cannot be executed before the process terminates?

Is there a way to halt the stream's
until kue is done?

Answer Source

You can solve this problem using async library. You can use the below pattern for any streams.

var AsyncLib = require('async');

var worker = function (payload, cb) {
    //do something with payload and call callback
    return cb();

var concurrency = 5;
var streamQueue = AsyncLib.queue(worker, concurrency);

var stream = //some readable stream;

stream.on('data', function(data) {
    //no need to pause and resume
    var payload = '//some payload';
.on('end', function() {
    //register drain event on end and callback
    streamQueue.drain = function () {
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download