Obszczymucha Obszczymucha - 1 month ago 6
Scala Question

Inconsistent behaviour when an exception is thrown in a for comprehension

I noticed that for comprehension behaves inconsistently when an exception is thrown in the first line inside it versus any other line.

Consider the following example code:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object ForComprehensionTester {
def main(args: Array[String]): Unit = {
val result: Future[String] = for {
x <- returnString() // Comment out this line and exception will not be captured, but propagated.
y <- throwException()
} yield y

result.onComplete {
case Success(s) => System.out.println(s"Success: $s")
case Failure(t) => System.out.println(s"Exception captured!")
}

Thread.sleep(2000)
}

def returnString() = Future {
"content"
}

def throwException(): Future[String] = throw new RuntimeException
}


The above code results in the exception being captured and handled in onComplete function. But if we comment out line 8, the exception will be propagated instead.

Can someone explain what's going on?

Answer

for comprehension is the syntax sugar. You can understand the behaviour if you think in terms of map and flatMap

val result: Future[String] = for {
      x <- returnString() // Comment out this line and exception will not be captured, but propagated.
      y <- throwException()
    } yield y

the above code is converted to

returnString.flatMap { str =>
  throwException()
}

Here is the flatMap implementation from the standard library. flatMap handles the exception thrown inside it.

def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
    import impl.Promise.DefaultPromise
    val p = new DefaultPromise[S]()
    onComplete {
      case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
      case Success(v) => try f(v) match {
        // If possible, link DefaultPromises to avoid space leaks
        case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
        case fut => fut.onComplete(p.complete)(internalExecutor)
      } catch { case NonFatal(t) => p failure t }
    }
    p.future
  }

When you comment out the returnString for comprehension becomes

throwException() 

Note that exception thrown by the throwException is not handled in the function.

Declare throwException like below to capture the exception

def throwException(): Future[String] = Future { throw new RuntimeException }