Akka路由器实现

Akka Router Implementation

我正在练习 Akka 路由,在 Akka 文档中找到了这段代码。

路由器class:

public class Router extends AbstractActor {

    private List<Routee> routees = new ArrayList<Routee>();

    akka.routing.Router router;
    {

        for(int i=1;i<=5;i++) {
            ActorRef actor = getContext().actorOf(Props.create(Actor.class));
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
            System.out.println("Routee added");
        }

        router = new akka.routing.Router(new RoundRobinRoutingLogic(), routees);

    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Work.class, work -> {
                    router.route(work, getSender());
                })
                .match(Terminated.class, terminated -> {

                    System.out.println("Got actor terminated message");

                    router.removeRoutee(new ActorRefRoutee(terminated.actor()));
                    ActorRef actor = getContext().actorOf(Props.create(Actor.class));
                    getContext().watch(actor);
                    router.addRoutee(new ActorRefRoutee(actor));

                    System.out.println("Routee added back");
                })
                .build();
    }
}

演员class:

public class Actor extends AbstractActor {

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Work.class, work -> {
                    System.out.println("Work message received");
                    getContext().stop(getSelf());
                })
                .build();
    }
}

Router class 创建 Actor class 的五个实例。实例存储在列表中。我在配置路由器的时候给它应用了一个RoundRobinRoutingLogic

当我发送五条或更少的消息时,它工作得很好。但是当消息数量超过五条时,就会出现死信错误。为什么?每次停止路由时,我都会添加一个新路由。谁能帮我解决这个问题?

Actor 的启动和停止,以及 Terminated 消息的生成,都是异步发生的。在您的问题上下文中,这意味着您无法保证在路由器收到第六条消息之前,使用当前设置,新路由已添加到路由器。可能正在发生的事情是路由器正在如此快速地搅动消息,以至于当第六条 Work 消息到达时,还没有可用的路由。

顺便说一句,给你的 类 "Router" 和 "Actor," 命名非常混乱,因为 Akka API 已经使用了这些词。