Love Hasija - 2 months ago
Scala Question

Processing multiple files as independent RDD's in parallel

I have a scenario where a number of operations, including a group by, have to be applied to a number of small (~300MB each) files. The operation looks like this:
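
Something along these lines, with a CSV reader and a placeholder grouping column "key" standing in for the real ones:

sqlContext.read
    .format("com.databricks.spark.csv") // assumed reader; the actual source may differ
    .option("header", "true")
    .load("/my/path/file1.csv")         // hypothetical single input file
    .groupBy("key")                     // placeholder grouping column
    .count()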


Now, to process multiple files, I can use a wildcard like "/**/*.csv"; however, that creates a single RDD and partitions it for the operations. Looking at the operations, though, the group by involves a lot of shuffling, which is unnecessary if the files are mutually exclusive.

What I am looking for is a way to create independent RDDs from the files and operate on them independently.


This is more an idea than a full solution, and I haven't tested it yet.

You can start by extracting your data processing pipeline into a function. In the sketch below, sqlContext is assumed to be in scope, and the reader format and grouping column are placeholders for your actual logic.

def pipeline(f: String, n: Int) = sqlContext.read
    .format("com.databricks.spark.csv") // assumed reader; swap in your source
    .option("header", "true")
    .load(f).repartition(n)
    .groupBy("key").count()             // placeholder for your actual group by / aggregation
    .cache                              // Cache so we can force computation later

If your files are small, you can adjust the n parameter to use as small a number of partitions as possible to fit the data from a single file and avoid shuffling. This means you are limiting concurrency, but we'll get back to that issue later.

val n: Int = ??? 
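
As a rough, hypothetical starting point, you could derive n from the file size and a target partition size (the numbers below are assumptions, not recommendations):

val targetPartitionBytes = 128L * 1024 * 1024   // hypothetical ~128MB per partition
val fileBytes = 300L * 1024 * 1024              // ~300MB per file, per the question
val n: Int = math.max(1, (fileBytes / targetPartitionBytes).toInt) // => 2 here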

Next you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:

val files: Array[String] = ???
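
For example, if the files live under a single directory on HDFS or the local file system, a sketch using the Hadoop FileSystem API (assuming sc is your SparkContext and /my/path is a hypothetical input directory) could be:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val files: Array[String] = fs
    .listStatus(new Path("/my/path"))             // hypothetical input directory
    .filter(_.getPath.getName.endsWith(".csv"))
    .map(_.getPath.toString)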

Next you can map the above list using the pipeline function:

val rdds = => pipeline(f, n))

Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs. Let's add a simple helper which forces evaluation and wraps it with a Future:

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = Future {
    df.rdd.foreach(_ => ()) // Force computation
    df                      // Return the DataFrame so results can be collected
}

Finally, we can use the above helper on the rdds:

val result = Future.sequence( => pipelineToFuture(rdd)).toList)

Depending on your requirements you can add onComplete callbacks or use reactive streams to collect the results.
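
For example, a hypothetical onComplete callback could report once all files have been processed (a sketch, relying on the global execution context imported above):

import scala.util.{Failure, Success}

result onComplete {
    case Success(dfs) => println(s"Processed ${dfs.size} files")
    case Failure(e)   => e.printStackTrace()
}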