Love Hasija - 3 months ago
Scala Question

Processing multiple files as independent RDDs in parallel

I have a scenario where a number of operations, including a group by, have to be applied on a number of small (~300MB each) files. The operation looks like this:

df.groupBy(....).agg(....)


Now, to process multiple files I can use a wildcard ("/**/*.csv"); however, that creates a single RDD and partitions it for the operations. Looking at the operations, though, there is a group by that involves a lot of shuffling, which is unnecessary if the files are mutually exclusive.

What I am looking for is a way to create independent RDDs from the files and operate on them independently.

Answer

This is more of an idea than a full solution, and I haven't tested it yet.

You can start by extracting your data processing pipeline into a function:

def pipeline(f: String, n: Int): org.apache.spark.sql.DataFrame = {
    sqlContext // a SQLContext is assumed in scope, as in spark-shell
        .read
        .format("com.databricks.spark.csv") // spark-csv package
        .option("header", "true")
        .load(f)
        .repartition(n) // control the number of partitions per file
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

If your files are small, you can adjust the n parameter to use as small a number of partitions as possible to fit the data from a single file and avoid shuffling. This limits concurrency per file, but we'll get back to that issue below.

val n: Int = ??? 
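
For example, a minimal sizing sketch; the ~300MB figure comes from the question, while the 128MB target partition size is only an assumption (it matches Spark's usual default split size) and should be tuned for your data:

// Hypothetical heuristic: with mutually exclusive files even n = 1
// can work, since no cross-file shuffle is needed.
val fileSizeMB = 300          // approximate size of one input file
val targetPartitionMB = 128   // assumed target partition size
val n: Int = math.max(1, (fileSizeMB + targetPartitionMB - 1) / targetPartitionMB) // => 3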

Next you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:

val files: Array[String] = ???
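
For example, if the files live under a single directory reachable through Hadoop's FileSystem API (HDFS, the local filesystem, S3 via s3a, ...), a sketch of the enumeration could look like the following; the path and the sc SparkContext are assumptions:

import org.apache.hadoop.fs.{FileSystem, Path}

val inputDir = new Path("/data/input") // hypothetical location
val fs = FileSystem.get(sc.hadoopConfiguration) // sc: the active SparkContext

val files: Array[String] = fs
    .listStatus(inputDir)
    .filter(_.getPath.getName.endsWith(".csv")) // keep only the CSV files
    .map(_.getPath.toString)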

Next you can map the above list using the pipeline function:

val rdds = files.map(f => pipeline(f, n))

Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs at once. Let's add a simple helper which forces evaluation and wraps it in a Future:

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation of the cached DataFrame
    df
}

Finally we can use the above helper on the rdds:

val result = Future.sequence(
    rdds.map(pipelineToFuture).toList
)

Depending on your requirements you can add onComplete callbacks or use reactive streams to collect the results.
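
For instance, a minimal sketch that simply blocks until every job has finished (the one-hour timeout is an arbitrary placeholder):

import scala.concurrent.Await
import scala.concurrent.duration._

// Block until all jobs complete; alternatively attach onComplete
// callbacks to each Future and stay fully asynchronous.
val dfs = Await.result(result, 1.hour) // hypothetical timeout
dfs.foreach(_.show()) // each DataFrame holds the aggregates for one file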