sarthak sarthak - 1 month ago 16
Scala Question

Task not serializable error: Spark

I have an RDD of the form (String,(Int,Iterable[String])). The integer value (which I call the distance) is initially set to 10 for each entry in the RDD. Every element in the Iterable[String] has its own entry in this RDD, where it serves as the key (so the distance for each element of the Iterable[String] is stored in a separate RDD entry). My intent is to do the following:

1. If the list (Iterable[String]) contains the element "Bethan", I assign the entry a distance of 1.

2. After this, I create a list of all the keys with distance 1 by filtering.

3. After this, I transform the RDD into a new one that updates an entry's distance to 2 if any of the elements in its own list has distance 1.

I have the following code:

val disOneRdd = disRdd.map(x => {if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x})
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
    var b: Boolean = false
    loop.breakable {
        for (str <- x._2._2)
        if (lst.contains(str)) // checks if it contains an element with distance 1
            b = true
            loop.break
    }
    if (b)
        (x._1, (2, x._2._2))
    else
        (x._1, (10, x._2._2))
    })


But when I run it, I get the error "Task not serializable". How can I fix this, and is there a better way to do it?

EDIT

Input RDD of form:

("abc",(10,List("efg","hij","klm")))
("efg",(10,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(10,List("czx","Beethan","abc")))
("zse",(10,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...


Every element that contains Beethan in its list should have distance 1. Every element that has "an element with distance 1" in its list (and not Beethan) should have distance 2. The output has the form:

("abc",(2,List("efg","hij","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...


Error message:

[error] (run-main-0) org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at Bacon$.main(Bacon.scala:86)
at Bacon.main(Bacon.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: scala.util.control.Breaks
Serialization stack:
- object not serializable (class: scala.util.control.Breaks, value: scala.util.control.Breaks@78426203)
- field (class: Bacon$$anonfun$15, name: loop$1, type: class scala.util.control.Breaks)
- object (class Bacon$$anonfun$15, <function1>)

Answer
val disOneRdd = disRdd.map(x => {if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x})
val lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
    // true if the list contains at least one key with distance 1
    val b: Boolean = x._2._2.filter(y => lst.contains(y)).size > 0
    if (b)
        (x._1, (2, x._2._2))
    else
        (x._1, (10, x._2._2))
    })

or

import scala.util.control.Breaks._
val disOneRdd = disRdd.map(x => {if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x})
val lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
    var b: Boolean = false
    breakable {
        for (str <- x._2._2) {
            if (lst.contains(str)) { // checks if it contains an element with distance 1
                b = true
                break() // leave the loop at the first match
            }
        }
    }
    if (b)
        (x._1, (2, x._2._2))
    else
        (x._1, (10, x._2._2))
    })

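Both versions work for me. The problem was loop.breakable: the serialization stack in the error shows that the function passed to map holds a field loop$1 of type scala.util.control.Breaks, which is not serializable. After replacing loop.breakable with the breakable imported from scala.util.control.Breaks._ it works, presumably because the function then refers to the Breaks singleton object instead of capturing a local instance. The version with filter could be slower, but it avoids the problem with breakable.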
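To illustrate the difference without Spark, here is a minimal sketch that checks both variants with plain Java serialization, which is what Spark's default closure serializer relies on. The object name, canSerialize, withLocalBreaks and withImportedBreaks are hypothetical helpers made up for this illustration, and the local `new Breaks` value is an assumption about how loop was defined in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.control.Breaks

object BreaksSerializationSketch {
  // Hypothetical helper: true if Java serialization accepts the object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Assumed shape of the question's code: a local Breaks instance
  // is captured by the function, so it must be serialized with it.
  def withLocalBreaks(known: Set[String]): Iterable[String] => Boolean = {
    val loop = new Breaks
    (xs: Iterable[String]) => {
      var found = false
      loop.breakable {
        for (s <- xs) if (known.contains(s)) { found = true; loop.break() }
      }
      found
    }
  }

  // Shape of the second version above: breakable/break resolve to the
  // Breaks singleton object, so no Breaks instance is captured.
  def withImportedBreaks(known: Set[String]): Iterable[String] => Boolean = {
    import scala.util.control.Breaks.{break, breakable}
    (xs: Iterable[String]) => {
      var found = false
      breakable {
        for (s <- xs) if (known.contains(s)) { found = true; break() }
      }
      found
    }
  }
}
```

Under these assumptions, canSerialize is expected to reject the function returned by withLocalBreaks and accept the one returned by withImportedBreaks, which matches the behaviour seen in Spark.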

Apart from the main question, lst should be a broadcast variable. I did not use one here, to keep the answer as simple as possible.
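For completeness, a rough sketch of what the broadcast version might look like. It is not the code I ran above: the object name, the function name distancesUpToTwo and the Entry alias are made up for illustration. It also assumes, based on the expected output in the question, that the keys with distance 1 should be collected from disOneRdd and that only entries still at distance 10 are moved to 2.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object BroadcastSketch {
  type Entry = (String, (Int, Iterable[String]))

  // Hypothetical helper; `target` plays the role of "Bethan"/"Beethan".
  def distancesUpToTwo(sc: SparkContext, disRdd: RDD[Entry], target: String): RDD[Entry] = {
    // Step 1: distance 1 for every entry whose list contains the target.
    val disOneRdd = disRdd.map { case (k, (d, xs)) =>
      if (xs.exists(_ == target)) (k, (1, xs)) else (k, (d, xs))
    }

    // Step 2: collect the distance-1 keys on the driver and broadcast them,
    // so each executor receives the set once instead of once per task.
    val distOneKeys = sc.broadcast(disOneRdd.filter(_._2._1 == 1).keys.collect().toSet)

    // Step 3: entries still at the initial distance 10 move to 2 if their
    // list contains a distance-1 key; exists stops at the first match,
    // so neither breakable nor a full filter pass is needed.
    disOneRdd.map { case (k, (d, xs)) =>
      if (d == 10 && xs.exists(distOneKeys.value.contains)) (k, (2, xs)) else (k, (d, xs))
    }
  }
}
```

Using a Set instead of the collected Array also makes each contains lookup cheaper, which may matter if many keys end up at distance 1.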
