gstackoverflow gstackoverflow - 1 month ago 7
Java Question

How does spring jms distribute messages among durable topic listeners?

raw jms code:

TopicSubscriber durSubscriber1 = receiverSession.createDurableSubscriber(topic,"subscription_1");
durSubscriber1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {

RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_1:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}

}
});

TopicSubscriber durSubscriber2 = receiverSession.createDurableSubscriber(topic,"subscription_2");
durSubscriber2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {

RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_2:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}

}
});


When I use following code each listener gets all messages. If I use same subscription name - application doesn't start.

From another hand I wtote code which uses spring jms:

config:

@Bean
public JmsListenerContainerFactory<?> myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
return factory;
}


listeners:

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopic(Email email) {
System.out.println("list_1:" + email);
}

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopicDup(Email email) {
System.out.println("list_2:" + email);
}


At this case both listeners divide messages. I mean that if producer sends 10 messages then listener_1 will get N messages and listener_2 will get M messages.

M+N=10

Please explain difference at 2 code snippets.
Can you provide corresponding jms code for spring-jms version ?

Answer Source

It's due to the way the rabbit JMS client maps JMS to native AMQP. You have to give each listener a subscription id; otherwise they will compete for messages in the same queue.

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "foo")
public void receiveTopic(String email) {
    System.out.println("list_1:" + email);
}

@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "bar")
public void receiveTopicDup(String email) {
    System.out.println("list_2:" + email);
}