javadba javadba - 1 year ago 326
Scala Question

How to create a custom Encoder in Spark 2.X Datasets?

Spark Datasets move away from Rows to Encoders for POJOs/primitives. The Catalyst engine uses an ExpressionEncoder to convert columns in a SQL expression. However, there do not appear to be other subclasses of Encoder available to use as a template for our own implementations.

Here is an example of code that is happy in Spark 1.X / DataFrames that does not compile in the new regime:

// mapping each row to RDD tuple
df.map(row => {
  val id: String = if (!has_id) "" else row.getAs[String]("id")
  val label: String = row.getAs[String]("label")
  val channels: Int = if (!has_channels) 0 else row.getAs[Int]("channels")
  val height: Int = if (!has_height) 0 else row.getAs[Int]("height")
  val width: Int = if (!has_width) 0 else row.getAs[Int]("width")
  val data: Array[Byte] = row.getAs[Any]("data") match {
    case str: String => str.getBytes
    case arr: Array[Byte @unchecked] => arr
    case _ =>
      log.error("Unsupported value type")
      Array.empty[Byte] // fallback value is an assumption
  }
  (id, label, channels, height, width, data)
})


We get a compiler error of

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
    df.map(row => {

So then somehow/somewhere there should be a means to

  • Define/implement our custom Encoder

  • Apply it when performing a mapping on the DataFrame
    (which is now a Dataset of type Row)

  • Register the Encoder for use by other custom code

I am looking for code that successfully performs these steps.

Answer Source

As far as I am aware, nothing really changed since 1.6, and the solutions described in "How to store custom objects in a Dataset in Spark 1.6" are the only available options. Nevertheless, your current code should work just fine with the default encoders for product types.
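For a type that is not a primitive or a Product, one commonly used option is a Kryo-backed encoder. The following is only a minimal sketch, assuming a hypothetical Image class and a DataFrame with string columns "label" and "data" (neither comes from the question). It touches the three steps asked about: the encoder is defined with Encoders.kryo, made available to other code by placing it in the companion object, and picked up implicitly by map.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, SparkSession}

// Hypothetical non-product class; no default encoder exists for it.
class Image(val label: String, val data: Array[Byte])

object Image {
  // Define: a Kryo-backed encoder stores each object as one binary column.
  // Register: living in the companion object puts it in the implicit scope
  // of Encoder[Image], so callers need no extra import.
  implicit val encoder: Encoder[Image] = Encoders.kryo[Image]
}

object KryoEncoderSketch {
  // Apply: Dataset.map requires an implicit Encoder[Image], found via the companion.
  def toImages(df: DataFrame): Dataset[Image] =
    df.map { row =>
      new Image(row.getAs[String]("label"), row.getAs[String]("data").getBytes("UTF-8"))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    import spark.implicits._ // needed here only for toDF

    val df = Seq(("cat", "abc"), ("dog", "xyz")).toDF("label", "data")
    toImages(df).collect().foreach(img => println(s"${img.label}: ${img.data.length} bytes"))
    spark.stop()
  }
}
```

The trade-off of this approach is that the resulting Dataset has a single opaque binary column, so the fields of Image cannot be used in column expressions.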

To get some insight into why your code worked in 1.x and may not work in 2.0.0, you'll have to check the signatures. In 1.x, DataFrame.map is a method which takes a function Row => T and transforms RDD[Row] into RDD[T].

In 2.0.0, DataFrame.map takes a function of type Row => T as well, but transforms Dataset[Row] (a.k.a. DataFrame) into Dataset[T], hence T requires an Encoder. If you want to get the "old" behavior you should use RDD explicitly: df.rdd.map(row => ???)
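The difference between the two routes can be sketched as follows. This is an illustration under assumptions, not the code from the question: the column names and the three-field tuple are made up for brevity. Because the result type is a tuple (a Product), importing spark.implicits._ is enough for the Dataset route, while the RDD route involves no Encoder at all.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object MapSignatureSketch {
  // Dataset route: map needs an Encoder for the result type,
  // supplied here by the implicits of the given session.
  def asDataset(spark: SparkSession, df: DataFrame): Dataset[(String, String, Int)] = {
    import spark.implicits._
    df.map { row =>
      (row.getAs[String]("id"), row.getAs[String]("label"), row.getAs[Int]("channels"))
    }
  }

  // RDD route: plain RDD[Row] => RDD[T], no Encoder required (the 1.x behavior).
  def asRdd(df: DataFrame): RDD[(String, String, Int)] =
    df.rdd.map { row =>
      (row.getAs[String]("id"), row.getAs[String]("label"), row.getAs[Int]("channels"))
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("map-signature-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for the real input.
    val df = Seq(("a", "cat", 3), ("b", "dog", 1)).toDF("id", "label", "channels")
    asDataset(spark, df).show()
    asRdd(df).collect().foreach(println)
    spark.stop()
  }
}
```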