Avoid shutting down an entire data flow network when one block is faulted
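I'm using DataflowEx, and I'd like to know how to avoid shutting down the entire Dataflow when an exception is thrown.
I have a system where tasks come in at random. I want the network to log the failure, abandon that particular task, and carry on with the others.
From reading the TPL Dataflow and DataflowEx documentation, in particular: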
"It [a faulted block] should decline any further incoming messages."

"DataflowEx takes a fast-fail approach on exception handling just like
TPL Dataflow. When an exception is thrown, the low-level block ends to
the Faulted state first. Then the Dataflow instance who is the parent
of the failing block gets notified. It will immediately propagate the
fatal error: notify its other children to shutdown immediately. After
all its children is done/completed, the parent Dataflow also comes to
its completion, with the original exception wrapped in the
CompletionTask whose status is also Faulted."
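it seems that having blocks carry on after a failure is not the intended behavior...
My flow involves a lot of file IO, and I expect exceptions to occur occasionally (a network volume going offline during a read/write, connection failures, permission issues...).
I don't want the whole pipeline to die.
Here is a sample of the code I'm working with: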
using Gridsum.DataflowEx;
using System;
using System.IO;
using System.Threading.Tasks.Dataflow;

namespace DataManagementSystem.Data.Pipeline.Actions
{
    class CopyFlow : Dataflow<FileInfo, FileInfo>
    {
        private TransformBlock<FileInfo, FileInfo> Copier;
        private string destination;

        public CopyFlow(string destination) : base(DataflowOptions.Default)
        {
            this.destination = destination;
            Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));
            RegisterChild(Copier);
        }

        public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }
        public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }

        protected virtual FileInfo Copy(FileInfo file)
        {
            try
            {
                return file.CopyTo(Path.Combine(destination, file.Name));
            }
            catch(Exception ex)
            {
                //Log the exception
                //Abandon this unit of work
                //resume processing subsequent units of work
            }
        }
    }
}
This is how I send work to the pipeline:
var result = pipeline.ProcessAsync(new[] { file1, file2 }).Result;
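If a block throws an Exception, it will fault. If you don't want the pipeline to fail, you can either not propagate completion or handle the Exception. Handling exceptions can take many forms, but it sounds like all you need is a simple retry. You can use a try/catch and implement your own retry loop, or use something like Polly. A simple example follows.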
public BuildPipeline() {
    var waitTime = TimeSpan.FromSeconds(1);
    var retryPolicy = Policy.Handle<IOException>()
        .WaitAndRetryAsync(3, i => waitTime);
    var fileIOBlock = new ActionBlock<string>(
        async fileName => await retryPolicy.ExecuteAsync(
            async () => await FileIOAsync(fileName)));
}
Note: this code is untested, but it should get you headed in the right direction.
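For the hand-rolled alternative mentioned above, a try/catch retry loop could be sketched roughly as follows. `Retry`, `TryWithRetriesAsync`, and its parameters are hypothetical names made up for this illustration; they are not part of Polly, TPL Dataflow, or DataflowEx. This version catches only `IOException`, logs it through a caller-supplied callback, and returns false when all attempts fail, so an `IOException` never escapes the block delegate. Any other exception type would still propagate and fault the block.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

static class Retry
{
    // Hypothetical helper: runs `action` up to `maxAttempts` times,
    // waiting `delay` between failed attempts.
    // Returns true on the first success, false if every attempt
    // threw an IOException. Other exception types are not caught.
    public static async Task<bool> TryWithRetriesAsync(
        Func<Task> action, int maxAttempts, TimeSpan delay, Action<Exception> log)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await action();
                return true;
            }
            catch (IOException ex)
            {
                log(ex);
                // Only wait if another attempt is coming.
                if (attempt < maxAttempts)
                    await Task.Delay(delay);
            }
        }
        return false; // give up on this item without throwing
    }
}
```

Inside a block delegate this might be called as `await Retry.TryWithRetriesAsync(() => FileIOAsync(fileName), 4, TimeSpan.FromSeconds(1), ex => Console.Error.WriteLine(ex))`, where `FileIOAsync` is the same placeholder used in the example above.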
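Edit
You have almost everything you need. Once you've caught and logged the exception, you can return null or some other marker, which you can then filter out of the pipeline into a NullTarget. This code makes sure the NullTarget filter link is the first link on Copier, so no nulls ever reach your actual destination.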
class CopyFlow : Dataflow<FileInfo, FileInfo> {
    private TransformBlock<FileInfo, FileInfo> Copier;
    private string destination;

    public CopyFlow(string destination) : base(DataflowOptions.Default) {
        this.destination = destination;
        Copier = new TransformBlock<FileInfo, FileInfo>(f => Copy(f));
        Copier.LinkTo(DataflowBlock.NullTarget<FileInfo>(), info => info == null);
        RegisterChild(Copier);
    }

    public override ITargetBlock<FileInfo> InputBlock { get { return Copier; } }
    public override ISourceBlock<FileInfo> OutputBlock { get { return Copier; } }

    protected virtual FileInfo Copy(FileInfo file) {
        try {
            return file.CopyTo(Path.Combine(destination, file.Name));
        } catch(Exception ex) {
            //Log the exception
            //Abandon this unit of work
            //resume processing subsequent units of work
            return null;
        }
    }
}
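To see the filter-link ordering in isolation, here is a small sketch that uses only plain TPL Dataflow, without DataflowEx. `NullFilterDemo`, `Process`, and `RunAsync` are hypothetical names for this illustration, and the "bad" prefix is just a way to simulate a failing item. The idea is the same as above: the delegate swallows the exception and returns null, and the predicate link to `NullTarget` is added before the link to the real destination.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class NullFilterDemo
{
    // Stand-in for Copy(): returns null instead of letting the exception escape.
    static string Process(string input)
    {
        try
        {
            if (input.StartsWith("bad"))
                throw new InvalidOperationException("simulated failure: " + input);
            return input.ToUpperInvariant();
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex.Message); // log and abandon this item
            return null;
        }
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> inputs)
    {
        var results = new List<string>();
        var worker = new TransformBlock<string, string>(s => Process(s));
        var sink = new ActionBlock<string>(s => results.Add(s));

        // Links are offered messages in the order they were added,
        // so the null filter goes first.
        worker.LinkTo(DataflowBlock.NullTarget<string>(), s => s == null);
        worker.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in inputs)
            worker.Post(input);
        worker.Complete();

        await sink.Completion; // completes normally; no block has faulted
        return results;
    }
}
```

Calling `RunAsync` with a mix of good and "bad" inputs should return only the successfully processed items, with the failed ones logged and discarded.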