Violetta Zoffoli - 1 year ago 125

Python Question

My question is similar to "PySpark reduceByKey on multiple values", but with a somewhat critical difference. I am new to PySpark, so I am surely missing something obvious.

I have an RDD with the following structure:

`(K0, ((k01,v01), (k02,v02), ...))`

...

`(Kn, ((kn1,vn1), (kn2,vn2), ...))`

What I want as an output is something like

`(K0, v01+v02+...)`

...

`(Kn, vn1+vn2+...)`

This seems like the perfect case to use `reduceByKey`:

`rdd.reduceByKey(lambda x,y: x[1]+y[1])`

This gives me exactly the RDD I began with. I suppose there is something wrong with my indexing, since the tuples are nested, but I have tried every index combination I could think of and it keeps giving me back the initial RDD.

Is there maybe a reason why it shouldn't work with nested tuples or am I doing something wrong?

Thank you


Answer Source

You shouldn't use `reduceByKey` here at all. It takes an associative and commutative function with signature `(T, T) => T`. It should be obvious that it is not applicable when you have `List[Tuple[U, T]]` as an input and you expect `T` as an output.
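To illustrate why the original attempt returns the input unchanged, here is a minimal plain-Python sketch. `reduce_by_key_local` is a hypothetical local stand-in written for this example, not Spark's implementation; it only mimics the behaviour that the function is applied when two values share a key. When every key appears once, the lambda is never called, so the values pass through untouched.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key_local(pairs, func):
    """Hypothetical local stand-in for RDD.reduceByKey (illustration only)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # reduce() on a single-element list returns that element
    # without ever calling func.
    return [(key, reduce(func, values)) for key, values in grouped.items()]

data = [
    ("K0", (("k01", 3), ("k02", 2))),
    ("K1", (("k11", 0), ("k12", -1))),
]

result = reduce_by_key_local(data, lambda x, y: x[1] + y[1])
print(result == data)  # True: the lambda was never invoked
```

With duplicate keys the lambda would be called, but on whole nested tuples rather than on single `(k, v)` pairs, which is not the intended sum either.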

Since it is not exactly clear whether the keys are unique or not, let's consider the general case where we have to aggregate both locally and globally. Let's assume that `v01`, `v02`, ... `vm` are simple numerics:

```
from functools import reduce
from operator import add

zero_value = 0
merge_op = add  # combines partial sums from different partitions

def agg_(xs):
    # For numeric values sum would be more idiomatic
    # but let's make it more generic
    return reduce(add, (x[1] for x in xs), zero_value)

def seq_op(acc, xs):
    # Fold one record's nested (k, v) pairs into the running total
    return acc + agg_(xs)

# Assumes an existing SparkContext named sc
rdd = sc.parallelize([
    ("K0", (("k01", 3), ("k02", 2))),
    ("K0", (("k03", 5), ("k04", 6))),
    ("K1", (("k11", 0), ("k12", -1)))])

rdd.aggregateByKey(zero_value, seq_op, merge_op).take(2)
## [('K0', 16), ('K1', -1)]
```
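The "locally and globally" distinction can be sketched without Spark. The following is a rough plain-Python emulation, assuming a made-up split of the example data into two partitions; `aggregate_by_key_local` and the `partitions` layout are hypothetical names for this illustration and do not reflect how Spark actually distributes records.

```python
from operator import add

def seq_op(acc, xs):
    # Fold one record's nested (k, v) pairs into the running total
    return acc + sum(v for _, v in xs)

def aggregate_by_key_local(partitions, zero, seq_op, merge_op):
    """Hypothetical local emulation of aggregateByKey (illustration only)."""
    # Local step: fold each partition's records per key, starting from zero.
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_op(acc.get(key, zero), value)
        partials.append(acc)
    # Global step: merge the per-partition accumulators per key.
    merged = {}
    for acc in partials:
        for key, value in acc.items():
            merged[key] = merge_op(merged[key], value) if key in merged else value
    return merged

# Assumed partitioning, chosen so that "K0" spans both partitions.
partitions = [
    [("K0", (("k01", 3), ("k02", 2)))],
    [("K0", (("k03", 5), ("k04", 6))), ("K1", (("k11", 0), ("k12", -1)))],
]

totals = aggregate_by_key_local(partitions, 0, seq_op, add)
print(totals)  # {'K0': 16, 'K1': -1}
```

Note that `seq_op` takes an accumulator of the result type and a raw value, while `merge_op` combines two accumulators; this is why the two functions can have different input types, which `reduceByKey` does not allow.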

If keys are already unique, a simple `mapValues` will suffice:

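```
from itertools import chain

# Build an RDD with unique keys from the example above (for demonstration)
unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x)))

# With unique keys, each value can be aggregated independently
unique_keys.mapValues(agg_).take(2)
## [('K0', 16), ('K1', -1)]
```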
