C# TPL:可以在任意步骤重新启动失败的管道吗?
C# TPL: Possible to restart a failed Pipeline at an arbitrary step?
我有一个包含大约 20 个连续步骤的数据处理作业。这些步骤都属于以下三个类别之一:
- 做一些文件操作
- 从数据库导入/导出数据
- 调用第 3 方网站 API
我已经使用示例 here and here 将代码从一个冗长、糟糕的方法重构为管道模式。所有的步骤都是TransformBlock,比如
var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv =>
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
});
代码大部分时间都有效,但管道中的某个步骤偶尔会因某种原因而失败 - 假设在 20 个步骤中的第 6 步无法读取损坏的文件(只是一个例子 - 任何步骤都可能失败) .管道停止 运行 进一步的任务,因为它应该。
但是,第 3 方网络 API 引入了一个挑战 - 无论我们执行所有 20 个步骤还是只执行第一个步骤,我们都会为我们启动的每项工作付费。
我希望能够修复问题步骤中的任何错误(同样,对于我们的示例,假设我在 20 步中的第 6 步中修复了损坏的文件),然后在第 6 步恢复。第 3 步party web API 每个作业都有一个 GUID,并且是异步的,所以应该没问题 - 问题解决后,它会很乐意让作业继续执行剩余步骤。
我的问题:假设该步骤的先决条件有效,是否有可能(如果这样建议?)设计一个可以从任何步骤开始的管道?
它看起来像:
- 作业在第 6 步失败,并将第 5 步记录为最后一个成功的步骤
- 一个人出现并修复了导致第 6 步失败的任何问题
- 新管道在第 6 步开始
我意识到一种蛮力方法是使用 StartAtStep2()
、StartAtStep3()
、StartAtStep4()
方法。这似乎不是一个好的设计,但我对这种模式有点陌生,所以也许这是可以接受的。
蛮力方法还不错,例如你上面的代码只需要
bool StartAtStepThirteen(FileInfo csv)
{
return stepThirteenPostToWebApi.Post(csv);
}
链的设置应该是与链的执行不同的方法。您应该将 stepThirteenPostToWebApi
保存在代表整个链的 class 中的 class 级别变量中,链的设置可以在 class 的构造函数中完成。
这是该过程的简单 3 步版本。当错误发生而不是使任务链出错时,我记录错误并沿着链传递 null
以获取无效条目。您可以让该日志方法引发一个事件,然后用户可以决定如何处理错误的条目。
public class WorkChain
{
private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo;
private readonly TransformBlock<FileInfo, System.Guid?> stepTwoPostToWebApi;
private readonly ActionBlock<System.Guid?> stepThreeDisplayIdToUser;
public WorkChain()
{
stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo));
stepTwoPostToWebApi = new TransformBlock<FileInfo, System.Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi));
stepThreeDisplayIdToUser = new ActionBlock<System.Guid?>(new Action<Guid?>(DisplayIdToUser));
stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() {PropagateCompletion = true});
stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() {PropagateCompletion = true});
}
public void PostToStepOne(string path)
{
bool result = stepOneGetFileInfo.Post(path);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepOneGetFileInfo");
}
}
public void PostToStepTwo(FileInfo csv)
{
bool result = stepTwoPostToWebApi.Post(csv);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi");
}
}
public void PostToStepThree(Guid id)
{
bool result = stepThreeDisplayIdToUser.Post(id);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser");
}
}
public void CompleteAdding()
{
stepOneGetFileInfo.Complete();
}
public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } }
private FileInfo GetFileInfo(string path)
{
try
{
return new FileInfo(path);
}
catch (Exception ex)
{
LogGetFileInfoError(ex, path);
return null;
}
}
private async Task<Guid?> PostToWebApi(FileInfo csv)
{
if (csv == null)
return null;
try
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
}
catch (Exception ex)
{
LogPostToWebApiError(ex, csv);
return null;
}
}
private void DisplayIdToUser(Guid? obj)
{
if(obj == null)
return;
Console.WriteLine(obj.Value);
}
}
我有一个包含大约 20 个连续步骤的数据处理作业。这些步骤都属于以下三个类别之一:
- 做一些文件操作
- 从数据库导入/导出数据
- 调用第 3 方网站 API
我已经使用示例 here and here 将代码从一个冗长、糟糕的方法重构为管道模式。所有的步骤都是TransformBlock,比如
var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv =>
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
});
代码大部分时间都有效,但管道中的某个步骤偶尔会因某种原因而失败 - 假设在 20 个步骤中的第 6 步无法读取损坏的文件(只是一个例子 - 任何步骤都可能失败) .管道停止 运行 进一步的任务,因为它应该。
但是,第 3 方网络 API 引入了一个挑战 - 无论我们执行所有 20 个步骤还是只执行第一个步骤,我们都会为我们启动的每项工作付费。
我希望能够修复问题步骤中的任何错误(同样,对于我们的示例,假设我在 20 步中的第 6 步中修复了损坏的文件),然后在第 6 步恢复。第 3 步party web API 每个作业都有一个 GUID,并且是异步的,所以应该没问题 - 问题解决后,它会很乐意让作业继续执行剩余步骤。
我的问题:假设该步骤的先决条件有效,是否有可能(如果这样建议?)设计一个可以从任何步骤开始的管道?
它看起来像:
- 作业在第 6 步失败,并将第 5 步记录为最后一个成功的步骤
- 一个人出现并修复了导致第 6 步失败的任何问题
- 新管道在第 6 步开始
我意识到一种蛮力方法是使用 StartAtStep2()
、StartAtStep3()
、StartAtStep4()
方法。这似乎不是一个好的设计,但我对这种模式有点陌生,所以也许这是可以接受的。
蛮力方法还不错,例如你上面的代码只需要
bool StartAtStepThirteen(FileInfo csv)
{
return stepThirteenPostToWebApi.Post(csv);
}
链的设置应该是与链的执行不同的方法。您应该将 stepThirteenPostToWebApi
保存在代表整个链的 class 中的 class 级别变量中,链的设置可以在 class 的构造函数中完成。
这是该过程的简单 3 步版本。当错误发生而不是使任务链出错时,我记录错误并沿着链传递 null
以获取无效条目。您可以让该日志方法引发一个事件,然后用户可以决定如何处理错误的条目。
public class WorkChain
{
private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo;
private readonly TransformBlock<FileInfo, System.Guid?> stepTwoPostToWebApi;
private readonly ActionBlock<System.Guid?> stepThreeDisplayIdToUser;
public WorkChain()
{
stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo));
stepTwoPostToWebApi = new TransformBlock<FileInfo, System.Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi));
stepThreeDisplayIdToUser = new ActionBlock<System.Guid?>(new Action<Guid?>(DisplayIdToUser));
stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() {PropagateCompletion = true});
stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() {PropagateCompletion = true});
}
public void PostToStepOne(string path)
{
bool result = stepOneGetFileInfo.Post(path);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepOneGetFileInfo");
}
}
public void PostToStepTwo(FileInfo csv)
{
bool result = stepTwoPostToWebApi.Post(csv);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi");
}
}
public void PostToStepThree(Guid id)
{
bool result = stepThreeDisplayIdToUser.Post(id);
if (!result)
{
throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser");
}
}
public void CompleteAdding()
{
stepOneGetFileInfo.Complete();
}
public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } }
private FileInfo GetFileInfo(string path)
{
try
{
return new FileInfo(path);
}
catch (Exception ex)
{
LogGetFileInfoError(ex, path);
return null;
}
}
private async Task<Guid?> PostToWebApi(FileInfo csv)
{
if (csv == null)
return null;
try
{
dynamic task = await ApiUtils.SubmitData(csv.FullName);
return task.guid;
}
catch (Exception ex)
{
LogPostToWebApiError(ex, csv);
return null;
}
}
private void DisplayIdToUser(Guid? obj)
{
if(obj == null)
return;
Console.WriteLine(obj.Value);
}
}