miro miro - 3 months ago 78
Java Question

Flink + Kafka + JSON - java example

I'm trying to get a JSON from a Kafka topic with this code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                        new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The issues are:

1) This program does not run. It fails with:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at line: `messageStream.map(....`


2) Maybe the above issue is related to the fact that DataStream has no type parameter. But if I try to declare it as:

DataStream<String> messageStream = env.addSource(...

the code does not compile, failing with:

cannot resolve constructor FlinkKafkaConsumer09 ...
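
The compile error in 2) can be reproduced without Flink. The following is a minimal sketch, assuming the consumer's element type is fixed by the schema passed to its constructor. Node, Schema, NodeSchema, and Source are hypothetical stand-ins, not Flink or Jackson classes.

```java
import java.nio.charset.StandardCharsets;

public class DiamondSketch {

    // Hypothetical stand-in for a parsed JSON object; not Jackson's ObjectNode.
    public static final class Node {
        public final String text;
        public Node(String text) { this.text = text; }
    }

    // Hypothetical stand-in for a deserialization schema producing T.
    public interface Schema<T> {
        T deserialize(byte[] bytes);
    }

    // A schema that always produces Node, never String.
    public static final class NodeSchema implements Schema<Node> {
        @Override
        public Node deserialize(byte[] bytes) {
            return new Node(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // A source whose element type T is determined by its schema argument.
    public static final class Source<T> {
        private final Schema<T> schema;
        public Source(Schema<T> schema) { this.schema = schema; }
        public T read(byte[] bytes) { return schema.deserialize(bytes); }
    }

    public static void main(String[] args) {
        // The diamond infers T = Node from the NodeSchema argument.
        Source<Node> ok = new Source<>(new NodeSchema());

        // Asking for T = String would require a constructor taking Schema<String>,
        // but a Schema<Node> is passed, so this line does not compile if uncommented:
        // Source<String> bad = new Source<>(new NodeSchema());

        System.out.println(ok.read("{\"a\":1}".getBytes(StandardCharsets.UTF_8)).text);
    }
}
```

Under that assumption, the declared type on the left has to follow the schema's output type, not the other way around.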


The pom.xml (the important part):

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.1.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.1</version>
</dependency>

</dependencies>


I've been looking, without success, for some Flink code that uses a JSON DeserializationSchema. I've only found the unit test for JSONKeyValueDeserializationSchema at this link

Does anyone know how to do this the right way?

Thanks

Answer

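Your error is at the line messageStream.map(new MapFunction<String, String>(). The MapFunction you defined expects an input of type String and an output of type String, but JSONKeyValueDeserializationSchema deserializes each Kafka record into a com.fasterxml.jackson.databind.node.ObjectNode, so your MapFunction must take ObjectNode as its input type. For the same reason the stream should be declared as DataStream<ObjectNode> rather than DataStream<String>: the consumer's element type is fixed by the schema, which is why the DataStream<String> variant cannot resolve the constructor. Try the code below.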

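messageStream.map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        // The schema wraps each record in an object with "key" and "value" fields.
        // node.get(0) is an array-index lookup and returns null on an ObjectNode.
        return "Kafka and Flink says: " + node.get("value");
    }
});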
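
The point about matching input types can be illustrated without Flink. This is a rough sketch, assuming a framework reads the generic arguments that an anonymous class declares and compares the input type with the stream's element type. It is not Flink's actual type extraction code: MapFunction here is a hypothetical stand-in interface, declaredTypes and inputMatches are made-up helpers, and StringBuilder merely plays the role of a non-String element type such as ObjectNode.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ErasureSketch {

    // Hypothetical stand-in for a map function interface; not Flink's MapFunction.
    public interface MapFunction<I, O> {
        O map(I value) throws Exception;
    }

    // Returns the <I, O> arguments declared on the function's class, or null
    // when the class does not directly implement MapFunction with type arguments.
    public static Type[] declaredTypes(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == MapFunction.class) {
                    return p.getActualTypeArguments();
                }
            }
        }
        return null;
    }

    // True when the declared input type can accept values of elementType.
    public static boolean inputMatches(Object function, Class<?> elementType) {
        Type[] types = declaredTypes(function);
        return types != null
                && types[0] instanceof Class
                && ((Class<?>) types[0]).isAssignableFrom(elementType);
    }

    public static void main(String[] args) {
        MapFunction<String, String> fn = new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return "Kafka and Flink says: " + value;
            }
        };

        // An anonymous class keeps its declared type arguments despite erasure.
        Type[] types = declaredTypes(fn);
        System.out.println(types[0] + " -> " + types[1]);

        // A String stream fits this function; a stream of another type does not.
        System.out.println(inputMatches(fn, String.class));
        System.out.println(inputMatches(fn, StringBuilder.class));
    }
}
```

Seen this way, the exception message about type erasure is plausibly a symptom of the input mismatch rather than of missing type information, which fits the fix of changing the input type to ObjectNode.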