Developer Developer - 1 month ago 50
Java Question

How to upgrade to spring-integration-kafka 2.1.0.RELEASE with xml configuration?

I am upgrading spring-integration-kafka from 1.0.0.M2 to 2.1.0.RELEASE, and the Kafka client from 0.9.0 to 0.10.0.

The current XML configuration is shown below:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

<!-- Publish-subscribe channel that the Kafka outbound adapter below listens on. -->
<int:publish-subscribe-channel id="inputToKafka"/>

<!-- Outbound adapter wired to the inputToKafka channel; it references the
     kafkaProducerContext bean for its producer settings. -->
<int-kafka:outbound-channel-adapter
    channel="inputToKafka"
    kafka-producer-context-ref="kafkaProducerContext"
    auto-startup="true"
    order="1"/>

<!-- Properties bean referenced by the producer-properties attribute of
     kafkaProducerContext. Every value comes from a placeholder with the same
     name as its key. -->
<bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="topic.metadata.refresh.interval.ms">${topic.metadata.refresh.interval.ms}</prop>
            <prop key="message.send.max.retries">${message.send.max.retries}</prop>
            <prop key="send.buffer.bytes">${send.buffer.bytes}</prop>
        </props>
    </property>
</bean>

<!-- Avro reflect-datum encoder constructed with the class name
     common.vo.NotificationVo; used as the value-encoder of the producer
     configuration. -->
<bean id="fcmNotificationEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder">
    <constructor-arg value="common.vo.NotificationVo"/>
</bean>

<!-- Producer context holding a single producer configuration for the
     trigger-fcm-notification topic.
     NOTE(review): value-class-type names common.vo.fcmNotificationVo, while the
     encoder and decoder beans are built with common.vo.NotificationVo. Confirm
     whether the lower-case class name is intentional. -->
<int-kafka:producer-context id="kafkaProducerContext"
                            producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration
            topic="trigger-fcm-notification"
            broker-list="${kafka.servers}"
            key-class-type="java.lang.String"
            value-class-type="common.vo.fcmNotificationVo"
            value-encoder="fcmNotificationEncoder"
            compression-codec="none"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<!-- Routes messages arriving on ip-chanel-trigger-fcm-notification to the
     fcmNotificationConsumer bean (defined outside this file). -->
<int:service-activator
    input-channel="ip-chanel-trigger-fcm-notification"
    ref="fcmNotificationConsumer"/>

<!-- Properties bean referenced by the consumer-properties attribute of
     consumerContextFCM. Every value comes from a placeholder with the same
     name as its key. -->
<bean id="consumerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">${auto.offset.reset}</prop>
            <prop key="socket.receive.buffer.bytes">${socket.receive.buffer.bytes}</prop> <!-- 10M -->
            <prop key="fetch.message.max.bytes">${fetch.message.max.bytes}</prop>
            <prop key="auto.commit.interval.ms">${auto.commit.interval.ms}</prop>
        </props>
    </property>
</bean>


<!-- ZooKeeper connection settings, all supplied through zookeeper.* placeholders;
     referenced by consumerContextFCM. -->
<int-kafka:zookeeper-connect
    id="zookeeperConnect"
    zk-connect="${zookeeper.servers}"
    zk-connection-timeout="${zookeeper.connection.timeout}"
    zk-session-timeout="${zookeeper.session.timeout}"
    zk-sync-time="${zookeeper.sync.time}"/>


<!-- Application bean with explicit lifecycle callbacks: initIt on startup and
     cleanUp on shutdown.
     NOTE(review): presumably this starts the inbound adapter, which is declared
     with auto-startup="false". Confirm in api.utils.KafkaConsumerStarter. -->
<bean id="kafkaThreadListener"
      class="api.utils.KafkaConsumerStarter"
      init-method="initIt"
      destroy-method="cleanUp"/>

<!-- Polled inbound adapter: reads through consumerContextFCM and publishes to
     ip-chanel-trigger-fcm-notification. It is not started automatically. -->
<int-kafka:inbound-channel-adapter
    id="kafka-inbound-channel-adapter-FCM"
    channel="ip-chanel-trigger-fcm-notification"
    kafka-consumer-context-ref="consumerContextFCM"
    auto-startup="false">
    <!-- Poll with a fixed delay of 1000 ms and a receive timeout of 0. -->
    <int:poller fixed-delay="1000"
                time-unit="MILLISECONDS"
                receive-timeout="0"/>
