Vince.Bdn Vince.Bdn - 1 year ago 150
Scala Question

Scala reflection with Serialization (over Spark) - Symbols not serializable

To begin with, I'm using Scala 2.10.4 and the example below is run on Spark 1.6 (though I doubt Spark has anything to do with this; it's just a serialization issue).

So here's my problem: assume I have a trait Base that is implemented by, say, two classes. Now I have a generic trait Foo that is extended by a collection of classes, one of them being defined over subtypes of Base, e.g. (I keep Spark's notion of RDD here, but it could be anything else that gets serialized; Something is just some result type):

trait Foo[T] { def function(rdd: RDD[T]): Something }
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... }
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... }

Now I need an object that will take an RDD[T] (assume no ambiguity here, it's just a simplified version) and that returns the Something produced by the function matching type T. But it should also work for Array[T] with a merging strategy. So far it looks like:

object Obj {
  def compute[T: TypeTag](input: RDD[T]): Something = {
    typeOf[T] match {
      case t if t <:< typeOf[A] =>
        val foo = new Foo[T]
      case t if t <:< typeOf[Array[A]] =>
        val foo = new Foo[A]
        foo.function(input.map(x => mergeArray(x.asInstanceOf[Array[A]])))
      case t if t <:< typeOf[Base] =>
        val foo = new Foo[T]
      // here it gets ugly...
      case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why?
        val tt = getSubInfo[T](0)
        val tpe = tt.tpe
        val foo = new Foo[tpe.type]
        foo.function(input.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]]))))
    }
  }

  // strategy to transform arrays of T into a T object when possible
  private def mergeArray[T: TypeTag](a: Array[T]): T = ...

  // extract the subtype, e.g. for Array[Int], position 0 yields a type tag for Int
  // (I can provide the code, but it is not fundamental to understanding the problem)
  private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ...
}

Unfortunately, it seems to work fine on a local machine, but when it gets sent to Spark (serialized), I get:
org.apache.spark.SparkException: Task not serializable

Caused by: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
- object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)

I do have a workaround (quite obvious: enumerate the possibilities), but out of curiosity, is there a way to fix this? And why aren't Symbols serializable whereas their equivalents in Manifests were?

Thanks for the help.

Answer Source

TypeTags are now generally serializable in Scala but, oddly, Types are not (this is odd because TypeTags contain Symbols :-/).
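The difference can be checked without Spark, using plain Java serialization. The following is only a minimal sketch, assuming Scala 2.11 or later with scala-reflect on the classpath; `SerializationCheck` and `roundTrip` are hypothetical names introduced here for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.runtime.universe._
import scala.util.Try

// Hypothetical helper object, not part of the original answer.
object SerializationCheck {
  // Serialize a value to bytes with Java serialization and read it back.
  def roundTrip[A](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // A TypeTag is expected to survive the round trip (assumption: Scala 2.11+).
  val tagResult: Try[TypeTag[String]] = Try(roundTrip(typeTag[String]))

  // A bare Type references non-serializable Symbols, so this is expected to fail.
  val typeResult: Try[Type] = Try(roundTrip(typeOf[String]))
}
```

If `tagResult` is a `Success` and `typeResult` is a `Failure`, the behaviour matches the description above: carry the TypeTag across the wire and derive the Type from it on the other side.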

The following pattern might do what you want:

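import org.apache.spark.rdd.RDD
import scala.reflect.runtime.universe._

// The implicit TypeTag constructor parameter is serialized with the instance.
abstract class TypeAware[T: TypeTag] extends Serializable {
  def typ: Type = _typeCached

  // Assumption: marked @transient so the non-serializable Type is never written;
  // it is recomputed lazily from the TypeTag after deserialization.
  @transient lazy val _typeCached: Type = typeOf[T]
}

trait Foo[T] extends Serializable {
  def function(rdd: RDD[T]): Something // ... impl here? ...
  def typ: Type
}

class Concrete[T: TypeTag] extends TypeAware[T] with Foo[T] with Serializable {
  def function(rdd: RDD[T]): Something = ??? // ... impl here? ...
}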
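
To try the pattern without a Spark cluster, here is a minimal sketch that pushes such an object through plain Java serialization and reads its type back afterwards. It assumes Scala 2.11 or later with scala-reflect on the classpath. `TagCarrier`, `Carrier`, `CarrierDemo` and `roundTrip` are hypothetical names introduced for illustration, and the `@transient` marker is an assumption about how to keep the cached Type out of the serialized form.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.reflect.runtime.universe._

// Same shape as TypeAware, renamed so that this sketch stands alone.
abstract class TagCarrier[T: TypeTag] extends Serializable {
  // Not written to the stream; recomputed from the TypeTag on first access.
  @transient lazy val typ: Type = typeOf[T]
}

// Hypothetical concrete subclass, playing the role of Concrete.
class Carrier[T: TypeTag] extends TagCarrier[T]

object CarrierDemo {
  // Serialize a value to bytes with Java serialization and read it back.
  def roundTrip[A](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  // Returns the type name seen before and after the round trip.
  def describe(): (String, String) = {
    val original = new Carrier[String]
    val before = original.typ.toString // forces the lazy val before serializing
    val restored = roundTrip(original)
    (before, restored.typ.toString)
  }
}
```

Forcing `typ` before serializing is deliberate: without `@transient`, an already evaluated lazy val would be written along with the object and would be expected to hit the same non-serializable Symbols as in the question.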