Alex Gilleran - 1 month ago
Scala Question

Create backpressure from a Future inside an Akka stream

I'm new to Akka Streams and streams in general, so I might have completely misunderstood something at a conceptual level, but is there any way I can create backpressure until a Future resolves? Essentially what I want to do is like this:

object Parser {
  def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???


Bytes are read from a file and streamed to the parser, which emits Seqs of ExampleObjects, and those are streamed to an async operation that returns a Future. I want to make it so that until the Future resolves, the rest of the stream gets backpressured, then resumes once the Future is resolved, passing another Seq[ExampleObject] to doAsyncOp, which reapplies the backpressure, and so on.

Right now I've got this working with:

Await.result(doAsyncOp(values), 10 seconds)


But my understanding is that this locks up the entire thread and is bad. Is there any better way around it?

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk-by-chunk with Jawn, then pass objects to ElasticSearch to be indexed as they're parsed - ElasticSearch has a queue of 50 pending operations, if that overflows it starts rejecting new objects.

Answer

It's quite easy. You need to use mapAsync. :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)

where 4 is the level of parallelism: mapAsync keeps at most that many Futures in flight, backpressures upstream once that limit is reached, and emits the results downstream in order as the Futures complete. If the operations must run strictly one at a time, use mapAsync(1).
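Conceptually, mapAsync(1) is the non-blocking equivalent of chaining each Future onto the previous one, which is also how you can avoid Await.result in plain code. A minimal sketch of that chaining with scala.concurrent only, using hypothetical Int batches and a stand-in doAsyncOp in place of the real ExampleObject pipeline:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the real async operation (e.g. an ElasticSearch index call).
def doAsyncOp(batch: Seq[Int]): Future[Int] = Future(batch.sum)

val batches = Seq(Seq(1, 2), Seq(3, 4), Seq(5))

// foldLeft chains the Futures so at most one is in flight at a time:
// each doAsyncOp call starts only after the previous Future has resolved,
// without ever blocking a thread in between.
val allDone: Future[Vector[Int]] =
  batches.foldLeft(Future.successful(Vector.empty[Int])) { (acc, batch) =>
    acc.flatMap(results => doAsyncOp(batch).map(results :+ _))
  }

// Block once at the very end of the program, not per element.
val results = Await.result(allDone, 10.seconds)
```

Inside a stream, mapAsync does this bookkeeping for you and additionally translates the "previous Future not yet resolved" state into backpressure on the upstream stages.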
