Gandalf Gandalf - 1 year ago 93
Java Question

Kafka Java SimpleConsumer strange encoding

I am attempting to use the SimpleConsumer in Kafka 0.9 to allow users to replay events from a time offset, but the messages I am receiving back from Kafka are in a very strange encoding.


Using the KafkaConsumer, these messages parse just fine. Here is the code I am using to retrieve messages using the SimpleConsumer:

for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        log.debug("Found an old offset - skip");
        continue;
    }

    readOffset = messageAndOffset.nextOffset();

    int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
    byte[] data = messageAndOffset.message().payload().array();
    byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
    log.debug("Read " + new String(realData, "UTF-8"));
}

I added the code to skip the first x bytes after I kept getting UTF-32 errors about bytes being too high, which I assume is because Kafka prepends info like message size to the payload. Is this an Avro artifact?

Answer Source

I never found a good answer to this, but I switched to using the SimpleConsumer only to query Kafka for the offsets I needed (per partition, though the implementation is poor) and then using the native KafkaConsumer with seek(TopicPartition, offset) or seekToBeginning(TopicPartition) to retrieve the messages. Hopefully the next release will add, to the native client, the ability to retrieve messages from a given timestamp.
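One possible explanation for the garbled output, offered as an assumption rather than a confirmed diagnosis: ByteBuffer.array() returns the entire backing array and ignores the buffer's position, limit and array offset. If the payload buffer is a view into a larger fetch buffer, array() would also hand back header bytes and neighbouring messages. Separately, the third argument of Arrays.copyOfRange is an exclusive end index, not a length. The sketch below uses only the JDK to show the difference between array() and a bulk get(); the class name PayloadBytes, the helper readRemaining and the sample bytes are hypothetical stand-ins, not Kafka classes or a real Kafka wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadBytes {

    // Hypothetical helper: copies only the bytes between position and limit.
    // It reads from a duplicate so the caller's buffer position is unchanged.
    public static byte[] readRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        // Stand-in for a fetch response: two "messages" packed into one array,
        // each preceded by a made-up 4-byte header.
        byte[] fetched = "HDR1helloHDR2world".getBytes(StandardCharsets.UTF_8);

        // A view over just the second payload ("world") that shares the
        // backing array of the larger response.
        ByteBuffer payload = ByteBuffer.wrap(fetched, 13, 5).slice();

        // array() exposes the whole backing array, headers and neighbours included.
        String viaArray = new String(payload.array(), StandardCharsets.UTF_8);

        // A bulk get() respects the view's bounds.
        String viaGet = new String(readRemaining(payload), StandardCharsets.UTF_8);

        System.out.println("array(): " + viaArray); // array(): HDR1helloHDR2world
        System.out.println("get():   " + viaGet);   // get():   world
    }
}
```

If this is what is happening, copying the payload with a bulk get() sized to the buffer's remaining bytes should make the hand-computed skip of 14 + keySize() bytes unnecessary for Kafka's own framing. Any prefix added by the serializer itself, such as a schema ID, would still need to be handled separately.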
