Milad Khajavi - 6 months ago
Java Question

No Java class corresponding to Product with Serializable with Base found

I've written two case classes that extend a Base abstract class, and I have a list of each class (listA and listB). When I merge these two lists, I can't convert the resulting list to an Apache Spark 1.6.1 Dataset.

abstract class Base

case class A(name: String) extends Base
case class B(age: Int) extends Base

val listA: List[A] = A("foo")::A("bar")::Nil
val listB: List[B] = B(10)::B(20)::Nil
val list: List[Base with Product with Serializable] = listA ++ listB

val rdd: RDD[Base with Product with Serializable] = sc.parallelize(list)
val result = rdd.toDS()


Apache Spark raises this exception:

A needed class was not found. This could be due to an error in your runpath. Missing class: no Java class corresponding to Base with Product with Serializable found
java.lang.NoClassDefFoundError: no Java class corresponding to Base with Product with Serializable found
at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1299)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:192)
at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:54)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)


When I create the RDD from list, Spark doesn't throw any exception, but when I convert the RDD to a Dataset with the toDS() method, the exception above is thrown.

Answer

First, you can get a saner type for list: either annotate it as List[Base] explicitly, or declare trait Base extends Product with Serializable if the intention is for it only to be extended by case classes/objects. (The compiler infers List[Base with Product with Serializable] because every case class already extends Product and Serializable.)
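A minimal sketch of both options:

// Option 1: ascribe the element type explicitly, widening the inferred
// List[Base with Product with Serializable] to List[Base]
val list: List[Base] = listA ++ listB

// Option 2: make the parent type itself extend Product with Serializable,
// so the least upper bound of A and B is plain Base
trait Base extends Product with Serializable
case class A(name: String) extends Base
case class B(age: Int) extends Base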

Spark 1.6 comes with support for automatically generating encoders for a wide variety of types, including primitive types (e.g. String, Integer, Long), Scala case classes, and Java Beans.
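For instance, encoding a homogeneous list of a case class works out of the box (assuming a Spark 1.6 SQLContext named sqlContext is in scope, as in the shell):

import sqlContext.implicits._

// A is a case class, so an encoder is derived automatically
val dsA = sc.parallelize(listA).toDS()  // Dataset[A]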

Note that abstract classes like Base are not supported, and neither are custom encoders. You could, however, try the kryo encoder (or, as a last resort, the javaSerialization one); see How to store custom objects in a Dataset in Spark 1.6.

Here is a complete working example:

abstract class Base extends Serializable

case class A(name: String) extends Base

case class B(age: Int) extends Base

object BaseEncoder {
  // kryo-based encoder covering the whole Base hierarchy
  implicit def baseEncoder: org.apache.spark.sql.Encoder[Base] =
    org.apache.spark.sql.Encoders.kryo[Base]
}


import sqlContext.implicits._  // for the toDS conversion on RDDs
import BaseEncoder._           // bring the implicit Encoder[Base] into scope

val listA: Seq[A] = Seq(A("a"), A("b"))
val listB: Seq[B] = Seq(B(1), B(2))
val list: Seq[Base] = listA ++ listB

val ds = sc.parallelize(list).toDS  // Dataset[Base], serialized with kryo
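As a quick sanity check (a hypothetical usage line; note that a kryo-based encoder stores each object in a single binary column, so you lose the named columns a case-class encoder would give you):

ds.collect().foreach(println)
// A(a)
// A(b)
// B(1)
// B(2)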