Thiago Pereira Thiago Pereira - 1 year ago 116
Scala Question

How to bind akka http with akka streams?

I'm trying to use streams instead of pure actors to handle http requests and I came with the following code:

trait ImagesRoute {

val log = LoggerFactory.getLogger(this.getClass)

implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer

val source =

val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>

val (ref, publisher) =

val addFuture = Source.fromPublisher(publisher)

val future = addFuture.runWith(Sink.head[Option[Image]])

ref ! image

onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)

case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)

I'm not sure if this is the correct way to do that, or even if this is a good approach or if I should use an actor to interact with the route, using the ask pattern and then inside the actor, stream everything.

Any ideas?

Answer Source

If you're only expecting 1 image from the entity then you don't need to create a Source from an ActorRef and you don't need Sink.asPublisher, you can simply use Source.single:

def imageToComplete(img : Option[Image]) : StandardRoute = => complete(Created, i))
     .getOrElse {
       log error ("Error adding image resource", e)
       complete(InternalServerError, e.getMessage


entity(as[Image]) { image =>

  val future : Future[StandardRoute] = 


Simplifying your code further, the fact that you are only processing 1 image means that Streams are unnecessary since there is no need for backpressure with just 1 element:

val future : Future[StandardRoute] = ImageRepository.add(image)


In the comments you indicated

"this is just a simple example, but the stream pipeline should be bigger doing a lot of things like contacting external resources and eventually back pressure things"

This would only apply if your entity was a stream of images. If you're only ever processing 1 image per HttpRequest then backpressure never applies, and any stream you create will be a slower version of a Future.

If your entity is in fact a stream of Images, then you could use it as part of stream:

val byteStrToImage : Flow[ByteString, Image, _] = ???

val imageToByteStr : Flow[Image, Source[ByteString], _] = ???

def imageOptToSource(img : Option[Image]) : Source[Image,_] =
  Source fromIterator img.toIterator

val route = path("images") {
  post {
    extractRequestEntity { reqEntity =>

      val stream = reqEntity.via(byteStrToImage)

      complete(HttpResponse(status=Created,entity = stream))
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download