无法向 akka 远程参与者发送消息
Unable to send a message to an akka remote actor
我是 akka 远程处理(remoting)的新手,我只是想向一个远程 actor 发送消息并收到它返回的响应。我在本机(localhost)上有 2 个 actor 系统,分别使用不同的端口:MasterSystem 和 WorkerSystem。我在 WorkerSystem 中创建了一个 actor,并尝试向其远程地址发送消息,但我不断收到 'dead letters encountered' 消息!非常感谢任何帮助。谢谢!
MainMaster.java
package pi_swarm_approx;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import com.typesafe.config.ConfigFactory;
public class MainMaster extends UntypedActor{
ActorSystem system;
ActorRef actor;
ActorSelection remoteActor;
public MainMaster() {
system = ActorSystem.create("MasterSystem", ConfigFactory.load("master"));
System.out.println("MasterSystem created");
MainWorker mw = new MainWorker();
System.out.println("MainWorker obj created");
remoteActor = mw.system.actorSelection("akka://WorkerSystem@localhost:2552/user/workerActor");
System.out.println("Remote actor created");
remoteActor.tell("hello", getSelf());
System.out.println("Message sent to remote actor");
}
public void onReceive(Object msg) {
if (msg != null) {
System.out.println("Got it back");
}
else {
unhandled(msg);
getContext().stop(getSelf());
}
}
}
MainWorker.java
package pi_swarm_approx;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import com.typesafe.config.ConfigFactory;
import akka.actor.Props;
/**
 * Holder from the question: constructing it starts an ActorSystem named
 * "WorkerSystem" (config "worker") and creates a {@link Worker} actor named
 * "workerActor" in it.
 */
public class MainWorker {
    ActorSystem system;
    ActorRef actor;

    public MainWorker() {
        ActorSystem workerSystem = ActorSystem.create("WorkerSystem", ConfigFactory.load("worker"));
        this.system = workerSystem;
        this.actor = workerSystem.actorOf(Props.create(Worker.class), "workerActor");
    }
}
Worker.java
package pi_swarm_approx;
import akka.actor.UntypedActor;
public class Worker extends UntypedActor {
public void onReceive(Object msg) {
System.out.println("Worker actor got message");
if (msg != null) {
getSender().tell("Request processed", getSelf());
}
else {
unhandled(msg);
}
getContext().stop(getSelf());
}
}
master.conf
# master.conf as posted in the question (loaded with ConfigFactory.load("master")).
akka {
  actor.provider = "cluster"

  remote {
    # NOTE(review): classic remoting is normally switched on with
    # `enabled-transports`; confirm that `transport` is honoured by the Akka version in use.
    transport = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 2551
    }
  }

  # NOTE(review): `clustering` does not look like a standard Akka section — confirm.
  clustering {
    cluster.name = "MasterSystem"
    role = "master"
  }
}
worker.conf
# worker.conf as posted in the question (loaded with ConfigFactory.load("worker")).
# NOTE(review): the snippet never closes the top-level `akka` block and shows no
# `remote` section (hostname/port) — presumably truncated when posted; confirm.
akka {
actor {
provider = "cluster"
deployment {
# Deployment entry for the actor path /workerActor.
/workerActor {
# This address uses the `akka.tcp` scheme, while the selection string in
# MainMaster uses `akka://`.
remote = "akka.tcp://WorkerSystem@localhost:2552"
}
}
}
输出
In main
MasterSystem created
MainWorker obj created
Remote actor created
Message sent to remote actor
[INFO] [11/22/2021 16:01:34.531] [WorkerSystem-akka.actor.default-dispatcher-5] [akka://WorkerSystem/deadLetters] Message [java.lang.String] from Actor[akka://Main/user/app#402333018] to Actor[akka://WorkerSystem/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://WorkerSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
<=========----> 75% EXECUTING [18s]
您发布的代码存在多个问题。我发布了一个最小的工作代码。
首先,您使用的是已弃用的 akka.actor.UntypedActor,它在 2.4.0 中已被弃用。如果您使用的是 maven,请相应地更改依赖项。以下代码在 java 11 上编译并运行。
build.sbt
// Akka modules needed by the example; both are pinned to the same release.
val akkaVersion = "2.6.17"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % akkaVersion,
  "com.typesafe.akka" %% "akka-remote" % akkaVersion
)
对于 provider,我使用 remote 而不是 cluster。您也可以使用 cluster,但请确保添加必要的依赖项。配置中的重复部分还可以进一步精简,您可以在后续探索时自行处理。
master.conf
# Settings for MasterSystem (loaded in Main with ConfigFactory.load("master.conf")).
akka {
  actor.provider = "remote"

  remote.artery {
    enabled = on
    transport = tcp
    # Address of this system; worker.conf uses port 2551 on the same host.
    canonical.hostname = "127.0.0.1"
    canonical.port = 2552
  }
}
worker.conf
# Settings for WorkerSystem (loaded in MainWorker with ConfigFactory.load("worker.conf")).
akka {
  actor.provider = "remote"

  remote.artery {
    enabled = on
    transport = tcp
    # Address of this system; Main selects the worker actor at 127.0.0.1:2551.
    canonical.hostname = "127.0.0.1"
    canonical.port = 2551
  }
}
MainMaster.java
import akka.actor.AbstractActor;
/**
 * Master-side actor: prints every String it receives and reports any other
 * message type as unknown.
 */
public class MainMaster extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, System.out::println)
            .matchAny(this::reportUnknown)
            .build();
    }

    /** Fallback for messages that are not Strings. */
    private void reportUnknown(Object ignored) {
        System.out.println("received unknown message");
    }
}
Worker.java
/**
 * Worker-side actor: prints each String request and answers the sender with
 * "Request processed"; any other message type is reported as unknown.
 */
