
Apache Spark: Uncaught exception in thread driver-heartbeater

I wrote the simple Spark program below, originally using Spark's StreamingContext and SQLContext.

Note: The issue is reproducible even without the StreamingContext; the program below has been updated to reflect this.

Note: Downgrading the Spark version to 1.4.1 (I was using 1.5.2) seems to have fixed the issue for me. The issue is also reproducible with Spark 1.5.1.
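For reference, the downgrade amounts to pinning the Spark version in the build definition. A minimal build.sbt sketch (the exact modules are my assumption, based on the APIs used in the program below):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-sql"  % "1.4.1"
)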

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object Main {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "test")
    val sqc = new SQLContext(sc)

    // Read JSON records from <dir> and count the distinct serial numbers
    val dataFrame = sqc.read.json(sc.textFile("<dir>"))
    println(dataFrame.groupBy("Product.SerialNumber").count().count())
    sc.stop()
  }
}
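For context, each input line under <dir> is expected to be a JSON record containing a nested Product.SerialNumber field; a hypothetical record (not taken from the actual data) might look like:

{"Product": {"SerialNumber": "SN-0001"}}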


This produces the exception below at the beginning, but execution proceeds and the result is printed.

15/11/25 15:48:55 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:91)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:440)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:430)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:430)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:428)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:428)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:472)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:472)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 33 more


After about two minutes, the exception below occurs and the execution is terminated. Up to that point the execution proceeds flawlessly and no issue/exception is reported.

15/11/25 15:51:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 179219 ms exceeds timeout 120000 ms
15/11/25 15:51:44 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 179219 ms
15/11/25 15:51:44 WARN TaskSetManager: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 ERROR TaskSetManager: Task 4 in stage 193.0 failed 1 times; aborting job
15/11/25 15:51:44 WARN TaskSetManager: Lost task 7.0 in stage 193.0 (TID 7691, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 6.0 in stage 193.0 (TID 7690, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN TaskSetManager: Lost task 5.0 in stage 193.0 (TID 7689, localhost): ExecutorLostFailure (executor driver lost)
15/11/25 15:51:44 WARN SparkContext: Killing executors is only supported in coarse-grained mode
15/11/25 15:51:45 ERROR JobScheduler: Error running job streaming job 1448446890000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:177)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1402)
at main$$anonfun$main$1.apply(Main.scala:72)
at main$$anonfun$main$1.apply(Main.scala:68)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:218)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:217)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 193.0 failed 1 times, most recent failure: Lost task 4.0 in stage 193.0 (TID 7688, localhost): ExecutorLostFailure (executor driver lost)

Answer

You may have forgotten to add some dependency jars when you submit the Spark job. Try assembling your project (so that all dependencies are included) before you submit it to Spark:

sbt assembly
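If the sbt-assembly plugin is not already part of the build, a minimal sketch (the plugin version and the "provided" scope are the usual conventions, not something stated in the question):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

// build.sbt -- Spark itself is typically marked "provided" so the fat jar
// only bundles your own dependencies; spark-submit supplies Spark at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.2" % "provided"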

By the way, when I run

sbt console

and execute the commands inside the Scala interpreter, I get the same problem as yours. But if I assemble the project first and run the job with

spark-submit --class className target/scala-2.10/xxx-assembly-0.1.0.jar someArgs

it works. :)

ref: Apache Spark 1.5 with Cassandra : Class cast exception
