如何使用 Akka 远程处理通过 CLI 向远程参与者发送消息?
How can I send messages to a remote actor via CLI with Akka remoting?
我有一个远程演员 Bar
和一个本地演员 Foo
。我想在每次调用 CLI 时使用 Foo
将消息传递给 Bar
。
Bar
可以成功传递消息,但是Foo
在等待消息时挂起。为了解决这个问题,我在 Foo
的 main 末尾添加了一个 sys.exit(0)
。这会导致与 Foo
的系统发生关联问题。
如何在连续的 CLI 发布之间关闭我的本地 actor 而无需手动杀死我的本地 actor?
闭嘴给我密码!
富:
build.sbt
name := "Foo"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"
fork in run := true
Main.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
case class Config(mode: String = "", greeting: String="")
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
def receive = {
case method: String => BarActor ! method
}
}
object CommandLineInterface {
val config = ConfigFactory.load()
val system = ActorSystem("FooSystem", config.getConfig("FooApp"))
val FooActor = system.actorOf(Props[Foo], name = "FooActor")
val parser = new scopt.OptionParser[Config]("Foo") {
head("foo", "1.x")
help("help").text("prints usage text")
opt[String]('m', "method").action( (x, c) =>
c.copy(greeting = x) ).text("Bar will greet with <method>")
}
}
object Main extends App {
import CommandLineInterface.{parser, FooActor}
parser.parse(args, Config()) match {
case Some(config) => FooActor ! config.greeting
case None => sys.error("Bad news...")
}
/*
When sys.exit(0) commented, this hangs and Bar greet.
When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet.
*/
//sys.exit(0)
}
application.conf
FooApp {
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
log-sent-messages = on
log-received-messages = on
}
}
}
栏目:
build.sbt
name := "Bar"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
Main.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
class Bar extends Actor {
def receive = {
case greeting: String => Bar.greet(greeting)
}
}
object Bar {
val config = ConfigFactory.load()
val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
val BarActor = system.actorOf(Props[Bar], name = "BarActor")
def greet(greeting: String) = println(greeting)
def main(args: Array[String]): Unit = {
/* Intentionally empty */
}
}
application.conf
BarApp {
akka {
loglevel = "INFO"
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
log-sent-messages = on
log-received-messages = on
}
}
}
运行 Foo
与 sbt 'run-main Main -m hello'
和 运行 Bar
与 sbt 'run-main Main'
.
抱歉,代码很长,但它是解决我的问题的 MVCE。
我怎样才能实现我想要的行为 -- CLI actor 在连续的 CLI 调用之间死掉,远程 actor 等待新消息。
发生这种情况是因为您在向 FooActor
发送消息后立即调用 sys.exit(0)
,因此应用程序很有可能在 FooActor
有机会读取消息之前退出消息,更不用说转发给 BarActor
.
似乎有many possible solutions,其中之一是:
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
override def receive = {
case method: String => {
BarActor ! method
self ! PoisonPill
}
}
override def postStop = {
context.system.terminate
}
}
不幸的是,系统在将消息发送到 Bar
之前仍然关闭了。
如果您想以 "fire and forget" 样式发送消息,我找不到任何合理的解决方案来解决此问题。但是,在大多数情况下,希望从远程参与者那里得到某种响应,因此您可以这样做:
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
override def receive = {
case method: String => {
BarActor ! method
context.become(waitingToKillMyself)
}
}
def waitingToKillMyself: Receive = {
case response: String => {
println(response)
self ! PoisonPill
}
}
override def postStop = {
context.system.terminate
}
}
// ...
object Main extends App {
import CommandLineInterface.{parser, FooActor, system}
import system.dispatcher
parser.parse(args, Config()) match {
case Some(config) => {
FooActor ! config.greeting
system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)
}
case None => sys.error("Bad news...")
}
}
酒吧:
class Bar extends Actor {
def receive = {
case greeting: String => {
Bar.greet(greeting)
sender() ! "OK"
}
}
}
我有一个远程演员 Bar
和一个本地演员 Foo
。我想在每次调用 CLI 时使用 Foo
将消息传递给 Bar
。
Bar
可以成功传递消息,但是Foo
在等待消息时挂起。为了解决这个问题,我在 Foo
的 main 末尾添加了一个 sys.exit(0)
。这会导致与 Foo
的系统发生关联问题。
如何在连续的 CLI 发布之间关闭我的本地 actor 而无需手动杀死我的本地 actor?
闭嘴给我密码!
富:
build.sbt
name := "Foo"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.5.0"
fork in run := true
Main.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
case class Config(mode: String = "", greeting: String="")
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
def receive = {
case method: String => BarActor ! method
}
}
object CommandLineInterface {
val config = ConfigFactory.load()
val system = ActorSystem("FooSystem", config.getConfig("FooApp"))
val FooActor = system.actorOf(Props[Foo], name = "FooActor")
val parser = new scopt.OptionParser[Config]("Foo") {
head("foo", "1.x")
help("help").text("prints usage text")
opt[String]('m', "method").action( (x, c) =>
c.copy(greeting = x) ).text("Bar will greet with <method>")
}
}
object Main extends App {
import CommandLineInterface.{parser, FooActor}
parser.parse(args, Config()) match {
case Some(config) => FooActor ! config.greeting
case None => sys.error("Bad news...")
}
/*
When sys.exit(0) commented, this hangs and Bar greet.
When sys.exit(0) uncommented, this doesn't hang, but also Bar doesn't greet.
*/
//sys.exit(0)
}
application.conf
FooApp {
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
log-sent-messages = on
log-received-messages = on
}
}
}
栏目:
build.sbt
name := "Bar"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.11"
libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.4.11"
Main.scala
import akka.actor._
import com.typesafe.config.ConfigFactory
class Bar extends Actor {
def receive = {
case greeting: String => Bar.greet(greeting)
}
}
object Bar {
val config = ConfigFactory.load()
val system = ActorSystem("BarSystem", config.getConfig("BarApp"))
val BarActor = system.actorOf(Props[Bar], name = "BarActor")
def greet(greeting: String) = println(greeting)
def main(args: Array[String]): Unit = {
/* Intentionally empty */
}
}
application.conf
BarApp {
akka {
loglevel = "INFO"
actor {
provider = remote
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
log-sent-messages = on
log-received-messages = on
}
}
}
运行 Foo
与 sbt 'run-main Main -m hello'
和 运行 Bar
与 sbt 'run-main Main'
.
抱歉,代码很长,但它是解决我的问题的 MVCE。
我怎样才能实现我想要的行为 -- CLI actor 在连续的 CLI 调用之间死掉,远程 actor 等待新消息。
发生这种情况是因为您在向 FooActor
发送消息后立即调用 sys.exit(0)
,因此应用程序很有可能在 FooActor
有机会读取消息之前退出消息,更不用说转发给 BarActor
.
似乎有many possible solutions,其中之一是:
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
override def receive = {
case method: String => {
BarActor ! method
self ! PoisonPill
}
}
override def postStop = {
context.system.terminate
}
}
不幸的是,系统在将消息发送到 Bar
之前仍然关闭了。
如果您想以 "fire and forget" 样式发送消息,我找不到任何合理的解决方案来解决此问题。但是,在大多数情况下,希望从远程参与者那里得到某种响应,因此您可以这样做:
class Foo extends Actor {
// create the remote actor
val BarActor = context.actorSelection("akka.tcp://BarSystem@127.0.0.1:2552/user/BarActor")
override def receive = {
case method: String => {
BarActor ! method
context.become(waitingToKillMyself)
}
}
def waitingToKillMyself: Receive = {
case response: String => {
println(response)
self ! PoisonPill
}
}
override def postStop = {
context.system.terminate
}
}
// ...
object Main extends App {
import CommandLineInterface.{parser, FooActor, system}
import system.dispatcher
parser.parse(args, Config()) match {
case Some(config) => {
FooActor ! config.greeting
system.scheduler.scheduleOnce(10.seconds, FooActor, PoisonPill)
}
case None => sys.error("Bad news...")
}
}
酒吧:
class Bar extends Actor {
def receive = {
case greeting: String => {
Bar.greet(greeting)
sender() ! "OK"
}
}
}