loesak loesak - 5 months ago 33
Java Question

Spring: IdempotentReceiverInterceptor can only be used on MessageHandlers?

I'm trying to use the IdempotentReceiverInterceptor to prevent my integration flow from emitting messages it has already produced. However, it seems that IdempotentReceiverInterceptor#invoke only wants components that extend MessageHandler (to a degree) or have a method called "handleMessage" who's first argument is a Message. I want to deduplicate the message and if it falls through, perform some additional processing.

My questions is two fold:


  • Is there a reason why this advice shouldn't be allowed on any
    component; Transformer, GenericHandler, etc.?

  • Because its only allowed to run on MessageHandler(s) or anything that has a handleMessage method, I tried using the AbstractReplyProducingMessageHandler (because i need to do additional processing), but that does not work either.



I feel like I'm misunderstanding the proper use of the IdempotentReceiverInterceptor, that i should be able to use it on almost any component, but the implementation doesn't seem to agree with me. Am I incorrect on how it should be used or am I using it wrong? I know i could probably just throw the MetadataStoreSelector in the flow as a filter, but am trying to do things as recommended by SI.

Any help is appreciated. Here is an example flow that i would have expected to work.

return IntegrationFlows
.from(messageProducer)
.<String, UUID>transform(s -> UUID.fromString(s))
.claimCheckOut(messageStore)
.handle(
new AbstractReplyProducingMessageHandler() {

@Override
protected Object handleRequestMessage(final Message<?> requestMessage) {
return requestMessage;
}

},
spec -> {
spec.advice(idempotentReceiverInterceptor);
})
// do some more stuff
.transform(transformer)
.handle(loggingHandler)
.get();


and here is the log message showing that it didnt

This advice org.springframework.integration.handler.advice.IdempotentReceiverInterceptor can only be used for MessageHandlers; an attempt to advise method 'toString' in 'org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler' is ignored


EDIT: for those that come here looking a solution, this is pretty much what i ended up doing anyways

return IntegrationFlows
.from(messageProducer)
.<String, UUID>transform(s -> UUID.fromString(s))
.claimCheckOut(messageStore)
.filter(metadataStoreSelector)
// do some more stuff
.transform(transformer)
.handle(loggingHandler)
.get();

Answer

According to the Reference Manual:

This is an AOP Advice, which is applied to the MessageHandler.handleMessage() method and can filter a request message or mark it as a duplicate, according to its configuration.

And the source code says:

boolean isMessageHandler = invocationThis != null && invocationThis instanceof MessageHandler;
boolean isMessageMethod = method.getName().equals("handleMessage")
        && (arguments.length == 1 && arguments[0] instanceof Message);

In other words only MessageHandler implementations can be affected with the IdempotentReceiverInterceptor.

That's one.

You worry about Transformer, GenericHandler for nothing. The Framework ends up with the particular MessageHandler wrapping anyway, e.g. ServiceActivatingHandler, MessageTransformingHandler etc. All of that just because it is like contract in the Spring Integration: channel -> endpoint -> messageHandler.

That's two.

Since the main premise of IdempotentReceiverInterceptor is to apply logic exactly to the consumer endpoint (see its documentation), you can't use it from the spec.advice(), because that is exactly for the handleRequestMessage(), as you expected. But that doesn't work for IdempotentReceiverInterceptor.

Yeah... Unfortunately there is no so easy way to utilize it via Java DSL style. Although you can distinguish your flow via message channels and extract @ServiceActivator (@Transformer) together with the @IdempotentReceiver into separate @Service, until we figure out the solution for "raw" Java config.

Feel free to raise a JIRA ticket on the matter. I think ConsumerEndpointFactoryBean should distinguish the IdempotentReceiverInterceptor from all other in the advice and apply it properly to the handleMessage finally. So, your expected config will work.

Thank you for spotting that out!