Vikram Singh Chandel Vikram Singh Chandel - 2 months ago 19
Scala Question

Why result of Spark reduceByKey is not consistent

I am trying to count the number of iteration of each line via spark using scala.

Following is my input:

1 vikram

2 sachin

3 shobit

4 alok

5 akul

5 akul

1 vikram

1 vikram

3 shobit

10 ashu

5 akul

1 vikram

2 sachin

7 vikram

now i create 2 separate RDDs as follows.

val f1 = sc.textFile("hdfs:///path to above data file")
val m1 = f1.map( s => (s.split(" ")(0),1) ) //creating a tuple (key,1)
//now if i create a RDD as
val rd1 = m1.reduceByKey((a,b) => a+b )
rd1.collect().foreach(println)
//I get a proper output i.e (it gives correct output every time)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)

//but if i create a RDD as
val rd2 = m1.reduceByKey((a,b) => a+1 )
rd2.collect().foreach(println)
//I get a inconsistent result i.e some times i get this (WRONG)
//output: (4,1) (2,2) (7,1) (5,2) (3,2) (1,2) (10,1)
//and sometimes I get this as output (CORRECT)
//output: (4,1) (2,2) (7,1) (5,3) (3,2) (1,4) (10,1)


I am unable yo understand why is this happening and where to use what. I have also tried creating RDD as

val m2 = f1.map(s => (s,1))
val rd3 = m2.reduceByKey((a,b) => a+1 )
// Then also same issue occurs with a+1 but every thing works fine with a+b

Answer

reduceByKey assumes the passed function is commutative and associative (as docs clearly state). And - your first function (a, b) => a + b is, but (a, b) => a+1 isn't.

WHY? For one thing - reduceByKey applies the supplied function to each partition, and then to the combined results of all partitions. In other words, b isn't always 1, so using a+1 simply isn't correct.

Think of the following scenario - the input contains 4 records, split into two partitions:

(aa, 1)
(aa, 1)

(aa, 1)
(cc, 1)

reduceByKey(f) on this input might be calculated as follows:

val intermediate1 = f((aa, 1), (aa, 1)) 
val intermediate2 = f((aa, 1), (cc, 1))

val result = f(intermediate2, intermediate1)

Now, let's follow this with f = (a, b) => a + b

val intermediate1 = f((aa, 1), (aa, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

val result = f(intermediate2, intermediate1)  // (aa, 3), (cc, 1)

And with f = (a, b) => a + 1:

val intermediate1 = f((aa, 1), (bb, 1))       // (aa, 2)
val intermediate2 = f((aa, 1), (cc, 1))       // (aa, 1), (cc, 1)

// this is where it goes wrong:
val result = f(intermediate2, intermediate1)  // (aa, 2), (cc, 1)

Main thing is - order of intermediate calculations isn't guaranteed and may change between executions, and for the latter case of non-commutative function this means result are sometimes wrong.

Comments