jopasserat - 1 year ago
Apache Spark Question

Different behaviour when reading from Parquet in standalone/master-slave spark-shell

Here is a snippet from a larger program I use to read a DataFrame from Parquet in Scala.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])

def buildMatrix(cooMatrixFields: DataFrame) = {

  // one COOMatrix per row of the DataFrame
  val cooMatrices = cooMatrixFields map {
    case Row(r, c, d) =>
      COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]])
  }

  // shift each matrix's indices so they occupy disjoint blocks
  val matEntries = cooMatrices.zipWithIndex.flatMap {
    case (cooMat, matIndex) =>
      val rowOffset = cooMat.row.distinct.size
      val colOffset = cooMat.col.distinct.size

      val cooMatRowShifted = => rowEntry + rowOffset * matIndex)
      val cooMatColShifted = => colEntry + colOffset * matIndex)

      (cooMatRowShifted, cooMatColShifted, { (i, j, value) =>
        MatrixEntry(i, j, value)
      }
  }

  new CoordinateMatrix(matEntries)
}

// sqlContext is the one provided by spark-shell
val C_entries ="${dataBaseDir}/C.parquet")

val C = buildMatrix(C_entries)
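The intent of the index shifting in buildMatrix can be checked on plain Scala collections, without Spark (a minimal sketch; the sample matrices are made up):

```scala
// Two tiny 2x2 COO matrices, each with entries on its diagonal.
case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])

val mats = Seq(
  COOMatrix(Seq(0L, 1L), Seq(0L, 1L), Seq(1.0, 2.0)),
  COOMatrix(Seq(0L, 1L), Seq(0L, 1L), Seq(3.0, 4.0))
)

// Shift each matrix's row/col indices by its position times the number of
// distinct indices, so every matrix lands on a disjoint block of the
// combined coordinate space.
val entries = mats.zipWithIndex.flatMap { case (m, i) =>
  val rowOffset = m.row.distinct.size
  val colOffset = m.col.distinct.size
  val rows = => r + rowOffset * i)
  val cols = => c + colOffset * i)
  ( { case ((r, c), v) => (r, c, v) }
}
// entries == Seq((0,0,1.0), (1,1,2.0), (2,2,3.0), (3,3,4.0))
```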

My code executes successfully when running in a local spark context.

On a standalone cluster, the very same code fails as soon as it reaches an action that forces it to actually read from Parquet.
The dataframe's schema is retrieved correctly:

C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>]

But the executors crash when executing the line

val C = buildMatrix(C_entries)

with this exception:

at $line39.$read$$iwC.<init>(<console>:7)
at $line39.$read.<init>(<console>:61)
at $line39.$read$.<init>(<console>:65)
at $line39.$read$.<clinit>(<console>)
at $line67.$read$$iwC.<init>(<console>:7)
at $line67.$read.<init>(<console>:24)
at $line67.$read$.<init>(<console>:28)
at $line67.$read$.<clinit>(<console>)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:63)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:62)
at scala.collection.Iterator$$anon$
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.executor.Executor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
Caused by: java.lang.NullPointerException
at $line4.$read$$iwC$$iwC.<init>(<console>:15)
at $line4.$read$$iwC.<init>(<console>:24)
at $line4.$read.<init>(<console>:26)
at $line4.$read$.<init>(<console>:30)
at $line4.$read$.<clinit>(<console>)
... 22 more

Not sure whether it's related, but while increasing the log verbosity, I noticed this exception:

16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at
java.lang.ClassNotFoundException: Class file not found at URL

I've tried different configurations for the standalone cluster:

  • master, 1 slave and spark-shell running on my laptop

  • master and 1 slave each running on separate machines, spark-shell on my laptop

  • master and spark-shell on one machine, 1 slave on another one

I started with the default properties and moved to a more elaborate properties file, without any more success:

spark.driver.memory 4g
spark.eventLog.enabled true
spark.eventLog.dir file:///mnt/fastmp/spark_workdir/logs
spark.driver.extraJavaOptions -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.shuffle.service.enabled true
spark.shuffle.consolidateFiles true
spark.sql.parquet.binaryAsString true
spark.speculation false
spark.rpc.timeout 1000
spark.rdd.compress true
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize 0
spark.task.maxFailures 3

I'm running the pre-built version of spark-1.6.0-bin-hadoop2.6.
There's no HDFS involved in this deployment, all Parquet files are stored on a shared mount (CephFS) available to all the machines.

I doubt this is related to the underlying file system, as another part of my code reads a different Parquet file successfully in both local and standalone mode.

Answer Source

TL;DR: package your code as a jar

For the record, the problem seems to be linked to the use of a standalone cluster.

The exact same code works fine with these setups:

  • spark-shell and master on the same machine
  • running on YARN (an AWS EMR cluster) and reading the Parquet files from S3

After digging a bit more into the logs of the standalone setup, the problem seems linked to this exception from the class server:

INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at
java.lang.ClassNotFoundException: Class file not found at URL

My understanding is that spark-shell starts an HTTP server (Jetty) to serve the classes it generates from the REPL code to the workers.

In my case, lots of classes are served successfully (I've even managed to retrieve some through telnet). However, the class GeneratedClass (and all its inner classes) can't be found by the class server.

The typical error message appearing in the log is:

DEBUG Server: RESPONSE /org/apache/spark/sql/catalyst/expressions/GeneratedClass.class  404 handled=true
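That 404 can be reproduced from any machine with curl (a sketch; the host and port below are placeholders — the real class-server URL appears in the executor's DEBUG log):

```shell
# Hypothetical class-server address; substitute the URL printed after
# "REPL class server at" in the executor log.
CLASS_SERVER=http://driver-host:50612

# The generated projection class answers 404, matching the
# "RESPONSE ... 404" log line above.
curl -s -o /dev/null -w "%{http_code}\n" \
  "$CLASS_SERVER/org/apache/spark/sql/catalyst/expressions/GeneratedClass.class"
```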

My guess is that it works with the master and spark-shell on the same machine because they run in the same JVM, so the class can be found even though the HTTP transfer fails.

The only successful solution I've found so far is to package the code as a jar and either pass it to spark-shell through the --jars option or pass it as a parameter to spark-submit.
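Concretely, that amounts to something like the following (a sketch assuming an sbt project on Spark 1.6 / Scala 2.10; the jar name, master URL, and main class are placeholders):

```shell
# Build a jar containing COOMatrix, buildMatrix, etc.
sbt package

# Ship the jar to the executors so the classes are loaded from it
# instead of being fetched from the REPL class server.
spark-shell --master spark://master-host:7077 \
  --jars target/scala-2.10/myproject_2.10-0.1.jar

# Or run the whole job non-interactively with spark-submit.
spark-submit --master spark://master-host:7077 \
  --class com.example.BuildMatrixJob \
  target/scala-2.10/myproject_2.10-0.1.jar
```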