sarthak - 3 years ago 232
Scala Question

I have an RDD of the form `(String, (Int, Iterable[String]))`. The integer value (which I call distance) is initially set to 10 for each entry in the RDD. Every element in the `Iterable[String]` also has its own entry in this RDD, where it serves as the key, so the distance of each element in the `Iterable[String]` is stored in a separate RDD entry. My intent is to do the following:

1. If the list (`Iterable[String]`) contains the element "Bethan", I set that entry's distance to 1.

2. After this, I create a list of all the keys with distance 1 by filtering.

3. After this, I transform the RDD into a new one that updates an entry's distance to 2 if any of the elements in its own list has distance 1.

I have the following code:

``````scala
import scala.util.control.Breaks

val loop = new Breaks // local Breaks instance, captured by the closure below (`loop$1` in the stack trace)
val disOneRdd = disRdd.map(x => { if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x })
var lst = disRdd.filter(x => x._2._1 == 1).keys.collect
val disTwoRdd = disRdd.map(x => {
  var b: Boolean = false
  loop.breakable {
    for (str <- x._2._2)
      if (lst.contains(str)) // checks if it contains element with distance 1
        b = true
    loop.break
  }
  if (b)
    (x._1, (2, x._2._2))
  else
    (x._1, (10, x._2._2))
})
``````

But when I run it, I get the error "Task not serializable". How can I fix this, and is there a better way to do it?

EDIT

Input RDD of the form:

``````
("abc",(10,List("efg","hij","klm")))
("efg",(10,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(10,List("czx","Beethan","abc")))
("zse",(10,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...
``````

Every element that contains Beethan in its list should have distance 1. Every element that has an element with distance 1 in its list (and not Beethan) should have distance 2. The output has the form:

``````
("abc",(2,List("efg","hij","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("gvf",(10,List("vcsd","fdgd")))
...
``````

Error message:

``````
[error] (run-main-0) org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
at Bacon$.main(Bacon.scala:86)
at Bacon.main(Bacon.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: scala.util.control.Breaks
Serialization stack:
- object not serializable (class: scala.util.control.Breaks, value: scala.util.control.Breaks@78426203)
- field (class: Bacon$$anonfun$15, name: loop$1, type: class scala.util.control.Breaks)
- object (class Bacon$$anonfun$15, <function1>)
``````

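Answer

``````scala
val disOneRdd = disRdd.map(x => if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x)
val lst = disOneRdd.filter(x => x._2._1 == 1).keys.collect()
val disTwoRdd = disOneRdd.map(x => {
  // true if any element of the list has distance 1 (filter scans the whole list)
  val b: Boolean = x._2._2.filter(y => lst.contains(y)).size > 0
  if (b && x._2._1 == 10) // only entries still at the initial distance are promoted
    (x._1, (2, x._2._2))
  else
    x // keep distances that are already assigned (0 or 1)
})
``````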
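The `filter`-based check above always walks the whole list, which is presumably why it could be slower than the `breakable` version. A minimal local sketch (plain Scala, no Spark; `ExistsVsFilter` and its methods are hypothetical names used only for illustration) compares it with `exists`, which stops at the first match and, like `filter`, needs no `breakable` at all:

```scala
object ExistsVsFilter {
  // filter: every element of the list is tested, even after a match is found.
  def checksWithFilter(neighbours: Iterable[String], distOneKeys: Set[String]): (Boolean, Int) = {
    var checks = 0
    val found = neighbours.filter { n => checks += 1; distOneKeys.contains(n) }.nonEmpty
    (found, checks)
  }

  // exists: short-circuits and stops at the first element that matches.
  def checksWithExists(neighbours: Iterable[String], distOneKeys: Set[String]): (Boolean, Int) = {
    var checks = 0
    val found = neighbours.exists { n => checks += 1; distOneKeys.contains(n) }
    (found, checks)
  }

  def main(args: Array[String]): Unit = {
    val distOneKeys = Set("efg", "vcx", "zse") // keys with distance 1 in the sample data
    val abcNeighbours = List("efg", "hij", "klm")
    println(checksWithFilter(abcNeighbours, distOneKeys)) // (true,3)
    println(checksWithExists(abcNeighbours, distOneKeys)) // (true,1)
  }
}
```

Inside the RDD `map`, the corresponding check would be `x._2._2.exists(y => lst.contains(y))`; the counter exists only to make the difference visible and does not belong in a Spark closure.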

or

``````scala
import scala.util.control.Breaks._

val disOneRdd = disRdd.map(x => if (x._2._2.toList.contains("Bethan")) (x._1, (1, x._2._2)) else x)
val lst = disOneRdd.filter(x => x._2._1 == 1).keys.collect()
val disTwoRdd = disOneRdd.map(x => {
  var b: Boolean = false
  breakable {
    for (str <- x._2._2)
      if (lst.contains(str)) { // checks if it contains an element with distance 1
        b = true
        break() // stop scanning at the first match
      }
  }
  if (b && x._2._1 == 10) // only entries still at the initial distance are promoted
    (x._1, (2, x._2._2))
  else
    x // keep distances that are already assigned (0 or 1)
})
``````

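Both versions work for me. The problem was `loop.breakable`: `loop` is a local `Breaks` instance, so the closure passed to `map` captures it as a field (`loop$1` in the serialization stack), and `scala.util.control.Breaks` is not serializable. After replacing `loop.breakable` with `breakable` (imported from the `Breaks` companion object), it works, because the closure then refers to that singleton object statically instead of capturing an instance. The version with `filter` could be slower, since it always scans the whole list, but it avoids `breakable` entirely.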
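The difference between `loop.breakable` and `breakable` can be checked without a cluster. Before shipping a task, Spark's `ClosureCleaner.ensureSerializable` (visible in the stack trace) serializes the closure; the sketch below approximates that with plain Java serialization, assuming that is the closure serializer in use (Spark's default, as far as I know). `ClosureSerializationCheck`, `isSerializable`, `capturesInstance` and `usesCompanionObject` are hypothetical names:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.control.Breaks
import scala.util.control.Breaks.{break, breakable}

// Extends Serializable so that an accidental capture of this enclosing object
// cannot be what makes a check fail; only the Breaks capture is being tested.
object ClosureSerializationCheck extends Serializable {
  // Approximates what happens to a task closure: try to Java-serialize it.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Like the question: the lambda captures a local Breaks instance as a field.
  def capturesInstance(): Iterable[String] => Boolean = {
    val loop = new Breaks
    (xs: Iterable[String]) => {
      var found = false
      loop.breakable {
        for (s <- xs) if (s == "Bethan") { found = true; loop.break() }
      }
      found
    }
  }

  // Like the answer: breakable/break come from the Breaks companion object,
  // which is referenced statically, so nothing non-serializable is captured.
  def usesCompanionObject(): Iterable[String] => Boolean =
    (xs: Iterable[String]) => {
      var found = false
      breakable {
        for (s <- xs) if (s == "Bethan") { found = true; break() }
      }
      found
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturesInstance()))    // false
    println(isSerializable(usesCompanionObject())) // true
  }
}
```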

Apart from the main question, `lst` should be a broadcast variable; I left it out here to keep the answer as simple as possible.
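To check the corrected logic against the input and expected output from the question's EDIT without a cluster, here is a local sketch in which a `Seq` stands in for the RDD and the same `map`/`filter` steps are used (on an RDD, the distance-1 keys would come from `.filter(...).keys.collect()`). `DistanceSketch`, `assignDistances` and `sampleInput` are hypothetical names, and the target is a parameter because the question spells it both "Bethan" and "Beethan". On a cluster, the `Set` of distance-1 keys is the value that could be wrapped with `sc.broadcast(...)` and read through `.value` inside the closure.

```scala
object DistanceSketch {
  type Entry = (String, (Int, Iterable[String]))

  // Sample data copied from the question's EDIT.
  val sampleInput: Seq[Entry] = Seq(
    ("abc", (10, List("efg", "hij", "klm"))),
    ("efg", (10, List("jhg", "Beethan", "abc", "ert"))),
    ("Beethan", (0, List("efg", "vcx", "zse"))),
    ("vcx", (10, List("czx", "Beethan", "abc"))),
    ("zse", (10, List("efg", "Beethan", "nbh"))),
    ("gvf", (10, List("vcsd", "fdgd")))
  )

  def assignDistances(entries: Seq[Entry], target: String): Seq[Entry] = {
    // Step 1: entries whose list contains the target get distance 1.
    val disOne = entries.map { case (k, (d, nbrs)) =>
      if (nbrs.exists(_ == target)) (k, (1, nbrs)) else (k, (d, nbrs))
    }
    // Step 2: keys that now have distance 1.
    val distOneKeys: Set[String] = disOne.filter(_._2._1 == 1).map(_._1).toSet
    // Step 3: only entries still at the initial distance 10 are promoted to 2,
    // so the target (0) and the distance-1 entries keep their values.
    disOne.map { case (k, (d, nbrs)) =>
      if (d == 10 && nbrs.exists(n => distOneKeys.contains(n))) (k, (2, nbrs))
      else (k, (d, nbrs))
    }
  }

  def main(args: Array[String]): Unit =
    assignDistances(sampleInput, "Beethan").foreach(println)
}
```

Running `main` prints one tuple per key with the same distances as the expected output in the EDIT (abc 2, efg 1, Beethan 0, vcx 1, zse 1, gvf 10).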
