Diyoda_ Diyoda_ - 1 month ago 83
Java Question

Spring Integration Kafka Consumer Listener not Receiving messages

According to the documentation provided here, I am trying on a POC to get messages into a listener as mentioned in the the same documentation, Below is how I have written the configuration.

@Configuration
public class KafkaConsumerConfig {

public static final String TEST_TOPIC_ID = "record-stream";

@Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
private String topic;

@Value("${kafka.address:localhost:9092}")
private String brokerAddress;


/*
@Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String>
kafkaMessageDrivenChannelAdapter = new
KafkaMessageDrivenChannelAdapter<>( container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
kafkaMessageDrivenChannelAdapter; }

@Bean public QueueChannel received() { return new QueueChannel(); }
*/

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(30000);
return factory;

}

/*
* @Bean public KafkaMessageListenerContainer<String, String> container()
* throws Exception { ContainerProperties properties = new
* ContainerProperties(this.topic); // set more properties return new
* KafkaMessageListenerContainer<>(consumerFactory(), properties); }
*/

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest
// smallest
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

}


and Listener is as below,

@Service
public class Listener {

private Logger log = Logger.getLogger(Listener.class);


@KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
public void process(String message/* , Acknowledgment ack */) {
Gson gson = new Gson();
Record record = gson.fromJson(message, Record.class);

log.info(record.getId() + " " + record.getName());
// ack.acknowledge();
}

}


Even though I am producing messages to the same topic and this consumer is working on the same topic, Listener is not executing.

I am running Kafka 0.10.0.1, and here is my current pom. This consumer is working as a spring boot web application unlike many command line samples.

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>

</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>

</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


I have spent a good amount of time to figure out why this listener is not getting hit when the topic has messages, what is it I am doing wrong.

I know that I can receive the messages using a channel (I have commented configuration part of that out in the code), But here the concurrency is handle clean.

Is this kind of implementation is possible with a async message consumption.

Answer

You have to add @EnableKafka alongside with the @Configuration.

Will add some description soon.

Meanwhile:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
Comments