Scala Question

Abort map execution in Spark

How can I break the map job in Spark:

val rddFiltered = rdd.map(value => value.toInt).map(value => {
  if (value == 0) {
    // if the condition is true,
    // stop the map execution and return the processed RDD
  }
})

Suppose my rdd contains integer values; once a 0 is encountered, I want the map to stop and return only the elements processed up to that point.

Answer

This is not really something you can solve efficiently with Spark. The order in which partitions are processed is not guaranteed, and there is no such thing as a partial transformation.

If you expect the number of surviving records to be small enough to be handled by the driver, you can implement a method similar to take: iteratively call runJob to collect partitions one at a time, until you reach the first partition containing a value that does not satisfy the predicate.
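A minimal sketch of that driver-side approach (the name `takeWhileOnDriver` and the loop structure are illustrative, not from the original answer; it assumes the collected prefix fits in driver memory and that partition index order matches the desired element order):

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Fetch partitions one at a time with runJob and stop at the first
// partition containing an element that fails the predicate.
def takeWhileOnDriver[T: ClassTag](rdd: RDD[T])(pred: T => Boolean): Array[T] = {
  val sc   = rdd.sparkContext
  val buf  = ArrayBuffer.empty[T]
  var p    = 0
  var stop = false
  while (!stop && p < rdd.partitions.length) {
    // runJob over a single partition returns an Array with one result
    val Array(part) = sc.runJob(rdd, (it: Iterator[T]) => it.toArray, Seq(p))
    buf ++= part.takeWhile(pred)        // keep the prefix that satisfies pred
    stop = part.exists(!pred(_))        // a failing element ends the scan
    p += 1
  }
  buf.toArray
}
```

This touches only as many partitions as needed, at the cost of one job per partition scanned.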

Alternatively, at the cost of a full data scan:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def takeWhile[T : ClassTag](rdd: RDD[T])(pred: T => Boolean): RDD[T] = {
  /* Determine the partition where the predicate is not satisfied
     for the first time */
  val i = rdd.mapPartitionsWithIndex((i, iter) =>
    if (iter.exists(!pred(_))) Iterator(i) else Iterator()
  ).min  // assumes at least one element fails the predicate

  /* Process partitions, dropping elements after the first one
     which doesn't satisfy the predicate */
  rdd.mapPartitionsWithIndex {
    case (j, iter) if j < i  => iter                  // before the i-th: take all elements
    case (j, iter) if j == i => iter.takeWhile(pred)  // i-th: drop everything from the first failing element on
    case _                   => Iterator()            // after the i-th: drop all
  }
}

val rdd = sc.parallelize(Seq(3, 4, 7, 1, 3, 0, 4, 6))

takeWhile(rdd)(_ != 0).collect
// Array[Int] = Array(3, 4, 7, 1, 3)

takeWhile(rdd)(_ != 3).collect
// Array[Int] = Array()

takeWhile(rdd)(_ >= 2).collect
// Array[Int] = Array(3, 4, 7) 

Please note that this doesn't reduce the number of partitions, so without repartitioning it may result in suboptimal resource utilization.
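Where that matters, the truncated RDD can be compacted afterwards; a brief illustration (the target partition count of 2 is an arbitrary choice, not from the original answer):

```scala
// Hypothetical follow-up: merge the now mostly-empty trailing partitions
// so downstream stages don't schedule tasks over empty data.
val trimmed = takeWhile(rdd)(_ != 0).coalesce(2)
```

coalesce avoids a shuffle by default; use repartition (or coalesce with shuffle = true) if the surviving partitions are badly skewed.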

At the end of the day, applying this type of sequential logic to Spark rarely makes sense.