Bob Bob - 3 months ago 39
Scala Question

Error when broadcasting Joda DateTime in Spark

When using Joda-Time with Spark, the code below throws a java.lang.NullPointerException:


import org.joda.time.DateTime

val todayBroadcast = sc.broadcast(new DateTime())
val dataRDD2 = dataRDD.filter(item => {
  todayBroadcast.value.minusMonths(1).isBefore(item._1)
})


On the other hand, the following code works without any issues:

val dataRDD2 = dataRDD.filter(item => {
  val today = new DateTime()
  today.minusMonths(1).isBefore(item._1)
})

Answer

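As far as I know, Joda-Time does not play well with the Kryo serializer, which many Spark installations enable through the spark.serializer property. Broadcast values are serialized with that configured serializer, so in your first snippet the DateTime goes through Kryo, and its internal chronology is not restored correctly on the executors, which shows up as the NullPointerException. In your second snippet the DateTime is created inside the closure on the executors and is never serialized, which is why it works.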

You can have a look at this SO thread.

Anyway, try disabling Kryo serialization and using the standard Java serializer, org.apache.spark.serializer.JavaSerializer, instead. You can find the spark.serializer property in the spark-defaults.conf file of your Spark installation.

Currently, you probably have the following property:

spark.serializer=org.apache.spark.serializer.KryoSerializer

Change it to:

spark.serializer=org.apache.spark.serializer.JavaSerializer

Then restart your Spark installation. If you are using a particular distribution (e.g. Cloudera), change the property through the administration console it provides instead.
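If you would rather not change the setting for the whole installation, the same property can also be set per application on a SparkConf, where it takes precedence over spark-defaults.conf. The following is a minimal sketch, assuming Spark and Joda-Time are on the classpath; the object name JavaSerializerSketch, the application name, the local master and the sample data are hypothetical and only serve the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

object JavaSerializerSketch {
  // Builds a SparkConf that forces the standard Java serializer for this
  // application only; values set here override spark-defaults.conf.
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("joda-broadcast-sketch") // hypothetical application name
      .setMaster("local[*]")               // local mode, for illustration only
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(buildConf())
    try {
      // Hypothetical sample data: (timestamp, payload) pairs.
      val now = new DateTime()
      val dataRDD = sc.parallelize(Seq(
        (now.minusDays(1), "recent"),
        (now.minusMonths(2), "old")
      ))

      // Same broadcast and filter as in the question; the broadcast value is
      // now handled by JavaSerializer whenever Spark serializes it.
      val todayBroadcast = sc.broadcast(new DateTime())
      val dataRDD2 = dataRDD.filter(item =>
        todayBroadcast.value.minusMonths(1).isBefore(item._1))

      dataRDD2.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Passing --conf spark.serializer=org.apache.spark.serializer.JavaSerializer to spark-submit has the same per-application effect. Keep in mind that in local mode the executor may read the broadcast value straight from the driver's memory without deserializing it, so the original NullPointerException is more likely to show up on a real cluster.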

If you can't use the standard serializer, you can convert your DateTime into a serialization-friendly format, such as a String or a Long (milliseconds since the epoch), and broadcast that instead.
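The Long variant could look like the following minimal sketch, assuming the RDD holds hypothetical (DateTime, String) pairs as in the question; the object, type alias and method names are hypothetical too. The cutoff ("one month ago") is computed once on the driver and only its epoch milliseconds are broadcast, so no Joda object goes through the broadcast serializer.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

// Extends Serializable so that a closure referencing isRecent stays
// serializable even if the compiler captures this object.
object MillisBroadcastSketch extends Serializable {
  // Hypothetical element type: (timestamp, payload) pairs.
  type Item = (DateTime, String)

  // True when the item's timestamp is strictly after the cutoff instant,
  // which is equivalent to cutoff.isBefore(item._1) in the question.
  def isRecent(item: Item, cutoffMillis: Long): Boolean =
    item._1.isAfter(cutoffMillis)

  // Broadcasts a plain Long instead of a DateTime.
  def filterRecent(sc: SparkContext, dataRDD: RDD[Item]): RDD[Item] = {
    val cutoffBroadcast = sc.broadcast(new DateTime().minusMonths(1).getMillis)
    dataRDD.filter(item => isRecent(item, cutoffBroadcast.value))
  }
}
```

Comparing against the raw Long avoids rebuilding a DateTime for every element. If you need the full DateTime API inside the closure, you can rebuild it with new DateTime(cutoffBroadcast.value), keeping in mind that this constructor uses the executor's default time zone.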

Let us know.
