user1870400 user1870400 - 1 month ago 9
Java Question

Dag-scheduler-event-loop java.lang.OutOfMemoryError: unable to create new native thread

I get the following error from spark-driver program after running for 5-6 hours. I am using Ubuntu 16.04 LTS and open-jdk-8.

Exception in thread "ForkJoinPool-50-worker-11" Exception in thread "dag-scheduler-event-loop" Exception in thread "ForkJoinPool-50-worker-13" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at scala.concurrent.forkjoin.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1795)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:117)
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at scala.concurrent.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.push(ForkJoinPool.java:1072)
at scala.concurrent.forkjoin.ForkJoinTask.fork(ForkJoinTask.java:654)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.start(Tasks.scala:377)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.start(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$$anonfun$spawnSubtasks$1.apply(Tasks.scala:189)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$$anonfun$spawnSubtasks$1.apply(Tasks.scala:186)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.spawnSubtasks(Tasks.scala:186)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.spawnSubtasks(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:157)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at scala.concurrent.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at scala.concurrent.forkjoin.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1795)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:117)


This is the error produced by the Spark Driver program which is running on client mode by default so some people say just increase the heap size by passing the
--driver-memory 3g
flag or something however the message
"unable to create new native thread"
really says that the JVM is asking OS to create a new thread but OS couldn't allocate it anymore and the number of threads a JVM can create by requesting OS is platform dependent but typically it is 32K threads on a 64-bit OS & JVM.

when I did ulimit -a I get the following

core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 120242
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 120242
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited


cat /proc/sys/kernel/pid_max

32768


cat /proc/sys/kernel/threads-max

240484


"unable to create new native thread" clearly means it has nothing to do with heap. so I believe this is more of a OS issue.

Answer

There seems to be a bug in the usage of ForkJoinPool in Spark 2.0.0. Specifically in UnionRDD which is used when you are calling a window operation on Dstream.

https://issues.apache.org/jira/browse/SPARK-17396 so according to this ticket I upgraded to 2.0.1 and it fixed the issue.

Comments