robinlmorris - 1 month ago
Scala Question

Can't collect data from dataset/dataframe in Spark 2.0.1; get ClassCastException

I have some JSON data consisting of key-value pairs with ints as the keys and lists of ints as the values. I want to read this data into a map and then broadcast it so it can be used by another RDD for quick lookups.
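
For concreteness, mySchema is along the lines of the following (the field names here are just illustrative):

import org.apache.spark.sql.types._

// Each JSON record looks roughly like {"key": 1, "values": [2, 3, 5]}
val mySchema = StructType(Seq(
  StructField("key", IntegerType, nullable = false),
  StructField("values", ArrayType(IntegerType), nullable = false)
))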

I have code that worked on a Spark 1.6.1 cluster in a data center, but the same code won't work on a Spark 2.0.1 cluster in AWS. The 1.6.1 code that works:

import scala.collection.mutable.WrappedArray
// key at ordinal 0, array-of-int value at ordinal 1
sc.broadcast(sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getAs[WrappedArray[Int]](1).toArray)).collectAsMap)


For 2.0.1 I have tried:

val myData = sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getSeq[Int](1).toArray))


This gets me what I want at this point:

org.apache.spark.sql.Dataset[(Int, Array[Int])] = [_1: int, _2: array<int>]
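
(As an aside, the .map above compiles in spark-shell because the shell imports the session's implicits automatically; in a standalone 2.x app you would need something like this to get the encoder for (Int, Array[Int]):)

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._  // provides the tuple/array encoders that .map needs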


But then when I do:

sc.broadcast(myData.rdd.collectAsMap)


I get:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Does anyone know how I can do this in 2.0.1? It is a very simple thing I am trying to do.
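
For reference, the end goal is a lookup pattern roughly like this (otherRDD is just a stand-in for the RDD doing the lookups):

val lookup = sc.broadcast(myData.rdd.collectAsMap)
val enriched = otherRDD.map { case (id, payload) =>
  (id, payload, lookup.value.getOrElse(id, Array.empty[Int]))
}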

Thanks in advance,

Robin

Answer

I figured out that my problem was with the spark shell in 2.0.1. The code I posted works fine if I use the existing sc and sqlContext that are part of the Spark session the shell creates. If I call stop and create a new session with a custom config, I get the strange error above. I don't like this, as I want to change spark.driver.maxResultSize.
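
One workaround for that: the setting can be passed when launching the shell instead of stopping and rebuilding the session (the 4g value here is just an example):

spark-shell --conf spark.driver.maxResultSize=4g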

Anyway, the lesson is: if you are testing code using spark shell, use the existing session or it may not work.
