Sammyrulez - 1 month ago
Scala Question

How to consume grouped sub streams with mapAsync in akka streams

I need to do something very similar to this: https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

My problem is that I have an unknown number of groups, and if the parallelism of the mapAsync is less than the number of groups, I get an error in the last sink:


Tearing down
SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
due to upstream error
(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)


I tried to put a buffer in the middle, as suggested in the Akka Streams cookbook: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....


but the result is the same.

Answer

Expanding on jrudolph's comment, which is completely correct...

You do not need a mapAsync in this instance. As a basic example, suppose you have a source of tuples:

import akka.stream.scaladsl.{Source, Sink}

def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))

val originalSource = Source(data)

You can then perform a groupBy to create a Source of Sources, one inner Source per key:

def getID(tuple : (String, Int)) = tuple._1

//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID
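For intuition only, here is the strict-collection analogue of that grouping, using plain Scala collections rather than Akka Streams. It is a sketch of what ends up in each group, not of how the stream delivers it: a stream emits each inner Source as soon as its key first appears, whereas the collection version builds every group eagerly.

```scala
// Same sample data and key function as in the answer.
val data = List(("foo", 1), ("foo", 2), ("bar", 1), ("foo", 3), ("bar", 2))

def getID(tuple: (String, Int)): String = tuple._1

// Plain collections: one List per key, element order preserved within each group.
val grouped: Map[String, List[(String, Int)]] = data.groupBy(getID)

println(grouped("foo")) // the three "foo" tuples
println(grouped("bar")) // the two "bar" tuples
```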

Each one of the grouped Sources can be processed in parallel with just a map, no need for anything fancy. Here is an example of each grouping being summed in an independent stream:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher

def getValues(tuple : (String, Int)) = tuple._2

//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)

//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

Now all of the "foo" numbers are being summed in parallel with all of the "bar" numbers.
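The code above targets the 1.0 API, where groupBy returns a Source of (key, Source) pairs. As a hedged sketch only, and assuming a later Akka Streams release (roughly 2.4.x or 2.5.x) where groupBy takes a maximum number of substreams and returns a SubFlow, the same per-key sum might look like the following. The limit of 16 substreams and the name `totals` are illustrative assumptions, not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("grouped-sums")
implicit val mat: ActorMaterializer = ActorMaterializer()

val data = List(("foo", 1), ("foo", 2), ("bar", 1), ("foo", 3), ("bar", 2))

def getID(tuple: (String, Int)): String = tuple._1

val totals =
  Source(data)
    // Assumption: at most 16 distinct keys; the stream fails if there are more.
    .groupBy(16, getID)
    // Runs once per substream: keep the key, add up the values.
    .fold(("", 0)) { case ((_, acc), (id, n)) => (id, acc + n) }
    // Merge the one-element substreams back into a single stream.
    .mergeSubstreams
    // Collect the (key, sum) pairs into a Map.
    .runWith(Sink.fold[Map[String, Int], (String, Int)](Map.empty)((m, kv) => m + kv))

// Blocking here is only for the demonstration.
val result = Await.result(totals, 5.seconds)
println(result) // "foo" totals 6 and "bar" totals 3

system.terminate()
```

With this API each substream is consumed as soon as it is created, so no mapAsync is involved. The catch for an unknown number of groups is that maxSubstreams has to be chosen large enough up front.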

mapAsync is used when you have an encapsulated function that returns a Future[T] and you want the stream to emit a T instead, which is not the case in your question. Further, mapAsync involves waiting for results, which is not reactive...
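For contrast, here is a minimal sketch of the case mapAsync is meant for: a function that returns a Future, whose results the stream should emit as plain values. `lookupLength` is a hypothetical stand-in for any asynchronous call, and the sample words are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("map-async-sketch")
implicit val mat: ActorMaterializer = ActorMaterializer()
import system.dispatcher

// Hypothetical asynchronous function: String => Future[Int].
def lookupLength(word: String): Future[Int] = Future(word.length)

val lengths: Future[List[Int]] =
  Source(List("foo", "bar", "quux"))
    // Up to 2 futures in flight at once; results are emitted in upstream order.
    .mapAsync(parallelism = 2)(lookupLength)
    .runWith(Sink.fold[List[Int], Int](List.empty)(_ :+ _))

// Blocking here is only for the demonstration.
val collected = Await.result(lengths, 5.seconds)
println(collected) // List(3, 3, 4)

system.terminate()
```

Here the downstream stage sees Int elements rather than Future[Int], which is the conversion mapAsync exists to perform.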