Parallel.ForEach 内的多个异步等待链接
Multiple async-await chaining inside Parallel.ForEach
我有一个 Parallel.ForEach 循环,它循环遍历一个集合。在内部,我进行了多次网络 I/O 调用的循环。我使用 Task.ContinueWith 并嵌套了后续的异步等待调用。处理顺序无关紧要,但来自每个异步调用的数据应该以同步方式处理。含义 - 对于每次迭代,从第一次异步调用中检索到的数据应该传递给第二次异步调用。在第二次异步调用完成后,来自两个异步调用的数据应该一起处理。
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));
//this is my first async call
await countryTask.ContinueWith((countryData) =>
{
countries.Add(countryData.Result);
Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));
//based on the data I receive in 'stateTask', I make another async call
stateTask.ContinueWith((stateData) =>
{
states.Add(stateData.Result);
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
});
});
});
我在没有使用 continue await 的情况下尝试了上面的方法,但它没有以同步方式工作。现在,上面的代码执行完成,但没有处理任何记录。
请问有什么帮助吗?让我知道是否可以添加更多详细信息。
我认为你过于复杂了;在 Parallel.ForEach
中,您 已经在线程池中 ,因此在内部创建大量 额外的 任务确实没有任何好处。所以;如何做到这一点实际上取决于 GetState
等是同步的还是异步的。如果我们假设同步,则类似:
Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
var country = GetCountry(item.Id);
countries.Add(country); // warning: may need to synchronize
var state = GetState(country.CountryID);
states.Add(state); // warning: may need to synchronize
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
如果他们是异步的,那就更尴尬了; nice 如果我们可以做这样的事情:
// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
var country = await GetCountryAsync(item.Id);
countries.Add(country); // warning: may need to synchronize
var state = await GetStateAsync(country.CountryID);
states.Add(state); // warning: may need to synchronize
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
但这里的问题是 Parallel.ForEach
中的 none 个回调是 "awaitable",意思是:我们在这里默默地创建了一个 async void
回调,它是:很坏。这意味着一旦不完整的await
发生,Parallel.ForEach
就会认为它有"finished",这意味着:
- 我们不知道什么时候所有的工作都实际上完成了
- 你可能会比你预期的同时做更多的事情(max-dop 不能被尊重)
目前似乎没有什么好的API可以避免这种情况。
由于您的方法涉及 I/O,因此应该将它们编写为真正异步的,而不仅仅是使用 Task.Run
.
在线程池上同步 运行
那么您可以将 Task.WhenAll
与 Enumerable.Select
结合使用:
var tasks = someCollection.Select(async item =>
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
return (country, state, calculation);
});
foreach (var tuple in await Task.WhenAll(tasks))
{
countries.Add(tuple.country);
states.Add(tuple.state);
myCollection.AddRange(tuple.calculation);
}
这将确保每个 country
> state
> calculation
顺序发生,但每个 item
是并发和异步处理的。
根据评论更新
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;
var tasks = someCollection.Select(async item =>
{
await semaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
Interlocked.Exchange(ref failures, 0);
return (country, state, calculation);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
semaphore.Release();
}
});
信号量保证最多2个并发异步操作,取消令牌将在连续10次异常后取消所有未完成的任务。
Interlocked
方法确保以线程安全的方式访问 failures
。
进一步更新
使用 2 个信号量来防止多次迭代可能会更有效。
将所有列表添加封装到一个方法中:
void AddToLists(Country country, State state, Calculation calculation)
{
countries.Add(country);
states.Add(state);
myCollection.AddRange(calculation);
}
然后你可以允许 2 个线程同时处理 Http 请求,1 个线程执行添加,使该操作线程安全:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;
await Task.WhenAll(someCollection.Select(async item =>
{
await httpSemaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
await listAddSemaphore.WaitAsync(cts.Token);
AddToLists(country, state, calculation);
Interlocked.Exchange(ref failures, 0);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
httpSemaphore.Release();
listAddSemaphore.Release();
}
}));
我有一个 Parallel.ForEach 循环,它循环遍历一个集合。在内部,我进行了多次网络 I/O 调用的循环。我使用 Task.ContinueWith 并嵌套了后续的异步等待调用。处理顺序无关紧要,但来自每个异步调用的数据应该以同步方式处理。含义 - 对于每次迭代,从第一次异步调用中检索到的数据应该传递给第二次异步调用。在第二次异步调用完成后,来自两个异步调用的数据应该一起处理。
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));
//this is my first async call
await countryTask.ContinueWith((countryData) =>
{
countries.Add(countryData.Result);
Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));
//based on the data I receive in 'stateTask', I make another async call
stateTask.ContinueWith((stateData) =>
{
states.Add(stateData.Result);
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
});
});
});
我在没有使用 continue await 的情况下尝试了上面的方法,但它没有以同步方式工作。现在,上面的代码执行完成,但没有处理任何记录。
请问有什么帮助吗?让我知道是否可以添加更多详细信息。
我认为你过于复杂了;在 Parallel.ForEach
中,您 已经在线程池中 ,因此在内部创建大量 额外的 任务确实没有任何好处。所以;如何做到这一点实际上取决于 GetState
等是同步的还是异步的。如果我们假设同步,则类似:
Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
var country = GetCountry(item.Id);
countries.Add(country); // warning: may need to synchronize
var state = GetState(country.CountryID);
states.Add(state); // warning: may need to synchronize
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
如果他们是异步的,那就更尴尬了; nice 如果我们可以做这样的事情:
// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
var country = await GetCountryAsync(item.Id);
countries.Add(country); // warning: may need to synchronize
var state = await GetStateAsync(country.CountryID);
states.Add(state); // warning: may need to synchronize
// use data from both the async calls pass it to below function for some calculation
// in a synchronized way (for a country, its corresponding state should be passed)
myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});
但这里的问题是 Parallel.ForEach
中的 none 个回调是 "awaitable",意思是:我们在这里默默地创建了一个 async void
回调,它是:很坏。这意味着一旦不完整的await
发生,Parallel.ForEach
就会认为它有"finished",这意味着:
- 我们不知道什么时候所有的工作都实际上完成了
- 你可能会比你预期的同时做更多的事情(max-dop 不能被尊重)
目前似乎没有什么好的API可以避免这种情况。
由于您的方法涉及 I/O,因此应该将它们编写为真正异步的,而不仅仅是使用 Task.Run
.
那么您可以将 Task.WhenAll
与 Enumerable.Select
结合使用:
var tasks = someCollection.Select(async item =>
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
return (country, state, calculation);
});
foreach (var tuple in await Task.WhenAll(tasks))
{
countries.Add(tuple.country);
states.Add(tuple.state);
myCollection.AddRange(tuple.calculation);
}
这将确保每个 country
> state
> calculation
顺序发生,但每个 item
是并发和异步处理的。
根据评论更新
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;
var tasks = someCollection.Select(async item =>
{
await semaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
Interlocked.Exchange(ref failures, 0);
return (country, state, calculation);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
semaphore.Release();
}
});
信号量保证最多2个并发异步操作,取消令牌将在连续10次异常后取消所有未完成的任务。
Interlocked
方法确保以线程安全的方式访问 failures
。
进一步更新
使用 2 个信号量来防止多次迭代可能会更有效。
将所有列表添加封装到一个方法中:
void AddToLists(Country country, State state, Calculation calculation)
{
countries.Add(country);
states.Add(state);
myCollection.AddRange(calculation);
}
然后你可以允许 2 个线程同时处理 Http 请求,1 个线程执行添加,使该操作线程安全:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;
await Task.WhenAll(someCollection.Select(async item =>
{
await httpSemaphore.WaitAsync(cts.Token);
try
{
var country = await GetCountryAsync(item.Id);
var state = await GetStateAsync(country.CountryID);
var calculation = SomeCalculation(country, state);
await listAddSemaphore.WaitAsync(cts.Token);
AddToLists(country, state, calculation);
Interlocked.Exchange(ref failures, 0);
{
catch
{
if (Interlocked.Increment(ref failures) >= 10)
{
cts.Cancel();
}
throw;
}
finally
{
httpSemaphore.Release();
listAddSemaphore.Release();
}
}));