Vasu G - 12 days ago
Scala Question

SparkContext parallelize lazy behavior - unexplained

According to the Spark source code comments, SparkContext.scala has:

/** Distribute a local Scala collection to form an RDD.
*
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/


So, I thought I'd do a simple test.

scala> var c = List("a0", "b0", "c0", "d0", "e0", "f0", "g0")
c: List[String] = List(a0, b0, c0, d0, e0, f0, g0)

scala> var crdd = sc.parallelize(c)
crdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> c = List("x1", "y1")
c: List[String] = List(x1, y1)

scala> crdd.foreach(println)
[Stage 0:> (0 + 0) / 8]d0
a0
b0
e0
f0
g0
c0

scala>


I was expecting crdd.foreach(println) to output "x1" and "y1", based on the lazy behavior of parallelize.

What am I doing wrong?

Answer

You didn't modify c at all. You re-assigned the variable c to point at a new List; the original List object that parallelize captured is unchanged.

Besides that point, look at the conditions in the doc comment again.

"If seq is a mutable collection"

Scala's List is not a mutable collection.

"and is altered after the call to parallelize and before the first action on the RDD"

You didn't really alter the list; you only rebound the variable, as the sketch below shows.
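
Here's a minimal plain-Scala sketch of the difference (no Spark involved; captured is just an illustrative name): reassigning the var only rebinds the name, while anything that holds a reference to the old List, which is what parallelize does, still sees the original elements.

scala> var c = List("a0", "b0")
c: List[String] = List(a0, b0)

scala> val captured = c     // holds a reference to the original List object
captured: List[String] = List(a0, b0)

scala> c = List("x1", "y1") // rebinds the variable; the old List is untouched
c: List[String] = List(x1, y1)

scala> captured
res0: List[String] = List(a0, b0)

That is exactly what happened to crdd: it kept its reference to the original List("a0", ..., "g0"), so reassigning c had no effect on it.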


Here's a proper example of the documented behavior.

scala> val c = scala.collection.mutable.ListBuffer(1, 2, 3)
c: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3)

scala> val cRDD = sc.parallelize(c)
cRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29

scala> c.append(4)

scala> c
res7: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3, 4)

scala> cRDD.collect()
res8: Array[Int] = Array(1, 2, 3, 4)
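
Conversely, if you want the RDD to be insulated from later mutations, the "Pass a copy of the argument" advice from the doc comment applies. Here's a sketch of what you would expect, continuing the session above (c2 and frozenRDD are illustrative names; the RDD id, console line and res numbers will vary between sessions):

scala> val c2 = scala.collection.mutable.ListBuffer(1, 2, 3)
c2: scala.collection.mutable.ListBuffer[Int] = ListBuffer(1, 2, 3)

scala> val frozenRDD = sc.parallelize(c2.toList)   // toList copies into an immutable List
frozenRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> c2.append(4)

scala> frozenRDD.collect()
res9: Array[Int] = Array(1, 2, 3)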