Sohaib - 1 year ago

# How does HashPartitioner work?

I read the documentation of HashPartitioner. Unfortunately, nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is:

```
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
```

then the partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument:

```scala
new HashPartitioner(numPartitions) // What does numPartitions do?
```

For the above dataset, how would the results differ if I did:

```scala
new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)
```

So how does HashPartitioner actually work?

Well, let's make your dataset marginally more interesting:

```scala
val rdd = sc.parallelize(for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None), 8)
```

We have six elements:

```scala
scala> rdd.count
res32: Long = 6
```

no partitioner:

```scala
scala> rdd.partitioner
res33: Option[org.apache.spark.Partitioner] = None
```

and eight partitions:

```scala
scala> rdd.partitions.length
res35: Int = 8
```

Now let's define a small helper to count the number of elements per partition:

```scala
import org.apache.spark.rdd.RDD

// Emit a single number per partition: how many elements it holds.
def countByPartition(rdd: RDD[(Int, None.type)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
```

Since we don't have a partitioner, the elements are spread evenly across partitions by position, without looking at the keys:

```scala
scala> countByPartition(rdd).collect()
res43: Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
```
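Without a partitioner, `parallelize` never inspects the keys; it cuts the input sequence into contiguous slices by position. The sketch below reimplements that slicing in plain Scala, assuming it follows the `i * length / numSlices` integer arithmetic used by Spark's `ParallelCollectionRDD`. `ParallelizeSlicing`, `slicePositions` and `sliceSizes` are hypothetical names, not Spark API. With 6 elements and 8 slices it reproduces the `0, 1, 1, 1, 0, 1, 1, 1` pattern shown above.

```scala
object ParallelizeSlicing {
  // Hypothetical re-implementation: split `length` elements into `numSlices`
  // contiguous [start, end) ranges, assuming the same integer arithmetic as
  // Spark's ParallelCollectionRDD.
  def slicePositions(length: Long, numSlices: Int): Seq[(Int, Int)] =
    (0 until numSlices).map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }

  // Number of elements that end up in each slice.
  def sliceSizes(length: Long, numSlices: Int): Seq[Int] =
    slicePositions(length, numSlices).map { case (start, end) => end - start }
}

// The six (x, None) pairs from the example.
val data = for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None)

// Slice sizes for 8 partitions; the keys play no role at all.
println(ParallelizeSlicing.sliceSizes(data.length, 8))
```

Because the split depends only on positions, reordering the input would change which keys share a partition, which is exactly why an RDD built this way reports no partitioner.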

Now let's repartition our dataset:

```scala
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
```

Since the parameter passed to `HashPartitioner` defines the number of partitions, we expect one partition:

```scala
scala> rddOneP.partitions.length
res45: Int = 1
```

Since we have only one partition, it contains all elements:

```scala
scala> countByPartition(rddOneP).collect
res48: Array[Int] = Array(6)
```
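To make the role of `numPartitions` concrete, here is a minimal plain-Scala model of the partitioning rule. It assumes Spark's `HashPartitioner.getPartition` sends `null` keys to partition 0 and otherwise takes a non-negative `key.hashCode` modulo `numPartitions`. `SimpleHashPartitioner` is a hypothetical stand-in, not the Spark class, and unlike Spark it simply rejects zero partitions.

```scala
// Hypothetical stand-in for org.apache.spark.HashPartitioner, assuming the
// same rule: null keys go to partition 0, every other key to a
// non-negative hashCode modulo numPartitions.
final class SimpleHashPartitioner(val numPartitions: Int) {
  require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ =>
      val rawMod = key.hashCode % numPartitions
      // Shift negative remainders back into [0, numPartitions).
      rawMod + (if (rawMod < 0) numPartitions else 0)
  }
}

val keys = Seq(1, 1, 2, 2, 3, 3)

// With a single partition, x % 1 == 0 for every key, so all records share index 0.
val onePartition = new SimpleHashPartitioner(1)
println(keys.map(k => onePartition.getPartition(k)))
```

`numPartitions` is therefore both the number of output partitions and the modulus of the hash; nothing else about the keys matters.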

In the same way, if we use `HashPartitioner(2)`

```scala
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
```

we'll get 2 partitions:

```scala
scala> rddTwoP.partitions.length
res50: Int = 2
```

Since `rddTwoP` is partitioned by key, the data won't be distributed uniformly anymore:

```scala
scala> countByPartition(rddTwoP).collect()
res51: Array[Int] = Array(2, 4)
```

Because we have three keys and only two different values of `hashCode` mod `numPartitions`, there is nothing unexpected here:

```scala
scala> (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
res55: scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
```
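One detail hidden behind "`hashCode` mod `numPartitions`": on the JVM, `%` keeps the sign of the dividend, so a negative `hashCode` would yield a negative index. As far as I know, Spark guards against this with a non-negative modulo (the internal `Utils.nonNegativeMod`). The sketch below shows the difference in plain Scala; `ModDemo.nonNegativeMod` is a hypothetical local helper, not an importable Spark function.

```scala
object ModDemo {
  // Hypothetical helper: a modulo whose result always lies in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
}

// An Int's hashCode is the value itself, so -3 has a negative hashCode.
val key = -3
println(key.hashCode % 2)                        // -1: plain remainder keeps the sign
println(ModDemo.nonNegativeMod(key.hashCode, 2)) // 1: shifted into [0, 2)

// For the non-negative example keys 1, 2, 3 both versions agree with res55.
println((1 to 3).map(k => (k, ModDemo.nonNegativeMod(k.hashCode, 2))))
```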

Just to confirm the above:

```scala
scala> rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
res58: Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
```

Finally, with `HashPartitioner(10)` we get ten partitions, three of them non-empty with two elements each:

```scala
scala> val rddTenP = rdd.partitionBy(new HashPartitioner(10))
scala> rddTenP.partitions.length
res61: Int = 10

scala> countByPartition(rddTenP).collect()
res62: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0, 0, 0, 0)
```
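All three results can be reproduced without a cluster. The sketch below assumes partition index = non-negative `hashCode` modulo `numPartitions` (with `null` keys sent to 0) and counts how many of the six records land in each partition. `PartitionCounts`, `partitionFor` and `countsPerPartition` are hypothetical helpers; they model only the key-to-index mapping, not Spark's shuffle.

```scala
object PartitionCounts {
  // Assumed rule: null -> 0, otherwise non-negative hashCode modulo numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val rawMod = key.hashCode % numPartitions
      rawMod + (if (rawMod < 0) numPartitions else 0)
    }

  // For each partition index 0 until numPartitions, count the records mapped there.
  def countsPerPartition[K, V](records: Seq[(K, V)], numPartitions: Int): Seq[Int] = {
    val indices = records.map { case (k, _) => partitionFor(k, numPartitions) }
    (0 until numPartitions).map(p => indices.count(_ == p))
  }
}

// The same six (x, None) records as in the RDD example.
val records = for {
  x <- 1 to 3
  y <- 1 to 2
} yield (x, None)

for (n <- Seq(1, 2, 10))
  println(s"$n partition(s): ${PartitionCounts.countsPerPartition(records, n)}")
```

The model gives `6` for one partition, `2, 4` for two and `0, 2, 2, 2, 0, ...` for ten, matching the REPL output above.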

### Summary

• `HashPartitioner` takes a single argument which defines the number of partitions
• records are assigned to partitions based on the `hashCode` of their keys (`null` keys go to partition 0; otherwise a non-negative `hashCode % numPartitions` is used)
• if the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle
• keys have to be hashable. You can check my answer to *A list as a key for PySpark's reduceByKey* to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation:

> Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.

• In Python 3, you have to make sure that hashing is consistent. See *What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?*
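Two of the pitfalls listed above are easy to see locally. Keys that share a residue modulo `numPartitions` all pile into one partition, and two arrays with equal contents generally have different identity-based `hashCode`s, while `java.util.Arrays.hashCode` or an immutable `Seq` hashes by content. A minimal sketch: `HashPitfalls.partitionFor` is a hypothetical helper assuming a non-negative `hashCode` modulo `numPartitions`, and the concrete array hash values differ from run to run.

```scala
import java.util.Arrays

object HashPitfalls {
  // Assumed rule: non-negative hashCode modulo numPartitions (null keys not handled here).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }
}

// 1. Skew: every multiple of 10 has residue 0, so 10 partitions collapse into one.
val skewedKeys = Seq(0, 10, 20, 30, 40)
println(skewedKeys.map(k => HashPitfalls.partitionFor(k, 10))) // every key -> partition 0

// 2. Arrays: equal contents, but hashCode and == are based on identity.
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)
println(a == b)                                   // false: arrays compare by reference
println(a.hashCode == b.hashCode)                 // identity hashes, normally different
println(Arrays.hashCode(a) == Arrays.hashCode(b)) // true: content-based hash
println(a.toSeq.hashCode == b.toSeq.hashCode)     // true: Seq hashes by content
```

Converting array keys to an immutable `Seq` before `partitionBy` is one way to get content-based partitioning, at the cost of a copy.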