public class Worker extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, this::handleRequest)
            .matchAny(this::reportUnknown)
            .build();
    }

    /** Prints the request and replies to whoever sent it. */
    private void handleRequest(String request) {
        System.out.println(request);
        getSender().tell("Request processed", getSelf());
    }

    /** Fallback for messages that are not Strings. */
    private void reportUnknown(Object ignored) {
        System.out.println("received unknown message");
    }
}
MainWorker.java
/** Entry point of the worker process: starts "WorkerSystem" and creates the worker actor. */
public class MainWorker {

    public static void main(String[] args) {
        ActorSystem workerSystem =
            ActorSystem.create("WorkerSystem", ConfigFactory.load("worker.conf"));

        // Created under /user/workerActor, the path the master selects.
        ActorRef workerRef = workerSystem.actorOf(Props.create(Worker.class), "workerActor");

        System.out.println("worker started");
    }
}
Main.java
/**
 * Entry point of the master process: starts "MasterSystem", creates the master
 * actor, and sends "Hello Worker" to the remote worker actor with the master as sender.
 */
public class Main {

    public static void main(String[] args) {
        System.out.println("In main");

        ActorSystem masterSystem =
            ActorSystem.create("MasterSystem", ConfigFactory.load("master.conf"));
        ActorRef master = masterSystem.actorOf(Props.create(MainMaster.class), "master");

        // The selection is made on the local (master) system; replies go to `master`.
        masterSystem
            .actorSelection("akka://WorkerSystem@127.0.0.1:2551/user/workerActor")
            .tell("Hello Worker", master);
    }
}
我是 akka 远程处理(remoting)的新手,我只是想向一个远程 actor 发送消息并收到它返回的响应。我在本机(localhost)上有 2 个 actor 系统,分别使用不同的端口:MasterSystem 和 WorkerSystem。我在 WorkerSystem 中创建了一个 actor,并尝试向其远程地址发送消息,但我不断收到 'dead letters encountered' 消息!非常感谢任何帮助。谢谢!
MainMaster.java
package pi_swarm_approx;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.UntypedActor;
import com.typesafe.config.ConfigFactory;
public class MainMaster extends UntypedActor{
ActorSystem system;
ActorRef actor;
ActorSelection remoteActor;
public MainMaster() {
system = ActorSystem.create("MasterSystem", ConfigFactory.load("master"));
System.out.println("MasterSystem created");
MainWorker mw = new MainWorker();
System.out.println("MainWorker obj created");
remoteActor = mw.system.actorSelection("akka://WorkerSystem@localhost:2552/user/workerActor");
System.out.println("Remote actor created");
remoteActor.tell("hello", getSelf());
System.out.println("Message sent to remote actor");
}
public void onReceive(Object msg) {
if (msg != null) {
System.out.println("Got it back");
}
else {
unhandled(msg);
getContext().stop(getSelf());
}
}
}
MainWorker.java
package pi_swarm_approx;
import akka.actor.ActorSystem;
import akka.actor.ActorRef;
import com.typesafe.config.ConfigFactory;
import akka.actor.Props;
/**
 * Holder from the question: constructing it starts an ActorSystem named
 * "WorkerSystem" (config "worker") and creates a {@link Worker} actor named
 * "workerActor" in it.
 */
public class MainWorker {
    ActorSystem system;
    ActorRef actor;

