theMadKing theMadKing - 2 months ago 25
Scala Question

Spark Streaming 2.0 GC Error (Shuffle Issue)

My kafka topics have millions of messages in them, unfortunately Spark Streaming seems to queue all of the messages no matter the time limit I get. I have set my standalone server with 16 cores and 64GB RAM, I've given my driver 12G and the Executor 12G of Memory.

16/09/14 19:27:45 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 7, 10.206.41.172): java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.twitter.chill.Tuple4Serializer.read(TupleSerializers.scala:68)
at com.twitter.chill.Tuple4Serializer.read(TupleSerializers.scala:59)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:159)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.readNextItem(ExternalAppendOnlyMap.scala:515)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.hasNext(ExternalAppendOnlyMap.scala:535)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:1004)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.org$apache$spark$util$collection$ExternalAppendOnlyMap$ExternalIterator$$readNextHashCode(ExternalAppendOnlyMap.scala:332)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator$$anonfun$5.apply(ExternalAppendOnlyMap.scala:316)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator$$anonfun$5.apply(ExternalAppendOnlyMap.scala:314)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.<init>(ExternalAppendOnlyMap.scala:314)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.iterator(ExternalAppendOnlyMap.scala:288)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:43)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


org.apache.spark.shuffle.FetchFailedException: /tmp/spark-e4238a07-bf89-4a7d-9de3-176cba0a076d/executor-93a11b25-1cb1-4e13-b1b6-b1d64d3a9602/blockmgr-c11cd046-7c37-429c-9137-936d391d3cbc/30/shuffle_0_0_0.index (No such file or directory)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/spark-e4238a07-bf89-4a7d-9de3-176cba0a076d/executor-93a11b25-1cb1-4e13-b1b6-b1d64d3a9602/blockmgr-c11cd046-7c37-429c-9137-936d391d3cbc/30/shuffle_0_0_0.index (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:192)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:278)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:258)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:292)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
... 9 more


java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)


Any recommendations on what I can do to handle this? Is my solution going to be just feed it more memory? I pointing at a shuffle issue because one exeuctor has:

Shuffle Read Size/Records: 815.5 MB / 27445419

Shuffle Spill (memory): 13GB

Shuffle Spill (Disk): 697.4 MB

I have no idea what it might be shuffling.

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic, mmd.partition, mmd.offset, mmd.message)
})

//first step is to take our RDD of messages and group them by the topic (x._1) and partition (x._2)
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
//Populate parameters based on the grouping and get the instruction sets to run on each message
val rawTopic: String = x._1._1
val partitionID: Int = x._1._2
val sourceSystemName: String = rawTopic.split("_")(0)
val cleanTopic = sourceSystemName + "_clean"
val topicID = getTopicID(rawTopic)
val schemaID = getPartitionByTopic(topicID).filter(x => x._1 == partitionID)(0)._2
val classpath = getClasspath(schemaID.toInt)
val classPath = bcClassMap.value.get(classpath)
val cleanerObj = classPath.newInstance()
val cleanMethod = classPath.getMethod("clean", Class.forName("java.lang.String"))

//For each message run the instruction sets we've gotten above
x._2.foreach(kafkaMessage => {
val offset: Long = kafkaMessage._3
val rawRecord: String = kafkaMessage._4
val cleanRecord = cleanMethod.invoke(cleanerObj, rawRecord).asInstanceOf[String]
if (cleanRecord != null) {
sendKafka(cleanRecord, cleanTopic, partitionID.toString)
}
offsetUpdate(schemaID, offset.toInt, topicID)
})
}))

Answer

An obvious suspect here is the initial groupBy.

While it can be maintainable in a batch application, when you have some idea about data distribution, it is simply not acceptable in a streaming application, where single skewed batch can break a whole pipeline. Even if it doesn't come that far it can cripple overall performance.

Since your code doesn't really depend on the grouping and it is only an optimization attempt you're in a pretty good situation. You can for example:

  • Drop groupBy completely.
  • Use shared cache for each partition or executor to handle expensive API calls.

If number of API calls is still to high you can generate salted partitioning keys:

_.keyBy(x => (x._1, x._2, smallRandomInteger))

and repartition each RDD.

You could use salting for groupBy as well but if used like this you address only data skews not the cost of grouping and maintaining large local buffers.