Scala Question

How to optimize the Spark code below (Scala)?

I have some huge files (19 GB, 40 GB, etc.) and I need to execute the following algorithm on them:


  1. Read the file.

  2. Sort it by one column.

  3. Take the first 70% of the data:

    a) Take all the distinct records of a subset of the columns.

    b) Write them to the train file.

  4. Take the last 30% of the data:

    a) Take all the distinct records of the same subset of the columns.

    b) Write them to the test file.

I tried running the following code in Spark (using Scala):

    import org.apache.spark.sql.functions.max

    // Read the CSV file, using the first line as header and inferring data types
    val offers = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .load("input.txt")

    // Sort by timestamp and keep the first 70% of the rows as the train set
    val csvOuterJoin = offers.orderBy("utcDate")
    val trainDF = csvOuterJoin.limit((csvOuterJoin.count * 0.7).toInt)

    // Everything after the latest train timestamp becomes the test set
    val maxTimeTrain = trainDF.agg(max("utcDate"))
    val maxTimeStamp = maxTimeTrain.collect()(0).getTimestamp(0)
    val testDF = csvOuterJoin.filter(csvOuterJoin("utcDate") > maxTimeStamp)

    // Keep the distinct records of the relevant columns and write them out
    val inputTrain = trainDF.select("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers").distinct
    val inputTest = testDF.select("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers").distinct
    inputTrain.rdd.coalesce(1, false).saveAsTextFile("train.csv")
    inputTest.rdd.coalesce(1, false).saveAsTextFile("test.csv")


This is how I launch spark-shell:

    ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 --total-executor-cores 70 --executor-memory 10G --driver-memory 20G



I execute this code on a distributed cluster with one master and many slaves, each with a sufficient amount of RAM. As of now, the job consumes a lot of memory and fails with Java heap space errors.

Is there a way to optimize the above code (preferably in Spark)? I would appreciate any help in optimizing it.

Answer

The problem is that you don't distribute the work at all, and the source of the problem is here:

val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)

The limit operation is not designed for large-scale data: it moves all records to a single partition:

// A DataFrame with 1000 partitions
val df = spark.range(0, 10000, 1, 1000)
df.rdd.partitions.size
// Int = 1000

// Take all records with limit: everything is collapsed into a single partition
df.orderBy($"id").limit(10000).rdd.partitions.size
// Int = 1

You can use the RDD API instead:

val ordered = df.orderBy($"utcDate")
val cnt = df.count * 0.7

val train = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i <= cnt 
}.map(_._1), ordered.schema)

val test = spark.createDataFrame(ordered.rdd.zipWithIndex.filter {
  case (_, i) => i > cnt 
}.map(_._1), ordered.schema)
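
From there you can finish the rest of the pipeline from the question. The sketch below is only an illustration, assuming Spark 2.x (where DataFrameWriter.csv is available) and the column names from the question; on Spark 1.x with spark-csv you would use .write.format("com.databricks.spark.csv") instead:

import org.apache.spark.sql.functions.col

// Hypothetical continuation: same distinct projection as in the question
val cols = Seq("offerIdClicks", "userIdClicks", "userIdOffers", "offerIdOffers")
val inputTrain = train.select(cols.map(col): _*).distinct
val inputTest = test.select(cols.map(col): _*).distinct

// Write as CSV with headers; keep coalesce(1) only if a single output file
// is really required, otherwise drop it for better write parallelism
inputTrain.coalesce(1).write.option("header", "true").csv("train")
inputTest.coalesce(1).write.option("header", "true").csv("test")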