Bindumalini KK - 1 month ago
Scala Question

Spark not saving the DataFrame as a Parquet file

I am trying to save a Spark DataFrame as a Parquet file, but it fails with the exception below. Please guide me if I am missing something. The DataFrame is constructed from Kafka stream RDDs.

dataframe.write.parquet("/user/space")


Exception Stack:

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormat
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:263)
at scala.collection.AbstractTraversable.filter(Traversable.scala:105)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:59)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:219)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:309)
at KafkaHbaseWrite$$anonfun$main$1.apply(KafkaHbaseWrite.scala:280)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 50 more


A snapshot of the pom.xml used:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>Paymentprocessor</groupId>
  <artifactId>research</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>research</name>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.tools.version>2.10</scala.tools.version>
    <scala.version>2.10.6</scala.version>
    <spark.version>1.6.1</spark.version>
    <scalaCompatVersion>2.10</scalaCompatVersion>
    <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
  </properties>

  <repositories>
    <repository>
      <id>central</id>
      <name>Maven Repository</name>
      <url>https://repo1.maven.org/maven2</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
    <repository>
      <id>mapr-releases</id>
      <url>http://repository.mapr.com/maven/</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <!-- <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId>
         <version>2.8.0</version> </dependency> -->

    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest</artifactId>
      <version>1.2</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.15</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.jms</groupId>
          <artifactId>jms</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <version>3.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.6.4</version>
    </dependency>

    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-core</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-annotation</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.easymock</groupId>
      <artifactId>easymock</artifactId>
      <version>3.0</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-csv</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>com.jsuereth</groupId>
      <artifactId>scala-arm_2.10</artifactId>
      <version>1.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.3</version>
    </dependency>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.7</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-producer_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>
    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-v09_2.10</artifactId>
         <version>1.6.1-mapr-1607</version> </dependency> <dependency> <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming-kafka-producer_2.10</artifactId> <version>1.6.1-mapr-1607</version>
         </dependency> -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.3</version>
    </dependency>

    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
      <version>2.0.0-preview</version>
    </dependency>
  </dependencies>


  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.8</version>
      </plugin>
    </plugins>
  </build>
</project>


Code snippet:

val messagesDStream: InputDStream[(String, String)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

val valuesDStream: DStream[String] = messagesDStream.map(_._2)

println("Count value" + valuesDStream.count())

/*Construct RDD from Kafka*/
valuesDStream.foreachRDD { rdd =>
  // There exists at least one element in RDD
  if (!rdd.isEmpty) {
    val count = rdd.count
    println("count received " + count)
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val cdrDF = rdd.map(CallCreditCardRecord.parseCallCreditCardRecord).toDF()
    val cardRDD = cdrDF.cache()
    println("PRinting")

    cdrDF.registerTempTable("Card")
    cdrDF.printSchema()
    cdrDF.show()

    cdrDF.write.format("parquet").save("/usr/local/Cellar/hadoop/hdfs/tmp/nm-local-dir/CreditCardRecord.parquet")
  }
}

ssc.start()
//ssc.awaitTermination()

ssc.stop(stopSparkContext = true, stopGracefully = true)

Answer

You seem to be mixing different Spark versions: most likely your cluster (master/workers) runs one Spark version while your driver application runs another, so you get a ClassNotFoundException for a class that exists in only one of those versions.

Specifically, the class org.apache.spark.sql.execution.datasources.FileFormat was only created ~2 weeks ago (by this commit) and isn't part of any official Spark release yet. Are you using Spark's "latest master" build in one of your components? If so, either use it in all components (and be prepared for some bugs and rough edges), or make sure all of your code is compiled and executed against a single official version.
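
To see which version each side actually runs, you can print the Spark version on the driver and have a trivial job report the version loaded on the executors. This is only a minimal diagnostic sketch, assuming an existing SparkContext named sc:

// Minimal version check, assuming an existing SparkContext `sc`.
// The driver prints the version it is running against; the small job below
// reports the Spark version found on the executor classpath.
println(s"Driver Spark version: ${sc.version}")

val executorVersions = sc.parallelize(1 to 4)
  .map(_ => org.apache.spark.SPARK_VERSION)
  .distinct()
  .collect()

println(s"Executor Spark version(s): ${executorVersions.mkString(", ")}")

If the two printed versions differ, that mismatch is the source of the NoClassDefFoundError.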

EDIT (after pom file posted): your pom file contains two different Spark versions - 1.6.1 for most dependencies, and 2.0.0-preview for the last one:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hivecontext-compatibility_2.10</artifactId>
  <version>2.0.0-preview</version>
</dependency> 

You should remove this dependency (it's not needed in 1.6.1).
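
If that artifact was added in order to use HiveContext, note that in Spark 1.6.x HiveContext already ships in the spark-hive_2.10 artifact at the same ${spark.version}; the hivecontext-compatibility module only exists for the 2.0.0 preview line. A minimal sketch, assuming an existing SparkContext named sc:

import org.apache.spark.sql.hive.HiveContext

// Spark 1.6.x: HiveContext comes from spark-hive_2.10 (version 1.6.1),
// so no 2.0.0-preview artifact is needed on the classpath.
val hiveContext = new HiveContext(sc)
hiveContext.sql("SHOW TABLES").show()

After removing the 2.0.0-preview dependency, running mvn dependency:tree is a quick way to verify that only 1.6.1 Spark artifacts remain on the driver classpath.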