围绕管道的 TPL DataFlow 混淆 - 我应该为每个数据调用创建一个新管道吗?我如何跟踪流经的数据?

TPL DataFlow confusion around pipelines - should I create a new pipeline for each data call? How can I track data that's flowing through?

我正在为如何将 TPL DataFlow 应用到我的应用程序而苦恼。

我有一堆并行数据操作我想跟踪和管理,以前 I was just using Tasks,但我正在尝试实施 DataFlow 来给我更多的控制权。

我正在编写一个任务流水线来获取数据并处理它,这里有一个流水线示例 get 数据、process 数据和 log 它完成:

TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename =>
{
    // read the data file (takes a long time!)
    Console.WriteLine("Loading from " + filename);
    Thread.Sleep(2000);

    // return our result, for now just use the filename
    return filename + "_data";
});

TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data =>
{
    // process the data
    Console.WriteLine("Processiong data " + data);
    Thread.Sleep(2000);

    // return our result, for now just use the data string
    return data + "_processed";
});

TransformBlock<string, string> logProcessComplete= new TransformBlock<string, string>(data =>
{
    // Doesn't do anything to the data, just performs an 'action' (but still passses the data long, unlike ActionBlock)
    Console.WriteLine("Result " + data + " complete");
    return data;
});

我link像这样将它们组合在一起:

// create a pipeline
loadDataFromFile.LinkTo(prodcessData);
prodcessData.LinkTo(logProcessComplete);

我一直在努力关注 this tutorial

我的困惑是,在教程中,这个管道似乎是一个 'fire once' 操作。它创建管道并触发一次,然后完成。这似乎与数据流库的设计方式背道而驰,我读过:

The usual way of using TPL Dataflow is to create all the blocks, link them together, and then start putting data in one end.

来自 Stephen Cleary 的 "Concurrency in C# Cookbook"。

但是我不确定在我输入上述数据 'in one end' 之后如何 track 数据。我需要能够从程序的多个部分获取 processed 数据,比如用户按下两个按钮,一个是从 "File1" 获取数据并对其进行处理,一个是获取数据来自 "File2",我认为我需要这样的东西:

public async Task loadFile1ButtonPress()
{
    loadDataFromFile.Post("File1");
    var data = await logProcessComplete.ReceiveAsync();
    Console.WriteLine($"Got data1: {data}");
}

public async Task loadFile2ButtonPress()
{
    loadDataFromFile.Post("File2");
    var data = await logProcessComplete.ReceiveAsync();
    Console.WriteLine($"Got data2: {data}");
}

如果执行这些 'synchronously' 它工作得很好,因为只有一条信息流过管道:

Console.WriteLine("waiting for File 1");
await loadFile1ButtonPress();
Console.WriteLine("waiting for File 2");
await loadFile2ButtonPress();
Console.WriteLine("Done");

产生预期的输出:

waiting for File 1
Loading from File1
Processiong data File1_data
Result File1_data_processed complete
Got data1: File1_data_processed
waiting for File 2
Loading from File2
Processiong data File2_data
Result File2_data_processed complete
Got data2: File2_data_processed
Done

这对我来说很有意义,一次只做一个:

但是,关键是我想 运行 这些操作是并行和异步的。如果我模拟这个(比如,用户快速连续按下两个 'buttons'):

Console.WriteLine("waiting");
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress());
Console.WriteLine("Done");

如果第二个操作比第一个操作花费的时间更长,这是否有效?

我原本希望 return 第一个数据(最初这并没有 工作,但这是一个我已经修复的错误 - 它现在 return 正确的项目)。

我想我可以 link 一个 ActionBlock<string> 来对数据执行操作,例如:

public async Task loadFile1ButtonPress()
{
    loadDataFromFile.Post("File1");
    // instead of var data = await logProcessComplete.ReceiveAsync();

    logProcessComplete.LinkTo(new ActionBlock<string>(data =>
    {
        Console.WriteLine($"Got data1: {data}");
    }));
}

但这完全改变了管道,现在 loadFile2ButtonPress 根本无法工作,因为它正在使用该管道。

我可以用相同的块创建多个管道吗?或者我应该为每个 'operation' 创建一个全新的管道(和新块)(这似乎完全违背了使用 Dataflow 库的意义)

不确定这是否是 Whosebug 或类似 Codereview 的最佳位置?可能有点主观。

如果你需要在一些数据被处理后发生一些事件,你应该制作你的最后一个区块 AsObservable,并添加一些小代码 Rx.Net:

var subscription = logProcessComplete.AsObservable();
subscription.Subscribe(i => Console.WriteLine(i));

如评论中所述,您可以 link your blocks to more than one block, with a predicate. Note, that in that case, message will be delivered only to first matching block. You also may create a BroadcastBlock,它会将消息的 副本 传送到每个链接块。

确保所有其他阻止消息都链接到 NullTarget,因为在其他情况下,它们将永远留在您的管道中,并会停止您的完成。

检查您的管道是否正确处理完成,因为在多个链接的情况下,完成也仅传播到第一个链接块。