如何将 ActionBlock 的所有异常包装在单个 AggregateException 中

How to wrap all exceptions of an ActionBlock in a single AggregateException

我遇到了 TPL ActionBlock,它似乎对于(节流的)并行异步操作非常方便。到目前为止,我正在使用 Task.WhenAll()(+ Semaphore 进行节流)。说到异常,似乎有很大的不同:

var successList = new List<int>();
var failedList = new List<int>();
try
{
    var actionBlock = new ActionBlock<int>(
        async x => await Task.Run(() =>
        {
            if (x < 5)
            {
                failedList.Add(x);
                throw new Exception(x.ToString());
            }

            successList.Add(x);
        }),
        new ExecutionDataflowBlockOptions());

    Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
    actionBlock.Complete();

    await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
    // works for approach using task.whenall
    Console.WriteLine(ex);
    Assert.True(failedList.Count == 4);
    Assert.True(successList.Count == 6);
    return;
}

Assert.Fail();

此测试失败,因为ActionBlock 出现异常时立即停止。 我发现这是 github 上的一个问题:Dataflow: Add options for Task Faulting。显然这种行为是不可配置的。

Task.WhenAll() 结合这样的扩展方法:

public static async Task PreserveAllExceptions(this Task task)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch
    {
        throw task.Exception;
    }
}

将所有(!)异常包装在 AggregateException 但继续处理:

  await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);

是否有使用 ActionBlock 实现此目的的简便方法?

更新: 澄清一下:

  1. 我不打算使用 sempahore 进行节流(我为什么要这样做?)因为 ExecutionDataflowBlockOptions
  2. 中已经有这样的选项
  3. 代码片段只是一个演示“问题”的假人; Task.Run() 仅用作实际异步函数的占位符。
  4. 我真正想做的是: 以并行方式处理所有消息。 不要取消对错误的进一步消息处理。 处理完所有消息后,return 并指出至少发生了一个错误并且 return 所有错误 -> 我的 Task.WhenAll() 和 AggregateException 的工作方式究竟是怎样的。 我知道我可以 try{}catch{} 在我的 ActionBlock 中并以某种方式存储异常,但我想知道是否有任何配置可能使这更容易。 无论如何,在我使用 ActionBlock 的所有地方使用 try catch 和收集异常并不是什么大不了的事。我只是发现 Task.WhenAll()+PreserveException 扩展对我来说更干净。

不清楚问题的内容。但很明显,ActionBlock 被滥用了。不需要 Task.Run,因为 ActionBlock 已经使用了一个或多个辅助任务。不需要信号量,因为 ActionBlock(和其他块)已经通过限制工作任务和输入队列的数量来支持节流。

代码似乎也在尝试使用异常作为控制流机制,这在过程代码中也是错误的。

例外并不意味着逃避障碍。 Dataflow is a completely different computing paradigm 来自通用过程范式 - 没有相互调用的函数,因此没有 调用者 来接收和处理异常。

在数据流中,块在一个方向上相互传递消息。块在管道或网络中组合,接收消息,处理它们并将它们传递给任何连接的块。如果发生异常,则没有可以接收该异常的“调用者”。未处理的异常是灾难性的,会导致整个管道崩溃 - 不仅仅是一个块,而是 linked 到 PropagateCompletion 设置为 true 的任何下游块。上游块永远不会知道这一点,导致意外情况。

节流

使用 ActionBlock 进行节流很容易 - 对于初学者来说,所有块都只使用一个工作任务。可以通过限制上游调用者的输入缓冲区并使用 await block.SendAsync() 而不是 block.Post 来限制上游调用者。不需要 Task.Run,因为该块已经使用了工作任务:

var options=new ExecutionDataflowBlockOptions 
{ 
    MaxDegreeOfParallelism=2, 
    BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);

...
async Task processMessage(Message msg) { ...}

这足以只允许两个并发操作,如果已经有两个消息在等待,则停止 poster。如果缓冲区已满,以下代码中的 SendAsync 将等待直到有可用的插槽:

foreach(var msg in someInputCollection)
{
    await block.SendAsync(msg);
}

