使用数据流管道继续处理循环

Continue processing Loop using Data Flow pipeline

我正在研究数据流并尝试学习如何使用它们。我找到了很多展示如何使用不同块的示例,但其中 none 确实解释了如何处理异常。

我的主要问题是,如果发生异常或前一个转换块的输出不是您所期望的,如何继续 foreach 循环。下面是我用来测试的一个简单的 Windows 表单应用程序。它只是一个循环遍历数字列表并显示它们的按钮。

我在操作块中添加了一个 if 语句,表示如果数字 = 5,则抛出异常。循环看起来在遇到异常后继续处理,但在遇到异常后停止写入输出。异常也永远不会进入 foreach 循环中的 catch 子句。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace DataFlowsTest
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var actionBlock = new ActionBlock<int>(item =>
            {
                if (item == 5)
                    throw new Exception("Blech.");
                Debug.WriteLine(item.ToString());
            });


            foreach(var number in TestList)
            {
                try
                {
                    actionBlock.Post(number);
                }
                catch(AggregateException ex)
                {
                    Debug.WriteLine(ex.Message);
                    continue;
                }
            }
            actionBlock.Complete();
        }
    }
}

此代码returns 1个 2个 3个 4个 抛出异常:DataFlowsTest.exe 中的 'System.Exception' DataFlowsTest.exe 中发生类型 'System.Exception' 的异常,但未在用户代码中处理 漂白

您正在抛出 Exception 但只接住 AggregateException

为通用 (Exception ex) 或您要捕获的类型添加捕获

以下是我的实现方式。如果您对这种方法感兴趣,我可以在 Github 上分享更多内容。我经常使用数据流,所以我已经基于这种方法实现了很多其他 IDataFlow classes。

积木

本质上,通过将每条消息包装在称为 Flow<T> 的 class 中,我们可以实现 Railway Oriented 方法。流有两种状态:失败或成功。成功的流 Flow<T> 被传递到下一个自定义数据流或在失败时连接到 FailureBlock : ITargetBlock<IFlow>。 (本质上是一个 ActionBlock<IFlow> 处理异常、日志等

我的基本流程 class 如下所示:

public class Flow<T> : IFlow
{
  public T Value { get; private set; }
  public Exception Exception { get; private set; }
  public bool Success => Exception is null;
  public bool Failure => !Success;
  public void Fail(Exception exception) => Exception = exception;
  public Flow(T value) => Data = value;
  public Flow(Exception exception) => Fail(exception);
  public static Flow<T> FromValue<T>(T data) => new Flow<T>(data);
}
public interface IFlow
{
  bool Success { get; }
  bool Failure { get; }
  Exception Exception { get; }
  void Fail(Exception exception);
}

生成的自定义 IDataflow

下面的部分看起来很吓人,但其实并不可怕。它本质上是一个具有两个额外功能的 TransformBlock 包装器:

在此处输入代码1。每个自定义 FlowBlock<T1,T2> 将方法包装到 try { } catch { }

  1. LinkTo 方法将成功的流链接到下一个块,将失败的流链接到 FailureBlock
public class FlowBlock<TInput, TOutput>: IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
{
    protected override ITargetBlock<Flow<TInput>> Target => TransformBlock;
    protected override ISourceBlock<Flow<TOutput>> Source => TransformBlock;
    private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
    private FailureBlock FailureBlock { get; }
    public FlowBlock(
        Func<TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions,
        FailureBlock failureBlock)
    {
        TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
            async inFlow =>
            {
                try
                {
                    return new Flow<TOutput>(await transform(inFlow.Data));
                }
                catch (Exception exception)
                {
                    return new Flow<TOutput>(exception);
                }
            },
            dataflowBlockOptions);
    }
    public override IDisposable LinkTo(
        ITargetBlock<Flow<TOutput>> target,
        DataflowLinkOptions linkOptions)
        => new Disposable(
            Source.LinkTo(target, linkOptions, flow => flow.Success),
            Source.LinkTo(OutputBlock, linkOptions, flow => flow.Failure));
}

如果您有兴趣,请在评论中告诉我,我很乐意打开一个包含更多详细信息的 github 存储库。

您在块中抛出异常。这将导致块移动到故障状态并将异常附加到它的 Completion 任务。

您的代码在 actionBlock.Post 周围只有 try/catch,这不是引发异常的地方。

由于异常附加到完成任务,因此在块外捕获异常的唯一方法是 await actionBlock.Completion,这将在块外重新抛出异常并允许您捕获 (Exception ex).

防止区块出现故障;在块中捕获异常。如果异常离开一个块,该块将出错并且不再接受新的输入。

var actionBlock = new ActionBlock<int>(item =>
{
    try
    {
        if (item == 5)
            throw new Exception("Blech.");
        Debug.WriteLine(item.ToString());
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex);
    }

});

此外,您还可以处理 Post 或更好的 SendAsync 的结果并对块错误做出反应:

foreach (var number in TestList)
{
    if(!actionBlock.Post(number))
        actionBlock.Complete();
    try
    {
        await actionBlock.Completion;
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex.Message);
        //actionBlock is now dead
        break;
    }
}