使用 TPL 数据流的 TransformBlock 的实现问题

Implementation issue with TransformBlock using TPL Dataflow

我正在学习 TPL 数据流。我尝试创建一个示例,我在其中发布来自不同 Task 的一些值,并期望结果回到相同的 Task 以进一步处理它。但结果是错误的。以下是我的代码。让我知道我做错了什么以及如何解决它。

static void Main(string[] args)
{

    var transBlock = new TransformBlock<int, int>
       (
           n =>
           {
               Thread.Sleep(1000);

               return (n*2);
           }
       );

    new Task(() => 
    {

       var result = transBlock.Post(2);
       var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 2 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(3);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 3 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(4);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 4 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(5);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 5 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(6);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 6 is {0}", val));
    }).Start();

    new Task(() =>
    {

        var result = transBlock.Post(7);
        var val = transBlock.Receive();

        Console.WriteLine(string.Format("double for 7 is {0}", val));

    }).Start();

    Console.ReadLine();
}

结果每次都不同,但有一次是这样的:

double for 5 is 8
double for 4 is 6
double for 3 is 4
double for 2 is 10
double for 6 is 12
double for 7 is 14

我不知道 TPL Dataflow 是否是 FIFO,但即使 ,您的代码也存在竞争条件。

只考虑这两个:

new Task(() => 
{

   var result = transBlock.Post(2);
   var val = transBlock.Receive();

    Console.WriteLine(string.Format("double for 2 is {0}", val));
}).Start();

new Task(() =>
{

    var result = transBlock.Post(3);
    var val = transBlock.Receive();

    Console.WriteLine(string.Format("double for 3 is {0}", val));
}).Start();

这些任务可能会也可能不会在单独的线程上执行。但是,如果他们,第二个任务有可能post3,然后将上下文传递给第一个任务,post s 2 并接收 3.

TPL 数据流不是这样工作的。

TPL Dataflow 是一个 actor 框架。您创建一个块,告诉它要做什么,将 post 项放入其中,它一个接一个(可能同时)执行每个项目的操作,然后输出结果。如果您有多个块,那么您可以将它们链接在一起并形成一个管道。

方块不知道是谁post将哪个项目放入其中。没有理由期望结果 return 进入匹配任务。

如果你想跟踪输入和输出,你可以return一个输入和输出的元组:

var transBlock = new TransformBlock<int, Tuple<int,int>>(async n =>
{
    await Task.Delay(1000)
    return Tuple.Create(n, n * 2);
});


var tuple = transBlock.Receive();
Console.WriteLine(string.Format("double for {0} is {1}", tuple.Item1, tuple.Item2));