pkm - 17 days ago
Java Question

Spring Integration File reading

I am new to Spring Integration. I am working on a solution but am stuck on a specific issue with the inbound file adapter (FileReadingMessageSource).
I have to read files from different directories, process them, and save the results in different directories. As I understand it, the directory name is fixed at the start of the flow.
Can someone help me change the directory name for different requests?

I attempted the following. I am not sure whether this is the correct way to go about it, and it worked for only one directory. I think the poller was waiting for more files and never came back to read another directory.

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class SiSampleFileProcessor {

    @Autowired
    MyFileProcessor myFileProcessor;

    @Value("${si.outdir}")
    String outDir;

    @Autowired
    Environment env;

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext ctx = new SpringApplication(SiSampleFileProcessor.class).run(args);
        FileProcessingService gateway = ctx.getBean(FileProcessingService.class);
        boolean process = true;
        while (process) {
            System.out.println("Please enter the input Directory: ");
            String inDir = new Scanner(System.in).nextLine();
            if (inDir.isEmpty() || inDir.equals("exit")) {
                process = false;
            } else {
                System.out.println("Processing... " + inDir);
                gateway.processFilesin(inDir);
            }
        }
        ctx.close();
    }

    @MessagingGateway(defaultRequestChannel = "requestChannel")
    public interface FileProcessingService {
        String processFilesin(String inputDir);
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "requestChannel")
    @Bean
    GenericHandler<String> fileReader() {
        return new GenericHandler<String>() {
            @Override
            public Object handle(String p, Map<String, Object> map) {
                FileReadingMessageSource fileSource = new FileReadingMessageSource();
                fileSource.setDirectory(new File(p));
                Message<File> msg;
                while ((msg = fileSource.receive()) != null) {
                    fileInChannel().send(msg);
                }
                return null; // Not sure what to return!
            }
        };
    }

    @Bean
    public MessageChannel fileInChannel() {
        return MessageChannels.queue("fileIn").get();
    }

    @Bean
    public IntegrationFlow fileProcessingFlow() {
        return IntegrationFlows.from(fileInChannel())
                .handle(myFileProcessor)
                .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
                .get();
    }
}


EDIT: Based on Gary's response, I replaced some methods as follows:

@MessagingGateway(defaultRequestChannel = "requestChannel")
public interface FileProcessingService {
    boolean processFilesin(String inputDir);
}

@ServiceActivator(inputChannel = "requestChannel")
public boolean fileReader(String inDir) {
    FileReadingMessageSource fileSource = new FileReadingMessageSource();
    fileSource.setDirectory(new File(inDir));
    fileSource.afterPropertiesSet();
    fileSource.start();
    Message<File> msg;
    while ((msg = fileSource.receive()) != null) {
        fileInChannel().send(msg);
    }
    fileSource.stop();
    System.out.println("Sent all files in directory: " + inDir);
    return true;
}


Now it is working as expected.

Answer

The FileReadingMessageSource uses a DirectoryScanner internally; it is normally set up by Spring after the properties are injected. Since you are managing the object outside of Spring, you need to call the Spring bean initialization and lifecycle methods afterPropertiesSet(), start() and stop() yourself.

Call stop() when receive() returns null.
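
The order of those calls can be sketched without Spring on the classpath. The block below is only a plain-Java stand-in, not Spring's implementation: OneShotDirectorySource, DrainDemo, init() and runDemo() are hypothetical names invented for this illustration, and the sketch assumes each file is reported only once, so that receive() eventually returns null and the loop ends.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for a file-reading source; NOT Spring's FileReadingMessageSource.
class OneShotDirectorySource {
    private final File directory;
    private final Set<File> seen = new HashSet<>();        // files already handed out
    private final Queue<File> pending = new ArrayDeque<>(); // files found but not yet returned
    private boolean running;

    OneShotDirectorySource(File directory) {
        this.directory = directory;
    }

    // Plays the role of afterPropertiesSet(): validate the configuration before use.
    void init() {
        if (!directory.isDirectory()) {
            throw new IllegalStateException("Not a directory: " + directory);
        }
    }

    void start() {
        running = true;
    }

    void stop() {
        running = false;
        pending.clear();
    }

    // Returns the next file not seen before, or null once the directory is drained.
    File receive() {
        if (!running) {
            throw new IllegalStateException("call start() first");
        }
        if (pending.isEmpty()) {
            File[] files = directory.listFiles(File::isFile);
            if (files != null) {
                Arrays.sort(files); // deterministic order for the demo
                for (File f : files) {
                    if (seen.add(f)) {
                        pending.add(f);
                    }
                }
            }
        }
        return pending.poll();
    }
}

class DrainDemo {
    // One source per request: initialize, start, drain until null, then stop.
    static List<String> drain(File dir) {
        OneShotDirectorySource source = new OneShotDirectorySource(dir);
        source.init();
        source.start();
        List<String> names = new ArrayList<>();
        try {
            File f;
            while ((f = source.receive()) != null) {
                names.add(f.getName());
            }
        } finally {
            source.stop(); // always release the source, even if processing fails
        }
        return names;
    }

    // Creates a temporary directory with two files and drains it.
    static List<String> runDemo() {
        try {
            File dir = Files.createTempDirectory("si-demo").toFile();
            new File(dir, "a.txt").createNewFile();
            new File(dir, "b.txt").createNewFile();
            return drain(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because a new source is created for every call to drain(), each requested directory is scanned independently, which is the behaviour the question asks for. Putting stop() in a finally block is a choice made for this sketch; the original code calls it after the loop.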

> return null;  // Not sure what to return!

If the service returns null, no reply is sent, and your calling thread will hang in the gateway waiting for a response. You could change the gateway method to return void or, since your gateway is expecting a String, just return some value.

However, your calling code is not looking at the result anyway.

> gateway.processFilesin(inDir);

Also, remove the @Bean from the @ServiceActivator method; with that style (@Bean combined with @ServiceActivator), the bean type must be MessageHandler.