Akka.net 容错、SuperVisorStrategy 和丢失导致异常的消息

Akka.net faulttolerance, SuperVisorStrategy and losing the message that caused the exception

我正在查看 Akka.net 容错并设置了一个简单示例,其中 Actor1 告诉 Actor2 一条消息。 Actor2 抛出异常。 Actor1 有一个 SuperVisorStrategy,它告诉失败的 actor 继续。

我其实预计消息会再次传递给 Actor2。但事实并非如此。因此 Actor2 恢复并可以继续处理新消息。 但是现在使 Actor 失败的消息消失了。这应该如何处理?我不想丢失导致异常的消息。我希望 Actor2 再次处理消息。

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            using (ActorSystem actorSystem = ActorSystem.Create("test"))
            {
                IActorRef customer = actorSystem.ActorOf(Props.Create<Actor1>(), "actor1");

                customer.Tell(new Start());
                Console.Read();
            }
        }
    }

    public class Actor1 : UntypedActor
    {
        protected override SupervisorStrategy SupervisorStrategy()
        {
            return new OneForOneStrategy(3, TimeSpan.FromSeconds(5), ex =>
            {
                if (ex is ApplicationException)
                    return Directive.Resume;

                return Directive.Escalate;
            });
        }

        protected override void OnReceive(object message)
        {
            if (message is Start)
            {
                IActorRef actor2Ref = Context.ActorOf<Actor2>("actor2");
                Context.Watch(actor2Ref);

                actor2Ref.Tell(new DoSomething());
            }

            else if (message is Response)
            {
                Console.WriteLine("Response received");
                return;
            }
            else if (message is Terminated)
            {
                Console.WriteLine("Terminated");
            }
        }
    }

    public class Actor2 : UntypedActor
    {
        protected override void OnReceive(object message)
        {
            if (message is DoSomething)
            {
                // only called once.
                throw new ApplicationException("testexception");
            }
        }
    }

    public class Start
    {
    }

    public class DoSomething
    {
    }

    public class Response
    {
    }
}

一般来说,您不太可能想要连续多次重新处理同一条容易出错的消息。如果它导致异常,在同一毫秒内一次又一次地处理它可能会带来完全相同的结果。

可能想要实现的是在延迟一段时间后尝试重新处理它,即因为您正在尝试重新连接到外部服务。在这种情况下,您可能希望将其包装到另一条消息中,并在 ActorSystem 调度程序中安排重新发送。基本示例:

sealed class Retry
{
    public readonly object Message;
    public readonly int Ttl;

    public Retry(object message, int ttl)
    {
        Message = message;
        Ttl = ttl;
    }
}

class MyActor : ReceiveActor 
{
    ...

    protected override void PreRestart(Exception reason, object message)
    {
        Retry oldRetry;
        var retry = (oldRetry = message as Retry) != null 
            ? new Retry(oldRetry.message, oldRetry.Ttl - 1)
            : new Retry(message, retryCount);

        if (retry.Ttl > 0)
            Context.System.Scheduler.ScheduleTellOnce(delay, Self, retry, Sender);

        base.PreRestart(reason, message);
    }
}

根据您的需要,其他一些情况可能还涉及使用 CircuitBreakers 或更可靠的传递语义 - 默认情况下 Akka.NET 提供至多一次传递语义,您可以使用 AtLeastOnceDelivery 组件更改此设置来自 Akka.Persistence 插件。