Akka.net - 如何等待子 actor 在停止之前处理所有未决消息
Akka.net - How to wait child actor to process all pending messages prior to stop
我们有一个名为 A 的集群分片 actor,它有多个子 actor,每个子 actor 是使用每个实体模式创建的,如下所示。
当我们将 100 条消息从演员 B 告诉 D 并且演员 D 用 500 毫秒来处理每条消息时,与此同时,当我们使用 Context.Parent.Tell(新钝化(PoisonPill.Instance ));
它立即停止所有子 actor,包括 actor D,而不处理未决消息。
A
|
B
/ \
C D
有没有办法等待actor D处理完所有消息?
您可以定义自己的停止消息,而不是发送 PoisonPill
- 这是一条系统消息,因此处理的优先级高于传统消息 - 您可以定义自己的停止消息,并让参与者使用 [=12 处理它=].
class MyShardedActor : ReceiveActor {
public MyShardedActor() {
Receive<MyStopMessage>(_ => Context.Stop(Self));
}
}
您可以使用 ClusterSharding.Start 方法重载注册您的自定义消息以与集群自行触发的钝化调用一起使用,该方法需要一个 handOffMessage
参数,该参数将在 Passivate
请求而不是 PoisonPill
.
是一个好的开始;您将需要一条自定义关机消息。当 parent actor 终止时,它的 children 会通过 /system
消息自动终止,这些消息会取代其队列中任何未处理的 /user
消息。
因此,您需要做的是确保在 parent 自行终止之前处理所有 /user
消息。有一种直接的方法可以使用 GracefulStop
扩展方法结合您的自定义停止消息来执行此操作:
public sealed class ActorA : ReceiveActor{
private IActorRef _actorB;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorA(){
Receive<StartWork>(w => {
foreach(var i in Enumerable.Range(0, w.WorkCount)){
_actorB.Tell(i);
}
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop child actor B with the same custom message
await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
// shut ourselves down after child is done
Context.Stop(Self);
});
}
protected override void PreStart(){
_actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
}
}
public sealed class ActorB : ReceiveActor{
private IActorRef _actorC;
private IActorRef _actorD;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorB(){
Receive<int>(i => {
_actorC.Tell(i);
_actorD.Tell(i);
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop both actors in parallel
var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
// compose stop Tasks
var bothStopped = Task.WhenAll(stopC, stopD);
await bothStopped;
// shut ourselves down immediately
Context.Stop(Self);
});
}
protected override void PreStart(){
var workerProps = Props.Create(() => new WorkerActor());
_actorC = Context.ActorOf(workerProps, "c");
_actorD = Context.ActorOf(workerProps, "d");
}
}
public sealed class WorkerActor : ReceiveActor {
private readonly ILoggingAdapter _log = Context.GetLogger();
public WorkerActor(){
ReceiveAsync<int>(async i => {
await Task.Delay(10);
_log.Info("Received {0}", i);
});
}
}
我在这里创建了这个示例的可运行版本:https://dotnetfiddle.net/xiGyWM - 您会看到 MyStopMessage
在示例开始后不久就收到了,但是 在 C 和 D 被分配工作之后。在这种情况下,所有这些工作都在任何参与者终止之前完成。
我们有一个名为 A 的集群分片 actor,它有多个子 actor,每个子 actor 是使用每个实体模式创建的,如下所示。 当我们将 100 条消息从演员 B 告诉 D 并且演员 D 用 500 毫秒来处理每条消息时,与此同时,当我们使用 Context.Parent.Tell(新钝化(PoisonPill.Instance )); 它立即停止所有子 actor,包括 actor D,而不处理未决消息。
A
|
B
/ \
C D
有没有办法等待actor D处理完所有消息?
您可以定义自己的停止消息,而不是发送 PoisonPill
- 这是一条系统消息,因此处理的优先级高于传统消息 - 您可以定义自己的停止消息,并让参与者使用 [=12 处理它=].
class MyShardedActor : ReceiveActor {
public MyShardedActor() {
Receive<MyStopMessage>(_ => Context.Stop(Self));
}
}
您可以使用 ClusterSharding.Start 方法重载注册您的自定义消息以与集群自行触发的钝化调用一起使用,该方法需要一个 handOffMessage
参数,该参数将在 Passivate
请求而不是 PoisonPill
.
/system
消息自动终止,这些消息会取代其队列中任何未处理的 /user
消息。
因此,您需要做的是确保在 parent 自行终止之前处理所有 /user
消息。有一种直接的方法可以使用 GracefulStop
扩展方法结合您的自定义停止消息来执行此操作:
public sealed class ActorA : ReceiveActor{
private IActorRef _actorB;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorA(){
Receive<StartWork>(w => {
foreach(var i in Enumerable.Range(0, w.WorkCount)){
_actorB.Tell(i);
}
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop child actor B with the same custom message
await _actorB.GracefulStop(TimeSpan.FromSeconds(10), _);
// shut ourselves down after child is done
Context.Stop(Self);
});
}
protected override void PreStart(){
_actorB = Context.ActorOf(Props.Create(() => new ActorB()), "b");
}
}
public sealed class ActorB : ReceiveActor{
private IActorRef _actorC;
private IActorRef _actorD;
private readonly ILoggingAdapter _log = Context.GetLogger();
public ActorB(){
Receive<int>(i => {
_actorC.Tell(i);
_actorD.Tell(i);
});
ReceiveAsync<MyStopMessage>(async _ => {
_log.Info("Begin shutdown");
// stop both actors in parallel
var stopC = _actorC.GracefulStop(TimeSpan.FromSeconds(10));
var stopD = _actorD.GracefulStop(TimeSpan.FromSeconds(10));
// compose stop Tasks
var bothStopped = Task.WhenAll(stopC, stopD);
await bothStopped;
// shut ourselves down immediately
Context.Stop(Self);
});
}
protected override void PreStart(){
var workerProps = Props.Create(() => new WorkerActor());
_actorC = Context.ActorOf(workerProps, "c");
_actorD = Context.ActorOf(workerProps, "d");
}
}
public sealed class WorkerActor : ReceiveActor {
private readonly ILoggingAdapter _log = Context.GetLogger();
public WorkerActor(){
ReceiveAsync<int>(async i => {
await Task.Delay(10);
_log.Info("Received {0}", i);
});
}
}
我在这里创建了这个示例的可运行版本:https://dotnetfiddle.net/xiGyWM - 您会看到 MyStopMessage
在示例开始后不久就收到了,但是 在 C 和 D 被分配工作之后。在这种情况下,所有这些工作都在任何参与者终止之前完成。