pkm pkm - 10 days ago 7
Java Question

Spring Integration Concurrency of ServiceActivators

This is a follow-up to my earlier question, Spring Integration File reading.
In summary, I have a fileIn channel (a queue channel), a ServiceActivator that processes each file, and an outbound adapter that saves the file.

I wanted to introduce concurrency so that messages are processed on multiple threads. I am using the Java DSL (but not Java 8). I was able to do it the following way:

@Bean
public MessageChannel fileInChannel() {
    return MessageChannels.queue("fileIn").get();
}

@Bean
public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            .handle(myFileProcessor, "processFile",
                    new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
                        @Override
                        public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
                            // Poll the queue every 100 ms, one message per poll, on a pooled thread
                            t.poller(Pollers.fixedRate(100)
                                    .maxMessagesPerPoll(1)
                                    .taskExecutor(Executors.newCachedThreadPool()));
                        }
                    })
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}


This worked! I also tried the following:

public IntegrationFlow fileProcessingFlow() {
    return IntegrationFlows.from(fileInChannel())
            // Hand each message off to a pooled thread before processing
            .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
            .handle(myFileProcessor)
            .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
            .get();
}


This also worked! I don't know whether this is just a matter of style or whether one approach is better than the other. If one is better, which is it?

Secondly, in the above case, is the file writing (i.e. the last step) sequential, or does it run on different threads? If I need concurrency there too, should I introduce another executor channel between handle(fileProcessor) and handle(outboundAdapter)?
Eventually the outbound adapter will be a remote-file S3 adapter, hence the question.

Answer

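It's just style, although I would tend to prefer the second. The file adapter is thread-safe.

In general, the writes will occur in parallel: each write runs on the same executor thread that processed the message, so no extra executor channel is needed for that step.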
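To illustrate why the writes run in parallel, here is a minimal plain-Java sketch, with no Spring involved. It assumes that once a message has been handed to an executor, the remaining steps of the flow run on that same worker thread unless another hand-off is configured. The class HandOffDemo and its method writerThreads are hypothetical names made up for this illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not Spring Integration code.
final class HandOffDemo {

    /** Runs the given number of two-step tasks and returns the names of the threads that did the "write" step. */
    static Set<String> writerThreads(final int messages) {
        final ExecutorService pool = Executors.newCachedThreadPool();
        final Set<String> writers =
                Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        final CountDownLatch allInFlight = new CountDownLatch(messages);
        try {
            for (int i = 0; i < messages; i++) {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Step 1, "process": hold here until every message is in flight
                            allInFlight.countDown();
                            allInFlight.await(5, TimeUnit.SECONDS);
                            // Step 2, "write": still on the same worker thread, no further hand-off
                            writers.add(Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return writers;
    }
}
```

Calling HandOffDemo.writerThreads(4) should report four distinct thread names, since every task is still in flight when its "write" step begins. A second hand-off before the write would only be needed to size or throttle the writers separately from the processors.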

The only exception is when you write with FileExistsMode.APPEND: a lock is held during the write, and if the file name hashes to the same lock as another file that is currently being written (there are 256 locks), the second write runs only after the first completes.
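The locking behaviour described above is a form of lock striping. The following is a rough sketch of the idea, not the adapter's actual implementation: the stripe count of 256 comes from the answer, while the class name StripedFileLocks, its methods, and the use of String.hashCode() to pick a stripe are assumptions made for illustration.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of lock striping; names and hash choice are assumptions.
final class StripedFileLocks {

    private static final int STRIPES = 256;

    private final Lock[] locks = new Lock[STRIPES];

    StripedFileLocks() {
        for (int i = 0; i < STRIPES; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    /** Maps a file name to a stripe index in [0, 256). */
    static int stripeFor(String fileName) {
        // Clear the sign bit so the modulo result is never negative
        return (fileName.hashCode() & 0x7fffffff) % STRIPES;
    }

    /** Returns the lock shared by every file name that falls into the same stripe. */
    Lock lockFor(String fileName) {
        return locks[stripeFor(fileName)];
    }

    /** Runs the append while holding the stripe's lock, so colliding names are written one after another. */
    void append(String fileName, Runnable write) {
        Lock lock = lockFor(fileName);
        lock.lock();
        try {
            write.run();
        } finally {
            lock.unlock();
        }
    }
}
```

Under this scheme, appends to the same file are always serialized, and appends to two different files are serialized only when their names happen to land in the same stripe. With 256 stripes that is uncommon, so most appends to different files still proceed in parallel.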
