Akka.Net 异步处理程序出现异常时未执行 PreRestart
Akka.Net PreRestart not executed when exception from async handler
我有以下 Actor,我正在尝试重新启动并将失败的消息重新发送回 Actor:
public class BuildActor : ReceivePersistentActor
{
public override string PersistenceId => "asdad3333";
private readonly IActorRef _nextActorRef;
public BuildActor(IActorRef nextActorRef)
{
_nextActorRef = nextActorRef;
Command<Workload>(x => Build(x));
RecoverAny(workload =>
{
Console.WriteLine("Recovering");
});
}
public void Build(Workload Workload)
{
var context = Context;
var self = Self;
Persist(Workload, async x =>
{
//after this line executes
//application goes into break mode
//does not execute PreStart or Recover
var workload = await BuildTask(Workload);
_nextActorRef.Tell(workload);
context.Stop(self);
});
}
private Task<Workload> BuildTask(Workload Workload)
{
//works as expected if method made synchronous
return Task.Run(() =>
{
//simulate exception
if (Workload.ShowException)
{
throw new Exception();
}
return Workload;
});
}
protected override void PreRestart(Exception reason, object message)
{
if (message is Workload workload)
{
Console.WriteLine("Prestart");
workload.ShowException = false;
Self.Tell(message);
}
}
}
在 Persist
的成功处理程序中,我正在尝试模拟抛出的异常,但在发生异常时应用程序进入中断模式并且未调用 PreRestart
挂钩。但是,如果我通过删除 Task.Run
使 BuildTask
方法同步,那么在异常时会调用 PreRestart
和 Recover<T>
方法。
如果有人能指出我推荐的模式应该是什么以及我哪里出错了,我将不胜感激。
很有可能,Akka.Persistence 不是解决您这里问题的好方法。
Akka.Persistence 使用 eventsourcing 原则来存储 actor 的状态。在这方面重要的几个关键点:
- 您发送给演员的是命令。它描述了一份工作,你想完成。执行该命令可能会导致进行一些实际处理,并最终可能导致以 events.
的形式持久保存 actor 的线性状态更改历史记录
- 在Akka.NET中
Persist
方法仅用于存储事件 - 它们描述事实,即某事已经发生:因此,它们不能被拒绝,也不能失败(这是您在 Persist
回调中所做的事情)。
- 当 actor 在任何时间点重新启动时,它总是会尝试通过重播所有事件来重新创建自己的状态
Persist
ed 直到最后一个已知时间点。出于这个原因,重要的是 Recover
方法应该只专注于重放演员的状态(它可以在同一事件中多次调用)并且永远不会产生副作用(副作用的例子是发送电子邮件)。那里抛出的任何异常都意味着 actor 状态已不可恢复地损坏并且该 actor 将被杀死。
如果您想将消息重新发送给您的演员,您可以:
- 将可靠的消息队列(即 RabbitMQ 或 Azure 服务总线)或日志(Kafka 或事件中心)放在您的参与者处理管道前面。这其实是很多情况下最合理的场景。
- 使用 Akka.Persistence 中的 at-least-once delivery 语义 - 但恕我直言,仅当由于某种原因您无法使用第一个解决方案时。
- 最简单和最不可靠的选项(因为消息仅驻留在内存中并且从不持久化)是 dead letter queue。每条未处理的消息都发送到那里。您可以订阅它并过滤传入的数据以检测哪些消息应该再次发送给他们的收件人。
我有以下 Actor,我正在尝试重新启动并将失败的消息重新发送回 Actor:
public class BuildActor : ReceivePersistentActor
{
public override string PersistenceId => "asdad3333";
private readonly IActorRef _nextActorRef;
public BuildActor(IActorRef nextActorRef)
{
_nextActorRef = nextActorRef;
Command<Workload>(x => Build(x));
RecoverAny(workload =>
{
Console.WriteLine("Recovering");
});
}
public void Build(Workload Workload)
{
var context = Context;
var self = Self;
Persist(Workload, async x =>
{
//after this line executes
//application goes into break mode
//does not execute PreStart or Recover
var workload = await BuildTask(Workload);
_nextActorRef.Tell(workload);
context.Stop(self);
});
}
private Task<Workload> BuildTask(Workload Workload)
{
//works as expected if method made synchronous
return Task.Run(() =>
{
//simulate exception
if (Workload.ShowException)
{
throw new Exception();
}
return Workload;
});
}
protected override void PreRestart(Exception reason, object message)
{
if (message is Workload workload)
{
Console.WriteLine("Prestart");
workload.ShowException = false;
Self.Tell(message);
}
}
}
在 Persist
的成功处理程序中,我正在尝试模拟抛出的异常,但在发生异常时应用程序进入中断模式并且未调用 PreRestart
挂钩。但是,如果我通过删除 Task.Run
使 BuildTask
方法同步,那么在异常时会调用 PreRestart
和 Recover<T>
方法。
如果有人能指出我推荐的模式应该是什么以及我哪里出错了,我将不胜感激。
很有可能,Akka.Persistence 不是解决您这里问题的好方法。
Akka.Persistence 使用 eventsourcing 原则来存储 actor 的状态。在这方面重要的几个关键点:
- 您发送给演员的是命令。它描述了一份工作,你想完成。执行该命令可能会导致进行一些实际处理,并最终可能导致以 events. 的形式持久保存 actor 的线性状态更改历史记录
- 在Akka.NET中
Persist
方法仅用于存储事件 - 它们描述事实,即某事已经发生:因此,它们不能被拒绝,也不能失败(这是您在Persist
回调中所做的事情)。 - 当 actor 在任何时间点重新启动时,它总是会尝试通过重播所有事件来重新创建自己的状态
Persist
ed 直到最后一个已知时间点。出于这个原因,重要的是Recover
方法应该只专注于重放演员的状态(它可以在同一事件中多次调用)并且永远不会产生副作用(副作用的例子是发送电子邮件)。那里抛出的任何异常都意味着 actor 状态已不可恢复地损坏并且该 actor 将被杀死。
如果您想将消息重新发送给您的演员,您可以:
- 将可靠的消息队列(即 RabbitMQ 或 Azure 服务总线)或日志(Kafka 或事件中心)放在您的参与者处理管道前面。这其实是很多情况下最合理的场景。
- 使用 Akka.Persistence 中的 at-least-once delivery 语义 - 但恕我直言,仅当由于某种原因您无法使用第一个解决方案时。
- 最简单和最不可靠的选项(因为消息仅驻留在内存中并且从不持久化)是 dead letter queue。每条未处理的消息都发送到那里。您可以订阅它并过滤传入的数据以检测哪些消息应该再次发送给他们的收件人。