</int-kafka:inbound-channel-adapter>


<!-- Consumer -->

<!-- Avro reflect-datum decoder constructed with the class name
     common.vo.NotificationVo; used as the value-decoder of the consumer
     configuration. -->
<bean id="fcmNotificationDecoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaDecoder">
    <constructor-arg value="common.vo.NotificationVo"/>
</bean>

<!-- Consumer context used by the inbound adapter. It ties together the
     ZooKeeper connection, the consumer properties and one consumer
     configuration for the trigger-fcm-notification topic. -->
<int-kafka:consumer-context id="consumerContextFCM"
                            zookeeper-connect="zookeeperConnect"
                            consumer-properties="consumerProperties"
                            consumer-timeout="4000">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="trigger-fcm-notification"
                                          value-decoder="fcmNotificationDecoder"
                                          max-messages="50">
            <int-kafka:topic id="trigger-fcm-notification" streams="10"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

</beans>





How do I change this configuration to work with 2.1.0.RELEASE?

~~~~~~~~~~~~~~~

EDITED HERE:

Using the reference documentation, I modified the XML as per my requirements. I have a small issue while reading the ConsumerRecord: I get the payload as follows:



{
kafka_offset=7,
kafka_receivedMessageKey=null,
kafka_receivedPartitionId=0,
kafka_receivedTopic=trigger-fcm-notification,
kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 7, CreateTime = 1476864644264, checksum = 3680317883, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@203c8c95)
}





I need the value (NotificationVo) for further use in the consumer. How do I get it as the payload?

~~~~~~~~~~~~~~~

EDITED HERE:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

<!-- Publish-subscribe channel that the Kafka outbound adapter below listens on. -->
<int:publish-subscribe-channel id="inputToKafka"/>

<!-- Outbound adapter wired to the inputToKafka channel; it references the
     "template" bean and names trigger-fcm-notification as its topic. -->
<int-kafka:outbound-channel-adapter
    id="kafkaOutboundChannelAdapter"
    channel="inputToKafka"
    kafka-template="template"
    topic="trigger-fcm-notification"
    auto-startup="true"
    order="1">
    <!-- The advice chain contains a single circuit-breaker advice. -->
    <int-kafka:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice"/>
    </int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>

<!--Producer-->
<!-- KafkaTemplate built on an inner DefaultKafkaProducerFactory, which in turn
     receives the producer configuration map below. -->
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <!-- Broker address is hard-coded here. -->
                    <entry key="bootstrap.servers" value="localhost:9092"/>
                    <entry key="retries" value="5"/>
                    <entry key="batch.size" value="16384"/>
                    <entry key="linger.ms" value="1"/>
                    <entry key="buffer.memory" value="33554432"/>
                    <!-- Keys use the Kafka String serializer; values use the
                         application's own serializer class. -->
                    <entry key="key.serializer"
                           value="org.apache.kafka.common.serialization.StringSerializer"/>
                    <entry key="value.serializer"
                           value="common.vo.NotificationVoSerializer"/>
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>


<!-- Message-driven inbound adapter: receives from listener container
     "container1" in record mode and publishes to
     ip-chanel-trigger-fcm-notification, using the messageConverter bean. -->
<int-kafka:message-driven-channel-adapter
    id="kafka-inbound-channel-adapter-FCM"
    channel="ip-chanel-trigger-fcm-notification"
    listener-container="container1"
    message-converter="messageConverter"
    mode="record"
    auto-startup="true"
    phase="100"
    send-timeout="5000"/>

<!-- Converter referenced by the adapter above. -->
<bean id="messageConverter"
      class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

<!-- Routes messages arriving on ip-chanel-trigger-fcm-notification to the
     fcmNotificationConsumer bean (defined outside this file). -->
<int:service-activator
    input-channel="ip-chanel-trigger-fcm-notification"
    ref="fcmNotificationConsumer"/>

<!--Consumer-->
<!-- Listener container built from two constructor arguments, in this order:
     an inner DefaultKafkaConsumerFactory and a ContainerProperties instance
     naming the topic to consume. -->
