Justin Raymond - 1 month ago
Scala Question

spark `reduceGroups` error overloaded method with alternatives

Running the spark-shell with Spark version 2.0.1 and Scala version 2.11.8.

The following code fails to type check:

val is = sc.parallelize(0 until 100)
val ds = is.map{i => (s"${i%10}", i)}.toDS() // to a Dataset, so the Dataset groupByKey/reduceGroups below apply
val gs = ds.groupByKey(r => r._1)
gs.reduceGroups((r : ((String, Int), (String, Int))) => (r._1._1, r._1._2 + r._2._2))


The error message is

<console>:32: error: overloaded method value reduceGroups with alternatives:
(f: org.apache.spark.api.java.function.ReduceFunction[(String, Int)])org.apache.spark.sql.Dataset[(String, (String, Int))] <and>
(f: ((String, Int), (String, Int)) => (String, Int))org.apache.spark.sql.Dataset[(String, (String, Int))]
cannot be applied to ((((String, Int), (String, Int))) => (String, Int))
gs.reduceGroups((r : ((String, Int), (String, Int))) => (r._1._1, r._1._2 + r._2._2))


As far as I can tell, the lambda I pass to `reduceGroups` exactly matches the signature required by the second alternative.

Answer

`reduceGroups` expects a function of two arguments, while the function you pass is a function of a single (tuple) argument. Comparing signatures, you pass:

((V, V)) ⇒ V

while the expected signature is:

(V, V) ⇒ V

where V is (String, Int).
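
To see the mismatch outside of Spark, here is a minimal sketch (the alias V and the value names are just for illustration) showing that a Function1 over a pair of Vs is not a Function2, and that the standard library can convert between the two shapes:

type V = (String, Int)

// what you passed: a Function1 taking one argument, a pair of Vs
val oneArg: ((V, V)) => V = p => (p._1._1, p._1._2 + p._2._2)

// what reduceGroups wants: a Function2 taking two separate Vs
val twoArg: (V, V) => V = (a, b) => (a._1, a._2 + b._2)

// val bad: (V, V) => V = oneArg                         // does not compile: the shapes differ
val tupled: ((V, V)) => V = twoArg.tupled                 // Function2 -> Function1
val untupled: (V, V) => V = Function.untupled(oneArg)     // Function1 -> Function2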

You can use:

gs.reduceGroups(
  (v1: (String, Int), v2: (String, Int)) => (v1._1, v1._2 + v2._2)
)
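
For completeness, the second alternative in the error message also gives the result type of this call; the key is kept alongside the whole value tuple, so it shows up twice:

// result type taken from the second alternative listed in the error message
val reduced: org.apache.spark.sql.Dataset[(String, (String, Int))] =
  gs.reduceGroups((v1: (String, Int), v2: (String, Int)) => (v1._1, v1._2 + v2._2))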

A more concise solution, which doesn't duplicate the keys:

spark.range(0, 100)
  .groupByKey(i => s"${i % 10}")
  .reduceGroups(_ + _)
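
A rough usage sketch of the concise version (in the spark-shell, where `spark.implicits._` is already imported; the name sums is just illustrative). The result pairs each key with the reduced value only:

val sums: org.apache.spark.sql.Dataset[(String, java.lang.Long)] =
  spark.range(0, 100)
    .groupByKey(i => s"${i % 10}")
    .reduceGroups(_ + _)

// one row per key, e.g. key "0" sums 0 + 10 + ... + 90 = 450
sums.collect().sortBy(_._1).foreach(println)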