kpo kpo - 2 months ago
Scala Question

AggregateByKey fails to compile when it is in an abstract class

I'm new to both Scala and Spark, so I'm hoping someone can explain why aggregateByKey fails to compile when it is in an abstract class. This is about the simplest example I can come up with:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

abstract class AbstractKeyCounter[K] {

  def keyValPairs(): RDD[(K, String)]

  def processData(): RDD[(K, Int)] = {
    keyValPairs().aggregateByKey(0)(
      (count, key) => count + 1,
      (count1, count2) => count1 + count2
    )
  }

}

class StringKeyCounter extends AbstractKeyCounter[String] {

  override def keyValPairs(): RDD[(String, String)] = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("counter"))
    val data = sc.parallelize(Array("foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D"))
    data.map(_.split("=")).map(v => (v(0), v(1)))
  }

}


Which gives:

Error:(11, 19) value aggregateByKey is not a member of org.apache.spark.rdd.RDD[(K, String)]
keyValPairs().aggregateByKey(0)(
^


If I instead use a single concrete class, it compiles and runs successfully:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

class StringKeyCounter {

  def processData(): RDD[(String, Int)] = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("counter"))
    val data = sc.parallelize(Array("foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D"))
    val keyValPairs = data.map(_.split("=")).map(v => (v(0), v(1)))

    keyValPairs.aggregateByKey(0)(
      (count, key) => count + 1,
      (count1, count2) => count1 + count2
    )
  }

}


What am I missing?

Answer

If you change:

abstract class AbstractKeyCounter[K] {

To:

abstract class AbstractKeyCounter[K : ClassTag] {

This will compile (you will also need import scala.reflect.ClassTag at the top of the file).
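Spark aside, the same failure and fix are easy to reproduce in plain Scala, because Array construction also requires a ClassTag for the element type. This sketch uses illustrative names (Bounded, wrap) that are not from the question:

```scala
import scala.reflect.ClassTag

// Without a ClassTag bound, this would not compile:
// abstract class Unbounded[K] {
//   def wrap(k: K): Array[K] = Array(k)  // error: No ClassTag available for K
// }

// With the context bound, the compiler can supply the implicit ClassTag[K]:
abstract class Bounded[K : ClassTag] {
  def wrap(k: K): Array[K] = Array(k)
}

class StringBounded extends Bounded[String]
```

Here `new StringBounded().wrap("x")` produces a one-element `Array[String]`, because instantiating the subclass with a concrete type argument lets the compiler materialize the `ClassTag[String]`.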

Why? aggregateByKey is a method of PairRDDFunctions (your RDD is implicitly converted into that class), which has the following signature:

class PairRDDFunctions[K, V](self: RDD[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)

This means its constructor expects implicit values of types ClassTag[K] and ClassTag[V]. Your abstract class has no knowledge of what K is, and therefore cannot provide a matching implicit value. As a result, the implicit conversion into PairRDDFunctions "fails" (the compiler simply doesn't perform it), and the method aggregateByKey can't be found.
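The mechanics can be sketched without Spark. Below, a hypothetical PairOps class plays the role of PairRDDFunctions, and toPairOps plays the role of the implicit conversion: since the conversion needs a ClassTag[K], the extension method is only found when the enclosing class supplies one via a context bound (all names here are illustrative, not real Spark API):

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

// Stand-in for PairRDDFunctions: construction demands an implicit ClassTag[K].
class PairOps[K](pairs: Seq[(K, String)])(implicit kt: ClassTag[K]) {
  def countByKey: Map[K, Int] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.size }
}

object PairOps {
  // Stand-in for Spark's implicit RDD-to-PairRDDFunctions conversion.
  implicit def toPairOps[K : ClassTag](pairs: Seq[(K, String)]): PairOps[K] =
    new PairOps(pairs)
}

import PairOps._

// Compiles: the context bound puts a ClassTag[K] in scope for the conversion.
abstract class Counter[K : ClassTag] {
  def pairs: Seq[(K, String)]
  def process: Map[K, Int] = pairs.countByKey
}

// Without the bound, the call fails just like aggregateByKey did:
// abstract class BrokenCounter[K] {
//   def pairs: Seq[(K, String)]
//   def process: Map[K, Int] = pairs.countByKey  // value countByKey is not a member
// }
```

A concrete subclass such as `class StringCounter extends Counter[String]` then works, because the compiler can supply ClassTag[String] at the point of instantiation.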

Adding [K : ClassTag] is shorthand for adding an implicit constructor argument implicit kt: ClassTag[K] to the abstract class, which the compiler then resolves and passes on to the constructor of PairRDDFunctions.
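You can check that equivalence directly. The two classes below (WithBound and WithExplicit are illustrative names) are equivalent declarations, and both recover the runtime class of K:

```scala
import scala.reflect.ClassTag

// Context-bound form:
class WithBound[K : ClassTag] {
  def tag: ClassTag[K] = implicitly[ClassTag[K]]
}

// Desugared form: an extra implicit constructor parameter.
class WithExplicit[K](implicit val kt: ClassTag[K])
```

For example, `new WithBound[String].tag.runtimeClass` is `classOf[String]` in both styles.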

For more about ClassTags and what they're good for, see this article.