
Pyspark reduceByKey on nested tuples

My question is similar to PySpark reduceByKey on multiple values, but with one critical difference. I am new to PySpark, so I am surely missing something obvious.

I have an RDD with the following structure:

(K0, ((k01,v01), (k02,v02), ...))
....
(Kn, ((kn1,vn1), (kn2,vn2), ...))


What I want as an output is something like

(K0, v01+v02+...)
...
(Kn, vn1+vn2+...)


This seems like the perfect case to use reduceByKey, and at first I thought of something like

rdd.reduceByKey(lambda x,y: x[1]+y[1])


This gives me back exactly the RDD I began with. I suppose there is something wrong with my indexing, since the tuples are nested, but I have tried every index combination I could think of and it keeps giving me back the initial RDD.

Is there a reason why this shouldn't work with nested tuples, or am I doing something wrong?

Thank you

Answer

You shouldn't use reduceByKey here at all. It takes an associative and commutative function with signature (T, T) => T. It should be obvious that it is not applicable when you have a List[Tuple[U, T]] as input and expect a T as output. Moreover, if every key occurs only once, the reducing function is never called at all and each value passes through untouched, which would explain why you get back exactly the RDD you started with.
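
To see concretely what that lambda computes, it can be emulated locally with functools.reduce (plain Python, no Spark required; the sample values here are made up for illustration):

from functools import reduce

# Two records share a key -> the function is called once:
xs = [(("k01", 3), ("k02", 2)), (("k03", 5), ("k04", 6))]
reduce(lambda x, y: x[1] + y[1], xs)
## ('k02', 2, 'k04', 6)  -- tuple concatenation, not a sum

# A key occurs only once -> the function is never called,
# and the value passes through untouched:
reduce(lambda x, y: x[1] + y[1], [(("k01", 3), ("k02", 2))])
## (('k01', 3), ('k02', 2))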

Since it is not entirely clear whether the keys are unique or not, let's consider the general case, where we have to aggregate both locally (within each record) and globally (across records sharing a key). Let's assume that v01, v02, ... are simple numerics:

from functools import reduce
from operator import add

zero_value = 0   # neutral element for addition
merge_op = add   # combines partial results across partitions

def agg_(xs):
    # For numeric values sum would be more idiomatic
    # but let's make it more generic
    return reduce(add, (x[1] for x in xs), zero_value)

def seq_op(acc, xs):
    # Fold one nested tuple of (k, v) pairs into the accumulator
    return acc + agg_(xs)

rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6))),
    ("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(zero_value, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]
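
As an aside, once the data is reshaped so the values match the (T, T) => T contract, reduceByKey becomes applicable again. A minimal sketch, assuming the same toy rdd as above:

# Flatten each record into plain (key, numeric) pairs
flat = rdd.flatMap(lambda kv: [(kv[0], v) for _, v in kv[1]])
flat.reduceByKey(add).take(2)
## [('K0', 16), ('K1', -1)]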

If the keys are already unique, a simple mapValues will suffice; groupByKey below is used only to build an RDD with unique keys out of the toy rdd above:

from itertools import chain

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]
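
For reference, the intermediate unique_keys RDD simply concatenates the nested tuples per key (assuming the toy rdd above; the exact ordering of the output may vary):

unique_keys.take(2)
## [('K0', (('k01', 3), ('k02', 2), ('k03', 5), ('k04', 6))),
##  ('K1', (('k11', 0), ('k12', -1)))]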