khateeb - 20 days ago
Scala Question

Getting Hadoop OutputFormat RunTimeException while running Apache Spark Kafka Stream

I am running a program that uses Apache Spark to read data from an Apache Kafka cluster and write it to a Hadoop file. My program is below:

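import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public final class SparkKafkaConsumer {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Process the stream in 2-second batches.
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Map each topic name to the number of receiver threads used for it.
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = "Topic1,Topic2,Topic3".split(",");
        for (String topic : topics) {
            topicMap.put(topic, 3);
        }

        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap);

        // Keep only the message value of each (key, value) pair.
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Split each comma-separated line into words.
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(x.split(","));
            }
        });

        // Count the occurrences of each word within a batch.
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt");

        jssc.start();
        jssc.awaitTermination();
    }
}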


I am using this command to submit the application:
C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar


I am getting this error:
java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)


What is causing this error and how do I solve it?

Answer

I agree that the error message is not very descriptive. The scala.runtime.Nothing$ in it suggests that no output format class was resolved for the two-argument call. It is usually better to specify the output format explicitly in any of the saveAsHadoopFile methods to protect yourself from this type of exception.

Here's the prototype of your particular method in the documentation:

saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass)

In your example, that would correspond to:

wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class);

Based on the type of your wordCounts PairDStream, I chose Text because the key is a String, and IntWritable because the value associated with the key is an Integer.

Use TextOutputFormat if you just want basic plain-text files; for more output options, look into the other subclasses of FileOutputFormat.

As this was also asked: the Text and IntWritable classes come from the org.apache.hadoop.io package, and TextOutputFormat comes from the org.apache.hadoop.mapred package.
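
For anyone trying to read the error message in the question: the stack trace points at Configuration.setClass, and as far as I can tell that method checks that the supplied class implements the expected interface and throws a RuntimeException of the form "<class> not <interface>" when it does not. The sketch below is only a simplified stand-in for that kind of check, not Hadoop's actual code. SetClassSketch, checkAssignable, OutputFormatLike, TextLike and NothingLike are hypothetical names invented for the illustration.

```java
public class SetClassSketch {
    /** Hypothetical stand-in for an interface such as org.apache.hadoop.mapred.OutputFormat. */
    interface OutputFormatLike {}

    /** Implements the interface, much as TextOutputFormat implements OutputFormat. */
    static class TextLike implements OutputFormatLike {}

    /** Does not implement the interface; plays the role of scala.runtime.Nothing$. */
    static class NothingLike {}

    /**
     * Simplified stand-in for the assignability check (an assumption about
     * what Configuration.setClass does, based on the message in the question).
     * Returns the class name when the check passes, throws otherwise.
     */
    static String checkAssignable(Class<?> theClass, Class<?> xface) {
        if (!xface.isAssignableFrom(theClass)) {
            // Class.toString() yields "class <name>", hence the odd wording.
            throw new RuntimeException(theClass + " not " + xface.getName());
        }
        return theClass.getName();
    }

    public static void main(String[] args) {
        // Passes: TextLike implements OutputFormatLike.
        System.out.println(checkAssignable(TextLike.class, OutputFormatLike.class));

        // Fails: NothingLike is unrelated to OutputFormatLike.
        try {
            checkAssignable(NothingLike.class, OutputFormatLike.class);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Read this way, "class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat" means that the class handed over as the output format was Nothing$, which is not an OutputFormat. Passing TextOutputFormat.class explicitly avoids that.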