vdep - 1 month ago
Scala Question

How to convert List[Double] to Columns?

I have a List[Double]. How can I convert it to an org.apache.spark.sql.Column? I am trying to insert it as a column into an existing DataFrame using .withColumn().

Answer

It cannot be done directly. A Column is not a data structure but a representation of a specific SQL expression, and it is not bound to any particular data. You'll have to transform your data first. One way to approach this is to parallelize the list and join by index:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DoubleType

val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
val aList = List(1.0, -1.0, 0.0)

// Key both the rows and the list elements by position, then join on that key.
val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row, x: Double) => Row.fromSeq(row.toSeq :+ x) }

// Append the new column to the original schema.
sqlContext.createDataFrame(rows, df.schema.add("z", DoubleType, false))

Another, similar approach is to index the rows and use a UDF to handle the rest:

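import scala.util.Try
import org.apache.spark.sql.functions.udf

// Append each row's position as an extra "idx_" column.
val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row, i: Long) => Row.fromSeq(row.toSeq :+ i)
  },
  df.schema.add("idx_", "long")
)

// Look up the value by index; an out-of-range index yields None (null in the DataFrame).
def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z", addValue(aList.toVector)($"idx_"))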
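The two approaches behave differently when the list and the DataFrame have different lengths: the RDD join is an inner join, so rows without a matching index are dropped, while the UDF lookup keeps every row and returns a missing value instead. The following is only a rough model of that difference using plain Scala collections rather than Spark; the object and method names are hypothetical and exist purely for illustration.

```scala
import scala.util.Try

// Plain-collections model of the two strategies (no Spark involved).
// All names here are illustrative assumptions, not part of any Spark API.
object IndexSemantics {
  val rows  = Vector(("a", 2), ("b", 1), ("c", 0))
  val aList = List(1.0, -1.0) // deliberately one element shorter than rows

  // Models the join-by-index approach: an inner join keeps only the
  // indices that exist on both sides, so the last row disappears.
  def joinByIndex: Vector[(String, Int, Double)] = {
    val byIdx = aList.zipWithIndex.map(_.swap).toMap
    rows.zipWithIndex.flatMap { case ((x, y), i) =>
      byIdx.get(i).map(z => (x, y, z))
    }
  }

  // Models the UDF approach: every row is kept, and an out-of-range
  // index becomes None (which Spark would store as null).
  def lookupByIndex: Vector[(String, Int, Option[Double])] = {
    val vs = aList.toVector
    rows.zipWithIndex.map { case ((x, y), i) =>
      (x, y, Try(vs(i)).toOption)
    }
  }
}
```

With the data above, joinByIndex keeps two rows, whereas lookupByIndex keeps all three and leaves the value for "c" empty. If the lengths are guaranteed to match, both approaches should yield the same set of rows.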

Unfortunately, both solutions suffer from the same issues. First of all, passing local data through the driver introduces a serious bottleneck in your program. Typically, data should be accessed directly from the executors. Another problem is the growing RDD lineage if you want to perform this operation iteratively.

While the second issue can be addressed by checkpointing, the first one makes this idea useless in general. I would strongly suggest that you either build the complete structure first and then read it in Spark, or rebuild your pipeline in a way that can leverage the Spark architecture. For example, if the data comes from an external source, perform the reads directly for each chunk of data using map / mapPartitions.
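As a minimal sketch of the last suggestion, the example below fetches the extra value on the executors inside mapPartitions instead of shipping a local list through the driver. It assumes Spark 2.0 or later (SparkSession) running in local mode. ExternalSource is a hypothetical stand-in for whatever database, key-value store, or file the values actually come from, and withExternalColumn is an illustrative helper name, not a Spark API.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.DoubleType

// Hypothetical stand-in for an external source; replace with a real client.
object ExternalSource {
  // Called on an executor; returns a lookup function for one partition.
  def open(): String => Double = {
    val data = Map("a" -> 1.0, "b" -> -1.0, "c" -> 0.0)
    key => data.getOrElse(key, Double.NaN)
  }
}

object ExecutorSideRead {
  // Appends column "z" by looking up each row's "x" value on the executors.
  def withExternalColumn(spark: SparkSession, df: DataFrame): DataFrame = {
    val xIdx = df.schema.fieldIndex("x")
    val rows = df.rdd.mapPartitions { iter =>
      val lookup = ExternalSource.open() // opened once per partition, not per row
      iter.map(row => Row.fromSeq(row.toSeq :+ lookup(row.getString(xIdx))))
    }
    spark.createDataFrame(rows, df.schema.add("z", DoubleType, nullable = false))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("executor-side-read")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 2), ("b", 1), ("c", 0)).toDF("x", "y")
    withExternalColumn(spark, df).show()
    spark.stop()
  }
}
```

Because the lookup is keyed by a value already present in each row, no positional index is needed and nothing has to be collected on the driver. Whether this fits depends on the data having such a key in the first place.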