potato potato - 3 months ago

Issue with VectorUDT when using Spark ML

I am writing a UDAF to apply to a Spark DataFrame column of type Vector (spark.ml.linalg.Vector). I rely on the spark.ml.linalg package so that I do not have to convert back and forth between DataFrames and RDDs.

Inside the UDAF, I have to specify a data type for the input, buffer, and output schemas:

def inputSchema = new StructType().add("features", new VectorUDT())

def bufferSchema: StructType =
  StructType(StructField("list_of_similarities", ArrayType(new VectorUDT(), true), true) :: Nil)

override def dataType: DataType = ArrayType(DoubleType, true)
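
For context, these definitions live inside a UserDefinedAggregateFunction subclass. A minimal skeleton (class name chosen for illustration, aggregation logic elided since it is not part of the question) looks like:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative skeleton only; the similarity logic itself is not shown in the question.
class SimilarityUDAF extends UserDefinedAggregateFunction {
  def inputSchema: StructType = ???       // schema of the input column(s)
  def bufferSchema: StructType = ???      // schema of the intermediate buffer
  def dataType: DataType = ???            // type of the final result
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = ???
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
  def evaluate(buffer: Row): Any = ???
}

The three schema definitions above fill in inputSchema, bufferSchema, and dataType respectively.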


VectorUDT is what I would use with spark.mllib.linalg.Vector:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala

However, when I try to import it from spark.ml instead:
import org.apache.spark.ml.linalg.VectorUDT

I get a runtime error (no errors during the build):

class VectorUDT in package linalg cannot be accessed in package org.apache.spark.ml.linalg


Is this expected, and can you suggest a workaround?

I am using Spark 2.0.0.

Answer

Well, since o.a.s.ml.linalg.VectorUDT is marked as private[spark], it is expected, unless you use Java (which doesn't respect package access modifiers) or you put your code somewhere inside o.a.s.

Since a Spark Vector is just an array of doubles (DenseVector) or a size plus a pair of index and value arrays (SparseVector), the simplest workaround is to use that representation directly. For example, for DenseVector:

override def dataType: DataType = ArrayType(ArrayType(DoubleType), true)
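
With that schema, the Vector column can be flattened to a plain Array[Double] column before it ever reaches the UDAF, so the aggregation only deals with standard SQL types. A sketch (the DataFrame df and the column name "features" are assumptions):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Flatten each ml Vector into its underlying Array[Double]
val vectorToArray = udf((v: Vector) => v.toArray)
val flattened = df.withColumn("features", vectorToArray(df("features")))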

After that, we can use a simple UDF to convert the result back to vectors and let reflection handle the rest:

udf((vs: Seq[Seq[Double]]) => vs.map(v =>
  org.apache.spark.ml.linalg.Vectors.dense(v.toArray)
))
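
Applied to the UDAF's output, that UDF might be used like this (the DataFrame aggregated and the column name "list_of_similarities" are assumptions for illustration):

import org.apache.spark.sql.functions.udf

val toVectors = udf((vs: Seq[Seq[Double]]) => vs.map(v =>
  org.apache.spark.ml.linalg.Vectors.dense(v.toArray)
))

// Replace the array-of-arrays column with a column of ml Vectors
val withVectors = aggregated.withColumn(
  "list_of_similarities",
  toVectors(aggregated("list_of_similarities"))
)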