user2942227 - 25 days ago
Java Question

Send byte array to storm kafka bolt

I have written a Storm topology. I want to send Avro-encoded tuples to a Kafka topic as byte arrays.

This is how I set up the bolt:

builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));


And this is how I convert the record to a byte array:

Schema schema = avroObject.getSchema();

DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");


When I emit the tuple as follows (sending the byte array to Kafka), I don't see anything in the Kafka topic:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));


But if I instead convert the byte array to a String and send that to the Kafka topic, it works. Something like this:

builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));

collector.emit(tuple, new Values(Obj.hashMD5(key), key));


What am I doing wrong? How do I send a byte array to a Kafka topic using the Storm Kafka bolt?

Answer

The problem is that your MD5 hash does not match the data you send.

You say that it works if you convert your byte array to a Java String. That is because the MD5 is then correct for that String:

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

As you can see, the MD5 is calculated on a String and you send that same String, so the hash corresponds to the value.

But if you send a byte array, you need to calculate the MD5 on that byte array, and the result will itself be a byte array, not a String. Your code:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

is incorrect because the MD5 does not correspond to message but to message decoded as a UTF-8 String, and that conversion is lossy (see below).

Another question on SO shows how to calculate an MD5 correctly from a byte array:

How can I generate an MD5 hash?
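
As a minimal sketch of hashing the raw bytes directly, the standard java.security.MessageDigest class can be used. The class name Md5OfBytes and the helpers md5 and toHex are hypothetical names chosen for illustration; this is not the asker's Obj.hashMD5, whose implementation is not shown.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper class, for illustration only.
class Md5OfBytes {

    // Hashes the raw bytes; no String conversion is involved.
    static byte[] md5(byte[] data) {
        try {
            return MessageDigest.getInstance("MD5").digest(data);
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required to be present on every Java platform.
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Renders a digest as lowercase hex, only for printing or logging.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] message = "abc".getBytes(StandardCharsets.US_ASCII);
        byte[] digest = md5(message); // always 16 bytes for MD5
        System.out.println(toHex(digest)); // 900150983cd24fb0d6963f7d28e17f72
    }
}
```

The digest stays a byte[] throughout, so it can be emitted as the key next to a byte[] value without ever passing through a String.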

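The conversion is lossy because, unlike in C, turning a byte array into a String in Java is not a plain reinterpretation of memory: the bytes are decoded with a charset, and byte sequences that are not valid in that charset (here UTF-8) are replaced with the replacement character U+FFFD, so the original bytes cannot be recovered. Your data apparently contains such bytes.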
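
The loss described above can be checked with a small round trip. This is only a sketch; the class name Utf8RoundTrip, the method survivesUtf8RoundTrip and the sample byte values are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class, for illustration only.
class Utf8RoundTrip {

    // Decodes the bytes as UTF-8 and encodes the result again.
    // Returns true only if the original bytes come back unchanged.
    static boolean survivesUtf8RoundTrip(byte[] original) {
        String asText = new String(original, StandardCharsets.UTF_8);
        byte[] back = asText.getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(original, back);
    }

    public static void main(String[] args) {
        byte[] ascii = {0x68, 0x69};                               // "hi", valid UTF-8
        byte[] binary = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF};   // not valid UTF-8

        System.out.println(survivesUtf8RoundTrip(ascii));   // true
        System.out.println(survivesUtf8RoundTrip(binary));  // false
    }
}
```

Binary Avro output is not restricted to valid UTF-8 sequences, so it can behave like the second case.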

So your KafkaBolt should be

KafkaBolt<byte[], byte[]>

I don't know whether sending a byte-array MD5 along with your byte array is enough for the Storm Kafka bolt. If it is not, you will have to use an encoding that is lossless between byte arrays and Java Strings, such as Base64:

base64 encoding in Java

You will have to convert your byte array to a Base64 string, use

KafkaBolt<String, String>

and then send the data as usual:

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));

It also means that when you fetch the data from Kafka, it will be a Base64 String that you have to decode to get the byte array back.
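
A minimal sketch of that encode and decode step, assuming Java 8 or later for java.util.Base64. The class name Base64RoundTrip and its encode and decode helpers are hypothetical names used for illustration.

```java
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper class, for illustration only.
class Base64RoundTrip {

    // Producer side: arbitrary bytes to a plain ASCII String.
    static String encode(byte[] raw) {
        return Base64.getEncoder().encodeToString(raw);
    }

    // Consumer side: Base64 String back to the original bytes.
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    public static void main(String[] args) {
        // Sample bytes that are not valid UTF-8.
        byte[] message = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF, 0x00};

        String keyInBase64 = encode(message);
        byte[] restored = decode(keyInBase64);

        System.out.println(Arrays.equals(message, restored)); // true
    }
}
```

The trade-off is size: Base64 produces four characters for every three input bytes, so messages grow by roughly a third.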

Hope that helps.
