My question is similar to PySpark reduceByKey on multiple values, but with a somewhat critical difference. I am new to PySpark, so I am surely missing something obvious.
I have an RDD with the following structure:
(K0, ((k01,v01), (k02,v02), ...))
(Kn, ((kn1,vn1), (kn2,vn2), ...))
What I want is to aggregate all the values of each top-level key (for example, to sum the numeric values), but my naive attempt does not produce what I expect:

rdd.reduceByKey(lambda x, y: x + y)
You shouldn't use reduceByKey here at all. It takes an associative and commutative function with signature (T, T) => T. It should be obvious that it is not applicable when you have List[Tuple[U, T]] as an input and expect T as an output.
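To see the mismatch concretely, here is a minimal sketch of what the attempted reduce actually does (assuming sc is an initialized SparkContext): with tuples as values, + merely concatenates them, so the result still has the List[Tuple[U, T]] shape instead of a single number per key:

pairs = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6)))])

pairs.reduceByKey(lambda x, y: x + y).collect()
## e.g. [('K0', (('k01', 3), ('k02', 2), ('k03', 5), ('k04', 6)))]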
Since it is not exactly clear whether the keys are unique or not, let's consider a general example where we have to aggregate both locally and globally. Let's assume the values vmn are simple numerics:
from functools import reduce
from operator import add

def agg_(xs):
    # For numeric values sum would be more idiomatic,
    # but let's make it more generic; xs is a tuple of
    # (key, value) pairs, so we reduce over the values x[1]
    return reduce(add, (x[1] for x in xs), zero_value)

zero_value = 0
merge_op = add

def seq_op(acc, xs):
    return acc + agg_(xs)

rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6))),
    ("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(0, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]
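As a side note, seq_op folds each ((k, v), ...) tuple into the per-partition accumulator, while merge_op combines accumulators from different partitions. Alternatively, you can first collapse every inner tuple with mapValues; after that the values are plain numbers, the (T, T) => T contract is satisfied, and reduceByKey becomes applicable. A small equivalent sketch reusing agg_ from above:

rdd.mapValues(agg_).reduceByKey(add).take(2)
## [('K0', 16), ('K1', -1)]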
If the keys are already unique, a simple mapValues will suffice (here groupByKey is used only to build such a unique-key RDD from the sample data):
from itertools import chain

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))

unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]
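Performance-wise, keep in mind that groupByKey shuffles all records before anything is reduced, while the aggregateByKey version combines values map-side first; when the keys are genuinely unique from the start, no grouping is needed at all and mapValues does not shuffle. A quick sketch of that case:

unique_rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K1", (("k11", 0), ("k12", -1)))])

unique_rdd.mapValues(agg_).take(2)
## [('K0', 5), ('K1', -1)]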