vdep - 1 year ago
Scala Question

How to add a column to Dataset without converting from a DataFrame and accessing it?

I am aware of the method to add a new column to a Spark Dataset using withColumn and a UDF, which returns a DataFrame. I am also aware that we can convert the resulting DataFrame back to a Dataset.

My questions are:

  1. How does the Dataset's type safety come into play here, if we are still following the traditional DataFrame approach (i.e. passing column names as strings for the UDF's input)?

  2. Is there an "Object Oriented Way" of accessing columns (without passing column names as strings), like we used to do with RDDs, for appending a new column?

  3. How do we access the new column in normal operations like map, filter, etc.?

For example:

scala> case class Temp(a : Int, b : String) //creating case class
scala> val df = Seq(Temp(1,"1str"),Temp(2,"2str"),Temp(3,"3str")).toDS // creating DS
scala> val appendUDF = udf( (b : String) => b + "ing") // sample UDF

scala> df.withColumn("c", appendUDF(df("b"))) // adding a new column using the UDF
res5: org.apache.spark.sql.DataFrame = [a: int, b: string ... 1 more field]

scala> res5.as[Temp] // converting to DS
res6: org.apache.spark.sql.Dataset[Temp] = [a: int, b: string ... 1 more field]

scala> res6.map( x => x.
// list of autosuggestions:
a   canEqual   equals     productArity     productIterator   toString
b   copy       hashCode   productElement   productPrefix

The new column c, which I have added using withColumn, is not accessible, because column c is not in the case class Temp (it contains only a and b) at the instant it is converted to a Dataset using as[Temp].

How do I access column c?

Answer

In the type-safe world of Datasets you'd map one structure into another.

That is, for each transformation we need schema representations of the data (just as we do for RDDs). To access 'c' above, we need to create a new schema that provides access to it.

case class A(a: String)
case class BC(b: String, c: String)
val f: A => BC = a => BC(a.a, "c") // transforms an A into a BC

val data = (1 to 10).map(i => A(i.toString))
val dsa = spark.createDataset(data)
// dsa: org.apache.spark.sql.Dataset[A] = [a: string]

val dsb = dsa.map(f)
//dsb: org.apache.spark.sql.Dataset[BC] = [b: string, c: string]
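
Applied to the question's own Temp example, the same pattern lets you append the new column and then use it in map and filter without any column-name strings. This is a minimal sketch; TempC is a hypothetical case class introduced here to extend Temp's schema with c, not something from the original post:

case class Temp(a: Int, b: String)
case class TempC(a: Int, b: String, c: String) // hypothetical: Temp plus the new column c

import spark.implicits._ // already in scope in spark-shell; needed elsewhere for toDS and encoders

val ds = Seq(Temp(1, "1str"), Temp(2, "2str"), Temp(3, "3str")).toDS

// append c in a type-safe way: no column-name strings, no UDF
val dsc = ds.map(t => TempC(t.a, t.b, t.b + "ing"))
// dsc: org.apache.spark.sql.Dataset[TempC] = [a: int, b: string ... 1 more field]

// the new column is now a plain case-class field, usable in map, filter, etc.
dsc.filter(t => t.c.endsWith("ing")).map(t => t.c).show()

Here c is accessed as a regular field of TempC, so the compiler checks it at compile time, which is exactly the type safety that the string-based withColumn call cannot give you.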