tomkou - 24 days ago
Java Question

NullPointerException in SQLContext.read() Spark

I am trying to read JSON records produced by Kafka in Spark using SQLContext.read(). Every time, a NullPointerException is thrown.

SparkConf conf = new SparkConf()
        .setAppName("kafka-sandbox")
        .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

Set<String> topics = Collections.singleton(topicString);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", servers);

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(
        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics);
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

directKafkaStream
        .map(message -> message._2)
        .foreachRDD(rdd -> {
            rdd.foreach(record -> {
                // this call throws the NullPointerException below
                Dataset<Row> ds = sqlContext.read().json(rdd);
            });
        });
ssc.start();
ssc.awaitTermination();


Here is the log:

java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:535)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:595)
at org.apache.spark.sql.SQLContext.read(SQLContext.scala:504)
at SparkJSONConsumer$1.lambda$2(SparkJSONConsumer.java:73)
at SparkJSONConsumer$1$$Lambda$8/1821075039.call(Unknown Source)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:350)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:875)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


I assume the problem is due to the foreachRDD clause, but I can't figure it out, so any suggestions would be great.

Also, I am using sqlContext because I later plan to serialize the records to Avro format ("com.databricks.spark.avro"). If there is a way to serialize a string containing a JSON structure to Avro without defining the schema, you are very welcome to share it!
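
Ideally I am after something along the lines of this sketch, where the schema is simply inferred from the JSON (jsonRdd stands for a JavaRDD<String> of the Kafka messages, the output path is hypothetical, and it assumes the spark-avro package is on the classpath):

Dataset<Row> df = sqlContext.read().json(jsonRdd); // schema inferred from the JSON records
df.write()
        .format("com.databricks.spark.avro")
        .save("/tmp/records-avro"); // hypothetical output path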

Thanks in advance.

Answer

Well, apparently reading from the context should be done after starting the streaming context (i.e. after ssc.start()). That is quite logical, but it still took me a day to figure out ("master-level skills").

So in the end, I rewrote the code part as follows:

directKafkaStream
        .map(message -> message._2)
        .foreachRDD(rdd -> {
            rdd.foreach(record -> {
                Dataset<Row> df = dfr.json("file.json");
            });
        });
ssc.start();
dfr = sqlContext.read();
ssc.awaitTermination();

And I added a static field for the DataFrameReader to the class:

public static DataFrameReader dfr;

There were also a couple of problems with reading the JSON object from the RDD, so I changed it to reading from a file, but that is not what caused the NullPointerException.
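
For reference, a common way around those problems is to build the DataFrame on the driver inside foreachRDD, rather than calling the reader inside rdd.foreach, which runs on the executors. A minimal sketch, assuming Spark 2.0 and the spark-avro package (the output path is hypothetical):

directKafkaStream
        .map(message -> message._2)
        .foreachRDD(rdd -> {
            if (!rdd.isEmpty()) {
                // json(JavaRDD<String>) infers the schema from the records
                Dataset<Row> df = sqlContext.read().json(rdd);
                df.write()
                        .mode(SaveMode.Append)
                        .format("com.databricks.spark.avro")
                        .save("/tmp/avro-out"); // hypothetical output path
            }
        });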
