Martin Senne - 2 months ago
Scala Question

How to define schema for custom type in Spark SQL?

The following example code tries to put some case objects into a DataFrame. It defines a sealed trait with two case objects, and a case class that has a field of this trait type:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data(name: String, t: Some)

object Example {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize(Seq(Data("a", AType), Data("b", BType)), 4).toDF()
    df.show()
  }
}


When executing the code, I unfortunately encounter the following exception:

java.lang.UnsupportedOperationException: Schema for type Some is not supported


Questions

  • Is there a possibility to add or define a schema for certain types (here type Some)?

  • Does another approach exist to represent this kind of enumeration?

    • I tried to use Enumeration directly, but also without success (see below).

Code for Enumeration:

object Some extends Enumeration {
  type Some = Value
  val AType, BType = Value
}


Thanks in advance. I hope that the best approach is not to fall back to strings.

Answer

Spark 2.0.0+:

UserDefinedType has been made private in Spark 2.0.0 and as of now it has no Dataset-friendly replacement.

See: SPARK-14155 (Hide UserDefinedType in Spark 2.0)

Most of the time a statically typed Dataset can serve as a replacement.

Spark < 2.0.0

Is there a possibility to add or define a schema for certain types (here type Some)?

I guess the answer depends on how badly you need this. It looks like it is possible to create a UserDefinedType, but it requires access to DeveloperApi and is not exactly straightforward or well documented.

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  // Each case object is stored as a plain integer column value.
  override def sqlType: DataType = IntegerType

  // Case object -> integer code.
  override def serialize(obj: Any): Any = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  // Integer code -> case object.
  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

You should probably override hashCode and equals as well.

Its PySpark counterpart can look like this:

from enum import Enum, unique
from pyspark.sql.types import UserDefinedType, IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(cls):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod
    def scalaUDT(cls):  # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self, obj):
        return obj.value

    def deserialize(self, datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

In Spark < 1.5 a Python UDT requires a paired Scala UDT, but it looks like this is no longer the case in 1.5.

For a simple UDT like this one you can use simple types (for example IntegerType instead of a whole Struct).
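The integer mapping behind serialize and deserialize can be checked on its own, without a Spark installation. The following is only a minimal sketch, assuming nothing beyond the standard-library enum module: the module-level functions serialize and deserialize are hypothetical stand-ins for the UDT methods, not part of the PySpark API. It also relies on Enum's built-in lookup by value instead of rebuilding a dictionary on every call.

```python
from enum import Enum, unique


@unique
class Some(Enum):
    AType = 0
    BType = 1


def serialize(obj):
    # Hypothetical stand-in for SomeUDT.serialize:
    # the enum member becomes the integer an IntegerType column would hold.
    return obj.value


def deserialize(datum):
    # Hypothetical stand-in for SomeUDT.deserialize:
    # Enum supports lookup by value and raises ValueError for unknown codes.
    return Some(datum)


# Round trip: every member should survive serialize followed by deserialize.
round_trip = [deserialize(serialize(member)) for member in Some]
```

Because the lookup raises ValueError for an integer that matches no member, corrupt column values fail loudly instead of silently mapping to a wrong case.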
