如何使用线程处理多个任务
How to use Threads for Processing Many Tasks
我有一个单独处理 'great many'(可能 > 100,000)条记录的 C# 要求。按顺序运行此过程被证明非常慢,每条记录都需要花费一秒钟左右的时间才能完成(超时错误设置为 5 秒)。
我想尝试使用一定数量的工作人员异步运行这些任务 'threads'(我在这里谨慎使用术语 'thread',因为我不确定我是否应该查看线程、任务或其他东西)。
我看过 ThreadPool
,但我无法想象它可以排队所需的请求量。我理想的伪代码看起来像这样......
public void ProcessRecords() {
SetMaxNumberOfThreads(20);
MyRecord rec;
while ((rec = GetNextRecord()) != null) {
var task = WaitForNextAvailableThreadFromPool(ProcessRecord(rec));
task.Start()
}
}
我还需要一种处理方法可以向 parent/calling class 报告的机制。
任何人都可以通过一些示例代码为我指出正确的方向吗?
一个可能的简单解决方案是使用 TPL 数据流块,该块是对 TPL 的更高抽象,具有并行度等配置。您只需创建块(ActionBlock
在这种情况下),Post
一切,异步等待完成,TPL 数据流为您处理所有其余部分:
var block = new ActionBlock<MyRecord>(
rec => ProcessRecord(rec),
new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});
MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
block.Post(rec);
}
block.Complete();
await block.Completion
另一个好处是块在第一条记录到达后立即开始工作,而不是仅在收到所有记录后才开始工作。
如果您需要报告每条记录,您可以使用 TransformBlock
进行实际处理,并使用 link 进行更新的 ActionBlock
:
var transform = new TransfromBlock<MyRecord, Report>(rec =>
{
ProcessRecord(rec);
return GenerateReport(rec);
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});
var reporter = new ActionBlock<Report>(report =>
{
RaiseEvent(report) // Or any other mechanism...
});
transform.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });
MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
transform.Post(rec);
}
transform.Complete();
await transform.Completion
您是否考虑过对 Actions 使用并行处理?
即,创建一个处理单个记录的方法,将每个记录方法作为一个动作添加到列表中,然后在列表上执行parrallel.for。
Dim list As New List(Of Action)
list.Add(New Action(Sub() MyMethod(myParameter)))
Parallel.ForEach(list, Sub(t) t.Invoke())
这在 vb.net 中,但我想您明白了要点。
我有一个单独处理 'great many'(可能 > 100,000)条记录的 C# 要求。按顺序运行此过程被证明非常慢,每条记录都需要花费一秒钟左右的时间才能完成(超时错误设置为 5 秒)。
我想尝试使用一定数量的工作人员异步运行这些任务 'threads'(我在这里谨慎使用术语 'thread',因为我不确定我是否应该查看线程、任务或其他东西)。
我看过 ThreadPool
,但我无法想象它可以排队所需的请求量。我理想的伪代码看起来像这样......
public void ProcessRecords() {
SetMaxNumberOfThreads(20);
MyRecord rec;
while ((rec = GetNextRecord()) != null) {
var task = WaitForNextAvailableThreadFromPool(ProcessRecord(rec));
task.Start()
}
}
我还需要一种处理方法可以向 parent/calling class 报告的机制。
任何人都可以通过一些示例代码为我指出正确的方向吗?
一个可能的简单解决方案是使用 TPL 数据流块,该块是对 TPL 的更高抽象,具有并行度等配置。您只需创建块(ActionBlock
在这种情况下),Post
一切,异步等待完成,TPL 数据流为您处理所有其余部分:
var block = new ActionBlock<MyRecord>(
rec => ProcessRecord(rec),
new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});
MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
block.Post(rec);
}
block.Complete();
await block.Completion
另一个好处是块在第一条记录到达后立即开始工作,而不是仅在收到所有记录后才开始工作。
如果您需要报告每条记录,您可以使用 TransformBlock
进行实际处理,并使用 link 进行更新的 ActionBlock
:
var transform = new TransfromBlock<MyRecord, Report>(rec =>
{
ProcessRecord(rec);
return GenerateReport(rec);
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});
var reporter = new ActionBlock<Report>(report =>
{
RaiseEvent(report) // Or any other mechanism...
});
transform.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });
MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
transform.Post(rec);
}
transform.Complete();
await transform.Completion
您是否考虑过对 Actions 使用并行处理? 即,创建一个处理单个记录的方法,将每个记录方法作为一个动作添加到列表中,然后在列表上执行parrallel.for。
Dim list As New List(Of Action)
list.Add(New Action(Sub() MyMethod(myParameter)))
Parallel.ForEach(list, Sub(t) t.Invoke())
这在 vb.net 中,但我想您明白了要点。