TransformBlock Items 卡在输出队列中。为什么以及如何解决?

TransformBlock Items get stuck in the output queue. Why and how to fix?

我已经了解了 TPL DataFlow,并且在使用 TrasformBlock 链接到 ActionBlock 的代码中遇到了一个非常恼人的问题。

最终我发现项目卡在 TransformBlock 的输出队列中,因为它的 OutputCount 属性 不断返回高于“0”的值。 这就是整个应用程序陷入僵局的原因。但是,一旦我调用 TransformBlock.TryReceiveAll().

,它就会解锁

任何人都可以告诉我是否遗漏了什么或如何防止此类行为?

static void Main()
{
    int total = 0;
    int itemsProcessing = 0;

    TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
        i => new Tuple<int, double>(i, Math.Sqrt(i)),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 20,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
        {
            await Task.Delay(1000); // simulating data output delay
            Interlocked.Decrement(ref itemsProcessing);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 5,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    transformBlock.Completion.ContinueWith(t => outputBlock.Complete());

    using (Timer timer = new Timer(o =>
        {
            Console.Title = string.Format(
                "{0}: {1}/{2} {3}/{4}/{5}",
                Assembly.GetExecutingAssembly().GetName().Name,
                Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
                transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
        }, null, 100, 100))
    {
        using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 40; i++)
            {
                Thread.Sleep(100); // simulating new item retrieval delay

                Interlocked.Increment(ref total);
                Interlocked.Increment(ref itemsProcessing);

                transformBlock.SendAsync(i).Wait();
            }
        }

        Console.WriteLine("Enqueued");

        transformBlock.Complete();
        outputBlock.Completion.Wait();

        Console.WriteLine("Finish");

        timer.Change(Timeout.Infinite, Timeout.Infinite);
        timer.Dispose();
    }
}

调用 TransformBlock.LinkTo 可让您重新获得一次性注册。当您处理该注册时,块将取消链接。

您的 using 作用域结束得太早,并且块在 TransformBlock 有机会将自身清空到 ActionBlock 之前取消链接,从而阻止它完成。由于第一个块没有完成,下一个块甚至都没有开始完成,更不用说完成了。

将等待移动到 using 块内解决了死锁:

using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
    for (int i = 0; i < 40; i++)
    {
        Thread.Sleep(100); // simulating new item retrieval delay

        Interlocked.Increment(ref total);
        Interlocked.Increment(ref itemsProcessing);

        transformBlock.SendAsync(i).Wait();
    }

    Console.WriteLine("Enqueued");
    transformBlock.Complete();
    outputBlock.Completion.Wait();
    Console.WriteLine("Finish");
}

附带说明一下,您真的不应该以这种方式阻塞异步代码。使用 async-await 代替 Wait()Task.Delay 代替 Thread.Sleep 等会简单得多

此外,由于您使用的是 PropagateCompletion,因此无需显式调用 outputBlock.Complete()