j3d j3d - 1 month ago 11
Scala Question

For comprehension: how to run Futures sequentially

Given the following methods...

def doSomething1: Future[Int] = { ... }
def doSomething2: Future[Int] = { ... }
def doSomething3: Future[Int] = { ... }


... and the following for-comprehension:

for {
x <- doSomething1
y <- doSomething2
z <- doSomething3
} yield x + y + z


The three methods run in parallel, but in my case doSomething2 MUST run after doSomething1 has finished. How do I run the three methods in sequence?

EDIT

As suggested by Philosophus42, here is a possible implementation of doSomething1:

def doSomething1: Future[Int] = {
// query the database for customers younger than 40;
// `find` returns a `Future` containing the number of matches
customerService.find(Json.obj("age" -> Json.obj("$lt" -> 40)))
}


... so the Future is created by an internal call to another method.

EDIT 2

Perhaps I simplified the use case too much, and I'm sorry. Let's try again with something closer to the real use case. Here is the actual code:

for {
  // get all the transactions generated by the exchange service
  transactions <- exchange.orderTransactions(orderId)

  // for each transaction create a log
  logs <- Future.sequence(transactions.map { transaction =>
    for {
      // update trading order status
      _ <- orderService.findAndUpdate(transaction.orderId, "Executed")

      // create new log (named arguments use `=`, not `=>`)
      log <- logService.insert(Log(
        transactionId = transaction.id,
        orderId = transaction.orderId,
        ...
      ))
    } yield log
  })
} yield logs


What I'm trying to do is create a log for each transaction associated with an order. logService.insert gets invoked many times even if transactions contains just one entry.

Answer

Comment on your post

First, what does the code inside doSomethingX look like? What puzzles me even more is that, with the code you posted, the futures run in parallel.

Answer

In order to make the Future execution sequential, just use

for {
  v1 <- Future { ..block1... } 
  v2 <- Future { ..block2... } 
} yield combine(v1, v2)

This works because the expression Future { ..body.. } starts the asynchronous computation at the moment the expression is evaluated.

With the above for-comprehension desugared

Future { ..block1.. }
  .flatMap( v1 => 
     Future { ..block2.. }
       .map( v2 => combine(v1,v2) )
  )

it becomes clear that

  • once Future { ...block1... } has its result available,
  • the flatMap method is triggered, which
  • then triggers execution of Future { ...block2... }.

Thus Future { ...block2... } is executed after Future { ...block1... }.
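The ordering described above can be checked with a small sketch. The names SequentialDemo, step and record are made up for this illustration, and the sleep just stands in for real work. Each step logs when it starts and ends, so the recorded order shows that the second Future is only created inside the flatMap callback.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequentialDemo {
  def run(): (Int, List[String]) = {
    // Event log shared between threads, guarded by synchronized.
    val events = ListBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    // Hypothetical step: a def, so the Future is created (and started) per call.
    def step(name: String, result: Int): Future[Int] = Future {
      record(s"$name start")
      Thread.sleep(100) // stand-in for real work
      record(s"$name end")
      result
    }

    val sum = for {
      x <- step("first", 1)  // created and started right away
      y <- step("second", 2) // only called inside flatMap, once x is available
    } yield x + y

    val total = Await.result(sum, 5.seconds)
    (total, events.synchronized(events.toList))
  }
}
```

Calling SequentialDemo.run() should give a total of 3 and the events in the order "first start", "first end", "second start", "second end".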

Additional information

A Future

Future { 
  <block> 
} 

immediately triggers execution of the contained block via the ExecutionContext.

Snippet 1:

val f1 = Future { <body> }
val f2 = Future { <otherbody> }

The two computations run in parallel (provided your ExecutionContext is set up to allow it), because both vals are evaluated immediately.

Snippet 2:

The construct

def f1 = Future { ..... }

starts execution of the future only when f1 is called, and every call creates and starts a new future.
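To make the difference between the two snippets concrete, here is a rough sketch that counts how often each body runs. ValVsDef, viaVal and viaDef are names invented for this example. The val body is submitted once when the val is initialised, while the def body runs again on every call.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ValVsDef {
  def run(): (Int, Int) = {
    val valRuns = new AtomicInteger(0)
    val defRuns = new AtomicInteger(0)

    // val: the body is submitted exactly once, here.
    val viaVal: Future[Int] = Future { valRuns.incrementAndGet() }

    // def: nothing runs yet; each call creates and starts a new Future.
    def viaDef: Future[Int] = Future { defRuns.incrementAndGet() }

    // Reference each of them twice and wait for completion.
    Await.result(viaVal, 5.seconds)
    Await.result(viaVal, 5.seconds)
    Await.result(viaDef, 5.seconds)
    Await.result(viaDef, 5.seconds)

    (valRuns.get(), defRuns.get())
  }
}
```

ValVsDef.run() should return (1, 2): one execution for the val, two for the def.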

Edit:

j3d, I'm still confused about why your code does not work as expected, if it is true that the Future is created inside the computeSomethingX methods.

Here is a code snippet showing that computeSomething2 is executed after computeSomething1:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Playground {

  import scala.concurrent.ExecutionContext.Implicits.global

  def computeSomething1 : Future[Int] = {
    Future {
      for (i <- 1 to 10) {
        println("computeSomething1")
        Thread.sleep(500)
      }
      10
    }
  }

  def computeSomething2 : Future[String] = {
    Future {
      for(i <- 1 to 10) {
        println("computeSomething2")
        Thread.sleep(800)
      }
      "hello"
    }
  }

  def main(args: Array[String]) : Unit = {

    val resultFuture: Future[String] = for {
      v1 <- computeSomething1
      v2 <- computeSomething2
    } yield v2 + v1.toString

    // evil "wait" for result

    val result = Await.result(resultFuture, Duration.Inf)

    println( s"Result: ${result}")
  }
}

with output

computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething1
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
computeSomething2
Result: hello10

Edit 2

If you want them to be executed in parallel, create the futures beforehand (here f1 and f2):

def main(args: Array[String]) : Unit = {
  val f1 = computeSomething1
  val f2 = computeSomething2

  val resultFuture: Future[String] = for {
    v1 <- f1
    v2 <- f2
  } yield v2 + v1.toString

  // evil "wait" for result

  val result = Await.result(resultFuture, Duration.Inf)

  println( s"Result: ${result}")
}
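The same idea carries over to a collection. Future.sequence over xs.map(...) creates all the inner futures while map is being evaluated, so they can run concurrently. If each element must be handled only after the previous one has finished, one option is to chain the calls with foldLeft and flatMap. This is only a sketch: traverseSequentially and process are hypothetical helpers, not part of the standard library or of the code in the question.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialTraverse {
  // Hypothetical helper: f(item) is only called after the Future
  // for the previous item has completed.
  def traverseSequentially[A, B](items: Seq[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] =
    items.foldLeft(Future.successful(List.empty[B])) { (acc, item) =>
      acc.flatMap(done => f(item).map(b => b :: done))
    }.map(_.reverse) // results were prepended, so restore input order

  def run(): (List[Int], List[String]) = {
    import scala.concurrent.ExecutionContext.Implicits.global

    val events = ListBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    // Stand-in for per-element work such as inserting a log entry.
    def process(id: Int): Future[Int] = Future {
      record(s"start $id")
      Thread.sleep(50)
      record(s"end $id")
      id * 10
    }

    val results = Await.result(traverseSequentially(Seq(1, 2, 3))(process), 5.seconds)
    (results, events.synchronized(events.toList))
  }
}
```

SequentialTraverse.run() should yield List(10, 20, 30), with each "start n" recorded only after the previous "end". The price is that nothing runs concurrently any more, so this is only worth it when the order really matters.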