sarthak sarthak - 1 month ago 22
Scala Question

Calling a function in mapPartitions in Spark

I have a Scala function as follows:

def variance (reg: Int, samRecords: Array[SAMRecord]) :
Array[(Int, (Int, String))] =
{
// Body of the function
}


I am trying to call this function in the mapPartitions method as follows:

//SortedOut is RDD[(Int,(Int,Int,SAMRecord))]
val Out = SortedOut.mapPartitions(iter=> {val inArr = iter.map(x=>x._2._3).toArray
val inReg = iter.map(x=> x._1).toArray
if (inArr.length != 0)
{
println("Calling function")
variantCall(inReg(0),inArr).iterator
}
else
iter}).cache


I have checked that SortedOut has non-empty partitions, but the function call still does not take place. Why is this code not working? I want to call this function once for each partition; how can I do this?

Answer

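Spark transformations such as mapPartitions are lazy: they only record what should be computed. The computation is triggered only when you run an action.

Call collect, count, foreach, or another action at the end to trigger evaluation.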
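
As a rough illustration that nothing runs until an action is called, the sketch below counts how often the mapPartitions closure is executed. It assumes Spark is on the classpath and uses a local master; the object name LazyMapPartitions, the accumulator name, and the sample data are made up for this example and are not from the question.

```scala
import org.apache.spark.sql.SparkSession

object LazyMapPartitions {
  // Returns the closure-call counter (before the action, after the action).
  def run(): (Long, Long) = {
    // Assumption: a throwaway local session, just for this demonstration.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("lazy-mapPartitions-demo")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Accumulators let the driver observe work done on the executors.
      val calls = sc.longAccumulator("partitionCalls")
      val rdd = sc.parallelize(1 to 100, numSlices = 4)

      // Transformation only: this records the lineage and runs nothing yet.
      val mapped = rdd.mapPartitions { iter =>
        calls.add(1)
        iter.map(_ * 2)
      }

      val before: Long = calls.value // still zero, no action has run
      mapped.count()                 // action: the closure runs per partition
      val after: Long = calls.value  // now greater than zero
      (before, after)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"closure calls before action: $before, after action: $after")
  }
}
```

An accumulator is used instead of println because println inside the closure writes to the executor's stdout, which on a cluster is not the driver console. Accumulator updates made inside transformations can be applied more than once if a task is re-executed, so treat the counter as a diagnostic only.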

Applied to the code from the question, note the collect at the end:

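val Out = SortedOut.mapPartitions { iter =>
  // An Iterator can be traversed only once, so buffer the partition first
  val rows = iter.toArray
  if (rows.nonEmpty) {
    println("Calling function") // runs on the executor, so check its stdout
    variantCall(rows(0)._1, rows.map(_._2._3)).iterator
  } else {
    Iterator.empty // keeps the element type consistent for empty partitions
  }
}.cache.collect // collect is the action; Out is now a local Array, not an RDD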
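
A separate thing to watch in the code from the question: iter is a Scala Iterator, and the first iter.map(...).toArray consumes it, so a second pass over the same iter will typically see no elements. The plain-Scala sketch below (no Spark needed) shows the effect. IteratorOnce, partition, twoPasses, and onePass are hypothetical names, and String stands in for SAMRecord.

```scala
object IteratorOnce {
  // Hypothetical stand-in for one partition of (region, record) pairs.
  def partition(): Iterator[(Int, String)] =
    Iterator((7, "readA"), (7, "readB"))

  // Mirrors the question: two separate passes over the same iterator.
  def twoPasses(iter: Iterator[(Int, String)]): (Array[String], Array[Int]) = {
    val records = iter.map(_._2).toArray // consumes iter completely
    val regions = iter.map(_._1).toArray // iter is already exhausted here
    (records, regions)
  }

  // Buffer the partition once, then derive both arrays from the copy.
  def onePass(iter: Iterator[(Int, String)]): (Array[String], Array[Int]) = {
    val rows = iter.toArray
    (rows.map(_._2), rows.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    val (recs1, regs1) = twoPasses(partition())
    println(s"two passes: ${recs1.length} records, ${regs1.length} regions")
    val (recs2, regs2) = onePass(partition())
    println(s"one pass: ${recs2.length} records, ${regs2.length} regions")
  }
}
```

With the two-pass version, an expression such as inReg(0) would fail on an empty array once the computation actually runs. Buffering with toArray holds the whole partition in memory, which the question's code already does for the records.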