ab_tech_sp - 1 year ago 68

Python Question

I am looking for some better explanation of the aggregate functionality that is available via spark in python.

The example I have is as follows (using pyspark from Spark 1.2.0 version)

`sc.parallelize([1,2,3,4]).aggregate(`

(0, 0),

(lambda acc, value: (acc[0] + value, acc[1] + 1)),

(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

`(10, 4)`

I get the expected result

`(10,4)`

`1+2+3+4`

`(1,0)`

`(0,0)`

`sc.parallelize([1,2,3,4]).aggregate(`

(1, 0),

(lambda acc, value: (acc[0] + value, acc[1] + 1)),

(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

Output:

`(19, 4)`

The value increases by 9. If I change it to

`(2,0)`

`(28,4)`

Can someone explain to me how this value is calculated? I expected the value to go up by 1 not by 9, expected to see

`(11,4)`

`(19,4)`

Answer Source

Aggregate lets you transform and combine the values of the RDD at will.

It uses two functions:

The first one transforms and adds the elements of the original collection [T] in a local aggregate [U] and takes the form: (U,T) => U. You can see it as a fold and therefore it also requires a zero for that operation. This operation is applied locally to each partition in parallel.

Here is where the key of the question lies: The only value that should be used here is the ZERO value for the reduction operation. This operation is executed locally on each partition, therefore, adding anything to that zero value will add to the result multiplied by the number of partitions of the RDD.

The second operation takes 2 values of the result type of the previous operation [U] and combines it in to one value. This operation will reduce the partial results of each partition and produce the actual total.

For example: Given an RDD of Strings:

```
val rdd:RDD[String] = ???
```

Let's say you want to the aggregate of the length of the strings in that RDD, so you would do:

1) The first operation will transform strings into size (int) and accumulate the values for size.

```
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`
```

2) provide the ZERO for the addition operation (0)

```
val ZERO = 0
```

3) an operation to add two integers together:

```
val add: (Int, Int) => Int = _ + _
```

Putting it all together:

```
rdd.aggregate(ZERO, stringSizeCummulator, add)
```

So, why is the ZERO needed? When the cummulator function is applied to the first element of a partition, there's no running total. ZERO is used here.

Eg. My RDD is: - Partition 1: ["Jump", "over"] - Partition 2: ["the", "wall"]

This will result:

P1:

- stringSizeCummulator(ZERO, "Jump") = 4
- stringSizeCummulator(4, "over") = 8

P2:

- stringSizeCummulator(ZERO, "the") = 3
- stringSizeCummulator(3, "wall") = 7

Reduce: add(P1, P2) = 15