error: message from Actor to Actor was not delivered.[1] dead letters encountered. Distributed pub-sub working across clusters not working
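I am trying to set up distributed pub-sub across different cluster systems, but nothing I try works.
All I want is a simple example in which:
1) I create a topic, say "content".
2) A node in, say, JVM A creates the topic, subscribes to it, and also has a publisher that publishes to it.
3) On a different node, say JVM B on a different port, I create a subscriber.
4) When I send a message to the topic from JVM A, I want the subscriber on JVM B to receive it as well, since it is subscribed to the same topic.
Any help would be greatly appreciated, as would a simple working example in Java of distributed pub-sub with subscribers and publishers in different cluster systems on different ports.
Here is the code for app1 and its configuration file.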
public class App1 {
    public static void main(String[] args) {
        System.setProperty("akka.remote.netty.tcp.port", "2551");
        ActorSystem clusterSystem = ActorSystem.create("ClusterSystem");
        ClusterClientReceptionist clusterClientReceptionist1 = ClusterClientReceptionist.get(clusterSystem);
        ActorRef subcriber1 = clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber1");
        clusterClientReceptionist1.registerSubscriber("content", subcriber1);
        ActorRef publisher1 = clusterSystem.actorOf(Props.create(Publisher.class), "publisher1");
        clusterClientReceptionist1.registerSubscriber("content", publisher1);
        publisher1.tell("testMessage1", ActorRef.noSender());
    }
}
app1.confi
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
    ]
    auto-down-unreachable-after = 10s
  }
  akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
    "akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
    name = distributedPubSubMediator
    role = ""
    routing-logic = random
    gossip-interval = 1s
    removed-time-to-live = 120s
    max-delta-elements = 3000
    use-dispatcher = ""
  }
  akka.cluster.client.receptionist {
    name = receptionist
    role = ""
    number-of-contacts = 3
    response-tunnel-receive-timeout = 30s
    use-dispatcher = ""
    heartbeat-interval = 2s
    acceptable-heartbeat-pause = 13s
    failure-detection-interval = 2s
  }
}
Here is the code for app2 and its configuration file.
public class App {
    public static Set<ActorPath> initialContacts() {
        return new HashSet<ActorPath>(Arrays.asList(
                ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")));
    }

    public static void main(String[] args) {
        System.setProperty("akka.remote.netty.tcp.port", "2553");
        ActorSystem clusterSystem = ActorSystem.create("ClusterSystem2");
        ClusterClientReceptionist clusterClientReceptionist2 = ClusterClientReceptionist.get(clusterSystem);
        final ActorRef clusterClient = clusterSystem.actorOf(ClusterClient.props(ClusterClientSettings.create(
                clusterSystem).withInitialContacts(initialContacts())), "client");
        ActorRef subcriber2 = clusterSystem.actorOf(Props.create(Subscriber.class), "subscriber2");
        clusterClientReceptionist2.registerSubscriber("content", subcriber2);
        ActorRef publisher2 = clusterSystem.actorOf(Props.create(Publisher.class), "publisher2");
        publisher2.tell("testMessage2", ActorRef.noSender());
        clusterClient.tell(new ClusterClient.Send("/user/publisher1", "hello", true), null);
    }
}
app2.confi
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2553"
    ]
    auto-down-unreachable-after = 10s
  }
  akka.extensions = ["akka.cluster.pubsub.DistributedPubSub",
    "akka.contrib.pattern.ClusterReceptionistExtension"]
  akka.cluster.pub-sub {
    name = distributedPubSubMediator
    role = ""
    routing-logic = random
    gossip-interval = 1s
    removed-time-to-live = 120s
    max-delta-elements = 3000
    use-dispatcher = ""
  }
  akka.cluster.client.receptionist {
    name = receptionist
    role = ""
    number-of-contacts = 3
    response-tunnel-receive-timeout = 30s
    use-dispatcher = ""
    heartbeat-interval = 2s
    acceptable-heartbeat-pause = 13s
    failure-detection-interval = 2s
  }
}
The Publisher and Subscriber classes, given below, are the same for both applications.
Publisher:
public class Publisher extends UntypedActor {
    private final ActorRef mediator =
            DistributedPubSub.get(getContext().system()).mediator();

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof String) {
            mediator.tell(new DistributedPubSubMediator.Publish("events", msg), getSelf());
        } else {
            unhandled(msg);
        }
    }
}
Subscriber:
public class Subscriber extends UntypedActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    public Subscriber() {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe("events", getSelf()), getSelf());
    }

    public void onReceive(Object msg) throws Throwable {
        if (msg instanceof String) {
            log.info("Got: {}", msg);
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            log.info("subscribing");
        } else {
            unhandled(msg);
        }
    }
}
When I run both applications, I get this dead-letters error in the receiving application:
[ClusterSystem-akka.actor.default-dispatcher-21] INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://ClusterSystem/system/receptionist/akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters#188707926] to Actor[akka://ClusterSystem/system/distributedPubSubMediator#1119990682] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
In the sending application, the log shows the message as sent successfully:
[ClusterSystem2-akka.actor.default-dispatcher-22] DEBUG akka.cluster.client.ClusterClient - Sending buffered messages to receptionist
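The sender in the dead-letters line above is percent-encoded, which makes it hard to read. As a small reading aid, here is a sketch that uses only the JDK (Java 10 or later is assumed for the `Charset` overload) to decode that path segment. The class name `DecodeSenderPath` is made up for illustration and is not part of Akka.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

final class DecodeSenderPath {
    /** Decodes one percent-encoded actor path segment copied from a log line. */
    static String decode(String encodedSegment) {
        return URLDecoder.decode(encodedSegment, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Segment copied from the dead-letters log line in the question.
        String segment = "akka.tcp%3A%2F%2FClusterSystem2%40127.0.0.1%3A2553%2FdeadLetters";
        System.out.println(decode(segment));
        // prints akka.tcp://ClusterSystem2@127.0.0.1:2553/deadLetters
    }
}
```

The decoded sender is the deadLetters reference of ClusterSystem2, which appears consistent with app2 passing `null` as the sender in `clusterClient.tell(...)`.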
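Using ClusterClient this way makes no sense and has nothing to do with distributed pub-sub. Since both of your nodes are part of the cluster, you can just use the distributed pub-sub API directly.
Here is a simple main, including the configuration, that creates a two-node cluster using your exact publisher and subscriber actors and works as expected: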
public static void main(String[] args) throws Exception {
    final Config config = ConfigFactory.parseString(
        "akka.actor.provider=cluster\n" +
        "akka.remote.netty.tcp.port=2551\n" +
        "akka.cluster.seed-nodes = [ \"akka.tcp://ClusterSystem@127.0.0.1:2551\"]\n");

    ActorSystem node1 = ActorSystem.create("ClusterSystem", config);
    ActorSystem node2 = ActorSystem.create("ClusterSystem",
        ConfigFactory.parseString("akka.remote.netty.tcp.port=2552")
            .withFallback(config));

    // wait a bit for the cluster to form
    Thread.sleep(3000);

    ActorRef subscriber = node1.actorOf(
        Props.create(Subscriber.class),
        "subscriber");
    ActorRef publisher = node2.actorOf(
        Props.create(Publisher.class),
        "publisher");

    // wait a bit for the subscription to be gossiped
    Thread.sleep(3000);
    publisher.tell("testMessage1", ActorRef.noSender());
}
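Note that distributed pub-sub does not provide any delivery guarantees, so if you send a message before the mediators have gotten in touch with each other, the message will simply be lost (hence the Thread.sleep statements, which are not something you should do in real code).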
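To make the "lost if sent too early" behavior concrete, here is a toy in-process model using only the JDK (Java 9 or later). It is not the Akka API: `ToyMediator` and `ToyPubSubDemo` are hypothetical names, and the latch stands in for an acknowledgement such as the `SubscribeAck` that the Subscriber above already handles. It only illustrates the general idea of waiting for a signal rather than sleeping for a fixed time. In a real cluster an acknowledgement from the local mediator may still precede replication to other nodes, so treat this as a sketch of the principle, not a recipe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ToyPubSubDemo {
    /** Publishes once before and once after the subscription; returns what was received. */
    static List<String> run() throws InterruptedException {
        ToyMediator mediator = new ToyMediator();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch subscribed = new CountDownLatch(1);

        // No subscriber is known yet, so this message is silently dropped.
        mediator.publish("events", "too-early");

        // The subscription is registered asynchronously on another thread.
        new Thread(() -> mediator.subscribe("events", received::add, subscribed)).start();

        // Wait for the acknowledgement instead of sleeping for a fixed time.
        if (!subscribed.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("subscription was not acknowledged in time");
        }
        mediator.publish("events", "testMessage1");
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [testMessage1]
    }
}

/** Toy stand-in for a pub-sub mediator with at-most-once delivery (hypothetical, not Akka). */
final class ToyMediator {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    /** Registers a subscriber, then counts down the latch as an acknowledgement. */
    void subscribe(String topic, Consumer<String> subscriber, CountDownLatch ack) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
        ack.countDown();
    }

    /** Delivers to the currently known subscribers; returns 0 when the message is dropped. */
    int publish(String topic, String msg) {
        List<Consumer<String>> targets = subscribers.getOrDefault(topic, List.of());
        targets.forEach(s -> s.accept(msg));
        return targets.size();
    }
}
```

The first message never arrives because nothing buffers it, which is the same reason the example above has to wait before publishing.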
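I think the problem is that your actor systems have different names, ClusterSystem and ClusterSystem2. At least I ran into the same problem: I had two different services in one cluster, but I had given the actor system in each service a different name.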
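A quick way to catch this kind of mismatch is to compare the local actor system name with the name embedded in the seed-node address before starting the node. The sketch below uses only the JDK. `SeedNodeCheck` and its methods are hypothetical helpers, not part of Akka, and the regular expression assumes addresses of the form shown in the configuration above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SeedNodeCheck {
    // Assumed address shape: akka.tcp://SystemName@host:port
    private static final Pattern ADDRESS =
            Pattern.compile("^akka(?:\\.tcp)?://([^@/]+)@([^:/]+):(\\d+)$");

    /** Extracts the actor system name from a seed-node address string. */
    static String systemNameOf(String seedNode) {
        Matcher m = ADDRESS.matcher(seedNode);
        if (!m.matches()) {
            throw new IllegalArgumentException("not an Akka address: " + seedNode);
        }
        return m.group(1);
    }

    /** True when the names match, which is a precondition for joining the same cluster. */
    static boolean sameSystem(String localSystemName, String seedNode) {
        return localSystemName.equals(systemNameOf(seedNode));
    }

    public static void main(String[] args) {
        String seed = "akka.tcp://ClusterSystem@127.0.0.1:2551";
        System.out.println(sameSystem("ClusterSystem", seed));  // prints true
        System.out.println(sameSystem("ClusterSystem2", seed)); // prints false
    }
}
```

A matching name is necessary but not sufficient: the nodes also need to share seed nodes, whereas the two configurations in the question each list only their own address.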