Daniel Langdon - 21 days ago
Scala Question

Spark: Error communicating with MapOutputTracker

I'm using Spark 1.3 to do an aggregation on a lot of data. The job consists of 4 steps:


  1. Read a big (1TB) sequence file (corresponding to 1 day of data)

  2. Filter out most of it and get about 1GB of shuffle write

  3. keyBy customer

  4. aggregateByKey() into a custom structure that builds a profile for each customer, backed by a HashMap[Long, Float] per customer. The Long keys are unique, and there are never more than 50K distinct entries.
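
For illustration, here is a minimal sketch of what steps 3 and 4 might look like. It is not the actual job code. It assumes customer IDs are Strings and that values for the same Long key are summed (the post does not say how entries are combined). The names ProfileAggregation, seqOp, combOp and aggregateLocally are made up for this example. It uses plain collections to mimic the two phases of aggregateByKey (partition-local folding, then merging the partial results), so it runs without a cluster.

```scala
import scala.collection.mutable

object ProfileAggregation {
  // One profile per customer, as described in step 4.
  type Profile = mutable.HashMap[Long, Float]

  // seqOp: fold one (key, value) observation into a partition-local profile.
  // Assumption: values for the same key are summed.
  def seqOp(profile: Profile, obs: (Long, Float)): Profile = {
    val (k, v) = obs
    profile(k) = profile.getOrElse(k, 0f) + v
    profile
  }

  // combOp: merge two partial profiles that were built on different partitions.
  def combOp(left: Profile, right: Profile): Profile = {
    right.foreach { case (k, v) => left(k) = left.getOrElse(k, 0f) + v }
    left
  }

  // Local stand-in for aggregateByKey: each inner Seq plays the role of one partition.
  def aggregateLocally(partitions: Seq[Seq[(String, (Long, Float))]]): Map[String, Profile] = {
    // Phase 1: partial aggregation inside each partition.
    val partials: Seq[Map[String, Profile]] = partitions.map { part =>
      part.foldLeft(Map.empty[String, Profile]) { case (acc, (customer, obs)) =>
        val profile = acc.getOrElse(customer, mutable.HashMap.empty[Long, Float])
        acc.updated(customer, seqOp(profile, obs))
      }
    }
    // Phase 2: merge the per-partition partial profiles by customer.
    partials.foldLeft(Map.empty[String, Profile]) { (merged, partial) =>
      partial.foldLeft(merged) { case (acc, (customer, profile)) =>
        val combined = acc.get(customer).map(existing => combOp(existing, profile)).getOrElse(profile)
        acc.updated(customer, combined)
      }
    }
  }
}
```

On a real pair RDD whose values are (Long, Float) observations, the same two functions would be passed as aggregateByKey(mutable.HashMap.empty[Long, Float])(seqOp, combOp).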



I'm running this with this configuration:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \


And getting this error:

org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
...
Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
... 21 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)


The job and its logic work on a small test set, and I can even run the job for some dates but not for others. I've googled around and found hints that "Error communicating with MapOutputTracker" is related to internal Spark messages, but I already increased "spark.akka.frameSize", "spark.akka.timeout" and "spark.akka.askTimeout" (this last one does not even appear in the Spark documentation, but was mentioned on the Spark mailing list), to no avail. Some timeout is still firing at 30 seconds, and I have no clue how to identify or fix it.
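
One way to narrow this down is to check whether the settings passed on the command line actually reached the application. The sketch below is a hypothetical helper (ConfCheck and mismatches are invented names, not part of Spark) that compares expected settings against a list of effective key/value pairs. It only reports differences and does not explain the 30-second timeout by itself.

```scala
object ConfCheck {
  // For each expected key, report it if the effective value is missing or different.
  def mismatches(expected: Map[String, String],
                 effective: Seq[(String, String)]): Map[String, String] = {
    val actual = effective.toMap
    expected.collect {
      case (key, want) if actual.get(key) != Some(want) =>
        key -> s"expected $want, got ${actual.getOrElse(key, "<not set>")}"
    }
  }
}
```

Inside the job, the effective pairs could come from sc.getConf.getAll.toSeq, which reflects what the SparkContext on the driver sees. Note that the stack trace above comes from the shuffle fetch path, so the executor-side view may still differ.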

I see no reason for this to fail due to data size: the filtering operation, together with the fact that aggregateByKey performs local partial aggregations, should be enough to keep the data volume manageable. The number of tasks is 16K (derived automatically from the original input), far more than the 800 cores running the job on 100 executors, so the usual "increase the number of partitions" tip does not apply. Any clues would be greatly appreciated! Thanks!
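
As a rough back-of-the-envelope check, the shuffle bookkeeping that the MapOutputTracker serves grows with the number of map tasks times the number of reduce partitions, independently of how small the shuffled data is. The sketch below just does that multiplication. It assumes "16K" means about 16000 map tasks and that the reduce side inherits the same partition count (no explicit partition count or spark.default.parallelism appears in the configuration shown). ShuffleBookkeeping and blockEntries are names invented for this example.

```scala
object ShuffleBookkeeping {
  // Number of (map task, reduce partition) block entries a shuffle has to track.
  def blockEntries(mapTasks: Long, reducePartitions: Long): Long =
    mapTasks * reducePartitions
}

object ShuffleBookkeepingExample extends App {
  // Assumed: reduce partitions default to the upstream partition count.
  println(ShuffleBookkeeping.blockEntries(16000L, 16000L)) // 256000000
  // For comparison: the same map side with 50 reduce partitions.
  println(ShuffleBookkeeping.blockEntries(16000L, 50L))    // 800000
}
```

Under these assumptions, passing a smaller partition count to aggregateByKey would shrink that bookkeeping by orders of magnitude. Whether that is the cause of the timeout here is not established.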

Answer

I had a similar issue: my job would work fine with a smaller dataset but fail with larger ones.

After a lot of configuration changes, I found that changing the driver memory settings has much more of an impact than changing the executor memory settings. Using the G1 garbage collector (-XX:+UseG1GC) also helps a lot. I am using the following configuration for a cluster of 3, with 40 cores each. Hope it helps:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g -XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g -XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g