Balaji Reddy Balaji Reddy - 1 month ago 19
Scala Question

Scala Future concurrency Issue

Following is my class where I run tasks concurrently. My problem is , my application never ends even after getting result for all the features. I suspect Thread pool is not shutting down which leads my application alive even after my tasks.Believe me I googled alot to figure it out but no luck. What I'm missing here?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.collection.mutable.ListBuffer
import scala.util.Failure
import scala.util.Success

object AppLauncher{

def launchAll(): ListBuffer[Future[String]] = {
// My code logic where I launch all my threads say 50
null
}

def main(args:Array[String]):Unit= {
register(launchAll())
}



def register(futureList: ListBuffer[Future[String]]): Unit =
{
futureList.foreach { future =>
{
future.onComplete {
case Success(successResult) => {
println(successResult)
}
case Failure(failureResult) => { println(failureResult) }
}
}
}
}
}

Answer

Finally I was able to figure out the issue.The issue is certianly because of Thread Pool was not terminated even after my futures were completed successfully . I tried to isolate the issue by changing my implementation slightly as below.

//import scala.concurrent.ExecutionContext.Implicits.global

  import scala.concurrent.Future
  import scala.collection.mutable.ListBuffer
  import scala.util.Failure
  import scala.util.Success

  //Added ExecutionContex explicitly 
  import java.util.concurrent.Executors
  import concurrent.ExecutionContext

  object AppLauncher {

    //Implemented EC explicitly 
    private val pool = Executors.newFixedThreadPool(1000)
    private implicit val executionContext = ExecutionContext.fromExecutorService(pool)

    def launchAll(): ListBuffer[Future[String]] = {
      // My code logic where I launch all my threads say 50
      null
    }

    def main(args: Array[String]): Unit = {
      register(launchAll())
    }

    def register(futureList: ListBuffer[Future[String]]): Unit =
      {
        futureList.foreach { future =>
          {

            println("Waiting...")
            val result = Await.result(future, scala.concurrent.duration.Duration.Inf)

            println(result)

          }

        }
        pool.shutdownNow()
        executionContext.shutdownNow()
        println(pool.isTerminated() + " Pool terminated")
        println(pool.isShutdown() + " Pool shutdown")

        println(executionContext.isTerminated() + " executionContext terminated")
        println(executionContext.isShutdown() + " executionContext shutdown")
      }

  }

Result before adding highlighted code to shutdown pools

false Pool terminated

true Pool shutdown

false executionContext terminated

true executionContext shutdown

After adding highlighted code solved my issue. I ensured no resource leak in my code.My scenario permits me to kill the pool when all the futures are done. I'm aware of the fact that I changed elegant callback implementation to blocking implementation but still it solved my problem.