将 Task.WhenAll 替换为 PLinq
Replace Task.WhenAll with PLinq
我有一个方法可以并行多次调用 WCF 服务。为了防止目标系统过载,我想使用 PLinq 的能力来限制并行执行的数量。现在我想知道如何以有效的方式重写我的方法。
这是我当前的实现:
private async Task RunFullImport(IProgress<float> progress) {
var dataEntryCache = new ConcurrentHashSet<int>();
using var client = new ESBClient(); // WCF
// Progress counters helpers
float totalSteps = 1f / companyGroup.Count();
int currentStep = 0;
//Iterate over all resources
await Task.WhenAll(companyGroup.Select(async res => {
getWorkOrderForResourceDataSetResponse worResp = await client.getWorkOrderForResourceDataSetAsync(
new getWorkOrderForResourceDataSetRequest(
"?",
res.Company,
res.ResourceNumber,
res.ResourceType,
res.CLSName,
fromDate,
toDate,
"D"
)
);
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}));
// Do some more stuff with the result
}
我已经尝试了一些使用 PLinq 的方法,但我没有找到合适的解决方案。这是我现在的状态。
var p = companyGroup.Select(res => client.getWorkOrderForResourceDataSetAsync(
new getWorkOrderForResourceDataSetRequest(
"?",
res.Company,
res.ResourceNumber,
res.ResourceType,
res.CLSName,
fromDate,
toDate,
"D"
)
).ContinueWith(worResp => {
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.Result.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}));
await Task.WhenAll(p.AsParallel().ToArray());
很明显,这不能正常工作。您有什么建议可以使它正常有效地工作,并限制对服务器的最大调用,以便 8 个并行?
PLINQ 仅适用于同步代码。它有一些很好的内置旋钮,用于控制并发 并行 操作的数量。
要控制并发异步操作的数量,使用SemaphoreSlim
:
private async Task RunFullImport(IProgress<float> progress) {
var dataEntryCache = new ConcurrentHashSet<int>();
using var client = new ESBClient(); // WCF
var limiter = new SemaphoreSlim(10); // or however many you want to limit to.
// Progress counters helpers
float totalSteps = 1f / companyGroup.Count();
int currentStep = 0;
//Iterate over all resources
await Task.WhenAll(companyGroup.Select(async res => {
await limiter.WaitAsync();
try {
getWorkOrderForResourceDataSetResponse worResp = ...
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}
finally {
limiter.Release();
}
}));
// Do some more stuff with the result
}
有关详细信息,请参阅 my book 中的秘诀 12.5,其中涵盖了几种不同的节流解决方案。
我有一个方法可以并行多次调用 WCF 服务。为了防止目标系统过载,我想使用 PLinq 的能力来限制并行执行的数量。现在我想知道如何以有效的方式重写我的方法。
这是我当前的实现:
private async Task RunFullImport(IProgress<float> progress) {
var dataEntryCache = new ConcurrentHashSet<int>();
using var client = new ESBClient(); // WCF
// Progress counters helpers
float totalSteps = 1f / companyGroup.Count();
int currentStep = 0;
//Iterate over all resources
await Task.WhenAll(companyGroup.Select(async res => {
getWorkOrderForResourceDataSetResponse worResp = await client.getWorkOrderForResourceDataSetAsync(
new getWorkOrderForResourceDataSetRequest(
"?",
res.Company,
res.ResourceNumber,
res.ResourceType,
res.CLSName,
fromDate,
toDate,
"D"
)
);
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}));
// Do some more stuff with the result
}
我已经尝试了一些使用 PLinq 的方法,但我没有找到合适的解决方案。这是我现在的状态。
var p = companyGroup.Select(res => client.getWorkOrderForResourceDataSetAsync(
new getWorkOrderForResourceDataSetRequest(
"?",
res.Company,
res.ResourceNumber,
res.ResourceType,
res.CLSName,
fromDate,
toDate,
"D"
)
).ContinueWith(worResp => {
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.Result.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}));
await Task.WhenAll(p.AsParallel().ToArray());
很明显,这不能正常工作。您有什么建议可以使它正常有效地工作,并限制对服务器的最大调用,以便 8 个并行?
PLINQ 仅适用于同步代码。它有一些很好的内置旋钮,用于控制并发 并行 操作的数量。
要控制并发异步操作的数量,使用SemaphoreSlim
:
private async Task RunFullImport(IProgress<float> progress) {
var dataEntryCache = new ConcurrentHashSet<int>();
using var client = new ESBClient(); // WCF
var limiter = new SemaphoreSlim(10); // or however many you want to limit to.
// Progress counters helpers
float totalSteps = 1f / companyGroup.Count();
int currentStep = 0;
//Iterate over all resources
await Task.WhenAll(companyGroup.Select(async res => {
await limiter.WaitAsync();
try {
getWorkOrderForResourceDataSetResponse worResp = ...
// Iterate over all work orders and add them to the list
foreach (dsyWorkOrder02TtyWorkOrderResource workOrder in worResp.dsyWorkOrder02) {
dataEntryCache.Add(workOrder.DataEntryNumber.Value);
}
// Update progress
progress.Report(totalSteps * Interlocked.Increment(ref currentStep) * .1f);
}
finally {
limiter.Release();
}
}));
// Do some more stuff with the result
}
有关详细信息,请参阅 my book 中的秘诀 12.5,其中涵盖了几种不同的节流解决方案。