occasl occasl - 2 months ago 32
Node.js Question

Node.js: spawn multiple SQS listener threads in one project

I have a Node.js project that is consuming messages off AWS SQS (a message queue). I would like to spin up multiple consumers in the same project to keep up with the messages placed in the queue without having to create another Node.js instance. Is it possible to do that with something like Fibers or another framework? I'm basically trying to make the act of receiving messages multithreaded (competing consumers pattern).

Here's an example:

// Dependencies, plus the queue URL taken from the environment.
var AWS = require('aws-sdk');
var nconf = require('nconf');
var SQS_URL = process.env.SQS_SERVICES_EVENTS;

// Credentials and region are read from nconf and applied to the global AWS
// config before the SQS client below is constructed.
AWS.config.update({
    accessKeyId: nconf.get("accessKeyId"),
    secretAccessKey: nconf.get("secretAccessKey")
});
AWS.config.update({
    region: nconf.get("region")
});

// SQS client used by the listener.
var sqs = new AWS.SQS();

// Request parameters passed to every receiveMessage call.
var MAX_NUM_MSGS = nconf.get("sqs.max.messages");
var params = {
    QueueUrl: SQS_URL,
    MaxNumberOfMessages: MAX_NUM_MSGS,
    VisibilityTimeout: 30,
    WaitTimeSeconds: 20
};

// Polling starts as soon as this module is loaded: setInterval is invoked
// here, and the export holds the timer handle it returns (not a function).
exports.startSqsListener = setInterval(sqsListener, nconf.get("sqs.interval"));

// Make multi-threaded
/**
 * Issues one receiveMessage request against the queue described by `params`
 * and handles the batch of messages it returns, if any.
 *
 * Request failures are logged and the poll is abandoned; nothing is thrown
 * to the caller, so the next scheduled invocation simply tries again.
 */
function sqsListener() {
    sqs.receiveMessage(params, function(err, data) {
        if (err) {
            logger.error(err, err.stack);
            // `data` is null when the request fails; returning here avoids a
            // TypeError from reading `data.Messages` below.
            return;
        }
        // `Messages` is absent when nothing was received.
        if (data && data.Messages) {
            // do something with each message
        }
    });
}

Answer

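If you want to process more messages simultaneously, you could write an async function that implements the logic for "doing something with each message". As long as the messages don't need to be processed in sequential order, Node will start each asynchronous call without waiting for the previous one to finish, so their I/O overlaps and the messages are handled concurrently. Note that this all happens on a single thread: it helps when processing is I/O-bound, but CPU-bound work is not parallelized.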

// Fetch one batch of messages and hand each one to processMessage without
// waiting for the previous one to finish.
sqs.receiveMessage(params, function(err, data) {
    if (err) {
        logger.error(err, err.stack);
        // `data` is null when the request fails; returning here avoids a
        // TypeError from reading `data.Messages` below.
        return;
    }
    // `Messages` is absent when nothing was received.
    if (data && data.Messages) {
        data.Messages.forEach(function(message) {
            // Named differently from the outer `err` so the two are not confused.
            processMessage(message, function(processErr) {
                //check for error, do something with it
            });
        });
    }
});

/**
 * Placeholder for the per-message work.
 *
 * @param {*} messageData - The message handed over by the caller; currently unused.
 * @param {Function} callback - Invoked once, with no arguments, when processing is done.
 */
function processMessage(messageData, callback) {
    // Message-specific processing belongs here.

    // Signal completion; pass an error as the first argument instead if needed.
    callback();
}