prismalytics.io - 2 years ago 97
Python Question

# Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python

I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:

``````>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]
``````

Now the following code sequence is a less than optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible but -- as you'll see in the answer section -- there is a more concise, efficient way.

``````>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
(u'2013-10-07', 23.39500642456595),
... snip ...
]
``````

Now a much better way to do this is to use the rdd.aggregateByKey() method. Because that method is so poorly documented in the Apache Spark with Python documentation (which is why I'm writing this), until recently I had been using the above code sequence. But again, it's less efficient, so don't do it that way unless you need to.

Here's how to do the same using the rdd.aggregateByKey() method (recommended) ...

By KEY, simultaneously calculate the SUM (numerator for the average we want to compute), and COUNT (denominator for the average we want to compute).

``````>>> rdd1 = rdd1.aggregateByKey((0,0), lambda a,b: (a[0] + b,    a[1] + 1),
lambda a,b: (a[0] + b[0], a[1] + b[1]))
``````

Where the following is true about the meaning of each 'a' and 'b' pair above (just so you can visualize what's happening):

``````   First lambda expression for Within-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a SCALAR that holds the next Value

Second lambda expression for Cross-Partition Reduction Step::
a: is a TUPLE that holds: (runningSum, runningCount).
b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
``````

Finally, calculate the average for each KEY, and collect results.

``````>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
``````

I hope this aggregateByKey() illustration will help others.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download