1290 1290 - 1 year ago 116
Scala Question

MapValues and Explode in RDD

I have this sample RDD below (called

below). The dataset is a tuple of
(String, Int)

(some | random | value, 10)
(some | random | value, 11)
(some | random | value, 12)

And I want to get this output:

(some, 10)
(random, 10)
(value, 10)
(some, 11)
(random, 11)
(value, 11)
(some, 12)
(random, 12)
(value, 12)

I have this Scala code to attempt the above transformation:

rdd.map(tuple => tuple._1.split("|").foreach(elemInArray => (elemInArray, tuple._2)))

In this code I iterate through the entire dataset and split the first part of the tuple by
. Then I iterate through each element in that array returned by
and create a tuple with each
and the count that I get form

For some reason I keep getting this result:


Does anyone know the issue? I can't seem to find where I went wrong.

Answer Source

You actually need to use flatMap for this:

val lt = List(("some | random | value", 10),
              ("some | random | value", 11),
              ("some | random | value", 12))

val convert: ((String, Int)) => List[(String, Int)] = tuple => tuple._1.split('|').map(str =>
  (str, tuple._2)).toList

val t = lt.flatMap(convert)

As we can see, defining the convert function can be very useful, because we can ensure that each element is correctly handled by passing that function a single element. We can then pass that same function to flatMap, which will aggregate the list of results that convert produces into a single list.

The above yields:

t: List[(String, Int)] = List((some ,10), 
                              ( random ,10), 
                              ( value,10), 
                              (some ,11), 
                              ( random ,11), 
                              ( value,11), 
                              (some ,12), 
                              ( random ,12),
                              ( value,12))

Obviously, I didn't bother to deal with the extra whitespace characters in the result, but this is easily handled by updating your convert function with trim:

val convert: ((String, Int)) => List[(String, Int)] = tuple => tuple._1.split('|').map(str =>
  (str.trim, tuple._2)).toList
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download