martingms - 1 month ago
Scala Question

How to use Spark's .newAPIHadoopRDD() from Java

I am trying to port an example written in Scala (from the Apache Spark project) into Java, and running into some issues.

The code

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[java.util.Map[String,ByteBuffer]],
classOf[java.util.Map[String,ByteBuffer]])


from the original Scala example builds and runs just fine, but

JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(),
CqlPagingInputFormat.class,
java.util.Map<String, ByteBuffer>.class,
java.util.Map<String, ByteBuffer>.class);


is not allowed in Java ("Cannot select from parameterized type").

Changing

java.util.Map<String, ByteBuffer>.class


into

Class.forName("java.util.Map<String, ByteBuffer>")


yields a new error:

Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?>


Changing it into simply java.util.Map.class yields a similar error:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map>


So what is the correct translation? It is worth noting that
newAPIHadoopRDD()
has separate implementations for Scala and for Java. The Java method is documented here: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class, java.lang.Class, java.lang.Class)

The declaration of
CqlPagingInputFormat
looks like this

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> {
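Given that declaration, one plausible Java translation of the original Scala call (a sketch, not from the question; it assumes jsc and job exist as above and the Spark and Cassandra artifacts are on the classpath) uses unchecked casts to supply the parameterized key/value classes:

```java
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat;
import org.apache.spark.api.java.JavaPairRDD;

// Sketch only: jsc (JavaSparkContext) and job (Hadoop Job) are assumed to
// already be set up as in the question. The double casts work around Java's
// ban on parameterized class literals; generics are erased at runtime.
@SuppressWarnings("unchecked")
JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> casRdd =
    jsc.newAPIHadoopRDD(
        job.getConfiguration(),
        CqlPagingInputFormat.class,
        (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class,
        (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class);
```

Whether javac then accepts the InputFormat bound is the subject of the answer below.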

Answer

Finally I got it resolved after much fighting. The problem is that newAPIHadoopRDD requires a class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat does not extend InputFormat directly; instead it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.

Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler is not. The Groovy compiler also resolves the K,V type parameters properly, where the Java compiler finds them incompatible.

You need to add the following changes to your pom.xml file to use the Groovy compiler:

<properties>
    <groovy-version>1.8.6</groovy-version>
    <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version>
    <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version>
    <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version>
    <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version>
    <!-- Set this to the JDK version you build with -->
    <jdk-version>1.7</jdk-version>
</properties>
  1. Add Groovy as a dependency:

    <dependencies>
        <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-all</artifactId>
            <version>${groovy-version}</version>
        </dependency>
    </dependencies>
    
  2. Add the Groovy plugin under build to use it as the compiler for our code:

    <build>
        <pluginManagement>
            <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin-version}</version>
                <configuration>
                    <!-- Bind Groovy Eclipse Compiler -->
                    <compilerId>groovy-eclipse-compiler</compilerId>
                    <source>${jdk-version}</source>
                    <target>${jdk-version}</target>
                </configuration>
                <dependencies>
                    <!-- Define which Groovy version will be used for build (default is 
                        2.0) -->
                    <dependency>
                        <groupId>org.codehaus.groovy</groupId>
                        <artifactId>groovy-eclipse-batch</artifactId>
                        <version>${groovy-eclipse-batch-version}</version>
                    </dependency>
                    <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
                        in compilerId) -->
                    <dependency>
                        <groupId>org.codehaus.groovy</groupId>
                        <artifactId>groovy-eclipse-compiler</artifactId>
                        <version>${groovy-eclipse-compiler-version}</version>
                    </dependency>
                </dependencies>
            </plugin>
            <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
                to this, plugin will -->
            <!-- enhance default build life cycle with an extra phase which adds 
                additional Groovy source folders -->
            <!-- It works fine under Maven 3.x, but we've encountered problems with 
                Maven 2.x -->
            <plugin>
                <groupId>org.codehaus.groovy</groupId>
                <artifactId>groovy-eclipse-compiler</artifactId>
                <version>${groovy-eclipse-compiler-version}</version>
                <extensions>true</extensions>
            </plugin>
            <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
                to any execution phase, --> 
            <!-- so you'll have to call Clover goals from command line. -->
            <plugin>
                <groupId>com.atlassian.maven.plugins</groupId>
                <artifactId>maven-clover2-plugin</artifactId>
                <version>${maven-clover2-plugin-version}</version>
                <configuration>
                    <generateHtml>true</generateHtml>
                    <historyDir>.cloverhistory</historyDir>
                </configuration>
            </plugin>
            </plugins>
        </pluginManagement>
    </build>
    

This should solve it.