具有退避和重试限制的 Actor 重试

Actor retry with back-off and retry limit

我需要 akka actor 的重试机制,增加重试之间的时间和最大重试限制。 为此,我尝试使用 akka 提供的 BackOffSupervisor 模式。问题是,从我的测试来看,退避策略和重试限制似乎不起作用。或者可能问题出在测试中?

测试看起来像这样:

在前 5 条消息中抛出异常的简单 actor

class SomeActor extends AbstractActor {

private static int messageCounter = 0;

//return the message to sender
@Override
public void preRestart(final Throwable reason, final Optional<Object> message) {
    getSelf().tell(message.get(), getSender());
}


@Override
public Receive createReceive() {
    return receiveBuilder()
            .matchEquals("hello", message -> {
                messageCounter++;
                getSender().tell("response", getSelf());

                //Throw Exception at the first 5 messages
                if (messageCounter < 5) {
                    throw new Exception();
                }

            })
            .build();
}

}

BackOffSupervisor 配置

private ActorRef createSupervisedActor(Class<? extends Actor> actorClass) {
    final Props someActorProps = Props.create(actorClass);

    final Props supervisorProps = BackoffSupervisor.props(
            Backoff.onStop(
                    someActorProps, //actor to be supervised
                    "someActor",
                    Duration.ofSeconds(10), //min back-off time
                    Duration.ofMinutes(2), // max back-off time
                    0.2, // back-off increase factor
                    10) // max retry limit
                    .withSupervisorStrategy(
                            new OneForOneStrategy(
                                    DeciderBuilder
                                            .match(Exception.class, e -> SupervisorStrategy.restart())
                                            .matchAny(o -> SupervisorStrategy.escalate())
                                            .build())
                    )
    );

    return testSystem.actorOf(supervisorProps);

}

及测试方法

    @Test
public void test() {
    new TestKit(testSystem) {{
        ActorRef actorRef = createSupervisedActor(SomeActor.class);

        actorRef.tell("hello", getRef());

        //Expect 5 responses in 1 second
        receiveN(5, Duration.ofSeconds(1));
    }};

}

测试完成得太快了。在不到一秒钟的时间内,从 BackoffSupervisor 的配置开始,我预计至少需要 50 多秒。

出现问题的原因如下:

在子 actor(在我的例子中是 someActor)中抛出异常不是由 Backoff.onStop 处理的,因此由正常的默认监督处理,这意味着立即重启。 - https://github.com/akka/akka/issues/23406#issuecomment-372602568