围绕管道的 TPL DataFlow 混淆 - 我应该为每个数据调用创建一个新管道吗?我如何跟踪流经的数据?
TPL DataFlow confusion around pipelines - should I create a new pipeline for each data call? How can I track data that's flowing through?
我正在为如何将 TPL DataFlow 应用到我的应用程序而苦恼。
我有一堆并行数据操作我想跟踪和管理,以前 I was just using Tasks,但我正在尝试实施 DataFlow 来给我更多的控制权。
我正在编写一个任务流水线来获取数据并处理它,这里有一个流水线示例 get
数据、process
数据和 log
它完成:
TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename =>
{
// read the data file (takes a long time!)
Console.WriteLine("Loading from " + filename);
Thread.Sleep(2000);
// return our result, for now just use the filename
return filename + "_data";
});
TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data =>
{
// process the data
Console.WriteLine("Processiong data " + data);
Thread.Sleep(2000);
// return our result, for now just use the data string
return data + "_processed";
});
TransformBlock<string, string> logProcessComplete= new TransformBlock<string, string>(data =>
{
// Doesn't do anything to the data, just performs an 'action' (but still passses the data long, unlike ActionBlock)
Console.WriteLine("Result " + data + " complete");
return data;
});
我link像这样将它们组合在一起:
// create a pipeline
loadDataFromFile.LinkTo(prodcessData);
prodcessData.LinkTo(logProcessComplete);
我一直在努力关注 this tutorial。
我的困惑是,在教程中,这个管道似乎是一个 'fire once' 操作。它创建管道并触发一次,然后完成。这似乎与数据流库的设计方式背道而驰,我读过:
The usual way of using TPL Dataflow is to create all the blocks, link
them together, and then start putting data in one end.
来自 Stephen Cleary 的 "Concurrency in C# Cookbook"。
但是我不确定在我输入上述数据 'in one end' 之后如何 track
数据。我需要能够从程序的多个部分获取 processed
数据,比如用户按下两个按钮,一个是从 "File1" 获取数据并对其进行处理,一个是获取数据来自 "File2",我认为我需要这样的东西:
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data1: {data}");
}
public async Task loadFile2ButtonPress()
{
loadDataFromFile.Post("File2");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data2: {data}");
}
如果执行这些 'synchronously' 它工作得很好,因为只有一条信息流过管道:
Console.WriteLine("waiting for File 1");
await loadFile1ButtonPress();
Console.WriteLine("waiting for File 2");
await loadFile2ButtonPress();
Console.WriteLine("Done");
产生预期的输出:
waiting for File 1
Loading from File1
Processiong data File1_data
Result File1_data_processed complete
Got data1: File1_data_processed
waiting for File 2
Loading from File2
Processiong data File2_data
Result File2_data_processed complete
Got data2: File2_data_processed
Done
这对我来说很有意义,一次只做一个:
但是,关键是我想 运行 这些操作是并行和异步的。如果我模拟这个(比如,用户快速连续按下两个 'buttons'):
Console.WriteLine("waiting");
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress());
Console.WriteLine("Done");
如果第二个操作比第一个操作花费的时间更长,这是否有效?
我原本希望 return 第一个数据(最初这并没有
工作,但这是一个我已经修复的错误 - 它现在 return 正确的项目)。
我想我可以 link 一个 ActionBlock<string>
来对数据执行操作,例如:
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
// instead of var data = await logProcessComplete.ReceiveAsync();
logProcessComplete.LinkTo(new ActionBlock<string>(data =>
{
Console.WriteLine($"Got data1: {data}");
}));
}
但这完全改变了管道,现在 loadFile2ButtonPress
根本无法工作,因为它正在使用该管道。
我可以用相同的块创建多个管道吗?或者我应该为每个 'operation' 创建一个全新的管道(和新块)(这似乎完全违背了使用 Dataflow 库的意义)
不确定这是否是 Whosebug 或类似 Codereview 的最佳位置?可能有点主观。
如果你需要在一些数据被处理后发生一些事件,你应该制作你的最后一个区块 AsObservable
,并添加一些小代码 Rx.Net
:
var subscription = logProcessComplete.AsObservable();
subscription.Subscribe(i => Console.WriteLine(i));
如评论中所述,您可以 link your blocks to more than one block, with a predicate. Note, that in that case, message will be delivered only to first matching block. You also may create a BroadcastBlock
,它会将消息的 副本 传送到每个链接块。
确保所有其他阻止消息都链接到 NullTarget
,因为在其他情况下,它们将永远留在您的管道中,并会停止您的完成。
检查您的管道是否正确处理完成,因为在多个链接的情况下,完成也仅传播到第一个链接块。
我正在为如何将 TPL DataFlow 应用到我的应用程序而苦恼。
我有一堆并行数据操作我想跟踪和管理,以前 I was just using Tasks,但我正在尝试实施 DataFlow 来给我更多的控制权。
我正在编写一个任务流水线来获取数据并处理它,这里有一个流水线示例 get
数据、process
数据和 log
它完成:
TransformBlock<string, string> loadDataFromFile = new TransformBlock<string, string>(filename =>
{
// read the data file (takes a long time!)
Console.WriteLine("Loading from " + filename);
Thread.Sleep(2000);
// return our result, for now just use the filename
return filename + "_data";
});
TransformBlock<string, string> prodcessData = new TransformBlock<string, string>(data =>
{
// process the data
Console.WriteLine("Processiong data " + data);
Thread.Sleep(2000);
// return our result, for now just use the data string
return data + "_processed";
});
TransformBlock<string, string> logProcessComplete= new TransformBlock<string, string>(data =>
{
// Doesn't do anything to the data, just performs an 'action' (but still passses the data long, unlike ActionBlock)
Console.WriteLine("Result " + data + " complete");
return data;
});
我link像这样将它们组合在一起:
// create a pipeline
loadDataFromFile.LinkTo(prodcessData);
prodcessData.LinkTo(logProcessComplete);
我一直在努力关注 this tutorial。
我的困惑是,在教程中,这个管道似乎是一个 'fire once' 操作。它创建管道并触发一次,然后完成。这似乎与数据流库的设计方式背道而驰,我读过:
The usual way of using TPL Dataflow is to create all the blocks, link them together, and then start putting data in one end.
来自 Stephen Cleary 的 "Concurrency in C# Cookbook"。
但是我不确定在我输入上述数据 'in one end' 之后如何 track
数据。我需要能够从程序的多个部分获取 processed
数据,比如用户按下两个按钮,一个是从 "File1" 获取数据并对其进行处理,一个是获取数据来自 "File2",我认为我需要这样的东西:
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data1: {data}");
}
public async Task loadFile2ButtonPress()
{
loadDataFromFile.Post("File2");
var data = await logProcessComplete.ReceiveAsync();
Console.WriteLine($"Got data2: {data}");
}
如果执行这些 'synchronously' 它工作得很好,因为只有一条信息流过管道:
Console.WriteLine("waiting for File 1");
await loadFile1ButtonPress();
Console.WriteLine("waiting for File 2");
await loadFile2ButtonPress();
Console.WriteLine("Done");
产生预期的输出:
waiting for File 1
Loading from File1
Processiong data File1_data
Result File1_data_processed complete
Got data1: File1_data_processed
waiting for File 2
Loading from File2
Processiong data File2_data
Result File2_data_processed complete
Got data2: File2_data_processed
Done
这对我来说很有意义,一次只做一个:
但是,关键是我想 运行 这些操作是并行和异步的。如果我模拟这个(比如,用户快速连续按下两个 'buttons'):
Console.WriteLine("waiting");
await Task.WhenAll(loadFile1ButtonPress(), loadFile2ButtonPress());
Console.WriteLine("Done");
如果第二个操作比第一个操作花费的时间更长,这是否有效?
我原本希望 return 第一个数据(最初这并没有 工作,但这是一个我已经修复的错误 - 它现在 return 正确的项目)。
我想我可以 link 一个 ActionBlock<string>
来对数据执行操作,例如:
public async Task loadFile1ButtonPress()
{
loadDataFromFile.Post("File1");
// instead of var data = await logProcessComplete.ReceiveAsync();
logProcessComplete.LinkTo(new ActionBlock<string>(data =>
{
Console.WriteLine($"Got data1: {data}");
}));
}
但这完全改变了管道,现在 loadFile2ButtonPress
根本无法工作,因为它正在使用该管道。
我可以用相同的块创建多个管道吗?或者我应该为每个 'operation' 创建一个全新的管道(和新块)(这似乎完全违背了使用 Dataflow 库的意义)
不确定这是否是 Whosebug 或类似 Codereview 的最佳位置?可能有点主观。
如果你需要在一些数据被处理后发生一些事件,你应该制作你的最后一个区块 AsObservable
,并添加一些小代码 Rx.Net
:
var subscription = logProcessComplete.AsObservable();
subscription.Subscribe(i => Console.WriteLine(i));
如评论中所述,您可以 link your blocks to more than one block, with a predicate. Note, that in that case, message will be delivered only to first matching block. You also may create a BroadcastBlock
,它会将消息的 副本 传送到每个链接块。
确保所有其他阻止消息都链接到 NullTarget
,因为在其他情况下,它们将永远留在您的管道中,并会停止您的完成。
检查您的管道是否正确处理完成,因为在多个链接的情况下,完成也仅传播到第一个链接块。