<bean id="container1"
      class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <!-- First argument: consumer factory and its configuration map. -->
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <!-- Broker address is hard-coded here. -->
                    <entry key="bootstrap.servers" value="localhost:9092"/>
                    <!-- NOTE(review): auto-commit is switched off, so the
                         interval on the next line presumably has no effect.
                         Confirm against the Kafka consumer configuration docs. -->
                    <entry key="enable.auto.commit" value="false"/>
                    <entry key="auto.commit.interval.ms" value="100"/>
                    <entry key="session.timeout.ms" value="15000"/>
                    <entry key="group.id" value="trigger-fcm-notification"/>
                    <!-- Keys use the Kafka String deserializer; values use the
                         application's own deserializer class. -->
                    <entry key="key.deserializer"
                           value="org.apache.kafka.common.serialization.StringDeserializer"/>
                    <entry key="value.deserializer"
                           value="common.vo.NotificationVoDeserializer"/>
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>

    <!-- Second argument: container properties for the subscribed topic. -->
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="trigger-fcm-notification"/>
        </bean>
    </constructor-arg>
</bean>

</beans>





This is the modified XML config file.

~~~~~~~~~~~~~~~

EDITED HERE:

Consumer class:



package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

/**
 * Service-activator endpoint for messages read from Kafka.
 *
 * NOTE(review): the output recorded after this listing shows the map argument
 * holding the kafka_* message headers rather than the NotificationVo value.
 * Confirm how the framework binds a Map parameter.
 */
@Component
public class FcmNotificationConsumer {

    /**
     * Prints the received map, then prints the entry stored under the last key
     * produced by iterating the key set.
     *
     * @param payload the map handed to this endpoint
     */
    @SuppressWarnings("unchecked")
    @ServiceActivator
    public <K, V> void process(Map<K, V> payload) {
        System.out.println("payload=====>" + payload.toString());

        // Every key is cast to String; whichever key is visited last is kept.
        String lastKey = null;
        java.util.Iterator<K> keys = payload.keySet().iterator();
        while (keys.hasNext()) {
            lastKey = (String) keys.next();
        }

        // The first lookup is stored in a local that is not used afterwards;
        // the value is looked up again for printing.
        Object lastValue = payload.get(lastKey);
        System.out.println("ackObject=====>" + payload.get(lastKey));
    }
}





O/P:



payload=====>{kafka_offset=21, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)}

ackObject=====>Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 21, CreateTime = 1476887227554, checksum = 222603853, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@29206bb8)





~~~~~~~~~~~~~~~

EDITED HERE:

I received the expected payload after changing the method parameter in the consumer class.



package common.notification.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import common.vo.NotificationVo;

/**
 * Service-activator endpoint that takes the whole message, so both headers and
 * payload are available to the method.
 */
@Component
public class FcmNotificationConsumer {

    /**
     * Prints the incoming message and casts its payload to NotificationVo.
     *
     * @param message the message delivered to this endpoint
     * @throws ClassCastException if the payload is not a NotificationVo
     */
    @SuppressWarnings("unchecked")
    @ServiceActivator
    public void process(Message<?> message) {
        System.out.println("Message=====>" + message);

        // The cast result is held in a local that is not used further here.
        NotificationVo notificationVo = (NotificationVo) message.getPayload();
    }
}





O/P:



Message=====>GenericMessage [payload=common.vo.NotificationVo@4c144e99, headers={kafka_offset=16, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=trigger-fcm-notification, kafka_acknowledgment=Acknowledgment for ConsumerRecord(topic = trigger-fcm-notification, partition = 0, offset = 16, CreateTime = 1476949945607, checksum = 2501578118, serialized key size = -1, serialized value size = 270, key = null, value = common.vo.NotificationVo@4c144e99)}]





Finally working as expected.

Thanks a lot for the support.

Answer

OK. Now it is much cleaner than before.

So, your current code is for Apache Kafka 0.8. Since version 0.9, Kafka has had a completely different design. Therefore the current code for Spring Integration Kafka 1.0 must be thrown away.

You should read about Apache Kafka 0.10: https://kafka.apache.org/documentation.

About Spring Kafka, which is based on the Kafka Client 0.10: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/

And pay attention to the chapter there about Spring Integration Kafka: http://docs.spring.io/spring-kafka/docs/1.1.1.RELEASE/reference/html/_spring_integration.html

And note: nobody here is going to do the work for you.

EDIT

I'm not sure why you say that you have a payload like that, because those are exactly the headers. The value in the ConsumerRecord is converted to the payload of the message.

Please share the code where you receive that and try to extract the value. The <int-kafka:message-driven-channel-adapter> produces a Message<?> with those headers and with the payload converted from the ConsumerRecord value.