Ankur Chauhan Ankur Chauhan - 4 years ago 155
Java Question

Using reflection to invoke Type parameterized methods

The google cloud dataflow sdk has a class that registers Coders for different Avro types.

CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Row.class, AvroCoder.of(Row.class));
cr.registerCoder(Destination.class, AvroCoder.of(Destination.class));
cr.registerCoder(Device.class, AvroCoder.of(Device.class));
cr.registerCoder(Location.class, AvroCoder.of(Location.class));
cr.registerCoder(Source.class, AvroCoder.of(Source.class));
cr.registerCoder(DimensionalMetric.class, AvroCoder.of(DimensionalMetric.class));
cr.registerCoder(DimensionSet.class, AvroCoder.of(DimensionSet.class));
cr.registerCoder(MetricSet.class, AvroCoder.of(MetricSet.class));

But as you can imagine this gets pretty cumbersome and I would like to use java reflection API to automatically register all classes in the package
. I imagine this would look something like this:

void registerAllModels(Pipeline p) {
CoderRegistry cr = p.getCoderRegistry();
Reflections r = new Reflections("com.brightcove.rna.model");
Set<Class<? extends IndexedRecord>> classes = r.getSubTypesOf(IndexedRecord.class);
for (Class<? extends IndexedRecord> clazz : classes) {
cr.registerCoder(clazz, AvroCoder.of(clazz));

Unfortunately, this doesn't work. What am I doing wrong here?

Answer Source

You have a compile error in the following line:

cr.registerCoder(clazz, AvroCoder.of(clazz));

The error is:

The method registerCoder(Class<T>, Coder<T>) in the type CoderRegistry is not applicable for the arguments (Class<capture#1-of ? extends IndexedRecord>, AvroCoder<capture#2-of ? extends IndexedRecord>).

Basically, the issue is that Java compiler cannot infer that the wildcard types on both parameters are the same, and is reporting an error. This is a common Java issue, see this question, for example.

A fix is the following, which may require you to suppress compiler warnings about raw types and an unchecked conversion:

  cr.registerCoder(clazz, (Coder) AvroCoder.of(clazz.newInstance().getSchema()));

This fixes two problems:

  • Ensures registerCoder(Class<T>, Coder<T>) is used instead of the overloaded registerCoder(Class<?>, Class<?>).
  • AvroCoder.of(Class<T>) uses Avro's ReflectData.get().getSchema() to generate a schema, which may not work on all types, such as GenericRecord. An alternative is to construct the coder via AvroCoder.of(Schema) and get the schema from the automatically-generated class.
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download