prismalytics.io - 5 months ago 6x

Python Question

I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a pair RDD), by KEY. Here is what the sample data looks like:

```
>>> rdd1.take(10)  # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.249999999999996),
 (u'2013-10-13', 10.693069306930692)]
```

The following code sequence is a less efficient way to do it, but it does work:

```
>>> import operator
>>> # Count the rows per key on the driver, then broadcast the resulting dict.
>>> # SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> countsByKey = sc.broadcast(rdd1.countByKey())
>>> rdd1 = rdd1.reduceByKey(operator.add)  # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1] / countsByKey.value[x[0]]))  # Divide each SUM by its denominator (i.e. the COUNT).
>>> print(rdd1.collect())
[(u'2013-10-09', 11.235365503035176),
 (u'2013-10-07', 23.39500642456595),
 ... snip ...
]
```
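To make the two steps above easier to follow, here is a minimal plain-Python sketch of the same count-then-sum-then-divide logic. It is only a stand-in for illustration: it does not use Spark, it runs on the ten sample rows shown by `rdd1.take(10)` rather than the full data set, and the names `rows`, `counts`, `sums`, and `averages` are mine, not part of the original code.

```python
from collections import Counter, defaultdict

# The ten sample rows from rdd1.take(10) above (not the full data set).
rows = [
    ("2013-10-09", 7.60117302052786),
    ("2013-10-10", 9.322709163346612),
    ("2013-10-10", 28.264462809917358),
    ("2013-10-07", 9.664429530201343),
    ("2013-10-07", 12.461538461538463),
    ("2013-10-09", 20.76923076923077),
    ("2013-10-08", 11.842105263157894),
    ("2013-10-13", 32.32514177693762),
    ("2013-10-13", 26.249999999999996),
    ("2013-10-13", 10.693069306930692),
]

# Pass 1: count the rows per key (the role played by countByKey()).
counts = Counter(key for key, _ in rows)

# Pass 2: sum the values per key (the role played by reduceByKey(operator.add)).
sums = defaultdict(float)
for key, value in rows:
    sums[key] += value

# Final step: divide each SUM by its COUNT.
averages = {key: sums[key] / counts[key] for key in sums}

for key in sorted(averages):
    print(key, averages[key])
```

Note that the data is walked twice, once for the counts and once for the sums. That is the extra work the answer below avoids.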

Answer

A much better way to do this is to use the rdd.aggregateByKey() method. Because that method is so poorly documented in the Apache Spark with Python documentation (which is why I'm writing this), until recently I had been using the above code sequence. But it is less efficient: it runs two separate jobs over the data (countByKey() for the counts, then reduceByKey() for the sums), **so don't do it that way unless you need to**.

Here's how to do the same using the rdd.aggregateByKey() method (**recommended**) ...

By KEY, simultaneously calculate the SUM (numerator for the average we want to compute), and COUNT (denominator for the average we want to compute).

```
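>>> rdd1 = rdd1.aggregateByKey((0, 0),  # Zero value: (sum, count) before any value is seen.
...     lambda a, b: (a[0] + b, a[1] + 1),          # Within a partition: fold the next value into (sum, count).
...     lambda a, b: (a[0] + b[0], a[1] + b[1]))    # Across partitions: merge two (sum, count) tuples.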
```

Here is what each 'a' and 'b' pair above means (just so you can visualize what's happening):

```
First lambda expression, for the Within-Partition Reduction Step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value.
Second lambda expression, for the Cross-Partition Reduction Step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
```
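If it helps to see the two reduction steps run, the sketch below imitates them in plain Python. It is an assumption-laden illustration, not Spark's implementation: the helper `aggregate_by_key`, the hand-made `partitions` list, and the toy data are all hypothetical, and real Spark decides the partitioning and the merge order itself.

```python
def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Plain-Python imitation of the two reduction steps (hypothetical helper)."""
    # Step 1 (within-partition): fold each value into a per-key accumulator,
    # starting from the zero value the first time a key is seen in a partition.
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_op(acc.get(key, zero), value)
        per_partition.append(acc)

    # Step 2 (cross-partition): merge the per-partition accumulators key by key.
    merged = {}
    for acc in per_partition:
        for key, partial in acc.items():
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


# Toy data, split by hand into two "partitions".
partitions = [
    [("a", 1.0), ("b", 10.0), ("a", 3.0)],
    [("a", 5.0), ("b", 20.0)],
]

sums_counts = aggregate_by_key(
    partitions,
    (0, 0),
    lambda a, b: (a[0] + b, a[1] + 1),        # a = (runningSum, runningCount), b = next value
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # a, b = (sum, count) tuples from two partitions
)
averages = {key: s / c for key, (s, c) in sums_counts.items()}

print(sums_counts)  # {'a': (9.0, 3), 'b': (30.0, 2)}
print(averages)     # {'a': 3.0, 'b': 15.0}
```

The same two lambdas from the answer are passed in unchanged, so the trace of `a` and `b` here matches the description above.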

Finally, calculate the average for each KEY, and collect results.

```
>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
[(u'2013-09-09', 11.235365503035176),
(u'2013-09-01', 23.39500642456595),
(u'2013-09-03', 13.53240060820617),
(u'2013-09-05', 13.141148418977687),
... snip ...
]
```
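One detail worth spelling out: the division is left until the very end, and the intermediate value is a (sum, count) tuple rather than a running average. A small plain-Python sketch, using made-up values for a single key split unevenly across two hypothetical partitions, suggests why averaging per-partition averages would give the wrong answer:

```python
# Made-up values for one key, split unevenly across two "partitions".
partition_1 = [1.0, 2.0, 3.0]
partition_2 = [10.0]

# Wrong: average each partition, then average the averages.
mean_1 = sum(partition_1) / len(partition_1)
mean_2 = sum(partition_2) / len(partition_2)
mean_of_means = (mean_1 + mean_2) / 2

# Right: carry (sum, count) per partition, merge them, and divide once at the end.
acc_1 = (sum(partition_1), len(partition_1))
acc_2 = (sum(partition_2), len(partition_2))
merged = (acc_1[0] + acc_2[0], acc_1[1] + acc_2[1])
true_mean = merged[0] / merged[1]

print(mean_of_means)  # 6.0
print(true_mean)      # 4.0
```

Sums and counts can be merged in any order and still give the same totals, which is what makes them safe to combine across partitions.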

I hope this aggregateByKey() illustration will help others.

Source (Stackoverflow)
