kpo kpo - 1 month ago

Apache Spark - Dataset operations fail in abstract base class?

I'm trying to extract some common code into an abstract class, but running into issues.

Let's say I'm reading in a file formatted as "id|name":

import org.apache.spark.sql.Dataset

case class Person(id: Int, name: String) extends Serializable

object Persons {
  def apply(lines: Dataset[String]): Dataset[Person] = {
    import lines.sparkSession.implicits._
    lines.map { line =>
      val fields = line.split("\\|")
      Person(fields(0).toInt, fields(1))
    }
  }
}

Persons(spark.read.textFile("persons.txt")).show()


Great. This works fine. Now suppose I want to read several different file formats that all have a "name" field, so I'll extract the common logic:

trait Named extends Serializable { val name: String }

abstract class NamedDataset[T <: Named] {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("\\|")))
  }
}

case class Person(id: Int, name: String) extends Named

object Persons extends NamedDataset[Person] {
  override def createRecord(fields: Array[String]): Person =
    Person(fields(0).toInt, fields(1))
}


This fails with two errors:

Error:
Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes)
are supported by importing spark.implicits._ Support for serializing
other types will be added in future releases.
lines.map(line => createRecord(line.split("\\|")))

Error:
not enough arguments for method map:
(implicit evidence$7: org.apache.spark.sql.Encoder[T])org.apache.spark.sql.Dataset[T].
Unspecified value parameter evidence$7.
lines.map(line => createRecord(line.split("\\|")))


I have a feeling this has something to do with implicits, TypeTags, and/or ClassTags, but I'm just starting out with Scala and don't fully understand these concepts yet.

Answer

You have to make two small changes:

  • Since only primitive types and Products are supported (as the error message states), making your Named trait Serializable isn't enough. Make it extend Product instead (which case classes and tuples already do).
  • Both a ClassTag and a TypeTag are required so that Spark can overcome type erasure and resolve the actual type of T at runtime.
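As an aside, the context-bound syntax used below, e.g. `[T : ClassTag]`, is just shorthand for an extra implicit parameter. Here's a minimal Spark-free sketch (the `describe` helper is hypothetical, purely for illustration) showing what a ClassTag recovers at runtime; TypeTag works the same way but carries the full generic type:

```scala
import scala.reflect.ClassTag

// Hypothetical helper: `def describe[T : ClassTag]` desugars to
// `def describe[T](implicit ev: ClassTag[T])`, so the compiler supplies a
// ClassTag that carries T's runtime class past erasure.
def describe[T : ClassTag]: String =
  implicitly[ClassTag[T]].runtimeClass.getSimpleName

println(describe[String])  // prints "String"
```

In the Spark version below, both tags are demanded by the context bounds so that the implicit Encoder[T] can be resolved inside `apply`.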

So, here's a working version:

import org.apache.spark.sql.Dataset
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

trait Named extends Product { val name: String }

abstract class NamedDataset[T <: Named : ClassTag : TypeTag] extends Serializable {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("\\|")))
  }
}

case class Person(id: Int, name: String) extends Named

object Persons extends NamedDataset[Person] {
  override def createRecord(fields: Array[String]) =
    Person(fields(0).toInt, fields(1))
}
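The payoff is that each new pipe-delimited format now needs only a case class and a one-line createRecord. Below is a sketch with a hypothetical second record type (a "city|country" format, not from the original question), written against plain Scala collections so the shared parsing logic can be seen in isolation without a SparkSession; the tags are omitted here since only Spark's Encoder derivation needs them:

```scala
trait Named extends Product { val name: String }

// Spark-free stand-in for NamedDataset, mapping over a Seq instead of a
// Dataset so the shared parsing logic can be exercised in isolation.
abstract class NamedParser[T <: Named] extends Serializable {
  def createRecord(fields: Array[String]): T
  def apply(lines: Seq[String]): Seq[T] =
    lines.map(line => createRecord(line.split("\\|")))
}

// Hypothetical second record type for a "city|country" file.
case class City(name: String, country: String) extends Named

object Cities extends NamedParser[City] {
  override def createRecord(fields: Array[String]): City =
    City(fields(0), fields(1))
}

println(Cities(Seq("Oslo|Norway", "Kyoto|Japan")))
```

The Spark version is identical except that `apply` takes a `Dataset[String]` and the class keeps the `: ClassTag : TypeTag` bounds.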