mikemaccana mikemaccana - 1 month ago 32
Node.js Question

RabbitMQ / AMQP: single queue, multiple consumers for same message?

I am just starting to use RabbitMQ and AMQP in general.


  • I have a queue of messages

  • I have multiple consumers, which I would like to do different things with the same message.



Most of the RabbitMQ documentation seems to be focused on round-robin, ie where a single message is consumed by a single consumer, with the load being spread between each consumer. This is indeed the behavior I witness.

An example: the producer has a single queue, and send messages every 2 sec:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}

setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)


})


And here's a consumer:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})


If I start the consumer twice, I can see that each consumer is consuming alternate messages in round-robin behavior. Eg, I'll see messages 1, 3, 5 in one terminal, 2, 4, 6 in the other.

My question is:


  • Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

  • Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?


Answer

Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

No, not if the consumers are on the same queue. From RabbitMQ's AMQP Concepts guide:

it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.

This seems to imply that round-robin behavior within a queue is a given, and not configurable. Ie, separate queues are required in order to have the same message ID be handled by multiple consumers.

Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?

No it's not, single queue/multiple consumers with each each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.

As I don't require too complex routing, a fanout exchange will handle this nicely. I didn't focus too much on Exchanges earlier as node-amqp has the concept of a 'default exchange' allowing you to publish messages to a connection directly, however most AMQP messages are published to a specific exchange.

Here's my fanout exchange, both sending and receiving:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})