matthjes - 1 month ago
Java Question

Spring Integration DSL: PublishSubscribeChannel order

I want to understand how the PublishSubscribeChannel works, so I've implemented a small example:

@Bean
public MessageSource<?> integerMessageSource() {
    MethodInvokingMessageSource source = new MethodInvokingMessageSource();
    source.setObject(new AtomicInteger());
    source.setMethodName("getAndIncrement");
    return source;
}



@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
            .subscribe(flow -> flow
                .transform(source -> MessageBuilder.withPayload("Error").build())
                .handle(message -> {
                    LOG.info("Error");
                }))
            .subscribe(flow -> flow
                .handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
        )
        .get();
    // @formatter:on
}


I expected to see the following output:

Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...


But the third subflow (with the "Error" output) is always processed first. When I try to define an order for steps 1, 2, and 4, I get the following warning on the console:

o.s.integration.dsl.GenericEndpointSpec : 'order' can be applied only for AbstractMessageHandler


I would have expected the subscribers to be called in the order of subscription, but this does not seem to be the case.

I'm using Spring Boot 1.5.4 and Spring Integration 4.3.10.

Answer Source

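The issue is that the lambda handlers are not Ordered. The general contract for a pub/sub channel is to invoke Ordered subscribers first (in order) and then unordered subscribers. The third subflow starts with a transformer, which is Ordered, so it is invoked before the three lambda handlers.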

Since lambdas can't implement multiple interfaces, I am not sure there's anything we can do.
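To make the dispatch rule described above concrete, here is a minimal plain-Java sketch that does not use Spring at all. The `Handler`, `Ordered`, and `OrderedHandler` types and the `dispatchOrder` method are hypothetical stand-ins invented for this illustration; they only model the "Ordered first, then unordered" contract and are not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PubSubOrderSketch {

    /** Hypothetical stand-in for a message handler (a functional interface). */
    interface Handler {
        void handle(String payload);
    }

    /** Hypothetical stand-in for an "ordered" marker interface. */
    interface Ordered {
        int getOrder();
    }

    /** Models a component (transformer, bridge, ...) that is both a Handler and Ordered. */
    static class OrderedHandler implements Handler, Ordered {
        private final Handler delegate;

        OrderedHandler(Handler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void handle(String payload) {
            delegate.handle(payload);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE; // assumed default: lowest precedence
        }
    }

    /** Ordered subscribers first (by order value), then the rest; the stable sort keeps ties in subscription order. */
    static List<Handler> dispatchOrder(List<Handler> subscribers) {
        List<Handler> sorted = new ArrayList<>(subscribers);
        sorted.sort(Comparator
            .comparing((Handler h) -> !(h instanceof Ordered))
            .thenComparingInt(h -> h instanceof Ordered ? ((Ordered) h).getOrder() : 0));
        return sorted;
    }

    /** Subscribes four handlers; when wrapAll is true, every subscriber is Ordered. */
    static List<String> run(boolean wrapAll) {
        List<String> log = new ArrayList<>();
        Handler step1 = p -> log.add("step 1"); // a bare lambda is only a Handler
        Handler step2 = p -> log.add("step 2");
        Handler error = new OrderedHandler(p -> log.add("Error"));
        Handler step4 = p -> log.add("step 4");

        List<Handler> subscribers = new ArrayList<>();
        subscribers.add(wrapAll ? new OrderedHandler(step1) : step1);
        subscribers.add(wrapAll ? new OrderedHandler(step2) : step2);
        subscribers.add(error);
        subscribers.add(wrapAll ? new OrderedHandler(step4) : step4);

        for (Handler h : dispatchOrder(subscribers)) {
            h.handle("0");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [Error, step 1, step 2, step 4]
        System.out.println(run(true));  // [step 1, step 2, Error, step 4]
    }
}
```

With bare lambdas, only the "Error" subscriber is Ordered, so it jumps to the front. Once every subscriber is Ordered with the same order value, the subscription order is preserved in this model.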

As a work-around, you could do something like...

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 1: {}")))
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 2: {}")))
            .subscribe(flow -> flow
                .transform(message -> "Error")
                .handle(message -> {
                    LOG.info("Error");
                }))
            .subscribe(flow -> flow
                .handle(handler("Handling message, step 4: {}")))
        )
        .get();
    // @formatter:on
}

private MessageHandler handler(String format) {
    return new AbstractMessageHandler() {

        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception {
            LOG.info(format, message.getPayload());
        }

    };

}

This way, all the subscribers are Ordered and are invoked in the order of subscription.

EDIT

Here's a slightly easier workaround: start each subflow with a bridge instead of a lambda, so that the first component of every subflow implements Ordered:

@Bean
public IntegrationFlow mainFlow() {
    // @formatter:off
    return IntegrationFlows
        .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
        .publishSubscribeChannel(pubSub -> pubSub
            .subscribe(flow -> flow
                .bridge(e -> e.id("s1"))
                .handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
            .subscribe(flow -> flow
                .bridge(e -> e.id("s2"))
                .handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
            .subscribe(flow -> flow
                .transform(source -> MessageBuilder.withPayload("Error").build())
                .handle(message -> {
                    LOG.info("Error");
                }))
            .subscribe(flow -> flow
                .bridge(e -> e.id("s4"))
                .handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
        )
        .get();
    // @formatter:on
}