    public MainWorker() {
        ActorSystem workerSystem = ActorSystem.create("WorkerSystem", ConfigFactory.load("worker"));
        this.system = workerSystem;
        this.actor = workerSystem.actorOf(Props.create(Worker.class), "workerActor");
    }
}
Worker.java
package pi_swarm_approx;
import akka.actor.UntypedActor;
public class Worker extends UntypedActor {
public void onReceive(Object msg) {
System.out.println("Worker actor got message");
if (msg != null) {
getSender().tell("Request processed", getSelf());
}
else {
unhandled(msg);
}
getContext().stop(getSelf());
}
}
master.conf
# master.conf as posted in the question (loaded with ConfigFactory.load("master")).
akka {
  actor.provider = "cluster"

  remote {
    # NOTE(review): classic remoting is normally switched on with
    # `enabled-transports`; confirm that `transport` is honoured by the Akka version in use.
    transport = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "localhost"
      port = 2551
    }
  }

  # NOTE(review): `clustering` does not look like a standard Akka section — confirm.
  clustering {
    cluster.name = "MasterSystem"
    role = "master"
  }
}
worker.conf
# worker.conf as posted in the question (loaded with ConfigFactory.load("worker")).
# NOTE(review): the snippet never closes the top-level `akka` block and shows no
# `remote` section (hostname/port) — presumably truncated when posted; confirm.
akka {
actor {
provider = "cluster"
deployment {
# Deployment entry for the actor path /workerActor.
/workerActor {
# This address uses the `akka.tcp` scheme, while the selection string in
# MainMaster uses `akka://`.
remote = "akka.tcp://WorkerSystem@localhost:2552"
}
}
}
输出
In main
MasterSystem created
MainWorker obj created
Remote actor created
Message sent to remote actor
[INFO] [11/22/2021 16:01:34.531] [WorkerSystem-akka.actor.default-dispatcher-5] [akka://WorkerSystem/deadLetters] Message [java.lang.String] from Actor[akka://Main/user/app#402333018] to Actor[akka://WorkerSystem/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior, then [Actor[akka://WorkerSystem/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
<=========----> 75% EXECUTING [18s]
您发布的代码存在多个问题。我发布了一个最小的工作代码。
首先,您使用的是已弃用的 akka.actor.UntypedActor,它在 2.4.0 中已被弃用。如果您使用的是 maven,请相应地更改依赖项。以下代码在 java 11 上编译并运行。
build.sbt
// Akka modules needed by the example; both are pinned to the same release.
val akkaVersion = "2.6.17"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"  % akkaVersion,
  "com.typesafe.akka" %% "akka-remote" % akkaVersion
)
对于 provider,我使用 remote 而不是 cluster。您也可以使用 cluster,但请确保添加必要的依赖项。配置中的重复部分还可以进一步精简,您可以在后续探索时自行处理。
master.conf
# Settings for MasterSystem (loaded in Main with ConfigFactory.load("master.conf")).
akka {
  actor.provider = "remote"

  remote.artery {
    enabled = on
    transport = tcp
    # Address of this system; worker.conf uses port 2551 on the same host.
    canonical.hostname = "127.0.0.1"
    canonical.port = 2552
  }
}
worker.conf
# Settings for WorkerSystem (loaded in MainWorker with ConfigFactory.load("worker.conf")).
akka {
  actor.provider = "remote"

  remote.artery {
    enabled = on
    transport = tcp
    # Address of this system; Main selects the worker actor at 127.0.0.1:2551.
    canonical.hostname = "127.0.0.1"
    canonical.port = 2551
  }
}
MainMaster.java
import akka.actor.AbstractActor;
/**
 * Master-side actor: prints every String it receives and reports any other
 * message type as unknown.
 */
public class MainMaster extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, System.out::println)
            .matchAny(this::reportUnknown)
            .build();
    }

    /** Fallback for messages that are not Strings. */
    private void reportUnknown(Object ignored) {
        System.out.println("received unknown message");
    }
}
Worker.java
/**
 * Worker-side actor: prints each String request and answers the sender with
 * "Request processed"; any other message type is reported as unknown.
 */
public class Worker extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, this::handleRequest)
            .matchAny(this::reportUnknown)
            .build();
    }

    /** Prints the request and replies to whoever sent it. */
    private void handleRequest(String request) {
        System.out.println(request);
        getSender().tell("Request processed", getSelf());
    }

    /** Fallback for messages that are not Strings. */
    private void reportUnknown(Object ignored) {
        System.out.println("received unknown message");
    }
}
MainWorker.java
/** Entry point of the worker process: starts "WorkerSystem" and creates the worker actor. */
public class MainWorker {

    public static void main(String[] args) {
        ActorSystem workerSystem =
            ActorSystem.create("WorkerSystem", ConfigFactory.load("worker.conf"));

        // Created under /user/workerActor, the path the master selects.
        ActorRef workerRef = workerSystem.actorOf(Props.create(Worker.class), "workerActor");

        System.out.println("worker started");
    }
}
Main.java
/**
 * Entry point of the master process: starts "MasterSystem", creates the master
 * actor, and sends "Hello Worker" to the remote worker actor with the master as sender.
 */
public class Main {

    public static void main(String[] args) {
        System.out.println("In main");

        ActorSystem masterSystem =
            ActorSystem.create("MasterSystem", ConfigFactory.load("master.conf"));
        ActorRef master = masterSystem.actorOf(Props.create(MainMaster.class), "master");

        // The selection is made on the local (master) system; replies go to `master`.
        masterSystem
            .actorSelection("akka://WorkerSystem@127.0.0.1:2551/user/workerActor")
            .tell("Hello Worker", master);
    }
}