Actor retry with back-off and retry limit
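I need a retry mechanism for an Akka actor, with an increasing delay between retries and a maximum retry limit.
To do this, I tried the BackoffSupervisor pattern that Akka provides. The problem is that, judging by my tests, neither the back-off strategy nor the retry limit seems to work. Or maybe the problem is in the test?
The test looks like this:
A simple actor that throws an exception on its first 4 messages (the check is messageCounter < 5, so the 5th message succeeds):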
import akka.actor.AbstractActor;
import java.util.Optional;

class SomeActor extends AbstractActor {
    // Static, so the count survives actor restarts
    private static int messageCounter = 0;

    // Re-send the failed message to self so it is processed again after the restart
    @Override
    public void preRestart(final Throwable reason, final Optional<Object> message) {
        message.ifPresent(m -> getSelf().tell(m, getSender()));
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .matchEquals("hello", message -> {
                messageCounter++;
                getSender().tell("response", getSelf());
                // Throw on messages 1 to 4; the 5th message is handled normally
                if (messageCounter < 5) {
                    throw new Exception();
                }
            })
            .build();
    }
}
BackoffSupervisor configuration:
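import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Backoff;
import akka.pattern.BackoffSupervisor;
import java.time.Duration;

private ActorRef createSupervisedActor(Class<? extends Actor> actorClass) {
    final Props someActorProps = Props.create(actorClass);
    final Props supervisorProps = BackoffSupervisor.props(
        Backoff.onStop(
            someActorProps,          // actor to be supervised
            "someActor",             // child actor name
            Duration.ofSeconds(10),  // min back-off time
            Duration.ofMinutes(2),   // max back-off time
            0.2,                     // random factor: up to 20% jitter on each delay, not a growth factor
            10)                      // max retry limit
            .withSupervisorStrategy(
                new OneForOneStrategy(
                    DeciderBuilder
                        .match(Exception.class, e -> SupervisorStrategy.restart())
                        .matchAny(o -> SupervisorStrategy.escalate())
                        .build())
            )
    );
    return testSystem.actorOf(supervisorProps);
}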
And the test method:
@Test
public void test() {
    new TestKit(testSystem) {{
        ActorRef actorRef = createSupervisedActor(SomeActor.class);
        actorRef.tell("hello", getRef());
        // Expect 5 responses within 1 second
        receiveN(5, Duration.ofSeconds(1));
    }};
}
The test finishes far too quickly: in under a second, whereas from the BackoffSupervisor configuration I expected it to take at least 50 seconds.
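The cause of the problem is as follows:
An exception thrown in the child actor (someActor in my case) is not handled by Backoff.onStop, so it is handled by normal default supervision, which means an immediate restart. - https://github.com/akka/akka/issues/23406#issuecomment-372602568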
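To put a rough number on the delay the question expected, here is a small standalone sketch of the back-off arithmetic. It does not use Akka at all. It assumes the delay doubles on every restart starting from the minimum, is capped at the maximum, and that the random factor only ever adds time on top, which is my understanding of how BackoffSupervisor computes delays. The class and method names (BackoffSchedule, minDelaySeconds, totalMinDelaySeconds) are made up for illustration. It also assumes each of the four failures leads to exactly one backed-off restart and that the restart counter is not reset in between.

```java
public class BackoffSchedule {

    // Lower bound (jitter ignored) of the delay before the restart with the given
    // 0-based index, assuming delay = min(maxSeconds, minSeconds * 2^restartCount).
    public static long minDelaySeconds(int restartCount, long minSeconds, long maxSeconds) {
        long delay = minSeconds;
        // Double once per previous restart, stopping as soon as the cap is reached.
        for (int i = 0; i < restartCount && delay < maxSeconds; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxSeconds);
    }

    // Sum of the lower-bound delays for the first `restarts` restarts.
    public static long totalMinDelaySeconds(int restarts, long minSeconds, long maxSeconds) {
        long total = 0;
        for (int i = 0; i < restarts; i++) {
            total += minDelaySeconds(i, minSeconds, maxSeconds);
        }
        return total;
    }

    public static void main(String[] args) {
        long minSeconds = 10;   // min back-off from the question
        long maxSeconds = 120;  // max back-off from the question (2 minutes)
        int failures = 4;       // SomeActor throws on messages 1 to 4

        for (int i = 0; i < failures; i++) {
            System.out.println("restart " + (i + 1) + ": at least "
                + minDelaySeconds(i, minSeconds, maxSeconds) + "s");
        }
        System.out.println("total: at least "
            + totalMinDelaySeconds(failures, minSeconds, maxSeconds) + "s");
    }
}
```

With the values from the question this gives 10, 20, 40 and 80 seconds, so about 150 seconds in total before any jitter. Under these assumptions a run that completes in under a second is a clear sign that no back-off was applied at all.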