bill bill - 1 month ago
Scala Question

Spark Array is empty after iteration

In Spark 1.6.0 (I am not very familiar with Spark and Scala), when I iterate over a collection and add items to an array, the array seems to be empty after the iteration is over.

var testing = unlabeled.map { line =>
  val parts = line.split(',')
  val text = parts(7).split(' ')
  (line, htf.transform(text))
}

var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]

for (counter <- 1 to 5) {

  logger.info("this is the " + counter + " run -----------------")
  for (i <- testing) {
    val label = model.predict(i._2).toString
    // logger.info(i._1.split(",")(7))
    // logger.info(label)
    var probs = model.predictProbabilities(i._2)
    logger.info("prob 0 : " + probs(0))
    logger.info("prob 1 : " + probs(1))
    logger.info("--------------------- ")

    if (probs(0).toDouble <= 0.95 && probs(1).toDouble <= 0.95) {
      lowPropQueue += i
    } else {
      highPropQueue += ((i._1 + "," + label, i._2))
    }

    logger.info("size of high array : " + highPropQueue.length)
    logger.info("size of low array : " + lowPropQueue.length)

  }

  logger.info("passed: " + lowPropQueue.length)
  logger.info("NOT passed: " + highPropQueue.length)

  var xx = sc.parallelize(highPropQueue).collect()
  var yy = sc.parallelize(lowPropQueue).collect()

  logger.info("passed: " + xx.length)
  logger.info("NOT passed: " + yy.length)
  ...
}


But based on the logs, the inner loop does seem to add elements to the arrays, e.g.:


16/10/11 11:22:31 INFO SelfLearningMNB$: size of high array : 500
16/10/11 11:22:31 INFO SelfLearningMNB$: size of low array : 83
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 0 : 0.37094327822665185
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 1 : 0.6290567217733481
16/10/11 11:22:31 INFO SelfLearningMNB$: ---------------------
16/10/11 11:22:31 INFO SelfLearningMNB$: size of high array : 500
16/10/11 11:22:31 INFO SelfLearningMNB$: size of low array : 84
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 0 : 0.16872929936216619
16/10/11 11:22:31 INFO SelfLearningMNB$: prob 1 : 0.8312707006378338


But when the inner loop ends, I get this:


16/10/11 11:43:53 INFO SelfLearningMNB$: passed: 0
16/10/11 11:43:53 INFO SelfLearningMNB$: NOT passed: 0


What is going on?

EDIT

How can you obtain the data from the executors, or save data from the executors to HDFS so it can be read from the master node later?

Answer

TL;DR: This cannot work in Spark.

What is going on?

  • Each executor gets its own copy of lowPropQueue and highPropQueue.
  • During the iteration these local copies are modified.
  • After the iteration the local copies are discarded.
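The behaviour can be reproduced without Spark at all: the closure you pass to an RDD operation (and any buffer it captures) is serialized, shipped to each executor, and deserialized there, so appends land on a copy. A minimal, Spark-free sketch of that round trip (ClosureCopyDemo and its values are illustrative, not part of the question's code):

```scala
import java.io._

// Sketch: serializing and deserializing a captured buffer, the way Spark
// ships a task closure to an executor. Mutations happen on the copy.
object ClosureCopyDemo {
  def roundTrip[T <: Serializable](x: T): T = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(x)
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val driverBuf = scala.collection.mutable.ArrayBuffer.empty[Int]
    // Simulate shipping the buffer to an "executor" and appending there.
    val executorCopy = roundTrip(driverBuf)
    executorCopy ++= Seq(1, 2, 3)
    println(executorCopy.length) // 3 -- the executor's copy grew
    println(driverBuf.length)    // 0 -- the driver's buffer is untouched
  }
}
```

This is exactly why the logs inside the loop show growing sizes (executor-side copies) while the driver-side lengths stay 0.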

FYI: even locally, a naive append to an ArrayBuffer is not thread-safe.
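The idiomatic fix is to express the split as RDD transformations instead of mutating driver-side buffers. A sketch under the question's assumptions (`testing: RDD[(String, Vector)]`, a serializable `model` such as MLlib's NaiveBayesModel, and an illustrative HDFS path):

```scala
import org.apache.spark.mllib.linalg.Vector

// Score once, on the executors, keeping everything inside the RDD.
val scored = testing.map { case (line, features) =>
  val label = model.predict(features).toString
  val probs = model.predictProbabilities(features)
  (line, features, label, probs)
}.cache()

// Split by confidence with filter instead of driver-side ArrayBuffers.
val lowProp  = scored.filter { case (_, _, _, p) => p(0) <= 0.95 && p(1) <= 0.95 }
val highProp = scored.filter { case (_, _, _, p) => p(0) > 0.95 || p(1) > 0.95 }

// Either bring the (small) results back to the driver...
val lowLocal = lowProp.map { case (line, f, _, _) => (line, f) }.collect()

// ...or, answering the EDIT, write them to HDFS from the executors so the
// driver (or a later job) can read them back. The path is an example.
highProp.map { case (line, _, label, _) => line + "," + label }
        .saveAsTextFile("hdfs:///path/to/highProp")
```

collect() is only safe when the filtered result fits in driver memory; saveAsTextFile scales to any size because each executor writes its own partition.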