我如何发出数据流完成的信号?
How do I signal completion of my dataflow?
我有一个 class,它使用 TPL 数据流实现了一个由 3 个步骤组成的数据流。
在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo DataflowLinkOptions.PropagateCompletion 将它们 link 设置为 true。 class 公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流。方法 returns "Completion" 属性 工作流的最后一步。
目前,工作流中的步骤似乎按预期执行,但最后一步永远不会完成,除非我对其明确调用 Complete。但是这样做会缩短工作流程并执行 none 个步骤吗?我做错了什么?
public class MessagePipeline {
private TransformBlock<object, object> step1;
private TransformBlock<object, object> step2;
private TransformBlock<object, object> step3;
public MessagePipeline() {
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step1...");
return x;
});
step2 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step2...");
return x;
});
step3 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step3...");
return x;
});
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
}
public Task Push(object message) {
step1.SendAsync(message);
step1.Complete();
return step3.Completion;
}
}
...
public class Program {
public static void Main(string[] args) {
var pipeline = new MessagePipeline();
var result = pipeline.Push("Hello, world!");
result.ContinueWith(_ => Console.WriteLine("Completed"));
Console.ReadLine();
}
}
您需要明确调用 Complete。
当您 link 这些步骤时,您需要传递 DataflowLinkOptions 并将 PropagateCompletion 属性 设置为 true 以传播完成和错误。一旦你这样做,在第一个块上调用 Complete()
会将完成传播到下游块。
一旦块收到完成事件,它就会完成处理,然后通知其 linked 下游目标。
这样你就可以post你所有的数据到第一步然后调用Complete()
。最后一个区块只有在所有上游区块都完成后才会完成。
例如,
var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);
foreach(var message in messages)
{
myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;
PropagateCompletion 默认情况下不是真的,因为在更复杂的场景(例如非线性流或动态变化的流)中,您不希望自动传播完成和错误。如果您想在不终止整个流程的情况下处理错误,您可能还想避免自动完成。
回到 TPL Dataflow 处于测试阶段时,默认 是 true 但这在 RTM
上发生了变化
更新
代码永远不会完成,因为最后一步是 TransformBlock
,没有 linked 目标来接收其输出。这意味着即使块接收到完成信号,它还没有完成所有工作并且不能改变它自己的完成状态。
将其更改为 ActionBlock<object>
可解决问题。
我有一个 class,它使用 TPL 数据流实现了一个由 3 个步骤组成的数据流。
在构造函数中,我将步骤创建为 TransformBlocks,并使用 LinkTo DataflowLinkOptions.PropagateCompletion 将它们 link 设置为 true。 class 公开了一个方法,该方法通过在第一步调用 SendAsync 来启动工作流。方法 returns "Completion" 属性 工作流的最后一步。
目前,工作流中的步骤似乎按预期执行,但最后一步永远不会完成,除非我对其明确调用 Complete。但是这样做会缩短工作流程并执行 none 个步骤吗?我做错了什么?
public class MessagePipeline {
private TransformBlock<object, object> step1;
private TransformBlock<object, object> step2;
private TransformBlock<object, object> step3;
public MessagePipeline() {
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
step1 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step1...");
return x;
});
step2 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step2...");
return x;
});
step3 = new TransformBlock<object, object>(
x => {
Console.WriteLine("Step3...");
return x;
});
step1.LinkTo(step2, linkOptions);
step2.LinkTo(step3, linkOptions);
}
public Task Push(object message) {
step1.SendAsync(message);
step1.Complete();
return step3.Completion;
}
}
...
public class Program {
public static void Main(string[] args) {
var pipeline = new MessagePipeline();
var result = pipeline.Push("Hello, world!");
result.ContinueWith(_ => Console.WriteLine("Completed"));
Console.ReadLine();
}
}
您需要明确调用 Complete。
当您 link 这些步骤时,您需要传递 DataflowLinkOptions 并将 PropagateCompletion 属性 设置为 true 以传播完成和错误。一旦你这样做,在第一个块上调用 Complete()
会将完成传播到下游块。
一旦块收到完成事件,它就会完成处理,然后通知其 linked 下游目标。
这样你就可以post你所有的数据到第一步然后调用Complete()
。最后一个区块只有在所有上游区块都完成后才会完成。
例如,
var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);
foreach(var message in messages)
{
myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;
PropagateCompletion 默认情况下不是真的,因为在更复杂的场景(例如非线性流或动态变化的流)中,您不希望自动传播完成和错误。如果您想在不终止整个流程的情况下处理错误,您可能还想避免自动完成。
回到 TPL Dataflow 处于测试阶段时,默认 是 true 但这在 RTM
上发生了变化更新
代码永远不会完成,因为最后一步是 TransformBlock
,没有 linked 目标来接收其输出。这意味着即使块接收到完成信号,它还没有完成所有工作并且不能改变它自己的完成状态。
将其更改为 ActionBlock<object>
可解决问题。