Avision - 1 year ago
Scala Question

Spark sqlContext UDF acting on Sets

I've been trying to define a UDF for Spark DataFrames that takes a Scala Set as input and returns an integer, but I'm getting the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 25.0 failed 1 times, most recent failure: Lost task 20.0 in stage 25.0 (TID 473, localhost): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set

Here is a minimal example that reproduces the issue:

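// generate sample data (the original rows were lost in extraction; these values are illustrative)
case class Dummy(x: Array[Integer])
val df = sqlContext.createDataFrame(Seq(
  Dummy(Array[Integer](1, 2, 3)),
  Dummy(Array[Integer](10, 20, 30, 40))
))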

// define the UDF
import org.apache.spark.sql.functions._
def setSize(A: Set[Integer]): Integer = {
  A.size
}
// For some reason I couldn't get it to work without this function value
val sizeWrap: (Set[Integer] => Integer) = setSize(_)
val sizeUDF = udf(sizeWrap)

// this produces the error
df.withColumn("colSize", sizeUDF('x)).show

What am I missing here? How can I get this to work? I know I can do this by converting to an RDD, but I don't want to go back and forth between RDDs and DataFrames.

Answer

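Use Seq. Spark hands an array column to a Scala UDF as a Seq (a WrappedArray at runtime), not a Set, which is why the cast fails. Declare the UDF parameter as a Seq and convert it to a Set inside the UDF: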

val sizeUDF = udf((x: Seq[Integer]) => setSize(x.toSet))
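
For completeness, here is a minimal end-to-end sketch of the Seq-based approach. It assumes a spark-shell session where `sqlContext` is already in scope, as in the question; the sample rows and the `result` name are made up for illustration and are not from the original post.

```scala
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical sample data; assumes `sqlContext` is provided by spark-shell.
case class Dummy(x: Array[Integer])
val df = sqlContext.createDataFrame(Seq(
  Dummy(Array[Integer](1, 2, 2, 3)),
  Dummy(Array[Integer](10, 20, 30, 40))
))

// Plain Scala helper: the number of distinct elements in the set.
def setSize(a: Set[Integer]): Int = a.size

// Array columns arrive in a Scala UDF as Seq, so accept a Seq
// and convert it to a Set inside the function body.
val sizeUDF = udf((x: Seq[Integer]) => setSize(x.toSet))

// Add the derived column and display the result.
val result = df.withColumn("colSize", sizeUDF(col("x")))
result.show()
```

With these sample rows, colSize should be 3 for the first row (the duplicate 2 collapses in the Set) and 4 for the second. Everything stays in the DataFrame API, so there is no round trip through an RDD.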