Vikram Singh Chandel - 1 year ago 68

Scala Question

I am trying to count the number of iteration of each line via spark using scala.

Following is my input:

1 vikram

2 sachin

3 shobit

4 alok

5 akul

5 akul

1 vikram

1 vikram

3 shobit

10 ashu

5 akul

1 vikram

2 sachin

7 vikram

now i create 2 separate RDDs as follows.

`val f1 = sc.textFile("hdfs:///path to above data file")`

val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)

//now if i create a RDD as

val rd1 = m1.reduceByKey((a,b) => a+b )

rd1.collect().foreach(println)

//I get a proper output i.e (it gives correct output every time)

//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

//but if i create a RDD as

val rd2 = m1.reduceByKey((a,b) => a+1 )

rd2.collect().foreach(println)

//I get a inconsistent result i.e some times i get this (WRONG)

//output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)

//and sometimes I get this as output (CORRECT)

//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

I am unable yo understand why is this happening and where to use what. I have also tried creating RDD as

`val m2 = f1.map(s => (s,1))`

val rd3 = m2.reduceByKey((a,b) => a+1 )

// Then also same issue occurs with a+1 but every thing works fine with a+b

Answer Source

`reduceByKey`

assumes the passed function is *commutative* and *associative* (as docs clearly state).
And - your first function `(a, b) => a + b`

*is*, but `(a, b) => a+1`

*isn't*.

**WHY?**
For one thing - `reduceByKey`

applies the supplied function to each *partition*, and then to the *combined results* of all partitions. In other words, `b`

isn't always `1`

, so using `a+1`

simply isn't correct.

Think of the following scenario - the input contains 4 records, split into two partitions:

```
(aa, 1)
(aa, 1)
(aa, 1)
(cc, 1)
```

`reduceByKey(f)`

on this input might be calculated as follows:

```
val intermediate1 = f((aa, 1), (aa, 1))
val intermediate2 = f((aa, 1), (cc, 1))
val result = f(intermediate2, intermediate1)
```

Now, let's follow this with `f = (a, b) => a + b`

```
val intermediate1 = f((aa, 1), (aa, 1)) // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1)) // (aa, 1), (cc, 1)
val result = f(intermediate2, intermediate1) // (aa, 3), (cc, 1)
```

And with `f = (a, b) => a + 1`

:

```
val intermediate1 = f((aa, 1), (bb, 1)) // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1)) // (aa, 1), (cc, 1)
// this is where it goes wrong:
val result = f(intermediate2, intermediate1) // (aa, 2), (cc, 1)
```

Main thing is - order of intermediate calculations isn't guaranteed and may change between executions, and for the latter case of non-commutative function this means result are sometimes wrong.