
How to add a column to a Dataset without converting from a DataFrame, and how to access it?

I am aware of the method to add a new column to a Spark Dataset using .withColumn() and a UDF, which returns a DataFrame. I am also aware that we can convert the resulting DataFrame back to a Dataset.

My questions are:


  1. How does the Dataset's type safety come into play here, if we are still following the traditional DataFrame approach (i.e. passing column names as strings for the UDF's input)?

  2. Is there an "object-oriented way" of accessing columns (without passing column names as strings), like we used to do with RDDs, for appending a new column?

  3. How do I access the new column in normal operations like map, filter, etc.?



For example:

scala> import org.apache.spark.sql.functions.udf // needed for udf()
scala> case class Temp(a: Int, b: String) // creating case class
scala> val df = Seq(Temp(1, "1str"), Temp(2, "2str"), Temp(3, "3str")).toDS // creating DS
scala> val appendUDF = udf((b: String) => b + "ing") // sample UDF

scala> df.withColumn("c", appendUDF(df("b"))) // adding a new column
res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

scala> res5.as[Temp] // converting to DS
res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

scala> res6.map(x => x.
// list of autosuggestions:
a   canEqual   equals     productArity     productIterator   toString
b   copy       hashCode   productElement   productPrefix


The new column c, which I added using .withColumn(), is not accessible, because column c is not in the case class Temp (it contains only a and b) at the instant it is converted to a Dataset using res5.as[Temp].

How can I access column c?

Answer

In the type-safe world of Datasets you'd map one structure into another.

That is, for each transformation we need a typed representation of the data (just as we do for RDDs). To access c above, we need to create a new case class that provides access to it.

import spark.implicits._ // needed for the Dataset encoders

case class A(a: String)
case class BC(b: String, c: String)
val f: A => BC = a => BC(a.a, "c") // transforms an A into a BC

val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]

val dsb = dsa.map(f)
// dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]
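
To tie this back to the original example, here is a minimal sketch of the same pattern, assuming a spark-shell session (so spark.implicits._ is in scope) and the question's Temp case class; TempC is a hypothetical case class that extends Temp's schema with the new field c:

case class TempC(a: Int, b: String, c: String) // hypothetical: Temp plus the new column c

val ds = Seq(Temp(1, "1str"), Temp(2, "2str"), Temp(3, "3str")).toDS

// Append the column by mapping one typed structure into another,
// instead of withColumn plus a UDF keyed by a column-name string.
val dsc = ds.map(t => TempC(t.a, t.b, t.b + "ing"))
// dsc: org.apache.spark.sql.Dataset[TempC] = [a: int, b: string ... 1 more field]

// c is now an ordinary field, usable in map, filter, etc.:
dsc.map(x => x.c)                    // Dataset[String]: "1string", "2string", "3string"
dsc.filter(x => x.c.endsWith("ing")) // keeps all three rows

This also speaks to the type-safety question: a misspelled field (say x.d) fails at compile time, whereas a misspelled column-name string passed to withColumn only fails at runtime.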