mig-foxbat - 2 months ago
Scala Question

Scala Future/Promise pipeline

I want to launch two or more Futures/Promises in parallel and fail as soon as any one of them fails, without waiting for the rest to complete.
What is the most idiomatic way to compose this pipeline in Scala?

EDIT: more contextual information.

I have to launch two external processes, one writing to a FIFO file and the other reading from it. If the writer process fails, the reader might hang forever waiting for input from the file. So I want to launch both processes in parallel and fail fast as soon as either Future/Promise fails, without waiting for the other to complete.

Below is sample code to be more precise. The commands are not exactly cat and tail; I have used them for brevity.

val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") }
val future2 = Future { executeShellCommand("tail fifo.pipe") }

Answer

If I understand the question correctly, what we are looking for is a fail-fast sequence implementation, akin to a failure-biased version of firstCompletedOf.

Here, we eagerly register a failure callback on every future, so that the combined result fails as soon as any one of them fails.

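import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
  val promise = Promise[Seq[T]]()
  // Fail the promise on the first failure. tryFailure is a no-op once the
  // promise is completed, so a second failing future cannot throw here.
  // (failed.foreach replaces onFailure, which is deprecated since Scala 2.12.)
  futures.foreach { f => f.failed.foreach(ex => promise.tryFailure(ex)) }
  // Otherwise complete with the collected results once all futures succeed.
  promise.completeWith(Future.sequence(futures)).future
}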

In contrast to Future.sequence, this implementation will fail as soon as any of the futures fail, regardless of ordering. Let's show that with an example:

import scala.util.Try
// helper method to measure elapsed time in milliseconds
def resilientTime[T](t: =>T):(Try[T], Long) = {
  val t0 = System.currentTimeMillis
  val res = Try(t)
  (res, System.currentTimeMillis-t0)
}

import scala.concurrent.duration._
import scala.concurrent.Await

The first future in the sequence fails (failure after 2 seconds):

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f1,f2,f3))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

The failing future is last in the sequence. The failure still arrives after 2 seconds (note the order in the sequence construction):

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f3,f2,f1))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)

Comparing with Future.sequence where failure depends on the ordering (failure in 10 seconds):

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val seq = Seq(f3,f2,f1)

resilientTime(Await.result(Future.sequence(seq), 10.seconds))
//res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)
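
Applied to the two-process scenario from the question, one possible sketch looks like the following. It assumes the commands are run through `sh -c` with scala.sys.process, and that a non-zero exit code should count as a failure. The helper names launch and runBoth are hypothetical stand-ins for the question's executeShellCommand, and failFast is repeated (in a variant using tryFailure) only to keep the block self-contained. Because a failed Future does not stop the other process, the sketch also destroys both processes when the combined result fails.

```scala
import scala.concurrent.{Future, Promise, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.sys.process.Process

// Fail-fast sequence, repeated here so the sketch is self-contained.
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
  val promise = Promise[Seq[T]]()
  futures.foreach { f => f.failed.foreach(ex => promise.tryFailure(ex)) }
  promise.completeWith(Future.sequence(futures)).future
}

// Hypothetical helper: starts `cmd` via `sh -c` and returns the process handle
// together with a Future of its exit code. The Future fails on a non-zero code.
def launch(cmd: String): (Process, Future[Int]) = {
  val proc = Process(Seq("sh", "-c", cmd)).run()
  val exit = Future {
    blocking {
      val code = proc.exitValue() // blocks until the process terminates
      if (code != 0) throw new RuntimeException(s"'$cmd' exited with code $code")
      code
    }
  }
  (proc, exit)
}

// Hypothetical helper: runs writer and reader in parallel and fails fast.
def runBoth(writerCmd: String, readerCmd: String): Future[Seq[Int]] = {
  val (writer, writerExit) = launch(writerCmd)
  val (reader, readerExit) = launch(readerCmd)
  val result = failFast(Seq(writerExit, readerExit))
  // Failing the Future does not stop the surviving process,
  // so terminate both explicitly when the combined result fails.
  result.failed.foreach { _ =>
    writer.destroy()
    reader.destroy()
  }
  result
}
```

With this, runBoth("cat file.txt > fifo.pipe", "tail fifo.pipe") would yield a Future that fails as soon as either command exits with a non-zero code. Whether destroy() also reaches child processes spawned by the shell depends on the platform, so treat the cleanup as best effort.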