Akka路由器实现
Akka Router Implementation
我正在练习 Akka 路由,在 Akka 文档中找到了这段代码。
路由器class:
public class Router extends AbstractActor {
private List<Routee> routees = new ArrayList<Routee>();
akka.routing.Router router;
{
for(int i=1;i<=5;i++) {
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
routees.add(new ActorRefRoutee(actor));
System.out.println("Routee added");
}
router = new akka.routing.Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
router.route(work, getSender());
})
.match(Terminated.class, terminated -> {
System.out.println("Got actor terminated message");
router.removeRoutee(new ActorRefRoutee(terminated.actor()));
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
router.addRoutee(new ActorRefRoutee(actor));
System.out.println("Routee added back");
})
.build();
}
}
演员class:
public class Actor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
System.out.println("Work message received");
getContext().stop(getSelf());
})
.build();
}
}
Router
class 创建 Actor
class 的五个实例。实例存储在列表中。我在配置路由器的时候给它应用了一个RoundRobinRoutingLogic
。
当我发送五条或更少的消息时,它工作得很好。但是当消息数量超过五条时,就会出现死信错误。为什么?每次停止路由时,我都会添加一个新路由。谁能帮我解决这个问题?
Actor 的启动和停止,以及 Terminated
消息的生成,都是异步发生的。在您的问题上下文中,这意味着您无法保证在路由器收到第六条消息之前,使用当前设置,新路由已添加到路由器。可能正在发生的事情是路由器正在如此快速地搅动消息,以至于当第六条 Work
消息到达时,还没有可用的路由。
顺便说一句,给你的 类 "Router" 和 "Actor," 命名非常混乱,因为 Akka API 已经使用了这些词。
我正在练习 Akka 路由,在 Akka 文档中找到了这段代码。
路由器class:
public class Router extends AbstractActor {
private List<Routee> routees = new ArrayList<Routee>();
akka.routing.Router router;
{
for(int i=1;i<=5;i++) {
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
routees.add(new ActorRefRoutee(actor));
System.out.println("Routee added");
}
router = new akka.routing.Router(new RoundRobinRoutingLogic(), routees);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
router.route(work, getSender());
})
.match(Terminated.class, terminated -> {
System.out.println("Got actor terminated message");
router.removeRoutee(new ActorRefRoutee(terminated.actor()));
ActorRef actor = getContext().actorOf(Props.create(Actor.class));
getContext().watch(actor);
router.addRoutee(new ActorRefRoutee(actor));
System.out.println("Routee added back");
})
.build();
}
}
演员class:
public class Actor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Work.class, work -> {
System.out.println("Work message received");
getContext().stop(getSelf());
})
.build();
}
}
Router
class 创建 Actor
class 的五个实例。实例存储在列表中。我在配置路由器的时候给它应用了一个RoundRobinRoutingLogic
。
当我发送五条或更少的消息时,它工作得很好。但是当消息数量超过五条时,就会出现死信错误。为什么?每次停止路由时,我都会添加一个新路由。谁能帮我解决这个问题?
Actor 的启动和停止,以及 Terminated
消息的生成,都是异步发生的。在您的问题上下文中,这意味着您无法保证在路由器收到第六条消息之前,使用当前设置,新路由已添加到路由器。可能正在发生的事情是路由器正在如此快速地搅动消息,以至于当第六条 Work
消息到达时,还没有可用的路由。
顺便说一句,给你的 类 "Router" 和 "Actor," 命名非常混乱,因为 Akka API 已经使用了这些词。