superhan superhan - 1 year ago 70
Scala Question

Serialization Exception on spark

I have run into a strange serialization problem in Spark.
The code is below:

class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable {
  def infer(document: RDD[Document]): RDD[DocumentParameter] = {
    val docs = document.map(doc => DocumentParameter(doc, numOfTopics))
    docs
  }
}

where Document is defined as:

class Document(val tokens: SparseVector[Int]) extends Serializable

and DocumentParameter is:

class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter extends Serializable
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,

SparseVector is a serializable class in

This is a simple map procedure, and all the classes are serializable, but I get this exception:

org.apache.spark.SparkException: Task not serializable

But when I remove the numOfTopics parameter, that is:

object DocumentParameter extends Serializable
def apply(document: Document) = new DocumentParameter(document,

and call it like this:

val docs =

then it seems to work.

Is the type Int not serializable? I have seen other code written this way, though.

I am not sure how to fix this bug.


Thank you @samthebest. I will add more details about it.

stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.apache.spark.repl.SparkIMain$
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkContext
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more

As the stack trace gives the general information of exception, I removed it.

I run the code in the spark-shell.

// suppose I already have an RDD[Document] called docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)

Could you give me some tutorials or tips on serialization?

lmm lmm
Answer Source

Anonymous functions that use a member of their containing class serialize that whole class instance. When you map {doc => DocumentParameter(doc, numOfTopics)}, numOfTopics is really this.numOfTopics, so the only way to give that function access to it is to serialize the PLSA instance. And that instance can't actually be serialized because, as you can see from the stack trace, it contains the SparkContext, which isn't serializable (Bad Things would happen if individual cluster nodes had access to the context and could, e.g., create new jobs from within a mapper). That is also why the version without numOfTopics works: its function no longer touches any field of PLSA, so PLSA is never serialized.
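The capture behaviour described above can be reproduced without Spark, using plain Java serialization. This is only a sketch: FakeContext, Model and ClosureDemo are hypothetical names, with FakeContext standing in for the non-serializable SparkContext. It also shows the common workaround of copying the field into a local val before building the function.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately NOT Serializable.
class FakeContext

class Model(val ctx: FakeContext, val numOfTopics: Int) extends Serializable {
  // numOfTopics here means this.numOfTopics, so the function captures `this`.
  def capturesThis: Int => Int = x => x + numOfTopics

  // The field is copied into a local val first, so the function captures only an Int.
  def capturesLocal: Int => Int = {
    val n = numOfTopics
    x => x + n
  }
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object, false if it rejects it.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val model = new Model(new FakeContext, 100)
    println(isSerializable(model.capturesThis))  // drags in Model and its FakeContext
    println(isSerializable(model.capturesLocal)) // only carries the Int
  }
}
```

The first function should be rejected because serializing it means serializing the whole Model, including its FakeContext field; the second should be accepted. Spark's "Task not serializable" error comes from the same kind of check applied to the function passed to map.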

In general, try to avoid storing the SparkContext in your classes (edit: or at least, make sure it's very clear what kind of classes contain the SparkContext and what kind don't); it's better to pass it as a (possibly implicit) parameter to individual methods that need it. Alternatively, move the function {doc => DocumentParameter(doc, numOfTopics)} into a different class from PLSA, one that really can be serialized.
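A minimal sketch of that restructuring follows, assuming Spark is on the classpath. The classes are simplified stand-ins for the ones in the question: the token type (Array[Int]) and the zero-filled theta are assumptions, and loadDocuments is a hypothetical helper that only illustrates passing the SparkContext as a method parameter instead of storing it in a field.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Simplified stand-ins for the question's classes (token type is an assumption).
class Document(val tokens: Array[Int]) extends Serializable
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable

object DocumentParameter {
  // Assumed initialisation: a zero-filled theta with one slot per topic.
  def apply(document: Document, numOfTopics: Int): DocumentParameter =
    new DocumentParameter(document, new Array[Float](numOfTopics))
}

// No SparkContext field: the RDD passed to infer already knows its context.
class PLSA(val numOfTopics: Int) extends Serializable {
  def infer(documents: RDD[Document]): RDD[DocumentParameter] = {
    val k = numOfTopics // local copy, so the function does not need `this`
    documents.map(doc => DocumentParameter(doc, k))
  }
}

object PLSA {
  // Hypothetical helper: the context is a parameter of the method that needs it.
  def loadDocuments(sc: SparkContext, rows: Seq[Array[Int]]): RDD[Document] =
    sc.parallelize(rows).map(tokens => new Document(tokens))
}
```

In the spark-shell this would be called as new PLSA(100).infer(PLSA.loadDocuments(sc, rows)). Because PLSA no longer holds the context, it would serialize even without the local copy k; the copy just avoids shipping the PLSA instance with every task.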

(As multiple people have suggested, it's possible to keep the SparkContext in the class but marked as @transient so that it won't be serialized. I don't recommend this approach; it means the class will "magically" change state when serialized (losing the SparkContext), and so you might end up with NPEs when you try to access the SparkContext from inside a serialized job. It's better to maintain a clear distinction between classes that are only used in the "control" code (and might use the SparkContext) and classes that are serialized to run on the cluster (which must not have the SparkContext)).
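The state change caused by @transient can be seen with a plain Java serialization round trip, again without Spark. StubContext, Holder and TransientDemo are hypothetical names used only for this sketch; StubContext plays the role of the SparkContext.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: not Serializable.
class StubContext

// The @transient field is skipped when a Holder is serialized.
class Holder(@transient val ctx: StubContext, val numOfTopics: Int) extends Serializable

object TransientDemo {
  // Serializes obj to bytes and reads it back, roughly what happens when a task is shipped.
  def roundTrip[A <: AnyRef](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Holder(new StubContext, 100)
    val copy = roundTrip(original)
    println(copy.numOfTopics) // ordinary field survives the round trip
    println(copy.ctx == null) // transient field comes back as null
  }
}
```

The copy keeps numOfTopics but its ctx is null, so any method that touches ctx on the deserialized side would throw a NullPointerException. That is the "magic" state change the answer warns about.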
