如何将 ActionBlock 的所有异常包装在单个 AggregateException 中
How to wrap all exceptions of an ActionBlock in a single AggregateException
我遇到了 TPL ActionBlock
,它似乎对于(节流的)并行异步操作非常方便。到目前为止,我正在使用 Task.WhenAll()
(+ Semaphore
进行节流)。说到异常,似乎有很大的不同:
var successList = new List<int>();
var failedList = new List<int>();
try
{
var actionBlock = new ActionBlock<int>(
async x => await Task.Run(() =>
{
if (x < 5)
{
failedList.Add(x);
throw new Exception(x.ToString());
}
successList.Add(x);
}),
new ExecutionDataflowBlockOptions());
Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
actionBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
// works for approach using task.whenall
Console.WriteLine(ex);
Assert.True(failedList.Count == 4);
Assert.True(successList.Count == 6);
return;
}
Assert.Fail();
此测试失败,因为ActionBlock
出现异常时立即停止。
我发现这是 github 上的一个问题:Dataflow: Add options for Task Faulting。显然这种行为是不可配置的。
Task.WhenAll()
结合这样的扩展方法:
public static async Task PreserveAllExceptions(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
throw task.Exception;
}
}
将所有(!)异常包装在 AggregateException
但继续处理:
await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);
是否有使用 ActionBlock
实现此目的的简便方法?
更新:
澄清一下:
- 我不打算使用 sempahore 进行节流(我为什么要这样做?)因为
ExecutionDataflowBlockOptions
中已经有这样的选项
- 代码片段只是一个演示“问题”的假人;
Task.Run()
仅用作实际异步函数的占位符。
- 我真正想做的是:
以并行方式处理所有消息。
不要取消对错误的进一步消息处理。
处理完所有消息后,return 并指出至少发生了一个错误并且 return 所有错误 -> 我的
Task.WhenAll()
和 AggregateException 的工作方式究竟是怎样的。
我知道我可以 try{}catch{}
在我的 ActionBlock
中并以某种方式存储异常,但我想知道是否有任何配置可能使这更容易。
无论如何,在我使用 ActionBlock
的所有地方使用 try catch 和收集异常并不是什么大不了的事。我只是发现 Task.WhenAll()
+PreserveException
扩展对我来说更干净。
不清楚问题的内容。但很明显,ActionBlock 被滥用了。不需要 Task.Run
,因为 ActionBlock 已经使用了一个或多个辅助任务。不需要信号量,因为 ActionBlock(和其他块)已经通过限制工作任务和输入队列的数量来支持节流。
代码似乎也在尝试使用异常作为控制流机制,这在过程代码中也是错误的。
例外并不意味着逃避障碍。 Dataflow is a completely different computing paradigm 来自通用过程范式 - 没有相互调用的函数,因此没有 调用者 来接收和处理异常。
在数据流中,块在一个方向上相互传递消息。块在管道或网络中组合,接收消息,处理它们并将它们传递给任何连接的块。如果发生异常,则没有可以接收该异常的“调用者”。未处理的异常是灾难性的,会导致整个管道崩溃 - 不仅仅是一个块,而是 linked 到 PropagateCompletion
设置为 true 的任何下游块。上游块永远不会知道这一点,导致意外情况。
节流
使用 ActionBlock 进行节流很容易 - 对于初学者来说,所有块都只使用一个工作任务。可以通过限制上游调用者的输入缓冲区并使用 await block.SendAsync()
而不是 block.Post
来限制上游调用者。不需要 Task.Run
,因为该块已经使用了工作任务:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism=2,
BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);
...
async Task processMessage(Message msg) { ...}
这足以只允许两个并发操作,如果已经有两个消息在等待,则停止 poster。如果缓冲区已满,以下代码中的 SendAsync
将等待直到有可用的插槽:
foreach(var msg in someInputCollection)
{
await block.SendAsync(msg);
}
就是这样。该块将同时处理 2 条消息(默认仅为 1 条)并且在其输入缓冲区中一次仅接受 2 条消息。如果缓冲区已满,posting 循环将等待。
Quick & dirty rate limiting 可以通过在处理方法中添加延迟来实现:
var block =new ActionBlock<Message>(msg=>{
await Task.Delay(200);
await processMessage(msg);
},options);
有条件的路由
问题的代码似乎是使用异常来实现控制流。这在任何图书馆或范例中都是错误的。由于数据流在 网络 中工作,因此控制流的等效项是条件路由。
这也是可用的,通过接受 predicate
参数的 LinkTo 重载,该参数决定是否应将消息传递给特定的 link。
在问题的情况下,假设有一个上游 TransformBlock
生成整数,LinkTo
可用于将消息路由到不同的 BufferBlocks:
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
就是这样。唯一的“技巧”是谓词应该涵盖所有选项,否则消息将停留在 block
的输出缓冲区中。在所有其他之后使用 default
link 有助于确保没有未处理的消息。
错误处理
块不应允许异常逃逸。有几种错误处理策略取决于应用程序想要做什么。
处理并记录
一种选择是处理它们并将它们记录在适当的位置,与处理 Web 应用程序中的错误的方式相同:
var block =new ActionBlock(msg=>{
尝试
{
等待过程消息(味精);
}
赶上(异常exc)
{
_logger.LogError(exc,....);
}
},选项);
Post到另一个街区
另一种可能性是 post 异常,可能还有关于传入消息的信息,直接到另一个块。该块可以记录错误和消息或在延迟后重试。该块后面可能有一个不同的管道,在将消息发送到 dead-letter 缓冲区之前以增加的延迟重试消息,类似于对消息队列所做的事情:
var block =new ActionBlock<Message>(msg=>{
try
{
await processMessage(msg);
}
catch(SomeRetriableException exc)
{
_retryBlock.Post(new RetryMsg(msg,exc));
}
catch(Exception exc)
{
_logger.LogError(exc,....);
}
},options);
要使用的策略取决于应用程序的作用。如果将 ActionBlock 用作简单的后台工作程序,则可能仅记录就足够了。
环绕和布线
在更高级的情况下,消息可以包装在 Envelope<>
中,其中包含消息和可能的任何异常。路由可用于将成功消息与失败消息分开:
class Envelope<T>
{
public T Message{get;}
public Exception Error {get;}
public Envelope (T msg)
{
Message=msg;
}
public Envelope(T msg,Exception err)
{
Message=msg;
Error=err;
}
}
块现在 returns 信封 :
var block=new TransformBlock<Envelope<Message>,Envelope<int>>(env=>{
try
{
var msg=env.Message;
....
return new Envelope(6);
}
catch(Exception exc)
{
return new Envelope(msg,exc);
}
});
这允许使用条件路由将错误路由到 errorBlock
:
var errorBlock = ActionBlock<Envelope<Message>>(...);
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
//Send errors to `errorBlock`
block.LinkTo(errorBlock,env=>env.Error!=null);
//Success if x>=5
block.LinkTo(success,x=>x.Message>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
没有简单的方法来聚合所有异常并通过 ActionBlock
的 Completion
属性 传播它们。不幸的是,TPL 数据流组件不容易扩展。如果你真的想要,你 可以 做到这一点,方法是将 ActionBlock
封装在这个块的 custom block and customizing the Completion
中。例如:
public class MyActionBlock<TInput> : ITargetBlock<TInput>
{
private readonly ActionBlock<TInput> _actionBlock;
private readonly ConcurrentQueue<Exception> _exceptions;
//...
public Task Completion
{
get
{
return _actionBlock.Completion.ContinueWith(t =>
{
if (_exceptions.Count > 0)
throw new AggregateException(_exceptions);
});
}
}
}
...但是这个简单的代码不会传播取消,也不会传播通过 IDataflowBlock
接口的 Fault
方法传递的任何异常。因此,您将不得不花费大量的精力才能使其在所有情况下都能正常工作,而且投资是否值得值得怀疑。
我遇到了 TPL ActionBlock
,它似乎对于(节流的)并行异步操作非常方便。到目前为止,我正在使用 Task.WhenAll()
(+ Semaphore
进行节流)。说到异常,似乎有很大的不同:
var successList = new List<int>();
var failedList = new List<int>();
try
{
var actionBlock = new ActionBlock<int>(
async x => await Task.Run(() =>
{
if (x < 5)
{
failedList.Add(x);
throw new Exception(x.ToString());
}
successList.Add(x);
}),
new ExecutionDataflowBlockOptions());
Enumerable.Range(1, 10).Each(x => actionBlock.Post(x));
actionBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
}
catch (Exception ex)
{
// works for approach using task.whenall
Console.WriteLine(ex);
Assert.True(failedList.Count == 4);
Assert.True(successList.Count == 6);
return;
}
Assert.Fail();
此测试失败,因为ActionBlock
出现异常时立即停止。
我发现这是 github 上的一个问题:Dataflow: Add options for Task Faulting。显然这种行为是不可配置的。
Task.WhenAll()
结合这样的扩展方法:
public static async Task PreserveAllExceptions(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
throw task.Exception;
}
}
将所有(!)异常包装在 AggregateException
但继续处理:
await Task.WhenAll(task1,task2).PreserveAllExceptions().ConfigureAwait(false);
是否有使用 ActionBlock
实现此目的的简便方法?
更新: 澄清一下:
- 我不打算使用 sempahore 进行节流(我为什么要这样做?)因为
ExecutionDataflowBlockOptions
中已经有这样的选项
- 代码片段只是一个演示“问题”的假人;
Task.Run()
仅用作实际异步函数的占位符。 - 我真正想做的是:
以并行方式处理所有消息。
不要取消对错误的进一步消息处理。
处理完所有消息后,return 并指出至少发生了一个错误并且 return 所有错误 -> 我的
Task.WhenAll()
和 AggregateException 的工作方式究竟是怎样的。 我知道我可以try{}catch{}
在我的ActionBlock
中并以某种方式存储异常,但我想知道是否有任何配置可能使这更容易。 无论如何,在我使用ActionBlock
的所有地方使用 try catch 和收集异常并不是什么大不了的事。我只是发现Task.WhenAll()
+PreserveException
扩展对我来说更干净。
不清楚问题的内容。但很明显,ActionBlock 被滥用了。不需要 Task.Run
,因为 ActionBlock 已经使用了一个或多个辅助任务。不需要信号量,因为 ActionBlock(和其他块)已经通过限制工作任务和输入队列的数量来支持节流。
代码似乎也在尝试使用异常作为控制流机制,这在过程代码中也是错误的。
例外并不意味着逃避障碍。 Dataflow is a completely different computing paradigm 来自通用过程范式 - 没有相互调用的函数,因此没有 调用者 来接收和处理异常。
在数据流中,块在一个方向上相互传递消息。块在管道或网络中组合,接收消息,处理它们并将它们传递给任何连接的块。如果发生异常,则没有可以接收该异常的“调用者”。未处理的异常是灾难性的,会导致整个管道崩溃 - 不仅仅是一个块,而是 linked 到 PropagateCompletion
设置为 true 的任何下游块。上游块永远不会知道这一点,导致意外情况。
节流
使用 ActionBlock 进行节流很容易 - 对于初学者来说,所有块都只使用一个工作任务。可以通过限制上游调用者的输入缓冲区并使用 await block.SendAsync()
而不是 block.Post
来限制上游调用者。不需要 Task.Run
,因为该块已经使用了工作任务:
var options=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism=2,
BoundedCapacity=2
};
var block =new ActionBlock<Message>(processMessage,options);
...
async Task processMessage(Message msg) { ...}
这足以只允许两个并发操作,如果已经有两个消息在等待,则停止 poster。如果缓冲区已满,以下代码中的 SendAsync
将等待直到有可用的插槽:
foreach(var msg in someInputCollection)
{
await block.SendAsync(msg);
}
就是这样。该块将同时处理 2 条消息(默认仅为 1 条)并且在其输入缓冲区中一次仅接受 2 条消息。如果缓冲区已满,posting 循环将等待。
Quick & dirty rate limiting 可以通过在处理方法中添加延迟来实现:
var block =new ActionBlock<Message>(msg=>{
await Task.Delay(200);
await processMessage(msg);
},options);
有条件的路由
问题的代码似乎是使用异常来实现控制流。这在任何图书馆或范例中都是错误的。由于数据流在 网络 中工作,因此控制流的等效项是条件路由。
这也是可用的,通过接受 predicate
参数的 LinkTo 重载,该参数决定是否应将消息传递给特定的 link。
在问题的情况下,假设有一个上游 TransformBlock
生成整数,LinkTo
可用于将消息路由到不同的 BufferBlocks:
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
var block=new TransformBlock<Message,int>(...);
//Success if x>=5
block.LinkTo(success,x=>x>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
就是这样。唯一的“技巧”是谓词应该涵盖所有选项,否则消息将停留在 block
的输出缓冲区中。在所有其他之后使用 default
link 有助于确保没有未处理的消息。
错误处理
块不应允许异常逃逸。有几种错误处理策略取决于应用程序想要做什么。
处理并记录
一种选择是处理它们并将它们记录在适当的位置,与处理 Web 应用程序中的错误的方式相同:
var block =new ActionBlock(msg=>{ 尝试 { 等待过程消息(味精); } 赶上(异常exc) { _logger.LogError(exc,....); } },选项);
Post到另一个街区
另一种可能性是 post 异常,可能还有关于传入消息的信息,直接到另一个块。该块可以记录错误和消息或在延迟后重试。该块后面可能有一个不同的管道,在将消息发送到 dead-letter 缓冲区之前以增加的延迟重试消息,类似于对消息队列所做的事情:
var block =new ActionBlock<Message>(msg=>{
try
{
await processMessage(msg);
}
catch(SomeRetriableException exc)
{
_retryBlock.Post(new RetryMsg(msg,exc));
}
catch(Exception exc)
{
_logger.LogError(exc,....);
}
},options);
要使用的策略取决于应用程序的作用。如果将 ActionBlock 用作简单的后台工作程序,则可能仅记录就足够了。
环绕和布线
在更高级的情况下,消息可以包装在 Envelope<>
中,其中包含消息和可能的任何异常。路由可用于将成功消息与失败消息分开:
class Envelope<T>
{
public T Message{get;}
public Exception Error {get;}
public Envelope (T msg)
{
Message=msg;
}
public Envelope(T msg,Exception err)
{
Message=msg;
Error=err;
}
}
块现在 returns 信封 :
var block=new TransformBlock<Envelope<Message>,Envelope<int>>(env=>{
try
{
var msg=env.Message;
....
return new Envelope(6);
}
catch(Exception exc)
{
return new Envelope(msg,exc);
}
});
这允许使用条件路由将错误路由到 errorBlock
:
var errorBlock = ActionBlock<Envelope<Message>>(...);
var success=new BufferBlock<int>();
var failure=new BufferBlock<int>();
//Send errors to `errorBlock`
block.LinkTo(errorBlock,env=>env.Error!=null);
//Success if x>=5
block.LinkTo(success,x=>x.Message>=5);
//By default, everything else goes to Failure
block.LinkTo(failure);
没有简单的方法来聚合所有异常并通过 ActionBlock
的 Completion
属性 传播它们。不幸的是,TPL 数据流组件不容易扩展。如果你真的想要,你 可以 做到这一点,方法是将 ActionBlock
封装在这个块的 custom block and customizing the Completion
中。例如:
public class MyActionBlock<TInput> : ITargetBlock<TInput>
{
private readonly ActionBlock<TInput> _actionBlock;
private readonly ConcurrentQueue<Exception> _exceptions;
//...
public Task Completion
{
get
{
return _actionBlock.Completion.ContinueWith(t =>
{
if (_exceptions.Count > 0)
throw new AggregateException(_exceptions);
});
}
}
}
...但是这个简单的代码不会传播取消,也不会传播通过 IDataflowBlock
接口的 Fault
方法传递的任何异常。因此,您将不得不花费大量的精力才能使其在所有情况下都能正常工作,而且投资是否值得值得怀疑。