srikanth - 13 days ago
Java Question

java.lang.NoSuchMethodError: org.apache.spark.streaming.api.java.JavaDStream.foreachRDD(Lorg/apache/spark/api/java/function/Function;)V

I'm working with Kafka and Spark Streaming in Java.
ZooKeeper: v3.3.6, Kafka: v2.11-0.10.1.0, Spark: v2.0.1-bin-hadoop2.7, Scala: v2.11.8.

No configuration was done apart from renaming a few files in the conf directory.

Code to read from kafka:

JavaPairInputDStream<String, String> kafkaStream =
        KafkaUtils.createDirectStream(jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet);

JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String, String>, String>() {

    private static final long serialVersionUID = 1L;

    public String call(Tuple2<String, String> message) {
        return message._2();
    }
});

data.foreachRDD(new Function<JavaRDD<String>, Void>() {

    private static final long serialVersionUID = 1L;

    public Void call(JavaRDD<String> data) throws Exception {
        if (data != null) {
            List<String> result = data.collect();
            for (String jString : result) {
                System.out.println("========> " + jString);
            }
        } else {
            System.out.println("Got no data in this window");
        }
        return null;
    }
});


pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>1.5.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.5.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20160810</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-ast_2.11</artifactId>
    <version>3.2.11</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.2.0</version>
</dependency>

<!--<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-hdfs</artifactId>
<version>0.10.0-beta1</version> </dependency> <dependency> <groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId> <version>5.3.1</version> </dependency> -->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>3.0.1</version>
</dependency>
<dependency>
    <groupId>com.googlecode.json-simple</groupId>
    <artifactId>json-simple</artifactId>
    <version>1.1</version>
</dependency>


The above code ran successfully on Windows. But on Linux I'm getting the following error:

[INFO] main 2016-11-23 11:46:16,985 VerifiableProperties info - Verifying properties
[INFO] main 2016-11-23 11:46:16,987 VerifiableProperties info - Property group.id is overridden to
[INFO] main 2016-11-23 11:46:16,987 VerifiableProperties info - Property zookeeper.connect is overridden to
[DEBUG] main 2016-11-23 11:46:17,046 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,049 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,079 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,084 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,084 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,086 SimpleConsumer debug - Disconnecting from localhost:9092
[DEBUG] main 2016-11-23 11:46:17,094 SimpleConsumer debug - Disconnecting from mc4:9092
[DEBUG] main 2016-11-23 11:46:17,095 BlockingChannel debug - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 1313280 (requested -1), connectTimeoutMs = 30000.
[DEBUG] main 2016-11-23 11:46:17,106 SimpleConsumer debug - Disconnecting from mc4:9092
[DEBUG] main 2016-11-23 11:46:17,286 ClosureCleaner logDebug - +++ Cleaning closure <function1> (org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) +++
[DEBUG] main 2016-11-23 11:46:17,302 ClosureCleaner logDebug - + declared fields: 2
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug - public static final long org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.serialVersionUID
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug - private final org.apache.spark.api.java.function.Function org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1
[DEBUG] main 2016-11-23 11:46:17,303 ClosureCleaner logDebug - + declared methods: 1
[DEBUG] main 2016-11-23 11:46:17,304 ClosureCleaner logDebug - public final java.lang.Object org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(java.lang.Object)
[DEBUG] main 2016-11-23 11:46:17,304 ClosureCleaner logDebug - + inner classes: 0
[DEBUG] main 2016-11-23 11:46:17,305 ClosureCleaner logDebug - + outer classes: 0
[DEBUG] main 2016-11-23 11:46:17,306 ClosureCleaner logDebug - + outer objects: 0
[DEBUG] main 2016-11-23 11:46:17,308 ClosureCleaner logDebug - + populating accessed fields because this is the starting closure
[DEBUG] main 2016-11-23 11:46:17,311 ClosureCleaner logDebug - + fields accessed by starting closure: 0
[DEBUG] main 2016-11-23 11:46:17,312 ClosureCleaner logDebug - + there are no enclosing objects!
[DEBUG] main 2016-11-23 11:46:17,313 ClosureCleaner logDebug - +++ closure <function1> (org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1) is now cleaned +++
[ERROR] main 2016-11-23 11:46:17,320 root flush - Exception in thread "main"
[ERROR] main 2016-11-23 11:46:17,320 root flush - java.lang.NoSuchMethodError: org.apache.spark.streaming.api.java.JavaDStream.foreachRDD(Lorg/apache/spark/api/java/function/Function;)V
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.labs.spark.streaming.StreamingKafkaConsumer.run(StreamingKafkaConsumer.java:87)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.labs.spark.streaming.StreamingApi.main(StreamingApi.java:35)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at java.lang.reflect.Method.invoke(Method.java:498)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
[ERROR] main 2016-11-23 11:46:17,320 root flush - at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
[ERROR] main 2016-11-23 11:46:17,321 root flush - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[INFO] Thread-1 2016-11-23 11:46:17,327 SparkContext logInfo - Invoking stop() from shutdown hook


I have tried changing the Kafka version to 2.10-0.10.1.0 and 2.9.2-0.8.1.1 as well, but couldn't get past this error.

Answer

This issue is caused by a version mismatch in your pom.xml.

You are running Spark 2.0.1, but your pom.xml declares the Spark dependencies at version 1.5.2. The foreachRDD(Function<JavaRDD<T>, Void>) overload that existed in Spark 1.x was removed in 2.x, so code compiled against 1.5.2 fails with NoSuchMethodError when run on a 2.0.1 cluster.

Replace those dependencies with:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
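The Kafka integration artifact should be aligned too. A sketch, assuming you keep the 0.8-style direct-stream API (KafkaUtils.createDirectStream with StringDecoder): for Spark 2.x that integration moved to the spark-streaming-kafka-0-8 artifact, so spark-streaming-kafka_2.11:1.6.2 would become:

```xml
<!-- Spark 2.x Kafka 0.8 integration (replaces spark-streaming-kafka_2.11) -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
```

Keeping all Spark artifacts on the same version (2.0.1) and the same Scala suffix (_2.11) avoids this class of binary-incompatibility error.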

and try again.

Hope this helps.
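One follow-up on the calling code: after moving to the 2.x dependencies, the snippet above will no longer compile as written, because Spark 2.x's JavaDStream.foreachRDD takes a VoidFunction<JavaRDD<T>> rather than Function<JavaRDD<T>, Void>. A sketch of the updated call, using the same data stream as in the question:

```java
// Spark 2.x: foreachRDD expects a VoidFunction, and call() returns void
// (requires: import org.apache.spark.api.java.function.VoidFunction;)
data.foreachRDD(new VoidFunction<JavaRDD<String>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaRDD<String> rdd) throws Exception {
        for (String jString : rdd.collect()) {
            System.out.println("========> " + jString);
        }
    }
});
```

With Java 8 this collapses to a lambda: data.foreachRDD(rdd -> rdd.collect().forEach(System.out::println)). Note the null check from the original is unnecessary; Spark passes an (possibly empty) RDD to foreachRDD, not null.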