如何在故障块上停止处理流水线?

How to stop processing pipeline on faulty block?

如果其中一个块决定发生错误,我如何停止处理 DataFlow 块,阻止下一个块到 运行。我认为块可以抛出异常,但不确定停止进一步处理管道的正确方法是什么。

更新:

private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells == null)
        return;

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    };

    blockPrepare.Complete();
    try
    {
        await blockTestMover.Completion;
    }
    catch(Exception ee)
    {
        Console.WriteLine(ee.Message);
    }

    Console.WriteLine("Done");
}

更新 2:

    public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                    Func<TInput, Task> action,
                    Action<Exception> exceptionHandler,
                    ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new ActionBlock<TInput>(async input =>
        {
            try
            {
                await action(input);
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
            }
        }, dataflowBlockOptions);
    }

如果您有管道,您可能已经在使用 PropagateCompletion = true。这意味着如果管道中的一个块因异常而失败,则它之后的所有块也将失败。

剩下的就是停止失败块之前的所有块。为此,您可以等待管道中最后一个块的 Completion。如果这样做会抛出异常,则通过调用 Fault() 使第一个块失败。代码可能如下所示:

// set up your pipeline

try
{
    await lastBlock.Completion;
}
catch (Exception ex)
{
    ((IDataflowBlock)firstBlock).Fault(ex);

    throw; // or whatever is appropriate to propagate the exception up
}

如果您想要的是块中的异常意味着当前项目确实在管道中走得更远,但其他项目的处理应该继续而不会中断,那么您可以通过创建一个产生一个项目的块来实现如果处理成功,但在抛出异常时生成零项:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}