Mullefa - 1 year ago
Scala Question

akka: pattern for combining messages from multiple children

Here's the pattern I have come across:

An actor A has multiple children C1, ..., Cn. On receiving a message, A sends it to each of its children, which each do some calculation on the message and, on completion, send the result back to A. A would then like to combine the results of all the children to pass on to another actor.

What would a solution for this problem look like? Or is this an anti-pattern? In which case how should this problem be approached?

Here is a trivial example which hopefully illustrates my current solution. My concerns are that it duplicates code (up to symmetry), does not extend very well to 'lots' of children, and makes it quite hard to see what's going on.

import akka.actor.{Props, Actor}

case class Tagged[T](value: T, id: Int)

class A extends Actor {
  import C1._
  import C2._

  val c1 = context.actorOf(Props[C1], "C1")
  val c2 = context.actorOf(Props[C2], "C2")
  var uid = 0
  // Results received so far, keyed by request id, waiting for the other child's reply
  var c1Results = Map[Int, Int]()
  var c2Results = Map[Int, Int]()

  def receive = {
    case n: Int => {
      c1 ! Tagged(n, uid)
      c2 ! Tagged(n, uid)
      uid += 1
    }
    case Tagged(C1Result(n), id) => c2Results get id match {
      case None => c1Results += (id -> n)
      case Some(m) => {
        c2Results -= id
        // Double parentheses send the pair as a single tuple message
        context.parent ! ((n, m))
      }
    }
    case Tagged(C2Result(n), id) => c1Results get id match {
      case None => c2Results += (id -> n)
      case Some(m) => {
        c1Results -= id
        context.parent ! ((m, n))
      }
    }
  }
}

class C1 extends Actor {
  import C1._

  def receive = {
    // Send the tagged result back to the requesting actor
    case Tagged(n: Int, id) => sender ! Tagged(C1Result(n), id)
  }
}

object C1 {
  case class C1Result(n: Int)
}

class C2 extends Actor {
  import C2._

  def receive = {
    // Send the tagged result back to the requesting actor
    case Tagged(n: Int, id) => sender ! Tagged(C2Result(n), id)
  }
}

object C2 {
  case class C2Result(n: Int)
}


If you think the code looks god-awful, take it easy on me, I've just started learning akka ;)

Answer Source

In the case of many - or a varying number of - child actors, the ask pattern suggested by Zim-Zam will quickly get out of hand.

The aggregator pattern is designed to help with this kind of situation. It provides an Aggregator trait that you can use in an actor to perform your aggregation logic.

A client actor wanting to perform an aggregation can start an Aggregator-based actor instance and send it a message that will kick off the aggregation process.

A new aggregator should be created for each aggregation operation and terminate on sending back the result (when it has received all responses or on a timeout).
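
The lifecycle described above (one aggregator per operation, finishing either when every response has arrived or when a timeout fires) can be modelled without any Akka machinery. The following is only a rough sketch in plain Scala: the Aggregation class, its method names, and the use of String identifiers for responders are all hypothetical and are not part of the Aggregator trait.

```scala
// Hypothetical model of a single aggregation; illustrative only, not Akka API.
final case class Aggregation(expected: Set[String],
                             received: Map[String, Int] = Map.empty) {

  // Record a reply. Replies from unknown responders, or second replies
  // from a responder already recorded, leave the state unchanged.
  def record(responder: String, value: Int): Aggregation =
    if (expected.contains(responder) && !received.contains(responder))
      copy(received = received + (responder -> value))
    else this

  // True once every expected responder has replied.
  def isComplete: Boolean = received.size == expected.size

  // Some(sum) when complete, or on timeout with whatever has arrived;
  // None while replies are still awaited and no timeout has occurred.
  def result(timedOut: Boolean = false): Option[Int] =
    if (isComplete || timedOut) Some(received.values.sum) else None
}
```

In an actor, a value like this would presumably live in the aggregator's state, with result(timedOut = true) consulted when the timeout message arrives.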

An example of this pattern to sum integer values held by the actors represented by the Child class is listed below. (Note that there is no need for them to all be children supervised by the same parent actor: the SummationAggregator just needs a collection of ActorRefs.)

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import akka.actor._
import akka.contrib.pattern.Aggregator

object Child {
  def props(value: Int): Props = Props(new Child(value))

  case object GetValue
  case class GetValueResult(value: Int)
}

class Child(value: Int) extends Actor {
  import Child._

  def receive = { case GetValue => sender ! GetValueResult(value) }
}

object SummationAggregator {
  def props = Props(new SummationAggregator)

  case object TimedOut
  case class StartAggregation(targets: Seq[ActorRef])
  case object BadCommand
  case class AggregationResult(sum: Int)
}

class SummationAggregator extends Actor with Aggregator {
  import Child._
  import SummationAggregator._

  expectOnce {
    case StartAggregation(targets) =>
      // Could do what this handler does in line but handing off to a 
      // separate class encapsulates the state a little more cleanly
      new Handler(targets, sender())
    case _ =>
      sender ! BadCommand
      context stop self
  }

  class Handler(targets: Seq[ActorRef], originalSender: ActorRef) {
    // Could just store a running total and keep track of the number of responses 
    // that we are awaiting...
    // A Vector (rather than a Set) keeps replies that carry equal values
    var valueResults = Vector.empty[GetValueResult]

    context.system.scheduler.scheduleOnce(1.second, self, TimedOut)

    expect {
      case TimedOut =>
        // It might make sense to respond with what we have so far if some responses are still awaited...
        respondIfDone(respondAnyway = true)
    }

    if (targets.isEmpty)
      respondIfDone()
    else
      targets.foreach { t =>
        t ! GetValue
        expectOnce {
          case vr: GetValueResult =>
            valueResults :+= vr
            respondIfDone()
        }
      }

    def respondIfDone(respondAnyway: Boolean = false) = {
      if (respondAnyway || valueResults.size == targets.size) {
        originalSender ! AggregationResult(valueResults.foldLeft(0) { case (acc, GetValueResult(v)) => acc + v })
        context stop self
      }
    }
  }
}
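
The comment in the Handler mentions an alternative: storing a running total and the number of responses still awaited. A minimal sketch of that bookkeeping in plain Scala might look like the following. RunningSum and its members are hypothetical names chosen for illustration and do not come from Akka. Because each reply is counted rather than stored, two children reporting the same value are both included in the sum.

```scala
// Hypothetical running-total bookkeeping; illustrative only, not Akka API.
final case class RunningSum(awaiting: Int, total: Int = 0) {

  // Fold one reply into the total and decrement the outstanding count.
  // Replies arriving after completion are ignored.
  def add(value: Int): RunningSum =
    if (awaiting > 0) RunningSum(awaiting - 1, total + value) else this

  // True once no more replies are awaited.
  def isDone: Boolean = awaiting == 0
}
```

Inside the Handler, a state like RunningSum(targets.size) could stand in for the collection of results, with total being sent back once isDone holds or the timeout fires.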

To use this SummationAggregator from your parent actor you could do:

context.actorOf(SummationAggregator.props) ! StartAggregation(children)

and then handle AggregationResult somewhere in the parent's receive.
