Paul Trehiou - 18 days ago
Scala Question

Repartition with Apache Spark

The problem



I am trying to repartition a dataset so that all rows that have the same number in a specified column of integers end up in the same partition.

What is working



When I use the 1.6 API (in Java) with RDDs, I use a hash partitioner and this works as expected. For example, if I print the modulo of the value of this column for each row, I get the same modulo within a given partition (I inspect the partitions by manually reading the content saved with saveAsHadoopFile).
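
For reference, here is a minimal sketch of that working RDD approach (written in Scala for consistency with the answer below, though the question used Java; the key values and partition count are made up, and sc is an existing SparkContext):

import org.apache.spark.HashPartitioner

// Pair RDD keyed by the integer column; the string values stand in for full rows.
val rows = sc.parallelize(0 until 10).map(i => (i, s"row-$i"))

// HashPartitioner sends key k to partition nonNegativeMod(k.hashCode, numPartitions),
// so all keys with the same modulo end up in the same partition.
val byKey = rows.partitionBy(new HashPartitioner(3))

byKey.mapPartitionsWithIndex { (idx, it) =>
  it.map { case (k, _) => s"partition=$idx key=$k mod=${k % 3}" }
}.collect().foreach(println)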

It is not working as expected with the latest API



But now I am trying to use the 2.0.1 API (in Scala) with Datasets, which have a repartition method that takes a number of partitions and a column, and to save this Dataset as a Parquet file. The results are not the same: when I look at the partitions, the rows are not hash-partitioned on this column.
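
For concreteness, the call in question looks something like this (the column name, partition count, and output path are placeholders):

// Dataset.repartition(numPartitions, partitionExprs*) hash-partitions rows by the
// given expression, using Spark SQL's own hash function rather than Java hashCode.
ds.repartition(3, $"someColumn")
  .write
  .parquet("/tmp/output")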

Answer

To save a partitioned Dataset you can use either:

  • DataFrameWriter.partitionBy - available since Spark 1.6 (the output layout it produces is sketched just after this list)

    df.write.partitionBy("someColumn").format(...).save()
    
  • DataFrameWriter.bucketBy - available since Spark 2.0 (it takes the number of buckets as its first argument, and bucketed writes currently require saveAsTable rather than save)

    df.write.bucketBy(42, "someColumn").format(...).saveAsTable(...)
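
As an illustration of the first option, partitionBy splits the output into one directory per distinct value of the column (the path below is a placeholder):

// partitionBy produces a directory tree keyed by the column value, e.g.:
//   /tmp/partitioned/someColumn=0/part-...
//   /tmp/partitioned/someColumn=1/part-...
// so all rows with the same value of someColumn land under the same directory.
df.write.partitionBy("someColumn").parquet("/tmp/partitioned")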
    

Using df.repartition($"someColumn").write.format(...).save should work as well, but the Dataset API doesn't use Java hashCode. It uses Murmur3 hash, so the results will differ from those of HashPartitioner in the RDD API, and trivial checks (like the one you described) won't work.

import org.apache.spark.sql.functions.{hash, lit, udf}
import spark.implicits._ // for $-notation; already in scope in spark-shell

// Java hashCode, as used by HashPartitioner in the RDD API
val oldHashCode = udf((x: Long) => x.hashCode)

// Non-negative modulo, as in
// https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/util/Utils.scala#L1596-L1599
val nonNegativeMod = udf((x: Int, mod: Int) => {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
})

val df = spark.range(0, 10)

// Partition assignment under the old (hashCode) and the new (Murmur3) scheme
val oldPart = nonNegativeMod(oldHashCode($"id"), lit(3))
val newPart = nonNegativeMod(hash($"id"), lit(3))

df.select($"*", oldPart, newPart).show
+---+---------------+--------------------+
| id|UDF(UDF(id), 3)|UDF(hash(id, 42), 3)|
+---+---------------+--------------------+
|  0|              0|                   1|
|  1|              1|                   2|
|  2|              2|                   2|
|  3|              0|                   0|
|  4|              1|                   2|
|  5|              2|                   2|
|  6|              0|                   0|
|  7|              1|                   0|
|  8|              2|                   2|
|  9|              0|                   2|
+---+---------------+--------------------+

One possible gotcha is that the DataFrame writer can merge multiple small files to reduce cost, so data from different partitions can end up in a single file.
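
If you want to verify placement without reading output files at all (sidestepping that merging behavior), one option is spark_partition_id, which tags each row with the id of the partition it currently occupies; a sketch reusing df from above:

import org.apache.spark.sql.functions.spark_partition_id

// After repartitioning on the hash of id, all rows with the same Murmur3 modulo
// should report the same partition id.
df.repartition(3, $"id")
  .select($"id", hash($"id"), spark_partition_id().alias("pid"))
  .show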
