Mati Mati - 1 year ago 183
Java Question

How to create Processor with Transaction and DLQ with Rabbit binding?

I'm just starting to learn Spring Cloud Stream and Data Flow, and I want to understand one use case that is important to me. I created an example processor, Multiplier, which takes a message and resends it 5 times to the output.

public class MultiplierProcessor {

    private Source source;

    private int repeats = 5;

    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            // Fail on the 5th iteration, after 4 messages have already been sent.
            if (i == 4) {
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}

As you can see, the processor crashes before the 5th send. Why? Because it can (programs throw exceptions). I wanted to use this case to practice fault handling in Spring Cloud Stream.

What I would like to achieve is for the input message to end up in the DLQ, and for the 4 messages that were sent before the failure to be rolled back and not consumed by the next operand (just like in a normal JMS transaction). I have already tried defining the following properties in my processor project, but without success.

Could you tell me whether this is possible, and what I am doing wrong? I would be very thankful for some examples.

Answer Source

You have several issues with your configuration:

  • missing .rabbit in the rabbit-specific properties
  • you need a group name and durable subscription to use autoBindDlq
  • autoBindDlq doesn't apply on the output side

The consumer has to be transacted so that the producer sends are performed in the same transaction.
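As a rough sketch of the points above, the consumer-side settings could look like the following. The group name "multiplier" is a made-up placeholder, and "input" assumes the standard Processor input channel; the keys are collected in a plain java.util.Properties object only to keep the example self-contained, and the same keys could equally go in application.properties.

```java
import java.util.Properties;

public class BinderPropertiesSketch {

    // Collects the consumer settings named in the list above.
    static Properties consumerSettings() {
        Properties p = new Properties();

        // Hypothetical group name; a group is needed before autoBindDlq can be used.
        p.setProperty("spring.cloud.stream.bindings.input.group", "multiplier");

        // Rabbit-specific keys carry the extra ".rabbit" segment.
        String rabbit = "spring.cloud.stream.rabbit.bindings.input.consumer.";
        p.setProperty(rabbit + "durableSubscription", "true");
        p.setProperty(rabbit + "autoBindDlq", "true");
        // Transacted consumer, so producer sends join the same transaction.
        p.setProperty(rabbit + "transacted", "true");
        return p;
    }

    public static void main(String[] args) {
        consumerSettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that autoBindDlq appears only under the consumer prefix, since it does not apply on the output side.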

I just tested this with 1.0.2.RELEASE:

and it worked as expected.


Actually, no, the published messages were not rolled back. Investigating...


OK; it does work, but you can't use republishToDlq - because when that is enabled, the binder publishes the failed message to the DLQ and the transaction is committed.

When that is false, the exception is thrown to the container, the transaction is rolled back, and RabbitMQ moves the failed message to the DLQ.

Note, however, that retry is enabled by default (3 attempts) so, if your processor succeeds during retry, you will get duplicates in your output.

For this to work as you want, you need to disable retry by setting the max attempts to 1 (and don't use republishToDlq).
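To see why retry leads to duplicates, here is a toy model in plain Java. It is not binder code: TxOutput and deliver are hypothetical names, and the model assumes that all retry attempts run inside one transaction, so sends from a failed attempt stay pending until the final commit or rollback.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryInTransactionSketch {

    /** Hypothetical stand-in for a transacted output: sends are buffered until commit. */
    static final class TxOutput {
        private final List<String> pending = new ArrayList<>();
        private final List<String> published = new ArrayList<>();

        void send(String msg) { pending.add(msg); }
        void commit() { published.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
        List<String> published() { return published; }
    }

    /**
     * Delivers one input message. The handler sends two messages per attempt
     * and throws on the first failingAttempts attempts. Returns what a
     * downstream consumer would see after commit or rollback.
     */
    static List<String> deliver(int maxAttempts, int failingAttempts) {
        TxOutput out = new TxOutput();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                out.send("foo");
                out.send("foo");
                if (attempt <= failingAttempts) {
                    throw new RuntimeException("attempt " + attempt + " failed");
                }
                out.commit(); // success: everything still pending is published
                return out.published();
            } catch (RuntimeException e) {
                // Retry: sends from the failed attempt remain pending.
            }
        }
        out.rollback(); // attempts exhausted: nothing reaches the output
        return out.published();
    }

    public static void main(String[] args) {
        System.out.println(deliver(3, 1).size()); // prints 4: two duplicates
        System.out.println(deliver(1, 1).size()); // prints 0: rolled back
    }
}
```

With three attempts and one failure, the successful retry commits the sends of both attempts. With a single attempt, the failure rolls everything back, which is the behaviour the question asks for.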


OK, this will work, when the fix for this JIRA is applied to Spring AMQP...

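@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public interface Errors {

        @Output("errors") // channel name is illustrative
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle (Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                // Publish a copy of the failed message to the error binding, then rethrow
                // so the transaction (including the two sends above) is rolled back.
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}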



with properties: