Samuel Samuel - 3 months ago
Scala Question

Scala - Batched Stream from Futures

I have instances of a case class Thing, and I have a bunch of queries to run that return a collection of Things like so:

def queries: Seq[Future[Seq[Thing]]]


I need to collect all Things from all futures (like above) and group them into equally sized collections of 10,000 so they can be serialized to files of 10,000 Things each.

def serializeThings(things: Seq[Thing]): Future[Unit]


I want it to be implemented in such a way that I don't wait for all queries to run before serializing. As soon as 10,000 Things have been returned by the first completed query futures, I want to start serializing.

If I do something like:

Future.sequence(queries)


It will collect the results of all the queries, but my understanding is that operations like map won't be invoked until all queries complete, and all the Things must fit into memory at once.
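To make the limitation concrete, here is a minimal sketch (the SequenceDemo object and small Int results standing in for Things are illustrative only) showing that Future.sequence yields a single future holding every result at once:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo extends App {
  // Future.sequence flips Seq[Future[Seq[Int]]] into one Future[Seq[Seq[Int]]].
  // The combined future completes only after *every* inner future completes,
  // so any map over it runs once, with all results in memory at the same time.
  val queries: Seq[Future[Seq[Int]]] = Seq(Future(Seq(1, 2)), Future(Seq(3, 4)))
  val all: Future[Seq[Int]] = Future.sequence(queries).map(_.flatten)

  val result = Await.result(all, 5.seconds)
  assert(result == Seq(1, 2, 3, 4)) // order is preserved, but nothing ran early
}
```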

What's the best way to implement a batched stream pipeline using Scala collections and concurrent libraries?

Answer

I think I managed to make something. The solution is based on my previous answer: it accumulates results from the Future[List[Thing]] calls until it reaches a threshold of BatchSize, then calls the serializeThings future and, when that finishes, continues the loop with the rest.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchFutures extends App {

  case class Thing(id: Int)

  def getFuture(id: Int): Future[List[Thing]] = {
    Future.successful {
      List.fill(3)(Thing(id))
    }
  }

  def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful {
    //Thread.sleep(2000)
    println("processing: " + things)
  }

  val ids = (1 to 4).toList
  val BatchSize = 5

  val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) {
    case (acc, id) =>
      acc flatMap { processed =>
        getFuture(id) flatMap { res =>
          val all = processed ++ res
          val (batch, rest) = all.splitAt(BatchSize)

          if (batch.length == BatchSize) { // the futures have filled a complete batch
            serializeThings(batch) map { _ =>
              rest // process the rest
            }
          } else {
            Future.successful(all) //if we need more Things for a batch
          }
        }
      }
  }.flatMap { rest =>
    // flush the final, possibly incomplete batch (skip it if empty)
    if (rest.nonEmpty) serializeThings(rest)
    else Future.successful(())
  }

  Await.result(future, Duration.Inf)

}

The result prints:

processing: List(Thing(1), Thing(1), Thing(1), Thing(2), Thing(2))
processing: List(Thing(2), Thing(3), Thing(3), Thing(3), Thing(4))
processing: List(Thing(4), Thing(4))

When the number of Things isn't divisible by BatchSize, we have to call serializeThings once more (the last flatMap). I hope it helps! :)
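The same fold adapts directly to the original def queries: Seq[Future[Seq[Thing]]] signature — a sketch under the same BatchSize logic, where the BatchQueries object, the stand-in queries, and the written buffer (recording what got "serialized") are all illustrative assumptions:

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchQueries extends App {

  case class Thing(id: Int)

  val BatchSize = 5

  // Stand-in for the real queries: four queries of three Things each.
  val queries: Seq[Future[Seq[Thing]]] =
    (1 to 4).map(id => Future.successful(Seq.fill(3)(Thing(id))))

  val written = ListBuffer.empty[Seq[Thing]] // records each "serialized" batch
  def serializeThings(things: Seq[Thing]): Future[Unit] =
    Future.successful { written += things }

  // Fold over the query futures themselves; the accumulator carries the
  // not-yet-serialized remainder from one query to the next.
  val done: Future[Unit] = queries.foldLeft(Future.successful(Seq.empty[Thing])) {
    (acc, query) =>
      for {
        pending <- acc
        res     <- query
        all      = pending ++ res
        rest    <- if (all.length >= BatchSize) {
                     val (batch, remainder) = all.splitAt(BatchSize)
                     serializeThings(batch).map(_ => remainder)
                   } else Future.successful(all)
      } yield rest
  }.flatMap { rest =>
    if (rest.nonEmpty) serializeThings(rest) else Future.successful(())
  }

  Await.result(done, Duration.Inf)
  println(written.map(_.length)) // batch sizes: 5, 5, 2
}
```

Note that, like the original, this emits at most one batch per query result; if a single query can return more than BatchSize Things, you would need to loop the splitAt step until the remainder is shorter than BatchSize.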