Vince.Bdn - 2 months ago
Scala Question

Scala reflection with Serialization (over Spark) - Symbols not serializable

To begin with, I'm using Scala 2.10.4, and the example below runs on Spark 1.6 (though I doubt Spark has anything to do with this; it's just a serialization issue).

So here's my problem: assume I have a trait Base that is implemented by, say, two classes B1 and B2. Now I have a generic trait that is extended by a collection of classes, one of which works over subtypes of Base, e.g. (here I keep Spark's notion of RDD, but it could actually be anything, as long as it gets serialized; Something is just a result type, whatever it is):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... }
...


Now I need an object that takes an RDD[T] (assume there is no ambiguity here; this is a simplified version) and returns the Something produced by the function of the Foo that matches type T. But it should also work for Array[T], using a merging strategy. So far it looks like:

import scala.reflect.runtime.universe._
import org.apache.spark.rdd.RDD

object Obj {
  def compute[T: TypeTag](input: RDD[T]): Something = {
    typeOf[T] match {
      case t if t <:< typeOf[A] =>
        val foo = new Foo[T]
        foo.function(input)
      case t if t <:< typeOf[Array[A]] =>
        val foo = new Foo[A]
        foo.function(input.map(x => mergeArray(x.asInstanceOf[Array[A]])))
      case t if t <:< typeOf[Base] =>
        val foo = new Foo[T]
        foo.function(input)
      // here it gets ugly...
      case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
        val tt = getSubInfo[T](0)
        val tpe = tt.tpe
        val foo = new Foo[tpe.type]
        foo.function(input.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))))
    }
  }

  // strategy to transform an array of T into a single T object when possible
  private def mergeArray[T: TypeTag](a: Array[T]): T = ...

  // extract a type argument, e.g. for Array[Int], position 0 yields a type tag for Int
  // (I can provide the code, but it isn't essential for understanding the problem)
  private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ...
}
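The getSubInfo helper is elided above. As a hedged sketch only, assuming Scala 2.11+ (where `Type.typeArgs` exists; on 2.10 the `TypeRef(_, _, args)` extractor plays the same role), a variant can return the type argument as a plain `Type` instead of a `TypeTag`, and the dispatch can recognise arrays by comparing the type constructor's symbol with `definitions.ArrayClass` rather than through an existential `<:<` check. `SubInfoSketch`, `subType` and `describe` are hypothetical names, not part of the original code, and Spark is left out so it runs on its own:

```scala
import scala.reflect.runtime.universe._

object SubInfoSketch {
  // Hypothetical variant of getSubInfo: returns the i-th type argument of T
  // as a Type (not a TypeTag). Type.typeArgs is assumed available (Scala 2.11+).
  def subType[T: TypeTag](i: Int): Type = typeOf[T].typeArgs(i)

  // Dispatch on the static type of T, in the style of Obj.compute.
  // Arrays are detected by their type constructor's symbol.
  def describe[T: TypeTag]: String = typeOf[T] match {
    case t if t.typeSymbol == definitions.ArrayClass => s"array of ${subType[T](0)}"
    case t                                           => s"plain $t"
  }
}
```

The `Type` values produced here reference compiler symbols, so they are best used on the driver and kept out of anything that gets shipped inside a Spark closure.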


Unfortunately, while it seems to work fine on a local machine, when it gets sent to Spark (serialized) I get an org.apache.spark.SparkException: Task not serializable with:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
- object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)


I do have a workaround (the obvious one: enumerate all the possibilities), but out of curiosity, is there a way to fix this? And why aren't Symbols serializable, whereas their equivalents in Manifests were?

Thanks for the help.

Answer

TypeTags are now generally serializable in Scala, but, oddly, Types themselves are not (odd, because TypeTags contain symbols :-/).
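The difference can be checked without Spark. This is a minimal sketch assuming Scala 2.11+ and plain Java serialization as a stand-in for how Spark ships task closures by default; the object and method names are hypothetical. Serializing a `Type` fails with `NotSerializableException`, while a `TypeTag` round-trips and still yields the same type:

```scala
import java.io._
import scala.reflect.runtime.universe._

object TypeVsTypeTag {
  // Java-serialize a value and read it back (assumed stand-in for Spark's closure serializer).
  def roundTrip[A](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // A Type holds references to compiler Symbols, which are not java.io.Serializable.
  def typeSerializes: Boolean =
    try { roundTrip(typeOf[Int]); true }
    catch { case _: NotSerializableException => false }

  // A TypeTag is written as a serializable proxy and rebuilt on read,
  // so the deserialized tag still describes Int.
  def tagSurvives: Boolean = roundTrip(typeTag[Int]).tpe =:= typeOf[Int]
}
```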

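This might do what you want. The implicit TypeTag is stored as a constructor parameter and serialized with the object, while the cached Type is marked @transient, so it is skipped during serialization and lazily recomputed from the TypeTag the first time it is used after deserialization: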

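import scala.reflect.runtime.universe._
import org.apache.spark.rdd.RDD

// The implicit TypeTag constructor parameter is serialized with the object.
abstract class TypeAware[T: TypeTag] extends Serializable {
  def typ: Type = _typeCached

  // Types are not serializable: @transient skips the cache during
  // serialization, and the lazy val recomputes it from the TypeTag afterwards.
  @transient
  lazy val _typeCached: Type = typeOf[T]
}

trait Foo[T] extends Serializable {
  def function(rdd: RDD[T]): Something // ... impl here? ...
  def typ: Type
}

class Concrete[T: TypeTag] extends TypeAware[T] with Foo[T] {
  def function(rdd: RDD[T]): Something = ??? // ... impl here? ...
}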
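To check the pattern without a cluster, the sketch below reuses the answer's `TypeAware` unchanged and adds a hypothetical `Tagged` class standing in for `Concrete` (the RDD-based `function` is dropped so it runs without Spark). Java serialization is again assumed as a stand-in for Spark's closure serializer, and Scala 2.11+ is assumed. Accessing `typ` before serializing shows that the cached `Type` is simply skipped, and the deserialized copy recomputes it from the tag:

```scala
import java.io._
import scala.reflect.runtime.universe._

// The answer's pattern: the implicit TypeTag is kept as a constructor field
// (serializable), while the Type is @transient and rebuilt lazily.
abstract class TypeAware[T: TypeTag] extends Serializable {
  def typ: Type = _typeCached

  @transient
  lazy val _typeCached: Type = typeOf[T]
}

// Hypothetical stand-in for Concrete, without the Spark-specific function.
class Tagged[T: TypeTag] extends TypeAware[T]

object TypeAwareDemo {
  // Java-serialize a value and read it back (stand-in for shipping it to an executor).
  def roundTrip[A](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def survives: Boolean = {
    val original = new Tagged[Int]
    original.typ // populate the transient cache before serializing
    val copy = roundTrip(original)
    copy.typ =:= typeOf[Int] // recomputed from the deserialized TypeTag
  }
}
```

The design choice is that only the tag travels; every JVM that deserializes the object rebuilds its own `Type` against its local runtime mirror.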