
Error in serialization when consuming JSON Objects from Kafka

I am trying to produce JSON objects to Kafka and consume them manually. I am using the JsonPOJO Serdes from org.apache.kafka.streams.examples.pageview.

My producer code is:

package JsonProducer;

imports ...

public class jsnPdc {

    public static void main(String[] args) throws IOException {

        byte[] arr = "XXXX THIS IS TEST DATA \n XYZ".getBytes();
        JSONObject jsn = new JSONObject();
        jsn.put("Header_Title", Arrays.copyOfRange(arr, 0, 4));
        jsn.put("Data_Part", Arrays.copyOfRange(arr, 4, arr.length));

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxxxx:xxxx");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer");

        KafkaProducer<String, JSONObject> pdc = new KafkaProducer<>(props);
        pdc.send(new ProducerRecord<String, JSONObject>("testoutput", jsn));

        System.in.read();
    }
}


and the code for the consumer is:

package testConsumer;

imports ...

public class consumer_0 {

    public static void main(String[] argv) throws ParseException {

        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxx:xxxx");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer");

        // Create consumer object
        KafkaConsumer<String, JSONObject> consumer = new KafkaConsumer<String, JSONObject>(props);
        consumer.subscribe(Arrays.asList("testoutput"));

        // Keep polling records
        System.out.println("Polling new record...\n");
        while (true) {
            ConsumerRecords<String, JSONObject> records = consumer.poll(100);

            // Print each record
            for (ConsumerRecord<String, JSONObject> record : records) {
                JSONObject json = record.value();

                // Some print code, print(json) ...

            }
        }
    }
}


And I get this exception:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition testoutput-0 at offset 20491
Caused by: org.apache.kafka.common.errors.SerializationException: java.lang.IllegalArgumentException: Unrecognized Type: [null]
Caused by: java.lang.IllegalArgumentException: Unrecognized Type: [null]
at com.fasterxml.jackson.databind.type.TypeFactory._fromAny(TypeFactory.java:1170)
at com.fasterxml.jackson.databind.type.TypeFactory.constructType(TypeFactory.java:618)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
at org.apache.kafka.streams.examples.pageview.JsonPOJODeserializer.deserialize(JsonPOJODeserializer.java:49)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:882)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:788)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1061)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at testConsumer.consumer_0.main(consumer_0.java:43)


I need the value fields of the JSON to be byte arrays. Any idea why this is happening?

Answer Source

You've misunderstood whose responsibility it is to serialize the values. You're telling Kafka to serialize the values you give it using org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer, which expects a plain Java object, something like

class Data {
    private String headerTitle;
    private String dataPart;
    //... constructors, getters, setters
}

But you have actually passed a JSONObject to the ProducerRecord; in other words, you have already serialized the data yourself before handing it to Kafka, which then gamely tries to serialize it again.

You can either serialize your jsn value yourself and use org.apache.kafka.common.serialization.StringSerializer as your value.serializer (with the matching StringDeserializer on the consumer side), or you can stick with org.apache.kafka.streams.examples.pageview.JsonPOJOSerializer, define a class like Data above, and pass an instance of that class to the ProducerRecord.
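
A minimal sketch of the first option, assuming the org.json JSONObject and the same placeholder broker address and topic name as in the question (the class name JsonStringProducer and the field values are made up for illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;

public class JsonStringProducer {

    public static void main(String[] args) {
        // Build the JSON payload yourself.
        JSONObject jsn = new JSONObject();
        jsn.put("Header_Title", "XXXX");
        jsn.put("Data_Part", "THIS IS TEST DATA");

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxxxxxxxxxxxxxxxx:xxxx");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The value is already JSON text, so the plain StringSerializer is enough.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> pdc = new KafkaProducer<>(props)) {
            // jsn.toString() performs the JSON serialization; Kafka only ever sees a String.
            pdc.send(new ProducerRecord<>("testoutput", jsn.toString()));
        }
    }
}

On the consumer side you would then set value.deserializer to org.apache.kafka.common.serialization.StringDeserializer, declare the consumer as KafkaConsumer<String, String>, and rebuild the object from record.value() with whichever JSON parser you already use (for example new JSONObject(record.value()) with org.json, or JSONParser.parse(record.value()) with json-simple).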
