Akka.Net Streams - 当缓冲区抛出错误时,Source 停止拉取元素

Akka.Net Streams - Source stops pulling elements when buffer throws error

我一直在为 Akka.Net 使用 streams extension package 并在尝试组合缓冲和节流方法时注意到这个错误:

using (var system = ActorSystem.Create("test-system"))
using (var materializer = system.Materializer(GetSettings(system)))
{
            int index = 0;
            var sink = Sink.ActorRefWithAck<KeyValue>(
                system.ActorOf<Writer>(), 
                new OnInitMessage(), 
                new OnAcknowledgeMessage(), 
                OnComplete.Instance, 
                exception => new OnError(exception));

            ServiceBusSource
                .Create(client, message =>
                {
                    var json = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
                    var result = JsonConvert.DeserializeObject<KeyValue>(json);

                    message.Complete();

                    return result;
                })
                .WithLogger(system, entity => $"{entity.Key} => {entity.Value}")
                .Buffer(1, OverflowStrategy.Fail)
                .Throttle(1, TimeSpan.FromSeconds(5), 3, ThrottleMode.Shaping)
                .ToMaterialized(sink, Keep.Right)
                .Run(materializer);

            Console.ReadLine();
}

我正在使用 ServiceBusSource from Alpakka 这些是我引用的包:

故意让它失败以查看行为如何但是,在缓冲区策略失败后,流完成并没有更多的元素被拉动。

KeyValue.cs

public class KeyValue
{
    public int Id { get; set; }

    public string Key { get; set; }

    public string Value { get; set; }

    public DateTime Produced { get; set; }

    public DateTime Emitted { get; set; }

    public override string ToString()
    {
        return $"[{Produced}] - [{Emitted}] => {Id} {Key}:{Value}";
    }
}

获取设置方法:

ActorMaterializerSettings GetSettings(ActorSystem system)
        {
            return ActorMaterializerSettings.Create(system)
                .WithSupervisionStrategy(cause =>
                {
                    system.Log.Error(cause, "Failed");
                    return Directive.Resume;
                });
        }

有几种处理流内部错误的方法 - 其中大部分在 docs:

中进行了描述
  1. 使用 Recover 从错误中创建回退事件。
  2. 使用 RecoverWithRetries 允许在出错时重定向到不同的流。
  3. 使用 Restart.WithBackoff 在指数退避延迟后重建重试流。
  4. 使用 WithSupervisionStrategy - 这是一个非常有限的选项,因为它仅适用于明确引用它的阶段(如文档中所述)。

您的情况是设计使然 - 当您使用 OverflowStrategy.Fail 时,这意味着一旦达到溢出,就会产生错误。大多数 akka 阶段的反应是在失败时立即关闭流。