使用数据流管道继续处理循环
Continue processing Loop using Data Flow pipeline
我正在研究数据流并尝试学习如何使用它们。我找到了很多展示如何使用不同块的示例,但其中 none 确实解释了如何处理异常。
我的主要问题是,如果发生异常或前一个转换块的输出不是您所期望的,如何继续 foreach 循环。下面是我用来测试的一个简单的 Windows 表单应用程序。它只是一个循环遍历数字列表并显示它们的按钮。
我在操作块中添加了一个 if 语句,表示如果数字 = 5,则抛出异常。循环看起来在遇到异常后继续处理,但在遇到异常后停止写入输出。异常也永远不会进入 foreach 循环中的 catch 子句。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace DataFlowsTest
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var actionBlock = new ActionBlock<int>(item =>
{
if (item == 5)
throw new Exception("Blech.");
Debug.WriteLine(item.ToString());
});
foreach(var number in TestList)
{
try
{
actionBlock.Post(number);
}
catch(AggregateException ex)
{
Debug.WriteLine(ex.Message);
continue;
}
}
actionBlock.Complete();
}
}
}
此代码returns
1个
2个
3个
4个
抛出异常:DataFlowsTest.exe 中的 'System.Exception'
DataFlowsTest.exe 中发生类型 'System.Exception' 的异常,但未在用户代码中处理
漂白
您正在抛出 Exception
但只接住 AggregateException
为通用 (Exception ex
) 或您要捕获的类型添加捕获
以下是我的实现方式。如果您对这种方法感兴趣,我可以在 Github 上分享更多内容。我经常使用数据流,所以我已经基于这种方法实现了很多其他 IDataFlow classes。
积木
本质上,通过将每条消息包装在称为 Flow<T>
的 class 中,我们可以实现 Railway Oriented 方法。流有两种状态:失败或成功。成功的流 Flow<T>
被传递到下一个自定义数据流或在失败时连接到 FailureBlock : ITargetBlock<IFlow>
。 (本质上是一个 ActionBlock<IFlow>
处理异常、日志等
我的基本流程 class 如下所示:
public class Flow<T> : IFlow
{
public T Value { get; private set; }
public Exception Exception { get; private set; }
public bool Success => Exception is null;
public bool Failure => !Success;
public void Fail(Exception exception) => Exception = exception;
public Flow(T value) => Data = value;
public Flow(Exception exception) => Fail(exception);
public static Flow<T> FromValue<T>(T data) => new Flow<T>(data);
}
public interface IFlow
{
bool Success { get; }
bool Failure { get; }
Exception Exception { get; }
void Fail(Exception exception);
}
生成的自定义 IDataflow
下面的部分看起来很吓人,但其实并不可怕。它本质上是一个具有两个额外功能的 TransformBlock 包装器:
在此处输入代码1。每个自定义 FlowBlock<T1,T2>
将方法包装到 try { } catch { }
LinkTo
方法将成功的流链接到下一个块,将失败的流链接到 FailureBlock
public class FlowBlock<TInput, TOutput>: IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
{
protected override ITargetBlock<Flow<TInput>> Target => TransformBlock;
protected override ISourceBlock<Flow<TOutput>> Source => TransformBlock;
private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
private FailureBlock FailureBlock { get; }
public FlowBlock(
Func<TInput, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions,
FailureBlock failureBlock)
{
TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
async inFlow =>
{
try
{
return new Flow<TOutput>(await transform(inFlow.Data));
}
catch (Exception exception)
{
return new Flow<TOutput>(exception);
}
},
dataflowBlockOptions);
}
public override IDisposable LinkTo(
ITargetBlock<Flow<TOutput>> target,
DataflowLinkOptions linkOptions)
=> new Disposable(
Source.LinkTo(target, linkOptions, flow => flow.Success),
Source.LinkTo(OutputBlock, linkOptions, flow => flow.Failure));
}
如果您有兴趣,请在评论中告诉我,我很乐意打开一个包含更多详细信息的 github 存储库。
您在块中抛出异常。这将导致块移动到故障状态并将异常附加到它的 Completion
任务。
您的代码在 actionBlock.Post
周围只有 try/catch
,这不是引发异常的地方。
由于异常附加到完成任务,因此在块外捕获异常的唯一方法是 await actionBlock.Completion
,这将在块外重新抛出异常并允许您捕获 (Exception ex)
.
防止区块出现故障;在块中捕获异常。如果异常离开一个块,该块将出错并且不再接受新的输入。
var actionBlock = new ActionBlock<int>(item =>
{
try
{
if (item == 5)
throw new Exception("Blech.");
Debug.WriteLine(item.ToString());
}
catch (Exception ex)
{
Debug.WriteLine(ex);
}
});
此外,您还可以处理 Post
或更好的 SendAsync
的结果并对块错误做出反应:
foreach (var number in TestList)
{
if(!actionBlock.Post(number))
actionBlock.Complete();
try
{
await actionBlock.Completion;
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
//actionBlock is now dead
break;
}
}
我正在研究数据流并尝试学习如何使用它们。我找到了很多展示如何使用不同块的示例,但其中 none 确实解释了如何处理异常。
我的主要问题是,如果发生异常或前一个转换块的输出不是您所期望的,如何继续 foreach 循环。下面是我用来测试的一个简单的 Windows 表单应用程序。它只是一个循环遍历数字列表并显示它们的按钮。
我在操作块中添加了一个 if 语句,表示如果数字 = 5,则抛出异常。循环看起来在遇到异常后继续处理,但在遇到异常后停止写入输出。异常也永远不会进入 foreach 循环中的 catch 子句。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace DataFlowsTest
{
public partial class Form1 : Form
{
public Form1()
{
InitializeComponent();
}
private void button1_Click(object sender, EventArgs e)
{
List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var actionBlock = new ActionBlock<int>(item =>
{
if (item == 5)
throw new Exception("Blech.");
Debug.WriteLine(item.ToString());
});
foreach(var number in TestList)
{
try
{
actionBlock.Post(number);
}
catch(AggregateException ex)
{
Debug.WriteLine(ex.Message);
continue;
}
}
actionBlock.Complete();
}
}
}
此代码returns 1个 2个 3个 4个 抛出异常:DataFlowsTest.exe 中的 'System.Exception' DataFlowsTest.exe 中发生类型 'System.Exception' 的异常,但未在用户代码中处理 漂白
您正在抛出 Exception
但只接住 AggregateException
为通用 (Exception ex
) 或您要捕获的类型添加捕获
以下是我的实现方式。如果您对这种方法感兴趣,我可以在 Github 上分享更多内容。我经常使用数据流,所以我已经基于这种方法实现了很多其他 IDataFlow classes。
积木
本质上,通过将每条消息包装在称为 Flow<T>
的 class 中,我们可以实现 Railway Oriented 方法。流有两种状态:失败或成功。成功的流 Flow<T>
被传递到下一个自定义数据流或在失败时连接到 FailureBlock : ITargetBlock<IFlow>
。 (本质上是一个 ActionBlock<IFlow>
处理异常、日志等
我的基本流程 class 如下所示:
public class Flow<T> : IFlow
{
public T Value { get; private set; }
public Exception Exception { get; private set; }
public bool Success => Exception is null;
public bool Failure => !Success;
public void Fail(Exception exception) => Exception = exception;
public Flow(T value) => Data = value;
public Flow(Exception exception) => Fail(exception);
public static Flow<T> FromValue<T>(T data) => new Flow<T>(data);
}
public interface IFlow
{
bool Success { get; }
bool Failure { get; }
Exception Exception { get; }
void Fail(Exception exception);
}
生成的自定义 IDataflow
下面的部分看起来很吓人,但其实并不可怕。它本质上是一个具有两个额外功能的 TransformBlock 包装器:
在此处输入代码1。每个自定义 FlowBlock<T1,T2>
将方法包装到 try { } catch { }
LinkTo
方法将成功的流链接到下一个块,将失败的流链接到FailureBlock
public class FlowBlock<TInput, TOutput>: IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
{
protected override ITargetBlock<Flow<TInput>> Target => TransformBlock;
protected override ISourceBlock<Flow<TOutput>> Source => TransformBlock;
private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
private FailureBlock FailureBlock { get; }
public FlowBlock(
Func<TInput, Task<TOutput>> transform,
ExecutionDataflowBlockOptions dataflowBlockOptions,
FailureBlock failureBlock)
{
TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
async inFlow =>
{
try
{
return new Flow<TOutput>(await transform(inFlow.Data));
}
catch (Exception exception)
{
return new Flow<TOutput>(exception);
}
},
dataflowBlockOptions);
}
public override IDisposable LinkTo(
ITargetBlock<Flow<TOutput>> target,
DataflowLinkOptions linkOptions)
=> new Disposable(
Source.LinkTo(target, linkOptions, flow => flow.Success),
Source.LinkTo(OutputBlock, linkOptions, flow => flow.Failure));
}
如果您有兴趣,请在评论中告诉我,我很乐意打开一个包含更多详细信息的 github 存储库。
您在块中抛出异常。这将导致块移动到故障状态并将异常附加到它的 Completion
任务。
您的代码在 actionBlock.Post
周围只有 try/catch
,这不是引发异常的地方。
由于异常附加到完成任务,因此在块外捕获异常的唯一方法是 await actionBlock.Completion
,这将在块外重新抛出异常并允许您捕获 (Exception ex)
.
防止区块出现故障;在块中捕获异常。如果异常离开一个块,该块将出错并且不再接受新的输入。
var actionBlock = new ActionBlock<int>(item =>
{
try
{
if (item == 5)
throw new Exception("Blech.");
Debug.WriteLine(item.ToString());
}
catch (Exception ex)
{
Debug.WriteLine(ex);
}
});
此外,您还可以处理 Post
或更好的 SendAsync
的结果并对块错误做出反应:
foreach (var number in TestList)
{
if(!actionBlock.Post(number))
actionBlock.Complete();
try
{
await actionBlock.Completion;
}
catch (Exception ex)
{
Debug.WriteLine(ex.Message);
//actionBlock is now dead
break;
}
}