JudgingNotJudging JudgingNotJudging - 3 months ago 326
Java Question

Spring Integration - How to debug 'Dispatcher has no Subscribers'?

I'm using Spring Boot 1.4.0.RELEASE, Spring Integration 4.3.1.RELEASE, Spring Integration DSL 1.2.0.M1.

What I'm trying to do:

I'm writing an application that will read files from FTP and local file system (using inbound channel adapters), transfer the files to a local working directory (using file outbound gateways), process, then move them to a final destination (file outbound gateway/adapters).

I'm running into an issue with 'Dispatcher has no subscribers for channel' errors. I believe this probably means something in the context is broken and the Integration components are not starting. The Context itself says it is active when I debug.

My actual configuration is fairly large, so I'm not looking for someone to find the solution for me. I'm looking for some guidance on where to look and how to figure out what component is complaining.

The actual error is below.

DEBUG [integration.channel.ExecutorChannel] [task-scheduler-1] preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application:test.fileReadingFlow.channel#1'.


The
fileReadingFlow
is an
InboundChannelAdapter
which reads files from a directory (basically, what I asked about here. There is nothing complex in it. The adapter sends the message to a
.log()
handler, enriches headers, sends it to a handler (
Files.outboundgateway
), and finally a
MessageChannel
.

What I've tried:


  • I've walked through the chain of
    MessageChannel
    s and things line up (no misspellings, all
    Bean
    exist).

  • I've added more
    LoggingHandler
    s in the
    fileReadingFlow
    to identify where the message errors.

  • I've removed portions of the
    fileReadingFlow
    to see if I could get the message farther along.

  • I've removed some
    Component
    s to see if I could find the problem.

  • I added debug logging for
    org.springframework.integration
    and nothing resembling an error or warning appears.



What I found was that the first time the flow tried to do something other than logging (even enrichHeaders), the Dispatcher error occurred, and the message ended up in the
errorChannel
. When I changed the
fileReadingFlow
to only read the file, log a message, and terminate with an empty Handler, I got the Dispatcher error. Thus, I'm fairly certain the issue is not with the
fileReadingFlow
itself.

Other than removing each
Component
one by one, is there a way to track down what is causing the error?

EDIT:

Source:

@Bean(name = "fileReadingFlow")
@Scope("prototype")
@Profile("test")
public IntegrationFlow testFileReadingFlow(MyEntity entity) {

return IntegrationFlows.from(s -> s.file(new File("someFolder")))
.filter(fileListFilterBuilder.buildFileListFilter(File.class))
, endpointConfigurer -> endpointConfigurer.poller(poller)
)
.log(DEBUG, "com.myco.testFileReadingFlow")
.enrichHeaders(h ->
h.header("entity", entity)
.header(FOLDER_NAME, entity.getFolder())
)
.log(DEBUG, "com.myco.testFileReadingFlow", message -> "after headers")
.handle(Files.outboundGateway("workingFolder").deleteSourceFiles(true).autoCreateDirectory(true))
.log(DEBUG, "com.myco.testFileReadingFLow", message -> "sending message to aggregatingFileChannel " + message)
.channel("aggregatingFileChannel")
.get();
}
@Bean
public MessageChannel aggregatingFileChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();

}

@Bean
public IntegrationFlow aggregatingFlow() {

// Read from the aggregatingFileChannel
return from("aggregatingFileChannel")
<...>
.get();
}


Application:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);

List<MyEntity> entities = entitySvc.findAllActive();

AutowireCapableBeanFactory beanFactory = context.getBeanFactory();

entities.forEach(entity -> {
IntegrationFlow flow = (IntegrationFlow) context.getBean("fileReadingFlow", entity);

beanFactory.getBean(entity.getFolder() + MyConstants.ADAPTER, Lifecycle.class).start();
}


Solution:

Per my comments below, the
@Prototype
method did work at some point, but I broke it and could not easily rollback the change. Using Gary and Artem's suggestions, I tried changed to using the
IntegrationFlowContext
method. In order to retain the runtime startup, profile driven injection, etc. I had originally, I moved the definition of my
IntegrationFlow
from a
@Configuration
class to a
@Service
class. That way I can inject the
IntegrationFlowContext
into the
Service
, and implement different versions of the
Service
for my different profiles without requiring my
Application
to know about the
Profile
. The main method goes from extracting a
Bean
from
Context
and manually starting it to retrieving a
Service
and calling a method.

@Service
@Profile("test")
public class TestFlowSvc implements FlowSvc {
public IntegrationFlow testFileReadingFlow(Vendor vendor) {
return // As previous Flow
}

public void startFileReadingFlow(MyEntity entity) {

IntegrationFlow flow = testFileReadingFlow(entity);

integrationFlowContext.register(flow, true);
}
}


Application:

@SpringBootApplication
@EnableConfigurationProperties
@EntityScan(
basePackages = { "com.myco.model" }
)
@EnableJpaRepositories(basePackages = {"com.myco.rest"})
public class Application {

public static void main(String[] args) {
ConfigurableApplicationContext context = new SpringApplicationBuilder(Application.class).web(false).run(args);

MyEntitySvc entitySvc = context.getBean(MyEntitySvc.class);
FlowSvc flowSvc = context.getBean(FlowSvc.class);
List<MyEntity> entities = entitySvc.findAllActive();

entities.forEach(entity -> {
flowSvc.startFileReadingFlow(entity);
}

Answer

We have Dispatcher has no Subscribers error when there is some SubscribableChannel without active Subscriber (MessageHandler in term of Spring Integration). "Active" means there is no one defined at all or it is in a stopped state.

So, for your problem with the application:test.fileReadingFlow.channel#1 MessageChannel, I'd look into that fileReadingFlow IntegrationFlow one more time and find what is on the second anonymous DirectChannel.

I can't figure out the easy way to debug such a problem because there is a MessageChannel, but there is nothing to track, because of Dispatcher has no Subscribers.

So, please, show that fileReadingFlow IntegrationFlow definition and let's fix the problem together!

Looking to your related question it should be this one:

.handle(Files.outboundGateway())

which may be just in stopped state.

But we can't be sure until the actual code.