具有永久 Task/Thread 的 TPL 数据流块

TPL Dataflow Block with permanent Task/Thread

Stepen Toub 在此 Channel 9 Video 中提到,如果将项目推送到其传入队列,*Block 会创建一个任务。如果计算了队列中的所有项目,任务将被销毁。

如果我使用很多块来构建网格,实际 运行 任务的数量并不清楚(如果 TaskScheduler 是默认的,活动的数量 ThreadPool线程也不清楚)。

TPL Dataflow 是否提供了一种方式让我可以说:“好的,我想要这种带有永久性 运行 任务(线程)的块?

TL;DR:没有办法将一个线程专用于一个块,因为它显然与 TPL Dataflow 的目的相冲突,除非实现您自己的 TaskScheduler。在尝试提高应用程序性能之前进行测量。


我刚刚看了视频,里面找不到这样的短语:

creates a task if an item was pushed to its incoming queue. If all items in queue are computed the task gets destroyed.

也许我遗漏了什么,但斯蒂芬所说的只是:[开头]我们有一个共同的Producer-Consumer问题,可以使用.Net 4.0堆栈轻松实现,但问题是如果数据用完,消费者就会离开循环,永远不会 return.

[在那之后] Stephen 解释说,如何用 TPL Dataflow 解决这样的问题,他说 ActionBlock starts a Task 如果它没有启动。在该任务中有代码等待(以 async 方式)新消息,释放线程,但不破坏任务。

斯蒂芬在解释跨链接块发送消息时也提到了任务,他在那里说如果没有数据要发送,发布任务将消失。并不是说某个block对应的task消失了,只是某个子task被用来发送数据,仅此而已。

TPL Dataflow 中,告诉块不再有任何数据的唯一方法是:调用它的 Complete 方法或完成任何链接块。之后 consuming 任务将停止,并且在处理完所有缓冲数据后,块将结束它的任务。

根据 TPL Dataflow 的官方 github,块内的所有消息处理任务都创建为 DenyChildAttach,有时,带有 PreferFairness 标志。所以,我没有理由提供一种机制让一个线程直接适应块,因为如果块没有数据,它会卡住并浪费 CPU 资源。您可以为块引入一些自定义 TaskScheduler,但现在还不清楚为什么需要它。

如果您担心某些块可能比其他块获得更多 CPU 时间,有一种方法可以利用这种效果。根据 official docs, you can try to set the MaxMessagesPerTask 属性,在发送一定数量的数据后强制任务重新启动。不过,这应该 在测量实际执行时间后完成。

现在,回到你的话:

number of actually running tasks is not clear
the number of active ThreadPool threads is also not clear

您是如何分析您的申请的?例如,在调试期间您可以轻松找到 all active tasks and all active threads. If it's not enough, you can profile your application, either with native Microsoft tools or a specialized profiler, like dotTrace。这样的工具包可以轻松地为您提供有关您的应用中正在发生的事情的信息。

The talk 是关于 TPL 数据流库的内部机制。作为一种机制,它是一种非常有效的机制,您不必真正担心任何开销,除非您的预期吞吐量大约为每秒 100,000 条消息或更多(在这种情况下,您应该寻找将工作负载分块的方法)。即使对于粒度非常小的工作负载,对所有消息使用单个任务处理消息或对每条消息使用专用任务处理消息之间的差异也应该很难察觉。 Task 是一个通常“称重”为几百个字节的对象,.NET 平台每秒能够创建和回收数百万个这种大小的对象。

如果每个 Task 都需要自己专用的 线程才能 运行,那将是一个问题,但事实并非如此。通常,任务是使用 ThreadPool 个线程执行的,单个 ThreadPool 线程可能每秒执行数百万个短期任务。

我还应该提到,TPL 数据流也支持异步 lambda(具有 Task return 类型的 lambda),在这种情况下,块基本上不必执行任何代码。它们只是等待生成的 promise 式任务完成,对于异步等待,需要 no thread