Tomy - 6 months ago
Java Question

Spark reduceByKey() not shuffling for the final sum

I have an RDD containing counts for some objects, and I apply reduceByKey() to it to sum all the elements (as in the word count example). I saved the output of the reduceByKey transformation to a text file, and instead of a single total I get a separate sum for each of the workers:

(work at LEFT null,9741)
(work at LEFT null,10073)
(work at LEFT null,10348)
(work at LEFT null,10483)
(work at LEFT null,10754)


Shouldn't it be only one item, which would be the sum of them all?

If more detail is needed, I will provide it.

Later edit: the object I am trying to count is defined by:

public class Pattern {
String pattern;
PatternType type;
Relation r;
}

Answer

In Spark, PairRDDFunctions.reduceByKey takes an RDD[(K, V)] and partitions the data (causing a shuffle) using the given partitioner. If no partitioner is provided, it uses the default HashPartitioner, which picks the target partition for each key-value pair from the key's hashCode() modulo the number of partitions. Values are first combined per key within each partition, and the partial results are merged by key again after the shuffle; both steps rely on the key's hashCode() and equals(). If your key is a Java class that doesn't override these methods, Spark falls back to Object.hashCode() and Object.equals(), which are based on object identity rather than on field values. This means that keys with identical contents can be sent to different workers, where they are only partially reduced, and even when two such keys end up on the same worker they are still not treated as equal. That isn't what you want: all objects with the same contents should hash to the same partition and compare as equal, so that they get reduced together. Because the combiner cannot match the partial results after the shuffle, you end up with several partially reduced entries that all print the same, instead of one summed value per key.
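To make the identity problem concrete, here is a minimal plain-Java sketch with no Spark dependency. It assumes a hypothetical NaiveKey class standing in for Pattern without any overrides, and a hypothetical partitionFor helper that mirrors how HashPartitioner chooses a partition for a non-null key (a non-negative hashCode() modulo the number of partitions). It only illustrates the behaviour; it is not Spark's code.

```java
import java.util.HashMap;
import java.util.Map;

public class IdentityKeyDemo {

    // Hypothetical stand-in for the question's Pattern: no equals()/hashCode() overrides,
    // so it inherits Object's identity-based versions.
    static final class NaiveKey {
        final String pattern;

        NaiveKey(String pattern) {
            this.pattern = pattern;
        }
    }

    // Mirrors HashPartitioner for non-null keys: non-negative hashCode() mod numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        NaiveKey a = new NaiveKey("work at");
        NaiveKey b = new NaiveKey("work at");

        // Same field values, but Object.equals() compares references.
        System.out.println("a.equals(b): " + a.equals(b)); // false

        // Identity hash codes are unrelated, so the two keys may land in different partitions.
        System.out.println("partition(a) = " + partitionFor(a, 5));
        System.out.println("partition(b) = " + partitionFor(b, 5));

        // Even inside a single map, the two keys are never merged.
        Map<NaiveKey, Integer> sums = new HashMap<>();
        sums.merge(a, 9741, Integer::sum);
        sums.merge(b, 10073, Integer::sum);
        System.out.println("distinct keys: " + sums.size()); // 2
    }
}
```

The printed partition numbers vary from run to run, because identity hash codes are not derived from the fields.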

What you need to do is provide proper hashCode() and equals() implementations. This is stated in the Spark documentation (thanks @VitaliyKotlyarenko):

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation

For example:

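import java.util.Objects;

public class Pattern {
    String pattern;
    PatternType type;
    Relation r;

    @Override
    public int hashCode() {
        // Hash exactly the fields that equals() compares, so equal keys
        // always go to the same partition.
        return Objects.hash(pattern, type, r);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (other == null || this.getClass() != other.getClass()) return false;

        Pattern that = (Pattern) other;
        // Objects.equals is null-safe (r can be null). Relation, and PatternType
        // if it is not an enum, need their own equals()/hashCode() as well.
        return Objects.equals(pattern, that.pattern)
                && Objects.equals(type, that.type)
                && Objects.equals(r, that.r);
    }
}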
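As a rough check of the fix, the sketch below models a word-count style pipeline (map each object to (key, 1), then reduceByKey with addition) in plain Java: counts are combined within each input partition, each partial sum is routed by hash, and the partial sums are merged there. Everything in it is hypothetical: Key stands in for the fixed Pattern (with type as a String instead of PatternType), and countWithReduceByKey and partitionFor are simplified models, not Spark's implementation. Because Key overrides equals() and hashCode(), twelve separately created instances collapse into a single entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ReduceByKeySimulation {

    // Hypothetical value-based key, standing in for the fixed Pattern class.
    static final class Key {
        final String pattern;
        final String type;

        Key(String pattern, String type) {
            this.pattern = pattern;
            this.type = type;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Key k = (Key) o;
            return Objects.equals(pattern, k.pattern) && Objects.equals(type, k.type);
        }

        @Override
        public int hashCode() {
            return Objects.hash(pattern, type);
        }
    }

    // Mirrors HashPartitioner for non-null keys: non-negative hashCode() mod numPartitions.
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Simplified model of map(k -> (k, 1)).reduceByKey(sum): combine within each input
    // partition, send each partial count to partitionFor(key), then merge it there.
    static List<Map<Key, Integer>> countWithReduceByKey(List<List<Key>> inputPartitions, int numOut) {
        List<Map<Key, Integer>> out = new ArrayList<>();
        for (int i = 0; i < numOut; i++) {
            out.add(new HashMap<>());
        }
        for (List<Key> partition : inputPartitions) {
            Map<Key, Integer> local = new HashMap<>();
            for (Key k : partition) {
                local.merge(k, 1, Integer::sum); // map-side combine
            }
            for (Map.Entry<Key, Integer> e : local.entrySet()) {
                Map<Key, Integer> target = out.get(partitionFor(e.getKey(), numOut));
                target.merge(e.getKey(), e.getValue(), Integer::sum); // post-shuffle merge
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Key>> input = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            List<Key> part = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                part.add(new Key("work at", "LEFT")); // a fresh instance every time
            }
            input.add(part);
        }

        int entries = 0;
        for (Map<Key, Integer> m : countWithReduceByKey(input, 5)) {
            entries += m.size();
            m.forEach((k, v) -> System.out.println(k.pattern + " " + k.type + " -> " + v)); // work at LEFT -> 12
        }
        System.out.println("total entries: " + entries); // 1
    }
}
```

If you delete the two overrides from Key, the same program prints one entry per instance instead of a single entry, which mirrors the symptom in the question.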