就是这样。该块将同时处理 2 条消息(默认仅为 1 条)并且在其输入缓冲区中一次仅接受 2 条消息。如果缓冲区已满,posting 循环将等待。

Quick & dirty rate limiting 可以通过在处理方法中添加延迟来实现:

var block =new ActionBlock<Message>(msg=>{
    await Task.Delay(200);
    await processMessage(msg);
},options);

有条件的路由

问题的代码似乎是使用异常来实现控制流。这在任何图书馆或范例中都是错误的。由于数据流在 网络 中工作,因此控制流的等效项是条件路由。

这也是可用的,通过接受 predicate 参数的 LinkTo 重载,该参数决定是否应将消息传递给特定的 link。

在问题的情况下,假设有一个上游 TransformBlock 生成整数,LinkTo 可用于将消息路由到不同的 BufferBlocks:

var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();

var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);

就是这样。唯一的“技巧”是谓词应该涵盖所有选项,否则消息将停留在 block 的输出缓冲区中。在所有其他之后使用 default link 有助于确保没有未处理的消息。

错误处理

块不应允许异常逃逸。有几种错误处理策略取决于应用程序想要做什么。

处理并记录

一种选择是处理它们并将它们记录在适当的位置,与处理 Web 应用程序中的错误的方式相同:

var block =new ActionBlock(msg=>{ 尝试 { 等待过程消息(味精); } 赶上(异常exc) { _logger.LogError(exc,....); } },选项);

Post到另一个街区

另一种可能性是 post 异常,可能还有关于传入消息的信息,直接到另一个块。该块可以记录错误和消息或在延迟后重试。该块后面可能有一个不同的管道,在将消息发送到 dead-letter 缓冲区之前以增加的延迟重试消息,类似于对消息队列所做的事情:

var block =new ActionBlock<Message>(msg=>{
    try
    {
        await processMessage(msg);
    }
    catch(SomeRetriableException exc)
    {
        _retryBlock.Post(new RetryMsg(msg,exc));
    }
    catch(Exception exc)
    {
       
       _logger.LogError(exc,....);
    }
},options);

要使用的策略取决于应用程序的作用。如果将 ActionBlock 用作简单的后台工作程序,则可能仅记录就足够了。

环绕和布线

在更高级的情况下,消息可以包装在 Envelope<> 中,其中包含消息和可能的任何异常。路由可用于将成功消息与失败消息分开:

class Envelope<T>
{
    public T Message{get;}
    public Exception Error {get;}
    
    public Envelope (T msg)
    {
        Message=msg;
    }
    
    public Envelope(T msg,Exception err)
    {
        Message=msg;
        Error=err;
    }
}

块现在 returns 信封 :

var block=new TransformBlock<Envelope<Message>,Envelope<int>>(env=>{
    try
    {
        var msg=env.Message;
        ....
        return new Envelope(6);
    }
    catch(Exception exc)
    {
        return new Envelope(msg,exc);
    }
});

这允许使用条件路由将错误路由到 errorBlock

var errorBlock = ActionBlock<Envelope<Message>>(...);

var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();

//Send errors to `errorBlock`
block.LinkTo(errorBlock,env=>env.Error!=null);

//Success if x>=5
block.LinkTo(success,x=>x.Message>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);

没有简单的方法来聚合所有异常并通过 ActionBlockCompletion 属性 传播它们。不幸的是,TPL 数据流组件不容易扩展。如果你真的想要,你 可以 做到这一点,方法是将 ActionBlock 封装在这个块的 custom block and customizing the Completion 中。例如:

public class MyActionBlock<TInput> : ITargetBlock<TInput>
{
    private readonly ActionBlock<TInput> _actionBlock;
    private readonly ConcurrentQueue<Exception> _exceptions;

    //...

    public Task Completion
    {
        get
        {
            return _actionBlock.Completion.ContinueWith(t =>
            {
                if (_exceptions.Count > 0)
                    throw new AggregateException(_exceptions);
            });
        }
    }
}

...但是这个简单的代码不会传播取消,也不会传播通过 IDataflowBlock 接口的 Fault 方法传递的任何异常。因此,您将不得不花费大量的精力才能使其在所有情况下都能正常工作,而且投资是否值得值得怀疑。