如何使用异常处理创建永无止境的 DataFlow Mesh?

How to create never ending DataFlow Mesh with exception handling?

我正在创建一个使用 TPL DataFlow 的任务处理器。我将遵循生产者消费者模型,其中生产者偶尔生产一些要处理的物品,而消费者则一直等待新物品的到来。这是我的代码:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for (int i = 0; i < count; i++)
        {
            arr[i] = randNum.Next(10, 20);
        }

        return arr;
    }
}

public class ActionMeshProcessor
{
    private TransformBlock<int, int> Transformer { get; set; }
    private ActionBlock<int> CompletionAnnouncer { get; set; }

    public async Task Init(CancellationToken cancellationToken)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int>(async input => {

            await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

            if (input > 15)
            {
                throw new Exception($"I can't handle this number: {input}");
            }

            return input + 1;
        }, options);

        this.CompletionAnnouncer = new ActionBlock<int>(async input =>
        {
            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        this.Transformer.LinkTo(this.CompletionAnnouncer);

        await Task.FromResult(0); // what do I await here?
    }

    public async Task ProcessBlockAsync(int[] arr)
    {
        foreach (var item in arr)
        {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }       
    }
}

我在上面添加了条件检查以抛出异常以模拟异常情况。

这是我的问题:

我看过 this similar question

异常

您的 init 中没有任何异步内容,它可能是一个标准的同步构造函数。您可以在提供给块的 lamda 中使用简单的 try catch 来处理网格中的异常,而无需关闭网格。然后,您可以通过从网格中过滤结果或忽略以下块中的结果来处理这种情况。下面是过滤的例子。对于 int 的简单情况,您可以使用 int? 并过滤掉任何 null 的值,当然您也可以根据需要设置任何类型的魔术指标值。如果您实际上传递了一个引用类型,您可以推出 null 或将数据项标记为脏的方式,可以通过 link.

上的谓词进行检查
public class ActionMeshProcessor {
    private TransformBlock<int, int?> Transformer { get; set; }
    private ActionBlock<int?> CompletionAnnouncer { get; set; }

    public ActionMeshProcessor(CancellationToken cancellationToken) {
        var options = new ExecutionDataflowBlockOptions {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int?>(async input => {
            try {
                await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

                if (input > 15) {
                    throw new Exception($"I can't handle this number: {input}");
                }

                return input + 1;
            } catch (Exception ex) {
                return null;
            }

        }, options);

        this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
        {
            if (input == null) throw new ArgumentNullException("input");

            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        //Filtering
        this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
        this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
    }

    public async Task ProcessBlockAsync(int[] arr) {
        foreach (var item in arr) {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }
    }
}

完成

您可以从您的处理器公开 Complete()Completion,并在您的应用程序关闭时使用它们 await 完成,假设那是您关闭网格的唯一时间。另外,请确保通过 link 正确传播完成。

    //Filtering
    this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
    this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}        

public void Complete() {
    Transformer.Complete();
}

public Task Completion {
    get { return CompletionAnnouncer.Completion; }
}

然后,根据您的示例,最有可能完成的位置是在驱动您的处理的循环之外:

public async Task ExecuteAsync(CancellationToken cancellationToken) {
    var random = new Random();

    ActionMeshProcessor processor = new ActionMeshProcessor();
    await processor.Init(cancellationToken);

    while (!cancellationToken.IsCancellationRequested) {
        await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

        int[] items = GetItems(random.Next(3, 7));

        await processor.ProcessBlockAsync(items);
    }
    //asuming you don't intend to throw from cancellation
    processor.Complete();
    await processor.Completion();

}