Don Don - 1 year ago
Java Question

Apache Flink read Avro byte[] from Kafka

In reviewing examples, I see a lot of this:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);


I see that in these examples the schema is already known.


I do not know the schema until I read the byte[] into a GenericRecord
and get the schema from it (it may change from record to record).


Can someone point me to a FlinkKafkaConsumer08 that reads the raw byte[] into a map/filter, so that I can remove some leading bits and then load that byte[] into a GenericRecord?

Answer

I'm doing something similar (I'm using the 09 consumer).

In your main code, pass in your custom deserialization schema:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());
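
For context, a minimal sketch of the full job wiring might look like this; the job class name, the "topic" property name, and the print() sink are assumptions, not part of the original answer:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class AvroConsumerJob {

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The custom schema produces GenericRecord, so the stream is typed accordingly.
        DataStream<GenericRecord> records = env.addSource(new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"),
                new MyDeserializationSchema<GenericRecord>(),
                parameterTool.getProperties()));

        records.print();

        env.execute("avro-kafka-consumer");
    }
}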

The custom DeserializationSchema reads the bytes, figures out the schema and/or retrieves it from a schema registry, deserializes the payload into a GenericRecord, and returns the GenericRecord object.

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    // Produce GenericRecord so no concrete Avro class is needed up front.
    @SuppressWarnings("unchecked")
    private final Class<T> avroType = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    @Override
    public T deserialize(byte[] message) throws IOException {
        // Do your stuff here: strip off your leading bytes,
        // deserialize the payload, and create your GenericRecord.
        return (T) myAvroEvent;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
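
The answer leaves the actual byte handling as a stub (myAvroEvent above). As one minimal sketch, assuming each message carries a fixed-length header followed by an Avro object-container payload (which embeds its own writer schema, so it can change from message to message), the decoding could look like this; HEADER_LENGTH and the container-file assumption are mine, not the answer's:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroPayloadDecoder {

    // Hypothetical: number of leading header bytes to strip before the Avro payload.
    private static final int HEADER_LENGTH = 5;

    public static GenericRecord decode(byte[] message) throws IOException {
        // Strip the leading bytes that precede the Avro payload.
        byte[] avroBytes = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);

        // An object-container payload embeds its writer schema, so nothing needs
        // to be known up front; the reader discovers the schema per message.
        try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(avroBytes),
                new GenericDatumReader<GenericRecord>())) {
            return stream.next();
        }
    }
}

If your messages instead use a schema-registry wire format (for example, a magic byte plus a schema ID in the header), you would parse the ID from the stripped bytes, look the schema up in the registry, and decode the remaining bytes with a GenericDatumReader and a binary decoder.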