Mati Mati - 3 months ago 65
Java Question

How to create Processor with Transaction and DLQ with Rabbit binding?

I'm just starting to learn Spring Cloud Stream and Data Flow, and I want to understand one use case that is important to me. I created an example processor, Multiplier, which takes a message and resends it 5 times to the output.

@EnableBinding(Processor.class)
public class MultiplierProcessor {

    @Autowired
    private Source source;

    private int repeats = 5;

    @Transactional
    @StreamListener(Processor.INPUT)
    public void handle(String payload) {
        for (int i = 0; i < repeats; i++) {
            if (i == 4) {
                throw new RuntimeException("EXCEPTION");
            }
            source.output().send(new GenericMessage<>(payload));
        }
    }
}


As you can see, the processor throws just before the fifth send. Why? Because it can (programs throw exceptions). I wanted to use this case to practice fault handling in Spring Cloud Stream.

What I would like to achieve is for the input message to end up in the DLQ and for the 4 messages that were sent before the failure to be rolled back, so that the next processor never consumes them (just like in a normal JMS transaction). I have already tried defining the following properties in my processor project, but without success.

spring.cloud.stream.bindings.output.producer.autoBindDlq=true
spring.cloud.stream.bindings.output.producer.republishToDlq=true
spring.cloud.stream.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.consumer.autoBindDlq=true


Could you tell me whether this is possible, and what I am doing wrong? I would be very thankful for some examples.

Answer

You have several issues with your configuration:

  • missing .rabbit in the Rabbit-specific properties
  • you need a group name and durable subscription to use autoBindDlq
  • autoBindDlq doesn't apply on the output side

The consumer has to be transacted so that the producer sends are performed in the same transaction.

I just tested this with 1.0.2.RELEASE:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true

and it worked as expected.

EDIT

Actually, no, the published messages were not rolled back. Investigating...

EDIT2

OK; it does work, but you can't use republishToDlq - because when that is enabled, the binder publishes the failed message to the DLQ and the transaction is committed.

When that is false, the exception is thrown to the container, the transaction is rolled back, and RabbitMQ moves the failed message to the DLQ.
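To make the rollback behaviour concrete, here is a toy model in plain Java. It is only a sketch: ToyTransactedBroker and ToyRollbackDemo are hypothetical names, not Spring or RabbitMQ classes, and the model simply assumes that sends made inside a transaction stay invisible until commit and that a listener exception rejects the input message to the DLQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a transacted channel: sends are buffered until commit. */
class ToyTransactedBroker {
    final List<String> delivered = new ArrayList<>();   // what downstream consumers would see
    final List<String> deadLetters = new ArrayList<>(); // stand-in for the DLQ
    private final List<String> pending = new ArrayList<>();

    /** Buffers a send inside the current "transaction". */
    void send(String payload) {
        pending.add(payload);
    }

    /** Runs the listener for one input message inside a single "transaction". */
    void deliver(String input, Consumer<String> listener) {
        try {
            listener.accept(input);
            delivered.addAll(pending); // commit: buffered sends become visible
        } catch (RuntimeException e) {
            deadLetters.add(input);    // rollback: sends are dropped, input is dead-lettered
        } finally {
            pending.clear();
        }
    }
}

class ToyRollbackDemo {
    /** Mimics the Multiplier processor; failAt is the loop index that throws (-1 = never). */
    static ToyTransactedBroker run(int failAt) {
        ToyTransactedBroker broker = new ToyTransactedBroker();
        broker.deliver("hello", payload -> {
            for (int i = 0; i < 5; i++) {
                if (i == failAt) {
                    throw new RuntimeException("EXCEPTION");
                }
                broker.send(payload);
            }
        });
        return broker;
    }

    public static void main(String[] args) {
        ToyTransactedBroker failed = run(4);
        System.out.println(failed.delivered + " " + failed.deadLetters); // [] [hello]
        ToyTransactedBroker ok = run(-1);
        System.out.println(ok.delivered.size() + " " + ok.deadLetters);  // 5 []
    }
}
```

In the failing run none of the four earlier sends reaches the output and the input lands in the dead-letter list, which is the outcome the question asks for.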

Note, however, that retry is enabled by default (3 attempts) so, if your processor succeeds during retry, you will get duplicates in your output.

For this to work as you want, you need to disable retry by setting the max attempts to 1 (and don't use republishToDlq).
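The duplicate problem can be illustrated with another toy model in plain Java. This is a sketch under one assumption: retries run inside the same transaction as the first attempt, so sends from a failed attempt are still pending when a later attempt succeeds. ToyRetryDemo and its parameters are hypothetical and do not use the Spring Retry API.

```java
import java.util.ArrayList;
import java.util.List;

class ToyRetryDemo {
    /**
     * Simulates one delivery. Each attempt sends "out-1", then either fails
     * (for the first failingAttempts attempts) or sends "out-2" and succeeds.
     * Returns the messages that end up committed to the output.
     */
    static List<String> process(int maxAttempts, int failingAttempts) {
        List<String> pending = new ArrayList<>(); // sends buffered in the shared transaction
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            pending.add("out-1");
            if (attempt <= failingAttempts) {
                continue; // listener threw; retry inside the same transaction
            }
            pending.add("out-2");
            return pending; // success: commit everything, including sends from failed attempts
        }
        return new ArrayList<>(); // attempts exhausted: rollback, nothing is published
    }

    public static void main(String[] args) {
        System.out.println(process(3, 1)); // [out-1, out-1, out-2]
        System.out.println(process(1, 1)); // []
    }
}
```

With three attempts and one failure, "out-1" is published twice. With a single attempt the failure rolls everything back and nothing is published.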

EDIT3

OK, this will work, when the fix for this JIRA is applied to Spring AMQP...

@SpringBootApplication
@EnableBinding({ Processor.class, So39018400Application.Errors.class })
public class So39018400Application {

    public static void main(String[] args) {
        SpringApplication.run(So39018400Application.class, args);
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    public interface Errors {

        @Output("errors")
        MessageChannel errorChannel();

    }

    private static class Foo {

        @Autowired
        Source source;

        @Autowired
        Errors errors;

        @StreamListener(Processor.INPUT)
        public void handle (Message<byte[]> in) {
            try {
                source.output().send(new GenericMessage<>("foo"));
                source.output().send(new GenericMessage<>("foo"));
                throw new RuntimeException("foo");
            }
            catch (RuntimeException e) {
                errors.errorChannel().send(MessageBuilder.fromMessage(in)
                        .setHeader("foo", "bar") // add whatever you want, stack trace etc.
                        .build());
                throw e;
            }
        }

    }

}

with properties:

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.errors.destination=so8400errors
spring.cloud.stream.rabbit.bindings.errors.producer.transacted=false


spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

spring.cloud.stream.bindings.input.destination=so8400in
spring.cloud.stream.bindings.input.group=so8400

spring.cloud.stream.rabbit.bindings.input.consumer.transacted=true
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=false
spring.cloud.stream.bindings.input.consumer.max-attempts=1