ActionBlock 永远不会在故障状态下完成

ActionBlock never completes on Faulted State

我正在尝试编写一个简单的程序以更好地理解 TPL Dataflow。我正在尝试创建一个长 运行 任务,如果它已完成,它将重新启动数据流块。

我当前的 RestartActionBlock 任务能够等待 ActionBlock 完成,如果我通过输入“-1”在块上显式调用完成。但是当我尝试引发异常以使块出错或调用块 Fault() 接口方法时,ActionBlock 的完成任务永远不会完成。在这种情况下,await singleTestFlow.Completion; 调用永远不会继续。

引发异常或调用 Fault() 方法后,我可以通过尝试向程序输入另一个输入并调试代码以查看块是否处于故障状态来确定块处于故障状态状态:

如果块处于故障状态,为什么 await singleTestFlow.Completion; 从未 return?

class Program
{
    private static ActionBlock<string> singleTestFlow;

    static void Main(string[] args)
    {
        //start thread that should restart a completed action block
        Task.Run(RestartActionBlock);

        Console.WriteLine("Enter -0 to exit, -1 to complete flow, -0- to throw an exception, Anything else otherwise");
        var input = Console.ReadLine();

        //allow user to input text until "-0" is entered
        while (!input.Equals("-0"))
        {
            if (input.Equals("-1"))
            {
                singleTestFlow.Complete();
            }

            singleTestFlow.Post(input);

            input = Console.ReadLine();
        }

        async Task RestartActionBlock()
        {
            var iterations = 0;
            while (true)
            {
                singleTestFlow = new ActionBlock<string>(s =>
                {
                    if (s.Equals("-0-"))
                    {
                        //throw new Exception("Something went wrong in here");
                        ((IDataflowBlock)singleTestFlow).Fault(new Exception("Something went wrong in here"));
                    }
                    Console.WriteLine($"{iterations}: " + s);

                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

                await singleTestFlow.Completion;

                var completionTask = singleTestFlow.Completion;

                var message = $"action block: {iterations} ";
                switch (completionTask.Status)
                {
                    case TaskStatus.RanToCompletion:
                        message += "ran to completion";
                        break;
                    case TaskStatus.Canceled:
                        message += "was canceled";
                        break;
                    case TaskStatus.Faulted:
                        message += "has faulted";
                        Console.WriteLine(completionTask.Exception);
                        break;
                }
                Console.WriteLine(message);
                iterations++;
            }
        }
    }


}

我在控制台中输入 "eee" 的位置是截取错误块调试屏幕截图的位置。

这是简单的解决方法:

替换

await singleTestFlow.Completion; // Throws if Completion is faulted, breaking your loop.

await Task.WhenAny(singleTestFlow.Completion);

await singleTestFlow.Completion.ContinueWith(_ => { });

以上所做的是避免传播Completion任务异常。这允许您的 RestartActionBlock 循环按照您的预期永远执行(而不是在您的块故障后立即死亡)。

然而,理想情况下,您不应忽略以下两个语句的 return 值:

Task.Run(RestartActionBlock); // Unobserved Task. Usually a bad idea.

singleTestFlow.Post(input);

如果您观察上面的 return 值,您实际上会注意到,一旦您的 ActionBlock<string> 出现故障,它们就会开始尖叫寻求帮助。

return 由 Task.Run(RestartActionBlock) 编辑的任务在 await singleTestFlow.Completion 抛出后立即转换为 Faulted 状态 - 但您永远不会收到通知,因为您没有保留参考所述任务,或检查其状态。

类似地,在您的 ActionBlock<string> 错误之后,对 singleTestFlow.Post(input) 的后续调用实际上是 return false,这意味着不再有项目发布到块中。它们只是被丢弃了。