Krishna .M - 1 month ago
Scala Question

ClassNotFoundException spark-submit scala

Hi, I am trying to generate the output of the Salt examples, but without using Docker as described in its documentation. I found that the Scala code responsible for generating the output is Main.scala, and I modified it slightly for convenience:

package BinExTest

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row

import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._

import java.io._

import scala.util.parsing.json.JSONObject

object Main {

  // Defines the tile size in both x and y bin dimensions
  val tileSize = 256

  // Defines the output layer name
  val layerName = "pickups"

  // Encodes the tile's Double bin values as little-endian 64-bit longs,
  // returning them packed into a byte array
  def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
    val byteArray = new Array[Byte](tileSize * tileSize * 8)
    var j = 0
    tile.bins.foreach(b => {
      val data = java.lang.Double.doubleToLongBits(b)
      for (i <- 0 to 7) {
        byteArray(j) = ((data >> (i * 8)) & 0xff).asInstanceOf[Byte]
        j += 1
      }
    })
    byteArray
  }

  def main(args: Array[String]): Unit = {

    val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar"
    val inputPath = "/home/kesava/Downloads/taxi_micro.csv"
    val outputPath = "/home/kesava/SoftWares/salt/salt-examples/bin-example/Output"

    val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(s"file://$inputPath")
      .registerTempTable("taxi_micro")

    // Construct an RDD of Rows containing only the fields we need. Cache the result.
    val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
      .rdd.cache()

    // Given an input row, return the pickup (longitude, latitude) as a tuple
    val pickupExtractor = (r: Row) => {
      if (r.isNullAt(0) || r.isNullAt(1)) {
        None
      } else {
        Some((r.getDouble(0), r.getDouble(1)))
      }
    }

    // Tile generator object, which houses the generation logic
    val gen = TileGenerator(sc)

    // Break levels into batches. Process several higher levels at once because the
    // number of tile outputs is quite low. Lower levels are done individually due to high tile counts.
    val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))

    // Iterate over sets of levels to generate.
    val levelMeta = levelBatches.map(level => {

      println("------------------------------")
      println(s"Generating level $level")
      println("------------------------------")

      // Construct the definition of the tiling jobs: pickups
      val pickups = new Series((tileSize - 1, tileSize - 1),
        pickupExtractor,
        new MercatorProjection(level),
        (r: Row) => Some(1),
        CountAggregator,
        Some(MinMaxAggregator))

      // Create a request for all tiles on these levels, generate
      val request = new TileLevelRequest(level, (coord: (Int, Int, Int)) => coord._1)
      val rdd = gen.generate(input, pickups, request)

      // Translate the RDD of tiles to an RDD of (coordinate, byte array); collect to master for serialization
      val output = rdd
        .map(s => pickups(s).get)
        .map(tile => {
          // Return tuples of tile coordinate, byte array
          (tile.coords, createByteBuffer(tile))
        })
        .collect()

      // Save byte files to local filesystem
      output.foreach(tile => {
        val coord = tile._1
        val byteArray = tile._2
        val limit = (1 << coord._1) - 1
        // Use standard TMS path structure and file naming
        val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
        file.getParentFile.mkdirs()
        val output = new FileOutputStream(file)
        output.write(byteArray)
        output.close()
      })

      // Create a map from each level to its min/max values.
      rdd
        .map(s => pickups(s).get)
        .map(t => (t.coords._1.toString, t.tileMeta.get))
        .reduceByKey((l, r) => {
          (Math.min(l._1, r._1), Math.max(l._2, r._2))
        })
        .mapValues(minMax => {
          JSONObject(Map(
            "min" -> minMax._1,
            "max" -> minMax._2
          ))
        })
        .collect()
        .toMap
    })

    // Flatten the array of maps into a single map
    val levelInfoJSON = JSONObject(levelMeta.reduce(_ ++ _)).toString()
    // Save level metadata to the filesystem
    val pw = new PrintWriter(s"$outputPath/$layerName/meta.json")
    pw.write(levelInfoJSON)
    pw.close()
  }
}


I created a separate folder for this Scala file, with a lib subfolder containing the required jars, and compiled it with scalac as follows:


scalac -cp "lib/salt.jar:lib/spark.jar" Main.scala


This ran successfully and generated classes under a folder BinExTest.

Now, the project's build.gradle contained the following lines, from which I identified the command that generates the output dataset:

task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
executable = 'spark-submit'
args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}


Based on this, I constructed the following command:


spark-submit --class BinExTest.Main lib/salt.jar


When I run it, I get the following error:


java.lang.ClassNotFoundException: Main.BinExTest
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Can somebody help me out with this? I am completely new to this and got this far just by exploring.

[Update 1]


Following YoYo's suggestion, I ran:


spark-submit --class BinExTest.Main --jars "BinExTest.jar" "lib/salt.jar"


The ClassNotFoundException went away, but a new error appeared:


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 6, localhost): java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
    at BinExTest.Main$.createByteBuffer(Main.scala:29)
    at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:101)
    at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:99)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Any idea what's going on?

[Update 2]


Building Spark from source with Scala 2.11 support solved the previous issue. However, I now get a new error:


6/05/10 18:39:15 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at software.uncharted.salt.core.util.SparseArray.<init>(SparseArray.scala:37)
    at software.uncharted.salt.core.util.SparseArray.<init>(SparseArray.scala:57)
    at software.uncharted.salt.core.generation.rdd.RDDSeriesWrapper.makeBins(RDDTileGenerator.scala:224)
    at software.uncharted.salt.core.generation.rdd.RDDTileGeneratorCombiner.createCombiner(RDDTileGenerator.scala:128)
    at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100)
    at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
    at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148)
    at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)


Is this because Scala 2.11 does not have the mentioned class?

[Final Update]


Adding the Scala 2.10 library jar to the spark-submit command did the trick:


spark-submit --class "BinExTest.Main" --jars "BinExTest.jar,lib/scala210.jar" "lib/salt.jar"

Answer

For a Spark job to run, it needs to replicate its code across the different nodes that make up your Spark cluster. It does that by literally copying the jar file over to the other nodes.

That means you need to make sure your class files are packaged in a .jar file. In my typical setup, I would build an uber jar that packages the class files and the dependent jar files together into a single .jar file, using the Maven Shade plugin. That doesn't have to be your solution, but you should at least build a .jar file out of your generated classes.
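As a sketch of that approach (the plugin version here is illustrative, not taken from any particular project), the Shade plugin is wired into a Maven pom.xml under build/plugins roughly like this:

```xml
<!-- Illustrative Maven Shade configuration: builds an uber jar during `mvn package` -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

In a Spark project you would typically also mark the Spark dependencies themselves as `provided` scope, so they are not bundled into the uber jar that spark-submit ships to the cluster.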

To provide additional jar files manually, add them using the --jars option, which expects a comma-delimited list.

Update 1

Actually, even for me there is a lot of confusion about all the available options, specifically regarding jar files, how they are distributed, and how to modify the classpath in Spark. See another topic I just posted.

Update 2

The second part of your question is already answered on another thread.