Canovice - 1 year ago
Scala Question

In a Spark word count RDD, how to look up the value for a specific key

I have an RDD named wc that has (key,value) pairs, where the key is the word, and the value is the count of how many times the word appeared in some document. For example:

scala> :type wc
org.apache.spark.rdd.RDD[(String, Int)]

scala> wc.take(10).foreach(println)
(means,1)
(under,2)
(this,4)
(Because,1)
(Python,2)
(agree,1)
(cluster,1)
(its,1)
(follows,1)
(general,2)


Without having to print every single pair in the RDD, how could I look up the count for a specific word (say, Python)? I've tried a bunch of things, including:

wc.filter(_.1 == "Python")
// error: ')' expected but double literal found.
// wc.filter(_.1 == "Python")

wc.filter(_.2 == "Python")
// error: ')' expected but double literal found.
// wc.filter(_.2 == "Python")

wc.filter(wc.keys == "Python")
// type mismatch;
// found : Boolean
// required: ((String, Int)) => Boolean
// wc.filter(wc.keys == "Python")

wc.filter((k,v) => k == "Python")
// wrong number of parameters; expected = 1
// wc.filter((k,v) => k == "Python")


I'm not familiar enough with Spark syntax to do this, but I think I'm on the right track. Any thoughts are appreciated - looking up an RDD value by its key is a simple yet important operation, and I'd like to understand it better.

Thanks in advance!

NOTE - I'm doing this Spark coding in Scala, not Python. My class is using Scala and I am interested in learning Scala's syntax.

EDIT - The output should be either '2', or (Python, 2), although I'd probably prefer the latter.

Answer

The issue is very subtle... you forgot an underscore (_) in your Scala code.

Instead of _.1 you should write _._1 - tuple fields in Scala are accessed with _1, _2, and so on.
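
With that fix, the filter from the question does what you want. A minimal sketch, assuming the sample data shown above (where Python has a count of 2):

scala> wc.filter(_._1 == "Python").first
res1: (String, Int) = (Python,2)

If the key might not be present, prefer collect (which returns an empty array) over first (which throws an exception on an empty RDD).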

If you want to access the 5th element in wc, you could use the RDD.zipWithIndex operator:

zipWithIndex(): RDD[(T, Long)]

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.

This is similar to Scala's zipWithIndex, but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when this RDD contains more than one partition.

Something like the following should work:

scala> wc.zipWithIndex.filter(_._2 == 5).first._1
res0: (String, Int) = (Python,2)
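
For a lookup by key specifically, Spark's pair-RDD API (PairRDDFunctions) also provides a lookup method; a minimal sketch against the same wc, again assuming the sample data above:

wc.lookup("Python")
// Seq[Int] holding every value stored under that key - Seq(2) for the sample data above

lookup collects the matching values back to the driver, so it is convenient for a handful of keys but not for bulk processing.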