Vikram Singh Chandel - 1 year ago 130
Scala Question

# Why is the result of Spark reduceByKey not consistent?

I am trying to count the number of occurrences of each line with Spark, using Scala.

Following is my input:

1 vikram

2 sachin

3 shobit

4 alok

5 akul

5 akul

1 vikram

1 vikram

3 shobit

10 ashu

5 akul

1 vikram

2 sachin

7 vikram

Now I create two separate RDDs as follows.

``````val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)
// Now, if I create an RDD by summing the two values:
val rd1 = m1.reduceByKey((a,b) => a+b )
rd1.collect().foreach(println)
// I get the correct output every time:
// (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

// But if I create an RDD by adding 1 to the first value instead:
val rd2 = m1.reduceByKey((a,b) => a+1 )
rd2.collect().foreach(println)
// the result is inconsistent. Sometimes I get this (WRONG):
// (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
// and sometimes I get this (CORRECT):
// (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)
``````

I am unable to understand why this is happening and when to use which form. I have also tried creating the RDD as

``````val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// The same issue occurs with a+1, but everything works fine with a+b
``````

`reduceByKey` assumes the passed function is commutative and associative (as the docs clearly state). Your first function, `(a, b) => a + b`, is both, but `(a, b) => a+1` is neither.
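The two properties can be spot-checked in plain Scala, without Spark. This is a minimal sketch: the object name `ReduceLaws` and its helpers are hypothetical, and checking a small sample can only disprove a law, never prove it in general.

```scala
object ReduceLaws {
  // The two functions from the question.
  val sum: (Int, Int) => Int = (a, b) => a + b
  val plusOne: (Int, Int) => Int = (a, b) => a + 1

  // A small sample that includes partial counts greater than 1.
  val samples: Seq[Int] = 1 to 4

  // Commutative on the sample: f(a, b) == f(b, a) for every pair.
  def isCommutative(f: (Int, Int) => Int): Boolean =
    samples.forall(a => samples.forall(b => f(a, b) == f(b, a)))

  // Associative on the sample: f(f(a, b), c) == f(a, f(b, c)) for every triple.
  def isAssociative(f: (Int, Int) => Int): Boolean =
    samples.forall(a => samples.forall(b => samples.forall(c =>
      f(f(a, b), c) == f(a, f(b, c)))))
}
```

For example, `plusOne(1, 2)` is `2` while `plusOne(2, 1)` is `3`, so swapping the arguments changes the result.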

Why? For one thing, `reduceByKey` applies the supplied function to the values of each key within each partition, and then again to the combined results of all partitions. In other words, `b` isn't always `1`: it can be a partial count from another partition, so using `a+1` simply isn't correct.

Think of the following scenario: the input contains 4 records, split into two partitions:

``````(aa, 1)
(aa, 1)

(aa, 1)
(cc, 1)
``````

`reduceByKey(f)` on this input might be calculated as follows:

``````val intermediate1 = f((aa, 1), (aa, 1))
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)
``````

Now, let's follow this with `f = (a, b) => a + b`:

``````val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)
``````

And with `f = (a, b) => a + 1`:

``````val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong: for aa, a = 1 and b = 2, and b is ignored
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)
``````
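The two traces above can be reproduced with plain Scala collections standing in for partitions. This is only a sketch of the idea, not how Spark is implemented: `PartitionedReduce`, `reduceWithin`, `merge`, and `run` are hypothetical names, and the flag `firstPartitionFirst` simulates the two possible merge orders.

```scala
object PartitionedReduce {
  type Partition = Seq[(String, Int)]

  // Reduce the values of each key inside a single partition.
  def reduceWithin(p: Partition, f: (Int, Int) => Int): Map[String, Int] =
    p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Merge two partial results; for a shared key the left value is passed as `a`.
  def merge(left: Map[String, Int], right: Map[String, Int],
            f: (Int, Int) => Int): Map[String, Int] =
    right.foldLeft(left) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(a => f(a, v)).getOrElse(v))
    }

  // The two partitions from the scenario.
  val p1: Partition = Seq("aa" -> 1, "aa" -> 1)
  val p2: Partition = Seq("aa" -> 1, "cc" -> 1)

  // Reduce each partition, then merge the partial results in the chosen order.
  def run(f: (Int, Int) => Int, firstPartitionFirst: Boolean): Map[String, Int] = {
    val i1 = reduceWithin(p1, f)
    val i2 = reduceWithin(p2, f)
    if (firstPartitionFirst) merge(i1, i2, f) else merge(i2, i1, f)
  }
}
```

With `(a, b) => a + b` both orders give `aa -> 3`. With `(a, b) => a + 1` the count for `aa` is `3` when the first partition's result is on the left and `2` when it is on the right, which matches the inconsistent output in the question.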

The main point is that the order of intermediate calculations isn't guaranteed and may change between executions. For a non-commutative function such as the latter, this means the results are sometimes wrong.
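If "add one per record" is really the intent, the per-record step and the merge step need to be different functions. The sketch below shows that split in plain Scala; it mirrors the shape of Spark's `aggregateByKey(zeroValue)(seqOp, combOp)` but does not use Spark, and `CountWithTwoSteps` and `countPerKey` are hypothetical names.

```scala
object CountWithTwoSteps {
  // Per-record step inside a partition: ignore the value, add one.
  val seqOp: (Int, Int) => Int = (acc, _) => acc + 1
  // Merge step across partitions: add the partial counts.
  val combOp: (Int, Int) => Int = _ + _

  def countPerKey(partitions: Seq[Seq[(String, Int)]]): Map[String, Int] = {
    // Count records per key within each partition, starting from zero.
    val partials: Seq[Map[String, Int]] = partitions.map { p =>
      p.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).foldLeft(0)(seqOp) }
    }
    // Merge the partial counts; combOp is commutative and associative,
    // so the order of the partitions does not matter.
    partials.flatMap(_.toSeq).groupBy(_._1).map { case (k, kvs) =>
      k -> kvs.map(_._2).reduce(combOp)
    }
  }
}
```

For plain counting, `reduceByKey((a, b) => a + b)` as in the question remains the simpler choice.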
