使用 TPL 数据流的 TransformBlock 的实现问题
Implementation issue with TransformBlock using TPL Dataflow
我正在学习 TPL 数据流。我尝试创建一个示例,我在其中发布来自不同 Task
的一些值,并期望结果回到相同的 Task
以进一步处理它。但结果是错误的。以下是我的代码。让我知道我做错了什么以及如何解决它。
static void Main(string[] args)
{
var transBlock = new TransformBlock<int, int>
(
n =>
{
Thread.Sleep(1000);
return (n*2);
}
);
new Task(() =>
{
var result = transBlock.Post(2);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 2 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(3);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 3 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(4);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 4 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(5);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 5 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(6);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 6 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(7);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 7 is {0}", val));
}).Start();
Console.ReadLine();
}
结果每次都不同,但有一次是这样的:
double for 5 is 8
double for 4 is 6
double for 3 is 4
double for 2 is 10
double for 6 is 12
double for 7 is 14
我不知道 TPL Dataflow
是否是 FIFO,但即使 是 ,您的代码也存在竞争条件。
只考虑这两个:
new Task(() =>
{
var result = transBlock.Post(2);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 2 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(3);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 3 is {0}", val));
}).Start();
这些任务可能会也可能不会在单独的线程上执行。但是,如果他们做,第二个任务有可能post3
,然后将上下文传递给第一个任务,post s 2
并接收 3
.
TPL 数据流不是这样工作的。
TPL Dataflow 是一个 actor 框架。您创建一个块,告诉它要做什么,将 post 项放入其中,它一个接一个(可能同时)执行每个项目的操作,然后输出结果。如果您有多个块,那么您可以将它们链接在一起并形成一个管道。
方块不知道是谁post将哪个项目放入其中。没有理由期望结果 return 进入匹配任务。
如果你想跟踪输入和输出,你可以return一个输入和输出的元组:
var transBlock = new TransformBlock<int, Tuple<int,int>>(async n =>
{
await Task.Delay(1000)
return Tuple.Create(n, n * 2);
});
var tuple = transBlock.Receive();
Console.WriteLine(string.Format("double for {0} is {1}", tuple.Item1, tuple.Item2));
我正在学习 TPL 数据流。我尝试创建一个示例,我在其中发布来自不同 Task
的一些值,并期望结果回到相同的 Task
以进一步处理它。但结果是错误的。以下是我的代码。让我知道我做错了什么以及如何解决它。
static void Main(string[] args)
{
var transBlock = new TransformBlock<int, int>
(
n =>
{
Thread.Sleep(1000);
return (n*2);
}
);
new Task(() =>
{
var result = transBlock.Post(2);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 2 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(3);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 3 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(4);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 4 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(5);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 5 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(6);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 6 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(7);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 7 is {0}", val));
}).Start();
Console.ReadLine();
}
结果每次都不同,但有一次是这样的:
double for 5 is 8
double for 4 is 6
double for 3 is 4
double for 2 is 10
double for 6 is 12
double for 7 is 14
我不知道 TPL Dataflow
是否是 FIFO,但即使 是 ,您的代码也存在竞争条件。
只考虑这两个:
new Task(() =>
{
var result = transBlock.Post(2);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 2 is {0}", val));
}).Start();
new Task(() =>
{
var result = transBlock.Post(3);
var val = transBlock.Receive();
Console.WriteLine(string.Format("double for 3 is {0}", val));
}).Start();
这些任务可能会也可能不会在单独的线程上执行。但是,如果他们做,第二个任务有可能post3
,然后将上下文传递给第一个任务,post s 2
并接收 3
.
TPL 数据流不是这样工作的。
TPL Dataflow 是一个 actor 框架。您创建一个块,告诉它要做什么,将 post 项放入其中,它一个接一个(可能同时)执行每个项目的操作,然后输出结果。如果您有多个块,那么您可以将它们链接在一起并形成一个管道。
方块不知道是谁post将哪个项目放入其中。没有理由期望结果 return 进入匹配任务。
如果你想跟踪输入和输出,你可以return一个输入和输出的元组:
var transBlock = new TransformBlock<int, Tuple<int,int>>(async n =>
{
await Task.Delay(1000)
return Tuple.Create(n, n * 2);
});
var tuple = transBlock.Receive();
Console.WriteLine(string.Format("double for {0} is {1}", tuple.Item1, tuple.Item2));