
How to store custom objects in a Dataset

According to Introducing Spark Datasets:


As we look forward to Spark 2.0, we plan some exciting improvements to Datasets, specifically:
...
Custom encoders – while we currently autogenerate encoders for a wide variety of types, we’d like to open up an API for custom objects.


and attempts to store a custom type in a Dataset lead to an error like:


Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases


or:


java.lang.UnsupportedOperationException: No Encoder found for ....
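
For example (a minimal sketch, not part of the original question), with only the standard implicits in scope, even a trivial attempt fails to compile with the first message above:

    // assumes a SQLContext named sqlContext is already available
    import sqlContext.implicits._

    class Bar(i: Int)   // a plain class, not a case class / Product

    // The next line does not compile:
    // "Unable to find encoder for type stored in a Dataset ..."
    // val ds = Seq(new Bar(1)).toDS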


Are there any existing workarounds?




Note that this question exists only as an entry point for a Community Wiki answer. Feel free to update / improve both the question and the answer.

Answer
  1. Using generic encoders.

    There are two generic encoders available for now, kryo and javaSerialization, where the latter is explicitly described as:

    extremely inefficient and should only be used as the last resort.

    Assuming the following class:

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    you can use these encoders by adding an implicit encoder:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] =
        org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    which can be used together as follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._  // brings the kryo-based Encoder[Bar] into scope

        val ds = Seq(new Bar(1)).toDS
        ds.show

        sc.stop()
      }
    }
    

    It stores objects as a binary column, so when converted to a DataFrame you get the following schema:

    root
     |-- value: binary (nullable = true)
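
    The javaSerialization encoder can be swapped in the same way and likewise produces a single binary column. A minimal sketch (not part of the original answer; note that Java serialization additionally requires Bar to implement java.io.Serializable):

    object BarJavaEncoders {
      // Same idea as BarEncoders above, but backed by Java serialization:
      // simpler, but much slower than Kryo
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] =
        org.apache.spark.sql.Encoders.javaSerialization[Bar]
    }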
    

    It is also possible to encode tuples, using the kryo encoder only for a specific field:

    import org.apache.spark.sql.Encoders

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])

    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Please note that we don't depend on implicit encoders here but pass the encoder explicitly, so this most likely won't work with the toDS method.
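
    The explicitly encoded Dataset can still be used with typed operations; a sketch (reusing longBarEncoder from above and assuming a SparkSession named spark with import spark.implicits._ in scope to encode the result):

    val pairs = spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // Bar is deserialized from its kryo binary form inside typed transformations
    pairs.map { case (l, bar) => (l, bar.bar) }.show()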

  2. Using implicit conversions:

    Provide implicit conversions between a representation which can be encoded and the custom class, for example:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._

        type EncodedBar = Int

        // The expected type RDD[EncodedBar] triggers the implicit Bar => Int conversion
        val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS

        barsDS.show
        barsDS.map(_.bar).show   // Int => Bar conversion kicks in, then .bar is called

        sc.stop()
      }
    }
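
    As a small follow-up sketch (continuing inside main above, not part of the original answer), the same implicit conversion also turns the collected Ints back into Bar objects on the driver:

    // The ascription (i: Bar) triggers the implicit toBar conversion
    val recovered: Seq[Bar] = barsDS.collect.toSeq.map(i => i: Bar)
    recovered.foreach(println)   // prints: bar 1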
    

Related questions: