Havnar - 6 months ago
Scala Question

How do I iterate RDD's in apache spark (scala)

I use the following command to fill an RDD with a bunch of arrays containing 2 strings ["filename", "content"].

Now I want to iterate over each of those occurrences and do something with each filename and content.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

I can't seem to find any documentation on how to do this however.

So what I want is this:

foreach occurrence-in-the-rdd {
  // do stuff with the array found at location n of the RDD
}


You call various methods on the RDD that accept functions as parameters.

// set up an example
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// print it
testRDD.collect().foreach(a => println(a.size))

// create an RDD with the array sizes and print it
val countRDD = testRDD.map(a => a.size)
countRDD.collect().foreach(a => println(a))

// create an RDD with just the longer arrays and print each array
val bigRDD = testRDD.filter(a => a.size > 3)
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
})

Notice that the functions you write accept a single RDD element as input, and return data of some uniform type, so you create an RDD of the latter type. For example, countRDD is an RDD[Int], while bigRDD is still an RDD[Array[Int]].
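Applied to the question's RDD, the same pattern works, with one caveat: `wholeTextFiles` actually yields `(filename, content)` tuples rather than two-element arrays, so you can pattern-match on the pair. A minimal sketch, reusing the path from the question (the body of the loop is a placeholder):

```scala
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

// someRDD is an RDD[(String, String)]: (filename, content) pairs
someRDD.collect().foreach { case (filename, content) =>
  // do stuff with each filename and content
  println(s"$filename has ${content.length} characters")
}
```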

It will probably be tempting at some point to write a foreach that modifies some other data, but you should resist for reasons described in this question and answer.
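As a sketch of why (variable names here are illustrative, not from the original): the closure passed to foreach runs on the executors against a serialized copy of any captured variable, so mutations are never reflected back in the driver. Prefer an RDD aggregation instead:

```scala
// ANTI-PATTERN: counter is copied to each executor; in cluster mode
// the driver's counter remains 0 after this runs
var counter = 0
testRDD.foreach(a => counter += a.size)

// Better: express the computation as a transformation plus an action
val total = testRDD.map(_.size).reduce(_ + _)
```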

Edit: Don't try to print large RDDs

Several readers have asked about using collect() and println() to see their results, as in the example above. Of course, this only works if you're running in an interactive mode like the Spark REPL (read-eval-print loop). It's best to call collect() on the RDD to get a sequential array for orderly printing. But collect() may bring back more data than fits in driver memory, and in any case more may be printed than you can usefully read. Here are some alternative ways to get insight into your RDDs if they're large:

  1. RDD.take(): This gives you fine control over the number of elements you get, but not where they come from -- they are defined as the "first" ones, a concept dealt with by various other questions and answers here.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
  2. RDD.sample(): This lets you (roughly) control the fraction of results you get, whether sampling uses replacement, and even optionally the random number seed.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
  3. RDD.takeSample(): This is a hybrid: random sampling that you can control, but one that lets you specify the exact number of results and returns an Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
  4. RDD.count(): Sometimes the best insight comes from how many elements you ended up with -- I often do this first.
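Following the pattern of the snippets above, a one-line sketch for completeness:

```scala
// count() returns a Long, computed without collecting the data to the driver
println(myHugeRDD.count())
```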