Samuel Samuel - 1 month ago 4x

Scala - Batched Stream from Futures

I have instances of a case class Thing, and I have a bunch of queries to run that return a collection of Things like so:

def queries: Seq[Future[Seq[Thing]]]

I need to collect all Things from all the futures (like above) and group them into equally sized collections of 10,000 so they can be serialized to files of 10,000 Things each:

def serializeThings(things: Seq[Thing]): Future[Unit]

I want it implemented so that I don't have to wait for all queries to finish before serializing. As soon as the futures of the first queries have returned 10,000 Things, I want to start serializing.
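To make the requirement concrete, here is a blocking sketch of the desired order of work (LazyBatches and batches are hypothetical names). It awaits each query's future in order and relies on Iterator.grouped, which is lazy, so batches are produced as elements arrive rather than after the whole input has been read; the final batch may be shorter. Because it blocks a thread on every Await, it only illustrates the behaviour, not a production implementation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object LazyBatches {
  // Blocking illustration only: results are awaited one query at a time, in order,
  // and Iterator.grouped pulls elements lazily to build each batch of `size`.
  def batches[A](results: Iterator[Future[Seq[A]]], size: Int): Iterator[Seq[A]] =
    results.flatMap(f => Await.result(f, Duration.Inf)).grouped(size)

  def main(args: Array[String]): Unit = {
    val results = Iterator(Future.successful(Seq(1, 2, 3)), Future.successful(Seq(4, 5, 6, 7)))
    batches(results, 3).foreach(batch => println(batch))
  }
}
```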

If I do something like:


It will collect the results of all the queries, but my understanding is that any operations chained onto it won't be invoked until all queries complete, and all the Things must fit into memory at once.
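For comparison, a sketch of the all-at-once approach described above, assuming it is built on Future.sequence (AllAtOnce and serializeAll are hypothetical names). Future.sequence returns a future that completes only after every input future has completed, so grouping and serialization cannot start early, and the flattened Seq holds every result in memory at the same time.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object AllAtOnce {
  // Future.sequence completes only when every query has completed, so nothing
  // after it runs early, and the flattened Seq holds all results at once.
  def serializeAll[A](queries: Seq[Future[Seq[A]]], size: Int)(serialize: Seq[A] => Future[Unit]): Future[Int] =
    Future.sequence(queries).flatMap { results =>
      val batches = results.flatten.grouped(size).toList
      // Write the batches one after another and report how many were written
      batches
        .foldLeft(Future.successful(())) { (acc, batch) => acc.flatMap(_ => serialize(batch)) }
        .map(_ => batches.size)
    }

  def main(args: Array[String]): Unit = {
    val queries: Seq[Future[Seq[Int]]] = (1 to 4).map(id => Future.successful(Seq.fill(3)(id)))
    val done = serializeAll(queries, 5)(batch => Future.successful(println("processing: " + batch)))
    println(Await.result(done, Duration.Inf) + " batches written")
  }
}
```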

What's the best way to implement a batched stream pipeline using Scala collections and concurrency libraries?


I think I managed to make something work. The solution is based on my previous answer. It accumulates the Things from the Future[List[Thing]] results until it reaches a threshold of BatchSize. Then it calls the serializeThings future and, when that finishes, the loop continues with the rest.
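The same accumulate-then-flush loop can be written generically. This is a sketch under two assumptions: queries are passed as thunks (() => Future[Seq[A]]) so each one starts only after the previous result has been handled, much like getFuture(id) in the code below; and BatchPipeline, drain and run are hypothetical names, not part of any library. Unlike a single splitAt per step, drain keeps writing batches until fewer than size elements remain, so a large query result cannot leave a full batch behind.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchPipeline {
  // Writes every full batch in `buffer`, one after another, and returns the
  // leftover (fewer than `size` elements) once no full batch remains.
  private def drain[A](buffer: List[A], size: Int)(serialize: Seq[A] => Future[Unit]): Future[List[A]] = {
    val (batch, rest) = buffer.splitAt(size)
    if (batch.length == size) serialize(batch).flatMap(_ => drain(rest, size)(serialize))
    else Future.successful(buffer)
  }

  // Starts each query only after the previous result has been buffered and any
  // full batches have been written, then flushes the final partial batch.
  def run[A](queries: Seq[() => Future[Seq[A]]], size: Int)(serialize: Seq[A] => Future[Unit]): Future[Unit] = {
    require(size > 0, "size must be positive")
    val leftover = queries.foldLeft(Future.successful(List.empty[A])) { (acc, query) =>
      for {
        buffered <- acc
        res      <- query()
        rest     <- drain(buffered ++ res, size)(serialize)
      } yield rest
    }
    leftover.flatMap(rest => if (rest.nonEmpty) serialize(rest) else Future.successful(()))
  }

  def main(args: Array[String]): Unit = {
    val queries: Seq[() => Future[Seq[Int]]] =
      (1 to 4).map(id => () => Future.successful(Seq.fill(3)(id)))
    Await.result(run(queries, 5)(batch => Future.successful(println("processing: " + batch))), Duration.Inf)
  }
}
```

The question's queries: Seq[Future[Seq[Thing]]] could be passed as queries.map(q => () => q), but those futures are already running, so all queries execute concurrently and their finished results stay referenced until the fold consumes them.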

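import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchFutures extends App {

  case class Thing(id: Int)

  // Simulated query: returns three Things with the given id
  def getFuture(id: Int): Future[List[Thing]] = {
    Future.successful {
      List.fill(3)(Thing(id))
    }
  }

  def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
    println("processing: " + things)
  }

  val ids = (1 to 4).toList
  val BatchSize = 5

  // Run the queries one after another, carrying the not-yet-serialized Things along
  val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
    case (acc, id) =>
      acc flatMap { processed =>
        getFuture(id) flatMap { res =>
          val all = processed ++ res
          // only one batch is written per step, so if all holds 2 * BatchSize
          // or more Things, rest still contains a full batch
          val (batch, rest) = all.splitAt(BatchSize)

          if (batch.length == BatchSize) { // if futures filled the batch with needed amount
            serializeThings(batch) map { _ =>
              rest // process the rest
            }
          } else {
            Future.successful(all) // if we need more Things for a batch
          }
        }
      }
  }.flatMap { rest =>
    // Serialize the last, partially filled batch, if there is one
    if (rest.nonEmpty) serializeThings(rest) else Future.successful(())
  }

  Await.result(future, Duration.Inf)
}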


The program prints:

processing: List(Thing(1), Thing(1), Thing(1), Thing(2), Thing(2))
processing: List(Thing(2), Thing(3), Thing(3), Thing(3), Thing(4))
processing: List(Thing(4), Thing(4))

When the number of Things isn't divisible by BatchSize, we have to call serializeThings once more (the last flatMap). I hope it helps! :)
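For the example above (4 ids with 3 Things each, BatchSize = 5), the number of serializeThings calls is the number of full batches plus one for any remainder: 12 / 5 gives 2 full batches with 2 Things left over, so 3 calls, matching the three "processing" lines. A small sketch of that arithmetic, where serializeCalls is a hypothetical helper and the final call is assumed to be made only when something is left over:

```scala
object BatchCount {
  // serializeThings runs once per full batch, plus once more (the final
  // flatMap) when the total is not a multiple of the batch size.
  def serializeCalls(totalThings: Int, batchSize: Int): Int = {
    require(batchSize > 0, "batchSize must be positive")
    totalThings / batchSize + (if (totalThings % batchSize != 0) 1 else 0)
  }

  def main(args: Array[String]): Unit =
    println(serializeCalls(12, 5)) // prints 3
}
```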