erip erip - 16 days ago 5
Scala Question

How can I send messages to a remote actor via CLI with Akka remoting?

I have a remote actor,

Bar
and a local actor,
Foo
. I want to use
Foo
to pass messages to
Bar
on each invocation of a CLI.

Bar
can be passed messages successfully, but
Foo
hangs while waiting for a message. To fix this, I added a
sys.exit(0)
at the end of
Foo
's main. This causes an association issue with
Foo
's system.

How can I shut down my local actor between successive CLI issuances without killing my local actor manually?

Shut up and give me the code!






Foo:




build.sbt




name := "Foo"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"

fork in run := true



Main.scala




import akka.actor._
import com.typesafe.config.ConfigFactory

case class Config(mode: String = "", greeting: String="")

class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

def receive = {
case method: String => BarActor ! method
}
}

object CommandLineInterface {

val config = ConfigFactory.load()
val system = ActorSystem("FooSystem", config.getConfig("FooApp"))

val FooActor = system.actorOf(Props[Foo], name = "FooActor")

val parser = new scopt.OptionParser[Config]("Foo") {
head("foo", "1.x")

help("help").text("prints usage text")

opt[String]('m', "method").action( (x, c) =>
c.copy(greeting = x) ).text("Bar will greet with <method>")
}
}

object Main extends App {
import CommandLineInterface.{parser, FooActor}

parser.parse(args, Config()) match {
case Some(config) => FooActor ! config.greeting
case None => sys.error("Bad news...")
}
/*
When sys.exit(0) commented, this hangs and Bar greet.
When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet.
*/

//sys.exit(0)
}



application.conf




FooApp {
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
log-sent-messages = on
log-received-messages = on
}
}
}





Bar:




build.sbt




name := "Bar"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"



Main.scala




import akka.actor._
import com.typesafe.config.ConfigFactory

class Bar extends Actor {
def receive = {
case greeting: String => Bar.greet(greeting)
}
}

object Bar {
val config = ConfigFactory.load()
val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
val BarActor = system.actorOf(Props[Bar], name = "BarActor")

def greet(greeting: String) = println(greeting)

def main(args: Array[String]): Unit = {
/* Intentionally empty */
}
}



application.conf




BarApp {
akka {
loglevel = "INFO"
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
log-sent-messages = on
log-received-messages = on
}
}
}


Run
Foo
with
sbt 'run-main Main -m hello'
, and run
Bar
with
sbt 'run-main Main'
.

Sorry for the long code, but it's the MVCE for my problem.

How can I achieve my desired behavior -- the CLI actor dies between successive CLI invocations with the remote actor waiting for new messages.

Answer

This is happening because you call sys.exit(0) immediately after sending a message to FooActor, so there's a significant chance that the application exits before FooActor gets the chance to even read the message, let alone forward it to BarActor.

There seem to be many possible solutions, one of them being:

class Foo extends Actor {
  // create the remote actor
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

Unfortunately, it turns out that the system still gets shut down before dispatching the message to Bar.

I couldn't find any reasonable solution to this issue if you want to send a message in a "fire and forget" style. However, in most cases, it's desirable to get some kind of response from the remote actor, so you could do:

class Foo extends Actor {
  // create the remote actor
  val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")

  override def receive = {
    case method: String => {
      BarActor ! method
      context.become(waitingToKillMyself)
    }
  }

  def waitingToKillMyself: Receive = {
    case response: String => {
      println(response)
      self ! PoisonPill
    }
  }

  override def postStop = {
    context.system.terminate
  }
}

// ...

object Main extends App {
  import CommandLineInterface.{parser, FooActor, system}
  import system.dispatcher

  parser.parse(args, Config()) match {
    case Some(config) => {
      FooActor ! config.greeting
      system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)
    }

    case None => sys.error("Bad news...")
  }
}

Bar:

class Bar extends Actor {
  def receive = {
    case greeting: String => {
      Bar.greet(greeting)
      sender() ! "OK"
    }
  }
}