Dave Kearney - 3 months ago
Node.js Question

node.js amqplib sendToQueue to RabbitMQ is hanging

I have a function for queuing a message to RabbitMQ like so:

var amqp = require('amqplib/callback_api');

var _queueURL = 'amqp://127.0.0.1';
var _toBlahBlahQueueName = 'blahblah';

var self = module.exports = {
  queueMessage: function (msgObj, callback) {
    try {
      amqp.connect(_queueURL, function (err, connection) {
        if (err) {
          callback(err);
        }

        connection.createChannel(function (err, channel) {
          if (err) {
            callback(err);
          }

          channel.assertQueue(_toBlahBlahQueueName, { durable: true }, function (err, _ok) {
            if (err) {
              callback(err);
            }

            var msg = new Buffer(JSON.stringify(msgObj));

            channel.sendToQueue(_toBlahBlahQueueName, msg, { persistent: true }, function (err, ok) {
              if (err) {
                console.log(err);
                callback(err);
              }

              console.log('published', ok);

              channel.connection.close();
              callback(null, { message: 'queued' });
            });
          });
        });
      });
    }
    catch (e) {
      console.log(e.stack);
      callback(e);
    }
  }
};


I am calling the function queueMessage with messages that are about 250K in length.

The sendToQueue call is hanging every time. It just sits there without returning an error. However, the message seems to get queued!

The server log has the error message: client unexpectedly closed TCP connection

Thanks for any help!

Answer

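amqplib does not support a callback for sendToQueue or publish on a regular channel (one created with createChannel), so the callback you pass is never invoked and your code appears to hang. Only a confirm channel, created with createConfirmChannel, accepts one.

The documentation for the regular channel shows this is not an option: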

Channel#sendToQueue Promises and callbacks

sendToQueue(queue, content, [options])

Send a single message with the content given as a buffer to the specific queue named, bypassing routing. The options and return value are exactly the same as for publish.

To work around this, call sendToQueue without a callback, as if it were a synchronous call.

If you want to exit the app immediately afterwards, you will have to wait a few milliseconds before doing so. Otherwise the message may not be sent.

Here is an example of how you could change your code to work this way:

channel.sendToQueue(_toBlahBlahQueueName, msg, { persistent: true });
setTimeout(function () {
  channel.connection.close();
  callback(null, { message: 'queued' });
}, 500);
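
If you would rather not rely on a fixed delay, an alternative is a confirm channel, where sendToQueue does take a callback that fires once the broker has acknowledged the message. The following is only a minimal sketch under a few assumptions: makeQueueMessage and finish are hypothetical helper names, and the amqp module is passed in as an argument (in your app it would be require('amqplib/callback_api')) so that the function can be tried with a stub. It also returns after each error callback so the callback is not invoked twice.

```javascript
// Sketch: get a real completion callback by using a confirm channel.
// `amqp` is injected (hypothetical design choice); in the application
// it would be require('amqplib/callback_api').
function makeQueueMessage(amqp, queueURL, queueName) {
  return function queueMessage(msgObj, callback) {
    amqp.connect(queueURL, function (err, connection) {
      if (err) { return callback(err); }

      // Close the connection first, then report the outcome once.
      function finish(error, result) {
        connection.close(function () {
          callback(error, result);
        });
      }

      // A confirm channel is what makes the sendToQueue callback valid.
      connection.createConfirmChannel(function (err, channel) {
        if (err) { return finish(err); }

        channel.assertQueue(queueName, { durable: true }, function (err) {
          if (err) { return finish(err); }

          var msg = Buffer.from(JSON.stringify(msgObj));

          // Invoked after the broker has acked (or nacked) the message.
          channel.sendToQueue(queueName, msg, { persistent: true }, function (err) {
            if (err) { return finish(err); }
            finish(null, { message: 'queued' });
          });
        });
      });
    });
  };
}

// Usage (assumes amqplib is installed and RabbitMQ is reachable):
// var queueMessage = makeQueueMessage(
//   require('amqplib/callback_api'), 'amqp://127.0.0.1', 'blahblah');
// queueMessage({ hello: 'world' }, function (err, res) { console.log(err, res); });
```

Because the connection is closed only after the broker confirms the message, no setTimeout is needed in this variant.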