How to shut down a TPL Dataflow pipeline gracefully on a fatal exception?

I'm using a sequential pipeline built on TPL Dataflow. It consists of 3 blocks, named b1, b2, and bFinal in the code below.

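The problem is how to shut down the pipeline when an error occurs, such as a service going down. The pipeline must go down in a controlled way, so that the results of B2 are not lost.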

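The solution is simple, but it took me a few rounds to get there, because the Microsoft site offers little information beyond the basic library documentation.

I hope this helps others. The solution can easily be reconfigured to meet other requirements.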

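The proposed approach relies on:

  • A CancellationTokenSource to signal shutdown. On a fatal exception, each block should signal shutdown through the shared CancellationTokenSource object.
  • Blocks that should stop working immediately once shutdown is signaled should be initialized with the token of the shared CancellationTokenSource object.
  • The program must wait for the last block to finish processing all of its messages.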

Here is a working example: the solution as a PipeLine class, together with a test that demonstrates it works.

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using System.Diagnostics;

namespace Tests.Sets.Research
{
    [TestClass]
    public class TPLTest
    {
        public class PipeLine
        {
            CancellationTokenSource cancellationTokenSource;
            TransformBlock<int, int> b1, b2;
            ActionBlock<int> bFinal;

            static int SimulateWork(String blockName, int message, CancellationTokenSource cancellationTokenSource)
            {
                try
                {
                    Thread.Sleep(100);
                    Trace.WriteLine($"{blockName} processed: {message}");
                }
                catch (Exception ex)
                {
                    Trace.WriteLine($"Fatal error {ex.Message} at {blockName}");
                    cancellationTokenSource.Cancel();
                }
                return message;
            }


            public PipeLine(CancellationTokenSource cancellationTokenSource)
            {
                this.cancellationTokenSource = cancellationTokenSource;

                // Create two TransformBlock<int, int> objects and one ActionBlock<int>.
                // Each block calls the SimulateWork method.
                Func<string, int, CancellationTokenSource, int> doWork = (name, message, ct) => SimulateWork(name, message, ct);

                b1 = new TransformBlock<int, int>((m1) => doWork("b1", m1, cancellationTokenSource),
                   new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 , CancellationToken = cancellationTokenSource.Token}); //discard messages on  this block if cancel is signaled
                b2 = new TransformBlock<int, int>((m1) => doWork("b2", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
                bFinal = new ActionBlock<int>((m1) => doWork("bFinal", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

                b1.LinkTo(b2, new DataflowLinkOptions { PropagateCompletion = true });
                b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });
            }

            internal void Complete()
            {
                b1.Complete();
            }

            public void waifForCompletetion()
            {               
                Trace.WriteLine($"Waiting for pipeline to end gracefully");
                bFinal.Completion.Wait();
                Trace.WriteLine($"Pipeline terminated");               
            }

            public void submitToPipe(int message)
            {
                if (cancellationTokenSource.IsCancellationRequested)
                {
                    Trace.WriteLine($"Message {message} was rejected. Pipe is shutting down.Throttling meanwhile");
                    return;
                }
                b1.SendAsync(message);
            }
        }

        [TestMethod]
        public void TestShutdown()
        {
            var cancellationTokenSource = new CancellationTokenSource();
            var pipeLine = new PipeLine(cancellationTokenSource);

            // Signal a failure after 2 seconds.
            // The effect is the same as a signal raised from inside block 2.
            Task.Run(async () =>
            {
                await Task.Delay(2000);
                Console.WriteLine("Time to shutdown the pipeline!");
                cancellationTokenSource.Cancel();
            });

            //send requests to pipe in background for 5 seconds
            Task.Run(async () =>
            {
                for (int i = 1; i < 100; i++)
                {
                    if (cancellationTokenSource.IsCancellationRequested)
                        break;

                    await Task.Delay(50); // slow down submissions so the pipe can be seen closing its input
                    pipeLine.submitToPipe(i);
                }
                pipeLine.Complete();
            });

            pipeLine.waifForCompletetion();
        }
    }
}

Here is the result:

b2 processed: 13
b1 processed: 22
Message 45 was rejected. Pipe is shutting down.Throttling meanwhile 
b2 processed: 14
bFinal processed: 8
b2 processed: 15
bFinal processed: 9
bFinal processed: 10
bFinal processed: 11
bFinal processed: 12
bFinal processed: 13
bFinal processed: 14
bFinal processed: 15
Pipeline terminated

After message 45 was rejected, no more messages were processed on B1.

All messages that were already in the B2 queue reached the end of the pipeline.

When a dataflow block completes, it means that:

[...] it should not accept nor produce any more messages nor consume any more postponed messages.

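If you want all messages that were successfully processed by b2 to be processed by bFinal, there are three things you should not do:

  1. Do not define the bFinal block with a BoundedCapacity.
  2. Do not cancel the bFinal block.
  3. Do not propagate the completion of b2 to bFinal.

If you make bFinal bounded and b2 happens to process messages faster, then when b2 fails it will have processed messages stored in its output buffer. These messages will not be offered to the linked block bFinal. They will be lost.

If you do either 2 or 3 (cancel bFinal, or propagate the completion of a faulted b2 block to bFinal), then the bFinal block will not process any of the messages stored in its input buffer. It will wait until the message currently in progress finishes, then complete itself in a canceled or faulted state, discarding the messages in its input buffer. So don't do this: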

b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });

Do this instead:

b2.LinkTo(bFinal);
b2.PropagateCompletionAlwaysSuccessful(bFinal);

Here is the PropagateCompletionAlwaysSuccessful extension method:

public static async void PropagateCompletionAlwaysSuccessful(this IDataflowBlock source,
    IDataflowBlock target)
{
    try
    {
        await source.Completion.ConfigureAwait(false);
    }
    catch
    {
        // Ignore exception
    }
    finally
    {
        target.Complete();
    }
}
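
To make the three rules above concrete, here is a minimal sketch (not the original poster's code) that runs a two-block tail of the pipeline with and without a BoundedCapacity on bFinal. The class names DataflowExtensions and DrainDemo, the method RunAsync, and the delays are hypothetical and chosen only for illustration. The sketch assumes b2 faults on message 5 after its earlier outputs have had time to reach bFinal.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions // hypothetical host class for the extension method
{
    public static async void PropagateCompletionAlwaysSuccessful(
        this IDataflowBlock source, IDataflowBlock target)
    {
        try { await source.Completion.ConfigureAwait(false); }
        catch { /* ignore: the target must complete successfully */ }
        finally { target.Complete(); }
    }
}

public static class DrainDemo // hypothetical demo class
{
    // Returns the messages that bFinal actually processed.
    public static async Task<int[]> RunAsync(int boundedCapacity)
    {
        var processed = new ConcurrentQueue<int>();

        var b2 = new TransformBlock<int, int>(m =>
        {
            if (m == 5)
            {
                Thread.Sleep(300); // let earlier outputs be offered to bFinal
                throw new InvalidOperationException("service down");
            }
            return m;
        });

        var bFinal = new ActionBlock<int>(async m =>
        {
            await Task.Delay(200); // bFinal is slower than b2
            processed.Enqueue(m);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        b2.LinkTo(bFinal);                              // no PropagateCompletion
        b2.PropagateCompletionAlwaysSuccessful(bFinal); // Complete(), never Fault()

        for (int i = 1; i <= 5; i++) b2.Post(i);
        b2.Complete();

        await bFinal.Completion; // bFinal drains its input buffer first
        return processed.ToArray();
    }
}
```

With these timings, RunAsync(DataflowBlockOptions.Unbounded) should return 1, 2, 3, 4, because all four results are already in the input buffer of bFinal when b2 faults. RunAsync(1) should return fewer messages, because the rest are still sitting in the output buffer of b2 at that moment and are discarded with it.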

To handle any exceptions that occurred in either the b2 or the bFinal block, do this at the end:

await Task.WhenAll(b2.Completion, bFinal.Completion);
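
As a rough sketch of what handling those exceptions could look like, the example below wraps the await in a try/catch. The class CompletionDemo and the method ObserveAsync are hypothetical names, and a plain ContinueWith stands in for the extension method above so that the snippet is self-contained.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo // hypothetical demo class
{
    public static async Task<string> ObserveAsync()
    {
        var b2 = new TransformBlock<int, int>(m =>
        {
            if (m == 2) throw new InvalidOperationException("service down");
            return m;
        });
        var bFinal = new ActionBlock<int>(m => { });

        b2.LinkTo(bFinal);
        // Complete bFinal successfully whether b2 succeeded or faulted.
        Task relay = b2.Completion.ContinueWith(
            t => bFinal.Complete(), TaskScheduler.Default);

        b2.Post(1);
        b2.Post(2);
        b2.Complete();

        Task all = Task.WhenAll(b2.Completion, bFinal.Completion);
        try
        {
            await all;
            return "completed";
        }
        catch (Exception ex)
        {
            // await rethrows only the first exception;
            // all.Exception.InnerExceptions holds every failure.
            return $"{ex.GetType().Name}: {ex.Message}";
        }
    }
}
```

Here ObserveAsync returns "InvalidOperationException: service down". Awaiting the combined task observes the failure of b2 even though bFinal itself completed successfully.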