
Spark - how to handle with lazy evaluation in case of iterative (or recursive) function calls

I have a recursive function that needs to compare the results of the current call to those of the previous call to figure out whether it has reached convergence. My function does not contain any action - it only contains map, flatMap, and reduceByKey. Since Spark does not evaluate transformations until an action is called, my next iteration does not get the proper values to compare for convergence.

Here is a skeleton of the function -

def func1(sc: SparkContext, nodes: RDD[List[Long]], didConverge: Boolean, changeCount: Int): RDD[List[Long]] = {
  if (didConverge)
    nodes
  else {
    val currChangeCount = sc.accumulator(0, "xyz")
    val newNodes = performSomeOps(nodes, currChangeCount) // does a few map/flatMap/reduceByKey operations
    if (currChangeCount.value == changeCount) {
      func1(sc, newNodes, true, currChangeCount.value)
    } else {
      func1(sc, newNodes, false, currChangeCount.value)
    }
  }
}


performSomeOps only contains map, flatMap, and reduceByKey transformations. Since there is no action, the code in performSomeOps does not execute, so my currChangeCount never gets the actual count. That means the convergence check (currChangeCount.value == changeCount) is invalid. One way to overcome this is to force an action within each iteration by calling count, but that is unnecessary overhead.
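
For concreteness, the count-based workaround I have in mind looks roughly like this (same names as the skeleton above):

val currChangeCount = sc.accumulator(0, "xyz")
val newNodes = performSomeOps(nodes, currChangeCount) // still only transformations
newNodes.count()                    // forces the map/flatMap/reduceByKey pipeline to run
val changed = currChangeCount.value // now holds a real value, but the count() is extra work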

I am wondering what I can do to force an action without much overhead, or whether there is another way to address this problem.

Answer

I believe there is a very important thing you're missing here:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

Because of that, accumulators cannot be reliably used for managing control flow; they are better suited for job monitoring.
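
A standalone sketch (not part of your code, using the same Spark 1.x sc.accumulator API as the question) shows how an accumulator updated in a transformation tracks task executions rather than a logical count:

val acc = sc.accumulator(0, "updates")
val mapped = sc.parallelize(1 to 100).map { x =>
  acc += 1 // updated inside a transformation
  x * 2
}

mapped.count() // first action: acc.value is 100
mapped.count() // mapped is not cached, so the map re-runs: acc.value is now 200

Any re-executed stage (a second action, a failed task, speculative execution) bumps the accumulator again, so its value cannot be trusted to decide convergence.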

Moreover, executing an action is not unnecessary overhead. If you want to know the result of the computation, you have to compute it, unless of course the result is trivial. The cheapest possible action is:

rdd.foreach { case _ =>  }

but it won't address the problem you have here.

In general, iterative computations in Spark can be structured as follows:

def func1(checkpointInterval: Int)(
    sc: SparkContext, nodes: RDD[List[Long]],
    didConverge: Boolean, changeCount: Int, iteration: Int): RDD[List[Long]] = {

  if (didConverge) nodes
  else {

    // Compute and cache the new nodes
    // (no accumulator needed; the change count is computed with an action below)
    val newNodes = performSomeOps(nodes).cache()

    // Periodically checkpoint to truncate the lineage and avoid stack overflows
    if (iteration % checkpointInterval == 0) newNodes.checkpoint()

    // Call a function which computes the value that determines control flow.
    // This executes an action on newNodes.
    val currChangeCount = computeChangeCount(newNodes)

    // Unpersist the old nodes
    nodes.unpersist()

    func1(checkpointInterval)(
      sc, newNodes, currChangeCount == changeCount,
      currChangeCount, iteration + 1
    )
  }
}
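
To make the skeleton above concrete, here is a hedged usage sketch; performSomeOps, computeChangeCount, and initialNodes are placeholders I am assuming, and checkpoint() requires a checkpoint directory to be configured once up front:

// Hypothetical placeholders, only to show the shape of the helpers
def performSomeOps(nodes: RDD[List[Long]]): RDD[List[Long]] =
  nodes.map(identity) // stands in for the real map/flatMap/reduceByKey pipeline

def computeChangeCount(nodes: RDD[List[Long]]): Int =
  nodes.filter(_.nonEmpty).count().toInt // any action works; this one is a stand-in

sc.setCheckpointDir("/tmp/spark-checkpoints") // required before calling checkpoint()

val result = func1(checkpointInterval = 10)(
  sc, initialNodes, didConverge = false, changeCount = -1, iteration = 1
)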