Advika Advika - 11 months ago 151
Scala Question

Encoder error while trying to map dataframe row to updated row

When I try to do the same thing in my code, as shown below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})

I took the code above from this question:
Scala: How can I replace value in Dataframs using scala
But I am getting the following encoder error:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0.

Answer Source

There is nothing unexpected here. You're trying to use code which was written for Spark 1.x and is no longer supported in Spark 2.0:

  • in 1.x DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • in 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
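To see why the second signature produces a compile-time error, here is a toy model in plain Scala. It is not Spark's API: Enc, Box and Opaque are hypothetical stand-ins for Encoder, Dataset and Row, and the sketch only illustrates how a map that demands implicit evidence behaves when no instance exists for the result type.

```scala
// Toy model, not Spark's API: Enc stands in for Spark's Encoder type class.
trait Enc[T] { def name: String }

object Enc {
  // Instances exist only for a few "supported" types, much like spark.implicits._
  implicit val intEnc: Enc[Int] = new Enc[Int] { val name = "int" }
  implicit val stringEnc: Enc[String] = new Enc[String] { val name = "string" }
  implicit def tuple2Enc[A, B](implicit a: Enc[A], b: Enc[B]): Enc[(A, B)] =
    new Enc[(A, B)] { val name = s"(${a.name}, ${b.name})" }
}

// Hypothetical stand-in for Dataset: map compiles only if an Enc[U] can be found.
final case class Box[T](items: Seq[T]) {
  def map[U](f: T => U)(implicit enc: Enc[U]): (Box[U], String) =
    (Box(items.map(f)), enc.name)
}

// A type with no Enc instance, playing the role of Row.
final class Opaque(val v: Int)

object Demo {
  // Tuples of supported types get an instance derived automatically.
  val (mapped, encName) = Box(Seq(1, 2)).map(i => (i, i.toString))

  // Box(Seq(1, 2)).map(i => new Opaque(i))  // does not compile: no implicit Enc[Opaque]

  // Passing the evidence explicitly works, mirroring df.map { ... }(encoder).
  val opaqueEnc: Enc[Opaque] = new Enc[Opaque] { val name = "opaque" }
  val (_, explicitName) = Box(Seq(1)).map(i => new Opaque(i))(opaqueEnc)
}
```

The commented-out line fails for the same kind of reason as the code in the question: the compiler has to find evidence for the result type, and nothing in scope provides it.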

To be honest it didn't make much sense in 1.x either. Regardless of the version, you can simply use the DataFrame API:

import org.apache.spark.sql.functions.{when, lower}
import spark.implicits._ // provides toDF and the $"..." column syntax

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

If you really want to use map, you should use a statically typed Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

or at least return an object which will have an implicit encoder:

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}

Finally, if for some completely crazy reason you really want to map over Dataset[Row], you have to provide the required encoder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
} (encoder)
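As the comment in the snippet hints, the schema does not have to be written out by hand when the output rows have the same shape as the input. A minimal sketch, assuming Spark 2.x with the RowEncoder(schema) form used above (later Spark releases may expose row encoders differently); ReuseSchema and replaceMake are hypothetical names used only for illustration:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

object ReuseSchema {
  // Hypothetical helper: the output has the same columns as the input,
  // so df.schema describes the result rows as well.
  def replaceMake(df: DataFrame): DataFrame =
    df.map {
      case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
        Row(year, "S", model)
      case row => row
    }(RowEncoder(df.schema))

  def main(args: Array[String]): Unit = {
    // Local session for this sketch only; a real job would reuse its own.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("row-encoder-sketch")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((2012, "Tesla", "S"), (1997, "Ford", "E350"))
      .toDF("year", "make", "model")

    replaceMake(df).show()
    spark.stop()
  }
}
```

This only works while the mapping keeps the column names and types unchanged. If the function adds, drops or retypes a column, the schema passed to the encoder has to describe the new shape instead.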