
Spark Streaming gets stopped without errors after ~1 minute

When I run spark-submit for the Spark Streaming job, I can see that it runs for approximately 1 minute, and then it gets stopped with the final status SUCCEEDED:

16/11/16 18:58:16 INFO yarn.Client: Application report for application_XXXX_XXX (state: RUNNING)
16/11/16 18:58:17 INFO yarn.Client: Application report for application_XXXX_XXX (state: FINISHED)


I don't understand why it gets stopped: I expect it to run indefinitely and be triggered by messages received from the Kafka queue. In the logs I can see all the println outputs, and there are no errors.

This is a short extract from the code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("MYTEST")
val sc = new SparkContext(conf)
sc.setCheckpointDir("~/checkpointDir")

val ssc = new StreamingContext(sc, Seconds(batch_interval_seconds))

val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

println("Dividing the topic into partitions.")
val inputKafkaTopicMap = inputKafkaTopic.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, inputKafkaTopicMap).map(_._2)

messages.foreachRDD(msg => {
  msg.foreach(s => {
    if (s != null) {
      //val result = ... processing goes here
      //println(result)
    }
  })
})

// Start the streaming context in the background.
ssc.start()


This is my spark-submit command:

/usr/bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 10g --num-executors 2 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC \
-XX:+AlwaysPreTouch" --class org.test.StreamingRunner test.jar param1 param2


When I open the Resource Manager, I see that no job is RUNNING and the Spark Streaming job is marked as FINISHED.

Answer

Your code is missing a call to ssc.awaitTermination to block the driver thread. ssc.start() only kicks off the streaming machinery in the background, so without the blocking call the main method returns, the driver exits cleanly, and YARN reports the application as FINISHED with final status SUCCEEDED.
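
For example, the end of the driver code would look like this (a minimal sketch; awaitTermination blocks until the context is stopped or fails with an exception):

ssc.start()             // starts receiving and processing batches in the background
ssc.awaitTermination()  // blocks the driver thread so the application keeps running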

Unfortunately there's no easy way to see the printouts from inside your map function on the console, since those function calls are happening inside YARN executors. Cloudera Manager provides a decent look at the logs though, and if you really need them collected on the driver you can write to a location in HDFS and then scrape the various logs from there yourself. If the information that you want to track is purely numeric you might also consider using an Accumulator.
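
For the numeric case, here is a sketch of the Accumulator approach, assuming Spark 2.x's longAccumulator API (on Spark 1.x, the older sc.accumulator(0L, "name") plays the same role); the counter name processedMessages is illustrative:

// Hypothetical counter for non-null messages received from Kafka.
val processed = sc.longAccumulator("processedMessages")

messages.foreachRDD { rdd =>
  rdd.foreach { s =>
    if (s != null) {
      processed.add(1) // incremented on the executors
      // ... processing goes here
    }
  }
  // Accumulator updates are merged back to the driver, so this println
  // appears in the driver log rather than in scattered executor logs.
  println(s"Messages processed so far: ${processed.value}")
}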
