
Is there a way to catch executor killed exception in Spark?

During execution of my Spark program, YARN sometimes kills containers (executors) with a message that the memory limit was exceeded (the reason for this is still a mystery to me). My program does recover, with Spark re-executing the task by spawning a new container. However, a task in my program also creates some intermediate files on disk, and when a container is killed those files are left behind. Is there a way I can catch the executor-killed event as an exception so that I can delete the intermediate files left behind? Obviously, the cleanup code also needs to run on the same node the executor was running on, so that I can delete the files from there.


Adding on top of @Taras Matyashovskyy's answer.

You can use a SparkListener and intercept its executor events (see the sketch after the list below).

Below is the list of listener events available:

  • SparkListenerApplicationStart

  • SparkListenerJobStart

  • SparkListenerStageSubmitted

  • SparkListenerTaskStart

  • SparkListenerTaskGettingResult

  • SparkListenerTaskEnd

  • SparkListenerStageCompleted

  • SparkListenerJobEnd

  • SparkListenerApplicationEnd

  • SparkListenerEnvironmentUpdate

  • SparkListenerBlockManagerAdded

  • SparkListenerBlockManagerRemoved

  • SparkListenerBlockUpdated

  • SparkListenerUnpersistRDD

  • SparkListenerExecutorAdded

  • SparkListenerExecutorRemoved
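
For example, here is a minimal sketch of a custom listener that reacts to the executor events. The class name is just a placeholder, sc is assumed to be your existing SparkContext, and the bodies only log, so replace them with whatever cleanup logic you need.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Hypothetical listener: it runs on the driver and is notified when executors are added or removed.
class ExecutorEventListener extends SparkListener {

  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    println(s"Executor added: ${added.executorId} on host ${added.executorInfo.executorHost}")

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    println(s"Executor removed: ${removed.executorId}, reason: ${removed.reason}")
}

// Register it on your existing SparkContext (sc)
sc.addSparkListener(new ExecutorEventListener)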

Example: HeartbeatReceiver.scala (from the Spark source)

/**
 * Lives in the driver to receive heartbeats from executors..
 */
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint with Logging {

  def this(sc: SparkContext) {
    this(sc, new SystemClock)
  }

  sc.addSparkListener(this)
  ...

Please have a look at the removal reason carried by SparkListenerExecutorRemoved, which may suit your needs (I haven't tried it).
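
For illustration, a hedged sketch of such a check. The keyword match below is purely an assumption about how YARN phrases the kill message (I haven't verified the exact wording), and note that the listener itself runs on the driver, so deleting files on the lost executor's node would still need its own mechanism.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

class KilledExecutorCleanupListener extends SparkListener {

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit = {
    val reason = removed.reason.toLowerCase
    // Assumption: YARN's "container killed ... memory limit exceeded" text is
    // propagated into the reason string; adjust the keywords to what your logs show.
    if (reason.contains("killed") || reason.contains("memory")) {
      // removed.executorId identifies the lost executor; trigger cleanup of the
      // intermediate files it produced here (e.g. via an executor-id-based path
      // convention on a shared filesystem).
    }
  }
}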