TPL 数据流如何与 "global" 数据同步

TPL Dataflow how to synchronize with "global" data

我实际上是在学习 TPL 数据流。每当我读到有关它的内容时,我觉得 ok 听起来不错,但后来我经常问自己:"Ok, and what if I have kind of a manager, handling different sessions. These sessions can be updated by certain messages. In case of a complex TPL dataflow mesh I've to build-in synchronization mechanism for accessing the manager, which will slow down or block the mesh."

对 TPL 数据流使用管理对象感觉有点不对。

任何人都可以给我一些关于 "right" 方向的提示(链接、书籍、示例...),如何解决上面的示例。

没有一些代码的非常广泛的问题。但通常你要么传递一些带有 Tuple 的 "state" 对象,要么传递一些 DTO,我个人认为这是泄漏设计,或者你可以将一些块注入你的管道。

例如,您可以创建一个 WriteOnceBlock 并为其提供会话值,每个管道都是唯一的,并且只需在执行期间发生新事件时通知它的值。如果您为不同的会话创建管道,这可能是您的一个选择,但如果您有一个大管道,则需要另一种方法。

例如,您有一个 BufferBlock, an ActionBlock, which performs a session update, and an TransformBlock, which simply continues your normal pipeline execution. The thing you can do in this case is to introduce a BroadcastBlock,link 它带有动作块和变换块。现在你 link 你的 buffer to broadcast block with a predicate,然后你 link buffer to transform block directly.

这里的想法是,如果您的消息被过滤(因此需要会话更新),它将进入广播块,然后进入会话更新操作和您的正常执行管道。如果它与谓词不匹配,它只会通过您的正常工作流程。

PS:如果这在文字上太复杂,我可以提供一些针对这种情况的示例代码。

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

var buffer = new BufferBlock<int>();
// broadcast do copy according lambda from constructor
// if you provide same reference for message, make sure that your access is thread-safe
var broadcast = new BroadcastBlock<int>(i => i);
buffer.LinkTo(broadcast, linkOptions);

// session update block
var sessionUpdate = new ActionBlock<int>(i =>
    {
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Session update:{i}");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// normal execution
var transform = new TransformBlock<int, int>(i =>
    {
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Normal execution:{i}");
        return i;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// do not complete the standalone session block
// message will be accepted only for multipliers of 3
broadcast.LinkTo(sessionUpdate, i => i % 3 == 0);
// normal pipeline with completion propagation
broadcast.LinkTo(transform, linkOptions);

for (var i = 0; i < 10; ++i)
{
    // async message
    await buffer.SendAsync(i);
}
buffer.Complete();
await transform.Completion;