整个集群上的 Akka ActorSelection
Akka ActorSelection over a whole cluser
我有一个简单的集群,每个节点上都有一个名为 "service" 的服务角色。此服务分别与 ClusterClientRecptionist
公开,以便能够通过 ClusterClient
.
从集群外部使用 ist
然后客户端注册用户,这些用户是在集群的随机节点上创建的(因为 ClusterClient
是随机分配的)。例如,节点 1 上的 /user/service/user1
和节点 2 上的 /user/service/user2
。
我现在想做的是向所有注册用户发送消息,而不管他们的实际位置。我认为使用像 /user/service/*
这样的 ActorSelection
很容易。但这只会解析相应节点上的本地 acotr。
顺便说一下,我和 Java 一起工作。
选项 1
我刚刚使用 this question and documented here 中描述的 DistributedPubSubMediator
解决了它。
private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
@Override
public void onReceive(Object msg) throws Exception {
String msgStr = msg.toString();
String val = msgStr.substring(4);
if (msgStr.startsWith("add")) {
ActorRef act = context().actorOf(Props.create(User.class, val), val);
// subscribe the newly created user on topic "allUsers"
mediator.tell(new DistributedPubSubMediator.Subscribe("allUsers", act), self());
System.out.println("user created: " + act);
} else if (msgStr.startsWith("say")) {
// broadcast text message to all subscribed users
mediator.tell(new DistributedPubSubMediator.Publish("allUsers", new Text(val)), self());
}
}
选项 2
第二个成功的选项是使用 BroadcastGroup 路由器。重要的是在配置中启用了集群:
akka.actor.deployment {
/allUsers {
router = broadcast-group
routees.paths = ["/user/service/*"]
cluster {
enabled = on
allow-local-routees = on
}
}
}
之后就可以按照文档直接使用了。
ActorRef allUsers = system.actorOf(FromConfig.getInstance().props(), "allUsers");
[...]
allUsers.tell(new Text(val), self());
我有一个简单的集群,每个节点上都有一个名为 "service" 的服务角色。此服务分别与 ClusterClientRecptionist
公开,以便能够通过 ClusterClient
.
然后客户端注册用户,这些用户是在集群的随机节点上创建的(因为 ClusterClient
是随机分配的)。例如,节点 1 上的 /user/service/user1
和节点 2 上的 /user/service/user2
。
我现在想做的是向所有注册用户发送消息,而不管他们的实际位置。我认为使用像 /user/service/*
这样的 ActorSelection
很容易。但这只会解析相应节点上的本地 acotr。
顺便说一下,我和 Java 一起工作。
选项 1
我刚刚使用 this question and documented here 中描述的 DistributedPubSubMediator
解决了它。
private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
@Override
public void onReceive(Object msg) throws Exception {
String msgStr = msg.toString();
String val = msgStr.substring(4);
if (msgStr.startsWith("add")) {
ActorRef act = context().actorOf(Props.create(User.class, val), val);
// subscribe the newly created user on topic "allUsers"
mediator.tell(new DistributedPubSubMediator.Subscribe("allUsers", act), self());
System.out.println("user created: " + act);
} else if (msgStr.startsWith("say")) {
// broadcast text message to all subscribed users
mediator.tell(new DistributedPubSubMediator.Publish("allUsers", new Text(val)), self());
}
}
选项 2
第二个成功的选项是使用 BroadcastGroup 路由器。重要的是在配置中启用了集群:
akka.actor.deployment {
/allUsers {
router = broadcast-group
routees.paths = ["/user/service/*"]
cluster {
enabled = on
allow-local-routees = on
}
}
}
之后就可以按照文档直接使用了。
ActorRef allUsers = system.actorOf(FromConfig.getInstance().props(), "allUsers");
[...]
allUsers.tell(new Text(val), self());